You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ol...@apache.org on 2016/04/11 18:15:03 UTC

[49/51] [partial] ambari git commit: AMBARI-15679. Initial commit for LogSearch module (oleewre)

http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/InputMgr.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/InputMgr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/InputMgr.java
new file mode 100644
index 0000000..445c294
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/InputMgr.java
@@ -0,0 +1,545 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.file.FileSystems;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.WatchEvent;
+import java.nio.file.WatchKey;
+import java.nio.file.WatchService;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import static java.nio.file.StandardWatchEventKinds.*;
+
+import org.apache.ambari.logfeeder.input.Input;
+import org.apache.ambari.logfeeder.input.InputFile;
+import org.apache.commons.io.filefilter.WildcardFileFilter;
+import org.apache.log4j.Logger;
+import org.apache.solr.common.util.Base64;
+
+public class InputMgr {
+  static Logger logger = Logger.getLogger(InputMgr.class);
+
+  // Inputs registered through add(); exposed as-is by getInputList().
+  List<Input> inputList = new ArrayList<Input>();
+  // Inputs that were not ready yet; filled by monitor() and addToNotReady()
+  // and re-checked periodically by the "InputIsReadyMonitor" thread.
+  Set<Input> notReadyList = new HashSet<Input>();
+
+  // Created lazily by monitorSystemFileChanges().
+  WatchService folderWatcher = null;
+  // Folders already registered with folderWatcher.
+  Set<File> foldersToMonitor = new HashSet<File>();
+  // Absolute file path -> input that must be rolled over when that file is
+  // (re)created.
+  Map<String, Input> filesToMonitor = new HashMap<String, Input>();
+  // Set by close() and polled by the "InputIsReadyMonitor" thread; volatile
+  // so that the write is guaranteed to become visible to that thread.
+  volatile boolean isDrain = false;
+  // Set by init() when at least one input reports isTail().
+  boolean isAnyInputTail = false;
+
+  private String checkPointSubFolderName = "logfeeder_checkpoints";
+  // Chosen by init(); stays null when no input is in tail mode.
+  File checkPointFolderFile = null;
+
+  MetricCount filesCountMetric = new MetricCount();
+
+  // Default extension; init() may override it from the
+  // "logfeeder.checkpoint.extension" property.
+  private String checkPointExtension = ".cp";
+
+  /**
+   * Returns the internal list of registered inputs (the list itself, not a
+   * copy).
+   *
+   * @return the inputs added through {@link #add(Input)}
+   */
+  public List<Input> getInputList() {
+    return inputList;
+  }
+
+  /**
+   * Appends the given input to the list of managed inputs.
+   *
+   * @param input the input to register
+   */
+  public void add(Input input) {
+    inputList.add(input);
+  }
+
+  /**
+   * Drops every entry of the input list that equals the given input and
+   * logs each removal.
+   *
+   * @param input the input to remove from the managed list
+   */
+  public void removeInput(Input input) {
+    logger.info("Trying to remove from inputList. "
+      + input.getShortDescription());
+    for (Iterator<Input> it = inputList.iterator(); it.hasNext();) {
+      Input candidate = it.next();
+      if (!candidate.equals(input)) {
+        continue;
+      }
+      logger.info("Removing Input from inputList. "
+        + input.getShortDescription());
+      it.remove();
+    }
+  }
+
+  /**
+   * @return
+   */
+  public int getActiveFilesCount() {
+    int count = 0;
+    for (Input input : inputList) {
+      if (input.isReady()) {
+        count++;
+      }
+    }
+    return count;
+  }
+
+  /**
+   * Prepares the manager for use.
+   * <p>
+   * Configures the files-count metric, reads the checkpoint file extension
+   * from the "logfeeder.checkpoint.extension" property and initializes every
+   * registered input (a failing input is logged and skipped). When at least
+   * one input is in tail mode, a checkpoint folder is selected by trying, in
+   * order: the "logfeeder.checkpoint.folder" property, a
+   * "logfeeder_checkpoints" sub folder of "user.home" and finally the same
+   * sub folder of "java.io.tmpdir" (falling back to "/tmp").
+   */
+  public void init() {
+    filesCountMetric.metricsName = "input.files.count";
+    filesCountMetric.isPointInTime = true;
+
+    checkPointExtension = LogFeederUtil.getStringProperty(
+      "logfeeder.checkpoint.extension", checkPointExtension);
+    for (Input input : inputList) {
+      try {
+        input.init();
+        if (input.isTail()) {
+          isAnyInputTail = true;
+        }
+      } catch (Exception e) {
+        logger.error(
+          "Error initializing input. "
+            + input.getShortDescription(), e);
+      }
+    }
+
+    if (isAnyInputTail) {
+      logger.info("Determining valid checkpoint folder");
+      boolean isCheckPointFolderValid = false;
+      // We need to keep track of the files we are reading.
+      String checkPointFolder = LogFeederUtil
+        .getStringProperty("logfeeder.checkpoint.folder");
+      if (checkPointFolder != null && !checkPointFolder.isEmpty()) {
+        checkPointFolderFile = new File(checkPointFolder);
+        isCheckPointFolderValid = verifyCheckPointFolder(checkPointFolderFile);
+      }
+      if (!isCheckPointFolderValid) {
+        // Let's try home folder
+        String userHome = LogFeederUtil.getStringProperty("user.home");
+        if (userHome != null) {
+          checkPointFolderFile = new File(userHome,
+            checkPointSubFolderName);
+          logger.info("Checking if home folder can be used for checkpoints. Folder="
+            + checkPointFolderFile);
+          isCheckPointFolderValid = verifyCheckPointFolder(checkPointFolderFile);
+        }
+      }
+      if (!isCheckPointFolderValid) {
+        // Let's use tmp folder
+        String tmpFolder = LogFeederUtil
+          .getStringProperty("java.io.tmpdir");
+        if (tmpFolder == null) {
+          tmpFolder = "/tmp";
+        }
+        checkPointFolderFile = new File(tmpFolder,
+          checkPointSubFolderName);
+        logger.info("Checking if tmps folder can be used for checkpoints. Folder="
+          + checkPointFolderFile);
+        isCheckPointFolderValid = verifyCheckPointFolder(checkPointFolderFile);
+        if (isCheckPointFolderValid) {
+          // The trailing space keeps the two sentences apart in the log.
+          logger.warn("Using tmp folder "
+            + checkPointFolderFile
+            + " to store check points. This is not recommended. "
+            + "Please set logfeeder.checkpoint.folder property");
+        }
+      }
+
+      if (isCheckPointFolderValid) {
+        // Normal outcome, so INFO rather than WARN.
+        logger.info("Using folder " + checkPointFolderFile
+          + " for storing checkpoints");
+      } else {
+        // Every candidate failed; make that visible instead of silently
+        // continuing with a folder that cannot be written to.
+        logger.error("No usable folder found for storing checkpoints. Last folder tried="
+          + checkPointFolderFile);
+      }
+    }
+  }
+
+  /**
+   * Returns the folder chosen by {@link #init()} for checkpoint files.
+   *
+   * @return the checkpoint folder, or null when init() did not select one
+   *         (the field is only assigned when some input is in tail mode)
+   */
+  public File getCheckPointFolderFile() {
+    return checkPointFolderFile;
+  }
+
+  /**
+   * Checks whether the given folder can be used to store checkpoint files.
+   * The folder is created when missing (including missing parents) and a
+   * uniquely named probe file is created and deleted inside it.
+   *
+   * @param folderPathFile the candidate checkpoint folder
+   * @return true only when the folder exists, is a directory and the probe
+   *         file could be created and deleted again
+   */
+  boolean verifyCheckPointFolder(File folderPathFile) {
+    if (!folderPathFile.exists()) {
+      // Create the folder. mkdirs() also creates missing parents, so a
+      // configured nested path works on first start.
+      try {
+        if (!folderPathFile.mkdirs()) {
+          logger.warn("Error creating folder for check point. folder="
+            + folderPathFile);
+        }
+      } catch (Throwable t) {
+        logger.warn("Error creating folder for check point. folder="
+          + folderPathFile, t);
+      }
+    }
+
+    if (folderPathFile.exists() && folderPathFile.isDirectory()) {
+      // Let's check whether we can create a file
+      File testFile = new File(folderPathFile, UUID.randomUUID()
+        .toString());
+      try {
+        testFile.createNewFile();
+        return testFile.delete();
+      } catch (IOException e) {
+        logger.warn(
+          "Couldn't create test file in "
+            + folderPathFile.getAbsolutePath()
+            + " for checkPoint", e);
+      } catch (SecurityException e) {
+        // A security manager denying access means the folder is unusable,
+        // not that initialization should fail.
+        logger.warn(
+          "Couldn't create test file in "
+            + folderPathFile.getAbsolutePath()
+            + " for checkPoint", e);
+      }
+    }
+    return false;
+  }
+
+  public void monitor() {
+    for (Input input : inputList) {
+      if (input.isReady()) {
+        input.monitor();
+      } else {
+        if (input.isTail()) {
+          logger.info("Adding input to not ready list. Note, it is possible this component is not run on this host. So it might not be an issue. "
+            + input.getShortDescription());
+          notReadyList.add(input);
+        } else {
+          logger.info("Input is not ready, so going to ignore it "
+            + input.getShortDescription());
+        }
+      }
+    }
+    // Start the monitoring thread if any file is in tail mode
+    if (isAnyInputTail) {
+      Thread monitorThread = new Thread("InputIsReadyMonitor") {
+        @Override
+        public void run() {
+          logger.info("Going to monitor for these missing files: "
+            + notReadyList.toString());
+          while (true) {
+            if (isDrain) {
+              logger.info("Exiting missing file monitor.");
+              break;
+            }
+            try {
+              Iterator<Input> iter = notReadyList.iterator();
+              while (iter.hasNext()) {
+                Input input = iter.next();
+                try {
+                  if (input.isReady()) {
+                    input.monitor();
+                    iter.remove();
+                  }
+                } catch (Throwable t) {
+                  logger.error("Error while enabling monitoring for input. "
+                    + input.getShortDescription());
+                }
+              }
+              Thread.sleep(30 * 1000);
+            } catch (Throwable t) {
+              // Ignore
+            }
+          }
+        }
+      };
+      monitorThread.start();
+    }
+  }
+
+  /**
+   * Adds the given input to the not-ready list, which the monitor thread
+   * started by {@link #monitor()} re-checks periodically.
+   *
+   * @param notReadyInput the input to park until it becomes ready
+   */
+  public void addToNotReady(Input notReadyInput) {
+    notReadyList.add(notReadyInput);
+  }
+
+  public void addMetricsContainers(List<MetricCount> metricsList) {
+    for (Input input : inputList) {
+      input.addMetricsContainers(metricsList);
+    }
+    filesCountMetric.count = getActiveFilesCount();
+    metricsList.add(filesCountMetric);
+  }
+
+  /**
+   *
+   */
+  public void logStats() {
+    for (Input input : inputList) {
+      input.logStat();
+    }
+
+    filesCountMetric.count = getActiveFilesCount();
+    LogFeederUtil.logStatForMetric(filesCountMetric,
+      "Stat: Files Monitored Count", null);
+  }
+
+  /**
+   * Puts every input into drain mode, flags the manager as draining and
+   * then waits for the inputs to report that they are closed. The wait
+   * polls once per second for at most 30 passes; inputs that are still
+   * open afterwards are logged and ignored.
+   */
+  public void close() {
+    for (Input input : inputList) {
+      try {
+        input.setDrain(true);
+      } catch (Throwable t) {
+        logger.error(
+          "Error while draining. input="
+            + input.getShortDescription(), t);
+      }
+    }
+    isDrain = true;
+
+    // Need to get this value from property
+    int iterations = 30;
+    int waitTimeMS = 1000;
+    int i = 0;
+    boolean allClosed = true;
+    for (i = 0; i < iterations; i++) {
+      allClosed = true;
+      for (Input input : inputList) {
+        if (!input.isClosed()) {
+          allClosed = false;
+          logger.warn("Waiting for input to close. "
+            + input.getShortDescription() + ", "
+            + (iterations - i) + " more seconds");
+        }
+      }
+      if (allClosed) {
+        break;
+      }
+      // Sleep once per pass (not once per open input) so that the total
+      // wait is bounded by iterations * waitTimeMS and the "more seconds"
+      // message above is accurate.
+      try {
+        Thread.sleep(waitTimeMS);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        logger.warn("Interrupted while waiting for inputs to close.");
+        break;
+      }
+    }
+    if (!allClosed) {
+      logger.warn("Some inputs were not closed. Iterations=" + i);
+      for (Input input : inputList) {
+        if (!input.isClosed()) {
+          logger.warn("Input not closed. Will ignore it."
+            + input.getShortDescription());
+        }
+      }
+    } else {
+      logger.info("All inputs are closed. Iterations=" + i);
+    }
+
+  }
+
+  /**
+   * Invokes {@code checkIn()} on every managed input, in list order.
+   */
+  public void checkInAll() {
+    Iterator<Input> it = inputList.iterator();
+    while (it.hasNext()) {
+      it.next().checkIn();
+    }
+  }
+
+  public void cleanCheckPointFiles() {
+
+    if (checkPointFolderFile == null) {
+      logger.info("Will not clean checkPoint files. checkPointFolderFile="
+        + checkPointFolderFile);
+      return;
+    }
+    logger.info("Cleaning checkPoint files. checkPointFolderFile="
+      + checkPointFolderFile.getAbsolutePath());
+    try {
+      // Loop over the check point files and if filePath is not present,
+      // then
+      // move to closed
+      String searchPath = "*" + checkPointExtension;
+      FileFilter fileFilter = new WildcardFileFilter(searchPath);
+      File[] checkPointFiles = checkPointFolderFile.listFiles(fileFilter);
+      int totalCheckFilesDeleted = 0;
+      for (File checkPointFile : checkPointFiles) {
+        RandomAccessFile checkPointReader = null;
+        try {
+          checkPointReader = new RandomAccessFile(checkPointFile, "r");
+
+          int contentSize = checkPointReader.readInt();
+          byte b[] = new byte[contentSize];
+          int readSize = checkPointReader.read(b, 0, contentSize);
+          if (readSize != contentSize) {
+            logger.error("Couldn't read expected number of bytes from checkpoint file. expected="
+              + contentSize
+              + ", read="
+              + readSize
+              + ", checkPointFile=" + checkPointFile);
+          } else {
+            // Create JSON string
+            String jsonCheckPointStr = new String(b, 0, readSize);
+            Map<String, Object> jsonCheckPoint = LogFeederUtil
+              .toJSONObject(jsonCheckPointStr);
+
+            String logFilePath = (String) jsonCheckPoint
+              .get("file_path");
+            String logFileKey = (String) jsonCheckPoint
+              .get("file_key");
+            if (logFilePath != null && logFileKey != null) {
+              boolean deleteCheckPointFile = false;
+              File logFile = new File(logFilePath);
+              if (logFile.exists()) {
+                Object fileKeyObj = InputFile
+                  .getFileKey(logFile);
+                String fileBase64 = Base64
+                  .byteArrayToBase64(fileKeyObj
+                    .toString().getBytes());
+                if (!logFileKey.equals(fileBase64)) {
+                  deleteCheckPointFile = true;
+                  logger.info("CheckPoint clean: File key has changed. old="
+                    + logFileKey
+                    + ", new="
+                    + fileBase64
+                    + ", filePath="
+                    + logFilePath
+                    + ", checkPointFile="
+                    + checkPointFile.getAbsolutePath());
+                }
+              } else {
+                logger.info("CheckPoint clean: Log file doesn't exist. filePath="
+                  + logFilePath
+                  + ", checkPointFile="
+                  + checkPointFile.getAbsolutePath());
+                deleteCheckPointFile = true;
+              }
+              if (deleteCheckPointFile) {
+                logger.info("Deleting CheckPoint file="
+                  + checkPointFile.getAbsolutePath()
+                  + ", logFile=" + logFilePath);
+                checkPointFile.delete();
+                totalCheckFilesDeleted++;
+              }
+            }
+          }
+        } catch (EOFException eof) {
+          logger.warn("Caught EOFException. Ignoring reading existing checkPoint file. "
+            + checkPointFile);
+        } catch (Throwable t) {
+          logger.error("Error while checking checkPoint file. "
+            + checkPointFile, t);
+        } finally {
+          if (checkPointReader != null) {
+            try {
+              checkPointReader.close();
+            } catch (Throwable t) {
+              logger.error("Error closing checkPoint file. "
+                + checkPointFile, t);
+            }
+          }
+        }
+      }
+      logger.info("Deleted " + totalCheckFilesDeleted
+        + " checkPoint file(s). checkPointFolderFile="
+        + checkPointFolderFile.getAbsolutePath());
+
+    } catch (Throwable t) {
+      logger.error("Error while cleaning checkPointFiles", t);
+    }
+  }
+
+  /**
+   * Registers the file of the given input for roll-over detection. The
+   * file's parent folder is watched for ENTRY_CREATE events; when a file
+   * with the registered path is created, the watcher thread calls
+   * {@code rollOver()} on the input. The watch service and its daemon
+   * thread are created on first use. A file that is already registered is
+   * ignored.
+   *
+   * @param inputToMonitor the input whose file path should be watched
+   */
+  synchronized public void monitorSystemFileChanges(Input inputToMonitor) {
+    try {
+      File fileToMonitor = new File(inputToMonitor.getFilePath());
+      if (filesToMonitor.containsKey(fileToMonitor.getAbsolutePath())) {
+        logger.info("Already monitoring file " + fileToMonitor
+          + ". So ignoring this request");
+        return;
+      }
+
+      // make a new watch service that we can register interest in
+      // directories and files with.
+      if (folderWatcher == null) {
+        folderWatcher = FileSystems.getDefault().newWatchService();
+        // start the file watcher thread below
+        Thread th = new Thread(new FileSystemMonitor(),
+          "FileSystemWatcher");
+        th.setDaemon(true);
+        th.start();
+
+      }
+      File parentFolder = fileToMonitor.getParentFile();
+      if (parentFolder == null) {
+        logger.error("File doesn't have parent folder." + fileToMonitor);
+        return;
+      }
+      // Normalize to an absolute File so that the same folder is always
+      // represented by an equal key in foldersToMonitor.
+      File folderToWatch = parentFolder.getAbsoluteFile();
+      // foldersToMonitor holds File objects, so the lookup must use a File
+      // as well (a String key can never match).
+      if (foldersToMonitor.contains(folderToWatch)) {
+        logger.info("Already monitoring folder " + folderToWatch
+          + ". So ignoring this request.");
+      } else {
+        logger.info("Configuring to monitor folder "
+          + folderToWatch + " for file " + fileToMonitor);
+        Path toWatch = Paths.get(folderToWatch.getAbsolutePath());
+        toWatch.register(folderWatcher, ENTRY_CREATE);
+        foldersToMonitor.add(folderToWatch);
+      }
+      filesToMonitor.put(fileToMonitor.getAbsolutePath(),
+        inputToMonitor);
+    } catch (IOException e) {
+      logger.error("Error while trying to set watcher for file:"
+        + inputToMonitor, e);
+    }
+
+  }
+
+  /**
+   * Watcher loop that consumes events from {@code folderWatcher}. For each
+   * ENTRY_CREATE event it looks up the created file in
+   * {@code filesToMonitor} and, when an input is registered for it, calls
+   * {@code rollOver()} on that input. The loop ends when the thread is
+   * interrupted or a watch key can no longer be reset.
+   */
+  class FileSystemMonitor implements Runnable {
+    /*
+     * (non-Javadoc)
+     *
+     * @see java.lang.Runnable#run()
+     */
+    @Override
+    public void run() {
+      try {
+        // get the first event before looping
+        WatchKey key = folderWatcher.take();
+        while (key != null) {
+          Path dir = (Path) key.watchable();
+          // we have a polled event, now we traverse it and
+          // receive all the states from it
+          for (WatchEvent<?> event : key.pollEvents()) {
+            if (!event.kind().equals(ENTRY_CREATE)) {
+              logger.info("Ignoring event.kind=" + event.kind());
+              continue;
+            }
+            logger.info("Received " + event.kind()
+              + " event for file " + event.context());
+
+            File newFile = new File(dir.toFile(), event.context()
+              .toString());
+            Input rolledOverInput;
+            // filesToMonitor is modified under the InputMgr lock by
+            // monitorSystemFileChanges(), so read it under the same lock.
+            // The lock is released before rollOver() is called.
+            synchronized (InputMgr.this) {
+              rolledOverInput = filesToMonitor.get(newFile
+                .getAbsolutePath());
+            }
+            if (rolledOverInput == null) {
+              logger.info("Input not found for file " + newFile);
+            } else {
+              try {
+                rolledOverInput.rollOver();
+              } catch (RuntimeException e) {
+                // One failing input must not end roll-over detection
+                // for all other files.
+                logger.error("Error while rolling over input for file "
+                  + newFile, e);
+              }
+            }
+          }
+          if (!key.reset()) {
+            logger.error("Error while key.reset(). Will have to abort watching files. Rollover will not work.");
+            break;
+          }
+          key = folderWatcher.take();
+        }
+      } catch (InterruptedException e) {
+        // Restore the flag for anything further up the stack.
+        Thread.currentThread().interrupt();
+        logger.info("Stop request for thread");
+      }
+      logger.info("Exiting FileSystemMonitor thread.");
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
new file mode 100644
index 0000000..f4ca51b
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
@@ -0,0 +1,570 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStreamReader;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.ambari.logfeeder.AliasUtil.ALIAS_PARAM;
+import org.apache.ambari.logfeeder.AliasUtil.ALIAS_TYPE;
+import org.apache.ambari.logfeeder.filter.Filter;
+import org.apache.ambari.logfeeder.input.Input;
+import org.apache.ambari.logfeeder.logconfig.LogfeederScheduler;
+import org.apache.ambari.logfeeder.output.Output;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+import com.google.gson.reflect.TypeToken;
+
+public class LogFeeder {
+  static Logger logger = Logger.getLogger(LogFeeder.class);
+
+  // List<Input> inputList = new ArrayList<Input>();
+  // Enabled outputs; filled and later narrowed to the outputs actually used
+  // by some input in mergeAllConfigs().
+  Collection<Output> outputList = new ArrayList<Output>();
+
+  OutputMgr outMgr = new OutputMgr();
+  InputMgr inputMgr = new InputMgr();
+  MetricsMgr metricsMgr = new MetricsMgr();
+
+  // Result of merging all "global" blocks; set by mergeAllConfigs().
+  Map<String, Object> globalMap = null;
+  // Command line arguments handed to the constructor.
+  String[] inputParams;
+
+  // Raw config blocks collected by loadConfigs(), grouped by top-level key.
+  List<Map<String, Object>> globalConfigList = new ArrayList<Map<String, Object>>();
+  List<Map<String, Object>> inputConfigList = new ArrayList<Map<String, Object>>();
+  List<Map<String, Object>> filterConfigList = new ArrayList<Map<String, Object>>();
+  List<Map<String, Object>> outputConfigList = new ArrayList<Map<String, Object>>();
+
+  // 24 hours in milliseconds. The previous expression had an extra "* 60"
+  // factor, which overflowed int and did not match the intended 24 hours.
+  int checkPointCleanIntervalMS = 24 * 60 * 60 * 1000; // 24 hours
+  long lastCheckPointCleanedMS = 0;
+
+  /**
+   * Creates a LogFeeder and remembers the given arguments; they are passed
+   * to the property loader in {@link #init()}.
+   *
+   * @param args the arguments to keep for property loading
+   */
+  public LogFeeder(String[] args) {
+    inputParams = args;
+  }
+
+  /**
+   * Loads the properties and all configuration files, builds the inputs,
+   * filters and outputs from them and initializes the outputs, the input
+   * manager, the metrics manager and the config scheduler.
+   * <p>
+   * The configuration files are taken from the comma separated
+   * "config.files" property, falling back to "config.file" (default
+   * "config.json"). Each entry is loaded from the file system when it names
+   * an existing file, otherwise from the class loader.
+   *
+   * @throws Throwable when a configuration file cannot be loaded or a
+   *                   component fails to initialize
+   */
+  public void init() throws Throwable {
+
+    // Load properties
+    LogFeederUtil.loadProperties("logfeeder.properties", inputParams);
+
+    // loop the properties and load them
+    // Load the configs
+    String configFiles = LogFeederUtil.getStringProperty("config.files");
+    if (configFiles == null) {
+      configFiles = LogFeederUtil.getStringProperty("config.file",
+        "config.json");
+    }
+    logger.info("config.files=" + configFiles);
+    String[] configFileList = configFiles.split(",");
+    for (String configFileEntry : configFileList) {
+      // Tolerate "a.json, b.json" and trailing commas: surrounding white
+      // space is not part of the file name and empty entries are skipped.
+      String configFileName = configFileEntry.trim();
+      if (configFileName.isEmpty()) {
+        continue;
+      }
+      logger.info("Going to load config file:" + configFileName);
+      File configFile = new File(configFileName);
+      if (configFile.exists() && configFile.isFile()) {
+        logger.info("Config file exists in path."
+          + configFile.getAbsolutePath());
+        loadConfigsUsingFile(configFile);
+      } else {
+        // Let's try to load it from class loader
+        logger.info("Trying to load config file from classloader: "
+          + configFileName);
+        laodConfigsUsingClassLoader(configFileName);
+        logger.info("Loaded config file from classloader: "
+          + configFileName);
+      }
+    }
+    mergeAllConfigs();
+    outMgr.setOutputList(outputList);
+    for (Output output : outputList) {
+      output.init();
+    }
+    inputMgr.init();
+    metricsMgr.init();
+    //starting timer to fetch config from solr 
+    LogfeederScheduler.INSTANCE.start();
+    logger.debug("==============");
+  }
+
+  /**
+   * Loads the configuration from a class path resource.
+   *
+   * @param configFileName the resource name to look up through this class's
+   *                       class loader
+   * @throws Exception when the resource cannot be found, read or parsed
+   */
+  void laodConfigsUsingClassLoader(String configFileName) throws Exception {
+    // getResourceAsStream() only promises an InputStream; casting it to
+    // BufferedInputStream fails for class loaders that return another
+    // stream type (for example for resources inside a jar).
+    java.io.InputStream resourceStream = this.getClass().getClassLoader()
+      .getResourceAsStream(configFileName);
+    if (resourceStream == null) {
+      throw new Exception("Can't find configFile=" + configFileName);
+    }
+    try {
+      BufferedReader br = new BufferedReader(new InputStreamReader(
+        resourceStream));
+      String configData = readFile(br);
+      loadConfigs(configData);
+    } finally {
+      try {
+        resourceStream.close();
+      } catch (Throwable t) {
+        // ignore
+      }
+    }
+  }
+
+  /**
+   * Reads the given file and feeds its content to
+   * {@link #loadConfigs(String)}. The underlying stream is always closed;
+   * a failure while closing is ignored.
+   *
+   * @param configFile the configuration file to read
+   * @throws Exception when the file cannot be opened, read or parsed; the
+   *                   failure is logged before it is rethrown
+   */
+  void loadConfigsUsingFile(File configFile) throws Exception {
+    FileInputStream configStream = null;
+    try {
+      configStream = new FileInputStream(configFile);
+      InputStreamReader streamReader = new InputStreamReader(configStream);
+      BufferedReader bufferedReader = new BufferedReader(streamReader);
+      loadConfigs(readFile(bufferedReader));
+    } catch (Exception failure) {
+      logger.error("Error opening config file. configFilePath="
+        + configFile.getAbsolutePath());
+      throw failure;
+    } finally {
+      if (configStream != null) {
+        try {
+          configStream.close();
+        } catch (Throwable closeFailure) {
+          // ignore
+        }
+      }
+    }
+  }
+
+  /**
+   * Parses the given JSON text into a map and distributes its top-level
+   * entries over the config lists: the "global" value is appended to the
+   * global list, and the elements of the "input", "filter" and "output"
+   * values are appended to their respective lists. Keys are matched
+   * ignoring case; any other key is skipped.
+   *
+   * @param configData the JSON configuration text
+   * @throws Exception declared for callers; parsing problems surface here
+   */
+  @SuppressWarnings("unchecked")
+  void loadConfigs(String configData) throws Exception {
+    Type mapType = new TypeToken<Map<String, Object>>() {
+    }.getType();
+    Map<String, Object> parsedConfig = LogFeederUtil.getGson().fromJson(
+      configData, mapType);
+
+    for (Map.Entry<String, Object> section : parsedConfig.entrySet()) {
+      String sectionName = section.getKey();
+      Object sectionValue = section.getValue();
+      if (sectionName.equalsIgnoreCase("global")) {
+        globalConfigList.add((Map<String, Object>) sectionValue);
+        continue;
+      }
+      if (sectionName.equalsIgnoreCase("input")) {
+        inputConfigList
+          .addAll((List<Map<String, Object>>) sectionValue);
+        continue;
+      }
+      if (sectionName.equalsIgnoreCase("filter")) {
+        filterConfigList
+          .addAll((List<Map<String, Object>>) sectionValue);
+        continue;
+      }
+      if (sectionName.equalsIgnoreCase("output")) {
+        outputConfigList
+          .addAll((List<Map<String, Object>>) sectionValue);
+      }
+    }
+
+  }
+
+  /**
+   * Turns the collected config blocks into live objects and wires them
+   * together. The steps run in this order:
+   * <ol>
+   * <li>merge all global blocks into {@code globalMap} and sort the filter
+   * blocks;</li>
+   * <li>create one Output per output block and keep the enabled ones;</li>
+   * <li>create one Input per input block and register the enabled ones
+   * with the input manager;</li>
+   * <li>for every registered input, create a fresh chain of enabled filters
+   * (one instance per filter block);</li>
+   * <li>attach to every input the outputs that are enabled for it and keep
+   * only outputs that are used by at least one input;</li>
+   * <li>remove inputs that ended up without any filter.</li>
+   * </ol>
+   * Blocks that are null, lack their type element, or whose class cannot be
+   * resolved or instantiated are logged and skipped.
+   */
+  private void mergeAllConfigs() {
+    globalMap = mergeConfigs(globalConfigList);
+
+    // Sort the filter blocks
+    sortBlocks(filterConfigList);
+    // First loop for output
+    for (Map<String, Object> map : outputConfigList) {
+      if (map == null) {
+        continue;
+      }
+      // Combine the block with the global settings (exact merge rules are
+      // defined by mergeBlocks).
+      mergeBlocks(globalMap, map);
+
+      // "destination" names the output type; it is resolved to a class
+      // name through the alias configuration.
+      String value = (String) map.get("destination");
+      Output output;
+      if (value == null || value.isEmpty()) {
+        logger.error("Output block doesn't have destination element");
+        continue;
+      }
+      String classFullName = AliasUtil.getInstance().readAlias(value, ALIAS_TYPE.OUTPUT, ALIAS_PARAM.KLASS);
+      if (classFullName == null || classFullName.isEmpty()) {
+        logger.error("Destination block doesn't have output element");
+        continue;
+      }
+      output = (Output) LogFeederUtil.getClassInstance(classFullName, ALIAS_TYPE.OUTPUT);
+
+      if (output == null) {
+        logger.error("Destination Object is null");
+        continue;
+      }
+
+      output.setDestination(value);
+      output.loadConfig(map);
+
+      // We will only check for is_enabled out here. Down below we will
+      // check whether this output is enabled for the input
+      boolean isEnabled = output.getBooleanValue("is_enabled", true);
+      if (isEnabled) {
+        outputList.add(output);
+        output.logConfgs(Level.INFO);
+      } else {
+        logger.info("Output is disabled. So ignoring it. "
+          + output.getShortDescription());
+      }
+    }
+
+    // Second loop for input
+    for (Map<String, Object> map : inputConfigList) {
+      if (map == null) {
+        continue;
+      }
+      mergeBlocks(globalMap, map);
+
+      // "source" names the input type; resolved through the alias
+      // configuration like "destination" above.
+      String value = (String) map.get("source");
+      Input input;
+      if (value == null || value.isEmpty()) {
+        logger.error("Input block doesn't have source element");
+        continue;
+      }
+      String classFullName = AliasUtil.getInstance().readAlias(value, ALIAS_TYPE.INPUT, ALIAS_PARAM.KLASS);
+      if (classFullName == null || classFullName.isEmpty()) {
+        logger.error("Source block doesn't have source element");
+        continue;
+      }
+      input = (Input) LogFeederUtil.getClassInstance(classFullName, ALIAS_TYPE.INPUT);
+
+      if (input == null) {
+        logger.error("Source Object is null");
+        continue;
+      }
+
+      input.setType(value);
+      input.loadConfig(map);
+
+      if (input.isEnabled()) {
+        input.setOutputMgr(outMgr);
+        input.setInputMgr(inputMgr);
+        inputMgr.add(input);
+        input.logConfgs(Level.INFO);
+      } else {
+        logger.info("Input is disabled. So ignoring it. "
+          + input.getShortDescription());
+      }
+    }
+
+    // Third loop is for filter, but we will have to create a filter
+    // instance for each input, so it can maintain the state per input
+    // Inputs are only collected here and removed at the very end, because
+    // the input list is still being iterated below.
+    List<Input> toRemoveInputList = new ArrayList<Input>();
+    for (Input input : inputMgr.getInputList()) {
+      // Last enabled filter added to this input's chain so far.
+      Filter prevFilter = null;
+      for (Map<String, Object> map : filterConfigList) {
+        if (map == null) {
+          continue;
+        }
+        mergeBlocks(globalMap, map);
+
+        String value = (String) map.get("filter");
+        Filter filter;
+        if (value == null || value.isEmpty()) {
+          logger.error("Filter block doesn't have filter element");
+          continue;
+        }
+
+        String classFullName = AliasUtil.getInstance().readAlias(value, ALIAS_TYPE.FILTER, ALIAS_PARAM.KLASS);
+        if (classFullName == null || classFullName.isEmpty()) {
+          logger.error("Filter block doesn't have filter element");
+          continue;
+        }
+        filter = (Filter) LogFeederUtil.getClassInstance(classFullName, ALIAS_TYPE.FILTER);
+
+        if (filter == null) {
+          logger.error("Filter Object is null");
+          continue;
+        }
+        filter.loadConfig(map);
+        // The input is set before isEnabled() is consulted.
+        // NOTE(review): presumably so that the filter can decide per input
+        // whether it applies - confirm against Filter.isEnabled().
+        filter.setInput(input);
+
+        if (filter.isEnabled()) {
+          filter.setOutputMgr(outMgr);
+          // Append to the chain: the first enabled filter hangs off the
+          // input, every later one off its predecessor.
+          if (prevFilter == null) {
+            input.setFirstFilter(filter);
+          } else {
+            prevFilter.setNextFilter(filter);
+          }
+          prevFilter = filter;
+          filter.logConfgs(Level.INFO);
+        } else {
+          logger.debug("Ignoring filter "
+            + filter.getShortDescription() + " for input "
+            + input.getShortDescription());
+        }
+      }
+      if (input.getFirstFilter() == null) {
+        toRemoveInputList.add(input);
+      }
+    }
+
+    // Fourth loop is for associating valid outputs to input
+    // NOTE(review): inputs scheduled for removal are still in the list at
+    // this point, so an output used only by such an input is kept as well.
+    Set<Output> usedOutputSet = new HashSet<Output>();
+    for (Input input : inputMgr.getInputList()) {
+      for (Output output : outputList) {
+        boolean ret = LogFeederUtil.isEnabled(output.getConfigs(),
+          input.getConfigs());
+        if (ret) {
+          usedOutputSet.add(output);
+          input.addOutput(output);
+        }
+      }
+    }
+    // From here on only outputs attached to at least one input are kept.
+    outputList = usedOutputSet;
+
+    for (Input toRemoveInput : toRemoveInputList) {
+      logger.warn("There are no filters, we will ignore this input. "
+        + toRemoveInput.getShortDescription());
+      inputMgr.removeInput(toRemoveInput);
+    }
+  }
+
+  /**
+   * Sorts the given config blocks in place by their "sort_order" value,
+   * ascending. Numeric values are truncated to int; other values are
+   * parsed from their string form, and a value that cannot be parsed is
+   * logged and treated as 0.
+   * <p>
+   * A block without "sort_order" compares as equal to every other block,
+   * as before. NOTE(review): that rule is not a consistent ordering when
+   * blocks with and without "sort_order" are mixed; decide where such
+   * blocks should go before changing it, since it affects filter order.
+   *
+   * @param blockList the blocks to sort; modified in place
+   */
+  private void sortBlocks(List<Map<String, Object>> blockList) {
+
+    Collections.sort(blockList, new Comparator<Map<String, Object>>() {
+
+      @Override
+      public int compare(Map<String, Object> o1, Map<String, Object> o2) {
+        Object o1Sort = o1.get("sort_order");
+        Object o2Sort = o2.get("sort_order");
+        if (o1Sort == null || o2Sort == null) {
+          return 0;
+        }
+        // Both sides go through the same conversion. Previously an
+        // Integer on the right-hand side was silently treated as 0.
+        // Integer.compare avoids the overflow of "o1Value - o2Value".
+        return Integer.compare(parseSortOrder(o1Sort, o1),
+          parseSortOrder(o2Sort, o2));
+      }
+    });
+  }
+
+  /**
+   * Converts a "sort_order" value to int; logs and returns 0 when the
+   * value is neither a Number nor parseable as a number.
+   */
+  private static int parseSortOrder(Object sortOrder,
+                                    Map<String, Object> block) {
+    if (sortOrder instanceof Number) {
+      return ((Number) sortOrder).intValue();
+    }
+    try {
+      return (int) Double.parseDouble(sortOrder.toString());
+    } catch (RuntimeException e) {
+      logger.error("Value is not of type Number. class="
+        + sortOrder.getClass().getName() + ", value="
+        + sortOrder.toString() + ", map=" + block.toString());
+      return 0;
+    }
+  }
+
+  /**
+   * Folds a list of configuration maps into one new map by passing each
+   * of them, in list order, to {@code mergeBlocks}.
+   *
+   * @param configList the configuration maps to merge
+   * @return a newly created map holding the merged result
+   */
+  private Map<String, Object> mergeConfigs(
+    List<Map<String, Object>> configList) {
+    Map<String, Object> result = new HashMap<String, Object>();
+    for (Map<String, Object> source : configList) {
+      mergeBlocks(source, result);
+    }
+    return result;
+  }
+
+  /**
+   * Copies entries from {@code fromMap} into {@code toMap} without
+   * overwriting anything {@code toMap} already has.
+   * <p>
+   * Map-valued entries are merged one level deep: each field of a cloned
+   * copy of the source map is added to the target map under the same key
+   * unless the target already has that field. All other top-level entries
+   * are copied only when the key is absent from {@code toMap}.
+   *
+   * @param fromMap the map to read from
+   * @param toMap   the map that receives the missing entries
+   */
+  private void mergeBlocks(Map<String, Object> fromMap,
+                           Map<String, Object> toMap) {
+    // Pass 1: merge the map-valued entries field by field
+    for (Map.Entry<String, Object> entry : fromMap.entrySet()) {
+      Object sourceValue = entry.getValue();
+      if (!(sourceValue instanceof Map)) {
+        // null values and non-map values are handled in pass 2
+        continue;
+      }
+      String key = entry.getKey();
+
+      @SuppressWarnings("unchecked")
+      Map<String, Object> sourceFields = LogFeederUtil
+        .cloneObject((Map<String, Object>) sourceValue);
+
+      @SuppressWarnings("unchecked")
+      Map<String, Object> targetFields = (Map<String, Object>) toMap
+        .get(key);
+      if (targetFields == null) {
+        targetFields = new HashMap<String, Object>();
+        toMap.put(key, targetFields);
+      }
+
+      if (sourceFields == null) {
+        continue;
+      }
+      for (Map.Entry<String, Object> field : sourceFields.entrySet()) {
+        if (!targetFields.containsKey(field.getKey())) {
+          targetFields.put(field.getKey(), field.getValue());
+        }
+      }
+    }
+
+    // Pass 2: add the remaining top level fields that are still missing
+    for (Map.Entry<String, Object> entry : fromMap.entrySet()) {
+      if (!toMap.containsKey(entry.getKey())) {
+        toMap.put(entry.getKey(), entry.getValue());
+      }
+    }
+  }
+
+  private void monitor() throws Exception {
+    inputMgr.monitor();
+    Runtime.getRuntime().addShutdownHook(new JVMShutdownHook());
+
+    Thread statLogger = new Thread("statLogger") {
+
+      @Override
+      public void run() {
+        while (true) {
+          try {
+            Thread.sleep(30 * 1000);
+          } catch (Throwable t) {
+            // Ignore
+          }
+          try {
+            logStats();
+          } catch (Throwable t) {
+            logger.error(
+              "LogStats: Caught exception while logging stats.",
+              t);
+          }
+
+          if (System.currentTimeMillis() > (lastCheckPointCleanedMS + checkPointCleanIntervalMS)) {
+            lastCheckPointCleanedMS = System.currentTimeMillis();
+            inputMgr.cleanCheckPointFiles();
+          }
+        }
+      }
+
+    };
+    statLogger.setDaemon(true);
+    statLogger.start();
+
+  }
+
+  /**
+   * Logs the statistics of the input and output managers and, when metrics
+   * publishing is enabled, hands their metric counters to the metrics
+   * manager.
+   */
+  private void logStats() {
+    inputMgr.logStats();
+    outMgr.logStats();
+
+    if (!metricsMgr.isMetricsEnabled()) {
+      return;
+    }
+    List<MetricCount> collected = new ArrayList<MetricCount>();
+    inputMgr.addMetricsContainers(collected);
+    outMgr.addMetricsContainers(collected);
+    metricsMgr.useMetrics(collected);
+  }
+
+  /**
+   * Reads every remaining line from the reader and returns them
+   * concatenated. Line terminators are not kept, so the lines are joined
+   * without any separator.
+   *
+   * @param br the reader to consume; it is not closed by this method
+   * @return the concatenated content
+   * @throws Exception if reading fails; the error is logged before it is
+   *                   rethrown
+   */
+  public String readFile(BufferedReader br) throws Exception {
+    try {
+      StringBuilder content = new StringBuilder();
+      for (String line = br.readLine(); line != null; line = br.readLine()) {
+        content.append(line);
+      }
+      return content.toString();
+    } catch (Exception t) {
+      logger.error("Error loading properties file.", t);
+      throw t;
+    }
+  }
+
+  /**
+   * @return the collection of outputs currently held by this instance
+   */
+  public Collection<Output> getOutputList() {
+    return outputList;
+  }
+
+  /**
+   * @return the output manager used by this instance
+   */
+  public OutputMgr getOutMgr() {
+    return outMgr;
+  }
+
+  /**
+   * Command line entry point: creates a LogFeeder from the arguments and
+   * runs it.
+   *
+   * @param args the command line arguments
+   */
+  public static void main(String[] args) {
+    LogFeeder feeder = new LogFeeder(args);
+    feeder.run(feeder);
+  }
+
+
+  /**
+   * Programmatic equivalent of {@code main}: creates a LogFeeder from the
+   * arguments and runs it.
+   *
+   * @param args the arguments passed to the LogFeeder constructor
+   */
+  public static void run(String[] args) {
+    LogFeeder feeder = new LogFeeder(args);
+    feeder.run(feeder);
+  }
+
+  public void run(LogFeeder logFeeder) {
+    try {
+      Date startTime = new Date();
+      logFeeder.init();
+      Date endTime = new Date();
+      logger.info("Took " + (endTime.getTime() - startTime.getTime())
+        + " ms to initialize");
+      logFeeder.monitor();
+
+    } catch (Throwable t) {
+      logger.fatal("Caught exception in main.", t);
+      System.exit(1);
+    }
+  }
+
+  private class JVMShutdownHook extends Thread {
+
+    public void run() {
+      try {
+        logger.info("Processing is shutting down.");
+
+        inputMgr.close();
+        outMgr.close();
+        inputMgr.checkInAll();
+
+        logStats();
+
+        logger.info("LogSearch is exiting.");
+      } catch (Throwable t) {
+        // Ignore
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederAMSClient.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederAMSClient.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederAMSClient.java
new file mode 100644
index 0000000..e53a227
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederAMSClient.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder;
+
+import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.log4j.Logger;
+
+public class LogFeederAMSClient extends AbstractTimelineMetricsSink {
+  static Logger logger = Logger.getLogger(LogFeederAMSClient.class);
+
+  String collectorHosts = null;
+
+  public LogFeederAMSClient() {
+    collectorHosts = LogFeederUtil
+      .getStringProperty("metrics.collector.hosts");
+    if (collectorHosts != null && collectorHosts.trim().length() == 0) {
+      collectorHosts = null;
+    }
+    if (collectorHosts != null) {
+      collectorHosts = collectorHosts.trim();
+    }
+    logger.info("AMS collector URL=" + collectorHosts);
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see
+   * org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink#
+   * getCollectorUri()
+   */
+  @Override
+  public String getCollectorUri() {
+
+    return collectorHosts;
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see
+   * org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink#
+   * getTimeoutSeconds()
+   */
+  @Override
+  protected int getTimeoutSeconds() {
+    // TODO: Hard coded timeout
+    return 10;
+  }
+
+  @Override
+  protected void emitMetrics(TimelineMetrics metrics) {
+    super.emitMetrics(metrics);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederUtil.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederUtil.java
new file mode 100644
index 0000000..7303694
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederUtil.java
@@ -0,0 +1,480 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder;
+
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.lang.reflect.Type;
+import java.net.URL;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.ambari.logfeeder.filter.Filter;
+import org.apache.ambari.logfeeder.input.Input;
+import org.apache.ambari.logfeeder.mapper.Mapper;
+import org.apache.ambari.logfeeder.output.Output;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.reflect.TypeToken;
+
+/**
+ * This class contains utility methods used by LogFeeder
+ */
+public class LogFeederUtil {
+  static Logger logger = Logger.getLogger(LogFeederUtil.class);
+
+  final static int HASH_SEED = 31174077;
+  public final static String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss.SSS";
+  static Gson gson = new GsonBuilder().setDateFormat(DATE_FORMAT).create();
+
+  static Properties props;
+
+  private static Map<String, LogHistory> logHistoryList = new Hashtable<String, LogHistory>();
+  private static int logInterval = 30000; // 30 seconds
+
+  /**
+   * @return the shared Gson instance, which is configured with
+   * {@link #DATE_FORMAT} as its date format
+   */
+  public static Gson getGson() {
+    return gson;
+  }
+
+  /**
+   * This method will read the properties from System, followed by propFile
+   * and finally from the map
+   *
+   * @param propFile
+   * @param propNVList
+   * @throws Exception
+   */
+  static public void loadProperties(String propFile, String[] propNVList)
+    throws Exception {
+    logger.info("Loading properties. propFile=" + propFile);
+    props = new Properties(System.getProperties());
+    boolean propLoaded = false;
+
+    // First get properties file path from environment value
+    String propertiesFilePath = System.getProperty("properties");
+    if (propertiesFilePath != null && !propertiesFilePath.isEmpty()) {
+      File propertiesFile = new File(propertiesFilePath);
+      if (propertiesFile.exists() && propertiesFile.isFile()) {
+        logger.info("Properties file path set in environment. Loading properties file="
+          + propertiesFilePath);
+        FileInputStream fileInputStream = null;
+        try {
+          fileInputStream = new FileInputStream(propertiesFile);
+          props.load(fileInputStream);
+          propLoaded = true;
+        } catch (Throwable t) {
+          logger.error("Error loading properties file. properties file="
+            + propertiesFile.getAbsolutePath());
+        } finally {
+          if (fileInputStream != null) {
+            try {
+              fileInputStream.close();
+            } catch (Throwable t) {
+              // Ignore error
+            }
+          }
+        }
+      } else {
+        logger.error("Properties file path set in environment, but file not found. properties file="
+          + propertiesFilePath);
+      }
+    }
+
+    if (!propLoaded) {
+      // Properties not yet loaded, let's try from class loader
+      BufferedInputStream fileInputStream = (BufferedInputStream) LogFeeder.class
+        .getClassLoader().getResourceAsStream(propFile);
+      if (fileInputStream != null) {
+        logger.info("Loading properties file " + propFile
+          + " from classpath");
+        props.load(fileInputStream);
+        propLoaded = true;
+      } else {
+        logger.fatal("Properties file not found in classpath. properties file name= "
+          + propFile);
+      }
+    }
+
+    if (!propLoaded) {
+      logger.fatal("Properties file is not loaded.");
+      throw new Exception("Properties not loaded");
+    } else {
+      // Let's load properties from argument list
+      updatePropertiesFromMap(propNVList);
+    }
+  }
+
+  /**
+   * Adds properties given as "-name=value" arguments to the loaded
+   * properties. Arguments that do not start with "-", consist of "-" only,
+   * or contain no "=" are ignored.
+   *
+   * @param nvList the arguments to inspect; may be null
+   */
+  private static void updatePropertiesFromMap(String[] nvList) {
+    if (nvList == null) {
+      return;
+    }
+    logger.info("Trying to load additional proeprties from argument paramters. nvList.length="
+      + nvList.length);
+    for (String arg : nvList) {
+      logger.info("Passed nv=" + arg);
+      if (arg == null || !arg.startsWith("-") || arg.length() <= 1) {
+        continue;
+      }
+      String nv = arg.substring(1);
+      logger.info("Stripped nv=" + nv);
+      int i = nv.indexOf("=");
+      if (i < 0) {
+        // Without this check substring(0, -1) below would throw
+        logger.warn("Ignoring argument without '='. nv=" + nv);
+        continue;
+      }
+      logger.info("Candidate nv=" + nv);
+      String name = nv.substring(0, i);
+      String value = nv.substring(i + 1);
+      logger.info("Adding property from argument to properties. name="
+        + name + ", value=" + value);
+      props.put(name, value);
+    }
+  }
+
+  /**
+   * Looks up a property value.
+   *
+   * @param key the property name
+   * @return the value, or null if the key is not found or the properties
+   * have not been loaded yet
+   */
+  static public String getStringProperty(String key) {
+    return props == null ? null : props.getProperty(key);
+  }
+
+  /**
+   * Looks up a property value with a fallback.
+   *
+   * @param key          the property name
+   * @param defaultValue returned when the key is not found or the
+   *                     properties have not been loaded yet
+   * @return the property value or {@code defaultValue}
+   */
+  static public String getStringProperty(String key, String defaultValue) {
+    if (props == null) {
+      return defaultValue;
+    }
+    return props.getProperty(key, defaultValue);
+  }
+
+  /**
+   * Reads a property and interprets it as a boolean using
+   * {@code toBoolean}.
+   *
+   * @param key          the property name
+   * @param defaultValue returned when the property is missing or empty
+   * @return the boolean value of the property
+   */
+  static public boolean getBooleanProperty(String key, boolean defaultValue) {
+    return toBoolean(getStringProperty(key), defaultValue);
+  }
+
+  /**
+   * Interprets a string as a boolean: "true" and "yes" (ignoring case)
+   * mean true, any other non-empty value means false.
+   *
+   * @param strValue     the text to interpret; may be null
+   * @param defaultValue returned when {@code strValue} is null or empty
+   * @return the interpreted value
+   */
+  private static boolean toBoolean(String strValue, boolean defaultValue) {
+    if (StringUtils.isEmpty(strValue)) {
+      return defaultValue;
+    }
+    return strValue.equalsIgnoreCase("true")
+      || strValue.equalsIgnoreCase("yes");
+  }
+
+  /**
+   * Reads a property and converts it to an int using {@code objectToInt}.
+   *
+   * @param key          the property name
+   * @param defaultValue returned when the property is missing, empty or
+   *                     not a valid integer
+   * @return the int value of the property
+   */
+  static public int getIntProperty(String key, int defaultValue) {
+    String rawValue = getStringProperty(key);
+    return objectToInt(rawValue, defaultValue, ", key=" + key);
+  }
+
+  /**
+   * Converts an arbitrary object to an int.
+   * <p>
+   * A {@link Number} is converted with {@code intValue()}, so a value such
+   * as the Double 5.0 yields 5 instead of failing to parse as "5.0". Any
+   * other object is converted through its string form with
+   * {@link Integer#parseInt(String)}.
+   *
+   * @param objValue   the value to convert; may be null
+   * @param retValue   the value returned when {@code objValue} is null,
+   *                   empty or not a valid integer
+   * @param errMessage text appended to the error that is logged when
+   *                   parsing fails
+   * @return the converted value or {@code retValue}
+   */
+  public static int objectToInt(Object objValue, int retValue,
+                                String errMessage) {
+    if (objValue == null) {
+      return retValue;
+    }
+    if (objValue instanceof Number) {
+      return ((Number) objValue).intValue();
+    }
+    String strValue = objValue.toString();
+    if (!StringUtils.isEmpty(strValue)) {
+      try {
+        retValue = Integer.parseInt(strValue);
+      } catch (NumberFormatException e) {
+        logger.error("Error parsing integer value. str=" + strValue
+          + ", " + errMessage);
+      }
+    }
+    return retValue;
+  }
+
+  /**
+   * Evaluates the "is_enabled" flag and the "conditions" of a configuration
+   * block against the block itself.
+   *
+   * @param configs the configuration block
+   * @return the result of {@code isEnabled(configs, configs)}
+   */
+  static public boolean isEnabled(Map<String, Object> configs) {
+    return isEnabled(configs, configs);
+  }
+
+  /**
+   * Decides whether a block is enabled for the given values.
+   * <p>
+   * Without "conditions" in {@code conditionConfigs} the result is the
+   * "is_enabled" value of {@code valueConfigs} (default true). When
+   * conditions are present, "is_enabled" is not consulted: the result is
+   * true only if at least one value listed under the "fields" condition
+   * matches {@code valueConfigs} according to
+   * {@code isFieldConditionMatch}.
+   *
+   * @param conditionConfigs the configuration holding the "conditions"
+   * @param valueConfigs     the configuration the conditions are tested
+   *                         against
+   * @return true if the block is enabled
+   */
+  static public boolean isEnabled(Map<String, Object> conditionConfigs,
+                                  Map<String, Object> valueConfigs) {
+    // "is_enabled" may be a String or, after JSON parsing, a Boolean;
+    // use its string form instead of a cast that can fail.
+    Object enabledValue = valueConfigs.get("is_enabled");
+    boolean allow = toBoolean(
+      enabledValue == null ? null : enabledValue.toString(), true);
+    @SuppressWarnings("unchecked")
+    Map<String, Object> conditions = (Map<String, Object>) conditionConfigs
+      .get("conditions");
+    if (conditions == null || conditions.isEmpty()) {
+      return allow;
+    }
+
+    for (Map.Entry<String, Object> condition : conditions.entrySet()) {
+      if (!condition.getKey().equalsIgnoreCase("fields")) {
+        continue;
+      }
+      // Read the value of the entry that matched; a lookup by the literal
+      // "fields" would miss a key written in a different case.
+      @SuppressWarnings("unchecked")
+      Map<String, Object> fields = (Map<String, Object>) condition
+        .getValue();
+      if (fields == null) {
+        continue;
+      }
+      for (Map.Entry<String, Object> field : fields.entrySet()) {
+        Object values = field.getValue();
+        if (values instanceof String) {
+          if (isFieldConditionMatch(valueConfigs, field.getKey(),
+            (String) values)) {
+            return true;
+          }
+        } else if (values != null) {
+          @SuppressWarnings("unchecked")
+          List<String> listValues = (List<String>) values;
+          for (String stringValue : listValues) {
+            if (isFieldConditionMatch(valueConfigs, field.getKey(),
+              stringValue)) {
+              return true;
+            }
+          }
+        }
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Checks whether a field of the configuration has the expected value,
+   * ignoring case. The field is looked up at the top level first and then
+   * inside the "add_fields" map.
+   *
+   * @param configs     the configuration to inspect
+   * @param fieldName   the name of the field
+   * @param stringValue the expected value
+   * @return true if either lookup yields a value equal to
+   * {@code stringValue}, ignoring case
+   */
+  static public boolean isFieldConditionMatch(Map<String, Object> configs,
+                                              String fieldName, String stringValue) {
+    String topLevelValue = (String) configs.get(fieldName);
+    if (topLevelValue != null && topLevelValue.equalsIgnoreCase(stringValue)) {
+      return true;
+    }
+
+    @SuppressWarnings("unchecked")
+    Map<String, Object> addFields = (Map<String, Object>) configs
+      .get("add_fields");
+    if (addFields == null || addFields.get(fieldName) == null) {
+      return false;
+    }
+    String addFieldValue = (String) addFields.get(fieldName);
+    return stringValue.equalsIgnoreCase(addFieldValue);
+  }
+
+  static public void logStatForMetric(MetricCount metric, String prefixStr,
+                                      String postFix) {
+    long currStat = metric.count;
+    long currMS = System.currentTimeMillis();
+    if (currStat > metric.prevLogCount) {
+      if (postFix == null) {
+        postFix = "";
+      }
+      logger.info(prefixStr + ": total_count=" + metric.count
+        + ", duration=" + (currMS - metric.prevLogMS) / 1000
+        + " secs, count=" + (currStat - metric.prevLogCount)
+        + postFix);
+    }
+    metric.prevLogCount = currStat;
+    metric.prevLogMS = currMS;
+  }
+
+  /**
+   * Logs the current count of a metric.
+   *
+   * @param metric    the metric to report
+   * @param prefixStr text placed in front of the message
+   * @param postFix   text appended to the message; null is treated as ""
+   *                  so that the literal "null" is not logged
+   */
+  static public void logCountForMetric(MetricCount metric, String prefixStr,
+                                       String postFix) {
+    if (postFix == null) {
+      postFix = "";
+    }
+    logger.info(prefixStr + ": count=" + metric.count + postFix);
+  }
+
+  /**
+   * Creates a copy of a map by serializing it to JSON and parsing the JSON
+   * back into a new map.
+   *
+   * @param map the map to copy; may be null
+   * @return the copy, or null if {@code map} is null
+   */
+  public static Map<String, Object> cloneObject(Map<String, Object> map) {
+    if (map == null) {
+      return null;
+    }
+    // The clone is made by a round trip through a JSON string
+    Type mapType = new TypeToken<Map<String, Object>>() {
+    }.getType();
+    String json = gson.toJson(map);
+    return gson.fromJson(json, mapType);
+  }
+
+  /**
+   * Parses a JSON string into a map using the shared Gson instance.
+   *
+   * @param jsonStr the JSON text to parse
+   * @return the parsed content as a map
+   */
+  public static Map<String, Object> toJSONObject(String jsonStr) {
+    Type mapType = new TypeToken<Map<String, Object>>() {
+    }.getType();
+    Map<String, Object> parsed = gson.fromJson(jsonStr, mapType);
+    return parsed;
+  }
+
+  /**
+   * Logs a message at most once per log interval for each key. Calls that
+   * fall inside the interval are only counted; the count is appended to
+   * the next message that is actually logged.
+   *
+   * @param key          identifies the message for rate limiting
+   * @param message      the text to log
+   * @param e            optional throwable to log with the message
+   * @param callerLogger the logger to write to
+   * @param level        the level to log at
+   * @return true if the message was logged, false if it was suppressed
+   */
+  static public boolean logErrorMessageByInterval(String key, String message,
+                                                  Throwable e, Logger callerLogger, Level level) {
+
+    LogHistory history = logHistoryList.get(key);
+    if (history == null) {
+      history = new LogHistory();
+      logHistoryList.put(key, history);
+    }
+
+    if ((System.currentTimeMillis() - history.lastLogTime) <= logInterval) {
+      // Still inside the interval: count it and stay silent
+      history.counter++;
+      return false;
+    }
+
+    history.lastLogTime = System.currentTimeMillis();
+    int suppressed = history.counter;
+    history.counter = 0;
+    String text = message;
+    if (suppressed > 0) {
+      text += ". Messages suppressed before: " + suppressed;
+    }
+    if (e != null) {
+      callerLogger.log(level, text, e);
+    } else {
+      callerLogger.log(level, text);
+    }
+    return true;
+
+  }
+
+  /**
+   * Returns at most the first {@code maxLength} characters of a string.
+   *
+   * @param str       the text to shorten; may be null
+   * @param maxLength the maximum number of characters to keep
+   * @return the shortened text; "" if {@code str} is null or empty or if
+   * {@code maxLength} is zero or negative
+   */
+  static public String subString(String str, int maxLength) {
+    if (str == null || str.length() == 0 || maxLength <= 0) {
+      // A negative length would make substring() throw
+      return "";
+    }
+    return str.substring(0, Math.min(str.length(), maxLength));
+  }
+
+  static public long genHash(String value) {
+    if (value == null) {
+      value = "null";
+    }
+    return MurmurHash.hash64A(value.getBytes(), HASH_SEED);
+  }
+
+  /**
+   * Bookkeeping for one rate limited log message key.
+   */
+  static class LogHistory {
+    // Time in milliseconds at which the message was last logged
+    long lastLogTime;
+    // Number of messages suppressed since the last logged one
+    int counter;
+  }
+
+  public static String getDate(String timeStampStr) {
+    try {
+      DateFormat sdf = new SimpleDateFormat(DATE_FORMAT);
+      Date netDate = (new Date(Long.parseLong(timeStampStr)));
+      return sdf.format(netDate);
+    } catch (Exception ex) {
+      return null;
+    }
+  }
+
+  /**
+   * Resolves a classpath resource to a {@link File} using the context
+   * class loader of the current thread.
+   *
+   * @param filename the resource name
+   * @return the file, or null if the resource does not exist or its URL
+   * cannot be converted to a file
+   */
+  public static File getFileFromClasspath(String filename) {
+    URL fileCompleteUrl = Thread.currentThread().getContextClassLoader()
+      .getResource(filename);
+    logger.debug("File Complete URI :" + fileCompleteUrl);
+    if (fileCompleteUrl == null) {
+      // Explicit check instead of relying on a caught NullPointerException
+      logger.debug("File not found in classpath :" + filename);
+      return null;
+    }
+    try {
+      return new File(fileCompleteUrl.toURI());
+    } catch (Exception exception) {
+      // Log the exception itself; its cause is usually null
+      logger.debug(exception.getMessage(), exception);
+      return null;
+    }
+  }
+
+  public static Object getClassInstance(String classFullName, AliasUtil.ALIAS_TYPE aliasType) {
+    Object instance = null;
+    try {
+      instance = (Object) Class.forName(classFullName).getConstructor().newInstance();
+    } catch (Exception exception) {
+      logger.error("Unsupported class =" + classFullName, exception.getCause());
+    }
+    // check instance class as par aliasType
+    if (instance != null) {
+      boolean isValid = false;
+      switch (aliasType) {
+        case FILTER:
+          isValid = Filter.class.isAssignableFrom(instance.getClass());
+          break;
+        case INPUT:
+          isValid = Input.class.isAssignableFrom(instance.getClass());
+          break;
+        case OUTPUT:
+          isValid = Output.class.isAssignableFrom(instance.getClass());
+          break;
+        case MAPPER:
+          isValid = Mapper.class.isAssignableFrom(instance.getClass());
+          break;
+        default:
+          // by default consider all are valid class
+          isValid = true;
+      }
+      if (!isValid) {
+        logger.error("Not a valid class :" + classFullName + " AliasType :" + aliasType.name());
+      }
+    }
+    return instance;
+  }
+
+  /**
+   * Parses a JSON file into a map.
+   *
+   * @param jsonFile the file to read
+   * @return the parsed content, or an empty map if the file cannot be read
+   * or parsed (the error is logged)
+   */
+  public static HashMap<String, Object> readJsonFromFile(File jsonFile) {
+    ObjectMapper mapper = new ObjectMapper();
+    TypeReference<HashMap<String, Object>> mapType = new TypeReference<HashMap<String, Object>>() {
+    };
+    try {
+      return mapper.readValue(jsonFile, mapType);
+    } catch (IOException e) {
+      // JsonParseException and JsonMappingException are IOExceptions and
+      // were handled identically, so a single handler covers all three.
+      logger.error(e, e.getCause());
+    }
+    return new HashMap<String, Object>();
+  }
+
+  public static boolean isListContains(List<String> list, String str, boolean caseSensitive) {
+    if (list != null) {
+      for (String value : list) {
+        if (value != null) {
+          if (caseSensitive) {
+            if (value.equals(str)) {
+              return true;
+            }
+          } else {
+            if (value.equalsIgnoreCase(str)) {
+              return true;
+            }
+          }
+          if (value.equalsIgnoreCase("ALL")) {
+            return true;
+          }
+        }
+      }
+    }
+    return false;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/MetricCount.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/MetricCount.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/MetricCount.java
new file mode 100644
index 0000000..c715881
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/MetricCount.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder;
+
+/**
+ * Mutable holder for one counter together with the bookkeeping used when
+ * the counter is logged and published.
+ */
+public class MetricCount {
+  // Name the metric is published under
+  public String metricsName;
+  public boolean isPointInTime;
+
+  public long count;
+
+  // Count and time recorded when the metric was last logged
+  public long prevLogCount;
+  public long prevLogMS = System.currentTimeMillis();
+
+  // Count and time recorded when the metric was last published
+  public long prevPublishCount;
+  // We will try to publish one immediately
+  public long prevPublishMS;
+
+  // Count of published metrics. Used for first time sending metrics
+  public int publishCount;
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/MetricsMgr.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/MetricsMgr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/MetricsMgr.java
new file mode 100644
index 0000000..2152d14
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/MetricsMgr.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.TreeMap;
+
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.log4j.Logger;
+
+public class MetricsMgr {
+  static Logger logger = Logger.getLogger(MetricsMgr.class);
+
+  boolean isMetricsEnabled = false;
+  String nodeHostName = null;
+  String appId = "logfeeder";
+
+  long lastPublishTimeMS = 0; // Let's do the first publish immediately
+  long lastFailedPublishTimeMS = System.currentTimeMillis(); // Reset the
+  // clock
+
+  int publishIntervalMS = 60 * 1000;
+  int maxMetricsBuffer = 60 * 60 * 1000; // If AMS is down, we should not keep
+  // the metrics in memory forever
+  HashMap<String, TimelineMetric> metricsMap = new HashMap<String, TimelineMetric>();
+  LogFeederAMSClient amsClient = null;
+
+  /**
+   * Creates the AMS client and decides whether metrics are published.
+   * Publishing is enabled only when a collector address is configured and
+   * a host name for this node is available, either from the
+   * "node.hostname" property or from the local network address.
+   */
+  public void init() {
+    logger.info("Initializing MetricsMgr()");
+    amsClient = new LogFeederAMSClient();
+
+    if (amsClient.getCollectorUri() == null) {
+      logger.info("LogFeeder Metrics publish is disabled");
+      return;
+    }
+
+    nodeHostName = LogFeederUtil.getStringProperty("node.hostname");
+    if (nodeHostName == null) {
+      try {
+        nodeHostName = InetAddress.getLocalHost().getHostName();
+      } catch (Throwable e) {
+        logger.warn(
+          "Error getting hostname using InetAddress.getLocalHost().getHostName()",
+          e);
+      }
+    }
+    if (nodeHostName == null) {
+      // Second attempt, this time with the canonical host name
+      try {
+        nodeHostName = InetAddress.getLocalHost()
+          .getCanonicalHostName();
+      } catch (Throwable e) {
+        logger.warn(
+          "Error getting hostname using InetAddress.getLocalHost().getCanonicalHostName()",
+          e);
+      }
+    }
+
+    if (nodeHostName != null) {
+      isMetricsEnabled = true;
+      logger.info("LogFeeder Metrics is enabled. Metrics host="
+        + amsClient.getCollectorUri());
+    } else {
+      isMetricsEnabled = false;
+      logger.error("Failed getting hostname for node. Disabling publishing LogFeeder metrics");
+    }
+  }
+
+  /**
+   * @return true if metrics publishing was enabled by {@code init()}
+   */
+  public boolean isMetricsEnabled() {
+    return isMetricsEnabled;
+  }
+
+  /**
+   * Adds the current values of the given counters to the buffered timeline
+   * metrics and publishes the buffer to AMS once the publish interval has
+   * elapsed.
+   * <p>
+   * Counters without a name are skipped. A counter that is not "point in
+   * time" contributes the increase since it was last buffered and is
+   * skipped when nothing changed; a "point in time" counter contributes
+   * its current value. If publishing keeps failing for longer than the
+   * maximum buffer time, the buffer is discarded.
+   *
+   * @param metricsList the counters to record
+   */
+  synchronized public void useMetrics(List<MetricCount> metricsList) {
+    if (!isMetricsEnabled) {
+      return;
+    }
+    logger.info("useMetrics() metrics.size=" + metricsList.size());
+    long currMS = System.currentTimeMillis();
+    Long currMSLong = Long.valueOf(currMS);
+    for (MetricCount metric : metricsList) {
+      if (metric.metricsName == null) {
+        logger.debug("metric.metricsName is null");
+        // Metrics is not meant to be published
+        continue;
+      }
+      long currCount = metric.count;
+      if (!metric.isPointInTime && metric.publishCount > 0
+        && currCount <= metric.prevPublishCount) {
+        // No new data added, so let's ignore it
+        logger.debug("Nothing changed. " + metric.metricsName
+          + ", currCount=" + currCount + ", prevPublishCount="
+          + metric.prevPublishCount);
+        continue;
+      }
+      metric.publishCount++;
+
+      TimelineMetric timelineMetric = metricsMap.get(metric.metricsName);
+      if (timelineMetric == null) {
+        logger.debug("Creating new metric obbject for "
+          + metric.metricsName);
+        // First time for this metric
+        timelineMetric = new TimelineMetric();
+        timelineMetric.setMetricName(metric.metricsName);
+        timelineMetric.setHostName(nodeHostName);
+        timelineMetric.setAppId(appId);
+        timelineMetric.setStartTime(currMS);
+        timelineMetric.setType("Long");
+        timelineMetric.setMetricValues(new TreeMap<Long, Double>());
+
+        metricsMap.put(metric.metricsName, timelineMetric);
+      }
+      logger.debug("Adding metrics=" + metric.metricsName);
+      if (metric.isPointInTime) {
+        timelineMetric.getMetricValues().put(currMSLong,
+          Double.valueOf(currCount));
+      } else {
+        Double value = timelineMetric.getMetricValues().get(currMSLong);
+        if (value == null) {
+          value = Double.valueOf(0);
+        }
+        value += (currCount - metric.prevPublishCount);
+        timelineMetric.getMetricValues().put(currMSLong, value);
+        metric.prevPublishCount = currCount;
+        metric.prevPublishMS = currMS;
+      }
+    }
+
+    if (metricsMap.size() > 0
+      && currMS - lastPublishTimeMS > publishIntervalMS) {
+      try {
+        // Time to publish
+        TimelineMetrics timelineMetrics = new TimelineMetrics();
+        List<TimelineMetric> timeLineMetricList = new ArrayList<TimelineMetric>(
+          metricsMap.values());
+        timelineMetrics.setMetrics(timeLineMetricList);
+        amsClient.emitMetrics(timelineMetrics);
+        logger.info("Published " + timeLineMetricList.size()
+          + " metrics to AMS");
+        metricsMap.clear();
+        timeLineMetricList.clear();
+        lastPublishTimeMS = currMS;
+        // Restart the failure window on success. Otherwise the first
+        // failure after a long healthy period would look like an outage
+        // longer than maxMetricsBuffer and purge the buffer at once.
+        lastFailedPublishTimeMS = currMS;
+      } catch (Throwable t) {
+        logger.warn("Error sending metrics to AMS.", t);
+        if (currMS - lastFailedPublishTimeMS > maxMetricsBuffer) {
+          logger.error("AMS was not sent for last "
+            + maxMetricsBuffer
+            / 1000
+            + " seconds. Purging it and will start rebuilding it again");
+          metricsMap.clear();
+          lastFailedPublishTimeMS = currMS;
+        }
+      }
+    } else {
+      logger.info("Not publishing metrics. metrics.size()="
+        + metricsMap.size() + ", lastPublished="
+        + (currMS - lastPublishTimeMS) / 1000
+        + " seconds ago, intervalConfigured=" + publishIntervalMS
+        / 1000);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/MurmurHash.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/MurmurHash.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/MurmurHash.java
new file mode 100644
index 0000000..2a54f28
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/MurmurHash.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.logfeeder;
+
+import com.google.common.primitives.Ints;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+/**
+ * This is a very fast, non-cryptographic hash suitable for general hash-based
+ * lookup.  See http://murmurhash.googlepages.com/ for more details.
+ *
+ * <p>The C version of MurmurHash 2.0 found at that site was ported
+ * to Java by Andrzej Bialecki (ab at getopt org).</p>
+ *
+ * <p>Every {@link ByteBuffer} overload reads from the buffer's current
+ * position up to its limit, so the buffer is fully consumed when the call
+ * returns. The buffer's byte order is switched to little-endian while
+ * hashing and put back to its previous value before returning.</p>
+ */
+public final class MurmurHash {
+
+  /** Multiplier used by the 32-bit hash. */
+  private static final int MIX_32 = 0x5bd1e995;
+
+  /** Right-shift applied to every 4-byte word in the 32-bit hash. */
+  private static final int SHIFT_32 = 24;
+
+  /** Multiplier used by the 64-bit hash. */
+  private static final long MIX_64 = 0xc6a4a7935bd1e995L;
+
+  /** Right-shift used throughout the 64-bit hash. */
+  private static final int SHIFT_64 = 47;
+
+  /** Static utility class; never instantiated. */
+  private MurmurHash() {
+  }
+
+  /**
+   * Computes the 32-bit hash of a single int.
+   *
+   * @param data The int to hash.
+   * @param seed The seed for the hash.
+   * @return The 32 bit hash of the four bytes representing {@code data}.
+   */
+  public static int hash(int data, int seed) {
+    byte[] raw = Ints.toByteArray(data);
+    return hash(ByteBuffer.wrap(raw), seed);
+  }
+
+  /**
+   * Computes the 32-bit hash of a whole byte array.
+   *
+   * @param data The bytes to hash.
+   * @param seed The seed for the hash.
+   * @return The 32 bit hash of the bytes in question.
+   */
+  public static int hash(byte[] data, int seed) {
+    ByteBuffer wrapped = ByteBuffer.wrap(data);
+    return hash(wrapped, seed);
+  }
+
+  /**
+   * Computes the 32-bit hash of a slice of a byte array.
+   *
+   * @param data   The array holding the bytes to hash.
+   * @param offset Index of the first byte to hash.
+   * @param length Number of bytes to hash.
+   * @param seed   The seed for the hash.
+   * @return The 32-bit hash of the selected bytes.
+   */
+  public static int hash(byte[] data, int offset, int length, int seed) {
+    ByteBuffer slice = ByteBuffer.wrap(data, offset, length);
+    return hash(slice, seed);
+  }
+
+  /**
+   * Computes the 32-bit hash of the bytes between the buffer's current
+   * position and its limit. The buffer's position is advanced to the limit.
+   *
+   * @param buf  The bytes to hash.
+   * @param seed The seed for the hash.
+   * @return The 32 bit murmur hash of the bytes in the buffer.
+   */
+  public static int hash(ByteBuffer buf, int seed) {
+    // Remember the caller's byte order; it is reinstated before returning.
+    final ByteOrder callerOrder = buf.order();
+    buf.order(ByteOrder.LITTLE_ENDIAN);
+
+    int state = seed ^ buf.remaining();
+
+    // Mix in the input one complete 4-byte word at a time.
+    for (int left = buf.remaining(); left >= 4; left -= 4) {
+      int word = buf.getInt();
+
+      word *= MIX_32;
+      word ^= word >>> SHIFT_32;
+      word *= MIX_32;
+
+      state *= MIX_32;
+      state ^= word;
+    }
+
+    // Up to three trailing bytes are copied into a zeroed 4-byte scratch
+    // buffer and mixed in as one little-endian word.
+    if (buf.hasRemaining()) {
+      ByteBuffer tail = ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN);
+      // for big-endian version, use this first:
+      // tail.position(4-buf.remaining());
+      tail.put(buf);
+      tail.rewind();
+      state ^= tail.getInt();
+      state *= MIX_32;
+    }
+
+    // Final avalanche.
+    state ^= state >>> 13;
+    state *= MIX_32;
+    state ^= state >>> 15;
+
+    buf.order(callerOrder);
+    return state;
+  }
+
+
+  /**
+   * Computes the 64-bit hash of a whole byte array.
+   *
+   * @param data The bytes to hash.
+   * @param seed The seed for the hash.
+   * @return The 64 bit hash of the bytes in question.
+   */
+  public static long hash64A(byte[] data, int seed) {
+    ByteBuffer wrapped = ByteBuffer.wrap(data);
+    return hash64A(wrapped, seed);
+  }
+
+  /**
+   * Computes the 64-bit hash of a slice of a byte array.
+   *
+   * @param data   The array holding the bytes to hash.
+   * @param offset Index of the first byte to hash.
+   * @param length Number of bytes to hash.
+   * @param seed   The seed for the hash.
+   * @return The 64-bit hash of the selected bytes.
+   */
+  public static long hash64A(byte[] data, int offset, int length, int seed) {
+    ByteBuffer slice = ByteBuffer.wrap(data, offset, length);
+    return hash64A(slice, seed);
+  }
+
+  /**
+   * Computes the 64-bit hash of the bytes between the buffer's current
+   * position and its limit. The buffer's position is advanced to the limit.
+   *
+   * @param buf  The bytes to hash.
+   * @param seed The seed for the hash.
+   * @return The 64 bit hash of the bytes in the buffer.
+   */
+  public static long hash64A(ByteBuffer buf, int seed) {
+    // Remember the caller's byte order; it is reinstated before returning.
+    final ByteOrder callerOrder = buf.order();
+    buf.order(ByteOrder.LITTLE_ENDIAN);
+
+    long state = seed ^ (buf.remaining() * MIX_64);
+
+    // Mix in the input one complete 8-byte word at a time.
+    for (int left = buf.remaining(); left >= 8; left -= 8) {
+      long word = buf.getLong();
+
+      word *= MIX_64;
+      word ^= word >>> SHIFT_64;
+      word *= MIX_64;
+
+      state ^= word;
+      state *= MIX_64;
+    }
+
+    // Up to seven trailing bytes are copied into a zeroed 8-byte scratch
+    // buffer and mixed in as one little-endian word.
+    if (buf.hasRemaining()) {
+      ByteBuffer tail = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN);
+      // for big-endian version, do this first:
+      // tail.position(8-buf.remaining());
+      tail.put(buf);
+      tail.rewind();
+      state ^= tail.getLong();
+      state *= MIX_64;
+    }
+
+    // Final avalanche.
+    state ^= state >>> SHIFT_64;
+    state *= MIX_64;
+    state ^= state >>> SHIFT_64;
+
+    buf.order(callerOrder);
+    return state;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/OutputMgr.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/OutputMgr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/OutputMgr.java
new file mode 100644
index 0000000..5b70bca
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/OutputMgr.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.ambari.logfeeder.input.Input;
+import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logfeeder.logconfig.filter.FilterLogData;
+import org.apache.ambari.logfeeder.output.Output;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+/**
+ * Adds common fields (type, path, host, ip, ids, sequence number, ...) to
+ * parsed log events and hands them to the outputs of the input they came
+ * from. Also owns the list of all outputs so they can be closed and have
+ * their statistics reported together.
+ */
+public class OutputMgr {
+  static Logger logger = Logger.getLogger(OutputMgr.class);
+
+  Collection<Output> outputList = new ArrayList<Output>();
+
+  String hostName = null;
+  String ipAddress = null;
+  boolean addMessageMD5 = true;
+
+  // Largest log_message, in encoded bytes, that is passed on untouched.
+  // NOTE(review): 32766 looks like the indexing backend's maximum term size
+  // in UTF-8 bytes, which is why UTF-8 is used for measuring - confirm.
+  private static final int MAX_OUTPUT_SIZE = 32765; // 32766-1
+  // NOTE(review): incremented in write() without synchronization; values can
+  // repeat if write() is called from several threads - confirm the threading
+  // model before relying on seq_num being unique.
+  static long doc_counter = 0;
+  public MetricCount messageTruncateMetric = new MetricCount();
+
+  /**
+   * Resolves the local host name and IP address once; both stay
+   * {@code null} when the local host cannot be resolved.
+   */
+  public OutputMgr() {
+    // Set the host for this server
+    try {
+      InetAddress ip = InetAddress.getLocalHost();
+      ipAddress = ip.getHostAddress();
+      hostName = ip.getHostName();
+    } catch (UnknownHostException e) {
+      logger.error("Error getting hostname.", e);
+    }
+  }
+
+  public Collection<Output> getOutputList() {
+    return outputList;
+  }
+
+  public void setOutputList(Collection<Output> outputList) {
+    this.outputList = outputList;
+  }
+
+  /**
+   * Completes a parsed event with default fields and writes it to every
+   * output of the originating input, provided the log filter allows it.
+   * Fields already present in the event are never overwritten, except
+   * {@code seq_num}, {@code event_md5}, {@code message_md5},
+   * {@code logfile_line_number} and, when the input uses event MD5 ids,
+   * {@code id}.
+   *
+   * @param jsonObj     the event; it is modified in place
+   * @param inputMarker identifies the input (and line number) the event
+   *                    was read from
+   */
+  public void write(Map<String, Object> jsonObj, InputMarker inputMarker) {
+    Input input = inputMarker.input;
+
+    // Update the block with the context fields
+    for (Map.Entry<String, String> entry : input.getContextFields()
+      .entrySet()) {
+      if (jsonObj.get(entry.getKey()) == null) {
+        jsonObj.put(entry.getKey(), entry.getValue());
+      }
+    }
+
+    // TODO: Ideally most of the overrides should be configurable
+
+    // Add the input type
+    if (jsonObj.get("type") == null) {
+      jsonObj.put("type", input.getStringValue("type"));
+    }
+    // The resolved file path wins over the configured "path" value.
+    if (jsonObj.get("path") == null && input.getFilePath() != null) {
+      jsonObj.put("path", input.getFilePath());
+    }
+    if (jsonObj.get("path") == null && input.getStringValue("path") != null) {
+      jsonObj.put("path", input.getStringValue("path"));
+    }
+
+    // Add host if required
+    if (jsonObj.get("host") == null && hostName != null) {
+      jsonObj.put("host", hostName);
+    }
+    // Add IP if required
+    if (jsonObj.get("ip") == null && ipAddress != null) {
+      jsonObj.put("ip", ipAddress);
+    }
+
+    if (input.isUseEventMD5() || input.isGenEventMD5()) {
+      // The hash is prefixed with the event time (epoch millis for Date
+      // values, otherwise the value's string form) when one is present.
+      String prefix = "";
+      Object logtimeObj = jsonObj.get("logtime");
+      if (logtimeObj != null) {
+        if (logtimeObj instanceof Date) {
+          prefix = "" + ((Date) logtimeObj).getTime();
+        } else {
+          prefix = logtimeObj.toString();
+        }
+      }
+      // Hash is taken over the event as it stands at this point, i.e.
+      // before seq_num and the other per-write fields below are added.
+      Long eventMD5 = LogFeederUtil.genHash(LogFeederUtil.getGson()
+        .toJson(jsonObj));
+      if (input.isGenEventMD5()) {
+        jsonObj.put("event_md5", prefix + eventMD5.toString());
+      }
+      if (input.isUseEventMD5()) {
+        jsonObj.put("id", prefix + eventMD5.toString());
+      }
+    }
+
+    // jsonObj.put("@timestamp", new Date());
+    jsonObj.put("seq_num", Long.valueOf(doc_counter++));
+    if (jsonObj.get("id") == null) {
+      jsonObj.put("id", UUID.randomUUID().toString());
+    }
+    if (jsonObj.get("event_count") == null) {
+      jsonObj.put("event_count", Integer.valueOf(1));
+    }
+    if (inputMarker.lineNumber > 0) {
+      jsonObj.put("logfile_line_number",
+        Integer.valueOf(inputMarker.lineNumber));
+    }
+    if (jsonObj.containsKey("log_message")) {
+      // TODO: Let's check size only for log_message for now
+      String logMessage = (String) jsonObj.get("log_message");
+      if (logMessage != null) {
+        // Encode once, with an explicit charset, and reuse the bytes for
+        // both the size check and the truncation.
+        byte[] messageBytes = logMessage
+          .getBytes(java.nio.charset.StandardCharsets.UTF_8);
+        if (messageBytes.length > MAX_OUTPUT_SIZE) {
+          messageTruncateMetric.count++;
+          final String LOG_MESSAGE_KEY = this.getClass().getSimpleName()
+            + "_MESSAGESIZE";
+          LogFeederUtil.logErrorMessageByInterval(LOG_MESSAGE_KEY,
+            "Message is too big. size="
+              + messageBytes.length + ", input="
+              + input.getShortDescription()
+              + ". Truncating to " + MAX_OUTPUT_SIZE
+              + ", first upto 100 characters="
+              + LogFeederUtil.subString(logMessage, 100),
+            null, logger, Level.WARN);
+          logMessage = truncateUtf8(messageBytes, MAX_OUTPUT_SIZE);
+          jsonObj.put("log_message", logMessage);
+          // Add error tags
+          @SuppressWarnings("unchecked")
+          List<String> tagsList = (List<String>) jsonObj.get("tags");
+          if (tagsList == null) {
+            tagsList = new ArrayList<String>();
+            jsonObj.put("tags", tagsList);
+          }
+          tagsList.add("error_message_truncated");
+        }
+      }
+      if (addMessageMD5) {
+        jsonObj.put("message_md5",
+          "" + LogFeederUtil.genHash(logMessage));
+      }
+    }
+    //check log is allowed to send output
+    if (FilterLogData.INSTANCE.isAllowed(jsonObj)) {
+      for (Output output : input.getOutputList()) {
+        try {
+          output.write(jsonObj, inputMarker);
+        } catch (Exception e) {
+          logger.error("Error writing. to " + output.getShortDescription(), e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Decodes at most {@code maxBytes} leading bytes of UTF-8 encoded text
+   * without cutting a multi-byte character in half. Cutting mid-character
+   * would make the decoder substitute U+FFFD, which corrupts the text and
+   * can re-encode to more than {@code maxBytes} bytes.
+   *
+   * @param utf8Bytes UTF-8 encoded text
+   * @param maxBytes  upper bound for the encoded size of the result
+   * @return the longest prefix that encodes to no more than
+   *         {@code maxBytes} bytes and ends on a character boundary
+   */
+  private static String truncateUtf8(byte[] utf8Bytes, int maxBytes) {
+    int end = Math.min(utf8Bytes.length, Math.max(maxBytes, 0));
+    // Bytes of the form 10xxxxxx continue a multi-byte character. If the
+    // first dropped byte is one of them, back up to the start of that
+    // character so it is dropped as a whole.
+    while (end > 0 && end < utf8Bytes.length
+      && (utf8Bytes[end] & 0xC0) == 0x80) {
+      end--;
+    }
+    return new String(utf8Bytes, 0, end,
+      java.nio.charset.StandardCharsets.UTF_8);
+  }
+
+  /**
+   * Writes an already serialized event to every output of the originating
+   * input, provided the log filter allows it. No fields are added.
+   *
+   * @param jsonBlock   the serialized event
+   * @param inputMarker identifies the input the event was read from
+   */
+  public void write(String jsonBlock, InputMarker inputMarker) {
+    //check log is allowed to send output
+    if (FilterLogData.INSTANCE.isAllowed(jsonBlock)) {
+      for (Output output : inputMarker.input.getOutputList()) {
+        try {
+          output.write(jsonBlock, inputMarker);
+        } catch (Exception e) {
+          logger.error("Error writing. to " + output.getShortDescription(), e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Close all the outputs, then wait (up to 30 passes, one second apart)
+   * for them to report that they are closed. Outputs still open after that
+   * are logged and left alone. If the calling thread is interrupted while
+   * waiting, the wait ends early and the interrupt flag is restored.
+   */
+  public void close() {
+    logger.info("Close called for outputs ...");
+    for (Output output : outputList) {
+      try {
+        output.setDrain(true);
+        output.close();
+      } catch (Exception e) {
+        // Best effort: keep closing the remaining outputs, but leave a
+        // trace of what went wrong.
+        logger.warn("Error closing output. "
+          + output.getShortDescription(), e);
+      }
+    }
+    // Need to get this value from property
+    int iterations = 30;
+    int waitTimeMS = 1000;
+    int i;
+    boolean allClosed = true;
+    for (i = 0; i < iterations; i++) {
+      allClosed = true;
+      for (Output output : outputList) {
+        if (!output.isClosed()) {
+          allClosed = false;
+          logger.warn("Waiting for output to close. "
+            + output.getShortDescription() + ", "
+            + (iterations - i) + " more seconds");
+        }
+      }
+      if (allClosed) {
+        break;
+      }
+      // Sleep once per pass, not once per pending output, so that the
+      // remaining-seconds figure logged above stays accurate.
+      try {
+        Thread.sleep(waitTimeMS);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        logger.warn("Interrupted while waiting for outputs to close.");
+        break;
+      }
+    }
+
+    if (!allClosed) {
+      logger.warn("Some outpus were not closed. Iterations=" + i);
+      for (Output output : outputList) {
+        if (!output.isClosed()) {
+          logger.warn("Output not closed. Will ignore it."
+            + output.getShortDescription() + ", pendingCound="
+            + output.getPendingCount());
+        }
+      }
+    } else {
+      logger.info("All outputs are closed. Iterations=" + i);
+    }
+  }
+
+  /**
+   * Logs the statistics of every output and the number of truncated
+   * messages.
+   */
+  public void logStats() {
+    for (Output output : outputList) {
+      output.logStat();
+    }
+    LogFeederUtil.logStatForMetric(messageTruncateMetric,
+      "Stat: Messages Truncated", null);
+  }
+
+  /**
+   * Appends this manager's truncation metric and the metrics of every
+   * output to the given list.
+   *
+   * @param metricsList list that receives the metric containers
+   */
+  public void addMetricsContainers(List<MetricCount> metricsList) {
+    metricsList.add(messageTruncateMetric);
+    for (Output output : outputList) {
+      output.addMetricsContainers(metricsList);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java
new file mode 100644
index 0000000..aa1edea
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder.filter;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.ambari.logfeeder.AliasUtil;
+import org.apache.ambari.logfeeder.ConfigBlock;
+import org.apache.ambari.logfeeder.LogFeederUtil;
+import org.apache.ambari.logfeeder.MetricCount;
+import org.apache.ambari.logfeeder.OutputMgr;
+import org.apache.ambari.logfeeder.AliasUtil.ALIAS_PARAM;
+import org.apache.ambari.logfeeder.AliasUtil.ALIAS_TYPE;
+import org.apache.ambari.logfeeder.input.Input;
+import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logfeeder.mapper.Mapper;
+import org.apache.log4j.Logger;
+import org.apache.log4j.Priority;
+
+public abstract class Filter extends ConfigBlock {
+  static private Logger logger = Logger.getLogger(Filter.class);
+
+  OutputMgr outputMgr;
+  Input input;
+  Filter nextFilter = null;
+
+  Map<String, List<Mapper>> postFieldValueMappers = new HashMap<String, List<Mapper>>();
+
+  @Override
+  public void init() throws Exception {
+    super.init();
+
+    initializePostMapValues();
+    if (nextFilter != null) {
+      nextFilter.init();
+    }
+  }
+
+  /**
+   *
+   */
+  @SuppressWarnings("unchecked")
+  protected void initializePostMapValues() {
+    // Initialize map values
+    Map<String, Object> postMapValues = (Map<String, Object>) getConfigValue("post_map_values");
+    if (postMapValues == null) {
+      return;
+    }
+    for (String fieldName : postMapValues.keySet()) {
+      List<Map<String, Object>> mapList = null;
+      Object values = postMapValues.get(fieldName);
+      if (values instanceof List<?>) {
+        mapList = (List<Map<String, Object>>) values;
+      } else {
+        mapList = new ArrayList<Map<String, Object>>();
+        mapList.add((Map<String, Object>) values);
+      }
+      for (Map<String, Object> mapObject : mapList) {
+        for (String mapClassCode : mapObject.keySet()) {
+          Mapper mapper = getMapper(mapClassCode);
+          if (mapper == null) {
+            break;
+          }
+          if (mapper.init(getInput().getShortDescription(),
+            fieldName, mapClassCode,
+            mapObject.get(mapClassCode))) {
+            List<Mapper> fieldMapList = postFieldValueMappers
+              .get(fieldName);
+            if (fieldMapList == null) {
+              fieldMapList = new ArrayList<Mapper>();
+              postFieldValueMappers.put(fieldName, fieldMapList);
+            }
+            fieldMapList.add(mapper);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * @param mapClassCode
+   * @return
+   */
+  protected Mapper getMapper(String mapClassCode) {
+    String classFullName = AliasUtil.getInstance().readAlias(mapClassCode, ALIAS_TYPE.MAPPER, ALIAS_PARAM.KLASS);
+    if (classFullName != null && !classFullName.isEmpty()) {
+      Mapper mapper = (Mapper) LogFeederUtil.getClassInstance(classFullName, ALIAS_TYPE.MAPPER);
+      return mapper;
+    }
+    return null;
+  }
+
+  public void setOutputMgr(OutputMgr outputMgr) {
+    this.outputMgr = outputMgr;
+  }
+
+  public Filter getNextFilter() {
+    return nextFilter;
+  }
+
+  public void setNextFilter(Filter nextFilter) {
+    this.nextFilter = nextFilter;
+  }
+
+  public Input getInput() {
+    return input;
+  }
+
+  public void setInput(Input input) {
+    this.input = input;
+  }
+
+  /**
+   * Deriving classes should implement this at the minimum
+   *
+   * @param inputStr
+   * @param marker
+   */
+  public void apply(String inputStr, InputMarker inputMarker) {
+    // TODO: There is no transformation for string types.
+    if (nextFilter != null) {
+      nextFilter.apply(inputStr, inputMarker);
+    } else {
+      outputMgr.write(inputStr, inputMarker);
+    }
+  }
+
+  public void apply(Map<String, Object> jsonObj, InputMarker inputMarker) {
+    if (postFieldValueMappers.size() > 0) {
+      for (String fieldName : postFieldValueMappers.keySet()) {
+        Object value = jsonObj.get(fieldName);
+        if (value != null) {
+          for (Mapper mapper : postFieldValueMappers.get(fieldName)) {
+            value = mapper.apply(jsonObj, value);
+          }
+        }
+      }
+    }
+    if (nextFilter != null) {
+      nextFilter.apply(jsonObj, inputMarker);
+    } else {
+      outputMgr.write(jsonObj, inputMarker);
+    }
+  }
+
+  /**
+   *
+   */
+  public void close() {
+    if (nextFilter != null) {
+      nextFilter.close();
+    }
+  }
+
+  public void flush() {
+
+  }
+
+  @Override
+  public void logStat() {
+    super.logStat();
+    if (nextFilter != null) {
+      nextFilter.logStat();
+    }
+  }
+
+  @Override
+  public boolean isFieldConditionMatch(String fieldName, String stringValue) {
+    if (!super.isFieldConditionMatch(fieldName, stringValue)) {
+      // Let's try input
+      if (input != null) {
+        return input.isFieldConditionMatch(fieldName, stringValue);
+      } else {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public String getShortDescription() {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  @Override
+  public boolean logConfgs(Priority level) {
+    if (!super.logConfgs(level)) {
+      return false;
+    }
+    logger.log(level, "input=" + input.getShortDescription());
+    return true;
+  }
+
+  @Override
+  public void addMetricsContainers(List<MetricCount> metricsList) {
+    super.addMetricsContainers(metricsList);
+    if (nextFilter != null) {
+      nextFilter.addMetricsContainers(metricsList);
+    }
+  }
+
+}