You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ol...@apache.org on 2018/08/13 09:03:34 UTC
[ambari] branch trunk updated: AMBARI-24459. Log Search / Log
Feeder - support to use ZK only for storing filters (without manage input
configs) (#2040)
This is an automated email from the ASF dual-hosted git repository.
oleewere pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/trunk by this push:
new 73719b5 AMBARI-24459. Log Search / Log Feeder - support to use ZK only for storing filters (without manage input configs) (#2040)
73719b5 is described below
commit 73719b530c21677ae18f29529efec94100f9e1d6
Author: Olivér Szabó <ol...@gmail.com>
AuthorDate: Mon Aug 13 11:03:30 2018 +0200
AMBARI-24459. Log Search / Log Feeder - support to use ZK only for storing filters (without manage input configs) (#2040)
---
.../config/zookeeper/LogLevelFilterManagerZK.java | 27 +++
.../zookeeper/LogSearchConfigLogFeederZK.java | 34 +---
.../config/zookeeper/LogSearchConfigServerZK.java | 5 +-
.../config/zookeeper/LogSearchConfigZK.java | 148 +---------------
...hConfigZK.java => LogSearchConfigZKHelper.java} | 187 +++++++++++++--------
.../logfeeder/common/LogFeederConstants.java | 3 +
.../ambari/logfeeder/conf/ApplicationConfig.java | 10 +-
.../ambari/logfeeder/conf/LogFeederProps.java | 18 ++
.../loglevelfilter/LogLevelFilterHandler.java | 23 ++-
.../logsearch/common/ACLPropertiesSplitter.java | 29 +---
.../logsearch/conf/LogSearchConfigApiConfig.java | 25 +++
.../ambari/logsearch/conf/SecurityConfig.java | 15 +-
.../apache/ambari/logsearch/conf/SolrConfig.java | 7 +-
...rState.java => LogLevelFilterManagerState.java} | 2 +-
.../LogLevelManagerFilterConfigurer.java | 123 ++++++++++++++
.../configurer/SolrLogLevelFilterConfigurer.java | 92 ----------
.../logsearch/manager/ShipperConfigManager.java | 12 +-
.../LogSearchLogLevelFilterManagerFilter.java | 10 +-
.../test-config/logfeeder/logfeeder.properties | 3 +-
.../test-config/logsearch/logsearch.properties | 1 +
20 files changed, 381 insertions(+), 393 deletions(-)
diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogLevelFilterManagerZK.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogLevelFilterManagerZK.java
index 1ad7517..e62ec1b 100644
--- a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogLevelFilterManagerZK.java
+++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogLevelFilterManagerZK.java
@@ -43,6 +43,22 @@ public class LogLevelFilterManagerZK implements LogLevelFilterManager {
private final Gson gson;
private final List<ACL> aclList;
+ public LogLevelFilterManagerZK(Map<String, String> properties) throws Exception {
+ this.client = LogSearchConfigZKHelper.createZKClient(properties);
+ this.serverCache = new TreeCache(client, "/");
+ this.aclList = LogSearchConfigZKHelper.getAcls(properties);
+ this.gson = LogSearchConfigZKHelper.createGson();
+ this.serverCache.start();
+ }
+
+ public LogLevelFilterManagerZK(Map<String, String> properties, CuratorFramework client) throws Exception {
+ this.client = client;
+ this.serverCache = new TreeCache(client, "/");
+ this.aclList = LogSearchConfigZKHelper.getAcls(properties);
+ this.gson = LogSearchConfigZKHelper.createGson();
+ this.serverCache.start();
+ }
+
public LogLevelFilterManagerZK(CuratorFramework client, TreeCache serverCache, List<ACL> aclList, Gson gson) {
this.client = client;
this.serverCache = serverCache;
@@ -91,4 +107,15 @@ public class LogLevelFilterManagerZK implements LogLevelFilterManager {
return logLevelFilters;
}
+ public CuratorFramework getClient() {
+ return client;
+ }
+
+ public TreeCache getServerCache() {
+ return serverCache;
+ }
+
+ public Gson getGson() {
+ return gson;
+ }
}
diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigLogFeederZK.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigLogFeederZK.java
index 8082ba7..0c565d3 100644
--- a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigLogFeederZK.java
+++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigLogFeederZK.java
@@ -24,7 +24,6 @@ import java.util.Set;
import org.apache.ambari.logsearch.config.api.LogLevelFilterManager;
import org.apache.ambari.logsearch.config.api.LogSearchConfigLogFeeder;
-import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter;
import org.apache.ambari.logsearch.config.json.JsonHelper;
import org.apache.ambari.logsearch.config.json.model.inputconfig.impl.InputConfigGson;
import org.apache.ambari.logsearch.config.json.model.inputconfig.impl.InputConfigImpl;
@@ -47,20 +46,14 @@ import com.google.gson.JsonParser;
public class LogSearchConfigLogFeederZK extends LogSearchConfigZK implements LogSearchConfigLogFeeder {
private static final Logger LOG = LoggerFactory.getLogger(LogSearchConfigLogFeederZK.class);
- private static final long WAIT_FOR_ROOT_SLEEP_SECONDS = 10;
-
private TreeCache logFeederClusterCache;
@Override
public void init(Map<String, String> properties, String clusterName) throws Exception {
super.init(properties);
- while (client.checkExists().forPath("/") == null) {
- LOG.info("Root node is not present yet, going to sleep for " + WAIT_FOR_ROOT_SLEEP_SECONDS + " seconds");
- Thread.sleep(WAIT_FOR_ROOT_SLEEP_SECONDS * 1000);
- }
-
- logFeederClusterCache = new TreeCache(client, String.format("/%s", clusterName));
- LogLevelFilterManager logLevelFilterManager = new LogLevelFilterManagerZK(client, null, getAcls(), gson);
+ LogSearchConfigZKHelper.waitUntilRootAvailable(client);
+ logFeederClusterCache = LogSearchConfigZKHelper.createClusterCache(client, clusterName);
+ LogLevelFilterManager logLevelFilterManager = new LogLevelFilterManagerZK(client, null, LogSearchConfigZKHelper.getAcls(properties), gson);
setLogLevelFilterManager(logLevelFilterManager);
}
@@ -99,7 +92,7 @@ public class LogSearchConfigLogFeederZK extends LogSearchConfigZK implements Log
if (event.getData().getPath().startsWith(configPathStab + "input/")) {
handleInputConfigChange(eventType, nodeName, nodeData);
} else if (event.getData().getPath().startsWith(configPathStab + "loglevelfilter/")) {
- handleLogLevelFilterChange(eventType, nodeName, nodeData);
+ LogSearchConfigZKHelper.handleLogLevelFilterChange(eventType, nodeName, nodeData, gson, logLevelFilterMonitor);
}
}
@@ -143,23 +136,6 @@ public class LogSearchConfigLogFeederZK extends LogSearchConfigZK implements Log
LOG.error("Could not load input configuration for service " + serviceName + ":\n" + inputConfig, e);
}
}
-
- private void handleLogLevelFilterChange(Type eventType, String nodeName, String nodeData) {
- switch (eventType) {
- case NODE_ADDED:
- case NODE_UPDATED:
- LOG.info("Node added/updated under loglevelfilter ZK node: " + nodeName);
- LogLevelFilter logLevelFilter = gson.fromJson(nodeData, LogLevelFilter.class);
- logLevelFilterMonitor.setLogLevelFilter(nodeName, logLevelFilter);
- break;
- case NODE_REMOVED:
- LOG.info("Node removed loglevelfilter input ZK node: " + nodeName);
- logLevelFilterMonitor.removeLogLevelFilter(nodeName);
- break;
- default:
- break;
- }
- }
};
logFeederClusterCache.getListenable().addListener(listener);
logFeederClusterCache.start();
@@ -173,7 +149,7 @@ public class LogSearchConfigLogFeederZK extends LogSearchConfigZK implements Log
if (logFeederClusterCache.getCurrentData(globalConfigNodePath) != null) {
client.setData().forPath(globalConfigNodePath, data.getBytes());
} else {
- client.create().creatingParentContainersIfNeeded().withACL(getAcls()).forPath(globalConfigNodePath, data.getBytes());
+ client.create().creatingParentContainersIfNeeded().withACL(LogSearchConfigZKHelper.getAcls(properties)).forPath(globalConfigNodePath, data.getBytes());
}
} catch (Exception e) {
LOG.warn("Exception during global config node creation/update", e);
diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigServerZK.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigServerZK.java
index d8fd79b..546e4d4 100644
--- a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigServerZK.java
+++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigServerZK.java
@@ -52,12 +52,9 @@ public class LogSearchConfigServerZK extends LogSearchConfigZK implements LogSea
if (client.checkExists().forPath("/") == null) {
client.create().creatingParentContainersIfNeeded().forPath("/");
}
- if (client.checkExists().forPath("/output") == null) {
- client.create().creatingParentContainersIfNeeded().forPath("/output");
- }
serverCache = new TreeCache(client, "/");
serverCache.start();
- LogLevelFilterManager logLevelFilterManager = new LogLevelFilterManagerZK(client, serverCache, getAcls(), gson);
+ LogLevelFilterManager logLevelFilterManager = new LogLevelFilterManagerZK(client, serverCache, LogSearchConfigZKHelper.getAcls(properties), gson);
setLogLevelFilterManager(logLevelFilterManager);
}
diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java
index 7e51c35..dcbedd5 100644
--- a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java
+++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java
@@ -19,136 +19,37 @@
package org.apache.ambari.logsearch.config.zookeeper;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Map;
import org.apache.ambari.logsearch.config.api.LogLevelFilterManager;
import org.apache.ambari.logsearch.config.api.LogSearchConfig;
-import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription;
-import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter;
-import org.apache.commons.collections.MapUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.recipes.cache.TreeCache;
-import org.apache.curator.retry.RetryForever;
-import org.apache.curator.retry.RetryUntilElapsed;
import org.apache.zookeeper.KeeperException.NodeExistsException;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.data.ACL;
-import org.apache.zookeeper.data.Id;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Splitter;
import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
public class LogSearchConfigZK implements LogSearchConfig {
private static final Logger LOG = LoggerFactory.getLogger(LogSearchConfigZK.class);
- private static final int DEFAULT_SESSION_TIMEOUT = 60000;
- private static final int DEFAULT_CONNECTION_TIMEOUT = 30000;
- private static final int RETRY_INTERVAL_MS = 10000;
- private static final String DEFAULT_ZK_ROOT = "/logsearch";
- private static final String DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS";
-
- @LogSearchPropertyDescription(
- name = "logsearch.config.zk_connect_string",
- description = "ZooKeeper connection string.",
- examples = {"localhost1:2181,localhost2:2181/znode"},
- sources = {"logsearch.properties", "logfeeder.properties"}
- )
- private static final String ZK_CONNECT_STRING_PROPERTY = "logsearch.config.zk_connect_string";
-
- @LogSearchPropertyDescription(
- name = "logsearch.config.zk_acls",
- description = "ZooKeeper ACLs for handling configs. (read & write)",
- examples = {"world:anyone:r,sasl:solr:cdrwa,sasl:logsearch:cdrwa"},
- sources = {"logsearch.properties", "logfeeder.properties"},
- defaultValue = "world:anyone:cdrwa"
- )
- private static final String ZK_ACLS_PROPERTY = "logsearch.config.zk_acls";
-
- @LogSearchPropertyDescription(
- name = "logsearch.config.zk_root",
- description = "ZooKeeper root node where the shippers are stored. (added to the connection string)",
- examples = {"/logsearch"},
- sources = {"logsearch.properties", "logfeeder.properties"}
- )
- private static final String ZK_ROOT_NODE_PROPERTY = "logsearch.config.zk_root";
-
- @LogSearchPropertyDescription(
- name = "logsearch.config.zk_session_time_out_ms",
- description = "ZooKeeper session timeout in milliseconds",
- examples = {"60000"},
- sources = {"logsearch.properties", "logfeeder.properties"}
- )
- private static final String ZK_SESSION_TIMEOUT_PROPERTY = "logsearch.config.zk_session_time_out_ms";
-
- @LogSearchPropertyDescription(
- name = "logsearch.config.zk_connection_time_out_ms",
- description = "ZooKeeper connection timeout in milliseconds",
- examples = {"30000"},
- sources = {"logsearch.properties", "logfeeder.properties"}
- )
- private static final String ZK_CONNECTION_TIMEOUT_PROPERTY = "logsearch.config.zk_connection_time_out_ms";
-
- @LogSearchPropertyDescription(
- name = "logsearch.config.zk_connection_retry_time_out_ms",
- description = "The maximum elapsed time for connecting to ZooKeeper in milliseconds. 0 means retrying forever.",
- examples = {"1200000"},
- sources = {"logsearch.properties", "logfeeder.properties"}
- )
- private static final String ZK_CONNECTION_RETRY_TIMEOUT_PROPERTY = "logsearch.config.zk_connection_retry_time_out_ms";
-
protected Map<String, String> properties;
protected CuratorFramework client;
- protected TreeCache outputCache;
protected Gson gson;
protected LogLevelFilterManager logLevelFilterManager;
public void init(Map<String, String> properties) throws Exception {
this.properties = properties;
-
- String root = MapUtils.getString(properties, ZK_ROOT_NODE_PROPERTY, DEFAULT_ZK_ROOT);
- LOG.info("Connecting to ZooKeeper at " + properties.get(ZK_CONNECT_STRING_PROPERTY) + root);
- client = CuratorFrameworkFactory.builder()
- .connectString(properties.get(ZK_CONNECT_STRING_PROPERTY) + root)
- .retryPolicy(getRetryPolicy(properties.get(ZK_CONNECTION_RETRY_TIMEOUT_PROPERTY)))
- .connectionTimeoutMs(getIntProperty(ZK_CONNECTION_TIMEOUT_PROPERTY, DEFAULT_CONNECTION_TIMEOUT))
- .sessionTimeoutMs(getIntProperty(ZK_SESSION_TIMEOUT_PROPERTY, DEFAULT_SESSION_TIMEOUT))
- .build();
+ client = LogSearchConfigZKHelper.createZKClient(properties);
client.start();
-
- outputCache = new TreeCache(client, "/output");
- outputCache.start();
-
- gson = new GsonBuilder().setDateFormat(DATE_FORMAT).create();
- }
-
- private int getIntProperty(String propertyKey, int defaultValue) {
- if (properties.get(propertyKey) == null)
- return defaultValue;
- return Integer.parseInt(properties.get(propertyKey));
- }
-
- private RetryPolicy getRetryPolicy(String zkConnectionRetryTimeoutValue) {
- if (zkConnectionRetryTimeoutValue == null)
- return new RetryForever(RETRY_INTERVAL_MS);
- int maxElapsedTimeMs = Integer.parseInt(zkConnectionRetryTimeoutValue);
- if (maxElapsedTimeMs == 0)
- return new RetryForever(RETRY_INTERVAL_MS);
- return new RetryUntilElapsed(maxElapsedTimeMs, RETRY_INTERVAL_MS);
+ gson = LogSearchConfigZKHelper.createGson();
}
@Override
public void createInputConfig(String clusterName, String serviceName, String inputConfig) throws Exception {
String nodePath = String.format("/%s/input/%s", clusterName, serviceName);
try {
- client.create().creatingParentContainersIfNeeded().withACL(getAcls()).forPath(nodePath, inputConfig.getBytes());
+ client.create().creatingParentContainersIfNeeded().withACL(LogSearchConfigZKHelper.getAcls(properties)).forPath(nodePath, inputConfig.getBytes());
LOG.info("Uploaded input config for the service " + serviceName + " for cluster " + clusterName);
} catch (NodeExistsException e) {
LOG.debug("Did not upload input config for service " + serviceName + " as it was already uploaded by another Log Feeder");
@@ -165,49 +66,6 @@ public class LogSearchConfigZK implements LogSearchConfig {
this.logLevelFilterManager = logLevelFilterManager;
}
- protected List<ACL> getAcls() {
- String aclStr = properties.get(ZK_ACLS_PROPERTY);
- if (StringUtils.isBlank(aclStr)) {
- return ZooDefs.Ids.OPEN_ACL_UNSAFE;
- }
-
- List<ACL> acls = new ArrayList<>();
- List<String> aclStrList = Splitter.on(",").omitEmptyStrings().trimResults().splitToList(aclStr);
- for (String unparcedAcl : aclStrList) {
- String[] parts = unparcedAcl.split(":");
- if (parts.length == 3) {
- acls.add(new ACL(parsePermission(parts[2]), new Id(parts[0], parts[1])));
- }
- }
- return acls;
- }
-
- private Integer parsePermission(String permission) {
- int permissionCode = 0;
- for (char each : permission.toLowerCase().toCharArray()) {
- switch (each) {
- case 'r':
- permissionCode |= ZooDefs.Perms.READ;
- break;
- case 'w':
- permissionCode |= ZooDefs.Perms.WRITE;
- break;
- case 'c':
- permissionCode |= ZooDefs.Perms.CREATE;
- break;
- case 'd':
- permissionCode |= ZooDefs.Perms.DELETE;
- break;
- case 'a':
- permissionCode |= ZooDefs.Perms.ADMIN;
- break;
- default:
- throw new IllegalArgumentException("Unsupported permission: " + permission);
- }
- }
- return permissionCode;
- }
-
@Override
public void close() {
LOG.info("Closing ZooKeeper Connection");
diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZKHelper.java
similarity index 57%
copy from ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java
copy to ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZKHelper.java
index 7e51c35..b26181d 100644
--- a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java
+++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZKHelper.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -16,15 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
-
package org.apache.ambari.logsearch.config.zookeeper;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.ambari.logsearch.config.api.LogLevelFilterManager;
-import org.apache.ambari.logsearch.config.api.LogSearchConfig;
+import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableSet;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import org.apache.ambari.logsearch.config.api.LogLevelFilterMonitor;
import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription;
import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter;
import org.apache.commons.collections.MapUtils;
@@ -33,21 +31,28 @@ import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.TreeCache;
+import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
+import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.curator.retry.RetryForever;
import org.apache.curator.retry.RetryUntilElapsed;
-import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Splitter;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Utility functions for handling ZK operation and monitor ZK data for Log Search configuration
+ */
+public class LogSearchConfigZKHelper {
-public class LogSearchConfigZK implements LogSearchConfig {
- private static final Logger LOG = LoggerFactory.getLogger(LogSearchConfigZK.class);
+ private static final Logger LOG = LoggerFactory.getLogger(LogSearchConfigZKHelper.class);
private static final int DEFAULT_SESSION_TIMEOUT = 60000;
private static final int DEFAULT_CONNECTION_TIMEOUT = 30000;
@@ -104,38 +109,52 @@ public class LogSearchConfigZK implements LogSearchConfig {
)
private static final String ZK_CONNECTION_RETRY_TIMEOUT_PROPERTY = "logsearch.config.zk_connection_retry_time_out_ms";
- protected Map<String, String> properties;
- protected CuratorFramework client;
- protected TreeCache outputCache;
- protected Gson gson;
- protected LogLevelFilterManager logLevelFilterManager;
+ private static final long WAIT_FOR_ROOT_SLEEP_SECONDS = 10;
- public void init(Map<String, String> properties) throws Exception {
- this.properties = properties;
-
+ private LogSearchConfigZKHelper() {
+ }
+
+ /**
+ * Create ZK curator client from a configuration (map holds the configs for that)
+ */
+ public static CuratorFramework createZKClient(Map<String, String> properties) {
String root = MapUtils.getString(properties, ZK_ROOT_NODE_PROPERTY, DEFAULT_ZK_ROOT);
LOG.info("Connecting to ZooKeeper at " + properties.get(ZK_CONNECT_STRING_PROPERTY) + root);
- client = CuratorFrameworkFactory.builder()
- .connectString(properties.get(ZK_CONNECT_STRING_PROPERTY) + root)
- .retryPolicy(getRetryPolicy(properties.get(ZK_CONNECTION_RETRY_TIMEOUT_PROPERTY)))
- .connectionTimeoutMs(getIntProperty(ZK_CONNECTION_TIMEOUT_PROPERTY, DEFAULT_CONNECTION_TIMEOUT))
- .sessionTimeoutMs(getIntProperty(ZK_SESSION_TIMEOUT_PROPERTY, DEFAULT_SESSION_TIMEOUT))
- .build();
- client.start();
-
- outputCache = new TreeCache(client, "/output");
- outputCache.start();
-
- gson = new GsonBuilder().setDateFormat(DATE_FORMAT).create();
+ return CuratorFrameworkFactory.builder()
+ .connectString(properties.get(ZK_CONNECT_STRING_PROPERTY) + root)
+ .retryPolicy(getRetryPolicy(properties.get(ZK_CONNECTION_RETRY_TIMEOUT_PROPERTY)))
+ .connectionTimeoutMs(getIntProperty(properties, ZK_CONNECTION_TIMEOUT_PROPERTY, DEFAULT_CONNECTION_TIMEOUT))
+ .sessionTimeoutMs(getIntProperty(properties, ZK_SESSION_TIMEOUT_PROPERTY, DEFAULT_SESSION_TIMEOUT))
+ .build();
}
- private int getIntProperty(String propertyKey, int defaultValue) {
+ /**
+ * Get ACLs from a property (get the value then parse and transform it as ACL objects)
+ */
+ public static List<ACL> getAcls(Map<String, String> properties) {
+ String aclStr = properties.get(ZK_ACLS_PROPERTY);
+ if (StringUtils.isBlank(aclStr)) {
+ return ZooDefs.Ids.OPEN_ACL_UNSAFE;
+ }
+
+ List<ACL> acls = new ArrayList<>();
+ List<String> aclStrList = Splitter.on(",").omitEmptyStrings().trimResults().splitToList(aclStr);
+ for (String unparcedAcl : aclStrList) {
+ String[] parts = unparcedAcl.split(":");
+ if (parts.length == 3) {
+ acls.add(new ACL(parsePermission(parts[2]), new Id(parts[0], parts[1])));
+ }
+ }
+ return acls;
+ }
+
+ private static int getIntProperty(Map<String, String> properties, String propertyKey, int defaultValue) {
if (properties.get(propertyKey) == null)
return defaultValue;
return Integer.parseInt(properties.get(propertyKey));
}
- private RetryPolicy getRetryPolicy(String zkConnectionRetryTimeoutValue) {
+ private static RetryPolicy getRetryPolicy(String zkConnectionRetryTimeoutValue) {
if (zkConnectionRetryTimeoutValue == null)
return new RetryForever(RETRY_INTERVAL_MS);
int maxElapsedTimeMs = Integer.parseInt(zkConnectionRetryTimeoutValue);
@@ -144,45 +163,76 @@ public class LogSearchConfigZK implements LogSearchConfig {
return new RetryUntilElapsed(maxElapsedTimeMs, RETRY_INTERVAL_MS);
}
- @Override
- public void createInputConfig(String clusterName, String serviceName, String inputConfig) throws Exception {
- String nodePath = String.format("/%s/input/%s", clusterName, serviceName);
- try {
- client.create().creatingParentContainersIfNeeded().withACL(getAcls()).forPath(nodePath, inputConfig.getBytes());
- LOG.info("Uploaded input config for the service " + serviceName + " for cluster " + clusterName);
- } catch (NodeExistsException e) {
- LOG.debug("Did not upload input config for service " + serviceName + " as it was already uploaded by another Log Feeder");
- }
+ /**
+ * Create listener for znode of log level filters - can be used for Log Feeder as it can be useful if it's monitoring the log level changes
+ */
+ public static TreeCacheListener createTreeCacheListener(String clusterName, Gson gson, LogLevelFilterMonitor logLevelFilterMonitor) {
+ return new TreeCacheListener() {
+ private final Set<TreeCacheEvent.Type> nodeEvents = ImmutableSet.of(TreeCacheEvent.Type.NODE_ADDED, TreeCacheEvent.Type.NODE_UPDATED, TreeCacheEvent.Type.NODE_REMOVED);
+ public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
+ if (!nodeEvents.contains(event.getType())) {
+ return;
+ }
+ String nodeName = ZKPaths.getNodeFromPath(event.getData().getPath());
+ String nodeData = new String(event.getData().getData());
+ TreeCacheEvent.Type eventType = event.getType();
+
+ String configPathStab = String.format("/%s/", clusterName);
+
+ if (event.getData().getPath().startsWith(configPathStab + "loglevelfilter/")) {
+ handleLogLevelFilterChange(eventType, nodeName, nodeData, gson, logLevelFilterMonitor);
+ }
+ }
+ };
}
- @Override
- public LogLevelFilterManager getLogLevelFilterManager() {
- return this.logLevelFilterManager;
+ /**
+ * Create root + cluster name znode cache
+ */
+ public static TreeCache createClusterCache(CuratorFramework client, String clusterName) {
+ return new TreeCache(client, String.format("/%s", clusterName));
}
- @Override
- public void setLogLevelFilterManager(LogLevelFilterManager logLevelFilterManager) {
- this.logLevelFilterManager = logLevelFilterManager;
+ /**
+ * Assign listener to cluster cache and start to use that listener
+ */
+ public static void addAndStartListenersOnCluster(TreeCache clusterCache, TreeCacheListener listener) throws Exception {
+ clusterCache.getListenable().addListener(listener);
+ clusterCache.start();
}
- protected List<ACL> getAcls() {
- String aclStr = properties.get(ZK_ACLS_PROPERTY);
- if (StringUtils.isBlank(aclStr)) {
- return ZooDefs.Ids.OPEN_ACL_UNSAFE;
+ public static void waitUntilRootAvailable(CuratorFramework client) throws Exception {
+ while (client.checkExists().forPath("/") == null) {
+ LOG.info("Root node is not present yet, going to sleep for " + WAIT_FOR_ROOT_SLEEP_SECONDS + " seconds");
+ Thread.sleep(WAIT_FOR_ROOT_SLEEP_SECONDS * 1000);
}
+ }
- List<ACL> acls = new ArrayList<>();
- List<String> aclStrList = Splitter.on(",").omitEmptyStrings().trimResults().splitToList(aclStr);
- for (String unparcedAcl : aclStrList) {
- String[] parts = unparcedAcl.split(":");
- if (parts.length == 3) {
- acls.add(new ACL(parsePermission(parts[2]), new Id(parts[0], parts[1])));
- }
+ /**
+ * Call log level filter monitor interface to handle node related operations (on update/remove)
+ */
+ public static void handleLogLevelFilterChange(final TreeCacheEvent.Type eventType, final String nodeName, final String nodeData,
+ final Gson gson, final LogLevelFilterMonitor logLevelFilterMonitor) {
+ switch (eventType) {
+ case NODE_ADDED:
+ case NODE_UPDATED:
+ LOG.info("Node added/updated under loglevelfilter ZK node: " + nodeName);
+ LogLevelFilter logLevelFilter = gson.fromJson(nodeData, LogLevelFilter.class);
+ logLevelFilterMonitor.setLogLevelFilter(nodeName, logLevelFilter);
+ break;
+ case NODE_REMOVED:
+ LOG.info("Node removed loglevelfilter input ZK node: " + nodeName);
+ logLevelFilterMonitor.removeLogLevelFilter(nodeName);
+ break;
+ default:
+ break;
}
- return acls;
}
- private Integer parsePermission(String permission) {
+ /**
+ * Pares ZK ACL permission string and transform it to an integer
+ */
+ public static Integer parsePermission(String permission) {
int permissionCode = 0;
for (char each : permission.toLowerCase().toCharArray()) {
switch (each) {
@@ -208,9 +258,8 @@ public class LogSearchConfigZK implements LogSearchConfig {
return permissionCode;
}
- @Override
- public void close() {
- LOG.info("Closing ZooKeeper Connection");
- client.close();
+ public static Gson createGson() {
+ return new GsonBuilder().setDateFormat(DATE_FORMAT).create();
}
+
}
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
index 251b4fc..1d56924 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
@@ -98,6 +98,9 @@ public class LogFeederConstants {
public static final String USE_SOLR_FILTER_STORAGE_PROPERTY = "logfeeder.configs.filter.solr.enabled";
public static final boolean USE_SOLR_FILTER_STORAGE_DEFAULT = false;
+ public static final String USE_ZK_FILTER_STORAGE_PROPERTY = "logfeeder.configs.filter.zk.enabled";
+ public static final boolean USE_ZK_FILTER_STORAGE_DEFAULT = false;
+
public static final String MONITOR_SOLR_FILTER_STORAGE_PROPERTY = "logfeeder.configs.filter.solr.monitor.enabled";
public static final boolean MONITOR_SOLR_FILTER_STORAGE_DEFAULT = true;
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/ApplicationConfig.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/ApplicationConfig.java
index ccae373..f54fc0c 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/ApplicationConfig.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/ApplicationConfig.java
@@ -38,6 +38,7 @@ import org.apache.ambari.logsearch.config.api.LogSearchConfigLogFeeder;
import org.apache.ambari.logsearch.config.local.LogSearchConfigLogFeederLocal;
import org.apache.ambari.logsearch.config.solr.LogLevelFilterManagerSolr;
import org.apache.ambari.logsearch.config.solr.LogLevelFilterUpdaterSolr;
+import org.apache.ambari.logsearch.config.zookeeper.LogLevelFilterManagerZK;
import org.apache.ambari.logsearch.config.zookeeper.LogSearchConfigLogFeederZK;
import org.apache.solr.client.solrj.SolrClient;
import org.springframework.context.annotation.Bean;
@@ -47,6 +48,7 @@ import org.springframework.context.annotation.PropertySource;
import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
import javax.inject.Inject;
+import java.util.HashMap;
@Configuration
@PropertySource(value = {
@@ -92,11 +94,17 @@ public class ApplicationConfig {
}
@Bean
- public LogLevelFilterManager logLevelFilterManager() {
+ public LogLevelFilterManager logLevelFilterManager() throws Exception {
if (logFeederProps.isSolrFilterStorage()) {
SolrClient solrClient = new LogFeederSolrClientFactory().createSolrClient(
logFeederProps.getSolrZkConnectString(), logFeederProps.getSolrUrls(), "history");
return new LogLevelFilterManagerSolr(solrClient);
+ } else if (logFeederProps.isUseLocalConfigs() && logFeederProps.isZkFilterStorage()) {
+ final HashMap<String, String> map = new HashMap<>();
+ for (final String name : logFeederProps.getProperties().stringPropertyNames()) {
+ map.put(name, logFeederProps.getProperties().getProperty(name));
+ }
+ return new LogLevelFilterManagerZK(map);
} else { // no default filter manager
return null;
}
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
index 12408b4..859de8f 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
@@ -162,6 +162,16 @@ public class LogFeederProps implements LogFeederProperties {
public boolean solrFilterStorage;
@LogSearchPropertyDescription(
+ name = LogFeederConstants.USE_ZK_FILTER_STORAGE_PROPERTY,
+ description = "Use zk as a log level filter storage (works only with local config)",
+ examples = {"true"},
+ defaultValue = LogFeederConstants.USE_ZK_FILTER_STORAGE_DEFAULT + "",
+ sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+ )
+ @Value("${" + LogFeederConstants.USE_ZK_FILTER_STORAGE_PROPERTY + ":" + LogFeederConstants.USE_ZK_FILTER_STORAGE_DEFAULT +"}")
+ public boolean zkFilterStorage;
+
+ @LogSearchPropertyDescription(
name = LogFeederConstants.MONITOR_SOLR_FILTER_STORAGE_PROPERTY,
description = "Monitor log level filters (in solr) periodically - used for checking updates.",
examples = {"false"},
@@ -334,6 +344,14 @@ public class LogFeederProps implements LogFeederProperties {
this.solrUrlsStr = solrUrlsStr;
}
+ public boolean isZkFilterStorage() {
+ return zkFilterStorage;
+ }
+
+ public void setZkFilterStorage(boolean zkFilterStorage) {
+ this.zkFilterStorage = zkFilterStorage;
+ }
+
public String[] getSolrUrls() {
if (StringUtils.isNotBlank(this.solrUrlsStr)) {
return this.solrUrlsStr.split(",");
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/LogLevelFilterHandler.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/LogLevelFilterHandler.java
index 8c35d56..ab35f03 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/LogLevelFilterHandler.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/LogLevelFilterHandler.java
@@ -18,6 +18,7 @@
*/
package org.apache.ambari.logfeeder.loglevelfilter;
+import com.google.gson.Gson;
import org.apache.ambari.logfeeder.common.LogFeederConstants;
import org.apache.ambari.logfeeder.conf.LogFeederProps;
import org.apache.ambari.logfeeder.plugin.input.InputMarker;
@@ -25,9 +26,14 @@ import org.apache.ambari.logfeeder.util.LogFeederUtil;
import org.apache.ambari.logsearch.config.api.LogLevelFilterMonitor;
import org.apache.ambari.logsearch.config.api.LogSearchConfig;
import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter;
+import org.apache.ambari.logsearch.config.zookeeper.LogLevelFilterManagerZK;
+import org.apache.ambari.logsearch.config.zookeeper.LogSearchConfigZKHelper;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.TreeCache;
+import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -64,13 +70,28 @@ public class LogLevelFilterHandler implements LogLevelFilterMonitor {
private LogSearchConfig config;
private Map<String, LogLevelFilter> filters = new ConcurrentHashMap<>();
+ // Use these 2 only if local config is used with zk log level filter storage
+ private TreeCache clusterCache = null;
+ private TreeCacheListener listener = null;
+
public LogLevelFilterHandler(LogSearchConfig config) {
this.config = config;
}
@PostConstruct
- public void init() {
+ public void init() throws Exception {
TimeZone.setDefault(TimeZone.getTimeZone(TIMEZONE));
+ if (logFeederProps.isZkFilterStorage() && logFeederProps.isUseLocalConfigs()) {
+ LogLevelFilterManagerZK filterManager = (LogLevelFilterManagerZK) config.getLogLevelFilterManager();
+ CuratorFramework client = filterManager.getClient();
+ client.start();
+ Gson gson = filterManager.getGson();
+ LogSearchConfigZKHelper.waitUntilRootAvailable(client);
+ TreeCache clusterCache = LogSearchConfigZKHelper.createClusterCache(client, logFeederProps.getClusterName());
+ TreeCacheListener listener = LogSearchConfigZKHelper.createTreeCacheListener(
+ logFeederProps.getClusterName(), gson, this);
+ LogSearchConfigZKHelper.addAndStartListenersOnCluster(clusterCache, listener);
+ }
if (config.getLogLevelFilterManager() != null) {
TreeMap<String, LogLevelFilter> sortedFilters = config.getLogLevelFilterManager()
.getLogLevelFilters(logFeederProps.getClusterName())
diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/common/ACLPropertiesSplitter.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/common/ACLPropertiesSplitter.java
index 35a2b1b..10e9d10 100644
--- a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/common/ACLPropertiesSplitter.java
+++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/common/ACLPropertiesSplitter.java
@@ -19,6 +19,7 @@
package org.apache.ambari.logsearch.common;
import com.google.common.base.Splitter;
+import org.apache.ambari.logsearch.config.zookeeper.LogSearchConfigZKHelper;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
@@ -36,35 +37,9 @@ public class ACLPropertiesSplitter {
for (String unparcedAcl : aclStrList) {
String[] parts = unparcedAcl.split(":");
if (parts.length == 3) {
- acls.add(new ACL(parsePermission(parts[2]), new Id(parts[0], parts[1])));
+ acls.add(new ACL(LogSearchConfigZKHelper.parsePermission(parts[2]), new Id(parts[0], parts[1])));
}
}
return acls;
}
-
- private Integer parsePermission(String permission) {
- int permissionCode = 0;
- for (char each : permission.toLowerCase().toCharArray()) {
- switch (each) {
- case 'r':
- permissionCode |= ZooDefs.Perms.READ;
- break;
- case 'w':
- permissionCode |= ZooDefs.Perms.WRITE;
- break;
- case 'c':
- permissionCode |= ZooDefs.Perms.CREATE;
- break;
- case 'd':
- permissionCode |= ZooDefs.Perms.DELETE;
- break;
- case 'a':
- permissionCode |= ZooDefs.Perms.ADMIN;
- break;
- default:
- throw new IllegalArgumentException("Unsupported permission: " + permission);
- }
- }
- return permissionCode;
- }
}
diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/LogSearchConfigApiConfig.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/LogSearchConfigApiConfig.java
index 7f3632d..2765ebd 100644
--- a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/LogSearchConfigApiConfig.java
+++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/LogSearchConfigApiConfig.java
@@ -18,8 +18,10 @@
*/
package org.apache.ambari.logsearch.conf;
+import org.apache.ambari.logsearch.conf.global.LogLevelFilterManagerState;
import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription;
import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import static org.apache.ambari.logsearch.common.LogSearchConstants.LOGSEARCH_PROPERTIES_FILE;
@@ -47,6 +49,21 @@ public class LogSearchConfigApiConfig {
@Value("${logsearch.config.api.filter.solr.enabled:false}")
public boolean solrFilterStorage;
+ @LogSearchPropertyDescription(
+ name = "logsearch.config.api.filter.zk-only.enabled",
+ description = "Use zookeeper as a log level filter storage",
+ examples = {"true"},
+ defaultValue = "false",
+ sources = {LOGSEARCH_PROPERTIES_FILE}
+ )
+ @Value("${logsearch.config.api.filter.zk.enabled:false}")
+ public boolean zkFilterStorage;
+
+ @Bean(name = "logLevelFilterManagerState")
+ public LogLevelFilterManagerState logLevelFilterManagerState() {
+ return new LogLevelFilterManagerState();
+ }
+
public boolean isConfigApiEnabled() {
return configApiEnabled;
}
@@ -62,4 +79,12 @@ public class LogSearchConfigApiConfig {
public void setSolrFilterStorage(boolean solrFilterStorage) {
this.solrFilterStorage = solrFilterStorage;
}
+
+ public boolean isZkFilterStorage() {
+ return zkFilterStorage;
+ }
+
+ public void setZkFilterStorage(boolean zkFilterStorage) {
+ this.zkFilterStorage = zkFilterStorage;
+ }
}
diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SecurityConfig.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SecurityConfig.java
index 69d63d2..0bc1519 100644
--- a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SecurityConfig.java
+++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SecurityConfig.java
@@ -22,8 +22,7 @@ import com.google.common.collect.Lists;
import org.apache.ambari.logsearch.conf.global.LogSearchConfigState;
import org.apache.ambari.logsearch.conf.global.SolrCollectionState;
-import org.apache.ambari.logsearch.conf.global.SolrLogLevelFilterManagerState;
-import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription;
+import org.apache.ambari.logsearch.conf.global.LogLevelFilterManagerState;
import org.apache.ambari.logsearch.web.authenticate.LogsearchAuthFailureHandler;
import org.apache.ambari.logsearch.web.authenticate.LogsearchAuthSuccessHandler;
import org.apache.ambari.logsearch.web.authenticate.LogsearchLogoutSuccessHandler;
@@ -44,7 +43,6 @@ import org.springframework.context.annotation.Configuration;
import org.springframework.security.config.annotation.web.builders.HttpSecurity;
import org.springframework.security.config.annotation.web.configuration.EnableWebSecurity;
import org.springframework.security.config.annotation.web.configuration.WebSecurityConfigurerAdapter;
-import org.springframework.security.config.http.SessionCreationPolicy;
import org.springframework.security.web.access.intercept.FilterSecurityInterceptor;
import org.springframework.security.web.authentication.www.BasicAuthenticationFilter;
import org.springframework.security.web.util.matcher.AntPathRequestMatcher;
@@ -90,8 +88,8 @@ public class SecurityConfig extends WebSecurityConfigurerAdapter {
private SolrCollectionState solrEventHistoryState;
@Inject
- @Named("solrLogLevelFilterManagerState")
- private SolrLogLevelFilterManagerState solrLogLevelFilterManagerState;
+ @Named("logLevelFilterManagerState")
+ private LogLevelFilterManagerState logLevelFilterManagerState;
@Inject
private LogSearchConfigState logSearchConfigState;
@@ -190,7 +188,7 @@ public class SecurityConfig extends WebSecurityConfigurerAdapter {
@Bean
public LogSearchConfigStateFilter logSearchConfigStateFilter() {
- if (logSearchConfigApiConfig.isSolrFilterStorage()) {
+ if (logSearchConfigApiConfig.isSolrFilterStorage() || logSearchConfigApiConfig.isZkFilterStorage()) {
return new LogSearchConfigStateFilter(shipperConfigInputRequestMatcher(), logSearchConfigState, logSearchConfigApiConfig.isConfigApiEnabled());
} else {
return new LogSearchConfigStateFilter(logsearchConfigRequestMatcher(), logSearchConfigState, logSearchConfigApiConfig.isConfigApiEnabled());
@@ -199,8 +197,9 @@ public class SecurityConfig extends WebSecurityConfigurerAdapter {
@Bean
public LogSearchLogLevelFilterManagerFilter logSearchLogLevelFilterManagerFilter() {
- boolean enabled = logSearchConfigApiConfig.isSolrFilterStorage() && !logSearchConfigApiConfig.isConfigApiEnabled();
- return new LogSearchLogLevelFilterManagerFilter(logLevelFilterRequestMatcher(), solrLogLevelFilterManagerState, enabled);
+ boolean enabled = (logSearchConfigApiConfig.isSolrFilterStorage() || logSearchConfigApiConfig.isZkFilterStorage())
+ && !logSearchConfigApiConfig.isConfigApiEnabled();
+ return new LogSearchLogLevelFilterManagerFilter(logLevelFilterRequestMatcher(), logLevelFilterManagerState, enabled);
}
@Bean
diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SolrConfig.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SolrConfig.java
index 7577d6e..33f4f6f 100644
--- a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SolrConfig.java
+++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SolrConfig.java
@@ -20,7 +20,7 @@ package org.apache.ambari.logsearch.conf;
import org.apache.ambari.logsearch.conf.global.SolrAuditLogsState;
import org.apache.ambari.logsearch.conf.global.SolrCollectionState;
-import org.apache.ambari.logsearch.conf.global.SolrLogLevelFilterManagerState;
+import org.apache.ambari.logsearch.conf.global.LogLevelFilterManagerState;
import org.apache.ambari.logsearch.conf.global.SolrServiceLogsState;
import org.apache.ambari.logsearch.conf.global.SolrEventHistoryState;
import org.apache.ambari.logsearch.dao.SolrSchemaFieldDao;
@@ -54,11 +54,6 @@ public class SolrConfig {
return new SolrEventHistoryState();
}
- @Bean(name = "solrLogLevelFilterManagerState")
- public SolrLogLevelFilterManagerState solrLogLevelFilterManagerState() {
- return new SolrLogLevelFilterManagerState();
- }
-
@Bean
public SolrClientsHolder solrClientsHolder() {
return new SolrClientsHolder();
diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/global/SolrLogLevelFilterManagerState.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/global/LogLevelFilterManagerState.java
similarity index 96%
rename from ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/global/SolrLogLevelFilterManagerState.java
rename to ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/global/LogLevelFilterManagerState.java
index d610e10..afd5313 100644
--- a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/global/SolrLogLevelFilterManagerState.java
+++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/global/LogLevelFilterManagerState.java
@@ -21,7 +21,7 @@ package org.apache.ambari.logsearch.conf.global;
import javax.inject.Named;
@Named
-public class SolrLogLevelFilterManagerState {
+public class LogLevelFilterManagerState {
private volatile boolean logLevelFilterManagerIsReady;
diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/configurer/LogLevelManagerFilterConfigurer.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/configurer/LogLevelManagerFilterConfigurer.java
new file mode 100644
index 0000000..5efca85
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/configurer/LogLevelManagerFilterConfigurer.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logsearch.configurer;
+
+import org.apache.ambari.logsearch.conf.LogSearchConfigApiConfig;
+import org.apache.ambari.logsearch.conf.LogSearchConfigMapHolder;
+import org.apache.ambari.logsearch.conf.global.LogLevelFilterManagerState;
+import org.apache.ambari.logsearch.config.solr.LogLevelFilterManagerSolr;
+import org.apache.ambari.logsearch.config.zookeeper.LogLevelFilterManagerZK;
+import org.apache.ambari.logsearch.config.zookeeper.LogSearchConfigZKHelper;
+import org.apache.ambari.logsearch.dao.EventHistorySolrDao;
+import org.apache.curator.framework.CuratorFramework;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.PostConstruct;
+import javax.inject.Inject;
+import javax.inject.Named;
+
+@Named
+public class LogLevelManagerFilterConfigurer implements Configurer {
+ private static final Logger logger = LoggerFactory.getLogger(LogLevelManagerFilterConfigurer.class);
+
+ private static final int RETRY_INTERVAL_SECONDS = 10;
+
+ private final EventHistorySolrDao eventHistorySolrDao;
+ private final LogLevelFilterManagerState logLevelFilterManagerState;
+ private final LogSearchConfigApiConfig logSearchConfigApiConfig;
+ private final LogSearchConfigMapHolder logSearchConfigMapHolder;
+
+ private LogLevelFilterManagerSolr logLevelFilterManagerSolr;
+ private LogLevelFilterManagerZK logLevelFilterManagerZK;
+
+ @Inject
+ public LogLevelManagerFilterConfigurer(final LogSearchConfigApiConfig logSearchConfigApiConfig,
+ final LogLevelFilterManagerState logLevelFilterManagerState,
+ final EventHistorySolrDao eventHistorySolrDao,
+ final LogSearchConfigMapHolder logSearchConfigMapHolder) {
+ this.logSearchConfigApiConfig = logSearchConfigApiConfig;
+ this.logLevelFilterManagerState = logLevelFilterManagerState;
+ this.eventHistorySolrDao = eventHistorySolrDao;
+ this.logSearchConfigMapHolder = logSearchConfigMapHolder;
+ }
+
+ @PostConstruct
+ @Override
+ public void start() {
+ Thread setupThread = new Thread("setup_solr_loglevel_filter_manager") {
+ @Override
+ public void run() {
+ logger.info("Start initializing log level filter manager ...");
+ if (logSearchConfigApiConfig.isSolrFilterStorage() || logSearchConfigApiConfig.isZkFilterStorage()) {
+ while (true) {
+ try {
+ if (logSearchConfigApiConfig.isSolrFilterStorage()) {
+ if (eventHistorySolrDao.getSolrCollectionState().isSolrCollectionReady()) {
+ setLogLevelFilterManagerSolr(new LogLevelFilterManagerSolr(eventHistorySolrDao.getSolrClient()));
+ logLevelFilterManagerState.setLogLevelFilterManagerIsReady(true);
+ logger.info("Log level filter manager (solr) successfully initialized.");
+ break;
+ }
+ }
+ if (logSearchConfigApiConfig.isZkFilterStorage()) {
+ CuratorFramework client = LogSearchConfigZKHelper.createZKClient(logSearchConfigMapHolder.getLogsearchProperties());
+ client.start();
+ if (client.checkExists().forPath("/") == null) {
+ client.create().creatingParentContainersIfNeeded().forPath("/");
+ }
+ LogLevelFilterManagerZK logLevelFilterManagerZK = new LogLevelFilterManagerZK(
+ logSearchConfigMapHolder.getLogsearchProperties(), client);
+ setLogLevelFilterManagerZK(logLevelFilterManagerZK);
+ logLevelFilterManagerState.setLogLevelFilterManagerIsReady(true);
+ logger.info("Log level filter manager (zookeeper) successfully initialized.");
+ break;
+ }
+ } catch (Exception ex) {
+ logger.warn("Could not initialize log level Solr filter manager, going to sleep for " + RETRY_INTERVAL_SECONDS + " seconds ", ex);
+ }
+ try {
+ Thread.sleep(RETRY_INTERVAL_SECONDS * 1000);
+ } catch (Exception e) {/* ignore */}
+ }
+ } else {
+ logger.info("Solr is not used as a log level filter storage.");
+ }
+ }
+ };
+ setupThread.setDaemon(true);
+ setupThread.start();
+ }
+
+ public LogLevelFilterManagerSolr getLogLevelFilterManagerSolr() {
+ return logLevelFilterManagerSolr;
+ }
+
+ public void setLogLevelFilterManagerSolr(final LogLevelFilterManagerSolr logLevelFilterManagerSolr) {
+ this.logLevelFilterManagerSolr = logLevelFilterManagerSolr;
+ }
+
+ public LogLevelFilterManagerZK getLogLevelFilterManagerZK() {
+ return logLevelFilterManagerZK;
+ }
+
+ public void setLogLevelFilterManagerZK(LogLevelFilterManagerZK logLevelFilterManagerZK) {
+ this.logLevelFilterManagerZK = logLevelFilterManagerZK;
+ }
+}
diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/configurer/SolrLogLevelFilterConfigurer.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/configurer/SolrLogLevelFilterConfigurer.java
deleted file mode 100644
index e62c6c4..0000000
--- a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/configurer/SolrLogLevelFilterConfigurer.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.ambari.logsearch.configurer;
-
-import org.apache.ambari.logsearch.conf.LogSearchConfigApiConfig;
-import org.apache.ambari.logsearch.conf.global.SolrLogLevelFilterManagerState;
-import org.apache.ambari.logsearch.config.solr.LogLevelFilterManagerSolr;
-import org.apache.ambari.logsearch.dao.EventHistorySolrDao;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.PostConstruct;
-import javax.inject.Inject;
-import javax.inject.Named;
-
-@Named
-public class SolrLogLevelFilterConfigurer implements Configurer {
- private static final Logger logger = LoggerFactory.getLogger(SolrLogLevelFilterConfigurer.class);
-
- private static final int RETRY_INTERVAL_SECONDS = 10;
-
- private final EventHistorySolrDao eventHistorySolrDao;
- private final SolrLogLevelFilterManagerState solrLogLevelFilterManagerState;
- private final LogSearchConfigApiConfig logSearchConfigApiConfig;
-
- private LogLevelFilterManagerSolr logLevelFilterManagerSolr;
-
- @Inject
- public SolrLogLevelFilterConfigurer(final LogSearchConfigApiConfig logSearchConfigApiConfig,
- final SolrLogLevelFilterManagerState solrLogLevelFilterManagerState,
- final EventHistorySolrDao eventHistorySolrDao) {
- this.logSearchConfigApiConfig = logSearchConfigApiConfig;
- this.solrLogLevelFilterManagerState = solrLogLevelFilterManagerState;
- this.eventHistorySolrDao = eventHistorySolrDao;
- }
-
- @PostConstruct
- @Override
- public void start() {
- Thread setupThread = new Thread("setup_solr_loglevel_filter_manager") {
- @Override
- public void run() {
- logger.info("Start initializing log level filter manager ...");
- if (logSearchConfigApiConfig.isSolrFilterStorage()) {
- while (true) {
- try {
- if (eventHistorySolrDao.getSolrCollectionState().isSolrCollectionReady()) {
- setLogLevelFilterManagerSolr(new LogLevelFilterManagerSolr(eventHistorySolrDao.getSolrClient()));
- solrLogLevelFilterManagerState.setLogLevelFilterManagerIsReady(true);
- logger.info("Log level filter manager successfully initialized.");
- break;
- }
- } catch (Exception ex) {
- logger.warn("Could not initialize log level Solr filter manager, going to sleep for " + RETRY_INTERVAL_SECONDS + " seconds ", ex);
- }
- try {
- Thread.sleep(RETRY_INTERVAL_SECONDS * 1000);
- } catch (Exception e) {/* ignore */}
- }
- } else {
- logger.info("Solr is not used as a log level filter storage.");
- }
- }
- };
- setupThread.setDaemon(true);
- setupThread.start();
- }
-
- public LogLevelFilterManagerSolr getLogLevelFilterManagerSolr() {
- return logLevelFilterManagerSolr;
- }
-
- public void setLogLevelFilterManagerSolr(final LogLevelFilterManagerSolr logLevelFilterManagerSolr) {
- this.logLevelFilterManagerSolr = logLevelFilterManagerSolr;
- }
-}
diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/manager/ShipperConfigManager.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/manager/ShipperConfigManager.java
index a7a955c..6119bb2 100644
--- a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/manager/ShipperConfigManager.java
+++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/manager/ShipperConfigManager.java
@@ -27,7 +27,7 @@ import org.apache.ambari.logfeeder.common.LogEntryParseTester;
import org.apache.ambari.logsearch.conf.LogSearchConfigApiConfig;
import org.apache.ambari.logsearch.config.api.model.inputconfig.InputConfig;
import org.apache.ambari.logsearch.configurer.LogSearchConfigConfigurer;
-import org.apache.ambari.logsearch.configurer.SolrLogLevelFilterConfigurer;
+import org.apache.ambari.logsearch.configurer.LogLevelManagerFilterConfigurer;
import org.apache.ambari.logsearch.model.common.LSServerInputConfig;
import org.apache.ambari.logsearch.model.common.LSServerLogLevelFilterMap;
import org.apache.log4j.Logger;
@@ -55,7 +55,7 @@ public class ShipperConfigManager extends JsonManagerBase {
private LogSearchConfigConfigurer logSearchConfigConfigurer;
@Inject
- private SolrLogLevelFilterConfigurer solrLogLevelFilterConfigurer;
+ private LogLevelManagerFilterConfigurer logLevelFilterConfigurer;
public List<String> getServices(String clusterName) {
return logSearchConfigConfigurer.getConfig().getServices(clusterName);
@@ -120,7 +120,9 @@ public class ShipperConfigManager extends JsonManagerBase {
public LSServerLogLevelFilterMap getLogLevelFilters(String clusterName) {
if (logSearchConfigApiConfig.isSolrFilterStorage()) {
- return new LSServerLogLevelFilterMap(solrLogLevelFilterConfigurer.getLogLevelFilterManagerSolr().getLogLevelFilters(clusterName));
+ return new LSServerLogLevelFilterMap(logLevelFilterConfigurer.getLogLevelFilterManagerSolr().getLogLevelFilters(clusterName));
+ } else if (logSearchConfigApiConfig.isZkFilterStorage()) {
+ return new LSServerLogLevelFilterMap(logLevelFilterConfigurer.getLogLevelFilterManagerZK().getLogLevelFilters(clusterName));
} else {
return new LSServerLogLevelFilterMap(logSearchConfigConfigurer.getConfig().getLogLevelFilterManager().getLogLevelFilters(clusterName));
}
@@ -129,7 +131,9 @@ public class ShipperConfigManager extends JsonManagerBase {
public Response setLogLevelFilters(String clusterName, LSServerLogLevelFilterMap request) {
try {
if (logSearchConfigApiConfig.isSolrFilterStorage()) {
- solrLogLevelFilterConfigurer.getLogLevelFilterManagerSolr().setLogLevelFilters(clusterName, request.convertToApi());
+ logLevelFilterConfigurer.getLogLevelFilterManagerSolr().setLogLevelFilters(clusterName, request.convertToApi());
+ } else if (logSearchConfigApiConfig.isZkFilterStorage()) {
+ logLevelFilterConfigurer.getLogLevelFilterManagerZK().setLogLevelFilters(clusterName, request.convertToApi());
} else {
logSearchConfigConfigurer.getConfig().getLogLevelFilterManager().setLogLevelFilters(clusterName, request.convertToApi());
}
diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/web/filters/LogSearchLogLevelFilterManagerFilter.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/web/filters/LogSearchLogLevelFilterManagerFilter.java
index d8b2ced..6a8e5eb 100644
--- a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/web/filters/LogSearchLogLevelFilterManagerFilter.java
+++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/web/filters/LogSearchLogLevelFilterManagerFilter.java
@@ -21,7 +21,7 @@ package org.apache.ambari.logsearch.web.filters;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.ambari.logsearch.common.MessageEnums;
import org.apache.ambari.logsearch.common.VResponse;
-import org.apache.ambari.logsearch.conf.global.SolrLogLevelFilterManagerState;
+import org.apache.ambari.logsearch.conf.global.LogLevelFilterManagerState;
import org.apache.ambari.logsearch.util.RESTErrorUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,14 +44,14 @@ public class LogSearchLogLevelFilterManagerFilter implements Filter {
private static final String SOLR_FILTER_MANAGER_NOT_AVAILABLE = "Solr log level filter manager is not available";
private final RequestMatcher requestMatcher;
- private final SolrLogLevelFilterManagerState solrLogLevelFilterManagerState;
+ private final LogLevelFilterManagerState logLevelFilterManagerState;
private final boolean enabled;
public LogSearchLogLevelFilterManagerFilter(RequestMatcher requestMatcher,
- SolrLogLevelFilterManagerState solrLogLevelFilterManagerState,
+ LogLevelFilterManagerState logLevelFilterManagerState,
boolean enabled) {
this.requestMatcher = requestMatcher;
- this.solrLogLevelFilterManagerState = solrLogLevelFilterManagerState;
+ this.logLevelFilterManagerState = logLevelFilterManagerState;
this.enabled = enabled;
}
@@ -84,7 +84,7 @@ public class LogSearchLogLevelFilterManagerFilter implements Filter {
}
private VResponse getErrorResponse() {
- if (!solrLogLevelFilterManagerState.isLogLevelFilterManagerIsReady()) {
+ if (!logLevelFilterManagerState.isLogLevelFilterManagerIsReady()) {
return RESTErrorUtil.createMessageResponse(SOLR_FILTER_MANAGER_NOT_AVAILABLE, MessageEnums.SOLR_CONFIGURATION_API_SOLR_NOT_AVAILEBLE);
}
diff --git a/ambari-logsearch/docker/test-config/logfeeder/logfeeder.properties b/ambari-logsearch/docker/test-config/logfeeder/logfeeder.properties
index bd59765..8371170 100644
--- a/ambari-logsearch/docker/test-config/logfeeder/logfeeder.properties
+++ b/ambari-logsearch/docker/test-config/logfeeder/logfeeder.properties
@@ -33,4 +33,5 @@ logfeeder.docker.registry.enabled=true
logfeeder.solr.core.config.name=history
#logfeeder.solr.urls=http://solr:8983/solr
#logfeeder.configs.local.enabled=true
-#logfeeder.configs.filter.solr.enabled=true
\ No newline at end of file
+#logfeeder.configs.filter.solr.enabled=true
+#logfeeder.configs.filter.zk.enabled=true
\ No newline at end of file
diff --git a/ambari-logsearch/docker/test-config/logsearch/logsearch.properties b/ambari-logsearch/docker/test-config/logsearch/logsearch.properties
index 21bc797..40c8cf5 100644
--- a/ambari-logsearch/docker/test-config/logsearch/logsearch.properties
+++ b/ambari-logsearch/docker/test-config/logsearch/logsearch.properties
@@ -20,6 +20,7 @@ logsearch.solr.collection.service.logs=hadoop_logs
#logsearch.config.api.filter.solr.enabled=true
#logsearch.config.api.enabled=false
+#logsearch.config.api.filter.zk.enabled=true
logsearch.service.logs.split.interval.mins=15
logsearch.collection.service.logs.numshards=3