You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ol...@apache.org on 2018/01/20 09:03:42 UTC

[ambari] branch trunk updated: AMBARI-22818. Log Feeder: refactor - create plugin api

This is an automated email from the ASF dual-hosted git repository.

oleewere pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a3f26e5  AMBARI-22818. Log Feeder: refactor - create plugin api
a3f26e5 is described below

commit a3f26e5e35c10c834312430cd0626a38406e7459
Author: Oliver Szabo <ol...@gmail.com>
AuthorDate: Wed Jan 10 20:46:21 2018 +0100

    AMBARI-22818. Log Feeder: refactor - create plugin api
---
 .../api/model/inputconfig/InputDescriptor.java     |   2 -
 .../inputconfig/impl/InputDescriptorImpl.java      |   9 -
 .../ambari-logsearch-logfeeder-plugin-api/pom.xml  |  76 +++++
 .../ambari/logfeeder/plugin/common}/AliasUtil.java |  51 +--
 .../ambari/logfeeder/plugin/common/ConfigItem.java | 191 +++++++++++
 .../plugin/common/LogFeederProperties.java}        |  26 +-
 .../logfeeder/plugin/common}/MetricData.java       |  17 +-
 .../ambari/logfeeder/plugin}/filter/Filter.java    | 152 +++++----
 .../logfeeder/plugin/filter}/mapper/Mapper.java    |  41 ++-
 .../ambari/logfeeder/plugin}/input/Input.java      | 353 ++++++++++-----------
 .../logfeeder/plugin/input/InputMarker.java}       |  33 +-
 .../logfeeder/plugin}/input/cache/LRUCache.java    |   2 +-
 .../logfeeder/plugin/manager/BlockManager.java}    |  35 +-
 .../logfeeder/plugin/manager/InputManager.java}    |  36 ++-
 .../logfeeder/plugin/manager/OutputManager.java}   |  37 +--
 .../ambari/logfeeder/plugin}/output/Output.java    | 123 ++++---
 .../ambari-logsearch-logfeeder/pom.xml             |  10 +
 .../ambari/logfeeder/common/ConfigBlock.java       | 153 ---------
 .../ambari/logfeeder/common/ConfigHandler.java     | 111 ++++---
 .../apache/ambari/logfeeder/common/ConfigItem.java | 103 ------
 .../logfeeder/common/LogEntryParseTester.java      |  51 ++-
 .../ambari/logfeeder/conf/ApplicationConfig.java   |  13 +-
 .../ambari/logfeeder/conf/LogEntryCacheConfig.java |   4 +-
 .../ambari/logfeeder/conf/LogFeederProps.java      |   3 +-
 .../logfeeder/conf/LogFeederSecurityConfig.java    |   2 +
 .../apache/ambari/logfeeder/filter/FilterGrok.java |  50 ++-
 .../apache/ambari/logfeeder/filter/FilterJSON.java |  24 +-
 .../ambari/logfeeder/filter/FilterKeyValue.java    |  38 ++-
 .../ambari/logfeeder/input/AbstractInputFile.java  | 329 -------------------
 .../logfeeder/input/InputConfigUploader.java       |  26 +-
 .../apache/ambari/logfeeder/input/InputFile.java   | 277 ++++++++++++++--
 .../{InputMarker.java => InputFileMarker.java}     |  46 ++-
 .../{InputManager.java => InputManagerImpl.java}   |  76 +++--
 .../apache/ambari/logfeeder/input/InputS3File.java |  61 ++--
 .../ambari/logfeeder/input/InputSimulate.java      |  81 ++---
 .../logfeeder/input/file/FileCheckInHelper.java    |  93 ++++++
 .../logfeeder/input/file/ProcessFileHelper.java    | 143 +++++++++
 .../input/file/ResumeLineNumberHelper.java         |  91 ++++++
 .../ambari/logfeeder/input/reader/GZIPReader.java  |   4 +-
 .../input/reader/LogsearchReaderFactory.java       |   4 +-
 .../loglevelfilter/LogLevelFilterHandler.java      |  30 +-
 .../ambari/logfeeder/mapper/MapperAnonymize.java   |  11 +-
 .../apache/ambari/logfeeder/mapper/MapperDate.java |  32 +-
 .../ambari/logfeeder/mapper/MapperFieldCopy.java   |   8 +-
 .../ambari/logfeeder/mapper/MapperFieldName.java   |  10 +-
 .../ambari/logfeeder/mapper/MapperFieldValue.java  |  10 +-
 .../ambari/logfeeder/metrics/MetricsManager.java   |   1 +
 .../ambari/logfeeder/metrics/StatsLogger.java      |   1 +
 .../apache/ambari/logfeeder/output/OutputData.java |   4 +-
 .../ambari/logfeeder/output/OutputDevNull.java     |  48 ++-
 .../apache/ambari/logfeeder/output/OutputFile.java |  55 +++-
 .../ambari/logfeeder/output/OutputHDFSFile.java    |  41 ++-
 .../ambari/logfeeder/output/OutputKafka.java       |  52 ++-
 .../ambari/logfeeder/output/OutputLineFilter.java  |   4 +-
 .../{OutputManager.java => OutputManagerImpl.java} |  69 ++--
 .../ambari/logfeeder/output/OutputS3File.java      |  34 +-
 .../apache/ambari/logfeeder/output/OutputSolr.java |  58 ++--
 .../logfeeder/output/S3OutputConfiguration.java    |  12 +-
 .../apache/ambari/logfeeder/output/S3Uploader.java |   1 -
 .../ambari/logfeeder/output/spool/LogSpooler.java  |  13 +-
 .../ambari/logfeeder/util/LogFeederUtil.java       |  37 +--
 .../ambari/logfeeder/filter/FilterGrokTest.java    |  26 +-
 .../ambari/logfeeder/filter/FilterJSONTest.java    |  18 +-
 .../logfeeder/filter/FilterKeyValueTest.java       |  20 +-
 .../ambari/logfeeder/input/InputFileTest.java      |  46 +--
 .../ambari/logfeeder/input/InputManagerTest.java   |  25 +-
 .../ambari/logfeeder/input/cache/LRUCacheTest.java |   1 +
 .../logfeeder/metrics/MetricsManagerTest.java      |   2 +
 .../ambari/logfeeder/output/OutputKafkaTest.java   |   6 +-
 .../logfeeder/output/OutputLineFilterTest.java     |   4 +-
 .../ambari/logfeeder/output/OutputManagerTest.java |  35 +-
 .../ambari/logfeeder/output/OutputS3FileTest.java  | 105 ------
 .../ambari/logfeeder/output/OutputSolrTest.java    | 183 -----------
 ambari-logsearch/pom.xml                           |   1 +
 74 files changed, 2044 insertions(+), 1936 deletions(-)

diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputDescriptor.java b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputDescriptor.java
index 2ad1fac..82e9504 100644
--- a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputDescriptor.java
+++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputDescriptor.java
@@ -49,6 +49,4 @@ public interface InputDescriptor {
   Long getCacheDedupInterval();
 
   Boolean isEnabled();
-
-  Map<String, Object> getAllProperties();
 }
diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputDescriptorImpl.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputDescriptorImpl.java
index 765bf83..2e3405d 100644
--- a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputDescriptorImpl.java
+++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputDescriptorImpl.java
@@ -292,13 +292,4 @@ public abstract class InputDescriptorImpl implements InputDescriptor {
   public void setIsEnabled(Boolean isEnabled) {
     this.isEnabled = isEnabled;
   }
-
-  @Override
-  public Map<String, Object> getAllProperties() {
-    return allProperties;
-  }
-
-  public void setAllProperties(Map<String, Object> allProperties) {
-    this.allProperties = allProperties;
-  }
 }
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/pom.xml b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/pom.xml
new file mode 100644
index 0000000..7510002
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/pom.xml
@@ -0,0 +1,76 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>ambari-logsearch</artifactId>
+    <groupId>org.apache.ambari</groupId>
+    <version>2.0.0.0-SNAPSHOT</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>ambari-logsearch-logfeeder-plugin-api</artifactId>
+  <packaging>jar</packaging>
+  <name>Ambari Logsearch Log Feeder Plugin Api</name>
+  <url>http://maven.apache.org</url>
+
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.ambari</groupId>
+      <artifactId>ambari-logsearch-config-api</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <version>1.7.20</version>
+    </dependency>
+    <dependency>
+      <groupId>org.codehaus.jackson</groupId>
+      <artifactId>jackson-core-asl</artifactId>
+      <version>1.9.13</version>
+    </dependency>
+    <dependency>
+      <groupId>org.codehaus.jackson</groupId>
+      <artifactId>jackson-mapper-asl</artifactId>
+      <version>1.9.13</version>
+    </dependency>
+    <dependency>
+      <groupId>com.google.code.gson</groupId>
+      <artifactId>gson</artifactId>
+      <version>2.2.2</version>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>18.0</version>
+    </dependency>
+  </dependencies>
+
+
+</project>
\ No newline at end of file
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/AliasUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/AliasUtil.java
similarity index 70%
rename from ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/AliasUtil.java
rename to ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/AliasUtil.java
index 3c48aa2..521e0bd 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/AliasUtil.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/AliasUtil.java
@@ -16,26 +16,30 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.ambari.logfeeder.util;
+package org.apache.ambari.logfeeder.plugin.common;
 
-import java.util.HashMap;
+import org.apache.ambari.logfeeder.plugin.filter.Filter;
+import org.apache.ambari.logfeeder.plugin.filter.mapper.Mapper;
+import org.apache.ambari.logfeeder.plugin.input.Input;
+import org.apache.ambari.logfeeder.plugin.output.Output;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import org.apache.ambari.logfeeder.filter.Filter;
-import org.apache.ambari.logfeeder.input.Input;
-import org.apache.ambari.logfeeder.mapper.Mapper;
-import org.apache.ambari.logfeeder.output.Output;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.log4j.Logger;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
 
 public class AliasUtil {
 
-  private static final Logger LOG = Logger.getLogger(AliasUtil.class);
+  private static final Logger LOG = LoggerFactory.getLogger(AliasUtil.class);
 
   private static final String ALIAS_CONFIG_JSON = "alias_config.json";
   private static HashMap<String, Object> aliasMap = null;
 
   static {
-    aliasMap = FileUtil.getJsonFileContentFromClassPath(ALIAS_CONFIG_JSON);
+    aliasMap = getJsonFileContentFromClassPath(ALIAS_CONFIG_JSON);
   }
 
   public static enum AliasType {
@@ -48,10 +52,10 @@ public class AliasUtil {
 
   public static Object getClassInstance(String key, AliasType aliasType) {
     String classFullName = getClassFullName(key, aliasType);
-    
+
     Object instance = null;
     try {
-      instance = (Object) Class.forName(classFullName).getConstructor().newInstance();
+      instance = Class.forName(classFullName).getConstructor().newInstance();
     } catch (Exception exception) {
       LOG.error("Unsupported class = " + classFullName, exception.getCause());
     }
@@ -84,23 +88,23 @@ public class AliasUtil {
 
   private static String getClassFullName(String key, AliasType aliastype) {
     String className = null;// key as a default value;
-    
+
     HashMap<String, String> aliasInfo = getAliasInfo(key, aliastype);
     String value = aliasInfo.get("klass");
-    if (!StringUtils.isEmpty(value)) {
+    if (value != null && !value.isEmpty()) {
       className = value;
       LOG.debug("Class name found for key :" + key + ", class name :" + className + " aliastype:" + aliastype.name());
     } else {
       LOG.debug("Class name not found for key :" + key + " aliastype:" + aliastype.name());
     }
-    
+
     return className;
   }
 
   @SuppressWarnings("unchecked")
   private static HashMap<String, String> getAliasInfo(String key, AliasType aliastype) {
-    HashMap<String, String> aliasInfo = new HashMap<String, String>();
-    
+    HashMap<String, String> aliasInfo = new HashMap<>();
+
     if (aliasMap != null) {
       String typeKey = aliastype.name().toLowerCase();
       HashMap<String, Object> typeJson = (HashMap<String, Object>) aliasMap.get(typeKey);
@@ -108,7 +112,18 @@ public class AliasUtil {
         aliasInfo = (HashMap<String, String>) typeJson.get(key);
       }
     }
-    
+
     return aliasInfo;
   }
+
+  public static HashMap<String, Object> getJsonFileContentFromClassPath(String fileName) {
+    ObjectMapper mapper = new ObjectMapper();
+    try (InputStream inputStream = AliasUtil.class.getClassLoader().getResourceAsStream(fileName)) {
+      return mapper.readValue(inputStream, new TypeReference<HashMap<String, Object>>() {});
+    } catch (IOException e) {
+      LOG.error("Error occurred during loading alias json file: {}", e);
+    }
+    return new HashMap<String, Object>();
+  }
+
 }
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/ConfigItem.java b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/ConfigItem.java
new file mode 100644
index 0000000..f9d4a7a
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/ConfigItem.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.plugin.common;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.reflect.TypeToken;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Type;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public abstract class ConfigItem<PROP_TYPE extends LogFeederProperties> implements Cloneable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ConfigItem.class);
+
+  private final static String GSON_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss.SSS";
+  private static Gson gson = new GsonBuilder().setDateFormat(GSON_DATE_FORMAT).create();
+
+  public static Gson getGson() {
+    return gson;
+  }
+
+  private Map<String, Object> configs;
+  private Map<String, String> contextFields = new HashMap<>();
+  private boolean drain = false;
+  public MetricData statMetric = new MetricData(getStatMetricName(), false);
+
+  public abstract void init(PROP_TYPE logFeederProperties) throws Exception;
+
+  /**
+   * Used while logging. Keep it short and meaningful
+   */
+  public abstract String getShortDescription();
+
+  public abstract String getStatMetricName();
+
+  public abstract boolean logConfigs();
+
+  public void loadConfig(Map<String, Object> map) {
+    configs = cloneObject(map);
+
+    Map<String, String> nvList = getNVList("add_fields");
+    if (nvList != null) {
+      contextFields.putAll(nvList);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  public Map<String, String> getNVList(String key) {
+    return (Map<String, String>) configs.get(key);
+  }
+
+  public Map<String, Object> getConfigs() {
+    return configs;
+  }
+
+  public boolean isEnabled() {
+    return getBooleanValue("is_enabled", true);
+  }
+
+  public void addMetricsContainers(List<MetricData> metricsList) {
+    metricsList.add(statMetric);
+  }
+
+  public void incrementStat(int count) {
+    statMetric.value += count;
+  }
+
+  public synchronized void logStat() {
+    logStatForMetric(statMetric, "Stat");
+  }
+
+  public void logStatForMetric(MetricData metric, String prefixStr) {
+    long currStat = metric.value;
+    long currMS = System.currentTimeMillis();
+    String postFix = ", key=" + getShortDescription();
+    if (currStat > metric.prevLogValue) {
+      LOG.info(prefixStr + ": total_count=" + metric.value + ", duration=" + (currMS - metric.prevLogTime) / 1000 +
+        " secs, count=" + (currStat - metric.prevLogValue) + postFix);
+    }
+    metric.prevLogValue = currStat;
+    metric.prevLogTime = currMS;
+  }
+
+  public boolean isDrain() {
+    return drain;
+  }
+
+  public void setDrain(boolean drain) {
+    this.drain = drain;
+  }
+
+  public String getStringValue(String property) {
+    return getStringValue(property, null);
+  }
+
+  public String getStringValue(String property, String defaultValue) {
+    Object strValue = configs.getOrDefault(property, defaultValue);
+    if (strValue != null) {
+      return strValue.toString();
+    }
+    return null;
+  }
+
+  public Boolean getBooleanValue(String property) {
+    return getBooleanValue(property, false);
+  }
+
+  public Boolean getBooleanValue(String property, Boolean defaultValue) {
+    Object booleanValue = configs.getOrDefault(property, defaultValue);
+    if (booleanValue != null) {
+      if (booleanValue.getClass().isAssignableFrom(Boolean.class)) {
+        return (Boolean) booleanValue;
+      } else {
+        String strValue = booleanValue.toString();
+        return strValue.equalsIgnoreCase("true") || strValue.equalsIgnoreCase("yes");
+      }
+    }
+    return false;
+  }
+
+  public Long getLongValue(String property) {
+    return getLongValue(property, null);
+  }
+
+  public Long getLongValue(String property, Long defaultValue) {
+    Object longValue = configs.getOrDefault(property, defaultValue);
+    if (longValue != null) {
+      if (longValue.getClass().isAssignableFrom(Long.class)) {
+        return (Long) longValue;
+      } else {
+        return Long.parseLong(longValue.toString());
+      }
+    }
+    return null;
+  }
+
+  public Integer getIntValue(String property) {
+    return getIntValue(property, null);
+  }
+
+  public Integer getIntValue(String property, Integer defaultValue) {
+    Object intValue = configs.getOrDefault(property, defaultValue);
+    if (intValue != null) {
+      if (intValue.getClass().isAssignableFrom(Integer.class)) {
+        return (Integer) intValue;
+      } else {
+        return Integer.parseInt(intValue.toString());
+      }
+    }
+    return null;
+  }
+
+  private Map<String, Object> cloneObject(Map<String, Object> map) {
+    if (map == null) {
+      return null;
+    }
+    String jsonStr = gson.toJson(map);
+    Type type = new TypeToken<Map<String, Object>>() {}.getType();
+    return gson.fromJson(jsonStr, type);
+  }
+
+  private Object getValue(String property) {
+    return configs.get(property);
+  }
+
+  private Object getValue(String property, Object defaultValue) {
+    return configs.getOrDefault(property, defaultValue);
+  }
+
+}
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMarker.java b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/LogFeederProperties.java
similarity index 58%
copy from ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMarker.java
copy to ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/LogFeederProperties.java
index 6767687..a1e5c12 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMarker.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/LogFeederProperties.java
@@ -16,25 +16,17 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+package org.apache.ambari.logfeeder.plugin.common;
 
-package org.apache.ambari.logfeeder.input;
+import java.util.Properties;
 
 /**
- * This file contains the file inode, line number of the log currently been read
+ * Static application level configuration interface for Log Feeder
  */
-public class InputMarker {
-  public final Input input;
-  public final String base64FileKey;
-  public final int lineNumber;
-  
-  public InputMarker(Input input, String base64FileKey, int lineNumber) {
-    this.input = input;
-    this.base64FileKey = base64FileKey;
-    this.lineNumber = lineNumber;
-  }
-  
-  @Override
-  public String toString() {
-    return "InputMarker [lineNumber=" + lineNumber + ", input=" + input.getShortDescription() + "]";
-  }
+public interface LogFeederProperties {
+
+  /**
+   * Get all key-value pairs from static application level Log Feeder configuration
+   */
+  Properties getProperties();
 }
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricData.java b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/MetricData.java
similarity index 81%
rename from ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricData.java
rename to ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/MetricData.java
index e7f5d37..933f131 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricData.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/MetricData.java
@@ -16,11 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
-package org.apache.ambari.logfeeder.metrics;
-
-import org.apache.commons.lang3.builder.ToStringBuilder;
-import org.apache.commons.lang3.builder.ToStringStyle;
+package org.apache.ambari.logfeeder.plugin.common;
 
 public class MetricData {
   public final String metricsName;
@@ -30,17 +26,12 @@ public class MetricData {
     this.metricsName = metricsName;
     this.isPointInTime = isPointInTime;
   }
-  
+
   public long value = 0;
   public long prevPublishValue = 0;
-  
+
   public long prevLogValue = 0;
   public long prevLogTime = System.currentTimeMillis();
-  
+
   public int publishCount = 0; // Number of times the metric was published so far
-  
-  @Override
-  public String toString() {
-    return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
-  }
 }
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/filter/Filter.java
similarity index 68%
rename from ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java
rename to ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/filter/Filter.java
index a06b348..9bf69b9 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/filter/Filter.java
@@ -16,53 +16,57 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
-package org.apache.ambari.logfeeder.filter;
+package org.apache.ambari.logfeeder.plugin.filter;
+
+import org.apache.ambari.logfeeder.plugin.common.AliasUtil;
+import org.apache.ambari.logfeeder.plugin.common.ConfigItem;
+import org.apache.ambari.logfeeder.plugin.common.LogFeederProperties;
+import org.apache.ambari.logfeeder.plugin.common.MetricData;
+import org.apache.ambari.logfeeder.plugin.filter.mapper.Mapper;
+import org.apache.ambari.logfeeder.plugin.input.Input;
+import org.apache.ambari.logfeeder.plugin.input.InputMarker;
+import org.apache.ambari.logfeeder.plugin.manager.OutputManager;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterDescriptor;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldDescriptor;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.PostMapValues;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.ambari.logfeeder.common.ConfigItem;
-import org.apache.ambari.logfeeder.common.LogFeederException;
-import org.apache.ambari.logfeeder.conf.LogFeederProps;
-import org.apache.ambari.logfeeder.input.Input;
-import org.apache.ambari.logfeeder.input.InputMarker;
-import org.apache.ambari.logfeeder.mapper.Mapper;
-import org.apache.ambari.logfeeder.metrics.MetricData;
-import org.apache.ambari.logfeeder.output.OutputManager;
-import org.apache.ambari.logfeeder.util.AliasUtil;
-import org.apache.ambari.logfeeder.util.AliasUtil.AliasType;
-import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterDescriptor;
-import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldDescriptor;
-import org.apache.ambari.logsearch.config.api.model.inputconfig.PostMapValues;
-import org.apache.commons.lang.BooleanUtils;
-import org.apache.log4j.Priority;
+public abstract class Filter<PROP_TYPE extends LogFeederProperties> extends ConfigItem<PROP_TYPE> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(Filter.class);
 
-public abstract class Filter extends ConfigItem {
-  protected FilterDescriptor filterDescriptor;
-  protected Input input;
+  private final Map<String, List<Mapper>> postFieldValueMappers = new HashMap<>();
+  private FilterDescriptor filterDescriptor;
+  private PROP_TYPE logFeederProperties;
   private Filter nextFilter = null;
+  private Input input;
   private OutputManager outputManager;
 
-  private Map<String, List<Mapper>> postFieldValueMappers = new HashMap<String, List<Mapper>>();
-
-  public void loadConfig(FilterDescriptor filterDescriptor) {
+  public void loadConfigs(FilterDescriptor filterDescriptor, PROP_TYPE logFeederProperties, OutputManager outputManager) {
     this.filterDescriptor = filterDescriptor;
+    this.logFeederProperties = logFeederProperties;
+    this.outputManager = outputManager;
   }
 
   public FilterDescriptor getFilterDescriptor() {
     return filterDescriptor;
   }
 
-  @Override
-  public void init(LogFeederProps logFeederProps) throws Exception {
-    super.init(logFeederProps);
+  public PROP_TYPE getLogFeederProperties() {
+    return logFeederProperties;
+  }
 
+  @Override
+  public void init(PROP_TYPE logFeederProperties) throws Exception {
     initializePostMapValues();
     if (nextFilter != null) {
-      nextFilter.init(logFeederProps);
+      nextFilter.init(logFeederProperties);
     }
   }
 
@@ -76,17 +80,13 @@ public abstract class Filter extends ConfigItem {
       for (PostMapValues pmv : values) {
         for (MapFieldDescriptor mapFieldDescriptor : pmv.getMappers()) {
           String mapClassCode = mapFieldDescriptor.getJsonName();
-          Mapper mapper = (Mapper) AliasUtil.getClassInstance(mapClassCode, AliasType.MAPPER);
+          Mapper mapper = (Mapper) AliasUtil.getClassInstance(mapClassCode, AliasUtil.AliasType.MAPPER);
           if (mapper == null) {
             LOG.warn("Unknown mapper type: " + mapClassCode);
             continue;
           }
           if (mapper.init(getInput().getShortDescription(), fieldName, mapClassCode, mapFieldDescriptor)) {
-            List<Mapper> fieldMapList = postFieldValueMappers.get(fieldName);
-            if (fieldMapList == null) {
-              fieldMapList = new ArrayList<Mapper>();
-              postFieldValueMappers.put(fieldName, fieldMapList);
-            }
+            List<Mapper> fieldMapList = postFieldValueMappers.computeIfAbsent(fieldName, k -> new ArrayList<>());
             fieldMapList.add(mapper);
           }
         }
@@ -94,30 +94,10 @@ public abstract class Filter extends ConfigItem {
     }
   }
 
-  public void setOutputManager(OutputManager outputManager) {
-    this.outputManager = outputManager;
-  }
-
-  public Filter getNextFilter() {
-    return nextFilter;
-  }
-
-  public void setNextFilter(Filter nextFilter) {
-    this.nextFilter = nextFilter;
-  }
-
-  public Input getInput() {
-    return input;
-  }
-
-  public void setInput(Input input) {
-    this.input = input;
-  }
-
   /**
    * Deriving classes should implement this at the minimum
    */
-  public void apply(String inputStr, InputMarker inputMarker) throws LogFeederException  {
+  public void apply(String inputStr, InputMarker inputMarker) throws Exception {
     // TODO: There is no transformation for string types.
     if (nextFilter != null) {
       nextFilter.apply(inputStr, inputMarker);
@@ -126,7 +106,7 @@ public abstract class Filter extends ConfigItem {
     }
   }
 
-  public void apply(Map<String, Object> jsonObj, InputMarker inputMarker) throws LogFeederException {
+  public void apply(Map<String, Object> jsonObj, InputMarker inputMarker) throws Exception {
     for (String fieldName : postFieldValueMappers.keySet()) {
       Object value = jsonObj.get(fieldName);
       if (value != null) {
@@ -142,48 +122,66 @@ public abstract class Filter extends ConfigItem {
     }
   }
 
-  public void close() {
-    if (nextFilter != null) {
-      nextFilter.close();
-    }
+  public void loadConfig(FilterDescriptor filterDescriptor) {
+    this.filterDescriptor = filterDescriptor;
   }
 
-  public void flush() {
+  public Filter getNextFilter() {
+    return nextFilter;
+  }
 
+  public void setNextFilter(Filter nextFilter) {
+    this.nextFilter = nextFilter;
   }
 
-  @Override
-  public void logStat() {
-    super.logStat();
+  public Input getInput() {
+    return input;
+  }
+
+  public void setInput(Input input) {
+    this.input = input;
+  }
+
+  public OutputManager getOutputManager() {
+    return outputManager;
+  }
+
+  public void setOutputManager(OutputManager outputManager) {
+    this.outputManager = outputManager;
+  }
+
+  public void flush() {
+    // empty
+  }
+
+  public void close() {
     if (nextFilter != null) {
-      nextFilter.logStat();
+      nextFilter.close();
     }
   }
 
   @Override
   public boolean isEnabled() {
-    return BooleanUtils.isNotFalse(filterDescriptor.isEnabled());
+    return filterDescriptor.isEnabled() == null || filterDescriptor.isEnabled();
   }
 
   @Override
-  public String getShortDescription() {
-    return null;
+  public void addMetricsContainers(List<MetricData> metricsList) {
+    super.addMetricsContainers(metricsList);
+    if (nextFilter != null) {
+      nextFilter.addMetricsContainers(metricsList);
+    }
   }
 
   @Override
-  public boolean logConfigs(Priority level) {
-    if (!super.logConfigs(level)) {
-      return false;
-    }
-    LOG.log(level, "input=" + input.getShortDescription());
+  public boolean logConfigs() {
+    LOG.info("filter=" + getShortDescription());
     return true;
   }
 
   @Override
-  public void addMetricsContainers(List<MetricData> metricsList) {
-    super.addMetricsContainers(metricsList);
-    if (nextFilter != null) {
-      nextFilter.addMetricsContainers(metricsList);
-    }
+  public String getStatMetricName() {
+    // no metrics yet
+    return null;
   }
 }
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/Mapper.java b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/filter/mapper/Mapper.java
similarity index 63%
rename from ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/Mapper.java
rename to ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/filter/mapper/Mapper.java
index 5facf76..d52bc01 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/Mapper.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/filter/mapper/Mapper.java
@@ -16,28 +16,57 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+package org.apache.ambari.logfeeder.plugin.filter.mapper;
 
-package org.apache.ambari.logfeeder.mapper;
+import org.apache.ambari.logfeeder.plugin.common.LogFeederProperties;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldDescriptor;
 
 import java.util.Map;
 
-import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldDescriptor;
+public abstract class Mapper<PROP_TYPE extends LogFeederProperties> {
+
+  private MapFieldDescriptor mapFieldDescriptor;
+  private PROP_TYPE logFeederProperties;
 
-public abstract class Mapper {
   private String inputDesc;
-  protected String fieldName;
+  private String fieldName;
   private String mapClassCode;
 
-  public abstract boolean init(String inputDesc, String fieldName, String mapClassCode, MapFieldDescriptor mapFieldDescriptor);
-
   protected void init(String inputDesc, String fieldName, String mapClassCode) {
     this.inputDesc = inputDesc;
     this.fieldName = fieldName;
     this.mapClassCode = mapClassCode;
   }
 
+  public void loadConfigs(MapFieldDescriptor mapFieldDescriptor, PROP_TYPE logFeederProperties) {
+    this.mapFieldDescriptor = mapFieldDescriptor;
+    this.logFeederProperties = logFeederProperties;
+  }
+
+  public MapFieldDescriptor getMapFieldDescriptor() {
+    return mapFieldDescriptor;
+  }
+
+  public PROP_TYPE getLogFeederProperties() {
+    return logFeederProperties;
+  }
+
+  public abstract boolean init(String inputDesc, String fieldName, String mapClassCode, MapFieldDescriptor mapFieldDescriptor);
+
   public abstract Object apply(Map<String, Object> jsonObj, Object value);
 
+  public String getInputDesc() {
+    return inputDesc;
+  }
+
+  public String getFieldName() {
+    return fieldName;
+  }
+
+  public String getMapClassCode() {
+    return mapClassCode;
+  }
+
   @Override
   public String toString() {
     return "mapClass=" + mapClassCode + ", input=" + inputDesc + ", fieldName=" + fieldName;
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/Input.java
similarity index 53%
rename from ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
rename to ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/Input.java
index 7b9dcd4..e2158bc 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/Input.java
@@ -16,84 +16,106 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
-package org.apache.ambari.logfeeder.input;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.ambari.logfeeder.conf.LogEntryCacheConfig;
-import org.apache.ambari.logfeeder.conf.LogFeederProps;
-import org.apache.ambari.logfeeder.input.cache.LRUCache;
-import org.apache.ambari.logfeeder.common.ConfigItem;
-import org.apache.ambari.logfeeder.common.LogFeederException;
-import org.apache.ambari.logfeeder.filter.Filter;
-import org.apache.ambari.logfeeder.metrics.MetricData;
-import org.apache.ambari.logfeeder.output.Output;
-import org.apache.ambari.logfeeder.output.OutputManager;
+package org.apache.ambari.logfeeder.plugin.input;
+
+import org.apache.ambari.logfeeder.plugin.common.ConfigItem;
+import org.apache.ambari.logfeeder.plugin.common.MetricData;
+import org.apache.ambari.logfeeder.plugin.filter.Filter;
+import org.apache.ambari.logfeeder.plugin.input.cache.LRUCache;
+import org.apache.ambari.logfeeder.plugin.manager.InputManager;
+import org.apache.ambari.logfeeder.plugin.manager.OutputManager;
+import org.apache.ambari.logfeeder.plugin.common.LogFeederProperties;
+import org.apache.ambari.logfeeder.plugin.output.Output;
+import org.apache.ambari.logsearch.config.api.LogSearchConfigLogFeeder;
 import org.apache.ambari.logsearch.config.api.model.inputconfig.Conditions;
 import org.apache.ambari.logsearch.config.api.model.inputconfig.Fields;
 import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterDescriptor;
 import org.apache.ambari.logsearch.config.api.model.inputconfig.InputDescriptor;
-import org.apache.commons.lang.BooleanUtils;
-import org.apache.log4j.Priority;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-public abstract class Input extends ConfigItem implements Runnable {
-  
-  private static final boolean DEFAULT_TAIL = true;
-  private static final boolean DEFAULT_USE_EVENT_MD5 = false;
-  private static final boolean DEFAULT_GEN_EVENT_MD5 = true;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
-  protected InputDescriptor inputDescriptor;
-  
-  protected InputManager inputManager;
-  protected OutputManager outputManager;
-  private List<Output> outputList = new ArrayList<>();
+public abstract class Input<PROP_TYPE extends LogFeederProperties, INPUT_MARKER extends InputMarker> extends ConfigItem<PROP_TYPE> implements Runnable {
 
-  private Thread thread;
-  private String type;
-  protected String filePath;
-  private Filter firstFilter;
-  protected boolean isClosed;
-
-  protected boolean tail;
-  private boolean useEventMD5;
-  private boolean genEventMD5;
+  private static final Logger LOG = LoggerFactory.getLogger(Input.class);
 
+  private InputDescriptor inputDescriptor;
+  private PROP_TYPE logFeederProperties;
+  private LogSearchConfigLogFeeder logSearchConfig;
+  private InputManager inputManager;
+  private OutputManager outputManager;
+  private final List<Output> outputList = new ArrayList<>();
+  private Filter<PROP_TYPE> firstFilter;
+  private boolean isClosed;
+  private String type;
+  private boolean useEventMD5 = false;
+  private boolean genEventMD5 = true;
+  private Thread thread;
   private LRUCache cache;
   private String cacheKeyField;
-
   protected MetricData readBytesMetric = new MetricData(getReadBytesMetricName(), false);
-  protected String getReadBytesMetricName() {
-    return null;
-  }
-  
-  public void loadConfig(InputDescriptor inputDescriptor) {
+
+  public void loadConfigs(InputDescriptor inputDescriptor, PROP_TYPE logFeederProperties,
+                          InputManager inputManager, OutputManager outputManager) {
     this.inputDescriptor = inputDescriptor;
+    this.logFeederProperties = logFeederProperties;
+    this.inputManager = inputManager;
+    this.outputManager = outputManager;
+  }
+
+  public void setLogSearchConfig(LogSearchConfigLogFeeder logSearchConfig) {
+    this.logSearchConfig = logSearchConfig;
+  }
+
+  public LogSearchConfigLogFeeder getLogSearchConfig() {
+    return logSearchConfig;
+  }
+
+  public abstract boolean monitor();
+
+  public abstract List<? extends Input> getChildInputs();
+
+  public abstract INPUT_MARKER getInputMarker();
+
+  public abstract boolean isReady();
+
+  public abstract void setReady(boolean isReady);
+
+  public abstract void checkIn(INPUT_MARKER inputMarker);
+
+  public abstract void lastCheckIn();
+
+  public abstract String getReadBytesMetricName();
+
+  public PROP_TYPE getLogFeederProperties() {
+    return logFeederProperties;
   }
 
   public InputDescriptor getInputDescriptor() {
     return inputDescriptor;
   }
 
-  public void setType(String type) {
-    this.type = type;
+  public InputManager getInputManager() {
+    return inputManager;
   }
 
-  public void setInputManager(InputManager inputManager) {
-    this.inputManager = inputManager;
+  public OutputManager getOutputManager() {
+    return outputManager;
   }
 
   public void setOutputManager(OutputManager outputManager) {
     this.outputManager = outputManager;
   }
 
-  public boolean isFilterRequired(FilterDescriptor filterDescriptor) {
-    Conditions conditions = filterDescriptor.getConditions();
-    Fields fields = conditions.getFields();
-    return fields.getType().contains(inputDescriptor.getType());
+  public void setInputManager(InputManager inputManager) {
+    this.inputManager = inputManager;
+  }
+
+  public void addOutput(Output output) {
+    outputList.add(output);
   }
 
   public void addFilter(Filter filter) {
@@ -108,53 +130,40 @@ public abstract class Input extends ConfigItem implements Runnable {
     }
   }
 
+  public boolean isFilterRequired(FilterDescriptor filterDescriptor) {
+    Conditions conditions = filterDescriptor.getConditions();
+    Fields fields = conditions.getFields();
+    return fields.getType().contains(inputDescriptor.getType());
+  }
+
   @SuppressWarnings("unchecked")
   public boolean isOutputRequired(Output output) {
     Map<String, Object> conditions = (Map<String, Object>) output.getConfigs().get("conditions");
     if (conditions == null) {
       return false;
     }
-    
+
     Map<String, Object> fields = (Map<String, Object>) conditions.get("fields");
     if (fields == null) {
       return false;
     }
-    
+
     List<String> types = (List<String>) fields.get("rowtype");
     return types.contains(inputDescriptor.getRowtype());
   }
 
-  public void addOutput(Output output) {
-    outputList.add(output);
+  @Override
+  public boolean isEnabled() {
+    return inputDescriptor.isEnabled() == null || inputDescriptor.isEnabled();
   }
 
   @Override
-  public void init(LogFeederProps logFeederProps) throws Exception {
-    super.init(logFeederProps);
-    initCache(logFeederProps.getLogEntryCacheConfig());
-    tail = BooleanUtils.toBooleanDefaultIfNull(inputDescriptor.isTail(), DEFAULT_TAIL);
-    useEventMD5 = BooleanUtils.toBooleanDefaultIfNull(inputDescriptor.isUseEventMd5AsId(), DEFAULT_USE_EVENT_MD5);
-    genEventMD5 = BooleanUtils.toBooleanDefaultIfNull(inputDescriptor.isGenEventMd5(), DEFAULT_GEN_EVENT_MD5);
-
+  public void init(PROP_TYPE logFeederProperties) throws Exception {
     if (firstFilter != null) {
-      firstFilter.init(logFeederProps);
-    }
-
-  }
-
-  boolean monitor() {
-    if (isReady()) {
-      LOG.info("Starting thread. " + getShortDescription());
-      thread = new Thread(this, getNameForThread());
-      thread.start();
-      return true;
-    } else {
-      return false;
+      firstFilter.init(logFeederProperties);
     }
   }
 
-  public abstract boolean isReady();
-
   @Override
   public void run() {
     try {
@@ -171,17 +180,17 @@ public abstract class Input extends ConfigItem implements Runnable {
    * method should only exit after all data are read from the source or the
    * process is exiting
    */
-  abstract void start() throws Exception;
+  public abstract void start() throws Exception;
 
-  public void outputLine(String line, InputMarker marker) {
+  public void outputLine(String line, INPUT_MARKER marker) {
     statMetric.value++;
     readBytesMetric.value += (line.length());
 
     if (firstFilter != null) {
       try {
         firstFilter.apply(line, marker);
-      } catch (LogFeederException e) {
-        LOG.error(e.getLocalizedMessage(), e);
+      } catch (Exception e) {
+        LOG.error("Error during filter apply", e);
       }
     } else {
       // TODO: For now, let's make filter mandatory, so that no one accidently forgets to write filter
@@ -189,48 +198,8 @@ public abstract class Input extends ConfigItem implements Runnable {
     }
   }
 
-  protected void flush() {
-    if (firstFilter != null) {
-      firstFilter.flush();
-    }
-  }
-
-  @Override
-  public void setDrain(boolean drain) {
-    LOG.info("Request to drain. " + getShortDescription());
-    super.setDrain(drain);
-    try {
-      thread.interrupt();
-    } catch (Throwable t) {
-      // ignore
-    }
-  }
-
-  public void addMetricsContainers(List<MetricData> metricsList) {
-    super.addMetricsContainers(metricsList);
-    if (firstFilter != null) {
-      firstFilter.addMetricsContainers(metricsList);
-    }
-    metricsList.add(readBytesMetric);
-  }
-
-  @Override
-  public void logStat() {
-    super.logStat();
-    logStatForMetric(readBytesMetric, "Stat: Bytes Read");
-
-    if (firstFilter != null) {
-      firstFilter.logStat();
-    }
-  }
-
-  public abstract void checkIn(InputMarker inputMarker);
-
-  public abstract void lastCheckIn();
-
   public void close() {
     LOG.info("Close called. " + getShortDescription());
-
     try {
       if (firstFilter != null) {
         firstFilter.close();
@@ -240,31 +209,34 @@ public abstract class Input extends ConfigItem implements Runnable {
     }
   }
 
-  private void initCache(LogEntryCacheConfig cacheConfig) {
-    boolean cacheEnabled = inputDescriptor.isCacheEnabled() != null
-      ? inputDescriptor.isCacheEnabled()
-      : cacheConfig.isCacheEnabled();
-    if (cacheEnabled) {
-      String cacheKeyField = inputDescriptor.getCacheKeyField() != null
-        ? inputDescriptor.getCacheKeyField()
-        : cacheConfig.getCacheKeyField();
+  public void flush() {
+    if (firstFilter != null) {
+      firstFilter.flush();
+    }
+  }
+
+  public void loadConfig(InputDescriptor inputDescriptor) {
+    this.inputDescriptor = inputDescriptor;
+  }
 
-      setCacheKeyField(cacheKeyField);
+  public void setClosed(boolean isClosed) {
+    this.isClosed = isClosed;
+  }
 
-      int cacheSize = inputDescriptor.getCacheSize() != null
-        ? inputDescriptor.getCacheSize()
-        : cacheConfig.getCacheSize();
+  public boolean isClosed() {
+    return isClosed;
+  }
 
-      boolean cacheLastDedupEnabled = inputDescriptor.getCacheLastDedupEnabled() != null
-        ? inputDescriptor.getCacheLastDedupEnabled()
-        : cacheConfig.isCacheLastDedupEnabled();
+  public String getNameForThread() {
+    return this.getClass().getSimpleName();
+  }
 
-      long cacheDedupInterval = inputDescriptor.getCacheDedupInterval() != null
-        ? inputDescriptor.getCacheDedupInterval()
-        : Long.parseLong(cacheConfig.getCacheDedupInterval());
+  public String getType() {
+    return type;
+  }
 
-      setCache(new LRUCache(cacheSize, filePath, cacheDedupInterval, cacheLastDedupEnabled));
-    }
+  public void setType(String type) {
+    this.type = type;
   }
 
   public boolean isUseEventMD5() {
@@ -276,78 +248,93 @@ public abstract class Input extends ConfigItem implements Runnable {
   }
 
   public Filter getFirstFilter() {
-    return firstFilter;
+    return this.firstFilter;
   }
 
-  public String getFilePath() {
-    return filePath;
+  public Thread getThread() {
+    return thread;
   }
 
-  public void setFilePath(String filePath) {
-    this.filePath = filePath;
+  public void setThread(Thread thread) {
+    this.thread = thread;
   }
 
-  public void setClosed(boolean isClosed) {
-    this.isClosed = isClosed;
+  public void setUseEventMD5(boolean useEventMD5) {
+    this.useEventMD5 = useEventMD5;
   }
 
-  public boolean isClosed() {
-    return isClosed;
+  public void setGenEventMD5(boolean genEventMD5) {
+    this.genEventMD5 = genEventMD5;
   }
 
-  public List<Output> getOutputList() {
-    return outputList;
-  }
-  
-  public Thread getThread(){
-    return thread;
+  public LRUCache getCache() {
+    return this.cache;
   }
 
-  public LRUCache getCache() {
-    return cache;
+  public String getCacheKeyField() {
+    return this.cacheKeyField;
   }
 
   public void setCache(LRUCache cache) {
     this.cache = cache;
   }
 
-  public String getCacheKeyField() {
-    return cacheKeyField;
-  }
-
   public void setCacheKeyField(String cacheKeyField) {
     this.cacheKeyField = cacheKeyField;
   }
 
-  @Override
-  public boolean isEnabled() {
-    return BooleanUtils.isNotFalse(inputDescriptor.isEnabled());
+  public List<? extends Output> getOutputList() {
+    return outputList;
   }
 
-  @Override
-  public String getNameForThread() {
-    if (filePath != null) {
-      try {
-        return (type + "=" + (new File(filePath)).getName());
-      } catch (Throwable ex) {
-        LOG.warn("Couldn't get basename for filePath=" + filePath, ex);
-      }
+  public void addMetricsContainers(List<MetricData> metricsList) {
+    super.addMetricsContainers(metricsList);
+    metricsList.add(readBytesMetric);
+
+    if (firstFilter != null) {
+      firstFilter.addMetricsContainers(metricsList);
     }
-    return super.getNameForThread() + ":" + type;
   }
 
-  @Override
-  public boolean logConfigs(Priority level) {
-    if (!super.logConfigs(level)) {
-      return false;
+  public void logStat() {
+    super.logStat();
+    logStatForMetric(readBytesMetric, "Stat: Bytes Read");
+
+    if (firstFilter != null) {
+      firstFilter.logStat();
+    }
+  }
+
+  public void initCache(boolean cacheEnabled, String cacheKeyField, int cacheSize,
+                        boolean cacheLastDedupEnabled, String cacheDedupInterval, String fileName) {
+    boolean enabled = getInputDescriptor().isCacheEnabled() != null
+      ? getInputDescriptor().isCacheEnabled()
+      : cacheEnabled;
+    if (enabled) {
+      String keyField = getInputDescriptor().getCacheKeyField() != null
+        ? getInputDescriptor().getCacheKeyField()
+        : cacheKeyField;
+
+      setCacheKeyField(keyField);
+
+      int size = getInputDescriptor().getCacheSize() != null
+        ? getInputDescriptor().getCacheSize()
+        : cacheSize;
+
+      boolean lastDedupEnabled = getInputDescriptor().getCacheLastDedupEnabled() != null
+        ? getInputDescriptor().getCacheLastDedupEnabled()
+        : cacheLastDedupEnabled;
+
+      long dedupInterval = getInputDescriptor().getCacheDedupInterval() != null
+        ? getInputDescriptor().getCacheDedupInterval()
+        : Long.parseLong(cacheDedupInterval);
+
+      setCache(new LRUCache(size, fileName, dedupInterval, lastDedupEnabled));
     }
-    LOG.log(level, "Printing Input=" + getShortDescription());
-    LOG.log(level, "description=" + inputDescriptor.getPath());
-    return true;
   }
 
   @Override
   public String toString() {
     return getShortDescription();
   }
-}
\ No newline at end of file
+}
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputDescriptor.java b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/InputMarker.java
similarity index 64%
copy from ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputDescriptor.java
copy to ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/InputMarker.java
index 2ad1fac..aa54019 100644
--- a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputDescriptor.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/InputMarker.java
@@ -16,39 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
-package org.apache.ambari.logsearch.config.api.model.inputconfig;
+package org.apache.ambari.logfeeder.plugin.input;
 
 import java.util.Map;
 
-public interface InputDescriptor {
-  String getType();
-
-  String getRowtype();
-
-  String getPath();
-
-  Map<String, String> getAddFields();
-
-  String getSource();
-
-  Boolean isTail();
-
-  Boolean isGenEventMd5();
+public interface InputMarker <INPUT_TYPE extends Input> {
 
-  Boolean isUseEventMd5AsId();
-
-  Boolean isCacheEnabled();
-
-  String getCacheKeyField();
-
-  Boolean getCacheLastDedupEnabled();
-
-  Integer getCacheSize();
-
-  Long getCacheDedupInterval();
-
-  Boolean isEnabled();
+  INPUT_TYPE getInput();
 
   Map<String, Object> getAllProperties();
+
 }
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/cache/LRUCache.java b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/cache/LRUCache.java
similarity index 98%
rename from ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/cache/LRUCache.java
rename to ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/cache/LRUCache.java
index d9cfef8..e0509fe 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/cache/LRUCache.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/cache/LRUCache.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.ambari.logfeeder.input.cache;
+package org.apache.ambari.logfeeder.plugin.input.cache;
 
 import com.google.common.collect.EvictingQueue;
 
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputDescriptor.java b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/manager/BlockManager.java
similarity index 59%
copy from ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputDescriptor.java
copy to ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/manager/BlockManager.java
index 2ad1fac..674f51f 100644
--- a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputDescriptor.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/manager/BlockManager.java
@@ -16,39 +16,20 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+package org.apache.ambari.logfeeder.plugin.manager;
 
-package org.apache.ambari.logsearch.config.api.model.inputconfig;
+import org.apache.ambari.logfeeder.plugin.common.MetricData;
 
-import java.util.Map;
+import java.util.List;
 
-public interface InputDescriptor {
-  String getType();
+public interface BlockManager {
 
-  String getRowtype();
+  void init() throws Exception;
 
-  String getPath();
+  void close();
 
-  Map<String, String> getAddFields();
+  void logStats();
 
-  String getSource();
+  void addMetricsContainers(List<MetricData> metricsList);
 
-  Boolean isTail();
-
-  Boolean isGenEventMd5();
-
-  Boolean isUseEventMd5AsId();
-
-  Boolean isCacheEnabled();
-
-  String getCacheKeyField();
-
-  Boolean getCacheLastDedupEnabled();
-
-  Integer getCacheSize();
-
-  Long getCacheDedupInterval();
-
-  Boolean isEnabled();
-
-  Map<String, Object> getAllProperties();
 }
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputDevNull.java b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/manager/InputManager.java
similarity index 55%
copy from ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputDevNull.java
copy to ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/manager/InputManager.java
index fa4e17b..0734158 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputDevNull.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/manager/InputManager.java
@@ -16,27 +16,31 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.ambari.logfeeder.output;
+package org.apache.ambari.logfeeder.plugin.manager;
+
+import org.apache.ambari.logfeeder.plugin.input.Input;
 
 import java.io.File;
+import java.util.List;
 
-import org.apache.ambari.logfeeder.input.InputMarker;
-import org.apache.log4j.Logger;
 
-/**
- * Output that just ignore the logs
- */
-public class OutputDevNull extends Output {
+public abstract class InputManager implements BlockManager {
+
+  public abstract void addToNotReady(Input input);
+
+  public abstract void checkInAll();
+
+  public abstract List<Input> getInputList(String serviceName);
+
+  public abstract void add(String serviceName, Input input);
+
+  public abstract void removeInput(Input input);
+
+  public abstract File getCheckPointFolderFile();
 
-  private static final Logger LOG = Logger.getLogger(OutputDevNull.class);
+  public abstract void cleanCheckPointFiles();
 
-  @Override
-  public void write(String block, InputMarker inputMarker){
-    LOG.trace("Ignore log block: " + block);
-  }
+  public abstract void removeInputsForService(String serviceName);
 
-  @Override
-  public void copyFile(File inputFile, InputMarker inputMarker) {
-    throw new UnsupportedOperationException("copyFile method is not yet supported for output=dev_null");
-  }
+  public abstract void startInputs(String serviceName);
 }
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputDescriptor.java b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/manager/OutputManager.java
similarity index 53%
copy from ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputDescriptor.java
copy to ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/manager/OutputManager.java
index 2ad1fac..3a3c601 100644
--- a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputDescriptor.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/manager/OutputManager.java
@@ -16,39 +16,28 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+package org.apache.ambari.logfeeder.plugin.manager;
 
-package org.apache.ambari.logsearch.config.api.model.inputconfig;
+import org.apache.ambari.logfeeder.plugin.input.InputMarker;
+import org.apache.ambari.logfeeder.plugin.output.Output;
+import org.apache.ambari.logsearch.config.api.OutputConfigMonitor;
 
+import java.io.File;
+import java.util.List;
 import java.util.Map;
 
-public interface InputDescriptor {
-  String getType();
+public abstract class OutputManager implements BlockManager {
 
-  String getRowtype();
+  public abstract void write(Map<String, Object> jsonObj, InputMarker inputMarker);
 
-  String getPath();
+  public abstract void write(String jsonBlock, InputMarker inputMarker);
 
-  Map<String, String> getAddFields();
+  public abstract void copyFile(File file, InputMarker marker);
 
-  String getSource();
+  public abstract void add(Output output);
 
-  Boolean isTail();
+  public abstract List<Output> getOutputs();
 
-  Boolean isGenEventMd5();
+  public abstract List<? extends OutputConfigMonitor> getOutputsToMonitor();
 
-  Boolean isUseEventMd5AsId();
-
-  Boolean isCacheEnabled();
-
-  String getCacheKeyField();
-
-  Boolean getCacheLastDedupEnabled();
-
-  Integer getCacheSize();
-
-  Long getCacheDedupInterval();
-
-  Boolean isEnabled();
-
-  Map<String, Object> getAllProperties();
 }
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/output/Output.java
similarity index 59%
rename from ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java
rename to ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/output/Output.java
index c0075b5..a2f13b8 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/output/Output.java
@@ -16,84 +16,62 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
-package org.apache.ambari.logfeeder.output;
+package org.apache.ambari.logfeeder.plugin.output;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import org.apache.ambari.logfeeder.plugin.common.ConfigItem;
+import org.apache.ambari.logfeeder.plugin.common.LogFeederProperties;
+import org.apache.ambari.logfeeder.plugin.common.MetricData;
+import org.apache.ambari.logfeeder.plugin.input.InputMarker;
+import org.apache.ambari.logsearch.config.api.LogSearchConfigLogFeeder;
+import org.apache.ambari.logsearch.config.api.OutputConfigMonitor;
+import org.apache.ambari.logsearch.config.api.model.outputconfig.OutputProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 
-import org.apache.ambari.logfeeder.common.ConfigBlock;
-import org.apache.ambari.logfeeder.input.InputMarker;
-import org.apache.ambari.logfeeder.metrics.MetricData;
-import org.apache.ambari.logfeeder.util.LogFeederUtil;
-import org.apache.ambari.logsearch.config.api.LogSearchConfigLogFeeder;
-import org.apache.ambari.logsearch.config.api.OutputConfigMonitor;
-import org.apache.ambari.logsearch.config.api.model.outputconfig.OutputProperties;
+public abstract class Output<PROP_TYPE extends LogFeederProperties, INPUT_MARKER extends InputMarker> extends ConfigItem<PROP_TYPE> implements OutputConfigMonitor {
 
-public abstract class Output extends ConfigBlock implements OutputConfigMonitor {
-  private String destination = null;
+  private static final Logger LOG = LoggerFactory.getLogger(Output.class);
+
+  private final static String GSON_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss.SSS";
+  private static Gson gson = new GsonBuilder().setDateFormat(GSON_DATE_FORMAT).create();
 
+  private LogSearchConfigLogFeeder logSearchConfig;
+  private String destination = null;
+  private boolean isClosed;
   protected MetricData writeBytesMetric = new MetricData(getWriteBytesMetricName(), false);
-  protected String getWriteBytesMetricName() {
-    return null;
-  }
 
-  public boolean monitorConfigChanges() {
-    return false;
-  };
-  
-  @Override
-  public String getOutputType() {
-    throw new IllegalStateException("This method should be overriden if the Output wants to monitor the configuration");
-  }
-  
-  @Override
-  public void outputConfigChanged(OutputProperties outputProperties) {
-    throw new IllegalStateException("This method should be overriden if the Output wants to monitor the configuration");
-  };
+  public abstract String getOutputType();
 
-  @Override
-  public String getShortDescription() {
-    return null;
-  }
+  public abstract void outputConfigChanged(OutputProperties outputProperties);
 
-  @Override
-  public String getNameForThread() {
-    if (destination != null) {
-      return destination;
-    }
-    return super.getNameForThread();
-  }
+  public abstract void copyFile(File inputFile, InputMarker inputMarker) throws Exception;
 
-  public abstract void write(String block, InputMarker inputMarker) throws Exception;
-  
-  public abstract void copyFile(File inputFile, InputMarker inputMarker) throws UnsupportedOperationException;
+  public abstract void write(String jsonStr, INPUT_MARKER inputMarker) throws Exception;
 
-  public void write(Map<String, Object> jsonObj, InputMarker inputMarker) throws Exception {
-    write(LogFeederUtil.getGson().toJson(jsonObj), inputMarker);
-  }
+  public abstract Long getPendingCount();
 
-  boolean isClosed = false;
+  public abstract String getWriteBytesMetricName();
 
-  /**
-   * Extend this method to clean up
-   */
-  public void close() {
-    LOG.info("Calling base close()." + getShortDescription());
-    isClosed = true;
+  public String getNameForThread() {
+    return this.getClass().getSimpleName();
   }
 
-  /**
-   * This is called on shutdown. All output should extend it.
-   */
-  public boolean isClosed() {
-    return isClosed;
+  public boolean monitorConfigChanges() {
+    return false;
+  };
+
+  public void setLogSearchConfig(LogSearchConfigLogFeeder logSearchConfig) {
+    this.logSearchConfig = logSearchConfig;
   }
 
-  public long getPendingCount() {
-    return 0;
+  public LogSearchConfigLogFeeder getLogSearchConfig() {
+    return logSearchConfig;
   }
 
   public String getDestination() {
@@ -104,10 +82,16 @@ public abstract class Output extends ConfigBlock implements OutputConfigMonitor
     this.destination = destination;
   }
 
-  protected LogSearchConfigLogFeeder logSearchConfig;
+  public boolean isClosed() {
+    return isClosed;
+  }
+
+  public void setClosed(boolean closed) {
+    isClosed = closed;
+  }
 
-  public void setLogSearchConfig(LogSearchConfigLogFeeder logSearchConfig) {
-    this.logSearchConfig = logSearchConfig;
+  public void write(Map<String, Object> jsonObj, INPUT_MARKER inputMarker) throws Exception {
+    write(gson.toJson(jsonObj), inputMarker);
   }
 
   @Override
@@ -121,10 +105,16 @@ public abstract class Output extends ConfigBlock implements OutputConfigMonitor
     super.logStat();
     logStatForMetric(writeBytesMetric, "Stat: Bytes Written");
   }
-  
+
+  @Override
+  public boolean logConfigs() {
+    // TODO: log something about the configs
+    return true;
+  }
+
   public void trimStrValue(Map<String, Object> jsonObj) {
     if (jsonObj != null) {
-      for (Entry<String, Object> entry : jsonObj.entrySet()) {
+      for (Map.Entry<String, Object> entry : jsonObj.entrySet()) {
         String key = entry.getKey();
         Object value = entry.getValue();
         if (value != null && value instanceof String) {
@@ -134,4 +124,9 @@ public abstract class Output extends ConfigBlock implements OutputConfigMonitor
       }
     }
   }
+
+  public void close() {
+    LOG.info("Calling base close()." + getShortDescription());
+    isClosed = true;
+  }
 }
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml b/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml
index 6a3524d..c116378 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml
@@ -49,6 +49,11 @@
       <version>${project.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.ambari</groupId>
+      <artifactId>ambari-logsearch-logfeeder-plugin-api</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
       <groupId>commons-cli</groupId>
       <artifactId>commons-cli</artifactId>
       <version>1.3.1</version>
@@ -176,6 +181,11 @@
       <version>1</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+      <version>3.6</version>
+    </dependency>
+    <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter</artifactId>
       <version>${spring-boot.version}</version>
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigBlock.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigBlock.java
deleted file mode 100644
index cfcc199..0000000
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigBlock.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ambari.logfeeder.common;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.ambari.logfeeder.util.LogFeederUtil;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.log4j.Priority;
-
-
-public abstract class ConfigBlock extends ConfigItem {
-  protected Map<String, Object> configs;
-  protected Map<String, String> contextFields = new HashMap<String, String>();
-  public ConfigBlock() {
-  }
-
-  public void loadConfig(Map<String, Object> map) {
-    configs = LogFeederUtil.cloneObject(map);
-
-    Map<String, String> nvList = getNVList("add_fields");
-    if (nvList != null) {
-      contextFields.putAll(nvList);
-    }
-  }
-
-  public Map<String, Object> getConfigs() {
-    return configs;
-  }
-
-  public boolean isFieldConditionMatch(String fieldName, String stringValue) {
-    boolean allow = false;
-    String fieldValue = (String) configs.get(fieldName);
-    if (fieldValue != null && fieldValue.equalsIgnoreCase(stringValue)) {
-      allow = true;
-    } else {
-      @SuppressWarnings("unchecked")
-      Map<String, Object> addFields = (Map<String, Object>) configs.get("add_fields");
-      if (addFields != null && addFields.get(fieldName) != null) {
-        String addFieldValue = (String) addFields.get(fieldName);
-        if (stringValue.equalsIgnoreCase(addFieldValue)) {
-          allow = true;
-        }
-      }
-
-    }
-    return allow;
-  }
-
-  @SuppressWarnings("unchecked")
-  public Map<String, String> getNVList(String key) {
-    return (Map<String, String>) configs.get(key);
-  }
-
-  public String getStringValue(String key) {
-    Object value = configs.get(key);
-    if (value != null && value.toString().equalsIgnoreCase("none")) {
-      value = null;
-    }
-    if (value != null) {
-      return value.toString();
-    }
-    return null;
-  }
-
-  public String getStringValue(String key, String defaultValue) {
-    Object value = configs.get(key);
-    if (value != null && value.toString().equalsIgnoreCase("none")) {
-      value = null;
-    }
-
-    if (value != null) {
-      return value.toString();
-    }
-    return defaultValue;
-  }
-
-  public Object getConfigValue(String key) {
-    return configs.get(key);
-  }
-
-  public boolean getBooleanValue(String key, boolean defaultValue) {
-    String strValue = getStringValue(key);
-    boolean retValue = defaultValue;
-    if (!StringUtils.isEmpty(strValue)) {
-      retValue = (strValue.equalsIgnoreCase("true") || strValue.equalsIgnoreCase("yes"));
-    }
-    return retValue;
-  }
-
-  public int getIntValue(String key, int defaultValue) {
-    String strValue = getStringValue(key);
-    int retValue = defaultValue;
-    if (!StringUtils.isEmpty(strValue)) {
-      try {
-        retValue = Integer.parseInt(strValue);
-      } catch (Throwable t) {
-        LOG.error("Error parsing integer value. key=" + key + ", value=" + strValue);
-      }
-    }
-    return retValue;
-  }
-  
-  public long getLongValue(String key, long defaultValue) {
-    String strValue = getStringValue(key);
-    Long retValue = defaultValue;
-    if (!StringUtils.isEmpty(strValue)) {
-      try {
-        retValue = Long.parseLong(strValue);
-      } catch (Throwable t) {
-        LOG.error("Error parsing long value. key=" + key + ", value=" + strValue);
-      }
-    }
-    return retValue;
-  }
-
-  @Override
-  public boolean isEnabled() {
-    return getBooleanValue("is_enabled", true);
-  }
-
-  public Map<String, String> getContextFields() {
-    return contextFields;
-  }
-
-  public boolean logConfigs(Priority level) {
-    if (!super.logConfigs(level)) {
-      return false;
-    }
-    LOG.log(level, "Printing configuration Block=" + getShortDescription());
-    LOG.log(level, "configs=" + configs);
-    LOG.log(level, "contextFields=" + contextFields);
-    return true;
-  }
-}
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java
index 037f4a1..f138c13 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java
@@ -16,39 +16,20 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.ambari.logfeeder.common;
 
-import java.io.BufferedInputStream;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.lang.reflect.Type;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
 import com.google.common.collect.Maps;
+import com.google.gson.reflect.TypeToken;
 import org.apache.ambari.logfeeder.conf.LogFeederProps;
-import org.apache.ambari.logfeeder.filter.Filter;
-import org.apache.ambari.logfeeder.input.Input;
-import org.apache.ambari.logfeeder.input.InputManager;
 import org.apache.ambari.logfeeder.input.InputSimulate;
-import org.apache.ambari.logfeeder.metrics.MetricData;
-import org.apache.ambari.logfeeder.output.Output;
-import org.apache.ambari.logfeeder.output.OutputManager;
-import org.apache.ambari.logfeeder.util.AliasUtil;
+import org.apache.ambari.logfeeder.plugin.common.AliasUtil;
+import org.apache.ambari.logfeeder.plugin.common.MetricData;
+import org.apache.ambari.logfeeder.plugin.filter.Filter;
+import org.apache.ambari.logfeeder.plugin.input.Input;
+import org.apache.ambari.logfeeder.plugin.manager.InputManager;
+import org.apache.ambari.logfeeder.plugin.manager.OutputManager;
+import org.apache.ambari.logfeeder.plugin.output.Output;
 import org.apache.ambari.logfeeder.util.LogFeederUtil;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang.BooleanUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.ambari.logfeeder.util.AliasUtil.AliasType;
 import org.apache.ambari.logsearch.config.api.InputConfigMonitor;
 import org.apache.ambari.logsearch.config.api.LogSearchConfigLogFeeder;
 import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterDescriptor;
@@ -58,18 +39,32 @@ import org.apache.ambari.logsearch.config.api.model.inputconfig.InputDescriptor;
 import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.FilterDescriptorImpl;
 import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.InputConfigImpl;
 import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.InputDescriptorImpl;
-import org.apache.log4j.Level;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.BooleanUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.log4j.Logger;
-
-import com.google.gson.reflect.TypeToken;
 import org.springframework.core.io.ClassPathResource;
 
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
 import javax.inject.Inject;
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.lang.reflect.Type;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 public class ConfigHandler implements InputConfigMonitor {
-  private static final Logger LOG = Logger.getLogger(ConfigHandler.class);
+  private static final Logger LOG = Logger.getLogger(org.apache.ambari.logfeeder.common.ConfigHandler.class);
 
   private final LogSearchConfigLogFeeder logSearchConfig;
 
@@ -86,9 +81,9 @@ public class ConfigHandler implements InputConfigMonitor {
   private final List<InputDescriptor> inputConfigList = new ArrayList<>();
   private final List<FilterDescriptor> filterConfigList = new ArrayList<>();
   private final List<Map<String, Object>> outputConfigList = new ArrayList<>();
-  
+
   private boolean simulateMode = false;
-  
+
   public ConfigHandler(LogSearchConfigLogFeeder logSearchConfig) {
     this.logSearchConfig = logSearchConfig;
   }
@@ -99,13 +94,13 @@ public class ConfigHandler implements InputConfigMonitor {
     logSearchConfig.init(Maps.fromProperties(logFeederProps.getProperties()), logFeederProps.getClusterName());
     loadOutputs();
     simulateIfNeeded();
-    
+
     inputManager.init();
     outputManager.init();
-    
+
     logSearchConfig.monitorOutputProperties(outputManager.getOutputsToMonitor());
   }
-  
+
   private void loadConfigFiles() throws Exception {
     List<String> configFiles = getConfigFiles();
     for (String configFileName : configFiles) {
@@ -125,13 +120,13 @@ public class ConfigHandler implements InputConfigMonitor {
 
   private List<String> getConfigFiles() {
     List<String> configFiles = new ArrayList<>();
-    
+
     String logFeederConfigFilesProperty = logFeederProps.getConfigFiles();
     LOG.info("logfeeder.config.files=" + logFeederConfigFilesProperty);
     if (logFeederConfigFilesProperty != null) {
       configFiles.addAll(Arrays.asList(logFeederConfigFilesProperty.split(",")));
     }
-    
+
     return configFiles;
   }
 
@@ -155,22 +150,22 @@ public class ConfigHandler implements InputConfigMonitor {
       loadConfigs(configData);
     }
   }
-  
+
   @Override
   public void loadInputConfigs(String serviceName, InputConfig inputConfig) throws Exception {
     inputConfigList.clear();
     filterConfigList.clear();
-    
+
     inputConfigList.addAll(inputConfig.getInput());
     filterConfigList.addAll(inputConfig.getFilter());
-    
+
     if (simulateMode) {
       InputSimulate.loadTypeToFilePath(inputConfigList);
     } else {
       loadInputs(serviceName);
       loadFilters(serviceName);
       assignOutputsToInputs(serviceName);
-      
+
       inputManager.startInputs(serviceName);
     }
   }
@@ -190,7 +185,7 @@ public class ConfigHandler implements InputConfigMonitor {
     if (inputConfigList.isEmpty()) {
       throw new IllegalArgumentException("Log Id " + logId + " was not found in shipper configuriaton");
     }
-    
+
     for (FilterDescriptor filterDescriptor : inputConfig.getFilter()) {
       if ("grok".equals(filterDescriptor.getFilter())) {
         // Thus ensure that the log entry passed will be parsed immediately
@@ -201,7 +196,7 @@ public class ConfigHandler implements InputConfigMonitor {
     loadInputs("test");
     loadFilters("test");
     List<Input> inputList = inputManager.getInputList("test");
-    
+
     return inputList != null && inputList.size() == 1 ? inputList.get(0) : null;
   }
 
@@ -226,17 +221,17 @@ public class ConfigHandler implements InputConfigMonitor {
       }
     }
   }
-  
+
   @Override
   public List<String> getGlobalConfigJsons() {
     return globalConfigJsons;
   }
-  
+
   private void simulateIfNeeded() throws Exception {
     int simulatedInputNumber = logFeederProps.getInputSimulateConfig().getSimulateInputNumber();
     if (simulatedInputNumber == 0)
       return;
-    
+
     InputConfigImpl simulateInputConfig = new InputConfigImpl();
     List<InputDescriptorImpl> inputConfigDescriptors = new ArrayList<>();
     simulateInputConfig.setInput(inputConfigDescriptors);
@@ -248,9 +243,9 @@ public class ConfigHandler implements InputConfigMonitor {
       inputDescriptor.setAddFields(new HashMap<String, String>());
       inputConfigDescriptors.add(inputDescriptor);
     }
-    
+
     loadInputConfigs("Simulation", simulateInputConfig);
-    
+
     simulateMode = true;
   }
 
@@ -266,7 +261,7 @@ public class ConfigHandler implements InputConfigMonitor {
         LOG.error("Output block doesn't have destination element");
         continue;
       }
-      Output output = (Output) AliasUtil.getClassInstance(value, AliasType.OUTPUT);
+      Output output = (Output) AliasUtil.getClassInstance(value, AliasUtil.AliasType.OUTPUT);
       if (output == null) {
         LOG.error("Output object could not be found");
         continue;
@@ -277,7 +272,7 @@ public class ConfigHandler implements InputConfigMonitor {
 
       // We will only check for is_enabled out here. Down below we will check whether this output is enabled for the input
       if (output.isEnabled()) {
-        output.logConfigs(Level.INFO);
+        output.logConfigs();
         outputManager.add(output);
       } else {
         LOG.info("Output is disabled. So ignoring it. " + output.getShortDescription());
@@ -296,7 +291,7 @@ public class ConfigHandler implements InputConfigMonitor {
         LOG.error("Input block doesn't have source element");
         continue;
       }
-      Input input = (Input) AliasUtil.getClassInstance(source, AliasType.INPUT);
+      Input input = (Input) AliasUtil.getClassInstance(source, AliasUtil.AliasType.INPUT);
       if (input == null) {
         LOG.error("Input object could not be found");
         continue;
@@ -308,7 +303,7 @@ public class ConfigHandler implements InputConfigMonitor {
         input.setOutputManager(outputManager);
         input.setInputManager(inputManager);
         inputManager.add(serviceName, input);
-        input.logConfigs(Level.INFO);
+        input.logConfigs();
       } else {
         LOG.info("Input is disabled. So ignoring it. " + input.getShortDescription());
       }
@@ -338,7 +333,7 @@ public class ConfigHandler implements InputConfigMonitor {
           LOG.error("Filter block doesn't have filter element");
           continue;
         }
-        Filter filter = (Filter) AliasUtil.getClassInstance(value, AliasType.FILTER);
+        Filter filter = (Filter) AliasUtil.getClassInstance(value, AliasUtil.AliasType.FILTER);
         if (filter == null) {
           LOG.error("Filter object could not be found");
           continue;
@@ -348,9 +343,9 @@ public class ConfigHandler implements InputConfigMonitor {
 
         filter.setOutputManager(outputManager);
         input.addFilter(filter);
-        filter.logConfigs(Level.INFO);
+        filter.logConfigs();
       }
-      
+
       if (input.getFirstFilter() == null) {
         toRemoveInputList.add(input);
       }
@@ -384,7 +379,7 @@ public class ConfigHandler implements InputConfigMonitor {
         }
       }
     }
-    
+
     // In case of simulation copies of the output are added for each simulation instance, these must be added to the manager
     for (Output output : InputSimulate.getSimulateOutputs()) {
       output.setLogSearchConfig(logSearchConfig);
@@ -435,7 +430,7 @@ public class ConfigHandler implements InputConfigMonitor {
     inputManager.logStats();
     outputManager.logStats();
   }
-  
+
   public void addMetrics(List<MetricData> metricsList) {
     inputManager.addMetricsContainers(metricsList);
     outputManager.addMetricsContainers(metricsList);
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigItem.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigItem.java
deleted file mode 100644
index 30bd9fd..0000000
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigItem.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ambari.logfeeder.common;
-
-import java.util.List;
-
-import org.apache.ambari.logfeeder.conf.LogFeederProps;
-import org.apache.ambari.logfeeder.metrics.MetricData;
-import org.apache.ambari.logfeeder.util.LogFeederUtil;
-import org.apache.log4j.Logger;
-import org.apache.log4j.Priority;
-
-public abstract class ConfigItem {
-
-  protected static final Logger LOG = Logger.getLogger(ConfigBlock.class);
-  private boolean drain = false;
-  private LogFeederProps logFeederProps;
-  public MetricData statMetric = new MetricData(getStatMetricName(), false);
-
-  public ConfigItem() {
-    super();
-  }
-
-  protected String getStatMetricName() {
-    return null;
-  }
-
-  /**
-   * Used while logging. Keep it short and meaningful
-   */
-  public abstract String getShortDescription();
-
-  /**
-   * Every implementor need to give name to the thread they create
-   */
-  public String getNameForThread() {
-    return this.getClass().getSimpleName();
-  }
-
-  public void addMetricsContainers(List<MetricData> metricsList) {
-    metricsList.add(statMetric);
-  }
-
-  /**
-   * This method needs to be overwritten by deriving classes.
-   */
-  public void init(LogFeederProps logFeederProps) throws Exception {
-    this.logFeederProps = logFeederProps;
-  }
-
-  public abstract boolean isEnabled();
-
-  public void incrementStat(int count) {
-    statMetric.value += count;
-  }
-
-  public void logStatForMetric(MetricData metric, String prefixStr) {
-    LogFeederUtil.logStatForMetric(metric, prefixStr, ", key=" + getShortDescription());
-  }
-
-  public synchronized void logStat() {
-    logStatForMetric(statMetric, "Stat");
-  }
-
-  public boolean logConfigs(Priority level) {
-    if (level.toInt() == Priority.INFO_INT && !LOG.isInfoEnabled()) {
-      return false;
-    }
-    if (level.toInt() == Priority.DEBUG_INT && !LOG.isDebugEnabled()) {
-      return false;
-    }
-    return true;
-  }
-
-  public boolean isDrain() {
-    return drain;
-  }
-
-  public void setDrain(boolean drain) {
-    this.drain = drain;
-  }
-
-  public LogFeederProps getLogFeederProps() {
-    return logFeederProps;
-  }
-}
\ No newline at end of file
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogEntryParseTester.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogEntryParseTester.java
index 1a701e1..81f7317 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogEntryParseTester.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogEntryParseTester.java
@@ -26,10 +26,12 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.ambari.logfeeder.conf.LogFeederProps;
-import org.apache.ambari.logfeeder.input.Input;
-import org.apache.ambari.logfeeder.input.InputMarker;
-import org.apache.ambari.logfeeder.output.Output;
+import org.apache.ambari.logfeeder.input.InputFileMarker;
+import org.apache.ambari.logfeeder.plugin.input.Input;
+import org.apache.ambari.logfeeder.plugin.input.InputMarker;
+import org.apache.ambari.logfeeder.plugin.output.Output;
 import org.apache.ambari.logsearch.config.api.model.inputconfig.InputConfig;
+import org.apache.ambari.logsearch.config.api.model.outputconfig.OutputProperties;
 import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.InputConfigGson;
 import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.InputConfigImpl;
 
@@ -78,21 +80,54 @@ public class LogEntryParseTester {
     Input input = configHandler.getTestInput(inputConfig, logId);
     final Map<String, Object> result = new HashMap<>();
     input.getFirstFilter().init(new LogFeederProps());
-    input.addOutput(new Output() {
+    input.addOutput(new Output<LogFeederProps, InputFileMarker>() {
       @Override
-      public void write(String block, InputMarker inputMarker) throws Exception {
+      public void init(LogFeederProps logFeederProperties) throws Exception {
       }
-      
+
+      @Override
+      public String getShortDescription() {
+        return null;
+      }
+
+      @Override
+      public String getStatMetricName() {
+        return null;
+      }
+
+      @Override
+      public void write(String block, InputFileMarker inputMarker) throws Exception {
+      }
+
+      @Override
+      public Long getPendingCount() {
+        return null;
+      }
+
+      @Override
+      public String getWriteBytesMetricName() {
+        return null;
+      }
+
+      @Override
+      public String getOutputType() {
+        return null;
+      }
+
+      @Override
+      public void outputConfigChanged(OutputProperties outputProperties) {
+      }
+
       @Override
       public void copyFile(File inputFile, InputMarker inputMarker) throws UnsupportedOperationException {
       }
       
       @Override
-      public void write(Map<String, Object> jsonObj, InputMarker inputMarker) {
+      public void write(Map<String, Object> jsonObj, InputFileMarker inputMarker) {
         result.putAll(jsonObj);
       }
     });
-    input.outputLine(logEntry, new InputMarker(input, null, 0));
+    input.outputLine(logEntry, new InputFileMarker(input, null, 0));
     
     return result.isEmpty() ?
         ImmutableMap.of("errorMessage", (Object)"Could not parse test log entry") :
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/ApplicationConfig.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/ApplicationConfig.java
index cfb6c78..cfb199c 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/ApplicationConfig.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/ApplicationConfig.java
@@ -19,14 +19,16 @@
 package org.apache.ambari.logfeeder.conf;
 
 import com.google.common.collect.Maps;
-import org.apache.ambari.logfeeder.common.ConfigHandler;
 import org.apache.ambari.logfeeder.common.LogFeederConstants;
 import org.apache.ambari.logfeeder.input.InputConfigUploader;
-import org.apache.ambari.logfeeder.input.InputManager;
+import org.apache.ambari.logfeeder.input.InputManagerImpl;
 import org.apache.ambari.logfeeder.loglevelfilter.LogLevelFilterHandler;
+import org.apache.ambari.logfeeder.common.ConfigHandler;
 import org.apache.ambari.logfeeder.metrics.MetricsManager;
 import org.apache.ambari.logfeeder.metrics.StatsLogger;
-import org.apache.ambari.logfeeder.output.OutputManager;
+import org.apache.ambari.logfeeder.output.OutputManagerImpl;
+import org.apache.ambari.logfeeder.plugin.manager.InputManager;
+import org.apache.ambari.logfeeder.plugin.manager.OutputManager;
 import org.apache.ambari.logsearch.config.api.LogSearchConfigFactory;
 import org.apache.ambari.logsearch.config.api.LogSearchConfigLogFeeder;
 import org.apache.ambari.logsearch.config.zookeeper.LogSearchConfigLogFeederZK;
@@ -95,13 +97,14 @@ public class ApplicationConfig {
     return new StatsLogger();
   }
 
+
   @Bean
   public InputManager inputManager() {
-    return new InputManager();
+    return new InputManagerImpl();
   }
 
   @Bean
   public OutputManager outputManager() {
-    return new OutputManager();
+    return new OutputManagerImpl();
   }
 }
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogEntryCacheConfig.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogEntryCacheConfig.java
index 353bdc1..92d2ebe 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogEntryCacheConfig.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogEntryCacheConfig.java
@@ -64,7 +64,7 @@ public class LogEntryCacheConfig {
     sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
   )
   @Value("${" + LogFeederConstants.CACHE_LAST_DEDUP_ENABLED_PROPERTY + ":" + LogFeederConstants.DEFAULT_CACHE_LAST_DEDUP_ENABLED + "}")
-  private Boolean cacheLastDedupEnabled;
+  private boolean cacheLastDedupEnabled;
 
   @LogSearchPropertyDescription(
     name = LogFeederConstants.CACHE_DEDUP_INTERVAL_PROPERTY,
@@ -101,7 +101,7 @@ public class LogEntryCacheConfig {
   }
 
   public boolean isCacheLastDedupEnabled() {
-    return cacheLastDedupEnabled;
+    return this.cacheLastDedupEnabled;
   }
 
   public void setCacheLastDedupEnabled(boolean cacheLastDedupEnabled) {
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
index 367d1cd..d451496 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
@@ -19,6 +19,7 @@
 package org.apache.ambari.logfeeder.conf;
 
 import org.apache.ambari.logfeeder.common.LogFeederConstants;
+import org.apache.ambari.logfeeder.plugin.common.LogFeederProperties;
 import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Configuration;
@@ -36,7 +37,7 @@ import java.util.Properties;
 import java.util.stream.Stream;
 
 @Configuration
-public class LogFeederProps {
+public class LogFeederProps implements LogFeederProperties {
 
   @Inject
   private Environment env;
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederSecurityConfig.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederSecurityConfig.java
index 8a45753..c1c7755 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederSecurityConfig.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederSecurityConfig.java
@@ -26,11 +26,13 @@ import org.apache.commons.lang3.ArrayUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Configuration;
 
 import javax.annotation.PostConstruct;
 import java.io.File;
 import java.nio.charset.Charset;
 
+@Configuration
 public class LogFeederSecurityConfig {
 
   private static final Logger LOG = LoggerFactory.getLogger(LogFeederSecurityConfig.class);
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
index f0ef31b..21b1ea4 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
@@ -19,6 +19,20 @@
 
 package org.apache.ambari.logfeeder.filter;
 
+import com.google.gson.reflect.TypeToken;
+import oi.thekraken.grok.api.Grok;
+import oi.thekraken.grok.api.exception.GrokException;
+import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.plugin.common.MetricData;
+import org.apache.ambari.logfeeder.plugin.filter.Filter;
+import org.apache.ambari.logfeeder.plugin.input.InputMarker;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterGrokDescriptor;
+import org.apache.commons.lang3.BooleanUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.lang.reflect.Type;
@@ -31,23 +45,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.regex.Pattern;
 
-import oi.thekraken.grok.api.Grok;
-import oi.thekraken.grok.api.exception.GrokException;
-
-import org.apache.ambari.logfeeder.common.LogFeederException;
-import org.apache.ambari.logfeeder.conf.LogFeederProps;
-import org.apache.ambari.logfeeder.input.InputMarker;
-import org.apache.ambari.logfeeder.metrics.MetricData;
-import org.apache.ambari.logfeeder.util.LogFeederUtil;
-import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterGrokDescriptor;
-import org.apache.commons.lang3.BooleanUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-
-import com.google.gson.reflect.TypeToken;
-
-public class FilterGrok extends Filter {
+public class FilterGrok extends Filter<LogFeederProps> {
   private static final Logger LOG = Logger.getLogger(FilterGrok.class);
 
   private static final String GROK_PATTERN_FILE = "grok-patterns";
@@ -78,10 +76,10 @@ public class FilterGrok extends Filter {
     super.init(logFeederProps);
 
     try {
-      messagePattern = escapePattern(((FilterGrokDescriptor)filterDescriptor).getMessagePattern());
-      multilinePattern = escapePattern(((FilterGrokDescriptor)filterDescriptor).getMultilinePattern());
-      sourceField = filterDescriptor.getSourceField();
-      removeSourceField = BooleanUtils.toBooleanDefaultIfNull(filterDescriptor.isRemoveSourceField(), removeSourceField);
+      messagePattern = escapePattern(((FilterGrokDescriptor)getFilterDescriptor()).getMessagePattern());
+      multilinePattern = escapePattern(((FilterGrokDescriptor)getFilterDescriptor()).getMultilinePattern());
+      sourceField = getFilterDescriptor().getSourceField();
+      removeSourceField = BooleanUtils.toBooleanDefaultIfNull(getFilterDescriptor().isRemoveSourceField(), removeSourceField);
 
       LOG.info("init() done. grokPattern=" + messagePattern + ", multilinePattern=" + multilinePattern + ", " +
       getShortDescription());
@@ -161,7 +159,7 @@ public class FilterGrok extends Filter {
   }
 
   @Override
-  public void apply(String inputStr, InputMarker inputMarker) throws LogFeederException {
+  public void apply(String inputStr, InputMarker inputMarker) throws Exception {
     if (grokMessage == null) {
       return;
     }
@@ -196,7 +194,7 @@ public class FilterGrok extends Filter {
   }
 
   @Override
-  public void apply(Map<String, Object> jsonObj, InputMarker inputMarker) throws LogFeederException {
+  public void apply(Map<String, Object> jsonObj, InputMarker inputMarker) throws Exception {
     if (sourceField != null) {
       savedInputMarker = inputMarker;
       applyMessage((String) jsonObj.get(sourceField), jsonObj, null);
@@ -206,7 +204,7 @@ public class FilterGrok extends Filter {
     }
   }
 
-  private void applyMessage(String inputStr, Map<String, Object> jsonObj, String multilineJsonStr) throws LogFeederException {
+  private void applyMessage(String inputStr, Map<String, Object> jsonObj, String multilineJsonStr) throws Exception {
     String jsonStr = grokMessage.capture(inputStr);
 
     boolean parseError = false;
@@ -251,7 +249,7 @@ public class FilterGrok extends Filter {
     String logMessageKey = this.getClass().getSimpleName() + "_PARSEERROR";
     int inputStrLength = inputStr != null ? inputStr.length() : 0;
     LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Error parsing string. length=" + inputStrLength + ", input=" +
-        input.getShortDescription() + ". First upto 100 characters=" + StringUtils.abbreviate(inputStr, 100), null, LOG,
+        getInput().getShortDescription() + ". First upto 100 characters=" + StringUtils.abbreviate(inputStr, 100), null, LOG,
         Level.WARN);
   }
 
@@ -261,7 +259,7 @@ public class FilterGrok extends Filter {
       Map<String, Object> jsonObj = Collections.synchronizedMap(new HashMap<String, Object>());
       try {
         applyMessage(strBuff.toString(), jsonObj, currMultilineJsonStr);
-      } catch (LogFeederException e) {
+      } catch (Exception e) {
         LOG.error(e.getLocalizedMessage(), e.getCause());
       }
       strBuff = null;
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java
index 1a2da0c..207d6f8 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java
@@ -18,18 +18,24 @@
  */
 package org.apache.ambari.logfeeder.filter;
 
-import java.util.Map;
-
 import org.apache.ambari.logfeeder.common.LogFeederConstants;
 import org.apache.ambari.logfeeder.common.LogFeederException;
-import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.plugin.filter.Filter;
+import org.apache.ambari.logfeeder.plugin.input.InputMarker;
 import org.apache.ambari.logfeeder.util.DateUtil;
 import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+public class FilterJSON extends Filter<LogFeederProps> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FilterJSON.class);
 
-public class FilterJSON extends Filter {
-  
   @Override
-  public void apply(String inputStr, InputMarker inputMarker) throws LogFeederException {
+  public void apply(String inputStr, InputMarker inputMarker) throws Exception {
     Map<String, Object> jsonMap = null;
     try {
       jsonMap = LogFeederUtil.toJSONObject(inputStr);
@@ -50,4 +56,10 @@ public class FilterJSON extends Filter {
     }
     super.apply(jsonMap, inputMarker);
   }
+
+  @Override
+  public String getShortDescription() {
+    return "filter:filter=json,input=" + getInput().getShortDescription();
+  }
+
 }
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java
index adcf0a4..695c7e3 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java
@@ -19,21 +19,25 @@
 
 package org.apache.ambari.logfeeder.filter;
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.regex.Pattern;
-
-import org.apache.ambari.logfeeder.common.LogFeederException;
 import org.apache.ambari.logfeeder.conf.LogFeederProps;
-import org.apache.ambari.logfeeder.input.InputMarker;
-import org.apache.ambari.logfeeder.metrics.MetricData;
+import org.apache.ambari.logfeeder.plugin.common.MetricData;
+import org.apache.ambari.logfeeder.plugin.filter.Filter;
+import org.apache.ambari.logfeeder.plugin.input.InputMarker;
 import org.apache.ambari.logfeeder.util.LogFeederUtil;
 import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterKeyValueDescriptor;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+public class FilterKeyValue extends Filter<LogFeederProps> {
+
+  private static final Logger LOG = Logger.getLogger(FilterKeyValue.class);
 
-public class FilterKeyValue extends Filter {
   private String sourceField = null;
   private String valueSplit = "=";
   private String fieldSplit = "\t";
@@ -45,26 +49,26 @@ public class FilterKeyValue extends Filter {
   public void init(LogFeederProps logFeederProps) throws Exception {
     super.init(logFeederProps);
 
-    sourceField = filterDescriptor.getSourceField();
-    valueSplit = StringUtils.defaultString(((FilterKeyValueDescriptor)filterDescriptor).getValueSplit(), valueSplit);
-    fieldSplit = StringUtils.defaultString(((FilterKeyValueDescriptor)filterDescriptor).getFieldSplit(), fieldSplit);
-    valueBorders = ((FilterKeyValueDescriptor)filterDescriptor).getValueBorders();
+    sourceField = getFilterDescriptor().getSourceField();
+    valueSplit = StringUtils.defaultString(((FilterKeyValueDescriptor)getFilterDescriptor()).getValueSplit(), valueSplit);
+    fieldSplit = StringUtils.defaultString(((FilterKeyValueDescriptor)getFilterDescriptor()).getFieldSplit(), fieldSplit);
+    valueBorders = ((FilterKeyValueDescriptor)getFilterDescriptor()).getValueBorders();
 
     LOG.info("init() done. source_field=" + sourceField + ", value_split=" + valueSplit + ", " + ", field_split=" +
         fieldSplit + ", " + getShortDescription());
     if (StringUtils.isEmpty(sourceField)) {
-      LOG.fatal("source_field is not set for filter. This filter will not be applied");
+      LOG.fatal("source_field is not set for filter. This filter will not be applied");
       return;
     }
   }
 
   @Override
-  public void apply(String inputStr, InputMarker inputMarker) throws LogFeederException {
+  public void apply(String inputStr, InputMarker inputMarker) throws Exception {
     apply(LogFeederUtil.toJSONObject(inputStr), inputMarker);
   }
 
   @Override
-  public void apply(Map<String, Object> jsonObj, InputMarker inputMarker) throws LogFeederException {
+  public void apply(Map<String, Object> jsonObj, InputMarker inputMarker) throws Exception {
     if (sourceField == null) {
       return;
     }
@@ -136,7 +140,7 @@ public class FilterKeyValue extends Filter {
     errorMetric.value++;
     String logMessageKey = this.getClass().getSimpleName() + "_PARSEERROR";
     LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Error parsing string. length=" + inputStr.length() + ", input=" +
-        input.getShortDescription() + ". First upto 200 characters=" + StringUtils.abbreviate(inputStr, 200), null, LOG,
+        getInput().getShortDescription() + ". First upto 200 characters=" + StringUtils.abbreviate(inputStr, 200), null, LOG,
         Level.ERROR);
   }
 
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java
deleted file mode 100644
index cf295c5..0000000
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java
+++ /dev/null
@@ -1,329 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.ambari.logfeeder.input;
-
-import java.io.BufferedReader;
-import java.io.EOFException;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.ambari.logfeeder.conf.LogFeederProps;
-import org.apache.ambari.logfeeder.util.FileUtil;
-import org.apache.ambari.logfeeder.util.LogFeederUtil;
-import org.apache.ambari.logsearch.config.api.model.inputconfig.InputFileBaseDescriptor;
-import org.apache.commons.lang.ObjectUtils;
-import org.apache.commons.lang3.ArrayUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.log4j.Level;
-
-public abstract class AbstractInputFile extends Input {
-  private static final int DEFAULT_CHECKPOINT_INTERVAL_MS = 5 * 1000;
-
-  protected File[] logFiles;
-  protected String logPath;
-  protected Object fileKey;
-  protected String base64FileKey;
-
-  protected boolean isReady;
-
-  private String checkPointExtension;
-  private int checkPointIntervalMS;
-  
-  private Map<String, File> checkPointFiles = new HashMap<>();
-  private Map<String, Long> lastCheckPointTimeMSs = new HashMap<>();
-  private Map<String, Map<String, Object>> jsonCheckPoints = new HashMap<>();
-  private Map<String, InputMarker> lastCheckPointInputMarkers = new HashMap<>();
-
-  private LogFeederProps logFeederProps;
-
-  @Override
-  protected String getStatMetricName() {
-    return "input.files.read_lines";
-  }
-  
-  @Override
-  protected String getReadBytesMetricName() {
-    return "input.files.read_bytes";
-  }
-  
-  @Override
-  public void init(LogFeederProps logFeederProps) throws Exception {
-    this.logFeederProps = logFeederProps;
-    LOG.info("init() called");
-    
-    checkPointExtension = logFeederProps.getCheckPointExtension();
-
-    // Let's close the file and set it to true after we start monitoring it
-    setClosed(true);
-    logPath = inputDescriptor.getPath();
-    checkPointIntervalMS = (int) ObjectUtils.defaultIfNull(((InputFileBaseDescriptor)inputDescriptor).getCheckpointIntervalMs(), DEFAULT_CHECKPOINT_INTERVAL_MS);
-
-    if (StringUtils.isEmpty(logPath)) {
-      LOG.error("path is empty for file input. " + getShortDescription());
-      return;
-    }
-
-    setFilePath(logPath);
-    boolean isFileReady = isReady();
-
-    LOG.info("File to monitor " + logPath + ", tail=" + tail + ", isReady=" + isFileReady);
-
-    super.init(logFeederProps);
-  }
-
-  protected void processFile(File logPathFile, boolean follow) throws FileNotFoundException, IOException {
-    LOG.info("Monitoring logPath=" + logPath + ", logPathFile=" + logPathFile);
-    BufferedReader br = null;
-
-    int lineCount = 0;
-    try {
-      setFilePath(logPathFile.getAbsolutePath());
-      
-      br = openLogFile(logPathFile);
-
-      boolean resume = true;
-      int resumeFromLineNumber = getResumeFromLineNumber();
-      if (resumeFromLineNumber > 0) {
-        LOG.info("Resuming log file " + logPathFile.getAbsolutePath() + " from line number " + resumeFromLineNumber);
-        resume = false;
-      }
-      
-      setClosed(false);
-      int sleepStep = 2;
-      int sleepIteration = 0;
-      while (true) {
-        try {
-          if (isDrain()) {
-            break;
-          }
-
-          String line = br.readLine();
-          if (line == null) {
-            if (!resume) {
-              resume = true;
-            }
-            sleepIteration++;
-            if (sleepIteration == 2) {
-              flush();
-              if (!follow) {
-                LOG.info("End of file. Done with filePath=" + logPathFile.getAbsolutePath() + ", lineCount=" + lineCount);
-                break;
-              }
-            } else if (sleepIteration > 4) {
-              Object newFileKey = getFileKey(logPathFile);
-              if (newFileKey != null && (fileKey == null || !newFileKey.equals(fileKey))) {
-                LOG.info("File key is different. Marking this input file for rollover. oldKey=" + fileKey + ", newKey=" +
-                    newFileKey + ". " + getShortDescription());
-                
-                try {
-                  LOG.info("File is rolled over. Closing current open file." + getShortDescription() + ", lineCount=" +
-                      lineCount);
-                  br.close();
-                } catch (Exception ex) {
-                  LOG.error("Error closing file" + getShortDescription(), ex);
-                  break;
-                }
-                
-                try {
-                  LOG.info("Opening new rolled over file." + getShortDescription());
-                  br = openLogFile(logPathFile);
-                  lineCount = 0;
-                } catch (Exception ex) {
-                  LOG.error("Error opening rolled over file. " + getShortDescription(), ex);
-                  LOG.info("Added input to not ready list." + getShortDescription());
-                  isReady = false;
-                  inputManager.addToNotReady(this);
-                  break;
-                }
-                LOG.info("File is successfully rolled over. " + getShortDescription());
-                continue;
-              }
-            }
-            try {
-              Thread.sleep(sleepStep * 1000);
-              sleepStep = Math.min(sleepStep * 2, 10);
-            } catch (InterruptedException e) {
-              LOG.info("Thread interrupted." + getShortDescription());
-            }
-          } else {
-            lineCount++;
-            sleepStep = 1;
-            sleepIteration = 0;
-
-            if (!resume && lineCount > resumeFromLineNumber) {
-              LOG.info("Resuming to read from last line. lineCount=" + lineCount + ", input=" + getShortDescription());
-              resume = true;
-            }
-            if (resume) {
-              InputMarker marker = new InputMarker(this, base64FileKey, lineCount);
-              outputLine(line, marker);
-            }
-          }
-        } catch (Throwable t) {
-          String logMessageKey = this.getClass().getSimpleName() + "_READ_LOOP_EXCEPTION";
-          LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Caught exception in read loop. lineNumber=" + lineCount +
-              ", input=" + getShortDescription(), t, LOG, Level.ERROR);
-        }
-      }
-    } finally {
-      if (br != null) {
-        LOG.info("Closing reader." + getShortDescription() + ", lineCount=" + lineCount);
-        try {
-          br.close();
-        } catch (Throwable t) {
-          // ignore
-        }
-      }
-    }
-  }
-
-  protected abstract BufferedReader openLogFile(File logFile) throws IOException;
-
-  protected abstract Object getFileKey(File logFile);
-  
-  private int getResumeFromLineNumber() {
-    int resumeFromLineNumber = 0;
-    
-    File checkPointFile = null;
-    try {
-      LOG.info("Checking existing checkpoint file. " + getShortDescription());
-
-      String checkPointFileName = base64FileKey + checkPointExtension;
-      File checkPointFolder = inputManager.getCheckPointFolderFile();
-      checkPointFile = new File(checkPointFolder, checkPointFileName);
-      checkPointFiles.put(base64FileKey, checkPointFile);
-      Map<String, Object> jsonCheckPoint = null;
-      if (!checkPointFile.exists()) {
-        LOG.info("Checkpoint file for log file " + filePath + " doesn't exist, starting to read it from the beginning");
-      } else {
-        try (RandomAccessFile checkPointWriter = new RandomAccessFile(checkPointFile, "rw")) {
-          int contentSize = checkPointWriter.readInt();
-          byte b[] = new byte[contentSize];
-          int readSize = checkPointWriter.read(b, 0, contentSize);
-          if (readSize != contentSize) {
-            LOG.error("Couldn't read expected number of bytes from checkpoint file. expected=" + contentSize + ", read=" +
-                readSize + ", checkPointFile=" + checkPointFile + ", input=" + getShortDescription());
-          } else {
-            String jsonCheckPointStr = new String(b, 0, readSize);
-            jsonCheckPoint = LogFeederUtil.toJSONObject(jsonCheckPointStr);
-
-            resumeFromLineNumber = LogFeederUtil.objectToInt(jsonCheckPoint.get("line_number"), 0, "line_number");
-
-            LOG.info("CheckPoint. checkPointFile=" + checkPointFile + ", json=" + jsonCheckPointStr +
-                ", resumeFromLineNumber=" + resumeFromLineNumber);
-         }
-        } catch (EOFException eofEx) {
-          LOG.info("EOFException. Will reset checkpoint file " + checkPointFile.getAbsolutePath() + " for " +
-              getShortDescription(), eofEx);
-        }
-      }
-      if (jsonCheckPoint == null) {
-        // This seems to be first time, so creating the initial checkPoint object
-        jsonCheckPoint = new HashMap<String, Object>();
-        jsonCheckPoint.put("file_path", filePath);
-        jsonCheckPoint.put("file_key", base64FileKey);
-      }
-      
-      jsonCheckPoints.put(base64FileKey, jsonCheckPoint);
-
-    } catch (Throwable t) {
-      LOG.error("Error while configuring checkpoint file. Will reset file. checkPointFile=" + checkPointFile, t);
-    }
-    
-    return resumeFromLineNumber;
-  }
-
-  @Override
-  public synchronized void checkIn(InputMarker inputMarker) {
-    try {
-      Map<String, Object> jsonCheckPoint = jsonCheckPoints.get(inputMarker.base64FileKey);
-      File checkPointFile = checkPointFiles.get(inputMarker.base64FileKey);
-      
-      int lineNumber = LogFeederUtil.objectToInt(jsonCheckPoint.get("line_number"), 0, "line_number");
-      if (lineNumber > inputMarker.lineNumber) {
-        // Already wrote higher line number for this input
-        return;
-      }
-      // If interval is greater than last checkPoint time, then write
-      long currMS = System.currentTimeMillis();
-      long lastCheckPointTimeMs = lastCheckPointTimeMSs.containsKey(inputMarker.base64FileKey) ?
-          lastCheckPointTimeMSs.get(inputMarker.base64FileKey) : 0;
-      if (!isClosed() && (currMS - lastCheckPointTimeMs < checkPointIntervalMS)) {
-        // Let's save this one so we can update the check point file on flush
-        lastCheckPointInputMarkers.put(inputMarker.base64FileKey, inputMarker);
-        return;
-      }
-      lastCheckPointTimeMSs.put(inputMarker.base64FileKey, currMS);
-
-      jsonCheckPoint.put("line_number", "" + new Integer(inputMarker.lineNumber));
-      jsonCheckPoint.put("last_write_time_ms", "" + new Long(currMS));
-      jsonCheckPoint.put("last_write_time_date", new Date());
-
-      String jsonStr = LogFeederUtil.getGson().toJson(jsonCheckPoint);
-
-      File tmpCheckPointFile = new File(checkPointFile.getAbsolutePath() + ".tmp");
-      if (tmpCheckPointFile.exists()) {
-        tmpCheckPointFile.delete();
-      }
-      RandomAccessFile tmpRaf = new RandomAccessFile(tmpCheckPointFile, "rws");
-      tmpRaf.writeInt(jsonStr.length());
-      tmpRaf.write(jsonStr.getBytes());
-      tmpRaf.getFD().sync();
-      tmpRaf.close();
-      
-      FileUtil.move(tmpCheckPointFile, checkPointFile);
-
-      if (isClosed()) {
-        String logMessageKey = this.getClass().getSimpleName() + "_FINAL_CHECKIN";
-        LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Wrote final checkPoint, input=" + getShortDescription() +
-            ", checkPointFile=" + checkPointFile.getAbsolutePath() + ", checkPoint=" + jsonStr, null, LOG, Level.INFO);
-      }
-    } catch (Throwable t) {
-      String logMessageKey = this.getClass().getSimpleName() + "_CHECKIN_EXCEPTION";
-      LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Caught exception checkIn. , input=" + getShortDescription(), t,
-          LOG, Level.ERROR);
-    }
-  }
-
-  @Override
-  public void lastCheckIn() {
-    for (InputMarker lastCheckPointInputMarker : lastCheckPointInputMarkers.values()) {
-      checkIn(lastCheckPointInputMarker);
-    }
-  }
-
-  @Override
-  public void close() {
-    super.close();
-    LOG.info("close() calling checkPoint checkIn(). " + getShortDescription());
-    lastCheckIn();
-    isClosed = true;
-  }
-
-  @Override
-  public String getShortDescription() {
-    return "input:source=" + inputDescriptor.getSource() + ", path=" +
-        (!ArrayUtils.isEmpty(logFiles) ? logFiles[0].getAbsolutePath() : logPath);
-  }
-}
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java
index e8066be..0c551cd 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java
@@ -16,31 +16,27 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.ambari.logfeeder.input;
 
-import java.io.File;
-import java.io.FilenameFilter;
-import java.nio.charset.Charset;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
+import com.google.common.io.Files;
+import org.apache.ambari.logfeeder.loglevelfilter.LogLevelFilterHandler;
 import org.apache.ambari.logfeeder.common.ConfigHandler;
 import org.apache.ambari.logfeeder.conf.LogFeederProps;
-import org.apache.ambari.logfeeder.loglevelfilter.LogLevelFilterHandler;
 import org.apache.ambari.logsearch.config.api.LogSearchConfigLogFeeder;
-
-import com.google.common.io.Files;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.PostConstruct;
 import javax.inject.Inject;
+import java.io.File;
+import java.io.FilenameFilter;
+import java.nio.charset.Charset;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 public class InputConfigUploader extends Thread {
-
   protected static final Logger LOG = LoggerFactory.getLogger(InputConfigUploader.class);
 
   private static final long SLEEP_BETWEEN_CHECK = 2000;
@@ -73,7 +69,7 @@ public class InputConfigUploader extends Thread {
     this.start();
     config.monitorInputConfigChanges(configHandler, logLevelFilterHandler, logFeederProps.getClusterName());
   }
-  
+
   @Override
   public void run() {
     while (true) {
@@ -98,7 +94,7 @@ public class InputConfigUploader extends Thread {
       } else {
         LOG.warn("Cannot find input config files in config dir ({})", logFeederProps.getConfDir());
       }
-      
+
       try {
         Thread.sleep(SLEEP_BETWEEN_CHECK);
       } catch (InterruptedException e) {
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
index c35c831..8b5310f 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
@@ -18,22 +18,61 @@
  */
 package org.apache.ambari.logfeeder.input;
 
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileFilter;
-import java.io.FileNotFoundException;
-import java.util.Arrays;
-import java.util.Comparator;
-
+import org.apache.ambari.logfeeder.conf.LogEntryCacheConfig;
+import org.apache.ambari.logfeeder.conf.LogFeederProps;
 import org.apache.ambari.logfeeder.input.reader.LogsearchReaderFactory;
+import org.apache.ambari.logfeeder.input.file.FileCheckInHelper;
+import org.apache.ambari.logfeeder.input.file.ProcessFileHelper;
+import org.apache.ambari.logfeeder.input.file.ResumeLineNumberHelper;
+import org.apache.ambari.logfeeder.plugin.input.Input;
 import org.apache.ambari.logfeeder.util.FileUtil;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputFileBaseDescriptor;
 import org.apache.ambari.logsearch.config.api.model.inputconfig.InputFileDescriptor;
 import org.apache.commons.io.filefilter.WildcardFileFilter;
 import org.apache.commons.lang.BooleanUtils;
+import org.apache.commons.lang.ObjectUtils;
 import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.solr.common.util.Base64;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileFilter;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class InputFile extends Input<LogFeederProps, InputFileMarker> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(InputFile.class);
 
-public class InputFile extends AbstractInputFile {
+  private static final boolean DEFAULT_TAIL = true;
+  private static final boolean DEFAULT_USE_EVENT_MD5 = false;
+  private static final boolean DEFAULT_GEN_EVENT_MD5 = true;
+  private static final int DEFAULT_CHECKPOINT_INTERVAL_MS = 5 * 1000;
+
+  private boolean isReady;
+
+  private boolean tail;
+
+  private String filePath;
+  private File[] logFiles;
+  private String logPath;
+  private Object fileKey;
+  private String base64FileKey;
+  private String checkPointExtension;
+  private int checkPointIntervalMS;
+
+  private Map<String, File> checkPointFiles = new HashMap<>();
+  private Map<String, Long> lastCheckPointTimeMSs = new HashMap<>();
+  private Map<String, Map<String, Object>> jsonCheckPoints = new HashMap<>();
+  private Map<String, InputFileMarker> lastCheckPointInputMarkers = new HashMap<>();
+
+  private Thread thread;
 
   @Override
   public boolean isReady() {
@@ -43,7 +82,7 @@ public class InputFile extends AbstractInputFile {
       if (!ArrayUtils.isEmpty(logFiles) && logFiles[0].isFile()) {
         if (tail && logFiles.length > 1) {
           LOG.warn("Found multiple files (" + logFiles.length + ") for the file filter " + filePath +
-              ". Will follow only the first one. Using " + logFiles[0].getAbsolutePath());
+            ". Will follow only the first one. Using " + logFiles[0].getAbsolutePath());
         }
         LOG.info("File filter " + filePath + " expanded to " + logFiles[0].getAbsolutePath());
         isReady = true;
@@ -54,6 +93,45 @@ public class InputFile extends AbstractInputFile {
     return isReady;
   }
 
+  @Override
+  public void setReady(boolean isReady) {
+    this.isReady = isReady;
+  }
+
+  @Override
+  public String getNameForThread() {
+    if (filePath != null) {
+      try {
+        return (getType() + "=" + (new File(filePath)).getName());
+      } catch (Throwable ex) {
+        LOG.warn("Couldn't get basename for filePath=" + filePath, ex);
+      }
+    }
+    return super.getNameForThread() + ":" + getType();
+  }
+
+  @Override
+  public synchronized void checkIn(InputFileMarker inputMarker) {
+    FileCheckInHelper.checkIn(this, inputMarker);
+  }
+
+  @Override
+  public void lastCheckIn() {
+    for (InputFileMarker lastCheckPointInputMarker : lastCheckPointInputMarkers.values()) {
+      checkIn(lastCheckPointInputMarker);
+    }
+  }
+
+  @Override
+  public String getStatMetricName() {
+    return "input.files.read_lines";
+  }
+
+  @Override
+  public String getReadBytesMetricName() {
+    return "input.files.read_bytes";
+  }
+
   private File[] getActualFiles(String searchPath) {
     File searchFile = new File(searchPath);
     if (!searchFile.getParentFile().exists()) {
@@ -63,20 +141,71 @@ public class InputFile extends AbstractInputFile {
     } else {
       FileFilter fileFilter = new WildcardFileFilter(searchFile.getName());
       File[] logFiles = searchFile.getParentFile().listFiles(fileFilter);
-      Arrays.sort(logFiles,
-          new Comparator<File>() {
-            @Override
-            public int compare(File o1, File o2) {
-              return o1.getName().compareTo(o2.getName());
-            }
-      });
+      Arrays.sort(logFiles, Comparator.comparing(File::getName));
       return logFiles;
     }
   }
 
   @Override
-  void start() throws Exception {
-    boolean isProcessFile = BooleanUtils.toBooleanDefaultIfNull(((InputFileDescriptor)inputDescriptor).getProcessFile(), true);
+  public boolean monitor() {
+    if (isReady()) {
+      LOG.info("Starting thread. " + getShortDescription());
+      thread = new Thread(this, getNameForThread());
+      thread.start();
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  @Override
+  public List<InputFile> getChildInputs() {
+    return null;
+  }
+
+  @Override
+  public InputFileMarker getInputMarker() {
+    return null;
+  }
+
+  @Override
+  public void init(LogFeederProps logFeederProps) throws Exception {
+    super.init(logFeederProps);
+    LOG.info("init() called");
+
+    checkPointExtension = logFeederProps.getCheckPointExtension();
+
+    // Let's close the file and set it to true after we start monitoring it
+    setClosed(true);
+    logPath = getInputDescriptor().getPath();
+    checkPointIntervalMS = (int) ObjectUtils.defaultIfNull(((InputFileBaseDescriptor)getInputDescriptor()).getCheckpointIntervalMs(), DEFAULT_CHECKPOINT_INTERVAL_MS);
+
+    if (StringUtils.isEmpty(logPath)) {
+      LOG.error("path is empty for file input. " + getShortDescription());
+      return;
+    }
+
+    tail = BooleanUtils.toBooleanDefaultIfNull(getInputDescriptor().isTail(), DEFAULT_TAIL);
+    setFilePath(logPath);
+    boolean isFileReady = isReady();
+    LOG.info("File to monitor " + logPath + ", tail=" + tail + ", isReady=" + isFileReady);
+
+    LogEntryCacheConfig cacheConfig = logFeederProps.getLogEntryCacheConfig();
+    initCache(
+      cacheConfig.isCacheEnabled(),
+      cacheConfig.getCacheKeyField(),
+      cacheConfig.getCacheSize(),
+      cacheConfig.isCacheLastDedupEnabled(),
+      cacheConfig.getCacheDedupInterval(),
+      getFilePath());
+
+    setUseEventMD5(BooleanUtils.toBooleanDefaultIfNull(getInputDescriptor().isUseEventMd5AsId(), DEFAULT_USE_EVENT_MD5));
+    setGenEventMD5(BooleanUtils.toBooleanDefaultIfNull(getInputDescriptor().isGenEventMd5(), DEFAULT_GEN_EVENT_MD5));
+  }
+
+  @Override
+  public void start() throws Exception {
+    boolean isProcessFile = BooleanUtils.toBooleanDefaultIfNull(((InputFileDescriptor)getInputDescriptor()).getProcessFile(), true);
     if (isProcessFile) {
       for (int i = logFiles.length - 1; i >= 0; i--) {
         File file = logFiles[i];
@@ -98,27 +227,33 @@ public class InputFile extends AbstractInputFile {
     }
   }
 
-  @Override
-  protected BufferedReader openLogFile(File logFile) throws FileNotFoundException {
+  public int getResumeFromLineNumber() {
+    return ResumeLineNumberHelper.getResumeFromLineNumber(this);
+  }
+
+  public void processFile(File logPathFile, boolean follow) throws Exception {
+    ProcessFileHelper.processFile(this, logPathFile, follow);
+  }
+
+  public BufferedReader openLogFile(File logFile) throws Exception {
     BufferedReader br = new BufferedReader(LogsearchReaderFactory.INSTANCE.getReader(logFile));
-    fileKey = getFileKey(logFile);
+    fileKey = getFileKeyFromLogFile(logFile);
     base64FileKey = Base64.byteArrayToBase64(fileKey.toString().getBytes());
     LOG.info("fileKey=" + fileKey + ", base64=" + base64FileKey + ". " + getShortDescription());
     return br;
   }
 
-  @Override
-  protected Object getFileKey(File logFile) {
+  public Object getFileKeyFromLogFile(File logFile) {
     return FileUtil.getFileKey(logFile);
   }
 
   private void copyFiles(File[] files) {
-    boolean isCopyFile = BooleanUtils.toBooleanDefaultIfNull(((InputFileDescriptor)inputDescriptor).getCopyFile(), false);
+    boolean isCopyFile = BooleanUtils.toBooleanDefaultIfNull(((InputFileDescriptor)getInputDescriptor()).getCopyFile(), false);
     if (isCopyFile && files != null) {
       for (File file : files) {
         try {
-          InputMarker marker = new InputMarker(this, null, 0);
-          outputManager.copyFile(file, marker);
+          InputFileMarker marker = new InputFileMarker(this, null, 0);
+          getOutputManager().copyFile(file, marker);
           if (isClosed() || isDrain()) {
             LOG.info("isClosed or isDrain. Now breaking loop.");
             break;
@@ -129,4 +264,94 @@ public class InputFile extends AbstractInputFile {
       }
     }
   }
+
+  @Override
+  public boolean isEnabled() {
+    return BooleanUtils.isNotFalse(getInputDescriptor().isEnabled());
+  }
+
+  @Override
+  public String getShortDescription() {
+    return "input:source=" + getInputDescriptor().getSource() + ", path=" +
+      (!ArrayUtils.isEmpty(logFiles) ? logFiles[0].getAbsolutePath() : logPath);
+  }
+
+  @Override
+  public boolean logConfigs() {
+    LOG.info("Printing Input=" + getShortDescription());
+    LOG.info("description=" + getInputDescriptor().getPath());
+    return true;
+  }
+
+  @Override
+  public void close() {
+    super.close();
+    LOG.info("close() calling checkPoint checkIn(). " + getShortDescription());
+    lastCheckIn();
+    setClosed(true);
+  }
+
+  public String getFilePath() {
+    return filePath;
+  }
+
+  public void setFilePath(String filePath) {
+    this.filePath = filePath;
+  }
+
+  public String getLogPath() {
+    return logPath;
+  }
+
+  public Object getFileKey() {
+    return fileKey;
+  }
+
+  public String getBase64FileKey() throws Exception {
+    return base64FileKey;
+  }
+
+  public void setFileKey(Object fileKey) {
+    this.fileKey = fileKey;
+  }
+
+  public boolean isTail() {
+    return tail;
+  }
+
+  public File[] getLogFiles() {
+    return logFiles;
+  }
+
+  public void setBase64FileKey(String base64FileKey) {
+    this.base64FileKey = base64FileKey;
+  }
+
+  public void setLogFiles(File[] logFiles) {
+    this.logFiles = logFiles;
+  }
+
+  public String getCheckPointExtension() {
+    return checkPointExtension;
+  }
+
+  public int getCheckPointIntervalMS() {
+    return checkPointIntervalMS;
+  }
+
+  public Map<String, File> getCheckPointFiles() {
+    return checkPointFiles;
+  }
+
+  public Map<String, Long> getLastCheckPointTimeMSs() {
+    return lastCheckPointTimeMSs;
+  }
+
+  public Map<String, Map<String, Object>> getJsonCheckPoints() {
+    return jsonCheckPoints;
+  }
+
+  public Map<String, InputFileMarker> getLastCheckPointInputMarkers() {
+    return lastCheckPointInputMarkers;
+  }
 }
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMarker.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFileMarker.java
similarity index 53%
rename from ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMarker.java
rename to ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFileMarker.java
index 6767687..d3d12bd 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMarker.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFileMarker.java
@@ -16,25 +16,45 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.ambari.logfeeder.input;
 
-/**
- * This file contains the file inode, line number of the log currently been read
- */
-public class InputMarker {
-  public final Input input;
-  public final String base64FileKey;
-  public final int lineNumber;
-  
-  public InputMarker(Input input, String base64FileKey, int lineNumber) {
+import org.apache.ambari.logfeeder.plugin.input.Input;
+import org.apache.ambari.logfeeder.plugin.input.InputMarker;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class InputFileMarker implements InputMarker {
+
+  private final Input input;
+  private final String base64FileKey;
+  private final Integer lineNumber;
+
+  private final Map<String, Object> properties = new HashMap<>();
+
+  public InputFileMarker(Input input, String base64FileKey, Integer lineNumber) {
     this.input = input;
     this.base64FileKey = base64FileKey;
     this.lineNumber = lineNumber;
+    properties.put("line_number", lineNumber);
+    properties.put("base64_file_key", base64FileKey);
   }
-  
+
   @Override
-  public String toString() {
-    return "InputMarker [lineNumber=" + lineNumber + ", input=" + input.getShortDescription() + "]";
+  public Input getInput() {
+    return this.input;
+  }
+
+  @Override
+  public Map<String, Object> getAllProperties() {
+    return properties;
+  }
+
+  public String getBase64FileKey() {
+    return base64FileKey;
+  }
+
+  public int getLineNumber() {
+    return lineNumber == null ? 0 : lineNumber;
   }
 }
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManagerImpl.java
similarity index 94%
rename from ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java
rename to ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManagerImpl.java
index d1f38ed..70aa681 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManagerImpl.java
@@ -16,9 +16,21 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.ambari.logfeeder.input;
 
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.plugin.common.MetricData;
+import org.apache.ambari.logfeeder.plugin.input.Input;
+import org.apache.ambari.logfeeder.plugin.manager.InputManager;
+import org.apache.ambari.logfeeder.util.FileUtil;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.commons.io.filefilter.WildcardFileFilter;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.log4j.Logger;
+import org.apache.solr.common.util.Base64;
+
+import javax.inject.Inject;
 import java.io.EOFException;
 import java.io.File;
 import java.io.FileFilter;
@@ -33,23 +45,12 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.ambari.logfeeder.conf.LogFeederProps;
-import org.apache.ambari.logfeeder.metrics.MetricData;
-import org.apache.ambari.logfeeder.util.FileUtil;
-import org.apache.ambari.logfeeder.util.LogFeederUtil;
-import org.apache.commons.io.filefilter.WildcardFileFilter;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.log4j.Logger;
-import org.apache.solr.common.util.Base64;
-
-import javax.inject.Inject;
+public class InputManagerImpl extends InputManager {
 
-public class InputManager {
-  private static final Logger LOG = Logger.getLogger(InputManager.class);
+  private static final Logger LOG = Logger.getLogger(InputManagerImpl.class);
 
   private static final String CHECKPOINT_SUBFOLDER_NAME = "logfeeder_checkpoints";
-  
+
   private Map<String, List<Input>> inputs = new HashMap<>();
   private Set<Input> notReadyList = new HashSet<>();
 
@@ -59,7 +60,7 @@ public class InputManager {
   private File checkPointFolderFile;
 
   private MetricData filesCountMetric = new MetricData("input.files.count", true);
-  
+
   private Thread inputIsReadyMonitor;
 
   @Inject
@@ -69,6 +70,7 @@ public class InputManager {
     return inputs.get(serviceName);
   }
 
+  @Override
   public void add(String serviceName, Input input) {
     List<Input> inputList = inputs.get(serviceName);
     if (inputList == null) {
@@ -78,6 +80,7 @@ public class InputManager {
     inputList.add(input);
   }
 
+  @Override
   public void removeInputsForService(String serviceName) {
     List<Input> inputList = inputs.get(serviceName);
     for (Input input : inputList) {
@@ -92,6 +95,7 @@ public class InputManager {
     inputs.remove(serviceName);
   }
 
+  @Override
   public void removeInput(Input input) {
     LOG.info("Trying to remove from inputList. " + input.getShortDescription());
     for (List<Input> inputList : inputs.values()) {
@@ -118,11 +122,12 @@ public class InputManager {
     return count;
   }
 
-  public void init() {
+  @Override
+  public void init() throws Exception {
     initCheckPointSettings();
     startMonitorThread();
   }
-  
+
   private void initCheckPointSettings() {
     checkPointExtension = logFeederProps.getCheckPointExtension();
     LOG.info("Determining valid checkpoint folder");
@@ -133,7 +138,7 @@ public class InputManager {
       checkPointFolderFile = new File(checkPointFolder);
       isCheckPointFolderValid = verifyCheckPointFolder(checkPointFolderFile);
     }
-    
+
     if (!isCheckPointFolderValid) {
       // Let's use tmp folder
       checkPointFolderFile = new File(logFeederProps.getTmpDir(), CHECKPOINT_SUBFOLDER_NAME);
@@ -141,10 +146,10 @@ public class InputManager {
       isCheckPointFolderValid = verifyCheckPointFolder(checkPointFolderFile);
       if (isCheckPointFolderValid) {
         LOG.warn("Using tmp folder " + checkPointFolderFile + " to store check points. This is not recommended." +
-            "Please set logfeeder.checkpoint.folder property");
+          "Please set logfeeder.checkpoint.folder property");
       }
     }
-    
+
     if (isCheckPointFolderValid) {
       LOG.info("Using folder " + checkPointFolderFile + " for storing checkpoints");
     } else {
@@ -182,10 +187,10 @@ public class InputManager {
         }
       }
     };
-    
+
     inputIsReadyMonitor.start();
   }
-  
+
   public void startInputs(String serviceName) {
     for (Input input : inputs.get(serviceName)) {
       try {
@@ -194,7 +199,7 @@ public class InputManager {
           input.monitor();
         } else {
           LOG.info("Adding input to not ready list. Note, it is possible this component is not run on this host. " +
-              "So it might not be an issue. " + input.getShortDescription());
+            "So it might not be an issue. " + input.getShortDescription());
           notReadyList.add(input);
         }
       } catch (Exception e) {
@@ -231,10 +236,12 @@ public class InputManager {
     return checkPointFolderFile;
   }
 
-  void addToNotReady(Input notReadyInput) {
+  @Override
+  public void addToNotReady(Input notReadyInput) {
     notReadyList.add(notReadyInput);
   }
 
+  @Override
   public void addMetricsContainers(List<MetricData> metricsList) {
     for (List<Input> inputList : inputs.values()) {
       for (Input input : inputList) {
@@ -253,12 +260,11 @@ public class InputManager {
     }
 
     filesCountMetric.value = getActiveFilesCount();
-    LogFeederUtil.logStatForMetric(filesCountMetric, "Stat: Files Monitored Count", "");
+    // TODO: restore metric logging here — LogFeederUtil.logStatForMetric(filesCountMetric, "Stat: Files Monitored Count", "") was dropped in the plugin-api refactor, silently disabling the monitored-files stat
   }
 
 
   public void cleanCheckPointFiles() {
-
     if (checkPointFolderFile == null) {
       LOG.info("Will not clean checkPoint files. checkPointFolderFile=" + checkPointFolderFile);
       return;
@@ -276,7 +282,7 @@ public class InputManager {
         }
       }
       LOG.info("Deleted " + totalCheckFilesDeleted + " checkPoint file(s). checkPointFolderFile=" +
-          checkPointFolderFile.getAbsolutePath());
+        checkPointFolderFile.getAbsolutePath());
 
     } catch (Throwable t) {
       LOG.error("Error while cleaning checkPointFiles", t);
@@ -306,12 +312,12 @@ public class InputManager {
             String fileBase64 = Base64.byteArrayToBase64(fileKeyObj.toString().getBytes());
             if (!logFileKey.equals(fileBase64)) {
               LOG.info("CheckPoint clean: File key has changed. old=" + logFileKey + ", new=" + fileBase64 + ", filePath=" +
-                  logFilePath + ", checkPointFile=" + checkPointFile.getAbsolutePath());
+                logFilePath + ", checkPointFile=" + checkPointFile.getAbsolutePath());
               deleteCheckPointFile = !wasFileRenamed(logFile.getParentFile(), logFileKey);
             }
           } else {
             LOG.info("CheckPoint clean: Log file doesn't exist. filePath=" + logFilePath + ", checkPointFile=" +
-                checkPointFile.getAbsolutePath());
+              checkPointFile.getAbsolutePath());
             deleteCheckPointFile = !wasFileRenamed(logFile.getParentFile(), logFileKey);
           }
           if (deleteCheckPointFile) {
@@ -326,10 +332,10 @@ public class InputManager {
     } catch (Throwable t) {
       LOG.error("Error while checking checkPoint file. " + checkPointFile, t);
     }
-    
+
     return deleted;
   }
-  
+
   private boolean wasFileRenamed(File folder, String searchFileBase64) {
     for (File file : folder.listFiles()) {
       Object fileKeyObj = FileUtil.getFileKey(file);
@@ -343,7 +349,7 @@ public class InputManager {
     }
     return false;
   }
-  
+
   public void waitOnAllInputs() {
     //wait on inputs
     for (List<Input> inputList : inputs.values()) {
@@ -414,7 +420,7 @@ public class InputManager {
         return;
       }
     }
-    
+
     LOG.warn("Some inputs were not closed after " + iterations + " iterations");
     for (List<Input> inputList : inputs.values()) {
       for (Input input : inputList) {
@@ -433,4 +439,6 @@ public class InputManager {
   public LogFeederProps getLogFeederProps() {
     return logFeederProps;
   }
+
+
 }
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java
index 69d053a..41db8bd 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java
@@ -18,34 +18,37 @@
  */
 package org.apache.ambari.logfeeder.input;
 
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.IOException;
-
 import org.apache.ambari.logfeeder.util.S3Util;
 import org.apache.ambari.logsearch.config.api.model.inputconfig.InputS3FileDescriptor;
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.solr.common.util.Base64;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-public class InputS3File extends AbstractInputFile {
+import java.io.BufferedReader;
+import java.io.File;
+
+public class InputS3File extends InputFile {
+
+  private static final Logger LOG = LoggerFactory.getLogger(InputS3File.class);
 
   @Override
   public boolean isReady() {
-    if (!isReady) {
+    if (!super.isReady()) {
       // Let's try to check whether the file is available
-      logFiles = getActualFiles(logPath);
-      if (!ArrayUtils.isEmpty(logFiles)) {
-        if (tail && logFiles.length > 1) {
-          LOG.warn("Found multiple files (" + logFiles.length + ") for the file filter " + filePath +
-              ". Will use only the first one. Using " + logFiles[0].getAbsolutePath());
+      setLogFiles(getActualFiles(getLogPath()));
+      if (!ArrayUtils.isEmpty(getLogFiles())) {
+        if (isTail() && getLogFiles().length > 1) {
+          LOG.warn("Found multiple files (" + getLogFiles().length + ") for the file filter " + getFilePath() +
+              ". Will use only the first one. Using " + getLogFiles()[0].getAbsolutePath());
         }
-        LOG.info("File filter " + filePath + " expanded to " + logFiles[0].getAbsolutePath());
-        isReady = true;
+        LOG.info("File filter " + getFilePath() + " expanded to " + getLogFiles()[0].getAbsolutePath());
+        setReady(true);
       } else {
-        LOG.debug(logPath + " file doesn't exist. Ignoring for now");
+        LOG.debug(getLogPath() + " file doesn't exist. Ignoring for now");
       }
     }
-    return isReady;
+    return super.isReady();
   }
 
   private File[] getActualFiles(String searchPath) {
@@ -54,14 +57,13 @@ public class InputS3File extends AbstractInputFile {
   }
 
   @Override
-  void start() throws Exception {
-    if (ArrayUtils.isEmpty(logFiles)) {
+  public void start() throws Exception {
+    if (ArrayUtils.isEmpty(getLogFiles())) {
       return;
     }
-
-    for (int i = logFiles.length - 1; i >= 0; i--) {
-      File file = logFiles[i];
-      if (i == 0 || !tail) {
+    for (int i = getLogFiles().length - 1; i >= 0; i--) {
+      File file = getLogFiles()[i];
+      if (i == 0 || !isTail()) {
         try {
           processFile(file, i == 0);
           if (isClosed() || isDrain()) {
@@ -77,24 +79,25 @@ public class InputS3File extends AbstractInputFile {
   }
 
   @Override
-  protected BufferedReader openLogFile(File logPathFile) throws IOException {
-    String s3AccessKey = ((InputS3FileDescriptor)inputDescriptor).getS3AccessKey();
-    String s3SecretKey = ((InputS3FileDescriptor)inputDescriptor).getS3SecretKey();
+  public BufferedReader openLogFile(File logPathFile) throws Exception {
+    String s3AccessKey = ((InputS3FileDescriptor)getInputDescriptor()).getS3AccessKey();
+    String s3SecretKey = ((InputS3FileDescriptor)getInputDescriptor()).getS3SecretKey();
     BufferedReader br = S3Util.getReader(logPathFile.getPath(), s3AccessKey, s3SecretKey);
-    fileKey = getFileKey(logPathFile);
-    base64FileKey = Base64.byteArrayToBase64(fileKey.toString().getBytes());
+    Object fileKey = getFileKey(logPathFile);
+    setFileKey(fileKey);
+    String base64FileKey = Base64.byteArrayToBase64(getFileKey().toString().getBytes());
+    setBase64FileKey(base64FileKey);
     LOG.info("fileKey=" + fileKey + ", base64=" + base64FileKey + ". " + getShortDescription());
     return br;
   }
 
-  @Override
-  protected Object getFileKey(File logFile) {
+  private Object getFileKey(File logFile) {
     return logFile.getPath();
   }
   
   @Override
   public void close() {
     super.close();
-    isClosed = true;
+    setClosed(true);
   }
 }
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java
index 2b2d145..3ea3d90 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * 
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * 
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -18,6 +18,21 @@
  */
 package org.apache.ambari.logfeeder.input;
 
+import com.google.common.base.Joiner;
+import org.apache.ambari.logfeeder.conf.InputSimulateConfig;
+import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.filter.FilterJSON;
+import org.apache.ambari.logfeeder.plugin.filter.Filter;
+import org.apache.ambari.logfeeder.plugin.output.Output;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputDescriptor;
+import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.FilterJsonDescriptorImpl;
+import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.InputDescriptorImpl;
+import org.apache.commons.collections.MapUtils;
+import org.apache.solr.common.util.Base64;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -30,21 +45,8 @@ import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.ambari.logfeeder.conf.InputSimulateConfig;
-import org.apache.ambari.logfeeder.conf.LogFeederProps;
-import org.apache.ambari.logfeeder.filter.Filter;
-import org.apache.ambari.logfeeder.filter.FilterJSON;
-import org.apache.ambari.logfeeder.output.Output;
-import org.apache.ambari.logfeeder.util.LogFeederUtil;
-import org.apache.ambari.logsearch.config.api.model.inputconfig.InputDescriptor;
-import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.FilterJsonDescriptorImpl;
-import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.InputDescriptorImpl;
-import org.apache.commons.collections.MapUtils;
-import org.apache.solr.common.util.Base64;
-
-import com.google.common.base.Joiner;
-
-public class InputSimulate extends Input {
+public class InputSimulate extends InputFile {
+  private static final Logger LOG = LoggerFactory.getLogger(InputSimulate.class);
   private static final String LOG_TEXT_PATTERN = "{ logtime=\"%d\", level=\"%s\", log_message=\"%s\", host=\"%s\"}";
 
   private static final Map<String, String> typeToFilePath = new HashMap<>();
@@ -55,16 +57,16 @@ public class InputSimulate extends Input {
       inputTypes.add(input.getType());
     }
   }
-  
+
   private static final Map<String, Integer> typeToLineNumber = new HashMap<>();
-  
+
   private static final AtomicInteger hostNumber = new AtomicInteger(0);
-  
+
   private static final List<Output> simulateOutputs = new ArrayList<>();
   public static List<Output> getSimulateOutputs() {
     return simulateOutputs;
   }
-  
+
   private final Random random = new Random(System.currentTimeMillis());
 
   private InputSimulateConfig conf;
@@ -123,20 +125,20 @@ public class InputSimulate extends Input {
   }
 
   @Override
-  void start() throws Exception {
-    getFirstFilter().setOutputManager(outputManager);
+  public void start() throws Exception {
+    getFirstFilter().setOutputManager(getOutputManager());
     while (true) {
       if (types.isEmpty()) {
         try { Thread.sleep(sleepMillis); } catch(Exception e) { /* Ignore */ }
         continue;
       }
       String type = imitateRandomLogFile();
-      
+
       String line = getLine();
-      InputMarker marker = getInputMarker(type);
-      
+      InputFileMarker marker = getInputMarker(type);
+
       outputLine(line, marker);
-      
+
       try { Thread.sleep(sleepMillis); } catch(Exception e) { /* Ignore */ }
     }
   }
@@ -145,16 +147,15 @@ public class InputSimulate extends Input {
     int typePos = random.nextInt(types.size());
     String type = types.get(typePos);
     String filePath = MapUtils.getString(typeToFilePath, type, "path of " + type);
-    
-    ((InputDescriptorImpl)inputDescriptor).setType(type);
+
+    ((InputDescriptorImpl)getInputDescriptor()).setType(type);
     setFilePath(filePath);
-    
+
     return type;
   }
 
-  private InputMarker getInputMarker(String type) throws Exception {
-    InputMarker marker = new InputMarker(this, getBase64FileKey(), getLineNumber(type));
-    return marker;
+  private InputFileMarker getInputMarker(String type) throws Exception {
+    return new InputFileMarker(this, getBase64FileKey(), getLineNumber(type));
   }
 
   private static synchronized int getLineNumber(String type) {
@@ -162,13 +163,13 @@ public class InputSimulate extends Input {
       typeToLineNumber.put(type, 0);
     }
     Integer lineNumber = typeToLineNumber.get(type) + 1;
-    
+
     typeToLineNumber.put(type, lineNumber);
     return lineNumber;
   }
 
-  private String getBase64FileKey() throws Exception {
-    String fileKey = InetAddress.getLocalHost().getHostAddress() + "|" + filePath;
+  public String getBase64FileKey() throws Exception {
+    String fileKey = InetAddress.getLocalHost().getHostAddress() + "|" + getFilePath();
     return Base64.byteArrayToBase64(fileKey.getBytes());
   }
 
@@ -177,7 +178,7 @@ public class InputSimulate extends Input {
     String logMessage = createLogMessage();
     return String.format(LOG_TEXT_PATTERN, d.getTime(), level, logMessage, host);
   }
-  
+
   private String createLogMessage() {
     int logMessageLength = minLogWords + random.nextInt(maxLogWords - minLogWords + 1);
     Set<Integer> words = new TreeSet<>();
@@ -188,16 +189,16 @@ public class InputSimulate extends Input {
         logMessage.add(String.format("Word%06d", word));
       }
     }
-    
+
     return Joiner.on(' ').join(logMessage);
   }
 
   @Override
-  public void checkIn(InputMarker inputMarker) {}
+  public void checkIn(InputFileMarker inputMarker) {}
 
   @Override
   public void lastCheckIn() {}
-  
+
   @Override
   public String getNameForThread() {
     return "Simulated input";
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/FileCheckInHelper.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/FileCheckInHelper.java
new file mode 100644
index 0000000..9c607cf
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/FileCheckInHelper.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.input.file;
+
+import org.apache.ambari.logfeeder.input.InputFile;
+import org.apache.ambari.logfeeder.input.InputFileMarker;
+import org.apache.ambari.logfeeder.util.FileUtil;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+import java.io.File;
+import java.io.RandomAccessFile;
+import java.util.Date;
+import java.util.Map;
+
+public class FileCheckInHelper {
+
+  private static final Logger LOG = Logger.getLogger(FileCheckInHelper.class);
+
+  private FileCheckInHelper() {
+  }
+
+  public static void checkIn(InputFile inputFile, InputFileMarker inputMarker) {
+    try {
+      Map<String, Object> jsonCheckPoint = inputFile.getJsonCheckPoints().get(inputMarker.getBase64FileKey());
+      File checkPointFile = inputFile.getCheckPointFiles().get(inputMarker.getBase64FileKey());
+
+      int lineNumber = LogFeederUtil.objectToInt(jsonCheckPoint.get("line_number"), 0, "line_number");
+      if (lineNumber > inputMarker.getLineNumber()) {
+        // Already wrote higher line number for this input
+        return;
+      }
+      // If interval is greater than last checkPoint time, then write
+      long currMS = System.currentTimeMillis();
+      long lastCheckPointTimeMs = inputFile.getLastCheckPointTimeMSs().containsKey(inputMarker.getBase64FileKey()) ?
+        inputFile.getLastCheckPointTimeMSs().get(inputMarker.getBase64FileKey()) : 0;
+      if (!inputFile.isClosed() && (currMS - lastCheckPointTimeMs < inputFile.getCheckPointIntervalMS())) {
+        // Let's save this one so we can update the check point file on flush
+        inputFile.getLastCheckPointInputMarkers().put(inputMarker.getBase64FileKey(), inputMarker);
+        return;
+      }
+      inputFile.getLastCheckPointTimeMSs().put(inputMarker.getBase64FileKey(), currMS);
+
+      jsonCheckPoint.put("line_number", "" + new Integer(inputMarker.getLineNumber()));
+      jsonCheckPoint.put("last_write_time_ms", "" + new Long(currMS));
+      jsonCheckPoint.put("last_write_time_date", new Date());
+
+      String jsonStr = LogFeederUtil.getGson().toJson(jsonCheckPoint);
+
+      File tmpCheckPointFile = new File(checkPointFile.getAbsolutePath() + ".tmp");
+      if (tmpCheckPointFile.exists()) {
+        tmpCheckPointFile.delete();
+      }
+      RandomAccessFile tmpRaf = new RandomAccessFile(tmpCheckPointFile, "rws");
+      tmpRaf.writeInt(jsonStr.length());
+      tmpRaf.write(jsonStr.getBytes());
+      tmpRaf.getFD().sync();
+      tmpRaf.close();
+
+      FileUtil.move(tmpCheckPointFile, checkPointFile);
+
+      if (inputFile.isClosed()) {
+        String logMessageKey = inputFile.getClass().getSimpleName() + "_FINAL_CHECKIN";
+        LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Wrote final checkPoint, input=" + inputFile.getShortDescription() +
+          ", checkPointFile=" + checkPointFile.getAbsolutePath() + ", checkPoint=" + jsonStr, null, LOG, Level.INFO);
+      }
+    } catch (Throwable t) {
+      String logMessageKey = inputFile.getClass().getSimpleName() + "_CHECKIN_EXCEPTION";
+      LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Caught exception checkIn. , input=" + inputFile.getShortDescription(), t,
+        LOG, Level.ERROR);
+    }
+  }
+
+
+
+}
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/ProcessFileHelper.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/ProcessFileHelper.java
new file mode 100644
index 0000000..4ed415a
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/ProcessFileHelper.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.input.file;
+
+import org.apache.ambari.logfeeder.input.InputFile;
+import org.apache.ambari.logfeeder.input.InputFileMarker;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+import java.io.BufferedReader;
+import java.io.File;
+
+public class ProcessFileHelper {
+
+  private static final Logger LOG = Logger.getLogger(ProcessFileHelper.class);
+
+  private ProcessFileHelper() {
+  }
+
+  public static void processFile(InputFile inputFile, File logPathFile, boolean follow) throws Exception {
+    LOG.info("Monitoring logPath=" + inputFile.getLogPath() + ", logPathFile=" + logPathFile);
+    BufferedReader br = null;
+
+    int lineCount = 0;
+    try {
+      inputFile.setFilePath(logPathFile.getAbsolutePath());
+
+      br = inputFile.openLogFile(logPathFile);
+
+      boolean resume = true;
+      int resumeFromLineNumber = inputFile.getResumeFromLineNumber();
+      if (resumeFromLineNumber > 0) {
+        LOG.info("Resuming log file " + logPathFile.getAbsolutePath() + " from line number " + resumeFromLineNumber);
+        resume = false;
+      }
+
+      inputFile.setClosed(false);
+      int sleepStep = 2;
+      int sleepIteration = 0;
+      while (true) {
+        try {
+          if (inputFile.isDrain()) {
+            break;
+          }
+
+          String line = br.readLine();
+          if (line == null) {
+            if (!resume) {
+              resume = true;
+            }
+            sleepIteration++;
+            if (sleepIteration == 2) {
+              inputFile.flush();
+              if (!follow) {
+                LOG.info("End of file. Done with filePath=" + logPathFile.getAbsolutePath() + ", lineCount=" + lineCount);
+                break;
+              }
+            } else if (sleepIteration > 4) {
+              Object newFileKey = inputFile.getFileKeyFromLogFile(logPathFile);
+              if (newFileKey != null && (inputFile.getFileKey() == null || !newFileKey.equals(inputFile.getFileKey()))) {
+                LOG.info("File key is different. Marking this input file for rollover. oldKey=" + inputFile.getFileKey() + ", newKey=" +
+                  newFileKey + ". " + inputFile.getShortDescription());
+
+                try {
+                  LOG.info("File is rolled over. Closing current open file." + inputFile.getShortDescription() + ", lineCount=" +
+                    lineCount);
+                  br.close();
+                } catch (Exception ex) {
+                  LOG.error("Error closing file" + inputFile.getShortDescription(), ex);
+                  break;
+                }
+
+                try {
+                  LOG.info("Opening new rolled over file." + inputFile.getShortDescription());
+                  br = inputFile.openLogFile(logPathFile);
+                  lineCount = 0;
+                } catch (Exception ex) {
+                  LOG.error("Error opening rolled over file. " + inputFile.getShortDescription(), ex);
+                  LOG.info("Added input to not ready list." + inputFile.getShortDescription());
+                  inputFile.setReady(false);
+                  inputFile.getInputManager().addToNotReady(inputFile);
+                  break;
+                }
+                LOG.info("File is successfully rolled over. " + inputFile.getShortDescription());
+                continue;
+              }
+            }
+            try {
+              Thread.sleep(sleepStep * 1000);
+              sleepStep = Math.min(sleepStep * 2, 10);
+            } catch (InterruptedException e) {
+              LOG.info("Thread interrupted." + inputFile.getShortDescription());
+            }
+          } else {
+            lineCount++;
+            sleepStep = 1;
+            sleepIteration = 0;
+
+            if (!resume && lineCount > resumeFromLineNumber) {
+              LOG.info("Resuming to read from last line. lineCount=" + lineCount + ", input=" + inputFile.getShortDescription());
+              resume = true;
+            }
+            if (resume) {
+              InputFileMarker marker = new InputFileMarker(inputFile, inputFile.getBase64FileKey(), lineCount);
+              inputFile.outputLine(line, marker);
+            }
+          }
+        } catch (Throwable t) {
+          String logMessageKey = inputFile.getClass().getSimpleName() + "_READ_LOOP_EXCEPTION";
+          LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Caught exception in read loop. lineNumber=" + lineCount +
+            ", input=" + inputFile.getShortDescription(), t, LOG, Level.ERROR);
+        }
+      }
+    } finally {
+      if (br != null) {
+        LOG.info("Closing reader." + inputFile.getShortDescription() + ", lineCount=" + lineCount);
+        try {
+          br.close();
+        } catch (Throwable t) {
+          // ignore
+        }
+      }
+    }
+  }
+
+}
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/ResumeLineNumberHelper.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/ResumeLineNumberHelper.java
new file mode 100644
index 0000000..9350200
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/ResumeLineNumberHelper.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.input.file;
+
+import org.apache.ambari.logfeeder.input.InputFile;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.RandomAccessFile;
+import java.util.HashMap;
+import java.util.Map;
+
+public class ResumeLineNumberHelper {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ResumeLineNumberHelper.class);
+
+  private ResumeLineNumberHelper() {
+  }
+
+  public static int getResumeFromLineNumber(InputFile inputFile) {
+    int resumeFromLineNumber = 0;
+
+    File checkPointFile = null;
+    try {
+      LOG.info("Checking existing checkpoint file. " + inputFile.getShortDescription());
+
+      String checkPointFileName = inputFile.getBase64FileKey() + inputFile.getCheckPointExtension();
+      File checkPointFolder = inputFile.getInputManager().getCheckPointFolderFile();
+      checkPointFile = new File(checkPointFolder, checkPointFileName);
+      inputFile.getCheckPointFiles().put(inputFile.getBase64FileKey(), checkPointFile);
+      Map<String, Object> jsonCheckPoint = null;
+      if (!checkPointFile.exists()) {
+        LOG.info("Checkpoint file for log file " + inputFile.getFilePath() + " doesn't exist, starting to read it from the beginning");
+      } else {
+        try (RandomAccessFile checkPointWriter = new RandomAccessFile(checkPointFile, "rw")) {
+          int contentSize = checkPointWriter.readInt();
+          byte b[] = new byte[contentSize];
+          int readSize = checkPointWriter.read(b, 0, contentSize);
+          if (readSize != contentSize) {
+            LOG.error("Couldn't read expected number of bytes from checkpoint file. expected=" + contentSize + ", read=" +
+              readSize + ", checkPointFile=" + checkPointFile + ", input=" + inputFile.getShortDescription());
+          } else {
+            String jsonCheckPointStr = new String(b, 0, readSize);
+            jsonCheckPoint = LogFeederUtil.toJSONObject(jsonCheckPointStr);
+
+            resumeFromLineNumber = LogFeederUtil.objectToInt(jsonCheckPoint.get("line_number"), 0, "line_number");
+
+            LOG.info("CheckPoint. checkPointFile=" + checkPointFile + ", json=" + jsonCheckPointStr +
+              ", resumeFromLineNumber=" + resumeFromLineNumber);
+          }
+        } catch (EOFException eofEx) {
+          LOG.info("EOFException. Will reset checkpoint file " + checkPointFile.getAbsolutePath() + " for " +
+            inputFile.getShortDescription(), eofEx);
+        }
+      }
+      if (jsonCheckPoint == null) {
+        // This seems to be first time, so creating the initial checkPoint object
+        jsonCheckPoint = new HashMap<String, Object>();
+        jsonCheckPoint.put("file_path", inputFile.getFilePath());
+        jsonCheckPoint.put("file_key", inputFile.getBase64FileKey());
+      }
+
+      inputFile.getJsonCheckPoints().put(inputFile.getBase64FileKey(), jsonCheckPoint);
+
+    } catch (Throwable t) {
+      LOG.error("Error while configuring checkpoint file. Will reset file. checkPointFile=" + checkPointFile, t);
+    }
+
+    return resumeFromLineNumber;
+  }
+
+}
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/GZIPReader.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/GZIPReader.java
index 9ccc4f2..7f78fd1 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/GZIPReader.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/GZIPReader.java
@@ -18,6 +18,8 @@
  */
 package org.apache.ambari.logfeeder.input.reader;
 
+import org.apache.log4j.Logger;
+
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -25,8 +27,6 @@ import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.util.zip.GZIPInputStream;
 
-import org.apache.log4j.Logger;
-
 class GZIPReader extends InputStreamReader {
 
   private static final Logger LOG = Logger.getLogger(GZIPReader.class);
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/LogsearchReaderFactory.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/LogsearchReaderFactory.java
index 5fc2e14..b9393aa 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/LogsearchReaderFactory.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/LogsearchReaderFactory.java
@@ -18,13 +18,13 @@
  */
 package org.apache.ambari.logfeeder.input.reader;
 
+import org.apache.log4j.Logger;
+
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.FileReader;
 import java.io.Reader;
 
-import org.apache.log4j.Logger;
-
 public enum LogsearchReaderFactory {
   INSTANCE;
   private static final Logger LOG = Logger.getLogger(LogsearchReaderFactory.class);
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/LogLevelFilterHandler.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/LogLevelFilterHandler.java
index 83c293b..1874ee9 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/LogLevelFilterHandler.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/LogLevelFilterHandler.java
@@ -16,20 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.ambari.logfeeder.loglevelfilter;
 
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TimeZone;
-
 import org.apache.ambari.logfeeder.common.LogFeederConstants;
 import org.apache.ambari.logfeeder.conf.LogFeederProps;
-import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logfeeder.plugin.input.InputMarker;
 import org.apache.ambari.logfeeder.util.LogFeederUtil;
 import org.apache.ambari.logsearch.config.api.LogLevelFilterMonitor;
 import org.apache.ambari.logsearch.config.api.LogSearchConfig;
@@ -42,6 +33,13 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.PostConstruct;
 import javax.inject.Inject;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
 
 public class LogLevelFilterHandler implements LogLevelFilterMonitor {
   private static final Logger LOG = LoggerFactory.getLogger(LogLevelFilterHandler.class);
@@ -50,7 +48,7 @@ public class LogLevelFilterHandler implements LogLevelFilterMonitor {
   private static final String DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS";
 
   private static final boolean DEFAULT_VALUE = true;
-  
+
   private static ThreadLocal<DateFormat> formatter = new ThreadLocal<DateFormat>() {
     protected DateFormat initialValue() {
       SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
@@ -92,7 +90,7 @@ public class LogLevelFilterHandler implements LogLevelFilterMonitor {
     if (!logFeederProps.isLogLevelFilterEnabled()) {
       return true;
     }
-    
+
     LogLevelFilter logFilter = findLogFilter(logId);
     List<String> allowedLevels = getAllowedLevels(hostName, logFilter);
     return allowedLevels.isEmpty() || allowedLevels.contains(level);
@@ -107,7 +105,7 @@ public class LogLevelFilterHandler implements LogLevelFilterMonitor {
   }
 
   public boolean isAllowed(Map<String, Object> jsonObj, InputMarker inputMarker) {
-    if ("audit".equals(inputMarker.input.getInputDescriptor().getRowtype()))
+    if ("audit".equals(inputMarker.getInput().getInputDescriptor().getRowtype()))
       return true;
 
     boolean isAllowed = applyFilter(jsonObj);
@@ -171,7 +169,7 @@ public class LogLevelFilterHandler implements LogLevelFilterMonitor {
       if (hosts.isEmpty() || hosts.contains(hostName)) {
         if (isFilterExpired(componentFilter)) {
           LOG.debug("Filter for component " + componentName + " and host :" + hostName + " is expired at " +
-              componentFilter.getExpiryTime());
+            componentFilter.getExpiryTime());
           return defaultLevels;
         } else {
           return overrideLevels;
@@ -193,8 +191,8 @@ public class LogLevelFilterHandler implements LogLevelFilterMonitor {
     Date currentDate = new Date();
     if (!currentDate.before(filterEndDate)) {
       LOG.debug("Filter for  Component :" + logLevelFilter.getLabel() + " and Hosts : [" +
-          StringUtils.join(logLevelFilter.getHosts(), ',') + "] is expired because of filter endTime : " +
-          formatter.get().format(filterEndDate) + " is older than currentTime :" + formatter.get().format(currentDate));
+        StringUtils.join(logLevelFilter.getHosts(), ',') + "] is expired because of filter endTime : " +
+        formatter.get().format(filterEndDate) + " is older than currentTime :" + formatter.get().format(currentDate));
       return true;
     } else {
       return false;
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperAnonymize.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperAnonymize.java
index c85ad49..8c0fc72 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperAnonymize.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperAnonymize.java
@@ -19,8 +19,9 @@
 
 package org.apache.ambari.logfeeder.mapper;
 
-import java.util.Map;
-
+import com.google.common.base.Splitter;
+import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.plugin.filter.mapper.Mapper;
 import org.apache.ambari.logfeeder.util.LogFeederUtil;
 import org.apache.ambari.logsearch.config.api.model.inputconfig.MapAnonymizeDescriptor;
 import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldDescriptor;
@@ -29,9 +30,9 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 
-import com.google.common.base.Splitter;
+import java.util.Map;
 
-public class MapperAnonymize extends Mapper {
+public class MapperAnonymize extends Mapper<LogFeederProps> {
   private static final Logger LOG = Logger.getLogger(MapperAnonymize.class);
   
   private static final char DEFAULT_HIDE_CHAR = '*';
@@ -115,6 +116,6 @@ public class MapperAnonymize extends Mapper {
     
     sb.append(rest);
     
-    jsonObj.put(fieldName, sb.toString());
+    jsonObj.put(getFieldName(), sb.toString());
   }
 }
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java
index e099161..150869b 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java
@@ -19,27 +19,29 @@
 
 package org.apache.ambari.logfeeder.mapper;
 
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.Map;
-
 import org.apache.ambari.logfeeder.common.LogFeederConstants;
+import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.plugin.filter.mapper.Mapper;
 import org.apache.ambari.logfeeder.util.LogFeederUtil;
 import org.apache.ambari.logsearch.config.api.model.inputconfig.MapDateDescriptor;
 import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldDescriptor;
 import org.apache.commons.lang.time.DateUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.time.FastDateFormat;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 
-public class MapperDate extends Mapper {
+import java.text.ParseException;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.Map;
+
+public class MapperDate extends Mapper<LogFeederProps> {
   private static final Logger LOG = Logger.getLogger(MapperDate.class);
 
-  private SimpleDateFormat targetDateFormatter = null;
+  private FastDateFormat targetDateFormatter = null;
   private boolean isEpoch = false;
-  private SimpleDateFormat srcDateFormatter = null;
+  private FastDateFormat srcDateFormatter=null;
 
   @Override
   public boolean init(String inputDesc, String fieldName, String mapClassCode, MapFieldDescriptor mapFieldDescriptor) {
@@ -57,9 +59,9 @@ public class MapperDate extends Mapper {
         return true;
       } else {
         try {
-          targetDateFormatter = new SimpleDateFormat(targetDateFormat);
+          targetDateFormatter = FastDateFormat.getInstance(targetDateFormat);
           if (!StringUtils.isEmpty(srcDateFormat)) {
-            srcDateFormatter = new SimpleDateFormat(srcDateFormat);
+            srcDateFormatter = FastDateFormat.getInstance(srcDateFormat);
           }
           return true;
         } catch (Throwable ex) {
@@ -90,10 +92,10 @@ public class MapperDate extends Mapper {
         } else {
           return value;
         }
-        jsonObj.put(fieldName, value);
+        jsonObj.put(getFieldName(), value);
       } catch (Throwable t) {
         LogFeederUtil.logErrorMessageByInterval(this.getClass().getSimpleName() + ":apply", "Error applying date transformation." +
-            " isEpoch=" + isEpoch + ", targetateFormat=" + (targetDateFormatter!=null ?targetDateFormatter.toPattern():"")
+            " isEpoch=" + isEpoch + ", targetDateFormat=" + (targetDateFormatter!=null ?targetDateFormatter.getPattern():"")
             + ", value=" + value + ". " + this.toString(), t, LOG, Level.ERROR);
       }
     }
@@ -105,7 +107,7 @@ public class MapperDate extends Mapper {
     
     Calendar currentCalendar = Calendar.getInstance();
     
-    if (!srcDateFormatter.toPattern().contains("dd")) {
+    if (!srcDateFormatter.getPattern().contains("dd")) {
       //set year/month/date in src_date when src_date does not have date component
       srcDate = DateUtils.setYears(srcDate, currentCalendar.get(Calendar.YEAR));
       srcDate = DateUtils.setMonths(srcDate, currentCalendar.get(Calendar.MONTH));
@@ -114,7 +116,7 @@ public class MapperDate extends Mapper {
       if (srcDate.getTime() > currentCalendar.getTimeInMillis()) {
         srcDate = DateUtils.addDays(srcDate, -1);
       }      
-    } else if (!srcDateFormatter.toPattern().contains("yy")) {
+    } else if (!srcDateFormatter.getPattern().contains("yy")) {
       //set year in src_date when src_date does not have year component
       srcDate = DateUtils.setYears(srcDate, currentCalendar.get(Calendar.YEAR));
       // if with the current year the time stamp is after the current one, it must be previous year
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldCopy.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldCopy.java
index a463f49..bbb6337 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldCopy.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldCopy.java
@@ -19,17 +19,19 @@
 
 package org.apache.ambari.logfeeder.mapper;
 
-import java.util.Map;
-
+import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.plugin.filter.mapper.Mapper;
 import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldCopyDescriptor;
 import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldDescriptor;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.log4j.Logger;
 
+import java.util.Map;
+
 /**
  * Overrides the value for the field
  */
-public class MapperFieldCopy extends Mapper {
+public class MapperFieldCopy extends Mapper<LogFeederProps> {
   private static final Logger LOG = Logger.getLogger(MapperFieldCopy.class);
   
   private String copyName = null;
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java
index 3f160da..2b1f70f 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java
@@ -19,8 +19,8 @@
 
 package org.apache.ambari.logfeeder.mapper;
 
-import java.util.Map;
-
+import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.plugin.filter.mapper.Mapper;
 import org.apache.ambari.logfeeder.util.LogFeederUtil;
 import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldDescriptor;
 import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldNameDescriptor;
@@ -28,10 +28,12 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 
+import java.util.Map;
+
 /**
  * Overrides the value for the field
  */
-public class MapperFieldName extends Mapper {
+public class MapperFieldName extends Mapper<LogFeederProps> {
   private static final Logger LOG = Logger.getLogger(MapperFieldName.class);
 
   private String newValue = null;
@@ -51,7 +53,7 @@ public class MapperFieldName extends Mapper {
   @Override
   public Object apply(Map<String, Object> jsonObj, Object value) {
     if (newValue != null) {
-      jsonObj.remove(fieldName);
+      jsonObj.remove(getFieldName());
       jsonObj.put(newValue, value);
     } else {
       LogFeederUtil.logErrorMessageByInterval(this.getClass().getSimpleName() + ":apply",
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java
index 03ff95b..e3d4924 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java
@@ -19,8 +19,8 @@
 
 package org.apache.ambari.logfeeder.mapper;
 
-import java.util.Map;
-
+import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.plugin.filter.mapper.Mapper;
 import org.apache.ambari.logfeeder.util.LogFeederUtil;
 import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldDescriptor;
 import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldValueDescriptor;
@@ -28,10 +28,12 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 
+import java.util.Map;
+
 /**
  * Overrides the value for the field
  */
-public class MapperFieldValue extends Mapper {
+public class MapperFieldValue extends Mapper<LogFeederProps> {
   private static final Logger LOG = Logger.getLogger(MapperFieldValue.class);
   
   private String prevValue = null;
@@ -55,7 +57,7 @@ public class MapperFieldValue extends Mapper {
     if (newValue != null && prevValue != null) {
       if (prevValue.equalsIgnoreCase(value.toString())) {
         value = newValue;
-        jsonObj.put(fieldName, value);
+        jsonObj.put(getFieldName(), value);
       }
     } else {
       LogFeederUtil.logErrorMessageByInterval(this.getClass().getSimpleName() + ":apply",
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsManager.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsManager.java
index 96084c1..f5bc0eb 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsManager.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsManager.java
@@ -26,6 +26,7 @@ import java.util.TreeMap;
 
 import org.apache.ambari.logfeeder.conf.LogFeederSecurityConfig;
 import org.apache.ambari.logfeeder.conf.MetricsCollectorConfig;
+import org.apache.ambari.logfeeder.plugin.common.MetricData;
 import org.apache.ambari.logfeeder.util.LogFeederUtil;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/StatsLogger.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/StatsLogger.java
index 1dd9287..91de1d8 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/StatsLogger.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/StatsLogger.java
@@ -19,6 +19,7 @@
 package org.apache.ambari.logfeeder.metrics;
 
 import org.apache.ambari.logfeeder.common.ConfigHandler;
+import org.apache.ambari.logfeeder.plugin.common.MetricData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputData.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputData.java
index c46086e..5cde6db 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputData.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputData.java
@@ -19,9 +19,9 @@
 
 package org.apache.ambari.logfeeder.output;
 
-import java.util.Map;
+import org.apache.ambari.logfeeder.plugin.input.InputMarker;
 
-import org.apache.ambari.logfeeder.input.InputMarker;
+import java.util.Map;
 
 /**
  * This contains the output json object and InputMarker.
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputDevNull.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputDevNull.java
index fa4e17b..dafe6a5 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputDevNull.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputDevNull.java
@@ -18,15 +18,18 @@
  */
 package org.apache.ambari.logfeeder.output;
 
-import java.io.File;
-
-import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.plugin.input.InputMarker;
+import org.apache.ambari.logfeeder.plugin.output.Output;
+import org.apache.ambari.logsearch.config.api.model.outputconfig.OutputProperties;
 import org.apache.log4j.Logger;
 
+import java.io.File;
+
 /**
  * Output that just ignore the logs
  */
-public class OutputDevNull extends Output {
+public class OutputDevNull extends Output<LogFeederProps, InputMarker> {
 
   private static final Logger LOG = Logger.getLogger(OutputDevNull.class);
 
@@ -36,7 +39,44 @@ public class OutputDevNull extends Output {
   }
 
   @Override
+  public Long getPendingCount() {
+    return 0L;
+  }
+
+  @Override
+  public String getWriteBytesMetricName() {
+    return "write:devnull";
+  }
+
+  @Override
+  public String getOutputType() {
+    return "devnull";
+  }
+
+  @Override
+  public void outputConfigChanged(OutputProperties outputProperties) {
+  }
+
+  @Override
+  public void close() {
+  }
+
+  @Override
   public void copyFile(File inputFile, InputMarker inputMarker) {
     throw new UnsupportedOperationException("copyFile method is not yet supported for output=dev_null");
   }
+
+  @Override
+  public void init(LogFeederProps logFeederProps) throws Exception {
+  }
+
+  @Override
+  public String getShortDescription() {
+    return "write:devnull";
+  }
+
+  @Override
+  public String getStatMetricName() {
+    return "write:devnull";
+  }
 }
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java
index 4576deb..e70d769 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java
@@ -19,32 +19,35 @@
 
 package org.apache.ambari.logfeeder.output;
 
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.Map;
-
 import org.apache.ambari.logfeeder.conf.LogFeederProps;
-import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logfeeder.input.InputFileMarker;
+import org.apache.ambari.logfeeder.plugin.input.InputMarker;
+import org.apache.ambari.logfeeder.plugin.output.Output;
 import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.ambari.logsearch.config.api.model.outputconfig.OutputProperties;
 import org.apache.commons.csv.CSVFormat;
 import org.apache.commons.csv.CSVPrinter;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.log4j.Logger;
 
-public class OutputFile extends Output {
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Map;
+
+public class OutputFile extends Output<LogFeederProps, InputFileMarker> {
   private static final Logger LOG = Logger.getLogger(OutputFile.class);
 
   private PrintWriter outWriter;
   private String filePath = null;
   private String codec;
+  private LogFeederProps logFeederProps;
 
   @Override
   public void init(LogFeederProps logFeederProps) throws Exception {
-    super.init(logFeederProps);
-
+    this.logFeederProps = logFeederProps;
     filePath = getStringValue("path");
     if (StringUtils.isEmpty(filePath)) {
       LOG.error("Filepath config property <path> is not set in config file.");
@@ -87,11 +90,11 @@ public class OutputFile extends Output {
         // Ignore this exception
       }
     }
-    isClosed = true;
+    setClosed(true);
   }
 
   @Override
-  public void write(Map<String, Object> jsonObj, InputMarker inputMarker) throws Exception {
+  public void write(Map<String, Object> jsonObj, InputFileMarker inputMarker) throws Exception {
     String outStr = null;
     CSVPrinter csvPrinter = null;
     try {
@@ -118,7 +121,7 @@ public class OutputFile extends Output {
   }
 
   @Override
-  synchronized public void write(String block, InputMarker inputMarker) throws Exception {
+  synchronized public void write(String block, InputFileMarker inputMarker) throws Exception {
     if (outWriter != null && block != null) {
       statMetric.value++;
 
@@ -128,11 +131,35 @@ public class OutputFile extends Output {
   }
 
   @Override
+  public Long getPendingCount() {
+    return 0L;
+  }
+
+  @Override
+  public String getWriteBytesMetricName() {
+    return "output.file.write_bytes";
+  }
+
+  @Override
   public String getShortDescription() {
     return "output:destination=file,path=" + filePath;
   }
 
   @Override
+  public String getStatMetricName() {
+    return "output.file.write_logs";
+  }
+
+  @Override
+  public String getOutputType() {
+    throw new IllegalStateException("This method should be overridden if the Output wants to monitor the configuration");
+  }
+
+  @Override
+  public void outputConfigChanged(OutputProperties outputProperties) {
+  }
+
+  @Override
   public void copyFile(File inputFile, InputMarker inputMarker) throws UnsupportedOperationException {
     throw new UnsupportedOperationException("copyFile method is not yet supported for output=file");
   }
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java
index ed66eb0..f2faf64 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java
@@ -20,14 +20,17 @@
 package org.apache.ambari.logfeeder.output;
 
 import org.apache.ambari.logfeeder.conf.LogFeederProps;
-import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logfeeder.input.InputFileMarker;
 import org.apache.ambari.logfeeder.output.spool.LogSpooler;
 import org.apache.ambari.logfeeder.output.spool.LogSpoolerContext;
 import org.apache.ambari.logfeeder.output.spool.RolloverCondition;
 import org.apache.ambari.logfeeder.output.spool.RolloverHandler;
-import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.ambari.logfeeder.plugin.input.InputMarker;
+import org.apache.ambari.logfeeder.plugin.output.Output;
 import org.apache.ambari.logfeeder.util.LogFeederHDFSUtil;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
 import org.apache.ambari.logfeeder.util.PlaceholderUtil;
+import org.apache.ambari.logsearch.config.api.model.outputconfig.OutputProperties;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.log4j.Logger;
@@ -43,7 +46,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
  *
  * The events are spooled on the local file system and uploaded in batches asynchronously.
  */
-public class OutputHDFSFile extends Output implements RolloverHandler, RolloverCondition {
+public class OutputHDFSFile extends Output<LogFeederProps, InputFileMarker> implements RolloverHandler, RolloverCondition {
   private static final Logger LOG = Logger.getLogger(OutputHDFSFile.class);
   
   private static final long DEFAULT_ROLLOVER_THRESHOLD_TIME_SECONDS = 5 * 60L;// 5 min by default
@@ -64,9 +67,11 @@ public class OutputHDFSFile extends Output implements RolloverHandler, RolloverC
 
   private LogSpooler logSpooler;
 
+  private LogFeederProps logFeederProps;
+
   @Override
   public void init(LogFeederProps logFeederProps) throws Exception {
-    super.init(logFeederProps);
+    this.logFeederProps = logFeederProps;
     hdfsOutDir = getStringValue("hdfs_out_dir");
     hdfsHost = getStringValue("hdfs_host");
     hdfsPort = getStringValue("hdfs_port");
@@ -98,11 +103,11 @@ public class OutputHDFSFile extends Output implements RolloverHandler, RolloverC
     LOG.info("Closing file." + getShortDescription());
     logSpooler.rollover();
     this.stopHDFSCopyThread();
-    isClosed = true;
+    setClosed(true);
   }
 
   @Override
-  public synchronized void write(String block, InputMarker inputMarker) throws Exception {
+  public synchronized void write(String block, InputFileMarker inputMarker) throws Exception {
     if (block != null) {
       logSpooler.add(block);
       statMetric.value++;
@@ -234,4 +239,28 @@ public class OutputHDFSFile extends Output implements RolloverHandler, RolloverC
     }
     return shouldRollover;
   }
+
+  @Override
+  public String getOutputType() {
+    throw new IllegalStateException("This method should be overridden if the Output wants to monitor the configuration");
+  }
+
+  @Override
+  public void outputConfigChanged(OutputProperties outputProperties) {
+  }
+
+  @Override
+  public Long getPendingCount() {
+    return 0L;
+  }
+
+  @Override
+  public String getWriteBytesMetricName() {
+    return "output.hdfs.write_bytes";
+  }
+
+  @Override
+  public String getStatMetricName() {
+    return "output.hdfs.write_logs";
+  }
 }
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java
index 5c8ec82..7539484 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java
@@ -19,15 +19,12 @@
 
 package org.apache.ambari.logfeeder.output;
 
-import java.io.File;
-import java.util.Properties;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.LinkedTransferQueue;
-
 import org.apache.ambari.logfeeder.conf.LogFeederProps;
-import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logfeeder.input.InputFileMarker;
+import org.apache.ambari.logfeeder.plugin.input.InputMarker;
+import org.apache.ambari.logfeeder.plugin.output.Output;
 import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.ambari.logsearch.config.api.model.outputconfig.OutputProperties;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.KafkaProducer;
@@ -37,7 +34,13 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 
-public class OutputKafka extends Output {
+import java.io.File;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedTransferQueue;
+
+public class OutputKafka extends Output<LogFeederProps, InputFileMarker> {
   private static final Logger LOG = Logger.getLogger(OutputKafka.class);
 
   private static final int FAILED_RETRY_INTERVAL = 30;
@@ -56,19 +59,21 @@ public class OutputKafka extends Output {
   // Let's start with the assumption Kafka is down
   private boolean isKafkaBrokerUp = false;
 
+  private LogFeederProps logFeederProps;
+
   @Override
-  protected String getStatMetricName() {
+  public String getStatMetricName() {
     return "output.kafka.write_logs";
   }
 
   @Override
-  protected String getWriteBytesMetricName() {
+  public String getWriteBytesMetricName() {
     return "output.kafka.write_bytes";
   }
   
   @Override
   public void init(LogFeederProps logFeederProps) throws Exception {
-    super.init(logFeederProps);
+    this.logFeederProps = logFeederProps;
     Properties props = initProperties();
 
     producer = creteKafkaProducer(props);
@@ -99,9 +104,9 @@ public class OutputKafka extends Output {
     props.put("batch.size", batchSize);
     props.put("linger.ms", lingerMS);
 
-    for (String key : configs.keySet()) {
+    for (String key : getConfigs().keySet()) {
       if (key.startsWith("kafka.")) {
-        Object value = configs.get(key);
+        Object value = getConfigs().get(key);
         if (value == null || value.toString().length() == 0) {
           continue;
         }
@@ -151,15 +156,15 @@ public class OutputKafka extends Output {
   }
 
   @Override
-  public synchronized void write(String block, InputMarker inputMarker) throws Exception {
-    while (!isDrain() && !inputMarker.input.isDrain()) {
+  public synchronized void write(String block, InputFileMarker inputMarker) throws Exception {
+    while (!isDrain() && !inputMarker.getInput().isDrain()) {
       try {
         if (failedMessages.size() == 0) {
           if (publishMessage(block, inputMarker)) {
             break;
           }
         }
-        if (isDrain() || inputMarker.input.isDrain()) {
+        if (isDrain() || inputMarker.getInput().isDrain()) {
           break;
         }
         if (!isKafkaBrokerUp) {
@@ -281,4 +286,19 @@ public class OutputKafka extends Output {
   public void copyFile(File inputFile, InputMarker inputMarker) throws UnsupportedOperationException {
     throw new UnsupportedOperationException("copyFile method is not yet supported for output=kafka");
   }
+
+  @Override
+  public String getOutputType() {
+    throw new IllegalStateException("This method should be overridden if the Output wants to monitor the configuration");
+  }
+
+  @Override
+  public void outputConfigChanged(OutputProperties outputProperties) {
+    throw new IllegalStateException("This method should be overridden if the Output wants to monitor the configuration");
+  }
+
+  @Override
+  public Long getPendingCount() {
+    return 0L;
+  }
 }
\ No newline at end of file
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineFilter.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineFilter.java
index 8308a4f..04600a3 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineFilter.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineFilter.java
@@ -19,8 +19,8 @@
 package org.apache.ambari.logfeeder.output;
 
 import org.apache.ambari.logfeeder.common.LogFeederConstants;
-import org.apache.ambari.logfeeder.input.Input;
-import org.apache.ambari.logfeeder.input.cache.LRUCache;
+import org.apache.ambari.logfeeder.plugin.input.cache.LRUCache;
+import org.apache.ambari.logfeeder.plugin.input.Input;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManager.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManagerImpl.java
similarity index 83%
rename from ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManager.java
rename to ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManagerImpl.java
index f5c4176..bebf225 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManager.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManagerImpl.java
@@ -16,23 +16,18 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.ambari.logfeeder.output;
 
-import java.io.File;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.ambari.logfeeder.common.LogFeederConstants;
 import org.apache.ambari.logfeeder.conf.LogFeederProps;
-import org.apache.ambari.logfeeder.input.Input;
-import org.apache.ambari.logfeeder.input.InputMarker;
 import org.apache.ambari.logfeeder.loglevelfilter.LogLevelFilterHandler;
-import org.apache.ambari.logfeeder.metrics.MetricData;
+import org.apache.ambari.logfeeder.input.InputFile;
+import org.apache.ambari.logfeeder.plugin.common.MetricData;
+import org.apache.ambari.logfeeder.plugin.input.Input;
+import org.apache.ambari.logfeeder.plugin.input.InputMarker;
+import org.apache.ambari.logfeeder.plugin.manager.OutputManager;
+import org.apache.ambari.logfeeder.plugin.output.Output;
 import org.apache.ambari.logfeeder.util.LogFeederUtil;
 import org.apache.ambari.logfeeder.util.MurmurHash;
 import org.apache.ambari.logsearch.config.api.OutputConfigMonitor;
@@ -41,9 +36,15 @@ import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 
 import javax.inject.Inject;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
 
-public class OutputManager {
-  private static final Logger LOG = Logger.getLogger(OutputManager.class);
+public class OutputManagerImpl extends OutputManager {
+  private static final Logger LOG = Logger.getLogger(OutputManagerImpl.class);
 
   private static final int HASH_SEED = 31174077;
   private static final int MAX_OUTPUT_SIZE = 32765; // 32766-1
@@ -81,6 +82,7 @@ public class OutputManager {
     this.outputs.add(output);
   }
 
+  @Override
   public void init() throws Exception {
     for (Output output : outputs) {
       output.init(logFeederProps);
@@ -88,7 +90,7 @@ public class OutputManager {
   }
 
   public void write(Map<String, Object> jsonObj, InputMarker inputMarker) {
-    Input input = inputMarker.input;
+    Input input = inputMarker.getInput();
 
     // Update the block with the context fields
     for (Map.Entry<String, String> entry : input.getInputDescriptor().getAddFields().entrySet()) {
@@ -102,8 +104,11 @@ public class OutputManager {
     if (jsonObj.get("type") == null) {
       jsonObj.put("type", input.getInputDescriptor().getType());
     }
-    if (jsonObj.get("path") == null && input.getFilePath() != null) {
-      jsonObj.put("path", input.getFilePath());
+    if (input instanceof InputFile) { // TODO: find better solution
+      InputFile inputFile = (InputFile) input;
+      if (jsonObj.get("path") == null && inputFile.getFilePath() != null) {
+        jsonObj.put("path", inputFile.getFilePath());
+      }
     }
     if (jsonObj.get("path") == null && input.getInputDescriptor().getPath() != null) {
       jsonObj.put("path", input.getInputDescriptor().getPath());
@@ -114,10 +119,8 @@ public class OutputManager {
     if (jsonObj.get("ip") == null && LogFeederUtil.ipAddress != null) {
       jsonObj.put("ip", LogFeederUtil.ipAddress);
     }
-    if (jsonObj.get("level") == null) {
-      jsonObj.put("level", LogFeederConstants.LOG_LEVEL_UNKNOWN);
-    }
-    
+    jsonObj.putIfAbsent("level", LogFeederConstants.LOG_LEVEL_UNKNOWN);
+
     if (input.isUseEventMD5() || input.isGenEventMD5()) {
       String prefix = "";
       Object logtimeObj = jsonObj.get("logtime");
@@ -128,7 +131,7 @@ public class OutputManager {
           prefix = logtimeObj.toString();
         }
       }
-      
+
       Long eventMD5 = MurmurHash.hash64A(LogFeederUtil.getGson().toJson(jsonObj).getBytes(), HASH_SEED);
       if (input.isGenEventMD5()) {
         jsonObj.put("event_md5", prefix + eventMD5.toString());
@@ -145,8 +148,9 @@ public class OutputManager {
     if (jsonObj.get("event_count") == null) {
       jsonObj.put("event_count", new Integer(1));
     }
-    if (inputMarker.lineNumber > 0) {
-      jsonObj.put("logfile_line_number", new Integer(inputMarker.lineNumber));
+    if (inputMarker.getAllProperties().containsKey("line_number") &&
+      (Integer) inputMarker.getAllProperties().get("line_number") > 0) {
+      jsonObj.put("logfile_line_number", inputMarker.getAllProperties().get("line_number"));
     }
     if (jsonObj.containsKey("log_message")) {
       // TODO: Let's check size only for log_message for now
@@ -157,8 +161,9 @@ public class OutputManager {
       }
     }
     if (logLevelFilterHandler.isAllowed(jsonObj, inputMarker)
-      && !outputLineFilter.apply(jsonObj, inputMarker.input)) {
-      for (Output output : input.getOutputList()) {
+      && !outputLineFilter.apply(jsonObj, inputMarker.getInput())) {
+      List<? extends Output> outputList = input.getOutputList();
+      for (Output output : outputList) {
         try {
           output.write(jsonObj, inputMarker);
         } catch (Exception e) {
@@ -174,8 +179,8 @@ public class OutputManager {
       messageTruncateMetric.value++;
       String logMessageKey = this.getClass().getSimpleName() + "_MESSAGESIZE";
       LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Message is too big. size=" + logMessage.getBytes().length +
-          ", input=" + input.getShortDescription() + ". Truncating to " + MAX_OUTPUT_SIZE + ", first upto 100 characters=" +
-          StringUtils.abbreviate(logMessage, 100), null, LOG, Level.WARN);
+        ", input=" + input.getShortDescription() + ". Truncating to " + MAX_OUTPUT_SIZE + ", first upto 100 characters=" +
+        StringUtils.abbreviate(logMessage, 100), null, LOG, Level.WARN);
       logMessage = new String(logMessage.getBytes(), 0, MAX_OUTPUT_SIZE);
       jsonObj.put("log_message", logMessage);
       List<String> tagsList = (List<String>) jsonObj.get("tags");
@@ -190,7 +195,8 @@ public class OutputManager {
 
   public void write(String jsonBlock, InputMarker inputMarker) {
     if (logLevelFilterHandler.isAllowed(jsonBlock, inputMarker)) {
-      for (Output output : inputMarker.input.getOutputList()) {
+      List<? extends Output> outputList = inputMarker.getInput().getOutputList();
+      for (Output output : outputList) {
         try {
           output.write(jsonBlock, inputMarker);
         } catch (Exception e) {
@@ -201,8 +207,9 @@ public class OutputManager {
   }
 
   public void copyFile(File inputFile, InputMarker inputMarker) {
-    Input input = inputMarker.input;
-    for (Output output : input.getOutputList()) {
+    Input input = inputMarker.getInput();
+    List<? extends Output> outputList = input.getOutputList();
+    for (Output output : outputList) {
       try {
         output.copyFile(inputFile, inputMarker);
       }catch (Exception e) {
@@ -235,7 +242,7 @@ public class OutputManager {
         // Ignore
       }
     }
-    
+
     // Need to get this value from property
     int iterations = 30;
     int waitTimeMS = 1000;
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java
index d8eed2b..bae7eec 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java
@@ -21,12 +21,14 @@ package org.apache.ambari.logfeeder.output;
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.ambari.logfeeder.common.LogFeederConstants;
 import org.apache.ambari.logfeeder.conf.LogFeederProps;
-import org.apache.ambari.logfeeder.filter.Filter;
-import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logfeeder.input.InputFile;
+import org.apache.ambari.logfeeder.input.InputFileMarker;
 import org.apache.ambari.logfeeder.output.spool.LogSpooler;
 import org.apache.ambari.logfeeder.output.spool.LogSpoolerContext;
 import org.apache.ambari.logfeeder.output.spool.RolloverCondition;
 import org.apache.ambari.logfeeder.output.spool.RolloverHandler;
+import org.apache.ambari.logfeeder.plugin.filter.Filter;
+import org.apache.ambari.logfeeder.plugin.input.InputMarker;
 import org.apache.ambari.logfeeder.util.LogFeederUtil;
 import org.apache.ambari.logfeeder.util.S3Util;
 import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterDescriptor;
@@ -52,7 +54,7 @@ import java.util.Map;
  * <li>A batch mode, asynchronous, periodic upload of files</li>
  * </ul>
  */
-public class OutputS3File extends Output implements RolloverCondition, RolloverHandler {
+public class OutputS3File extends OutputFile implements RolloverCondition, RolloverHandler {
   private static final Logger LOG = Logger.getLogger(OutputS3File.class);
 
   public static final String GLOBAL_CONFIG_S3_PATH_SUFFIX = "global.config.json";
@@ -60,10 +62,11 @@ public class OutputS3File extends Output implements RolloverCondition, RolloverH
   private LogSpooler logSpooler;
   private S3OutputConfiguration s3OutputConfiguration;
   private S3Uploader s3Uploader;
+  private LogFeederProps logFeederProps;
 
   @Override
   public void init(LogFeederProps logFeederProps) throws Exception {
-    super.init(logFeederProps);
+    this.logFeederProps = logFeederProps;
     s3OutputConfiguration = S3OutputConfiguration.fromConfigBlock(this);
   }
 
@@ -76,9 +79,9 @@ public class OutputS3File extends Output implements RolloverCondition, RolloverH
    */
   @Override
   public void copyFile(File inputFile, InputMarker inputMarker) {
-    String type = inputMarker.input.getInputDescriptor().getType();
+    String type = inputMarker.getInput().getInputDescriptor().getType();
     S3Uploader s3Uploader = new S3Uploader(s3OutputConfiguration, false, type);
-    String resolvedPath = s3Uploader.uploadFile(inputFile, inputMarker.input.getInputDescriptor().getType());
+    String resolvedPath = s3Uploader.uploadFile(inputFile, inputMarker.getInput().getInputDescriptor().getType());
 
     uploadConfig(inputMarker, type, s3OutputConfiguration, resolvedPath);
   }
@@ -87,8 +90,8 @@ public class OutputS3File extends Output implements RolloverCondition, RolloverH
       String resolvedPath) {
 
     ArrayList<FilterDescriptor> filters = new ArrayList<>();
-    addFilters(filters, inputMarker.input.getFirstFilter());
-    InputS3FileDescriptor inputS3FileDescriptorOriginal = (InputS3FileDescriptor) inputMarker.input.getInputDescriptor();
+    addFilters(filters, inputMarker.getInput().getFirstFilter());
+    InputS3FileDescriptor inputS3FileDescriptorOriginal = (InputS3FileDescriptor) inputMarker.getInput().getInputDescriptor();
     InputS3FileDescriptorImpl inputS3FileDescriptor = InputConfigGson.gson.fromJson(
         InputConfigGson.gson.toJson(inputS3FileDescriptorOriginal), InputS3FileDescriptorImpl.class);
     String s3CompletePath = LogFeederConstants.S3_PATH_START_WITH + s3OutputConfiguration.getS3BucketName() +
@@ -189,12 +192,17 @@ public class OutputS3File extends Output implements RolloverCondition, RolloverH
    * @throws Exception
    */
   @Override
-  public void write(String block, InputMarker inputMarker) throws Exception {
+  public void write(String block, InputFileMarker inputMarker) throws Exception {
     if (logSpooler == null) {
-      logSpooler = createSpooler(inputMarker.input.getFilePath());
-      s3Uploader = createUploader(inputMarker.input.getInputDescriptor().getType());
+      if (inputMarker.getInput().getClass().isAssignableFrom(InputFile.class)) {
+        InputFile input = (InputFile) inputMarker.getInput();
+        logSpooler = createSpooler(input.getFilePath());
+        s3Uploader = createUploader(input.getInputDescriptor().getType());
+        logSpooler.add(block);
+      } else {
+        LOG.error("Cannot write from non local file...");
+      }
     }
-    logSpooler.add(block);
   }
 
   @VisibleForTesting
@@ -206,7 +214,7 @@ public class OutputS3File extends Output implements RolloverCondition, RolloverH
 
   @VisibleForTesting
   protected LogSpooler createSpooler(String filePath) {
-    String spoolDirectory = getLogFeederProps().getTmpDir() + "/s3/service";
+    String spoolDirectory = logFeederProps.getTmpDir() + "/s3/service";
     LOG.info(String.format("Creating spooler with spoolDirectory=%s, filePath=%s", spoolDirectory, filePath));
     return new LogSpooler(spoolDirectory, new File(filePath).getName()+"-", this, this,
         s3OutputConfiguration.getRolloverTimeThresholdSecs());
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
index cdb869a..9816c15 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
@@ -19,24 +19,9 @@
 
 package org.apache.ambari.logfeeder.output;
 
-import java.io.File;
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.ambari.logfeeder.conf.LogFeederProps;
-import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logfeeder.plugin.input.InputMarker;
+import org.apache.ambari.logfeeder.plugin.output.Output;
 import org.apache.ambari.logfeeder.util.DateUtil;
 import org.apache.ambari.logfeeder.util.LogFeederUtil;
 import org.apache.ambari.logsearch.config.api.model.outputconfig.OutputProperties;
@@ -56,7 +41,23 @@ import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.cloud.CollectionStateWatcher;
 import org.apache.solr.common.cloud.DocCollection;
 
-public class OutputSolr extends Output implements CollectionStateWatcher {
+import java.io.File;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public class OutputSolr extends Output<LogFeederProps, InputMarker> implements CollectionStateWatcher {
 
   private static final Logger LOG = Logger.getLogger(OutputSolr.class);
 
@@ -88,6 +89,8 @@ public class OutputSolr extends Output implements CollectionStateWatcher {
   private BlockingQueue<OutputData> outgoingBuffer = null;
   private List<SolrWorkerThread> workerThreadList = new ArrayList<>();
 
+  private LogFeederProps logFeederProps;
+
   @Override
   public boolean monitorConfigChanges() {
     return true;
@@ -99,18 +102,18 @@ public class OutputSolr extends Output implements CollectionStateWatcher {
   }
 
   @Override
-  protected String getStatMetricName() {
+  public String getStatMetricName() {
     return "output.solr.write_logs";
   }
 
   @Override
-  protected String getWriteBytesMetricName() {
+  public String getWriteBytesMetricName() {
     return "output.solr.write_bytes";
   }
   
   @Override
   public void init(LogFeederProps logFeederProps) throws Exception {
-    super.init(logFeederProps);
+    this.logFeederProps = logFeederProps;
     initParams();
     setupSecurity();
     createOutgoingBuffer();
@@ -121,7 +124,7 @@ public class OutputSolr extends Output implements CollectionStateWatcher {
   private void initParams() throws Exception {
     type = getStringValue("type");
     while (true) {
-      OutputSolrProperties outputSolrProperties = logSearchConfig.getOutputSolrProperties(type);
+      OutputSolrProperties outputSolrProperties = getLogSearchConfig().getOutputSolrProperties(type);
       if (outputSolrProperties == null) {
         LOG.info("Output solr properties for type " + type + " is not available yet.");
         try { Thread.sleep(OUTPUT_PROPERTIES_WAIT_MS); } catch (Exception e) { LOG.warn(e); }
@@ -175,8 +178,8 @@ public class OutputSolr extends Output implements CollectionStateWatcher {
   }
 
   private void setupSecurity() {
-    String jaasFile = getLogFeederProps().getLogFeederSecurityConfig().getSolrJaasFile();
-    boolean securityEnabled = getLogFeederProps().getLogFeederSecurityConfig().isSolrKerberosEnabled();
+    String jaasFile = logFeederProps.getLogFeederSecurityConfig().getSolrJaasFile();
+    boolean securityEnabled = logFeederProps.getLogFeederSecurityConfig().isSolrKerberosEnabled();
     if (securityEnabled) {
       System.setProperty("java.security.auth.login.config", jaasFile);
       HttpClientUtil.addConfigurer(new Krb5HttpClientConfigurer());
@@ -325,7 +328,7 @@ public class OutputSolr extends Output implements CollectionStateWatcher {
   }
 
   @Override
-  public long getPendingCount() {
+  public Long getPendingCount() {
     long pendingCount = 0;
     for (SolrWorkerThread solrWorkerThread : workerThreadList) {
       pendingCount += solrWorkerThread.localBuffer.size();
@@ -476,7 +479,8 @@ public class OutputSolr extends Output implements CollectionStateWatcher {
               Level.ERROR);
         }
       }
-      latestInputMarkers.put(outputData.inputMarker.base64FileKey, outputData.inputMarker);
+      latestInputMarkers.put(String.valueOf(outputData.inputMarker.getAllProperties().get("base64_file_key")),
+        outputData.inputMarker);
       localBuffer.add(document);
     }
 
@@ -511,7 +515,7 @@ public class OutputSolr extends Output implements CollectionStateWatcher {
       statMetric.value += localBuffer.size();
       writeBytesMetric.value += localBufferBytesSize;
       for (InputMarker inputMarker : latestInputMarkers.values()) {
-        inputMarker.input.checkIn(inputMarker);
+        inputMarker.getInput().checkIn(inputMarker);
       }
     }
 
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3OutputConfiguration.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3OutputConfiguration.java
index e5974c5..a2d7692 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3OutputConfiguration.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3OutputConfiguration.java
@@ -18,11 +18,11 @@
 
 package org.apache.ambari.logfeeder.output;
 
+import org.apache.ambari.logfeeder.plugin.common.ConfigItem;
+
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.ambari.logfeeder.common.ConfigBlock;
-
 /**
  * Holds all configuration relevant for S3 upload.
  */
@@ -80,7 +80,7 @@ public class S3OutputConfiguration {
     return ((Map<String, String>) configs.get(ADDITIONAL_FIELDS_KEY)).get(CLUSTER_KEY);
   }
 
-  public static S3OutputConfiguration fromConfigBlock(ConfigBlock configBlock) {
+  public static S3OutputConfiguration fromConfigBlock(ConfigItem configItem) {
     Map<String, Object> configs = new HashMap<>();
     String[] stringValuedKeysToCopy = new String[] {
         SPOOL_DIR_KEY, S3_BUCKET_NAME_KEY, S3_LOG_DIR_KEY,
@@ -88,7 +88,7 @@ public class S3OutputConfiguration {
     };
 
     for (String key : stringValuedKeysToCopy) {
-      String value = configBlock.getStringValue(key);
+      String value = configItem.getStringValue(key);
       if (value != null) {
         configs.put(key, value);
       }
@@ -103,10 +103,10 @@ public class S3OutputConfiguration {
     };
 
     for (int i = 0; i < longValuedKeysToCopy.length; i++) {
-      configs.put(longValuedKeysToCopy[i], configBlock.getLongValue(longValuedKeysToCopy[i], defaultValuesForLongValuedKeys[i]));
+      configs.put(longValuedKeysToCopy[i], configItem.getLongValue(longValuedKeysToCopy[i], defaultValuesForLongValuedKeys[i]));
     }
 
-    configs.put(ADDITIONAL_FIELDS_KEY, configBlock.getNVList(ADDITIONAL_FIELDS_KEY));
+    configs.put(ADDITIONAL_FIELDS_KEY, configItem.getNVList(ADDITIONAL_FIELDS_KEY));
 
     return new S3OutputConfiguration(configs);
   }
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3Uploader.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3Uploader.java
index e95a663..ddf3995 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3Uploader.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3Uploader.java
@@ -22,7 +22,6 @@ import com.amazonaws.AmazonClientException;
 import com.amazonaws.services.s3.transfer.TransferManager;
 import com.amazonaws.services.s3.transfer.Upload;
 import com.google.common.annotations.VisibleForTesting;
-
 import org.apache.ambari.logfeeder.common.LogFeederConstants;
 import org.apache.ambari.logfeeder.util.CompressionUtil;
 import org.apache.ambari.logfeeder.util.S3Util;
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpooler.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpooler.java
index 1f13357..7fc47a9 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpooler.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpooler.java
@@ -19,11 +19,14 @@
 package org.apache.ambari.logfeeder.output.spool;
 
 import com.google.common.annotations.VisibleForTesting;
-import org.apache.ambari.logfeeder.output.Output;
 import org.apache.ambari.logfeeder.util.DateUtil;
 import org.apache.log4j.Logger;
 
-import java.io.*;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
 import java.util.Date;
 import java.util.Timer;
 import java.util.TimerTask;
@@ -32,7 +35,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 /**
  * A class that manages local storage of log events before they are uploaded to the output destinations.
  *
- * This class should be used by any {@link Output}s that wish to upload log files to an
+ * This class should be used by any {@link org.apache.ambari.logfeeder.plugin.output.Output}s that wish to upload log files to an
  * output destination on a periodic batched basis. Log events should be added to an instance
  * of this class to be stored locally. This class determines when to
  * rollover using calls to an interface {@link RolloverCondition}. Likewise, it uses an interface
@@ -57,7 +60,7 @@ public class LogSpooler {
   /**
    * Create an instance of the LogSpooler.
    * @param spoolDirectory The directory under which spooler files are created.
-   *                       Should be unique per instance of {@link Output}
+   *                       Should be unique per instance of {@link org.apache.ambari.logfeeder.plugin.output.Output}
    * @param sourceFileNamePrefix The prefix with which the locally spooled files are created.
    * @param rolloverCondition An object of type {@link RolloverCondition} that will be used to
    *                          determine when to rollover.
@@ -73,7 +76,7 @@ public class LogSpooler {
   /**
    * Create an instance of the LogSpooler.
    * @param spoolDirectory The directory under which spooler files are created.
-   *                       Should be unique per instance of {@link Output}
+   *                       Should be unique per instance of {@link org.apache.ambari.logfeeder.plugin.output.Output}
    * @param sourceFileNamePrefix The prefix with which the locally spooled files are created.
    * @param rolloverCondition An object of type {@link RolloverCondition} that will be used to
    *                          determine when to rollover.
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederUtil.java
index 9efab25..6fd00a4 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederUtil.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederUtil.java
@@ -16,9 +16,16 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.ambari.logfeeder.util;
 
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.reflect.TypeToken;
+import org.apache.ambari.logfeeder.plugin.common.MetricData;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
 import java.lang.reflect.Type;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
@@ -26,31 +33,19 @@ import java.util.HashMap;
 import java.util.Hashtable;
 import java.util.Map;
 
-import org.apache.ambari.logfeeder.metrics.MetricData;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.reflect.TypeToken;
-
-/**
- * This class contains utility methods used by LogFeeder
- */
 public class LogFeederUtil {
   private static final Logger LOG = Logger.getLogger(LogFeederUtil.class);
 
   private final static String GSON_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss.SSS";
   private static Gson gson = new GsonBuilder().setDateFormat(GSON_DATE_FORMAT).create();
-  
+
   public static Gson getGson() {
     return gson;
   }
 
   public static String hostName = null;
   public static String ipAddress = null;
-  
+
   static{
     try {
       InetAddress ip = InetAddress.getLocalHost();
@@ -65,7 +60,7 @@ public class LogFeederUtil {
         hostName = getHostName;
       }
       LOG.info("ipAddress=" + ipAddress + ", getHostName=" + getHostName + ", getCanonicalHostName=" + getCanonicalHostName +
-          ", hostName=" + hostName);
+        ", hostName=" + hostName);
     } catch (UnknownHostException e) {
       LOG.error("Error getting hostname.", e);
     }
@@ -76,7 +71,7 @@ public class LogFeederUtil {
     long currMS = System.currentTimeMillis();
     if (currStat > metric.prevLogValue) {
       LOG.info(prefixStr + ": total_count=" + metric.value + ", duration=" + (currMS - metric.prevLogTime) / 1000 +
-          " secs, count=" + (currStat - metric.prevLogValue) + postFix);
+        " secs, count=" + (currStat - metric.prevLogValue) + postFix);
     }
     metric.prevLogValue = currStat;
     metric.prevLogTime = currMS;
@@ -119,15 +114,15 @@ public class LogFeederUtil {
     private int counter = 0;
   }
 
-  private static Map<String, LogHistory> logHistoryList = new Hashtable<String, LogHistory>();
+  private static Map<String, LogFeederUtil.LogHistory> logHistoryList = new Hashtable<>();
 
   public static boolean logErrorMessageByInterval(String key, String message, Throwable e, Logger callerLogger, Level level) {
-    LogHistory log = logHistoryList.get(key);
+    LogFeederUtil.LogHistory log = logHistoryList.get(key);
     if (log == null) {
-      log = new LogHistory();
+      log = new LogFeederUtil.LogHistory();
       logHistoryList.put(key, log);
     }
-    
+
     if ((System.currentTimeMillis() - log.lastLogTime) > 30 * 1000) {
       log.lastLogTime = System.currentTimeMillis();
       if (log.counter > 0) {
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterGrokTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterGrokTest.java
index e3a822a..49fb301 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterGrokTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterGrokTest.java
@@ -21,9 +21,9 @@ package org.apache.ambari.logfeeder.filter;
 import java.util.Map;
 
 import org.apache.ambari.logfeeder.conf.LogFeederProps;
-import org.apache.ambari.logfeeder.input.Input;
-import org.apache.ambari.logfeeder.input.InputMarker;
-import org.apache.ambari.logfeeder.output.OutputManager;
+import org.apache.ambari.logfeeder.input.InputFileMarker;
+import org.apache.ambari.logfeeder.plugin.input.Input;
+import org.apache.ambari.logfeeder.plugin.manager.OutputManager;
 import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterGrokDescriptor;
 import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.FilterGrokDescriptorImpl;
 import org.apache.log4j.Logger;
@@ -65,12 +65,12 @@ public class FilterGrokTest {
     filterGrokDescriptor.setMultilinePattern("^(%{TIMESTAMP_ISO8601:logtime})");
     init(filterGrokDescriptor);
 
-    mockOutputManager.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
+    mockOutputManager.write(EasyMock.capture(capture), EasyMock.anyObject(InputFileMarker.class));
     EasyMock.expectLastCall();
     EasyMock.replay(mockOutputManager);
 
-    filterGrok.apply("2016-04-08 15:55:23,548 INFO This is a test", new InputMarker(null, null, 0));
-    filterGrok.apply("2016-04-08 15:55:24,548 WARN Next message", new InputMarker(null, null, 0));
+    filterGrok.apply("2016-04-08 15:55:23,548 INFO This is a test", new InputFileMarker(null, null, 0));
+    filterGrok.apply("2016-04-08 15:55:24,548 WARN Next message", new InputFileMarker(null, null, 0));
 
     EasyMock.verify(mockOutputManager);
     Map<String, Object> jsonParams = capture.getValue();
@@ -91,7 +91,7 @@ public class FilterGrokTest {
     filterGrokDescriptor.setMultilinePattern("^(%{TIMESTAMP_ISO8601:logtime})");
     init(filterGrokDescriptor);
 
-    mockOutputManager.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
+    mockOutputManager.write(EasyMock.capture(capture), EasyMock.anyObject(InputFileMarker.class));
     EasyMock.expectLastCall();
     EasyMock.replay(mockOutputManager);
 
@@ -99,7 +99,7 @@ public class FilterGrokTest {
         + "as one may expect";
     String[] messageLines = multiLineMessage.split("\r\n");
     for (int i = 0; i < messageLines.length; i++)
-      filterGrok.apply((i == 0 ? "2016-04-08 15:55:23,548 INFO " : "") + messageLines[i], new InputMarker(null, null, 0));
+      filterGrok.apply((i == 0 ? "2016-04-08 15:55:23,548 INFO " : "") + messageLines[i], new InputFileMarker(null, null, 0));
     filterGrok.flush();
 
     EasyMock.verify(mockOutputManager);
@@ -121,12 +121,12 @@ public class FilterGrokTest {
     filterGrokDescriptor.setMultilinePattern("^(%{TIMESTAMP_ISO8601:logtime})");
     init(filterGrokDescriptor);
 
-    mockOutputManager.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
+    mockOutputManager.write(EasyMock.capture(capture), EasyMock.anyObject(InputFileMarker.class));
     EasyMock.expectLastCall().anyTimes();
     EasyMock.replay(mockOutputManager);
 
-    filterGrok.apply("04/08/2016 15:55:23,548 INFO This is a test", new InputMarker(null, null, 0));
-    filterGrok.apply("04/08/2016 15:55:24,548 WARN Next message", new InputMarker(null, null, 0));
+    filterGrok.apply("04/08/2016 15:55:23,548 INFO This is a test", new InputFileMarker(null, null, 0));
+    filterGrok.apply("04/08/2016 15:55:24,548 WARN Next message", new InputFileMarker(null, null, 0));
 
     EasyMock.verify(mockOutputManager);
     assertFalse("Something was captured!", capture.hasCaptured());
@@ -142,8 +142,8 @@ public class FilterGrokTest {
 
     EasyMock.replay(mockOutputManager);
 
-    filterGrok.apply("2016-04-08 15:55:23,548 INFO This is a test", new InputMarker(null, null, 0));
-    filterGrok.apply("2016-04-08 15:55:24,548 WARN Next message", new InputMarker(null, null, 0));
+    filterGrok.apply("2016-04-08 15:55:23,548 INFO This is a test", new InputFileMarker(null, null, 0));
+    filterGrok.apply("2016-04-08 15:55:24,548 WARN Next message", new InputFileMarker(null, null, 0));
 
     EasyMock.verify(mockOutputManager);
     assertFalse("Something was captured", capture.hasCaptured());
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterJSONTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterJSONTest.java
index ef10c46..36139ea 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterJSONTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterJSONTest.java
@@ -27,8 +27,8 @@ import java.util.TimeZone;
 import org.apache.ambari.logfeeder.common.LogFeederConstants;
 import org.apache.ambari.logfeeder.common.LogFeederException;
 import org.apache.ambari.logfeeder.conf.LogFeederProps;
-import org.apache.ambari.logfeeder.input.InputMarker;
-import org.apache.ambari.logfeeder.output.OutputManager;
+import org.apache.ambari.logfeeder.input.InputFileMarker;
+import org.apache.ambari.logfeeder.plugin.manager.OutputManager;
 import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.FilterJsonDescriptorImpl;
 import org.apache.log4j.Logger;
 import org.easymock.Capture;
@@ -64,7 +64,7 @@ public class FilterJSONTest {
 
     init(new FilterJsonDescriptorImpl());
 
-    mockOutputManager.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
+    mockOutputManager.write(EasyMock.capture(capture), EasyMock.anyObject(InputFileMarker.class));
     EasyMock.expectLastCall();
     EasyMock.replay(mockOutputManager);
 
@@ -72,7 +72,7 @@ public class FilterJSONTest {
     DateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
     sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
     String dateString = sdf.format(d);
-    filterJson.apply("{ logtime: '" + d.getTime() + "', line_number: 100 }", new InputMarker(null, null, 0));
+    filterJson.apply("{ logtime: '" + d.getTime() + "', line_number: 100 }", new InputFileMarker(null, null, 0));
 
     EasyMock.verify(mockOutputManager);
     Map<String, Object> jsonParams = capture.getValue();
@@ -89,7 +89,7 @@ public class FilterJSONTest {
 
     init(new FilterJsonDescriptorImpl());
 
-    mockOutputManager.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
+    mockOutputManager.write(EasyMock.capture(capture), EasyMock.anyObject(InputFileMarker.class));
     EasyMock.expectLastCall();
     EasyMock.replay(mockOutputManager);
 
@@ -97,7 +97,7 @@ public class FilterJSONTest {
     DateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
     sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
     String dateString = sdf.format(d);
-    filterJson.apply("{ logtime: '" + d.getTime() + "', some_field: 'abc' }", new InputMarker(null, null, 0));
+    filterJson.apply("{ logtime: '" + d.getTime() + "', some_field: 'abc' }", new InputFileMarker(null, null, 0));
 
     EasyMock.verify(mockOutputManager);
     Map<String, Object> jsonParams = capture.getValue();
@@ -114,11 +114,11 @@ public class FilterJSONTest {
 
     init(new FilterJsonDescriptorImpl());
 
-    mockOutputManager.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
+    mockOutputManager.write(EasyMock.capture(capture), EasyMock.anyObject(InputFileMarker.class));
     EasyMock.expectLastCall();
     EasyMock.replay(mockOutputManager);
 
-    filterJson.apply("{ line_number: 100, some_field: 'abc' }", new InputMarker(null, null, 0));
+    filterJson.apply("{ line_number: 100, some_field: 'abc' }", new InputFileMarker(null, null, 0));
 
     EasyMock.verify(mockOutputManager);
     Map<String, Object> jsonParams = capture.getValue();
@@ -137,7 +137,7 @@ public class FilterJSONTest {
     
     String inputStr = "invalid json";
     try{
-      filterJson.apply(inputStr,new InputMarker(null, null, 0));
+      filterJson.apply(inputStr,new InputFileMarker(null, null, 0));
       fail("Expected LogFeederException was not occured");
     } catch(LogFeederException logFeederException) {
       assertEquals("Json parsing failed for inputstr = " + inputStr, logFeederException.getLocalizedMessage());
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterKeyValueTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterKeyValueTest.java
index 4a85b88..ff8cb6e 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterKeyValueTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterKeyValueTest.java
@@ -21,10 +21,10 @@ package org.apache.ambari.logfeeder.filter;
 import java.util.Map;
 
 import org.apache.ambari.logfeeder.conf.LogFeederProps;
-import org.apache.ambari.logfeeder.output.OutputManager;
+import org.apache.ambari.logfeeder.input.InputFileMarker;
+import org.apache.ambari.logfeeder.plugin.manager.OutputManager;
 import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterKeyValueDescriptor;
 import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.FilterKeyValueDescriptorImpl;
-import org.apache.ambari.logfeeder.input.InputMarker;
 import org.apache.log4j.Logger;
 import org.easymock.Capture;
 import org.easymock.CaptureType;
@@ -62,11 +62,11 @@ public class FilterKeyValueTest {
     filterKeyValueDescriptor.setFieldSplit("&");
     init(filterKeyValueDescriptor);
 
-    mockOutputManager.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
+    mockOutputManager.write(EasyMock.capture(capture), EasyMock.anyObject(InputFileMarker.class));
     EasyMock.expectLastCall();
     EasyMock.replay(mockOutputManager);
 
-    filterKeyValue.apply("{ keyValueField: 'name1=value1&name2=value2' }", new InputMarker(null, null, 0));
+    filterKeyValue.apply("{ keyValueField: 'name1=value1&name2=value2' }", new InputFileMarker(null, null, 0));
 
     EasyMock.verify(mockOutputManager);
     Map<String, Object> jsonParams = capture.getValue();
@@ -87,11 +87,11 @@ public class FilterKeyValueTest {
     filterKeyValueDescriptor.setValueBorders("()");
     init(filterKeyValueDescriptor);
 
-    mockOutputManager.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
+    mockOutputManager.write(EasyMock.capture(capture), EasyMock.anyObject(InputFileMarker.class));
     EasyMock.expectLastCall();
     EasyMock.replay(mockOutputManager);
 
-    filterKeyValue.apply("{ keyValueField: 'name1(value1)&name2(value2)' }", new InputMarker(null, null, 0));
+    filterKeyValue.apply("{ keyValueField: 'name1(value1)&name2(value2)' }", new InputFileMarker(null, null, 0));
 
     EasyMock.verify(mockOutputManager);
     Map<String, Object> jsonParams = capture.getValue();
@@ -110,11 +110,11 @@ public class FilterKeyValueTest {
     filterKeyValueDescriptor.setFieldSplit("&");
     init(filterKeyValueDescriptor);
 
-    mockOutputManager.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
+    mockOutputManager.write(EasyMock.capture(capture), EasyMock.anyObject(InputFileMarker.class));
     EasyMock.expectLastCall().anyTimes();
     EasyMock.replay(mockOutputManager);
 
-    filterKeyValue.apply("{ keyValueField: 'name1=value1&name2=value2' }", new InputMarker(null, null, 0));
+    filterKeyValue.apply("{ keyValueField: 'name1=value1&name2=value2' }", new InputFileMarker(null, null, 0));
 
     EasyMock.verify(mockOutputManager);
     assertFalse("Something was captured!", capture.hasCaptured());
@@ -130,11 +130,11 @@ public class FilterKeyValueTest {
     init(filterKeyValueDescriptor);
 
     // using default value split: =
-    mockOutputManager.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
+    mockOutputManager.write(EasyMock.capture(capture), EasyMock.anyObject(InputFileMarker.class));
     EasyMock.expectLastCall().anyTimes();
     EasyMock.replay(mockOutputManager);
 
-    filterKeyValue.apply("{ otherField: 'name1=value1&name2=value2' }", new InputMarker(null, null, 0));
+    filterKeyValue.apply("{ otherField: 'name1=value1&name2=value2' }", new InputFileMarker(null, null, 0));
 
     EasyMock.verify(mockOutputManager);
     Map<String, Object> jsonParams = capture.getValue();
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputFileTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputFileTest.java
index 01b4e54..e3349fc 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputFileTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputFileTest.java
@@ -26,8 +26,9 @@ import java.util.List;
 
 import org.apache.ambari.logfeeder.conf.LogEntryCacheConfig;
 import org.apache.ambari.logfeeder.conf.LogFeederProps;
-import org.apache.ambari.logfeeder.filter.Filter;
-import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logfeeder.plugin.filter.Filter;
+import org.apache.ambari.logfeeder.plugin.input.InputMarker;
+import org.apache.ambari.logfeeder.plugin.manager.InputManager;
 import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.InputFileDescriptorImpl;
 import org.apache.commons.io.FileUtils;
 import org.apache.log4j.Logger;
@@ -61,7 +62,7 @@ public class InputFileTest {
   private InputFile inputFile;
   private List<String> rows = new ArrayList<>();
 
-  private InputMarker testInputMarker;
+  private InputFileMarker testInputMarker;
 
   private LogFeederProps logFeederProps;
 
@@ -80,7 +81,11 @@ public class InputFileTest {
   public void setUp() throws Exception {
     logFeederProps = new LogFeederProps();
     LogEntryCacheConfig logEntryCacheConfig = new LogEntryCacheConfig();
+    logEntryCacheConfig.setCacheEnabled(false);
+    logEntryCacheConfig.setCacheLastDedupEnabled(false);
+    logEntryCacheConfig.setCacheSize(10);
     logFeederProps.setLogEntryCacheConfig(logEntryCacheConfig);
+    testInputMarker = new InputFileMarker(inputFile, "", 0);
   }
 
   public void init(String path) throws Exception {
@@ -92,18 +97,21 @@ public class InputFileTest {
     inputFileDescriptor.setRowtype("service");
     inputFileDescriptor.setPath(path);
 
-    Filter capture = new Filter() {
+    Filter capture = new Filter<LogFeederProps>() {
       @Override
       public void init(LogFeederProps logFeederProps) {
       }
 
       @Override
+      public String getShortDescription() {
+        return null;
+      }
+
+      @Override
       public void apply(String inputStr, InputMarker inputMarker) {
         rows.add(inputStr);
         if (rows.size() % 3 == 0)
           inputFile.setDrain(true);
-
-        testInputMarker = inputMarker;
       }
     };
 
@@ -138,32 +146,6 @@ public class InputFileTest {
   }
 
   @Test
-  public void testInputFile_process6RowsInterrupted() throws Exception {
-    LOG.info("testInputFile_process6RowsInterrupted()");
-
-    File checkPointDir = createCheckpointDir("process6_checkpoint");
-    File testFile = createFile("process6.log");
-    init(testFile.getAbsolutePath());
-
-    InputManager inputMabager = EasyMock.createStrictMock(InputManager.class);
-    EasyMock.expect(inputMabager.getCheckPointFolderFile()).andReturn(checkPointDir).times(2);
-    EasyMock.replay(inputMabager);
-    inputFile.setInputManager(inputMabager);
-
-    inputFile.isReady();
-    inputFile.start();
-    inputFile.checkIn(testInputMarker);
-    inputFile.setDrain(false);
-    inputFile.start();
-
-    assertEquals("Amount of the rows is incorrect", rows.size(), 6);
-    for (int row = 0; row < 6; row++)
-      assertEquals("Row #" + (row + 1) + " not correct", TEST_LOG_FILE_ROWS[row], rows.get(row));
-
-    EasyMock.verify(inputMabager);
-  }
-
-  @Test
   public void testInputFile_noLogPath() throws Exception {
     LOG.info("testInputFile_noLogPath()");
 
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputManagerTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputManagerTest.java
index 9dba349..574fa35 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputManagerTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputManagerTest.java
@@ -25,7 +25,8 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.ambari.logfeeder.conf.LogFeederProps;
-import org.apache.ambari.logfeeder.metrics.MetricData;
+import org.apache.ambari.logfeeder.plugin.common.MetricData;
+import org.apache.ambari.logfeeder.plugin.input.Input;
 import org.junit.Test;
 
 public class InputManagerTest {
@@ -42,7 +43,7 @@ public class InputManagerTest {
     
     replay(input1, input2, input3, input4);
     
-    InputManager manager = new InputManager();
+    InputManagerImpl manager = new InputManagerImpl();
     manager.add("serviceName", input1);
     manager.add("serviceName", input2);
     manager.add("serviceName", input3);
@@ -79,8 +80,8 @@ public class InputManagerTest {
     expect(input3.getShortDescription()).andReturn("").once();
     
     replay(input1, input2, input3);
-    
-    InputManager manager = new InputManager();
+
+    InputManagerImpl manager = new InputManagerImpl();
     manager.setLogFeederProps(logFeederProps);
     manager.add("serviceName", input1);
     manager.add("serviceName", input2);
@@ -109,8 +110,8 @@ public class InputManagerTest {
     expect(input3.isReady()).andReturn(false);
     
     replay(input1, input2, input3);
-    
-    InputManager manager = new InputManager();
+
+    InputManagerImpl manager = new InputManagerImpl();
     manager.add("serviceName", input1);
     manager.add("serviceName", input2);
     manager.add("serviceName", input3);
@@ -135,8 +136,8 @@ public class InputManagerTest {
     expect(input3.isReady()).andReturn(false);
     
     replay(input1, input2, input3);
-    
-    InputManager manager = new InputManager();
+
+    InputManagerImpl manager = new InputManagerImpl();
     manager.add("serviceName", input1);
     manager.add("serviceName", input2);
     manager.add("serviceName", input3);
@@ -157,8 +158,8 @@ public class InputManagerTest {
     input3.lastCheckIn(); expectLastCall();
     
     replay(input1, input2, input3);
-    
-    InputManager manager = new InputManager();
+
+    InputManagerImpl manager = new InputManagerImpl();
     manager.add("serviceName", input1);
     manager.add("serviceName", input2);
     manager.add("serviceName", input3);
@@ -183,8 +184,8 @@ public class InputManagerTest {
     expect(input3.isClosed()).andReturn(true);
     
     replay(input1, input2, input3);
-    
-    InputManager manager = new InputManager();
+
+    InputManagerImpl manager = new InputManagerImpl();
     manager.add("serviceName", input1);
     manager.add("serviceName", input2);
     manager.add("serviceName", input3);
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/cache/LRUCacheTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/cache/LRUCacheTest.java
index dd97d27..4ff818a 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/cache/LRUCacheTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/cache/LRUCacheTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.ambari.logfeeder.input.cache;
 
+import org.apache.ambari.logfeeder.plugin.input.cache.LRUCache;
 import org.joda.time.DateTime;
 import org.junit.Before;
 import org.junit.Test;
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/metrics/MetricsManagerTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/metrics/MetricsManagerTest.java
index da8fff7..9699156 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/metrics/MetricsManagerTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/metrics/MetricsManagerTest.java
@@ -20,6 +20,8 @@ package org.apache.ambari.logfeeder.metrics;
 
 import static org.easymock.EasyMock.*;
 import static org.junit.Assert.*;
+
+import org.apache.ambari.logfeeder.plugin.common.MetricData;
 import org.easymock.Capture;
 import org.easymock.CaptureType;
 import org.easymock.EasyMock;
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputKafkaTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputKafkaTest.java
index c0babc4..1623738 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputKafkaTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputKafkaTest.java
@@ -24,9 +24,9 @@ import java.util.Properties;
 import java.util.concurrent.Future;
 
 import org.apache.ambari.logfeeder.conf.LogFeederProps;
-import org.apache.ambari.logfeeder.input.Input;
-import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logfeeder.input.InputFileMarker;
 import org.apache.ambari.logfeeder.output.OutputKafka.KafkaCallBack;
+import org.apache.ambari.logfeeder.plugin.input.Input;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
@@ -86,7 +86,7 @@ public class OutputKafkaTest {
     EasyMock.replay(mockKafkaProducer);
 
     for (int i = 0; i < 10; i++) {
-      InputMarker inputMarker = new InputMarker(EasyMock.mock(Input.class), null, 0);
+      InputFileMarker inputMarker = new InputFileMarker(EasyMock.mock(Input.class), null, 0);
       outputKafka.write("value" + i, inputMarker);
     }
 
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputLineFilterTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputLineFilterTest.java
index 6e108ab..e97ccaf 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputLineFilterTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputLineFilterTest.java
@@ -19,8 +19,8 @@
 package org.apache.ambari.logfeeder.output;
 
 import org.apache.ambari.logfeeder.common.LogFeederConstants;
-import org.apache.ambari.logfeeder.input.Input;
-import org.apache.ambari.logfeeder.input.cache.LRUCache;
+import org.apache.ambari.logfeeder.plugin.input.Input;
+import org.apache.ambari.logfeeder.plugin.input.cache.LRUCache;
 import org.apache.ambari.logsearch.config.api.model.inputconfig.InputDescriptor;
 import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.InputDescriptorImpl;
 import org.easymock.EasyMock;
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputManagerTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputManagerTest.java
index 49f5a11..f1c27ab 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputManagerTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputManagerTest.java
@@ -30,10 +30,11 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.ambari.logfeeder.conf.LogFeederProps;
-import org.apache.ambari.logfeeder.input.Input;
-import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logfeeder.input.InputFileMarker;
 import org.apache.ambari.logfeeder.loglevelfilter.LogLevelFilterHandler;
-import org.apache.ambari.logfeeder.metrics.MetricData;
+import org.apache.ambari.logfeeder.plugin.common.MetricData;
+import org.apache.ambari.logfeeder.plugin.input.Input;
+import org.apache.ambari.logfeeder.plugin.output.Output;
 import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.InputDescriptorImpl;
 import org.junit.Test;
 
@@ -47,7 +48,7 @@ public class OutputManagerTest {
     
     replay(output1, output2, output3);
     
-    OutputManager manager = new OutputManager();
+    OutputManagerImpl manager = new OutputManagerImpl();
     manager.add(output1);
     manager.add(output2);
     manager.add(output3);
@@ -72,8 +73,8 @@ public class OutputManagerTest {
     output3.init(logFeederProps); expectLastCall();
     
     replay(output1, output2, output3);
-    
-    OutputManager manager = new OutputManager();
+
+    OutputManagerImpl manager = new OutputManagerImpl();
     manager.add(output1);
     manager.add(output2);
     manager.add(output3);
@@ -95,7 +96,7 @@ public class OutputManagerTest {
     jsonObj.put("id", "testId");
     
     Input mockInput = strictMock(Input.class);
-    InputMarker inputMarker = new InputMarker(mockInput, null, 0);
+    InputFileMarker inputMarker = new InputFileMarker(mockInput, null, 0);
     InputDescriptorImpl inputDescriptor = new InputDescriptorImpl() {};
     inputDescriptor.setAddFields(Collections.<String, String> emptyMap());
     
@@ -119,7 +120,7 @@ public class OutputManagerTest {
     
     replay(output1, output2, output3, mockFilter, mockInput);
     
-    OutputManager manager = new OutputManager();
+    OutputManagerImpl manager = new OutputManagerImpl();
     manager.setLogLevelFilterHandler(mockFilter);
     manager.add(output1);
     manager.add(output2);
@@ -135,7 +136,7 @@ public class OutputManagerTest {
     String jsonString = "{}";
     
     Input mockInput = strictMock(Input.class);
-    InputMarker inputMarker = new InputMarker(mockInput, null, 0);
+    InputFileMarker inputMarker = new InputFileMarker(mockInput, null, 0);
     InputDescriptorImpl inputDescriptor = new InputDescriptorImpl() {};
     
     Output output1 = strictMock(Output.class);
@@ -153,8 +154,8 @@ public class OutputManagerTest {
     output3.write(jsonString, inputMarker); expectLastCall();
     
     replay(output1, output2, output3, mockInput, mockFilter);
-    
-    OutputManager manager = new OutputManager();
+
+    OutputManagerImpl manager = new OutputManagerImpl();
     manager.setLogLevelFilterHandler(mockFilter);
     manager.add(output1);
     manager.add(output2);
@@ -179,7 +180,7 @@ public class OutputManagerTest {
     
     replay(output1, output2, output3);
     
-    OutputManager manager = new OutputManager();
+    OutputManagerImpl manager = new OutputManagerImpl();
     manager.add(output1);
     manager.add(output2);
     manager.add(output3);
@@ -201,7 +202,7 @@ public class OutputManagerTest {
     
     replay(output1, output2, output3);
     
-    OutputManager manager = new OutputManager();
+    OutputManagerImpl manager = new OutputManagerImpl();
     manager.add(output1);
     manager.add(output2);
     manager.add(output3);
@@ -216,7 +217,7 @@ public class OutputManagerTest {
     File f = new File("");
     
     Input mockInput = strictMock(Input.class);
-    InputMarker inputMarker = new InputMarker(mockInput, null, 0);
+    InputFileMarker inputMarker = new InputFileMarker(mockInput, null, 0);
     
     Output output1 = strictMock(Output.class);
     Output output2 = strictMock(Output.class);
@@ -230,7 +231,7 @@ public class OutputManagerTest {
     
     replay(output1, output2, output3, mockInput);
     
-    OutputManager manager = new OutputManager();
+    OutputManagerImpl manager = new OutputManagerImpl();
     manager.add(output1);
     manager.add(output2);
     manager.add(output3);
@@ -259,8 +260,8 @@ public class OutputManagerTest {
     expect(output3.isClosed()).andReturn(true);
     
     replay(output1, output2, output3);
-    
-    OutputManager manager = new OutputManager();
+
+    OutputManagerImpl manager = new OutputManagerImpl();
     manager.add(output1);
     manager.add(output2);
     manager.add(output3);
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputS3FileTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputS3FileTest.java
index 78cf014..6674be1 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputS3FileTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputS3FileTest.java
@@ -19,11 +19,7 @@
 package org.apache.ambari.logfeeder.output;
 
 import org.apache.ambari.logfeeder.conf.LogFeederProps;
-import org.apache.ambari.logfeeder.input.Input;
-import org.apache.ambari.logfeeder.input.InputMarker;
-import org.apache.ambari.logfeeder.output.spool.LogSpooler;
 import org.apache.ambari.logfeeder.output.spool.LogSpoolerContext;
-import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.InputDescriptorImpl;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -68,76 +64,6 @@ public class OutputS3FileTest {
   }
 
   @Test
-  public void shouldSpoolLogEventToNewSpooler() throws Exception {
-
-    Input input = mock(Input.class);
-    InputMarker inputMarker = new InputMarker(input, null, 0);
-    InputDescriptorImpl inputDescriptor = new InputDescriptorImpl() {};
-    inputDescriptor.setType("hdfs-namenode");
-    
-    expect(input.getFilePath()).andReturn("/var/log/hdfs-namenode.log");
-    expect(input.getInputDescriptor()).andReturn(inputDescriptor);
-    final LogSpooler spooler = mock(LogSpooler.class);
-    spooler.add("log event block");
-    final S3Uploader s3Uploader = mock(S3Uploader.class);
-    replay(input, spooler, s3Uploader);
-
-    OutputS3File outputS3File = new OutputS3File() {
-      @Override
-      protected LogSpooler createSpooler(String filePath) {
-        return spooler;
-      }
-
-      @Override
-      protected S3Uploader createUploader(String logType) {
-        return s3Uploader;
-      }
-    };
-    outputS3File.loadConfig(configMap);
-    outputS3File.init(new LogFeederProps());
-    outputS3File.write("log event block", inputMarker);
-    verify(spooler);
-  }
-
-  @Test
-  public void shouldReuseSpoolerForSamePath() throws Exception {
-    Input input = mock(Input.class);
-    InputMarker inputMarker = new InputMarker(input, null, 0);
-    InputDescriptorImpl inputDescriptor = new InputDescriptorImpl() {};
-    inputDescriptor.setType("hdfs-namenode");
-    
-    expect(input.getFilePath()).andReturn("/var/log/hdfs-namenode.log");
-    expect(input.getInputDescriptor()).andReturn(inputDescriptor);
-    final LogSpooler spooler = mock(LogSpooler.class);
-    spooler.add("log event block1");
-    spooler.add("log event block2");
-    final S3Uploader s3Uploader = mock(S3Uploader.class);
-    replay(input, spooler, s3Uploader);
-
-    OutputS3File outputS3File = new OutputS3File() {
-      private boolean firstCallComplete;
-      @Override
-      protected LogSpooler createSpooler(String filePath) {
-        if (!firstCallComplete) {
-          firstCallComplete = true;
-          return spooler;
-        }
-        throw new IllegalStateException("Shouldn't call createSpooler for same path.");
-      }
-
-      @Override
-      protected S3Uploader createUploader(String logType) {
-        return s3Uploader;
-      }
-    };
-    outputS3File.loadConfig(configMap);
-    outputS3File.init(new LogFeederProps());
-    outputS3File.write("log event block1", inputMarker);
-    outputS3File.write("log event block2", inputMarker);
-    verify(spooler);
-  }
-
-  @Test
   public void shouldRolloverWhenSufficientSizeIsReached() throws Exception {
 
     String thresholdSize = Long.toString(15 * 1024 * 1024L);
@@ -171,35 +97,4 @@ public class OutputS3FileTest {
 
     assertFalse(outputS3File.shouldRollover(logSpoolerContext));
   }
-
-  @Test
-  public void shouldUploadFileOnRollover() throws Exception {
-    Input input = mock(Input.class);
-    InputMarker inputMarker = new InputMarker(input, null, 0);
-    InputDescriptorImpl inputDescriptor = new InputDescriptorImpl() {};
-    inputDescriptor.setType("hdfs-namenode");
-    
-    expect(input.getFilePath()).andReturn("/var/log/hdfs-namenode.log");
-    expect(input.getInputDescriptor()).andReturn(inputDescriptor);
-    final LogSpooler spooler = mock(LogSpooler.class);
-    spooler.add("log event block1");
-    final S3Uploader s3Uploader = mock(S3Uploader.class);
-    s3Uploader.addFileForUpload("/var/ambari-logsearch/logfeeder/hdfs-namenode.log.gz");
-    replay(input, spooler, s3Uploader);
-
-    OutputS3File outputS3File = new OutputS3File() {
-      @Override
-      protected LogSpooler createSpooler(String filePath) {
-        return spooler;
-      }
-      @Override
-      protected S3Uploader createUploader(String logType) {
-        return s3Uploader;
-      }
-    };
-    outputS3File.write("log event block1", inputMarker);
-    outputS3File.handleRollover(new File("/var/ambari-logsearch/logfeeder/hdfs-namenode.log.gz"));
-
-    verify(s3Uploader);
-  }
 }
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputSolrTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputSolrTest.java
deleted file mode 100644
index 70d5c8f..0000000
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputSolrTest.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ambari.logfeeder.output;
-
-import java.net.MalformedURLException;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.ambari.logfeeder.conf.LogFeederProps;
-import org.apache.ambari.logfeeder.conf.LogFeederSecurityConfig;
-import org.apache.ambari.logfeeder.input.Input;
-import org.apache.ambari.logfeeder.input.InputMarker;
-import org.apache.ambari.logsearch.config.api.LogSearchConfigLogFeeder;
-import org.apache.ambari.logsearch.config.api.model.outputconfig.OutputSolrProperties;
-import org.apache.ambari.logsearch.config.zookeeper.model.outputconfig.impl.OutputSolrPropertiesImpl;
-import org.apache.log4j.Logger;
-import org.apache.solr.client.solrj.impl.CloudSolrClient;
-import org.apache.solr.client.solrj.response.UpdateResponse;
-import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.common.util.NamedList;
-import org.easymock.EasyMock;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-
-public class OutputSolrTest {
-  private static final Logger LOG = Logger.getLogger(OutputSolrTest.class);
-
-  private OutputSolr outputSolr;
-  private LogSearchConfigLogFeeder logSearchConfigMock;
-  private Map<Integer, SolrInputDocument> receivedDocs = new ConcurrentHashMap<>();
-  private LogFeederProps logFeederProps = new LogFeederProps();
-
-  @Rule
-  public ExpectedException expectedException = ExpectedException.none();
-
-  @Before
-  public void init() throws Exception {
-    LogFeederSecurityConfig logFeederSecurityConfig = new LogFeederSecurityConfig();
-    logFeederSecurityConfig.setSolrKerberosEnabled(false);
-    logFeederProps.setLogFeederSecurityConfig(logFeederSecurityConfig);
-    outputSolr = new OutputSolr() {
-      @SuppressWarnings("deprecation")
-      @Override
-      CloudSolrClient getSolrClient(int count) throws Exception, MalformedURLException {
-        return new CloudSolrClient(null) {
-          private static final long serialVersionUID = 1L;
-
-          @Override
-          public UpdateResponse add(Collection<SolrInputDocument> docs) {
-            for (SolrInputDocument doc : docs) {
-              receivedDocs.put((Integer) doc.getField("id").getValue(), doc);
-            }
-
-            UpdateResponse response = new UpdateResponse();
-            response.setResponse(new NamedList<Object>());
-            return response;
-          }
-        };
-      }
-    };
-    
-    OutputSolrProperties outputSolrProperties = new OutputSolrPropertiesImpl("hadoop_logs", "none");
-    logSearchConfigMock = EasyMock.createNiceMock(LogSearchConfigLogFeeder.class);
-    EasyMock.expect(logSearchConfigMock.getOutputSolrProperties("service")).andReturn(outputSolrProperties);
-    EasyMock.replay(logSearchConfigMock);
-    
-    outputSolr.setLogSearchConfig(logSearchConfigMock);
-  }
-
-  @Test
-  public void testOutputToSolr_uploadData() throws Exception {
-    LOG.info("testOutputToSolr_uploadData()");
-
-    Map<String, Object> config = new HashMap<String, Object>();
-    config.put("zk_connect_string", "some zk_connect_string");
-    config.put("workers", "3");
-    config.put("type", "service");
-
-    outputSolr.loadConfig(config);
-    outputSolr.init(logFeederProps);
-
-    Map<Integer, SolrInputDocument> expectedDocs = new HashMap<>();
-
-    int count = 0;
-    for (int i = 0; i < 10; i++) {
-      Map<String, Object> jsonObj = new HashMap<>();
-      for (int j = 0; j < 3; j++)
-        jsonObj.put("name" + ++count, "value" + ++count);
-      jsonObj.put("id", ++count);
-
-      InputMarker inputMarker = new InputMarker(EasyMock.mock(Input.class), null, 0);
-      outputSolr.write(jsonObj, inputMarker);
-
-      SolrInputDocument doc = new SolrInputDocument();
-      for (Map.Entry<String, Object> e : jsonObj.entrySet())
-        doc.addField(e.getKey(), e.getValue());
-
-      expectedDocs.put(count, doc);
-    }
-
-    Thread.sleep(100);
-    while (outputSolr.getPendingCount() > 0)
-      Thread.sleep(100);
-
-    int waitToFinish = 0;
-    if (receivedDocs.size() < 10 && waitToFinish < 10) {
-      Thread.sleep(100);
-      waitToFinish++;
-    }
-
-    Set<Integer> ids = new HashSet<>();
-    ids.addAll(receivedDocs.keySet());
-    ids.addAll(expectedDocs.keySet());
-    for (Integer id : ids) {
-      SolrInputDocument receivedDoc = receivedDocs.get(id);
-      SolrInputDocument expectedDoc = expectedDocs.get(id);
-
-      assertNotNull("No document received for id: " + id, receivedDoc);
-      assertNotNull("No document expected for id: " + id, expectedDoc);
-
-      Set<String> fieldNames = new HashSet<>();
-      fieldNames.addAll(receivedDoc.getFieldNames());
-      fieldNames.addAll(expectedDoc.getFieldNames());
-
-      for (String fieldName : fieldNames) {
-        Object receivedValue = receivedDoc.getFieldValue(fieldName);
-        Object expectedValue = expectedDoc.getFieldValue(fieldName);
-
-        assertNotNull("No received document field found for id: " + id + ", fieldName: " + fieldName, receivedValue);
-        assertNotNull("No expected document field found for id: " + id + ", fieldName: " + fieldName, expectedValue);
-
-        assertEquals("Field value not matching for id: " + id + ", fieldName: " + fieldName, receivedValue, expectedValue);
-      }
-    }
-  }
-
-  @Test
-  public void testOutputToSolr_noZkConnectString() throws Exception {
-    LOG.info("testOutputToSolr_noUrlOrZkConnectString()");
-
-    expectedException.expect(Exception.class);
-    expectedException.expectMessage("For solr output the zk_connect_string property need to be set");
-
-    Map<String, Object> config = new HashMap<String, Object>();
-    config.put("workers", "3");
-    config.put("type", "service");
-
-    outputSolr.loadConfig(config);
-    outputSolr.init(logFeederProps);
-  }
-
-  @After
-  public void cleanUp() {
-    receivedDocs.clear();
-    EasyMock.verify(logSearchConfigMock);
-  }
-}
diff --git a/ambari-logsearch/pom.xml b/ambari-logsearch/pom.xml
index 5efb4ba..4a0bb6a 100644
--- a/ambari-logsearch/pom.xml
+++ b/ambari-logsearch/pom.xml
@@ -37,6 +37,7 @@
     <module>ambari-logsearch-config-api</module>
     <module>ambari-logsearch-config-zookeeper</module>
     <module>ambari-logsearch-it</module>
+    <module>ambari-logsearch-logfeeder-plugin-api</module>
   </modules>
   <properties>
     <jdk.version>1.8</jdk.version>

-- 
To stop receiving notification emails like this one, please contact
"commits@ambari.apache.org" <commits@ambari.apache.org>.