You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ol...@apache.org on 2016/04/15 21:17:46 UTC

[1/2] ambari git commit: AMBARI-15887. Added Logfeeder unit tests (Miklos Gergely via oleewere)

Repository: ambari
Updated Branches:
  refs/heads/trunk a55c055a8 -> e5c13a3ad


http://git-wip-us.apache.org/repos/asf/ambari/blob/e5c13a3a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputKafkaTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputKafkaTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputKafkaTest.java
new file mode 100644
index 0000000..a7db3f8
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputKafkaTest.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.logfeeder.output;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Future;
+
+import org.apache.ambari.logfeeder.input.Input;
+import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logfeeder.output.OutputKafka.KafkaCallBack;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.log4j.Logger;
+import org.easymock.EasyMock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+public class OutputKafkaTest {
+  private static final Logger LOG = Logger.getLogger(OutputKafkaTest.class);
+
+  private static final String TEST_TOPIC = "test topic";
+
+  private OutputKafka outputKafka;
+
+  @SuppressWarnings("unchecked")
+  private KafkaProducer<String, String> mockKafkaProducer = EasyMock.strictMock(KafkaProducer.class);
+
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  @Before
+  public void init() { // builds the OutputKafka instance under test before each test method
+    outputKafka = new OutputKafka() {
+      @Override
+      protected KafkaProducer<String, String> creteKafkaProducer(Properties props) {
+        return mockKafkaProducer; // hand back the strict mock instead of constructing a real KafkaProducer
+      }
+    };
+  }
+
+  @Test
+  public void testOutputKafka_uploadData() throws Exception { // writes ten messages and checks the producer calls
+    LOG.info("testOutputKafka_uploadData()");
+
+    Map<String, Object> config = new HashMap<String, Object>();
+    config.put("broker_list", "some broker list");
+    config.put("topic", TEST_TOPIC);
+
+    outputKafka.loadConfig(config);
+    outputKafka.init();
+
+    @SuppressWarnings("unchecked")
+    Future<RecordMetadata> mockFuture = EasyMock.mock(Future.class);
+
+    EasyMock.expect(mockKafkaProducer.send(new ProducerRecord<String, String>(TEST_TOPIC, "value0")))
+        .andReturn(mockFuture); // first message: single-argument send whose Future is then read
+    EasyMock.expect(mockFuture.get()).andReturn(null);
+
+    for (int i = 1; i < 10; i++) // remaining nine messages: two-argument send carrying a KafkaCallBack
+      EasyMock.expect(mockKafkaProducer.send(EasyMock.eq(new ProducerRecord<String, String>(TEST_TOPIC, "value" + i)),
+          EasyMock.anyObject(KafkaCallBack.class))).andReturn(null);
+
+    EasyMock.replay(mockKafkaProducer, mockFuture); // replay both mocks so the Future expectation is active too
+
+    for (int i = 0; i < 10; i++) {
+      InputMarker inputMarker = new InputMarker();
+      inputMarker.input = EasyMock.mock(Input.class);
+      outputKafka.write("value" + i, inputMarker);
+    }
+
+    EasyMock.verify(mockKafkaProducer, mockFuture); // also confirms the Future was actually read
+  }
+
+  @Test
+  public void testOutputKafka_noBrokerList() throws Exception { // init() must fail when no broker list is set
+    LOG.info("testOutputKafka_noBrokerList()");
+
+    expectedException.expect(Exception.class);
+    expectedException.expectMessage("For kafka output, bootstrap broker_list is needed");
+
+    Map<String, Object> config = new HashMap<String, Object>();
+    config.put("topic", TEST_TOPIC); // "broker_list" is deliberately left out of the config
+
+    outputKafka.loadConfig(config);
+    outputKafka.init();
+  }
+
+  @Test
+  public void testOutputKafka_noTopic() throws Exception { // init() must fail when no topic is set
+    LOG.info("testOutputKafka_noTopic()");
+
+    expectedException.expect(Exception.class);
+    expectedException.expectMessage("For kafka output, topic is needed");
+
+    Map<String, Object> config = new HashMap<String, Object>();
+    config.put("broker_list", "some broker list"); // "topic" is deliberately left out of the config
+
+    outputKafka.loadConfig(config);
+    outputKafka.init();
+  }
+
+  @After
+  public void cleanUp() { // runs after each test method
+    EasyMock.reset(mockKafkaProducer); // clear the expectations recorded on the producer mock
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/e5c13a3a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputSolrTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputSolrTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputSolrTest.java
new file mode 100644
index 0000000..afbccca
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputSolrTest.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.logfeeder.output;
+
+import java.net.MalformedURLException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.ambari.logfeeder.input.Input;
+import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.log4j.Logger;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.response.UpdateResponse;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.util.NamedList;
+import org.easymock.EasyMock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class OutputSolrTest {
+  private static final Logger LOG = Logger.getLogger(OutputSolrTest.class);
+
+  private OutputSolr outputSolr;
+  private Map<Integer, SolrInputDocument> receivedDocs = new ConcurrentHashMap<>();
+
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  @Before
+  public void init() throws Exception { // builds the OutputSolr instance under test before each test method
+    outputSolr = new OutputSolr() {
+      @Override
+      SolrClient getSolrClient(String solrUrl, String zkHosts, int count) throws Exception, MalformedURLException {
+        return new CloudSolrClient(null) { // stub client: records documents locally in receivedDocs
+          private static final long serialVersionUID = 1L;
+
+          @Override
+          public UpdateResponse add(Collection<SolrInputDocument> docs) {
+            for (SolrInputDocument doc : docs) {
+              receivedDocs.put((Integer) doc.getField("id").getValue(), doc); // keyed by the Integer "id" field
+            }
+
+            UpdateResponse response = new UpdateResponse();
+            response.setResponse(new NamedList<Object>()); // empty response body
+            return response;
+          }
+        };
+      }
+    };
+  }
+
+  @Test
+  public void testOutputToSolr_uploadData() throws Exception { // writes ten docs and compares what the stub received
+    LOG.info("testOutputToSolr_uploadData()");
+
+    Map<String, Object> config = new HashMap<String, Object>();
+    config.put("url", "some url");
+    config.put("workers", "3");
+
+    outputSolr.loadConfig(config);
+    outputSolr.init();
+
+    Map<Integer, SolrInputDocument> expectedDocs = new HashMap<>();
+
+    int count = 0;
+    for (int i = 0; i < 10; i++) {
+      Map<String, Object> jsonObj = new HashMap<>();
+      for (int j = 0; j < 3; j++)
+        jsonObj.put("name" + ++count, "value" + ++count);
+      jsonObj.put("id", ++count); // count now holds this document's id
+
+      InputMarker inputMarker = new InputMarker();
+      inputMarker.input = EasyMock.mock(Input.class);
+      outputSolr.write(jsonObj, inputMarker);
+
+      SolrInputDocument doc = new SolrInputDocument();
+      for (Map.Entry<String, Object> e : jsonObj.entrySet())
+        doc.addField(e.getKey(), e.getValue());
+
+      expectedDocs.put(count, doc);
+    }
+
+    Thread.sleep(100);
+    while (outputSolr.getPendingCount() > 0)
+      Thread.sleep(100);
+
+    int waitToFinish = 0;
+    while (receivedDocs.size() < 10 && waitToFinish < 10) { // bounded poll: at most 10 x 100 ms for all docs
+      Thread.sleep(100);
+      waitToFinish++;
+    }
+
+    Set<Integer> ids = new HashSet<>(); // union of ids, so a missing doc on either side is reported
+    ids.addAll(receivedDocs.keySet());
+    ids.addAll(expectedDocs.keySet());
+    for (Integer id : ids) {
+      SolrInputDocument receivedDoc = receivedDocs.get(id);
+      SolrInputDocument expectedDoc = expectedDocs.get(id);
+
+      assertNotNull("No document received for id: " + id, receivedDoc);
+      assertNotNull("No document expected for id: " + id, expectedDoc);
+
+      Set<String> fieldNames = new HashSet<>();
+      fieldNames.addAll(receivedDoc.getFieldNames());
+      fieldNames.addAll(expectedDoc.getFieldNames());
+
+      for (String fieldName : fieldNames) {
+        Object receivedValue = receivedDoc.getFieldValue(fieldName);
+        Object expectedValue = expectedDoc.getFieldValue(fieldName);
+
+        assertNotNull("No received document field found for id: " + id + ", fieldName: " + fieldName, receivedValue);
+        assertNotNull("No expected document field found for id: " + id + ", fieldName: " + fieldName, expectedValue);
+
+        assertEquals("Field value not matching for id: " + id + ", fieldName: " + fieldName, expectedValue,
+            receivedValue); // JUnit argument order is (message, expected, actual)
+      }
+    }
+  }
+
+  @Test
+  public void testOutputToSolr_noUrlOrZKHost() throws Exception { // init() must fail without url and zk_hosts
+    LOG.info("testOutputToSolr_noUrlOrZKHost()");
+
+    expectedException.expect(Exception.class);
+    expectedException.expectMessage("For solr output, either url or zk_hosts property need to be set");
+
+    Map<String, Object> config = new HashMap<String, Object>();
+    config.put("workers", "3"); // neither "url" nor "zk_hosts" is put into the config
+
+    outputSolr.loadConfig(config);
+    outputSolr.init();
+  }
+
+  @After
+  public void cleanUp() { // runs after each test method
+    receivedDocs.clear(); // drop the documents captured by the stub Solr client
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/e5c13a3a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/resources/log4j.xml b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/resources/log4j.xml
new file mode 100644
index 0000000..e641018
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/resources/log4j.xml
@@ -0,0 +1,53 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+  <appender name="console" class="org.apache.log4j.ConsoleAppender">
+    <param name="Target" value="System.out" />
+    <layout class="org.apache.log4j.PatternLayout">
+      <param name="ConversionPattern" value="%d [%t] %-5p %C{6} (%F:%L) - %m%n" />
+      <!-- <param name="ConversionPattern" value="%d [%t] %-5p %c %x - %m%n"/> -->
+    </layout>
+  </appender>
+
+  <!-- Logs to suppress BEGIN -->
+  <category name="org.apache.solr.common.cloud.ZkStateReader" additivity="false">
+    <priority value="error" />
+    <appender-ref ref="console" />
+  </category>
+
+  <category name="org.apache.solr.client.solrj.impl.CloudSolrClient" additivity="false">
+    <priority value="fatal" />
+    <appender-ref ref="console" />
+  </category>
+  <!-- Logs to suppress END -->
+
+  <category name="org.apache.ambari.logfeeder" additivity="false">
+    <priority value="info" />
+    <appender-ref ref="console" /> 
+    <!-- <appender-ref ref="daily_rolling_file" /> -->
+  </category>
+
+  <root>
+    <priority value="warn" />
+    <!-- <appender-ref ref="console" /> -->
+    <!-- <appender-ref ref="daily_rolling_file" /> -->
+  </root>
+ 
+</log4j:configuration>  


[2/2] ambari git commit: AMBARI-15887. Added Logfeeder unit tests (Miklos Gergely via oleewere)

Posted by ol...@apache.org.
AMBARI-15887. Added Logfeeder unit tests (Miklos Gergely via oleewere)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/e5c13a3a
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/e5c13a3a
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/e5c13a3a

Branch: refs/heads/trunk
Commit: e5c13a3ada85ffd0ffc8d05171c934eabe6243d8
Parents: a55c055
Author: miki.gergely <mg...@hortonworks.com>
Authored: Fri Apr 15 18:14:53 2016 +0200
Committer: oleewere <ol...@gmail.com>
Committed: Fri Apr 15 21:12:13 2016 +0200

----------------------------------------------------------------------
 .../ambari-logsearch-appender/pom.xml           |   3 +-
 .../ambari-logsearch-logfeeder/pom.xml          |  10 +-
 .../ambari/logfeeder/output/OutputKafka.java    | 283 +++++-----
 .../ambari/logfeeder/output/OutputSolr.java     | 566 +++++++++----------
 .../ambari/logfeeder/filter/FilterGrokTest.java | 157 +++++
 .../logfeeder/filter/FilterKeyValueTest.java    | 125 ++++
 .../logfeeder/filter/JSONFilterCodeTest.java    | 124 ++++
 .../ambari/logfeeder/input/InputFileTest.java   | 195 +++++++
 .../ambari/logfeeder/mapper/MapperDateTest.java | 145 +++++
 .../logfeeder/mapper/MapperFieldNameTest.java   |  71 +++
 .../logfeeder/mapper/MapperFieldValueTest.java  |  90 +++
 .../logfeeder/output/OutputKafkaTest.java       | 128 +++++
 .../ambari/logfeeder/output/OutputSolrTest.java | 165 ++++++
 .../src/test/resources/log4j.xml                |  53 ++
 14 files changed, 1657 insertions(+), 458 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/e5c13a3a/ambari-logsearch/ambari-logsearch-appender/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-appender/pom.xml b/ambari-logsearch/ambari-logsearch-appender/pom.xml
index 85852f6..39f250a 100644
--- a/ambari-logsearch/ambari-logsearch-appender/pom.xml
+++ b/ambari-logsearch/ambari-logsearch-appender/pom.xml
@@ -77,8 +77,7 @@
     <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
-      <version>3.8.1</version>
       <scope>test</scope>
     </dependency>
   </dependencies>
-</project>
\ No newline at end of file
+</project>

http://git-wip-us.apache.org/repos/asf/ambari/blob/e5c13a3a/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml b/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml
index a1443c9..0888010 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml
@@ -41,7 +41,12 @@
     <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
-      <version>4.11</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.easymock</groupId>
+      <artifactId>easymock</artifactId>
+      <version>3.4</version>
       <scope>test</scope>
     </dependency>
     <dependency>
@@ -74,7 +79,6 @@
       <artifactId>commons-logging</artifactId>
       <version>1.1.1</version>
     </dependency>
-
     <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
@@ -105,8 +109,8 @@
       <artifactId>jackson-xc</artifactId>
       <version>1.9.13</version>
     </dependency>
-
   </dependencies>
+
   <build>
     <finalName>LogFeeder</finalName>
     <pluginManagement>

http://git-wip-us.apache.org/repos/asf/ambari/blob/e5c13a3a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java
index c594dd4..cd4f951 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java
@@ -19,15 +19,12 @@
 
 package org.apache.ambari.logfeeder.output;
 
-import java.util.HashMap;
-import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.LinkedTransferQueue;
 
 import org.apache.ambari.logfeeder.LogFeederUtil;
-import org.apache.ambari.logfeeder.input.Input;
 import org.apache.ambari.logfeeder.input.InputMarker;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kafka.clients.producer.Callback;
@@ -39,58 +36,51 @@ import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 
 public class OutputKafka extends Output {
-  static private Logger logger = Logger.getLogger(OutputKafka.class);
+  private static final Logger LOG = Logger.getLogger(OutputKafka.class);
 
-  String brokerList = null;
-  String topic = null;
-  boolean isAsync = true;
-  long messageCount = 0;
-  int batchSize = 5000;
-  int lingerMS = 1000;
+  private static final int FAILED_RETRY_INTERVAL = 30;
+  private static final int CATCHUP_RETRY_INTERVAL = 5;
+
+  private static final int DEFAULT_BATCH_SIZE = 5000;
+  private static final int DEFAULT_LINGER_MS = 1000;
+
+  private String topic = null;
+  private boolean isAsync = true;
+  private long messageCount = 0;
 
   private KafkaProducer<String, String> producer = null;
-  BlockingQueue<KafkaCallBack> failedMessages = new LinkedTransferQueue<KafkaCallBack>();
+  private BlockingQueue<KafkaCallBack> failedMessages = new LinkedTransferQueue<KafkaCallBack>();
 
   // Let's start with the assumption Kafka is down
-  boolean isKafkaBrokerUp = false;
-
-  static final int FAILED_RETRY_INTERVAL = 30;
-  static final int CATCHUP_RETRY_INTERVAL = 5;
+  private boolean isKafkaBrokerUp = false;
 
   @Override
   public void init() throws Exception {
     super.init();
-    statMetric.metricsName = "output.kafka.write_logs";
-    writeBytesMetric.metricsName = "output.kafka.write_bytes";
+    Properties props = initProperties();
 
-    brokerList = getStringValue("broker_list");
-    topic = getStringValue("topic");
-    isAsync = getBooleanValue("is_async", true);
-    batchSize = getIntValue("batch_size", batchSize);
-    lingerMS = getIntValue("linger_ms", lingerMS);
+    producer = creteKafkaProducer(props);
+    createKafkaRetryThread();
+  }
 
-    Map<String, Object> kafkaCustomProperties = new HashMap<String, Object>();
-    // Get all kafka custom properties
-    for (String key : configs.keySet()) {
-      if (key.startsWith("kafka.")) {
-        Object value = configs.get(key);
-        if (value == null || value.toString().length() == 0) {
-          continue;
-        }
-        String kafkaKey = key.substring("kafka.".length());
-        kafkaCustomProperties.put(kafkaKey, value);
-      }
-    }
+  private Properties initProperties() throws Exception {
+    statMetric.metricsName = "output.kafka.write_logs";
+    writeBytesMetric.metricsName = "output.kafka.write_bytes";
 
+    String brokerList = getStringValue("broker_list");
     if (StringUtils.isEmpty(brokerList)) {
-      throw new Exception(
-        "For kafka output, bootstrap broker_list is needed");
+      throw new Exception("For kafka output, bootstrap broker_list is needed");
     }
 
+    topic = getStringValue("topic");
     if (StringUtils.isEmpty(topic)) {
       throw new Exception("For kafka output, topic is needed");
     }
 
+    isAsync = getBooleanValue("is_async", true);
+    int batchSize = getIntValue("batch_size", DEFAULT_BATCH_SIZE);
+    int lingerMS = getIntValue("linger_ms", DEFAULT_LINGER_MS);
+
     Properties props = new Properties();
     // 0.9.0
     props.put("bootstrap.servers", brokerList);
@@ -101,50 +91,56 @@ public class OutputKafka extends Output {
     // props.put("retries", "3");
     props.put("batch.size", batchSize);
     props.put("linger.ms", lingerMS);
+    // props.put("metadata.broker.list", brokerList);
 
-    for (String kafkaKey : kafkaCustomProperties.keySet()) {
-      logger.info("Adding custom Kafka property. " + kafkaKey + "="
-        + kafkaCustomProperties.get(kafkaKey));
-      props.put(kafkaKey, kafkaCustomProperties.get(kafkaKey));
+    // Get all kafka custom properties
+    for (String key : configs.keySet()) {
+      if (key.startsWith("kafka.")) {
+        Object value = configs.get(key);
+        if (value == null || value.toString().length() == 0) {
+          continue;
+        }
+        String kafkaKey = key.substring("kafka.".length());
+        LOG.info("Adding custom Kafka property. " + kafkaKey + "=" + value);
+        props.put(kafkaKey, value);
+      }
     }
 
-    // props.put("metadata.broker.list", brokerList);
+    return props;
+  }
+
+  protected KafkaProducer<String, String> creteKafkaProducer(Properties props) {
+    return new KafkaProducer<String, String>(props);
+  }
 
-    producer = new KafkaProducer<String, String>(props);
+  private void createKafkaRetryThread() {
     Thread retryThread = new Thread("kafka-writer-retry,topic=" + topic) {
       @Override
       public void run() {
         KafkaCallBack kafkaCallBack = null;
-        logger.info("Started thread to monitor failed messsages. "
-          + getShortDescription());
+        LOG.info("Started thread to monitor failed messsages. " + getShortDescription());
         while (true) {
           try {
             if (kafkaCallBack == null) {
               kafkaCallBack = failedMessages.take();
             }
-            if (publishMessage(kafkaCallBack.message,
-              kafkaCallBack.inputMarker)) {
-              // logger.info("Sent message. count="
-              // + kafkaCallBack.thisMessageNumber);
+            if (publishMessage(kafkaCallBack.message, kafkaCallBack.inputMarker)) {
+              // logger.info("Sent message. count=" +
+              // kafkaCallBack.thisMessageNumber);
               kafkaCallBack = null;
             } else {
               // Should wait for sometime
-              logger.error("Kafka is down. messageNumber="
-                + kafkaCallBack.thisMessageNumber
-                + ". Going to sleep for "
-                + FAILED_RETRY_INTERVAL + " seconds");
+              LOG.error("Kafka is down. messageNumber=" + kafkaCallBack.thisMessageNumber + ". Going to sleep for "
+                  + FAILED_RETRY_INTERVAL + " seconds");
               Thread.sleep(FAILED_RETRY_INTERVAL * 1000);
             }
 
           } catch (Throwable t) {
-            final String LOG_MESSAGE_KEY = this.getClass()
-              .getSimpleName() + "_KAFKA_RETRY_WRITE_ERROR";
-            LogFeederUtil.logErrorMessageByInterval(
-              LOG_MESSAGE_KEY,
-              "Error sending message to Kafka during retry. message="
-                + (kafkaCallBack == null ? null
-                : kafkaCallBack.message), t,
-              logger, Level.ERROR);
+            String logMessageKey = this.getClass().getSimpleName() + "_KAFKA_RETRY_WRITE_ERROR";
+            LogFeederUtil.logErrorMessageByInterval(logMessageKey,
+                "Error sending message to Kafka during retry. message="
+                    + (kafkaCallBack == null ? null : kafkaCallBack.message),
+                t, LOG, Level.ERROR);
           }
         }
 
@@ -155,6 +151,33 @@ public class OutputKafka extends Output {
   }
 
   @Override
+  public synchronized void write(String block, InputMarker inputMarker) throws Exception {
+    while (!isDrain() && !inputMarker.input.isDrain()) {
+      try {
+        if (failedMessages.size() == 0) {
+          if (publishMessage(block, inputMarker)) {
+            break;
+          }
+        }
+        if (isDrain() || inputMarker.input.isDrain()) {
+          break;
+        }
+        if (!isKafkaBrokerUp) {
+          LOG.error("Kafka is down. Going to sleep for " + FAILED_RETRY_INTERVAL + " seconds");
+          Thread.sleep(FAILED_RETRY_INTERVAL * 1000);
+        } else {
+          LOG.warn("Kafka is still catching up from previous failed messages. outstanding messages="
+              + failedMessages.size() + " Going to sleep for " + CATCHUP_RETRY_INTERVAL + " seconds");
+          Thread.sleep(CATCHUP_RETRY_INTERVAL * 1000);
+        }
+      } catch (Throwable t) {
+        // ignore
+        break;
+      }
+    }
+  }
+
+  @Override
   public void setDrain(boolean drain) {
     super.setDrain(drain);
   }
@@ -163,151 +186,101 @@ public class OutputKafka extends Output {
    * Flush document buffer
    */
   public void flush() {
-    logger.info("Flush called...");
+    LOG.info("Flush called...");
     setDrain(true);
   }
 
   @Override
   public void close() {
-    logger.info("Closing Kafka client...");
+    LOG.info("Closing Kafka client...");
     flush();
     if (producer != null) {
       try {
         producer.close();
       } catch (Throwable t) {
-        logger.error("Error closing Kafka topic. topic=" + topic);
+        LOG.error("Error closing Kafka topic. topic=" + topic);
       }
     }
-    logger.info("Closed Kafka client");
+    LOG.info("Closed Kafka client");
     super.close();
   }
 
-  @Override
-  synchronized public void write(String block, InputMarker inputMarker) throws Exception {
-    while (!isDrain() && !inputMarker.input.isDrain()) {
-      try {
-        if (failedMessages.size() == 0) {
-          if (publishMessage(block, inputMarker)) {
-            break;
-          }
-        }
-        if (isDrain() || inputMarker.input.isDrain()) {
-          break;
-        }
-        if (!isKafkaBrokerUp) {
-          logger.error("Kafka is down. Going to sleep for "
-            + FAILED_RETRY_INTERVAL + " seconds");
-          Thread.sleep(FAILED_RETRY_INTERVAL * 1000);
-
-        } else {
-          logger.warn("Kafka is still catching up from previous failed messages. outstanding messages="
-            + failedMessages.size()
-            + " Going to sleep for "
-            + CATCHUP_RETRY_INTERVAL + " seconds");
-          Thread.sleep(CATCHUP_RETRY_INTERVAL * 1000);
-        }
-      } catch (Throwable t) {
-        // ignore
-        break;
-      }
-    }
-  }
-
   private boolean publishMessage(String block, InputMarker inputMarker) {
     if (isAsync && isKafkaBrokerUp) { // Send asynchronously
       producer.send(new ProducerRecord<String, String>(topic, block),
-        new KafkaCallBack(this, block, inputMarker, ++messageCount));
+          new KafkaCallBack(this, block, inputMarker, ++messageCount));
       return true;
     } else { // Send synchronously
       try {
         // Not using key. Let it round robin
-        RecordMetadata metadata = producer.send(
-          new ProducerRecord<String, String>(topic, block)).get();
+        RecordMetadata metadata = producer.send(new ProducerRecord<String, String>(topic, block)).get();
         if (metadata != null) {
           statMetric.count++;
           writeBytesMetric.count += block.length();
         }
         if (!isKafkaBrokerUp) {
-          logger.info("Started writing to kafka. "
-            + getShortDescription());
+          LOG.info("Started writing to kafka. " + getShortDescription());
           isKafkaBrokerUp = true;
         }
         return true;
       } catch (InterruptedException e) {
         isKafkaBrokerUp = false;
-        final String LOG_MESSAGE_KEY = this.getClass().getSimpleName()
-          + "_KAFKA_INTERRUPT";
-        LogFeederUtil.logErrorMessageByInterval(LOG_MESSAGE_KEY,
-          "InterruptedException-Error sending message to Kafka",
-          e, logger, Level.ERROR);
+        String logKeyMessage = this.getClass().getSimpleName() + "_KAFKA_INTERRUPT";
+        LogFeederUtil.logErrorMessageByInterval(logKeyMessage, "InterruptedException-Error sending message to Kafka", e,
+            LOG, Level.ERROR);
       } catch (ExecutionException e) {
         isKafkaBrokerUp = false;
-        final String LOG_MESSAGE_KEY = this.getClass().getSimpleName()
-          + "_KAFKA_EXECUTION";
-        LogFeederUtil.logErrorMessageByInterval(LOG_MESSAGE_KEY,
-          "ExecutionException-Error sending message to Kafka", e,
-          logger, Level.ERROR);
+        String logKeyMessage = this.getClass().getSimpleName() + "_KAFKA_EXECUTION";
+        LogFeederUtil.logErrorMessageByInterval(logKeyMessage, "ExecutionException-Error sending message to Kafka", e,
+            LOG, Level.ERROR);
       } catch (Throwable t) {
         isKafkaBrokerUp = false;
-        final String LOG_MESSAGE_KEY = this.getClass().getSimpleName()
-          + "_KAFKA_WRITE_ERROR";
-        LogFeederUtil.logErrorMessageByInterval(LOG_MESSAGE_KEY,
-          "GenericException-Error sending message to Kafka", t,
-          logger, Level.ERROR);
+        String logKeyMessage = this.getClass().getSimpleName() + "_KAFKA_WRITE_ERROR";
+        LogFeederUtil.logErrorMessageByInterval(logKeyMessage, "GenericException-Error sending message to Kafka", t,
+            LOG, Level.ERROR);
       }
     }
     return false;
   }
 
-  /*
-   * (non-Javadoc)
-   * 
-   * @see org.apache.ambari.logfeeder.ConfigBlock#getShortDescription()
-   */
   @Override
   public String getShortDescription() {
     return "output:destination=kafka,topic=" + topic;
   }
 
-}
-
-class KafkaCallBack implements Callback {
-  static private Logger logger = Logger.getLogger(KafkaCallBack.class);
+  class KafkaCallBack implements Callback {
 
-  long thisMessageNumber;
-  OutputKafka output = null;
-  String message;
-  InputMarker inputMarker;
+    private long thisMessageNumber;
+    private OutputKafka output = null;
+    private String message;
+    private InputMarker inputMarker;
 
-  public KafkaCallBack(OutputKafka output, String message, InputMarker inputMarker,
-                       long messageCount) {
-    this.thisMessageNumber = messageCount;
-    this.output = output;
-    this.inputMarker = inputMarker;
-    this.message = message;
-  }
+    public KafkaCallBack(OutputKafka output, String message, InputMarker inputMarker, long messageCount) {
+      this.thisMessageNumber = messageCount;
+      this.output = output;
+      this.inputMarker = inputMarker;
+      this.message = message;
+    }
 
-  public void onCompletion(RecordMetadata metadata, Exception exception) {
-    if (metadata != null) {
-      if (!output.isKafkaBrokerUp) {
-        logger.info("Started writing to kafka. "
-          + output.getShortDescription());
-        output.isKafkaBrokerUp = true;
+    public void onCompletion(RecordMetadata metadata, Exception exception) {
+      if (metadata != null) {
+        if (!output.isKafkaBrokerUp) {
+          LOG.info("Started writing to kafka. " + output.getShortDescription());
+          output.isKafkaBrokerUp = true;
+        }
+        output.incrementStat(1);
+        output.writeBytesMetric.count += message.length();
+
+        // metadata.partition();
+        // metadata.offset();
+      } else {
+        output.isKafkaBrokerUp = false;
+        String logKeyMessage = this.getClass().getSimpleName() + "_KAFKA_ASYNC_ERROR";
+        LogFeederUtil.logErrorMessageByInterval(logKeyMessage, "Error sending message to Kafka. Async Callback",
+            exception, LOG, Level.ERROR);
+
+        output.failedMessages.add(this);
       }
-      output.incrementStat(1);
-      output.writeBytesMetric.count += message.length();
-
-      // metadata.partition();
-      // metadata.offset();
-    } else {
-      output.isKafkaBrokerUp = false;
-      final String LOG_MESSAGE_KEY = this.getClass().getSimpleName()
-        + "_KAFKA_ASYNC_ERROR";
-      LogFeederUtil.logErrorMessageByInterval(LOG_MESSAGE_KEY,
-        "Error sending message to Kafka. Async Callback",
-        exception, logger, Level.ERROR);
-
-      output.failedMessages.add(this);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/e5c13a3a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
index 7cd911d..215f691 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
@@ -20,6 +20,7 @@
 package org.apache.ambari.logfeeder.output;
 
 import java.io.IOException;
+import java.net.MalformedURLException;
 import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.Collection;
@@ -37,6 +38,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.impl.LBHttpSolrClient;
@@ -45,154 +47,171 @@ import org.apache.solr.client.solrj.response.UpdateResponse;
 import org.apache.solr.common.SolrInputDocument;
 
 public class OutputSolr extends Output {
-  static private Logger logger = Logger.getLogger(OutputSolr.class);
-
-  private static final String ROUTER_FIELD = "_router_field_";
-
-  String solrUrl = null;
-  String zkHosts = null;
-  String collection = null;
-  String splitMode = "none";
-  int splitInterval = 0;
-  int numberOfShards = 1;
-  boolean isComputeCurrentCollection = false;
-
-  int maxBufferSize = 5000;
-  int maxIntervalMS = 3000;
-  int workers = 1;
-
-  BlockingQueue<OutputData> outgoingBuffer = null;
-  List<SolrWorkerThread> writerThreadList = new ArrayList<SolrWorkerThread>();
-  private static final int RETRY_INTERVAL = 30;
-
-  int lastSlotByMin = -1;
+  private static final Logger LOG = Logger.getLogger(OutputSolr.class);
+
+  private static final int DEFAULT_MAX_BUFFER_SIZE = 5000;
+  private static final int DEFAULT_MAX_INTERVAL_MS = 3000;
+  private static final int DEFAULT_NUMBER_OF_SHARDS = 1;
+  private static final int DEFAULT_SPLIT_INTERVAL = 30;
+  private static final int DEFAULT_NUMBER_OF_WORKERS = 1;
+
+  private String collection;
+  private String splitMode;
+  private int splitInterval;
+  private int numberOfShards;
+  private int maxIntervalMS;
+  private int workers;
+  private int maxBufferSize;
+  private boolean isComputeCurrentCollection = false;
+  private int lastSlotByMin = -1;
+
+  private BlockingQueue<OutputData> outgoingBuffer = null;
+  private List<SolrWorkerThread> workerThreadList = new ArrayList<>();
 
   @Override
   public void init() throws Exception {
     super.init();
+    initParams();
+    createOutgoingBuffer();
+    createSolrWorkers();
+  }
+
+  private void initParams() {
     statMetric.metricsName = "output.solr.write_logs";
     writeBytesMetric.metricsName = "output.solr.write_bytes";
 
-    solrUrl = getStringValue("url");
-    zkHosts = getStringValue("zk_hosts");
-    splitMode = getStringValue("splits_interval_mins", splitMode);
+    splitMode = getStringValue("splits_interval_mins", "none");
     if (!splitMode.equalsIgnoreCase("none")) {
-      splitInterval = getIntValue("split_interval_mins", 30);
+      splitInterval = getIntValue("split_interval_mins", DEFAULT_SPLIT_INTERVAL);
     }
-    numberOfShards = getIntValue("number_of_shards", numberOfShards);
+    numberOfShards = getIntValue("number_of_shards", DEFAULT_NUMBER_OF_SHARDS);
 
-    maxBufferSize = getIntValue("flush_size", maxBufferSize);
+    maxIntervalMS = getIntValue("idle_flush_time_ms", DEFAULT_MAX_INTERVAL_MS);
+    workers = getIntValue("workers", DEFAULT_NUMBER_OF_WORKERS);
+
+    maxBufferSize = getIntValue("flush_size", DEFAULT_MAX_BUFFER_SIZE);
     if (maxBufferSize < 1) {
-      logger.warn("maxBufferSize is less than 1. Making it 1");
+      LOG.warn("maxBufferSize is less than 1. Making it 1");
+      maxBufferSize = 1;
     }
-    maxIntervalMS = getIntValue("idle_flush_time_ms", maxIntervalMS);
-    workers = getIntValue("workers", workers);
-
-    logger.info("Config: Number of workers=" + workers + ", splitMode="
-        + splitMode + ", splitInterval=" + splitInterval
-        + ", numberOfShards=" + numberOfShards + ". "
-        + getShortDescription());
 
-    if (StringUtils.isEmpty(solrUrl) && StringUtils.isEmpty(zkHosts)) {
-      throw new Exception(
-          "For solr output, either url or zk_hosts property need to be set");
-    }
+    LOG.info(String.format("Config: Number of workers=%d, splitMode=%s, splitInterval=%d, " + "numberOfShards=%d. "
+        + getShortDescription(), workers, splitMode, splitInterval, numberOfShards));
+  }
 
+  private void createOutgoingBuffer() {
     int bufferSize = maxBufferSize * (workers + 3);
-    logger.info("Creating blocking queue with bufferSize=" + bufferSize);
-    // outgoingBuffer = new ArrayBlockingQueue<OutputData>(bufferSize);
+    LOG.info("Creating blocking queue with bufferSize=" + bufferSize);
     outgoingBuffer = new LinkedBlockingQueue<OutputData>(bufferSize);
+  }
+
+  private void createSolrWorkers() throws Exception, MalformedURLException {
+    String solrUrl = getStringValue("url");
+    String zkHosts = getStringValue("zk_hosts");
+    if (StringUtils.isEmpty(solrUrl) && StringUtils.isEmpty(zkHosts)) {
+      throw new Exception("For solr output, either url or zk_hosts property need to be set");
+    }
 
     for (int count = 0; count < workers; count++) {
-      SolrClient solrClient = null;
-      CloudSolrClient solrClouldClient = null;
-      if (zkHosts != null) {
-        logger.info("Using zookeepr. zkHosts=" + zkHosts);
-        collection = getStringValue("collection");
-        if (StringUtils.isEmpty(collection)) {
-          throw new Exception(
-              "For solr cloud property collection is mandatory");
-        }
-        logger.info("Using collection=" + collection);
-        solrClouldClient = new CloudSolrClient(zkHosts);
-        solrClouldClient.setDefaultCollection(collection);
-        solrClient = solrClouldClient;
-        if (splitMode.equalsIgnoreCase("none")) {
-          isComputeCurrentCollection = false;
-        } else {
-          isComputeCurrentCollection = true;
-        }
-      } else {
-        String[] solrUrls = StringUtils.split(solrUrl, ",");
-        if (solrUrls.length == 1) {
-          logger.info("Using SolrURL=" + solrUrl);
-          solrClient = new HttpSolrClient(solrUrl);
-        } else {
-          logger.info("Using load balance solr client. solrUrls="
-              + solrUrl);
-          logger.info("Initial URL for LB solr=" + solrUrls[0]);
-          @SuppressWarnings("resource")
-          LBHttpSolrClient lbSolrClient = new LBHttpSolrClient(
-              solrUrls[0]);
-          for (int i = 1; i < solrUrls.length; i++) {
-            logger.info("Adding URL for LB solr=" + solrUrls[i]);
-            lbSolrClient.addSolrServer(solrUrls[i]);
-          }
-          solrClient = lbSolrClient;
-        }
-      }
-      try {
-        logger.info("Pinging Solr server. zkHosts=" + zkHosts
-            + ", urls=" + solrUrl);
-        SolrPingResponse response = solrClient.ping();
-        if (response.getStatus() == 0) {
-          logger.info("Ping to Solr server is successful for writer="
-              + count);
-        } else {
-          logger.warn("Ping to Solr server failed. It would check again. writer="
-              + count
-              + ", solrUrl="
-              + solrUrl
-              + ", zkHosts="
-              + zkHosts
-              + ", collection="
-              + collection
-              + ", response=" + response);
-        }
-      } catch (Throwable t) {
-        logger.warn(
-            "Ping to Solr server failed. It would check again. writer="
-                + count + ", solrUrl=" + solrUrl + ", zkHosts="
-                + zkHosts + ", collection=" + collection, t);
+      SolrClient solrClient = getSolrClient(solrUrl, zkHosts, count);
+      createSolrWorkerThread(count, solrClient);
+    }
+  }
+
+  /** Creates the client for worker {@code count} (cloud if zk_hosts is set, HTTP otherwise) and pings it once. */
+  SolrClient getSolrClient(String solrUrl, String zkHosts, int count) throws Exception, MalformedURLException {
+    SolrClient solrClient;
+    // Same emptiness test as the validation in createSolrWorkers(): a blank zk_hosts must fall back to the url.
+    if (!StringUtils.isEmpty(zkHosts)) {
+      solrClient = createCloudSolrClient(zkHosts);
+    } else {
+      solrClient = createHttpSolarClient(solrUrl);
+    }
+    // pingSolr() catches everything and only logs, so a failed ping never prevents the client from being returned.
+    pingSolr(solrUrl, zkHosts, count, solrClient);
+    return solrClient;
+  }
+
+  /** Builds the ZooKeeper-backed client; fails when the mandatory "collection" property is not configured. */
+  private SolrClient createCloudSolrClient(String zkHosts) throws Exception {
+    LOG.info("Using zookeepr. zkHosts=" + zkHosts);
+    collection = getStringValue("collection"); // stored on the output; also used in thread names and descriptions
+    if (StringUtils.isEmpty(collection)) {
+      throw new Exception("For solr cloud property collection is mandatory");
+    }
+    LOG.info("Using collection=" + collection);
+    CloudSolrClient solrClient = new CloudSolrClient(zkHosts);
+    solrClient.setDefaultCollection(collection);
+    isComputeCurrentCollection = !splitMode.equalsIgnoreCase("none"); // workers add the router field only if true
+    return solrClient;
+  }
+
+  private SolrClient createHttpSolarClient(String solrUrl) throws MalformedURLException {
+    String[] solrUrls = StringUtils.split(solrUrl, ",");
+    if (solrUrls.length == 1) {
+      LOG.info("Using SolrURL=" + solrUrl);
+      return new HttpSolrClient(solrUrl);
+    } else {
+      LOG.info("Using load balance solr client. solrUrls=" + solrUrl);
+      LOG.info("Initial URL for LB solr=" + solrUrls[0]);
+      LBHttpSolrClient lbSolrClient = new LBHttpSolrClient(solrUrls[0]);
+      for (int i = 1; i < solrUrls.length; i++) {
+        LOG.info("Adding URL for LB solr=" + solrUrls[i]);
+        lbSolrClient.addSolrServer(solrUrls[i]);
       }
+      return lbSolrClient;
+    }
+  }
 
-      // Let's start the thread
-      SolrWorkerThread solrWriterThread = new SolrWorkerThread(solrClient);
-      solrWriterThread.setName(getNameForThread() + "," + collection
-          + ",writer=" + count);
-      solrWriterThread.setDaemon(true);
-      solrWriterThread.start();
-      writerThreadList.add(solrWriterThread);
+  private void pingSolr(String solrUrl, String zkHosts, int count, SolrClient solrClient) {
+    try {
+      LOG.info("Pinging Solr server. zkHosts=" + zkHosts + ", urls=" + solrUrl);
+      SolrPingResponse response = solrClient.ping();
+      if (response.getStatus() == 0) {
+        LOG.info("Ping to Solr server is successful for worker=" + count);
+      } else {
+        LOG.warn(
+            String.format(
+                "Ping to Solr server failed. It would check again. worker=%d, "
+                    + "solrUrl=%s, zkHosts=%s, collection=%s, response=%s",
+                count, solrUrl, zkHosts, collection, response));
+      }
+    } catch (Throwable t) {
+      LOG.warn(String.format(
+          "Ping to Solr server failed. It would check again. worker=%d, " + "solrUrl=%s, zkHosts=%s, collection=%s",
+          count, solrUrl, zkHosts, collection), t);
     }
   }
 
+  private void createSolrWorkerThread(int count, SolrClient solrClient) { // starts one worker bound to solrClient
+    SolrWorkerThread solrWorkerThread = new SolrWorkerThread(solrClient);
+    solrWorkerThread.setName(getNameForThread() + "," + collection + ",worker=" + count);
+    solrWorkerThread.setDaemon(true); // daemon thread: does not keep the JVM alive on its own
+    solrWorkerThread.start();
+    workerThreadList.add(solrWorkerThread); // tracked so flush() and getPendingCount() can inspect the worker
+  }
+
   @Override
-  public void setDrain(boolean drain) {
-    super.setDrain(drain);
+  public void write(Map<String, Object> jsonObj, InputMarker inputMarker) throws Exception {
+    try {
+      outgoingBuffer.put(new OutputData(jsonObj, inputMarker));
+    } catch (InterruptedException e) {
+      // ignore
+    }
   }
 
   /**
    * Flush document buffer
    */
   public void flush() {
-    logger.info("Flush called...");
+    LOG.info("Flush called...");
     setDrain(true);
 
     int wrapUpTimeSecs = 30;
     // Give wrapUpTimeSecs seconds to wrap up
     boolean isPending = false;
     for (int i = 0; i < wrapUpTimeSecs; i++) {
-      for (SolrWorkerThread solrWorkerThread : writerThreadList) {
+      for (SolrWorkerThread solrWorkerThread : workerThreadList) {
         if (solrWorkerThread.isDone()) {
           try {
             solrWorkerThread.interrupt();
@@ -205,8 +224,7 @@ public class OutputSolr extends Output {
       }
       if (isPending) {
         try {
-          logger.info("Will give " + (wrapUpTimeSecs - i)
-              + " seconds to wrap up");
+          LOG.info("Will give " + (wrapUpTimeSecs - i) + " seconds to wrap up");
           Thread.sleep(1000);
         } catch (InterruptedException e) {
           // ignore
@@ -217,238 +235,194 @@ public class OutputSolr extends Output {
   }
 
   @Override
+  public void setDrain(boolean drain) {
+    super.setDrain(drain);
+  }
+
+  @Override
   public long getPendingCount() {
-    long totalCount = 0;
-    for (SolrWorkerThread solrWorkerThread : writerThreadList) {
-      totalCount += solrWorkerThread.localBuffer.size();
+    long pendingCount = 0;
+    for (SolrWorkerThread solrWorkerThread : workerThreadList) {
+      pendingCount += solrWorkerThread.localBuffer.size();
     }
-    return totalCount;
+    return pendingCount;
   }
 
   @Override
   public void close() {
-    logger.info("Closing Solr client...");
+    LOG.info("Closing Solr client...");
     flush();
 
-    logger.info("Closed Solr client");
+    LOG.info("Closed Solr client");
     super.close();
   }
 
   @Override
-  public void write(Map<String, Object> jsonObj, InputMarker inputMarker)
-      throws Exception {
-    try {
-      outgoingBuffer.put(new OutputData(jsonObj, inputMarker));
-    } catch (InterruptedException e) {
-      // ignore
-    }
-  }
-
-  /*
-   * (non-Javadoc)
-   * 
-   * @see org.apache.ambari.logfeeder.ConfigBlock#getShortDescription()
-   */
-  @Override
   public String getShortDescription() {
     return "output:destination=solr,collection=" + collection;
   }
 
   class SolrWorkerThread extends Thread {
-    /**
-     * 
-     */
-    SolrClient solrClient = null;
-    Collection<SolrInputDocument> localBuffer = new ArrayList<SolrInputDocument>();
-    long localBufferBytesSize = 0;
-    Map<String, InputMarker> latestInputMarkerList = new HashMap<String, InputMarker>();
+    private static final String ROUTER_FIELD = "_router_field_";
+    private static final int RETRY_INTERVAL = 30;
+
+    private final SolrClient solrClient;
+    private final Collection<SolrInputDocument> localBuffer = new ArrayList<>();
+    private final Map<String, InputMarker> latestInputMarkerList = new HashMap<>();
+
+    private long localBufferBytesSize = 0;
 
-    /**
-     * 
-     */
     public SolrWorkerThread(SolrClient solrClient) {
       this.solrClient = solrClient;
     }
 
-    /*
-     * (non-Javadoc)
-     * 
-     * @see java.lang.Runnable#run()
-     */
     @Override
     public void run() {
-      logger.info("SolrWriter thread started");
+      LOG.info("SolrWorker thread started");
       long lastDispatchTime = System.currentTimeMillis();
 
-      //long totalWaitTimeMS = 0;
       while (true) {
         long currTimeMS = System.currentTimeMillis();
         OutputData outputData = null;
         try {
-          long nextDispatchDuration = maxIntervalMS
-              - (currTimeMS - lastDispatchTime);
-          outputData = outgoingBuffer.poll();
-          if (outputData == null && !isDrain()
-              && nextDispatchDuration > 0) {
-            outputData = outgoingBuffer.poll(nextDispatchDuration,
-                TimeUnit.MILLISECONDS);
-//            long diffTimeMS = System.currentTimeMillis()
-//                - currTimeMS;
-            // logger.info("Waited for " + diffTimeMS +
-            // " ms, planned for "
-            // + nextDispatchDuration + " ms, localBuffer.size="
-            // + localBuffer.size() + ", timedOut="
-            // + (outputData == null ? "true" : "false"));
-          }
+          long nextDispatchDuration = maxIntervalMS - (currTimeMS - lastDispatchTime);
+          outputData = getOutputData(nextDispatchDuration);
 
-          if (isDrain() && outputData == null
-              && outgoingBuffer.size() == 0) {
-            break;
-          }
           if (outputData != null) {
-            if (outputData.jsonObj.get("id") == null) {
-              outputData.jsonObj.put("id", UUID.randomUUID()
-                  .toString());
-            }
-            SolrInputDocument document = new SolrInputDocument();
-            for (String name : outputData.jsonObj.keySet()) {
-              Object obj = outputData.jsonObj.get(name);
-              document.addField(name, obj);
-              try {
-                localBufferBytesSize += obj.toString().length();
-              } catch (Throwable t) {
-                final String LOG_MESSAGE_KEY = this.getClass()
-                    .getSimpleName() + "_BYTE_COUNT_ERROR";
-                LogFeederUtil.logErrorMessageByInterval(
-                    LOG_MESSAGE_KEY,
-                    "Error calculating byte size. object="
-                        + obj, t, logger, Level.ERROR);
-
-              }
+            createSolrDocument(outputData);
+          } else {
+            if (isDrain() && outgoingBuffer.size() == 0) {
+              break;
             }
-            latestInputMarkerList.put(
-                outputData.inputMarker.base64FileKey,
-                outputData.inputMarker);
-            localBuffer.add(document);
           }
 
-          if (localBuffer.size() > 0
-              && ((outputData == null && isDrain()) || (nextDispatchDuration <= 0 || localBuffer
-                  .size() >= maxBufferSize))) {
+          if (localBuffer.size() > 0 && ((outputData == null && isDrain())
+              || (nextDispatchDuration <= 0 || localBuffer.size() >= maxBufferSize))) {
             try {
               if (isComputeCurrentCollection) {
                 // Compute the current router value
-
-                int weekDay = Calendar.getInstance().get(
-                    Calendar.DAY_OF_WEEK);
-                int currHour = Calendar.getInstance().get(
-                    Calendar.HOUR_OF_DAY);
-                int currMin = Calendar.getInstance().get(
-                    Calendar.MINUTE);
-
-                int minOfWeek = (weekDay - 1) * 24 * 60
-                    + currHour * 60 + currMin;
-                int slotByMin = minOfWeek / splitInterval
-                    % numberOfShards;
-
-                String shard = "shard" + slotByMin;
-
-                if (lastSlotByMin != slotByMin) {
-                  logger.info("Switching to shard " + shard
-                      + ", output="
-                      + getShortDescription());
-                  lastSlotByMin = slotByMin;
-                }
-
-                for (SolrInputDocument solrInputDocument : localBuffer) {
-                  solrInputDocument.addField(ROUTER_FIELD,
-                      shard);
-                }
+                addRouterField();
               }
 
-//              long beginTime = System.currentTimeMillis();
-              UpdateResponse response = solrClient
-                  .add(localBuffer);
-//              long endTime = System.currentTimeMillis();
-//              logger.info("Adding to Solr. Document count="
-//                  + localBuffer.size() + ". Took "
-//                  + (endTime - beginTime) + " ms");
-
-              if (response.getStatus() != 0) {
-                final String LOG_MESSAGE_KEY = this.getClass()
-                    .getSimpleName() + "_SOLR_UPDATE_ERROR";
-                LogFeederUtil
-                    .logErrorMessageByInterval(
-                        LOG_MESSAGE_KEY,
-                        "Error writing to Solr. response="
-                            + response.toString()
-                            + ", log="
-                            + (outputData == null ? null
-                                : outputData
-                                    .toString()),
-                        null, logger, Level.ERROR);
-              }
-              statMetric.count += localBuffer.size();
-              writeBytesMetric.count += localBufferBytesSize;
-              for (InputMarker inputMarker : latestInputMarkerList
-                  .values()) {
-                inputMarker.input.checkIn(inputMarker);
-              }
+              addToSolr(outputData);
 
               resetLocalBuffer();
               lastDispatchTime = System.currentTimeMillis();
             } catch (IOException ioException) {
               // Transient error, lets block till it is available
-              while (!isDrain()) {
-                try {
-                  logger.warn("Solr is down. Going to sleep for "
-                      + RETRY_INTERVAL
-                      + " seconds. output="
-                      + getShortDescription());
-                  Thread.sleep(RETRY_INTERVAL * 1000);
-                } catch (Throwable t) {
-                  // ignore
-                  break;
-                }
-                if (isDrain()) {
-                  break;
-                }
-                try {
-                  SolrPingResponse pingResponse = solrClient
-                      .ping();
-                  if (pingResponse.getStatus() == 0) {
-                    logger.info("Solr seems to be up now. Resuming... output="
-                        + getShortDescription());
-                    break;
-                  }
-                } catch (Throwable t) {
-                  // Ignore
-                }
-              }
+              waitForSolr();
             } catch (Throwable serverException) {
               // Clear the buffer
               resetLocalBuffer();
-              final String LOG_MESSAGE_KEY = this.getClass()
-                  .getSimpleName() + "_SOLR_UPDATE_EXCEPTION";
-              LogFeederUtil.logErrorMessageByInterval(
-                  LOG_MESSAGE_KEY,
-                  "Error sending log message to server. "
-                      + (outputData == null ? null
-                          : outputData.toString()),
-                  serverException, logger, Level.ERROR);
+              String logMessageKey = this.getClass().getSimpleName() + "_SOLR_UPDATE_EXCEPTION";
+              LogFeederUtil.logErrorMessageByInterval(logMessageKey,
+                  "Error sending log message to server. " + outputData, serverException, LOG, Level.ERROR);
             }
           }
         } catch (InterruptedException e) {
           // Handle thread exiting
         } catch (Throwable t) {
-          final String LOG_MESSAGE_KEY = this.getClass()
-              .getSimpleName() + "_SOLR_MAINLOOP_EXCEPTION";
-          LogFeederUtil.logErrorMessageByInterval(LOG_MESSAGE_KEY,
-              "Caught exception in main loop. " + outputData, t,
-              logger, Level.ERROR);
+          String logMessageKey = this.getClass().getSimpleName() + "_SOLR_MAINLOOP_EXCEPTION";
+          LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Caught exception in main loop. " + outputData, t, LOG,
+              Level.ERROR);
+        }
+      }
+
+      closeSolrClient();
+
+      resetLocalBuffer();
+      LOG.info("Exiting Solr worker thread. output=" + getShortDescription());
+    }
+
+    private OutputData getOutputData(long nextDispatchDuration) throws InterruptedException {
+      OutputData outputData = outgoingBuffer.poll(); // non-blocking attempt first
+      if (outputData == null && !isDrain() && nextDispatchDuration > 0) {
+        outputData = outgoingBuffer.poll(nextDispatchDuration, TimeUnit.MILLISECONDS); // bounded wait; may still be null
+      }
+      if (outputData != null && outputData.jsonObj.get("id") == null) {
+        outputData.jsonObj.put("id", UUID.randomUUID().toString()); // records without an id get a random UUID
+      }
+      return outputData; // null means nothing was available within the allowed time
+    }
+
+    private void createSolrDocument(OutputData outputData) { // converts one record and appends it to localBuffer
+      SolrInputDocument document = new SolrInputDocument();
+      for (String name : outputData.jsonObj.keySet()) {
+        Object obj = outputData.jsonObj.get(name);
+        document.addField(name, obj);
+        try {
+          localBufferBytesSize += obj.toString().length(); // counts characters of the string form, not encoded bytes
+        } catch (Throwable t) { // e.g. a null value; the field was already added above, only the size is skipped
+          String logMessageKey = this.getClass().getSimpleName() + "_BYTE_COUNT_ERROR";
+          LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Error calculating byte size. object=" + obj, t, LOG,
+              Level.ERROR);
+        }
+      }
+      latestInputMarkerList.put(outputData.inputMarker.base64FileKey, outputData.inputMarker); // newest marker per key
+      localBuffer.add(document);
+    }
+
+    /** Tags every buffered document with the shard of the current time slot (minute of week / splitInterval). */
+    private void addRouterField() {
+      Calendar cal = Calendar.getInstance();
+      int weekDay = cal.get(Calendar.DAY_OF_WEEK);
+      int currHour = cal.get(Calendar.HOUR_OF_DAY);
+      int currMin = cal.get(Calendar.MINUTE);
+      // DAY_OF_WEEK is 1-based (SUNDAY == 1), hence the "- 1" to obtain a 0-based minute of the week.
+      int minOfWeek = (weekDay - 1) * 24 * 60 + currHour * 60 + currMin;
+      // NOTE(review): throws ArithmeticException if splitInterval or numberOfShards is 0 — confirm config forbids it.
+      int slotByMin = minOfWeek / splitInterval % numberOfShards;
+      String shard = "shard" + slotByMin;
+
+      if (lastSlotByMin != slotByMin) {
+        LOG.info("Switching to shard " + shard + ", output=" + getShortDescription());
+        lastSlotByMin = slotByMin;
+      }
+      for (SolrInputDocument solrInputDocument : localBuffer) {
+        solrInputDocument.addField(ROUTER_FIELD, shard);
+      }
+    }
+
+    private void addToSolr(OutputData outputData) throws SolrServerException, IOException {
+      UpdateResponse response = solrClient.add(localBuffer); // one request for the whole local buffer
+      if (response.getStatus() != 0) { // non-zero status is only logged; outputData is used for the message alone
+        String logMessageKey = this.getClass().getSimpleName() + "_SOLR_UPDATE_ERROR";
+        LogFeederUtil.logErrorMessageByInterval(logMessageKey,
+            String.format("Error writing to Solr. response=%s, log=%s", response, outputData), null, LOG, Level.ERROR);
+      }
+      statMetric.count += localBuffer.size(); // NOTE(review): counted and checked in even when status != 0 — confirm
+      writeBytesMetric.count += localBufferBytesSize;
+      for (InputMarker inputMarker : latestInputMarkerList.values()) {
+        inputMarker.input.checkIn(inputMarker); // presumably records read progress — confirm against Input.checkIn
+      }
+    }
+
+    private void waitForSolr() {
+      while (!isDrain()) {
+        try {
+          LOG.warn(
+              "Solr is down. Going to sleep for " + RETRY_INTERVAL + " seconds. " + "output=" + getShortDescription());
+          Thread.sleep(RETRY_INTERVAL * 1000);
+        } catch (Throwable t) {
+          // ignore
+          break;
+        }
+        if (isDrain()) {
+          break;
+        }
+        try {
+          SolrPingResponse pingResponse = solrClient.ping();
+          if (pingResponse.getStatus() == 0) {
+            LOG.info("Solr seems to be up now. Resuming... output=" + getShortDescription());
+            break;
+          }
+        } catch (Throwable t) {
+          // Ignore
         }
       }
+    }
 
+    private void closeSolrClient() {
       if (solrClient != null) {
         try {
           solrClient.close();
@@ -456,14 +430,6 @@ public class OutputSolr extends Output {
           // Ignore
         }
       }
-
-      resetLocalBuffer();
-      logger.info("Exiting Solr writer thread. output="
-          + getShortDescription());
-    }
-
-    public boolean isDone() {
-      return localBuffer.size() == 0;
     }
 
     public void resetLocalBuffer() {
@@ -471,5 +437,9 @@ public class OutputSolr extends Output {
       localBufferBytesSize = 0;
       latestInputMarkerList.clear();
     }
+
+    public boolean isDone() { // true when no converted document is waiting to be sent
+      return localBuffer.isEmpty(); // NOTE(review): flush() presumably reads this from another thread, unsynchronized
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/e5c13a3a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterGrokTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterGrokTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterGrokTest.java
new file mode 100644
index 0000000..9f943ec
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterGrokTest.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.logfeeder.filter;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.ambari.logfeeder.OutputMgr;
+import org.apache.ambari.logfeeder.input.Input;
+import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.log4j.Logger;
+import org.easymock.Capture;
+import org.easymock.CaptureType;
+import org.easymock.EasyMock;
+import org.junit.After;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/** Tests {@link FilterGrok} by applying raw log lines and checking what reaches a mocked {@link OutputMgr}. */
+public class FilterGrokTest {
+  private static final Logger LOG = Logger.getLogger(FilterGrokTest.class);
+  private FilterGrok filterGrok;
+  private OutputMgr mockOutputMgr;
+  private Capture<Map<String, Object>> capture; // CaptureType.LAST: keeps the last map passed to OutputMgr.write()
+
+  public void init(Map<String, Object> config) throws Exception {
+    mockOutputMgr = EasyMock.strictMock(OutputMgr.class);
+    capture = EasyMock.newCapture(CaptureType.LAST);
+    // Not a @Before fixture: every test calls init() itself with its own configuration.
+    filterGrok = new FilterGrok();
+    filterGrok.loadConfig(config);
+    filterGrok.setOutputMgr(mockOutputMgr);
+    filterGrok.setInput(EasyMock.mock(Input.class));
+    filterGrok.init();
+  }
+
+  @Test
+  public void testFilterGrok_parseMessage() throws Exception {
+    LOG.info("testFilterGrok_parseMessage()");
+
+    Map<String, Object> config = new HashMap<String, Object>();
+    config.put("message_pattern",
+        "(?m)^%{TIMESTAMP_ISO8601:logtime}%{SPACE}%{LOGLEVEL:level}%{SPACE}%{GREEDYDATA:log_message}");
+    config.put("multiline_pattern", "^(%{TIMESTAMP_ISO8601:logtime})");
+    init(config);
+    // Exactly one write() is expected although two lines are applied below.
+    mockOutputMgr.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
+    EasyMock.expectLastCall();
+    EasyMock.replay(mockOutputMgr);
+
+    filterGrok.apply("2016-04-08 15:55:23,548 INFO This is a test", new InputMarker());
+    filterGrok.apply("2016-04-08 15:55:24,548 WARN Next message", new InputMarker());
+
+    EasyMock.verify(mockOutputMgr);
+    Map<String, Object> jsonParams = capture.getValue();
+    // Fields are removed as they are checked so that any unexpected extra field fails the final assertion.
+    assertNotNull(jsonParams);
+    assertEquals("Incorrect parsing: log time", "2016-04-08 15:55:23,548", jsonParams.remove("logtime"));
+    assertEquals("Incorrect parsing: log level", "INFO", jsonParams.remove("level"));
+    assertEquals("Incorrect parsing: log message", "This is a test", jsonParams.remove("log_message"));
+    assertTrue("jsonParams are not empty!", jsonParams.isEmpty());
+  }
+
+  @Test
+  public void testFilterGrok_parseMultiLineMessage() throws Exception {
+    LOG.info("testFilterGrok_parseMultiLineMessage()");
+
+    Map<String, Object> config = new HashMap<String, Object>();
+    config.put("message_pattern",
+        "(?m)^%{TIMESTAMP_ISO8601:logtime}%{SPACE}%{LOGLEVEL:level}%{SPACE}%{GREEDYDATA:log_message}");
+    config.put("multiline_pattern", "^(%{TIMESTAMP_ISO8601:logtime})");
+    init(config);
+    // Only the first applied line carries a timestamp and level; log_message must equal the full multi-line text.
+    mockOutputMgr.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
+    EasyMock.expectLastCall();
+    EasyMock.replay(mockOutputMgr);
+
+    String multiLineMessage = "This is a multiline test message\r\n" + "having multiple lines\r\n"
+        + "as one may expect";
+    String[] messageLines = multiLineMessage.split("\r\n");
+    for (int i = 0; i < messageLines.length; i++)
+      filterGrok.apply((i == 0 ? "2016-04-08 15:55:23,548 INFO " : "") + messageLines[i], new InputMarker());
+    filterGrok.flush(); // presumably emits the still-buffered multi-line event — confirm against FilterGrok.flush()
+
+    EasyMock.verify(mockOutputMgr);
+    Map<String, Object> jsonParams = capture.getValue();
+
+    assertNotNull(jsonParams);
+    assertEquals("Incorrect parsing: log time", "2016-04-08 15:55:23,548", jsonParams.remove("logtime"));
+    assertEquals("Incorrect parsing: log level", "INFO", jsonParams.remove("level"));
+    assertEquals("Incorrect parsing: log message", multiLineMessage, jsonParams.remove("log_message"));
+    assertTrue("jsonParams are not empty!", jsonParams.isEmpty());
+  }
+
+  @Test
+  public void testFilterGrok_notMatchingMesagePattern() throws Exception {
+    LOG.info("testFilterGrok_notMatchingMesagePattern()");
+
+    Map<String, Object> config = new HashMap<String, Object>();
+    config.put("message_pattern",
+        "(?m)^%{TIMESTAMP_ISO8601:logtime}%{SPACE}%{LOGLEVEL:level}%{SPACE}%{GREEDYDATA:log_message}");
+    config.put("multiline_pattern", "^(%{TIMESTAMP_ISO8601:logtime})");
+    init(config);
+    // write() is permitted any number of times (including zero); the capture check below asserts it never happened.
+    mockOutputMgr.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
+    EasyMock.expectLastCall().anyTimes();
+    EasyMock.replay(mockOutputMgr);
+    // These lines start with a slash-separated date rather than the ISO8601 form the configured patterns name.
+    filterGrok.apply("04/08/2016 15:55:23,548 INFO This is a test", new InputMarker());
+    filterGrok.apply("04/08/2016 15:55:24,548 WARN Next message", new InputMarker());
+
+    EasyMock.verify(mockOutputMgr);
+    assertFalse("Something was captured!", capture.hasCaptured());
+  }
+
+  @Test
+  public void testFilterGrok_noMesagePattern() throws Exception {
+    LOG.info("testFilterGrok_noMesagePattern()");
+
+    Map<String, Object> config = new HashMap<String, Object>();
+    config.put("multiline_pattern", "^(%{TIMESTAMP_ISO8601:logtime})");
+    init(config);
+    // No expectation is recorded before replay(), so any call to write() on the mock fails the test.
+    EasyMock.replay(mockOutputMgr);
+
+    filterGrok.apply("2016-04-08 15:55:23,548 INFO This is a test", new InputMarker());
+    filterGrok.apply("2016-04-08 15:55:24,548 WARN Next message", new InputMarker());
+
+    EasyMock.verify(mockOutputMgr);
+    assertFalse("Something was captured", capture.hasCaptured());
+  }
+
+  @After
+  public void cleanUp() {
+    capture.reset();
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/e5c13a3a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterKeyValueTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterKeyValueTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterKeyValueTest.java
new file mode 100644
index 0000000..58db8f2
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterKeyValueTest.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.logfeeder.filter;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.ambari.logfeeder.OutputMgr;
+import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.log4j.Logger;
+import org.easymock.Capture;
+import org.easymock.CaptureType;
+import org.easymock.EasyMock;
+import org.junit.After;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for {@link FilterKeyValue}, run against a mocked {@link OutputMgr}.
+ */
+public class FilterKeyValueTest {
+  private static final Logger LOG = Logger.getLogger(FilterKeyValueTest.class);
+
+  private FilterKeyValue filterKeyValue;
+  private OutputMgr mockOutputMgr;
+  private Capture<Map<String, Object>> capture;
+
+  /** Builds the filter under test from {@code config} with a fresh strict mock and capture. */
+  public void init(Map<String, Object> config) throws Exception {
+    mockOutputMgr = EasyMock.strictMock(OutputMgr.class);
+    capture = EasyMock.newCapture(CaptureType.LAST);
+
+    filterKeyValue = new FilterKeyValue();
+    filterKeyValue.loadConfig(config);
+    filterKeyValue.setOutputMgr(mockOutputMgr);
+    filterKeyValue.init();
+  }
+
+  /** The pairs found in the configured source field are written next to the original field. */
+  @Test
+  public void testFilterKeyValue_extraction() throws Exception {
+    LOG.info("testFilterKeyValue_extraction()");
+    Map<String, Object> filterConfig = new HashMap<>();
+    filterConfig.put("source_field", "keyValueField");
+    filterConfig.put("field_split", "&");
+    init(filterConfig); // no value split is configured, so the default (=) is used
+
+    mockOutputMgr.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
+    EasyMock.expectLastCall();
+    EasyMock.replay(mockOutputMgr);
+
+    filterKeyValue.apply("{ keyValueField: 'name1=value1&name2=value2' }", new InputMarker());
+    EasyMock.verify(mockOutputMgr);
+
+    Map<String, Object> written = capture.getValue();
+    assertEquals("Original missing!", "name1=value1&name2=value2", written.remove("keyValueField"));
+    assertEquals("Incorrect extraction: name1", "value1", written.remove("name1"));
+    assertEquals("Incorrect extraction: name2", "value2", written.remove("name2"));
+    assertTrue("jsonParams are not empty!", written.isEmpty());
+  }
+
+  /** With no source field configured, the test expects that nothing is captured. */
+  @Test
+  public void testFilterKeyValue_missingSourceField() throws Exception {
+    LOG.info("testFilterKeyValue_missingSourceField()");
+    Map<String, Object> filterConfig = new HashMap<>();
+    filterConfig.put("field_split", "&");
+    init(filterConfig); // no value split is configured, so the default (=) is used
+
+    mockOutputMgr.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
+    EasyMock.expectLastCall().anyTimes();
+    EasyMock.replay(mockOutputMgr);
+
+    filterKeyValue.apply("{ keyValueField: 'name1=value1&name2=value2' }", new InputMarker());
+    EasyMock.verify(mockOutputMgr);
+
+    assertFalse("Something was captured!", capture.hasCaptured());
+  }
+
+  /** When the input lacks the configured source field, only the original content is written. */
+  @Test
+  public void testFilterKeyValue_noSourceFieldPresent() throws Exception {
+    LOG.info("testFilterKeyValue_noSourceFieldPresent()");
+    Map<String, Object> filterConfig = new HashMap<>();
+    filterConfig.put("source_field", "keyValueField");
+    filterConfig.put("field_split", "&");
+    init(filterConfig); // no value split is configured, so the default (=) is used
+
+    mockOutputMgr.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
+    EasyMock.expectLastCall().anyTimes();
+    EasyMock.replay(mockOutputMgr);
+
+    filterKeyValue.apply("{ otherField: 'name1=value1&name2=value2' }", new InputMarker());
+    EasyMock.verify(mockOutputMgr);
+
+    Map<String, Object> written = capture.getValue();
+    assertEquals("Original missing!", "name1=value1&name2=value2", written.remove("otherField"));
+    assertTrue("jsonParams are not empty!", written.isEmpty());
+  }
+
+  /** Resets the capture after each test. */
+  @After
+  public void cleanUp() {
+    capture.reset();
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/e5c13a3a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/JSONFilterCodeTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/JSONFilterCodeTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/JSONFilterCodeTest.java
new file mode 100644
index 0000000..ebfd0f5
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/JSONFilterCodeTest.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.logfeeder.filter;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.ambari.logfeeder.LogFeederUtil;
+import org.apache.ambari.logfeeder.OutputMgr;
+import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.log4j.Logger;
+import org.easymock.Capture;
+import org.easymock.CaptureType;
+import org.easymock.EasyMock;
+import org.junit.After;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for {@link JSONFilterCode}, run against a mocked {@link OutputMgr}.
+ */
+public class JSONFilterCodeTest {
+  private static final Logger LOG = Logger.getLogger(JSONFilterCodeTest.class);
+
+  private JSONFilterCode jsonFilterCode;
+  private OutputMgr mockOutputMgr;
+  private Capture<Map<String, Object>> capture;
+
+  /** Builds the filter under test from {@code params} with a fresh strict mock and capture. */
+  public void init(Map<String, Object> params) throws Exception {
+    mockOutputMgr = EasyMock.strictMock(OutputMgr.class);
+    capture = EasyMock.newCapture(CaptureType.LAST);
+
+    jsonFilterCode = new JSONFilterCode();
+    jsonFilterCode.loadConfig(params);
+    jsonFilterCode.setOutputMgr(mockOutputMgr);
+    jsonFilterCode.init();
+  }
+
+  /** Records one expected, captured {@code write} call and switches the mock to replay mode. */
+  private void expectSingleWrite() throws Exception {
+    mockOutputMgr.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
+    EasyMock.expectLastCall();
+    EasyMock.replay(mockOutputMgr);
+  }
+
+  /** A logtime in epoch milliseconds is expected as a formatted date, line_number as a Long. */
+  @Test
+  public void testJSONFilterCode_convertFields() throws Exception {
+    LOG.info("testJSONFilterCode_convertFields()");
+    init(new HashMap<String, Object>());
+    expectSingleWrite();
+
+    Date now = new Date();
+    String expectedLogTime = new SimpleDateFormat(LogFeederUtil.DATE_FORMAT).format(now);
+    jsonFilterCode.apply("{ logtime: '" + now.getTime() + "', line_number: 100 }", new InputMarker());
+    EasyMock.verify(mockOutputMgr);
+
+    Map<String, Object> jsonParams = capture.getValue();
+    assertEquals("Incorrect decoding: log time", expectedLogTime, jsonParams.remove("logtime"));
+    assertEquals("Incorrect decoding: line number", 100L, jsonParams.remove("line_number"));
+    assertTrue("jsonParams are not empty!", jsonParams.isEmpty());
+  }
+
+  /** The logtime is expected as a formatted date while an unrelated field passes through as is. */
+  @Test
+  public void testJSONFilterCode_logTimeOnly() throws Exception {
+    LOG.info("testJSONFilterCode_logTimeOnly()");
+    init(new HashMap<String, Object>());
+    expectSingleWrite();
+
+    Date now = new Date();
+    String expectedLogTime = new SimpleDateFormat(LogFeederUtil.DATE_FORMAT).format(now);
+    jsonFilterCode.apply("{ logtime: '" + now.getTime() + "', some_field: 'abc' }", new InputMarker());
+    EasyMock.verify(mockOutputMgr);
+
+    Map<String, Object> jsonParams = capture.getValue();
+    assertEquals("Incorrect decoding: log time", expectedLogTime, jsonParams.remove("logtime"));
+    assertEquals("Incorrect decoding: some field", "abc", jsonParams.remove("some_field"));
+    assertTrue("jsonParams are not empty!", jsonParams.isEmpty());
+  }
+
+  /** The line_number is expected as a Long while an unrelated field passes through as is. */
+  @Test
+  public void testJSONFilterCode_lineNumberOnly() throws Exception {
+    LOG.info("testJSONFilterCode_lineNumberOnly()");
+    init(new HashMap<String, Object>());
+    expectSingleWrite();
+
+    jsonFilterCode.apply("{ line_number: 100, some_field: 'abc' }", new InputMarker());
+    EasyMock.verify(mockOutputMgr);
+
+    Map<String, Object> jsonParams = capture.getValue();
+    assertEquals("Incorrect decoding: line number", 100L, jsonParams.remove("line_number"));
+    assertEquals("Incorrect decoding: some field", "abc", jsonParams.remove("some_field"));
+    assertTrue("jsonParams are not empty!", jsonParams.isEmpty());
+  }
+
+  /** Resets the capture after each test. */
+  @After
+  public void cleanUp() {
+    capture.reset();
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/e5c13a3a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputFileTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputFileTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputFileTest.java
new file mode 100644
index 0000000..2242a83
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputFileTest.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.logfeeder.input;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.ambari.logfeeder.InputMgr;
+import org.apache.ambari.logfeeder.filter.Filter;
+import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.commons.io.FileUtils;
+import org.apache.log4j.Logger;
+import org.easymock.EasyMock;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import static org.junit.Assert.assertEquals;
+
+/** Unit tests for {@link InputFile}, reading a temporary log file through a recording filter. */
+public class InputFileTest {
+  private static final Logger LOG = Logger.getLogger(InputFileTest.class);
+  private static final String TEST_DIR_NAME = "/logfeeder_test_dir/";
+  private static final File TEST_DIR = new File(FileUtils.getTempDirectoryPath() + TEST_DIR_NAME);
+
+  private static final String TEST_LOG_FILE_CONTENT = "2016-03-10 14:09:38,278 INFO  datanode.DataNode (DataNode.java:<init>(418)) - File descriptor passing is enabled.\n"
+      + "2016-03-10 14:09:38,278 INFO  datanode.DataNode (DataNode.java:<init>(429)) - Configured hostname is c6401.ambari.apache.org\n"
+      + "2016-03-10 14:09:38,294 INFO  datanode.DataNode (DataNode.java:startDataNode(1127)) - Starting DataNode with maxLockedMemory = 0\n"
+      + "2016-03-10 14:09:38,340 INFO  datanode.DataNode (DataNode.java:initDataXceiver(921)) - Opened streaming server at /0.0.0.0:50010\n"
+      + "2016-03-10 14:09:38,343 INFO  datanode.DataNode (DataXceiverServer.java:<init>(76)) - Balancing bandwith is 6250000 bytes/s\n"
+      + "2016-03-10 14:09:38,343 INFO  datanode.DataNode (DataXceiverServer.java:<init>(77)) - Number threads for balancing is 5\n"
+      + "2016-03-10 14:09:38,345 INFO  datanode.DataNode (DataXceiverServer.java:<init>(76)) - Balancing bandwith is 6250000 bytes/s\n"
+      + "2016-03-10 14:09:38,346 INFO  datanode.DataNode (DataXceiverServer.java:<init>(77)) - Number threads for balancing is 5\n";
+
+  private static final String[] TEST_LOG_FILE_ROWS = TEST_LOG_FILE_CONTENT.split("\n");
+
+  private InputFile inputFile;
+  private List<String> rows = new ArrayList<>();
+  private InputMarker testInputMarker;
+
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  /** Makes sure the shared test directory exists and is empty before the tests run. */
+  @BeforeClass
+  public static void initDir() throws IOException {
+    if (!TEST_DIR.exists()) {
+      TEST_DIR.mkdir();
+    }
+    FileUtils.cleanDirectory(TEST_DIR);
+  }
+
+  @Before
+  public void setUp() throws Exception {}
+
+  /** Creates the {@link InputFile} under test for {@code path} with a row-recording first filter. */
+  public void init(String path) throws Exception {
+    Map<String, Object> config = new HashMap<String, Object>();
+    config.put("source", "file");
+    config.put("tail", "true");
+    config.put("gen_event_md5", "true");
+    config.put("start_position", "beginning");
+    config.put("type", "hdfs_datanode");
+    config.put("rowtype", "service");
+    config.put("path", path);
+
+    Filter capture = new Filter() {
+      @Override
+      public void init() {}
+
+      @Override
+      public void apply(String inputStr, InputMarker inputMarker) {
+        rows.add(inputStr);
+        if (rows.size() % 3 == 0) {
+          inputFile.setDrain(true); // sets the drain flag on every third recorded row
+        }
+        testInputMarker = inputMarker;
+      }
+    };
+
+    inputFile = new InputFile();
+    inputFile.loadConfig(config);
+    inputFile.setFirstFilter(capture);
+    inputFile.init();
+  }
+
+  /** Expects exactly the first three rows; the recording filter sets the drain flag on row three. */
+  @Test
+  public void testInputFile_process3Rows() throws Exception {
+    LOG.info("testInputFile_process3Rows()");
+    File checkPointDir = createCheckpointDir("process3_checkpoint");
+    File testFile = createFile("process3.log");
+    init(testFile.getAbsolutePath());
+
+    InputMgr inputMgr = EasyMock.createStrictMock(InputMgr.class);
+    EasyMock.expect(inputMgr.getCheckPointFolderFile()).andReturn(checkPointDir);
+    EasyMock.replay(inputMgr);
+    inputFile.setInputMgr(inputMgr);
+
+    inputFile.isReady();
+    inputFile.start();
+    assertFirstRowsRead(3);
+    EasyMock.verify(inputMgr);
+  }
+
+  /** Restarts after a check-in of the last marker and expects the first six rows in total. */
+  @Test
+  public void testInputFile_process6RowsInterrupted() throws Exception {
+    LOG.info("testInputFile_process6RowsInterrupted()");
+    File checkPointDir = createCheckpointDir("process6_checkpoint");
+    File testFile = createFile("process6.log");
+    init(testFile.getAbsolutePath());
+
+    InputMgr inputMgr = EasyMock.createStrictMock(InputMgr.class);
+    EasyMock.expect(inputMgr.getCheckPointFolderFile()).andReturn(checkPointDir).times(2);
+    EasyMock.replay(inputMgr);
+    inputFile.setInputMgr(inputMgr);
+
+    inputFile.isReady();
+    inputFile.start();
+    inputFile.checkIn(testInputMarker);
+    inputFile.setDrain(false);
+    inputFile.start();
+    assertFirstRowsRead(6);
+    EasyMock.verify(inputMgr);
+  }
+
+  /** A missing log path is expected to surface as a {@link NullPointerException}. */
+  @Test
+  public void testInputFile_noLogPath() throws Exception {
+    LOG.info("testInputFile_noLogPath()");
+    expectedException.expect(NullPointerException.class);
+
+    init(null);
+    inputFile.isReady();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    rows.clear();
+  }
+
+  @AfterClass
+  public static void cleanUp() throws Exception {
+    FileUtils.deleteDirectory(TEST_DIR);
+  }
+
+  /** Asserts that exactly the first {@code expectedCount} rows of the test log were read in order. */
+  private void assertFirstRowsRead(int expectedCount) {
+    assertEquals("Amount of the rows is incorrect", expectedCount, rows.size());
+    for (int row = 0; row < expectedCount; row++) {
+      assertEquals("Row #" + (row + 1) + " not correct", TEST_LOG_FILE_ROWS[row], rows.get(row));
+    }
+  }
+
+  /** Writes the test log content to {@code filename} inside the test directory. */
+  private File createFile(String filename) throws IOException {
+    File newFile = new File(TEST_DIR, filename);
+    FileUtils.writeStringToFile(newFile, TEST_LOG_FILE_CONTENT);
+    return newFile;
+  }
+
+  /** Returns the directory {@code dirname} inside the test directory, creating it if needed. */
+  private File createCheckpointDir(String dirname) {
+    File newDir = new File(TEST_DIR, dirname);
+    if (!newDir.exists()) {
+      newDir.mkdir();
+    }
+    return newDir;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/e5c13a3a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperDateTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperDateTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperDateTest.java
new file mode 100644
index 0000000..2df03bd
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperDateTest.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.logfeeder.mapper;
+
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.ambari.logfeeder.LogFeederUtil;
+import org.apache.commons.lang3.time.DateUtils;
+import org.apache.log4j.Logger;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for {@link MapperDate} initialization and value mapping.
+ */
+public class MapperDateTest {
+  private static final Logger LOG = Logger.getLogger(MapperDateTest.class);
+
+  /** Returns a mapper for "someField" after asserting it initializes with {@code datePattern}. */
+  private static MapperDate initializedMapper(String datePattern) {
+    Map<String, Object> mapConfigs = new HashMap<>();
+    mapConfigs.put("date_pattern", datePattern);
+
+    MapperDate mapperDate = new MapperDate();
+    assertTrue("Could not initialize!", mapperDate.init(null, "someField", null, mapConfigs));
+    return mapperDate;
+  }
+
+  /** An epoch value in seconds maps to the matching {@link Date}, which is stored in the JSON map. */
+  @Test
+  public void testMapperDate_epoch() {
+    LOG.info("testMapperDate_epoch()");
+
+    MapperDate mapper = initializedMapper("epoch");
+    Map<String, Object> jsonObj = new HashMap<>();
+    Date expected = DateUtils.truncate(new Date(), Calendar.SECOND);
+
+    Object mappedValue = mapper.apply(jsonObj, Long.toString(expected.getTime() / 1000));
+
+    assertEquals("Value wasn't matched properly", expected, mappedValue);
+    assertEquals("Value wasn't put into jsonObj", expected, jsonObj.remove("someField"));
+    assertTrue("jsonObj is not empty", jsonObj.isEmpty());
+  }
+
+  /** A date string in the configured pattern maps to the {@link Date} that the pattern parses to. */
+  @Test
+  public void testMapperDate_pattern() throws Exception {
+    LOG.info("testMapperDate_pattern()");
+
+    MapperDate mapper = initializedMapper(LogFeederUtil.DATE_FORMAT);
+    Map<String, Object> jsonObj = new HashMap<>();
+    String dateString = "2016-04-08 15:55:23.548";
+
+    Object mappedValue = mapper.apply(jsonObj, dateString);
+    Date expected = new SimpleDateFormat(LogFeederUtil.DATE_FORMAT).parse(dateString);
+
+    assertEquals("Value wasn't matched properly", expected, mappedValue);
+    assertEquals("Value wasn't put into jsonObj", expected, jsonObj.remove("someField"));
+    assertTrue("jsonObj is not empty", jsonObj.isEmpty());
+  }
+
+  /** Initialization is expected to fail when the configuration is not a map. */
+  @Test
+  public void testMapperDate_configNotMap() {
+    LOG.info("testMapperDate_configNotMap()");
+
+    MapperDate mapper = new MapperDate();
+    assertFalse("Was able to initialize!", mapper.init(null, "someField", null, ""));
+  }
+
+  /** Initialization is expected to fail when the configuration has no date pattern entry. */
+  @Test
+  public void testMapperDate_noDatePattern() {
+    LOG.info("testMapperDate_noDatePattern()");
+
+    Map<String, Object> unrelatedConfig = new HashMap<>();
+    unrelatedConfig.put("some_param", "some_value");
+    MapperDate mapper = new MapperDate();
+    assertFalse("Was able to initialize!", mapper.init(null, "someField", null, unrelatedConfig));
+  }
+
+  /** Initialization is expected to fail when the date pattern cannot be parsed. */
+  @Test
+  public void testMapperDate_notParsableDatePattern() {
+    LOG.info("testMapperDate_notParsableDatePattern()");
+
+    Map<String, Object> brokenConfig = new HashMap<>();
+    brokenConfig.put("date_pattern", "not_parsable_content");
+    MapperDate mapper = new MapperDate();
+    assertFalse("Was able to initialize!", mapper.init(null, "someField", null, brokenConfig));
+  }
+
+  /** A non-numeric value for an epoch mapper is returned unchanged and nothing is stored. */
+  @Test
+  public void testMapperDate_invalidEpochValue() {
+    LOG.info("testMapperDate_invalidEpochValue()");
+
+    MapperDate mapper = initializedMapper("epoch");
+    Map<String, Object> jsonObj = new HashMap<>();
+    String invalidValue = "abc";
+
+    Object mappedValue = mapper.apply(jsonObj, invalidValue);
+
+    assertEquals("Invalid value wasn't returned as it is", invalidValue, mappedValue);
+    assertTrue("jsonObj is not empty", jsonObj.isEmpty());
+  }
+
+  /** A value that does not match the date pattern is returned unchanged and nothing is stored. */
+  @Test
+  public void testMapperDate_invalidDateStringValue() {
+    LOG.info("testMapperDate_invalidDateStringValue()");
+
+    MapperDate mapper = initializedMapper(LogFeederUtil.DATE_FORMAT);
+    Map<String, Object> jsonObj = new HashMap<>();
+    String invalidValue = "abc";
+
+    Object mappedValue = mapper.apply(jsonObj, invalidValue);
+
+    assertEquals("Invalid value wasn't returned as it is", invalidValue, mappedValue);
+    assertTrue("jsonObj is not empty", jsonObj.isEmpty());
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/e5c13a3a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperFieldNameTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperFieldNameTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperFieldNameTest.java
new file mode 100644
index 0000000..6edf766
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperFieldNameTest.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.logfeeder.mapper;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for {@link MapperFieldName}.
+ */
+public class MapperFieldNameTest {
+  private static final Logger LOG = Logger.getLogger(MapperFieldNameTest.class);
+
+  /** The old field is dropped and the applied value is stored under the configured new name. */
+  @Test
+  public void testMapperFieldName_replaceField() {
+    LOG.info("testMapperFieldName_replaceField()");
+    Map<String, Object> renameConfig = new HashMap<>();
+    renameConfig.put("new_fieldname", "someOtherField");
+    MapperFieldName mapper = new MapperFieldName();
+    assertTrue("Could not initialize!", mapper.init(null, "someField", null, renameConfig));
+
+    Map<String, Object> jsonObj = new HashMap<>();
+    jsonObj.put("someField", "someValue");
+    mapper.apply(jsonObj, "someOtherValue");
+
+    assertFalse("Old field name wasn't removed", jsonObj.containsKey("someField"));
+    assertEquals("New field wasn't put", "someOtherValue", jsonObj.remove("someOtherField"));
+    assertTrue("jsonObj is not empty", jsonObj.isEmpty());
+  }
+
+  /** Initialization is expected to fail when the configuration is not a map. */
+  @Test
+  public void testMapperFieldName_configNotMap() {
+    LOG.info("testMapperFieldName_configNotMap()");
+    MapperFieldName mapper = new MapperFieldName();
+    assertFalse("Was able to initialize!", mapper.init(null, "someField", null, ""));
+  }
+
+  /** Initialization is expected to fail when no new field name is configured. */
+  @Test
+  public void testMapperFieldName_noNewFieldName() {
+    LOG.info("testMapperFieldName_noNewFieldName()");
+    Map<String, Object> emptyConfig = new HashMap<>();
+    MapperFieldName mapper = new MapperFieldName();
+    assertFalse("Was able to initialize!", mapper.init(null, "someField", null, emptyConfig));
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/e5c13a3a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperFieldValueTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperFieldValueTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperFieldValueTest.java
new file mode 100644
index 0000000..df84b8e
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperFieldValueTest.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.logfeeder.mapper;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for {@link MapperFieldValue}.
+ */
+public class MapperFieldValueTest {
+  private static final Logger LOG = Logger.getLogger(MapperFieldValueTest.class);
+
+  /** Returns a mapper for "someField" after asserting it initializes with a pre/post value pair. */
+  private static MapperFieldValue initializedMapper() {
+    Map<String, Object> mapConfigs = new HashMap<>();
+    mapConfigs.put("pre_value", "someValue");
+    mapConfigs.put("post_value", "someOtherValue");
+    MapperFieldValue mapperFieldValue = new MapperFieldValue();
+    assertTrue("Could not initialize!", mapperFieldValue.init(null, "someField", null, mapConfigs));
+    return mapperFieldValue;
+  }
+
+  /** A value equal to the configured pre value is replaced by the post value and stored. */
+  @Test
+  public void testMapperFieldValue_replaceValue() {
+    LOG.info("testMapperFieldValue_replaceValue()");
+    MapperFieldValue mapper = initializedMapper();
+    Map<String, Object> jsonObj = new HashMap<>();
+
+    Object mappedValue = mapper.apply(jsonObj, "someValue");
+
+    assertEquals("Value wasn't mapped", "someOtherValue", mappedValue);
+    assertEquals("New field wasn't put into jsonObj", "someOtherValue", jsonObj.remove("someField"));
+    assertTrue("jsonObj is not empty", jsonObj.isEmpty());
+  }
+
+  /** Initialization is expected to fail when the configuration is not a map. */
+  @Test
+  public void testMapperFieldValue_configNotMap() {
+    LOG.info("testMapperFieldValue_configNotMap()");
+    MapperFieldValue mapper = new MapperFieldValue();
+    assertFalse("Was able to initialize!", mapper.init(null, "someField", null, ""));
+  }
+
+  /** Initialization is expected to fail for an empty configuration map. */
+  @Test
+  public void testMapperFieldValue_noPostValue() {
+    LOG.info("testMapperFieldValue_noPostValue()");
+    Map<String, Object> emptyConfig = new HashMap<>();
+    MapperFieldValue mapper = new MapperFieldValue();
+    assertFalse("Was able to initialize!", mapper.init(null, "someField", null, emptyConfig));
+  }
+
+  /** A value different from the pre value is returned unchanged and nothing is stored. */
+  @Test
+  public void testMapperFieldValue_noPreValueFound() {
+    LOG.info("testMapperFieldValue_noPreValueFound()");
+    MapperFieldValue mapper = initializedMapper();
+    Map<String, Object> jsonObj = new HashMap<>();
+
+    Object mappedValue = mapper.apply(jsonObj, "yetAnotherValue");
+
+    assertEquals("Value was mapped", "yetAnotherValue", mappedValue);
+    assertTrue("jsonObj is not empty", jsonObj.isEmpty());
+  }
+}