You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ol...@apache.org on 2016/09/07 23:38:38 UTC

[48/50] [abbrv] ambari git commit: AMBARI-18246. Clean up Log Feeder (Miklos Gergely via oleewere)

http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/LogsearchReaderFactory.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/LogsearchReaderFactory.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/LogsearchReaderFactory.java
index c57c028..5fc2e14 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/LogsearchReaderFactory.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/LogsearchReaderFactory.java
@@ -27,17 +27,15 @@ import org.apache.log4j.Logger;
 
 public enum LogsearchReaderFactory {
   INSTANCE;
-  private static Logger logger = Logger
-    .getLogger(LogsearchReaderFactory.class);
+  private static final Logger LOG = Logger.getLogger(LogsearchReaderFactory.class);
 
   public Reader getReader(File file) throws FileNotFoundException {
-    logger.debug("Inside reader factory for file:" + file);
+    LOG.debug("Inside reader factory for file:" + file);
     if (GZIPReader.isValidFile(file.getAbsolutePath())) {
-      logger.info("Reading file " + file + " as gzip file");
+      LOG.info("Reading file " + file + " as gzip file");
       return new GZIPReader(file.getAbsolutePath());
     } else {
       return new FileReader(file);
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FetchConfigFromSolr.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FetchConfigFromSolr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FetchConfigFromSolr.java
deleted file mode 100644
index ae0cfc0..0000000
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FetchConfigFromSolr.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ambari.logfeeder.logconfig;
-
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.TimeZone;
-
-import org.apache.ambari.logfeeder.util.LogFeederUtil;
-import org.apache.ambari.logfeeder.util.SolrUtil;
-import org.apache.ambari.logfeeder.view.VLogfeederFilter;
-import org.apache.ambari.logfeeder.view.VLogfeederFilterWrapper;
-import org.apache.log4j.Logger;
-
-public class FetchConfigFromSolr extends Thread {
-  private static Logger logger = Logger.getLogger(FetchConfigFromSolr.class);
-  private static VLogfeederFilterWrapper logfeederFilterWrapper = null;
-  private static int solrConfigInterval = 5;// 5 sec;
-  private static long delay;
-  private static String endTimeDateFormat = "yyyy-MM-dd'T'HH:mm:ss.SSS";//2016-04-05T04:30:00.000Z
-  private static String sysTimeZone = "GMT";
-
-  FetchConfigFromSolr(boolean isDaemon) {
-    this.setName(this.getClass().getSimpleName());
-    this.setDaemon(isDaemon);
-  }
-
-  @Override
-  public void run() {
-    String zkConnectString = LogFeederUtil.getStringProperty("logfeeder.solr.zk_connect_string");
-    String solrUrl = LogFeederUtil.getStringProperty("logfeeder.solr.url");
-    if ((zkConnectString == null || zkConnectString.trim().length() == 0 )
-        && (solrUrl == null || solrUrl.trim().length() == 0)) {
-      logger.warn("Neither Solr ZK Connect String nor solr Uril for UserConfig/History is set." +
-          "Won't look for level configuration from Solr.");
-      return;
-    }
-    solrConfigInterval = LogFeederUtil.getIntProperty("logfeeder.solr.config.interval", solrConfigInterval);
-    delay = 1000 * solrConfigInterval;
-    do {
-      logger.debug("Updating config from solr after every " + solrConfigInterval + " sec.");
-      pullConfigFromSolr();
-      try {
-        Thread.sleep(delay);
-      } catch (InterruptedException e) {
-        logger.error(e.getLocalizedMessage(), e.getCause());
-      }
-    } while (true);
-  }
-
-  private synchronized void pullConfigFromSolr() {
-    SolrUtil solrUtil = SolrUtil.getInstance();
-    if(solrUtil!=null){
-      HashMap<String, Object> configDocMap = solrUtil.getConfigDoc();
-      if (configDocMap != null) {
-        String configJson = (String) configDocMap.get(LogFeederConstants.VALUES);
-        if (configJson != null) {
-          logfeederFilterWrapper = LogFeederUtil.getGson().fromJson(configJson, VLogfeederFilterWrapper.class);
-        }
-      }
-    }
-  }
-
-  private static boolean isFilterExpired(VLogfeederFilter logfeederFilter) {
-    boolean isFilterExpired = false;// default is false
-    if (logfeederFilter != null) {
-      Date filterEndDate = parseFilterExpireDate(logfeederFilter);
-      if (filterEndDate != null) {
-        Date currentDate = getCurrentDate();
-        if (currentDate.compareTo(filterEndDate) >= 0) {
-          logger.debug("Filter for  Component :" + logfeederFilter.getLabel() + " and Hosts :"
-            + listToStr(logfeederFilter.getHosts()) + "Filter is expired because of filter endTime : "
-            + dateToStr(filterEndDate) + " is older than currentTime :" + dateToStr(currentDate));
-          isFilterExpired = true;
-        }
-      }
-    }
-    return isFilterExpired;
-  }
-
-  private static String dateToStr(Date date) {
-    if (date == null) {
-      return "";
-    }
-    SimpleDateFormat formatter = new SimpleDateFormat(endTimeDateFormat);
-    TimeZone timeZone = TimeZone.getTimeZone(sysTimeZone);
-    formatter.setTimeZone(timeZone);
-    return formatter.format(date);
-  }
-
-  private static Date parseFilterExpireDate(VLogfeederFilter vLogfeederFilter) {
-    String expiryTime = vLogfeederFilter.getExpiryTime();
-    if (expiryTime != null && !expiryTime.isEmpty()) {
-      SimpleDateFormat formatter = new SimpleDateFormat(endTimeDateFormat);
-      TimeZone timeZone = TimeZone.getTimeZone(sysTimeZone);
-      formatter.setTimeZone(timeZone);
-      try {
-        return formatter.parse(expiryTime);
-      } catch (ParseException e) {
-        logger.error("Filter have invalid ExpiryTime : " + expiryTime + " for component :" + vLogfeederFilter.getLabel()
-          + " and hosts :" + listToStr(vLogfeederFilter.getHosts()));
-      }
-    }
-    return null;
-  }
-
-  public static List<String> getAllowedLevels(String hostName, VLogfeederFilter componentFilter) {
-    String componentName = componentFilter.getLabel();
-    List<String> hosts = componentFilter.getHosts();
-    List<String> defaultLevels = componentFilter.getDefaultLevels();
-    List<String> overrideLevels = componentFilter.getOverrideLevels();
-    String expiryTime=componentFilter.getExpiryTime();
-    //check is user override or not
-    if ((expiryTime != null && !expiryTime.isEmpty())
-        || (overrideLevels != null && !overrideLevels.isEmpty())
-        || (hosts != null && !hosts.isEmpty())) {
-      if (hosts == null || hosts.isEmpty()) {
-        // hosts list is empty or null consider it apply on all hosts
-        hosts.add(LogFeederConstants.ALL);
-      }
-      if (LogFeederUtil.isListContains(hosts, hostName, false)) {
-        if (isFilterExpired(componentFilter)) {
-          logger.debug("Filter for component " + componentName + " and host :"
-              + hostName + " is expired at " + componentFilter.getExpiryTime());
-          return defaultLevels;
-        } else {
-          return overrideLevels;
-        }
-      }
-    }
-    return defaultLevels;
-  }
-
-  public static boolean isFilterAvailable() {
-    return logfeederFilterWrapper != null;
-  }
-  
-  public static VLogfeederFilter findComponentFilter(String componentName) {
-    if (logfeederFilterWrapper != null) {
-      HashMap<String, VLogfeederFilter> filter = logfeederFilterWrapper.getFilter();
-      if (filter != null) {
-        VLogfeederFilter componentFilter = filter.get(componentName);
-        if (componentFilter != null) {
-          return componentFilter;
-        }
-      }
-    }
-    logger.trace("Filter is not there for component :" + componentName);
-    return null;
-  }
-
-
-  public static Date getCurrentDate() {
-    TimeZone.setDefault(TimeZone.getTimeZone(sysTimeZone));
-    Date date = new Date();
-    return date;
-  }
-
-  public static String listToStr(List<String> strList) {
-    StringBuilder out = new StringBuilder("[");
-    if (strList != null) {
-      int counter = 0;
-      for (Object o : strList) {
-        if (counter > 0) {
-          out.append(",");
-        }
-        out.append(o.toString());
-        counter++;
-      }
-    }
-    out.append("]");
-    return out.toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FilterLogData.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FilterLogData.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FilterLogData.java
new file mode 100644
index 0000000..801a289
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FilterLogData.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder.logconfig;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.ambari.logfeeder.common.LogFeederConstants;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.log4j.Logger;
+
+/**
+ * Decides whether a log entry may be passed on, using the per-component level filters that
+ * {@link LogConfigHandler} keeps up to date.
+ * <p>
+ * Whenever no decision can be made (empty entry, missing host/component/level field, no filter for the
+ * component) the entry is allowed.
+ */
+public enum FilterLogData {
+  INSTANCE;
+
+  private static final Logger LOG = Logger.getLogger(FilterLogData.class);
+
+  /** Verdict used whenever the filter cannot be evaluated. */
+  private static final boolean DEFAULT_VALUE = true;
+
+  /**
+   * Checks a log entry that is given as a JSON string.
+   *
+   * @param jsonBlock the log entry as JSON, may be null or empty
+   * @return true if the entry is allowed; always true for a null or empty string
+   */
+  public boolean isAllowed(String jsonBlock) {
+    if (StringUtils.isEmpty(jsonBlock)) {
+      return DEFAULT_VALUE;
+    }
+    Map<String, Object> jsonObj = LogFeederUtil.toJSONObject(jsonBlock);
+    return isAllowed(jsonObj);
+  }
+
+  /**
+   * Checks a log entry that is given as a map of its fields.
+   *
+   * @param jsonObj the fields of the log entry, may be null or empty
+   * @return true if the entry is allowed
+   */
+  public boolean isAllowed(Map<String, Object> jsonObj) {
+    boolean isAllowed = applyFilter(jsonObj);
+    // Serializing the whole entry is wasted work unless TRACE output is really enabled.
+    if (!isAllowed && LOG.isTraceEnabled()) {
+      LOG.trace("Filter block the content :" + LogFeederUtil.getGson().toJson(jsonObj));
+    }
+    return isAllowed;
+  }
+
+  /**
+   * Looks up the filter of the entry's component and tests the entry's level against the levels allowed for
+   * its host.
+   */
+  private boolean applyFilter(Map<String, Object> jsonObj) {
+    if (MapUtils.isEmpty(jsonObj)) {
+      LOG.warn("Output jsonobj is empty");
+      return DEFAULT_VALUE;
+    }
+
+    String hostName = (String) jsonObj.get(LogFeederConstants.SOLR_HOST);
+    String componentName = (String) jsonObj.get(LogFeederConstants.SOLR_COMPONENT);
+    String level = (String) jsonObj.get(LogFeederConstants.SOLR_LEVEL);
+    if (StringUtils.isBlank(hostName) || StringUtils.isBlank(componentName) || StringUtils.isBlank(level)) {
+      return DEFAULT_VALUE;
+    }
+
+    LogFeederFilter componentFilter = LogConfigHandler.findComponentFilter(componentName);
+    if (componentFilter == null) {
+      return DEFAULT_VALUE;
+    }
+
+    List<String> allowedLevels = LogConfigHandler.getAllowedLevels(hostName, componentFilter);
+    if (CollectionUtils.isEmpty(allowedLevels)) {
+      // An empty result is treated as "all levels". Use a list of our own: the returned one may be null,
+      // and it belongs to the shared filter object, which must not be modified from here.
+      allowedLevels = new ArrayList<String>();
+      allowedLevels.add(LogFeederConstants.ALL);
+    }
+    return LogFeederUtil.isListContains(allowedLevels, level, false);
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogConfigFetcher.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogConfigFetcher.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogConfigFetcher.java
new file mode 100644
index 0000000..12c744c
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogConfigFetcher.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.logconfig;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.ambari.logfeeder.common.LogFeederConstants;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.SolrRequest.METHOD;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.response.CollectionAdminResponse;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.SolrDocumentList;
+import org.apache.solr.common.SolrException;
+
+/**
+ * Reads the Log Feeder filter configuration document from Solr.
+ */
+public class LogConfigFetcher {
+  private static final Logger LOG = Logger.getLogger(LogConfigFetcher.class);
+
+  private static LogConfigFetcher instance;
+
+  /**
+   * Returns the shared fetcher and creates it on first use.
+   * <p>
+   * A failed construction is logged and nothing is cached, so the next call tries again.
+   *
+   * @return the shared fetcher, or null if it could not be constructed
+   */
+  public synchronized static LogConfigFetcher getInstance() {
+    if (instance == null) {
+      try {
+        instance = new LogConfigFetcher();
+      } catch (Exception e) {
+        String logMessageKey = LogConfigFetcher.class.getSimpleName() + "_SOLR_UTIL";
+        LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Error constructing solrUtil", e, LOG, Level.WARN);
+      }
+    }
+    return instance;
+  }
+
+  private SolrClient solrClient;
+
+  private String solrDetail = "";
+
+  /**
+   * Creates the Solr client from the properties logfeeder.solr.url, logfeeder.solr.zk_connect_string and
+   * logfeeder.solr.core.config.name (default "history").
+   *
+   * @throws Exception if the collection name is empty or neither a ZooKeeper connect string nor a URL is set
+   */
+  public LogConfigFetcher() throws Exception {
+    String url = LogFeederUtil.getStringProperty("logfeeder.solr.url");
+    String zkConnectString = LogFeederUtil.getStringProperty("logfeeder.solr.zk_connect_string");
+    String collection = LogFeederUtil.getStringProperty("logfeeder.solr.core.config.name", "history");
+    connectToSolr(url, zkConnectString, collection);
+  }
+
+  /**
+   * Creates the client for the given collection. The ZooKeeper connect string takes precedence over the URL;
+   * only in ZooKeeper mode is the availability of Solr checked (for up to 3 minutes).
+   */
+  private SolrClient connectToSolr(String url, String zkConnectString, String collection) throws Exception {
+    solrDetail = "zkConnectString=" + zkConnectString + ", collection=" + collection + ", url=" + url;
+
+    LOG.info("connectToSolr() " + solrDetail);
+    if (StringUtils.isEmpty(collection)) {
+      throw new Exception("For solr, collection name is mandatory. " + solrDetail);
+    }
+
+    // Whitespace-only values count as "not set" for both settings, so a blank connect string can never select
+    // the ZooKeeper branch below.
+    if (StringUtils.isBlank(zkConnectString) && StringUtils.isBlank(url)) {
+      throw new Exception("Both zkConnectString and URL are empty. zkConnectString=" + zkConnectString + ", collection=" +
+          collection + ", url=" + url);
+    }
+
+    if (StringUtils.isNotBlank(zkConnectString)) {
+      solrDetail = "zkConnectString=" + zkConnectString + ", collection=" + collection;
+      LOG.info("Using zookeepr. " + solrDetail);
+      CloudSolrClient solrClouldClient = new CloudSolrClient(zkConnectString);
+      solrClouldClient.setDefaultCollection(collection);
+      solrClient = solrClouldClient;
+      checkSolrStatus(3 * 60 * 1000);
+    } else {
+      solrDetail = "collection=" + collection + ", url=" + url;
+      String collectionURL = url + "/" + collection;
+      LOG.info("Connecting to  solr : " + collectionURL);
+      solrClient = new HttpSolrClient(collectionURL);
+    }
+    return solrClient;
+  }
+
+  /**
+   * Polls Solr every 2 seconds with a collection list request until it answers with status 0 or the wait time
+   * is used up.
+   *
+   * @param waitDurationMS the maximum time to wait, in milliseconds
+   * @return true if Solr answered successfully
+   */
+  private boolean checkSolrStatus(int waitDurationMS) {
+    boolean status = false;
+    try {
+      long beginTimeMS = System.currentTimeMillis();
+      long waitIntervalMS = 2000;
+      int pingCount = 0;
+      while (true) {
+        pingCount++;
+        CollectionAdminResponse response = null;
+        try {
+          CollectionAdminRequest.List colListReq = new CollectionAdminRequest.List();
+          response = colListReq.process(solrClient);
+        } catch (Exception ex) {
+          LOG.error("Con't connect to Solr. solrDetail=" + solrDetail, ex);
+        }
+        if (response != null && response.getStatus() == 0) {
+          LOG.info("Solr getCollections() is success. solr=" + solrDetail);
+          status = true;
+          break;
+        }
+        if (System.currentTimeMillis() - beginTimeMS > waitDurationMS) {
+          LOG.error("Solr is not reachable even after " + (System.currentTimeMillis() - beginTimeMS)
+            + " ms. If you are using alias, then you might have to restart LogSearch after Solr is up and running. solr="
+            + solrDetail + ", response=" + response);
+          break;
+        } else {
+          LOG.warn("Solr is not reachable yet. getCollections() attempt count=" + pingCount + ". Will sleep for " +
+              waitIntervalMS + " ms and try again." + " solr=" + solrDetail + ", response=" + response);
+        }
+        Thread.sleep(waitIntervalMS);
+      }
+    } catch (InterruptedException e) {
+      // Keep the interrupt visible to the caller instead of silently consuming it.
+      Thread.currentThread().interrupt();
+      LOG.error("Interrupted while waiting for Solr. solrDetail=" + solrDetail, e);
+    } catch (RuntimeException e) {
+      LOG.error("Seems Solr is not up. solrDetail=" + solrDetail, e);
+    }
+    return status;
+  }
+
+  /**
+   * Queries Solr for the filter configuration document and returns its fields.
+   * <p>
+   * Errors are logged and not propagated.
+   *
+   * @return the fields of the first matching document; an empty map if there is none or the query failed
+   */
+  public Map<String, Object> getConfigDoc() {
+    Map<String, Object> configMap = new HashMap<String, Object>();
+    SolrQuery solrQuery = new SolrQuery();
+    solrQuery.setQuery("*:*");
+    String fq = LogFeederConstants.ROW_TYPE + ":" + LogFeederConstants.LOGFEEDER_FILTER_NAME;
+    solrQuery.setFilterQueries(fq);
+    try {
+      QueryResponse response = process(solrQuery);
+      if (response != null) {
+        SolrDocumentList documentList = response.getResults();
+        if (CollectionUtils.isNotEmpty(documentList)) {
+          SolrDocument configDoc = documentList.get(0);
+          String configJson = LogFeederUtil.getGson().toJson(configDoc);
+          // No downcast to HashMap: callers only need a Map, and the concrete type returned by the JSON helper
+          // is not guaranteed here.
+          Map<String, Object> parsedDoc = LogFeederUtil.toJSONObject(configJson);
+          if (parsedDoc != null) {
+            configMap = parsedDoc;
+          }
+        }
+      }
+    } catch (Exception e) {
+      String logMessageKey = this.getClass().getSimpleName() + "_FETCH_FILTER_CONFIG_ERROR";
+      LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Error getting filter config from solr", e, LOG, Level.ERROR);
+    }
+    return configMap;
+  }
+
+  /** Runs the query as a POST request; returns null (after logging) if no client is available. */
+  private QueryResponse process(SolrQuery solrQuery) throws SolrServerException, IOException, SolrException {
+    if (solrClient != null) {
+      QueryResponse queryResponse = solrClient.query(solrQuery, METHOD.POST);
+      return queryResponse;
+    } else {
+      LOG.error("solrClient can't be null");
+      return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogConfigHandler.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogConfigHandler.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogConfigHandler.java
new file mode 100644
index 0000000..4f52b0b
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogConfigHandler.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder.logconfig;
+
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+
+import org.apache.ambari.logfeeder.common.LogFeederConstants;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.log4j.Logger;
+
+/**
+ * Daemon thread that periodically reloads the Log Feeder filter configuration from Solr and offers static lookups
+ * on the most recently loaded configuration.
+ */
+public class LogConfigHandler extends Thread {
+  private static final Logger LOG = Logger.getLogger(LogConfigHandler.class);
+
+  private static final int DEFAULT_SOLR_CONFIG_INTERVAL = 5;
+  private static final String DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS";
+  private static final String TIMEZONE = "GMT";
+
+  static {
+    // NOTE(review): this switches the default time zone of the whole JVM, not just of this class.
+    TimeZone.setDefault(TimeZone.getTimeZone(TIMEZONE));
+  }
+
+  // SimpleDateFormat is not thread-safe, so each thread works with its own instance.
+  private static final ThreadLocal<DateFormat> formatter = new ThreadLocal<DateFormat>() {
+    @Override
+    protected DateFormat initialValue() {
+      SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
+      dateFormat.setTimeZone(TimeZone.getTimeZone(TIMEZONE));
+      return dateFormat;
+    }
+  };
+
+  // Written by the handler thread and read through the static lookup methods, hence volatile.
+  private static volatile LogFeederFilterWrapper logFeederFilterWrapper;
+
+  // Guarded by the class lock taken in handleConfig().
+  private static boolean running = false;
+
+  /**
+   * Starts the handler thread if the property logfeeder.log.filter.enable is true and the thread has not been
+   * started yet. Synchronized so that concurrent callers cannot start it twice.
+   */
+  public static synchronized void handleConfig() {
+    boolean filterEnable = LogFeederUtil.getBooleanProperty("logfeeder.log.filter.enable", false);
+    if (!filterEnable) {
+      LOG.info("Logfeeder filter Scheduler is disabled.");
+      return;
+    }
+    if (!running) {
+      new LogConfigHandler().start();
+      running = true;
+      LOG.info("Logfeeder Filter Thread started!");
+    } else {
+      LOG.warn("Logfeeder Filter Thread is already running.");
+    }
+  }
+
+  private LogConfigHandler() {
+    setName(getClass().getSimpleName());
+    setDaemon(true);
+  }
+
+  /**
+   * Reloads the filter configuration every logfeeder.solr.config.interval seconds until the thread is
+   * interrupted. Returns immediately if neither a ZooKeeper connect string nor a Solr URL is configured.
+   */
+  @Override
+  public void run() {
+    String zkConnectString = LogFeederUtil.getStringProperty("logfeeder.solr.zk_connect_string");
+    String solrUrl = LogFeederUtil.getStringProperty("logfeeder.solr.url");
+    if (StringUtils.isBlank(zkConnectString) && StringUtils.isBlank(solrUrl)) {
+      LOG.warn("Neither Solr ZK Connect String nor solr Url for UserConfig/History is set." +
+          "Won't look for level configuration from Solr.");
+      return;
+    }
+
+    int solrConfigInterval = LogFeederUtil.getIntProperty("logfeeder.solr.config.interval", DEFAULT_SOLR_CONFIG_INTERVAL);
+    // Multiply as long so that a large interval cannot overflow int.
+    long sleepMillis = 1000L * solrConfigInterval;
+    while (true) {
+      LOG.debug("Updating config from solr after every " + solrConfigInterval + " sec.");
+      try {
+        fetchConfig();
+      } catch (RuntimeException e) {
+        // A broken config document must not end the thread: keep the last good config and try again later.
+        LOG.error("Error updating the filter config from solr", e);
+      }
+      try {
+        Thread.sleep(sleepMillis);
+      } catch (InterruptedException e) {
+        LOG.info("Logfeeder Filter Thread was interrupted, stopping.", e);
+        Thread.currentThread().interrupt();
+        return;
+      }
+    }
+  }
+
+  /** Loads the config document from Solr and, if it carries filter JSON, replaces the current filters. */
+  private void fetchConfig() {
+    LogConfigFetcher fetcher = LogConfigFetcher.getInstance();
+    if (fetcher == null) {
+      return;
+    }
+    Map<String, Object> configDocMap = fetcher.getConfigDoc();
+    if (configDocMap == null) {
+      return;
+    }
+    String configJson = (String) configDocMap.get(LogFeederConstants.VALUES);
+    if (configJson != null) {
+      logFeederFilterWrapper = LogFeederUtil.getGson().fromJson(configJson, LogFeederFilterWrapper.class);
+    }
+  }
+
+  /**
+   * @return true if a filter configuration is currently loaded
+   */
+  public static boolean isFilterAvailable() {
+    return logFeederFilterWrapper != null;
+  }
+
+  /**
+   * Returns the levels the given filter allows for a host: the override levels while a user override that
+   * applies to the host has not expired, the default levels otherwise.
+   *
+   * @param hostName the host the log entry comes from
+   * @param componentFilter the filter of the component, must not be null
+   * @return the filter's own default or override level list, not a copy; may be null or empty
+   */
+  public static List<String> getAllowedLevels(String hostName, LogFeederFilter componentFilter) {
+    String componentName = componentFilter.getLabel();
+    List<String> hosts = componentFilter.getHosts();
+    List<String> defaultLevels = componentFilter.getDefaultLevels();
+    List<String> overrideLevels = componentFilter.getOverrideLevels();
+    String expiryTime = componentFilter.getExpiryTime();
+
+    // check is user override or not
+    if (StringUtils.isNotEmpty(expiryTime) || CollectionUtils.isNotEmpty(overrideLevels) || CollectionUtils.isNotEmpty(hosts)) {
+      if (CollectionUtils.isEmpty(hosts)) {
+        // An empty or missing host list means the override applies to all hosts. Use a list of our own, as the
+        // filter's list may be null and the shared filter object must not be modified by a lookup.
+        hosts = new ArrayList<String>();
+        hosts.add(LogFeederConstants.ALL);
+      }
+
+      if (LogFeederUtil.isListContains(hosts, hostName, false)) {
+        if (isFilterExpired(componentFilter)) {
+          LOG.debug("Filter for component " + componentName + " and host :" + hostName + " is expired at " +
+              componentFilter.getExpiryTime());
+          return defaultLevels;
+        } else {
+          return overrideLevels;
+        }
+      }
+    }
+    return defaultLevels;
+  }
+
+  /** A filter is expired when it has a parsable expiry time that is not after the current time. */
+  private static boolean isFilterExpired(LogFeederFilter logfeederFilter) {
+    if (logfeederFilter == null) {
+      return false;
+    }
+
+    Date filterEndDate = parseFilterExpireDate(logfeederFilter);
+    if (filterEndDate == null) {
+      return false;
+    }
+
+    Date currentDate = new Date();
+    if (!currentDate.before(filterEndDate)) {
+      LOG.debug("Filter for  Component :" + logfeederFilter.getLabel() + " and Hosts : [" +
+          StringUtils.join(logfeederFilter.getHosts(), ',') + "] is expired because of filter endTime : " +
+          formatter.get().format(filterEndDate) + " is older than currentTime :" + formatter.get().format(currentDate));
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  /** Parses the filter's expiry time as a GMT timestamp; returns null if it is missing or malformed. */
+  private static Date parseFilterExpireDate(LogFeederFilter vLogfeederFilter) {
+    String expiryTime = vLogfeederFilter.getExpiryTime();
+    if (StringUtils.isNotEmpty(expiryTime)) {
+      try {
+        return formatter.get().parse(expiryTime);
+      } catch (ParseException e) {
+        LOG.error("Filter have invalid ExpiryTime : " + expiryTime + " for component :" + vLogfeederFilter.getLabel()
+          + " and hosts : [" + StringUtils.join(vLogfeederFilter.getHosts(), ',') + "]", e);
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Finds the filter configured for a component.
+   *
+   * @param componentName the component to look up
+   * @return the component's filter, or null if no configuration is loaded or it has no filter for the component
+   */
+  public static LogFeederFilter findComponentFilter(String componentName) {
+    // Read the volatile field once so that the null check and the lookup work on the same object.
+    LogFeederFilterWrapper wrapper = logFeederFilterWrapper;
+    if (wrapper != null) {
+      HashMap<String, LogFeederFilter> filter = wrapper.getFilter();
+      if (filter != null) {
+        LogFeederFilter componentFilter = filter.get(componentName);
+        if (componentFilter != null) {
+          return componentFilter;
+        }
+      }
+    }
+    LOG.trace("Filter is not there for component :" + componentName);
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederConstants.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederConstants.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederConstants.java
deleted file mode 100644
index 09673a0..0000000
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederConstants.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.ambari.logfeeder.logconfig;
-
-public class LogFeederConstants {
-
-  public static final String ALL = "all";
-  public static final String LOGFEEDER_FILTER_NAME = "log_feeder_config";
-  public static final String LOG_LEVEL_UNKNOWN = "UNKNOWN";
-  // solr fields
-  public static final String SOLR_LEVEL = "level";
-  public static final String SOLR_COMPONENT = "type";
-  public static final String SOLR_HOST = "host";
-
-  // UserConfig Constants History
-  public static final String VALUES = "jsons";
-  public static final String ROW_TYPE = "rowtype";
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederFilter.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederFilter.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederFilter.java
new file mode 100644
index 0000000..60c8ae8
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederFilter.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.logconfig;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.codehaus.jackson.annotate.JsonAutoDetect;
+import org.codehaus.jackson.annotate.JsonAutoDetect.Visibility;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+@JsonAutoDetect(getterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE, fieldVisibility = Visibility.ANY) // Jackson maps the fields directly; getters and setters are not auto-detected
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) // fields that are null are left out of the serialized JSON
+@XmlRootElement
+@XmlAccessorType(XmlAccessType.FIELD) // JAXB binds the fields rather than the accessors
+public class LogFeederFilter { // bean for one log filter entry: a label, a host list, two level lists and an expiry time
+
+  private String label; // NOTE(review): presumably a name identifying this entry - confirm against callers
+  private List<String> hosts; // NOTE(review): presumably the hosts the override levels apply to - confirm
+  private List<String> defaultLevels; // NOTE(review): presumably the log levels accepted by default - confirm
+  private List<String> overrideLevels; // NOTE(review): presumably levels that replace the defaults for the listed hosts - confirm
+  private String expiryTime; // kept as text; NOTE(review): its format and what expires are not visible here - confirm
+
+  public LogFeederFilter() { // starts with three empty lists; label and expiryTime stay null
+    hosts = new ArrayList<String>();
+    defaultLevels = new ArrayList<String>();
+    overrideLevels = new ArrayList<String>();
+  }
+
+  public String getLabel() {
+    return label;
+  }
+
+  public void setLabel(String label) {
+    this.label = label;
+  }
+
+  public List<String> getHosts() { // returns the internal list itself, not a copy
+    return hosts;
+  }
+
+  public void setHosts(List<String> hosts) { // stores the given list as-is (no copy, null accepted)
+    this.hosts = hosts;
+  }
+
+  public List<String> getDefaultLevels() { // returns the internal list itself, not a copy
+    return defaultLevels;
+  }
+
+  public void setDefaultLevels(List<String> defaultLevels) { // stores the given list as-is (no copy, null accepted)
+    this.defaultLevels = defaultLevels;
+  }
+
+  public List<String> getOverrideLevels() { // returns the internal list itself, not a copy
+    return overrideLevels;
+  }
+
+  public void setOverrideLevels(List<String> overrideLevels) { // stores the given list as-is (no copy, null accepted)
+    this.overrideLevels = overrideLevels;
+  }
+
+  public String getExpiryTime() {
+    return expiryTime;
+  }
+
+  public void setExpiryTime(String expiryTime) {
+    this.expiryTime = expiryTime;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederFilterWrapper.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederFilterWrapper.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederFilterWrapper.java
new file mode 100644
index 0000000..9199cd3
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederFilterWrapper.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.logconfig;
+
+import java.util.HashMap;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.codehaus.jackson.annotate.JsonAutoDetect;
+import org.codehaus.jackson.annotate.JsonAutoDetect.Visibility;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+@JsonAutoDetect(getterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE, fieldVisibility = Visibility.ANY) // Jackson maps the fields directly; getters and setters are not auto-detected
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) // fields that are null are left out of the serialized JSON
+@XmlRootElement
+@XmlAccessorType(XmlAccessType.FIELD) // JAXB binds the fields rather than the accessors
+public class LogFeederFilterWrapper { // bean pairing an id with a map of LogFeederFilter entries
+
+  private HashMap<String, LogFeederFilter> filter; // null until set; NOTE(review): keys are presumably component names - confirm
+  private String id; // NOTE(review): presumably the identifier of the stored configuration document - confirm
+
+  public HashMap<String, LogFeederFilter> getFilter() { // returns the internal map itself (may be null), not a copy
+    return filter;
+  }
+
+  public void setFilter(HashMap<String, LogFeederFilter> filter) { // stores the given map as-is (no copy)
+    this.filter = filter;
+  }
+
+  public String getId() {
+    return id;
+  }
+
+  public void setId(String id) {
+    this.id = id;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogfeederScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogfeederScheduler.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogfeederScheduler.java
deleted file mode 100644
index bc807193..0000000
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogfeederScheduler.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ambari.logfeeder.logconfig;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.ambari.logfeeder.util.LogFeederUtil;
-import org.apache.log4j.Logger;
-
-public enum LogfeederScheduler {
-
-  INSTANCE;
-
-  private Logger logger = Logger.getLogger(LogfeederScheduler.class);
-
-  private static boolean running = false;
-
-  public synchronized void start() {
-    boolean filterEnable = LogFeederUtil.getBooleanProperty("logfeeder.log.filter.enable", false);
-    if (!filterEnable) {
-      logger.info("Logfeeder  filter Scheduler is disabled.");
-      return;
-    }
-    if (!running) {
-      for (Thread thread : getThreadList()) {
-        thread.start();
-      }
-      running = true;
-      logger.info("Logfeeder Scheduler started!");
-    } else {
-      logger.warn("Logfeeder Scheduler is already running.");
-    }
-  }
-
-  private List<Thread> getThreadList() {
-    List<Thread> tasks = new ArrayList<Thread>();
-    Thread configMonitor = new FetchConfigFromSolr(true);
-    tasks.add(configMonitor);
-    return tasks;
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/ApplyLogFilter.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/ApplyLogFilter.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/ApplyLogFilter.java
deleted file mode 100644
index b5e4eb3..0000000
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/ApplyLogFilter.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ambari.logfeeder.logconfig.filter;
-
-import java.util.List;
-import java.util.Map;
-
-import org.apache.ambari.logfeeder.logconfig.FetchConfigFromSolr;
-import org.apache.ambari.logfeeder.logconfig.LogFeederConstants;
-import org.apache.ambari.logfeeder.util.LogFeederUtil;
-import org.apache.ambari.logfeeder.view.VLogfeederFilter;
-import org.apache.log4j.Logger;
-
-class ApplyLogFilter extends DefaultDataFilter {
-
-  private static Logger logger = Logger.getLogger(ApplyLogFilter.class);
-
-  @Override
-  public boolean applyFilter(Map<String, Object> jsonObj, boolean defaultValue) {
-    if (isEmpty(jsonObj)) {
-      logger.warn("Output jsonobj is empty");
-      return defaultValue;
-    }
-    String hostName = (String) jsonObj.get(LogFeederConstants.SOLR_HOST);
-    if (isNotEmpty(hostName)) {
-      String componentName = (String) jsonObj.get(LogFeederConstants.SOLR_COMPONENT);
-      if (isNotEmpty(componentName)) {
-        String level = (String) jsonObj.get(LogFeederConstants.SOLR_LEVEL);
-        if (isNotEmpty(level)) {
-          VLogfeederFilter componentFilter = FetchConfigFromSolr.findComponentFilter(componentName);
-          if (componentFilter == null) {
-            return defaultValue;
-          }
-          List<String> allowedLevels = FetchConfigFromSolr.getAllowedLevels(
-              hostName, componentFilter);
-          if (allowedLevels == null || allowedLevels.isEmpty()) {
-            allowedLevels.add(LogFeederConstants.ALL);
-          }
-          return LogFeederUtil.isListContains(allowedLevels, level, false);
-        }
-      }
-    }
-    return defaultValue;
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/DefaultDataFilter.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/DefaultDataFilter.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/DefaultDataFilter.java
deleted file mode 100644
index 04d2ca4..0000000
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/DefaultDataFilter.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.ambari.logfeeder.logconfig.filter;
-
-import java.util.Map;
-
-/**
- * Default filter to allow everything
- */
-class DefaultDataFilter {
-  public boolean applyFilter(Map<String, Object> outputJsonObj, boolean defaultValue) {
-    return defaultValue;
-  }
-
-  protected boolean isEmpty(Map<String, Object> map) {
-    if (map == null || map.isEmpty()) {
-      return true;
-    }
-    return false;
-  }
-
-  protected boolean isEmpty(String str) {
-    if (str == null || str.trim().isEmpty()) {
-      return true;
-    }
-    return false;
-  }
-
-  protected boolean isNotEmpty(String str) {
-    return !isEmpty(str);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/FilterLogData.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/FilterLogData.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/FilterLogData.java
deleted file mode 100644
index 3a8eae9..0000000
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/FilterLogData.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ambari.logfeeder.logconfig.filter;
-
-import java.util.Map;
-
-import org.apache.ambari.logfeeder.logconfig.filter.ApplyLogFilter;
-import org.apache.ambari.logfeeder.util.LogFeederUtil;
-import org.apache.log4j.Logger;
-
-/**
- * Read configuration from solr and filter the log
- */
-public enum FilterLogData {
-  INSTANCE;
-  private ApplyLogFilter applyLogFilter = new ApplyLogFilter();
-  private static Logger logger = Logger.getLogger(FilterLogData.class);
-  // by default allow every log
-  boolean defaultValue = true;
-
-  public boolean isAllowed(String jsonBlock) {
-    if (jsonBlock == null || jsonBlock.isEmpty()) {
-      return defaultValue;
-    }
-    Map<String, Object> jsonObj = LogFeederUtil.toJSONObject(jsonBlock);
-    return isAllowed(jsonObj);
-  }
-
-  public boolean isAllowed(Map<String, Object> jsonObj) {
-    boolean isAllowed = applyLogFilter.applyFilter(jsonObj, defaultValue);
-    if (!isAllowed) {
-      logger.trace("Filter block the content :" + LogFeederUtil.getGson().toJson(jsonObj));
-    }
-    return isAllowed;
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/Mapper.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/Mapper.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/Mapper.java
index 906dd25..96709c0 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/Mapper.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/Mapper.java
@@ -26,22 +26,18 @@ public abstract class Mapper {
   protected String fieldName;
   private String mapClassCode;
 
-  public boolean init(String inputDesc, String fieldName,
-                      String mapClassCode, Object mapConfigs) {
+  public abstract boolean init(String inputDesc, String fieldName, String mapClassCode, Object mapConfigs);
+
+  protected void init(String inputDesc, String fieldName, String mapClassCode) {
     this.inputDesc = inputDesc;
     this.fieldName = fieldName;
     this.mapClassCode = mapClassCode;
-    return true;
   }
 
-  public Object apply(Map<String, Object> jsonObj, Object value) {
-    return value;
-  }
+  public abstract Object apply(Map<String, Object> jsonObj, Object value);
 
   @Override
   public String toString() {
-    return "mapClass=" + mapClassCode + ", input=" + inputDesc
-      + ", fieldName=" + fieldName;
+    return "mapClass=" + mapClassCode + ", input=" + inputDesc + ", fieldName=" + fieldName;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java
index 6dbf8be..eb3ae01 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java
@@ -31,31 +31,29 @@ import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 
 public class MapperDate extends Mapper {
-  private static final Logger logger = Logger.getLogger(MapperDate.class);
+  private static final Logger LOG = Logger.getLogger(MapperDate.class);
 
   private SimpleDateFormat targetDateFormatter = null;
   private boolean isEpoch = false;
   private SimpleDateFormat srcDateFormatter=null;
 
   @Override
-  public boolean init(String inputDesc, String fieldName,
-                      String mapClassCode, Object mapConfigs) {
-    super.init(inputDesc, fieldName, mapClassCode, mapConfigs);
+  public boolean init(String inputDesc, String fieldName, String mapClassCode, Object mapConfigs) {
+    init(inputDesc, fieldName, mapClassCode);
     if (!(mapConfigs instanceof Map)) {
-      logger.fatal("Can't initialize object. mapConfigs class is not of type Map. "
-        + mapConfigs.getClass().getName()
-        + ", map="
-        + this.toString());
+      LOG.fatal("Can't initialize object. mapConfigs class is not of type Map. " + mapConfigs.getClass().getName() +
+        ", map=" + this);
       return false;
     }
+    
     @SuppressWarnings("unchecked")
     Map<String, Object> mapObjects = (Map<String, Object>) mapConfigs;
     String targetDateFormat = (String) mapObjects.get("target_date_pattern");
     String srcDateFormat = (String) mapObjects.get("src_date_pattern");
     if (StringUtils.isEmpty(targetDateFormat)) {
-      logger.fatal("Date format for map is empty. " + this.toString());
+      LOG.fatal("Date format for map is empty. " + this);
     } else {
-      logger.info("Date mapper format is " + targetDateFormat);
+      LOG.info("Date mapper format is " + targetDateFormat);
 
       if (targetDateFormat.equalsIgnoreCase("epoch")) {
         isEpoch = true;
@@ -68,8 +66,7 @@ public class MapperDate extends Mapper {
           }
           return true;
         } catch (Throwable ex) {
-          logger.fatal("Error creating date format. format="
-            + targetDateFormat + ". " + this.toString());
+          LOG.fatal("Error creating date format. format=" + targetDateFormat + ". " + this.toString());
         }
       } 
     }
@@ -84,7 +81,7 @@ public class MapperDate extends Mapper {
           long ms = Long.parseLong(value.toString()) * 1000;
           value = new Date(ms);
         } else if (targetDateFormatter != null) {
-          if(srcDateFormatter!=null){
+          if (srcDateFormatter != null) {
             Date srcDate = srcDateFormatter.parse(value.toString());
             //set year in src_date when src_date does not have year component
             if (!srcDateFormatter.toPattern().contains("yy")) {
@@ -108,12 +105,9 @@ public class MapperDate extends Mapper {
         }
         jsonObj.put(fieldName, value);
       } catch (Throwable t) {
-        LogFeederUtil.logErrorMessageByInterval(this.getClass()
-            .getSimpleName() + ":apply",
-          "Error applying date transformation. isEpoch="
-            + isEpoch + ", targetateFormat=" + (targetDateFormatter!=null ?targetDateFormatter.toPattern():"")
-            + ", value=" + value + ". " + this.toString(),
-          t, logger, Level.ERROR);
+        LogFeederUtil.logErrorMessageByInterval(this.getClass().getSimpleName() + ":apply", "Error applying date transformation." +
+            " isEpoch=" + isEpoch + ", targetateFormat=" + (targetDateFormatter!=null ?targetDateFormatter.toPattern():"")
+            + ", value=" + value + ". " + this.toString(), t, LOG, Level.ERROR);
       }
     }
     return value;

http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java
index c692a9d..9b6e83c 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java
@@ -30,24 +30,23 @@ import org.apache.log4j.Logger;
  * Overrides the value for the field
  */
 public class MapperFieldName extends Mapper {
-  private static final Logger logger = Logger.getLogger(MapperFieldName.class);
+  private static final Logger LOG = Logger.getLogger(MapperFieldName.class);
 
   private String newValue = null;
 
   @Override
-  public boolean init(String inputDesc, String fieldName,
-      String mapClassCode, Object mapConfigs) {
-    super.init(inputDesc, fieldName, mapClassCode, mapConfigs);
+  public boolean init(String inputDesc, String fieldName, String mapClassCode, Object mapConfigs) {
+    init(inputDesc, fieldName, mapClassCode);
     if (!(mapConfigs instanceof Map)) {
-      logger.fatal("Can't initialize object. mapConfigs class is not of type Map. "
-          + mapConfigs.getClass().getName());
+      LOG.fatal("Can't initialize object. mapConfigs class is not of type Map. " + mapConfigs.getClass().getName());
       return false;
     }
+    
     @SuppressWarnings("unchecked")
     Map<String, Object> mapObjects = (Map<String, Object>) mapConfigs;
     newValue = (String) mapObjects.get("new_fieldname");
     if (StringUtils.isEmpty(newValue)) {
-      logger.fatal("Map field value is empty.");
+      LOG.fatal("Map field value is empty.");
       return false;
     }
     return true;
@@ -59,12 +58,9 @@ public class MapperFieldName extends Mapper {
       jsonObj.remove(fieldName);
       jsonObj.put(newValue, value);
     } else {
-      LogFeederUtil.logErrorMessageByInterval(this.getClass()
-          .getSimpleName() + ":apply",
-          "New fieldName is null, so transformation is not applied. "
-              + this.toString(), null, logger, Level.ERROR);
+      LogFeederUtil.logErrorMessageByInterval(this.getClass().getSimpleName() + ":apply",
+          "New fieldName is null, so transformation is not applied. " + this.toString(), null, LOG, Level.ERROR);
     }
     return value;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java
index e618261..87cda65 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java
@@ -30,25 +30,25 @@ import org.apache.log4j.Logger;
  * Overrides the value for the field
  */
 public class MapperFieldValue extends Mapper {
-  private Logger logger = Logger.getLogger(MapperFieldValue.class);
+  private static final Logger LOG = Logger.getLogger(MapperFieldValue.class);
+  
   private String prevValue = null;
   private String newValue = null;
 
   @Override
-  public boolean init(String inputDesc, String fieldName,
-      String mapClassCode, Object mapConfigs) {
-    super.init(inputDesc, fieldName, mapClassCode, mapConfigs);
+  public boolean init(String inputDesc, String fieldName, String mapClassCode, Object mapConfigs) {
+    init(inputDesc, fieldName, mapClassCode);
     if (!(mapConfigs instanceof Map)) {
-      logger.fatal("Can't initialize object. mapConfigs class is not of type Map. "
-          + mapConfigs.getClass().getName());
+      LOG.fatal("Can't initialize object. mapConfigs class is not of type Map. " + mapConfigs.getClass().getName());
       return false;
     }
+    
     @SuppressWarnings("unchecked")
     Map<String, Object> mapObjects = (Map<String, Object>) mapConfigs;
     prevValue = (String) mapObjects.get("pre_value");
     newValue = (String) mapObjects.get("post_value");
     if (StringUtils.isEmpty(newValue)) {
-      logger.fatal("Map field value is empty.");
+      LOG.fatal("Map field value is empty.");
       return false;
     }
     return true;
@@ -56,20 +56,15 @@ public class MapperFieldValue extends Mapper {
 
   @Override
   public Object apply(Map<String, Object> jsonObj, Object value) {
-    if (newValue != null) {
-      if (prevValue != null) {
-        if (prevValue.equalsIgnoreCase(value.toString())) {
-          value = newValue;
-          jsonObj.put(fieldName, value);
-        }
+    if (newValue != null && prevValue != null) {
+      if (prevValue.equalsIgnoreCase(value.toString())) {
+        value = newValue;
+        jsonObj.put(fieldName, value);
       }
     } else {
-      LogFeederUtil.logErrorMessageByInterval(
-          this.getClass().getSimpleName() + ":apply",
-          "New value is null, so transformation is not applied. "
-              + this.toString(), null, logger, Level.ERROR);
+      LogFeederUtil.logErrorMessageByInterval(this.getClass().getSimpleName() + ":apply",
+          "New value is null, so transformation is not applied. " + this.toString(), null, LOG, Level.ERROR);
     }
     return value;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java
index 0a0f4e9..32dfef2 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java
@@ -20,26 +20,26 @@
 package org.apache.ambari.logfeeder.metrics;
 
 import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
 import org.apache.log4j.Logger;
 
 // TODO: Refactor for failover
 public class LogFeederAMSClient extends AbstractTimelineMetricsSink {
-  private static final Logger logger = Logger.getLogger(LogFeederAMSClient.class);
+  private static final Logger LOG = Logger.getLogger(LogFeederAMSClient.class);
 
   private String collectorHosts = null;
 
   public LogFeederAMSClient() {
-    collectorHosts = LogFeederUtil
-      .getStringProperty("logfeeder.metrics.collector.hosts");
-    if (collectorHosts != null && collectorHosts.trim().length() == 0) {
+    collectorHosts = LogFeederUtil.getStringProperty("logfeeder.metrics.collector.hosts");
+    if (StringUtils.isBlank(collectorHosts)) {
       collectorHosts = null;
     }
     if (collectorHosts != null) {
       collectorHosts = collectorHosts.trim();
     }
-    logger.info("AMS collector URL=" + collectorHosts);
+    LOG.info("AMS collector URL=" + collectorHosts);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricCount.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricCount.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricCount.java
deleted file mode 100644
index abb84c7..0000000
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricCount.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ambari.logfeeder.metrics;
-
-public class MetricCount {
-  public String metricsName = null;
-  public boolean isPointInTime = false;
-
-  public long count = 0;
-  public long prevLogCount = 0;
-  public long prevLogMS = System.currentTimeMillis();
-  public long prevPublishCount = 0;
-  public int publishCount = 0; // Count of published metrics. Used for first time sending metrics
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricData.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricData.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricData.java
new file mode 100644
index 0000000..e7f5d37
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricData.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder.metrics;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+
+/** Mutable holder for one metric: its current value plus publish/log bookkeeping. */
+public class MetricData {
+  public final String metricsName;
+  public final boolean isPointInTime;
+
+  public long value = 0;
+  public long prevPublishValue = 0;
+  public long prevLogValue = 0;
+  public long prevLogTime = System.currentTimeMillis();
+  public int publishCount = 0; // Incremented each time the metric is queued for publishing
+
+  public MetricData(String metricsName, boolean isPointInTime) {
+    this.metricsName = metricsName;
+    this.isPointInTime = isPointInTime;
+  }
+
+  /** Dumps all fields reflectively in the short-prefix style. */
+  @Override
+  public String toString() {
+    return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsManager.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsManager.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsManager.java
new file mode 100644
index 0000000..942c0b4
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsManager.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder.metrics;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.TreeMap;
+
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.log4j.Logger;
+
+/** Buffers LogFeeder metrics as AMS timeline metrics and publishes them at most once per publish interval. */
+public class MetricsManager {
+  private static final Logger LOG = Logger.getLogger(MetricsManager.class);
+
+  private boolean isMetricsEnabled = false;
+  private String nodeHostName = null;
+  private String appId = "logfeeder";
+
+  private long lastPublishTimeMS = 0; // Let's do the first publish immediately
+  private long lastFailedPublishTimeMS = System.currentTimeMillis(); // Reset on each successful publish or purge
+
+  private int publishIntervalMS = 60 * 1000;
+  private int maxMetricsBuffer = 60 * 60 * 1000; // If AMS is down, we should not keep the metrics in memory forever
+  private HashMap<String, TimelineMetric> metricsMap = new HashMap<String, TimelineMetric>();
+  private LogFeederAMSClient amsClient = null;
+
+  /** Creates the AMS client; publishing is enabled only when a collector URI and a host name are both available. */
+  public void init() {
+    LOG.info("Initializing MetricsManager()");
+    amsClient = new LogFeederAMSClient();
+    if (amsClient.getCollectorUri(null) != null) {
+      findNodeHostName();
+      if (nodeHostName == null) {
+        isMetricsEnabled = false;
+        LOG.error("Failed getting hostname for node. Disabling publishing LogFeeder metrics");
+      } else {
+        isMetricsEnabled = true;
+        LOG.info("LogFeeder Metrics is enabled. Metrics host=" + amsClient.getCollectorUri(null));
+      }
+    } else {
+      LOG.info("LogFeeder Metrics publish is disabled");
+    }
+  }
+
+  /** Takes the host name from the "node.hostname" property, falling back to what InetAddress reports. */
+  private void findNodeHostName() {
+    nodeHostName = LogFeederUtil.getStringProperty("node.hostname");
+    if (nodeHostName == null) {
+      try {
+        nodeHostName = InetAddress.getLocalHost().getHostName();
+      } catch (Throwable e) {
+        LOG.warn("Error getting hostname using InetAddress.getLocalHost().getHostName()", e);
+      }
+    }
+    if (nodeHostName == null) {
+      try {
+        nodeHostName = InetAddress.getLocalHost().getCanonicalHostName();
+      } catch (Throwable e) {
+        LOG.warn("Error getting hostname using InetAddress.getLocalHost().getCanonicalHostName()", e);
+      }
+    }
+  }
+
+  public boolean isMetricsEnabled() {
+    return isMetricsEnabled;
+  }
+
+  /** No-op while metrics are disabled; otherwise queues the given metrics and publishes if the interval elapsed. */
+  public synchronized void useMetrics(List<MetricData> metricsList) {
+    if (!isMetricsEnabled) {
+      return;
+    }
+    LOG.info("useMetrics() metrics.size=" + metricsList.size());
+    long currMS = System.currentTimeMillis();
+    gatherMetrics(metricsList, currMS);
+    publishMetrics(currMS);
+  }
+
+  /** Queues one data point at currMS per named metric; counters that have not grown are skipped. */
+  private void gatherMetrics(List<MetricData> metricsList, long currMS) {
+    Long currMSLong = Long.valueOf(currMS);
+    for (MetricData metric : metricsList) {
+      if (metric.metricsName == null) {
+        LOG.debug("metric.metricsName is null");
+        continue;
+      }
+      long currCount = metric.value;
+      if (!metric.isPointInTime && metric.publishCount > 0 && currCount <= metric.prevPublishValue) {
+        LOG.debug("Nothing changed. " + metric.metricsName + ", currCount=" + currCount + ", prevPublishCount=" +
+            metric.prevPublishValue);
+        continue;
+      }
+      metric.publishCount++;
+      LOG.debug("Ensuring metrics=" + metric.metricsName);
+      TimelineMetric timelineMetric = metricsMap.get(metric.metricsName);
+      if (timelineMetric == null) {
+        LOG.debug("Creating new metric obbject for " + metric.metricsName);
+        timelineMetric = new TimelineMetric();
+        timelineMetric.setMetricName(metric.metricsName);
+        timelineMetric.setHostName(nodeHostName);
+        timelineMetric.setAppId(appId);
+        timelineMetric.setStartTime(currMS);
+        timelineMetric.setType("Long");
+        timelineMetric.setMetricValues(new TreeMap<Long, Double>());
+        metricsMap.put(metric.metricsName, timelineMetric);
+      }
+      LOG.debug("Adding metrics=" + metric.metricsName);
+      if (metric.isPointInTime) {
+        timelineMetric.getMetricValues().put(currMSLong, Double.valueOf(currCount));
+      } else {
+        // Counters are queued as the delta since the previous gather, accumulated per timestamp
+        Double queued = timelineMetric.getMetricValues().get(currMSLong);
+        double base = (queued == null) ? 0 : queued;
+        timelineMetric.getMetricValues().put(currMSLong, base + (currCount - metric.prevPublishValue));
+        metric.prevPublishValue = currCount;
+      }
+    }
+  }
+
+  /** Sends the queue to AMS once the interval has elapsed; on failure it is kept until maxMetricsBuffer ms pass. */
+  private void publishMetrics(long currMS) {
+    if (!metricsMap.isEmpty() && currMS - lastPublishTimeMS > publishIntervalMS) {
+      try {
+        TimelineMetrics timelineMetrics = new TimelineMetrics();
+        timelineMetrics.setMetrics(new ArrayList<TimelineMetric>(metricsMap.values()));
+        amsClient.emitMetrics(timelineMetrics);
+        LOG.info("Published " + timelineMetrics.getMetrics().size() + " metrics to AMS");
+        metricsMap.clear();
+        lastPublishTimeMS = currMS;
+        // AMS is reachable: restart the failure window, otherwise one later failure would purge at once
+        lastFailedPublishTimeMS = currMS;
+      } catch (Throwable t) {
+        LOG.warn("Error sending metrics to AMS.", t);
+        if (currMS - lastFailedPublishTimeMS > maxMetricsBuffer) {
+          LOG.error("AMS was not sent for last " + maxMetricsBuffer / 1000 +
+              " seconds. Purging it and will start rebuilding it again");
+          metricsMap.clear();
+          lastFailedPublishTimeMS = currMS;
+        }
+      }
+    } else {
+      LOG.info("Not publishing metrics. metrics.size()=" + metricsMap.size() + ", lastPublished=" +
+          (currMS - lastPublishTimeMS) / 1000 + " seconds ago, intervalConfigured=" + publishIntervalMS / 1000);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsMgr.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsMgr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsMgr.java
deleted file mode 100644
index 33397c7..0000000
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsMgr.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ambari.logfeeder.metrics;
-
-import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.TreeMap;
-
-import org.apache.ambari.logfeeder.util.LogFeederUtil;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import org.apache.log4j.Logger;
-
-public class MetricsMgr {
-  private static final Logger logger = Logger.getLogger(MetricsMgr.class);
-
-  private boolean isMetricsEnabled = false;
-  private String nodeHostName = null;
-  private String appId = "logfeeder";
-
-  private long lastPublishTimeMS = 0; // Let's do the first publish immediately
-  private long lastFailedPublishTimeMS = System.currentTimeMillis(); // Reset the clock
-
-  private int publishIntervalMS = 60 * 1000;
-  private int maxMetricsBuffer = 60 * 60 * 1000; // If AMS is down, we should not keep
-  // the metrics in memory forever
-  private HashMap<String, TimelineMetric> metricsMap = new HashMap<String, TimelineMetric>();
-  private LogFeederAMSClient amsClient = null;
-
-  public void init() {
-    logger.info("Initializing MetricsMgr()");
-    amsClient = new LogFeederAMSClient();
-
-    if (amsClient.getCollectorUri(null) != null) {
-      nodeHostName = LogFeederUtil.getStringProperty("node.hostname");
-      if (nodeHostName == null) {
-        try {
-          nodeHostName = InetAddress.getLocalHost().getHostName();
-        } catch (Throwable e) {
-          logger.warn(
-            "Error getting hostname using InetAddress.getLocalHost().getHostName()",
-            e);
-        }
-        if (nodeHostName == null) {
-          try {
-            nodeHostName = InetAddress.getLocalHost()
-              .getCanonicalHostName();
-          } catch (Throwable e) {
-            logger.warn(
-              "Error getting hostname using InetAddress.getLocalHost().getCanonicalHostName()",
-              e);
-          }
-        }
-      }
-      if (nodeHostName == null) {
-        isMetricsEnabled = false;
-        logger.error("Failed getting hostname for node. Disabling publishing LogFeeder metrics");
-      } else {
-        isMetricsEnabled = true;
-        logger.info("LogFeeder Metrics is enabled. Metrics host="
-          + amsClient.getCollectorUri(null));
-      }
-    } else {
-      logger.info("LogFeeder Metrics publish is disabled");
-    }
-  }
-
-  public boolean isMetricsEnabled() {
-    return isMetricsEnabled;
-  }
-
-  synchronized public void useMetrics(List<MetricCount> metricsList) {
-    if (!isMetricsEnabled) {
-      return;
-    }
-    logger.info("useMetrics() metrics.size=" + metricsList.size());
-    long currMS = System.currentTimeMillis();
-    Long currMSLong = new Long(currMS);
-    for (MetricCount metric : metricsList) {
-      if (metric.metricsName == null) {
-        logger.debug("metric.metricsName is null");
-        // Metrics is not meant to be published
-        continue;
-      }
-      long currCount = metric.count;
-      if (!metric.isPointInTime && metric.publishCount > 0
-        && currCount <= metric.prevPublishCount) {
-        // No new data added, so let's ignore it
-        logger.debug("Nothing changed. " + metric.metricsName
-          + ", currCount=" + currCount + ", prevPublishCount="
-          + metric.prevPublishCount);
-        continue;
-      }
-      metric.publishCount++;
-
-      TimelineMetric timelineMetric = metricsMap.get(metric.metricsName);
-      if (timelineMetric == null) {
-        logger.debug("Creating new metric obbject for "
-          + metric.metricsName);
-        // First time for this metric
-        timelineMetric = new TimelineMetric();
-        timelineMetric.setMetricName(metric.metricsName);
-        timelineMetric.setHostName(nodeHostName);
-        timelineMetric.setAppId(appId);
-        timelineMetric.setStartTime(currMS);
-        timelineMetric.setType("Long");
-        timelineMetric.setMetricValues(new TreeMap<Long, Double>());
-
-        metricsMap.put(metric.metricsName, timelineMetric);
-      }
-      logger.debug("Adding metrics=" + metric.metricsName);
-      if (metric.isPointInTime) {
-        timelineMetric.getMetricValues().put(currMSLong,
-          new Double(currCount));
-      } else {
-        Double value = timelineMetric.getMetricValues().get(currMSLong);
-        if (value == null) {
-          value = new Double(0);
-        }
-        value += (currCount - metric.prevPublishCount);
-        timelineMetric.getMetricValues().put(currMSLong, value);
-        metric.prevPublishCount = currCount;
-      }
-    }
-
-    if (metricsMap.size() > 0
-      && currMS - lastPublishTimeMS > publishIntervalMS) {
-      try {
-        // Time to publish
-        TimelineMetrics timelineMetrics = new TimelineMetrics();
-        List<TimelineMetric> timeLineMetricList = new ArrayList<TimelineMetric>();
-        timeLineMetricList.addAll(metricsMap.values());
-        timelineMetrics.setMetrics(timeLineMetricList);
-        amsClient.emitMetrics(timelineMetrics);
-        logger.info("Published " + timeLineMetricList.size()
-          + " metrics to AMS");
-        metricsMap.clear();
-        timeLineMetricList.clear();
-        lastPublishTimeMS = currMS;
-      } catch (Throwable t) {
-        logger.warn("Error sending metrics to AMS.", t);
-        if (currMS - lastFailedPublishTimeMS > maxMetricsBuffer) {
-          logger.error("AMS was not sent for last "
-            + maxMetricsBuffer
-            / 1000
-            + " seconds. Purging it and will start rebuilding it again");
-          metricsMap.clear();
-          lastFailedPublishTimeMS = currMS;
-        }
-      }
-    } else {
-      logger.info("Not publishing metrics. metrics.size()="
-        + metricsMap.size() + ", lastPublished="
-        + (currMS - lastPublishTimeMS) / 1000
-        + " seconds ago, intervalConfigured=" + publishIntervalMS
-        / 1000);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java
index 6f84251..bc6a553 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java
@@ -26,16 +26,19 @@ import java.util.Map.Entry;
 
 import org.apache.ambari.logfeeder.common.ConfigBlock;
 import org.apache.ambari.logfeeder.input.InputMarker;
-import org.apache.ambari.logfeeder.metrics.MetricCount;
+import org.apache.ambari.logfeeder.metrics.MetricData;
 import org.apache.ambari.logfeeder.util.LogFeederUtil;
 import org.apache.log4j.Logger;
 
 public abstract class Output extends ConfigBlock {
-  private static final Logger logger = Logger.getLogger(Output.class);
+  private static final Logger LOG = Logger.getLogger(Output.class);
 
   private String destination = null;
 
-  protected MetricCount writeBytesMetric = new MetricCount();
+  protected MetricData writeBytesMetric = new MetricData(getWriteBytesMetricName(), false);
+  protected String getWriteBytesMetricName() {
+    return null;
+  }
 
   @Override
   public String getShortDescription() {
@@ -67,7 +70,7 @@ public abstract class Output extends ConfigBlock {
    * Extend this method to clean up
    */
   public void close() {
-    logger.info("Calling base close()." + getShortDescription());
+    LOG.info("Calling base close()." + getShortDescription());
     isClosed = true;
   }
 
@@ -91,7 +94,7 @@ public abstract class Output extends ConfigBlock {
   }
 
   @Override
-  public void addMetricsContainers(List<MetricCount> metricsList) {
+  public void addMetricsContainers(List<MetricData> metricsList) {
     super.addMetricsContainers(metricsList);
     metricsList.add(writeBytesMetric);
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputData.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputData.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputData.java
index 4a408f9..c46086e 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputData.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputData.java
@@ -27,18 +27,16 @@ import org.apache.ambari.logfeeder.input.InputMarker;
  * This contains the output json object and InputMarker.
  */
 public class OutputData {
-  Map<String, Object> jsonObj;
-  InputMarker inputMarker;
+  public final Map<String, Object> jsonObj;
+  public final InputMarker inputMarker;
 
   public OutputData(Map<String, Object> jsonObj, InputMarker inputMarker) {
-    super();
     this.jsonObj = jsonObj;
     this.inputMarker = inputMarker;
   }
 
   @Override
   public String toString() {
-    return "OutputData [jsonObj=" + jsonObj + ", inputMarker="
-      + inputMarker + "]";
+    return "OutputData [jsonObj=" + jsonObj + ", inputMarker=" + inputMarker + "]";
   }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputDevNull.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputDevNull.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputDevNull.java
index 2d41a0b..fa4e17b 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputDevNull.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputDevNull.java
@@ -28,16 +28,15 @@ import org.apache.log4j.Logger;
  */
 public class OutputDevNull extends Output {
 
-  private static Logger logger = Logger.getLogger(OutputDevNull.class);
+  private static final Logger LOG = Logger.getLogger(OutputDevNull.class);
 
   @Override
   public void write(String block, InputMarker inputMarker){
-    logger.trace("Ignore log block: " + block);
+    LOG.trace("Ignore log block: " + block);
   }
 
   @Override
   public void copyFile(File inputFile, InputMarker inputMarker) {
-    throw new UnsupportedOperationException(
-        "copyFile method is not yet supported for output=dev_null");
+    throw new UnsupportedOperationException("copyFile method is not yet supported for output=dev_null");
   }
 }