You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by mg...@apache.org on 2017/07/27 14:53:44 UTC
[1/2] ambari git commit: AMBARI-21507 Log Search Solr output
properties should be provided by the Config API (mgergely)
Repository: ambari
Updated Branches:
refs/heads/trunk e3a50d946 -> dc85e67d7
http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-server/src/main/resources/common-services/LOGSEARCH/0.5.0/service_advisor.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/LOGSEARCH/0.5.0/service_advisor.py b/ambari-server/src/main/resources/common-services/LOGSEARCH/0.5.0/service_advisor.py
index fc49824..6b29dbc 100644
--- a/ambari-server/src/main/resources/common-services/LOGSEARCH/0.5.0/service_advisor.py
+++ b/ambari-server/src/main/resources/common-services/LOGSEARCH/0.5.0/service_advisor.py
@@ -122,24 +122,19 @@ class LogSearchServiceAdvisor(service_advisor.ServiceAdvisor):
logSearchServerHosts = self.getComponentHostNames(services, "LOGSEARCH", "LOGSEARCH_SERVER")
# if there is no Log Search server on the cluster, i.e. there is an external server
if logSearchServerHosts is None or len(logSearchServerHosts) == 0:
- # hide logsearch specific attributes, except for a few which are used by the logfeeders too
+ # hide logsearch specific attributes
for key in services['configurations']['logsearch-env']['properties']:
putLogSearchEnvAttribute(key, 'visible', 'false')
for key in services['configurations']['logsearch-properties']['properties']:
- if key not in ['logsearch.collection.service.logs.numshards', 'logsearch.collection.audit.logs.numshards',
- 'logsearch.solr.collection.service.logs', 'logsearch.solr.collection.audit.logs',
- 'logsearch.service.logs.split.interval.mins', 'logsearch.audit.logs.split.interval.mins']:
- putLogSearchAttribute(key, 'visible', 'false')
+ putLogSearchAttribute(key, 'visible', 'false')
for key in services['configurations']['logsearch-audit_logs-solrconfig']['properties']:
self.putPropertyAttribute(configurations, "logsearch-audit_logs-solrconfig")(key, 'visible', 'false')
for key in services['configurations']['logsearch-service_logs-solrconfig']['properties']:
self.putPropertyAttribute(configurations, "logsearch-service_logs-solrconfig")(key, 'visible', 'false')
for key in services['configurations']['logsearch-log4j']['properties']:
self.putPropertyAttribute(configurations, "logsearch-log4j")(key, 'visible', 'false')
-
- # in the abscence of a server we can't provide a good estimation for the number of shards
- putLogSearchProperty("logsearch.collection.service.logs.numshards", 2)
- putLogSearchProperty("logsearch.collection.audit.logs.numshards", 2)
+ for key in services['configurations']['logsearch-admin-json']['properties']:
+ self.putPropertyAttribute(configurations, "logsearch-admin-json")(key, 'visible', 'false')
# if there is a Log Search server on the cluster
else:
infraSolrHosts = self.getComponentHostNames(services, "AMBARI_INFRA", "INFRA_SOLR")
@@ -161,17 +156,17 @@ class LogSearchServiceAdvisor(service_advisor.ServiceAdvisor):
putLogSearchCommonEnvProperty('logsearch_use_external_solr', 'true')
- # recommend number of shard
- putLogSearchAttribute('logsearch.collection.service.logs.numshards', 'minimum', recommendedMinShards)
- putLogSearchAttribute('logsearch.collection.service.logs.numshards', 'maximum', recommendedMaxShards)
- putLogSearchProperty("logsearch.collection.service.logs.numshards", recommendedShards)
+ # recommend number of shards
+ putLogSearchAttribute('logsearch.collection.service.logs.numshards', 'minimum', recommendedMinShards)
+ putLogSearchAttribute('logsearch.collection.service.logs.numshards', 'maximum', recommendedMaxShards)
+ putLogSearchProperty("logsearch.collection.service.logs.numshards", recommendedShards)
- putLogSearchAttribute('logsearch.collection.audit.logs.numshards', 'minimum', recommendedMinShards)
- putLogSearchAttribute('logsearch.collection.audit.logs.numshards', 'maximum', recommendedMaxShards)
- putLogSearchProperty("logsearch.collection.audit.logs.numshards", recommendedShards)
- # recommend replication factor
- putLogSearchProperty("logsearch.collection.service.logs.replication.factor", recommendedReplicationFactor)
- putLogSearchProperty("logsearch.collection.audit.logs.replication.factor", recommendedReplicationFactor)
+ putLogSearchAttribute('logsearch.collection.audit.logs.numshards', 'minimum', recommendedMinShards)
+ putLogSearchAttribute('logsearch.collection.audit.logs.numshards', 'maximum', recommendedMaxShards)
+ putLogSearchProperty("logsearch.collection.audit.logs.numshards", recommendedShards)
+ # recommend replication factor
+ putLogSearchProperty("logsearch.collection.service.logs.replication.factor", recommendedReplicationFactor)
+ putLogSearchProperty("logsearch.collection.audit.logs.replication.factor", recommendedReplicationFactor)
kerberos_authentication_enabled = self.isSecurityEnabled(services)
# if Kerberos is not enabled, hide the Kerberos-related properties
http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog300Test.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog300Test.java b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog300Test.java
index d7bdf75..a7417d4 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog300Test.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog300Test.java
@@ -331,9 +331,9 @@ public class UpgradeCatalog300Test {
Map<String, String> expectedLogFeederLog4j = ImmutableMap.of(
"content", "<!DOCTYPE log4j:configuration SYSTEM \"http://logging.apache.org/log4j/1.2/apidocs/org/apache/log4j/xml/doc-files/log4j.dtd\">");
- Config mockLogFeederLog4j = easyMockSupport.createNiceMock(Config.class);
- expect(cluster.getDesiredConfigByType("logfeeder-log4j")).andReturn(mockLogFeederLog4j).atLeastOnce();
- expect(mockLogFeederLog4j.getProperties()).andReturn(oldLogFeederLog4j).anyTimes();
+ Config logFeederLog4jConf = easyMockSupport.createNiceMock(Config.class);
+ expect(cluster.getDesiredConfigByType("logfeeder-log4j")).andReturn(logFeederLog4jConf).atLeastOnce();
+ expect(logFeederLog4jConf.getProperties()).andReturn(oldLogFeederLog4j).anyTimes();
Capture<Map<String, String>> logFeederLog4jCapture = EasyMock.newCapture();
expect(controller.createConfig(anyObject(Cluster.class), anyObject(StackId.class), anyString(), capture(logFeederLog4jCapture), anyString(),
EasyMock.<Map<String, Map<String, String>>>anyObject())).andReturn(config).once();
@@ -344,45 +344,75 @@ public class UpgradeCatalog300Test {
Map<String, String> expectedLogSearchLog4j = ImmutableMap.of(
"content", "<!DOCTYPE log4j:configuration SYSTEM \"http://logging.apache.org/log4j/1.2/apidocs/org/apache/log4j/xml/doc-files/log4j.dtd\">");
- Config mockLogSearchLog4j = easyMockSupport.createNiceMock(Config.class);
- expect(cluster.getDesiredConfigByType("logsearch-log4j")).andReturn(mockLogSearchLog4j).atLeastOnce();
- expect(mockLogSearchLog4j.getProperties()).andReturn(oldLogSearchLog4j).anyTimes();
+ Config logSearchLog4jConf = easyMockSupport.createNiceMock(Config.class);
+ expect(cluster.getDesiredConfigByType("logsearch-log4j")).andReturn(logSearchLog4jConf).atLeastOnce();
+ expect(logSearchLog4jConf.getProperties()).andReturn(oldLogSearchLog4j).anyTimes();
Capture<Map<String, String>> logSearchLog4jCapture = EasyMock.newCapture();
expect(controller.createConfig(anyObject(Cluster.class), anyObject(StackId.class), anyString(), capture(logSearchLog4jCapture), anyString(),
EasyMock.<Map<String, Map<String, String>>>anyObject())).andReturn(config).once();
Map<String, String> oldLogSearchServiceLogsConf = ImmutableMap.of(
- "content", "<before/><requestHandler name=\"/admin/\" class=\"solr.admin.AdminHandlers\" /><after/>");
+ "content", "<before/><requestHandler name=\"/admin/\" class=\"solr.admin.AdminHandlers\" /><after/>");
Map<String, String> expectedLogSearchServiceLogsConf = ImmutableMap.of(
- "content", "<before/><after/>");
+ "content", "<before/><after/>");
- Config confLogSearchServiceLogsConf = easyMockSupport.createNiceMock(Config.class);
- expect(cluster.getDesiredConfigByType("logsearch-service_logs-solrconfig")).andReturn(confLogSearchServiceLogsConf).atLeastOnce();
- expect(confLogSearchServiceLogsConf.getProperties()).andReturn(oldLogSearchServiceLogsConf).anyTimes();
+ Config logSearchServiceLogsConf = easyMockSupport.createNiceMock(Config.class);
+ expect(cluster.getDesiredConfigByType("logsearch-service_logs-solrconfig")).andReturn(logSearchServiceLogsConf).atLeastOnce();
+ expect(logSearchServiceLogsConf.getProperties()).andReturn(oldLogSearchServiceLogsConf).anyTimes();
Capture<Map<String, String>> logSearchServiceLogsConfCapture = EasyMock.newCapture();
expect(controller.createConfig(anyObject(Cluster.class), anyObject(StackId.class), anyString(), capture(logSearchServiceLogsConfCapture), anyString(),
- EasyMock.<Map<String, Map<String, String>>>anyObject())).andReturn(config).once();
+ EasyMock.<Map<String, Map<String, String>>>anyObject())).andReturn(config).once();
Map<String, String> oldLogSearchAuditLogsConf = ImmutableMap.of(
- "content", "<before/><requestHandler name=\"/admin/\" class=\"solr.admin.AdminHandlers\" /><after/>");
+ "content", "<before/><requestHandler name=\"/admin/\" class=\"solr.admin.AdminHandlers\" /><after/>");
Map<String, String> expectedLogSearchAuditLogsConf = ImmutableMap.of(
- "content", "<before/><after/>");
+ "content", "<before/><after/>");
- Config confLogSearchAuditLogsConf = easyMockSupport.createNiceMock(Config.class);
- expect(cluster.getDesiredConfigByType("logsearch-audit_logs-solrconfig")).andReturn(confLogSearchAuditLogsConf).atLeastOnce();
- expect(confLogSearchAuditLogsConf.getProperties()).andReturn(oldLogSearchAuditLogsConf).anyTimes();
+ Config logSearchAuditLogsConf = easyMockSupport.createNiceMock(Config.class);
+ expect(cluster.getDesiredConfigByType("logsearch-audit_logs-solrconfig")).andReturn(logSearchAuditLogsConf).atLeastOnce();
+ expect(logSearchAuditLogsConf.getProperties()).andReturn(oldLogSearchAuditLogsConf).anyTimes();
Capture<Map<String, String>> logSearchAuditLogsConfCapture = EasyMock.newCapture();
expect(controller.createConfig(anyObject(Cluster.class), anyObject(StackId.class), anyString(), capture(logSearchAuditLogsConfCapture), anyString(),
- EasyMock.<Map<String, Map<String, String>>>anyObject())).andReturn(config).once();
+ EasyMock.<Map<String, Map<String, String>>>anyObject())).andReturn(config).once();
+
+ Map<String, String> oldLogFeederOutputConf = ImmutableMap.of(
+ "content",
+ " \"zk_connect_string\":\"{{logsearch_solr_zk_quorum}}{{logsearch_solr_zk_znode}}\",\n" +
+ " \"collection\":\"{{logsearch_solr_collection_service_logs}}\",\n" +
+ " \"number_of_shards\": \"{{logsearch_collection_service_logs_numshards}}\",\n" +
+ " \"splits_interval_mins\": \"{{logsearch_service_logs_split_interval_mins}}\",\n" +
+ "\n" +
+ " \"zk_connect_string\":\"{{logsearch_solr_zk_quorum}}{{logsearch_solr_zk_znode}}\",\n" +
+ " \"collection\":\"{{logsearch_solr_collection_audit_logs}}\",\n" +
+ " \"number_of_shards\": \"{{logsearch_collection_audit_logs_numshards}}\",\n" +
+ " \"splits_interval_mins\": \"{{logsearch_audit_logs_split_interval_mins}}\",\n"
+ );
+
+ Map<String, String> expectedLogFeederOutputConf = ImmutableMap.of(
+ "content",
+ " \"zk_connect_string\":\"{{logsearch_solr_zk_quorum}}{{logsearch_solr_zk_znode}}\",\n" +
+ " \"type\": \"service\",\n" +
+ "\n" +
+ " \"zk_connect_string\":\"{{logsearch_solr_zk_quorum}}{{logsearch_solr_zk_znode}}\",\n" +
+ " \"type\": \"audit\",\n"
+ );
+
+ Config logFeederOutputConf = easyMockSupport.createNiceMock(Config.class);
+ expect(cluster.getDesiredConfigByType("logfeeder-output-config")).andReturn(logFeederOutputConf).atLeastOnce();
+ expect(logFeederOutputConf.getProperties()).andReturn(oldLogFeederOutputConf).anyTimes();
+ Capture<Map<String, String>> logFeederOutputConfCapture = EasyMock.newCapture();
+ expect(controller.createConfig(anyObject(Cluster.class), anyObject(StackId.class), anyString(), capture(logFeederOutputConfCapture), anyString(),
+ EasyMock.<Map<String, Map<String, String>>>anyObject())).andReturn(config).once();
replay(clusters, cluster);
replay(controller, injector2);
replay(confSomethingElse1, confSomethingElse2, confLogSearchConf1, confLogSearchConf2);
replay(logSearchPropertiesConf, logFeederPropertiesConf);
- replay(mockLogFeederLog4j, mockLogSearchLog4j);
- replay(confLogSearchServiceLogsConf, confLogSearchAuditLogsConf);
+ replay(logFeederLog4jConf, logSearchLog4jConf);
+ replay(logSearchServiceLogsConf, logSearchAuditLogsConf);
+ replay(logFeederOutputConf);
new UpgradeCatalog300(injector2).updateLogSearchConfigs();
easyMockSupport.verifyAll();
@@ -409,5 +439,8 @@ public class UpgradeCatalog300Test {
Map<String, String> updatedAuditLogsConf = logSearchAuditLogsConfCapture.getValue();
assertTrue(Maps.difference(expectedLogSearchAuditLogsConf, updatedAuditLogsConf).areEqual());
+
+ Map<String, String> updatedLogFeederOutputConf = logFeederOutputConfCapture.getValue();
+ assertTrue(Maps.difference(expectedLogFeederOutputConf, updatedLogFeederOutputConf).areEqual());
}
}
[2/2] ambari git commit: AMBARI-21507 Log Search Solr output
properties should be provided by the Config API (mgergely)
Posted by mg...@apache.org.
AMBARI-21507 Log Search Solr output properties should be provided by the Config API (mgergely)
Change-Id: I32ec1afa8549b7e065fa904f2de2db0b255f690f
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/dc85e67d
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/dc85e67d
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/dc85e67d
Branch: refs/heads/trunk
Commit: dc85e67d7d1f1287398824541c99b7f3872796a0
Parents: e3a50d9
Author: Miklos Gergely <mg...@hortonworks.com>
Authored: Thu Jul 27 16:53:34 2017 +0200
Committer: Miklos Gergely <mg...@hortonworks.com>
Committed: Thu Jul 27 16:53:34 2017 +0200
----------------------------------------------------------------------
.../logsearch/config/api/LogSearchConfig.java | 41 +++-
.../config/api/OutputConfigMonitor.java | 44 +++++
.../model/outputconfig/OutputProperties.java | 23 +++
.../outputconfig/OutputSolrProperties.java | 26 +++
.../config/api/LogSearchConfigClass1.java | 19 +-
.../config/api/LogSearchConfigClass2.java | 19 +-
.../config/zookeeper/LogSearchConfigZK.java | 87 +++++++--
.../impl/OutputSolrPropertiesImpl.java | 46 +++++
.../org/apache/ambari/logfeeder/LogFeeder.java | 5 +-
.../ambari/logfeeder/common/ConfigHandler.java | 11 +-
.../logfeeder/common/LogEntryParseTester.java | 2 +-
.../logfeeder/input/InputConfigUploader.java | 2 +-
.../ambari/logfeeder/input/InputSimulate.java | 1 +
.../apache/ambari/logfeeder/output/Output.java | 36 +++-
.../ambari/logfeeder/output/OutputManager.java | 11 ++
.../ambari/logfeeder/output/OutputSolr.java | 187 +++++++++++--------
.../ambari/logfeeder/output/OutputSolrTest.java | 29 ++-
.../logsearch/conf/SolrAuditLogPropsConfig.java | 5 +
.../conf/SolrEventHistoryPropsConfig.java | 5 +
.../ambari/logsearch/conf/SolrPropsConfig.java | 2 +
.../conf/SolrServiceLogPropsConfig.java | 5 +
.../configurer/LogSearchConfigConfigurer.java | 3 +
.../configurer/SolrCollectionConfigurer.java | 5 +-
.../ambari/logsearch/dao/AuditSolrDao.java | 1 +
.../logsearch/dao/ServiceLogsSolrDao.java | 1 +
.../ambari/logsearch/dao/SolrDaoBase.java | 13 +-
.../handler/CreateCollectionHandler.java | 12 +-
.../logsearch/manager/ShipperConfigManager.java | 10 +-
.../logfeeder/shipper-conf/output.config.json | 10 +-
.../server/upgrade/UpgradeCatalog300.java | 42 +++--
.../0.5.0/properties/output.config.json.j2 | 8 +-
.../LOGSEARCH/0.5.0/service_advisor.py | 33 ++--
.../server/upgrade/UpgradeCatalog300Test.java | 73 ++++++--
33 files changed, 625 insertions(+), 192 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfig.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfig.java b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfig.java
index 6c5cefd..76be392 100644
--- a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfig.java
+++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfig.java
@@ -25,6 +25,7 @@ import java.util.Map;
import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter;
import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilterMap;
+import org.apache.ambari.logsearch.config.api.model.outputconfig.OutputSolrProperties;
import org.apache.ambari.logsearch.config.api.model.inputconfig.InputConfig;
/**
@@ -57,14 +58,23 @@ public interface LogSearchConfig extends Closeable {
List<String> getServices(String clusterName);
/**
- * Checks if input configuration exists.
+ * Checks if input configuration exists. Will be used only in LOGFEEDER mode.
+ *
+ * @param serviceName The name of the service looked for.
+ * @return If input configuration exists for the service.
+ * @throws Exception
+ */
+ boolean inputConfigExistsLogFeeder(String serviceName) throws Exception;
+
+ /**
+ * Checks if input configuration exists. Will be used only in SERVER mode.
*
* @param clusterName The name of the cluster where the service is looked for.
* @param serviceName The name of the service looked for.
* @return If input configuration exists for the service.
* @throws Exception
*/
- boolean inputConfigExists(String clusterName, String serviceName) throws Exception;
+ boolean inputConfigExistsServer(String clusterName, String serviceName) throws Exception;
/**
* Returns the global configurations of a cluster. Will be used only in SERVER mode.
@@ -140,4 +150,31 @@ public interface LogSearchConfig extends Closeable {
*/
void monitorInputConfigChanges(InputConfigMonitor inputConfigMonitor, LogLevelFilterMonitor logLevelFilterMonitor,
String clusterName) throws Exception;
+
+ /**
+ * Saves the properties of an Output Solr. Will be used only in SERVER mode.
+ *
+ * @param type The type of the Output Solr.
+ * @param outputSolrProperties The properties of the Output Solr.
+ * @throws Exception
+ */
+ void saveOutputSolrProperties(String type, OutputSolrProperties outputSolrProperties) throws Exception;
+
+ /**
+ * Get the properties of an Output Solr. Will be used only in LOGFEEDER mode.
+ *
+ * @param type The type of the Output Solr.
+ * @return The properties of the Output Solr, or null if it doesn't exist.
+ * @throws Exception
+ */
+ OutputSolrProperties getOutputSolrProperties(String type) throws Exception;
+
+ /**
+ * Registers the monitors to be notified of output configuration changes. Will be used only in LOGFEEDER mode.
+ *
+ * @param outputConfigMonitors The monitors which want to watch the output config changes.
+ * @throws Exception
+ */
+ void monitorOutputProperties(List<? extends OutputConfigMonitor> outputConfigMonitors) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/OutputConfigMonitor.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/OutputConfigMonitor.java b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/OutputConfigMonitor.java
new file mode 100644
index 0000000..c54626d
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/OutputConfigMonitor.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.api;
+
+import org.apache.ambari.logsearch.config.api.model.outputconfig.OutputProperties;
+
+/**
+ * Monitors output configuration changes.
+ */
+public interface OutputConfigMonitor {
+ /**
+ * @return The destination of the output.
+ */
+ String getDestination();
+
+ /**
+ * @return The type of the output logs.
+ */
+ String getOutputType();
+
+ /**
+ * Will be called whenever there is a change in the configuration of the output.
+ *
+ * @param outputProperties The modified properties of the output.
+ */
+ void outputConfigChanged(OutputProperties outputProperties);
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/outputconfig/OutputProperties.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/outputconfig/OutputProperties.java b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/outputconfig/OutputProperties.java
new file mode 100644
index 0000000..affd5b9
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/outputconfig/OutputProperties.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.api.model.outputconfig;
+
+public interface OutputProperties {
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/outputconfig/OutputSolrProperties.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/outputconfig/OutputSolrProperties.java b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/outputconfig/OutputSolrProperties.java
new file mode 100644
index 0000000..586e785
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/outputconfig/OutputSolrProperties.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.api.model.outputconfig;
+
+public interface OutputSolrProperties extends OutputProperties {
+ String getCollection();
+
+ String getSplitIntervalMins();
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass1.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass1.java b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass1.java
index 28844d5..e308346 100644
--- a/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass1.java
+++ b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass1.java
@@ -26,6 +26,7 @@ import org.apache.ambari.logsearch.config.api.InputConfigMonitor;
import org.apache.ambari.logsearch.config.api.LogSearchConfig;
import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter;
import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilterMap;
+import org.apache.ambari.logsearch.config.api.model.outputconfig.OutputSolrProperties;
import org.apache.ambari.logsearch.config.api.model.inputconfig.InputConfig;
public class LogSearchConfigClass1 implements LogSearchConfig {
@@ -33,7 +34,12 @@ public class LogSearchConfigClass1 implements LogSearchConfig {
public void init(Component component, Map<String, String> properties, String clusterName) {}
@Override
- public boolean inputConfigExists(String clusterName, String serviceName) throws Exception {
+ public boolean inputConfigExistsLogFeeder(String serviceName) throws Exception {
+ return false;
+ }
+
+ @Override
+ public boolean inputConfigExistsServer(String clusterName, String serviceName) throws Exception {
return false;
}
@@ -74,5 +80,16 @@ public class LogSearchConfigClass1 implements LogSearchConfig {
}
@Override
+ public void saveOutputSolrProperties(String type, OutputSolrProperties outputSolrProperties) throws Exception {}
+
+ @Override
+ public OutputSolrProperties getOutputSolrProperties(String type) {
+ return null;
+ }
+
+ @Override
+ public void monitorOutputProperties(List<? extends OutputConfigMonitor> outputConfigMonitors) throws Exception {}
+
+ @Override
public void close() {}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass2.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass2.java b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass2.java
index 5934fa6..b64dae8 100644
--- a/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass2.java
+++ b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass2.java
@@ -26,6 +26,7 @@ import org.apache.ambari.logsearch.config.api.InputConfigMonitor;
import org.apache.ambari.logsearch.config.api.LogSearchConfig;
import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter;
import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilterMap;
+import org.apache.ambari.logsearch.config.api.model.outputconfig.OutputSolrProperties;
import org.apache.ambari.logsearch.config.api.model.inputconfig.InputConfig;
public class LogSearchConfigClass2 implements LogSearchConfig {
@@ -33,7 +34,12 @@ public class LogSearchConfigClass2 implements LogSearchConfig {
public void init(Component component, Map<String, String> properties, String clusterName) {}
@Override
- public boolean inputConfigExists(String clusterName, String serviceName) throws Exception {
+ public boolean inputConfigExistsLogFeeder(String serviceName) throws Exception {
+ return false;
+ }
+
+ @Override
+ public boolean inputConfigExistsServer(String clusterName, String serviceName) throws Exception {
return false;
}
@@ -74,5 +80,16 @@ public class LogSearchConfigClass2 implements LogSearchConfig {
}
@Override
+ public void saveOutputSolrProperties(String type, OutputSolrProperties outputSolrProperties) throws Exception {}
+
+ @Override
+ public OutputSolrProperties getOutputSolrProperties(String type) {
+ return null;
+ }
+
+ @Override
+ public void monitorOutputProperties(List<? extends OutputConfigMonitor> outputConfigMonitors) throws Exception {}
+
+ @Override
public void close() {}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java
index fdd8ed6..387d0c6 100644
--- a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java
+++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java
@@ -27,12 +27,15 @@ import java.util.TreeMap;
import org.apache.ambari.logsearch.config.api.LogSearchConfig;
import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription;
+import org.apache.ambari.logsearch.config.api.OutputConfigMonitor;
import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter;
import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilterMap;
+import org.apache.ambari.logsearch.config.api.model.outputconfig.OutputSolrProperties;
import org.apache.ambari.logsearch.config.api.model.inputconfig.InputConfig;
import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.InputAdapter;
import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.InputConfigGson;
import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.InputConfigImpl;
+import org.apache.ambari.logsearch.config.zookeeper.model.outputconfig.impl.OutputSolrPropertiesImpl;
import org.apache.ambari.logsearch.config.api.InputConfigMonitor;
import org.apache.ambari.logsearch.config.api.LogLevelFilterMonitor;
import org.apache.commons.collections.MapUtils;
@@ -98,9 +101,12 @@ public class LogSearchConfigZK implements LogSearchConfig {
private Map<String, String> properties;
private CuratorFramework client;
- private TreeCache cache;
private Gson gson;
+ private TreeCache serverCache;
+ private TreeCache logFeederClusterCache;
+ private TreeCache outputCache;
+
@Override
public void init(Component component, Map<String, String> properties, String clusterName) throws Exception {
this.properties = properties;
@@ -115,28 +121,39 @@ public class LogSearchConfigZK implements LogSearchConfig {
.build();
client.start();
+ outputCache = new TreeCache(client, "/output");
+ outputCache.start();
if (component == Component.SERVER) {
if (client.checkExists().forPath("/") == null) {
client.create().creatingParentContainersIfNeeded().forPath("/");
}
- cache = new TreeCache(client, "/");
- cache.start();
+ if (client.checkExists().forPath("/output") == null) {
+ client.create().creatingParentContainersIfNeeded().forPath("/output");
+ }
+ serverCache = new TreeCache(client, "/");
+ serverCache.start();
} else {
while (client.checkExists().forPath("/") == null) {
LOG.info("Root node is not present yet, going to sleep for " + WAIT_FOR_ROOT_SLEEP_SECONDS + " seconds");
Thread.sleep(WAIT_FOR_ROOT_SLEEP_SECONDS * 1000);
}
- cache = new TreeCache(client, String.format("/%s", clusterName));
+ logFeederClusterCache = new TreeCache(client, String.format("/%s", clusterName));
}
gson = new GsonBuilder().setDateFormat(DATE_FORMAT).create();
}
@Override
- public boolean inputConfigExists(String clusterName, String serviceName) throws Exception {
+ public boolean inputConfigExistsLogFeeder(String serviceName) throws Exception {
+ String nodePath = String.format("/input/%s", serviceName);
+ return logFeederClusterCache.getCurrentData(nodePath) != null;
+ }
+
+ @Override
+ public boolean inputConfigExistsServer(String clusterName, String serviceName) throws Exception {
String nodePath = String.format("/%s/input/%s", clusterName, serviceName);
- return cache.getCurrentData(nodePath) != null;
+ return serverCache.getCurrentData(nodePath) != null;
}
@Override
@@ -261,8 +278,8 @@ public class LogSearchConfigZK implements LogSearchConfig {
}
}
};
- cache.getListenable().addListener(listener);
- cache.start();
+ logFeederClusterCache.getListenable().addListener(listener);
+ logFeederClusterCache.start();
}
private void createGlobalConfigNode(JsonArray globalConfigNode, String clusterName) {
@@ -270,7 +287,7 @@ public class LogSearchConfigZK implements LogSearchConfig {
String data = InputConfigGson.gson.toJson(globalConfigNode);
try {
- if (cache.getCurrentData(globalConfigNodePath) != null) {
+ if (logFeederClusterCache.getCurrentData(globalConfigNodePath) != null) {
client.setData().forPath(globalConfigNodePath, data.getBytes());
} else {
client.create().creatingParentContainersIfNeeded().withACL(getAcls()).forPath(globalConfigNodePath, data.getBytes());
@@ -283,14 +300,14 @@ public class LogSearchConfigZK implements LogSearchConfig {
@Override
public List<String> getServices(String clusterName) {
String parentPath = String.format("/%s/input", clusterName);
- Map<String, ChildData> serviceNodes = cache.getCurrentChildren(parentPath);
+ Map<String, ChildData> serviceNodes = serverCache.getCurrentChildren(parentPath);
return new ArrayList<String>(serviceNodes.keySet());
}
@Override
public String getGlobalConfigs(String clusterName) {
String globalConfigNodePath = String.format("/%s/global", clusterName);
- return new String(cache.getCurrentData(globalConfigNodePath).getData());
+ return new String(serverCache.getCurrentData(globalConfigNodePath).getData());
}
@Override
@@ -299,7 +316,7 @@ public class LogSearchConfigZK implements LogSearchConfig {
JsonArray globalConfigs = (JsonArray) new JsonParser().parse(globalConfigData);
InputAdapter.setGlobalConfigs(globalConfigs);
- ChildData childData = cache.getCurrentData(String.format("/%s/input/%s", clusterName, serviceName));
+ ChildData childData = serverCache.getCurrentData(String.format("/%s/input/%s", clusterName, serviceName));
return childData == null ? null : InputConfigGson.gson.fromJson(new String(childData.getData()), InputConfigImpl.class);
}
@@ -320,7 +337,7 @@ public class LogSearchConfigZK implements LogSearchConfig {
for (Map.Entry<String, LogLevelFilter> e : filters.getFilter().entrySet()) {
String nodePath = String.format("/%s/loglevelfilter/%s", clusterName, e.getKey());
String logLevelFilterJson = gson.toJson(e.getValue());
- String currentLogLevelFilterJson = new String(cache.getCurrentData(nodePath).getData());
+ String currentLogLevelFilterJson = new String(serverCache.getCurrentData(nodePath).getData());
if (!logLevelFilterJson.equals(currentLogLevelFilterJson)) {
client.setData().forPath(nodePath, logLevelFilterJson.getBytes());
LOG.info("Set log level filter for the log " + e.getKey() + " for cluster " + clusterName);
@@ -331,7 +348,7 @@ public class LogSearchConfigZK implements LogSearchConfig {
@Override
public LogLevelFilterMap getLogLevelFilters(String clusterName) {
String parentPath = String.format("/%s/loglevelfilter", clusterName);
- Map<String, ChildData> logLevelFilterNodes = cache.getCurrentChildren(parentPath);
+ Map<String, ChildData> logLevelFilterNodes = serverCache.getCurrentChildren(parentPath);
TreeMap<String, LogLevelFilter> filters = new TreeMap<>();
for (Map.Entry<String, ChildData> e : logLevelFilterNodes.entrySet()) {
LogLevelFilter logLevelFilter = gson.fromJson(new String(e.getValue().getData()), LogLevelFilter.class);
@@ -387,6 +404,48 @@ public class LogSearchConfigZK implements LogSearchConfig {
}
@Override
+ public void saveOutputSolrProperties(String type, OutputSolrProperties outputSolrProperties) throws Exception {
+ String nodePath = String.format("/output/solr/%s", type);
+ String data = gson.toJson(outputSolrProperties);
+ if (outputCache.getCurrentData(nodePath) == null) {
+ client.create().creatingParentContainersIfNeeded().withACL(getAcls()).forPath(nodePath, data.getBytes());
+ } else {
+ client.setData().forPath(nodePath, data.getBytes());
+ }
+ }
+
+ @Override
+ public OutputSolrProperties getOutputSolrProperties(String type) throws Exception {
+ String nodePath = String.format("/output/solr/%s", type);
+ ChildData currentData = outputCache.getCurrentData(nodePath);
+ return currentData == null ?
+ null :
+ gson.fromJson(new String(currentData.getData()), OutputSolrPropertiesImpl.class);
+ }
+
+ @Override
+ public void monitorOutputProperties(final List<? extends OutputConfigMonitor> outputConfigMonitors) throws Exception {
+ TreeCacheListener listener = new TreeCacheListener() {
+ public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
+ if (event.getType() != Type.NODE_UPDATED) {
+ return;
+ }
+
+ LOG.info("Output config updated: " + event.getData().getPath());
+ for (OutputConfigMonitor monitor : outputConfigMonitors) {
+ String monitorPath = String.format("/output/%s/%s", monitor.getDestination(), monitor.getOutputType());
+ if (monitorPath.equals(event.getData().getPath())) {
+ String nodeData = new String(event.getData().getData());
+ OutputSolrProperties outputSolrProperties = gson.fromJson(nodeData, OutputSolrPropertiesImpl.class);
+ monitor.outputConfigChanged(outputSolrProperties);
+ }
+ }
+ }
+ };
+ outputCache.getListenable().addListener(listener);
+ }
+
+ @Override
public void close() {
LOG.info("Closing ZooKeeper Connection");
client.close();
http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/outputconfig/impl/OutputSolrPropertiesImpl.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/outputconfig/impl/OutputSolrPropertiesImpl.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/outputconfig/impl/OutputSolrPropertiesImpl.java
new file mode 100644
index 0000000..4b9f54c
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/outputconfig/impl/OutputSolrPropertiesImpl.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.zookeeper.model.outputconfig.impl;
+
+import org.apache.ambari.logsearch.config.api.model.outputconfig.OutputSolrProperties;
+
+import com.google.gson.annotations.SerializedName;
+
+public class OutputSolrPropertiesImpl implements OutputSolrProperties {
+ private final String collection;
+
+ @SerializedName("split_interval_mins")
+ private final String splitIntervalMins;
+
+ public OutputSolrPropertiesImpl(String collection, String splitIntervalMins) {
+ this.collection = collection;
+ this.splitIntervalMins = splitIntervalMins;
+ }
+
+ @Override
+ public String getCollection() {
+ return collection;
+ }
+
+ @Override
+ public String getSplitIntervalMins() {
+ return splitIntervalMins;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
index 59c2a22..ba3412b 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
@@ -53,7 +53,7 @@ public class LogFeeder {
private final LogFeederCommandLine cli;
- private ConfigHandler configHandler = new ConfigHandler();
+ private ConfigHandler configHandler;
private LogSearchConfig config;
private MetricsManager metricsManager = new MetricsManager();
@@ -78,11 +78,12 @@ public class LogFeeder {
private void init() throws Throwable {
long startTime = System.currentTimeMillis();
- configHandler.init();
SSLUtil.ensureStorePasswords();
config = LogSearchConfigFactory.createLogSearchConfig(Component.LOGFEEDER, Maps.fromProperties(LogFeederUtil.getProperties()),
LogFeederUtil.getClusterName(), LogSearchConfigZK.class);
+ configHandler = new ConfigHandler(config);
+ configHandler.init();
LogLevelFilterHandler.init(config);
InputConfigUploader.load(config);
config.monitorInputConfigChanges(configHandler, new LogLevelFilterHandler(), LogFeederUtil.getClusterName());
http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java
index 5bf074c..30b61a1 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java
@@ -48,6 +48,7 @@ import org.apache.commons.lang.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.ambari.logfeeder.util.AliasUtil.AliasType;
import org.apache.ambari.logsearch.config.api.InputConfigMonitor;
+import org.apache.ambari.logsearch.config.api.LogSearchConfig;
import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription;
import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterDescriptor;
import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterGrokDescriptor;
@@ -85,6 +86,8 @@ public class ConfigHandler implements InputConfigMonitor {
)
private static final String SIMULATE_INPUT_NUMBER_PROPERTY = "logfeeder.simulate.input_number";
+ private final LogSearchConfig logSearchConfig;
+
private final OutputManager outputManager = new OutputManager();
private final InputManager inputManager = new InputManager();
@@ -97,7 +100,9 @@ public class ConfigHandler implements InputConfigMonitor {
private boolean simulateMode = false;
- public ConfigHandler() {}
+ public ConfigHandler(LogSearchConfig logSearchConfig) {
+ this.logSearchConfig = logSearchConfig;
+ }
public void init() throws Exception {
loadConfigFiles();
@@ -106,6 +111,8 @@ public class ConfigHandler implements InputConfigMonitor {
inputManager.init();
outputManager.init();
+
+ logSearchConfig.monitorOutputProperties(outputManager.getOutputsToMonitor());
}
private void loadConfigFiles() throws Exception {
@@ -271,6 +278,7 @@ public class ConfigHandler implements InputConfigMonitor {
}
output.setDestination(value);
output.loadConfig(map);
+ output.setLogSearchConfig(logSearchConfig);
// We will only check for is_enabled out here. Down below we will check whether this output is enabled for the input
if (output.isEnabled()) {
@@ -387,6 +395,7 @@ public class ConfigHandler implements InputConfigMonitor {
// In case of simulation copies of the output are added for each simulation instance, these must be added to the manager
for (Output output : InputSimulate.getSimulateOutputs()) {
+ output.setLogSearchConfig(logSearchConfig);
outputManager.add(output);
usedOutputSet.add(output);
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogEntryParseTester.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogEntryParseTester.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogEntryParseTester.java
index 5356159..ec29f69 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogEntryParseTester.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogEntryParseTester.java
@@ -73,7 +73,7 @@ public class LogEntryParseTester {
public Map<String, Object> parse() throws Exception {
InputConfig inputConfig = getInputConfig();
- ConfigHandler configHandler = new ConfigHandler();
+ ConfigHandler configHandler = new ConfigHandler(null);
Input input = configHandler.getTestInput(inputConfig, logId);
final Map<String, Object> result = new HashMap<>();
input.getFirstFilter().init();
http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java
index 09fc3f5..10642d1 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java
@@ -84,7 +84,7 @@ public class InputConfigUploader extends Thread {
String serviceName = m.group(1);
String inputConfig = Files.toString(inputConfigFile, Charset.defaultCharset());
- if (!config.inputConfigExists(LogFeederUtil.getClusterName(), serviceName)) {
+ if (!config.inputConfigExistsLogFeeder(serviceName)) {
config.createInputConfig(LogFeederUtil.getClusterName(), serviceName, inputConfig);
}
filesHandled.add(inputConfigFile.getAbsolutePath());
http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java
index f1002ae..7c487ba 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java
@@ -163,6 +163,7 @@ public class InputSimulate extends Input {
Class<? extends Output> clazz = output.getClass();
Output outputCopy = clazz.newInstance();
outputCopy.loadConfig(output.getConfigs());
+ outputCopy.setDestination(output.getDestination());
simulateOutputs.add(outputCopy);
super.addOutput(outputCopy);
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java
index 65b9e19..b370e58 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java
@@ -28,8 +28,11 @@ import org.apache.ambari.logfeeder.common.ConfigBlock;
import org.apache.ambari.logfeeder.input.InputMarker;
import org.apache.ambari.logfeeder.metrics.MetricData;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.ambari.logsearch.config.api.LogSearchConfig;
+import org.apache.ambari.logsearch.config.api.OutputConfigMonitor;
+import org.apache.ambari.logsearch.config.api.model.outputconfig.OutputProperties;
-public abstract class Output extends ConfigBlock {
+public abstract class Output extends ConfigBlock implements OutputConfigMonitor {
private String destination = null;
protected MetricData writeBytesMetric = new MetricData(getWriteBytesMetricName(), false);
@@ -37,6 +40,20 @@ public abstract class Output extends ConfigBlock {
return null;
}
+ public boolean monitorConfigChanges() {
+ return false;
+ };
+
+ @Override
+ public String getOutputType() {
+ throw new IllegalStateException("This method should be overriden if the Output wants to monitor the configuration");
+ }
+
+ @Override
+ public void outputConfigChanged(OutputProperties outputProperties) {
+ throw new IllegalStateException("This method should be overriden if the Output wants to monitor the configuration");
+ };
+
@Override
public String getShortDescription() {
return null;
@@ -50,14 +67,11 @@ public abstract class Output extends ConfigBlock {
return super.getNameForThread();
}
- public abstract void write(String block, InputMarker inputMarker)
- throws Exception;
+ public abstract void write(String block, InputMarker inputMarker) throws Exception;
- public abstract void copyFile(File inputFile, InputMarker inputMarker)
- throws UnsupportedOperationException;
+ public abstract void copyFile(File inputFile, InputMarker inputMarker) throws UnsupportedOperationException;
- public void write(Map<String, Object> jsonObj, InputMarker inputMarker)
- throws Exception {
+ public void write(Map<String, Object> jsonObj, InputMarker inputMarker) throws Exception {
write(LogFeederUtil.getGson().toJson(jsonObj), inputMarker);
}
@@ -90,6 +104,12 @@ public abstract class Output extends ConfigBlock {
this.destination = destination;
}
+ protected LogSearchConfig logSearchConfig;
+
+ public void setLogSearchConfig(LogSearchConfig logSearchConfig) {
+ this.logSearchConfig = logSearchConfig;
+ }
+
@Override
public void addMetricsContainers(List<MetricData> metricsList) {
super.addMetricsContainers(metricsList);
@@ -99,7 +119,6 @@ public abstract class Output extends ConfigBlock {
@Override
public synchronized void logStat() {
super.logStat();
-
logStatForMetric(writeBytesMetric, "Stat: Bytes Written");
}
@@ -115,5 +134,4 @@ public abstract class Output extends ConfigBlock {
}
}
}
-
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManager.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManager.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManager.java
index 4d6c43b..48716fa 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManager.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManager.java
@@ -33,6 +33,7 @@ import org.apache.ambari.logfeeder.loglevelfilter.FilterLogData;
import org.apache.ambari.logfeeder.metrics.MetricData;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
import org.apache.ambari.logfeeder.util.MurmurHash;
+import org.apache.ambari.logsearch.config.api.OutputConfigMonitor;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
@@ -56,6 +57,16 @@ public class OutputManager {
return outputs;
}
+ public List<? extends OutputConfigMonitor> getOutputsToMonitor() {
+ List<Output> outputsToMonitor = new ArrayList<>();
+ for (Output output : outputs) {
+ if (output.monitorConfigChanges()) {
+ outputsToMonitor.add(output);
+ }
+ }
+ return outputsToMonitor;
+ }
+
public void add(Output output) {
this.outputs.add(output);
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
index 162a7f8..596e022 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
@@ -25,9 +25,11 @@ import java.net.MalformedURLException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@@ -37,6 +39,8 @@ import org.apache.ambari.logfeeder.input.InputMarker;
import org.apache.ambari.logfeeder.util.DateUtil;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription;
+import org.apache.ambari.logsearch.config.api.model.outputconfig.OutputProperties;
+import org.apache.ambari.logsearch.config.api.model.outputconfig.OutputSolrProperties;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
@@ -44,18 +48,23 @@ import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.HttpClientUtil;
-import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.impl.Krb5HttpClientConfigurer;
-import org.apache.solr.client.solrj.impl.LBHttpSolrClient;
import org.apache.solr.client.solrj.response.SolrPingResponse;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.CollectionStateWatcher;
+import org.apache.solr.common.cloud.DocCollection;
import static org.apache.ambari.logfeeder.util.LogFeederUtil.LOGFEEDER_PROPERTIES_FILE;
-public class OutputSolr extends Output {
+public class OutputSolr extends Output implements CollectionStateWatcher {
+ private static final Logger LOG = Logger.getLogger(OutputSolr.class);
+
+ private static final int OUTPUT_PROPERTIES_WAIT_MS = 10000;
+ private static final int SHARDS_WAIT_MS = 10000;
+
private static final String DEFAULT_SOLR_JAAS_FILE = "/etc/security/keytabs/logsearch_solr.service.keytab";
@LogSearchPropertyDescription(
name = "logfeeder.solr.jaas.file",
@@ -66,8 +75,6 @@ public class OutputSolr extends Output {
)
private static final String SOLR_JAAS_FILE_PROPERTY = "logfeeder.solr.jaas.file";
- private static final Logger LOG = Logger.getLogger(OutputSolr.class);
-
private static final boolean DEFAULT_SOLR_KERBEROS_ENABLE = false;
@LogSearchPropertyDescription(
name = "logfeeder.solr.kerberos.enable",
@@ -80,17 +87,17 @@ public class OutputSolr extends Output {
private static final int DEFAULT_MAX_BUFFER_SIZE = 5000;
private static final int DEFAULT_MAX_INTERVAL_MS = 3000;
- private static final int DEFAULT_NUMBER_OF_SHARDS = 1;
- private static final int DEFAULT_SPLIT_INTERVAL = 30;
private static final int DEFAULT_NUMBER_OF_WORKERS = 1;
private static final boolean DEFAULT_SKIP_LOGTIME = false;
private static final int RETRY_INTERVAL = 30;
+ private String type;
private String collection;
private String splitMode;
private int splitInterval;
- private int numberOfShards;
+ private List<String> shards;
+ private String zkConnectString;
private int maxIntervalMS;
private int workers;
private int maxBufferSize;
@@ -98,10 +105,22 @@ public class OutputSolr extends Output {
private int lastSlotByMin = -1;
private boolean skipLogtime = false;
+ private final Object propertiesLock = new Object();
+
private BlockingQueue<OutputData> outgoingBuffer = null;
private List<SolrWorkerThread> workerThreadList = new ArrayList<>();
@Override
+ public boolean monitorConfigChanges() {
+ return true;
+ };
+
+ @Override
+ public String getOutputType() {
+ return type;
+ }
+
+ @Override
protected String getStatMetricName() {
return "output.solr.write_logs";
}
@@ -110,24 +129,34 @@ public class OutputSolr extends Output {
protected String getWriteBytesMetricName() {
return "output.solr.write_bytes";
}
-
+
@Override
public void init() throws Exception {
super.init();
initParams();
setupSecurity();
createOutgoingBuffer();
+ createSolrStateWatcher();
createSolrWorkers();
}
private void initParams() throws Exception {
- splitMode = getStringValue("splits_interval_mins", "none");
- if (!splitMode.equalsIgnoreCase("none")) {
- splitInterval = getIntValue("split_interval_mins", DEFAULT_SPLIT_INTERVAL);
+ type = getStringValue("type");
+ while (true) {
+ OutputSolrProperties outputSolrProperties = logSearchConfig.getOutputSolrProperties(type);
+ if (outputSolrProperties == null) {
+ LOG.info("Output solr properties for type " + type + " is not available yet.");
+ try { Thread.sleep(OUTPUT_PROPERTIES_WAIT_MS); } catch (Exception e) { LOG.warn(e); }
+ } else {
+ initPropertiesFromLogSearchConfig(outputSolrProperties, true);
+ break;
+ }
+ }
+
+ zkConnectString = getStringValue("zk_connect_string");
+ if (StringUtils.isEmpty(zkConnectString)) {
+ throw new Exception("For solr output the zk_connect_string property need to be set");
}
- isComputeCurrentCollection = !splitMode.equalsIgnoreCase("none");
-
- numberOfShards = getIntValue("number_of_shards", DEFAULT_NUMBER_OF_SHARDS);
skipLogtime = getBooleanValue("skip_logtime", DEFAULT_SKIP_LOGTIME);
@@ -140,22 +169,39 @@ public class OutputSolr extends Output {
maxBufferSize = 1;
}
- collection = getStringValue("collection");
- if (StringUtils.isEmpty(collection)) {
- throw new Exception("Collection property is mandatory");
- }
+ LOG.info(String.format("Config: Number of workers=%d, splitMode=%s, splitInterval=%d."
+ + getShortDescription(), workers, splitMode, splitInterval));
+ }
- LOG.info(String.format("Config: Number of workers=%d, splitMode=%s, splitInterval=%d, numberOfShards=%d. "
- + getShortDescription(), workers, splitMode, splitInterval, numberOfShards));
+ @Override
+ public void outputConfigChanged(OutputProperties outputProperties) {
+ initPropertiesFromLogSearchConfig((OutputSolrProperties)outputProperties, false);
}
+ private void initPropertiesFromLogSearchConfig(OutputSolrProperties outputSolrProperties, boolean init) {
+ synchronized (propertiesLock) {
+ splitMode = outputSolrProperties.getSplitIntervalMins();
+ if (!splitMode.equalsIgnoreCase("none")) {
+ splitInterval = Integer.parseInt(splitMode);
+ }
+ isComputeCurrentCollection = !splitMode.equalsIgnoreCase("none");
+
+ // collection can not be overwritten after initialization
+ if (init) {
+ collection = outputSolrProperties.getCollection();
+ if (StringUtils.isEmpty(collection)) {
+ throw new IllegalStateException("Collection property is mandatory");
+ }
+ }
+ }
+ }
private void setupSecurity() {
String jaasFile = LogFeederUtil.getStringProperty(SOLR_JAAS_FILE_PROPERTY, DEFAULT_SOLR_JAAS_FILE);
boolean securityEnabled = LogFeederUtil.getBooleanProperty(SOLR_KERBEROS_ENABLE_PROPERTY, DEFAULT_SOLR_KERBEROS_ENABLE);
if (securityEnabled) {
System.setProperty("java.security.auth.login.config", jaasFile);
- HttpClientUtil.setConfigurer(new Krb5HttpClientConfigurer());
+ HttpClientUtil.addConfigurer(new Krb5HttpClientConfigurer());
LOG.info("setupSecurity() called for kerberos configuration, jaas file: " + jaasFile);
}
}
@@ -166,81 +212,70 @@ public class OutputSolr extends Output {
outgoingBuffer = new LinkedBlockingQueue<OutputData>(bufferSize);
}
- private void createSolrWorkers() throws Exception, MalformedURLException {
- String solrUrl = getStringValue("url");
- String zkConnectString = getStringValue("zk_connect_string");
- if (StringUtils.isEmpty(solrUrl) && StringUtils.isEmpty(zkConnectString)) {
- throw new Exception("For solr output, either url or zk_connect_string property need to be set");
+ private void createSolrStateWatcher() throws Exception {
+ if ("none".equals(splitMode)) {
+ return;
+ }
+
+ CloudSolrClient stateWatcherClient = createSolrClient();
+ stateWatcherClient.registerCollectionStateWatcher(collection, this);
+ while (true) {
+ if (shards == null) {
+ LOG.info("Shards are not available yet, waiting ...");
+ try { Thread.sleep(SHARDS_WAIT_MS); } catch (Exception e) { LOG.warn(e); }
+ } else {
+ break;
+ }
+ }
+ }
+
+ @Override
+ public boolean onStateChanged(Set<String> liveNodes, DocCollection collectionState) {
+ synchronized (propertiesLock) {
+ shards = new ArrayList<>(collectionState.getSlicesMap().keySet());
+ Collections.sort(shards);
}
+ return false;
+ }
+ private void createSolrWorkers() throws Exception, MalformedURLException {
for (int count = 0; count < workers; count++) {
- SolrClient solrClient = getSolrClient(solrUrl, zkConnectString, count);
+ CloudSolrClient solrClient = getSolrClient(count);
createSolrWorkerThread(count, solrClient);
}
}
- SolrClient getSolrClient(String solrUrl, String zkConnectString, int count) throws Exception, MalformedURLException {
- SolrClient solrClient = createSolrClient(solrUrl, zkConnectString);
- pingSolr(solrUrl, zkConnectString, count, solrClient);
-
- return solrClient;
- }
+ CloudSolrClient getSolrClient(int count) throws Exception, MalformedURLException {
+ CloudSolrClient solrClient = createSolrClient();
+ pingSolr(count, solrClient);
- private SolrClient createSolrClient(String solrUrl, String zkConnectString) throws Exception, MalformedURLException {
- SolrClient solrClient;
- if (zkConnectString != null) {
- solrClient = createCloudSolrClient(zkConnectString);
- } else {
- solrClient = createHttpSolarClient(solrUrl);
- }
return solrClient;
}
- private SolrClient createCloudSolrClient(String zkConnectString) throws Exception {
+ private CloudSolrClient createSolrClient() throws Exception {
LOG.info("Using zookeepr. zkConnectString=" + zkConnectString);
- collection = getStringValue("collection");
- if (StringUtils.isEmpty(collection)) {
- throw new Exception("For solr cloud property collection is mandatory");
- }
LOG.info("Using collection=" + collection);
- CloudSolrClient solrClient = new CloudSolrClient(zkConnectString);
+ CloudSolrClient solrClient = new CloudSolrClient.Builder().withZkHost(zkConnectString).build();
solrClient.setDefaultCollection(collection);
return solrClient;
}
- private SolrClient createHttpSolarClient(String solrUrl) throws MalformedURLException {
- String[] solrUrls = StringUtils.split(solrUrl, ",");
- if (solrUrls.length == 1) {
- LOG.info("Using SolrURL=" + solrUrl);
- return new HttpSolrClient(solrUrl + "/" + collection);
- } else {
- LOG.info("Using load balance solr client. solrUrls=" + solrUrl);
- LOG.info("Initial URL for LB solr=" + solrUrls[0] + "/" + collection);
- LBHttpSolrClient lbSolrClient = new LBHttpSolrClient(solrUrls[0] + "/" + collection);
- for (int i = 1; i < solrUrls.length; i++) {
- LOG.info("Adding URL for LB solr=" + solrUrls[i] + "/" + collection);
- lbSolrClient.addSolrServer(solrUrls[i] + "/" + collection);
- }
- return lbSolrClient;
- }
- }
-
- private void pingSolr(String solrUrl, String zkConnectString, int count, SolrClient solrClient) {
+ private void pingSolr(int count, CloudSolrClient solrClient) {
try {
- LOG.info("Pinging Solr server. zkConnectString=" + zkConnectString + ", urls=" + solrUrl);
+ LOG.info("Pinging Solr server. zkConnectString=" + zkConnectString);
SolrPingResponse response = solrClient.ping();
if (response.getStatus() == 0) {
LOG.info("Ping to Solr server is successful for worker=" + count);
} else {
LOG.warn(
- String.format("Ping to Solr server failed. It would check again. worker=%d, solrUrl=%s, zkConnectString=%s, " +
- "collection=%s, response=%s", count, solrUrl, zkConnectString, collection, response));
+ String.format("Ping to Solr server failed. It would check again. worker=%d, zkConnectString=%s, collection=%s, " +
+ "response=%s", count, zkConnectString, collection, response));
}
} catch (Throwable t) {
LOG.warn(String.format(
- "Ping to Solr server failed. It would check again. worker=%d, " + "solrUrl=%s, zkConnectString=%s, collection=%s",
- count, solrUrl, zkConnectString, collection), t);
+ "Ping to Solr server failed. It would check again. worker=%d, zkConnectString=%s, collection=%s", count,
+ zkConnectString, collection), t);
}
}
@@ -402,9 +437,11 @@ public class OutputSolr extends Output {
boolean result = false;
while (!isDrain()) {
try {
- if (isComputeCurrentCollection) {
- // Compute the current router value
- addRouterField();
+ synchronized (propertiesLock) {
+ if (isComputeCurrentCollection) {
+ // Compute the current router value
+ addRouterField();
+ }
}
addToSolr(outputData);
resetLocalBuffer();
@@ -468,9 +505,9 @@ public class OutputSolr extends Output {
int currMin = cal.get(Calendar.MINUTE);
int minOfWeek = (weekDay - 1) * 24 * 60 + currHour * 60 + currMin;
- int slotByMin = minOfWeek / splitInterval % numberOfShards;
+ int slotByMin = minOfWeek / splitInterval % shards.size();
- String shard = "shard" + slotByMin;
+ String shard = shards.get(slotByMin);
if (lastSlotByMin != slotByMin) {
LOG.info("Switching to shard " + shard + ", output=" + getShortDescription());
http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputSolrTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputSolrTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputSolrTest.java
index 8985110..ce040f9 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputSolrTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputSolrTest.java
@@ -28,8 +28,10 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.ambari.logfeeder.input.Input;
import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logsearch.config.api.LogSearchConfig;
+import org.apache.ambari.logsearch.config.api.model.outputconfig.OutputSolrProperties;
+import org.apache.ambari.logsearch.config.zookeeper.model.outputconfig.impl.OutputSolrPropertiesImpl;
import org.apache.log4j.Logger;
-import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.common.SolrInputDocument;
@@ -48,6 +50,7 @@ public class OutputSolrTest {
private static final Logger LOG = Logger.getLogger(OutputSolrTest.class);
private OutputSolr outputSolr;
+ private LogSearchConfig logSearchConfigMock;
private Map<Integer, SolrInputDocument> receivedDocs = new ConcurrentHashMap<>();
@Rule
@@ -56,8 +59,9 @@ public class OutputSolrTest {
@Before
public void init() throws Exception {
outputSolr = new OutputSolr() {
+ @SuppressWarnings("deprecation")
@Override
- SolrClient getSolrClient(String solrUrl, String zkConnectString, int count) throws Exception, MalformedURLException {
+ CloudSolrClient getSolrClient(int count) throws Exception, MalformedURLException {
return new CloudSolrClient(null) {
private static final long serialVersionUID = 1L;
@@ -74,6 +78,13 @@ public class OutputSolrTest {
};
}
};
+
+ OutputSolrProperties outputSolrProperties = new OutputSolrPropertiesImpl("hadoop_logs", "none");
+ logSearchConfigMock = EasyMock.createNiceMock(LogSearchConfig.class);
+ EasyMock.expect(logSearchConfigMock.getOutputSolrProperties("service")).andReturn(outputSolrProperties);
+ EasyMock.replay(logSearchConfigMock);
+
+ outputSolr.setLogSearchConfig(logSearchConfigMock);
}
@Test
@@ -81,9 +92,9 @@ public class OutputSolrTest {
LOG.info("testOutputToSolr_uploadData()");
Map<String, Object> config = new HashMap<String, Object>();
- config.put("url", "some url");
+ config.put("zk_connect_string", "some zk_connect_string");
config.put("workers", "3");
- config.put("collection", "some collection");
+ config.put("type", "service");
outputSolr.loadConfig(config);
outputSolr.init();
@@ -138,22 +149,21 @@ public class OutputSolrTest {
assertNotNull("No received document field found for id: " + id + ", fieldName: " + fieldName, receivedValue);
assertNotNull("No expected document field found for id: " + id + ", fieldName: " + fieldName, expectedValue);
- assertEquals("Field value not matching for id: " + id + ", fieldName: " + fieldName, receivedValue,
- expectedValue);
+ assertEquals("Field value not matching for id: " + id + ", fieldName: " + fieldName, receivedValue, expectedValue);
}
}
}
@Test
- public void testOutputToSolr_noUrlOrZkConnectString() throws Exception {
+ public void testOutputToSolr_noZkConnectString() throws Exception {
LOG.info("testOutputToSolr_noUrlOrZkConnectString()");
expectedException.expect(Exception.class);
- expectedException.expectMessage("For solr output, either url or zk_connect_string property need to be set");
+ expectedException.expectMessage("For solr output the zk_connect_string property need to be set");
Map<String, Object> config = new HashMap<String, Object>();
config.put("workers", "3");
- config.put("collection", "some collection");
+ config.put("type", "service");
outputSolr.loadConfig(config);
outputSolr.init();
@@ -162,5 +172,6 @@ public class OutputSolrTest {
@After
public void cleanUp() {
receivedDocs.clear();
+ EasyMock.verify(logSearchConfigMock);
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SolrAuditLogPropsConfig.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SolrAuditLogPropsConfig.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SolrAuditLogPropsConfig.java
index c569a27..4a44e60 100644
--- a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SolrAuditLogPropsConfig.java
+++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SolrAuditLogPropsConfig.java
@@ -241,4 +241,9 @@ public class SolrAuditLogPropsConfig implements SolrPropsConfig {
public void setAliasNameIn(String aliasNameIn) {
this.aliasNameIn = aliasNameIn;
}
+
+ @Override
+ public String getLogType() {
+ return "audit";
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SolrEventHistoryPropsConfig.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SolrEventHistoryPropsConfig.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SolrEventHistoryPropsConfig.java
index 975e6a7..822cea4 100644
--- a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SolrEventHistoryPropsConfig.java
+++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SolrEventHistoryPropsConfig.java
@@ -145,4 +145,9 @@ public class SolrEventHistoryPropsConfig extends SolrConnectionPropsConfig {
void setPopulateIntervalMins(Integer populateIntervalMins) {
this.populateIntervalMins = populateIntervalMins;
}
+
+ @Override
+ public String getLogType() {
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SolrPropsConfig.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SolrPropsConfig.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SolrPropsConfig.java
index ceddf7e..cd0a1c2 100644
--- a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SolrPropsConfig.java
+++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SolrPropsConfig.java
@@ -58,4 +58,6 @@ public interface SolrPropsConfig {
String getConfigSetFolder();
void setConfigSetFolder(String configSetFolder);
+
+ String getLogType();
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SolrServiceLogPropsConfig.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SolrServiceLogPropsConfig.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SolrServiceLogPropsConfig.java
index e5039d5..6a0e6b1 100644
--- a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SolrServiceLogPropsConfig.java
+++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SolrServiceLogPropsConfig.java
@@ -126,4 +126,9 @@ public class SolrServiceLogPropsConfig extends SolrConnectionPropsConfig {
public void setReplicationFactor(Integer replicationFactor) {
this.replicationFactor = replicationFactor;
}
+
+ @Override
+ public String getLogType() {
+ return "service";
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/configurer/LogSearchConfigConfigurer.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/configurer/LogSearchConfigConfigurer.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/configurer/LogSearchConfigConfigurer.java
index c34dce6..3f6df75 100644
--- a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/configurer/LogSearchConfigConfigurer.java
+++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/configurer/LogSearchConfigConfigurer.java
@@ -19,6 +19,7 @@
package org.apache.ambari.logsearch.configurer;
+import javax.annotation.PostConstruct;
import javax.inject.Inject;
import javax.inject.Named;
@@ -45,6 +46,8 @@ public class LogSearchConfigConfigurer implements Configurer {
@Inject
private LogSearchConfigState logSearchConfigState;
+ @PostConstruct
+ @Override
public void start() {
Thread setupThread = new Thread("setup_logsearch_config") {
@Override
http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/configurer/SolrCollectionConfigurer.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/configurer/SolrCollectionConfigurer.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/configurer/SolrCollectionConfigurer.java
index f2d022e..225f5a3 100644
--- a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/configurer/SolrCollectionConfigurer.java
+++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/configurer/SolrCollectionConfigurer.java
@@ -56,7 +56,7 @@ public class SolrCollectionConfigurer implements Configurer {
private final SolrDaoBase solrDaoBase;
private final boolean hasEnumConfig; // enumConfig.xml for solr collection
- public SolrCollectionConfigurer(final SolrDaoBase solrDaoBase, final boolean hasEnumConfig) {
+ public SolrCollectionConfigurer(SolrDaoBase solrDaoBase, boolean hasEnumConfig) {
this.solrDaoBase = solrDaoBase;
this.hasEnumConfig = hasEnumConfig;
}
@@ -215,7 +215,8 @@ public class SolrCollectionConfigurer implements Configurer {
return status;
}
- private void createCollectionsIfNeeded(CloudSolrClient solrClient, SolrCollectionState state, SolrPropsConfig solrPropsConfig, boolean reloadCollectionNeeded) {
+ private void createCollectionsIfNeeded(CloudSolrClient solrClient, SolrCollectionState state, SolrPropsConfig solrPropsConfig,
+ boolean reloadCollectionNeeded) {
try {
List<String> allCollectionList = new ListCollectionHandler().handle(solrClient, null);
boolean collectionCreated = new CreateCollectionHandler(allCollectionList).handle(solrClient, solrPropsConfig);
http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/dao/AuditSolrDao.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/dao/AuditSolrDao.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/dao/AuditSolrDao.java
index 3eea08f..4142176 100644
--- a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/dao/AuditSolrDao.java
+++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/dao/AuditSolrDao.java
@@ -69,6 +69,7 @@ public class AuditSolrDao extends SolrDaoBase {
String rangerAuditCollection = solrAuditLogPropsConfig.getRangerCollection();
try {
+ waitForLogSearchConfig();
new SolrCollectionConfigurer(this, true).start();
boolean createAlias = (aliasNameIn != null && StringUtils.isNotBlank(rangerAuditCollection));
if (createAlias) {
http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/dao/ServiceLogsSolrDao.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/dao/ServiceLogsSolrDao.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/dao/ServiceLogsSolrDao.java
index 308ef1f..0752ac0 100644
--- a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/dao/ServiceLogsSolrDao.java
+++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/dao/ServiceLogsSolrDao.java
@@ -65,6 +65,7 @@ public class ServiceLogsSolrDao extends SolrDaoBase {
public void postConstructor() {
LOG.info("postConstructor() called.");
try {
+ waitForLogSearchConfig();
new SolrCollectionConfigurer(this, true).start();
} catch (Exception e) {
LOG.error("error while connecting to Solr for service logs : solrUrl=" + solrServiceLogPropsConfig.getSolrUrl()
http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/dao/SolrDaoBase.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/dao/SolrDaoBase.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/dao/SolrDaoBase.java
index b30b6ef..15f59e4 100644
--- a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/dao/SolrDaoBase.java
+++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/dao/SolrDaoBase.java
@@ -24,6 +24,7 @@ import org.apache.ambari.logsearch.common.LogType;
import org.apache.ambari.logsearch.common.MessageEnums;
import org.apache.ambari.logsearch.conf.SolrKerberosConfig;
import org.apache.ambari.logsearch.conf.SolrPropsConfig;
+import org.apache.ambari.logsearch.conf.global.LogSearchConfigState;
import org.apache.ambari.logsearch.conf.global.SolrCollectionState;
import org.apache.ambari.logsearch.util.RESTErrorUtil;
import org.apache.ambari.logsearch.util.SolrUtil;
@@ -53,11 +54,21 @@ public abstract class SolrDaoBase {
@Inject
private SolrKerberosConfig solrKerberosConfig;
-
+
+ @Inject
+ protected LogSearchConfigState logSearchConfigState;
+
protected SolrDaoBase(LogType logType) {
this.logType = logType;
}
+ protected void waitForLogSearchConfig() {
+ while (!logSearchConfigState.isLogSearchConfigAvailable()) {
+ LOG.info("Log Search config not available yet, waiting...");
+ try { Thread.sleep(1000); } catch (Exception e) { LOG.warn("Exception during waiting for Log Search Config", e); }
+ }
+ }
+
public QueryResponse process(SolrQuery solrQuery, String event) {
SolrUtil.removeDoubleOrTripleEscapeFromFilters(solrQuery);
LOG.info("Solr query will be processed: " + solrQuery);
http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/handler/CreateCollectionHandler.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/handler/CreateCollectionHandler.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/handler/CreateCollectionHandler.java
index 752a1e1..b6e9def 100644
--- a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/handler/CreateCollectionHandler.java
+++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/handler/CreateCollectionHandler.java
@@ -19,6 +19,9 @@
package org.apache.ambari.logsearch.handler;
import org.apache.ambari.logsearch.conf.SolrPropsConfig;
+import org.apache.ambari.logsearch.config.api.model.outputconfig.OutputSolrProperties;
+import org.apache.ambari.logsearch.config.zookeeper.model.outputconfig.impl.OutputSolrPropertiesImpl;
+import org.apache.ambari.logsearch.configurer.LogSearchConfigConfigurer;
import org.apache.commons.lang.StringUtils;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpGet;
@@ -50,7 +53,7 @@ public class CreateCollectionHandler implements SolrZkRequestHandler<Boolean> {
private static final String MODIFY_COLLECTION_QUERY = "/admin/collections?action=MODIFYCOLLECTION&collection=%s&%s=%d";
private static final String MAX_SHARDS_PER_NODE = "maxShardsPerNode";
- private List<String> allCollectionList;
+ private final List<String> allCollectionList;
public CreateCollectionHandler(List<String> allCollectionList) {
this.allCollectionList = allCollectionList;
@@ -58,12 +61,19 @@ public class CreateCollectionHandler implements SolrZkRequestHandler<Boolean> {
@Override
public Boolean handle(CloudSolrClient solrClient, SolrPropsConfig solrPropsConfig) throws Exception {
+ if (solrPropsConfig.getLogType() != null) {
+ OutputSolrProperties outputSolrProperties = new OutputSolrPropertiesImpl(solrPropsConfig.getCollection(),
+ solrPropsConfig.getSplitInterval());
+ LogSearchConfigConfigurer.getConfig().saveOutputSolrProperties(solrPropsConfig.getLogType(), outputSolrProperties);
+ }
+
boolean result;
if (solrPropsConfig.getSplitInterval().equalsIgnoreCase("none")) {
result = createCollection(solrClient, solrPropsConfig, this.allCollectionList);
} else {
result = setupCollectionsWithImplicitRouting(solrClient, solrPropsConfig, this.allCollectionList);
}
+
return result;
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/manager/ShipperConfigManager.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/manager/ShipperConfigManager.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/manager/ShipperConfigManager.java
index 2c143c0..a1181b4 100644
--- a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/manager/ShipperConfigManager.java
+++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/manager/ShipperConfigManager.java
@@ -33,7 +33,6 @@ import org.apache.log4j.Logger;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
-import javax.annotation.PostConstruct;
import javax.inject.Inject;
import javax.inject.Named;
import javax.validation.ConstraintViolation;
@@ -49,11 +48,6 @@ public class ShipperConfigManager extends JsonManagerBase {
@Inject
private LogSearchConfigConfigurer logSearchConfigConfigurer;
-
- @PostConstruct
- private void postConstructor() {
- logSearchConfigConfigurer.start();
- }
public List<String> getServices(String clusterName) {
return LogSearchConfigConfigurer.getConfig().getServices(clusterName);
@@ -66,7 +60,7 @@ public class ShipperConfigManager extends JsonManagerBase {
public Response createInputConfig(String clusterName, String serviceName, LSServerInputConfig inputConfig) {
try {
- if (LogSearchConfigConfigurer.getConfig().inputConfigExists(clusterName, serviceName)) {
+ if (LogSearchConfigConfigurer.getConfig().inputConfigExistsServer(clusterName, serviceName)) {
return Response.serverError()
.type(MediaType.APPLICATION_JSON)
.entity(ImmutableMap.of("errorMessage", "Input config already exists for service " + serviceName))
@@ -83,7 +77,7 @@ public class ShipperConfigManager extends JsonManagerBase {
public Response setInputConfig(String clusterName, String serviceName, LSServerInputConfig inputConfig) {
try {
- if (!LogSearchConfigConfigurer.getConfig().inputConfigExists(clusterName, serviceName)) {
+ if (!LogSearchConfigConfigurer.getConfig().inputConfigExistsServer(clusterName, serviceName)) {
return Response.serverError()
.type(MediaType.APPLICATION_JSON)
.entity(ImmutableMap.of("errorMessage", "Input config doesn't exist for service " + serviceName))
http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/output.config.json
----------------------------------------------------------------------
diff --git a/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/output.config.json b/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/output.config.json
index 55fd36c..f41e981 100644
--- a/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/output.config.json
+++ b/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/output.config.json
@@ -5,9 +5,7 @@
"comment": "Output to solr for service logs",
"destination": "solr",
"zk_connect_string": "localhost:9983",
- "collection": "hadoop_logs",
- "number_of_shards": "3",
- "splits_interval_mins": "2",
+ "type": "service",
"skip_logtime": "true",
"conditions": {
"fields": {
@@ -22,9 +20,7 @@
"is_enabled": "true",
"destination": "solr",
"zk_connect_string": "localhost:9983",
- "collection": "audit_logs",
- "number_of_shards": "3",
- "splits_interval_mins": "2",
+ "type": "audit",
"skip_logtime": "true",
"conditions": {
"fields": {
@@ -35,4 +31,4 @@
}
}
]
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog300.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog300.java b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog300.java
index b4502d6..6caa770 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog300.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog300.java
@@ -352,41 +352,59 @@ public class UpgradeCatalog300 extends AbstractUpgradeCatalog {
updateConfigurationPropertiesForCluster(cluster, "logfeeder-properties", newProperties, true, true);
}
- Config logfeederLog4jProperties = cluster.getDesiredConfigByType("logfeeder-log4j");
- if (logfeederLog4jProperties != null) {
- String content = logfeederLog4jProperties.getProperties().get("content");
+ Config logFeederLog4jProperties = cluster.getDesiredConfigByType("logfeeder-log4j");
+ if (logFeederLog4jProperties != null) {
+ String content = logFeederLog4jProperties.getProperties().get("content");
if (content.contains("<!DOCTYPE log4j:configuration SYSTEM \"log4j.dtd\">")) {
content = content.replace("<!DOCTYPE log4j:configuration SYSTEM \"log4j.dtd\">", "<!DOCTYPE log4j:configuration SYSTEM \"http://logging.apache.org/log4j/1.2/apidocs/org/apache/log4j/xml/doc-files/log4j.dtd\">");
updateConfigurationPropertiesForCluster(cluster, "logfeeder-log4j", Collections.singletonMap("content", content), true, true);
}
}
- Config logsearchLog4jProperties = cluster.getDesiredConfigByType("logsearch-log4j");
- if (logsearchLog4jProperties != null) {
- String content = logsearchLog4jProperties.getProperties().get("content");
+ Config logSearchLog4jProperties = cluster.getDesiredConfigByType("logsearch-log4j");
+ if (logSearchLog4jProperties != null) {
+ String content = logSearchLog4jProperties.getProperties().get("content");
if (content.contains("<!DOCTYPE log4j:configuration SYSTEM \"log4j.dtd\">")) {
content = content.replace("<!DOCTYPE log4j:configuration SYSTEM \"log4j.dtd\">", "<!DOCTYPE log4j:configuration SYSTEM \"http://logging.apache.org/log4j/1.2/apidocs/org/apache/log4j/xml/doc-files/log4j.dtd\">");
updateConfigurationPropertiesForCluster(cluster, "logsearch-log4j", Collections.singletonMap("content", content), true, true);
}
}
- Config logsearchServiceLogsConfig = cluster.getDesiredConfigByType("logsearch-service_logs-solrconfig");
- if (logsearchServiceLogsConfig != null) {
- String content = logsearchServiceLogsConfig.getProperties().get("content");
+ Config logSearchServiceLogsConfig = cluster.getDesiredConfigByType("logsearch-service_logs-solrconfig");
+ if (logSearchServiceLogsConfig != null) {
+ String content = logSearchServiceLogsConfig.getProperties().get("content");
if (content.contains("class=\"solr.admin.AdminHandlers\"")) {
content = content.replaceAll("(?s)<requestHandler name=\"/admin/\".*?class=\"solr.admin.AdminHandlers\" />", "");
updateConfigurationPropertiesForCluster(cluster, "logsearch-service_logs-solrconfig", Collections.singletonMap("content", content), true, true);
}
}
- Config logsearchAuditLogsConfig = cluster.getDesiredConfigByType("logsearch-audit_logs-solrconfig");
- if (logsearchAuditLogsConfig != null) {
- String content = logsearchAuditLogsConfig.getProperties().get("content");
+ Config logSearchAuditLogsConfig = cluster.getDesiredConfigByType("logsearch-audit_logs-solrconfig");
+ if (logSearchAuditLogsConfig != null) {
+ String content = logSearchAuditLogsConfig.getProperties().get("content");
if (content.contains("class=\"solr.admin.AdminHandlers\"")) {
content = content.replaceAll("(?s)<requestHandler name=\"/admin/\".*?class=\"solr.admin.AdminHandlers\" />", "");
updateConfigurationPropertiesForCluster(cluster, "logsearch-audit_logs-solrconfig", Collections.singletonMap("content", content), true, true);
}
}
+
+ Config logFeederOutputConfig = cluster.getDesiredConfigByType("logfeeder-output-config");
+ if (logFeederOutputConfig != null) {
+ String content = logFeederOutputConfig.getProperties().get("content");
+ content = content.replace(
+ " \"collection\":\"{{logsearch_solr_collection_service_logs}}\",\n" +
+ " \"number_of_shards\": \"{{logsearch_collection_service_logs_numshards}}\",\n" +
+ " \"splits_interval_mins\": \"{{logsearch_service_logs_split_interval_mins}}\",\n",
+ " \"type\": \"service\",\n");
+
+ content = content.replace(
+ " \"collection\":\"{{logsearch_solr_collection_audit_logs}}\",\n" +
+ " \"number_of_shards\": \"{{logsearch_collection_audit_logs_numshards}}\",\n" +
+ " \"splits_interval_mins\": \"{{logsearch_audit_logs_split_interval_mins}}\",\n",
+ " \"type\": \"audit\",\n");
+
+ updateConfigurationPropertiesForCluster(cluster, "logfeeder-output-config", Collections.singletonMap("content", content), true, true);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-server/src/main/resources/common-services/LOGSEARCH/0.5.0/properties/output.config.json.j2
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/LOGSEARCH/0.5.0/properties/output.config.json.j2 b/ambari-server/src/main/resources/common-services/LOGSEARCH/0.5.0/properties/output.config.json.j2
index 214e5ba..0c599c9 100644
--- a/ambari-server/src/main/resources/common-services/LOGSEARCH/0.5.0/properties/output.config.json.j2
+++ b/ambari-server/src/main/resources/common-services/LOGSEARCH/0.5.0/properties/output.config.json.j2
@@ -22,9 +22,7 @@
"is_enabled":"{{solr_service_logs_enable}}",
"destination":"solr",
"zk_connect_string":"{{logsearch_solr_zk_quorum}}{{logsearch_solr_zk_znode}}",
- "collection":"{{logsearch_solr_collection_service_logs}}",
- "number_of_shards": "{{logsearch_collection_service_logs_numshards}}",
- "splits_interval_mins": "{{logsearch_service_logs_split_interval_mins}}",
+ "type": "service",
"conditions":{
"fields":{
"rowtype":[
@@ -41,9 +39,7 @@
"is_enabled":"{{solr_audit_logs_enable}}",
"destination":"solr",
"zk_connect_string":"{{logsearch_solr_zk_quorum}}{{logsearch_solr_zk_znode}}",
- "collection":"{{logsearch_solr_collection_audit_logs}}",
- "number_of_shards": "{{logsearch_collection_audit_logs_numshards}}",
- "splits_interval_mins": "{{logsearch_audit_logs_split_interval_mins}}",
+ "type": "audit",
"conditions":{
"fields":{
"rowtype":[