Posted to commits@ambari.apache.org by ol...@apache.org on 2016/09/07 23:38:37 UTC

[47/50] [abbrv] ambari git commit: AMBARI-18246. Clean up Log Feeder (Miklos Gergely via oleewere)

http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java
index 18a5a54..e1a0bb9 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java
@@ -30,26 +30,27 @@ import org.apache.ambari.logfeeder.input.InputMarker;
 import org.apache.ambari.logfeeder.util.LogFeederUtil;
 import org.apache.commons.csv.CSVFormat;
 import org.apache.commons.csv.CSVPrinter;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.log4j.Logger;
 
 public class OutputFile extends Output {
-  static Logger logger = Logger.getLogger(OutputFile.class);
+  private static final Logger LOG = Logger.getLogger(OutputFile.class);
 
-  PrintWriter outWriter = null;
-  String filePath = null;
-  String codec;
+  private PrintWriter outWriter;
+  private String filePath = null;
+  private String codec;
 
   @Override
   public void init() throws Exception {
     super.init();
 
     filePath = getStringValue("path");
-    if (filePath == null || filePath.isEmpty()) {
-      logger.error("Filepath config property <path> is not set in config file.");
+    if (StringUtils.isEmpty(filePath)) {
+      LOG.error("Filepath config property <path> is not set in config file.");
       return;
     }
     codec = getStringValue("codec");
-    if (codec == null || codec.trim().isEmpty()) {
+    if (StringUtils.isBlank(codec)) {
       codec = "json";
     } else {
       if (codec.trim().equalsIgnoreCase("csv")) {
@@ -57,12 +58,11 @@ public class OutputFile extends Output {
       } else if (codec.trim().equalsIgnoreCase("json")) {
         codec = "csv";
       } else {
-        logger.error("Unsupported codec type. codec=" + codec
-          + ", will use json");
+        LOG.error("Unsupported codec type. codec=" + codec + ", will use json");
         codec = "json";
       }
     }
-    logger.info("Out filePath=" + filePath + ", codec=" + codec);
+    LOG.info("Out filePath=" + filePath + ", codec=" + codec);
     File outFile = new File(filePath);
     if (outFile.getParentFile() != null) {
       File parentDir = outFile.getParentFile();
@@ -71,16 +71,14 @@ public class OutputFile extends Output {
       }
     }
 
-    outWriter = new PrintWriter(new BufferedWriter(new FileWriter(outFile,
-      true)));
+    outWriter = new PrintWriter(new BufferedWriter(new FileWriter(outFile, true)));
 
-    logger.info("init() is successfull. filePath="
-      + outFile.getAbsolutePath());
+    LOG.info("init() is successfull. filePath=" + outFile.getAbsolutePath());
   }
 
   @Override
   public void close() {
-    logger.info("Closing file." + getShortDescription());
+    LOG.info("Closing file." + getShortDescription());
     if (outWriter != null) {
       try {
         outWriter.close();
@@ -92,8 +90,7 @@ public class OutputFile extends Output {
   }
 
   @Override
-  public void write(Map<String, Object> jsonObj, InputMarker inputMarker)
-    throws Exception {
+  public void write(Map<String, Object> jsonObj, InputMarker inputMarker) throws Exception {
     String outStr = null;
     CSVPrinter csvPrinter = null;
     try {
@@ -104,7 +101,7 @@ public class OutputFile extends Output {
         outStr = LogFeederUtil.getGson().toJson(jsonObj);
       }
       if (outWriter != null && outStr != null) {
-        statMetric.count++;
+        statMetric.value++;
 
         outWriter.println(outStr);
         outWriter.flush();
@@ -122,7 +119,7 @@ public class OutputFile extends Output {
   @Override
   synchronized public void write(String block, InputMarker inputMarker) throws Exception {
     if (outWriter != null && block != null) {
-      statMetric.count++;
+      statMetric.value++;
 
       outWriter.println(block);
       outWriter.flush();
@@ -135,10 +132,7 @@ public class OutputFile extends Output {
   }
 
   @Override
-  public void copyFile(File inputFile, InputMarker inputMarker)
-      throws UnsupportedOperationException {
-    throw new UnsupportedOperationException(
-        "copyFile method is not yet supported for output=file");
+  public void copyFile(File inputFile, InputMarker inputMarker) throws UnsupportedOperationException {
+    throw new UnsupportedOperationException("copyFile method is not yet supported for output=file");
   }
-
 }
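
A note on the isEmpty/isBlank switch above: the commons-lang3 predicates preserve the old hand-rolled checks (isEmpty covers null/zero-length, isBlank additionally covers whitespace-only, matching the previous codec.trim().isEmpty() test). A minimal sketch of the distinction, assuming commons-lang3 on the classpath (class name is illustrative):

import org.apache.commons.lang3.StringUtils;

public class StringUtilsDemo {
  public static void main(String[] args) {
    System.out.println(StringUtils.isEmpty(null));  // true: null or zero-length
    System.out.println(StringUtils.isEmpty("  "));  // false: whitespace is not "empty"
    System.out.println(StringUtils.isBlank("  "));  // true: blank also covers whitespace-only
    System.out.println(StringUtils.isBlank("csv")); // false
  }
}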

http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java
index a360215..8f4b0b1 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java
@@ -43,7 +43,8 @@ import java.util.concurrent.ConcurrentLinkedQueue;
  * The events are spooled on the local file system and uploaded in batches asynchronously.
  */
 public class OutputHDFSFile extends Output implements RolloverHandler, RolloverCondition {
-  private final static Logger logger = Logger.getLogger(OutputHDFSFile.class);
+  private static final Logger LOG = Logger.getLogger(OutputHDFSFile.class);
+  
   private static final long DEFAULT_ROLLOVER_THRESHOLD_TIME_SECONDS = 5 * 60L;// 5 min by default
 
   private ConcurrentLinkedQueue<File> localReadyFiles = new ConcurrentLinkedQueue<File>();
@@ -72,23 +73,20 @@ public class OutputHDFSFile extends Output implements RolloverHandler, RolloverC
     rolloverThresholdTimeMillis = rolloverThresholdTimeSeconds * 1000L;
     filenamePrefix = getStringValue("file_name_prefix", filenamePrefix);
     if (StringUtils.isEmpty(hdfsOutDir)) {
-      logger
-          .error("HDFS config property <hdfs_out_dir> is not set in config file.");
+      LOG.error("HDFS config property <hdfs_out_dir> is not set in config file.");
       return;
     }
     if (StringUtils.isEmpty(hdfsHost)) {
-      logger
-          .error("HDFS config property <hdfs_host> is not set in config file.");
+      LOG.error("HDFS config property <hdfs_host> is not set in config file.");
       return;
     }
     if (StringUtils.isEmpty(hdfsPort)) {
-      logger
-          .error("HDFS config property <hdfs_port> is not set in config file.");
+      LOG.error("HDFS config property <hdfs_port> is not set in config file.");
       return;
     }
     HashMap<String, String> contextParam = buildContextParam();
     hdfsOutDir = PlaceholderUtil.replaceVariables(hdfsOutDir, contextParam);
-    logger.info("hdfs Output dir=" + hdfsOutDir);
+    LOG.info("hdfs Output dir=" + hdfsOutDir);
     String localFileDir = LogFeederUtil.getLogfeederTempDir() + "hdfs/service/";
     logSpooler = new LogSpooler(localFileDir, filenamePrefix, this, this);
     this.startHDFSCopyThread();
@@ -96,18 +94,17 @@ public class OutputHDFSFile extends Output implements RolloverHandler, RolloverC
 
   @Override
   public void close() {
-    logger.info("Closing file." + getShortDescription());
+    LOG.info("Closing file." + getShortDescription());
     logSpooler.rollover();
     this.stopHDFSCopyThread();
     isClosed = true;
   }
 
   @Override
-  synchronized public void write(String block, InputMarker inputMarker)
-      throws Exception {
+  public synchronized void write(String block, InputMarker inputMarker) throws Exception {
     if (block != null) {
       logSpooler.add(block);
-      statMetric.count++;
+      statMetric.value++;
     }
   }
 
@@ -127,24 +124,19 @@ public class OutputHDFSFile extends Output implements RolloverHandler, RolloverC
             Iterator<File> localFileIterator = localReadyFiles.iterator();
             while (localFileIterator.hasNext()) {
               File localFile = localFileIterator.next();
-              fileSystem = LogfeederHDFSUtil.INSTANCE.buildFileSystem(hdfsHost,
-                  hdfsPort);
+              fileSystem = LogfeederHDFSUtil.buildFileSystem(hdfsHost, hdfsPort);
               if (fileSystem != null && localFile.exists()) {
                 String destFilePath = hdfsOutDir + "/" + localFile.getName();
                 String localPath = localFile.getAbsolutePath();
                 boolean overWrite = true;
                 boolean delSrc = true;
-                boolean isCopied = LogfeederHDFSUtil.INSTANCE.copyFromLocal(
-                    localFile.getAbsolutePath(), destFilePath, fileSystem,
+                boolean isCopied = LogfeederHDFSUtil.copyFromLocal(localFile.getAbsolutePath(), destFilePath, fileSystem,
                     overWrite, delSrc);
                 if (isCopied) {
-                  logger.debug("File copy to hdfs hdfspath :" + destFilePath
-                      + " and deleted local file :" + localPath);
+                  LOG.debug("File copy to hdfs hdfspath :" + destFilePath + " and deleted local file :" + localPath);
                 } else {
-                  // TODO Need to write retry logic, in next release we can
-                  // handle it
-                  logger.error("Hdfs file copy  failed for hdfspath :"
-                      + destFilePath + " and localpath :" + localPath);
+                  // TODO Need to write retry logic, in next release we can handle it
+                  LOG.error("Hdfs file copy  failed for hdfspath :" + destFilePath + " and localpath :" + localPath);
                 }
               }
               localFileIterator.remove();
@@ -157,14 +149,11 @@ public class OutputHDFSFile extends Output implements RolloverHandler, RolloverC
                 }
               }
             } catch (InterruptedException e) {
-              logger.error(e.getLocalizedMessage(),e);
+              LOG.error(e.getLocalizedMessage(),e);
             }
           }
         } catch (Exception e) {
-          logger
-              .error(
-                  "Exception in hdfsCopyThread errorMsg:"
-                      + e.getLocalizedMessage(), e);
+          LOG.error("Exception in hdfsCopyThread errorMsg:" + e.getLocalizedMessage(), e);
         }
       }
     };
@@ -174,24 +163,23 @@ public class OutputHDFSFile extends Output implements RolloverHandler, RolloverC
 
   private void stopHDFSCopyThread() {
     if (hdfsCopyThread != null) {
-      logger.info("waiting till copy all local files to hdfs.......");
+      LOG.info("waiting till copy all local files to hdfs.......");
       while (!localReadyFiles.isEmpty()) {
         try {
           Thread.sleep(1000);
         } catch (InterruptedException e) {
-          logger.error(e.getLocalizedMessage(), e);
+          LOG.error(e.getLocalizedMessage(), e);
         }
-        logger.debug("still waiting to copy all local files to hdfs.......");
+        LOG.debug("still waiting to copy all local files to hdfs.......");
       }
-      logger.info("calling interrupt method for hdfsCopyThread to stop it.");
+      LOG.info("calling interrupt method for hdfsCopyThread to stop it.");
       try {
         hdfsCopyThread.interrupt();
       } catch (SecurityException exception) {
-        logger.error(" Current thread : '" + Thread.currentThread().getName()
-            + "' does not have permission to interrupt the Thread: '"
-            + hdfsCopyThread.getName() + "'");
+        LOG.error(" Current thread : '" + Thread.currentThread().getName() +
+            "' does not have permission to interrupt the Thread: '" + hdfsCopyThread.getName() + "'");
       }
-      LogfeederHDFSUtil.INSTANCE.closeFileSystem(fileSystem);
+      LogfeederHDFSUtil.closeFileSystem(fileSystem);
     }
   }
 
@@ -208,15 +196,13 @@ public class OutputHDFSFile extends Output implements RolloverHandler, RolloverC
         readyMonitor.notifyAll();
       }
     } catch (Exception e) {
-      logger.error(e.getLocalizedMessage(),e);
+      LOG.error(e.getLocalizedMessage(),e);
     }
   }
 
   @Override
-  public void copyFile(File inputFile, InputMarker inputMarker)
-      throws UnsupportedOperationException {
-    throw new UnsupportedOperationException(
-        "copyFile method is not yet supported for output=hdfs");     
+  public void copyFile(File inputFile, InputMarker inputMarker) throws UnsupportedOperationException {
+    throw new UnsupportedOperationException("copyFile method is not yet supported for output=hdfs");
   }
 
   /**
@@ -242,8 +228,8 @@ public class OutputHDFSFile extends Output implements RolloverHandler, RolloverC
     long timeSinceCreation = new Date().getTime() - currentSpoolerContext.getActiveLogCreationTime().getTime();
     boolean shouldRollover = timeSinceCreation > rolloverThresholdTimeMillis;
     if (shouldRollover) {
-      logger.info("Detecting that time since file creation time " + currentSpoolerContext.getActiveLogCreationTime() +
-                    " has crossed threshold (msecs) " + rolloverThresholdTimeMillis);
+      LOG.info("Detecting that time since file creation time " + currentSpoolerContext.getActiveLogCreationTime() +
+          " has crossed threshold (msecs) " + rolloverThresholdTimeMillis);
     }
     return shouldRollover;
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java
index 2595d87..52fc6f8 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java
@@ -56,6 +56,16 @@ public class OutputKafka extends Output {
   private boolean isKafkaBrokerUp = false;
 
   @Override
+  protected String getStatMetricName() {
+    return "output.kafka.write_logs";
+  }
+
+  @Override
+  protected String getWriteBytesMetricName() {
+    return "output.kafka.write_bytes";
+  }
+  
+  @Override
   public void init() throws Exception {
     super.init();
     Properties props = initProperties();
@@ -65,9 +75,6 @@ public class OutputKafka extends Output {
   }
 
   private Properties initProperties() throws Exception {
-    statMetric.metricsName = "output.kafka.write_logs";
-    writeBytesMetric.metricsName = "output.kafka.write_bytes";
-
     String brokerList = getStringValue("broker_list");
     if (StringUtils.isEmpty(brokerList)) {
       throw new Exception("For kafka output, bootstrap broker_list is needed");
@@ -124,17 +131,15 @@ public class OutputKafka extends Output {
             if (publishMessage(kafkaCallBack.message, kafkaCallBack.inputMarker)) {
               kafkaCallBack = null;
             } else {
-              LOG.error("Kafka is down. messageNumber=" + kafkaCallBack.thisMessageNumber + ". Going to sleep for "
-                  + FAILED_RETRY_INTERVAL + " seconds");
+              LOG.error("Kafka is down. messageNumber=" + kafkaCallBack.thisMessageNumber + ". Going to sleep for " +
+                  FAILED_RETRY_INTERVAL + " seconds");
               Thread.sleep(FAILED_RETRY_INTERVAL * 1000);
             }
 
           } catch (Throwable t) {
             String logMessageKey = this.getClass().getSimpleName() + "_KAFKA_RETRY_WRITE_ERROR";
-            LogFeederUtil.logErrorMessageByInterval(logMessageKey,
-                "Error sending message to Kafka during retry. message="
-                    + (kafkaCallBack == null ? null : kafkaCallBack.message),
-                t, LOG, Level.ERROR);
+            LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Error sending message to Kafka during retry. message=" +
+                (kafkaCallBack == null ? null : kafkaCallBack.message), t, LOG, Level.ERROR);
           }
         }
 
@@ -160,8 +165,8 @@ public class OutputKafka extends Output {
           LOG.error("Kafka is down. Going to sleep for " + FAILED_RETRY_INTERVAL + " seconds");
           Thread.sleep(FAILED_RETRY_INTERVAL * 1000);
         } else {
-          LOG.warn("Kafka is still catching up from previous failed messages. outstanding messages="
-              + failedMessages.size() + " Going to sleep for " + CATCHUP_RETRY_INTERVAL + " seconds");
+          LOG.warn("Kafka is still catching up from previous failed messages. outstanding messages=" + failedMessages.size() +
+              " Going to sleep for " + CATCHUP_RETRY_INTERVAL + " seconds");
           Thread.sleep(CATCHUP_RETRY_INTERVAL * 1000);
         }
       } catch (Throwable t) {
@@ -198,16 +203,15 @@ public class OutputKafka extends Output {
 
   private boolean publishMessage(String block, InputMarker inputMarker) {
     if (isAsync && isKafkaBrokerUp) { // Send asynchronously
-      producer.send(new ProducerRecord<String, String>(topic, block),
-          new KafkaCallBack(this, block, inputMarker, ++messageCount));
+      producer.send(new ProducerRecord<String, String>(topic, block), new KafkaCallBack(this, block, inputMarker, ++messageCount));
       return true;
     } else { // Send synchronously
       try {
         // Not using key. Let it round robin
         RecordMetadata metadata = producer.send(new ProducerRecord<String, String>(topic, block)).get();
         if (metadata != null) {
-          statMetric.count++;
-          writeBytesMetric.count += block.length();
+          statMetric.value++;
+          writeBytesMetric.value += block.length();
         }
         if (!isKafkaBrokerUp) {
           LOG.info("Started writing to kafka. " + getShortDescription());
@@ -217,18 +221,18 @@ public class OutputKafka extends Output {
       } catch (InterruptedException e) {
         isKafkaBrokerUp = false;
         String logKeyMessage = this.getClass().getSimpleName() + "_KAFKA_INTERRUPT";
-        LogFeederUtil.logErrorMessageByInterval(logKeyMessage, "InterruptedException-Error sending message to Kafka", e,
-            LOG, Level.ERROR);
+        LogFeederUtil.logErrorMessageByInterval(logKeyMessage, "InterruptedException-Error sending message to Kafka", e, LOG,
+            Level.ERROR);
       } catch (ExecutionException e) {
         isKafkaBrokerUp = false;
         String logKeyMessage = this.getClass().getSimpleName() + "_KAFKA_EXECUTION";
-        LogFeederUtil.logErrorMessageByInterval(logKeyMessage, "ExecutionException-Error sending message to Kafka", e,
-            LOG, Level.ERROR);
+        LogFeederUtil.logErrorMessageByInterval(logKeyMessage, "ExecutionException-Error sending message to Kafka", e, LOG,
+            Level.ERROR);
       } catch (Throwable t) {
         isKafkaBrokerUp = false;
         String logKeyMessage = this.getClass().getSimpleName() + "_KAFKA_WRITE_ERROR";
-        LogFeederUtil.logErrorMessageByInterval(logKeyMessage, "GenericException-Error sending message to Kafka", t,
-            LOG, Level.ERROR);
+        LogFeederUtil.logErrorMessageByInterval(logKeyMessage, "GenericException-Error sending message to Kafka", t, LOG,
+            Level.ERROR);
       }
     }
     return false;
@@ -260,12 +264,12 @@ public class OutputKafka extends Output {
           output.isKafkaBrokerUp = true;
         }
         output.incrementStat(1);
-        output.writeBytesMetric.count += message.length();
+        output.writeBytesMetric.value += message.length();
       } else {
         output.isKafkaBrokerUp = false;
         String logKeyMessage = this.getClass().getSimpleName() + "_KAFKA_ASYNC_ERROR";
-        LogFeederUtil.logErrorMessageByInterval(logKeyMessage, "Error sending message to Kafka. Async Callback",
-            exception, LOG, Level.ERROR);
+        LogFeederUtil.logErrorMessageByInterval(logKeyMessage, "Error sending message to Kafka. Async Callback", exception, LOG,
+            Level.ERROR);
 
         output.failedMessages.add(this);
       }
@@ -273,9 +277,7 @@ public class OutputKafka extends Output {
   }
 
   @Override
-  public void copyFile(File inputFile, InputMarker inputMarker)
-      throws UnsupportedOperationException {
-    throw new UnsupportedOperationException(
-        "copyFile method is not yet supported for output=kafka");
+  public void copyFile(File inputFile, InputMarker inputMarker) throws UnsupportedOperationException {
+    throw new UnsupportedOperationException("copyFile method is not yet supported for output=kafka");
   }
 }
\ No newline at end of file
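
The new getStatMetricName()/getWriteBytesMetricName() overrides replace the old pattern of mutating metric name fields inside initProperties(). A hedged sketch of the template-method shape this introduces (class names are illustrative, not the Ambari hierarchy):

abstract class BaseOutput {
  // Subclasses override to name their metrics; the base class owns the lifecycle.
  protected String getStatMetricName() {
    return null; // unnamed by default
  }

  final void reportStat(long value) {
    System.out.println(getStatMetricName() + "=" + value);
  }
}

public class MetricNameDemo extends BaseOutput {
  @Override
  protected String getStatMetricName() {
    return "output.kafka.write_logs";
  }

  public static void main(String[] args) {
    new MetricNameDemo().reportStat(42); // prints: output.kafka.write_logs=42
  }
}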

http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManager.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManager.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManager.java
new file mode 100644
index 0000000..2c81c19
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManager.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder.output;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.ambari.logfeeder.common.LogFeederConstants;
+import org.apache.ambari.logfeeder.input.Input;
+import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logfeeder.logconfig.FilterLogData;
+import org.apache.ambari.logfeeder.metrics.MetricData;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.ambari.logfeeder.util.MurmurHash;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+public class OutputManager {
+  private static final Logger LOG = Logger.getLogger(OutputManager.class);
+
+  private static final int HASH_SEED = 31174077;
+  private static final int MAX_OUTPUT_SIZE = 32765; // 32766-1
+
+  private List<Output> outputs = new ArrayList<Output>();
+
+  private boolean addMessageMD5 = true;
+
+  private static long docCounter = 0;
+  private MetricData messageTruncateMetric = new MetricData(null, false);
+
+  public List<Output> getOutputs() {
+    return outputs;
+  }
+
+  public void add(Output output) {
+    this.outputs.add(output);
+  }
+
+  public void retainUsedOutputs(Collection<Output> usedOutputs) {
+    outputs.retainAll(usedOutputs);
+  }
+
+  public void init() throws Exception {
+    for (Output output : outputs) {
+      output.init();
+    }
+  }
+
+  public void write(Map<String, Object> jsonObj, InputMarker inputMarker) {
+    Input input = inputMarker.input;
+
+    // Update the block with the context fields
+    for (Map.Entry<String, String> entry : input.getContextFields().entrySet()) {
+      if (jsonObj.get(entry.getKey()) == null) {
+        jsonObj.put(entry.getKey(), entry.getValue());
+      }
+    }
+
+    // TODO: Ideally most of the overrides should be configurable
+
+    if (jsonObj.get("type") == null) {
+      jsonObj.put("type", input.getStringValue("type"));
+    }
+    if (jsonObj.get("path") == null && input.getFilePath() != null) {
+      jsonObj.put("path", input.getFilePath());
+    }
+    if (jsonObj.get("path") == null && input.getStringValue("path") != null) {
+      jsonObj.put("path", input.getStringValue("path"));
+    }
+    if (jsonObj.get("host") == null && LogFeederUtil.hostName != null) {
+      jsonObj.put("host", LogFeederUtil.hostName);
+    }
+    if (jsonObj.get("ip") == null && LogFeederUtil.ipAddress != null) {
+      jsonObj.put("ip", LogFeederUtil.ipAddress);
+    }
+    if (jsonObj.get("level") == null) {
+      jsonObj.put("level", LogFeederConstants.LOG_LEVEL_UNKNOWN);
+    }
+    
+    if (input.isUseEventMD5() || input.isGenEventMD5()) {
+      String prefix = "";
+      Object logtimeObj = jsonObj.get("logtime");
+      if (logtimeObj != null) {
+        if (logtimeObj instanceof Date) {
+          prefix = "" + ((Date) logtimeObj).getTime();
+        } else {
+          prefix = logtimeObj.toString();
+        }
+      }
+      
+      Long eventMD5 = MurmurHash.hash64A(LogFeederUtil.getGson().toJson(jsonObj).getBytes(), HASH_SEED);
+      if (input.isGenEventMD5()) {
+        jsonObj.put("event_md5", prefix + eventMD5.toString());
+      }
+      if (input.isUseEventMD5()) {
+        jsonObj.put("id", prefix + eventMD5.toString());
+      }
+    }
+
+    jsonObj.put("seq_num", new Long(docCounter++));
+    if (jsonObj.get("id") == null) {
+      jsonObj.put("id", UUID.randomUUID().toString());
+    }
+    if (jsonObj.get("event_count") == null) {
+      jsonObj.put("event_count", new Integer(1));
+    }
+    if (inputMarker.lineNumber > 0) {
+      jsonObj.put("logfile_line_number", new Integer(inputMarker.lineNumber));
+    }
+    if (jsonObj.containsKey("log_message")) {
+      // TODO: Let's check size only for log_message for now
+      String logMessage = (String) jsonObj.get("log_message");
+      logMessage = truncateLongLogMessage(jsonObj, input, logMessage);
+      if (addMessageMD5) {
+        jsonObj.put("message_md5", "" + MurmurHash.hash64A(logMessage.getBytes(), 31174077));
+      }
+    }
+    
+    if (FilterLogData.INSTANCE.isAllowed(jsonObj)) {
+      for (Output output : input.getOutputList()) {
+        try {
+          output.write(jsonObj, inputMarker);
+        } catch (Exception e) {
+          LOG.error("Error writing. to " + output.getShortDescription(), e);
+        }
+      }
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private String truncateLongLogMessage(Map<String, Object> jsonObj, Input input, String logMessage) {
+    if (logMessage != null && logMessage.getBytes().length > MAX_OUTPUT_SIZE) {
+      messageTruncateMetric.value++;
+      String logMessageKey = this.getClass().getSimpleName() + "_MESSAGESIZE";
+      LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Message is too big. size=" + logMessage.getBytes().length +
+          ", input=" + input.getShortDescription() + ". Truncating to " + MAX_OUTPUT_SIZE + ", first upto 100 characters=" +
+          StringUtils.abbreviate(logMessage, 100), null, LOG, Level.WARN);
+      logMessage = new String(logMessage.getBytes(), 0, MAX_OUTPUT_SIZE);
+      jsonObj.put("log_message", logMessage);
+      List<String> tagsList = (List<String>) jsonObj.get("tags");
+      if (tagsList == null) {
+        tagsList = new ArrayList<String>();
+        jsonObj.put("tags", tagsList);
+      }
+      tagsList.add("error_message_truncated");
+    }
+    return logMessage;
+  }
+
+  public void write(String jsonBlock, InputMarker inputMarker) {
+    if (FilterLogData.INSTANCE.isAllowed(jsonBlock)) {
+      for (Output output : inputMarker.input.getOutputList()) {
+        try {
+          output.write(jsonBlock, inputMarker);
+        } catch (Exception e) {
+          LOG.error("Error writing. to " + output.getShortDescription(), e);
+        }
+      }
+    }
+  }
+
+  public void copyFile(File inputFile, InputMarker inputMarker) {
+    Input input = inputMarker.input;
+    for (Output output : input.getOutputList()) {
+      try {
+        output.copyFile(inputFile, inputMarker);
+      } catch (Exception e) {
+        LOG.error("Error copying file to " + output.getShortDescription(), e);
+      }
+    }
+  }
+
+  public void logStats() {
+    for (Output output : outputs) {
+      output.logStat();
+    }
+    LogFeederUtil.logStatForMetric(messageTruncateMetric, "Stat: Messages Truncated", "");
+  }
+
+  public void addMetricsContainers(List<MetricData> metricsList) {
+    metricsList.add(messageTruncateMetric);
+    for (Output output : outputs) {
+      output.addMetricsContainers(metricsList);
+    }
+  }
+
+  public void close() {
+    LOG.info("Close called for outputs ...");
+    for (Output output : outputs) {
+      try {
+        output.setDrain(true);
+        output.close();
+      } catch (Exception e) {
+        // Ignore
+      }
+    }
+    
+    // Need to get this value from property
+    int iterations = 30;
+    int waitTimeMS = 1000;
+    for (int i = 0; i < iterations; i++) {
+      boolean allClosed = true;
+      for (Output output : outputs) {
+        if (!output.isClosed()) {
+          try {
+            allClosed = false;
+            LOG.warn("Waiting for output to close. " + output.getShortDescription() + ", " + (iterations - i) + " more seconds");
+            Thread.sleep(waitTimeMS);
+          } catch (Throwable t) {
+            // Ignore
+          }
+        }
+      }
+      if (allClosed) {
+        LOG.info("All outputs are closed. Iterations=" + i);
+        return;
+      }
+    }
+
+    LOG.warn("Some outpus were not closed after " + iterations + "  iterations");
+    for (Output output : outputs) {
+      if (!output.isClosed()) {
+        LOG.warn("Output not closed. Will ignore it." + output.getShortDescription() + ", pendingCound=" + output.getPendingCount());
+      }
+    }
+  }
+}
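
In OutputManager.write() above, event_md5/id is a Murmur hash of the serialized event prefixed with the epoch logtime, so identical payloads logged at different times still get distinct ids. A sketch of that scheme using String.hashCode() as a stand-in for the Ambari-internal MurmurHash.hash64A():

import java.util.Date;

public class EventIdDemo {
  static String eventId(String eventJson, Object logtime) {
    String prefix = "";
    if (logtime instanceof Date) {
      prefix = String.valueOf(((Date) logtime).getTime());
    } else if (logtime != null) {
      prefix = logtime.toString();
    }
    long hash = eventJson.hashCode(); // stand-in for MurmurHash.hash64A(bytes, HASH_SEED)
    return prefix + hash;
  }

  public static void main(String[] args) {
    String json = "{\"log_message\":\"hello\"}";
    System.out.println(eventId(json, new Date(0L)));    // "0" + hash
    System.out.println(eventId(json, new Date(1000L))); // "1000" + hash, a distinct id
  }
}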

http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputMgr.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputMgr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputMgr.java
deleted file mode 100644
index 0a6b7fa..0000000
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputMgr.java
+++ /dev/null
@@ -1,263 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ambari.logfeeder.output;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-import org.apache.ambari.logfeeder.input.Input;
-import org.apache.ambari.logfeeder.input.InputMarker;
-import org.apache.ambari.logfeeder.logconfig.LogFeederConstants;
-import org.apache.ambari.logfeeder.logconfig.filter.FilterLogData;
-import org.apache.ambari.logfeeder.metrics.MetricCount;
-import org.apache.ambari.logfeeder.util.LogFeederUtil;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-
-public class OutputMgr {
-  private static final Logger logger = Logger.getLogger(OutputMgr.class);
-
-  private Collection<Output> outputList = new ArrayList<Output>();
-
-  private boolean addMessageMD5 = true;
-
-  private int MAX_OUTPUT_SIZE = 32765; // 32766-1
-  private static long doc_counter = 0;
-  private MetricCount messageTruncateMetric = new MetricCount();
-
-  
-  public Collection<Output> getOutputList() {
-    return outputList;
-  }
-
-  public void setOutputList(Collection<Output> outputList) {
-    this.outputList = outputList;
-  }
-
-  public void write(Map<String, Object> jsonObj, InputMarker inputMarker) {
-    Input input = inputMarker.input;
-
-    // Update the block with the context fields
-    for (Map.Entry<String, String> entry : input.getContextFields()
-      .entrySet()) {
-      if (jsonObj.get(entry.getKey()) == null) {
-        jsonObj.put(entry.getKey(), entry.getValue());
-      }
-    }
-
-    // TODO: Ideally most of the overrides should be configurable
-
-    // Add the input type
-    if (jsonObj.get("type") == null) {
-      jsonObj.put("type", input.getStringValue("type"));
-    }
-    if (jsonObj.get("path") == null && input.getFilePath() != null) {
-      jsonObj.put("path", input.getFilePath());
-    }
-    if (jsonObj.get("path") == null && input.getStringValue("path") != null) {
-      jsonObj.put("path", input.getStringValue("path"));
-    }
-
-    // Add host if required
-    if (jsonObj.get("host") == null && LogFeederUtil.hostName != null) {
-      jsonObj.put("host", LogFeederUtil.hostName);
-    }
-    // Add IP if required
-    if (jsonObj.get("ip") == null && LogFeederUtil.ipAddress != null) {
-      jsonObj.put("ip", LogFeederUtil.ipAddress);
-    }
-    
-    //Add level
-    if (jsonObj.get("level") == null) {
-      jsonObj.put("level", LogFeederConstants.LOG_LEVEL_UNKNOWN);
-    }
-    if (input.isUseEventMD5() || input.isGenEventMD5()) {
-      String prefix = "";
-      Object logtimeObj = jsonObj.get("logtime");
-      if (logtimeObj != null) {
-        if (logtimeObj instanceof Date) {
-          prefix = "" + ((Date) logtimeObj).getTime();
-        } else {
-          prefix = logtimeObj.toString();
-        }
-      }
-      Long eventMD5 = LogFeederUtil.genHash(LogFeederUtil.getGson()
-        .toJson(jsonObj));
-      if (input.isGenEventMD5()) {
-        jsonObj.put("event_md5", prefix + eventMD5.toString());
-      }
-      if (input.isUseEventMD5()) {
-        jsonObj.put("id", prefix + eventMD5.toString());
-      }
-    }
-
-    // jsonObj.put("@timestamp", new Date());
-    jsonObj.put("seq_num", new Long(doc_counter++));
-    if (jsonObj.get("id") == null) {
-      jsonObj.put("id", UUID.randomUUID().toString());
-    }
-    if (jsonObj.get("event_count") == null) {
-      jsonObj.put("event_count", new Integer(1));
-    }
-    if (inputMarker.lineNumber > 0) {
-      jsonObj.put("logfile_line_number", new Integer(
-        inputMarker.lineNumber));
-    }
-    if (jsonObj.containsKey("log_message")) {
-      // TODO: Let's check size only for log_message for now
-      String logMessage = (String) jsonObj.get("log_message");
-      if (logMessage != null
-        && logMessage.getBytes().length > MAX_OUTPUT_SIZE) {
-        messageTruncateMetric.count++;
-        final String LOG_MESSAGE_KEY = this.getClass().getSimpleName()
-          + "_MESSAGESIZE";
-        LogFeederUtil.logErrorMessageByInterval(LOG_MESSAGE_KEY,
-          "Message is too big. size="
-            + logMessage.getBytes().length + ", input="
-            + input.getShortDescription()
-            + ". Truncating to " + MAX_OUTPUT_SIZE
-            + ", first upto 100 characters="
-            + LogFeederUtil.subString(logMessage, 100),
-          null, logger, Level.WARN);
-        logMessage = new String(logMessage.getBytes(), 0,
-          MAX_OUTPUT_SIZE);
-        jsonObj.put("log_message", logMessage);
-        // Add error tags
-        @SuppressWarnings("unchecked")
-        List<String> tagsList = (List<String>) jsonObj.get("tags");
-        if (tagsList == null) {
-          tagsList = new ArrayList<String>();
-          jsonObj.put("tags", tagsList);
-        }
-        tagsList.add("error_message_truncated");
-
-      }
-      if (addMessageMD5) {
-        jsonObj.put("message_md5",
-          "" + LogFeederUtil.genHash(logMessage));
-      }
-    }
-    //check log is allowed to send output
-    if (FilterLogData.INSTANCE.isAllowed(jsonObj)) {
-      for (Output output : input.getOutputList()) {
-        try {
-          output.write(jsonObj, inputMarker);
-        } catch (Exception e) {
-          logger.error("Error writing. to " + output.getShortDescription(), e);
-        }
-      }
-    }
-  }
-
-  public void write(String jsonBlock, InputMarker inputMarker) {
-    //check log is allowed to send output
-    if (FilterLogData.INSTANCE.isAllowed(jsonBlock)) {
-      for (Output output : inputMarker.input.getOutputList()) {
-        try {
-          output.write(jsonBlock, inputMarker);
-        } catch (Exception e) {
-          logger.error("Error writing. to " + output.getShortDescription(), e);
-        }
-      }
-    }
-  }
-
-  public void close() {
-    logger.info("Close called for outputs ...");
-    for (Output output : outputList) {
-      try {
-        output.setDrain(true);
-        output.close();
-      } catch (Exception e) {
-        // Ignore
-      }
-    }
-    // Need to get this value from property
-    int iterations = 30;
-    int waitTimeMS = 1000;
-    int i;
-    boolean allClosed = true;
-    for (i = 0; i < iterations; i++) {
-      allClosed = true;
-      for (Output output : outputList) {
-        if (!output.isClosed()) {
-          try {
-            allClosed = false;
-            logger.warn("Waiting for output to close. "
-              + output.getShortDescription() + ", "
-              + (iterations - i) + " more seconds");
-            Thread.sleep(waitTimeMS);
-          } catch (Throwable t) {
-            // Ignore
-          }
-        }
-      }
-      if (allClosed) {
-        break;
-      }
-    }
-
-    if (!allClosed) {
-      logger.warn("Some outpus were not closed. Iterations=" + i);
-      for (Output output : outputList) {
-        if (!output.isClosed()) {
-          logger.warn("Output not closed. Will ignore it."
-            + output.getShortDescription() + ", pendingCound="
-            + output.getPendingCount());
-        }
-      }
-    } else {
-      logger.info("All outputs are closed. Iterations=" + i);
-    }
-  }
-
-  public void logStats() {
-    for (Output output : outputList) {
-      output.logStat();
-    }
-    LogFeederUtil.logStatForMetric(messageTruncateMetric,
-      "Stat: Messages Truncated", null);
-  }
-
-  public void addMetricsContainers(List<MetricCount> metricsList) {
-    metricsList.add(messageTruncateMetric);
-    for (Output output : outputList) {
-      output.addMetricsContainers(metricsList);
-    }
-  }
-
-  
-  public void copyFile(File inputFile, InputMarker inputMarker) {
-    Input input = inputMarker.input;
-    for (Output output : input.getOutputList()) {
-      try {
-        output.copyFile(inputFile, inputMarker);
-      }catch (Exception e) {
-        logger.error("Error coyping file . to " + output.getShortDescription(),
-            e);
-      }
-    }
-  }
-}
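
Both the deleted OutputMgr above and its OutputManager replacement cap log_message at MAX_OUTPUT_SIZE bytes and tag truncated events. The step in isolation (a sketch; note it assumes single-byte characters, a multi-byte message could be cut mid-character):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TruncateDemo {
  static final int MAX_OUTPUT_SIZE = 32765; // 32766 - 1, as in the patch

  @SuppressWarnings("unchecked")
  static void truncateLogMessage(Map<String, Object> jsonObj) {
    String logMessage = (String) jsonObj.get("log_message");
    if (logMessage != null && logMessage.getBytes().length > MAX_OUTPUT_SIZE) {
      jsonObj.put("log_message", new String(logMessage.getBytes(), 0, MAX_OUTPUT_SIZE));
      List<String> tags = (List<String>) jsonObj.get("tags");
      if (tags == null) {
        tags = new ArrayList<String>();
        jsonObj.put("tags", tags);
      }
      tags.add("error_message_truncated");
    }
  }

  public static void main(String[] args) {
    Map<String, Object> event = new HashMap<>();
    StringBuilder big = new StringBuilder();
    for (int i = 0; i < 40000; i++) {
      big.append('x');
    }
    event.put("log_message", big.toString());
    truncateLogMessage(event);
    System.out.println(((String) event.get("log_message")).length()); // 32765
    System.out.println(event.get("tags")); // [error_message_truncated]
  }
}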

http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java
index e95f8df..26f1ddb 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java
@@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import org.apache.ambari.logfeeder.LogFeeder;
+import org.apache.ambari.logfeeder.common.LogFeederConstants;
 import org.apache.ambari.logfeeder.filter.Filter;
 import org.apache.ambari.logfeeder.input.InputMarker;
 import org.apache.ambari.logfeeder.output.spool.LogSpooler;
@@ -47,10 +48,10 @@ import java.util.Map.Entry;
  * </ul>
  */
 public class OutputS3File extends Output implements RolloverCondition, RolloverHandler {
+  private static final Logger LOG = Logger.getLogger(OutputS3File.class);
 
   public static final String INPUT_ATTRIBUTE_TYPE = "type";
   public static final String GLOBAL_CONFIG_S3_PATH_SUFFIX = "global.config.json";
-  static private Logger logger = Logger.getLogger(OutputS3File.class);
 
   private LogSpooler logSpooler;
   private S3OutputConfiguration s3OutputConfiguration;
@@ -72,23 +73,21 @@ public class OutputS3File extends Output implements RolloverCondition, RolloverH
   @Override
   public void copyFile(File inputFile, InputMarker inputMarker) {
     String type = inputMarker.input.getStringValue(INPUT_ATTRIBUTE_TYPE);
-    S3Uploader s3Uploader = new S3Uploader(s3OutputConfiguration,
-        S3Util.INSTANCE, false, type);
-    String resolvedPath = s3Uploader.uploadFile(inputFile,
-        inputMarker.input.getStringValue(INPUT_ATTRIBUTE_TYPE));
+    S3Uploader s3Uploader = new S3Uploader(s3OutputConfiguration, false, type);
+    String resolvedPath = s3Uploader.uploadFile(inputFile, inputMarker.input.getStringValue(INPUT_ATTRIBUTE_TYPE));
 
     uploadConfig(inputMarker, type, s3OutputConfiguration, resolvedPath);
   }
 
-  private void uploadConfig(InputMarker inputMarker, String type,
-                            S3OutputConfiguration s3OutputConfiguration, String resolvedPath) {
+  private void uploadConfig(InputMarker inputMarker, String type, S3OutputConfiguration s3OutputConfiguration,
+      String resolvedPath) {
 
     ArrayList<Map<String, Object>> filters = new ArrayList<>();
     addFilters(filters, inputMarker.input.getFirstFilter());
     Map<String, Object> inputConfig = new HashMap<>();
     inputConfig.putAll(inputMarker.input.getConfigs());
-    String s3CompletePath = S3Util.S3_PATH_START_WITH + s3OutputConfiguration.getS3BucketName()
-        + S3Util.S3_PATH_SEPARATOR + resolvedPath;
+    String s3CompletePath = LogFeederConstants.S3_PATH_START_WITH + s3OutputConfiguration.getS3BucketName() +
+        LogFeederConstants.S3_PATH_SEPARATOR + resolvedPath;
     inputConfig.put("path", s3CompletePath);
 
     ArrayList<Map<String, Object>> inputConfigList = new ArrayList<>();
@@ -117,17 +116,15 @@ public class OutputS3File extends Output implements RolloverCondition, RolloverH
     }
   }
 
-  private void writeConfigToS3(Map<String, Object> configToWrite, String s3KeySuffix,
-                              S3OutputConfiguration s3OutputConfiguration) {
+  private void writeConfigToS3(Map<String, Object> configToWrite, String s3KeySuffix, S3OutputConfiguration s3OutputConfiguration) {
     Gson gson = new GsonBuilder().setPrettyPrinting().create();
     String configJson = gson.toJson(configToWrite);
 
-    String s3ResolvedKey = new S3LogPathResolver().
-        getResolvedPath(getStringValue("s3_config_dir"), s3KeySuffix, s3OutputConfiguration.getCluster());
+    String s3ResolvedKey = new S3LogPathResolver().getResolvedPath(getStringValue("s3_config_dir"), s3KeySuffix,
+        s3OutputConfiguration.getCluster());
 
-    S3Util.INSTANCE.writeIntoS3File(configJson, s3OutputConfiguration.getS3BucketName(),
-        s3ResolvedKey, s3OutputConfiguration.getS3AccessKey(),
-        s3OutputConfiguration.getS3SecretKey());
+    S3Util.writeIntoS3File(configJson, s3OutputConfiguration.getS3BucketName(), s3ResolvedKey,
+        s3OutputConfiguration.getS3AccessKey(), s3OutputConfiguration.getS3SecretKey());
   }
 
   private String getComponentConfigFileName(String componentName) {
@@ -136,7 +133,7 @@ public class OutputS3File extends Output implements RolloverCondition, RolloverH
 
 
   private Map<String, Object> getGlobalConfig() {
-    Map<String, Object> globalConfig = LogFeeder.globalMap;
+    Map<String, Object> globalConfig = LogFeeder.globalConfigs;
     if (globalConfig == null) {
       globalConfig = new HashMap<>();
     }
@@ -173,8 +170,7 @@ public class OutputS3File extends Output implements RolloverCondition, RolloverH
       globalConfig.put("copy_file", false);
       globalConfig.put("process_file", true);
       globalConfig.put("tail", false);
-      Map<String, Object> addFields = (Map<String, Object>) globalConfig
-          .get("add_fields");
+      Map<String, Object> addFields = (Map<String, Object>) globalConfig.get("add_fields");
       if (addFields == null) {
         addFields = new HashMap<>();
       }
@@ -216,7 +212,7 @@ public class OutputS3File extends Output implements RolloverCondition, RolloverH
 
   @VisibleForTesting
   protected S3Uploader createUploader(String logType) {
-    S3Uploader uploader = new S3Uploader(s3OutputConfiguration, S3Util.INSTANCE, true, logType);
+    S3Uploader uploader = new S3Uploader(s3OutputConfiguration, true, logType);
     uploader.startUploaderThread();
     return uploader;
   }
@@ -224,8 +220,7 @@ public class OutputS3File extends Output implements RolloverCondition, RolloverH
   @VisibleForTesting
   protected LogSpooler createSpooler(String filePath) {
     String spoolDirectory = LogFeederUtil.getLogfeederTempDir() + "/s3/service";
-    logger.info(String.format("Creating spooler with spoolDirectory=%s, filePath=%s",
-        spoolDirectory, filePath));
+    LOG.info(String.format("Creating spooler with spoolDirectory=%s, filePath=%s", spoolDirectory, filePath));
     return new LogSpooler(spoolDirectory, new File(filePath).getName()+"-", this, this,
         s3OutputConfiguration.getRolloverTimeThresholdSecs());
   }
@@ -244,7 +239,7 @@ public class OutputS3File extends Output implements RolloverCondition, RolloverH
     long currentSize = spoolFile.length();
     boolean result = (currentSize >= s3OutputConfiguration.getRolloverSizeThresholdBytes());
     if (result) {
-      logger.info(String.format("Rolling over %s, current size %d, threshold size %d", spoolFile, currentSize,
+      LOG.info(String.format("Rolling over %s, current size %d, threshold size %d", spoolFile, currentSize,
           s3OutputConfiguration.getRolloverSizeThresholdBytes()));
     }
     return result;
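
OutputS3File's shouldRollover is the size-based counterpart of the time-based condition in OutputHDFSFile. Restated as a sketch with illustrative names:

import java.io.File;
import java.io.FileWriter;

public class SizeRolloverDemo {
  static boolean shouldRollover(File spoolFile, long sizeThresholdBytes) {
    return spoolFile.length() >= sizeThresholdBytes;
  }

  public static void main(String[] args) throws Exception {
    File spool = File.createTempFile("spool", ".log");
    spool.deleteOnExit();
    try (FileWriter w = new FileWriter(spool)) {
      w.write("some buffered log lines\n");
    }
    System.out.println(shouldRollover(spool, 16L));          // true: past threshold
    System.out.println(shouldRollover(spool, 1024 * 1024L)); // false: keep spooling
  }
}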

http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
index cd9ce4d..47f139d 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
@@ -34,7 +34,8 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.ambari.logfeeder.input.InputMarker;
-import org.apache.ambari.logfeeder.logconfig.FetchConfigFromSolr;
+import org.apache.ambari.logfeeder.logconfig.LogConfigHandler;
+import org.apache.ambari.logfeeder.util.DateUtil;
 import org.apache.ambari.logfeeder.util.LogFeederUtil;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.log4j.Level;
@@ -76,7 +77,17 @@ public class OutputSolr extends Output {
 
   private BlockingQueue<OutputData> outgoingBuffer = null;
   private List<SolrWorkerThread> workerThreadList = new ArrayList<>();
-
+  
+  @Override
+  protected String getStatMetricName() {
+    return "output.solr.write_logs";
+  }
+  
+  @Override
+  protected String getWriteBytesMetricName() {
+    return "output.solr.write_bytes";
+  }
+  
   @Override
   public void init() throws Exception {
     super.init();
@@ -87,9 +98,6 @@ public class OutputSolr extends Output {
   }
 
   private void initParams() throws Exception {
-    statMetric.metricsName = "output.solr.write_logs";
-    writeBytesMetric.metricsName = "output.solr.write_bytes";
-
     splitMode = getStringValue("splits_interval_mins", "none");
     if (!splitMode.equalsIgnoreCase("none")) {
       splitInterval = getIntValue("split_interval_mins", DEFAULT_SPLIT_INTERVAL);
@@ -204,10 +212,8 @@ public class OutputSolr extends Output {
         LOG.info("Ping to Solr server is successful for worker=" + count);
       } else {
         LOG.warn(
-            String.format(
-                "Ping to Solr server failed. It would check again. worker=%d, "
-                    + "solrUrl=%s, zkConnectString=%s, collection=%s, response=%s",
-                count, solrUrl, zkConnectString, collection, response));
+            String.format("Ping to Solr server failed. It would check again. worker=%d, solrUrl=%s, zkConnectString=%s, " +
+                "collection=%s, response=%s", count, solrUrl, zkConnectString, collection, response));
       }
     } catch (Throwable t) {
       LOG.warn(String.format(
@@ -223,7 +229,7 @@ public class OutputSolr extends Output {
     
     while (true) {
       LOG.info("Checking if config is available");
-      if (FetchConfigFromSolr.isFilterAvailable()) {
+      if (LogConfigHandler.isFilterAvailable()) {
         LOG.info("Config is available");
         return;
       }
@@ -256,7 +262,7 @@ public class OutputSolr extends Output {
 
   private void useActualDateIfNeeded(Map<String, Object> jsonObj) {
     if (skipLogtime) {
-      jsonObj.put("logtime", LogFeederUtil.getActualDateStr());
+      jsonObj.put("logtime", DateUtil.getActualDateStr());
     }
   }
 
@@ -324,7 +330,7 @@ public class OutputSolr extends Output {
 
     private final SolrClient solrClient;
     private final Collection<SolrInputDocument> localBuffer = new ArrayList<>();
-    private final Map<String, InputMarker> latestInputMarkerList = new HashMap<>();
+    private final Map<String, InputMarker> latestInputMarkers = new HashMap<>();
 
     private long localBufferBytesSize = 0;
 
@@ -352,17 +358,16 @@ public class OutputSolr extends Output {
             }
           }
 
-          if (localBuffer.size() > 0 && ((outputData == null && isDrain())
-              || (nextDispatchDuration <= 0 || localBuffer.size() >= maxBufferSize))) {
+          if (localBuffer.size() > 0 && ((outputData == null && isDrain()) ||
+              (nextDispatchDuration <= 0 || localBuffer.size() >= maxBufferSize))) {
             boolean response = sendToSolr(outputData);
             if( isDrain() && !response) {
               //Since sending to Solr response failed and it is in draining mode, let's break;
-              LOG.warn("In drain mode and sending to Solr failed. So exiting. output=" 
-                  + getShortDescription());
+              LOG.warn("In drain mode and sending to Solr failed. So exiting. output=" + getShortDescription());
               break;
             }
           }
-          if( localBuffer.size() == 0 ) {
+          if (localBuffer.size() == 0) {
             //If localBuffer is empty, then reset the timer
             lastDispatchTime = currTimeMS;
           }
@@ -403,8 +408,7 @@ public class OutputSolr extends Output {
         } catch (IOException | SolrException exception) {
           // Transient error, lets block till it is available
           try {
-            LOG.warn("Solr is not reachable. Going to retry after "
-                + RETRY_INTERVAL + " seconds. " + "output="
+            LOG.warn("Solr is not reachable. Going to retry after " + RETRY_INTERVAL + " seconds. " + "output="
                 + getShortDescription(), exception);
             Thread.sleep(RETRY_INTERVAL * 1000);
           } catch (Throwable t) {
@@ -414,8 +418,8 @@ public class OutputSolr extends Output {
           // Something unknown happened. Let's not block because of this error. 
           // Clear the buffer
           String logMessageKey = this.getClass().getSimpleName() + "_SOLR_UPDATE_EXCEPTION";
-          LogFeederUtil.logErrorMessageByInterval(logMessageKey,
-              "Error sending log message to server. Dropping logs", serverException, LOG, Level.ERROR);
+          LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Error sending log message to server. Dropping logs",
+              serverException, LOG, Level.ERROR);
           resetLocalBuffer();
           break;
         }
@@ -447,7 +451,7 @@ public class OutputSolr extends Output {
               Level.ERROR);
         }
       }
-      latestInputMarkerList.put(outputData.inputMarker.base64FileKey, outputData.inputMarker);
+      latestInputMarkers.put(outputData.inputMarker.base64FileKey, outputData.inputMarker);
       localBuffer.add(document);
     }
 
@@ -479,9 +483,9 @@ public class OutputSolr extends Output {
         LogFeederUtil.logErrorMessageByInterval(logMessageKey,
             String.format("Error writing to Solr. response=%s, log=%s", response, outputData), null, LOG, Level.ERROR);
       }
-      statMetric.count += localBuffer.size();
-      writeBytesMetric.count += localBufferBytesSize;
-      for (InputMarker inputMarker : latestInputMarkerList.values()) {
+      statMetric.value += localBuffer.size();
+      writeBytesMetric.value += localBufferBytesSize;
+      for (InputMarker inputMarker : latestInputMarkers.values()) {
         inputMarker.input.checkIn(inputMarker);
       }
     }
@@ -499,7 +503,7 @@ public class OutputSolr extends Output {
     public void resetLocalBuffer() {
       localBuffer.clear();
       localBufferBytesSize = 0;
-      latestInputMarkerList.clear();
+      latestInputMarkers.clear();
     }
 
     public boolean isDone() {
@@ -512,9 +516,7 @@ public class OutputSolr extends Output {
   }
 
   @Override
-  public void copyFile(File inputFile, InputMarker inputMarker)
-      throws UnsupportedOperationException {
-    throw new UnsupportedOperationException(
-        "copyFile method is not yet supported for output=solr");
+  public void copyFile(File inputFile, InputMarker inputMarker) throws UnsupportedOperationException {
+    throw new UnsupportedOperationException("copyFile method is not yet supported for output=solr");
   }
 }
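
The reflowed flush condition in SolrWorkerThread above is easier to read as a standalone predicate. A hedged restatement (parameter names are illustrative):

public class FlushConditionDemo {
  // Flush when there is buffered work AND any of: draining with an empty queue,
  // the dispatch interval has elapsed, or the buffer hit its size cap.
  static boolean shouldFlush(int bufferSize, boolean queueDrained, boolean isDrain,
      long nextDispatchDuration, int maxBufferSize) {
    return bufferSize > 0 &&
        ((queueDrained && isDrain) || nextDispatchDuration <= 0 || bufferSize >= maxBufferSize);
  }

  public static void main(String[] args) {
    System.out.println(shouldFlush(10, false, false, 500L, 10)); // true: buffer full
    System.out.println(shouldFlush(3, false, false, -1L, 10));   // true: interval elapsed
    System.out.println(shouldFlush(3, false, false, 500L, 10));  // false: keep batching
  }
}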

http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3LogPathResolver.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3LogPathResolver.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3LogPathResolver.java
index 58282e0..8c544cf 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3LogPathResolver.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3LogPathResolver.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,9 +18,9 @@
 
 package org.apache.ambari.logfeeder.output;
 
+import org.apache.ambari.logfeeder.common.LogFeederConstants;
 import org.apache.ambari.logfeeder.util.LogFeederUtil;
 import org.apache.ambari.logfeeder.util.PlaceholderUtil;
-import org.apache.ambari.logfeeder.util.S3Util;
 
 import java.util.HashMap;
 
@@ -40,7 +40,7 @@ public class S3LogPathResolver {
   public String getResolvedPath(String baseKeyPrefix, String keySuffix, String cluster) {
     HashMap<String, String> contextParam = buildContextParam(cluster);
     String resolvedKeyPrefix = PlaceholderUtil.replaceVariables(baseKeyPrefix, contextParam);
-    return resolvedKeyPrefix + S3Util.S3_PATH_SEPARATOR + keySuffix;
+    return resolvedKeyPrefix + LogFeederConstants.S3_PATH_SEPARATOR + keySuffix;
   }
 
   private HashMap<String, String> buildContextParam(String cluster) {
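
The resolver above only expands placeholders in the configured prefix and joins it to the suffix with the S3 separator. A self-contained sketch of the same flow; replaceVars and the $cluster token syntax are assumptions standing in for PlaceholderUtil.replaceVariables:

    import java.util.HashMap;
    import java.util.Map;

    public class PathResolveSketch {
      private static final String S3_PATH_SEPARATOR = "/"; // assumed value of LogFeederConstants.S3_PATH_SEPARATOR

      // Hypothetical stand-in for PlaceholderUtil.replaceVariables.
      static String replaceVars(String template, Map<String, String> context) {
        String result = template;
        for (Map.Entry<String, String> e : context.entrySet()) {
          result = result.replace("$" + e.getKey(), e.getValue());
        }
        return result;
      }

      public static void main(String[] args) {
        Map<String, String> context = new HashMap<>();
        context.put("cluster", "cl1");
        String prefix = replaceVars("logs/$cluster/hdfs_namenode", context);
        System.out.println(prefix + S3_PATH_SEPARATOR + "service.log.gz"); // logs/cl1/hdfs_namenode/service.log.gz
      }
    }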

http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3OutputConfiguration.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3OutputConfiguration.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3OutputConfiguration.java
index 485b0d4..e5974c5 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3OutputConfiguration.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3OutputConfiguration.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -103,8 +103,7 @@ public class S3OutputConfiguration {
     };
 
     for (int i = 0; i < longValuedKeysToCopy.length; i++) {
-      configs.put(longValuedKeysToCopy[i],
-          configBlock.getLongValue(longValuedKeysToCopy[i], defaultValuesForLongValuedKeys[i]));
+      configs.put(longValuedKeysToCopy[i], configBlock.getLongValue(longValuedKeysToCopy[i], defaultValuesForLongValuedKeys[i]));
     }
 
     configs.put(ADDITIONAL_FIELDS_KEY, configBlock.getNVList(ADDITIONAL_FIELDS_KEY));

http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3Uploader.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3Uploader.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3Uploader.java
index fd59c51..e95a663 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3Uploader.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3Uploader.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,8 +18,12 @@
 
 package org.apache.ambari.logfeeder.output;
 
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.services.s3.transfer.TransferManager;
+import com.amazonaws.services.s3.transfer.Upload;
 import com.google.common.annotations.VisibleForTesting;
 
+import org.apache.ambari.logfeeder.common.LogFeederConstants;
 import org.apache.ambari.logfeeder.util.CompressionUtil;
 import org.apache.ambari.logfeeder.util.S3Util;
 import org.apache.log4j.Logger;
@@ -39,20 +43,18 @@ import java.util.concurrent.atomic.AtomicBoolean;
  * {@link org.apache.ambari.logfeeder.input.InputFile}.
  */
 public class S3Uploader implements Runnable {
+  private static final Logger LOG = Logger.getLogger(S3Uploader.class);
+  
   public static final String POISON_PILL = "POISON-PILL";
-  private static Logger logger = Logger.getLogger(S3Uploader.class);
 
   private final S3OutputConfiguration s3OutputConfiguration;
-  private final S3Util s3UtilInstance;
   private final boolean deleteOnEnd;
-  private String logType;
+  private final String logType;
   private final BlockingQueue<String> fileContextsToUpload;
-  private AtomicBoolean stopRunningThread = new AtomicBoolean(false);
+  private final AtomicBoolean stopRunningThread = new AtomicBoolean(false);
 
-  public S3Uploader(S3OutputConfiguration s3OutputConfiguration, S3Util s3UtilInstance, boolean deleteOnEnd,
-                    String logType) {
+  public S3Uploader(S3OutputConfiguration s3OutputConfiguration, boolean deleteOnEnd, String logType) {
     this.s3OutputConfiguration = s3OutputConfiguration;
-    this.s3UtilInstance = s3UtilInstance;
     this.deleteOnEnd = deleteOnEnd;
     this.logType = logType;
     this.fileContextsToUpload = new LinkedBlockingQueue<>();
@@ -81,7 +83,7 @@ public class S3Uploader implements Runnable {
     stopRunningThread.set(true);
     boolean offerStatus = fileContextsToUpload.offer(POISON_PILL);
     if (!offerStatus) {
-      logger.warn("Could not add poison pill to interrupt uploader thread.");
+      LOG.warn("Could not add poison pill to interrupt uploader thread.");
     }
   }
 
@@ -92,7 +94,7 @@ public class S3Uploader implements Runnable {
   void addFileForUpload(String fileToUpload) {
     boolean offerStatus = fileContextsToUpload.offer(fileToUpload);
     if (!offerStatus) {
-      logger.error("Could not add file " + fileToUpload + " for upload.");
+      LOG.error("Could not add file " + fileToUpload + " for upload.");
     }
   }
 
@@ -102,12 +104,12 @@ public class S3Uploader implements Runnable {
       try {
         String fileNameToUpload = fileContextsToUpload.take();
         if (POISON_PILL.equals(fileNameToUpload)) {
-          logger.warn("Found poison pill while waiting for files to upload, exiting");
+          LOG.warn("Found poison pill while waiting for files to upload, exiting");
           return;
         }
         uploadFile(new File(fileNameToUpload), logType);
       } catch (InterruptedException e) {
-        logger.error("Interrupted while waiting for elements from fileContextsToUpload", e);
+        LOG.error("Interrupted while waiting for elements from fileContextsToUpload", e);
         return;
       }
     }
@@ -130,34 +132,44 @@ public class S3Uploader implements Runnable {
     String compressionAlgo = s3OutputConfiguration.getCompressionAlgo();
 
     String keySuffix = fileToUpload.getName() + "." + compressionAlgo;
-    String s3Path = new S3LogPathResolver().
-        getResolvedPath(s3OutputConfiguration.getS3Path()+S3Util.S3_PATH_SEPARATOR+logType,
-            keySuffix, s3OutputConfiguration.getCluster());
-    logger.info(String.format("keyPrefix=%s, keySuffix=%s, s3Path=%s",
-        s3OutputConfiguration.getS3Path(), keySuffix, s3Path));
+    String s3Path = new S3LogPathResolver().getResolvedPath(
+        s3OutputConfiguration.getS3Path() + LogFeederConstants.S3_PATH_SEPARATOR + logType, keySuffix,
+        s3OutputConfiguration.getCluster());
+    LOG.info(String.format("keyPrefix=%s, keySuffix=%s, s3Path=%s", s3OutputConfiguration.getS3Path(), keySuffix, s3Path));
     File sourceFile = createCompressedFileForUpload(fileToUpload, compressionAlgo);
 
-    logger.info("Starting S3 upload " + sourceFile + " -> " + bucketName + ", " + s3Path);
-    s3UtilInstance.uploadFileTos3(bucketName, s3Path, sourceFile, s3AccessKey,
-        s3SecretKey);
+    LOG.info("Starting S3 upload " + sourceFile + " -> " + bucketName + ", " + s3Path);
+    uploadFileToS3(bucketName, s3Path, sourceFile, s3AccessKey, s3SecretKey);
 
     // delete local compressed file
     sourceFile.delete();
     if (deleteOnEnd) {
-      logger.info("Deleting input file as required");
+      LOG.info("Deleting input file as required");
       if (!fileToUpload.delete()) {
-        logger.error("Could not delete file " + fileToUpload.getAbsolutePath() + " after upload to S3");
+        LOG.error("Could not delete file " + fileToUpload.getAbsolutePath() + " after upload to S3");
       }
     }
     return s3Path;
   }
 
   @VisibleForTesting
+  protected void uploadFileToS3(String bucketName, String s3Key, File localFile, String accessKey, String secretKey) {
+    TransferManager transferManager = S3Util.getTransferManager(accessKey, secretKey);
+    try {
+      Upload upload = transferManager.upload(bucketName, s3Key, localFile);
+      upload.waitForUploadResult();
+    } catch (AmazonClientException | InterruptedException e) {
+      LOG.error("s3 uploading failed for file :" + localFile.getAbsolutePath(), e);
+    } finally {
+      S3Util.shutdownTransferManager(transferManager);
+    }
+  }
+
+  @VisibleForTesting
   protected File createCompressedFileForUpload(File fileToUpload, String compressionAlgo) {
-    File outputFile = new File(fileToUpload.getParent(), fileToUpload.getName() + "_"
-        + new Date().getTime() + "." + compressionAlgo);
-    outputFile = CompressionUtil.compressFile(fileToUpload, outputFile,
-        compressionAlgo);
+    File outputFile = new File(fileToUpload.getParent(), fileToUpload.getName() + "_" + new Date().getTime() +
+        "." + compressionAlgo);
+    outputFile = CompressionUtil.compressFile(fileToUpload, outputFile, compressionAlgo);
     return outputFile;
   }
 }
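
The inlined uploadFileToS3 above is a thin wrapper over the AWS SDK TransferManager. A minimal standalone version of the same blocking-upload-then-shutdown flow; the credentials, bucket, and key below are placeholders, and S3Util's helpers are replaced with direct SDK calls:

    import java.io.File;

    import com.amazonaws.AmazonClientException;
    import com.amazonaws.auth.BasicAWSCredentials;
    import com.amazonaws.services.s3.transfer.TransferManager;
    import com.amazonaws.services.s3.transfer.Upload;

    public class S3UploadSketch {
      public static void main(String[] args) {
        TransferManager tm = new TransferManager(new BasicAWSCredentials("ACCESS_KEY", "SECRET_KEY"));
        try {
          Upload upload = tm.upload("my-bucket", "logs/cl1/service.log.gz", new File("/tmp/service.log.gz"));
          upload.waitForUploadResult(); // block until the transfer completes
        } catch (AmazonClientException | InterruptedException e) {
          e.printStackTrace();
        } finally {
          tm.shutdownNow(); // releases the transfer threads, as S3Util.shutdownTransferManager presumably does
        }
      }
    }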

http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpooler.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpooler.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpooler.java
index fb263ba..1f13357 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpooler.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpooler.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -39,8 +39,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
  * {@link RolloverHandler} to trigger the handling of the rolled over file.
  */
 public class LogSpooler {
+  
+  private static final Logger LOG = Logger.getLogger(LogSpooler.class);
   public static final long TIME_BASED_ROLLOVER_DISABLED_THRESHOLD = 0;
-  static private Logger logger = Logger.getLogger(LogSpooler.class);
   static final String fileDateFormat = "yyyy-MM-dd-HH-mm-ss";
 
   private String spoolDirectory;
@@ -98,7 +99,7 @@ public class LogSpooler {
   private void initializeSpoolDirectory() {
     File spoolDir = new File(spoolDirectory);
     if (!spoolDir.exists()) {
-      logger.info("Creating spool directory: " + spoolDir);
+      LOG.info("Creating spool directory: " + spoolDir);
       boolean result = spoolDir.mkdirs();
       if (!result) {
         throw new LogSpoolerException("Could not create spool directory: " + spoolDirectory);
@@ -116,7 +117,7 @@ public class LogSpooler {
           + ", error message: " + e.getLocalizedMessage(), e);
     }
     currentSpoolerContext = new LogSpoolerContext(currentSpoolFile);
-    logger.info("Initialized spool file at path: " + currentSpoolFile);
+    LOG.info("Initialized spool file at path: " + currentSpoolFile);
   }
 
   @VisibleForTesting
@@ -141,7 +142,7 @@ public class LogSpooler {
     currentSpoolBufferedWriter.println(logEvent);
     currentSpoolerContext.logEventSpooled();
     if (rolloverCondition.shouldRollover(currentSpoolerContext)) {
-      logger.info("Trying to rollover based on rollover condition");
+      LOG.info("Trying to rollover based on rollover condition");
       tryRollover();
     }
   }
@@ -154,19 +155,19 @@ public class LogSpooler {
    * rolled over file.
    */
   public void rollover() {
-    logger.info("Rollover condition detected, rolling over file: " + currentSpoolFile);
+    LOG.info("Rollover condition detected, rolling over file: " + currentSpoolFile);
     currentSpoolBufferedWriter.flush();
     if (currentSpoolFile.length()==0) {
-      logger.info("No data in file " + currentSpoolFile + ", not doing rollover");
+      LOG.info("No data in file " + currentSpoolFile + ", not doing rollover");
     } else {
       currentSpoolBufferedWriter.close();
       rolloverHandler.handleRollover(currentSpoolFile);
-      logger.info("Invoked rollover handler with file: " + currentSpoolFile);
+      LOG.info("Invoked rollover handler with file: " + currentSpoolFile);
       initializeSpoolState();
     }
     boolean status = rolloverInProgress.compareAndSet(true, false);
     if (!status) {
-      logger.error("Should have reset rollover flag!!");
+      LOG.error("Should have reset rollover flag!!");
     }
   }
 
@@ -174,7 +175,7 @@ public class LogSpooler {
     if (rolloverInProgress.compareAndSet(false, true)) {
       rollover();
     } else {
-      logger.warn("Ignoring rollover call as rollover already in progress for file " +
+      LOG.warn("Ignoring rollover call as rollover already in progress for file " +
           currentSpoolFile);
     }
   }
@@ -197,7 +198,7 @@ public class LogSpooler {
   private class LogSpoolerRolloverTimerTask extends TimerTask {
     @Override
     public void run() {
-      logger.info("Trying rollover based on time");
+      LOG.info("Trying rollover based on time");
       tryRollover();
     }
   }
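
The tryRollover/rollover pair above relies on AtomicBoolean.compareAndSet as a re-entrancy guard, so the event-driven and timer-driven triggers cannot roll the same file over twice. A minimal sketch of that guard:

    import java.util.concurrent.atomic.AtomicBoolean;

    public class RolloverGuardSketch {
      private final AtomicBoolean rolloverInProgress = new AtomicBoolean(false);

      void tryRollover() {
        // Only the caller that flips false -> true performs the rollover.
        if (rolloverInProgress.compareAndSet(false, true)) {
          try {
            System.out.println("rolling over");
          } finally {
            // LogSpooler resets the flag inside rollover() and logs an error if it was already cleared.
            rolloverInProgress.set(false);
          }
        } else {
          System.out.println("rollover already in progress, skipping");
        }
      }

      public static void main(String[] args) {
        RolloverGuardSketch guard = new RolloverGuardSketch();
        guard.tryRollover();
        guard.tryRollover();
      }
    }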

http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerContext.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerContext.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerContext.java
index 084d6a2..616300f 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerContext.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerContext.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information

http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerException.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerException.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerException.java
index 1e12fb7..14bb139 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerException.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerException.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information

http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/RolloverCondition.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/RolloverCondition.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/RolloverCondition.java
index 8279645..48ace11 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/RolloverCondition.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/RolloverCondition.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information

http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/RolloverHandler.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/RolloverHandler.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/RolloverHandler.java
index 11308e4..2ec2708 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/RolloverHandler.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/RolloverHandler.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information

http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/AWSUtil.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/AWSUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/AWSUtil.java
index 15f7594..f814a92 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/AWSUtil.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/AWSUtil.java
@@ -20,62 +20,20 @@ package org.apache.ambari.logfeeder.util;
 
 import org.apache.log4j.Logger;
 
-import com.amazonaws.AmazonServiceException;
 import com.amazonaws.auth.AWSCredentials;
 import com.amazonaws.auth.BasicAWSCredentials;
-import com.amazonaws.services.identitymanagement.AmazonIdentityManagementClient;
 
-public enum AWSUtil {
-  INSTANCE;
+public class AWSUtil {
   private static final Logger LOG = Logger.getLogger(AWSUtil.class);
 
-  public String getAwsUserName(String accessKey, String secretKey) {
-    String username = null;
-    AWSCredentials awsCredentials = createAWSCredentials(accessKey, secretKey);
-    AmazonIdentityManagementClient amazonIdentityManagementClient;
-    if (awsCredentials != null) {
-      amazonIdentityManagementClient = new AmazonIdentityManagementClient(
-          awsCredentials);
-    } else {
-      // create default client
-      amazonIdentityManagementClient = new AmazonIdentityManagementClient();
-    }
-    try {
-      username = amazonIdentityManagementClient.getUser().getUser()
-          .getUserName();
-    } catch (AmazonServiceException e) {
-      if (e.getErrorCode().compareTo("AccessDenied") == 0) {
-        String arn = null;
-        String msg = e.getMessage();
-        int arnIdx = msg.indexOf("arn:aws");
-        if (arnIdx != -1) {
-          int arnSpace = msg.indexOf(" ", arnIdx);
-          // should be similar to "arn:aws:iam::111111111111:user/username"
-          arn = msg.substring(arnIdx, arnSpace);
-        }
-        if (arn != null) {
-          String[] arnParts = arn.split(":");
-          if (arnParts != null && arnParts.length > 5) {
-            username = arnParts[5];
-            if (username != null) {
-              username = username.replace("user/", "");
-            }
-          }
-        }
-      }
-    } catch (Exception exception) {
-      LOG.error(
-          "Error in getting username :" + exception.getLocalizedMessage(),
-          exception.getCause());
-    }
-    return username;
+  private AWSUtil() {
+    throw new UnsupportedOperationException();
   }
 
-  public AWSCredentials createAWSCredentials(String accessKey, String secretKey) {
+  public static AWSCredentials createAWSCredentials(String accessKey, String secretKey) {
     if (accessKey != null && secretKey != null) {
       LOG.debug("Creating aws client as per new accesskey and secretkey");
-      AWSCredentials awsCredentials = new BasicAWSCredentials(accessKey,
-          secretKey);
+      AWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, secretKey);
       return awsCredentials;
     } else {
       return null;
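
AWSUtil changes here from an enum singleton to a plain static utility class; the private constructor that throws prevents instantiation. A generic sketch of the pattern:

    public final class UtilitySketch {
      private UtilitySketch() {
        // Blocks instantiation, even through the reflective no-arg constructor path.
        throw new UnsupportedOperationException();
      }

      public static String greet(String name) {
        return "hello " + name;
      }

      public static void main(String[] args) {
        System.out.println(greet("logfeeder")); // static access only; new UtilitySketch() would throw
      }
    }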

http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/AliasUtil.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/AliasUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/AliasUtil.java
index a92ba29..5049b62 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/AliasUtil.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/AliasUtil.java
@@ -21,69 +21,90 @@ package org.apache.ambari.logfeeder.util;
 import java.io.File;
 import java.util.HashMap;
 
+import org.apache.ambari.logfeeder.filter.Filter;
+import org.apache.ambari.logfeeder.input.Input;
+import org.apache.ambari.logfeeder.mapper.Mapper;
+import org.apache.ambari.logfeeder.output.Output;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.log4j.Logger;
 
 public class AliasUtil {
 
-  private static Logger logger = Logger.getLogger(AliasUtil.class);
+  private static final Logger LOG = Logger.getLogger(AliasUtil.class);
 
-  private static AliasUtil instance = null;
+  private static final String ALIAS_CONFIG_JSON = "alias_config.json";
+  private static HashMap<String, Object> aliasMap = null;
 
-  private static String aliasConfigJson = "alias_config.json";
-
-  private HashMap<String, Object> aliasMap = null;
-
-  public static enum ALIAS_TYPE {
-    INPUT, FILTER, MAPPER, OUTPUT
+  static {
+    File jsonFile = FileUtil.getFileFromClasspath(ALIAS_CONFIG_JSON);
+    if (jsonFile != null) {
+      aliasMap = FileUtil.readJsonFromFile(jsonFile);
+    }
   }
 
-  public static enum ALIAS_PARAM {
-    KLASS
+  public static enum AliasType {
+    INPUT, FILTER, MAPPER, OUTPUT
   }
 
   private AliasUtil() {
-    init();
+    throw new UnsupportedOperationException();
   }
 
-  public static AliasUtil getInstance() {
-    if (instance == null) {
-      synchronized (AliasUtil.class) {
-        if (instance == null) {
-          instance = new AliasUtil();
-        }
-      }
+  public static Object getClassInstance(String key, AliasType aliasType) {
+    String classFullName = getClassFullName(key, aliasType);
+    
+    Object instance = null;
+    try {
+      instance = Class.forName(classFullName).getConstructor().newInstance();
+    } catch (Exception exception) {
+      LOG.error("Unsupported class = " + classFullName, exception.getCause());
     }
-    return instance;
-  }
 
-  /**
-   */
-  private void init() {
-    File jsonFile = LogFeederUtil.getFileFromClasspath(aliasConfigJson);
-    if (jsonFile != null) {
-      this.aliasMap = LogFeederUtil.readJsonFromFile(jsonFile);
+    if (instance != null) {
+      boolean isValid = false;
+      switch (aliasType) {
+        case FILTER:
+          isValid = Filter.class.isAssignableFrom(instance.getClass());
+          break;
+        case INPUT:
+          isValid = Input.class.isAssignableFrom(instance.getClass());
+          break;
+        case OUTPUT:
+          isValid = Output.class.isAssignableFrom(instance.getClass());
+          break;
+        case MAPPER:
+          isValid = Mapper.class.isAssignableFrom(instance.getClass());
+          break;
+        default:
+          LOG.warn("Unhandled aliasType: " + aliasType);
+          isValid = true;
+      }
+      if (!isValid) {
+        LOG.error("Not a valid class :" + classFullName + " AliasType :" + aliasType.name());
+      }
     }
-
+    return instance;
   }
 
-
-  public String readAlias(String key, ALIAS_TYPE aliastype, ALIAS_PARAM aliasParam) {
-    String result = key;// key as a default value;
+  private static String getClassFullName(String key, AliasType aliastype) {
+    String className = null;
+    
     HashMap<String, String> aliasInfo = getAliasInfo(key, aliastype);
-    String value = aliasInfo.get(aliasParam.name().toLowerCase());
-    if (value != null && !value.isEmpty()) {
-      result = value;
-      logger.debug("Alias found for key :" + key + ",  param :" + aliasParam.name().toLowerCase() + ", value :"
-        + value + " aliastype:" + aliastype.name());
+    String value = aliasInfo.get("klass");
+    if (!StringUtils.isEmpty(value)) {
+      className = value;
+      LOG.debug("Class name found for key :" + key + ", class name :" + className + " aliastype:" + aliastype.name());
     } else {
-      logger.debug("Alias not found for key :" + key + ", param :" + aliasParam.name().toLowerCase());
+      LOG.debug("Class name not found for key :" + key + " aliastype:" + aliastype.name());
     }
-    return result;
+    
+    return className;
   }
 
   @SuppressWarnings("unchecked")
-  private HashMap<String, String> getAliasInfo(String key, ALIAS_TYPE aliastype) {
-    HashMap<String, String> aliasInfo = null;
+  private static HashMap<String, String> getAliasInfo(String key, AliasType aliastype) {
+    HashMap<String, String> aliasInfo = new HashMap<String, String>();
+    
     if (aliasMap != null) {
       String typeKey = aliastype.name().toLowerCase();
       HashMap<String, Object> typeJson = (HashMap<String, Object>) aliasMap.get(typeKey);
@@ -91,9 +112,7 @@ public class AliasUtil {
         aliasInfo = (HashMap<String, String>) typeJson.get(key);
       }
     }
-    if (aliasInfo == null) {
-      aliasInfo = new HashMap<String, String>();
-    }
+    
     return aliasInfo;
   }
 }
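
The reworked AliasUtil resolves a short key to a class name, instantiates it reflectively, and validates the result against the expected base type. A self-contained sketch of that resolve-instantiate-validate flow; the alias table below is hypothetical, standing in for alias_config.json, and falling back to the key itself mirrors the old readAlias default:

    import java.util.HashMap;
    import java.util.Map;

    public class AliasSketch {
      // Hypothetical alias table; the real one is loaded from alias_config.json.
      private static final Map<String, String> ALIASES = new HashMap<>();
      static {
        ALIASES.put("list", "java.util.ArrayList");
      }

      static Object getClassInstance(String key, Class<?> expectedType) {
        String className = ALIASES.getOrDefault(key, key);
        try {
          Object instance = Class.forName(className).getConstructor().newInstance();
          if (!expectedType.isAssignableFrom(instance.getClass())) {
            System.err.println("Not a valid class: " + className + " for type " + expectedType.getName());
            return null;
          }
          return instance;
        } catch (Exception e) {
          System.err.println("Unsupported class = " + className + ": " + e);
          return null;
        }
      }

      public static void main(String[] args) {
        Object list = getClassInstance("list", java.util.List.class);
        System.out.println(list == null ? "failed" : "created " + list.getClass().getName());
      }
    }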

http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/CompressionUtil.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/CompressionUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/CompressionUtil.java
index c2addbd..c460ab3 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/CompressionUtil.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/CompressionUtil.java
@@ -37,25 +37,20 @@ public class CompressionUtil {
     FileInputStream ios = null;
     try {
       if (!inputFile.exists()) {
-        throw new IllegalArgumentException("Input File:"
-            + inputFile.getAbsolutePath() + " is not exist.");
+        throw new IllegalArgumentException("Input File:" + inputFile.getAbsolutePath() + " is not exist.");
       }
       if (inputFile.isDirectory()) {
-        throw new IllegalArgumentException("Input File:"
-            + inputFile.getAbsolutePath() + " is a directory.");
+        throw new IllegalArgumentException("Input File:" + inputFile.getAbsolutePath() + " is a directory.");
       }
       File parent = outputFile.getParentFile();
       if (parent != null && !parent.exists()) {
         boolean isParentCreated = parent.mkdirs();
         if (!isParentCreated) {
-          throw new IllegalAccessException(
-              "User does not have permission to create parent directory :"
-                  + parent.getAbsolutePath());
+          throw new IllegalAccessException("User does not have permission to create parent directory: " + parent.getAbsolutePath());
         }
       }
-      final OutputStream out = new FileOutputStream(outputFile);
-      cos = new CompressorStreamFactory().createCompressorOutputStream(
-          algoName, out);
+      OutputStream out = new FileOutputStream(outputFile);
+      cos = new CompressorStreamFactory().createCompressorOutputStream(algoName, out);
       ios = new FileInputStream(inputFile);
       IOUtils.copy(ios, cos);
     } catch (Exception e) {
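
For context, compressFile above streams the input through an Apache Commons Compress codec chosen by name ("gz", "bzip2", ...). A minimal standalone version of the copy; the file paths are placeholders:

    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.io.InputStream;
    import java.io.OutputStream;

    import org.apache.commons.compress.compressors.CompressorOutputStream;
    import org.apache.commons.compress.compressors.CompressorStreamFactory;
    import org.apache.commons.io.IOUtils;

    public class CompressSketch {
      public static void main(String[] args) throws Exception {
        try (InputStream in = new FileInputStream("/tmp/app.log");
             OutputStream fileOut = new FileOutputStream("/tmp/app.log.gz");
             CompressorOutputStream out = new CompressorStreamFactory()
                 .createCompressorOutputStream(CompressorStreamFactory.GZIP, fileOut)) {
          IOUtils.copy(in, out); // stream the whole file through the selected codec
        }
      }
    }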

http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/DateUtil.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/DateUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/DateUtil.java
index 2ca9353..6321e17 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/DateUtil.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/DateUtil.java
@@ -20,12 +20,17 @@ package org.apache.ambari.logfeeder.util;
 
 import java.text.SimpleDateFormat;
 import java.util.Date;
+import java.util.TimeZone;
 
 import org.apache.log4j.Logger;
 
 public class DateUtil {
-  private static final Logger logger = Logger.getLogger(DateUtil.class);
-
+  private static final Logger LOG = Logger.getLogger(DateUtil.class);
+  
+  private DateUtil() {
+    throw new UnsupportedOperationException();
+  }
+  
   public static String dateToString(Date date, String dateFormat) {
     if (date == null || dateFormat == null || dateFormat.isEmpty()) {
       return "";
@@ -34,8 +39,36 @@ public class DateUtil {
       SimpleDateFormat formatter = new SimpleDateFormat(dateFormat);
       return formatter.format(date).toString();
     } catch (Exception e) {
-      logger.error("Error in coverting dateToString  format :" + dateFormat, e);
+      LOG.error("Error in coverting dateToString  format :" + dateFormat, e);
     }
     return "";
   }
+
+  private static final String SOLR_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
+  private static final ThreadLocal<SimpleDateFormat> dateFormatter = new ThreadLocal<SimpleDateFormat>() {
+    @Override
+    protected SimpleDateFormat initialValue() {
+      SimpleDateFormat sdf = new SimpleDateFormat(SOLR_DATE_FORMAT);
+      sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
+      return sdf;
+    }
+  };
+
+  public static String getDate(String timeStampStr) {
+    try {
+      return dateFormatter.get().format(new Date(Long.parseLong(timeStampStr)));
+    } catch (Exception ex) {
+      LOG.error(ex);
+      return null;
+    }
+  }
+
+  public static String getActualDateStr() {
+    try {
+      return dateFormatter.get().format(new Date());
+    } catch (Exception ex) {
+      LOG.error(ex);
+      return null;
+    }
+  }
 }
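
SimpleDateFormat is not thread-safe, which is what the ThreadLocal holder above addresses: each feeder thread lazily gets its own UTC formatter. A short illustration of the same idea:

    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.TimeZone;

    public class SolrDateSketch {
      private static final ThreadLocal<SimpleDateFormat> FMT = new ThreadLocal<SimpleDateFormat>() {
        @Override
        protected SimpleDateFormat initialValue() {
          SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
          sdf.setTimeZone(TimeZone.getTimeZone("UTC")); // one UTC formatter per thread
          return sdf;
        }
      };

      public static void main(String[] args) {
        // Mirrors DateUtil.getDate(String): epoch millis in, Solr UTC timestamp out.
        String millis = String.valueOf(System.currentTimeMillis());
        System.out.println(FMT.get().format(new Date(Long.parseLong(millis))));
      }
    }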