You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ol...@apache.org on 2016/04/11 18:15:02 UTC

[48/51] [partial] ambari git commit: AMBARI-15679. Initial commit for LogSearch module (oleewre)

http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
new file mode 100644
index 0000000..9b2a717
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder.filter;
+
+import java.io.BufferedInputStream;
+import java.io.InputStreamReader;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import oi.thekraken.grok.api.Grok;
+import oi.thekraken.grok.api.exception.GrokException;
+
+import org.apache.ambari.logfeeder.LogFeederUtil;
+import org.apache.ambari.logfeeder.MetricCount;
+import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+import com.google.gson.reflect.TypeToken;
+
+public class FilterGrok extends Filter {
+  static private Logger logger = Logger.getLogger(FilterGrok.class);
+
+  private static final String GROK_PATTERN_FILE = "grok-patterns";
+
+  String messagePattern = null;
+  String multilinePattern = null;
+
+  Grok grokMultiline = null;
+  Grok grokMessage = null;
+
+  StringBuilder strBuff = null;
+  String currMultilineJsonStr = null;
+
+  InputMarker firstInputMarker = null;
+  InputMarker savedInputMarker = null;
+
+  String sourceField = null;
+  boolean removeSourceField = true;
+
+  Set<String> namedParamList = new HashSet<String>();
+  Set<String> multiLineamedParamList = new HashSet<String>();
+
+  Type jsonType = new TypeToken<Map<String, String>>() {
+  }.getType();
+
+  public MetricCount grokErrorMetric = new MetricCount();
+
+  /**
+   * Reads this filter's configuration and compiles the Grok expressions.
+   * <p>
+   * The keys read are "message_pattern", "multiline_pattern",
+   * "source_field" and "remove_source_field". When message_pattern is empty
+   * an error is logged and the method returns with grokMessage still null.
+   * Any Throwable raised while setting up Grok is logged and both Grok
+   * instances are reset to null.
+   *
+   * @throws Exception if super.init() throws
+   */
+  @Override
+  public void init() throws Exception {
+    super.init();
+
+    try {
+      grokErrorMetric.metricsName = "filter.error.grok";
+      // Get the Grok file patterns
+      messagePattern = escapePattern(getStringValue("message_pattern"));
+      multilinePattern = escapePattern(getStringValue("multiline_pattern"));
+      sourceField = getStringValue("source_field");
+      removeSourceField = getBooleanValue("remove_source_field",
+        removeSourceField);
+
+      // NOTE: this is logged before the patterns below are compiled.
+      logger.info("init() done. grokPattern=" + messagePattern
+        + ", multilinePattern=" + multilinePattern + ", "
+        + getShortDescription());
+      if (StringUtils.isEmpty(messagePattern)) {
+        logger.error("message_pattern is not set for filter.");
+        return;
+      }
+      // Remember which field names the message pattern can produce.
+      extractNamedParams(messagePattern, namedParamList);
+
+      grokMessage = new Grok();
+      // grokMessage.addPatternFromReader(r);
+      // NOTE(review): the boolean returned by loadPatterns() is ignored, so
+      // compile() is attempted even when the pattern file failed to load.
+      loadPatterns(grokMessage);
+      grokMessage.compile(messagePattern);
+      if (!StringUtils.isEmpty(multilinePattern)) {
+        extractNamedParams(multilinePattern, multiLineamedParamList);
+
+        grokMultiline = new Grok();
+        loadPatterns(grokMultiline);
+        grokMultiline.compile(multilinePattern);
+      }
+    } catch (Throwable t) {
+      logger.fatal(
+        "Caught exception while initializing Grok. multilinePattern="
+          + multilinePattern + ", messagePattern="
+          + messagePattern, t);
+      // With grokMessage null, apply(String, InputMarker) returns early.
+      grokMessage = null;
+      grokMultiline = null;
+    }
+
+  }
+
+  /**
+   * Normalizes the inline regex flags of a configured pattern: when the
+   * pattern contains "(?m)" and no "(?s)", the first "(?m)" is replaced by
+   * "(?s)".
+   *
+   * @param inPattern configured pattern, may be null
+   * @return the adjusted pattern, or null when inPattern is null
+   */
+  private String escapePattern(String inPattern) {
+    if (inPattern == null) {
+      return null;
+    }
+    if (inPattern.contains("(?m)") && !inPattern.contains("(?s)")) {
+      // String.replaceFirst() interprets its first argument as a regex. The
+      // unquoted "(?m)" is an inline flag that matches the empty string, so
+      // the old call merely prepended "(?s)" and left "(?m)" in place.
+      // Quote it so the literal text is what gets replaced.
+      return inPattern.replaceFirst(Pattern.quote("(?m)"), "(?s)");
+    }
+    return inPattern;
+  }
+
+  private void extractNamedParams(String patternStr, Set<String> paramList) {
+    String grokRegEx = "%\\{" + "(?<name>" + "(?<pattern>[A-z0-9]+)"
+      + "(?::(?<subname>[A-z0-9_:]+))?" + ")" + "(?:=(?<definition>"
+      + "(?:" + "(?:[^{}]+|\\.+)+" + ")+" + ")" + ")?" + "\\}";
+
+    Pattern pattern = Pattern.compile(grokRegEx);
+    java.util.regex.Matcher matcher = pattern.matcher(patternStr);
+    while (matcher.find()) {
+      String subname = matcher.group(3);
+      if (subname != null) {
+        paramList.add(subname);
+      }
+    }
+  }
+
+  /**
+   * Loads the Grok pattern definitions found on the classpath under
+   * GROK_PATTERN_FILE into the given Grok instance.
+   *
+   * @param grok instance that receives the pattern definitions
+   * @return true when the patterns were loaded; false when the resource is
+   *         missing or could not be read or parsed (the failure is logged)
+   */
+  private boolean loadPatterns(Grok grok) {
+    logger.info("Loading pattern file " + GROK_PATTERN_FILE);
+    // getResourceAsStream() only promises an InputStream. The former cast to
+    // BufferedInputStream raised a ClassCastException whenever the class
+    // loader handed back another stream type.
+    java.io.InputStream patternStream;
+    try {
+      patternStream = this.getClass().getClassLoader()
+        .getResourceAsStream(GROK_PATTERN_FILE);
+    } catch (Throwable t) {
+      logger.fatal("Error reading grok-patterns file " + GROK_PATTERN_FILE
+        + " from classpath. Grok filtering will not work.", t);
+      return false;
+    }
+    if (patternStream == null) {
+      logger.fatal("Couldn't load grok-patterns file "
+        + GROK_PATTERN_FILE + ". Things will not work");
+      return false;
+    }
+
+    try {
+      grok.addPatternFromReader(new InputStreamReader(patternStream));
+    } catch (GrokException e) {
+      logger.fatal(
+        "Error loading patterns from grok-patterns reader for file "
+          + GROK_PATTERN_FILE, e);
+      return false;
+    } finally {
+      // Release the classpath stream on every path, including failures.
+      try {
+        patternStream.close();
+      } catch (java.io.IOException ignored) {
+        // Closing is best effort; the patterns are already loaded or the
+        // failure has been reported above.
+      }
+    }
+
+    return true;
+  }
+
+  /**
+   * Feeds one raw input line into the filter.
+   * <p>
+   * Nothing happens when grokMessage is null. Without a multiline Grok the
+   * line is parsed immediately via applyMessage(). With a multiline Grok,
+   * lines are buffered in strBuff (joined with "\r\n"); a line for which
+   * the multiline capture is not "{}" first sends the previously buffered
+   * text to applyMessage() and then starts a new buffer.
+   *
+   * @param inputStr    the line read from the input
+   * @param inputMarker marker passed along with the line
+   * @see org.apache.ambari.logfeeder.filter.Filter#apply(java.lang.String)
+   */
+  @Override
+  public void apply(String inputStr, InputMarker inputMarker) {
+    if (grokMessage == null) {
+      return;
+    }
+
+    if (grokMultiline != null) {
+      // Check if new line
+      String jsonStr = grokMultiline.capture(inputStr);
+      if (!"{}".equals(jsonStr)) {
+        // New line
+        if (strBuff != null) {
+          // The marker of the last buffered line is forwarded; record on it
+          // the line number of the first buffered line.
+          savedInputMarker.beginLineNumber = firstInputMarker.lineNumber;
+          // Construct JSON object and add only the interested named
+          // parameters
+          Map<String, Object> jsonObj = Collections
+            .synchronizedMap(new HashMap<String, Object>());
+          try {
+            // Handle message parsing
+            applyMessage(strBuff.toString(), jsonObj,
+              currMultilineJsonStr);
+          } finally {
+            // Reset the buffer state even when applyMessage() throws.
+            strBuff = null;
+            savedInputMarker = null;
+            firstInputMarker = null;
+          }
+        }
+        // Keep the multiline capture of the line that starts the new buffer.
+        currMultilineJsonStr = jsonStr;
+      }
+
+      if (strBuff == null) {
+        strBuff = new StringBuilder();
+        firstInputMarker = inputMarker;
+      } else {
+        // strBuff.append(System.lineSeparator());
+        strBuff.append('\r');
+        strBuff.append('\n');
+      }
+      strBuff.append(inputStr);
+      savedInputMarker = inputMarker;
+    } else {
+      savedInputMarker = inputMarker;
+      Map<String, Object> jsonObj = Collections
+        .synchronizedMap(new HashMap<String, Object>());
+      applyMessage(inputStr, jsonObj, null);
+    }
+  }
+
+  /**
+   * Applies the message pattern to the value stored under sourceField in an
+   * already structured event. Nothing happens when sourceField is null or
+   * when no message Grok is available.
+   *
+   * @param jsonObj     event map; receives the extracted fields and, when
+   *                    removeSourceField is set, loses the source field
+   * @param inputMarker marker passed along with the event
+   */
+  @Override
+  public void apply(Map<String, Object> jsonObj, InputMarker inputMarker) {
+    if (grokMessage == null) {
+      // Same guard as apply(String, InputMarker): grokMessage stays null
+      // when init() failed or message_pattern is not set, and grokParse()
+      // would otherwise throw a NullPointerException.
+      return;
+    }
+    if (sourceField != null) {
+      savedInputMarker = inputMarker;
+      applyMessage((String) jsonObj.get(sourceField), jsonObj, null);
+      if (removeSourceField) {
+        jsonObj.remove(sourceField);
+      }
+    }
+  }
+
+  /**
+   * @param inputStr
+   * @param jsonObj
+   */
+  private void applyMessage(String inputStr, Map<String, Object> jsonObj,
+                            String multilineJsonStr) {
+    String jsonStr = grokParse(inputStr);
+
+    boolean parseError = false;
+    if ("{}".equals(jsonStr)) {
+      parseError = true;
+      // Error parsing string.
+      logParseError(inputStr);
+
+      if (multilineJsonStr == null) {
+        // TODO: Should we just add this as raw message in solr?
+        return;
+      }
+    }
+
+    if (parseError) {
+      jsonStr = multilineJsonStr;
+    }
+    Map<String, String> jsonSrc = LogFeederUtil.getGson().fromJson(jsonStr,
+      jsonType);
+    for (String namedParam : namedParamList) {
+      if (jsonSrc.get(namedParam) != null) {
+        jsonObj.put(namedParam, jsonSrc.get(namedParam));
+      }
+    }
+    if (parseError) {
+      // Add error tags
+      @SuppressWarnings("unchecked")
+      List<String> tagsList = (List<String>) jsonObj.get("tags");
+      if (tagsList == null) {
+        tagsList = new ArrayList<String>();
+        jsonObj.put("tags", tagsList);
+      }
+      tagsList.add("error_grok_parsing");
+      if (sourceField == null) {
+        // For now let's put the raw message in log_message, so it is
+        // will be searchable
+        jsonObj.put("log_message", inputStr);
+      }
+    }
+
+    super.apply(jsonObj, savedInputMarker);
+    statMetric.count++;
+  }
+
+  /**
+   * Runs the compiled message pattern against the given text.
+   *
+   * @param inputStr text to match
+   * @return the result of grokMessage.capture(inputStr)
+   */
+  public String grokParse(String inputStr) {
+    return grokMessage.capture(inputStr);
+  }
+
+  private void logParseError(String inputStr) {
+    grokErrorMetric.count++;
+    final String LOG_MESSAGE_KEY = this.getClass().getSimpleName()
+      + "_PARSEERROR";
+    int inputStrLength = inputStr != null ? inputStr.length() : 0;
+    LogFeederUtil.logErrorMessageByInterval(
+      LOG_MESSAGE_KEY,
+      "Error parsing string. length=" + inputStrLength
+        + ", input=" + input.getShortDescription()
+        + ". First upto 100 characters="
+        + LogFeederUtil.subString(inputStr, 100), null, logger,
+      Level.WARN);
+  }
+
+  /**
+   * Parses and forwards any multiline text that is still buffered, then
+   * delegates to the parent flush().
+   */
+  @Override
+  public void flush() {
+    if (strBuff != null) {
+      // Mirror what apply(String, InputMarker) does when it completes a
+      // buffered event: record the first line number on the forwarded
+      // marker.
+      if (savedInputMarker != null && firstInputMarker != null) {
+        savedInputMarker.beginLineNumber = firstInputMarker.lineNumber;
+      }
+      // Handle message parsing
+      Map<String, Object> jsonObj = Collections
+        .synchronizedMap(new HashMap<String, Object>());
+      try {
+        applyMessage(strBuff.toString(), jsonObj, currMultilineJsonStr);
+      } finally {
+        // Reset the buffer state even when applyMessage() throws.
+        strBuff = null;
+        savedInputMarker = null;
+        firstInputMarker = null;
+      }
+    }
+    super.flush();
+  }
+
+  /**
+   * Returns a short label for this filter that includes the configured
+   * message pattern.
+   *
+   * @return "filter:filter=grok,regex=" followed by messagePattern
+   * @see org.apache.ambari.logfeeder.ConfigBlock#getShortDescription()
+   */
+  @Override
+  public String getShortDescription() {
+    return "filter:filter=grok,regex=" + messagePattern;
+  }
+
+  /**
+   * Registers the parent's metrics and then this filter's Grok error
+   * counter.
+   *
+   * @param metricsList list that receives the metric objects
+   */
+  @Override
+  public void addMetricsContainers(List<MetricCount> metricsList) {
+    super.addMetricsContainers(metricsList);
+    metricsList.add(grokErrorMetric);
+  }
+
+  /**
+   * Logs the parent's statistics followed by the Grok error counter.
+   */
+  @Override
+  public void logStat() {
+    super.logStat();
+    // Printing stat for grokErrors
+    logStatForMetric(grokErrorMetric, "Stat: Grok Errors");
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java
new file mode 100644
index 0000000..c4da3cb
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder.filter;
+
+import java.util.List;
+import java.util.Map;
+import java.util.StringTokenizer;
+
+import org.apache.ambari.logfeeder.LogFeederUtil;
+import org.apache.ambari.logfeeder.MetricCount;
+import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+public class FilterKeyValue extends Filter {
+  static private Logger logger = Logger.getLogger(FilterKeyValue.class);
+
+  String sourceField = null;
+  String valueSplit = "=";
+  String fieldSplit = "\t";
+
+  public MetricCount errorMetric = new MetricCount();
+
+  /**
+   * Reads "source_field", "value_split" and "field_split" from the
+   * configuration; the two separators keep their defaults when not set.
+   * A fatal message is logged when source_field is empty.
+   *
+   * @throws Exception if super.init() throws
+   */
+  @Override
+  public void init() throws Exception {
+    super.init();
+    errorMetric.metricsName = "filter.error.keyvalue";
+
+    sourceField = getStringValue("source_field");
+    valueSplit = getStringValue("value_split", valueSplit);
+    fieldSplit = getStringValue("field_split", fieldSplit);
+
+    // The message used to contain a doubled ", , " separator before
+    // field_split.
+    logger.info("init() done. source_field=" + sourceField
+      + ", value_split=" + valueSplit + ", field_split="
+      + fieldSplit + ", " + getShortDescription());
+    if (StringUtils.isEmpty(sourceField)) {
+      logger.fatal("source_field is not set for filter. This filter will not be applied");
+    }
+  }
+
+  /**
+   * Converts the input string with LogFeederUtil.toJSONObject() and
+   * delegates to apply(Map, InputMarker).
+   *
+   * @param inputStr    text handed to LogFeederUtil.toJSONObject()
+   * @param inputMarker marker passed along with the event
+   * @see org.apache.ambari.logfeeder.filter.Filter#apply(java.lang.String)
+   */
+  @Override
+  public void apply(String inputStr, InputMarker inputMarker) {
+    apply(LogFeederUtil.toJSONObject(inputStr), inputMarker);
+  }
+
+  /**
+   * Splits the value of sourceField into name/value pairs and stores each
+   * pair in jsonObj, then passes the event to the parent filter.
+   * <p>
+   * fieldSplit and valueSplit are given to StringTokenizer, which treats
+   * every character of the string as a delimiter. A name without a value
+   * is reported through logParseError(). Nothing happens, and the event is
+   * not forwarded, when sourceField is null.
+   *
+   * @param jsonObj     event map, modified in place
+   * @param inputMarker marker passed along with the event
+   */
+  @Override
+  public void apply(Map<String, Object> jsonObj, InputMarker inputMarker) {
+    if (sourceField == null) {
+      return;
+    }
+
+    final Object rawValue = jsonObj.get(sourceField);
+    if (rawValue != null) {
+      for (StringTokenizer pairs = new StringTokenizer(rawValue.toString(),
+        fieldSplit); pairs.hasMoreTokens();) {
+        final String pair = pairs.nextToken();
+        final StringTokenizer parts = new StringTokenizer(pair, valueSplit);
+        while (parts.hasMoreTokens()) {
+          final String key = parts.nextToken();
+          if (!parts.hasMoreTokens()) {
+            // Unbalanced name value pairs
+            logParseError("name=" + key + ", pair=" + pair
+              + ", field=" + sourceField + ", field_value="
+              + rawValue);
+            continue;
+          }
+          jsonObj.put(key, parts.nextToken());
+        }
+      }
+    }
+
+    super.apply(jsonObj, inputMarker);
+    statMetric.count++;
+  }
+
+  private void logParseError(String inputStr) {
+    errorMetric.count++;
+    final String LOG_MESSAGE_KEY = this.getClass().getSimpleName()
+      + "_PARSEERROR";
+    LogFeederUtil
+      .logErrorMessageByInterval(
+        LOG_MESSAGE_KEY,
+        "Error parsing string. length=" + inputStr.length()
+          + ", input=" + input.getShortDescription()
+          + ". First upto 100 characters="
+          + LogFeederUtil.subString(inputStr, 100), null, logger,
+        Level.ERROR);
+  }
+
+  /**
+   * Returns a short label for this filter that includes the configured
+   * source field.
+   *
+   * @return "filter:filter=keyvalue,regex=" followed by sourceField
+   * @see org.apache.ambari.logfeeder.ConfigBlock#getShortDescription()
+   */
+  @Override
+  public String getShortDescription() {
+    // NOTE(review): the label says "regex=" but the value appended is the
+    // source field name - confirm whether the label is intended.
+    return "filter:filter=keyvalue,regex=" + sourceField;
+  }
+
+  /**
+   * Registers the parent's metrics and then this filter's error counter.
+   *
+   * @param metricsList list that receives the metric objects
+   */
+  @Override
+  public void addMetricsContainers(List<MetricCount> metricsList) {
+    super.addMetricsContainers(metricsList);
+    metricsList.add(errorMetric);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/JSONFilterCode.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/JSONFilterCode.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/JSONFilterCode.java
new file mode 100644
index 0000000..5c4d30e
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/JSONFilterCode.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.filter;
+
+import java.util.Map;
+
+import org.apache.ambari.logfeeder.LogFeederUtil;
+import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Filter for input lines that are already JSON: the line is converted with
+ * LogFeederUtil.toJSONObject(), the "line_number" and "logtime" fields are
+ * normalized, and the resulting map is passed to the parent filter.
+ */
+public class JSONFilterCode extends Filter {
+  private static Logger logger = Logger.getLogger(JSONFilterCode.class);
+
+  /**
+   * Parses one JSON line and forwards it as a map.
+   * <p>
+   * A numeric "line_number" is replaced by its long value and a non-empty
+   * string "logtime" is replaced by LogFeederUtil.getDate() of it. Values
+   * of any other type are left untouched instead of failing with a
+   * ClassCastException.
+   *
+   * @param inputStr    the JSON text read from the input
+   * @param inputMarker marker passed along with the event
+   */
+  @Override
+  public void apply(String inputStr, InputMarker inputMarker) {
+    Map<String, Object> jsonMap = LogFeederUtil.toJSONObject(inputStr);
+    // linenumber
+    Object lineNumberObj = jsonMap.get("line_number");
+    if (lineNumberObj instanceof Number) {
+      long lineNumber = ((Number) lineNumberObj).longValue();
+      jsonMap.put("line_number", lineNumber);
+    }
+    // logtime
+    Object timeStampObj = jsonMap.get("logtime");
+    if (timeStampObj instanceof String) {
+      String timeStampStr = (String) timeStampObj;
+      if (!timeStampStr.isEmpty()) {
+        String logtime = LogFeederUtil.getDate(timeStampStr);
+        jsonMap.put("logtime", logtime);
+      }
+    }
+    super.apply(jsonMap, inputMarker);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
new file mode 100644
index 0000000..ec75f2d
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder.input;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.ambari.logfeeder.ConfigBlock;
+import org.apache.ambari.logfeeder.InputMgr;
+import org.apache.ambari.logfeeder.MetricCount;
+import org.apache.ambari.logfeeder.OutputMgr;
+import org.apache.ambari.logfeeder.filter.Filter;
+import org.apache.ambari.logfeeder.output.Output;
+import org.apache.log4j.Logger;
+
+public abstract class Input extends ConfigBlock implements Runnable {
+  static private Logger logger = Logger.getLogger(Input.class);
+
+  OutputMgr outputMgr;
+  InputMgr inputMgr;
+
+  List<Output> outputList = new ArrayList<Output>();
+
+  Filter firstFilter = null;
+  Thread thread;
+  private boolean isClosed = false;
+  String filePath = null;
+  String type = null;
+
+  boolean tail = true;
+  boolean useEventMD5 = false;
+  boolean genEventMD5 = true;
+
+  public MetricCount readBytesMetric = new MetricCount();
+
+  /**
+   * Reads this input's source. It is called by run(), on the thread that
+   * monitor() starts for this input. This method should only exit after
+   * all data are read from the source or the process is exiting.
+   *
+   * @throws Exception on failure; run() catches and logs it
+   */
+  abstract void start() throws Exception;
+
+  @Override
+  public void init() throws Exception {
+    super.init();
+    tail = getBooleanValue("tail", tail);
+    useEventMD5 = getBooleanValue("use_event_md5_as_id", useEventMD5);
+    genEventMD5 = getBooleanValue("gen_event_md5", genEventMD5);
+
+    if (firstFilter != null) {
+      firstFilter.init();
+    }
+  }
+
+  @Override
+  public String getNameForThread() {
+    if (filePath != null) {
+      try {
+        return (type + "=" + (new File(filePath)).getName());
+      } catch (Throwable ex) {
+        logger.warn("Couldn't get basename for filePath=" + filePath,
+          ex);
+      }
+    }
+    return super.getNameForThread() + ":" + type;
+  }
+
+  /**
+   * Thread entry point: logs the start, calls start() and logs the exit.
+   * An Exception thrown by start() is logged and not rethrown.
+   *
+   * @see java.lang.Runnable#run()
+   */
+  @Override
+  public void run() {
+    try {
+      logger.info("Started to monitor. " + getShortDescription());
+      start();
+    } catch (Exception e) {
+      // NOTE(review): the message mentions output although the failure
+      // comes from start() - confirm the wording.
+      logger.error("Error writing to output.", e);
+    }
+    logger.info("Exiting thread. " + getShortDescription());
+  }
+
+  /**
+   * Counts one line read from the source and passes it to the first
+   * filter. The line is dropped when no filter is configured.
+   *
+   * @param line   the line that was read
+   * @param marker marker passed along with the line
+   */
+  public void outputLine(String line, InputMarker marker) {
+    statMetric.count++;
+    // String.length() is a character count, so this metric approximates
+    // the number of bytes read.
+    readBytesMetric.count += (line.length());
+
+    if (firstFilter != null) {
+      firstFilter.apply(line, marker);
+    } else {
+      // TODO: For now, let's make filter mandatory, so that no one
+      // accidentally forgets to write filter
+      // outputMgr.write(line, this);
+    }
+  }
+
+  /**
+   *
+   */
+  public void flush() {
+    if (firstFilter != null) {
+      firstFilter.flush();
+    }
+  }
+
+  /**
+   * Starts the thread for this input when the input is ready.
+   *
+   * @return true when the thread was started, false when isReady() is false
+   */
+  public boolean monitor() {
+    if (!isReady()) {
+      return false;
+    }
+    logger.info("Starting thread. " + getShortDescription());
+    thread = new Thread(this, getNameForThread());
+    thread.start();
+    return true;
+  }
+
+  public void checkIn(InputMarker inputMarker) {
+    // Default implementation is to ignore.
+  }
+
+  /**
+   * This is generally used by final checkin
+   */
+  public void checkIn() {
+
+  }
+
+  /**
+   * @return
+   */
+  public boolean isReady() {
+    return true;
+  }
+
+  public boolean isTail() {
+    return tail;
+  }
+
+  public void setTail(boolean tail) {
+    this.tail = tail;
+  }
+
+  public boolean isUseEventMD5() {
+    return useEventMD5;
+  }
+
+  public void setUseEventMD5(boolean useEventMD5) {
+    this.useEventMD5 = useEventMD5;
+  }
+
+  public boolean isGenEventMD5() {
+    return genEventMD5;
+  }
+
+  public void setGenEventMD5(boolean genEventMD5) {
+    this.genEventMD5 = genEventMD5;
+  }
+
+  @Override
+  public void setDrain(boolean drain) {
+    logger.info("Request to drain. " + getShortDescription());
+    super.setDrain(drain);
+    ;
+    try {
+      thread.interrupt();
+    } catch (Throwable t) {
+      // ignore
+    }
+  }
+
+  public Filter getFirstFilter() {
+    return firstFilter;
+  }
+
+  public void setFirstFilter(Filter filter) {
+    firstFilter = filter;
+  }
+
+  public void setInputMgr(InputMgr inputMgr) {
+    this.inputMgr = inputMgr;
+  }
+
+  public void setOutputMgr(OutputMgr outputMgr) {
+    this.outputMgr = outputMgr;
+  }
+
+  public String getFilePath() {
+    return filePath;
+  }
+
+  public void setFilePath(String filePath) {
+    this.filePath = filePath;
+  }
+
+  /**
+   * Closes the first filter, or the output manager when no filter is set,
+   * and marks this input as closed. A failure while closing is logged and
+   * does not prevent the input from being marked closed.
+   */
+  public void close() {
+    logger.info("Close called. " + getShortDescription());
+
+    try {
+      if (firstFilter != null) {
+        firstFilter.close();
+      } else if (outputMgr != null) {
+        outputMgr.close();
+      }
+    } catch (Throwable t) {
+      // Closing stays best effort, but the failure is no longer hidden.
+      logger.warn("Error while closing. " + getShortDescription(), t);
+    }
+    isClosed = true;
+  }
+
+  public void setClosed(boolean isClosed) {
+    this.isClosed = isClosed;
+  }
+
+  public boolean isClosed() {
+    return isClosed;
+  }
+
+  /**
+   * Loads the configuration through the parent and, when a "type" is
+   * configured, stores it in contextFields and in the "add_fields" entry of
+   * the given map (creating that entry when absent).
+   *
+   * @param map configuration map; may gain or update its "add_fields" entry
+   */
+  @Override
+  public void loadConfig(Map<String, Object> map) {
+    super.loadConfig(map);
+    final String typeValue = getStringValue("type");
+    if (typeValue == null) {
+      return;
+    }
+
+    // Explicitly add type and value to field list
+    contextFields.put("type", typeValue);
+    @SuppressWarnings("unchecked")
+    Map<String, Object> extraFields = (Map<String, Object>) map
+      .get("add_fields");
+    if (extraFields == null) {
+      extraFields = new HashMap<String, Object>();
+      map.put("add_fields", extraFields);
+    }
+    extraFields.put("type", typeValue);
+  }
+
+  @Override
+  public String getShortDescription() {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  @Override
+  public void logStat() {
+    super.logStat();
+    logStatForMetric(readBytesMetric, "Stat: Bytes Read");
+
+    if (firstFilter != null) {
+      firstFilter.logStat();
+    }
+  }
+
+  @Override
+  public String toString() {
+    return getShortDescription();
+  }
+
+  /**
+   *
+   */
+  public void rollOver() {
+    // Only some inputs support it. E.g. InputFile
+  }
+
+  public String getType() {
+    return type;
+  }
+
+  public void setType(String type) {
+    this.type = type;
+  }
+
+  public Date getEventTime() {
+    return null;
+  }
+
+  public List<Output> getOutputList() {
+    return outputList;
+  }
+
+  /**
+   * @param output
+   */
+  public void addOutput(Output output) {
+    outputList.add(output);
+  }
+
+  /**
+   * Registers the parent's metrics, then those of the first filter (when
+   * one is set), then this input's bytes-read counter.
+   *
+   * @param metricsList list that receives the metric objects
+   */
+  @Override
+  public void addMetricsContainers(List<MetricCount> metricsList) {
+    super.addMetricsContainers(metricsList);
+    if (firstFilter != null) {
+      firstFilter.addMetricsContainers(metricsList);
+    }
+    metricsList.add(readBytesMetric);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
new file mode 100644
index 0000000..420610a
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
@@ -0,0 +1,562 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.input;
+
+import java.io.BufferedReader;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.ambari.logfeeder.LogFeederUtil;
+import org.apache.ambari.logfeeder.input.reader.LogsearchReaderFactory;
+import org.apache.commons.io.filefilter.WildcardFileFilter;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.solr.common.util.Base64;
+
+public class InputFile extends Input {
+  static private Logger logger = Logger.getLogger(InputFile.class);
+
+  // String startPosition = "beginning";
+  // Value of the "path" config entry; resolved to real files by getActualFiles()
+  String logPath = null;
+  // Initial value of the "resume" flag in processFile(); init() only ever
+  // sets it to true
+  boolean isStartFromBegining = true;
+
+  // Cached result of isReady(); reset to false when a rolled over file
+  // cannot be reopened
+  boolean isReady = false;
+  // Files that logPath resolved to; in tail mode only element 0 is read
+  File[] logPathFiles = null;
+  // Identity of the file currently being read, as returned by getFileKey()
+  Object fileKey = null;
+  // Base64 of fileKey.toString(); copied into every InputMarker
+  String base64FileKey = null;
+
+  // Set by rollOver(), consumed by the read loop in processFile()
+  private boolean isRolledOver = false;
+  // When true, getActualFiles() appends "*" to the search path
+  boolean addWildCard = false;
+
+  // Time of the last check point write, in epoch milliseconds
+  long lastCheckPointTimeMS = 0;
+  // Minimum time between two check point writes while the input is open
+  int checkPointIntervalMS = 5 * 1000; // 5 seconds
+  // Open handle on the check point file; only created when tailing
+  RandomAccessFile checkPointWriter = null;
+  // In-memory copy of the check point content that gets serialized to JSON
+  Map<String, Object> jsonCheckPoint = null;
+
+  File checkPointFile = null;
+
+  // Latest marker that was skipped because the write interval had not
+  // elapsed; written by the no-argument checkIn()
+  private InputMarker lastCheckPointInputMarker = null;
+
+  // Suffix of the check point file name; overridable through the
+  // "logfeeder.checkpoint.extension" property
+  private String checkPointExtension = ".cp";
+
+  /**
+   * Reads the file input configuration ("path", "tail", "add_wild_card",
+   * "checkpoint.interval.ms", "start_position"), resolves the path once and
+   * then delegates to the superclass initialization. When no path is
+   * configured an error is logged and the method returns without calling
+   * the superclass.
+   */
+  @Override
+  public void init() throws Exception {
+    logger.info("init() called");
+    statMetric.metricsName = "input.files.read_lines";
+    readBytesMetric.metricsName = "input.files.read_bytes";
+    checkPointExtension = LogFeederUtil.getStringProperty(
+      "logfeeder.checkpoint.extension", checkPointExtension);
+
+    // The input stays "closed" until processFile() actually starts reading
+    setClosed(true);
+    logPath = getStringValue("path");
+    tail = getBooleanValue("tail", tail);
+    addWildCard = getBooleanValue("add_wild_card", addWildCard);
+    checkPointIntervalMS = getIntValue("checkpoint.interval.ms",
+      checkPointIntervalMS);
+
+    if (StringUtils.isEmpty(logPath)) {
+      logger.error("path is empty for file input. "
+        + getShortDescription());
+      return;
+    }
+
+    String startPosition = getStringValue("start_position");
+    boolean beginningRequested = StringUtils.isEmpty(startPosition)
+      || startPosition.equalsIgnoreCase("beginning")
+      || startPosition.equalsIgnoreCase("begining");
+    // A start position other than the beginning only makes sense when
+    // tailing. NOTE(review): the flag is initialized to true and never set
+    // to false, so "start_position" currently has no effect - confirm
+    // whether that is intended.
+    if (beginningRequested || !tail) {
+      isStartFromBegining = true;
+    }
+
+    setFilePath(logPath);
+    boolean fileReady = isReady();
+
+    logger.info("File to monitor " + logPath + ", tail=" + tail
+      + ", addWildCard=" + addWildCard + ", isReady=" + fileReady);
+
+    super.init();
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see org.apache.ambari.logfeeder.input.Input#isReady()
+   */
+  @Override
+  public boolean isReady() {
+    if (!isReady) {
+      // Let's try to check whether the file is available
+      logPathFiles = getActualFiles(logPath);
+      if (logPathFiles != null && logPathFiles.length > 0
+        && logPathFiles[0].isFile()) {
+
+        if (isTail() && logPathFiles.length > 1) {
+          logger.warn("Found multiple files (" + logPathFiles.length
+            + ") for the file filter " + filePath
+            + ". Will use only the first one. Using "
+            + logPathFiles[0].getAbsolutePath());
+        }
+        logger.info("File filter " + filePath + " expanded to "
+          + logPathFiles[0].getAbsolutePath());
+        isReady = true;
+      } else {
+        logger.debug(logPath + " file doesn't exist. Ignoring for now");
+      }
+    }
+    return isReady;
+  }
+
+  /**
+   * Resolves a search path to files. An existing regular file is returned
+   * as is; otherwise the path is used as a wild card pattern, first against
+   * the current working directory and then (file name part only) against
+   * the parent directory of the path.
+   *
+   * @param searchPath file path or wild card pattern
+   * @return the matching files; may be null or empty when nothing matches
+   */
+  private File[] getActualFiles(String searchPath) {
+    String pattern = searchPath;
+    if (addWildCard && !pattern.endsWith("*")) {
+      pattern = pattern + "*";
+    }
+
+    File candidate = new File(pattern);
+    if (candidate.isFile()) {
+      return new File[]{candidate};
+    }
+
+    // Wild card search, current folder first
+    File[] matches = findFileForWildCard(pattern, new File("."));
+    if (matches != null && matches.length > 0) {
+      return matches;
+    }
+
+    // Nothing found, so retry relative to the parent folder of the pattern
+    File parentDir = candidate.getParentFile();
+    if (parentDir == null) {
+      return matches;
+    }
+    return findFileForWildCard(candidate.getName(), parentDir);
+  }
+
+  /**
+   * Lists the entries of a directory that match a wild card pattern.
+   *
+   * @param searchPath wild card pattern handed to WildcardFileFilter
+   * @param dir        directory to list
+   * @return the result of File.listFiles for that filter
+   */
+  private File[] findFileForWildCard(String searchPath, File dir) {
+    String dirFullPath = dir.getAbsolutePath();
+    logger.debug("findFileForWildCard(). filePath=" + searchPath + ", dir="
+      + dir + ", dir.fullpath=" + dirFullPath);
+    // The cast selects the listFiles(FileFilter) overload
+    return dir.listFiles((FileFilter) new WildcardFileFilter(searchPath));
+  }
+
+  /**
+   * Persists the position of the given marker in the check point file.
+   * <p>
+   * Nothing is written when no check point file is open, or when the check
+   * point already holds a higher line number. While the input is open,
+   * writes are throttled to one per checkPointIntervalMS; a throttled
+   * marker is remembered so that the no-argument checkIn() can write it
+   * later. Once the input is closed the marker is always written.
+   * <p>
+   * File layout: a 4 byte length followed by that many bytes of JSON. The
+   * length is the number of encoded bytes (not characters), which is what
+   * the reader in processFile() expects.
+   *
+   * @param inputMarker position of the line that has been processed
+   */
+  @Override
+  synchronized public void checkIn(InputMarker inputMarker) {
+    super.checkIn(inputMarker);
+    if (checkPointWriter == null) {
+      return;
+    }
+    try {
+      int lineNumber = LogFeederUtil.objectToInt(
+        jsonCheckPoint.get("line_number"), 0, "line_number");
+      if (lineNumber > inputMarker.lineNumber) {
+        // Already wrote higher line number for this input
+        return;
+      }
+      // Only write when the interval since the last write has elapsed
+      long currMS = System.currentTimeMillis();
+      if (!isClosed()
+        && (currMS - lastCheckPointTimeMS) < checkPointIntervalMS) {
+        // Let's save this one so we can update the check point file
+        // on flush
+        lastCheckPointInputMarker = inputMarker;
+        return;
+      }
+      lastCheckPointTimeMS = currMS;
+
+      jsonCheckPoint.put("line_number",
+        String.valueOf(inputMarker.lineNumber));
+      jsonCheckPoint.put("last_write_time_ms", String.valueOf(currMS));
+      jsonCheckPoint.put("last_write_time_date", new Date());
+
+      String jsonStr = LogFeederUtil.getGson().toJson(jsonCheckPoint);
+      // Platform default charset on purpose: processFile() decodes the
+      // check point with the platform default as well
+      byte[] jsonBytes = jsonStr.getBytes();
+
+      // Let's rewind
+      checkPointWriter.seek(0);
+      // The prefix must be the byte count; the character count is smaller
+      // whenever the JSON contains multi-byte characters
+      checkPointWriter.writeInt(jsonBytes.length);
+      checkPointWriter.write(jsonBytes);
+
+      if (isClosed()) {
+        final String LOG_MESSAGE_KEY = this.getClass()
+          .getSimpleName() + "_FINAL_CHECKIN";
+        LogFeederUtil.logErrorMessageByInterval(
+          LOG_MESSAGE_KEY,
+          "Wrote final checkPoint, input="
+            + getShortDescription()
+            + ", checkPointFile="
+            + checkPointFile.getAbsolutePath()
+            + ", checkPoint=" + jsonStr, null, logger,
+          Level.INFO);
+      }
+    } catch (Throwable t) {
+      // Check pointing is best effort; it must never break the read loop
+      final String LOG_MESSAGE_KEY = this.getClass().getSimpleName()
+        + "_CHECKIN_EXCEPTION";
+      LogFeederUtil
+        .logErrorMessageByInterval(LOG_MESSAGE_KEY,
+          "Caught exception checkIn. , input="
+            + getShortDescription(), t, logger,
+          Level.ERROR);
+    }
+  }
+
+  /**
+   * Flushes the check point: after the superclass handling, writes the
+   * marker that an earlier checkIn(InputMarker) call postponed, if any.
+   */
+  @Override
+  public void checkIn() {
+    super.checkIn();
+    if (lastCheckPointInputMarker == null) {
+      return;
+    }
+    checkIn(lastCheckPointInputMarker);
+  }
+
+  /**
+   * Flags this input as rolled over. The flag is only recorded here; the
+   * read loop in processFile() picks it up and reopens the file.
+   */
+  @Override
+  public void rollOver() {
+    logger.info("Marking this input file for rollover. "
+      + getShortDescription());
+    isRolledOver = true;
+  }
+
+  /**
+   * Reads the resolved file(s) and closes the input afterwards.
+   * <p>
+   * When tailing, only the first resolved file is processed and any
+   * exception propagates to the caller. Otherwise every resolved file is
+   * processed in order; a failure on one file is logged and the next file
+   * is tried, and the loop stops early once the input is closed or
+   * draining.
+   */
+  @Override
+  void start() throws Exception {
+    boolean nothingToRead = logPathFiles == null
+      || logPathFiles.length == 0;
+    if (nothingToRead) {
+      return;
+    }
+
+    if (!isTail()) {
+      for (int i = 0; i < logPathFiles.length; i++) {
+        File current = logPathFiles[i];
+        try {
+          processFile(current);
+          if (isClosed() || isDrain()) {
+            logger.info("isClosed or isDrain. Now breaking loop.");
+            break;
+          }
+        } catch (Throwable t) {
+          logger.error(
+            "Error processing file=" + current.getAbsolutePath(),
+            t);
+        }
+      }
+    } else {
+      // Tailing follows exactly one file
+      processFile(logPathFiles[0]);
+    }
+
+    // Closing flushes to the filters and outputs
+    close();
+  }
+
+  /**
+   * Closes the input through the superclass and then calls checkIn() so
+   * that a marker postponed by the check point interval still gets written.
+   */
+  @Override
+  public void close() {
+    super.close();
+    logger.info("close() calling checkPoint checkIn(). "
+      + getShortDescription());
+    checkIn();
+  }
+
+  /**
+   * Reads one file line by line and hands every line, together with an
+   * InputMarker describing its position, to outputLine().
+   * <p>
+   * When tailing, a check point file named after the Base64 encoded file
+   * key is opened in the check point folder. If it holds a "line_number"
+   * greater than zero, lines up to and including that number are read but
+   * not emitted. At end of file the loop sleeps with a growing delay (at
+   * most 10 seconds), flushes on the second consecutive empty read, and
+   * after more than four empty reads compares the file key to detect a
+   * rolled over file, which is then reopened. When not tailing, the loop
+   * flushes and ends on the second consecutive empty read. The loop also
+   * ends as soon as isDrain() is true.
+   *
+   * @param logPathFile the file to read
+   * @throws FileNotFoundException declared; presumably raised when the
+   *                               reader cannot be opened - confirm
+   * @throws IOException           declared by the signature
+   */
+  private void processFile(File logPathFile) throws FileNotFoundException,
+    IOException {
+    logger.info("Monitoring logPath=" + logPath + ", logPathFile="
+      + logPathFile);
+    BufferedReader br = null;
+    // Reset the check point state left over from a previously processed file
+    checkPointFile = null;
+    checkPointWriter = null;
+    jsonCheckPoint = null;
+    int resumeFromLineNumber = 0;
+
+    int lineCount = 0;
+    try {
+      setFilePath(logPathFile.getAbsolutePath());
+//      br = new BufferedReader(new FileReader(logPathFile));
+      br = new BufferedReader(LogsearchReaderFactory.INSTANCE.getReader(logPathFile));
+
+      // Whether to send to output from the beginning.
+      boolean resume = isStartFromBegining;
+
+      // Seems FileWatch is not reliable, so let's only use file key
+      // comparison
+      // inputMgr.monitorSystemFileChanges(this);
+      fileKey = getFileKey(logPathFile);
+      base64FileKey = Base64.byteArrayToBase64(fileKey.toString()
+        .getBytes());
+      logger.info("fileKey=" + fileKey + ", base64=" + base64FileKey
+        + ". " + getShortDescription());
+
+      if (isTail()) {
+        try {
+          // Let's see if there is a checkpoint for this file
+          logger.info("Checking existing checkpoint file. "
+            + getShortDescription());
+
+          // Same value as base64FileKey computed above
+          String fileBase64 = Base64.byteArrayToBase64(fileKey
+            .toString().getBytes());
+          String checkPointFileName = fileBase64
+            + checkPointExtension;
+          File checkPointFolder = inputMgr.getCheckPointFolderFile();
+          checkPointFile = new File(checkPointFolder,
+            checkPointFileName);
+          // Stays open after this method returns; checkIn() writes
+          // through it
+          checkPointWriter = new RandomAccessFile(checkPointFile,
+            "rw");
+
+          try {
+            // Layout: 4 byte length followed by the JSON bytes
+            int contentSize = checkPointWriter.readInt();
+            byte b[] = new byte[contentSize];
+            int readSize = checkPointWriter.read(b, 0, contentSize);
+            if (readSize != contentSize) {
+              logger.error("Couldn't read expected number of bytes from checkpoint file. expected="
+                + contentSize
+                + ", read="
+                + readSize
+                + ", checkPointFile="
+                + checkPointFile
+                + ", input=" + getShortDescription());
+            } else {
+              // Create JSON string
+              String jsonCheckPointStr = new String(b, 0,
+                readSize);
+              jsonCheckPoint = LogFeederUtil
+                .toJSONObject(jsonCheckPointStr);
+
+              resumeFromLineNumber = LogFeederUtil.objectToInt(
+                jsonCheckPoint.get("line_number"), 0,
+                "line_number");
+
+              if (resumeFromLineNumber > 0) {
+                // Let's read from last line read
+                resume = false;
+              }
+              logger.info("CheckPoint. checkPointFile="
+                + checkPointFile + ", json="
+                + jsonCheckPointStr
+                + ", resumeFromLineNumber="
+                + resumeFromLineNumber + ", resume="
+                + resume);
+            }
+          } catch (EOFException eofEx) {
+            // An empty (e.g. newly created) check point file ends up here
+            logger.info("EOFException. Will reset checkpoint file "
+              + checkPointFile.getAbsolutePath() + " for "
+              + getShortDescription());
+          }
+          if (jsonCheckPoint == null) {
+            // This seems to be first time, so creating the initial
+            // checkPoint object
+            jsonCheckPoint = new HashMap<String, Object>();
+            jsonCheckPoint.put("file_path", filePath);
+            jsonCheckPoint.put("file_key", fileBase64);
+          }
+
+        } catch (Throwable t) {
+          // Check point problems are logged only; reading continues
+          logger.error(
+            "Error while configuring checkpoint file. Will reset file. checkPointFile="
+              + checkPointFile, t);
+        }
+      }
+
+      setClosed(false);
+      // Seconds to sleep after an empty read; doubled up to 10
+      int sleepStep = 2;
+      // Number of consecutive empty reads
+      int sleepIteration = 0;
+      while (true) {
+        try {
+          if (isDrain()) {
+            break;
+          }
+
+          String line = br.readLine();
+          if (line == null) {
+            // End of file reached: everything read from now on is new
+            if (!resume) {
+              resume = true;
+            }
+            sleepIteration++;
+            try {
+              // Since FileWatch service is not reliable, we will
+              // check
+              // file inode every n seconds after no write
+              if (sleepIteration > 4) {
+                Object newFileKey = getFileKey(logPathFile);
+                if (newFileKey != null) {
+                  if (fileKey == null
+                    || !newFileKey.equals(fileKey)) {
+                    logger.info("File key is different. Calling rollover. oldKey="
+                      + fileKey
+                      + ", newKey="
+                      + newFileKey
+                      + ". "
+                      + getShortDescription());
+                    // File has rotated.
+                    rollOver();
+                  }
+                }
+              }
+              // Flush on the second iteration
+              if (!tail && sleepIteration >= 2) {
+                logger.info("End of file. Done with filePath="
+                  + logPathFile.getAbsolutePath()
+                  + ", lineCount=" + lineCount);
+                flush();
+                break;
+              } else if (sleepIteration == 2) {
+                flush();
+              } else if (sleepIteration >= 2) {
+                // Effectively "more than 2": the == 2 case is handled
+                // by the branch above
+                if (isRolledOver) {
+                  isRolledOver = false;
+                  // Close existing file
+                  try {
+                    logger.info("File is rolled over. Closing current open file."
+                      + getShortDescription()
+                      + ", lineCount=" + lineCount);
+                    br.close();
+                  } catch (Exception ex) {
+                    logger.error("Error closing file"
+                      + getShortDescription());
+                    break;
+                  }
+                  try {
+                    // Open new file
+                    logger.info("Opening new rolled over file."
+                      + getShortDescription());
+//                    br = new BufferedReader(new FileReader(
+//                            logPathFile));
+                    br = new BufferedReader(LogsearchReaderFactory.
+                      INSTANCE.getReader(logPathFile));
+                    // NOTE(review): lineCount restarts at 0 here, but
+                    // jsonCheckPoint and checkPointFile still belong to
+                    // the old file key, and checkIn() ignores markers
+                    // below the stored line_number - confirm whether
+                    // the check point should be reset as well
+                    lineCount = 0;
+                    fileKey = getFileKey(logPathFile);
+                    base64FileKey = Base64
+                      .byteArrayToBase64(fileKey
+                        .toString().getBytes());
+                    logger.info("fileKey=" + fileKey
+                      + ", base64=" + base64FileKey
+                      + ", " + getShortDescription());
+                  } catch (Exception ex) {
+                    logger.error("Error opening rolled over file. "
+                      + getShortDescription());
+                    // Let's add this to monitoring and exit
+                    // this
+                    // thread
+                    logger.info("Added input to not ready list."
+                      + getShortDescription());
+                    isReady = false;
+                    inputMgr.addToNotReady(this);
+                    break;
+                  }
+                  logger.info("File is successfully rolled over. "
+                    + getShortDescription());
+                  // Read from the reopened file without sleeping
+                  continue;
+                }
+              }
+              Thread.sleep(sleepStep * 1000);
+              sleepStep = (sleepStep * 2);
+              sleepStep = sleepStep > 10 ? 10 : sleepStep;
+            } catch (InterruptedException e) {
+              // The interrupt is only logged; the loop keeps running
+              // until isDrain() is true
+              logger.info("Thread interrupted."
+                + getShortDescription());
+            }
+          } else {
+            lineCount++;
+            // Data is flowing again, so restart the back off
+            sleepStep = 1;
+            sleepIteration = 0;
+
+            if (!resume && lineCount > resumeFromLineNumber) {
+              logger.info("Resuming to read from last line. lineCount="
+                + lineCount
+                + ", input="
+                + getShortDescription());
+              resume = true;
+            }
+            // Lines covered by the check point are read but not emitted
+            if (resume) {
+              InputMarker marker = new InputMarker();
+              marker.fileKey = fileKey;
+              marker.base64FileKey = base64FileKey;
+              marker.filePath = filePath;
+              marker.input = this;
+              marker.lineNumber = lineCount;
+              outputLine(line, marker);
+            }
+          }
+        } catch (Throwable t) {
+          // Log (rate limited by message key) and keep reading
+          final String LOG_MESSAGE_KEY = this.getClass()
+            .getSimpleName() + "_READ_LOOP_EXCEPTION";
+          LogFeederUtil.logErrorMessageByInterval(LOG_MESSAGE_KEY,
+            "Caught exception in read loop. lineNumber="
+              + lineCount + ", input="
+              + getShortDescription(), t, logger,
+            Level.ERROR);
+
+        }
+      }
+    } finally {
+      if (br != null) {
+        logger.info("Closing reader." + getShortDescription()
+          + ", lineCount=" + lineCount);
+        try {
+          br.close();
+        } catch (Throwable t) {
+          // ignore
+        }
+      }
+    }
+  }
+
+  /**
+   * Returns an object that identifies the given file, used to detect that
+   * a log file has been replaced (rolled over).
+   * <p>
+   * The platform file key from the file's basic attributes is preferred.
+   * When the attributes cannot be read, or the platform does not provide a
+   * file key (fileKey() is allowed to return null), the file's path string
+   * is returned instead, so callers can always call toString()/equals() on
+   * the result.
+   *
+   * @param file the file to identify
+   * @return the file key, or the path string as a fallback; never null
+   */
+  static public Object getFileKey(File file) {
+    try {
+      Path fileFullPath = Paths.get(file.getAbsolutePath());
+      BasicFileAttributes basicAttr = Files.readAttributes(
+        fileFullPath, BasicFileAttributes.class);
+      Object key = basicAttr.fileKey();
+      if (key != null) {
+        return key;
+      }
+    } catch (Throwable ex) {
+      logger.error("Error getting file attributes for file=" + file, ex);
+    }
+    return file.toString();
+  }
+
+  /**
+   * Describes this input by its configured "source" and its path: the
+   * absolute path of the first resolved file when there is one, otherwise
+   * the configured "path" value.
+   *
+   * @return description used in log messages
+   */
+  @Override
+  public String getShortDescription() {
+    String source = getStringValue("source");
+    String path;
+    if (logPathFiles == null || logPathFiles.length == 0) {
+      path = getStringValue("path");
+    } else {
+      path = logPathFiles[0].getAbsolutePath();
+    }
+    return "input:source=" + source + ", path=" + path;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMarker.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMarker.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMarker.java
new file mode 100644
index 0000000..6196068
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMarker.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder.input;
+
+/**
+ * This file contains the file inode, line number of the log currently been read
+ */
+public class InputMarker {
+  public int lineNumber = 0;
+  public int beginLineNumber = 0;
+  public Input input;
+  public String filePath;
+  public Object fileKey = null;
+  public String base64FileKey = null;
+
+  @Override
+  public String toString() {
+    return "InputMarker [lineNumber=" + lineNumber + ", input="
+      + input.getShortDescription() + "]";
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/GZIPReader.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/GZIPReader.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/GZIPReader.java
new file mode 100644
index 0000000..9c46c4e
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/GZIPReader.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.input.reader;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.zip.GZIPInputStream;
+
+import org.apache.log4j.Logger;
+
+public class GZIPReader extends InputStreamReader {
+
+  private static Logger logger = Logger.getLogger(GZIPReader.class);
+
+  public GZIPReader(String fileName) throws FileNotFoundException {
+    super(getStream(fileName));
+    logger.info("Created GZIPReader for file : " + fileName);
+  }
+
+  public GZIPReader(File file) throws FileNotFoundException {
+    super(getStream(file.getName()));
+  }
+
+  private static InputStream getStream(String fileName) {
+    InputStream gzipStream = null;
+    InputStream fileStream = null;
+    try {
+      fileStream = new FileInputStream(fileName);
+      gzipStream = new GZIPInputStream(fileStream);
+    } catch (Exception e) {
+      logger.error(e, e.getCause());
+    }
+    return gzipStream;
+  }
+
+  /**
+   * validating file based on magic number
+   *
+   * @param fileName
+   * @return
+   */
+  public static boolean isValidFile(String fileName) {
+    // TODO make it generic and put in factory itself
+    InputStream is = null;
+    try {
+      is = new FileInputStream(fileName);
+      byte[] signature = new byte[2];
+      int nread = is.read(signature); // read the gzip signature
+      return nread == 2 && signature[0] == (byte) 0x1f && signature[1] == (byte) 0x8b;
+    } catch (IOException e) {
+      return false;
+    } finally {
+      if (is != null) {
+        try {
+          is.close();
+        } catch (IOException e) {
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/LogsearchReaderFactory.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/LogsearchReaderFactory.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/LogsearchReaderFactory.java
new file mode 100644
index 0000000..a231807
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/LogsearchReaderFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.input.reader;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.Reader;
+
+import org.apache.log4j.Logger;
+
+public enum LogsearchReaderFactory {
+  INSTANCE;
+  private static Logger logger = Logger
+    .getLogger(LogsearchReaderFactory.class);
+
+  /**
+   * @param fileName
+   * @return
+   * @throws FileNotFoundException
+   */
+  public Reader getReader(File file) throws FileNotFoundException {
+    logger.debug("Inside reader factory for file:" + file);
+    if (GZIPReader.isValidFile(file.getAbsolutePath())) {
+      logger.info("Reading file " + file + " as gzip file");
+      return new GZIPReader(file.getAbsolutePath());
+    } else {
+      return new FileReader(file);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FetchConfigFromSolr.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FetchConfigFromSolr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FetchConfigFromSolr.java
new file mode 100644
index 0000000..fc12458
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FetchConfigFromSolr.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder.logconfig;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.TimeZone;
+
+import org.apache.ambari.logfeeder.LogFeederUtil;
+import org.apache.ambari.logfeeder.util.SolrUtil;
+import org.apache.ambari.logfeeder.view.VLogfeederFilter;
+import org.apache.ambari.logfeeder.view.VLogfeederFilterWrapper;
+import org.apache.log4j.Logger;
+
+/**
+ * Background thread that periodically pulls the log level filter
+ * configuration from Solr, plus static helpers to evaluate that
+ * configuration.
+ */
+public class FetchConfigFromSolr extends Thread {
+  private static Logger logger = Logger.getLogger(FetchConfigFromSolr.class);
+  // Written by the polling thread and read from other threads through the
+  // static helpers, hence volatile
+  private static volatile VLogfeederFilterWrapper logfeederFilterWrapper = null;
+  private static int solrConfigInterval = 5;// 5 sec;
+  private static long delay;
+  private static String endTimeDateFormat = "yyyy-MM-dd'T'HH:mm:ss.SSS";//2016-04-05T04:30:00.000Z
+  private static String sysTimeZone = "GMT";
+
+  public FetchConfigFromSolr() {
+    this.setName(this.getClass().getSimpleName());
+  }
+
+  /**
+   * Polls Solr for the configuration until the thread is interrupted. A
+   * failing poll is logged and retried after the configured interval.
+   */
+  @Override
+  public void run() {
+    // NOTE(review): the key ends in "internal", which looks like a typo for
+    // "interval"; it is kept because existing configurations may use it
+    solrConfigInterval = LogFeederUtil.getIntProperty("logfeeder.solr.config.internal", solrConfigInterval);
+    delay = 1000L * solrConfigInterval;
+    while (true) {
+      logger.debug("Updating config from solr after every " + solrConfigInterval + " sec.");
+      try {
+        pullConfigFromSolr();
+      } catch (RuntimeException e) {
+        // One bad response must not stop all future config updates
+        logger.error("Error pulling config from solr. Will retry after " + solrConfigInterval + " sec.", e);
+      }
+      try {
+        Thread.sleep(delay);
+      } catch (InterruptedException e) {
+        // Interruption is the request to stop: keep the flag and exit
+        logger.info("Interrupted while waiting for the next config update. Stopping " + getName(), e);
+        Thread.currentThread().interrupt();
+        return;
+      }
+    }
+  }
+
+  /**
+   * Fetches the config document and, when it carries a JSON value, replaces
+   * the cached filter configuration with the parsed result.
+   */
+  private synchronized void pullConfigFromSolr() {
+    HashMap<String, Object> configDocMap = SolrUtil.getInstance().getConfigDoc();
+    if (configDocMap != null) {
+      String configJson = (String) configDocMap.get(LogFeederConstants.VALUES);
+      if (configJson != null) {
+        logfeederFilterWrapper = LogFeederUtil.getGson().fromJson(configJson, VLogfeederFilterWrapper.class);
+      }
+    }
+  }
+
+  /**
+   * @param logfeederFilter filter to check; may be null
+   * @return true when the filter has a parsable expiry time that is not
+   *         after the current time; false otherwise
+   */
+  public static boolean isFilterExpired(VLogfeederFilter logfeederFilter) {
+    boolean isFilterExpired = false;// default is false
+    if (logfeederFilter != null) {
+      Date filterEndDate = parseFilterExpireDate(logfeederFilter);
+      if (filterEndDate != null) {
+        Date currentDate = getCurrentDate();
+        if (currentDate.compareTo(filterEndDate) >= 0) {
+          logger.debug("Filter for  Component :" + logfeederFilter.getLabel() + " and Hosts :"
+            + listToStr(logfeederFilter.getHosts()) + "Filter is expired because of filter endTime : "
+            + dateToStr(filterEndDate) + " is older than currentTime :" + dateToStr(currentDate));
+          isFilterExpired = true;
+        }
+      }
+    }
+    return isFilterExpired;
+  }
+
+  /**
+   * @param date date to format; may be null
+   * @return the date formatted with the expiry time pattern in GMT, or an
+   *         empty string for null
+   */
+  public static String dateToStr(Date date) {
+    if (date == null) {
+      return "";
+    }
+    // SimpleDateFormat is not thread safe, so a new one is used per call
+    SimpleDateFormat formatter = new SimpleDateFormat(endTimeDateFormat);
+    TimeZone timeZone = TimeZone.getTimeZone(sysTimeZone);
+    formatter.setTimeZone(timeZone);
+    return formatter.format(date);
+  }
+
+  /**
+   * @param vLogfeederFilter filter whose expiry time is parsed
+   * @return the expiry time interpreted in GMT, or null when it is missing,
+   *         empty or does not match the expected pattern
+   */
+  public static Date parseFilterExpireDate(VLogfeederFilter vLogfeederFilter) {
+    String expiryTime = vLogfeederFilter.getExpiryTime();
+    if (expiryTime != null && !expiryTime.isEmpty()) {
+      SimpleDateFormat formatter = new SimpleDateFormat(endTimeDateFormat);
+      TimeZone timeZone = TimeZone.getTimeZone(sysTimeZone);
+      formatter.setTimeZone(timeZone);
+      try {
+        return formatter.parse(expiryTime);
+      } catch (ParseException e) {
+        logger.error("Filter have invalid ExpiryTime : " + expiryTime + " for component :" + vLogfeederFilter.getLabel()
+          + " and hosts :" + listToStr(vLogfeederFilter.getHosts()));
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Chooses the log levels that apply to a host: the override levels when
+   * the host is listed in the filter and the filter has not expired, the
+   * default levels in every other case.
+   *
+   * @param hostName        host the levels are requested for
+   * @param componentFilter filter of the component; must not be null
+   * @return the applicable list of levels as stored in the filter
+   */
+  public static List<String> getAllowedLevels(String hostName, VLogfeederFilter componentFilter) {
+    String componentName = componentFilter.getLabel();
+    List<String> hosts = componentFilter.getHosts();
+    List<String> defaultLevels = componentFilter.getDefaultLevels();
+    List<String> overrideLevels = componentFilter.getOverrideLevels();
+    if (LogFeederUtil.isListContains(hosts, hostName, false)) {
+      if (isFilterExpired(componentFilter)) {
+        // pick default
+        logger.debug("Filter for component " + componentName + " and host :" + hostName + " is expired at "
+          + componentFilter.getExpiryTime());
+        return defaultLevels;
+      } else {
+        // return tmp filter levels
+        return overrideLevels;
+      }
+    } else {
+      return defaultLevels;
+    }
+  }
+
+  /**
+   * @param componentName name of the component to look up
+   * @return the filter from the most recently pulled configuration, or null
+   *         when no configuration or no filter for that component exists
+   */
+  public static VLogfeederFilter findComponentFilter(String componentName) {
+    // Read the shared reference once so a concurrent update cannot switch
+    // the configuration in the middle of the lookup
+    VLogfeederFilterWrapper wrapper = logfeederFilterWrapper;
+    if (wrapper != null) {
+      HashMap<String, VLogfeederFilter> filter = wrapper.getFilter();
+      if (filter != null) {
+        VLogfeederFilter componentFilter = filter.get(componentName);
+        if (componentFilter != null) {
+          return componentFilter;
+        }
+      }
+    }
+    logger.trace("Filter is not there for component :" + componentName);
+    return null;
+  }
+
+
+  /**
+   * @return the current time
+   */
+  public static Date getCurrentDate() {
+    // NOTE(review): this changes the default time zone of the whole JVM on
+    // every call although a Date carries no time zone. Kept because other
+    // code may rely on the GMT default - confirm and remove if possible.
+    TimeZone.setDefault(TimeZone.getTimeZone(sysTimeZone));
+    Date date = new Date();
+    return date;
+  }
+
+  /**
+   * @param strList list to render; may be null and may contain null entries
+   * @return the entries separated by commas and enclosed in square
+   *         brackets, "[]" for a null or empty list
+   */
+  public static String listToStr(List<String> strList) {
+    StringBuilder out = new StringBuilder("[");
+    if (strList != null) {
+      int counter = 0;
+      for (String entry : strList) {
+        if (counter > 0) {
+          out.append(",");
+        }
+        // append(String) renders a null entry as "null" instead of failing
+        out.append(entry);
+        counter++;
+      }
+    }
+    out.append("]");
+    return out.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederConstants.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederConstants.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederConstants.java
new file mode 100644
index 0000000..f61dc1b
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederConstants.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.logconfig;
+
+/**
+ * String constants used by the log feeder configuration and filtering code.
+ */
+public class LogFeederConstants {
+
+  public static final String ALL = "all";
+  public static final String NAME = "log_feeder_config";
+  // Solr field names: the keys under which a log entry's level, component type
+  // and host are stored in its JSON map
+  public static final String SOLR_LEVEL = "level";
+  public static final String SOLR_COMPONENT = "type";
+  public static final String SOLR_HOST = "host";
+
+  //
+  // UserConfig / history constants
+  // NOTE(review): presumably field names of a stored user-config record - confirm against the reader
+  public static final String ID = "id";
+  public static final String USER_NAME = "username";
+  public static final String VALUES = "jsons";
+  public static final String FILTER_NAME = "filtername";
+  public static final String ROW_TYPE = "rowtype";
+
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogfeederScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogfeederScheduler.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogfeederScheduler.java
new file mode 100644
index 0000000..7525dff
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogfeederScheduler.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder.logconfig;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.ambari.logfeeder.LogFeederUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * Singleton that starts the log feeder background threads at most once.
+ */
+public enum LogfeederScheduler {
+
+  INSTANCE;
+
+  private Logger logger = Logger.getLogger(LogfeederScheduler.class);
+
+  private static boolean running = false;
+
+  /**
+   * Starts every thread returned by {@link #getThreadList()} unless the
+   * {@code logfeeder.log.filter.enable} property is false (the default) or the
+   * scheduler has already been started.
+   */
+  public synchronized void start() {
+    if (!LogFeederUtil.getBooleanProperty("logfeeder.log.filter.enable", false)) {
+      logger.info("Logfeeder  filter Scheduler is disabled.");
+      return;
+    }
+    if (running) {
+      logger.warn("Logfeeder Scheduler is already running.");
+      return;
+    }
+    for (Thread worker : getThreadList()) {
+      worker.start();
+    }
+    // only flagged as running once every worker has been started
+    running = true;
+    logger.info("Logfeeder Scheduler started!");
+  }
+
+  /**
+   * Builds the list of worker threads to launch; currently a single
+   * {@code FetchConfigFromSolr} instance.
+   */
+  private List<Thread> getThreadList() {
+    List<Thread> workers = new ArrayList<Thread>();
+    workers.add(new FetchConfigFromSolr());
+    return workers;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/ApplyLogFilter.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/ApplyLogFilter.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/ApplyLogFilter.java
new file mode 100644
index 0000000..3748445
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/ApplyLogFilter.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder.logconfig.filter;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.ambari.logfeeder.LogFeederUtil;
+import org.apache.ambari.logfeeder.logconfig.FetchConfigFromSolr;
+import org.apache.ambari.logfeeder.logconfig.LogFeederConstants;
+import org.apache.ambari.logfeeder.view.VLogfeederFilter;
+import org.apache.log4j.Logger;
+
+/**
+ * Filter that decides whether a log entry may pass, based on the levels allowed
+ * for the entry's component and host.
+ */
+public class ApplyLogFilter extends DefaultDataFilter {
+
+  private static Logger logger = Logger.getLogger(ApplyLogFilter.class);
+
+  /**
+   * Checks the entry's level against the levels allowed for its component and host.
+   *
+   * @param jsonObj      the log entry as a field map
+   * @param defaultValue result returned when the entry is empty, when host, component
+   *                     or level is missing/blank, or when no filter exists for the component
+   * @return {@code true} when the entry's level is among the allowed levels, otherwise
+   *         {@code false}; {@code defaultValue} in the fallback cases above
+   */
+  @Override
+  public boolean applyFilter(Map<String, Object> jsonObj, boolean defaultValue) {
+    if (isEmpty(jsonObj)) {
+      logger.warn("Output jsonobj is empty");
+      return defaultValue;
+    }
+
+    String hostName = (String) jsonObj.get(LogFeederConstants.SOLR_HOST);
+    if (!isNotEmpty(hostName)) {
+      return defaultValue;
+    }
+
+    String componentName = (String) jsonObj.get(LogFeederConstants.SOLR_COMPONENT);
+    if (!isNotEmpty(componentName)) {
+      return defaultValue;
+    }
+
+    String level = (String) jsonObj.get(LogFeederConstants.SOLR_LEVEL);
+    if (!isNotEmpty(level)) {
+      return defaultValue;
+    }
+
+    VLogfeederFilter componentFilter = FetchConfigFromSolr.findComponentFilter(componentName);
+    if (componentFilter == null) {
+      // no filter configured for this component: fall back to the caller's default
+      return defaultValue;
+    }
+
+    List<String> allowedLevels = FetchConfigFromSolr.getAllowedLevels(hostName, componentFilter);
+    return LogFeederUtil.isListContains(allowedLevels, level, false);
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/DefaultDataFilter.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/DefaultDataFilter.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/DefaultDataFilter.java
new file mode 100644
index 0000000..9e98c6a
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/DefaultDataFilter.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.logconfig.filter;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Default filter to allow everything
+ */
+public class DefaultDataFilter {
+  private static Logger logger = Logger.getLogger(DefaultDataFilter.class);
+
+  protected static final boolean CASE_SENSITIVE = false;
+
+  /**
+   * Base implementation: performs no filtering and simply echoes the default.
+   *
+   * @param outputJsonObj the log entry (ignored here)
+   * @param defaultValue  value to return
+   * @return {@code defaultValue}
+   */
+  public boolean applyFilter(Map<String, Object> outputJsonObj, boolean defaultValue) {
+    return defaultValue;
+  }
+
+  /** @return {@code true} when the map is {@code null} or has no entries */
+  public boolean isEmpty(Map<String, Object> map) {
+    return map == null || map.isEmpty();
+  }
+
+  /** @return {@code true} when the string is {@code null} or empty after trimming */
+  public boolean isEmpty(String str) {
+    return str == null || str.trim().isEmpty();
+  }
+
+  /** @return the negation of {@link #isEmpty(String)} */
+  public boolean isNotEmpty(String str) {
+    return !isEmpty(str);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/FilterLogData.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/FilterLogData.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/FilterLogData.java
new file mode 100644
index 0000000..643df98
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/FilterLogData.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder.logconfig.filter;
+
+import java.util.Map;
+
+import org.apache.ambari.logfeeder.LogFeederUtil;
+import org.apache.ambari.logfeeder.logconfig.filter.ApplyLogFilter;
+import org.apache.log4j.Logger;
+
+/**
+ * Read configuration from solr and filter the log
+ */
+public enum FilterLogData {
+  INSTANCE;
+  private ApplyLogFilter applyLogFilter = new ApplyLogFilter();
+  private static Logger logger = Logger.getLogger(FilterLogData.class);
+  // by default allow every log
+  boolean defaultValue = true;
+
+  /**
+   * Decides whether a log entry given as JSON text may pass.
+   *
+   * @param jsonBlock JSON text of the entry; {@code null} or empty yields the default
+   * @return the filter decision for the parsed entry, or {@code defaultValue} when
+   *         there is no text to parse
+   */
+  public boolean isAllowed(String jsonBlock) {
+    boolean hasContent = jsonBlock != null && !jsonBlock.isEmpty();
+    if (hasContent) {
+      Map<String, Object> parsed = LogFeederUtil.toJSONObject(jsonBlock);
+      return applyLogFilter.applyFilter(parsed, defaultValue);
+    }
+    return defaultValue;
+  }
+
+  /**
+   * Decides whether an already parsed log entry may pass; a blocked entry is
+   * written to the trace log as JSON.
+   *
+   * @param jsonObj the entry as a field map
+   * @return the filter decision
+   */
+  public boolean isAllowed(Map<String, Object> jsonObj) {
+    if (applyLogFilter.applyFilter(jsonObj, defaultValue)) {
+      return true;
+    }
+    logger.trace("Filter block the content :" + LogFeederUtil.getGson().toJson(jsonObj));
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/Mapper.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/Mapper.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/Mapper.java
new file mode 100644
index 0000000..5b89d4b
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/Mapper.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder.mapper;
+
+import java.util.Map;
+
+public abstract class Mapper {
+  String inputDesc;
+  String fieldName;
+  String mapClassCode;
+
+  @SuppressWarnings("hiding")
+  public boolean init(String inputDesc, String fieldName,
+                      String mapClassCode, Object mapConfigs) {
+    this.inputDesc = inputDesc;
+    this.fieldName = fieldName;
+    this.mapClassCode = mapClassCode;
+    return true;
+  }
+
+  /**
+   * @param value
+   * @return
+   */
+  public Object apply(Map<String, Object> jsonObj, Object value) {
+    return value;
+  }
+
+  @Override
+  public String toString() {
+    return "mapClass=" + mapClassCode + ", input=" + inputDesc
+      + ", fieldName=" + fieldName;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java
new file mode 100644
index 0000000..107e7e4
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder.mapper;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Map;
+
+import org.apache.ambari.logfeeder.LogFeederUtil;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+/**
+ * Mapper that turns a field's raw value into a {@link Date}.
+ * <p>
+ * The {@code date_pattern} configuration entry is either the literal
+ * {@code epoch} (case-insensitive; the value is then read as a number of
+ * seconds) or a {@link SimpleDateFormat} pattern.
+ * NOTE(review): the formatter is held in a field and SimpleDateFormat is not
+ * thread-safe - confirm each instance is used from a single thread.
+ */
+public class MapperDate extends Mapper {
+  Logger logger = Logger.getLogger(MapperDate.class);
+
+  String dateFormat = null;
+  SimpleDateFormat dateFormatter = null;
+  boolean isEpoch = false;
+
+  /**
+   * Reads {@code date_pattern} from the configuration and prepares the conversion.
+   *
+   * @param inputDesc    description of the input this mapper belongs to
+   * @param fieldName    name of the field to convert
+   * @param mapClassCode code identifying the mapper class
+   * @param mapConfigs   expected to be a {@code Map} containing {@code date_pattern}
+   * @return {@code true} when the mapper is ready; {@code false} when
+   *         {@code mapConfigs} is {@code null} or not a map, the pattern is
+   *         missing/empty, or the pattern is rejected by {@link SimpleDateFormat}
+   */
+  @SuppressWarnings("hiding")
+  @Override
+  public boolean init(String inputDesc, String fieldName,
+                      String mapClassCode, Object mapConfigs) {
+    super.init(inputDesc, fieldName, mapClassCode, mapConfigs);
+    if (!(mapConfigs instanceof Map)) {
+      // instanceof is false for null, so guard before asking for the class name
+      String configType = (mapConfigs == null) ? "null" : mapConfigs.getClass().getName();
+      logger.fatal("Can't initialize object. mapConfigs class is not of type Map. "
+        + configType
+        + ", map="
+        + this.toString());
+      return false;
+    }
+    @SuppressWarnings("unchecked")
+    Map<String, Object> mapObjects = (Map<String, Object>) mapConfigs;
+    dateFormat = (String) mapObjects.get("date_pattern");
+    if (StringUtils.isEmpty(dateFormat)) {
+      logger.fatal("Date format for map is empty. " + this.toString());
+    } else {
+      logger.info("Date mapper format is " + dateFormat);
+
+      if (dateFormat.equalsIgnoreCase("epoch")) {
+        isEpoch = true;
+        return true;
+      } else {
+        try {
+          dateFormatter = new SimpleDateFormat(dateFormat);
+          return true;
+        } catch (Throwable ex) {
+          // keep the cause in the log so an invalid pattern can be diagnosed
+          logger.fatal("Error creating date format. format="
+            + dateFormat + ". " + this.toString(), ex);
+        }
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Converts {@code value} to a {@link Date} and stores it in {@code jsonObj}
+   * under this mapper's field name.
+   *
+   * @param jsonObj the entry being processed; updated on successful conversion
+   * @param value   the raw value; {@code null} is returned untouched
+   * @return the converted {@link Date}, or the original value when it is
+   *         {@code null}, when the mapper was not initialised successfully, or
+   *         when conversion fails (the failure is logged, not thrown)
+   */
+  @Override
+  public Object apply(Map<String, Object> jsonObj, Object value) {
+    if (value != null) {
+      try {
+        if (isEpoch) {
+          // epoch input is in seconds; Date expects milliseconds
+          long ms = Long.parseLong(value.toString()) * 1000;
+          value = new Date(ms);
+        } else if (dateFormatter != null) {
+          value = dateFormatter.parse(value.toString());
+        } else {
+          return value;
+        }
+        jsonObj.put(fieldName, value);
+      } catch (Throwable t) {
+        LogFeederUtil.logErrorMessageByInterval(this.getClass()
+            .getSimpleName() + ":apply",
+          "Error applying date transformation. isEpoch="
+            + isEpoch + ", dateFormat=" + dateFormat
+            + ", value=" + value + ". " + this.toString(),
+          t, logger, Level.ERROR);
+      }
+    }
+    return value;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java
new file mode 100644
index 0000000..99c33ed
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder.mapper;
+
+import java.util.Map;
+
+import org.apache.ambari.logfeeder.LogFeederUtil;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+/**
+ * Overrides the value for the field
+ */
+/**
+ * Mapper that renames a field: the value is moved from the original field
+ * name to the name configured under {@code new_fieldname}.
+ */
+public class MapperFieldName extends Mapper {
+  Logger logger = Logger.getLogger(MapperFieldName.class);
+  String newValue = null;
+
+  /**
+   * Reads the target field name from the configuration.
+   *
+   * @param inputDesc    description of the input this mapper belongs to
+   * @param fieldName    current name of the field
+   * @param mapClassCode code identifying the mapper class
+   * @param mapConfigs   expected to be a {@code Map} containing {@code new_fieldname}
+   * @return {@code true} when a non-empty new name was configured; {@code false}
+   *         when {@code mapConfigs} is {@code null} or not a map, or the name is missing/empty
+   */
+  @SuppressWarnings("hiding")
+  @Override
+  public boolean init(String inputDesc, String fieldName,
+      String mapClassCode, Object mapConfigs) {
+    super.init(inputDesc, fieldName, mapClassCode, mapConfigs);
+    if (!(mapConfigs instanceof Map)) {
+      // instanceof is false for null, so guard before asking for the class name
+      String configType = (mapConfigs == null) ? "null" : mapConfigs.getClass().getName();
+      logger.fatal("Can't initialize object. mapConfigs class is not of type Map. "
+          + configType);
+      return false;
+    }
+    @SuppressWarnings("unchecked")
+    Map<String, Object> mapObjects = (Map<String, Object>) mapConfigs;
+    newValue = (String) mapObjects.get("new_fieldname");
+    if (StringUtils.isEmpty(newValue)) {
+      logger.fatal("Map field value is empty.");
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Moves the value from the old field name to the new one inside {@code jsonObj}.
+   *
+   * @param jsonObj the entry being processed
+   * @param value   the field's value, stored under the new name
+   * @return {@code value}, unchanged; when no new name is configured nothing is
+   *         moved and an error is logged
+   */
+  @Override
+  public Object apply(Map<String, Object> jsonObj, Object value) {
+    if (newValue != null) {
+      // Remove the old one
+      jsonObj.remove(fieldName);
+      // Add with new key name
+      jsonObj.put(newValue, value);
+    } else {
+      LogFeederUtil.logErrorMessageByInterval(this.getClass()
+          .getSimpleName() + ":apply",
+          "New fieldName is null, so transformation is not applied. "
+              + this.toString(), null, logger, Level.ERROR);
+    }
+    return value;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java
new file mode 100644
index 0000000..9810ceb
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder.mapper;
+
+import java.util.Map;
+
+import org.apache.ambari.logfeeder.LogFeederUtil;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+/**
+ * Overrides the value for the field
+ */
+/**
+ * Mapper that replaces a field's value: when the current value equals
+ * {@code pre_value} (ignoring case) it is replaced by {@code post_value}.
+ */
+public class MapperFieldValue extends Mapper {
+  Logger logger = Logger.getLogger(MapperFieldValue.class);
+  String prevValue = null;
+  String newValue = null;
+
+  /**
+   * Reads {@code pre_value} and {@code post_value} from the configuration.
+   *
+   * @param inputDesc    description of the input this mapper belongs to
+   * @param fieldName    name of the field whose value may be replaced
+   * @param mapClassCode code identifying the mapper class
+   * @param mapConfigs   expected to be a {@code Map} containing {@code pre_value}
+   *                     and {@code post_value}
+   * @return {@code true} when a non-empty {@code post_value} was configured;
+   *         {@code false} when {@code mapConfigs} is {@code null} or not a map,
+   *         or {@code post_value} is missing/empty
+   */
+  @SuppressWarnings("hiding")
+  @Override
+  public boolean init(String inputDesc, String fieldName,
+      String mapClassCode, Object mapConfigs) {
+    super.init(inputDesc, fieldName, mapClassCode, mapConfigs);
+    if (!(mapConfigs instanceof Map)) {
+      // instanceof is false for null, so guard before asking for the class name
+      String configType = (mapConfigs == null) ? "null" : mapConfigs.getClass().getName();
+      logger.fatal("Can't initialize object. mapConfigs class is not of type Map. "
+          + configType);
+      return false;
+    }
+    @SuppressWarnings("unchecked")
+    Map<String, Object> mapObjects = (Map<String, Object>) mapConfigs;
+    prevValue = (String) mapObjects.get("pre_value");
+    newValue = (String) mapObjects.get("post_value");
+    if (StringUtils.isEmpty(newValue)) {
+      logger.fatal("Map field value is empty.");
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Replaces the value in {@code jsonObj} when it matches the configured previous value.
+   *
+   * @param jsonObj the entry being processed; updated only on a match
+   * @param value   the field's current value; {@code null} never matches
+   * @return the new value on a match, otherwise {@code value} unchanged; when no
+   *         new value is configured an error is logged
+   */
+  @Override
+  public Object apply(Map<String, Object> jsonObj, Object value) {
+    if (newValue != null) {
+      // a null value cannot match, and must not be dereferenced
+      if (prevValue != null && value != null
+          && prevValue.equalsIgnoreCase(value.toString())) {
+        value = newValue;
+        jsonObj.put(fieldName, value);
+      }
+    } else {
+      LogFeederUtil.logErrorMessageByInterval(
+          this.getClass().getSimpleName() + ":apply",
+          "New value is null, so transformation is not applied. "
+              + this.toString(), null, logger, Level.ERROR);
+    }
+    return value;
+  }
+
+}