You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by mg...@apache.org on 2017/02/24 00:17:12 UTC
[2/2] ambari git commit: AMBARI-20038 LogFeeder Simulator Enhancements for 3000-node cluster testing (mgergely)
AMBARI-20038 LogFeeder Simulator Enhancements for 3000-node cluster testing (mgergely)
Change-Id: Ie785b15b531b94b989c715a564287e32f77c8000
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/fc5cbc6e
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/fc5cbc6e
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/fc5cbc6e
Branch: refs/heads/trunk
Commit: fc5cbc6e3c806a3e7193dff17093db8d1367fb74
Parents: 1f41cc0
Author: Miklos Gergely <mg...@hortonworks.com>
Authored: Fri Feb 24 01:16:44 2017 +0100
Committer: Miklos Gergely <mg...@hortonworks.com>
Committed: Fri Feb 24 01:16:44 2017 +0100
----------------------------------------------------------------------
.../org/apache/ambari/logfeeder/LogFeeder.java | 13 ++++++--
.../ambari/logfeeder/common/ConfigBlock.java | 2 +-
.../apache/ambari/logfeeder/filter/Filter.java | 4 +--
.../ambari/logfeeder/input/InputSimulate.java | 31 ++++++++++++++++++--
4 files changed, 42 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/fc5cbc6e/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
index 24651ba..d584890 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
@@ -245,7 +245,7 @@ public class LogFeeder {
// We will only check for is_enabled out here. Down below we will check whether this output is enabled for the input
if (output.getBooleanValue("is_enabled", true)) {
- output.logConfgs(Level.INFO);
+ output.logConfigs(Level.INFO);
outputManager.add(output);
} else {
LOG.info("Output is disabled. So ignoring it. " + output.getShortDescription());
@@ -277,7 +277,7 @@ public class LogFeeder {
input.setOutputManager(outputManager);
input.setInputManager(inputManager);
inputManager.add(input);
- input.logConfgs(Level.INFO);
+ input.logConfigs(Level.INFO);
} else {
LOG.info("Input is disabled. So ignoring it. " + input.getShortDescription());
}
@@ -311,7 +311,7 @@ public class LogFeeder {
if (filter.isEnabled()) {
filter.setOutputManager(outputManager);
input.addFilter(filter);
- filter.logConfgs(Level.INFO);
+ filter.logConfigs(Level.INFO);
} else {
LOG.debug("Ignoring filter " + filter.getShortDescription() + " for input " + input.getShortDescription());
}
@@ -371,6 +371,13 @@ public class LogFeeder {
}
}
}
+
+ // In case of simulation copies of the output are added for each simulation instance, these must be added to the manager
+ for (Output output : InputSimulate.getSimulateOutputs()) {
+ outputManager.add(output);
+ usedOutputSet.add(output);
+ }
+
outputManager.retainUsedOutputs(usedOutputSet);
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/fc5cbc6e/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigBlock.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigBlock.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigBlock.java
index 47ddc51..68897e8 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigBlock.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigBlock.java
@@ -223,7 +223,7 @@ public abstract class ConfigBlock {
logStatForMetric(statMetric, "Stat");
}
- public boolean logConfgs(Priority level) {
+ public boolean logConfigs(Priority level) {
if (level.toInt() == Priority.INFO_INT && !LOG.isInfoEnabled()) {
return false;
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/fc5cbc6e/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java
index 684f3c4..afd903e 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java
@@ -173,8 +173,8 @@ public abstract class Filter extends ConfigBlock {
}
@Override
- public boolean logConfgs(Priority level) {
- if (!super.logConfgs(level)) {
+ public boolean logConfigs(Priority level) {
+ if (!super.logConfigs(level)) {
return false;
}
LOG.log(level, "input=" + input.getShortDescription());
http://git-wip-us.apache.org/repos/asf/ambari/blob/fc5cbc6e/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java
index be97a52..2222f93 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java
@@ -29,17 +29,21 @@ import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ambari.logfeeder.filter.Filter;
import org.apache.ambari.logfeeder.filter.FilterJSON;
+import org.apache.ambari.logfeeder.output.Output;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.log4j.Logger;
import org.apache.solr.common.util.Base64;
import com.google.common.base.Joiner;
public class InputSimulate extends Input {
+ private static final Logger LOG = Logger.getLogger(InputSimulate.class);
- private static final String LOG_TEXT_PATTERN = "{ logtime=\"%d\", level=\"%s\", log_message=\"%s\"}";
+ private static final String LOG_TEXT_PATTERN = "{ logtime=\"%d\", level=\"%s\", log_message=\"%s\", host=\"%s\"}";
private static final Map<String, String> typeToFilePath = new HashMap<>();
public static void loadTypeToFilePath(List<Map<String, Object>> inputList) {
@@ -52,6 +56,13 @@ public class InputSimulate extends Input {
private static final Map<String, Integer> typeToLineNumber = new HashMap<>();
+ private static final AtomicInteger hostNumber = new AtomicInteger(0);
+
+ private static final List<Output> simulateOutputs = new ArrayList<>();
+ public static List<Output> getSimulateOutputs() {
+ return simulateOutputs;
+ }
+
private final Random random = new Random(System.currentTimeMillis());
private final List<String> types;
@@ -60,6 +71,7 @@ public class InputSimulate extends Input {
private final int minLogWords;
private final int maxLogWords;
private final long sleepMillis;
+ private final String host;
public InputSimulate() throws Exception {
this.types = getSimulatedLogTypes();
@@ -68,6 +80,7 @@ public class InputSimulate extends Input {
this.minLogWords = LogFeederUtil.getIntProperty("logfeeder.simulate.min_log_words", 5, 1, 10);
this.maxLogWords = LogFeederUtil.getIntProperty("logfeeder.simulate.max_log_words", 10, 10, 20);
this.sleepMillis = LogFeederUtil.getIntProperty("logfeeder.simulate.sleep_milliseconds", 10000);
+ this.host = "#" + hostNumber.incrementAndGet() + "-" + LogFeederUtil.hostName;
Filter filter = new FilterJSON();
filter.loadConfig(Collections.<String, Object> emptyMap());
@@ -87,6 +100,20 @@ public class InputSimulate extends Input {
}
@Override
+ public void addOutput(Output output) {
+ try {
+ Class<? extends Output> clazz = output.getClass();
+ Output outputCopy = clazz.newInstance();
+ outputCopy.loadConfig(output.getConfigs());
+ simulateOutputs.add(outputCopy);
+ super.addOutput(outputCopy);
+ } catch (Exception e) {
+ LOG.warn("Could not copy Output class " + output.getClass() + ", using original output");
+ super.addOutput(output);
+ }
+ }
+
+ @Override
public boolean isReady() {
return true;
}
@@ -143,7 +170,7 @@ public class InputSimulate extends Input {
private String getLine() {
Date d = new Date();
String logMessage = createLogMessage();
- return String.format(LOG_TEXT_PATTERN, d.getTime(), level, logMessage);
+ return String.format(LOG_TEXT_PATTERN, d.getTime(), level, logMessage, host);
}
private String createLogMessage() {