You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ol...@apache.org on 2016/09/07 23:38:35 UTC

[45/50] [abbrv] ambari git commit: AMBARI-18246. Clean up Log Feeder (Miklos Gergely via oleewere)

http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputManagerTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputManagerTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputManagerTest.java
new file mode 100644
index 0000000..368a930
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputManagerTest.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.logfeeder.input;
+
+import static org.easymock.EasyMock.*;
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.ambari.logfeeder.metrics.MetricData;
+import org.junit.Test;
+
+public class InputManagerTest {
+
+  @Test
+  public void testInputManager_addAndRemoveInputs() {
+    Input input1 = strictMock(Input.class);
+    Input input2 = strictMock(Input.class);
+    Input input3 = strictMock(Input.class);
+    Input input4 = strictMock(Input.class);
+    
+    expect(input3.getShortDescription()).andReturn("").times(2);
+    expect(input4.getShortDescription()).andReturn("").once();
+    
+    replay(input1, input2, input3, input4);
+    
+    InputManager manager = new InputManager();
+    manager.add(input1);
+    manager.add(input2);
+    manager.add(input3);
+    
+    manager.removeInput(input3);
+    manager.removeInput(input4);
+    
+    verify(input1, input2, input3, input4);
+    
+    List<Input> inputList = manager.getInputList();
+    assertEquals(inputList.size(), 2);
+    assertEquals(inputList.get(0), input1);
+    assertEquals(inputList.get(1), input2);
+  }
+
+  @Test
+  public void testInputManager_init() throws Exception {
+    Input input1 = strictMock(Input.class);
+    Input input2 = strictMock(Input.class);
+    Input input3 = strictMock(Input.class);
+    
+    input1.init(); expectLastCall();
+    input2.init(); expectLastCall();
+    input3.init(); expectLastCall();
+    
+    expect(input1.isTail()).andReturn(false);
+    expect(input2.isTail()).andReturn(false);
+    expect(input3.isTail()).andReturn(false);
+    
+    replay(input1, input2, input3);
+    
+    InputManager manager = new InputManager();
+    manager.add(input1);
+    manager.add(input2);
+    manager.add(input3);
+    
+    manager.init();
+    
+    verify(input1, input2, input3);
+  }
+
+  @Test
+  public void testInputManager_monitor() throws Exception {
+    Input input1 = strictMock(Input.class);
+    Input input2 = strictMock(Input.class);
+    Input input3 = strictMock(Input.class);
+    
+    expect(input1.isReady()).andReturn(true);
+    expect(input2.isReady()).andReturn(true);
+    expect(input3.isReady()).andReturn(false);
+    
+    expect(input1.monitor()).andReturn(false);
+    expect(input2.monitor()).andReturn(false);
+    expect(input3.isTail()).andReturn(false);
+    expect(input3.getShortDescription()).andReturn("").once();
+    
+    replay(input1, input2, input3);
+    
+    InputManager manager = new InputManager();
+    manager.add(input1);
+    manager.add(input2);
+    manager.add(input3);
+    
+    manager.monitor();
+    
+    verify(input1, input2, input3);
+  }
+  
+
+  @Test
+  public void testInputManager_addMetricsContainers() throws Exception {
+    List<MetricData> metrics = new ArrayList<MetricData>();
+    
+    Input input1 = strictMock(Input.class);
+    Input input2 = strictMock(Input.class);
+    Input input3 = strictMock(Input.class);
+    
+    input1.addMetricsContainers(metrics); expectLastCall();
+    input2.addMetricsContainers(metrics); expectLastCall();
+    input3.addMetricsContainers(metrics); expectLastCall();
+    
+    expect(input1.isReady()).andReturn(true);
+    expect(input2.isReady()).andReturn(true);
+    expect(input3.isReady()).andReturn(false);
+    
+    replay(input1, input2, input3);
+    
+    InputManager manager = new InputManager();
+    manager.add(input1);
+    manager.add(input2);
+    manager.add(input3);
+    
+    manager.addMetricsContainers(metrics);
+    
+    verify(input1, input2, input3);
+  }
+
+  @Test
+  public void testInputManager_logStat() throws Exception {
+    Input input1 = strictMock(Input.class);
+    Input input2 = strictMock(Input.class);
+    Input input3 = strictMock(Input.class);
+    
+    input1.logStat(); expectLastCall();
+    input2.logStat(); expectLastCall();
+    input3.logStat(); expectLastCall();
+    
+    expect(input1.isReady()).andReturn(true);
+    expect(input2.isReady()).andReturn(true);
+    expect(input3.isReady()).andReturn(false);
+    
+    replay(input1, input2, input3);
+    
+    InputManager manager = new InputManager();
+    manager.add(input1);
+    manager.add(input2);
+    manager.add(input3);
+    
+    manager.logStats();
+    
+    verify(input1, input2, input3);
+  }
+
+  @Test
+  public void testInputManagr_waitOnAllInputs() throws Exception {
+    Input input1 = strictMock(Input.class);
+    Input input2 = strictMock(Input.class);
+    Input input3 = strictMock(Input.class);
+    
+    Thread mockThread = strictMock(Thread.class);
+    
+    expect(input1.getThread()).andReturn(null);
+    expect(input2.getThread()).andReturn(null);
+    expect(input3.getThread()).andReturn(mockThread);
+    
+    mockThread.join(); expectLastCall();
+    
+    replay(input1, input2, input3);
+    
+    InputManager manager = new InputManager();
+    manager.add(input1);
+    manager.add(input2);
+    manager.add(input3);
+    
+    manager.waitOnAllInputs();
+    
+    verify(input1, input2, input3);
+  }
+
+  @Test
+  public void testInputManager_checkInAll() throws Exception {
+    Input input1 = strictMock(Input.class);
+    Input input2 = strictMock(Input.class);
+    Input input3 = strictMock(Input.class);
+    
+    input1.lastCheckIn(); expectLastCall();
+    input2.lastCheckIn(); expectLastCall();
+    input3.lastCheckIn(); expectLastCall();
+    
+    replay(input1, input2, input3);
+    
+    InputManager manager = new InputManager();
+    manager.add(input1);
+    manager.add(input2);
+    manager.add(input3);
+    
+    manager.checkInAll();
+    
+    verify(input1, input2, input3);
+  }
+
+  @Test
+  public void testInputManager_close() throws Exception {
+    Input input1 = strictMock(Input.class);
+    Input input2 = strictMock(Input.class);
+    Input input3 = strictMock(Input.class);
+    
+    input1.setDrain(true); expectLastCall();
+    input2.setDrain(true); expectLastCall();
+    input3.setDrain(true); expectLastCall();
+    
+    expect(input1.isClosed()).andReturn(true);
+    expect(input2.isClosed()).andReturn(true);
+    expect(input3.isClosed()).andReturn(true);
+    
+    replay(input1, input2, input3);
+    
+    InputManager manager = new InputManager();
+    manager.add(input1);
+    manager.add(input2);
+    manager.add(input3);
+    
+    manager.close();
+    
+    verify(input1, input2, input3);
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/logconfig/LogConfigHandlerTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/logconfig/LogConfigHandlerTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/logconfig/LogConfigHandlerTest.java
new file mode 100644
index 0000000..02ffd47
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/logconfig/LogConfigHandlerTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.logfeeder.logconfig;
+
+import java.lang.reflect.Field;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.easymock.EasyMock.*;
+import static org.junit.Assert.*;
+
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class LogConfigHandlerTest {
+  
+  private static LogConfigFetcher mockFetcher;
+  
+  private static final Map<String, Object> CONFIG_MAP = new HashMap<>();
+  static {
+    CONFIG_MAP.put("jsons",
+        "{'filter':{" +
+          "'configured_log_file':{" +
+            "'label':'configured_log_file'," +
+            "'hosts':[]," +
+            "'defaultLevels':['FATAL','ERROR','WARN','INFO']," +
+            "'overrideLevels':[]}," +
+          "'configured_log_file2':{" +
+            "'label':'configured_log_file2'," +
+            "'hosts':['host1']," +
+            "'defaultLevels':['FATAL','ERROR','WARN','INFO']," +
+            "'overrideLevels':['FATAL','ERROR','WARN','INFO','DEBUG','TRACE']," +
+            "'expiryTime':'3000-01-01T00:00:00.000Z'}," +
+          "'configured_log_file3':{" +
+            "'label':'configured_log_file3'," +
+            "'hosts':['host1']," +
+            "'defaultLevels':['FATAL','ERROR','WARN','INFO']," +
+            "'overrideLevels':['FATAL','ERROR','WARN','INFO','DEBUG','TRACE']," +
+            "'expiryTime':'1000-01-01T00:00:00.000Z'}" +
+          "}}");
+  }
+  
+  @BeforeClass
+  public static void init() throws Exception {
+    mockFetcher = strictMock(LogConfigFetcher.class);
+    Field f = LogConfigFetcher.class.getDeclaredField("instance");
+    f.setAccessible(true);
+    f.set(null, mockFetcher);
+    expect(mockFetcher.getConfigDoc()).andReturn(CONFIG_MAP).anyTimes();
+    replay(mockFetcher);
+    
+    LogFeederUtil.loadProperties("logfeeder.properties", null);
+    LogConfigHandler.handleConfig();
+    Thread.sleep(1000);
+  }
+  
+  @Test
+  public void testLogConfigHandler_emptyDataAllowed() throws Exception {
+    assertTrue(FilterLogData.INSTANCE.isAllowed((String)null));
+    assertTrue(FilterLogData.INSTANCE.isAllowed(""));
+    assertTrue(FilterLogData.INSTANCE.isAllowed(Collections.<String, Object> emptyMap()));
+  }
+  
+  @Test
+  public void testLogConfigHandler_notConfiguredLogAllowed() throws Exception {
+    assertTrue(FilterLogData.INSTANCE.isAllowed("{'host':'host1', 'type':'not_configured_log_file', 'level':'INFO'}"));
+  }
+  
+  @Test
+  public void testLogConfigHandler_configuredDataAllow() throws Exception {
+    assertTrue(FilterLogData.INSTANCE.isAllowed("{'host':'host1', 'type':'configured_log_file', 'level':'INFO'}"));
+  }
+  
+  @Test
+  public void testLogConfigHandler_configuredDataDontAllow() throws Exception {
+    assertFalse(FilterLogData.INSTANCE.isAllowed("{'host':'host1', 'type':'configured_log_file', 'level':'DEBUG'}"));
+  }
+  
+  @Test
+  public void testLogConfigHandler_overridenConfiguredData() throws Exception {
+    assertTrue(FilterLogData.INSTANCE.isAllowed("{'host':'host1', 'type':'configured_log_file2', 'level':'DEBUG'}"));
+  }
+  
+  @Test
+  public void testLogConfigHandler_overridenConfiguredDataDifferentHost() throws Exception {
+    assertFalse(FilterLogData.INSTANCE.isAllowed("{'host':'host2', 'type':'configured_log_file2', 'level':'DEBUG'}"));
+  }
+  
+  @Test
+  public void testLogConfigHandler_overridenConfiguredDataExpired() throws Exception {
+    assertFalse(FilterLogData.INSTANCE.isAllowed("{'host':'host1', 'type':'configured_log_file3', 'level':'DEBUG'}"));
+  }
+  
+  @AfterClass
+  public static void finish() {
+    verify(mockFetcher);
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperDateTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperDateTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperDateTest.java
index 301dea9..667c9ff 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperDateTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperDateTest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -24,7 +24,6 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.ambari.logfeeder.util.LogFeederUtil;
 import org.apache.commons.lang3.time.DateUtils;
 import org.apache.log4j.Logger;
 import org.junit.Test;
@@ -61,7 +60,7 @@ public class MapperDateTest {
     LOG.info("testMapperDate_pattern()");
 
     Map<String, Object> mapConfigs = new HashMap<>();
-    mapConfigs.put("target_date_pattern", LogFeederUtil.DATE_FORMAT);
+    mapConfigs.put("target_date_pattern", "yyyy-MM-dd HH:mm:ss.SSS");
 
     MapperDate mapperDate = new MapperDate();
     assertTrue("Could not initialize!", mapperDate.init(null, "someField", null, mapConfigs));
@@ -70,7 +69,7 @@ public class MapperDateTest {
     String dateString = "2016-04-08 15:55:23.548";
     Object mappedValue = mapperDate.apply(jsonObj, dateString);
 
-    Date d = new SimpleDateFormat(LogFeederUtil.DATE_FORMAT).parse(dateString);
+    Date d = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").parse(dateString);
 
     assertEquals("Value wasn't matched properly", d, mappedValue);
     assertEquals("Value wasn't put into jsonObj", d, jsonObj.remove("someField"));
@@ -130,7 +129,7 @@ public class MapperDateTest {
     LOG.info("testMapperDate_invalidDateStringValue()");
 
     Map<String, Object> mapConfigs = new HashMap<>();
-    mapConfigs.put("target_date_pattern", LogFeederUtil.DATE_FORMAT);
+    mapConfigs.put("target_date_pattern", "yyyy-MM-dd HH:mm:ss.SSS");
 
     MapperDate mapperDate = new MapperDate();
     assertTrue("Could not initialize!", mapperDate.init(null, "someField", null, mapConfigs));
@@ -149,7 +148,7 @@ public class MapperDateTest {
     String fieldName = "logtime";
     Calendar currentCalendar = Calendar.getInstance();
     Map<String, Object> mapConfigs = new HashMap<>();
-    mapConfigs.put("target_date_pattern", LogFeederUtil.DATE_FORMAT);
+    mapConfigs.put("target_date_pattern", "yyyy-MM-dd HH:mm:ss.SSS");
     String srcDatePattern ="MMM dd HH:mm:ss";
     mapConfigs.put("src_date_pattern", srcDatePattern);
     MapperDate mapperDate = new MapperDate();
@@ -160,7 +159,7 @@ public class MapperDateTest {
     nextMonthCalendar.set(Calendar.MONTH, currentCalendar.get(Calendar.MONTH)+1 );
     String inputDateStr = new SimpleDateFormat("MMM").format(nextMonthCalendar.getTime()) + " 01 12:01:45";
     Object mappedValue = mapperDate.apply(jsonObj, inputDateStr);
-    Date mappedDateValue = new SimpleDateFormat(LogFeederUtil.DATE_FORMAT).parse(mappedValue.toString());
+    Date mappedDateValue = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").parse(mappedValue.toString());
     String mappedDateValueStr = new SimpleDateFormat(srcDatePattern).format(mappedDateValue);
     assertEquals(Date.class, mappedDateValue.getClass());
     
@@ -179,7 +178,7 @@ public class MapperDateTest {
     String fieldName = "logtime";
     Calendar currentCalendar = Calendar.getInstance();
     Map<String, Object> mapConfigs = new HashMap<>();
-    mapConfigs.put("target_date_pattern", LogFeederUtil.DATE_FORMAT);
+    mapConfigs.put("target_date_pattern", "yyyy-MM-dd HH:mm:ss.SSS");
     String srcDatePattern ="MMM dd HH:mm:ss";
     mapConfigs.put("src_date_pattern", srcDatePattern);
     MapperDate mapperDate = new MapperDate();
@@ -187,7 +186,7 @@ public class MapperDateTest {
     Map<String, Object> jsonObj = new HashMap<>();
     String inputDateStr = new SimpleDateFormat("MMM").format(currentCalendar.getTime()) + " 01 12:01:45";
     Object mappedValue = mapperDate.apply(jsonObj, inputDateStr);
-    Date mappedDateValue = new SimpleDateFormat(LogFeederUtil.DATE_FORMAT).parse(mappedValue.toString());
+    Date mappedDateValue = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").parse(mappedValue.toString());
     String mappedDateValueStr = new SimpleDateFormat(srcDatePattern).format(mappedDateValue);
     assertEquals(Date.class, mappedDateValue.getClass());
     int expectedLogYear = currentCalendar.get(Calendar.YEAR);

http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperFieldNameTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperFieldNameTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperFieldNameTest.java
index 6edf766..8ecaad1 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperFieldNameTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperFieldNameTest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information

http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperFieldValueTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperFieldValueTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperFieldValueTest.java
index df84b8e..fce4308 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperFieldValueTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperFieldValueTest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information

http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/metrics/MetrcisManagerTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/metrics/MetrcisManagerTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/metrics/MetrcisManagerTest.java
new file mode 100644
index 0000000..8ee6d00
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/metrics/MetrcisManagerTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.logfeeder.metrics;
+
+import static org.easymock.EasyMock.*;
+import static org.junit.Assert.*;
+import org.easymock.Capture;
+import org.easymock.CaptureType;
+import org.easymock.EasyMock;
+
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.List;
+import java.util.TreeMap;
+
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class MetrcisManagerTest {
+
+  private MetricsManager manager;
+  private LogFeederAMSClient mockClient;
+  private Capture<TimelineMetrics> capture;
+  
+  @BeforeClass
+  public static void loadProperties() throws Exception {
+    LogFeederUtil.loadProperties("logfeeder.properties", null);
+  }
+  
+  @Before
+  public void init() throws Exception {
+    manager = new MetricsManager();
+    manager.init();
+    
+    mockClient = strictMock(LogFeederAMSClient.class);
+    Field f = MetricsManager.class.getDeclaredField("amsClient");
+    f.setAccessible(true);
+    f.set(manager, mockClient);
+    
+    capture = EasyMock.newCapture(CaptureType.FIRST);
+    mockClient.emitMetrics(EasyMock.capture(capture));
+    EasyMock.expectLastCall().andReturn(true).once();
+    
+    replay(mockClient);
+  }
+  
+  @Test
+  public void testMetricManager_pointInTime() throws Exception {
+    MetricData metricCount1 = new MetricData("metric1", true);
+    metricCount1.value = 123;
+    metricCount1.prevPublishValue = 0;
+    metricCount1.publishCount = 0;
+    
+    manager.useMetrics(Arrays.asList(metricCount1));
+    
+    verify(mockClient);
+    
+    TimelineMetrics metrics = capture.getValue();
+    List<TimelineMetric> metricList = metrics.getMetrics();
+    assertEquals(metricList.size(), 1);
+    
+    TimelineMetric metric = metricList.get(0);
+    assertEquals(metric.getHostName(), "test_host_name");
+    assertEquals(metric.getAppId(), "logfeeder");
+    assertEquals(metric.getMetricName(), "metric1");
+    assertEquals(metric.getType(), "Long");
+    
+    TreeMap<Long, Double> values = metric.getMetricValues();
+    assertEquals(values.size(), 1);
+    assertEquals(values.firstEntry().getValue(), Double.valueOf(123.0));
+  }
+  
+  @Test
+  public void testMetricManager_notPointInTime() throws Exception {
+    MetricData metricCount1 = new MetricData("metric1", false);
+    metricCount1.value = 123;
+    metricCount1.prevPublishValue = 0;
+    metricCount1.publishCount = 0;
+    
+    MetricData metricCount2 = new MetricData("metric1", false);
+    metricCount2.value = 123;
+    metricCount2.prevPublishValue = 100;
+    metricCount2.publishCount = 0;
+    
+    MetricData metricCount3 = new MetricData("metric1", false); // not included due to decrease of count
+    metricCount3.value = 99;
+    metricCount3.prevPublishValue = 100;
+    metricCount3.publishCount = 1;
+    
+    manager.useMetrics(Arrays.asList(metricCount1, metricCount2, metricCount3));
+    
+    verify(mockClient);
+    
+    TimelineMetrics metrics = capture.getValue();
+    List<TimelineMetric> metricList = metrics.getMetrics();
+    assertEquals(metricList.size(), 1);
+    
+    TimelineMetric metric = metricList.get(0);
+    assertEquals(metric.getHostName(), "test_host_name");
+    assertEquals(metric.getAppId(), "logfeeder");
+    assertEquals(metric.getMetricName(), "metric1");
+    assertEquals(metric.getType(), "Long");
+    
+    TreeMap<Long, Double> values = metric.getMetricValues();
+    assertEquals(values.size(), 1);
+    assertEquals(values.firstEntry().getValue(), Double.valueOf(146.0));
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputKafkaTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputKafkaTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputKafkaTest.java
index a7db3f8..38d4b8b 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputKafkaTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputKafkaTest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -85,8 +85,7 @@ public class OutputKafkaTest {
     EasyMock.replay(mockKafkaProducer);
 
     for (int i = 0; i < 10; i++) {
-      InputMarker inputMarker = new InputMarker();
-      inputMarker.input = EasyMock.mock(Input.class);
+      InputMarker inputMarker = new InputMarker(EasyMock.mock(Input.class), null, 0);
       outputKafka.write("value" + i, inputMarker);
     }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputManagerTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputManagerTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputManagerTest.java
new file mode 100644
index 0000000..e103346
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputManagerTest.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.logfeeder.output;
+
+import static org.easymock.EasyMock.*;
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.ambari.logfeeder.input.Input;
+import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logfeeder.metrics.MetricData;
+import org.junit.Test;
+
+public class OutputManagerTest {
+
+  @Test
+  public void testOutputManager_addAndRemoveOutputs() {
+    Output output1 = strictMock(Output.class);
+    Output output2 = strictMock(Output.class);
+    Output output3 = strictMock(Output.class);
+    Output output4 = strictMock(Output.class);
+    
+    replay(output1, output2, output3, output4);
+    
+    OutputManager manager = new OutputManager();
+    manager.add(output1);
+    manager.add(output2);
+    manager.add(output3);
+    
+    manager.retainUsedOutputs(Arrays.asList(output1, output2, output4));
+    
+    verify(output1, output2, output3, output4);
+    
+    List<Output> outputs = manager.getOutputs();
+    assertEquals(outputs.size(), 2);
+    assertEquals(outputs.get(0), output1);
+    assertEquals(outputs.get(1), output2);
+  }
+
+  @Test
+  public void testOutputManager_init() throws Exception {
+    Output output1 = strictMock(Output.class);
+    Output output2 = strictMock(Output.class);
+    Output output3 = strictMock(Output.class);
+    
+    output1.init(); expectLastCall();
+    output2.init(); expectLastCall();
+    output3.init(); expectLastCall();
+    
+    replay(output1, output2, output3);
+    
+    OutputManager manager = new OutputManager();
+    manager.add(output1);
+    manager.add(output2);
+    manager.add(output3);
+    
+    manager.init();
+    
+    verify(output1, output2, output3);
+  }
+
+  @Test
+  public void testOutputManager_write() throws Exception {
+    Map<String, Object> jsonObj = new HashMap<>();
+    jsonObj.put("type", "testType");
+    jsonObj.put("path", "testPath");
+    jsonObj.put("host", "testHost");
+    jsonObj.put("ip", "testIp");
+    jsonObj.put("level", "testLevel");
+    jsonObj.put("id", "testId");
+    
+    Input mockInput = strictMock(Input.class);
+    InputMarker inputMarker = new InputMarker(mockInput, null, 0);
+    
+    Output output1 = strictMock(Output.class);
+    Output output2 = strictMock(Output.class);
+    Output output3 = strictMock(Output.class);
+    
+    expect(mockInput.getContextFields()).andReturn(Collections.<String, String> emptyMap());
+    expect(mockInput.isUseEventMD5()).andReturn(false);
+    expect(mockInput.isGenEventMD5()).andReturn(false);
+    expect(mockInput.getOutputList()).andReturn(Arrays.asList(output1, output2, output3));
+    
+    output1.write(jsonObj, inputMarker); expectLastCall();
+    output2.write(jsonObj, inputMarker); expectLastCall();
+    output3.write(jsonObj, inputMarker); expectLastCall();
+    
+    replay(output1, output2, output3, mockInput);
+    
+    OutputManager manager = new OutputManager();
+    manager.add(output1);
+    manager.add(output2);
+    manager.add(output3);
+    
+    manager.write(jsonObj, inputMarker);
+    
+    verify(output1, output2, output3, mockInput);
+  }
+
+  @Test
+  public void testOutputManager_write2() throws Exception {
+    String jsonString = "{}";
+    
+    Input mockInput = strictMock(Input.class);
+    InputMarker inputMarker = new InputMarker(mockInput, null, 0);
+    
+    Output output1 = strictMock(Output.class);
+    Output output2 = strictMock(Output.class);
+    Output output3 = strictMock(Output.class);
+    
+    expect(mockInput.getOutputList()).andReturn(Arrays.asList(output1, output2, output3));
+    
+    output1.write(jsonString, inputMarker); expectLastCall();
+    output2.write(jsonString, inputMarker); expectLastCall();
+    output3.write(jsonString, inputMarker); expectLastCall();
+    
+    replay(output1, output2, output3, mockInput);
+    
+    OutputManager manager = new OutputManager();
+    manager.add(output1);
+    manager.add(output2);
+    manager.add(output3);
+    
+    manager.write(jsonString, inputMarker);
+    
+    verify(output1, output2, output3, mockInput);
+  }
+
+  @Test
+  public void testOutputManager_addMetricsContainers() throws Exception {
+    List<MetricData> metrics = new ArrayList<MetricData>();
+    
+    Output output1 = strictMock(Output.class);
+    Output output2 = strictMock(Output.class);
+    Output output3 = strictMock(Output.class);
+    
+    output1.addMetricsContainers(metrics); expectLastCall();
+    output2.addMetricsContainers(metrics); expectLastCall();
+    output3.addMetricsContainers(metrics); expectLastCall();
+    
+    replay(output1, output2, output3);
+    
+    OutputManager manager = new OutputManager();
+    manager.add(output1);
+    manager.add(output2);
+    manager.add(output3);
+    
+    manager.addMetricsContainers(metrics);
+    
+    verify(output1, output2, output3);
+  }
+
+  @Test
+  public void testOutputManager_logStat() throws Exception {
+    Output output1 = strictMock(Output.class);
+    Output output2 = strictMock(Output.class);
+    Output output3 = strictMock(Output.class);
+    
+    output1.logStat(); expectLastCall();
+    output2.logStat(); expectLastCall();
+    output3.logStat(); expectLastCall();
+    
+    replay(output1, output2, output3);
+    
+    OutputManager manager = new OutputManager();
+    manager.add(output1);
+    manager.add(output2);
+    manager.add(output3);
+    
+    manager.logStats();
+    
+    verify(output1, output2, output3);
+  }
+
+  @Test
+  public void testOutputManager_copyFile() throws Exception {
+    File f = new File("");
+    
+    Input mockInput = strictMock(Input.class);
+    InputMarker inputMarker = new InputMarker(mockInput, null, 0);
+    
+    Output output1 = strictMock(Output.class);
+    Output output2 = strictMock(Output.class);
+    Output output3 = strictMock(Output.class);
+    
+    expect(mockInput.getOutputList()).andReturn(Arrays.asList(output1, output2, output3));
+    
+    output1.copyFile(f, inputMarker); expectLastCall();
+    output2.copyFile(f, inputMarker); expectLastCall();
+    output3.copyFile(f, inputMarker); expectLastCall();
+    
+    replay(output1, output2, output3, mockInput);
+    
+    OutputManager manager = new OutputManager();
+    manager.add(output1);
+    manager.add(output2);
+    manager.add(output3);
+    
+    manager.copyFile(f, inputMarker);
+    
+    verify(output1, output2, output3, mockInput);
+  }
+
+  @Test
+  public void testOutputManager_close() throws Exception {
+    Output output1 = strictMock(Output.class);
+    Output output2 = strictMock(Output.class);
+    Output output3 = strictMock(Output.class);
+    
+    output1.setDrain(true); expectLastCall();
+    output2.setDrain(true); expectLastCall();
+    output3.setDrain(true); expectLastCall();
+    
+    output1.close(); expectLastCall();
+    output2.close(); expectLastCall();
+    output3.close(); expectLastCall();
+    
+    expect(output1.isClosed()).andReturn(true);
+    expect(output2.isClosed()).andReturn(true);
+    expect(output3.isClosed()).andReturn(true);
+    
+    replay(output1, output2, output3);
+    
+    OutputManager manager = new OutputManager();
+    manager.add(output1);
+    manager.add(output2);
+    manager.add(output3);
+    
+    manager.close();
+    
+    verify(output1, output2, output3);
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputS3FileTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputS3FileTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputS3FileTest.java
index 20a4f1f..1872135 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputS3FileTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputS3FileTest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -69,15 +69,14 @@ public class OutputS3FileTest {
   @Test
   public void shouldSpoolLogEventToNewSpooler() throws Exception {
 
-    InputMarker inputMarker = mock(InputMarker.class);
     Input input = mock(Input.class);
-    inputMarker.input = input;
+    InputMarker inputMarker = new InputMarker(input, null, 0);
     expect(input.getFilePath()).andReturn("/var/log/hdfs-namenode.log");
     expect(input.getStringValue(OutputS3File.INPUT_ATTRIBUTE_TYPE)).andReturn("hdfs-namenode");
     final LogSpooler spooler = mock(LogSpooler.class);
     spooler.add("log event block");
     final S3Uploader s3Uploader = mock(S3Uploader.class);
-    replay(input, inputMarker, spooler, s3Uploader);
+    replay(input, spooler, s3Uploader);
 
     OutputS3File outputS3File = new OutputS3File() {
       @Override
@@ -98,16 +97,15 @@ public class OutputS3FileTest {
 
   @Test
   public void shouldReuseSpoolerForSamePath() throws Exception {
-    InputMarker inputMarker = mock(InputMarker.class);
     Input input = mock(Input.class);
-    inputMarker.input = input;
+    InputMarker inputMarker = new InputMarker(input, null, 0);
     expect(input.getFilePath()).andReturn("/var/log/hdfs-namenode.log");
     expect(input.getStringValue(OutputS3File.INPUT_ATTRIBUTE_TYPE)).andReturn("hdfs-namenode");
     final LogSpooler spooler = mock(LogSpooler.class);
     spooler.add("log event block1");
     spooler.add("log event block2");
     final S3Uploader s3Uploader = mock(S3Uploader.class);
-    replay(input, inputMarker, spooler, s3Uploader);
+    replay(input, spooler, s3Uploader);
 
     OutputS3File outputS3File = new OutputS3File() {
       private boolean firstCallComplete;
@@ -169,16 +167,15 @@ public class OutputS3FileTest {
 
   @Test
   public void shouldUploadFileOnRollover() throws Exception {
-    InputMarker inputMarker = mock(InputMarker.class);
     Input input = mock(Input.class);
-    inputMarker.input = input;
+    InputMarker inputMarker = new InputMarker(input, null, 0);
     expect(input.getFilePath()).andReturn("/var/log/hdfs-namenode.log");
     expect(input.getStringValue(OutputS3File.INPUT_ATTRIBUTE_TYPE)).andReturn("hdfs-namenode");
     final LogSpooler spooler = mock(LogSpooler.class);
     spooler.add("log event block1");
     final S3Uploader s3Uploader = mock(S3Uploader.class);
     s3Uploader.addFileForUpload("/var/ambari-logsearch/logfeeder/hdfs-namenode.log.gz");
-    replay(input, inputMarker, spooler, s3Uploader);
+    replay(input, spooler, s3Uploader);
 
     OutputS3File outputS3File = new OutputS3File() {
       @Override

http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputSolrTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputSolrTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputSolrTest.java
index 33bb33f..8985110 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputSolrTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputSolrTest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -97,8 +97,7 @@ public class OutputSolrTest {
         jsonObj.put("name" + ++count, "value" + ++count);
       jsonObj.put("id", ++count);
 
-      InputMarker inputMarker = new InputMarker();
-      inputMarker.input = EasyMock.mock(Input.class);
+      InputMarker inputMarker = new InputMarker(EasyMock.mock(Input.class), null, 0);
       outputSolr.write(jsonObj, inputMarker);
 
       SolrInputDocument doc = new SolrInputDocument();

http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3LogPathResolverTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3LogPathResolverTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3LogPathResolverTest.java
index cc6da56..d1376c4 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3LogPathResolverTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3LogPathResolverTest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information

http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3UploaderTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3UploaderTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3UploaderTest.java
index c64e0c5..5477f5c 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3UploaderTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3UploaderTest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,7 +18,6 @@
 
 package org.apache.ambari.logfeeder.output;
 
-import org.apache.ambari.logfeeder.util.S3Util;
 import org.junit.Test;
 
 import java.io.File;
@@ -46,22 +45,20 @@ public class S3UploaderTest {
     Map<String, Object> configs = setupS3Configs();
 
     S3OutputConfiguration s3OutputConfiguration = new S3OutputConfiguration(configs);
-    S3Util s3Util = mock(S3Util.class);
-    String s3Key = String.format("%s/%s/%s.%s", TEST_PATH, LOG_TYPE, fileName, GZ);
-    s3Util.uploadFileTos3(TEST_BUCKET, s3Key, compressedFile, ACCESS_KEY_VALUE, SECRET_KEY_VALUE);
     expect(compressedFile.delete()).andReturn(true);
     expect(fileToUpload.delete()).andReturn(true);
-    replay(fileToUpload, compressedFile, s3Util);
+    replay(fileToUpload, compressedFile);
 
-    S3Uploader s3Uploader = new S3Uploader(s3OutputConfiguration, s3Util, true, LOG_TYPE) {
+    S3Uploader s3Uploader = new S3Uploader(s3OutputConfiguration, true, LOG_TYPE) {
       @Override
       protected File createCompressedFileForUpload(File fileToUpload, String compressionAlgo) {
         return compressedFile;
       }
+      protected void uploadFileToS3(String bucketName, String s3Key, File localFile, String accessKey, String secretKey) {
+      }
     };
     String resolvedPath = s3Uploader.uploadFile(fileToUpload, LOG_TYPE);
 
-    verify(s3Util);
     assertEquals("test_path/hdfs_namenode/hdfs_namenode.log.123343493473948.gz", resolvedPath);
   }
 
@@ -74,18 +71,17 @@ public class S3UploaderTest {
     Map<String, Object> configs = setupS3Configs();
 
     S3OutputConfiguration s3OutputConfiguration = new S3OutputConfiguration(configs);
-    S3Util s3Util = mock(S3Util.class);
-    String s3Key = String.format("%s/%s/%s.%s", TEST_PATH, LOG_TYPE, fileName, GZ);
-    s3Util.uploadFileTos3(TEST_BUCKET, s3Key, compressedFile, ACCESS_KEY_VALUE, SECRET_KEY_VALUE);
     expect(compressedFile.delete()).andReturn(true);
     expect(fileToUpload.delete()).andReturn(true);
-    replay(fileToUpload, compressedFile, s3Util);
+    replay(fileToUpload, compressedFile);
 
-    S3Uploader s3Uploader = new S3Uploader(s3OutputConfiguration, s3Util, true, LOG_TYPE) {
+    S3Uploader s3Uploader = new S3Uploader(s3OutputConfiguration, true, LOG_TYPE) {
       @Override
       protected File createCompressedFileForUpload(File fileToUpload, String compressionAlgo) {
         return compressedFile;
       }
+      protected void uploadFileToS3(String bucketName, String s3Key, File localFile, String accessKey, String secretKey) {
+      }
     };
     s3Uploader.uploadFile(fileToUpload, LOG_TYPE);
 
@@ -102,17 +98,16 @@ public class S3UploaderTest {
     Map<String, Object> configs = setupS3Configs();
 
     S3OutputConfiguration s3OutputConfiguration = new S3OutputConfiguration(configs);
-    S3Util s3Util = mock(S3Util.class);
-    String s3Key = String.format("%s/%s/%s.%s", TEST_PATH, LOG_TYPE, fileName, GZ);
-    s3Util.uploadFileTos3(TEST_BUCKET, s3Key, compressedFile, ACCESS_KEY_VALUE, SECRET_KEY_VALUE);
     expect(compressedFile.delete()).andReturn(true);
-    replay(fileToUpload, compressedFile, s3Util);
+    replay(fileToUpload, compressedFile);
 
-    S3Uploader s3Uploader = new S3Uploader(s3OutputConfiguration, s3Util, false, LOG_TYPE) {
+    S3Uploader s3Uploader = new S3Uploader(s3OutputConfiguration, false, LOG_TYPE) {
       @Override
       protected File createCompressedFileForUpload(File fileToUpload, String compressionAlgo) {
         return compressedFile;
       }
+      protected void uploadFileToS3(String bucketName, String s3Key, File localFile, String accessKey, String secretKey) {
+      }
     };
     s3Uploader.uploadFile(fileToUpload, LOG_TYPE);
 
@@ -131,22 +126,19 @@ public class S3UploaderTest {
 
 
     S3OutputConfiguration s3OutputConfiguration = new S3OutputConfiguration(configs);
-    S3Util s3Util = mock(S3Util.class);
-    String s3Key = String.format("%s/%s/%s/%s.%s", "cl1", TEST_PATH, LOG_TYPE, fileName, GZ);
-    s3Util.uploadFileTos3(TEST_BUCKET, s3Key, compressedFile, ACCESS_KEY_VALUE, SECRET_KEY_VALUE);
     expect(compressedFile.delete()).andReturn(true);
     expect(fileToUpload.delete()).andReturn(true);
-    replay(fileToUpload, compressedFile, s3Util);
+    replay(fileToUpload, compressedFile);
 
-    S3Uploader s3Uploader = new S3Uploader(s3OutputConfiguration, s3Util, true, LOG_TYPE) {
+    S3Uploader s3Uploader = new S3Uploader(s3OutputConfiguration, true, LOG_TYPE) {
       @Override
       protected File createCompressedFileForUpload(File fileToUpload, String compressionAlgo) {
         return compressedFile;
       }
+      protected void uploadFileToS3(String bucketName, String s3Key, File localFile, String accessKey, String secretKey) {
+      }
     };
     s3Uploader.uploadFile(fileToUpload, LOG_TYPE);
-
-    verify(s3Util);
   }
 
   private Map<String, Object> setupS3Configs() {

http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerTest.java
index 7a47039..2cfe9ff 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerTest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information

http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/util/AWSUtilTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/util/AWSUtilTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/util/AWSUtilTest.java
deleted file mode 100644
index 6df2283..0000000
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/util/AWSUtilTest.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.ambari.logfeeder.util;
-
-import org.apache.ambari.logfeeder.util.AWSUtil;
-
-public class AWSUtilTest {
-  public void testAWSUtil_getAwsUserName() throws Exception {
-    String S3_ACCESS_KEY = "S3_ACCESS_KEY";
-    String S3_SECRET_KEY = "S3_SECRET_KEY";
-    AWSUtil.INSTANCE.getAwsUserName(S3_ACCESS_KEY, S3_SECRET_KEY);
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/util/PlaceholderUtilTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/util/PlaceholderUtilTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/util/PlaceholderUtilTest.java
index 9789a14..43e03c7 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/util/PlaceholderUtilTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/util/PlaceholderUtilTest.java
@@ -1,6 +1,4 @@
-package org.apache.ambari.logfeeder.util;
-
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,6 +15,9 @@ package org.apache.ambari.logfeeder.util;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
+package org.apache.ambari.logfeeder.util;
+
 import java.util.HashMap;
 
 import org.junit.Test;
@@ -26,19 +27,18 @@ import static org.junit.Assert.assertEquals;
 public class PlaceholderUtilTest {
   @Test
   public void testPlaceholderUtil_replaceVariables() {
-    HashMap<String, String> contextParam = new HashMap<String, String>();
     String hostName = "host1";
     String ip = "127.0.0.1";
     String clusterName = "test-cluster";
+    
+    HashMap<String, String> contextParam = new HashMap<String, String>();
     contextParam.put("host", hostName);
     contextParam.put("ip", ip);
     contextParam.put("cluster", clusterName);
-    String inputStr = "$CLUSTER/logfeeder/$HOST-$IP/logs";
-    String resultStr = PlaceholderUtil.replaceVariables(inputStr, contextParam);
+    
+    String resultStr = PlaceholderUtil.replaceVariables("$CLUSTER/logfeeder/$HOST-$IP/logs", contextParam);
     String expectedStr = clusterName + "/logfeeder/" + hostName + "-" + ip + "/logs";
-    assertEquals("Result string :" + resultStr
-        + " is not equal to exptected string :" + expectedStr, resultStr,
-        expectedStr);
+    
+    assertEquals("Result string :" + resultStr + " is not equal to exptected string :" + expectedStr, resultStr, expectedStr);
   }
-
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/util/S3UtilTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/util/S3UtilTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/util/S3UtilTest.java
index 84554b0..02918be 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/util/S3UtilTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/util/S3UtilTest.java
@@ -26,14 +26,14 @@ public class S3UtilTest {
   public void testS3Util_pathToBucketName() throws Exception {
     String s3Path = "s3://bucket_name/path/file.txt";
     String expectedBucketName = "bucket_name";
-    String actualBucketName = S3Util.INSTANCE.getBucketName(s3Path);
+    String actualBucketName = S3Util.getBucketName(s3Path);
     assertEquals(expectedBucketName, actualBucketName);
   }
 
   public void testS3Util_pathToS3Key() throws Exception {
     String s3Path = "s3://bucket_name/path/file.txt";
     String expectedS3key = "path/file.txt";
-    String actualS3key = S3Util.INSTANCE.getS3Key(s3Path);
+    String actualS3key = S3Util.getS3Key(s3Path);
     assertEquals(expectedS3key, actualS3key);
   }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/test/resources/logfeeder.properties
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/resources/logfeeder.properties b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/resources/logfeeder.properties
new file mode 100644
index 0000000..59020cc
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/resources/logfeeder.properties
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+logfeeder.log.filter.enable=true
+logfeeder.solr.config.interval=5
+logfeeder.solr.zk_connect_string=some_connect_string
+logfeeder.metrics.collector.hosts=some_collector_host
+node.hostname=test_host_name
\ No newline at end of file