You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ol...@apache.org on 2016/06/21 14:33:54 UTC
ambari git commit: AMBARI-17277. Log Level filter not applied before Log Search Starts at first (Miklos Gergely via oleewere)
Repository: ambari
Updated Branches:
refs/heads/trunk 4f78290a6 -> 6a6bb7a87
AMBARI-17277. Log Level filter not applied before Log Search Starts at first (Miklos Gergely via oleewere)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/6a6bb7a8
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/6a6bb7a8
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/6a6bb7a8
Branch: refs/heads/trunk
Commit: 6a6bb7a87f01b125a0cba717f98e2c74af348d06
Parents: 4f78290
Author: Miklos Gergely <mg...@hortonworks.com>
Authored: Tue Jun 21 16:19:46 2016 +0200
Committer: oleewere <ol...@gmail.com>
Committed: Tue Jun 21 16:31:10 2016 +0200
----------------------------------------------------------------------
.../org/apache/ambari/logfeeder/LogFeeder.java | 4 +-
.../logconfig/FetchConfigFromSolr.java | 4 ++
.../logfeeder/logconfig/LogFeederConstants.java | 2 +-
.../ambari/logfeeder/output/OutputSolr.java | 71 ++++++++++++++------
.../apache/ambari/logfeeder/util/SolrUtil.java | 4 +-
.../src/main/resources/logfeeder.properties | 3 +-
.../ambari/logfeeder/output/OutputSolrTest.java | 2 +
.../package/templates/logfeeder.properties.j2 | 2 +-
8 files changed, 64 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/6a6bb7a8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
index 166c0f3..d00ed67 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
@@ -120,6 +120,9 @@ public class LogFeeder {
}
}
mergeAllConfigs();
+
+ LogfeederScheduler.INSTANCE.start();
+
outMgr.setOutputList(outputList);
for (Output output : outputList) {
output.init();
@@ -127,7 +130,6 @@ public class LogFeeder {
inputMgr.init();
metricsMgr.init();
//starting timer to fetch config from solr
- LogfeederScheduler.INSTANCE.start();
logger.debug("==============");
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/6a6bb7a8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FetchConfigFromSolr.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FetchConfigFromSolr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FetchConfigFromSolr.java
index f2d074a..4240b86 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FetchConfigFromSolr.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FetchConfigFromSolr.java
@@ -153,6 +153,10 @@ public class FetchConfigFromSolr extends Thread {
return defaultLevels;
}
+ public static boolean isFilterAvailable() {
+ return logfeederFilterWrapper != null;
+ }
+
public static VLogfeederFilter findComponentFilter(String componentName) {
if (logfeederFilterWrapper != null) {
HashMap<String, VLogfeederFilter> filter = logfeederFilterWrapper.getFilter();
http://git-wip-us.apache.org/repos/asf/ambari/blob/6a6bb7a8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederConstants.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederConstants.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederConstants.java
index f61dc1b..f177e49 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederConstants.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederConstants.java
@@ -21,7 +21,7 @@ package org.apache.ambari.logfeeder.logconfig;
public class LogFeederConstants {
public static final String ALL = "all";
- public static final String NAME = "log_feeder_config";
+ public static final String LOGFEEDER_FILTER_NAME = "log_feeder_config";
// solr fields
public static final String SOLR_LEVEL = "level";
public static final String SOLR_COMPONENT = "type";
http://git-wip-us.apache.org/repos/asf/ambari/blob/6a6bb7a8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
index a7202e7..c945ed7 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
@@ -35,6 +35,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.ambari.logfeeder.LogFeederUtil;
import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logfeeder.logconfig.FetchConfigFromSolr;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
@@ -59,6 +60,8 @@ public class OutputSolr extends Output {
private static final int DEFAULT_SPLIT_INTERVAL = 30;
private static final int DEFAULT_NUMBER_OF_WORKERS = 1;
+ private static final int RETRY_INTERVAL = 30;
+
private String collection;
private String splitMode;
private int splitInterval;
@@ -81,7 +84,7 @@ public class OutputSolr extends Output {
createSolrWorkers();
}
- private void initParams() {
+ private void initParams() throws Exception {
statMetric.metricsName = "output.solr.write_logs";
writeBytesMetric.metricsName = "output.solr.write_bytes";
@@ -89,6 +92,8 @@ public class OutputSolr extends Output {
if (!splitMode.equalsIgnoreCase("none")) {
splitInterval = getIntValue("split_interval_mins", DEFAULT_SPLIT_INTERVAL);
}
+ isComputeCurrentCollection = !splitMode.equalsIgnoreCase("none");
+
numberOfShards = getIntValue("number_of_shards", DEFAULT_NUMBER_OF_SHARDS);
maxIntervalMS = getIntValue("idle_flush_time_ms", DEFAULT_MAX_INTERVAL_MS);
@@ -100,7 +105,12 @@ public class OutputSolr extends Output {
maxBufferSize = 1;
}
- LOG.info(String.format("Config: Number of workers=%d, splitMode=%s, splitInterval=%d, " + "numberOfShards=%d. "
+ collection = getStringValue("collection");
+ if (StringUtils.isEmpty(collection)) {
+ throw new Exception("Collection property is mandatory");
+ }
+
+ LOG.info(String.format("Config: Number of workers=%d, splitMode=%s, splitInterval=%d, numberOfShards=%d. "
+ getShortDescription(), workers, splitMode, splitInterval, numberOfShards));
}
@@ -135,45 +145,44 @@ public class OutputSolr extends Output {
}
SolrClient getSolrClient(String solrUrl, String zkHosts, int count) throws Exception, MalformedURLException {
- SolrClient solrClient = null;
+ SolrClient solrClient = createSolrClient(solrUrl, zkHosts, collection);
+ pingSolr(solrUrl, zkHosts, count, solrClient);
+ waitForConfig();
+
+ return solrClient;
+ }
+ private SolrClient createSolrClient(String solrUrl, String zkHosts, String collection) throws Exception, MalformedURLException {
+ SolrClient solrClient;
if (zkHosts != null) {
- solrClient = createCloudSolrClient(zkHosts);
+ solrClient = createCloudSolrClient(zkHosts, collection);
} else {
- solrClient = createHttpSolarClient(solrUrl);
+ solrClient = createHttpSolarClient(solrUrl, collection);
}
-
- pingSolr(solrUrl, zkHosts, count, solrClient);
-
return solrClient;
}
- private SolrClient createCloudSolrClient(String zkHosts) throws Exception {
+ private SolrClient createCloudSolrClient(String zkHosts, String collection) throws Exception {
LOG.info("Using zookeepr. zkHosts=" + zkHosts);
- collection = getStringValue("collection");
- if (StringUtils.isEmpty(collection)) {
- throw new Exception("For solr cloud property collection is mandatory");
- }
LOG.info("Using collection=" + collection);
CloudSolrClient solrClient = new CloudSolrClient(zkHosts);
solrClient.setDefaultCollection(collection);
- isComputeCurrentCollection = !splitMode.equalsIgnoreCase("none");
return solrClient;
}
- private SolrClient createHttpSolarClient(String solrUrl) throws MalformedURLException {
+ private SolrClient createHttpSolarClient(String solrUrl, String collection) throws MalformedURLException {
String[] solrUrls = StringUtils.split(solrUrl, ",");
if (solrUrls.length == 1) {
LOG.info("Using SolrURL=" + solrUrl);
- return new HttpSolrClient(solrUrl);
+ return new HttpSolrClient(solrUrl + "/" + collection);
} else {
LOG.info("Using load balance solr client. solrUrls=" + solrUrl);
- LOG.info("Initial URL for LB solr=" + solrUrls[0]);
- LBHttpSolrClient lbSolrClient = new LBHttpSolrClient(solrUrls[0]);
+ LOG.info("Initial URL for LB solr=" + solrUrls[0] + "/" + collection);
+ LBHttpSolrClient lbSolrClient = new LBHttpSolrClient(solrUrls[0] + "/" + collection);
for (int i = 1; i < solrUrls.length; i++) {
- LOG.info("Adding URL for LB solr=" + solrUrls[i]);
- lbSolrClient.addSolrServer(solrUrls[i]);
+ LOG.info("Adding URL for LB solr=" + solrUrls[i] + "/" + collection);
+ lbSolrClient.addSolrServer(solrUrls[i] + "/" + collection);
}
return lbSolrClient;
}
@@ -194,11 +203,30 @@ public class OutputSolr extends Output {
}
} catch (Throwable t) {
LOG.warn(String.format(
- "Ping to Solr server failed. It would check again. worker=%d, " + "solrUrl=%s, zkHosts=%s, collection=%s",
+ "Ping to Solr server failed. It would check again. worker=%d, solrUrl=%s, zkHosts=%s, collection=%s",
count, solrUrl, zkHosts, collection), t);
}
}
+ private void waitForConfig() throws SolrServerException, IOException {
+ if (!LogFeederUtil.getBooleanProperty("logfeeder.log.filter.enable", false)) {
+ return;
+ }
+
+ while (true) {
+ LOG.info("Checking if config is available");
+ if (FetchConfigFromSolr.isFilterAvailable()) {
+ LOG.info("Config is available");
+ return;
+ }
+ try {
+ Thread.sleep(RETRY_INTERVAL * 1000);
+ } catch (InterruptedException e) {
+ LOG.error(e);
+ }
+ }
+ }
+
private void createSolrWorkerThread(int count, SolrClient solrClient) {
SolrWorkerThread solrWorkerThread = new SolrWorkerThread(solrClient);
solrWorkerThread.setName(getNameForThread() + "," + collection + ",worker=" + count);
@@ -281,7 +309,6 @@ public class OutputSolr extends Output {
class SolrWorkerThread extends Thread {
private static final String ROUTER_FIELD = "_router_field_";
- private static final int RETRY_INTERVAL = 30;
private final SolrClient solrClient;
private final Collection<SolrInputDocument> localBuffer = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/ambari/blob/6a6bb7a8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/SolrUtil.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/SolrUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/SolrUtil.java
index 29feef7..31fbded 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/SolrUtil.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/SolrUtil.java
@@ -54,7 +54,7 @@ public class SolrUtil {
private SolrUtil() throws Exception {
String url = LogFeederUtil.getStringProperty("logfeeder.solr.url");
String zkHosts = LogFeederUtil.getStringProperty("logfeeder.solr.zkhosts");
- String collection = LogFeederUtil.getStringProperty("logfeeder.solr.core.history", "history");
+ String collection = LogFeederUtil.getStringProperty("logfeeder.solr.core.config.name", "history");
connectToSolr(url, zkHosts, collection);
}
@@ -180,7 +180,7 @@ public class SolrUtil {
HashMap<String, Object> configMap = new HashMap<String, Object>();
SolrQuery solrQuery = new SolrQuery();
solrQuery.setQuery("*:*");
- String fq = LogFeederConstants.ROW_TYPE + ":" + LogFeederConstants.NAME;
+ String fq = LogFeederConstants.ROW_TYPE + ":" + LogFeederConstants.LOGFEEDER_FILTER_NAME;
solrQuery.setFilterQueries(fq);
try {
QueryResponse response = process(solrQuery);
http://git-wip-us.apache.org/repos/asf/ambari/blob/6a6bb7a8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties
index 6cba826..b4655cc 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties
@@ -19,7 +19,6 @@ logfeeder.metrics.collector.hosts=
#filter config
logfeeder.log.filter.enable=true
logfeeder.solr.config.interval=5
-logfeeder.solr.core.history=history
logfeeder.solr.zkhosts=
logfeeder.solr.url=
@@ -28,3 +27,5 @@ logfeeder.solr.jaas.file=/usr/lib/ambari-logsearch-logfeeder/logsearch_solr_jaas
#logfeeder tmp dir
logfeeder.tmp.dir=/tmp/$username/logfeeder/
+
+logfeeder.solr.core.config.name=history
http://git-wip-us.apache.org/repos/asf/ambari/blob/6a6bb7a8/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputSolrTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputSolrTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputSolrTest.java
index afbccca..3014ed8 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputSolrTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputSolrTest.java
@@ -83,6 +83,7 @@ public class OutputSolrTest {
Map<String, Object> config = new HashMap<String, Object>();
config.put("url", "some url");
config.put("workers", "3");
+ config.put("collection", "some collection");
outputSolr.loadConfig(config);
outputSolr.init();
@@ -153,6 +154,7 @@ public class OutputSolrTest {
Map<String, Object> config = new HashMap<String, Object>();
config.put("workers", "3");
+ config.put("collection", "some collection");
outputSolr.loadConfig(config);
outputSolr.init();
http://git-wip-us.apache.org/repos/asf/ambari/blob/6a6bb7a8/ambari-server/src/main/resources/common-services/LOGSEARCH/0.5.0/package/templates/logfeeder.properties.j2
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/LOGSEARCH/0.5.0/package/templates/logfeeder.properties.j2 b/ambari-server/src/main/resources/common-services/LOGSEARCH/0.5.0/package/templates/logfeeder.properties.j2
index 9887e76..31c252a 100644
--- a/ambari-server/src/main/resources/common-services/LOGSEARCH/0.5.0/package/templates/logfeeder.properties.j2
+++ b/ambari-server/src/main/resources/common-services/LOGSEARCH/0.5.0/package/templates/logfeeder.properties.j2
@@ -18,8 +18,8 @@ logfeeder.metrics.collector.hosts={{logfeeder_metrics_collector_hosts}}
logfeeder.config.files={{logfeeder_config_files}}
logfeeder.log.filter.enable={{logfeeder_log_filter_enable}}
logfeeder.solr.config.interval={{logfeeder_solr_config_interval}}
-logfeeder.solr.core.history=history
logfeeder.solr.zkhosts={{zookeeper_quorum}}{{logsearch_solr_znode}}
+logfeeder.solr.core.config.name=history
# Custom properties
{% for key, value in logfeeder_custom_properties.items() %}