You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ol...@apache.org on 2018/08/13 09:03:50 UTC

[ambari] branch branch-2.7 updated: AMBARI-24459. Log Search / Log Feeder - support to use ZK only for storing filters (without manage input configs) (#2041)

This is an automated email from the ASF dual-hosted git repository.

oleewere pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/ambari.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new a6c6a25  AMBARI-24459. Log Search / Log Feeder - support to use ZK only for storing filters (without manage input configs) (#2041)
a6c6a25 is described below

commit a6c6a25ffdd38c5927f491d82bfdf8eba2bc6925
Author: Olivér Szabó <ol...@gmail.com>
AuthorDate: Mon Aug 13 11:03:48 2018 +0200

    AMBARI-24459. Log Search / Log Feeder - support to use ZK only for storing filters (without manage input configs) (#2041)
---
 .../config/zookeeper/LogLevelFilterManagerZK.java  |  27 +++
 .../zookeeper/LogSearchConfigLogFeederZK.java      |  34 +---
 .../config/zookeeper/LogSearchConfigServerZK.java  |   5 +-
 .../config/zookeeper/LogSearchConfigZK.java        | 148 +---------------
 ...hConfigZK.java => LogSearchConfigZKHelper.java} | 187 +++++++++++++--------
 .../logfeeder/common/LogFeederConstants.java       |   3 +
 .../ambari/logfeeder/conf/ApplicationConfig.java   |  10 +-
 .../ambari/logfeeder/conf/LogFeederProps.java      |  18 ++
 .../loglevelfilter/LogLevelFilterHandler.java      |  23 ++-
 .../logsearch/common/ACLPropertiesSplitter.java    |  29 +---
 .../logsearch/conf/LogSearchConfigApiConfig.java   |  25 +++
 .../ambari/logsearch/conf/SecurityConfig.java      |  15 +-
 .../apache/ambari/logsearch/conf/SolrConfig.java   |   7 +-
 ...rState.java => LogLevelFilterManagerState.java} |   2 +-
 .../LogLevelManagerFilterConfigurer.java           | 123 ++++++++++++++
 .../configurer/SolrLogLevelFilterConfigurer.java   |  92 ----------
 .../logsearch/manager/ShipperConfigManager.java    |  12 +-
 .../LogSearchLogLevelFilterManagerFilter.java      |  10 +-
 .../test-config/logfeeder/logfeeder.properties     |   3 +-
 .../test-config/logsearch/logsearch.properties     |   1 +
 20 files changed, 381 insertions(+), 393 deletions(-)

diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogLevelFilterManagerZK.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogLevelFilterManagerZK.java
index 1ad7517..e62ec1b 100644
--- a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogLevelFilterManagerZK.java
+++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogLevelFilterManagerZK.java
@@ -43,6 +43,22 @@ public class LogLevelFilterManagerZK implements LogLevelFilterManager {
   private final Gson gson;
   private final List<ACL> aclList;
 
+  public LogLevelFilterManagerZK(Map<String, String> properties) throws Exception {
+    this.client = LogSearchConfigZKHelper.createZKClient(properties);
+    this.serverCache = new TreeCache(client, "/");
+    this.aclList = LogSearchConfigZKHelper.getAcls(properties);
+    this.gson = LogSearchConfigZKHelper.createGson();
+    this.serverCache.start();
+  }
+
+  public LogLevelFilterManagerZK(Map<String, String> properties, CuratorFramework client) throws Exception {
+    this.client = client;
+    this.serverCache = new TreeCache(client, "/");
+    this.aclList = LogSearchConfigZKHelper.getAcls(properties);
+    this.gson = LogSearchConfigZKHelper.createGson();
+    this.serverCache.start();
+  }
+
   public LogLevelFilterManagerZK(CuratorFramework client, TreeCache serverCache, List<ACL> aclList, Gson gson) {
     this.client = client;
     this.serverCache = serverCache;
@@ -91,4 +107,15 @@ public class LogLevelFilterManagerZK implements LogLevelFilterManager {
     return logLevelFilters;
   }
 
+  public CuratorFramework getClient() {
+    return client;
+  }
+
+  public TreeCache getServerCache() {
+    return serverCache;
+  }
+
+  public Gson getGson() {
+    return gson;
+  }
 }
diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigLogFeederZK.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigLogFeederZK.java
index 8082ba7..0c565d3 100644
--- a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigLogFeederZK.java
+++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigLogFeederZK.java
@@ -24,7 +24,6 @@ import java.util.Set;
 
 import org.apache.ambari.logsearch.config.api.LogLevelFilterManager;
 import org.apache.ambari.logsearch.config.api.LogSearchConfigLogFeeder;
-import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter;
 import org.apache.ambari.logsearch.config.json.JsonHelper;
 import org.apache.ambari.logsearch.config.json.model.inputconfig.impl.InputConfigGson;
 import org.apache.ambari.logsearch.config.json.model.inputconfig.impl.InputConfigImpl;
@@ -47,20 +46,14 @@ import com.google.gson.JsonParser;
 public class LogSearchConfigLogFeederZK extends LogSearchConfigZK implements LogSearchConfigLogFeeder {
   private static final Logger LOG = LoggerFactory.getLogger(LogSearchConfigLogFeederZK.class);
 
-  private static final long WAIT_FOR_ROOT_SLEEP_SECONDS = 10;
-
   private TreeCache logFeederClusterCache;
 
   @Override
   public void init(Map<String, String> properties, String clusterName) throws Exception {
     super.init(properties);
-    while (client.checkExists().forPath("/") == null) {
-      LOG.info("Root node is not present yet, going to sleep for " + WAIT_FOR_ROOT_SLEEP_SECONDS + " seconds");
-      Thread.sleep(WAIT_FOR_ROOT_SLEEP_SECONDS * 1000);
-    }
-    
-    logFeederClusterCache = new TreeCache(client, String.format("/%s", clusterName));
-    LogLevelFilterManager logLevelFilterManager = new LogLevelFilterManagerZK(client, null, getAcls(), gson);
+    LogSearchConfigZKHelper.waitUntilRootAvailable(client);
+    logFeederClusterCache = LogSearchConfigZKHelper.createClusterCache(client, clusterName);
+    LogLevelFilterManager logLevelFilterManager = new LogLevelFilterManagerZK(client, null, LogSearchConfigZKHelper.getAcls(properties), gson);
     setLogLevelFilterManager(logLevelFilterManager);
   }
 
@@ -99,7 +92,7 @@ public class LogSearchConfigLogFeederZK extends LogSearchConfigZK implements Log
         if (event.getData().getPath().startsWith(configPathStab + "input/")) {
           handleInputConfigChange(eventType, nodeName, nodeData);
         } else if (event.getData().getPath().startsWith(configPathStab + "loglevelfilter/")) {
-          handleLogLevelFilterChange(eventType, nodeName, nodeData);
+          LogSearchConfigZKHelper.handleLogLevelFilterChange(eventType, nodeName, nodeData, gson, logLevelFilterMonitor);
         }
       }
 
@@ -143,23 +136,6 @@ public class LogSearchConfigLogFeederZK extends LogSearchConfigZK implements Log
           LOG.error("Could not load input configuration for service " + serviceName + ":\n" + inputConfig, e);
         }
       }
-
-      private void handleLogLevelFilterChange(Type eventType, String nodeName, String nodeData) {
-        switch (eventType) {
-          case NODE_ADDED:
-          case NODE_UPDATED:
-            LOG.info("Node added/updated under loglevelfilter ZK node: " + nodeName);
-            LogLevelFilter logLevelFilter = gson.fromJson(nodeData, LogLevelFilter.class);
-            logLevelFilterMonitor.setLogLevelFilter(nodeName, logLevelFilter);
-            break;
-          case NODE_REMOVED:
-            LOG.info("Node removed loglevelfilter input ZK node: " + nodeName);
-            logLevelFilterMonitor.removeLogLevelFilter(nodeName);
-            break;
-          default:
-            break;
-        }
-      }
     };
     logFeederClusterCache.getListenable().addListener(listener);
     logFeederClusterCache.start();
@@ -173,7 +149,7 @@ public class LogSearchConfigLogFeederZK extends LogSearchConfigZK implements Log
       if (logFeederClusterCache.getCurrentData(globalConfigNodePath) != null) {
         client.setData().forPath(globalConfigNodePath, data.getBytes());
       } else {
-        client.create().creatingParentContainersIfNeeded().withACL(getAcls()).forPath(globalConfigNodePath, data.getBytes());
+        client.create().creatingParentContainersIfNeeded().withACL(LogSearchConfigZKHelper.getAcls(properties)).forPath(globalConfigNodePath, data.getBytes());
       }
     } catch (Exception e) {
       LOG.warn("Exception during global config node creation/update", e);
diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigServerZK.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigServerZK.java
index d8fd79b..546e4d4 100644
--- a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigServerZK.java
+++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigServerZK.java
@@ -52,12 +52,9 @@ public class LogSearchConfigServerZK extends LogSearchConfigZK implements LogSea
     if (client.checkExists().forPath("/") == null) {
       client.create().creatingParentContainersIfNeeded().forPath("/");
     }
-    if (client.checkExists().forPath("/output") == null) {
-      client.create().creatingParentContainersIfNeeded().forPath("/output");
-    }
     serverCache = new TreeCache(client, "/");
     serverCache.start();
-    LogLevelFilterManager logLevelFilterManager = new LogLevelFilterManagerZK(client, serverCache, getAcls(), gson);
+    LogLevelFilterManager logLevelFilterManager = new LogLevelFilterManagerZK(client, serverCache, LogSearchConfigZKHelper.getAcls(properties), gson);
     setLogLevelFilterManager(logLevelFilterManager);
   }
 
diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java
index 7e51c35..dcbedd5 100644
--- a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java
+++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java
@@ -19,136 +19,37 @@
 
 package org.apache.ambari.logsearch.config.zookeeper;
 
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Map;
 
 import org.apache.ambari.logsearch.config.api.LogLevelFilterManager;
 import org.apache.ambari.logsearch.config.api.LogSearchConfig;
-import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription;
-import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter;
-import org.apache.commons.collections.MapUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.recipes.cache.TreeCache;
-import org.apache.curator.retry.RetryForever;
-import org.apache.curator.retry.RetryUntilElapsed;
 import org.apache.zookeeper.KeeperException.NodeExistsException;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.data.ACL;
-import org.apache.zookeeper.data.Id;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Splitter;
 import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
 
 public class LogSearchConfigZK implements LogSearchConfig {
   private static final Logger LOG = LoggerFactory.getLogger(LogSearchConfigZK.class);
 
-  private static final int DEFAULT_SESSION_TIMEOUT = 60000;
-  private static final int DEFAULT_CONNECTION_TIMEOUT = 30000;
-  private static final int RETRY_INTERVAL_MS = 10000;
-  private static final String DEFAULT_ZK_ROOT = "/logsearch";
-  private static final String DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS";
-
-  @LogSearchPropertyDescription(
-    name = "logsearch.config.zk_connect_string",
-    description = "ZooKeeper connection string.",
-    examples = {"localhost1:2181,localhost2:2181/znode"},
-    sources = {"logsearch.properties", "logfeeder.properties"}
-  )
-  private static final String ZK_CONNECT_STRING_PROPERTY = "logsearch.config.zk_connect_string";
-
-  @LogSearchPropertyDescription(
-    name = "logsearch.config.zk_acls",
-    description = "ZooKeeper ACLs for handling configs. (read & write)",
-    examples = {"world:anyone:r,sasl:solr:cdrwa,sasl:logsearch:cdrwa"},
-    sources = {"logsearch.properties", "logfeeder.properties"},
-    defaultValue = "world:anyone:cdrwa"
-  )
-  private static final String ZK_ACLS_PROPERTY = "logsearch.config.zk_acls";
-
-  @LogSearchPropertyDescription(
-    name = "logsearch.config.zk_root",
-    description = "ZooKeeper root node where the shippers are stored. (added to the connection string)",
-    examples = {"/logsearch"},
-    sources = {"logsearch.properties", "logfeeder.properties"}
-  )
-  private static final String ZK_ROOT_NODE_PROPERTY = "logsearch.config.zk_root";
-
-  @LogSearchPropertyDescription(
-    name = "logsearch.config.zk_session_time_out_ms",
-    description = "ZooKeeper session timeout in milliseconds",
-    examples = {"60000"},
-    sources = {"logsearch.properties", "logfeeder.properties"}
-  )
-  private static final String ZK_SESSION_TIMEOUT_PROPERTY = "logsearch.config.zk_session_time_out_ms";
-
-  @LogSearchPropertyDescription(
-    name = "logsearch.config.zk_connection_time_out_ms",
-    description = "ZooKeeper connection timeout in milliseconds",
-    examples = {"30000"},
-    sources = {"logsearch.properties", "logfeeder.properties"}
-  )
-  private static final String ZK_CONNECTION_TIMEOUT_PROPERTY = "logsearch.config.zk_connection_time_out_ms";
-
-  @LogSearchPropertyDescription(
-    name = "logsearch.config.zk_connection_retry_time_out_ms",
-    description = "The maximum elapsed time for connecting to ZooKeeper in milliseconds. 0 means retrying forever.",
-    examples = {"1200000"},
-    sources = {"logsearch.properties", "logfeeder.properties"}
-  )
-  private static final String ZK_CONNECTION_RETRY_TIMEOUT_PROPERTY = "logsearch.config.zk_connection_retry_time_out_ms";
-
   protected Map<String, String> properties;
   protected CuratorFramework client;
-  protected TreeCache outputCache;
   protected Gson gson;
   protected LogLevelFilterManager logLevelFilterManager;
 
   public void init(Map<String, String> properties) throws Exception {
     this.properties = properties;
-    
-    String root = MapUtils.getString(properties, ZK_ROOT_NODE_PROPERTY, DEFAULT_ZK_ROOT);
-    LOG.info("Connecting to ZooKeeper at " + properties.get(ZK_CONNECT_STRING_PROPERTY) + root);
-    client = CuratorFrameworkFactory.builder()
-        .connectString(properties.get(ZK_CONNECT_STRING_PROPERTY) + root)
-        .retryPolicy(getRetryPolicy(properties.get(ZK_CONNECTION_RETRY_TIMEOUT_PROPERTY)))
-        .connectionTimeoutMs(getIntProperty(ZK_CONNECTION_TIMEOUT_PROPERTY, DEFAULT_CONNECTION_TIMEOUT))
-        .sessionTimeoutMs(getIntProperty(ZK_SESSION_TIMEOUT_PROPERTY, DEFAULT_SESSION_TIMEOUT))
-        .build();
+    client = LogSearchConfigZKHelper.createZKClient(properties);
     client.start();
-
-    outputCache = new TreeCache(client, "/output");
-    outputCache.start();
-
-    gson = new GsonBuilder().setDateFormat(DATE_FORMAT).create();
-  }
-
-  private int getIntProperty(String propertyKey, int defaultValue) {
-    if (properties.get(propertyKey) == null)
-      return defaultValue;
-    return Integer.parseInt(properties.get(propertyKey));
-  }
-
-  private RetryPolicy getRetryPolicy(String zkConnectionRetryTimeoutValue) {
-    if (zkConnectionRetryTimeoutValue == null)
-      return new RetryForever(RETRY_INTERVAL_MS);
-    int maxElapsedTimeMs = Integer.parseInt(zkConnectionRetryTimeoutValue);
-    if (maxElapsedTimeMs == 0)
-      return new RetryForever(RETRY_INTERVAL_MS);
-    return new RetryUntilElapsed(maxElapsedTimeMs, RETRY_INTERVAL_MS);
+    gson = LogSearchConfigZKHelper.createGson();
   }
 
   @Override
   public void createInputConfig(String clusterName, String serviceName, String inputConfig) throws Exception {
     String nodePath = String.format("/%s/input/%s", clusterName, serviceName);
     try {
-      client.create().creatingParentContainersIfNeeded().withACL(getAcls()).forPath(nodePath, inputConfig.getBytes());
+      client.create().creatingParentContainersIfNeeded().withACL(LogSearchConfigZKHelper.getAcls(properties)).forPath(nodePath, inputConfig.getBytes());
       LOG.info("Uploaded input config for the service " + serviceName + " for cluster " + clusterName);
     } catch (NodeExistsException e) {
       LOG.debug("Did not upload input config for service " + serviceName + " as it was already uploaded by another Log Feeder");
@@ -165,49 +66,6 @@ public class LogSearchConfigZK implements LogSearchConfig {
     this.logLevelFilterManager = logLevelFilterManager;
   }
 
-  protected List<ACL> getAcls() {
-    String aclStr = properties.get(ZK_ACLS_PROPERTY);
-    if (StringUtils.isBlank(aclStr)) {
-      return ZooDefs.Ids.OPEN_ACL_UNSAFE;
-    }
-
-    List<ACL> acls = new ArrayList<>();
-    List<String> aclStrList = Splitter.on(",").omitEmptyStrings().trimResults().splitToList(aclStr);
-    for (String unparcedAcl : aclStrList) {
-      String[] parts = unparcedAcl.split(":");
-      if (parts.length == 3) {
-        acls.add(new ACL(parsePermission(parts[2]), new Id(parts[0], parts[1])));
-      }
-    }
-    return acls;
-  }
-
-  private Integer parsePermission(String permission) {
-    int permissionCode = 0;
-    for (char each : permission.toLowerCase().toCharArray()) {
-      switch (each) {
-        case 'r':
-          permissionCode |= ZooDefs.Perms.READ;
-          break;
-        case 'w':
-          permissionCode |= ZooDefs.Perms.WRITE;
-          break;
-        case 'c':
-          permissionCode |= ZooDefs.Perms.CREATE;
-          break;
-        case 'd':
-          permissionCode |= ZooDefs.Perms.DELETE;
-          break;
-        case 'a':
-          permissionCode |= ZooDefs.Perms.ADMIN;
-          break;
-        default:
-          throw new IllegalArgumentException("Unsupported permission: " + permission);
-      }
-    }
-    return permissionCode;
-  }
-
   @Override
   public void close() {
     LOG.info("Closing ZooKeeper Connection");
diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZKHelper.java
similarity index 57%
copy from ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java
copy to ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZKHelper.java
index 7e51c35..b26181d 100644
--- a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java
+++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZKHelper.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -16,15 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.ambari.logsearch.config.zookeeper;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.ambari.logsearch.config.api.LogLevelFilterManager;
-import org.apache.ambari.logsearch.config.api.LogSearchConfig;
+import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableSet;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import org.apache.ambari.logsearch.config.api.LogLevelFilterMonitor;
 import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription;
 import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter;
 import org.apache.commons.collections.MapUtils;
@@ -33,21 +31,28 @@ import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.recipes.cache.TreeCache;
+import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
+import org.apache.curator.framework.recipes.cache.TreeCacheListener;
 import org.apache.curator.retry.RetryForever;
 import org.apache.curator.retry.RetryUntilElapsed;
-import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.curator.utils.ZKPaths;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Id;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Splitter;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Utility functions for handling ZK operation and monitor ZK data for Log Search configuration
+ */
+public class LogSearchConfigZKHelper {
 
-public class LogSearchConfigZK implements LogSearchConfig {
-  private static final Logger LOG = LoggerFactory.getLogger(LogSearchConfigZK.class);
+  private static final Logger LOG = LoggerFactory.getLogger(LogSearchConfigZKHelper.class);
 
   private static final int DEFAULT_SESSION_TIMEOUT = 60000;
   private static final int DEFAULT_CONNECTION_TIMEOUT = 30000;
@@ -104,38 +109,52 @@ public class LogSearchConfigZK implements LogSearchConfig {
   )
   private static final String ZK_CONNECTION_RETRY_TIMEOUT_PROPERTY = "logsearch.config.zk_connection_retry_time_out_ms";
 
-  protected Map<String, String> properties;
-  protected CuratorFramework client;
-  protected TreeCache outputCache;
-  protected Gson gson;
-  protected LogLevelFilterManager logLevelFilterManager;
+  private static final long WAIT_FOR_ROOT_SLEEP_SECONDS = 10;
 
-  public void init(Map<String, String> properties) throws Exception {
-    this.properties = properties;
-    
+  private LogSearchConfigZKHelper() {
+  }
+
+  /**
+   * Create ZK curator client from a configuration (map holds the configs for that)
+   */
+  public static CuratorFramework createZKClient(Map<String, String> properties) {
     String root = MapUtils.getString(properties, ZK_ROOT_NODE_PROPERTY, DEFAULT_ZK_ROOT);
     LOG.info("Connecting to ZooKeeper at " + properties.get(ZK_CONNECT_STRING_PROPERTY) + root);
-    client = CuratorFrameworkFactory.builder()
-        .connectString(properties.get(ZK_CONNECT_STRING_PROPERTY) + root)
-        .retryPolicy(getRetryPolicy(properties.get(ZK_CONNECTION_RETRY_TIMEOUT_PROPERTY)))
-        .connectionTimeoutMs(getIntProperty(ZK_CONNECTION_TIMEOUT_PROPERTY, DEFAULT_CONNECTION_TIMEOUT))
-        .sessionTimeoutMs(getIntProperty(ZK_SESSION_TIMEOUT_PROPERTY, DEFAULT_SESSION_TIMEOUT))
-        .build();
-    client.start();
-
-    outputCache = new TreeCache(client, "/output");
-    outputCache.start();
-
-    gson = new GsonBuilder().setDateFormat(DATE_FORMAT).create();
+    return CuratorFrameworkFactory.builder()
+      .connectString(properties.get(ZK_CONNECT_STRING_PROPERTY) + root)
+      .retryPolicy(getRetryPolicy(properties.get(ZK_CONNECTION_RETRY_TIMEOUT_PROPERTY)))
+      .connectionTimeoutMs(getIntProperty(properties, ZK_CONNECTION_TIMEOUT_PROPERTY, DEFAULT_CONNECTION_TIMEOUT))
+      .sessionTimeoutMs(getIntProperty(properties, ZK_SESSION_TIMEOUT_PROPERTY, DEFAULT_SESSION_TIMEOUT))
+      .build();
   }
 
-  private int getIntProperty(String propertyKey, int defaultValue) {
+  /**
+   * Get ACLs from a property (get the value then parse and transform it as ACL objects)
+   */
+  public static List<ACL> getAcls(Map<String, String> properties) {
+    String aclStr = properties.get(ZK_ACLS_PROPERTY);
+    if (StringUtils.isBlank(aclStr)) {
+      return ZooDefs.Ids.OPEN_ACL_UNSAFE;
+    }
+
+    List<ACL> acls = new ArrayList<>();
+    List<String> aclStrList = Splitter.on(",").omitEmptyStrings().trimResults().splitToList(aclStr);
+    for (String unparcedAcl : aclStrList) {
+      String[] parts = unparcedAcl.split(":");
+      if (parts.length == 3) {
+        acls.add(new ACL(parsePermission(parts[2]), new Id(parts[0], parts[1])));
+      }
+    }
+    return acls;
+  }
+
+  private static int getIntProperty(Map<String, String> properties, String propertyKey, int defaultValue) {
     if (properties.get(propertyKey) == null)
       return defaultValue;
     return Integer.parseInt(properties.get(propertyKey));
   }
 
-  private RetryPolicy getRetryPolicy(String zkConnectionRetryTimeoutValue) {
+  private static RetryPolicy getRetryPolicy(String zkConnectionRetryTimeoutValue) {
     if (zkConnectionRetryTimeoutValue == null)
       return new RetryForever(RETRY_INTERVAL_MS);
     int maxElapsedTimeMs = Integer.parseInt(zkConnectionRetryTimeoutValue);
@@ -144,45 +163,76 @@ public class LogSearchConfigZK implements LogSearchConfig {
     return new RetryUntilElapsed(maxElapsedTimeMs, RETRY_INTERVAL_MS);
   }
 
-  @Override
-  public void createInputConfig(String clusterName, String serviceName, String inputConfig) throws Exception {
-    String nodePath = String.format("/%s/input/%s", clusterName, serviceName);
-    try {
-      client.create().creatingParentContainersIfNeeded().withACL(getAcls()).forPath(nodePath, inputConfig.getBytes());
-      LOG.info("Uploaded input config for the service " + serviceName + " for cluster " + clusterName);
-    } catch (NodeExistsException e) {
-      LOG.debug("Did not upload input config for service " + serviceName + " as it was already uploaded by another Log Feeder");
-    }
+  /**
+   * Create listener for znode of log level filters - can be used for Log Feeder as it can be useful if it's monitoring the log level changes
+   */
+  public static TreeCacheListener createTreeCacheListener(String clusterName, Gson gson, LogLevelFilterMonitor logLevelFilterMonitor) {
+    return new TreeCacheListener() {
+      private final Set<TreeCacheEvent.Type> nodeEvents = ImmutableSet.of(TreeCacheEvent.Type.NODE_ADDED, TreeCacheEvent.Type.NODE_UPDATED, TreeCacheEvent.Type.NODE_REMOVED);
+      public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
+        if (!nodeEvents.contains(event.getType())) {
+          return;
+        }
+        String nodeName = ZKPaths.getNodeFromPath(event.getData().getPath());
+        String nodeData = new String(event.getData().getData());
+        TreeCacheEvent.Type eventType = event.getType();
+
+        String configPathStab = String.format("/%s/", clusterName);
+
+        if (event.getData().getPath().startsWith(configPathStab + "loglevelfilter/")) {
+          handleLogLevelFilterChange(eventType, nodeName, nodeData, gson, logLevelFilterMonitor);
+        }
+      }
+    };
   }
 
-  @Override
-  public LogLevelFilterManager getLogLevelFilterManager() {
-    return this.logLevelFilterManager;
+  /**
+   * Create root + cluster name znode cache
+   */
+  public static TreeCache createClusterCache(CuratorFramework client, String clusterName) {
+    return new TreeCache(client, String.format("/%s", clusterName));
   }
 
-  @Override
-  public void setLogLevelFilterManager(LogLevelFilterManager logLevelFilterManager) {
-    this.logLevelFilterManager = logLevelFilterManager;
+  /**
+   * Assign listener to cluster cache and start to use that listener
+   */
+  public static void addAndStartListenersOnCluster(TreeCache clusterCache, TreeCacheListener listener) throws Exception {
+    clusterCache.getListenable().addListener(listener);
+    clusterCache.start();
   }
 
-  protected List<ACL> getAcls() {
-    String aclStr = properties.get(ZK_ACLS_PROPERTY);
-    if (StringUtils.isBlank(aclStr)) {
-      return ZooDefs.Ids.OPEN_ACL_UNSAFE;
+  public static void waitUntilRootAvailable(CuratorFramework client) throws Exception {
+    while (client.checkExists().forPath("/") == null) {
+      LOG.info("Root node is not present yet, going to sleep for " + WAIT_FOR_ROOT_SLEEP_SECONDS + " seconds");
+      Thread.sleep(WAIT_FOR_ROOT_SLEEP_SECONDS * 1000);
     }
+  }
 
-    List<ACL> acls = new ArrayList<>();
-    List<String> aclStrList = Splitter.on(",").omitEmptyStrings().trimResults().splitToList(aclStr);
-    for (String unparcedAcl : aclStrList) {
-      String[] parts = unparcedAcl.split(":");
-      if (parts.length == 3) {
-        acls.add(new ACL(parsePermission(parts[2]), new Id(parts[0], parts[1])));
-      }
+  /**
+   * Call log level filter monitor interface to handle node related operations (on update/remove)
+   */
+  public static void handleLogLevelFilterChange(final TreeCacheEvent.Type eventType, final String nodeName, final String nodeData,
+                                                final Gson gson, final LogLevelFilterMonitor logLevelFilterMonitor) {
+    switch (eventType) {
+      case NODE_ADDED:
+      case NODE_UPDATED:
+        LOG.info("Node added/updated under loglevelfilter ZK node: " + nodeName);
+        LogLevelFilter logLevelFilter = gson.fromJson(nodeData, LogLevelFilter.class);
+        logLevelFilterMonitor.setLogLevelFilter(nodeName, logLevelFilter);
+        break;
+      case NODE_REMOVED:
+        LOG.info("Node removed loglevelfilter input ZK node: " + nodeName);
+        logLevelFilterMonitor.removeLogLevelFilter(nodeName);
+        break;
+      default:
+        break;
     }
-    return acls;
   }
 
-  private Integer parsePermission(String permission) {
+  /**
+   * Pares ZK ACL permission string and transform it to an integer
+   */
+  public static Integer parsePermission(String permission) {
     int permissionCode = 0;
     for (char each : permission.toLowerCase().toCharArray()) {
       switch (each) {
@@ -208,9 +258,8 @@ public class LogSearchConfigZK implements LogSearchConfig {
     return permissionCode;
   }
 
-  @Override
-  public void close() {
-    LOG.info("Closing ZooKeeper Connection");
-    client.close();
+  public static Gson createGson() {
+    return new GsonBuilder().setDateFormat(DATE_FORMAT).create();
   }
+
 }
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
index 251b4fc..1d56924 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
@@ -98,6 +98,9 @@ public class LogFeederConstants {
   public static final String USE_SOLR_FILTER_STORAGE_PROPERTY = "logfeeder.configs.filter.solr.enabled";
   public static final boolean USE_SOLR_FILTER_STORAGE_DEFAULT = false;
 
+  public static final String USE_ZK_FILTER_STORAGE_PROPERTY = "logfeeder.configs.filter.zk.enabled";
+  public static final boolean USE_ZK_FILTER_STORAGE_DEFAULT = false;
+
   public static final String MONITOR_SOLR_FILTER_STORAGE_PROPERTY = "logfeeder.configs.filter.solr.monitor.enabled";
   public static final boolean MONITOR_SOLR_FILTER_STORAGE_DEFAULT = true;
 
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/ApplicationConfig.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/ApplicationConfig.java
index ccae373..f54fc0c 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/ApplicationConfig.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/ApplicationConfig.java
@@ -38,6 +38,7 @@ import org.apache.ambari.logsearch.config.api.LogSearchConfigLogFeeder;
 import org.apache.ambari.logsearch.config.local.LogSearchConfigLogFeederLocal;
 import org.apache.ambari.logsearch.config.solr.LogLevelFilterManagerSolr;
 import org.apache.ambari.logsearch.config.solr.LogLevelFilterUpdaterSolr;
+import org.apache.ambari.logsearch.config.zookeeper.LogLevelFilterManagerZK;
 import org.apache.ambari.logsearch.config.zookeeper.LogSearchConfigLogFeederZK;
 import org.apache.solr.client.solrj.SolrClient;
 import org.springframework.context.annotation.Bean;
@@ -47,6 +48,7 @@ import org.springframework.context.annotation.PropertySource;
 import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
 
 import javax.inject.Inject;
+import java.util.HashMap;
 
 @Configuration
 @PropertySource(value = {
@@ -92,11 +94,17 @@ public class ApplicationConfig {
   }
 
   @Bean
-  public LogLevelFilterManager logLevelFilterManager() {
+  public LogLevelFilterManager logLevelFilterManager() throws Exception {
     if (logFeederProps.isSolrFilterStorage()) {
       SolrClient solrClient = new LogFeederSolrClientFactory().createSolrClient(
         logFeederProps.getSolrZkConnectString(), logFeederProps.getSolrUrls(), "history");
       return new LogLevelFilterManagerSolr(solrClient);
+    } else if (logFeederProps.isUseLocalConfigs() && logFeederProps.isZkFilterStorage()) {
+      final HashMap<String, String> map = new HashMap<>();
+      for (final String name : logFeederProps.getProperties().stringPropertyNames()) {
+        map.put(name, logFeederProps.getProperties().getProperty(name));
+      }
+      return new LogLevelFilterManagerZK(map);
     } else { // no default filter manager
       return null;
     }
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
index 12408b4..859de8f 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
@@ -162,6 +162,16 @@ public class LogFeederProps implements LogFeederProperties {
   public boolean solrFilterStorage;
 
   @LogSearchPropertyDescription(
+    name = LogFeederConstants.USE_ZK_FILTER_STORAGE_PROPERTY,
+    description = "Use zk as a log level filter storage (works only with local config)",
+    examples = {"true"},
+    defaultValue = LogFeederConstants.USE_ZK_FILTER_STORAGE_DEFAULT + "",
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${" + LogFeederConstants.USE_ZK_FILTER_STORAGE_PROPERTY + ":" + LogFeederConstants.USE_ZK_FILTER_STORAGE_DEFAULT +"}")
+  public boolean zkFilterStorage;
+
+  @LogSearchPropertyDescription(
     name = LogFeederConstants.MONITOR_SOLR_FILTER_STORAGE_PROPERTY,
     description = "Monitor log level filters (in solr) periodically - used for checking updates.",
     examples = {"false"},
@@ -334,6 +344,14 @@ public class LogFeederProps implements LogFeederProperties {
     this.solrUrlsStr = solrUrlsStr;
   }
 
+  public boolean isZkFilterStorage() {
+    return zkFilterStorage;
+  }
+
+  public void setZkFilterStorage(boolean zkFilterStorage) {
+    this.zkFilterStorage = zkFilterStorage;
+  }
+
   public String[] getSolrUrls() {
     if (StringUtils.isNotBlank(this.solrUrlsStr)) {
       return this.solrUrlsStr.split(",");
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/LogLevelFilterHandler.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/LogLevelFilterHandler.java
index 8c35d56..ab35f03 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/LogLevelFilterHandler.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/LogLevelFilterHandler.java
@@ -18,6 +18,7 @@
  */
 package org.apache.ambari.logfeeder.loglevelfilter;
 
+import com.google.gson.Gson;
 import org.apache.ambari.logfeeder.common.LogFeederConstants;
 import org.apache.ambari.logfeeder.conf.LogFeederProps;
 import org.apache.ambari.logfeeder.plugin.input.InputMarker;
@@ -25,9 +26,14 @@ import org.apache.ambari.logfeeder.util.LogFeederUtil;
 import org.apache.ambari.logsearch.config.api.LogLevelFilterMonitor;
 import org.apache.ambari.logsearch.config.api.LogSearchConfig;
 import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter;
+import org.apache.ambari.logsearch.config.zookeeper.LogLevelFilterManagerZK;
+import org.apache.ambari.logsearch.config.zookeeper.LogSearchConfigZKHelper;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang.StringUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.TreeCache;
+import org.apache.curator.framework.recipes.cache.TreeCacheListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -64,13 +70,28 @@ public class LogLevelFilterHandler implements LogLevelFilterMonitor {
   private LogSearchConfig config;
   private Map<String, LogLevelFilter> filters = new ConcurrentHashMap<>();
 
+  // Use these 2 only if local config is used with zk log level filter storage
+  private TreeCache clusterCache = null;
+  private TreeCacheListener listener = null;
+
   public LogLevelFilterHandler(LogSearchConfig config) {
     this.config = config;
   }
 
   @PostConstruct
-  public void init() {
+  public void init() throws Exception {
     TimeZone.setDefault(TimeZone.getTimeZone(TIMEZONE));
+    if (logFeederProps.isZkFilterStorage() && logFeederProps.isUseLocalConfigs()) {
+      LogLevelFilterManagerZK filterManager = (LogLevelFilterManagerZK) config.getLogLevelFilterManager();
+      CuratorFramework client = filterManager.getClient();
+      client.start();
+      Gson gson = filterManager.getGson();
+      LogSearchConfigZKHelper.waitUntilRootAvailable(client);
+      TreeCache clusterCache = LogSearchConfigZKHelper.createClusterCache(client, logFeederProps.getClusterName());
+      TreeCacheListener listener = LogSearchConfigZKHelper.createTreeCacheListener(
+        logFeederProps.getClusterName(), gson, this);
+      LogSearchConfigZKHelper.addAndStartListenersOnCluster(clusterCache, listener);
+    }
     if (config.getLogLevelFilterManager() != null) {
       TreeMap<String, LogLevelFilter> sortedFilters = config.getLogLevelFilterManager()
         .getLogLevelFilters(logFeederProps.getClusterName())
diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/common/ACLPropertiesSplitter.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/common/ACLPropertiesSplitter.java
index 35a2b1b..10e9d10 100644
--- a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/common/ACLPropertiesSplitter.java
+++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/common/ACLPropertiesSplitter.java
@@ -19,6 +19,7 @@
 package org.apache.ambari.logsearch.common;
 
 import com.google.common.base.Splitter;
+import org.apache.ambari.logsearch.config.zookeeper.LogSearchConfigZKHelper;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Id;
@@ -36,35 +37,9 @@ public class ACLPropertiesSplitter {
     for (String unparcedAcl : aclStrList) {
       String[] parts = unparcedAcl.split(":");
       if (parts.length == 3) {
-        acls.add(new ACL(parsePermission(parts[2]), new Id(parts[0], parts[1])));
+        acls.add(new ACL(LogSearchConfigZKHelper.parsePermission(parts[2]), new Id(parts[0], parts[1])));
       }
     }
     return acls;
   }
-
-  private Integer parsePermission(String permission) {
-    int permissionCode = 0;
-    for (char each : permission.toLowerCase().toCharArray()) {
-      switch (each) {
-        case 'r':
-          permissionCode |= ZooDefs.Perms.READ;
-          break;
-        case 'w':
-          permissionCode |= ZooDefs.Perms.WRITE;
-          break;
-        case 'c':
-          permissionCode |= ZooDefs.Perms.CREATE;
-          break;
-        case 'd':
-          permissionCode |= ZooDefs.Perms.DELETE;
-          break;
-        case 'a':
-          permissionCode |= ZooDefs.Perms.ADMIN;
-          break;
-        default:
-          throw new IllegalArgumentException("Unsupported permission: " + permission);
-      }
-    }
-    return permissionCode;
-  }
 }
diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/LogSearchConfigApiConfig.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/LogSearchConfigApiConfig.java
index 7f3632d..2765ebd 100644
--- a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/LogSearchConfigApiConfig.java
+++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/LogSearchConfigApiConfig.java
@@ -18,8 +18,10 @@
  */
 package org.apache.ambari.logsearch.conf;
 
+import org.apache.ambari.logsearch.conf.global.LogLevelFilterManagerState;
 import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription;
 import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
 import static org.apache.ambari.logsearch.common.LogSearchConstants.LOGSEARCH_PROPERTIES_FILE;
@@ -47,6 +49,21 @@ public class LogSearchConfigApiConfig {
   @Value("${logsearch.config.api.filter.solr.enabled:false}")
   public boolean solrFilterStorage;
 
+  @LogSearchPropertyDescription(
+    name = "logsearch.config.api.filter.zk-only.enabled",
+    description = "Use zookeeper as a log level filter storage",
+    examples = {"true"},
+    defaultValue = "false",
+    sources = {LOGSEARCH_PROPERTIES_FILE}
+  )
+  @Value("${logsearch.config.api.filter.zk.enabled:false}")
+  public boolean zkFilterStorage;
+
+  @Bean(name = "logLevelFilterManagerState")
+  public LogLevelFilterManagerState logLevelFilterManagerState() {
+    return new LogLevelFilterManagerState();
+  }
+
   public boolean isConfigApiEnabled() {
     return configApiEnabled;
   }
@@ -62,4 +79,12 @@ public class LogSearchConfigApiConfig {
   public void setSolrFilterStorage(boolean solrFilterStorage) {
     this.solrFilterStorage = solrFilterStorage;
   }
+
+  public boolean isZkFilterStorage() {
+    return zkFilterStorage;
+  }
+
+  public void setZkFilterStorage(boolean zkFilterStorage) {
+    this.zkFilterStorage = zkFilterStorage;
+  }
 }
diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SecurityConfig.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SecurityConfig.java
index 69d63d2..0bc1519 100644
--- a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SecurityConfig.java
+++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SecurityConfig.java
@@ -22,8 +22,7 @@ import com.google.common.collect.Lists;
 
 import org.apache.ambari.logsearch.conf.global.LogSearchConfigState;
 import org.apache.ambari.logsearch.conf.global.SolrCollectionState;
-import org.apache.ambari.logsearch.conf.global.SolrLogLevelFilterManagerState;
-import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription;
+import org.apache.ambari.logsearch.conf.global.LogLevelFilterManagerState;
 import org.apache.ambari.logsearch.web.authenticate.LogsearchAuthFailureHandler;
 import org.apache.ambari.logsearch.web.authenticate.LogsearchAuthSuccessHandler;
 import org.apache.ambari.logsearch.web.authenticate.LogsearchLogoutSuccessHandler;
@@ -44,7 +43,6 @@ import org.springframework.context.annotation.Configuration;
 import org.springframework.security.config.annotation.web.builders.HttpSecurity;
 import org.springframework.security.config.annotation.web.configuration.EnableWebSecurity;
 import org.springframework.security.config.annotation.web.configuration.WebSecurityConfigurerAdapter;
-import org.springframework.security.config.http.SessionCreationPolicy;
 import org.springframework.security.web.access.intercept.FilterSecurityInterceptor;
 import org.springframework.security.web.authentication.www.BasicAuthenticationFilter;
 import org.springframework.security.web.util.matcher.AntPathRequestMatcher;
@@ -90,8 +88,8 @@ public class SecurityConfig extends WebSecurityConfigurerAdapter {
   private SolrCollectionState solrEventHistoryState;
 
   @Inject
-  @Named("solrLogLevelFilterManagerState")
-  private SolrLogLevelFilterManagerState solrLogLevelFilterManagerState;
+  @Named("logLevelFilterManagerState")
+  private LogLevelFilterManagerState logLevelFilterManagerState;
 
   @Inject
   private LogSearchConfigState logSearchConfigState;
@@ -190,7 +188,7 @@ public class SecurityConfig extends WebSecurityConfigurerAdapter {
 
   @Bean
   public LogSearchConfigStateFilter logSearchConfigStateFilter() {
-    if (logSearchConfigApiConfig.isSolrFilterStorage()) {
+    if (logSearchConfigApiConfig.isSolrFilterStorage() || logSearchConfigApiConfig.isZkFilterStorage()) {
       return new LogSearchConfigStateFilter(shipperConfigInputRequestMatcher(), logSearchConfigState, logSearchConfigApiConfig.isConfigApiEnabled());
     } else {
       return new LogSearchConfigStateFilter(logsearchConfigRequestMatcher(), logSearchConfigState, logSearchConfigApiConfig.isConfigApiEnabled());
@@ -199,8 +197,9 @@ public class SecurityConfig extends WebSecurityConfigurerAdapter {
 
   @Bean
   public LogSearchLogLevelFilterManagerFilter logSearchLogLevelFilterManagerFilter() {
-    boolean enabled = logSearchConfigApiConfig.isSolrFilterStorage() && !logSearchConfigApiConfig.isConfigApiEnabled();
-    return new LogSearchLogLevelFilterManagerFilter(logLevelFilterRequestMatcher(), solrLogLevelFilterManagerState, enabled);
+    boolean enabled = (logSearchConfigApiConfig.isSolrFilterStorage() || logSearchConfigApiConfig.isZkFilterStorage())
+      && !logSearchConfigApiConfig.isConfigApiEnabled();
+    return new LogSearchLogLevelFilterManagerFilter(logLevelFilterRequestMatcher(), logLevelFilterManagerState, enabled);
   }
 
   @Bean
diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SolrConfig.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SolrConfig.java
index 7577d6e..33f4f6f 100644
--- a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SolrConfig.java
+++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SolrConfig.java
@@ -20,7 +20,7 @@ package org.apache.ambari.logsearch.conf;
 
 import org.apache.ambari.logsearch.conf.global.SolrAuditLogsState;
 import org.apache.ambari.logsearch.conf.global.SolrCollectionState;
-import org.apache.ambari.logsearch.conf.global.SolrLogLevelFilterManagerState;
+import org.apache.ambari.logsearch.conf.global.LogLevelFilterManagerState;
 import org.apache.ambari.logsearch.conf.global.SolrServiceLogsState;
 import org.apache.ambari.logsearch.conf.global.SolrEventHistoryState;
 import org.apache.ambari.logsearch.dao.SolrSchemaFieldDao;
@@ -54,11 +54,6 @@ public class SolrConfig {
     return new SolrEventHistoryState();
   }
 
-  @Bean(name = "solrLogLevelFilterManagerState")
-  public SolrLogLevelFilterManagerState solrLogLevelFilterManagerState() {
-    return new SolrLogLevelFilterManagerState();
-  }
-
   @Bean
   public SolrClientsHolder solrClientsHolder() {
     return new SolrClientsHolder();
diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/global/SolrLogLevelFilterManagerState.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/global/LogLevelFilterManagerState.java
similarity index 96%
rename from ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/global/SolrLogLevelFilterManagerState.java
rename to ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/global/LogLevelFilterManagerState.java
index d610e10..afd5313 100644
--- a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/global/SolrLogLevelFilterManagerState.java
+++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/global/LogLevelFilterManagerState.java
@@ -21,7 +21,7 @@ package org.apache.ambari.logsearch.conf.global;
 import javax.inject.Named;
 
 @Named
-public class SolrLogLevelFilterManagerState {
+public class LogLevelFilterManagerState {
 
   private volatile boolean logLevelFilterManagerIsReady;
 
diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/configurer/LogLevelManagerFilterConfigurer.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/configurer/LogLevelManagerFilterConfigurer.java
new file mode 100644
index 0000000..5efca85
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/configurer/LogLevelManagerFilterConfigurer.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logsearch.configurer;
+
+import org.apache.ambari.logsearch.conf.LogSearchConfigApiConfig;
+import org.apache.ambari.logsearch.conf.LogSearchConfigMapHolder;
+import org.apache.ambari.logsearch.conf.global.LogLevelFilterManagerState;
+import org.apache.ambari.logsearch.config.solr.LogLevelFilterManagerSolr;
+import org.apache.ambari.logsearch.config.zookeeper.LogLevelFilterManagerZK;
+import org.apache.ambari.logsearch.config.zookeeper.LogSearchConfigZKHelper;
+import org.apache.ambari.logsearch.dao.EventHistorySolrDao;
+import org.apache.curator.framework.CuratorFramework;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.PostConstruct;
+import javax.inject.Inject;
+import javax.inject.Named;
+
+@Named
+public class LogLevelManagerFilterConfigurer implements Configurer {
+  private static final Logger logger = LoggerFactory.getLogger(LogLevelManagerFilterConfigurer.class);
+
+  private static final int RETRY_INTERVAL_SECONDS = 10;
+
+  private final EventHistorySolrDao eventHistorySolrDao;
+  private final LogLevelFilterManagerState logLevelFilterManagerState;
+  private final LogSearchConfigApiConfig logSearchConfigApiConfig;
+  private final LogSearchConfigMapHolder logSearchConfigMapHolder;
+
+  private LogLevelFilterManagerSolr logLevelFilterManagerSolr;
+  private LogLevelFilterManagerZK logLevelFilterManagerZK;
+
+  @Inject
+  public LogLevelManagerFilterConfigurer(final LogSearchConfigApiConfig logSearchConfigApiConfig,
+                                         final LogLevelFilterManagerState logLevelFilterManagerState,
+                                         final EventHistorySolrDao eventHistorySolrDao,
+                                         final LogSearchConfigMapHolder logSearchConfigMapHolder) {
+    this.logSearchConfigApiConfig = logSearchConfigApiConfig;
+    this.logLevelFilterManagerState = logLevelFilterManagerState;
+    this.eventHistorySolrDao = eventHistorySolrDao;
+    this.logSearchConfigMapHolder = logSearchConfigMapHolder;
+  }
+
+  @PostConstruct
+  @Override
+  public void start() {
+    Thread setupThread = new Thread("setup_solr_loglevel_filter_manager") {
+      @Override
+      public void run() {
+        logger.info("Start initializing log level filter manager ...");
+        if (logSearchConfigApiConfig.isSolrFilterStorage() || logSearchConfigApiConfig.isZkFilterStorage()) {
+          while (true) {
+            try {
+              if (logSearchConfigApiConfig.isSolrFilterStorage()) {
+                if (eventHistorySolrDao.getSolrCollectionState().isSolrCollectionReady()) {
+                  setLogLevelFilterManagerSolr(new LogLevelFilterManagerSolr(eventHistorySolrDao.getSolrClient()));
+                  logLevelFilterManagerState.setLogLevelFilterManagerIsReady(true);
+                  logger.info("Log level filter manager (solr) successfully initialized.");
+                  break;
+                }
+              }
+              if (logSearchConfigApiConfig.isZkFilterStorage()) {
+                CuratorFramework client = LogSearchConfigZKHelper.createZKClient(logSearchConfigMapHolder.getLogsearchProperties());
+                client.start();
+                if (client.checkExists().forPath("/") == null) {
+                  client.create().creatingParentContainersIfNeeded().forPath("/");
+                }
+                LogLevelFilterManagerZK logLevelFilterManagerZK = new LogLevelFilterManagerZK(
+                  logSearchConfigMapHolder.getLogsearchProperties(), client);
+                setLogLevelFilterManagerZK(logLevelFilterManagerZK);
+                logLevelFilterManagerState.setLogLevelFilterManagerIsReady(true);
+                logger.info("Log level filter manager (zookeeper) successfully initialized.");
+                break;
+              }
+            } catch (Exception ex) {
+              logger.warn("Could not initialize log level Solr filter manager, going to sleep for " + RETRY_INTERVAL_SECONDS + " seconds ", ex);
+            }
+            try {
+              Thread.sleep(RETRY_INTERVAL_SECONDS * 1000);
+            } catch (Exception e) {/* ignore */}
+          }
+        } else {
+          logger.info("Solr is not used as a log level filter storage.");
+        }
+      }
+    };
+    setupThread.setDaemon(true);
+    setupThread.start();
+  }
+
+  public LogLevelFilterManagerSolr getLogLevelFilterManagerSolr() {
+    return logLevelFilterManagerSolr;
+  }
+
+  public void setLogLevelFilterManagerSolr(final LogLevelFilterManagerSolr logLevelFilterManagerSolr) {
+    this.logLevelFilterManagerSolr = logLevelFilterManagerSolr;
+  }
+
+  public LogLevelFilterManagerZK getLogLevelFilterManagerZK() {
+    return logLevelFilterManagerZK;
+  }
+
+  public void setLogLevelFilterManagerZK(LogLevelFilterManagerZK logLevelFilterManagerZK) {
+    this.logLevelFilterManagerZK = logLevelFilterManagerZK;
+  }
+}
diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/configurer/SolrLogLevelFilterConfigurer.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/configurer/SolrLogLevelFilterConfigurer.java
deleted file mode 100644
index e62c6c4..0000000
--- a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/configurer/SolrLogLevelFilterConfigurer.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.ambari.logsearch.configurer;
-
-import org.apache.ambari.logsearch.conf.LogSearchConfigApiConfig;
-import org.apache.ambari.logsearch.conf.global.SolrLogLevelFilterManagerState;
-import org.apache.ambari.logsearch.config.solr.LogLevelFilterManagerSolr;
-import org.apache.ambari.logsearch.dao.EventHistorySolrDao;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.PostConstruct;
-import javax.inject.Inject;
-import javax.inject.Named;
-
-@Named
-public class SolrLogLevelFilterConfigurer implements Configurer {
-  private static final Logger logger = LoggerFactory.getLogger(SolrLogLevelFilterConfigurer.class);
-
-  private static final int RETRY_INTERVAL_SECONDS = 10;
-
-  private final EventHistorySolrDao eventHistorySolrDao;
-  private final SolrLogLevelFilterManagerState solrLogLevelFilterManagerState;
-  private final LogSearchConfigApiConfig logSearchConfigApiConfig;
-
-  private LogLevelFilterManagerSolr logLevelFilterManagerSolr;
-
-  @Inject
-  public SolrLogLevelFilterConfigurer(final LogSearchConfigApiConfig logSearchConfigApiConfig,
-                                      final SolrLogLevelFilterManagerState solrLogLevelFilterManagerState,
-                                      final EventHistorySolrDao eventHistorySolrDao) {
-    this.logSearchConfigApiConfig = logSearchConfigApiConfig;
-    this.solrLogLevelFilterManagerState = solrLogLevelFilterManagerState;
-    this.eventHistorySolrDao = eventHistorySolrDao;
-  }
-
-  @PostConstruct
-  @Override
-  public void start() {
-    Thread setupThread = new Thread("setup_solr_loglevel_filter_manager") {
-      @Override
-      public void run() {
-        logger.info("Start initializing log level filter manager ...");
-        if (logSearchConfigApiConfig.isSolrFilterStorage()) {
-          while (true) {
-            try {
-              if (eventHistorySolrDao.getSolrCollectionState().isSolrCollectionReady()) {
-                setLogLevelFilterManagerSolr(new LogLevelFilterManagerSolr(eventHistorySolrDao.getSolrClient()));
-                solrLogLevelFilterManagerState.setLogLevelFilterManagerIsReady(true);
-                logger.info("Log level filter manager successfully initialized.");
-                break;
-              }
-            } catch (Exception ex) {
-              logger.warn("Could not initialize log level Solr filter manager, going to sleep for " + RETRY_INTERVAL_SECONDS + " seconds ", ex);
-            }
-            try {
-              Thread.sleep(RETRY_INTERVAL_SECONDS * 1000);
-            } catch (Exception e) {/* ignore */}
-          }
-        } else {
-          logger.info("Solr is not used as a log level filter storage.");
-        }
-      }
-    };
-    setupThread.setDaemon(true);
-    setupThread.start();
-  }
-
-  public LogLevelFilterManagerSolr getLogLevelFilterManagerSolr() {
-    return logLevelFilterManagerSolr;
-  }
-
-  public void setLogLevelFilterManagerSolr(final LogLevelFilterManagerSolr logLevelFilterManagerSolr) {
-    this.logLevelFilterManagerSolr = logLevelFilterManagerSolr;
-  }
-}
diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/manager/ShipperConfigManager.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/manager/ShipperConfigManager.java
index a7a955c..6119bb2 100644
--- a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/manager/ShipperConfigManager.java
+++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/manager/ShipperConfigManager.java
@@ -27,7 +27,7 @@ import org.apache.ambari.logfeeder.common.LogEntryParseTester;
 import org.apache.ambari.logsearch.conf.LogSearchConfigApiConfig;
 import org.apache.ambari.logsearch.config.api.model.inputconfig.InputConfig;
 import org.apache.ambari.logsearch.configurer.LogSearchConfigConfigurer;
-import org.apache.ambari.logsearch.configurer.SolrLogLevelFilterConfigurer;
+import org.apache.ambari.logsearch.configurer.LogLevelManagerFilterConfigurer;
 import org.apache.ambari.logsearch.model.common.LSServerInputConfig;
 import org.apache.ambari.logsearch.model.common.LSServerLogLevelFilterMap;
 import org.apache.log4j.Logger;
@@ -55,7 +55,7 @@ public class ShipperConfigManager extends JsonManagerBase {
   private LogSearchConfigConfigurer logSearchConfigConfigurer;
 
   @Inject
-  private SolrLogLevelFilterConfigurer solrLogLevelFilterConfigurer;
+  private LogLevelManagerFilterConfigurer logLevelFilterConfigurer;
   
   public List<String> getServices(String clusterName) {
     return logSearchConfigConfigurer.getConfig().getServices(clusterName);
@@ -120,7 +120,9 @@ public class ShipperConfigManager extends JsonManagerBase {
 
   public LSServerLogLevelFilterMap getLogLevelFilters(String clusterName) {
     if (logSearchConfigApiConfig.isSolrFilterStorage()) {
-      return new LSServerLogLevelFilterMap(solrLogLevelFilterConfigurer.getLogLevelFilterManagerSolr().getLogLevelFilters(clusterName));
+      return new LSServerLogLevelFilterMap(logLevelFilterConfigurer.getLogLevelFilterManagerSolr().getLogLevelFilters(clusterName));
+    } else if (logSearchConfigApiConfig.isZkFilterStorage()) {
+      return new LSServerLogLevelFilterMap(logLevelFilterConfigurer.getLogLevelFilterManagerZK().getLogLevelFilters(clusterName));
     } else {
       return new LSServerLogLevelFilterMap(logSearchConfigConfigurer.getConfig().getLogLevelFilterManager().getLogLevelFilters(clusterName));
     }
@@ -129,7 +131,9 @@ public class ShipperConfigManager extends JsonManagerBase {
   public Response setLogLevelFilters(String clusterName, LSServerLogLevelFilterMap request) {
     try {
       if (logSearchConfigApiConfig.isSolrFilterStorage()) {
-        solrLogLevelFilterConfigurer.getLogLevelFilterManagerSolr().setLogLevelFilters(clusterName, request.convertToApi());
+        logLevelFilterConfigurer.getLogLevelFilterManagerSolr().setLogLevelFilters(clusterName, request.convertToApi());
+      } else if (logSearchConfigApiConfig.isZkFilterStorage()) {
+        logLevelFilterConfigurer.getLogLevelFilterManagerZK().setLogLevelFilters(clusterName, request.convertToApi());
       } else {
         logSearchConfigConfigurer.getConfig().getLogLevelFilterManager().setLogLevelFilters(clusterName, request.convertToApi());
       }
diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/web/filters/LogSearchLogLevelFilterManagerFilter.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/web/filters/LogSearchLogLevelFilterManagerFilter.java
index d8b2ced..6a8e5eb 100644
--- a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/web/filters/LogSearchLogLevelFilterManagerFilter.java
+++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/web/filters/LogSearchLogLevelFilterManagerFilter.java
@@ -21,7 +21,7 @@ package org.apache.ambari.logsearch.web.filters;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.ambari.logsearch.common.MessageEnums;
 import org.apache.ambari.logsearch.common.VResponse;
-import org.apache.ambari.logsearch.conf.global.SolrLogLevelFilterManagerState;
+import org.apache.ambari.logsearch.conf.global.LogLevelFilterManagerState;
 import org.apache.ambari.logsearch.util.RESTErrorUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,14 +44,14 @@ public class LogSearchLogLevelFilterManagerFilter implements Filter {
   private static final String SOLR_FILTER_MANAGER_NOT_AVAILABLE = "Solr log level filter manager is not available";
 
   private final RequestMatcher requestMatcher;
-  private final SolrLogLevelFilterManagerState solrLogLevelFilterManagerState;
+  private final LogLevelFilterManagerState logLevelFilterManagerState;
   private final boolean enabled;
 
   public LogSearchLogLevelFilterManagerFilter(RequestMatcher requestMatcher,
-                                              SolrLogLevelFilterManagerState solrLogLevelFilterManagerState,
+                                              LogLevelFilterManagerState logLevelFilterManagerState,
                                               boolean enabled) {
     this.requestMatcher = requestMatcher;
-    this.solrLogLevelFilterManagerState = solrLogLevelFilterManagerState;
+    this.logLevelFilterManagerState = logLevelFilterManagerState;
     this.enabled = enabled;
   }
 
@@ -84,7 +84,7 @@ public class LogSearchLogLevelFilterManagerFilter implements Filter {
   }
 
   private VResponse getErrorResponse() {
-    if (!solrLogLevelFilterManagerState.isLogLevelFilterManagerIsReady()) {
+    if (!logLevelFilterManagerState.isLogLevelFilterManagerIsReady()) {
       return RESTErrorUtil.createMessageResponse(SOLR_FILTER_MANAGER_NOT_AVAILABLE, MessageEnums.SOLR_CONFIGURATION_API_SOLR_NOT_AVAILEBLE);
     }
 
diff --git a/ambari-logsearch/docker/test-config/logfeeder/logfeeder.properties b/ambari-logsearch/docker/test-config/logfeeder/logfeeder.properties
index bd59765..8371170 100644
--- a/ambari-logsearch/docker/test-config/logfeeder/logfeeder.properties
+++ b/ambari-logsearch/docker/test-config/logfeeder/logfeeder.properties
@@ -33,4 +33,5 @@ logfeeder.docker.registry.enabled=true
 logfeeder.solr.core.config.name=history
 #logfeeder.solr.urls=http://solr:8983/solr
 #logfeeder.configs.local.enabled=true
-#logfeeder.configs.filter.solr.enabled=true
\ No newline at end of file
+#logfeeder.configs.filter.solr.enabled=true
+#logfeeder.configs.filter.zk.enabled=true
\ No newline at end of file
diff --git a/ambari-logsearch/docker/test-config/logsearch/logsearch.properties b/ambari-logsearch/docker/test-config/logsearch/logsearch.properties
index 21bc797..40c8cf5 100644
--- a/ambari-logsearch/docker/test-config/logsearch/logsearch.properties
+++ b/ambari-logsearch/docker/test-config/logsearch/logsearch.properties
@@ -20,6 +20,7 @@ logsearch.solr.collection.service.logs=hadoop_logs
 
 #logsearch.config.api.filter.solr.enabled=true
 #logsearch.config.api.enabled=false
+#logsearch.config.api.filter.zk.enabled=true
 
 logsearch.service.logs.split.interval.mins=15
 logsearch.collection.service.logs.numshards=3