Posted to commits@ambari.apache.org by ol...@apache.org on 2016/09/28 09:41:56 UTC

[29/52] [abbrv] ambari git commit: AMBARI-18246. Clean up Log Feeder (Miklos Gergely via oleewere)

http://git-wip-us.apache.org/repos/asf/ambari/blob/0a3cdccd/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java
index ec26a88..ffd6cec 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java
@@ -20,25 +20,73 @@
 package org.apache.ambari.logfeeder.util;
 
 import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.attribute.BasicFileAttributes;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.log4j.Logger;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
 
 public class FileUtil {
-  private static final Logger logger = Logger.getLogger(FileUtil.class);
-
-  public static List<File> getAllFileFromDir(File directory,
-      String[] searchFileWithExtensions, boolean checkInSubDir) {
+  private static final Logger LOG = Logger.getLogger(FileUtil.class);
+  
+  private FileUtil() {
+    throw new UnsupportedOperationException();
+  }
+  
+  public static List<File> getAllFileFromDir(File directory, String extension, boolean checkInSubDir) {
     if (!directory.exists()) {
-      logger.error(directory.getAbsolutePath() + " is not exists ");
-    } else if (directory.isDirectory()) {
-      return (List<File>) FileUtils.listFiles(directory,
-          searchFileWithExtensions, checkInSubDir);
+      LOG.error(directory.getAbsolutePath() + " does not exist");
+    } else if (!directory.isDirectory()) {
+      LOG.error(directory.getAbsolutePath() + " is not a directory");
     } else {
-      logger.error(directory.getAbsolutePath() + " is not Directory ");
+      return (List<File>) FileUtils.listFiles(directory, new String[]{extension}, checkInSubDir);
     }
     return new ArrayList<File>();
   }
+
+
+  public static Object getFileKey(File file) {
+    try {
+      Path fileFullPath = Paths.get(file.getAbsolutePath());
+      if (fileFullPath != null) {
+        BasicFileAttributes basicAttr = Files.readAttributes(fileFullPath, BasicFileAttributes.class);
+        return basicAttr.fileKey();
+      }
+    } catch (Throwable ex) {
+      LOG.error("Error getting file attributes for file=" + file, ex);
+    }
+    return file.toString();
+  }
+
+  public static File getFileFromClasspath(String filename) {
+    URL fileCompleteUrl = Thread.currentThread().getContextClassLoader().getResource(filename);
+    LOG.debug("File Complete URI :" + fileCompleteUrl);
+    File file = null;
+    try {
+      file = new File(fileCompleteUrl.toURI());
+    } catch (Exception exception) {
+      LOG.debug(exception.getMessage(), exception.getCause());
+    }
+    return file;
+  }
+
+  public static HashMap<String, Object> readJsonFromFile(File jsonFile) {
+    ObjectMapper mapper = new ObjectMapper();
+    try {
+      HashMap<String, Object> jsonmap = mapper.readValue(jsonFile, new TypeReference<HashMap<String, Object>>() {});
+      return jsonmap;
+    } catch (IOException e) {
+      LOG.error(e, e.getCause());
+    }
+    return new HashMap<String, Object>();
+  }
 }
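
The new FileUtil.getFileKey() above relies on BasicFileAttributes.fileKey(), which on POSIX filesystems wraps the (device, inode) pair and therefore survives renames; that is what lets Log Feeder keep tracking a log file across rotation. A minimal, self-contained sketch of the same call, where the path is a placeholder and fileKey() may legitimately return null on filesystems without stable keys (which is why the utility falls back to file.toString()):

    import java.io.File;
    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.nio.file.attribute.BasicFileAttributes;

    public class FileKeyDemo {
      public static void main(String[] args) throws Exception {
        File file = new File("/var/log/example/app.log"); // placeholder path
        BasicFileAttributes attrs =
            Files.readAttributes(Paths.get(file.getAbsolutePath()), BasicFileAttributes.class);
        // On POSIX this prints something like (dev=...,ino=...); the value stays
        // the same when the file is renamed, e.g. by log rotation.
        System.out.println("fileKey=" + attrs.fileKey());
      }
    }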

http://git-wip-us.apache.org/repos/asf/ambari/blob/0a3cdccd/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederUtil.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederUtil.java
index 32029ff..5bf600e 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederUtil.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederUtil.java
@@ -22,36 +22,23 @@ package org.apache.ambari.logfeeder.util;
 import java.io.BufferedInputStream;
 import java.io.File;
 import java.io.FileInputStream;
-import java.io.IOException;
 import java.lang.reflect.Type;
 import java.net.InetAddress;
-import java.net.URL;
 import java.net.UnknownHostException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.Hashtable;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.TimeZone;
 
 import org.apache.ambari.logfeeder.LogFeeder;
-import org.apache.ambari.logfeeder.filter.Filter;
-import org.apache.ambari.logfeeder.input.Input;
-import org.apache.ambari.logfeeder.logconfig.LogFeederConstants;
-import org.apache.ambari.logfeeder.mapper.Mapper;
-import org.apache.ambari.logfeeder.metrics.MetricCount;
-import org.apache.ambari.logfeeder.output.Output;
+import org.apache.ambari.logfeeder.common.LogFeederConstants;
+import org.apache.ambari.logfeeder.metrics.MetricData;
+import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
-import org.codehaus.jackson.JsonParseException;
-import org.codehaus.jackson.map.JsonMappingException;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.type.TypeReference;
 
-import com.google.common.collect.ObjectArrays;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import com.google.gson.reflect.TypeToken;
@@ -60,109 +47,80 @@ import com.google.gson.reflect.TypeToken;
  * This class contains utility methods used by LogFeeder
  */
 public class LogFeederUtil {
-  private static final Logger logger = Logger.getLogger(LogFeederUtil.class);
+  private static final Logger LOG = Logger.getLogger(LogFeederUtil.class);
 
-  private static final int HASH_SEED = 31174077;
-  public final static String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss.SSS";
-  public final static String SOLR_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
-  private static Gson gson = new GsonBuilder().setDateFormat(DATE_FORMAT).create();
-
-  private static Properties props;
-
-  private static Map<String, LogHistory> logHistoryList = new Hashtable<String, LogHistory>();
-  private static int logInterval = 30000; // 30 seconds
+  private final static String GSON_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss.SSS";
+  private static Gson gson = new GsonBuilder().setDateFormat(GSON_DATE_FORMAT).create();
+  
+  public static Gson getGson() {
+    return gson;
+  }
 
   public static String hostName = null;
   public static String ipAddress = null;
   
-  private static String logfeederTempDir = null;
-  
-  private static final Object _LOCK = new Object();
-  
   static{
-    setHostNameAndIP();
+    try {
+      InetAddress ip = InetAddress.getLocalHost();
+      ipAddress = ip.getHostAddress();
+      String getHostName = ip.getHostName();
+      String getCanonicalHostName = ip.getCanonicalHostName();
+      if (!getCanonicalHostName.equalsIgnoreCase(ipAddress)) {
+        LOG.info("Using getCanonicalHostName()=" + getCanonicalHostName);
+        hostName = getCanonicalHostName;
+      } else {
+        LOG.info("Using getHostName()=" + getHostName);
+        hostName = getHostName;
+      }
+      LOG.info("ipAddress=" + ipAddress + ", getHostName=" + getHostName + ", getCanonicalHostName=" + getCanonicalHostName +
+          ", hostName=" + hostName);
+    } catch (UnknownHostException e) {
+      LOG.error("Error getting hostname.", e);
+    }
   }
   
-  public static Gson getGson() {
-    return gson;
-  }
-
-  private static ThreadLocal<SimpleDateFormat> dateFormatter = new ThreadLocal<SimpleDateFormat>() {
-    @Override
-    protected SimpleDateFormat initialValue() {
-      SimpleDateFormat sdf = new SimpleDateFormat(SOLR_DATE_FORMAT);
-      sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
-      return sdf;
-    }
-  };
+  private static Properties props;
 
   /**
-   * This method will read the properties from System, followed by propFile
-   * and finally from the map
+   * This method will read the properties from System, followed by propFile and finally from the map
    */
-  public static void loadProperties(String propFile, String[] propNVList)
-    throws Exception {
-    logger.info("Loading properties. propFile=" + propFile);
+  public static void loadProperties(String propFile, String[] propNVList) throws Exception {
+    LOG.info("Loading properties. propFile=" + propFile);
     props = new Properties(System.getProperties());
     boolean propLoaded = false;
 
     // First get properties file path from environment value
     String propertiesFilePath = System.getProperty("properties");
-    if (propertiesFilePath != null && !propertiesFilePath.isEmpty()) {
+    if (StringUtils.isNotEmpty(propertiesFilePath)) {
       File propertiesFile = new File(propertiesFilePath);
       if (propertiesFile.exists() && propertiesFile.isFile()) {
-        logger.info("Properties file path set in environment. Loading properties file="
-          + propertiesFilePath);
-        FileInputStream fileInputStream = null;
-        try {
-          fileInputStream = new FileInputStream(propertiesFile);
-          props.load(fileInputStream);
+        LOG.info("Properties file path set in environment. Loading properties file=" + propertiesFilePath);
+        try (FileInputStream fis = new FileInputStream(propertiesFile)) {
+          props.load(fis);
           propLoaded = true;
         } catch (Throwable t) {
-          logger.error("Error loading properties file. properties file="
-            + propertiesFile.getAbsolutePath());
-        } finally {
-          if (fileInputStream != null) {
-            try {
-              fileInputStream.close();
-            } catch (Throwable t) {
-              // Ignore error
-            }
-          }
+          LOG.error("Error loading properties file. properties file=" + propertiesFile.getAbsolutePath());
         }
       } else {
-        logger.error("Properties file path set in environment, but file not found. properties file="
-          + propertiesFilePath);
+        LOG.error("Properties file path set in environment, but file not found. properties file=" + propertiesFilePath);
       }
     }
 
     if (!propLoaded) {
-      BufferedInputStream fileInputStream = null;
-      try {
+      try (BufferedInputStream bis = (BufferedInputStream) LogFeeder.class.getClassLoader().getResourceAsStream(propFile)) {
         // Properties not yet loaded, let's try from class loader
-        fileInputStream = (BufferedInputStream) LogFeeder.class
-          .getClassLoader().getResourceAsStream(propFile);
-        if (fileInputStream != null) {
-          logger.info("Loading properties file " + propFile
-            + " from classpath");
-          props.load(fileInputStream);
+        if (bis != null) {
+          LOG.info("Loading properties file " + propFile + " from classpath");
+          props.load(bis);
           propLoaded = true;
         } else {
-          logger.fatal("Properties file not found in classpath. properties file name= "
-            + propFile);
-        }
-      } finally {
-        if (fileInputStream != null) {
-          try {
-            fileInputStream.close();
-          } catch (IOException e) {
-          }
+          LOG.fatal("Properties file not found in classpath. properties file name= " + propFile);
         }
       }
     }
 
     if (!propLoaded) {
-      logger.fatal("Properties file is not loaded.");
+      LOG.fatal("Properties file is not loaded.");
       throw new Exception("Properties not loaded");
     } else {
       updatePropertiesFromMap(propNVList);
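
Because loadProperties() constructs the Properties object as new Properties(System.getProperties()), entries loaded from the file or from -name=value arguments take precedence, while JVM system properties only serve as fallback defaults. A small sketch of that lookup order, with an illustrative property name:

    import java.util.Properties;

    public class PropertyPrecedenceDemo {
      public static void main(String[] args) {
        // System properties become the *defaults* of this Properties object.
        Properties props = new Properties(System.getProperties());
        props.setProperty("logfeeder.example.flag", "true"); // explicit entry wins

        System.out.println(props.getProperty("logfeeder.example.flag")); // "true"
        // Not set explicitly, so this falls through to the JVM defaults:
        System.out.println(props.getProperty("java.version"));
      }
    }
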
@@ -173,162 +131,124 @@ public class LogFeederUtil {
     if (nvList == null) {
       return;
     }
-    logger.info("Trying to load additional proeprties from argument paramters. nvList.length="
-      + nvList.length);
-    if (nvList != null && nvList.length > 0) {
-      for (String nv : nvList) {
-        logger.info("Passed nv=" + nv);
-        if (nv.startsWith("-") && nv.length() > 1) {
-          nv = nv.substring(1);
-          logger.info("Stripped nv=" + nv);
-          int i = nv.indexOf("=");
-          if (nv.length() > i) {
-            logger.info("Candidate nv=" + nv);
-            String name = nv.substring(0, i);
-            String value = nv.substring(i + 1);
-            logger.info("Adding property from argument to properties. name="
-              + name + ", value=" + value);
-            props.put(name, value);
-          }
+    LOG.info("Trying to load additional proeprties from argument paramters. nvList.length=" + nvList.length);
+    for (String nv : nvList) {
+      LOG.info("Passed nv=" + nv);
+      if (nv.startsWith("-") && nv.length() > 1) {
+        nv = nv.substring(1);
+        LOG.info("Stripped nv=" + nv);
+        int i = nv.indexOf("=");
+        if (nv.length() > i) {
+          LOG.info("Candidate nv=" + nv);
+          String name = nv.substring(0, i);
+          String value = nv.substring(i + 1);
+          LOG.info("Adding property from argument to properties. name=" + name + ", value=" + value);
+          props.put(name, value);
         }
       }
     }
   }
 
-  static public String getStringProperty(String key) {
-    if (props != null) {
-      return props.getProperty(key);
-    }
-    return null;
+  public static String getStringProperty(String key) {
+    return props == null ? null : props.getProperty(key);
   }
 
-  static public String getStringProperty(String key, String defaultValue) {
-    if (props != null) {
-      return props.getProperty(key, defaultValue);
-    }
-    return defaultValue;
+  public static String getStringProperty(String key, String defaultValue) {
+    return props == null ? defaultValue : props.getProperty(key, defaultValue);
   }
 
-  static public boolean getBooleanProperty(String key, boolean defaultValue) {
-    String strValue = getStringProperty(key);
-    return toBoolean(strValue, defaultValue);
+  public static boolean getBooleanProperty(String key, boolean defaultValue) {
+    String value = getStringProperty(key);
+    return toBoolean(value, defaultValue);
   }
 
-  private static boolean toBoolean(String strValue, boolean defaultValue) {
-    boolean retValue = defaultValue;
-    if (!StringUtils.isEmpty(strValue)) {
-      if (strValue.equalsIgnoreCase("true")
-        || strValue.equalsIgnoreCase("yes")) {
-        retValue = true;
-      } else {
-        retValue = false;
-      }
+  private static boolean toBoolean(String value, boolean defaultValue) {
+    if (StringUtils.isEmpty(value)) {
+      return defaultValue;
     }
-    return retValue;
+    
+    return "true".equalsIgnoreCase(value) || "yes".equalsIgnoreCase(value);
   }
 
-  static public int getIntProperty(String key, int defaultValue) {
-    String strValue = getStringProperty(key);
-    int retValue = defaultValue;
-    retValue = objectToInt(strValue, retValue, ", key=" + key);
+  public static int getIntProperty(String key, int defaultValue) {
+    String value = getStringProperty(key);
+    int retValue = objectToInt(value, defaultValue, ", key=" + key);
     return retValue;
   }
 
-  public static int objectToInt(Object objValue, int retValue,
-                                String errMessage) {
+  public static int objectToInt(Object objValue, int retValue, String errMessage) {
     if (objValue == null) {
       return retValue;
     }
     String strValue = objValue.toString();
-    if (!StringUtils.isEmpty(strValue)) {
+    if (StringUtils.isNotEmpty(strValue)) {
       try {
         retValue = Integer.parseInt(strValue);
       } catch (Throwable t) {
-        logger.error("Error parsing integer value. str=" + strValue
-          + ", " + errMessage);
+        LOG.error("Error parsing integer value. str=" + strValue + ", " + errMessage);
       }
     }
     return retValue;
   }
 
-  public static boolean isEnabled(Map<String, Object> conditionConfigs,
-                                  Map<String, Object> valueConfigs) {
-    boolean allow = toBoolean((String) valueConfigs.get("is_enabled"), true);
-    @SuppressWarnings("unchecked")
-    Map<String, Object> conditions = (Map<String, Object>) conditionConfigs
-      .get("conditions");
-    if (conditions != null && conditions.size() > 0) {
-      allow = false;
-      for (String conditionType : conditions.keySet()) {
-        if (conditionType.equalsIgnoreCase("fields")) {
-          @SuppressWarnings("unchecked")
-          Map<String, Object> fields = (Map<String, Object>) conditions
-            .get("fields");
-          for (String fieldName : fields.keySet()) {
-            Object values = fields.get(fieldName);
-            if (values instanceof String) {
-              allow = isFieldConditionMatch(valueConfigs,
-                fieldName, (String) values);
-            } else {
-              @SuppressWarnings("unchecked")
-              List<String> listValues = (List<String>) values;
-              for (String stringValue : listValues) {
-                allow = isFieldConditionMatch(valueConfigs,
-                  fieldName, stringValue);
-                if (allow) {
-                  break;
-                }
-              }
-            }
-            if (allow) {
-              break;
+  @SuppressWarnings("unchecked")
+  public static boolean isEnabled(Map<String, Object> conditionConfigs, Map<String, Object> valueConfigs) {
+    Map<String, Object> conditions = (Map<String, Object>) conditionConfigs.get("conditions");
+    if (MapUtils.isEmpty(conditions)) {
+      return toBoolean((String) valueConfigs.get("is_enabled"), true);
+    }
+    
+    for (String conditionType : conditions.keySet()) {
+      if (!conditionType.equalsIgnoreCase("fields")) {
+        continue;
+      }
+      
+      Map<String, Object> fields = (Map<String, Object>) conditions.get("fields");
+      for (Map.Entry<String, Object> field : fields.entrySet()) {
+        if (field.getValue() instanceof String) {
+          if (isFieldConditionMatch(valueConfigs, field.getKey(), (String) field.getValue())) {
+            return true;
+          }
+        } else {
+          for (String stringValue : (List<String>) field.getValue()) {
+            if (isFieldConditionMatch(valueConfigs, field.getKey(), stringValue)) {
+              return true;
             }
           }
         }
-        if (allow) {
-          break;
-        }
       }
     }
-    return allow;
+    
+    return false;
   }
 
-  public static boolean isFieldConditionMatch(Map<String, Object> configs,
-                                              String fieldName, String stringValue) {
+  private static boolean isFieldConditionMatch(Map<String, Object> configs, String fieldName, String stringValue) {
     boolean allow = false;
     String fieldValue = (String) configs.get(fieldName);
     if (fieldValue != null && fieldValue.equalsIgnoreCase(stringValue)) {
       allow = true;
     } else {
       @SuppressWarnings("unchecked")
-      Map<String, Object> addFields = (Map<String, Object>) configs
-        .get("add_fields");
+      Map<String, Object> addFields = (Map<String, Object>) configs.get("add_fields");
       if (addFields != null && addFields.get(fieldName) != null) {
         String addFieldValue = (String) addFields.get(fieldName);
         if (stringValue.equalsIgnoreCase(addFieldValue)) {
           allow = true;
         }
       }
-
     }
     return allow;
   }
 
-  public static void logStatForMetric(MetricCount metric, String prefixStr,
-                                      String postFix) {
-    long currStat = metric.count;
+  public static void logStatForMetric(MetricData metric, String prefixStr, String postFix) {
+    long currStat = metric.value;
     long currMS = System.currentTimeMillis();
-    if (currStat > metric.prevLogCount) {
-      if (postFix == null) {
-        postFix = "";
-      }
-      logger.info(prefixStr + ": total_count=" + metric.count
-        + ", duration=" + (currMS - metric.prevLogMS) / 1000
-        + " secs, count=" + (currStat - metric.prevLogCount)
-        + postFix);
+    if (currStat > metric.prevLogValue) {
+      LOG.info(prefixStr + ": total_count=" + metric.value + ", duration=" + (currMS - metric.prevLogTime) / 1000 +
+          " secs, count=" + (currStat - metric.prevLogValue) + postFix);
     }
-    metric.prevLogCount = currStat;
-    metric.prevLogMS = currMS;
+    metric.prevLogValue = currStat;
+    metric.prevLogTime = currMS;
   }
 
   public static Map<String, Object> cloneObject(Map<String, Object> map) {
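
The rewritten isEnabled() above returns the is_enabled flag (defaulting to true) when no "conditions" block is configured, and otherwise requires at least one "fields" condition to match either a direct field or an add_fields entry of the value config. A hypothetical caller, with made-up field names for illustration:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.ambari.logfeeder.util.LogFeederUtil;

    public class IsEnabledDemo {
      public static void main(String[] args) {
        Map<String, Object> fields = new HashMap<String, Object>();
        fields.put("type", "hdfs_namenode");          // single-String condition

        Map<String, Object> conditions = new HashMap<String, Object>();
        conditions.put("fields", fields);

        Map<String, Object> conditionConfigs = new HashMap<String, Object>();
        conditionConfigs.put("conditions", conditions);

        Map<String, Object> valueConfigs = new HashMap<String, Object>();
        valueConfigs.put("type", "hdfs_namenode");    // matches, so: true

        System.out.println(LogFeederUtil.isEnabled(conditionConfigs, valueConfigs));
      }
    }
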
@@ -336,221 +256,74 @@ public class LogFeederUtil {
       return null;
     }
     String jsonStr = gson.toJson(map);
-    Type type = new TypeToken<Map<String, Object>>() {
-    }.getType();
+    Type type = new TypeToken<Map<String, Object>>() {}.getType();
     return gson.fromJson(jsonStr, type);
   }
 
   public static Map<String, Object> toJSONObject(String jsonStr) {
-    if(jsonStr==null || jsonStr.trim().isEmpty()){
+    if (StringUtils.isBlank(jsonStr)) {
       return new HashMap<String, Object>();
     }
-    Type type = new TypeToken<Map<String, Object>>() {
-    }.getType();
+    Type type = new TypeToken<Map<String, Object>>() {}.getType();
     return gson.fromJson(jsonStr, type);
   }
 
-  static public boolean logErrorMessageByInterval(String key, String message,
-                                                  Throwable e, Logger callerLogger, Level level) {
+  private static class LogHistory {
+    private long lastLogTime = 0;
+    private int counter = 0;
+  }
+
+  private static Map<String, LogHistory> logHistoryList = new Hashtable<String, LogHistory>();
 
+  public static boolean logErrorMessageByInterval(String key, String message, Throwable e, Logger callerLogger, Level level) {
     LogHistory log = logHistoryList.get(key);
     if (log == null) {
       log = new LogHistory();
       logHistoryList.put(key, log);
     }
-    if ((System.currentTimeMillis() - log.lastLogTime) > logInterval) {
+    
+    if ((System.currentTimeMillis() - log.lastLogTime) > 30 * 1000) {
       log.lastLogTime = System.currentTimeMillis();
-      int counter = log.counter;
-      log.counter = 0;
-      if (counter > 0) {
-        message += ". Messages suppressed before: " + counter;
-      }
-      if (e == null) {
-        callerLogger.log(level, message);
-      } else {
-        callerLogger.log(level, message, e);
+      if (log.counter > 0) {
+        message += ". Messages suppressed before: " + log.counter;
       }
+      log.counter = 0;
+      callerLogger.log(level, message, e);
 
       return true;
     } else {
       log.counter++;
-    }
-    return false;
-
-  }
-
-  static public String subString(String str, int maxLength) {
-    if (str == null || str.length() == 0) {
-      return "";
-    }
-    maxLength = str.length() < maxLength ? str.length() : maxLength;
-    return str.substring(0, maxLength);
-  }
-
-  public static long genHash(String value) {
-    if (value == null) {
-      value = "null";
-    }
-    return MurmurHash.hash64A(value.getBytes(), HASH_SEED);
-  }
-
-  private static class LogHistory {
-    private long lastLogTime = 0;
-    private int counter = 0;
-  }
-
-  public static String getDate(String timeStampStr) {
-    try {
-      return dateFormatter.get().format(new Date(Long.parseLong(timeStampStr)));
-    } catch (Exception ex) {
-      logger.error(ex);
-      return null;
+      return false;
     }
   }
 
-  public static String getActualDateStr() {
-    try {
-      return dateFormatter.get().format(new Date());
-    } catch (Exception ex) {
-      logger.error(ex);
-      return null;
-    }
-  }
-
-  public static File getFileFromClasspath(String filename) {
-    URL fileCompleteUrl = Thread.currentThread().getContextClassLoader()
-      .getResource(filename);
-    logger.debug("File Complete URI :" + fileCompleteUrl);
-    File file = null;
-    try {
-      file = new File(fileCompleteUrl.toURI());
-    } catch (Exception exception) {
-      logger.debug(exception.getMessage(), exception.getCause());
-    }
-    return file;
-  }
-
-  public static Object getClassInstance(String classFullName, AliasUtil.ALIAS_TYPE aliasType) {
-    Object instance = null;
-    try {
-      instance = (Object) Class.forName(classFullName).getConstructor().newInstance();
-    } catch (Exception exception) {
-      logger.error("Unsupported class =" + classFullName, exception.getCause());
+  public static boolean isListContains(List<String> list, String str, boolean caseSensitive) {
+    if (list == null) {
+      return false;
     }
-    // check instance class as par aliasType
-    if (instance != null) {
-      boolean isValid = false;
-      switch (aliasType) {
-        case FILTER:
-          isValid = Filter.class.isAssignableFrom(instance.getClass());
-          break;
-        case INPUT:
-          isValid = Input.class.isAssignableFrom(instance.getClass());
-          break;
-        case OUTPUT:
-          isValid = Output.class.isAssignableFrom(instance.getClass());
-          break;
-        case MAPPER:
-          isValid = Mapper.class.isAssignableFrom(instance.getClass());
-          break;
-        default:
-          // by default consider all are valid class
-          isValid = true;
+    
+    for (String value : list) {
+      if (value == null) {
+        continue;
       }
-      if (!isValid) {
-        logger.error("Not a valid class :" + classFullName + " AliasType :" + aliasType.name());
-      }
-    }
-    return instance;
-  }
-
-  public static HashMap<String, Object> readJsonFromFile(File jsonFile) {
-    ObjectMapper mapper = new ObjectMapper();
-    try {
-      HashMap<String, Object> jsonmap = mapper.readValue(jsonFile, new TypeReference<HashMap<String, Object>>() {
-      });
-      return jsonmap;
-    } catch (JsonParseException e) {
-      logger.error(e, e.getCause());
-    } catch (JsonMappingException e) {
-      logger.error(e, e.getCause());
-    } catch (IOException e) {
-      logger.error(e, e.getCause());
-    }
-    return new HashMap<String, Object>();
-  }
-
-  public static boolean isListContains(List<String> list, String str, boolean caseSensitive) {
-    if (list != null) {
-      for (String value : list) {
-        if (value != null) {
-          if (caseSensitive) {
-            if (value.equals(str)) {
-              return true;
-            }
-          } else {
-            if (value.equalsIgnoreCase(str)) {
-              return true;
-            }
-          }
-          if (value.equalsIgnoreCase(LogFeederConstants.ALL)) {
-            return true;
-          }
-        }
+      
+      if (caseSensitive ? value.equals(str) : value.equalsIgnoreCase(str) ||
+          value.equalsIgnoreCase(LogFeederConstants.ALL)) {
+        return true;
       }
     }
     return false;
   }
   
+  private static String logfeederTempDir = null;
   
-  private static synchronized String setHostNameAndIP() {
-    if (hostName == null || ipAddress == null) {
-      try {
-        InetAddress ip = InetAddress.getLocalHost();
-        ipAddress = ip.getHostAddress();
-        String getHostName = ip.getHostName();
-        String getCanonicalHostName = ip.getCanonicalHostName();
-        if (!getCanonicalHostName.equalsIgnoreCase(ipAddress)) {
-          logger.info("Using getCanonicalHostName()=" + getCanonicalHostName);
-          hostName = getCanonicalHostName;
-        } else {
-          logger.info("Using getHostName()=" + getHostName);
-          hostName = getHostName;
-        }
-        logger.info("ipAddress=" + ipAddress + ", getHostName=" + getHostName
-            + ", getCanonicalHostName=" + getCanonicalHostName + ", hostName="
-            + hostName);
-      } catch (UnknownHostException e) {
-        logger.error("Error getting hostname.", e);
-      }
-    }
-    return hostName;
-  }
-
-  public static String[] mergeArray(String[] first, String[] second) {
-    if (first == null) {
-      first = new String[0];
-    }
-    if (second == null) {
-      second = new String[0];
-    }
-    String[] mergedArray = ObjectArrays.concat(first, second, String.class);
-    return mergedArray;
-  }
-  
-  public static String getLogfeederTempDir() {
+  public synchronized static String getLogfeederTempDir() {
     if (logfeederTempDir == null) {
-      synchronized (_LOCK) {
-        if (logfeederTempDir == null) {
-          String tempDirValue = getStringProperty("logfeeder.tmp.dir",
-              "/tmp/$username/logfeeder/");
-          HashMap<String, String> contextParam = new HashMap<String, String>();
-          String username = System.getProperty("user.name");
-          contextParam.put("username", username);
-          logfeederTempDir = PlaceholderUtil.replaceVariables(tempDirValue,
-              contextParam);
-        }
-      }
+      String tempDirValue = getStringProperty("logfeeder.tmp.dir", "/tmp/$username/logfeeder/");
+      HashMap<String, String> contextParam = new HashMap<String, String>();
+      String username = System.getProperty("user.name");
+      contextParam.put("username", username);
+      logfeederTempDir = PlaceholderUtil.replaceVariables(tempDirValue, contextParam);
     }
     return logfeederTempDir;
   }
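
logErrorMessageByInterval() above gives callers rate-limited logging: per key, at most one message is emitted every 30 seconds, and the count of suppressed repeats is appended to the next message that does get through. A usage sketch, where the class and key names are made up for illustration:

    import org.apache.ambari.logfeeder.util.LogFeederUtil;
    import org.apache.log4j.Level;
    import org.apache.log4j.Logger;

    public class TailLoop {
      private static final Logger LOG = Logger.getLogger(TailLoop.class);

      static void reportReadError(String logPath, Exception e) {
        // Same key => repeats within a 30-second window are counted, not logged;
        // the count is appended to the next emitted line.
        LogFeederUtil.logErrorMessageByInterval(
            TailLoop.class.getSimpleName() + "_READ_ERROR",
            "Error reading file=" + logPath, e, LOG, Level.ERROR);
      }
    }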

http://git-wip-us.apache.org/repos/asf/ambari/blob/0a3cdccd/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogfeederHDFSUtil.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogfeederHDFSUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogfeederHDFSUtil.java
index fd96f8a..c975b99 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogfeederHDFSUtil.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogfeederHDFSUtil.java
@@ -25,71 +25,53 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.Logger;
 
-public enum LogfeederHDFSUtil {
-  INSTANCE;
-  private static Logger logger = Logger.getLogger(LogfeederHDFSUtil.class);
+public class LogfeederHDFSUtil {
+  private static final Logger LOG = Logger.getLogger(LogfeederHDFSUtil.class);
 
-  public void createHDFSDir(String dirPath, FileSystem dfs) {
-    Path src = new Path(dirPath);
-    try {
-      if (dfs.isDirectory(src)) {
-        logger.info("hdfs dir dirPath=" + dirPath + "  is already exist.");
-        return;
-      }
-      boolean isDirCreated = dfs.mkdirs(src);
-      if (isDirCreated) {
-        logger.debug("HDFS dirPath=" + dirPath + " created successfully.");
-      } else {
-        logger.warn("HDFS dir creation failed dirPath=" + dirPath);
-      }
-    } catch (IOException e) {
-      logger.error("HDFS dir creation failed dirPath=" + dirPath, e.getCause());
-    }
+  private LogfeederHDFSUtil() {
+    throw new UnsupportedOperationException();
   }
-
-  public boolean copyFromLocal(String sourceFilepath, String destFilePath,
-      FileSystem fileSystem, boolean overwrite, boolean delSrc) {
+  
+  public static boolean copyFromLocal(String sourceFilepath, String destFilePath, FileSystem fileSystem, boolean overwrite,
+      boolean delSrc) {
     Path src = new Path(sourceFilepath);
     Path dst = new Path(destFilePath);
     boolean isCopied = false;
     try {
-      logger.info("copying localfile := " + sourceFilepath + " to hdfsPath := "
-          + destFilePath);
+      LOG.info("copying localfile := " + sourceFilepath + " to hdfsPath := " + destFilePath);
       fileSystem.copyFromLocalFile(delSrc, overwrite, src, dst);
       isCopied = true;
     } catch (Exception e) {
-      logger.error("Error copying local file :" + sourceFilepath
-          + " to hdfs location : " + destFilePath, e);
+      LOG.error("Error copying local file :" + sourceFilepath + " to hdfs location : " + destFilePath, e);
     }
     return isCopied;
   }
 
-  public FileSystem buildFileSystem(String hdfsHost, String hdfsPort) {
+  public static FileSystem buildFileSystem(String hdfsHost, String hdfsPort) {
     try {
       Configuration configuration = buildHdfsConfiguration(hdfsHost, hdfsPort);
       FileSystem fs = FileSystem.get(configuration);
       return fs;
     } catch (Exception e) {
-      logger.error("Exception is buildFileSystem :", e);
+      LOG.error("Exception is buildFileSystem :", e);
     }
     return null;
   }
 
-  public void closeFileSystem(FileSystem fileSystem) {
+  private static Configuration buildHdfsConfiguration(String hdfsHost, String hdfsPort) {
+    String url = "hdfs://" + hdfsHost + ":" + hdfsPort + "/";
+    Configuration configuration = new Configuration();
+    configuration.set("fs.default.name", url);
+    return configuration;
+  }
+
+  public static void closeFileSystem(FileSystem fileSystem) {
     if (fileSystem != null) {
       try {
         fileSystem.close();
       } catch (IOException e) {
-        logger.error(e.getLocalizedMessage(), e.getCause());
+        LOG.error(e.getLocalizedMessage(), e.getCause());
       }
     }
   }
-
-  public Configuration buildHdfsConfiguration(String hdfsHost, String hdfsPort) {
-    String url = "hdfs://" + hdfsHost + ":" + hdfsPort + "/";
-    Configuration configuration = new Configuration();
-    configuration.set("fs.default.name", url);
-    return configuration;
-  }
-
 }
\ No newline at end of file
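
Taken together, the now-static HDFS helpers are used as build, copy, close. A sketch in which the NameNode host and both paths are placeholders (note that fs.default.name, set inside buildHdfsConfiguration(), is the legacy spelling of fs.defaultFS but is still honored by Hadoop):

    import org.apache.ambari.logfeeder.util.LogfeederHDFSUtil;
    import org.apache.hadoop.fs.FileSystem;

    public class HdfsUploadDemo {
      public static void main(String[] args) {
        FileSystem fs = LogfeederHDFSUtil.buildFileSystem("namenode.example.com", "8020");
        if (fs == null) {
          return; // buildFileSystem() logs the failure and returns null
        }
        try {
          boolean copied = LogfeederHDFSUtil.copyFromLocal(
              "/var/log/app/app.log.gz",  // local source (placeholder)
              "/logfeeder/app.log.gz",    // HDFS destination (placeholder)
              fs, true /* overwrite */, false /* delSrc: keep the local copy */);
          System.out.println("copied=" + copied);
        } finally {
          LogfeederHDFSUtil.closeFileSystem(fs);
        }
      }
    }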

http://git-wip-us.apache.org/repos/asf/ambari/blob/0a3cdccd/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/PlaceholderUtil.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/PlaceholderUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/PlaceholderUtil.java
index d6c3117..13f2865 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/PlaceholderUtil.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/PlaceholderUtil.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -21,38 +21,34 @@ import java.util.HashMap;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-public class PlaceholderUtil {
+import org.apache.commons.lang3.StringUtils;
 
-  private static Pattern placeHolderPattern;
-  static {
-    placeHolderPattern = Pattern.compile("\\$\\s*(\\w+)");
+public class PlaceholderUtil {
+  private PlaceholderUtil() {
+    throw new UnsupportedOperationException();
   }
+  
+  private static final Pattern placeHolderPattern = Pattern.compile("\\$\\s*(\\w+)");
 
-  public static String replaceVariables(String inputStr,
-      HashMap<String, String> contextParam) {
+  public static String replaceVariables(String inputStr, HashMap<String, String> contextParam) {
     Matcher m = placeHolderPattern.matcher(inputStr);
-    String placeholder;
-    String replacement;
     String output = new String(inputStr);
     while (m.find()) {
-      placeholder = m.group();
+      String placeholder = m.group();
       if (placeholder != null && !placeholder.isEmpty()) {
-        String key = placeholder.replace("$","").toLowerCase();// remove
-                                                                   // brace
-        replacement = getFromContext(contextParam, placeholder, key);
+        String key = placeholder.replace("$","").toLowerCase();// remove brace
+        String replacement = getFromContext(contextParam, placeholder, key);
         output = output.replace(placeholder, replacement);
       }
     }
     return output;
   }
 
-  private static String getFromContext(HashMap<String, String> contextParam,
-      String defaultValue, String key) {
-    String returnValue = defaultValue;// by default set default value as a
-                                      // return
+  private static String getFromContext(HashMap<String, String> contextParam, String defaultValue, String key) {
+    String returnValue = defaultValue; // by default set default value as a return
     if (contextParam != null) {
       String value = contextParam.get(key);
-      if (value != null && !value.trim().isEmpty()) {
+      if (StringUtils.isNotBlank(value)) {
         returnValue = value;
       }
     }
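
For reference, the replaceVariables() contract this cleanup preserves: placeholders of the form $name are looked up (lower-cased) in the context map, and anything unresolved is left in the string unchanged. A small usage sketch mirroring how getLogfeederTempDir() calls it:

    import java.util.HashMap;

    import org.apache.ambari.logfeeder.util.PlaceholderUtil;

    public class PlaceholderDemo {
      public static void main(String[] args) {
        HashMap<String, String> context = new HashMap<String, String>();
        context.put("username", System.getProperty("user.name"));
        // "$username" matches \$\s*(\w+); unresolved placeholders stay as-is.
        System.out.println(
            PlaceholderUtil.replaceVariables("/tmp/$username/logfeeder/", context));
      }
    }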

http://git-wip-us.apache.org/repos/asf/ambari/blob/0a3cdccd/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/S3Util.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/S3Util.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/S3Util.java
index 10ea2c2..31a38d0 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/S3Util.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/S3Util.java
@@ -19,7 +19,6 @@
 package org.apache.ambari.logfeeder.util;
 
 import java.io.BufferedReader;
-import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
@@ -27,6 +26,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.zip.GZIPInputStream;
 
+import org.apache.ambari.logfeeder.common.LogFeederConstants;
 import org.apache.commons.io.IOUtils;
 import org.apache.log4j.Logger;
 
@@ -39,22 +39,19 @@ import com.amazonaws.services.s3.model.ObjectMetadata;
 import com.amazonaws.services.s3.model.PutObjectRequest;
 import com.amazonaws.services.s3.model.S3Object;
 import com.amazonaws.services.s3.transfer.TransferManager;
-import com.amazonaws.services.s3.transfer.Upload;
 
 /**
  * Utility to connect to s3
  */
 public class S3Util {
-  public static final S3Util INSTANCE = new S3Util();
-
   private static final Logger LOG = Logger.getLogger(S3Util.class);
 
-  public static final String S3_PATH_START_WITH = "s3://";
-  public static final String S3_PATH_SEPARATOR = "/";
-
-  public AmazonS3 getS3Client(String accessKey, String secretKey) {
-    AWSCredentials awsCredentials = AWSUtil.INSTANCE.createAWSCredentials(
-        accessKey, secretKey);
+  private S3Util() {
+    throw new UnsupportedOperationException();
+  }
+  
+  public static AmazonS3 getS3Client(String accessKey, String secretKey) {
+    AWSCredentials awsCredentials = AWSUtil.createAWSCredentials(accessKey, secretKey);
     AmazonS3 s3client;
     if (awsCredentials != null) {
       s3client = new AmazonS3Client(awsCredentials);
@@ -64,9 +61,8 @@ public class S3Util {
     return s3client;
   }
 
-  public TransferManager getTransferManager(String accessKey, String secretKey) {
-    AWSCredentials awsCredentials = AWSUtil.INSTANCE.createAWSCredentials(
-        accessKey, secretKey);
+  public static TransferManager getTransferManager(String accessKey, String secretKey) {
+    AWSCredentials awsCredentials = AWSUtil.createAWSCredentials(accessKey, secretKey);
     TransferManager transferManager;
     if (awsCredentials != null) {
       transferManager = new TransferManager(awsCredentials);
@@ -76,35 +72,31 @@ public class S3Util {
     return transferManager;
   }
 
-  public void shutdownTransferManager(TransferManager transferManager) {
+  public static void shutdownTransferManager(TransferManager transferManager) {
     if (transferManager != null) {
       transferManager.shutdownNow();
     }
   }
 
-  public String getBucketName(String s3Path) {
+  public static String getBucketName(String s3Path) {
     String bucketName = null;
     // s3path
     if (s3Path != null) {
-      String[] s3PathParts = s3Path.replace(S3_PATH_START_WITH, "").split(
-          S3_PATH_SEPARATOR);
+      String[] s3PathParts = s3Path.replace(LogFeederConstants.S3_PATH_START_WITH, "").split(LogFeederConstants.S3_PATH_SEPARATOR);
       bucketName = s3PathParts[0];
     }
     return bucketName;
   }
 
-  public String getS3Key(String s3Path) {
+  public static String getS3Key(String s3Path) {
     StringBuilder s3Key = new StringBuilder();
-    // s3path
     if (s3Path != null) {
-      String[] s3PathParts = s3Path.replace(S3_PATH_START_WITH, "").split(
-          S3_PATH_SEPARATOR);
-      ArrayList<String> s3PathList = new ArrayList<String>(
-          Arrays.asList(s3PathParts));
+      String[] s3PathParts = s3Path.replace(LogFeederConstants.S3_PATH_START_WITH, "").split(LogFeederConstants.S3_PATH_SEPARATOR);
+      ArrayList<String> s3PathList = new ArrayList<String>(Arrays.asList(s3PathParts));
       s3PathList.remove(0);// remove bucketName
       for (int index = 0; index < s3PathList.size(); index++) {
         if (index > 0) {
-          s3Key.append(S3_PATH_SEPARATOR);
+          s3Key.append(LogFeederConstants.S3_PATH_SEPARATOR);
         }
         s3Key.append(s3PathList.get(index));
       }
@@ -112,63 +104,41 @@ public class S3Util {
     return s3Key.toString();
   }
 
-  public void uploadFileTos3(String bucketName, String s3Key, File localFile,
-      String accessKey, String secretKey) {
-    TransferManager transferManager = getTransferManager(accessKey, secretKey);
-    try {
-      Upload upload = transferManager.upload(bucketName, s3Key, localFile);
-      upload.waitForUploadResult();
-    } catch (AmazonClientException | InterruptedException e) {
-      LOG.error("s3 uploading failed for file :" + localFile.getAbsolutePath(),
-          e);
-    } finally {
-      shutdownTransferManager(transferManager);
-    }
-  }
-
   /**
    * Get the buffer reader to read s3 file as a stream
    */
-  public BufferedReader getReader(String s3Path, String accessKey,
-      String secretKey) throws IOException {
+  public static BufferedReader getReader(String s3Path, String accessKey, String secretKey) throws IOException {
     // TODO error handling
     // Compression support
     // read header and decide the compression(auto detection)
     // For now hard-code GZIP compression
     String s3Bucket = getBucketName(s3Path);
     String s3Key = getS3Key(s3Path);
-    S3Object fileObj = getS3Client(accessKey, secretKey).getObject(
-        new GetObjectRequest(s3Bucket, s3Key));
-    GZIPInputStream objectInputStream;
+    S3Object fileObj = getS3Client(accessKey, secretKey).getObject(new GetObjectRequest(s3Bucket, s3Key));
     try {
-      objectInputStream = new GZIPInputStream(fileObj.getObjectContent());
-      BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(
-          objectInputStream));
+      GZIPInputStream objectInputStream = new GZIPInputStream(fileObj.getObjectContent());
+      BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(objectInputStream));
       return bufferedReader;
     } catch (IOException e) {
-      LOG.error("Error in creating stream reader for s3 file :" + s3Path,
-          e.getCause());
+      LOG.error("Error in creating stream reader for s3 file :" + s3Path, e.getCause());
       throw e;
     }
   }
 
-  public void writeIntoS3File(String data, String bucketName, String s3Key,
-      String accessKey, String secretKey) {
+  public static void writeIntoS3File(String data, String bucketName, String s3Key, String accessKey, String secretKey) {
     InputStream in = null;
     try {
       in = IOUtils.toInputStream(data, "UTF-8");
     } catch (IOException e) {
       LOG.error(e);
     }
+    
     if (in != null) {
       TransferManager transferManager = getTransferManager(accessKey, secretKey);
       try {
         if (transferManager != null) {
-          transferManager.upload(
-                  new PutObjectRequest(bucketName, s3Key, in,
-                  new ObjectMetadata())).waitForUploadResult();
-          LOG.debug("Data Uploaded to s3 file :" + s3Key + " in bucket :"
-              + bucketName);
+          transferManager.upload(new PutObjectRequest(bucketName, s3Key, in, new ObjectMetadata())).waitForUploadResult();
+          LOG.debug("Data Uploaded to s3 file :" + s3Key + " in bucket :" + bucketName);
         }
       } catch (AmazonClientException | InterruptedException e) {
         LOG.error(e);
@@ -182,5 +152,4 @@ public class S3Util {
       }
     }
   }
-
 }
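
The bucket/key split performed by getBucketName() and getS3Key() is easiest to see on a concrete path. Assuming LogFeederConstants.S3_PATH_START_WITH and S3_PATH_SEPARATOR keep the former values "s3://" and "/" (this diff only moves the constants), a sketch with a placeholder path:

    import org.apache.ambari.logfeeder.util.S3Util;

    public class S3PathDemo {
      public static void main(String[] args) {
        String s3Path = "s3://my-bucket/cluster1/logs/app.log.gz"; // placeholder
        System.out.println(S3Util.getBucketName(s3Path)); // my-bucket
        System.out.println(S3Util.getS3Key(s3Path));      // cluster1/logs/app.log.gz
      }
    }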

http://git-wip-us.apache.org/repos/asf/ambari/blob/0a3cdccd/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/SolrUtil.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/SolrUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/SolrUtil.java
deleted file mode 100644
index 44113e1..0000000
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/SolrUtil.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.ambari.logfeeder.util;
-
-import java.io.IOException;
-import java.util.HashMap;
-
-import org.apache.ambari.logfeeder.logconfig.LogFeederConstants;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.apache.solr.client.solrj.SolrClient;
-import org.apache.solr.client.solrj.SolrQuery;
-import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.SolrRequest.METHOD;
-import org.apache.solr.client.solrj.impl.CloudSolrClient;
-import org.apache.solr.client.solrj.impl.HttpSolrClient;
-import org.apache.solr.client.solrj.request.CollectionAdminRequest;
-import org.apache.solr.client.solrj.response.CollectionAdminResponse;
-import org.apache.solr.client.solrj.response.QueryResponse;
-import org.apache.solr.common.SolrDocument;
-import org.apache.solr.common.SolrDocumentList;
-import org.apache.solr.common.SolrException;
-
-public class SolrUtil {
-
-  private static final Logger logger = Logger.getLogger(SolrUtil.class);
-
-  private static SolrUtil instance = null;
-  
-  private SolrClient solrClient = null;
-  private CloudSolrClient solrClouldClient = null;
-
-  private String solrDetail = "";
-
-  private SolrUtil() throws Exception {
-    String url = LogFeederUtil.getStringProperty("logfeeder.solr.url");
-    String zkConnectString = LogFeederUtil.getStringProperty("logfeeder.solr.zk_connect_string");
-    String collection = LogFeederUtil.getStringProperty("logfeeder.solr.core.config.name", "history");
-    connectToSolr(url, zkConnectString, collection);
-  }
-
-  public static SolrUtil getInstance() {
-    if (instance == null) {
-      synchronized (SolrUtil.class) {
-        if (instance == null) {
-          try {
-            instance = new SolrUtil();
-          } catch (Exception e) {
-            final String LOG_MESSAGE_KEY = SolrUtil.class
-                .getSimpleName() + "_SOLR_UTIL";
-              LogFeederUtil.logErrorMessageByInterval(
-                LOG_MESSAGE_KEY,
-                "Error constructing solrUtil", e, logger,
-                Level.WARN);
-          }
-        }
-      }
-    }
-    return instance;
-  }
-
-  private SolrClient connectToSolr(String url, String zkConnectString,
-                                  String collection) throws Exception {
-    solrDetail = "zkConnectString=" + zkConnectString + ", collection=" + collection
-      + ", url=" + url;
-
-    logger.info("connectToSolr() " + solrDetail);
-    if (collection == null || collection.isEmpty()) {
-      throw new Exception("For solr, collection name is mandatory. "
-        + solrDetail);
-    }
-    if (zkConnectString != null && !zkConnectString.isEmpty()) {
-      solrDetail = "zkConnectString=" + zkConnectString + ", collection=" + collection;
-      logger.info("Using zookeepr. " + solrDetail);
-      solrClouldClient = new CloudSolrClient(zkConnectString);
-      solrClouldClient.setDefaultCollection(collection);
-      solrClient = solrClouldClient;
-      int waitDurationMS = 3 * 60 * 1000;
-      checkSolrStatus(waitDurationMS);
-    } else {
-      if (url == null || url.trim().isEmpty()) {
-        throw new Exception("Both zkConnectString and URL are empty. zkConnectString="
-          + zkConnectString + ", collection=" + collection + ", url="
-          + url);
-      }
-      solrDetail = "collection=" + collection + ", url=" + url;
-      String collectionURL = url + "/" + collection;
-      logger.info("Connecting to  solr : " + collectionURL);
-      solrClient = new HttpSolrClient(collectionURL);
-
-    }
-    return solrClient;
-  }
-
-  private boolean checkSolrStatus(int waitDurationMS) {
-    boolean status = false;
-    try {
-      long beginTimeMS = System.currentTimeMillis();
-      long waitIntervalMS = 2000;
-      int pingCount = 0;
-      while (true) {
-        pingCount++;
-        CollectionAdminResponse response = null;
-        try {
-          CollectionAdminRequest.List colListReq = new CollectionAdminRequest.List();
-          response = colListReq.process(solrClient);
-        } catch (Exception ex) {
-          logger.error("Con't connect to Solr. solrDetail=" + solrDetail, ex);
-        }
-        if (response != null && response.getStatus() == 0) {
-          logger.info("Solr getCollections() is success. solr=" + solrDetail);
-          status = true;
-          break;
-        }
-        if (System.currentTimeMillis() - beginTimeMS > waitDurationMS) {
-          logger.error("Solr is not reachable even after "
-            + (System.currentTimeMillis() - beginTimeMS)
-            + " ms. If you are using alias, then you might have to restart LogSearch after Solr is up and running. solr="
-            + solrDetail + ", response=" + response);
-          break;
-        } else {
-          logger.warn("Solr is not reachable yet. getCollections() attempt count=" + pingCount
-            + ". Will sleep for " + waitIntervalMS + " ms and try again." + " solr=" + solrDetail
-            + ", response=" + response);
-
-        }
-        Thread.sleep(waitIntervalMS);
-      }
-    } catch (Throwable t) {
-      logger.error("Seems Solr is not up. solrDetail=" + solrDetail);
-    }
-    return status;
-  }
-
-  private QueryResponse process(SolrQuery solrQuery) throws SolrServerException, IOException, SolrException {
-    if (solrClient != null) {
-      QueryResponse queryResponse = solrClient.query(solrQuery, METHOD.POST);
-      return queryResponse;
-    } else {
-      logger.error("solrClient can't be null");
-      return null;
-    }
-  }
-
-  public HashMap<String, Object> getConfigDoc() {
-    HashMap<String, Object> configMap = new HashMap<String, Object>();
-    SolrQuery solrQuery = new SolrQuery();
-    solrQuery.setQuery("*:*");
-    String fq = LogFeederConstants.ROW_TYPE + ":" + LogFeederConstants.LOGFEEDER_FILTER_NAME;
-    solrQuery.setFilterQueries(fq);
-    try {
-      QueryResponse response = process(solrQuery);
-      if (response != null) {
-        SolrDocumentList documentList = response.getResults();
-        if (documentList != null && documentList.size() > 0) {
-          SolrDocument configDoc = documentList.get(0);
-          String configJson = LogFeederUtil.getGson().toJson(configDoc);
-          configMap = (HashMap<String, Object>) LogFeederUtil
-              .toJSONObject(configJson);
-        }
-      }
-    } catch (Exception e) {
-      final String logMessageKey = this.getClass().getSimpleName()
-          + "_FETCH_FILTER_CONFIG_ERROR";
-      LogFeederUtil.logErrorMessageByInterval(logMessageKey,
-          "Error getting filter config from solr", e, logger, Level.ERROR);
-    }
-    return configMap;
-  }
-}
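
Although SolrUtil is removed here, its checkSolrStatus() loop is a reusable bounded-retry pattern: probe the service, and keep sleeping a fixed interval until the probe succeeds or a deadline passes. A condensed, Solr-free sketch of that pattern, where probe() is a placeholder for the real status call (the deleted code used CollectionAdminRequest.List):

    public class ServicePing {
      static boolean waitForService(long waitDurationMS) throws InterruptedException {
        long beginTimeMS = System.currentTimeMillis();
        long waitIntervalMS = 2000;
        while (true) {
          if (probe()) {
            return true;                 // service answered
          }
          if (System.currentTimeMillis() - beginTimeMS > waitDurationMS) {
            return false;                // deadline exceeded, give up
          }
          Thread.sleep(waitIntervalMS);
        }
      }

      static boolean probe() {
        return false; // placeholder for the real availability check
      }
    }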

http://git-wip-us.apache.org/repos/asf/ambari/blob/0a3cdccd/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/view/VLogfeederFilter.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/view/VLogfeederFilter.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/view/VLogfeederFilter.java
deleted file mode 100644
index f030040..0000000
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/view/VLogfeederFilter.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.ambari.logfeeder.view;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import javax.xml.bind.annotation.XmlAccessType;
-import javax.xml.bind.annotation.XmlAccessorType;
-import javax.xml.bind.annotation.XmlRootElement;
-
-import org.codehaus.jackson.annotate.JsonAutoDetect;
-import org.codehaus.jackson.annotate.JsonAutoDetect.Visibility;
-import org.codehaus.jackson.map.annotate.JsonSerialize;
-
-@JsonAutoDetect(getterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE, fieldVisibility = Visibility.ANY)
-@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
-@XmlRootElement
-@XmlAccessorType(XmlAccessType.FIELD)
-public class VLogfeederFilter {
-
-  private String label;
-  private List<String> hosts;
-  private List<String> defaultLevels;
-  private List<String> overrideLevels;
-  private String expiryTime;
-
-  public VLogfeederFilter() {
-    hosts = new ArrayList<String>();
-    defaultLevels = new ArrayList<String>();
-    overrideLevels = new ArrayList<String>();
-  }
-
-  public String getLabel() {
-    return label;
-  }
-
-  public void setLabel(String label) {
-    this.label = label;
-  }
-
-  public List<String> getHosts() {
-    return hosts;
-  }
-
-  public void setHosts(List<String> hosts) {
-    this.hosts = hosts;
-  }
-
-  public List<String> getDefaultLevels() {
-    return defaultLevels;
-  }
-
-  public void setDefaultLevels(List<String> defaultLevels) {
-    this.defaultLevels = defaultLevels;
-  }
-
-  public List<String> getOverrideLevels() {
-    return overrideLevels;
-  }
-
-  public void setOverrideLevels(List<String> overrideLevels) {
-    this.overrideLevels = overrideLevels;
-  }
-
-  public String getExpiryTime() {
-    return expiryTime;
-  }
-
-  public void setExpiryTime(String expiryTime) {
-    this.expiryTime = expiryTime;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/0a3cdccd/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/view/VLogfeederFilterWrapper.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/view/VLogfeederFilterWrapper.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/view/VLogfeederFilterWrapper.java
deleted file mode 100644
index 4ddef3f..0000000
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/view/VLogfeederFilterWrapper.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.ambari.logfeeder.view;
-
-import java.util.HashMap;
-
-import javax.xml.bind.annotation.XmlAccessType;
-import javax.xml.bind.annotation.XmlAccessorType;
-import javax.xml.bind.annotation.XmlRootElement;
-
-import org.codehaus.jackson.annotate.JsonAutoDetect;
-import org.codehaus.jackson.annotate.JsonAutoDetect.Visibility;
-import org.codehaus.jackson.map.annotate.JsonSerialize;
-
-@JsonAutoDetect(getterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE, fieldVisibility = Visibility.ANY)
-@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
-@XmlRootElement
-@XmlAccessorType(XmlAccessType.FIELD)
-public class VLogfeederFilterWrapper {
-
-  private HashMap<String, VLogfeederFilter> filter;
-  private String id;
-
-  public HashMap<String, VLogfeederFilter> getFilter() {
-    return filter;
-  }
-
-  public void setFilter(HashMap<String, VLogfeederFilter> filter) {
-    this.filter = filter;
-  }
-
-  public String getId() {
-    return id;
-  }
-
-  public void setId(String id) {
-    this.id = id;
-  }
-}

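Context for the two deletions above: VLogfeederFilter and VLogfeederFilterWrapper were plain Jackson 1.x view beans describing the log feeder filter configuration (label, hosts, default/override levels, expiry time). A minimal sketch of reading such a document with the org.codehaus.jackson ObjectMapper that this patch imports into FileUtil; the JSON shape and class name below are illustrative, not taken from this commit:

    import java.io.IOException;
    import java.util.HashMap;

    import org.codehaus.jackson.map.ObjectMapper;
    import org.codehaus.jackson.type.TypeReference;

    public class FilterConfigReadSketch {
      // Reads a {"filter": {"svc": {...}}, "id": "..."} style document into
      // plain maps, roughly the structure the two deleted beans modeled.
      public static HashMap<String, Object> read(String json) throws IOException {
        ObjectMapper mapper = new ObjectMapper();
        return mapper.readValue(json, new TypeReference<HashMap<String, Object>>() {});
      }
    }
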
http://git-wip-us.apache.org/repos/asf/ambari/blob/0a3cdccd/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/AppTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/AppTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/AppTest.java
deleted file mode 100644
index 193cb48..0000000
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/AppTest.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.ambari.logfeeder;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.ambari.logfeeder.filter.FilterGrok;
-import org.apache.log4j.Logger;
-
-import junit.framework.Test;
-import junit.framework.TestCase;
-import junit.framework.TestSuite;
-
-/**
- * Unit test for simple App.
- */
-public class AppTest extends TestCase {
-  static Logger logger = Logger.getLogger(AppTest.class);
-
-  /**
-   * Create the test case
-   *
-   * @param testName name of the test case
-   */
-  public AppTest(String testName) {
-    super(testName);
-  }
-
-  /**
-   * @return the suite of tests being tested
-   */
-  public static Test suite() {
-    return new TestSuite(AppTest.class);
-  }
-
-  /**
-   * Rigourous Test :-)
-   */
-  public void testApp() {
-    assertTrue(true);
-  }
-
-  public void testGrok() {
-    logger.info("testGrok()");
-    FilterGrok grokFilter = new FilterGrok();
-    try {
-      Map<String, Object> map = new HashMap<String, Object>();
-      map.put("message_pattern",
-        "^%{LOGLEVEL:level}%{SPACE}%{GREEDYDATA:log_message}");
-      grokFilter.loadConfig(map);
-      grokFilter.init();
-      String out = grokFilter.grokParse("INFO This is a test");
-      logger.info("out=" + out);
-
-    } catch (Exception e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
-      assertFalse(true);
-    }
-
-    assertTrue(true);
-  }
-
-  public void testGrokUGI() {
-    logger.info("testGrok()");
-    String[] ugis = new String[]{"user1@xyz.com (auth:TOKEN)",
-      "ambari-qa@example.com (auth:kerberos)",
-      "my_user@example.com (auth:kerberos)",
-      "hive/bdurai-dojran-2.novalocal@example.com (auth:kerberos)",
-      "just_me",
-      "ambari-qa (auth:PROXY) via hive/myhost.novalocal@EXAMPLE.COM (auth:KERBEROS)"};
-
-    FilterGrok grokFilter = new FilterGrok();
-    try {
-      Map<String, Object> map = new HashMap<String, Object>();
-      // map.put("message_pattern",
-      // "(?<user>([\\w\\d\\-]+))\\/|(?<user>([\\w\\d\\-]+))@|(?<user>([\\w\\d\\-]+))/[\\w\\d\\-.]+@|(?<user>([\\w\\d.\\-_]+))[\\s(]+");
-      // map.put("message_pattern",
-      // "(?<user>([\\w\\d\\-]+))/[\\w\\d\\-.]+@");
-      // *(auth:(?<auth>[\\w\\d\\-]+))
-      // GOOD: map.put("message_pattern", "(?<user>([\\w\\d\\-]+)).+auth:(?<auth>([\\w\\d\\-]+))");
-      // OK: map.put("message_pattern", "(?<user>([\\w\\d\\-]+)).+auth:(?<auth>([\\w\\d\\-]+))|%{USERNAME:xuser}");
-      //map.put("message_pattern", "%{USERNAME:user}.+auth:%{USERNAME:authType}|%{USERNAME:x_user}");
-      map.put("message_pattern", "%{USERNAME:p_user}.+auth:%{USERNAME:p_authType}.+via %{USERNAME:k_user}.+auth:%{USERNAME:k_authType}|%{USERNAME:user}.+auth:%{USERNAME:authType}|%{USERNAME:x_user}");
-      grokFilter.loadConfig(map);
-      grokFilter.init();
-      for (String ugi : ugis) {
-        String out = grokFilter.grokParse(ugi);
-        logger.info(ugi + "=" + out);
-      }
-
-    } catch (Exception e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
-      assertFalse(true);
-    }
-    assertTrue(true);
-  }
-}

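The deleted AppTest above exercised UGI parsing with grok patterns such as %{USERNAME:user}.+auth:%{USERNAME:authType}. As a rough illustration of what those patterns extract from inputs like "ambari-qa@example.com (auth:kerberos)", here is a plain java.util.regex equivalent with named groups; USERNAME is approximated as [a-zA-Z0-9._-]+ and this is a sketch, not the grok library's actual expansion:

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class UgiParseSketch {
      // Approximates %{USERNAME:user}.+auth:%{USERNAME:authType} with named groups.
      private static final Pattern UGI =
          Pattern.compile("(?<user>[a-zA-Z0-9._-]+).+auth:(?<authType>[a-zA-Z0-9._-]+)");

      public static void main(String[] args) {
        Matcher m = UGI.matcher("ambari-qa@example.com (auth:kerberos)");
        if (m.find()) {
          System.out.println(m.group("user") + " / " + m.group("authType"));
          // prints: ambari-qa / kerberos
        }
      }
    }
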
http://git-wip-us.apache.org/repos/asf/ambari/blob/0a3cdccd/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterGrokTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterGrokTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterGrokTest.java
index 3aa8d7b..99565c5 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterGrokTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterGrokTest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -23,7 +23,7 @@ import java.util.Map;
 
 import org.apache.ambari.logfeeder.input.Input;
 import org.apache.ambari.logfeeder.input.InputMarker;
-import org.apache.ambari.logfeeder.output.OutputMgr;
+import org.apache.ambari.logfeeder.output.OutputManager;
 import org.apache.log4j.Logger;
 import org.easymock.Capture;
 import org.easymock.CaptureType;
@@ -40,16 +40,16 @@ public class FilterGrokTest {
   private static final Logger LOG = Logger.getLogger(FilterGrokTest.class);
 
   private FilterGrok filterGrok;
-  private OutputMgr mockOutputMgr;
+  private OutputManager mockOutputManager;
   private Capture<Map<String, Object>> capture;
 
   public void init(Map<String, Object> config) throws Exception {
-    mockOutputMgr = EasyMock.strictMock(OutputMgr.class);
+    mockOutputManager = EasyMock.strictMock(OutputManager.class);
     capture = EasyMock.newCapture(CaptureType.LAST);
 
     filterGrok = new FilterGrok();
     filterGrok.loadConfig(config);
-    filterGrok.setOutputMgr(mockOutputMgr);
+    filterGrok.setOutputManager(mockOutputManager);
     filterGrok.setInput(EasyMock.mock(Input.class));
     filterGrok.init();
   }
@@ -59,19 +59,18 @@ public class FilterGrokTest {
     LOG.info("testFilterGrok_parseMessage()");
 
     Map<String, Object> config = new HashMap<String, Object>();
-    config.put("message_pattern",
-        "(?m)^%{TIMESTAMP_ISO8601:logtime}%{SPACE}%{LOGLEVEL:level}%{SPACE}%{GREEDYDATA:log_message}");
+    config.put("message_pattern", "(?m)^%{TIMESTAMP_ISO8601:logtime}%{SPACE}%{LOGLEVEL:level}%{SPACE}%{GREEDYDATA:log_message}");
     config.put("multiline_pattern", "^(%{TIMESTAMP_ISO8601:logtime})");
     init(config);
 
-    mockOutputMgr.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
+    mockOutputManager.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
     EasyMock.expectLastCall();
-    EasyMock.replay(mockOutputMgr);
+    EasyMock.replay(mockOutputManager);
 
-    filterGrok.apply("2016-04-08 15:55:23,548 INFO This is a test", new InputMarker());
-    filterGrok.apply("2016-04-08 15:55:24,548 WARN Next message", new InputMarker());
+    filterGrok.apply("2016-04-08 15:55:23,548 INFO This is a test", new InputMarker(null, null, 0));
+    filterGrok.apply("2016-04-08 15:55:24,548 WARN Next message", new InputMarker(null, null, 0));
 
-    EasyMock.verify(mockOutputMgr);
+    EasyMock.verify(mockOutputManager);
     Map<String, Object> jsonParams = capture.getValue();
 
     assertNotNull(jsonParams);
@@ -86,23 +85,22 @@ public class FilterGrokTest {
     LOG.info("testFilterGrok_parseMultiLineMessage()");
 
     Map<String, Object> config = new HashMap<String, Object>();
-    config.put("message_pattern",
-        "(?m)^%{TIMESTAMP_ISO8601:logtime}%{SPACE}%{LOGLEVEL:level}%{SPACE}%{GREEDYDATA:log_message}");
+    config.put("message_pattern", "(?m)^%{TIMESTAMP_ISO8601:logtime}%{SPACE}%{LOGLEVEL:level}%{SPACE}%{GREEDYDATA:log_message}");
     config.put("multiline_pattern", "^(%{TIMESTAMP_ISO8601:logtime})");
     init(config);
 
-    mockOutputMgr.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
+    mockOutputManager.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
     EasyMock.expectLastCall();
-    EasyMock.replay(mockOutputMgr);
+    EasyMock.replay(mockOutputManager);
 
     String multiLineMessage = "This is a multiline test message\r\n" + "having multiple lines\r\n"
         + "as one may expect";
     String[] messageLines = multiLineMessage.split("\r\n");
     for (int i = 0; i < messageLines.length; i++)
-      filterGrok.apply((i == 0 ? "2016-04-08 15:55:23,548 INFO " : "") + messageLines[i], new InputMarker());
+      filterGrok.apply((i == 0 ? "2016-04-08 15:55:23,548 INFO " : "") + messageLines[i], new InputMarker(null, null, 0));
     filterGrok.flush();
 
-    EasyMock.verify(mockOutputMgr);
+    EasyMock.verify(mockOutputManager);
     Map<String, Object> jsonParams = capture.getValue();
 
     assertNotNull(jsonParams);
@@ -117,19 +115,18 @@ public class FilterGrokTest {
     LOG.info("testFilterGrok_notMatchingMesagePattern()");
 
     Map<String, Object> config = new HashMap<String, Object>();
-    config.put("message_pattern",
-        "(?m)^%{TIMESTAMP_ISO8601:logtime}%{SPACE}%{LOGLEVEL:level}%{SPACE}%{GREEDYDATA:log_message}");
+    config.put("message_pattern", "(?m)^%{TIMESTAMP_ISO8601:logtime}%{SPACE}%{LOGLEVEL:level}%{SPACE}%{GREEDYDATA:log_message}");
     config.put("multiline_pattern", "^(%{TIMESTAMP_ISO8601:logtime})");
     init(config);
 
-    mockOutputMgr.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
+    mockOutputManager.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
     EasyMock.expectLastCall().anyTimes();
-    EasyMock.replay(mockOutputMgr);
+    EasyMock.replay(mockOutputManager);
 
-    filterGrok.apply("04/08/2016 15:55:23,548 INFO This is a test", new InputMarker());
-    filterGrok.apply("04/08/2016 15:55:24,548 WARN Next message", new InputMarker());
+    filterGrok.apply("04/08/2016 15:55:23,548 INFO This is a test", new InputMarker(null, null, 0));
+    filterGrok.apply("04/08/2016 15:55:24,548 WARN Next message", new InputMarker(null, null, 0));
 
-    EasyMock.verify(mockOutputMgr);
+    EasyMock.verify(mockOutputManager);
     assertFalse("Something was captured!", capture.hasCaptured());
   }
 
@@ -141,12 +138,12 @@ public class FilterGrokTest {
     config.put("multiline_pattern", "^(%{TIMESTAMP_ISO8601:logtime})");
     init(config);
 
-    EasyMock.replay(mockOutputMgr);
+    EasyMock.replay(mockOutputManager);
 
-    filterGrok.apply("2016-04-08 15:55:23,548 INFO This is a test", new InputMarker());
-    filterGrok.apply("2016-04-08 15:55:24,548 WARN Next message", new InputMarker());
+    filterGrok.apply("2016-04-08 15:55:23,548 INFO This is a test", new InputMarker(null, null, 0));
+    filterGrok.apply("2016-04-08 15:55:24,548 WARN Next message", new InputMarker(null, null, 0));
 
-    EasyMock.verify(mockOutputMgr);
+    EasyMock.verify(mockOutputManager);
     assertFalse("Something was captured", capture.hasCaptured());
   }
 

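A recurring change in the test diffs from here on: InputMarker lost its no-argument constructor and is now created as new InputMarker(null, null, 0). The new signature itself is not shown in this patch; judging only from the three-argument call sites, it plausibly looks like the following sketch, where the field names are assumptions:

    package org.apache.ambari.logfeeder.input;

    // Hypothetical reconstruction of the refactored marker, inferred from the
    // call sites in the tests below; the real field names may differ.
    public class InputMarker {
      public final Input input;          // source input, null in these unit tests
      public final String base64FileKey; // checkpoint key, null in these unit tests
      public final int lineNumber;       // position within the input, 0 here

      public InputMarker(Input input, String base64FileKey, int lineNumber) {
        this.input = input;
        this.base64FileKey = base64FileKey;
        this.lineNumber = lineNumber;
      }
    }
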
http://git-wip-us.apache.org/repos/asf/ambari/blob/0a3cdccd/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterJSONTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterJSONTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterJSONTest.java
index 64e9b69..06d8db2 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterJSONTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterJSONTest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -27,8 +27,7 @@ import java.util.TimeZone;
 
 import org.apache.ambari.logfeeder.common.LogfeederException;
 import org.apache.ambari.logfeeder.input.InputMarker;
-import org.apache.ambari.logfeeder.output.OutputMgr;
-import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.ambari.logfeeder.output.OutputManager;
 import org.apache.log4j.Logger;
 import org.easymock.Capture;
 import org.easymock.CaptureType;
@@ -44,16 +43,16 @@ public class FilterJSONTest {
   private static final Logger LOG = Logger.getLogger(FilterJSONTest.class);
 
   private FilterJSON filterJson;
-  private OutputMgr mockOutputMgr;
+  private OutputManager mockOutputManager;
   private Capture<Map<String, Object>> capture;
 
   public void init(Map<String, Object> params) throws Exception {
-    mockOutputMgr = EasyMock.strictMock(OutputMgr.class);
+    mockOutputManager = EasyMock.strictMock(OutputManager.class);
     capture = EasyMock.newCapture(CaptureType.LAST);
 
     filterJson = new FilterJSON();
     filterJson.loadConfig(params);
-    filterJson.setOutputMgr(mockOutputMgr);
+    filterJson.setOutputManager(mockOutputManager);
     filterJson.init();
   }
 
@@ -63,17 +62,17 @@ public class FilterJSONTest {
 
     init(new HashMap<String, Object>());
 
-    mockOutputMgr.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
+    mockOutputManager.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
     EasyMock.expectLastCall();
-    EasyMock.replay(mockOutputMgr);
+    EasyMock.replay(mockOutputManager);
 
     Date d = new Date();
-    DateFormat sdf = new SimpleDateFormat(LogFeederUtil.SOLR_DATE_FORMAT);
+    DateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
     sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
     String dateString = sdf.format(d);
-    filterJson.apply("{ logtime: '" + d.getTime() + "', line_number: 100 }", new InputMarker());
+    filterJson.apply("{ logtime: '" + d.getTime() + "', line_number: 100 }", new InputMarker(null, null, 0));
 
-    EasyMock.verify(mockOutputMgr);
+    EasyMock.verify(mockOutputManager);
     Map<String, Object> jsonParams = capture.getValue();
 
     assertEquals("Incorrect decoding: log time", dateString, jsonParams.remove("logtime"));
@@ -87,17 +86,17 @@ public class FilterJSONTest {
 
     init(new HashMap<String, Object>());
 
-    mockOutputMgr.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
+    mockOutputManager.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
     EasyMock.expectLastCall();
-    EasyMock.replay(mockOutputMgr);
+    EasyMock.replay(mockOutputManager);
 
     Date d = new Date();
-    DateFormat sdf = new SimpleDateFormat(LogFeederUtil.SOLR_DATE_FORMAT);
+    DateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
     sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
     String dateString = sdf.format(d);
-    filterJson.apply("{ logtime: '" + d.getTime() + "', some_field: 'abc' }", new InputMarker());
+    filterJson.apply("{ logtime: '" + d.getTime() + "', some_field: 'abc' }", new InputMarker(null, null, 0));
 
-    EasyMock.verify(mockOutputMgr);
+    EasyMock.verify(mockOutputManager);
     Map<String, Object> jsonParams = capture.getValue();
 
     assertEquals("Incorrect decoding: log time", dateString, jsonParams.remove("logtime"));
@@ -111,13 +110,13 @@ public class FilterJSONTest {
 
     init(new HashMap<String, Object>());
 
-    mockOutputMgr.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
+    mockOutputManager.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
     EasyMock.expectLastCall();
-    EasyMock.replay(mockOutputMgr);
+    EasyMock.replay(mockOutputManager);
 
-    filterJson.apply("{ line_number: 100, some_field: 'abc' }", new InputMarker());
+    filterJson.apply("{ line_number: 100, some_field: 'abc' }", new InputMarker(null, null, 0));
 
-    EasyMock.verify(mockOutputMgr);
+    EasyMock.verify(mockOutputManager);
     Map<String, Object> jsonParams = capture.getValue();
 
     assertEquals("Incorrect decoding: line number", 100l, jsonParams.remove("line_number"));
@@ -132,7 +131,7 @@ public class FilterJSONTest {
     init(new HashMap<String, Object>());
     String inputStr="invalid json";
     try{
-    filterJson.apply(inputStr,new InputMarker());
+    filterJson.apply(inputStr,new InputMarker(null, null, 0));
     fail("Expected LogfeederException was not occured");
     }catch(LogfeederException logfeederException){
       assertEquals("Json parsing failed for inputstr = "+inputStr, logfeederException.getLocalizedMessage());

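The FilterJSONTest change above inlines the Solr date pattern instead of referencing the removed LogFeederUtil.SOLR_DATE_FORMAT constant. A minimal, self-contained sketch of that formatting step, using only JDK classes (the class and method names below are illustrative):

    import java.text.DateFormat;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.TimeZone;

    public class SolrDateSketch {
      public static String toSolrDate(Date d) {
        // Solr expects ISO-8601 with milliseconds and a literal 'Z', always in
        // UTC; without the explicit time zone the formatted value would drift
        // with the host's default zone, which is why the tests set it too.
        DateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
        sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
        return sdf.format(d);
      }
    }
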
http://git-wip-us.apache.org/repos/asf/ambari/blob/0a3cdccd/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterKeyValueTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterKeyValueTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterKeyValueTest.java
index 849e4c3..30cee42 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterKeyValueTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterKeyValueTest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -22,7 +22,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.ambari.logfeeder.input.InputMarker;
-import org.apache.ambari.logfeeder.output.OutputMgr;
+import org.apache.ambari.logfeeder.output.OutputManager;
 import org.apache.log4j.Logger;
 import org.easymock.Capture;
 import org.easymock.CaptureType;
@@ -38,16 +38,16 @@ public class FilterKeyValueTest {
   private static final Logger LOG = Logger.getLogger(FilterKeyValueTest.class);
 
   private FilterKeyValue filterKeyValue;
-  private OutputMgr mockOutputMgr;
+  private OutputManager mockOutputManager;
   private Capture<Map<String, Object>> capture;
 
   public void init(Map<String, Object> config) throws Exception {
-    mockOutputMgr = EasyMock.strictMock(OutputMgr.class);
+    mockOutputManager = EasyMock.strictMock(OutputManager.class);
     capture = EasyMock.newCapture(CaptureType.LAST);
 
     filterKeyValue = new FilterKeyValue();
     filterKeyValue.loadConfig(config);
-    filterKeyValue.setOutputMgr(mockOutputMgr);
+    filterKeyValue.setOutputManager(mockOutputManager);
     filterKeyValue.init();
   }
 
@@ -61,13 +61,13 @@ public class FilterKeyValueTest {
     // using default value split:
     init(config);
 
-    mockOutputMgr.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
+    mockOutputManager.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
     EasyMock.expectLastCall();
-    EasyMock.replay(mockOutputMgr);
+    EasyMock.replay(mockOutputManager);
 
-    filterKeyValue.apply("{ keyValueField: 'name1=value1&name2=value2' }", new InputMarker());
+    filterKeyValue.apply("{ keyValueField: 'name1=value1&name2=value2' }", new InputMarker(null, null, 0));
 
-    EasyMock.verify(mockOutputMgr);
+    EasyMock.verify(mockOutputManager);
     Map<String, Object> jsonParams = capture.getValue();
 
     assertEquals("Original missing!", "name1=value1&name2=value2", jsonParams.remove("keyValueField"));
@@ -85,13 +85,13 @@ public class FilterKeyValueTest {
     // using default value split: =
     init(config);
 
-    mockOutputMgr.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
+    mockOutputManager.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
     EasyMock.expectLastCall().anyTimes();
-    EasyMock.replay(mockOutputMgr);
+    EasyMock.replay(mockOutputManager);
 
-    filterKeyValue.apply("{ keyValueField: 'name1=value1&name2=value2' }", new InputMarker());
+    filterKeyValue.apply("{ keyValueField: 'name1=value1&name2=value2' }", new InputMarker(null, null, 0));
 
-    EasyMock.verify(mockOutputMgr);
+    EasyMock.verify(mockOutputManager);
     assertFalse("Something was captured!", capture.hasCaptured());
   }
 
@@ -105,13 +105,13 @@ public class FilterKeyValueTest {
     init(config);
 
     // using default value split: =
-    mockOutputMgr.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
+    mockOutputManager.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
     EasyMock.expectLastCall().anyTimes();
-    EasyMock.replay(mockOutputMgr);
+    EasyMock.replay(mockOutputManager);
 
-    filterKeyValue.apply("{ otherField: 'name1=value1&name2=value2' }", new InputMarker());
+    filterKeyValue.apply("{ otherField: 'name1=value1&name2=value2' }", new InputMarker(null, null, 0));
 
-    EasyMock.verify(mockOutputMgr);
+    EasyMock.verify(mockOutputManager);
     Map<String, Object> jsonParams = capture.getValue();
 
     assertEquals("Original missing!", "name1=value1&name2=value2", jsonParams.remove("otherField"));

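The FilterKeyValue tests above feed a field like 'name1=value1&name2=value2' and expect it split into name/value pairs on '&' and '='. A minimal JDK-only sketch of that split step; the filter's actual configuration keys and error handling are not shown in full in this diff:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.regex.Pattern;

    public class KeyValueSplitSketch {
      // Splits "name1=value1&name2=value2" into {name1=value1, name2=value2},
      // mirroring what the tests above expect from FilterKeyValue.
      public static Map<String, String> split(String input, String fieldSplit, String valueSplit) {
        Map<String, String> result = new HashMap<String, String>();
        for (String pair : input.split(Pattern.quote(fieldSplit))) {
          String[] kv = pair.split(Pattern.quote(valueSplit), 2);
          if (kv.length == 2) {
            result.put(kv[0], kv[1]);
          }
        }
        return result;
      }
    }
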
http://git-wip-us.apache.org/repos/asf/ambari/blob/0a3cdccd/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputFileTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputFileTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputFileTest.java
index 42e81da..08aa564 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputFileTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputFileTest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -104,7 +104,7 @@ public class InputFileTest {
 
     inputFile = new InputFile();
     inputFile.loadConfig(config);
-    inputFile.setFirstFilter(capture);
+    inputFile.addFilter(capture);
     inputFile.init();
   }
 
@@ -117,10 +117,10 @@ public class InputFileTest {
 
     init(testFile.getAbsolutePath());
 
-    InputMgr inputMgr = EasyMock.createStrictMock(InputMgr.class);
-    EasyMock.expect(inputMgr.getCheckPointFolderFile()).andReturn(checkPointDir);
-    EasyMock.replay(inputMgr);
-    inputFile.setInputMgr(inputMgr);
+    InputManager inputManager = EasyMock.createStrictMock(InputManager.class);
+    EasyMock.expect(inputManager.getCheckPointFolderFile()).andReturn(checkPointDir);
+    EasyMock.replay(inputManager);
+    inputFile.setInputManager(inputManager);
 
     inputFile.isReady();
     inputFile.start();
@@ -129,7 +129,7 @@ public class InputFileTest {
     for (int row = 0; row < 3; row++)
       assertEquals("Row #" + (row + 1) + " not correct", TEST_LOG_FILE_ROWS[row], rows.get(row));
 
-    EasyMock.verify(inputMgr);
+    EasyMock.verify(inputManager);
   }
 
   @Test
@@ -140,10 +140,10 @@ public class InputFileTest {
     File testFile = createFile("process6.log");
     init(testFile.getAbsolutePath());
 
-    InputMgr inputMgr = EasyMock.createStrictMock(InputMgr.class);
-    EasyMock.expect(inputMgr.getCheckPointFolderFile()).andReturn(checkPointDir).times(2);
-    EasyMock.replay(inputMgr);
-    inputFile.setInputMgr(inputMgr);
+    InputManager inputManager = EasyMock.createStrictMock(InputManager.class);
+    EasyMock.expect(inputManager.getCheckPointFolderFile()).andReturn(checkPointDir).times(2);
+    EasyMock.replay(inputManager);
+    inputFile.setInputManager(inputManager);
 
     inputFile.isReady();
     inputFile.start();
@@ -155,7 +155,7 @@ public class InputFileTest {
     for (int row = 0; row < 6; row++)
       assertEquals("Row #" + (row + 1) + " not correct", TEST_LOG_FILE_ROWS[row], rows.get(row));
 
-    EasyMock.verify(inputMgr);
+    EasyMock.verify(inputManager);
   }
 
   @Test