You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ol...@apache.org on 2018/07/23 12:39:43 UTC
[ambari] 01/02: AMBARI-23079. Log Feeder: support to use load
balancer for Solr API (not only cloud client) (#1835)
This is an automated email from the ASF dual-hosted git repository.
oleewere pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/ambari.git
commit 8dd0150cd364f293c555e09fa47ad9e022b59d1f
Author: Olivér Szabó <ol...@gmail.com>
AuthorDate: Mon Jul 23 09:33:46 2018 +0200
AMBARI-23079. Log Feeder: support to use load balancer for Solr API (not only cloud client) (#1835)
---
.../config/solr/LogLevelFilterManagerSolr.java | 2 +-
.../ambari/logfeeder/plugin/common/ConfigItem.java | 13 +++++
.../logfeeder/common/LogFeederConstants.java | 1 +
.../common/LogFeederSolrClientFactory.java | 64 ++++++++++++++++++++++
.../ambari/logfeeder/conf/ApplicationConfig.java | 17 ++----
.../ambari/logfeeder/conf/LogFeederProps.java | 28 +++++++++-
.../apache/ambari/logfeeder/output/OutputSolr.java | 40 +++++++-------
.../test-config/logfeeder/logfeeder.properties | 3 +-
8 files changed, 132 insertions(+), 36 deletions(-)
diff --git a/ambari-logsearch/ambari-logsearch-config-solr/src/main/java/org/apache/ambari/logsearch/config/solr/LogLevelFilterManagerSolr.java b/ambari-logsearch/ambari-logsearch-config-solr/src/main/java/org/apache/ambari/logsearch/config/solr/LogLevelFilterManagerSolr.java
index 8d8976b..0eabead 100644
--- a/ambari-logsearch/ambari-logsearch-config-solr/src/main/java/org/apache/ambari/logsearch/config/solr/LogLevelFilterManagerSolr.java
+++ b/ambari-logsearch/ambari-logsearch-config-solr/src/main/java/org/apache/ambari/logsearch/config/solr/LogLevelFilterManagerSolr.java
@@ -130,7 +130,7 @@ public class LogLevelFilterManagerSolr implements LogLevelFilterManager {
}
}
} catch (Exception e) {
- LOG.error("Error during getting log level filters: {}", e);
+ LOG.error("Error during getting log level filters: {}", e.getMessage());
}
logLevelFilterMap.setFilter(logLevelFilterTreeMap);
return logLevelFilterMap;
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/ConfigItem.java b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/ConfigItem.java
index 1cbbfd5..5b50a7e 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/ConfigItem.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/ConfigItem.java
@@ -111,6 +111,19 @@ public abstract class ConfigItem<PROP_TYPE extends LogFeederProperties> implemen
this.drain = drain;
}
+ public List<String> getListValue(String key) {
+ return getListValue(key, null);
+ }
+
+ public List<String> getListValue(String key, List<String> defaultValue) {
+ Object value = configs.get(key);
+ if (value != null) {
+ return (List<String>)value;
+ } else {
+ return defaultValue;
+ }
+ }
+
public String getStringValue(String property) {
return getStringValue(property, null);
}
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
index 10e38f9..251b4fc 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
@@ -102,5 +102,6 @@ public class LogFeederConstants {
public static final boolean MONITOR_SOLR_FILTER_STORAGE_DEFAULT = true;
public static final String SOLR_ZK_CONNECTION_STRING = "logfeeder.solr.zk_connect_string";
+ public static final String SOLR_URLS = "logfeeder.solr.urls";
}
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederSolrClientFactory.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederSolrClientFactory.java
new file mode 100644
index 0000000..cf94fb5
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederSolrClientFactory.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.common;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.impl.LBHttpSolrClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class LogFeederSolrClientFactory {
+
+ private static final Logger logger = LoggerFactory.getLogger(LogFeederSolrClientFactory.class);
+
+ public SolrClient createSolrClient(String zkConnectionString, String[] solrUrls, String collection) {
+ logger.info("Creating solr client ...");
+ logger.info("Using collection=" + collection);
+ if (solrUrls != null && solrUrls.length > 0) {
+ logger.info("Using lbHttpSolrClient with urls: {}",
+ StringUtils.join(appendTo("/" + collection, solrUrls), ","));
+ LBHttpSolrClient.Builder builder = new LBHttpSolrClient.Builder();
+ builder.withBaseSolrUrls(solrUrls);
+ return builder.build();
+ } else {
+ logger.info("Using zookeepr. zkConnectString=" + zkConnectionString);
+ CloudSolrClient.Builder builder = new CloudSolrClient.Builder();
+ builder.withZkHost(zkConnectionString);
+ CloudSolrClient solrClient = builder.build();
+ solrClient.setDefaultCollection(collection);
+ return solrClient;
+ }
+ }
+
+ private String[] appendTo(String toAppend, String... appendees) {
+ for (int i = 0; i < appendees.length; i++) {
+ appendees[i] = appendees[i] + toAppend;
+ }
+ return appendees;
+ }
+
+}
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/ApplicationConfig.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/ApplicationConfig.java
index b431464..ccae373 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/ApplicationConfig.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/ApplicationConfig.java
@@ -19,6 +19,7 @@
package org.apache.ambari.logfeeder.conf;
import com.google.common.collect.Maps;
+import org.apache.ambari.logfeeder.common.LogFeederSolrClientFactory;
import org.apache.ambari.logfeeder.docker.DockerContainerRegistry;
import org.apache.ambari.logfeeder.common.LogFeederConstants;
import org.apache.ambari.logfeeder.input.InputConfigUploader;
@@ -38,9 +39,7 @@ import org.apache.ambari.logsearch.config.local.LogSearchConfigLogFeederLocal;
import org.apache.ambari.logsearch.config.solr.LogLevelFilterManagerSolr;
import org.apache.ambari.logsearch.config.solr.LogLevelFilterUpdaterSolr;
import org.apache.ambari.logsearch.config.zookeeper.LogSearchConfigLogFeederZK;
-import org.apache.commons.lang3.StringUtils;
import org.apache.solr.client.solrj.SolrClient;
-import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
@@ -69,7 +68,7 @@ public class ApplicationConfig {
}
@Bean
- @DependsOn("logSearchConfigLogFeeder")
+ @DependsOn({"logSearchConfigLogFeeder", "propertyConfigurer"})
public ConfigHandler configHandler() throws Exception {
return new ConfigHandler(logSearchConfigLogFeeder());
}
@@ -95,15 +94,9 @@ public class ApplicationConfig {
@Bean
public LogLevelFilterManager logLevelFilterManager() {
if (logFeederProps.isSolrFilterStorage()) {
- if (StringUtils.isNotEmpty(logFeederProps.getSolrZkConnectString())) {
- CloudSolrClient.Builder builder = new CloudSolrClient.Builder();
- builder.withZkHost(logFeederProps.getSolrZkConnectString());
- CloudSolrClient solrClient = builder.build();
- solrClient.setDefaultCollection("history");
- return new LogLevelFilterManagerSolr(solrClient);
- } else {
- return null; // TODO: use lb http client
- }
+ SolrClient solrClient = new LogFeederSolrClientFactory().createSolrClient(
+ logFeederProps.getSolrZkConnectString(), logFeederProps.getSolrUrls(), "history");
+ return new LogLevelFilterManagerSolr(solrClient);
} else { // no default filter manager
return null;
}
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
index 8f73e2b..12408b4 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
@@ -21,6 +21,7 @@ package org.apache.ambari.logfeeder.conf;
import org.apache.ambari.logfeeder.common.LogFeederConstants;
import org.apache.ambari.logfeeder.plugin.common.LogFeederProperties;
import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription;
+import org.apache.commons.lang.StringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.AbstractEnvironment;
@@ -179,6 +180,15 @@ public class LogFeederProps implements LogFeederProperties {
@Value("${" + LogFeederConstants.SOLR_ZK_CONNECTION_STRING + ":}")
private String solrZkConnectString;
+ @LogSearchPropertyDescription(
+ name = LogFeederConstants.SOLR_URLS,
+ description = "Comma separated solr urls (with protocol and port), override "+ LogFeederConstants.SOLR_ZK_CONNECTION_STRING + " config",
+ examples = {"https://localhost1:8983/solr,https://localhost2:8983"},
+ sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+ )
+ @Value("${" + LogFeederConstants.SOLR_URLS + ":}")
+ private String solrUrlsStr;
+
@Inject
private LogEntryCacheConfig logEntryCacheConfig;
@@ -285,7 +295,7 @@ public class LogFeederProps implements LogFeederProperties {
}
public boolean isUseLocalConfigs() {
- return useLocalConfigs;
+ return this.useLocalConfigs;
}
public void setUseLocalConfigs(boolean useLocalConfigs) {
@@ -316,6 +326,21 @@ public class LogFeederProps implements LogFeederProperties {
this.solrFilterMonitor = solrFilterMonitor;
}
+ public String getSolrUrlsStr() {
+ return this.solrUrlsStr;
+ }
+
+ public void setSolrUrlsStr(String solrUrlsStr) {
+ this.solrUrlsStr = solrUrlsStr;
+ }
+
+ public String[] getSolrUrls() {
+ if (StringUtils.isNotBlank(this.solrUrlsStr)) {
+ return this.solrUrlsStr.split(",");
+ }
+ return null;
+ }
+
@PostConstruct
public void init() {
properties = new Properties();
@@ -331,4 +356,5 @@ public class LogFeederProps implements LogFeederProperties {
throw new IllegalArgumentException("Cannot find logfeeder.properties on the classpath");
}
}
+
}
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
index 041c1bd..6b27553 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
@@ -19,11 +19,13 @@
package org.apache.ambari.logfeeder.output;
+import org.apache.ambari.logfeeder.common.LogFeederSolrClientFactory;
import org.apache.ambari.logfeeder.conf.LogFeederProps;
import org.apache.ambari.logfeeder.plugin.input.InputMarker;
import org.apache.ambari.logfeeder.plugin.output.Output;
import org.apache.ambari.logfeeder.util.DateUtil;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
@@ -76,6 +78,7 @@ public class OutputSolr extends Output<LogFeederProps, InputMarker> {
private String splitMode;
private int splitInterval;
private String zkConnectString;
+ private String[] solrUrls = null;
private int maxIntervalMS;
private int workers;
private int maxBufferSize;
@@ -121,8 +124,14 @@ public class OutputSolr extends Output<LogFeederProps, InputMarker> {
type = getStringValue("type");
zkConnectString = getStringValue("zk_connect_string");
- if (StringUtils.isEmpty(zkConnectString)) {
- throw new Exception("For solr output the zk_connect_string property need to be set");
+ List<String> solrUrlsList = getListValue("solr_urls");
+
+ if (StringUtils.isBlank(zkConnectString) && CollectionUtils.isEmpty(solrUrlsList)) {
+ throw new Exception("For solr output the zk_connect_string or solr_urls property need to be set");
+ }
+
+ if (CollectionUtils.isNotEmpty(solrUrlsList)) {
+ solrUrls = solrUrlsList.toArray(new String[0]);
}
skipLogtime = getBooleanValue("skip_logtime", DEFAULT_SKIP_LOGTIME);
@@ -176,42 +185,31 @@ public class OutputSolr extends Output<LogFeederProps, InputMarker> {
private void createSolrWorkers() throws Exception, MalformedURLException {
for (int count = 0; count < workers; count++) {
- CloudSolrClient solrClient = getSolrClient(count);
+ SolrClient solrClient = getSolrClient(count);
createSolrWorkerThread(count, solrClient);
}
}
- CloudSolrClient getSolrClient(int count) throws Exception, MalformedURLException {
- CloudSolrClient solrClient = createSolrClient();
+ private SolrClient getSolrClient(int count) throws Exception, MalformedURLException {
+ SolrClient solrClient = new LogFeederSolrClientFactory().createSolrClient(zkConnectString, solrUrls, collection);
pingSolr(count, solrClient);
-
- return solrClient;
- }
-
- private CloudSolrClient createSolrClient() throws Exception {
- LOG.info("Using zookeepr. zkConnectString=" + zkConnectString);
- LOG.info("Using collection=" + collection);
-
- CloudSolrClient solrClient = new CloudSolrClient.Builder().withZkHost(zkConnectString).build();
- solrClient.setDefaultCollection(collection);
return solrClient;
}
- private void pingSolr(int count, CloudSolrClient solrClient) {
+ private void pingSolr(int count, SolrClient solrClient) {
try {
- LOG.info("Pinging Solr server. zkConnectString=" + zkConnectString);
+ LOG.info("Pinging Solr server.");
SolrPingResponse response = solrClient.ping();
if (response.getStatus() == 0) {
LOG.info("Ping to Solr server is successful for worker=" + count);
} else {
LOG.warn(
- String.format("Ping to Solr server failed. It would check again. worker=%d, zkConnectString=%s, collection=%s, " +
- "response=%s", count, zkConnectString, collection, response));
+ String.format("Ping to Solr server failed. It would check again. worker=%d, collection=%s, " +
+ "response=%s", count, collection, response));
}
} catch (Throwable t) {
LOG.warn(String.format(
- "Ping to Solr server failed. It would check again. worker=%d, zkConnectString=%s, collection=%s", count,
- zkConnectString, collection), t);
+ "Ping to Solr server failed. It would check again. worker=%d, collection=%s", count, collection), t);
}
}
diff --git a/ambari-logsearch/docker/test-config/logfeeder/logfeeder.properties b/ambari-logsearch/docker/test-config/logfeeder/logfeeder.properties
index 20aed68..bd59765 100644
--- a/ambari-logsearch/docker/test-config/logfeeder/logfeeder.properties
+++ b/ambari-logsearch/docker/test-config/logfeeder/logfeeder.properties
@@ -21,7 +21,6 @@ logfeeder.config.files=shipper-conf/global.config.json,\
shipper-conf/output.config.json
logfeeder.log.filter.enable=true
logfeeder.solr.config.interval=5
-logfeeder.solr.core.config.name=history
logfeeder.solr.zk_connect_string=localhost:9983
logfeeder.cache.enabled=true
logfeeder.cache.size=100
@@ -31,5 +30,7 @@ logfeeder.cache.last.dedup.enabled=true
logsearch.config.zk_connect_string=localhost:9983
logfeeder.include.default.level=FATAL,ERROR,WARN,INFO,DEBUG,TRACE,UNKNOWN
logfeeder.docker.registry.enabled=true
+logfeeder.solr.core.config.name=history
+#logfeeder.solr.urls=http://solr:8983/solr
#logfeeder.configs.local.enabled=true
#logfeeder.configs.filter.solr.enabled=true
\ No newline at end of file