You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by jo...@apache.org on 2017/05/18 14:01:51 UTC
[10/24] ambari git commit: AMBARI-20881 Add Log Level Filter to the
Log Search config API (mgergely)
AMBARI-20881 Add Log Level Filter to the Log Search config API (mgergely)
Change-Id: I8e3d5a628d02407ad2af4ecb77fff3ada10f7707
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/3b94d3cf
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/3b94d3cf
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/3b94d3cf
Branch: refs/heads/branch-feature-AMBARI-12556
Commit: 3b94d3cff380bf3557af57c6ecd9005ab46b8916
Parents: f18a822
Author: Miklos Gergely <mg...@hortonworks.com>
Authored: Wed May 17 14:47:06 2017 +0200
Committer: Miklos Gergely <mg...@hortonworks.com>
Committed: Wed May 17 14:47:40 2017 +0200
----------------------------------------------------------------------
.../ambari-logsearch-config-api/pom.xml | 2 +-
.../config/api/InputConfigMonitor.java | 4 +-
.../config/api/LogLevelFilterMonitor.java | 44 ++++
.../logsearch/config/api/LogSearchConfig.java | 57 ++++-
.../model/loglevelfilter/LogLevelFilter.java | 79 +++++++
.../model/loglevelfilter/LogLevelFilterMap.java | 33 +++
.../config/api/LogSearchConfigClass1.java | 21 +-
.../config/api/LogSearchConfigClass2.java | 21 +-
.../ambari-logsearch-config-zookeeper/pom.xml | 4 +
.../config/zookeeper/LogSearchConfigZK.java | 191 ++++++++++++-----
.../org/apache/ambari/logfeeder/LogFeeder.java | 6 +-
.../logfeeder/input/InputConfigUploader.java | 2 +-
.../logfeeder/logconfig/FilterLogData.java | 87 --------
.../logfeeder/logconfig/LogConfigFetcher.java | 168 ---------------
.../logfeeder/logconfig/LogConfigHandler.java | 213 -------------------
.../logfeeder/logconfig/LogFeederFilter.java | 90 --------
.../logconfig/LogFeederFilterWrapper.java | 55 -----
.../logfeeder/loglevelfilter/FilterLogData.java | 73 +++++++
.../loglevelfilter/LogLevelFilterHandler.java | 157 ++++++++++++++
.../ambari/logfeeder/output/OutputManager.java | 2 +-
.../ambari/logfeeder/util/LogFeederUtil.java | 19 --
.../logconfig/LogConfigHandlerTest.java | 90 ++++----
.../src/test/resources/logfeeder.properties | 3 +-
.../configurer/LogfeederFilterConfigurer.java | 66 ------
.../ambari/logsearch/dao/UserConfigSolrDao.java | 79 -------
.../ambari/logsearch/doc/DocConstants.java | 10 +-
.../logsearch/manager/ShipperConfigManager.java | 45 +++-
.../logsearch/manager/UserConfigManager.java | 24 ---
.../model/common/LSServerLogLevelFilter.java | 100 +++++++++
.../model/common/LSServerLogLevelFilterMap.java | 65 ++++++
.../model/common/LogFeederDataMap.java | 50 -----
.../model/common/LogfeederFilterData.java | 87 --------
.../logsearch/rest/ShipperConfigResource.java | 43 +++-
.../logsearch/rest/UserConfigResource.java | 18 --
.../webapp/templates/common/Header_tmpl.html | 5 +-
.../server/upgrade/UpgradeCatalog300.java | 15 ++
.../configuration/logfeeder-properties.xml | 10 +
.../configuration/logsearch-properties.xml | 10 -
.../LOGSEARCH/0.5.0/themes/theme.json | 4 +-
.../server/upgrade/UpgradeCatalog300Test.java | 29 +++
40 files changed, 982 insertions(+), 1099 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/3b94d3cf/ambari-logsearch/ambari-logsearch-config-api/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/pom.xml b/ambari-logsearch/ambari-logsearch-config-api/pom.xml
index e9abed0..72fcc80 100644
--- a/ambari-logsearch/ambari-logsearch-config-api/pom.xml
+++ b/ambari-logsearch/ambari-logsearch-config-api/pom.xml
@@ -33,7 +33,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
-
+
<dependencies>
<dependency>
<groupId>junit</groupId>
http://git-wip-us.apache.org/repos/asf/ambari/blob/3b94d3cf/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/InputConfigMonitor.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/InputConfigMonitor.java b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/InputConfigMonitor.java
index df26920..29a82a6 100644
--- a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/InputConfigMonitor.java
+++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/InputConfigMonitor.java
@@ -20,7 +20,7 @@
package org.apache.ambari.logsearch.config.api;
/**
- * Monitors input configuration changes.
+ * Monitors input configuration changes.
*/
public interface InputConfigMonitor {
/**
@@ -31,7 +31,7 @@ public interface InputConfigMonitor {
* @throws Exception
*/
void loadInputConfigs(String serviceName, String inputConfig) throws Exception;
-
+
/**
* Notification of the removal of an input configuration.
*
http://git-wip-us.apache.org/repos/asf/ambari/blob/3b94d3cf/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogLevelFilterMonitor.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogLevelFilterMonitor.java b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogLevelFilterMonitor.java
new file mode 100644
index 0000000..766f751
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogLevelFilterMonitor.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Monitors log level filter changes.
+ */
+package org.apache.ambari.logsearch.config.api;
+
+import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter;
+
+public interface LogLevelFilterMonitor {
+
+ /**
+ * Notification of a new or updated log level filter.
+ *
+ * @param logId The log for which the log level filter was created/updated.
+ * @param logLevelFilter The log level filter to apply from now on to the log.
+ */
+ void setLogLevelFilter(String logId, LogLevelFilter logLevelFilter);
+
+ /**
+ * Notification of the removal of a log level filter.
+ *
+ * @param logId The log whose log level filter was removed.
+ */
+ void removeLogLevelFilter(String logId);
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/3b94d3cf/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfig.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfig.java b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfig.java
index 0bb0b78..07921d0 100644
--- a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfig.java
+++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfig.java
@@ -23,6 +23,9 @@ import java.io.Closeable;
import java.util.List;
import java.util.Map;
+import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter;
+import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilterMap;
+
/**
* Log Search Configuration, which uploads, retrieves configurations, and monitors it's changes.
*/
@@ -33,7 +36,7 @@ public interface LogSearchConfig extends Closeable {
public enum Component {
SERVER, LOGFEEDER;
}
-
+
/**
* Initialization of the configuration.
*
@@ -42,7 +45,7 @@ public interface LogSearchConfig extends Closeable {
* @throws Exception
*/
void init(Component component, Map<String, String> properties) throws Exception;
-
+
/**
* Returns all the service names with input configurations of a cluster. Will be used only in SERVER mode.
*
@@ -50,7 +53,7 @@ public interface LogSearchConfig extends Closeable {
* @return List of the service names.
*/
List<String> getServices(String clusterName);
-
+
/**
* Checks if input configuration exists.
*
@@ -60,7 +63,7 @@ public interface LogSearchConfig extends Closeable {
* @throws Exception
*/
boolean inputConfigExists(String clusterName, String serviceName) throws Exception;
-
+
/**
* Returns the input configuration of a service in a cluster. Will be used only in SERVER mode.
*
@@ -69,7 +72,7 @@ public interface LogSearchConfig extends Closeable {
* @return The input configuration for the service if it exists, null otherwise.
*/
String getInputConfig(String clusterName, String serviceName);
-
+
/**
* Uploads the input configuration for a service in a cluster.
*
@@ -78,13 +81,51 @@ public interface LogSearchConfig extends Closeable {
* @param inputConfig The input configuration of the service.
* @throws Exception
*/
+ void createInputConfig(String clusterName, String serviceName, String inputConfig) throws Exception;
+
+ /**
+ * Modifies the input configuration for a service in a cluster.
+ *
+ * @param clusterName The name of the cluster where the service is.
+ * @param serviceName The name of the service whose input configuration is modified.
+ * @param inputConfig The input configuration of the service.
+ * @throws Exception
+ */
void setInputConfig(String clusterName, String serviceName, String inputConfig) throws Exception;
-
+
+ /**
+ * Uploads the log level filter of a log.
+ *
+ * @param clusterName The name of the cluster where the log is.
+ * @param logId The id of the log.
+ * @param filter The log level filter for the log.
+ * @throws Exception
+ */
+ void createLogLevelFilter(String clusterName, String logId, LogLevelFilter filter) throws Exception;
+
+ /**
+ * Modifies the log level filters for all the logs.
+ *
+ * @param clusterName The name of the cluster where the logs are.
+ * @param filters The log level filters to set.
+ * @throws Exception
+ */
+ void setLogLevelFilters(String clusterName, LogLevelFilterMap filters) throws Exception;
+
+ /**
+ * Returns the Log Level Filters of a cluster.
+ *
+ * @param clusterName The name of the cluster whose log level filters are required.
+ * @return All the log level filters of the cluster.
+ */
+ LogLevelFilterMap getLogLevelFilters(String clusterName);
+
/**
* Starts the monitoring of the input configurations, asynchronously. Will be used only in LOGFEEDER mode.
*
- * @param configMonitor The input config monitor to call in case of a config change.
+ * @param inputConfigMonitor The input config monitor to call in case of an input config change.
+ * @param logLevelFilterMonitor The log level filter monitor to call in case of a log level filter change.
* @throws Exception
*/
- void monitorInputConfigChanges(InputConfigMonitor configMonitor) throws Exception;
+ void monitorInputConfigChanges(InputConfigMonitor inputConfigMonitor, LogLevelFilterMonitor logLevelFilterMonitor) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/3b94d3cf/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/loglevelfilter/LogLevelFilter.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/loglevelfilter/LogLevelFilter.java b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/loglevelfilter/LogLevelFilter.java
new file mode 100644
index 0000000..06cf589
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/loglevelfilter/LogLevelFilter.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logsearch.config.api.model.loglevelfilter;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+public class LogLevelFilter {
+
+ private String label;
+ private List<String> hosts;
+ private List<String> defaultLevels;
+ private List<String> overrideLevels;
+ private Date expiryTime;
+
+ public LogLevelFilter() {
+ hosts = new ArrayList<String>();
+ defaultLevels = new ArrayList<String>();
+ overrideLevels = new ArrayList<String>();
+ }
+
+ public String getLabel() {
+ return label;
+ }
+
+ public void setLabel(String label) {
+ this.label = label;
+ }
+
+ public List<String> getHosts() {
+ return hosts;
+ }
+
+ public void setHosts(List<String> hosts) {
+ this.hosts = hosts;
+ }
+
+ public List<String> getDefaultLevels() {
+ return defaultLevels;
+ }
+
+ public void setDefaultLevels(List<String> defaultLevels) {
+ this.defaultLevels = defaultLevels;
+ }
+
+ public List<String> getOverrideLevels() {
+ return overrideLevels;
+ }
+
+ public void setOverrideLevels(List<String> overrideLevels) {
+ this.overrideLevels = overrideLevels;
+ }
+
+ public Date getExpiryTime() {
+ return expiryTime;
+ }
+
+ public void setExpiryTime(Date expiryTime) {
+ this.expiryTime = expiryTime;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/3b94d3cf/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/loglevelfilter/LogLevelFilterMap.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/loglevelfilter/LogLevelFilterMap.java b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/loglevelfilter/LogLevelFilterMap.java
new file mode 100644
index 0000000..37fdb9f
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/loglevelfilter/LogLevelFilterMap.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logsearch.config.api.model.loglevelfilter;
+
+import java.util.TreeMap;
+
+public class LogLevelFilterMap {
+ private TreeMap<String, LogLevelFilter> filter;
+
+ public TreeMap<String, LogLevelFilter> getFilter() {
+ return filter;
+ }
+
+ public void setFilter(TreeMap<String, LogLevelFilter> filter) {
+ this.filter = filter;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/3b94d3cf/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass1.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass1.java b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass1.java
index 969eb30..fc3fe5b 100644
--- a/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass1.java
+++ b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass1.java
@@ -24,6 +24,8 @@ import java.util.Map;
import org.apache.ambari.logsearch.config.api.InputConfigMonitor;
import org.apache.ambari.logsearch.config.api.LogSearchConfig;
+import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter;
+import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilterMap;
public class LogSearchConfigClass1 implements LogSearchConfig {
@Override
@@ -35,10 +37,14 @@ public class LogSearchConfigClass1 implements LogSearchConfig {
}
@Override
+ public void createInputConfig(String clusterName, String serviceName, String inputConfig) throws Exception {}
+
+ @Override
public void setInputConfig(String clusterName, String serviceName, String inputConfig) throws Exception {}
@Override
- public void monitorInputConfigChanges(InputConfigMonitor configMonitor) throws Exception {}
+ public void monitorInputConfigChanges(InputConfigMonitor inputConfigMonitor, LogLevelFilterMonitor logLevelFilterMonitor)
+ throws Exception {}
@Override
public List<String> getServices(String clusterName) {
@@ -49,7 +55,18 @@ public class LogSearchConfigClass1 implements LogSearchConfig {
public String getInputConfig(String clusterName, String serviceName) {
return null;
}
-
+
+ @Override
+ public void createLogLevelFilter(String clusterName, String logId, LogLevelFilter filter) {}
+
+ @Override
+ public void setLogLevelFilters(String clusterName, LogLevelFilterMap filters) throws Exception {}
+
+ @Override
+ public LogLevelFilterMap getLogLevelFilters(String clusterName) {
+ return null;
+ }
+
@Override
public void close() {}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/3b94d3cf/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass2.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass2.java b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass2.java
index 664ecc9..346edb3 100644
--- a/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass2.java
+++ b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass2.java
@@ -24,6 +24,8 @@ import java.util.Map;
import org.apache.ambari.logsearch.config.api.InputConfigMonitor;
import org.apache.ambari.logsearch.config.api.LogSearchConfig;
+import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter;
+import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilterMap;
public class LogSearchConfigClass2 implements LogSearchConfig {
@Override
@@ -35,10 +37,14 @@ public class LogSearchConfigClass2 implements LogSearchConfig {
}
@Override
+ public void createInputConfig(String clusterName, String serviceName, String inputConfig) throws Exception {}
+
+ @Override
public void setInputConfig(String clusterName, String serviceName, String inputConfig) throws Exception {}
@Override
- public void monitorInputConfigChanges(InputConfigMonitor configMonitor) throws Exception {}
+ public void monitorInputConfigChanges(InputConfigMonitor inputConfigMonitor, LogLevelFilterMonitor logLevelFilterMonitor)
+ throws Exception {}
@Override
public List<String> getServices(String clusterName) {
@@ -49,7 +55,18 @@ public class LogSearchConfigClass2 implements LogSearchConfig {
public String getInputConfig(String clusterName, String serviceName) {
return null;
}
-
+
+ @Override
+ public void createLogLevelFilter(String clusterName, String logId, LogLevelFilter filter) {}
+
+ @Override
+ public void setLogLevelFilters(String clusterName, LogLevelFilterMap filters) throws Exception {}
+
+ @Override
+ public LogLevelFilterMap getLogLevelFilters(String clusterName) {
+ return null;
+ }
+
@Override
public void close() {}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/3b94d3cf/ambari-logsearch/ambari-logsearch-config-zookeeper/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/pom.xml b/ambari-logsearch/ambari-logsearch-config-zookeeper/pom.xml
index 4ed8eba..2c59a4a 100644
--- a/ambari-logsearch/ambari-logsearch-config-zookeeper/pom.xml
+++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/pom.xml
@@ -70,5 +70,9 @@
<artifactId>curator-recipes</artifactId>
<version>2.12.0</version>
</dependency>
+ <dependency>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/ambari/blob/3b94d3cf/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java
index 738fde2..5e22374 100644
--- a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java
+++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java
@@ -22,9 +22,13 @@ package org.apache.ambari.logsearch.config.zookeeper;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.TreeMap;
import org.apache.ambari.logsearch.config.api.LogSearchConfig;
+import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter;
+import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilterMap;
import org.apache.ambari.logsearch.config.api.InputConfigMonitor;
+import org.apache.ambari.logsearch.config.api.LogLevelFilterMonitor;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.CuratorFramework;
@@ -32,15 +36,19 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
+import org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.ZKPaths;
import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import com.google.common.base.Splitter;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
public class LogSearchConfigZK implements LogSearchConfig {
private static final Logger LOG = Logger.getLogger(LogSearchConfigZK.class);
@@ -49,6 +57,7 @@ public class LogSearchConfigZK implements LogSearchConfig {
private static final int CONNECTION_TIMEOUT = 30000;
private static final String DEFAULT_ZK_ROOT = "/logsearch";
private static final long WAIT_FOR_ROOT_SLEEP_SECONDS = 10;
+ private static final String DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS";
private static final String CLUSTER_NAME_PROPERTY = "cluster.name";
private static final String ZK_CONNECT_STRING_PROPERTY = "logsearch.config.zk_connect_string";
@@ -59,6 +68,7 @@ public class LogSearchConfigZK implements LogSearchConfig {
private String root;
private CuratorFramework client;
private TreeCache cache;
+ private Gson gson;
@Override
public void init(Component component, Map<String, String> properties) throws Exception {
@@ -89,6 +99,8 @@ public class LogSearchConfigZK implements LogSearchConfig {
cache = new TreeCache(client, String.format("%s/%s", root, properties.get(CLUSTER_NAME_PROPERTY)));
}
+
+ gson = new GsonBuilder().setDateFormat(DATE_FORMAT).create();
}
@Override
@@ -98,66 +110,43 @@ public class LogSearchConfigZK implements LogSearchConfig {
}
@Override
- public void setInputConfig(String clusterName, String serviceName, String inputConfig) throws Exception {
+ public void createInputConfig(String clusterName, String serviceName, String inputConfig) throws Exception {
String nodePath = String.format("%s/%s/input/%s", root, clusterName, serviceName);
- client.create().creatingParentContainersIfNeeded().withACL(getAcls()).forPath(nodePath, inputConfig.getBytes());
- LOG.info("Set input config for the service " + serviceName + " for cluster " + clusterName);
- }
-
- private List<ACL> getAcls() {
- String aclStr = properties.get(ZK_ACLS_PROPERTY);
- if (StringUtils.isBlank(aclStr)) {
- return ZooDefs.Ids.OPEN_ACL_UNSAFE;
+ try {
+ client.create().creatingParentContainersIfNeeded().withACL(getAcls()).forPath(nodePath, inputConfig.getBytes());
+ LOG.info("Uploaded input config for the service " + serviceName + " for cluster " + clusterName);
+ } catch (NodeExistsException e) {
+ LOG.debug("Did not upload input config for service " + serviceName + " as it was already uploaded by another Log Feeder");
}
-
- List<ACL> acls = new ArrayList<>();
- List<String> aclStrList = Splitter.on(",").omitEmptyStrings().trimResults().splitToList(aclStr);
- for (String unparcedAcl : aclStrList) {
- String[] parts = unparcedAcl.split(":");
- if (parts.length == 3) {
- acls.add(new ACL(parsePermission(parts[2]), new Id(parts[0], parts[1])));
- }
- }
- return acls;
}
- private Integer parsePermission(String permission) {
- int permissionCode = 0;
- for (char each : permission.toLowerCase().toCharArray()) {
- switch (each) {
- case 'r':
- permissionCode |= ZooDefs.Perms.READ;
- break;
- case 'w':
- permissionCode |= ZooDefs.Perms.WRITE;
- break;
- case 'c':
- permissionCode |= ZooDefs.Perms.CREATE;
- break;
- case 'd':
- permissionCode |= ZooDefs.Perms.DELETE;
- break;
- case 'a':
- permissionCode |= ZooDefs.Perms.ADMIN;
- break;
- default:
- throw new IllegalArgumentException("Unsupported permission: " + permission);
- }
- }
- return permissionCode;
+ @Override
+ public void setInputConfig(String clusterName, String serviceName, String inputConfig) throws Exception {
+ String nodePath = String.format("%s/%s/input/%s", root, clusterName, serviceName);
+ client.setData().forPath(nodePath, inputConfig.getBytes());
+ LOG.info("Set input config for the service " + serviceName + " for cluster " + clusterName);
}
@Override
- public void monitorInputConfigChanges(final InputConfigMonitor configMonitor) throws Exception {
+ public void monitorInputConfigChanges(final InputConfigMonitor inputConfigMonitor,
+ final LogLevelFilterMonitor logLevelFilterMonitor ) throws Exception {
TreeCacheListener listener = new TreeCacheListener() {
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
- if (!event.getData().getPath().startsWith(String.format("%s/%s/input/", root, properties.get(CLUSTER_NAME_PROPERTY)))) {
- return;
- }
-
String nodeName = ZKPaths.getNodeFromPath(event.getData().getPath());
String nodeData = new String(event.getData().getData());
- switch (event.getType()) {
+ Type eventType = event.getType();
+
+ String configPathStab = String.format("%s/%s/", root, properties.get(CLUSTER_NAME_PROPERTY));
+
+ if (event.getData().getPath().startsWith(configPathStab + "input/")) {
+ handleInputConfigChange(eventType, nodeName, nodeData);
+ } else if (event.getData().getPath().startsWith(configPathStab + "loglevelfilter/")) {
+ handleLogLevelFilterChange(eventType, nodeName, nodeData);
+ }
+ }
+
+ private void handleInputConfigChange(Type eventType, String nodeName, String nodeData) {
+ switch (eventType) {
case NODE_ADDED:
LOG.info("Node added under input ZK node: " + nodeName);
addInputs(nodeName, nodeData);
@@ -177,16 +166,33 @@ public class LogSearchConfigZK implements LogSearchConfig {
}
private void removeInputs(String serviceName) {
- configMonitor.removeInputs(serviceName);
+ inputConfigMonitor.removeInputs(serviceName);
}
private void addInputs(String serviceName, String inputConfig) {
try {
- configMonitor.loadInputConfigs(serviceName, inputConfig);
+ inputConfigMonitor.loadInputConfigs(serviceName, inputConfig);
} catch (Exception e) {
LOG.error("Could not load input configuration for service " + serviceName + ":\n" + inputConfig, e);
}
}
+
+ private void handleLogLevelFilterChange(Type eventType, String nodeName, String nodeData) {
+ switch (eventType) {
+ case NODE_ADDED:
+ case NODE_UPDATED:
+ LOG.info("Node added/updated under loglevelfilter ZK node: " + nodeName);
+ LogLevelFilter logLevelFilter = gson.fromJson(nodeData, LogLevelFilter.class);
+ logLevelFilterMonitor.setLogLevelFilter(nodeName, logLevelFilter);
+ break;
+ case NODE_REMOVED:
+ LOG.info("Node removed loglevelfilter input ZK node: " + nodeName);
+ logLevelFilterMonitor.removeLogLevelFilter(nodeName);
+ break;
+ default:
+ break;
+ }
+ }
};
cache.getListenable().addListener(listener);
cache.start();
@@ -206,6 +212,89 @@ public class LogSearchConfigZK implements LogSearchConfig {
}
@Override
+ public void createLogLevelFilter(String clusterName, String logId, LogLevelFilter filter) throws Exception {
+ String nodePath = String.format("%s/%s/loglevelfilter/%s", root, clusterName, logId);
+ String logLevelFilterJson = gson.toJson(filter);
+ try {
+ client.create().creatingParentContainersIfNeeded().withACL(getAcls()).forPath(nodePath, logLevelFilterJson.getBytes());
+ LOG.info("Uploaded log level filter for the log " + logId + " for cluster " + clusterName);
+ } catch (NodeExistsException e) {
+ LOG.debug("Did not upload log level filters for log " + logId + " as it was already uploaded by another Log Feeder");
+ }
+ }
+
+ /**
+  * Writes every filter of the map to its loglevelfilter node, skipping the nodes whose cached
+  * content already equals the new JSON.
+  *
+  * @param clusterName the name of the cluster, part of the node path
+  * @param filters the filters to store, keyed by the log id used as the node name
+  * @throws Exception if ZooKeeper rejects the update, e.g. because the node does not exist
+  */
+ @Override
+ public void setLogLevelFilters(String clusterName, LogLevelFilterMap filters) throws Exception {
+   for (Map.Entry<String, LogLevelFilter> e : filters.getFilter().entrySet()) {
+     String nodePath = String.format("%s/%s/loglevelfilter/%s", root, clusterName, e.getKey());
+     String logLevelFilterJson = gson.toJson(e.getValue());
+     // The cache returns null for a node it does not hold; in that case there is nothing to
+     // compare against, so the update is attempted and ZooKeeper decides the outcome.
+     ChildData currentData = cache.getCurrentData(nodePath);
+     String currentLogLevelFilterJson =
+         (currentData == null || currentData.getData() == null) ? null : new String(currentData.getData());
+     if (!logLevelFilterJson.equals(currentLogLevelFilterJson)) {
+       client.setData().forPath(nodePath, logLevelFilterJson.getBytes());
+       LOG.info("Set log level filter for the log " + e.getKey() + " for cluster " + clusterName);
+     }
+   }
+ }
+
+ /**
+  * Collects the log level filters currently held by the cache for a cluster.
+  *
+  * @param clusterName the name of the cluster whose loglevelfilter nodes are read
+  * @return the filters keyed by node name, sorted by key; empty if the cache holds no such nodes
+  */
+ @Override
+ public LogLevelFilterMap getLogLevelFilters(String clusterName) {
+   String parentPath = String.format("%s/%s/loglevelfilter", root, clusterName);
+   Map<String, ChildData> logLevelFilterNodes = cache.getCurrentChildren(parentPath);
+   TreeMap<String, LogLevelFilter> filters = new TreeMap<>();
+   // The cache returns null (not an empty map) when the parent node is not cached.
+   if (logLevelFilterNodes != null) {
+     for (Map.Entry<String, ChildData> e : logLevelFilterNodes.entrySet()) {
+       LogLevelFilter logLevelFilter = gson.fromJson(new String(e.getValue().getData()), LogLevelFilter.class);
+       filters.put(e.getKey(), logLevelFilter);
+     }
+   }
+
+   LogLevelFilterMap logLevelFilters = new LogLevelFilterMap();
+   logLevelFilters.setFilter(filters);
+   return logLevelFilters;
+ }
+
+ /**
+  * Builds the ACL list for newly created nodes from the ZK_ACLS_PROPERTY configuration value.
+  * The value is a comma separated list of entries, each of the form scheme:id:permissions.
+  *
+  * @return OPEN_ACL_UNSAFE if the property is blank, otherwise the ACLs of the well-formed entries
+  * @throws IllegalArgumentException if an entry contains an unsupported permission character
+  */
+ private List<ACL> getAcls() {
+   String aclStr = properties.get(ZK_ACLS_PROPERTY);
+   if (StringUtils.isBlank(aclStr)) {
+     return ZooDefs.Ids.OPEN_ACL_UNSAFE;
+   }
+
+   List<ACL> acls = new ArrayList<>();
+   List<String> aclStrList = Splitter.on(",").omitEmptyStrings().trimResults().splitToList(aclStr);
+   for (String unparsedAcl : aclStrList) {
+     String[] parts = unparsedAcl.split(":");
+     if (parts.length == 3) {
+       acls.add(new ACL(parsePermission(parts[2]), new Id(parts[0], parts[1])));
+     } else {
+       // The entry itself is not logged, as it may carry credentials.
+       LOG.warn("Ignoring malformed ZooKeeper ACL entry: expected scheme:id:permissions, but found " +
+           parts.length + " ':' separated part(s)");
+     }
+   }
+   return acls;
+ }
+
+ /**
+  * Converts a permission string into the ZooKeeper permission bit mask.
+  * The characters r, w, c, d and a (in any case) stand for READ, WRITE, CREATE, DELETE and ADMIN.
+  *
+  * @param permission the permission characters, e.g. "cdrwa"
+  * @return the bitwise OR of the permissions named by the characters
+  * @throws IllegalArgumentException if the string contains any other character
+  */
+ private Integer parsePermission(String permission) {
+   String normalized = permission.toLowerCase();
+   int mask = 0;
+   for (int i = 0; i < normalized.length(); i++) {
+     char flag = normalized.charAt(i);
+     if (flag == 'r') {
+       mask |= ZooDefs.Perms.READ;
+     } else if (flag == 'w') {
+       mask |= ZooDefs.Perms.WRITE;
+     } else if (flag == 'c') {
+       mask |= ZooDefs.Perms.CREATE;
+     } else if (flag == 'd') {
+       mask |= ZooDefs.Perms.DELETE;
+     } else if (flag == 'a') {
+       mask |= ZooDefs.Perms.ADMIN;
+     } else {
+       throw new IllegalArgumentException("Unsupported permission: " + permission);
+     }
+   }
+   return mask;
+ }
+
+ @Override
public void close() {
LOG.info("Closing ZooKeeper Connection");
client.close();
http://git-wip-us.apache.org/repos/asf/ambari/blob/3b94d3cf/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
index 074fedb..c853f42 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
@@ -29,7 +29,7 @@ import org.apache.ambari.logsearch.config.api.LogSearchConfig.Component;
import org.apache.ambari.logsearch.config.zookeeper.LogSearchConfigZK;
import org.apache.ambari.logfeeder.input.InputConfigUploader;
import org.apache.ambari.logfeeder.input.InputManager;
-import org.apache.ambari.logfeeder.logconfig.LogConfigHandler;
+import org.apache.ambari.logfeeder.loglevelfilter.LogLevelFilterHandler;
import org.apache.ambari.logfeeder.metrics.MetricData;
import org.apache.ambari.logfeeder.metrics.MetricsManager;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
@@ -71,13 +71,13 @@ public class LogFeeder {
long startTime = System.currentTimeMillis();
configHandler.init();
- LogConfigHandler.handleConfig();
SSLUtil.ensureStorePasswords();
config = LogSearchConfigFactory.createLogSearchConfig(Component.LOGFEEDER,
Maps.fromProperties(LogFeederUtil.getProperties()), LogSearchConfigZK.class);
+ LogLevelFilterHandler.init(config);
InputConfigUploader.load(config);
- config.monitorInputConfigChanges(configHandler);
+ config.monitorInputConfigChanges(configHandler, new LogLevelFilterHandler());
metricsManager.init();
http://git-wip-us.apache.org/repos/asf/ambari/blob/3b94d3cf/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java
index b70fbd1..8aec690 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java
@@ -75,7 +75,7 @@ public class InputConfigUploader extends Thread {
String inputConfig = Files.toString(inputConfigFile, Charset.defaultCharset());
if (!config.inputConfigExists(clusterName, serviceName)) {
- config.setInputConfig(clusterName, serviceName, inputConfig);
+ config.createInputConfig(clusterName, serviceName, inputConfig);
}
filesHandled.add(inputConfigFile.getAbsolutePath());
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/ambari/blob/3b94d3cf/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FilterLogData.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FilterLogData.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FilterLogData.java
deleted file mode 100644
index a05a916..0000000
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FilterLogData.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ambari.logfeeder.logconfig;
-
-import java.util.List;
-import java.util.Map;
-
-import org.apache.ambari.logfeeder.common.LogFeederConstants;
-import org.apache.ambari.logfeeder.input.InputMarker;
-import org.apache.ambari.logfeeder.util.LogFeederUtil;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.collections.MapUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.log4j.Logger;
-
-/**
- * Read configuration from solr and filter the log
- */
-public enum FilterLogData {
- INSTANCE;
-
- private static final Logger LOG = Logger.getLogger(FilterLogData.class);
-
- private static final boolean DEFAULT_VALUE = true;
-
- public boolean isAllowed(String jsonBlock, InputMarker inputMarker) {
- if (StringUtils.isEmpty(jsonBlock)) {
- return DEFAULT_VALUE;
- }
- Map<String, Object> jsonObj = LogFeederUtil.toJSONObject(jsonBlock);
- return isAllowed(jsonObj, inputMarker);
- }
-
- public boolean isAllowed(Map<String, Object> jsonObj, InputMarker inputMarker) {
- if ("audit".equals(inputMarker.input.getConfigs().get(LogFeederConstants.ROW_TYPE)))
- return true;
-
- boolean isAllowed = applyFilter(jsonObj);
- if (!isAllowed) {
- LOG.trace("Filter block the content :" + LogFeederUtil.getGson().toJson(jsonObj));
- }
- return isAllowed;
- }
-
-
- private boolean applyFilter(Map<String, Object> jsonObj) {
- if (MapUtils.isEmpty(jsonObj)) {
- LOG.warn("Output jsonobj is empty");
- return DEFAULT_VALUE;
- }
-
- String hostName = (String) jsonObj.get(LogFeederConstants.SOLR_HOST);
- String componentName = (String) jsonObj.get(LogFeederConstants.SOLR_COMPONENT);
- String level = (String) jsonObj.get(LogFeederConstants.SOLR_LEVEL);
- if (StringUtils.isNotBlank(hostName) && StringUtils.isNotBlank(componentName) && StringUtils.isNotBlank(level)) {
- LogFeederFilter componentFilter = LogConfigHandler.findComponentFilter(componentName);
- if (componentFilter == null) {
- return DEFAULT_VALUE;
- }
- List<String> allowedLevels = LogConfigHandler.getAllowedLevels(hostName, componentFilter);
- if (CollectionUtils.isEmpty(allowedLevels)) {
- allowedLevels.add(LogFeederConstants.ALL);
- }
- return LogFeederUtil.isListContains(allowedLevels, level, false);
- }
- else {
- return DEFAULT_VALUE;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/3b94d3cf/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogConfigFetcher.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogConfigFetcher.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogConfigFetcher.java
deleted file mode 100644
index 12c744c..0000000
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogConfigFetcher.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.ambari.logfeeder.logconfig;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.ambari.logfeeder.common.LogFeederConstants;
-import org.apache.ambari.logfeeder.util.LogFeederUtil;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.apache.solr.client.solrj.SolrClient;
-import org.apache.solr.client.solrj.SolrQuery;
-import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.SolrRequest.METHOD;
-import org.apache.solr.client.solrj.impl.CloudSolrClient;
-import org.apache.solr.client.solrj.impl.HttpSolrClient;
-import org.apache.solr.client.solrj.request.CollectionAdminRequest;
-import org.apache.solr.client.solrj.response.CollectionAdminResponse;
-import org.apache.solr.client.solrj.response.QueryResponse;
-import org.apache.solr.common.SolrDocument;
-import org.apache.solr.common.SolrDocumentList;
-import org.apache.solr.common.SolrException;
-
-public class LogConfigFetcher {
- private static final Logger LOG = Logger.getLogger(LogConfigFetcher.class);
-
- private static LogConfigFetcher instance;
- public synchronized static LogConfigFetcher getInstance() {
- if (instance == null) {
- try {
- instance = new LogConfigFetcher();
- } catch (Exception e) {
- String logMessageKey = LogConfigFetcher.class.getSimpleName() + "_SOLR_UTIL";
- LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Error constructing solrUtil", e, LOG, Level.WARN);
- }
- }
- return instance;
- }
-
- private SolrClient solrClient;
-
- private String solrDetail = "";
-
- public LogConfigFetcher() throws Exception {
- String url = LogFeederUtil.getStringProperty("logfeeder.solr.url");
- String zkConnectString = LogFeederUtil.getStringProperty("logfeeder.solr.zk_connect_string");
- String collection = LogFeederUtil.getStringProperty("logfeeder.solr.core.config.name", "history");
- connectToSolr(url, zkConnectString, collection);
- }
-
- private SolrClient connectToSolr(String url, String zkConnectString, String collection) throws Exception {
- solrDetail = "zkConnectString=" + zkConnectString + ", collection=" + collection + ", url=" + url;
-
- LOG.info("connectToSolr() " + solrDetail);
- if (StringUtils.isEmpty(collection)) {
- throw new Exception("For solr, collection name is mandatory. " + solrDetail);
- }
-
- if (StringUtils.isEmpty(zkConnectString) && StringUtils.isBlank(url))
- throw new Exception("Both zkConnectString and URL are empty. zkConnectString=" + zkConnectString + ", collection=" +
- collection + ", url=" + url);
-
- if (StringUtils.isNotEmpty(zkConnectString)) {
- solrDetail = "zkConnectString=" + zkConnectString + ", collection=" + collection;
- LOG.info("Using zookeepr. " + solrDetail);
- CloudSolrClient solrClouldClient = new CloudSolrClient(zkConnectString);
- solrClouldClient.setDefaultCollection(collection);
- solrClient = solrClouldClient;
- checkSolrStatus(3 * 60 * 1000);
- } else {
- solrDetail = "collection=" + collection + ", url=" + url;
- String collectionURL = url + "/" + collection;
- LOG.info("Connecting to solr : " + collectionURL);
- solrClient = new HttpSolrClient(collectionURL);
- }
- return solrClient;
- }
-
- private boolean checkSolrStatus(int waitDurationMS) {
- boolean status = false;
- try {
- long beginTimeMS = System.currentTimeMillis();
- long waitIntervalMS = 2000;
- int pingCount = 0;
- while (true) {
- pingCount++;
- CollectionAdminResponse response = null;
- try {
- CollectionAdminRequest.List colListReq = new CollectionAdminRequest.List();
- response = colListReq.process(solrClient);
- } catch (Exception ex) {
- LOG.error("Con't connect to Solr. solrDetail=" + solrDetail, ex);
- }
- if (response != null && response.getStatus() == 0) {
- LOG.info("Solr getCollections() is success. solr=" + solrDetail);
- status = true;
- break;
- }
- if (System.currentTimeMillis() - beginTimeMS > waitDurationMS) {
- LOG.error("Solr is not reachable even after " + (System.currentTimeMillis() - beginTimeMS)
- + " ms. If you are using alias, then you might have to restart LogSearch after Solr is up and running. solr="
- + solrDetail + ", response=" + response);
- break;
- } else {
- LOG.warn("Solr is not reachable yet. getCollections() attempt count=" + pingCount + ". Will sleep for " +
- waitIntervalMS + " ms and try again." + " solr=" + solrDetail + ", response=" + response);
- }
- Thread.sleep(waitIntervalMS);
- }
- } catch (Throwable t) {
- LOG.error("Seems Solr is not up. solrDetail=" + solrDetail, t);
- }
- return status;
- }
-
- public Map<String, Object> getConfigDoc() {
- HashMap<String, Object> configMap = new HashMap<String, Object>();
- SolrQuery solrQuery = new SolrQuery();
- solrQuery.setQuery("*:*");
- String fq = LogFeederConstants.ROW_TYPE + ":" + LogFeederConstants.LOGFEEDER_FILTER_NAME;
- solrQuery.setFilterQueries(fq);
- try {
- QueryResponse response = process(solrQuery);
- if (response != null) {
- SolrDocumentList documentList = response.getResults();
- if (CollectionUtils.isNotEmpty(documentList)) {
- SolrDocument configDoc = documentList.get(0);
- String configJson = LogFeederUtil.getGson().toJson(configDoc);
- configMap = (HashMap<String, Object>) LogFeederUtil.toJSONObject(configJson);
- }
- }
- } catch (Exception e) {
- String logMessageKey = this.getClass().getSimpleName() + "_FETCH_FILTER_CONFIG_ERROR";
- LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Error getting filter config from solr", e, LOG, Level.ERROR);
- }
- return configMap;
- }
-
- private QueryResponse process(SolrQuery solrQuery) throws SolrServerException, IOException, SolrException {
- if (solrClient != null) {
- QueryResponse queryResponse = solrClient.query(solrQuery, METHOD.POST);
- return queryResponse;
- } else {
- LOG.error("solrClient can't be null");
- return null;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/3b94d3cf/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogConfigHandler.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogConfigHandler.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogConfigHandler.java
deleted file mode 100644
index 0ece637..0000000
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogConfigHandler.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ambari.logfeeder.logconfig;
-
-import java.text.DateFormat;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TimeZone;
-
-import org.apache.ambari.logfeeder.common.LogFeederConstants;
-import org.apache.ambari.logfeeder.util.LogFeederUtil;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.log4j.Logger;
-
-public class LogConfigHandler extends Thread {
- private static final Logger LOG = Logger.getLogger(LogConfigHandler.class);
-
- private static final int DEFAULT_SOLR_CONFIG_INTERVAL = 5;
- private static final String DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS";
- private static final String TIMEZONE = "GMT";
- private static final int RETRY_INTERVAL = 30;
-
- static {
- TimeZone.setDefault(TimeZone.getTimeZone(TIMEZONE));
- }
-
- private static ThreadLocal<DateFormat> formatter = new ThreadLocal<DateFormat>() {
- protected DateFormat initialValue() {
- SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
- dateFormat.setTimeZone(TimeZone.getTimeZone(TIMEZONE));
- return dateFormat;
- }
- };
-
- private static boolean filterEnabled;
- private static LogFeederFilterWrapper logFeederFilterWrapper;
-
- private static boolean running = false;
-
- public static void handleConfig() {
- filterEnabled = LogFeederUtil.getBooleanProperty("logfeeder.log.filter.enable", false);
- if (!filterEnabled) {
- LOG.info("Logfeeder filter Scheduler is disabled.");
- return;
- }
- if (!running) {
- new LogConfigHandler().start();
- running = true;
- LOG.info("Logfeeder Filter Thread started!");
- } else {
- LOG.warn("Logfeeder Filter Thread is already running.");
- }
- }
-
- private LogConfigHandler() {
- setName(getClass().getSimpleName());
- setDaemon(true);
- }
-
- @Override
- public void run() {
- String zkConnectString = LogFeederUtil.getStringProperty("logfeeder.solr.zk_connect_string");
- String solrUrl = LogFeederUtil.getStringProperty("logfeeder.solr.url");
- if (StringUtils.isBlank(zkConnectString) && StringUtils.isBlank(solrUrl)) {
- LOG.warn("Neither Solr ZK Connect String nor solr Url for UserConfig/History is set." +
- "Won't look for level configuration from Solr.");
- return;
- }
-
- int solrConfigInterval = LogFeederUtil.getIntProperty("logfeeder.solr.config.interval", DEFAULT_SOLR_CONFIG_INTERVAL);
- do {
- LOG.debug("Updating config from solr after every " + solrConfigInterval + " sec.");
- fetchConfig();
- try {
- Thread.sleep(1000 * solrConfigInterval);
- } catch (InterruptedException e) {
- LOG.error(e.getLocalizedMessage(), e.getCause());
- }
- } while (true);
- }
-
- private synchronized void fetchConfig() {
- LogConfigFetcher fetcher = LogConfigFetcher.getInstance();
- if (fetcher != null) {
- Map<String, Object> configDocMap = fetcher.getConfigDoc();
- String configJson = (String) configDocMap.get(LogFeederConstants.VALUES);
- if (configJson != null) {
- logFeederFilterWrapper = LogFeederUtil.getGson().fromJson(configJson, LogFeederFilterWrapper.class);
- }
- }
- }
-
- public static boolean isFilterAvailable() {
- return logFeederFilterWrapper != null;
- }
-
- public static List<String> getAllowedLevels(String hostName, LogFeederFilter componentFilter) {
- String componentName = componentFilter.getLabel();
- List<String> hosts = componentFilter.getHosts();
- List<String> defaultLevels = componentFilter.getDefaultLevels();
- List<String> overrideLevels = componentFilter.getOverrideLevels();
- String expiryTime = componentFilter.getExpiryTime();
-
- // check is user override or not
- if (StringUtils.isNotEmpty(expiryTime) || CollectionUtils.isNotEmpty(overrideLevels) || CollectionUtils.isNotEmpty(hosts)) {
- if (CollectionUtils.isEmpty(hosts)) { // hosts list is empty or null consider it apply on all hosts
- hosts.add(LogFeederConstants.ALL);
- }
-
- if (LogFeederUtil.isListContains(hosts, hostName, false)) {
- if (isFilterExpired(componentFilter)) {
- LOG.debug("Filter for component " + componentName + " and host :" + hostName + " is expired at " +
- componentFilter.getExpiryTime());
- return defaultLevels;
- } else {
- return overrideLevels;
- }
- }
- }
- return defaultLevels;
- }
-
- private static boolean isFilterExpired(LogFeederFilter logfeederFilter) {
- if (logfeederFilter == null)
- return false;
-
- Date filterEndDate = parseFilterExpireDate(logfeederFilter);
- if (filterEndDate == null) {
- return false;
- }
-
- Date currentDate = new Date();
- if (!currentDate.before(filterEndDate)) {
- LOG.debug("Filter for Component :" + logfeederFilter.getLabel() + " and Hosts : [" +
- StringUtils.join(logfeederFilter.getHosts(), ',') + "] is expired because of filter endTime : " +
- formatter.get().format(filterEndDate) + " is older than currentTime :" + formatter.get().format(currentDate));
- return true;
- } else {
- return false;
- }
- }
-
- private static Date parseFilterExpireDate(LogFeederFilter vLogfeederFilter) {
- String expiryTime = vLogfeederFilter.getExpiryTime();
- if (StringUtils.isNotEmpty(expiryTime)) {
- try {
- return formatter.get().parse(expiryTime);
- } catch (ParseException e) {
- LOG.error("Filter have invalid ExpiryTime : " + expiryTime + " for component :" + vLogfeederFilter.getLabel()
- + " and hosts : [" + StringUtils.join(vLogfeederFilter.getHosts(), ',') + "]");
- }
- }
- return null;
- }
-
- public static LogFeederFilter findComponentFilter(String componentName) {
- waitForFilter();
-
- if (logFeederFilterWrapper != null) {
- HashMap<String, LogFeederFilter> filter = logFeederFilterWrapper.getFilter();
- if (filter != null) {
- LogFeederFilter componentFilter = filter.get(componentName);
- if (componentFilter != null) {
- return componentFilter;
- }
- }
- }
- LOG.trace("Filter is not there for component :" + componentName);
- return null;
- }
-
- private static void waitForFilter() {
- if (!filterEnabled || logFeederFilterWrapper != null) {
- return;
- }
-
- while (true) {
- try {
- Thread.sleep(RETRY_INTERVAL * 1000);
- } catch (InterruptedException e) {
- LOG.error(e);
- }
-
- LOG.info("Checking if config is available");
- if (logFeederFilterWrapper != null) {
- LOG.info("Config is available");
- return;
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/3b94d3cf/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederFilter.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederFilter.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederFilter.java
deleted file mode 100644
index 60c8ae8..0000000
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederFilter.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.ambari.logfeeder.logconfig;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import javax.xml.bind.annotation.XmlAccessType;
-import javax.xml.bind.annotation.XmlAccessorType;
-import javax.xml.bind.annotation.XmlRootElement;
-
-import org.codehaus.jackson.annotate.JsonAutoDetect;
-import org.codehaus.jackson.annotate.JsonAutoDetect.Visibility;
-import org.codehaus.jackson.map.annotate.JsonSerialize;
-
-@JsonAutoDetect(getterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE, fieldVisibility = Visibility.ANY)
-@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
-@XmlRootElement
-@XmlAccessorType(XmlAccessType.FIELD)
-public class LogFeederFilter {
-
- private String label;
- private List<String> hosts;
- private List<String> defaultLevels;
- private List<String> overrideLevels;
- private String expiryTime;
-
- public LogFeederFilter() {
- hosts = new ArrayList<String>();
- defaultLevels = new ArrayList<String>();
- overrideLevels = new ArrayList<String>();
- }
-
- public String getLabel() {
- return label;
- }
-
- public void setLabel(String label) {
- this.label = label;
- }
-
- public List<String> getHosts() {
- return hosts;
- }
-
- public void setHosts(List<String> hosts) {
- this.hosts = hosts;
- }
-
- public List<String> getDefaultLevels() {
- return defaultLevels;
- }
-
- public void setDefaultLevels(List<String> defaultLevels) {
- this.defaultLevels = defaultLevels;
- }
-
- public List<String> getOverrideLevels() {
- return overrideLevels;
- }
-
- public void setOverrideLevels(List<String> overrideLevels) {
- this.overrideLevels = overrideLevels;
- }
-
- public String getExpiryTime() {
- return expiryTime;
- }
-
- public void setExpiryTime(String expiryTime) {
- this.expiryTime = expiryTime;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/3b94d3cf/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederFilterWrapper.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederFilterWrapper.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederFilterWrapper.java
deleted file mode 100644
index 9199cd3..0000000
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederFilterWrapper.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.ambari.logfeeder.logconfig;
-
-import java.util.HashMap;
-
-import javax.xml.bind.annotation.XmlAccessType;
-import javax.xml.bind.annotation.XmlAccessorType;
-import javax.xml.bind.annotation.XmlRootElement;
-
-import org.codehaus.jackson.annotate.JsonAutoDetect;
-import org.codehaus.jackson.annotate.JsonAutoDetect.Visibility;
-import org.codehaus.jackson.map.annotate.JsonSerialize;
-
-@JsonAutoDetect(getterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE, fieldVisibility = Visibility.ANY)
-@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
-@XmlRootElement
-@XmlAccessorType(XmlAccessType.FIELD)
-public class LogFeederFilterWrapper {
-
- private HashMap<String, LogFeederFilter> filter;
- private String id;
-
- public HashMap<String, LogFeederFilter> getFilter() {
- return filter;
- }
-
- public void setFilter(HashMap<String, LogFeederFilter> filter) {
- this.filter = filter;
- }
-
- public String getId() {
- return id;
- }
-
- public void setId(String id) {
- this.id = id;
- }
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/3b94d3cf/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/FilterLogData.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/FilterLogData.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/FilterLogData.java
new file mode 100644
index 0000000..1f635af
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/FilterLogData.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder.loglevelfilter;
+
+import java.util.Map;
+
+import org.apache.ambari.logfeeder.common.LogFeederConstants;
+import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.log4j.Logger;
+
+/**
+ * Singleton gate that decides whether a parsed log entry may be shipped, based on the log level filters held by
+ * {@link LogLevelFilterHandler}.
+ *
+ * Entries that cannot be evaluated (empty input, or host / component / level missing or not textual) are allowed.
+ */
+public enum FilterLogData {
+  INSTANCE;
+
+  private static final Logger LOG = Logger.getLogger(FilterLogData.class);
+
+  /** Verdict returned whenever an entry cannot be evaluated against a filter. */
+  private static final boolean DEFAULT_VALUE = true;
+
+  /**
+   * Parses the JSON text and evaluates it with {@link #isAllowed(Map, InputMarker)}.
+   *
+   * @param jsonBlock the log entry as JSON text; a null or empty value is allowed without parsing
+   * @param inputMarker marker of the input that produced the entry
+   * @return true if the entry may be shipped
+   */
+  public boolean isAllowed(String jsonBlock, InputMarker inputMarker) {
+    if (StringUtils.isEmpty(jsonBlock)) {
+      return DEFAULT_VALUE;
+    }
+    Map<String, Object> jsonObj = LogFeederUtil.toJSONObject(jsonBlock);
+    return isAllowed(jsonObj, inputMarker);
+  }
+
+  /**
+   * Evaluates an already parsed log entry. Entries coming from an input whose row type is "audit" are always allowed.
+   *
+   * @param jsonObj the parsed log entry
+   * @param inputMarker marker of the input that produced the entry
+   * @return true if the entry may be shipped
+   */
+  public boolean isAllowed(Map<String, Object> jsonObj, InputMarker inputMarker) {
+    if ("audit".equals(inputMarker.input.getConfigs().get(LogFeederConstants.ROW_TYPE))) {
+      return true;
+    }
+
+    boolean isAllowed = applyFilter(jsonObj);
+    // Serialising the entry is costly, so only do it when the message will really be written.
+    if (!isAllowed && LOG.isTraceEnabled()) {
+      LOG.trace("Filter block the content :" + LogFeederUtil.getGson().toJson(jsonObj));
+    }
+    return isAllowed;
+  }
+
+  /**
+   * Looks up host, component and level in the entry and asks {@link LogLevelFilterHandler} for the verdict.
+   * Falls back to {@link #DEFAULT_VALUE} when the entry is empty or any of the three fields is unusable.
+   */
+  private boolean applyFilter(Map<String, Object> jsonObj) {
+    if (MapUtils.isEmpty(jsonObj)) {
+      LOG.warn("Output jsonobj is empty");
+      return DEFAULT_VALUE;
+    }
+
+    String hostName = asString(jsonObj.get(LogFeederConstants.SOLR_HOST));
+    String logId = asString(jsonObj.get(LogFeederConstants.SOLR_COMPONENT));
+    String level = asString(jsonObj.get(LogFeederConstants.SOLR_LEVEL));
+    if (StringUtils.isNotBlank(hostName) && StringUtils.isNotBlank(logId) && StringUtils.isNotBlank(level)) {
+      return LogLevelFilterHandler.isAllowed(hostName, logId, level);
+    }
+    return DEFAULT_VALUE;
+  }
+
+  /**
+   * Returns the value as a String, or null when it is absent or of another type. Parsed JSON may hold numbers,
+   * booleans or nested structures; a blind cast would throw ClassCastException for those.
+   */
+  private static String asString(Object value) {
+    return (value instanceof String) ? (String) value : null;
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/3b94d3cf/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/LogLevelFilterHandler.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/LogLevelFilterHandler.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/LogLevelFilterHandler.java
new file mode 100644
index 0000000..8a4d953
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/LogLevelFilterHandler.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder.loglevelfilter;
+
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+
+import org.apache.ambari.logfeeder.common.LogFeederConstants;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.ambari.logsearch.config.api.LogLevelFilterMonitor;
+import org.apache.ambari.logsearch.config.api.LogSearchConfig;
+import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.log4j.Logger;
+
+/**
+ * Holds the log level filters received through the {@link LogLevelFilterMonitor} callbacks and decides whether an
+ * entry with a given host, log id and level is allowed.
+ *
+ * All state is static. Changes to and lookups in the filter map are guarded by the class monitor.
+ */
+public class LogLevelFilterHandler implements LogLevelFilterMonitor {
+  private static final Logger LOG = Logger.getLogger(LogLevelFilterHandler.class);
+
+  private static final String TIMEZONE = "GMT";
+  private static final String DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS";
+
+  // SimpleDateFormat is not thread-safe, hence one instance per thread.
+  private static final ThreadLocal<DateFormat> formatter = new ThreadLocal<DateFormat>() {
+    @Override
+    protected DateFormat initialValue() {
+      SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
+      dateFormat.setTimeZone(TimeZone.getTimeZone(TIMEZONE));
+      return dateFormat;
+    }
+  };
+
+  private static LogSearchConfig config;
+  private static String clusterName = LogFeederUtil.getStringProperty("cluster.name");
+  private static boolean filterEnabled;
+  private static List<String> defaultLogLevels;
+  private static Map<String, LogLevelFilter> filters = new HashMap<>();
+
+  /**
+   * Reads the filter related properties and stores the configuration used to persist default filters.
+   *
+   * @param config_ the configuration that newly created default filters are written to
+   */
+  public static void init(LogSearchConfig config_) {
+    config = config_;
+    filterEnabled = LogFeederUtil.getBooleanProperty("logfeeder.log.filter.enable", false);
+
+    String configuredLevels = LogFeederUtil.getStringProperty("logfeeder.include.default.level");
+    if (StringUtils.isBlank(configuredLevels)) {
+      // A missing property must not break start-up; an empty level list makes isAllowed() let every level through.
+      LOG.warn("Property logfeeder.include.default.level is not set, default filters will allow every level");
+      defaultLogLevels = Arrays.asList(new String[0]);
+    } else {
+      defaultLogLevels = Arrays.asList(configuredLevels.split(","));
+    }
+
+    // NOTE(review): this changes the default time zone of the whole JVM, not only of this class.
+    TimeZone.setDefault(TimeZone.getTimeZone(TIMEZONE));
+  }
+
+  @Override
+  public void setLogLevelFilter(String logId, LogLevelFilter logLevelFilter) {
+    synchronized (LogLevelFilterHandler.class) {
+      filters.put(logId, logLevelFilter);
+    }
+  }
+
+  @Override
+  public void removeLogLevelFilter(String logId) {
+    synchronized (LogLevelFilterHandler.class) {
+      filters.remove(logId);
+    }
+  }
+
+  /**
+   * Tells whether an entry may be shipped. Everything is allowed while filtering is disabled, and also when the
+   * applicable level list is null or empty.
+   *
+   * @param hostName host the entry originates from
+   * @param logId id of the log (component) the entry belongs to
+   * @param level level of the entry
+   * @return true if the entry is allowed
+   */
+  public static boolean isAllowed(String hostName, String logId, String level) {
+    if (!filterEnabled) {
+      return true;
+    }
+
+    LogLevelFilter logFilter = findLogFilter(logId);
+    List<String> allowedLevels = getAllowedLevels(hostName, logFilter);
+    // A filter may carry no level list at all; treat that the same as an empty one.
+    return CollectionUtils.isEmpty(allowedLevels) || allowedLevels.contains(level);
+  }
+
+  /**
+   * Returns the filter registered for the log id. If there is none, a filter with the default levels is created and
+   * persisted through the configuration; it is cached only when persisting succeeded.
+   */
+  private static synchronized LogLevelFilter findLogFilter(String logId) {
+    LogLevelFilter logFilter = filters.get(logId);
+    if (logFilter != null) {
+      return logFilter;
+    }
+
+    LOG.info("Filter is not present for log " + logId + ", creating default filter");
+    LogLevelFilter defaultFilter = new LogLevelFilter();
+    defaultFilter.setLabel(logId);
+    defaultFilter.setDefaultLevels(defaultLogLevels);
+
+    try {
+      config.createLogLevelFilter(clusterName, logId, defaultFilter);
+      filters.put(logId, defaultFilter);
+    } catch (Exception e) {
+      LOG.warn("Could not persist the default filter for log " + logId, e);
+    }
+
+    return defaultFilter;
+  }
+
+  /**
+   * Selects the level list that applies to the host: the override levels when an override is configured, targets
+   * the host and has not expired, otherwise the default levels.
+   *
+   * The filter is never modified here: its lists belong to the shared configuration and may be immutable or null.
+   */
+  private static List<String> getAllowedLevels(String hostName, LogLevelFilter componentFilter) {
+    String componentName = componentFilter.getLabel();
+    List<String> hosts = componentFilter.getHosts();
+    List<String> defaultLevels = componentFilter.getDefaultLevels();
+    List<String> overrideLevels = componentFilter.getOverrideLevels();
+    Date expiryTime = componentFilter.getExpiryTime();
+
+    // Any of these being set means the user configured an override.
+    boolean overrideConfigured = expiryTime != null || CollectionUtils.isNotEmpty(overrideLevels) ||
+        CollectionUtils.isNotEmpty(hosts);
+    if (!overrideConfigured) {
+      return defaultLevels;
+    }
+
+    // A null or empty host list, or one naming the ALL marker, makes the override apply on every host.
+    boolean appliesToHost = CollectionUtils.isEmpty(hosts) || hosts.contains(LogFeederConstants.ALL) ||
+        hosts.contains(hostName);
+    if (!appliesToHost) {
+      return defaultLevels;
+    }
+
+    if (isFilterExpired(componentFilter)) {
+      LOG.debug("Filter for component " + componentName + " and host :" + hostName + " is expired at " +
+          componentFilter.getExpiryTime());
+      return defaultLevels;
+    }
+    return overrideLevels;
+  }
+
+  /**
+   * Tells whether the expiry time of the filter has been reached. A null filter or a filter without expiry time
+   * never expires.
+   */
+  private static boolean isFilterExpired(LogLevelFilter logLevelFilter) {
+    if (logLevelFilter == null) {
+      return false;
+    }
+
+    Date filterEndDate = logLevelFilter.getExpiryTime();
+    if (filterEndDate == null) {
+      return false;
+    }
+
+    Date currentDate = new Date();
+    if (currentDate.before(filterEndDate)) {
+      return false;
+    }
+
+    LOG.debug("Filter for Component :" + logLevelFilter.getLabel() + " and Hosts : [" +
+        StringUtils.join(logLevelFilter.getHosts(), ',') + "] is expired because of filter endTime : " +
+        formatter.get().format(filterEndDate) + " is older than currentTime :" + formatter.get().format(currentDate));
+    return true;
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/3b94d3cf/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManager.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManager.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManager.java
index 135fb32..ba872f8 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManager.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManager.java
@@ -29,7 +29,7 @@ import java.util.UUID;
import org.apache.ambari.logfeeder.common.LogFeederConstants;
import org.apache.ambari.logfeeder.input.Input;
import org.apache.ambari.logfeeder.input.InputMarker;
-import org.apache.ambari.logfeeder.logconfig.FilterLogData;
+import org.apache.ambari.logfeeder.loglevelfilter.FilterLogData;
import org.apache.ambari.logfeeder.metrics.MetricData;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
import org.apache.ambari.logfeeder.util.MurmurHash;
http://git-wip-us.apache.org/repos/asf/ambari/blob/3b94d3cf/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederUtil.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederUtil.java
index bb2f0a9..1929178 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederUtil.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederUtil.java
@@ -32,7 +32,6 @@ import java.util.Map;
import java.util.Properties;
import org.apache.ambari.logfeeder.LogFeeder;
-import org.apache.ambari.logfeeder.common.LogFeederConstants;
import org.apache.ambari.logfeeder.metrics.MetricData;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
@@ -311,24 +310,6 @@ public class LogFeederUtil {
return false;
}
}
-
- public static boolean isListContains(List<String> list, String str, boolean caseSensitive) {
- if (list == null) {
- return false;
- }
-
- for (String value : list) {
- if (value == null) {
- continue;
- }
-
- if (caseSensitive ? value.equals(str) : value.equalsIgnoreCase(str) ||
- value.equalsIgnoreCase(LogFeederConstants.ALL)) {
- return true;
- }
- }
- return false;
- }
private static String logfeederTempDir = null;
http://git-wip-us.apache.org/repos/asf/ambari/blob/3b94d3cf/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/logconfig/LogConfigHandlerTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/logconfig/LogConfigHandlerTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/logconfig/LogConfigHandlerTest.java
index 266108f..44314c6 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/logconfig/LogConfigHandlerTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/logconfig/LogConfigHandlerTest.java
@@ -18,8 +18,9 @@
package org.apache.ambari.logfeeder.logconfig;
-import java.lang.reflect.Field;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.Date;
import java.util.HashMap;
import java.util.Map;
@@ -29,15 +30,17 @@ import static org.junit.Assert.*;
import org.apache.ambari.logfeeder.common.LogFeederConstants;
import org.apache.ambari.logfeeder.input.Input;
import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logfeeder.loglevelfilter.FilterLogData;
+import org.apache.ambari.logfeeder.loglevelfilter.LogLevelFilterHandler;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
-import org.junit.AfterClass;
+import org.apache.ambari.logsearch.config.api.LogSearchConfig;
+import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter;
+import org.apache.commons.lang.time.DateUtils;
import org.junit.BeforeClass;
import org.junit.Test;
public class LogConfigHandlerTest {
- private static LogConfigFetcher mockFetcher;
-
private static InputMarker inputMarkerAudit;
private static InputMarker inputMarkerService;
static {
@@ -56,47 +59,41 @@ public class LogConfigHandlerTest {
replay(auditInput, serviceInput);
}
- private static final Map<String, Object> CONFIG_MAP = new HashMap<>();
- static {
- CONFIG_MAP.put("jsons",
- "{'filter':{" +
- "'configured_log_file':{" +
- "'label':'configured_log_file'," +
- "'hosts':[]," +
- "'defaultLevels':['FATAL','ERROR','WARN','INFO']," +
- "'overrideLevels':[]}," +
- "'configured_log_file2':{" +
- "'label':'configured_log_file2'," +
- "'hosts':['host1']," +
- "'defaultLevels':['FATAL','ERROR','WARN','INFO']," +
- "'overrideLevels':['FATAL','ERROR','WARN','INFO','DEBUG','TRACE']," +
- "'expiryTime':'3000-01-01T00:00:00.000Z'}," +
- "'configured_log_file3':{" +
- "'label':'configured_log_file3'," +
- "'hosts':['host1']," +
- "'defaultLevels':['FATAL','ERROR','WARN','INFO']," +
- "'overrideLevels':['FATAL','ERROR','WARN','INFO','DEBUG','TRACE']," +
- "'expiryTime':'1000-01-01T00:00:00.000Z'}" +
- "}}");
- }
-
@BeforeClass
public static void init() throws Exception {
- mockFetcher = strictMock(LogConfigFetcher.class);
- Field f = LogConfigFetcher.class.getDeclaredField("instance");
- f.setAccessible(true);
- f.set(null, mockFetcher);
- expect(mockFetcher.getConfigDoc()).andReturn(CONFIG_MAP).anyTimes();
- replay(mockFetcher);
-
LogFeederUtil.loadProperties("logfeeder.properties", null);
- LogConfigHandler.handleConfig();
- Thread.sleep(1000);
+
+ LogSearchConfig config = strictMock(LogSearchConfig.class);
+ config.createLogLevelFilter(anyString(), anyString(), anyObject(LogLevelFilter.class));
+ expectLastCall().anyTimes();
+ LogLevelFilterHandler.init(config);
+
+ LogLevelFilter logLevelFilter1 = new LogLevelFilter();
+ logLevelFilter1.setHosts(Collections.<String> emptyList());
+ logLevelFilter1.setDefaultLevels(Arrays.asList("FATAL", "ERROR", "WARN", "INFO"));
+ logLevelFilter1.setOverrideLevels(Collections.<String> emptyList());
+
+ LogLevelFilter logLevelFilter2 = new LogLevelFilter();
+ logLevelFilter2.setHosts(Arrays.asList("host1"));
+ logLevelFilter2.setDefaultLevels(Arrays.asList("FATAL", "ERROR", "WARN", "INFO"));
+ logLevelFilter2.setOverrideLevels(Arrays.asList("FATAL", "ERROR", "WARN", "INFO", "DEBUG", "TRACE"));
+ logLevelFilter2.setExpiryTime(DateUtils.addDays(new Date(), 1));
+
+ LogLevelFilter logLevelFilter3 = new LogLevelFilter();
+ logLevelFilter3.setHosts(Arrays.asList("host1"));
+ logLevelFilter3.setDefaultLevels(Arrays.asList("FATAL", "ERROR", "WARN", "INFO"));
+ logLevelFilter3.setOverrideLevels(Arrays.asList("FATAL", "ERROR", "WARN", "INFO", "DEBUG", "TRACE"));
+ logLevelFilter3.setExpiryTime(DateUtils.addDays(new Date(), -1));
+
+ LogLevelFilterHandler h = new LogLevelFilterHandler();
+ h.setLogLevelFilter("configured_log_file1", logLevelFilter1);
+ h.setLogLevelFilter("configured_log_file2", logLevelFilter2);
+ h.setLogLevelFilter("configured_log_file3", logLevelFilter3);
}
@Test
public void testLogConfigHandler_auditAllowed() throws Exception {
- assertTrue(FilterLogData.INSTANCE.isAllowed("{'host':'host1', 'type':'configured_log_file', 'level':'DEBUG'}",
+ assertTrue(FilterLogData.INSTANCE.isAllowed("{'host':'host1', 'type':'configured_log_file1', 'level':'DEBUG'}",
inputMarkerAudit));
}
@@ -109,19 +106,25 @@ public class LogConfigHandlerTest {
@Test
public void testLogConfigHandler_notConfiguredLogAllowed() throws Exception {
- assertTrue(FilterLogData.INSTANCE.isAllowed("{'host':'host1', 'type':'not_configured_log_file', 'level':'INFO'}",
+ assertTrue(FilterLogData.INSTANCE.isAllowed("{'host':'host1', 'type':'not_configured_log_file1', 'level':'WARN'}",
+ inputMarkerService));
+ }
+
+ @Test
+ public void testLogConfigHandler_notConfiguredLogNotAllowed() throws Exception {
+ assertFalse(FilterLogData.INSTANCE.isAllowed("{'host':'host1', 'type':'not_configured_log_file1', 'level':'TRACE'}",
inputMarkerService));
}
@Test
public void testLogConfigHandler_configuredDataAllow() throws Exception {
- assertTrue(FilterLogData.INSTANCE.isAllowed("{'host':'host1', 'type':'configured_log_file', 'level':'INFO'}",
+ assertTrue(FilterLogData.INSTANCE.isAllowed("{'host':'host1', 'type':'configured_log_file1', 'level':'INFO'}",
inputMarkerService));
}
@Test
public void testLogConfigHandler_configuredDataDontAllow() throws Exception {
- assertFalse(FilterLogData.INSTANCE.isAllowed("{'host':'host1', 'type':'configured_log_file', 'level':'DEBUG'}",
+ assertFalse(FilterLogData.INSTANCE.isAllowed("{'host':'host1', 'type':'configured_log_file1', 'level':'DEBUG'}",
inputMarkerService));
}
@@ -142,9 +145,4 @@ public class LogConfigHandlerTest {
assertFalse(FilterLogData.INSTANCE.isAllowed("{'host':'host1', 'type':'configured_log_file3', 'level':'DEBUG'}",
inputMarkerService));
}
-
- @AfterClass
- public static void finish() {
- verify(mockFetcher);
- }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/3b94d3cf/ambari-logsearch/ambari-logsearch-logfeeder/src/test/resources/logfeeder.properties
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/resources/logfeeder.properties b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/resources/logfeeder.properties
index 59020cc..19027d1 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/resources/logfeeder.properties
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/resources/logfeeder.properties
@@ -17,4 +17,5 @@ logfeeder.log.filter.enable=true
logfeeder.solr.config.interval=5
logfeeder.solr.zk_connect_string=some_connect_string
logfeeder.metrics.collector.hosts=some_collector_host
-node.hostname=test_host_name
\ No newline at end of file
+node.hostname=test_host_name
+logfeeder.include.default.level=FATAL,ERROR,WARN
\ No newline at end of file