You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ol...@apache.org on 2016/06/21 14:33:54 UTC

ambari git commit: AMBARI-17277. Log Level filter not applied before Log Search Starts at first (Miklos Gergely via oleewere)

Repository: ambari
Updated Branches:
  refs/heads/trunk 4f78290a6 -> 6a6bb7a87


AMBARI-17277. Log Level filter not applied before Log Search Starts at first (Miklos Gergely via oleewere)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/6a6bb7a8
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/6a6bb7a8
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/6a6bb7a8

Branch: refs/heads/trunk
Commit: 6a6bb7a87f01b125a0cba717f98e2c74af348d06
Parents: 4f78290
Author: Miklos Gergely <mg...@hortonworks.com>
Authored: Tue Jun 21 16:19:46 2016 +0200
Committer: oleewere <ol...@gmail.com>
Committed: Tue Jun 21 16:31:10 2016 +0200

----------------------------------------------------------------------
 .../org/apache/ambari/logfeeder/LogFeeder.java  |  4 +-
 .../logconfig/FetchConfigFromSolr.java          |  4 ++
 .../logfeeder/logconfig/LogFeederConstants.java |  2 +-
 .../ambari/logfeeder/output/OutputSolr.java     | 71 ++++++++++++++------
 .../apache/ambari/logfeeder/util/SolrUtil.java  |  4 +-
 .../src/main/resources/logfeeder.properties     |  3 +-
 .../ambari/logfeeder/output/OutputSolrTest.java |  2 +
 .../package/templates/logfeeder.properties.j2   |  2 +-
 8 files changed, 64 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/6a6bb7a8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
index 166c0f3..d00ed67 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
@@ -120,6 +120,9 @@ public class LogFeeder {
       }
     }
     mergeAllConfigs();
+    
+    LogfeederScheduler.INSTANCE.start();
+    
     outMgr.setOutputList(outputList);
     for (Output output : outputList) {
       output.init();
@@ -127,7 +130,6 @@ public class LogFeeder {
     inputMgr.init();
     metricsMgr.init();
     //starting timer to fetch config from solr 
-    LogfeederScheduler.INSTANCE.start();
     logger.debug("==============");
   }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/6a6bb7a8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FetchConfigFromSolr.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FetchConfigFromSolr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FetchConfigFromSolr.java
index f2d074a..4240b86 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FetchConfigFromSolr.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FetchConfigFromSolr.java
@@ -153,6 +153,10 @@ public class FetchConfigFromSolr extends Thread {
     return defaultLevels;
   }
 
+  public static boolean isFilterAvailable() {
+    return logfeederFilterWrapper != null;
+  }
+  
   public static VLogfeederFilter findComponentFilter(String componentName) {
     if (logfeederFilterWrapper != null) {
       HashMap<String, VLogfeederFilter> filter = logfeederFilterWrapper.getFilter();

http://git-wip-us.apache.org/repos/asf/ambari/blob/6a6bb7a8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederConstants.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederConstants.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederConstants.java
index f61dc1b..f177e49 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederConstants.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederConstants.java
@@ -21,7 +21,7 @@ package org.apache.ambari.logfeeder.logconfig;
 public class LogFeederConstants {
 
   public static final String ALL = "all";
-  public static final String NAME = "log_feeder_config";
+  public static final String LOGFEEDER_FILTER_NAME = "log_feeder_config";
   // solr fields
   public static final String SOLR_LEVEL = "level";
   public static final String SOLR_COMPONENT = "type";

http://git-wip-us.apache.org/repos/asf/ambari/blob/6a6bb7a8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
index a7202e7..c945ed7 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
@@ -35,6 +35,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.ambari.logfeeder.LogFeederUtil;
 import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logfeeder.logconfig.FetchConfigFromSolr;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
@@ -59,6 +60,8 @@ public class OutputSolr extends Output {
   private static final int DEFAULT_SPLIT_INTERVAL = 30;
   private static final int DEFAULT_NUMBER_OF_WORKERS = 1;
 
+  private static final int RETRY_INTERVAL = 30;
+
   private String collection;
   private String splitMode;
   private int splitInterval;
@@ -81,7 +84,7 @@ public class OutputSolr extends Output {
     createSolrWorkers();
   }
 
-  private void initParams() {
+  private void initParams() throws Exception {
     statMetric.metricsName = "output.solr.write_logs";
     writeBytesMetric.metricsName = "output.solr.write_bytes";
 
@@ -89,6 +92,8 @@ public class OutputSolr extends Output {
     if (!splitMode.equalsIgnoreCase("none")) {
       splitInterval = getIntValue("split_interval_mins", DEFAULT_SPLIT_INTERVAL);
     }
+    isComputeCurrentCollection = !splitMode.equalsIgnoreCase("none");
+    
     numberOfShards = getIntValue("number_of_shards", DEFAULT_NUMBER_OF_SHARDS);
 
     maxIntervalMS = getIntValue("idle_flush_time_ms", DEFAULT_MAX_INTERVAL_MS);
@@ -100,7 +105,12 @@ public class OutputSolr extends Output {
       maxBufferSize = 1;
     }
 
-    LOG.info(String.format("Config: Number of workers=%d, splitMode=%s, splitInterval=%d, " + "numberOfShards=%d. "
+    collection = getStringValue("collection");
+    if (StringUtils.isEmpty(collection)) {
+      throw new Exception("Collection property is mandatory");
+    }
+
+    LOG.info(String.format("Config: Number of workers=%d, splitMode=%s, splitInterval=%d, numberOfShards=%d. "
         + getShortDescription(), workers, splitMode, splitInterval, numberOfShards));
   }
 
@@ -135,45 +145,44 @@ public class OutputSolr extends Output {
   }
 
   SolrClient getSolrClient(String solrUrl, String zkHosts, int count) throws Exception, MalformedURLException {
-    SolrClient solrClient = null;
+    SolrClient solrClient = createSolrClient(solrUrl, zkHosts, collection);
+    pingSolr(solrUrl, zkHosts, count, solrClient);
+    waitForConfig();
+
+    return solrClient;
+  }
 
+  private SolrClient createSolrClient(String solrUrl, String zkHosts, String collection) throws Exception, MalformedURLException {
+    SolrClient solrClient;
     if (zkHosts != null) {
-      solrClient = createCloudSolrClient(zkHosts);
+      solrClient = createCloudSolrClient(zkHosts, collection);
     } else {
-      solrClient = createHttpSolarClient(solrUrl);
+      solrClient = createHttpSolarClient(solrUrl, collection);
     }
-
-    pingSolr(solrUrl, zkHosts, count, solrClient);
-
     return solrClient;
   }
 
-  private SolrClient createCloudSolrClient(String zkHosts) throws Exception {
+  private SolrClient createCloudSolrClient(String zkHosts, String collection) throws Exception {
     LOG.info("Using zookeepr. zkHosts=" + zkHosts);
-    collection = getStringValue("collection");
-    if (StringUtils.isEmpty(collection)) {
-      throw new Exception("For solr cloud property collection is mandatory");
-    }
     LOG.info("Using collection=" + collection);
 
     CloudSolrClient solrClient = new CloudSolrClient(zkHosts);
     solrClient.setDefaultCollection(collection);
-    isComputeCurrentCollection = !splitMode.equalsIgnoreCase("none");
     return solrClient;
   }
 
-  private SolrClient createHttpSolarClient(String solrUrl) throws MalformedURLException {
+  private SolrClient createHttpSolarClient(String solrUrl, String collection) throws MalformedURLException {
     String[] solrUrls = StringUtils.split(solrUrl, ",");
     if (solrUrls.length == 1) {
       LOG.info("Using SolrURL=" + solrUrl);
-      return new HttpSolrClient(solrUrl);
+      return new HttpSolrClient(solrUrl + "/" + collection);
     } else {
       LOG.info("Using load balance solr client. solrUrls=" + solrUrl);
-      LOG.info("Initial URL for LB solr=" + solrUrls[0]);
-      LBHttpSolrClient lbSolrClient = new LBHttpSolrClient(solrUrls[0]);
+      LOG.info("Initial URL for LB solr=" + solrUrls[0] + "/" + collection);
+      LBHttpSolrClient lbSolrClient = new LBHttpSolrClient(solrUrls[0] + "/" + collection);
       for (int i = 1; i < solrUrls.length; i++) {
-        LOG.info("Adding URL for LB solr=" + solrUrls[i]);
-        lbSolrClient.addSolrServer(solrUrls[i]);
+        LOG.info("Adding URL for LB solr=" + solrUrls[i] + "/" + collection);
+        lbSolrClient.addSolrServer(solrUrls[i] + "/" + collection);
       }
       return lbSolrClient;
     }
@@ -194,11 +203,30 @@ public class OutputSolr extends Output {
       }
     } catch (Throwable t) {
       LOG.warn(String.format(
-          "Ping to Solr server failed. It would check again. worker=%d, " + "solrUrl=%s, zkHosts=%s, collection=%s",
+          "Ping to Solr server failed. It would check again. worker=%d, solrUrl=%s, zkHosts=%s, collection=%s",
           count, solrUrl, zkHosts, collection), t);
     }
   }
 
+  private void waitForConfig() throws SolrServerException, IOException {
+    if (!LogFeederUtil.getBooleanProperty("logfeeder.log.filter.enable", false)) {
+      return;
+    }
+    
+    while (true) {
+      LOG.info("Checking if config is available");
+      if (FetchConfigFromSolr.isFilterAvailable()) {
+        LOG.info("Config is available");
+        return;
+      }
+      try {
+        Thread.sleep(RETRY_INTERVAL * 1000);
+      } catch (InterruptedException e) {
+        LOG.error(e);
+      }
+    }
+  }
+
   private void createSolrWorkerThread(int count, SolrClient solrClient) {
     SolrWorkerThread solrWorkerThread = new SolrWorkerThread(solrClient);
     solrWorkerThread.setName(getNameForThread() + "," + collection + ",worker=" + count);
@@ -281,7 +309,6 @@ public class OutputSolr extends Output {
 
   class SolrWorkerThread extends Thread {
     private static final String ROUTER_FIELD = "_router_field_";
-    private static final int RETRY_INTERVAL = 30;
 
     private final SolrClient solrClient;
     private final Collection<SolrInputDocument> localBuffer = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/ambari/blob/6a6bb7a8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/SolrUtil.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/SolrUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/SolrUtil.java
index 29feef7..31fbded 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/SolrUtil.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/SolrUtil.java
@@ -54,7 +54,7 @@ public class SolrUtil {
   private SolrUtil() throws Exception {
     String url = LogFeederUtil.getStringProperty("logfeeder.solr.url");
     String zkHosts = LogFeederUtil.getStringProperty("logfeeder.solr.zkhosts");
-    String collection = LogFeederUtil.getStringProperty("logfeeder.solr.core.history", "history");
+    String collection = LogFeederUtil.getStringProperty("logfeeder.solr.core.config.name", "history");
     connectToSolr(url, zkHosts, collection);
   }
 
@@ -180,7 +180,7 @@ public class SolrUtil {
     HashMap<String, Object> configMap = new HashMap<String, Object>();
     SolrQuery solrQuery = new SolrQuery();
     solrQuery.setQuery("*:*");
-    String fq = LogFeederConstants.ROW_TYPE + ":" + LogFeederConstants.NAME;
+    String fq = LogFeederConstants.ROW_TYPE + ":" + LogFeederConstants.LOGFEEDER_FILTER_NAME;
     solrQuery.setFilterQueries(fq);
     try {
       QueryResponse response = process(solrQuery);

http://git-wip-us.apache.org/repos/asf/ambari/blob/6a6bb7a8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties
index 6cba826..b4655cc 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties
@@ -19,7 +19,6 @@ logfeeder.metrics.collector.hosts=
 #filter config
 logfeeder.log.filter.enable=true
 logfeeder.solr.config.interval=5
-logfeeder.solr.core.history=history
 logfeeder.solr.zkhosts=
 logfeeder.solr.url=
 
@@ -28,3 +27,5 @@ logfeeder.solr.jaas.file=/usr/lib/ambari-logsearch-logfeeder/logsearch_solr_jaas
 
 #logfeeder tmp dir 
 logfeeder.tmp.dir=/tmp/$username/logfeeder/
+
+logfeeder.solr.core.config.name=history

http://git-wip-us.apache.org/repos/asf/ambari/blob/6a6bb7a8/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputSolrTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputSolrTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputSolrTest.java
index afbccca..3014ed8 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputSolrTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputSolrTest.java
@@ -83,6 +83,7 @@ public class OutputSolrTest {
     Map<String, Object> config = new HashMap<String, Object>();
     config.put("url", "some url");
     config.put("workers", "3");
+    config.put("collection", "some collection");
 
     outputSolr.loadConfig(config);
     outputSolr.init();
@@ -153,6 +154,7 @@ public class OutputSolrTest {
 
     Map<String, Object> config = new HashMap<String, Object>();
     config.put("workers", "3");
+    config.put("collection", "some collection");
 
     outputSolr.loadConfig(config);
     outputSolr.init();

http://git-wip-us.apache.org/repos/asf/ambari/blob/6a6bb7a8/ambari-server/src/main/resources/common-services/LOGSEARCH/0.5.0/package/templates/logfeeder.properties.j2
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/LOGSEARCH/0.5.0/package/templates/logfeeder.properties.j2 b/ambari-server/src/main/resources/common-services/LOGSEARCH/0.5.0/package/templates/logfeeder.properties.j2
index 9887e76..31c252a 100644
--- a/ambari-server/src/main/resources/common-services/LOGSEARCH/0.5.0/package/templates/logfeeder.properties.j2
+++ b/ambari-server/src/main/resources/common-services/LOGSEARCH/0.5.0/package/templates/logfeeder.properties.j2
@@ -18,8 +18,8 @@ logfeeder.metrics.collector.hosts={{logfeeder_metrics_collector_hosts}}
 logfeeder.config.files={{logfeeder_config_files}}
 logfeeder.log.filter.enable={{logfeeder_log_filter_enable}}
 logfeeder.solr.config.interval={{logfeeder_solr_config_interval}}
-logfeeder.solr.core.history=history
 logfeeder.solr.zkhosts={{zookeeper_quorum}}{{logsearch_solr_znode}}
+logfeeder.solr.core.config.name=history
 
 # Custom properties
 {% for key, value in logfeeder_custom_properties.items() %}