You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ol...@apache.org on 2018/07/23 07:33:49 UTC

[ambari] branch trunk updated: AMBARI-23079. Log Feeder: support to use load balancer for Solr API (not only cloud client) (#1835)

This is an automated email from the ASF dual-hosted git repository.

oleewere pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 69a64a0  AMBARI-23079. Log Feeder: support to use load balancer for Solr API (not only cloud client) (#1835)
69a64a0 is described below

commit 69a64a068911504db944d284b2ccf9f5db4462a3
Author: Olivér Szabó <ol...@gmail.com>
AuthorDate: Mon Jul 23 09:33:46 2018 +0200

    AMBARI-23079. Log Feeder: support to use load balancer for Solr API (not only cloud client) (#1835)
---
 .../config/solr/LogLevelFilterManagerSolr.java     |  2 +-
 .../ambari/logfeeder/plugin/common/ConfigItem.java | 13 +++++
 .../logfeeder/common/LogFeederConstants.java       |  1 +
 .../common/LogFeederSolrClientFactory.java         | 64 ++++++++++++++++++++++
 .../ambari/logfeeder/conf/ApplicationConfig.java   | 17 ++----
 .../ambari/logfeeder/conf/LogFeederProps.java      | 28 +++++++++-
 .../apache/ambari/logfeeder/output/OutputSolr.java | 40 +++++++-------
 .../test-config/logfeeder/logfeeder.properties     |  3 +-
 8 files changed, 132 insertions(+), 36 deletions(-)

diff --git a/ambari-logsearch/ambari-logsearch-config-solr/src/main/java/org/apache/ambari/logsearch/config/solr/LogLevelFilterManagerSolr.java b/ambari-logsearch/ambari-logsearch-config-solr/src/main/java/org/apache/ambari/logsearch/config/solr/LogLevelFilterManagerSolr.java
index 8d8976b..0eabead 100644
--- a/ambari-logsearch/ambari-logsearch-config-solr/src/main/java/org/apache/ambari/logsearch/config/solr/LogLevelFilterManagerSolr.java
+++ b/ambari-logsearch/ambari-logsearch-config-solr/src/main/java/org/apache/ambari/logsearch/config/solr/LogLevelFilterManagerSolr.java
@@ -130,7 +130,7 @@ public class LogLevelFilterManagerSolr implements LogLevelFilterManager {
         }
       }
     } catch (Exception e) {
-      LOG.error("Error during getting log level filters: {}", e);
+      LOG.error("Error during getting log level filters: {}", e.getMessage());
     }
     logLevelFilterMap.setFilter(logLevelFilterTreeMap);
     return logLevelFilterMap;
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/ConfigItem.java b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/ConfigItem.java
index 1cbbfd5..5b50a7e 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/ConfigItem.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/ConfigItem.java
@@ -111,6 +111,19 @@ public abstract class ConfigItem<PROP_TYPE extends LogFeederProperties> implemen
     this.drain = drain;
   }
 
+  public List<String> getListValue(String key) {
+    return getListValue(key, null);
+  }
+
+  public List<String> getListValue(String key, List<String> defaultValue) {
+    Object value = configs.get(key);
+    if (value != null) {
+      return (List<String>)value;
+    } else {
+      return defaultValue;
+    }
+  }
+
   public String getStringValue(String property) {
     return getStringValue(property, null);
   }
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
index 10e38f9..251b4fc 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
@@ -102,5 +102,6 @@ public class LogFeederConstants {
   public static final boolean MONITOR_SOLR_FILTER_STORAGE_DEFAULT = true;
 
   public static final String SOLR_ZK_CONNECTION_STRING = "logfeeder.solr.zk_connect_string";
+  public static final String SOLR_URLS = "logfeeder.solr.urls";
 
 }
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederSolrClientFactory.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederSolrClientFactory.java
new file mode 100644
index 0000000..cf94fb5
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederSolrClientFactory.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.common;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.impl.LBHttpSolrClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class LogFeederSolrClientFactory {
+
+  private static final Logger logger = LoggerFactory.getLogger(LogFeederSolrClientFactory.class);
+
+  public SolrClient createSolrClient(String zkConnectionString, String[] solrUrls, String collection) {
+    logger.info("Creating solr client ...");
+    logger.info("Using collection=" + collection);
+    if (solrUrls != null && solrUrls.length > 0) {
+      logger.info("Using lbHttpSolrClient with urls: {}",
+        StringUtils.join(appendTo("/" + collection, solrUrls), ","));
+      LBHttpSolrClient.Builder builder = new LBHttpSolrClient.Builder();
+      builder.withBaseSolrUrls(solrUrls);
+      return builder.build();
+    } else {
+      logger.info("Using zookeepr. zkConnectString=" + zkConnectionString);
+      CloudSolrClient.Builder builder = new CloudSolrClient.Builder();
+      builder.withZkHost(zkConnectionString);
+      CloudSolrClient solrClient = builder.build();
+      solrClient.setDefaultCollection(collection);
+      return solrClient;
+    }
+  }
+
+  private String[] appendTo(String toAppend, String... appendees) {
+    for (int i = 0; i < appendees.length; i++) {
+      appendees[i] = appendees[i] + toAppend;
+    }
+    return appendees;
+  }
+
+}
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/ApplicationConfig.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/ApplicationConfig.java
index b431464..ccae373 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/ApplicationConfig.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/ApplicationConfig.java
@@ -19,6 +19,7 @@
 package org.apache.ambari.logfeeder.conf;
 
 import com.google.common.collect.Maps;
+import org.apache.ambari.logfeeder.common.LogFeederSolrClientFactory;
 import org.apache.ambari.logfeeder.docker.DockerContainerRegistry;
 import org.apache.ambari.logfeeder.common.LogFeederConstants;
 import org.apache.ambari.logfeeder.input.InputConfigUploader;
@@ -38,9 +39,7 @@ import org.apache.ambari.logsearch.config.local.LogSearchConfigLogFeederLocal;
 import org.apache.ambari.logsearch.config.solr.LogLevelFilterManagerSolr;
 import org.apache.ambari.logsearch.config.solr.LogLevelFilterUpdaterSolr;
 import org.apache.ambari.logsearch.config.zookeeper.LogSearchConfigLogFeederZK;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.solr.client.solrj.SolrClient;
-import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.context.annotation.DependsOn;
@@ -69,7 +68,7 @@ public class ApplicationConfig {
   }
 
   @Bean
-  @DependsOn("logSearchConfigLogFeeder")
+  @DependsOn({"logSearchConfigLogFeeder", "propertyConfigurer"})
   public ConfigHandler configHandler() throws Exception {
     return new ConfigHandler(logSearchConfigLogFeeder());
   }
@@ -95,15 +94,9 @@ public class ApplicationConfig {
   @Bean
   public LogLevelFilterManager logLevelFilterManager() {
     if (logFeederProps.isSolrFilterStorage()) {
-      if (StringUtils.isNotEmpty(logFeederProps.getSolrZkConnectString())) {
-        CloudSolrClient.Builder builder = new CloudSolrClient.Builder();
-        builder.withZkHost(logFeederProps.getSolrZkConnectString());
-        CloudSolrClient solrClient = builder.build();
-        solrClient.setDefaultCollection("history");
-        return new LogLevelFilterManagerSolr(solrClient);
-      } else {
-        return null; // TODO: use lb http client
-      }
+      SolrClient solrClient = new LogFeederSolrClientFactory().createSolrClient(
+        logFeederProps.getSolrZkConnectString(), logFeederProps.getSolrUrls(), "history");
+      return new LogLevelFilterManagerSolr(solrClient);
     } else { // no default filter manager
       return null;
     }
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
index 8f73e2b..12408b4 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
@@ -21,6 +21,7 @@ package org.apache.ambari.logfeeder.conf;
 import org.apache.ambari.logfeeder.common.LogFeederConstants;
 import org.apache.ambari.logfeeder.plugin.common.LogFeederProperties;
 import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription;
+import org.apache.commons.lang.StringUtils;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.core.env.AbstractEnvironment;
@@ -179,6 +180,15 @@ public class LogFeederProps implements LogFeederProperties {
   @Value("${" + LogFeederConstants.SOLR_ZK_CONNECTION_STRING + ":}")
   private String solrZkConnectString;
 
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.SOLR_URLS,
+    description = "Comma separated solr urls (with protocol and port), override "+ LogFeederConstants.SOLR_ZK_CONNECTION_STRING + " config",
+    examples = {"https://localhost1:8983/solr,https://localhost2:8983"},
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${" + LogFeederConstants.SOLR_URLS + ":}")
+  private String solrUrlsStr;
+
   @Inject
   private LogEntryCacheConfig logEntryCacheConfig;
 
@@ -285,7 +295,7 @@ public class LogFeederProps implements LogFeederProperties {
   }
 
   public boolean isUseLocalConfigs() {
-    return useLocalConfigs;
+    return this.useLocalConfigs;
   }
 
   public void setUseLocalConfigs(boolean useLocalConfigs) {
@@ -316,6 +326,21 @@ public class LogFeederProps implements LogFeederProperties {
     this.solrFilterMonitor = solrFilterMonitor;
   }
 
+  public String getSolrUrlsStr() {
+    return this.solrUrlsStr;
+  }
+
+  public void setSolrUrlsStr(String solrUrlsStr) {
+    this.solrUrlsStr = solrUrlsStr;
+  }
+
+  public String[] getSolrUrls() {
+    if (StringUtils.isNotBlank(this.solrUrlsStr)) {
+      return this.solrUrlsStr.split(",");
+    }
+    return null;
+  }
+
   @PostConstruct
   public void init() {
     properties = new Properties();
@@ -331,4 +356,5 @@ public class LogFeederProps implements LogFeederProperties {
       throw new IllegalArgumentException("Cannot find logfeeder.properties on the classpath");
     }
   }
+
 }
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
index 041c1bd..6b27553 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
@@ -19,11 +19,13 @@
 
 package org.apache.ambari.logfeeder.output;
 
+import org.apache.ambari.logfeeder.common.LogFeederSolrClientFactory;
 import org.apache.ambari.logfeeder.conf.LogFeederProps;
 import org.apache.ambari.logfeeder.plugin.input.InputMarker;
 import org.apache.ambari.logfeeder.plugin.output.Output;
 import org.apache.ambari.logfeeder.util.DateUtil;
 import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
@@ -76,6 +78,7 @@ public class OutputSolr extends Output<LogFeederProps, InputMarker> {
   private String splitMode;
   private int splitInterval;
   private String zkConnectString;
+  private String[] solrUrls = null;
   private int maxIntervalMS;
   private int workers;
   private int maxBufferSize;
@@ -121,8 +124,14 @@ public class OutputSolr extends Output<LogFeederProps, InputMarker> {
     type = getStringValue("type");
 
     zkConnectString = getStringValue("zk_connect_string");
-    if (StringUtils.isEmpty(zkConnectString)) {
-      throw new Exception("For solr output the zk_connect_string property need to be set");
+    List<String> solrUrlsList = getListValue("solr_urls");
+
+    if (StringUtils.isBlank(zkConnectString) && CollectionUtils.isEmpty(solrUrlsList)) {
+      throw new Exception("For solr output the zk_connect_string or solr_urls property need to be set");
+    }
+
+    if (CollectionUtils.isNotEmpty(solrUrlsList)) {
+      solrUrls = solrUrlsList.toArray(new String[0]);
     }
 
     skipLogtime = getBooleanValue("skip_logtime", DEFAULT_SKIP_LOGTIME);
@@ -176,42 +185,31 @@ public class OutputSolr extends Output<LogFeederProps, InputMarker> {
 
   private void createSolrWorkers() throws Exception, MalformedURLException {
     for (int count = 0; count < workers; count++) {
-      CloudSolrClient solrClient = getSolrClient(count);
+      SolrClient solrClient = getSolrClient(count);
       createSolrWorkerThread(count, solrClient);
     }
   }
 
-  CloudSolrClient getSolrClient(int count) throws Exception, MalformedURLException {
-    CloudSolrClient solrClient = createSolrClient();
+  private SolrClient getSolrClient(int count) throws Exception, MalformedURLException {
+    SolrClient solrClient = new LogFeederSolrClientFactory().createSolrClient(zkConnectString, solrUrls, collection);
     pingSolr(count, solrClient);
-
-    return solrClient;
-  }
-
-  private CloudSolrClient createSolrClient() throws Exception {
-    LOG.info("Using zookeepr. zkConnectString=" + zkConnectString);
-    LOG.info("Using collection=" + collection);
-
-    CloudSolrClient solrClient = new CloudSolrClient.Builder().withZkHost(zkConnectString).build();
-    solrClient.setDefaultCollection(collection);
     return solrClient;
   }
 
-  private void pingSolr(int count, CloudSolrClient solrClient) {
+  private void pingSolr(int count, SolrClient solrClient) {
     try {
-      LOG.info("Pinging Solr server. zkConnectString=" + zkConnectString);
+      LOG.info("Pinging Solr server.");
       SolrPingResponse response = solrClient.ping();
       if (response.getStatus() == 0) {
         LOG.info("Ping to Solr server is successful for worker=" + count);
       } else {
         LOG.warn(
-            String.format("Ping to Solr server failed. It would check again. worker=%d, zkConnectString=%s, collection=%s, " +
-                "response=%s", count, zkConnectString, collection, response));
+            String.format("Ping to Solr server failed. It would check again. worker=%d, collection=%s, " +
+                "response=%s", count, collection, response));
       }
     } catch (Throwable t) {
       LOG.warn(String.format(
-          "Ping to Solr server failed. It would check again. worker=%d, zkConnectString=%s, collection=%s", count,
-          zkConnectString, collection), t);
+          "Ping to Solr server failed. It would check again. worker=%d, collection=%s", count, collection), t);
     }
   }
 
diff --git a/ambari-logsearch/docker/test-config/logfeeder/logfeeder.properties b/ambari-logsearch/docker/test-config/logfeeder/logfeeder.properties
index 20aed68..bd59765 100644
--- a/ambari-logsearch/docker/test-config/logfeeder/logfeeder.properties
+++ b/ambari-logsearch/docker/test-config/logfeeder/logfeeder.properties
@@ -21,7 +21,6 @@ logfeeder.config.files=shipper-conf/global.config.json,\
   shipper-conf/output.config.json
 logfeeder.log.filter.enable=true
 logfeeder.solr.config.interval=5
-logfeeder.solr.core.config.name=history
 logfeeder.solr.zk_connect_string=localhost:9983
 logfeeder.cache.enabled=true
 logfeeder.cache.size=100
@@ -31,5 +30,7 @@ logfeeder.cache.last.dedup.enabled=true
 logsearch.config.zk_connect_string=localhost:9983
 logfeeder.include.default.level=FATAL,ERROR,WARN,INFO,DEBUG,TRACE,UNKNOWN
 logfeeder.docker.registry.enabled=true
+logfeeder.solr.core.config.name=history
+#logfeeder.solr.urls=http://solr:8983/solr
 #logfeeder.configs.local.enabled=true
 #logfeeder.configs.filter.solr.enabled=true
\ No newline at end of file