You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by mp...@apache.org on 2016/07/08 22:50:55 UTC

[3/9] flume git commit: FLUME-2941. Integrate checkstyle for test classes

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sdk/src/test/java/org/apache/flume/api/ThriftTestingSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/test/java/org/apache/flume/api/ThriftTestingSource.java b/flume-ng-sdk/src/test/java/org/apache/flume/api/ThriftTestingSource.java
index 70d2c1b..724f093 100644
--- a/flume-ng-sdk/src/test/java/org/apache/flume/api/ThriftTestingSource.java
+++ b/flume-ng-sdk/src/test/java/org/apache/flume/api/ThriftTestingSource.java
@@ -67,8 +67,7 @@ public class ThriftTestingSource {
 
     @Override
     public Status append(ThriftFlumeEvent event) throws TException {
-      flumeEvents.add(EventBuilder.withBody(event.getBody(),
-        event.getHeaders()));
+      flumeEvents.add(EventBuilder.withBody(event.getBody(), event.getHeaders()));
       individualCount++;
       return Status.OK;
     }
@@ -80,8 +79,7 @@ public class ThriftTestingSource {
         incompleteBatches++;
       }
       for (ThriftFlumeEvent event : events) {
-        flumeEvents.add(EventBuilder.withBody(event.getBody(),
-          event.getHeaders()));
+        flumeEvents.add(EventBuilder.withBody(event.getBody(), event.getHeaders()));
       }
       return Status.OK;
     }
@@ -175,8 +173,7 @@ public class ThriftTestingSource {
     }
 
     @Override
-    public Status appendBatch(List<ThriftFlumeEvent> events) throws
-      TException {
+    public Status appendBatch(List<ThriftFlumeEvent> events) throws TException {
       try {
         if (delay != null) {
           TimeUnit.MILLISECONDS.sleep(delay.get());
@@ -207,8 +204,8 @@ public class ThriftTestingSource {
   }
 
   public ThriftTestingSource(String handlerName, int port, String protocol) throws Exception {
-    TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(new
-      InetSocketAddress("0.0.0.0", port));
+    TNonblockingServerTransport serverTransport =
+        new TNonblockingServerSocket(new InetSocketAddress("0.0.0.0", port));
     ThriftSourceProtocol.Iface handler = getHandler(handlerName);
 
     TProtocolFactory transportProtocolFactory = null;
@@ -217,10 +214,9 @@ public class ThriftTestingSource {
     } else {
       transportProtocolFactory = new TCompactProtocol.Factory();
     }
-    server = new THsHaServer(new THsHaServer.Args
-      (serverTransport).processor(
-      new ThriftSourceProtocol.Processor(handler)).protocolFactory(
-          transportProtocolFactory));
+    server = new THsHaServer(new THsHaServer.Args(serverTransport).processor(
+        new ThriftSourceProtocol.Processor(handler)).protocolFactory(
+            transportProtocolFactory));
     Executors.newSingleThreadExecutor().submit(new Runnable() {
       @Override
       public void run() {
@@ -260,10 +256,8 @@ public class ThriftTestingSource {
     args.protocolFactory(transportProtocolFactory);
     args.inputTransportFactory(new TFastFramedTransport.Factory());
     args.outputTransportFactory(new TFastFramedTransport.Factory());
-    args.processor(new ThriftSourceProtocol
-            .Processor<ThriftSourceProtocol.Iface>(handler));
-    server = (TServer) serverClass.getConstructor(argsClass).newInstance
-            (args);
+    args.processor(new ThriftSourceProtocol.Processor<ThriftSourceProtocol.Iface>(handler));
+    server = (TServer) serverClass.getConstructor(argsClass).newInstance(args);
     Executors.newSingleThreadExecutor().submit(new Runnable() {
       @Override
       public void run() {
@@ -285,5 +279,4 @@ public class ThriftTestingSource {
     server.stop();
   }
 
-
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java b/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java
index 621920d..3709577 100644
--- a/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java
+++ b/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java
@@ -18,27 +18,12 @@
 
 package org.apache.flume.sink.kite;
 
-import org.apache.flume.sink.kite.parser.EntityParser;
-import org.apache.flume.sink.kite.policy.FailurePolicy;
 import com.google.common.base.Function;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import javax.annotation.Nullable;
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
@@ -57,6 +42,8 @@ import org.apache.flume.Transaction;
 import org.apache.flume.channel.MemoryChannel;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.event.SimpleEvent;
+import org.apache.flume.sink.kite.parser.EntityParser;
+import org.apache.flume.sink.kite.policy.FailurePolicy;
 import org.apache.flume.source.avro.AvroFlumeEvent;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -74,7 +61,28 @@ import org.kitesdk.data.DatasetWriter;
 import org.kitesdk.data.Datasets;
 import org.kitesdk.data.PartitionStrategy;
 import org.kitesdk.data.View;
-import static org.mockito.Mockito.*;
+
+import javax.annotation.Nullable;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class TestDatasetSink {
 
@@ -206,7 +214,8 @@ public class TestDatasetSink {
   }
 
   @Test
-  public void testFileStore() throws EventDeliveryException, NonRecoverableEventException, NonRecoverableEventException {
+  public void testFileStore()
+      throws EventDeliveryException, NonRecoverableEventException, NonRecoverableEventException {
     DatasetSink sink = sink(in, config);
 
     // run the sink
@@ -307,31 +316,28 @@ public class TestDatasetSink {
     sink.process();
     sink.stop();
 
-    Assert.assertEquals(
-      Sets.newHashSet(expected),
-      read(Datasets.load(FILE_DATASET_URI)));
+    Assert.assertEquals(Sets.newHashSet(expected), read(Datasets.load(FILE_DATASET_URI)));
     Assert.assertEquals("Should have committed", 0, remaining(in));
   }
 
   @Test
   public void testDatasetUpdate() throws EventDeliveryException {
     // add an updated record that is missing the msg field
-    GenericRecordBuilder updatedBuilder = new GenericRecordBuilder(
-      UPDATED_SCHEMA);
+    GenericRecordBuilder updatedBuilder = new GenericRecordBuilder(UPDATED_SCHEMA);
     GenericData.Record updatedRecord = updatedBuilder
-      .set("id", "0")
-      .set("priority", 1)
-      .set("msg", "Priority 1 message!")
-      .build();
+        .set("id", "0")
+        .set("priority", 1)
+        .set("msg", "Priority 1 message!")
+        .build();
 
     // make a set of the expected records with the new schema
     Set<GenericRecord> expectedAsUpdated = Sets.newHashSet();
     for (GenericRecord record : expected) {
       expectedAsUpdated.add(updatedBuilder
-        .clear("priority")
-        .set("id", record.get("id"))
-        .set("msg", record.get("msg"))
-        .build());
+          .clear("priority")
+          .set("id", record.get("id"))
+          .set("msg", record.get("msg"))
+          .build());
     }
     expectedAsUpdated.add(updatedRecord);
 
@@ -343,9 +349,9 @@ public class TestDatasetSink {
 
     // update the dataset's schema
     DatasetDescriptor updated = new DatasetDescriptor
-      .Builder(Datasets.load(FILE_DATASET_URI).getDataset().getDescriptor())
-      .schema(UPDATED_SCHEMA)
-      .build();
+        .Builder(Datasets.load(FILE_DATASET_URI).getDataset().getDescriptor())
+        .schema(UPDATED_SCHEMA)
+        .build();
     Datasets.update(FILE_DATASET_URI, updated);
 
     // trigger a roll on the next process call to refresh the writer
@@ -358,15 +364,12 @@ public class TestDatasetSink {
     sink.process();
     sink.stop();
 
-    Assert.assertEquals(
-      expectedAsUpdated,
-      read(Datasets.load(FILE_DATASET_URI)));
+    Assert.assertEquals(expectedAsUpdated, read(Datasets.load(FILE_DATASET_URI)));
     Assert.assertEquals("Should have committed", 0, remaining(in));
   }
 
   @Test
-  public void testMiniClusterStore()
-      throws EventDeliveryException, IOException {
+  public void testMiniClusterStore() throws EventDeliveryException, IOException {
     // setup a minicluster
     MiniDFSCluster cluster = new MiniDFSCluster
         .Builder(new Configuration())

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSTestSeqWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSTestSeqWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSTestSeqWriter.java
index 9c1cd09..f1dadf1 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSTestSeqWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSTestSeqWriter.java
@@ -19,25 +19,27 @@
 
 package org.apache.flume.sink.hdfs;
 
-import java.io.IOException;
-
 import org.apache.flume.Event;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.CompressionCodec;
 
+import java.io.IOException;
+
 public class HDFSTestSeqWriter extends HDFSSequenceFile {
-  protected volatile boolean closed, opened;
+  protected volatile boolean closed;
+  protected volatile boolean opened;
 
   private int openCount = 0;
+
   HDFSTestSeqWriter(int openCount) {
     this.openCount = openCount;
   }
 
   @Override
-  public void open(String filePath, CompressionCodec codeC,
-      CompressionType compType) throws IOException {
+  public void open(String filePath, CompressionCodec codeC, CompressionType compType)
+      throws IOException {
     super.open(filePath, codeC, compType);
-    if(closed) {
+    if (closed) {
       opened = true;
     }
   }
@@ -52,7 +54,7 @@ public class HDFSTestSeqWriter extends HDFSSequenceFile {
       throw new IOException("Injected fault");
     } else if (e.getHeaders().containsKey("fault-until-reopen")) {
       // opening first time.
-      if(openCount == 1) {
+      if (openCount == 1) {
         throw new IOException("Injected fault-until-reopen");
       }
     } else if (e.getHeaders().containsKey("slow")) {

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockDataStream.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockDataStream.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockDataStream.java
index f0c6e7e..a85a99f 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockDataStream.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockDataStream.java
@@ -30,9 +30,9 @@ class MockDataStream extends HDFSDataStream {
   MockDataStream(FileSystem fs) {
     this.fs = fs;
   }
+
   @Override
-  protected FileSystem getDfs(Configuration conf,
-    Path dstPath) throws IOException{
+  protected FileSystem getDfs(Configuration conf, Path dstPath) throws IOException {
     return fs;
   }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFileSystem.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFileSystem.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFileSystem.java
index 4443335..a079b83 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFileSystem.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFileSystem.java
@@ -42,8 +42,7 @@ public class MockFileSystem extends FileSystem {
   int currentRenameAttempts;
   boolean closeSucceed = true;
 
-  public MockFileSystem(FileSystem fs,
-    int numberOfRetriesRequired) {
+  public MockFileSystem(FileSystem fs, int numberOfRetriesRequired) {
     this.fs = fs;
     this.numberOfRetriesRequired = numberOfRetriesRequired;
   }
@@ -67,17 +66,14 @@ public class MockFileSystem extends FileSystem {
 
   @Override
   public FSDataOutputStream create(Path arg0) throws IOException {
-    //throw new IOException ("HI there2");
-    latestOutputStream = new MockFsDataOutputStream(
-      fs.create(arg0), closeSucceed);
-
+    latestOutputStream = new MockFsDataOutputStream(fs.create(arg0), closeSucceed);
     return latestOutputStream;
   }
 
   @Override
-  public FSDataOutputStream create(Path arg0, FsPermission arg1,
-    boolean arg2, int arg3, short arg4, long arg5, Progressable arg6)
-    throws IOException {
+  public FSDataOutputStream create(Path arg0, FsPermission arg1, boolean arg2, int arg3,
+                                   short arg4, long arg5, Progressable arg6)
+      throws IOException {
     throw new IOException("Not a real file system");
   }
 
@@ -126,11 +122,9 @@ public class MockFileSystem extends FileSystem {
   @Override
   public boolean rename(Path arg0, Path arg1) throws IOException {
     currentRenameAttempts++;
-    logger.info(
-      "Attempting to Rename: '" + currentRenameAttempts + "' of '" +
-      numberOfRetriesRequired + "'");
-    if (currentRenameAttempts >= numberOfRetriesRequired ||
-      numberOfRetriesRequired == 0) {
+    logger.info("Attempting to Rename: '" + currentRenameAttempts + "' of '" +
+                numberOfRetriesRequired + "'");
+    if (currentRenameAttempts >= numberOfRetriesRequired || numberOfRetriesRequired == 0) {
       logger.info("Renaming file");
       return fs.rename(arg0, arg1);
     } else {
@@ -141,6 +135,5 @@ public class MockFileSystem extends FileSystem {
   @Override
   public void setWorkingDirectory(Path arg0) {
     fs.setWorkingDirectory(arg0);
-
   }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFsDataOutputStream.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFsDataOutputStream.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFsDataOutputStream.java
index 35b034e..f5d579c 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFsDataOutputStream.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFsDataOutputStream.java
@@ -17,21 +17,20 @@
 + */
 package org.apache.flume.sink.hdfs;
 
-import java.io.IOException;
-
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class MockFsDataOutputStream extends FSDataOutputStream{
+import java.io.IOException;
+
+public class MockFsDataOutputStream extends FSDataOutputStream {
 
   private static final Logger logger =
       LoggerFactory.getLogger(MockFsDataOutputStream.class);
 
   boolean closeSucceed;
 
-  public MockFsDataOutputStream(FSDataOutputStream wrapMe,
-    boolean closeSucceed)
+  public MockFsDataOutputStream(FSDataOutputStream wrapMe, boolean closeSucceed)
       throws IOException {
     super(wrapMe.getWrappedStream(), null);
     this.closeSucceed = closeSucceed;
@@ -39,8 +38,7 @@ public class MockFsDataOutputStream extends FSDataOutputStream{
 
   @Override
   public void close() throws IOException {
-    logger.info(
-      "Close Succeeded - " + closeSucceed);
+    logger.info("Close Succeeded - " + closeSucceed);
     if (closeSucceed) {
       logger.info("closing file");
       super.close();

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockHDFSWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockHDFSWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockHDFSWriter.java
index ec49b97..05c4316 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockHDFSWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockHDFSWriter.java
@@ -68,7 +68,8 @@ public class MockHDFSWriter implements HDFSWriter {
     filesOpened++;
   }
 
-  public void open(String filePath, CompressionCodec codec, CompressionType cType) throws IOException {
+  public void open(String filePath, CompressionCodec codec, CompressionType cType)
+      throws IOException {
     this.filePath = filePath;
     filesOpened++;
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
index 2581f73..742deb0 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
@@ -18,14 +18,7 @@
  */
 package org.apache.flume.sink.hdfs;
 
-import java.io.File;
-import java.io.IOException;
-import java.util.Calendar;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
+import com.google.common.base.Charsets;
 import org.apache.flume.Clock;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
@@ -46,12 +39,17 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Charsets;
+import java.io.File;
+import java.io.IOException;
+import java.util.Calendar;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 public class TestBucketWriter {
 
-  private static Logger logger =
-      LoggerFactory.getLogger(TestBucketWriter.class);
+  private static Logger logger = LoggerFactory.getLogger(TestBucketWriter.class);
   private Context ctx = new Context();
 
   private static ScheduledExecutorService timedRollerPool;
@@ -74,11 +72,11 @@ public class TestBucketWriter {
   public void testEventCountingRoller() throws IOException, InterruptedException {
     int maxEvents = 100;
     MockHDFSWriter hdfsWriter = new MockHDFSWriter();
-    BucketWriter bucketWriter = new BucketWriter(0, 0, maxEvents, 0, ctx,
-        "/tmp", "file", "", ".tmp", null, null, SequenceFile.CompressionType.NONE,
-        hdfsWriter, timedRollerPool, proxy,
-        new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0,
-        null, null, 30000, Executors.newSingleThreadExecutor(), 0, 0);
+    BucketWriter bucketWriter = new BucketWriter(
+        0, 0, maxEvents, 0, ctx, "/tmp", "file", "", ".tmp", null, null,
+        SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy,
+        new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000,
+        Executors.newSingleThreadExecutor(), 0, 0);
 
     Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
     for (int i = 0; i < 1000; i++) {
@@ -98,12 +96,11 @@ public class TestBucketWriter {
   public void testSizeRoller() throws IOException, InterruptedException {
     int maxBytes = 300;
     MockHDFSWriter hdfsWriter = new MockHDFSWriter();
-    BucketWriter bucketWriter = new BucketWriter(0, maxBytes, 0, 0,
-      ctx, "/tmp", "file", "", ".tmp", null, null,
-      SequenceFile.CompressionType.NONE, hdfsWriter,timedRollerPool,
-      proxy, new SinkCounter("test-bucket-writer-" +
-      System.currentTimeMillis()),0, null, null, 30000,
-      Executors.newSingleThreadExecutor(), 0, 0);
+    BucketWriter bucketWriter = new BucketWriter(
+        0, maxBytes, 0, 0, ctx, "/tmp", "file", "", ".tmp", null, null,
+        SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy,
+        new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000,
+        Executors.newSingleThreadExecutor(), 0, 0);
 
     Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
     for (int i = 0; i < 1000; i++) {
@@ -126,16 +123,16 @@ public class TestBucketWriter {
     final AtomicBoolean calledBack = new AtomicBoolean(false);
 
     MockHDFSWriter hdfsWriter = new MockHDFSWriter();
-    BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx,
-      "/tmp", "file", "", ".tmp", null, null, SequenceFile.CompressionType.NONE,
-      hdfsWriter, timedRollerPool, proxy,
-      new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
-      0, new HDFSEventSink.WriterCallback() {
-      @Override
-      public void run(String filePath) {
-        calledBack.set(true);
-      }
-    }, null, 30000, Executors.newSingleThreadExecutor(), 0, 0);
+    BucketWriter bucketWriter = new BucketWriter(
+        ROLL_INTERVAL, 0, 0, 0, ctx, "/tmp", "file", "", ".tmp", null, null,
+        SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy,
+        new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0,
+        new HDFSEventSink.WriterCallback() {
+          @Override
+          public void run(String filePath) {
+            calledBack.set(true);
+          }
+        }, null, 30000, Executors.newSingleThreadExecutor(), 0, 0);
 
     Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
     long startNanos = System.nanoTime();
@@ -149,12 +146,11 @@ public class TestBucketWriter {
     Assert.assertTrue(bucketWriter.closed);
     Assert.assertTrue(calledBack.get());
 
-    bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx,
-      "/tmp", "file", "", ".tmp", null, null, SequenceFile.CompressionType.NONE,
-      hdfsWriter, timedRollerPool, proxy,
-      new SinkCounter("test-bucket-writer-"
-        + System.currentTimeMillis()), 0, null, null, 30000,
-      Executors.newSingleThreadExecutor(), 0, 0);
+    bucketWriter = new BucketWriter(
+        ROLL_INTERVAL, 0, 0, 0, ctx, "/tmp", "file", "", ".tmp", null, null,
+        SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy,
+        new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000,
+        Executors.newSingleThreadExecutor(), 0, 0);
     // write one more event (to reopen a new file so we will roll again later)
     bucketWriter.append(e);
 
@@ -193,17 +189,16 @@ public class TestBucketWriter {
       private volatile boolean open = false;
 
       public void configure(Context context) {
-
       }
 
       public void sync() throws IOException {
-        if(!open) {
+        if (!open) {
           throw new IOException("closed");
         }
       }
 
-      public void open(String filePath, CompressionCodec codec,
-          CompressionType cType) throws IOException {
+      public void open(String filePath, CompressionCodec codec, CompressionType cType)
+          throws IOException {
         open = true;
       }
 
@@ -225,19 +220,18 @@ public class TestBucketWriter {
         open = true;
       }
     };
+
     HDFSTextSerializer serializer = new HDFSTextSerializer();
     File tmpFile = File.createTempFile("flume", "test");
     tmpFile.deleteOnExit();
     String path = tmpFile.getParent();
     String name = tmpFile.getName();
 
-    BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0,
-      0, ctx, path, name, "", ".tmp", null, null,
-      SequenceFile.CompressionType.NONE, hdfsWriter,
-      timedRollerPool, proxy, new SinkCounter("test-bucket-writer-"
-      + System.currentTimeMillis()),
-      0, null, null, 30000, Executors.newSingleThreadExecutor(),
-      0, 0);
+    BucketWriter bucketWriter = new BucketWriter(
+        ROLL_INTERVAL, 0, 0, 0, ctx, path, name, "", ".tmp", null, null,
+        SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy,
+        new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000,
+        Executors.newSingleThreadExecutor(), 0, 0);
 
     Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
     for (int i = 0; i < NUM_EVENTS - 1; i++) {
@@ -252,62 +246,61 @@ public class TestBucketWriter {
 
   @Test
   public void testFileSuffixNotGiven() throws IOException, InterruptedException {
-      final int ROLL_INTERVAL = 1000; // seconds. Make sure it doesn't change in course of test
-      final String suffix = null;
-
-      MockHDFSWriter hdfsWriter = new MockHDFSWriter();
-      BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0,
-        0, 0, ctx, "/tmp", "file", "", ".tmp", suffix, null,
-        SequenceFile.CompressionType.NONE, hdfsWriter,
-        timedRollerPool, proxy, new SinkCounter("test-bucket-writer-"
-        + System.currentTimeMillis()), 0, null, null, 30000,
+    final int ROLL_INTERVAL = 1000; // seconds. Make sure it doesn't change in course of test
+    final String suffix = null;
+
+    MockHDFSWriter hdfsWriter = new MockHDFSWriter();
+    BucketWriter bucketWriter = new BucketWriter(
+        ROLL_INTERVAL, 0, 0, 0, ctx, "/tmp", "file", "", ".tmp", suffix, null,
+        SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy,
+        new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000,
         Executors.newSingleThreadExecutor(), 0, 0);
 
-      // Need to override system time use for test so we know what to expect
-      final long testTime = System.currentTimeMillis();
-      Clock testClock = new Clock() {
-          public long currentTimeMillis() {
-              return testTime;
-          }
-      };
-      bucketWriter.setClock(testClock);
+    // Need to override system time use for test so we know what to expect
+    final long testTime = System.currentTimeMillis();
+    Clock testClock = new Clock() {
+      public long currentTimeMillis() {
+        return testTime;
+      }
+    };
+    bucketWriter.setClock(testClock);
 
-      Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
-      bucketWriter.append(e);
+    Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
+    bucketWriter.append(e);
 
-      Assert.assertTrue("Incorrect suffix", hdfsWriter.getOpenedFilePath().endsWith(Long.toString(testTime+1) + ".tmp"));
+    Assert.assertTrue("Incorrect suffix", hdfsWriter.getOpenedFilePath().endsWith(
+        Long.toString(testTime + 1) + ".tmp"));
   }
 
-    @Test
-    public void testFileSuffixGiven() throws IOException, InterruptedException {
-        final int ROLL_INTERVAL = 1000; // seconds. Make sure it doesn't change in course of test
-        final String suffix = ".avro";
+  @Test
+  public void testFileSuffixGiven() throws IOException, InterruptedException {
+    final int ROLL_INTERVAL = 1000; // seconds. Make sure it doesn't change in course of test
+    final String suffix = ".avro";
 
-      MockHDFSWriter hdfsWriter = new MockHDFSWriter();
-      BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0,
-        0, 0, ctx, "/tmp", "file", "", ".tmp", suffix, null,
-        SequenceFile.CompressionType.NONE, hdfsWriter,
-        timedRollerPool, proxy, new SinkCounter(
-        "test-bucket-writer-" + System.currentTimeMillis()), 0,
-        null, null, 30000, Executors.newSingleThreadExecutor(), 0, 0);
+    MockHDFSWriter hdfsWriter = new MockHDFSWriter();
+    BucketWriter bucketWriter = new BucketWriter(
+        ROLL_INTERVAL, 0, 0, 0, ctx, "/tmp", "file", "", ".tmp", suffix, null,
+        SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy,
+        new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000,
+        Executors.newSingleThreadExecutor(), 0, 0);
 
-        // Need to override system time use for test so we know what to expect
+    // Need to override system time use for test so we know what to expect
 
-        final long testTime = System.currentTimeMillis();
+    final long testTime = System.currentTimeMillis();
 
-        Clock testClock = new Clock() {
-            public long currentTimeMillis() {
-                return testTime;
-            }
-        };
-        bucketWriter.setClock(testClock);
+    Clock testClock = new Clock() {
+      public long currentTimeMillis() {
+        return testTime;
+      }
+    };
+    bucketWriter.setClock(testClock);
 
-        Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
-        bucketWriter.append(e);
+    Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
+    bucketWriter.append(e);
 
-        Assert.assertTrue("Incorrect suffix", hdfsWriter.getOpenedFilePath().endsWith(
-          Long.toString(testTime + 1) + suffix + ".tmp"));
-    }
+    Assert.assertTrue("Incorrect suffix",hdfsWriter.getOpenedFilePath().endsWith(
+        Long.toString(testTime + 1) + suffix + ".tmp"));
+  }
 
   @Test
   public void testFileSuffixCompressed()
@@ -316,13 +309,11 @@ public class TestBucketWriter {
     final String suffix = ".foo";
 
     MockHDFSWriter hdfsWriter = new MockHDFSWriter();
-    BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0,
-      0, ctx, "/tmp", "file", "", ".tmp", suffix,
-      HDFSEventSink.getCodec("gzip"),
-      SequenceFile.CompressionType.BLOCK, hdfsWriter,
-      timedRollerPool, proxy, new SinkCounter("test-bucket-writer-"
-      + System.currentTimeMillis()), 0, null, null, 30000,
-      Executors.newSingleThreadExecutor(), 0, 0
+    BucketWriter bucketWriter = new BucketWriter(
+        ROLL_INTERVAL, 0, 0, 0, ctx, "/tmp", "file", "", ".tmp", suffix,
+        HDFSEventSink.getCodec("gzip"), SequenceFile.CompressionType.BLOCK, hdfsWriter,
+        timedRollerPool, proxy, new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
+        0, null, null, 30000, Executors.newSingleThreadExecutor(), 0, 0
     );
 
     // Need to override system time use for test so we know what to expect
@@ -338,8 +329,8 @@ public class TestBucketWriter {
     Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
     bucketWriter.append(e);
 
-    Assert.assertTrue("Incorrect suffix",hdfsWriter.getOpenedFilePath()
-        .endsWith(Long.toString(testTime+1) + suffix + ".tmp"));
+    Assert.assertTrue("Incorrect suffix", hdfsWriter.getOpenedFilePath().endsWith(
+        Long.toString(testTime + 1) + suffix + ".tmp"));
   }
 
   @Test
@@ -349,12 +340,11 @@ public class TestBucketWriter {
 
     MockHDFSWriter hdfsWriter = new MockHDFSWriter();
     HDFSTextSerializer formatter = new HDFSTextSerializer();
-    BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0,
-      0, ctx, "/tmp", "file", PREFIX, ".tmp", null, null,
-      SequenceFile.CompressionType.NONE, hdfsWriter,
-      timedRollerPool, proxy, new SinkCounter(
-        "test-bucket-writer-" + System.currentTimeMillis()), 0,
-      null, null, 30000, Executors.newSingleThreadExecutor(), 0, 0);
+    BucketWriter bucketWriter = new BucketWriter(
+        ROLL_INTERVAL, 0, 0, 0, ctx, "/tmp", "file", PREFIX, ".tmp", null, null,
+        SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy,
+        new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000,
+        Executors.newSingleThreadExecutor(), 0, 0);
 
     Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
     bucketWriter.append(e);
@@ -369,12 +359,11 @@ public class TestBucketWriter {
 
     MockHDFSWriter hdfsWriter = new MockHDFSWriter();
     HDFSTextSerializer serializer = new HDFSTextSerializer();
-    BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0,
-      0, ctx, "/tmp", "file", "", SUFFIX, null, null,
-      SequenceFile.CompressionType.NONE, hdfsWriter,
-      timedRollerPool, proxy, new SinkCounter(
-        "test-bucket-writer-" + System.currentTimeMillis()), 0,
-      null, null, 30000, Executors.newSingleThreadExecutor(), 0, 0);
+    BucketWriter bucketWriter = new BucketWriter(
+        ROLL_INTERVAL, 0, 0, 0, ctx, "/tmp", "file", "", SUFFIX, null, null,
+        SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy,
+        new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000,
+        Executors.newSingleThreadExecutor(), 0, 0);
 
     Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
     bucketWriter.append(e);
@@ -389,18 +378,16 @@ public class TestBucketWriter {
     final AtomicBoolean callbackCalled = new AtomicBoolean(false);
 
     MockHDFSWriter hdfsWriter = new MockHDFSWriter();
-    BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0,
-      0, ctx, "/tmp", "file", "", SUFFIX, null, null,
-      SequenceFile.CompressionType.NONE,
-      hdfsWriter, timedRollerPool, proxy,
-      new SinkCounter(
-        "test-bucket-writer-" + System.currentTimeMillis()), 0,
-      new HDFSEventSink.WriterCallback() {
-      @Override
-      public void run(String filePath) {
-        callbackCalled.set(true);
-      }
-    }, "blah", 30000, Executors.newSingleThreadExecutor(), 0, 0);
+    BucketWriter bucketWriter = new BucketWriter(
+        ROLL_INTERVAL, 0, 0, 0, ctx, "/tmp", "file", "", SUFFIX, null, null,
+        SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy,
+        new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0,
+        new HDFSEventSink.WriterCallback() {
+          @Override
+          public void run(String filePath) {
+            callbackCalled.set(true);
+          }
+        }, "blah", 30000, Executors.newSingleThreadExecutor(), 0, 0);
 
     Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
     bucketWriter.append(e);
@@ -420,13 +407,13 @@ public class TestBucketWriter {
     SequenceFileRenameRetryCoreTest(1, false);
     SequenceFileRenameRetryCoreTest(5, false);
     SequenceFileRenameRetryCoreTest(2, false);
-
   }
 
-  public void SequenceFileRenameRetryCoreTest(int numberOfRetriesRequired, boolean closeSucceed) throws Exception {
-    String hdfsPath = "file:///tmp/flume-test."
-      + Calendar.getInstance().getTimeInMillis() + "."
-      + Thread.currentThread().getId();
+  public void SequenceFileRenameRetryCoreTest(int numberOfRetriesRequired, boolean closeSucceed)
+      throws Exception {
+    String hdfsPath = "file:///tmp/flume-test." +
+                      Calendar.getInstance().getTimeInMillis() +
+                      "." + Thread.currentThread().getId();
 
     Context context = new Context();
     Configuration conf = new Configuration();
@@ -435,22 +422,16 @@ public class TestBucketWriter {
     fs.delete(dirPath, true);
     fs.mkdirs(dirPath);
     context.put("hdfs.path", hdfsPath);
-    context.put("hdfs.closeTries",
-      String.valueOf(numberOfRetriesRequired));
+    context.put("hdfs.closeTries", String.valueOf(numberOfRetriesRequired));
     context.put("hdfs.rollCount", "1");
     context.put("hdfs.retryInterval", "1");
     context.put("hdfs.callTimeout", Long.toString(1000));
-    MockFileSystem mockFs = new
-      MockFileSystem(fs,
-      numberOfRetriesRequired, closeSucceed);
-    BucketWriter bucketWriter = new BucketWriter(0, 0, 1, 1, ctx,
-      hdfsPath, hdfsPath, "singleBucket", ".tmp", null, null,
-      null, new MockDataStream(mockFs),
-      timedRollerPool, proxy,
-      new SinkCounter(
-        "test-bucket-writer-" + System.currentTimeMillis()),
-      0, null, null, 30000, Executors.newSingleThreadExecutor(), 1,
-      numberOfRetriesRequired);
+    MockFileSystem mockFs = new MockFileSystem(fs, numberOfRetriesRequired, closeSucceed);
+    BucketWriter bucketWriter = new BucketWriter(
+        0, 0, 1, 1, ctx, hdfsPath, hdfsPath, "singleBucket", ".tmp", null, null,
+        null, new MockDataStream(mockFs), timedRollerPool, proxy,
+        new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000,
+        Executors.newSingleThreadExecutor(), 1, numberOfRetriesRequired);
 
     bucketWriter.setFileSystem(mockFs);
     // At this point, we checked if isFileClosed is available in
@@ -463,8 +444,7 @@ public class TestBucketWriter {
     TimeUnit.SECONDS.sleep(numberOfRetriesRequired + 2);
 
     Assert.assertTrue("Expected " + numberOfRetriesRequired + " " +
-      "but got " + bucketWriter.renameTries.get(),
-      bucketWriter.renameTries.get() ==
-        numberOfRetriesRequired);
+                      "but got " + bucketWriter.renameTries.get(),
+                      bucketWriter.renameTries.get() == numberOfRetriesRequired);
   }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
index 23862eb..73f016b 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
@@ -71,7 +71,6 @@ import org.slf4j.LoggerFactory;
 import com.google.common.base.Charsets;
 import com.google.common.collect.Lists;
 
-
 public class TestHDFSEventSink {
 
   private HDFSEventSink sink;
@@ -118,8 +117,7 @@ public class TestHDFSEventSink {
 
   @After
   public void tearDown() {
-    if (System.getenv("hdfs_keepFiles") == null)
-      dirCleanup();
+    if (System.getenv("hdfs_keepFiles") == null) dirCleanup();
   }
 
   @Test
@@ -176,7 +174,7 @@ public class TestHDFSEventSink {
     List<String> bodies = Lists.newArrayList();
 
     // push the event batches into channel to roll twice
-    for (i = 1; i <= (rollCount*10)/batchSize; i++) {
+    for (i = 1; i <= (rollCount * 10) / batchSize; i++) {
       Transaction txn = channel.getTransaction();
       txn.begin();
       for (j = 1; j <= batchSize; j++) {
@@ -200,7 +198,7 @@ public class TestHDFSEventSink {
 
     // loop through all the files generated and check their contains
     FileStatus[] dirStat = fs.listStatus(dirPath);
-    Path fList[] = FileUtil.stat2Paths(dirStat);
+    Path[] fList = FileUtil.stat2Paths(dirStat);
 
     // check that the roll happened correctly for the given data
     long expectedFiles = totalEvents / rollCount;
@@ -353,7 +351,7 @@ public class TestHDFSEventSink {
 
     // loop through all the files generated and check their contains
     FileStatus[] dirStat = fs.listStatus(dirPath);
-    Path fList[] = FileUtil.stat2Paths(dirStat);
+    Path[] fList = FileUtil.stat2Paths(dirStat);
 
     // check that the roll happened correctly for the given data
     long expectedFiles = totalEvents / rollCount;
@@ -432,7 +430,7 @@ public class TestHDFSEventSink {
 
     // loop through all the files generated and check their contains
     FileStatus[] dirStat = fs.listStatus(dirPath);
-    Path fList[] = FileUtil.stat2Paths(dirStat);
+    Path[] fList = FileUtil.stat2Paths(dirStat);
 
     // check that the roll happened correctly for the given data
     long expectedFiles = totalEvents / rollCount;
@@ -508,7 +506,7 @@ public class TestHDFSEventSink {
 
     // loop through all the files generated and check their contains
     FileStatus[] dirStat = fs.listStatus(dirPath);
-    Path fList[] = FileUtil.stat2Paths(dirStat);
+    Path[] fList = FileUtil.stat2Paths(dirStat);
 
     // check that the roll happened correctly for the given data
     long expectedFiles = totalEvents / rollCount;
@@ -519,8 +517,8 @@ public class TestHDFSEventSink {
   }
 
   @Test
-  public void testSimpleAppendLocalTime() throws InterruptedException,
-    LifecycleException, EventDeliveryException, IOException {
+  public void testSimpleAppendLocalTime()
+      throws InterruptedException, LifecycleException, EventDeliveryException, IOException {
     final long currentTime = System.currentTimeMillis();
     Clock clk = new Clock() {
       @Override
@@ -536,7 +534,7 @@ public class TestHDFSEventSink {
     final int numBatches = 4;
     String newPath = testPath + "/singleBucket/%s" ;
     String expectedPath = testPath + "/singleBucket/" +
-      String.valueOf(currentTime/1000);
+        String.valueOf(currentTime / 1000);
     int totalEvents = 0;
     int i = 1, j = 1;
 
@@ -576,7 +574,7 @@ public class TestHDFSEventSink {
         eventDate.clear();
         eventDate.set(2011, i, i, i, 0); // yy mm dd
         event.getHeaders().put("timestamp",
-          String.valueOf(eventDate.getTimeInMillis()));
+            String.valueOf(eventDate.getTimeInMillis()));
         event.getHeaders().put("hostname", "Host" + i);
         String body = "Test." + i + "." + j;
         event.setBody(body.getBytes());
@@ -595,13 +593,13 @@ public class TestHDFSEventSink {
 
     // loop through all the files generated and check their contains
     FileStatus[] dirStat = fs.listStatus(dirPath);
-    Path fList[] = FileUtil.stat2Paths(dirStat);
+    Path[] fList = FileUtil.stat2Paths(dirStat);
 
     // check that the roll happened correctly for the given data
     long expectedFiles = totalEvents / rollCount;
     if (totalEvents % rollCount > 0) expectedFiles++;
     Assert.assertEquals("num files wrong, found: " +
-      Lists.newArrayList(fList), expectedFiles, fList.length);
+        Lists.newArrayList(fList), expectedFiles, fList.length);
     verifyOutputSequenceFiles(fs, conf, dirPath.toUri().getPath(), fileName, bodies);
     // The clock in bucketpath is static, so restore the real clock
     sink.setBucketClock(new SystemClock());
@@ -750,10 +748,10 @@ public class TestHDFSEventSink {
   private List<String> getAllFiles(String input) {
     List<String> output = Lists.newArrayList();
     File dir = new File(input);
-    if(dir.isFile()) {
+    if (dir.isFile()) {
       output.add(dir.getAbsolutePath());
-    } else if(dir.isDirectory()) {
-      for(String file : dir.list()) {
+    } else if (dir.isDirectory()) {
+      for (String file : dir.list()) {
         File subDir = new File(dir, file);
         output.addAll(getAllFiles(subDir.getAbsolutePath()));
       }
@@ -761,16 +759,17 @@ public class TestHDFSEventSink {
     return output;
   }
 
-  private void verifyOutputSequenceFiles(FileSystem fs, Configuration conf, String dir, String prefix, List<String> bodies) throws IOException {
+  private void verifyOutputSequenceFiles(FileSystem fs, Configuration conf, String dir,
+                                         String prefix, List<String> bodies) throws IOException {
     int found = 0;
     int expected = bodies.size();
-    for(String outputFile : getAllFiles(dir)) {
+    for (String outputFile : getAllFiles(dir)) {
       String name = (new File(outputFile)).getName();
-      if(name.startsWith(prefix)) {
+      if (name.startsWith(prefix)) {
         SequenceFile.Reader reader = new SequenceFile.Reader(fs, new Path(outputFile), conf);
         LongWritable key = new LongWritable();
         BytesWritable value = new BytesWritable();
-        while(reader.next(key, value)) {
+        while (reader.next(key, value)) {
           String body = new String(value.getBytes(), 0, value.getLength());
           if (bodies.contains(body)) {
             LOG.debug("Found event body: {}", body);
@@ -792,16 +791,17 @@ public class TestHDFSEventSink {
 
   }
 
-  private void verifyOutputTextFiles(FileSystem fs, Configuration conf, String dir, String prefix, List<String> bodies) throws IOException {
+  private void verifyOutputTextFiles(FileSystem fs, Configuration conf, String dir, String prefix,
+                                     List<String> bodies) throws IOException {
     int found = 0;
     int expected = bodies.size();
-    for(String outputFile : getAllFiles(dir)) {
+    for (String outputFile : getAllFiles(dir)) {
       String name = (new File(outputFile)).getName();
-      if(name.startsWith(prefix)) {
+      if (name.startsWith(prefix)) {
         FSDataInputStream input = fs.open(new Path(outputFile));
         BufferedReader reader = new BufferedReader(new InputStreamReader(input));
         String body = null;
-        while((body = reader.readLine()) != null) {
+        while ((body = reader.readLine()) != null) {
           bodies.remove(body);
           found++;
         }
@@ -814,12 +814,13 @@ public class TestHDFSEventSink {
 
   }
 
-  private void verifyOutputAvroFiles(FileSystem fs, Configuration conf, String dir, String prefix, List<String> bodies) throws IOException {
+  private void verifyOutputAvroFiles(FileSystem fs, Configuration conf, String dir, String prefix,
+                                     List<String> bodies) throws IOException {
     int found = 0;
     int expected = bodies.size();
-    for(String outputFile : getAllFiles(dir)) {
+    for (String outputFile : getAllFiles(dir)) {
       String name = (new File(outputFile)).getName();
-      if(name.startsWith(prefix)) {
+      if (name.startsWith(prefix)) {
         FSDataInputStream input = fs.open(new Path(outputFile));
         DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>();
         DataFileStream<GenericRecord> avroStream =
@@ -840,7 +841,7 @@ public class TestHDFSEventSink {
     }
     Assert.assertTrue("Found = " + found + ", Expected = "  +
         expected + ", Left = " + bodies.size() + " " + bodies,
-          bodies.size() == 0);
+            bodies.size() == 0);
   }
 
   /**
@@ -849,9 +850,9 @@ public class TestHDFSEventSink {
    * This relies on Transactional rollback semantics for durability and
    * the behavior of the BucketWriter class of close()ing upon IOException.
    */
- @Test
-  public void testCloseReopen() throws InterruptedException,
-      LifecycleException, EventDeliveryException, IOException {
+  @Test
+  public void testCloseReopen()
+      throws InterruptedException, LifecycleException, EventDeliveryException, IOException {
 
     LOG.debug("Starting...");
     final int numBatches = 4;
@@ -924,8 +925,8 @@ public class TestHDFSEventSink {
    * a new one is used for the next set of events.
    */
   @Test
-  public void testCloseReopenOnRollTime() throws InterruptedException,
-    LifecycleException, EventDeliveryException, IOException {
+  public void testCloseReopenOnRollTime()
+      throws InterruptedException, LifecycleException, EventDeliveryException, IOException {
 
     LOG.debug("Starting...");
     final int numBatches = 4;
@@ -973,7 +974,7 @@ public class TestHDFSEventSink {
           eventDate.clear();
           eventDate.set(2011, i, i, i, 0); // yy mm dd
           event.getHeaders().put("timestamp",
-            String.valueOf(eventDate.getTimeInMillis()));
+              String.valueOf(eventDate.getTimeInMillis()));
           event.getHeaders().put("hostname", "Host" + i);
           String body = "Test." + i + "." + j;
           event.setBody(body.getBytes());
@@ -997,9 +998,9 @@ public class TestHDFSEventSink {
 
     Assert.assertTrue(badWriterFactory.openCount.get() >= 2);
     LOG.info("Total number of bucket writers opened: {}",
-      badWriterFactory.openCount.get());
+        badWriterFactory.openCount.get());
     verifyOutputSequenceFiles(fs, conf, dirPath.toUri().getPath(), fileName,
-      bodies);
+        bodies);
   }
 
   /**
@@ -1007,8 +1008,8 @@ public class TestHDFSEventSink {
    * sfWriters map.
    */
   @Test
-  public void testCloseRemovesFromSFWriters() throws InterruptedException,
-    LifecycleException, EventDeliveryException, IOException {
+  public void testCloseRemovesFromSFWriters()
+      throws InterruptedException, LifecycleException, EventDeliveryException, IOException {
 
     LOG.debug("Starting...");
     final String fileName = "FlumeData";
@@ -1055,7 +1056,7 @@ public class TestHDFSEventSink {
         eventDate.clear();
         eventDate.set(2011, i, i, i, 0); // yy mm dd
         event.getHeaders().put("timestamp",
-          String.valueOf(eventDate.getTimeInMillis()));
+            String.valueOf(eventDate.getTimeInMillis()));
         event.getHeaders().put("hostname", "Host" + i);
         String body = "Test." + i + "." + j;
         event.setBody(body.getBytes());
@@ -1080,9 +1081,9 @@ public class TestHDFSEventSink {
     sink.stop();
 
     LOG.info("Total number of bucket writers opened: {}",
-      badWriterFactory.openCount.get());
+        badWriterFactory.openCount.get());
     verifyOutputSequenceFiles(fs, conf, dirPath.toUri().getPath(), fileName,
-      bodies);
+        bodies);
   }
 
 
@@ -1163,8 +1164,9 @@ public class TestHDFSEventSink {
    * append using slow sink writer with specified append timeout
    * verify that the data is written correctly to files
    */
-  private void slowAppendTestHelper (long appendTimeout)  throws InterruptedException, IOException,
-  LifecycleException, EventDeliveryException, IOException {
+  private void slowAppendTestHelper(long appendTimeout)
+      throws InterruptedException, IOException, LifecycleException, EventDeliveryException,
+             IOException {
     final String fileName = "FlumeData";
     final long rollCount = 5;
     final long batchSize = 2;
@@ -1230,7 +1232,7 @@ public class TestHDFSEventSink {
 
     // loop through all the files generated and check their contains
     FileStatus[] dirStat = fs.listStatus(dirPath);
-    Path fList[] = FileUtil.stat2Paths(dirStat);
+    Path[] fList = FileUtil.stat2Paths(dirStat);
 
     // check that the roll happened correctly for the given data
     // Note that we'll end up with two files with only a head
@@ -1292,7 +1294,7 @@ public class TestHDFSEventSink {
 
     Transaction txn = channel.getTransaction();
     txn.begin();
-    for(int i=0; i < 10; i++) {
+    for (int i = 0; i < 10; i++) {
       Event event = new SimpleEvent();
       event.setBody(("test event " + i).getBytes());
       channel.put(event);
@@ -1317,9 +1319,9 @@ public class TestHDFSEventSink {
     FileStatus[] dirStat = fs.listStatus(dirPath);
     Path[] fList = FileUtil.stat2Paths(dirStat);
     Assert.assertEquals("Incorrect content of the directory " + StringUtils.join(fList, ","),
-      2, fList.length);
+                        2, fList.length);
     Assert.assertTrue(!fList[0].getName().endsWith(".tmp") &&
-      !fList[1].getName().endsWith(".tmp"));
+                      !fList[1].getName().endsWith(".tmp"));
     fs.close();
   }
 
@@ -1338,8 +1340,7 @@ public class TestHDFSEventSink {
   }
 
   @Test
-  public void testBadConfigurationForRetryIntervalZero() throws
-    Exception {
+  public void testBadConfigurationForRetryIntervalZero() throws Exception {
     Context context = getContextForRetryTests();
     context.put("hdfs.retryInterval", "0");
 
@@ -1348,43 +1349,41 @@ public class TestHDFSEventSink {
   }
 
   @Test
-  public void testBadConfigurationForRetryIntervalNegative() throws
-    Exception {
+  public void testBadConfigurationForRetryIntervalNegative() throws Exception {
     Context context = getContextForRetryTests();
     context.put("hdfs.retryInterval", "-1");
 
     Configurables.configure(sink, context);
     Assert.assertEquals(1, sink.getTryCount());
   }
+
   @Test
-  public void testBadConfigurationForRetryCountZero() throws
-    Exception {
+  public void testBadConfigurationForRetryCountZero() throws Exception {
     Context context = getContextForRetryTests();
     context.put("hdfs.closeTries" ,"0");
 
     Configurables.configure(sink, context);
     Assert.assertEquals(Integer.MAX_VALUE, sink.getTryCount());
   }
+
   @Test
-  public void testBadConfigurationForRetryCountNegative() throws
-    Exception {
+  public void testBadConfigurationForRetryCountNegative() throws Exception {
     Context context = getContextForRetryTests();
     context.put("hdfs.closeTries" ,"-4");
 
     Configurables.configure(sink, context);
     Assert.assertEquals(Integer.MAX_VALUE, sink.getTryCount());
   }
+
   @Test
-  public void testRetryRename() throws InterruptedException,
-    LifecycleException,
-    EventDeliveryException, IOException {
+  public void testRetryRename()
+      throws InterruptedException, LifecycleException, EventDeliveryException, IOException {
     testRetryRename(true);
     testRetryRename(false);
   }
 
-  private void testRetryRename(boolean closeSucceed) throws InterruptedException,
-          LifecycleException,
-          EventDeliveryException, IOException {
+  private void testRetryRename(boolean closeSucceed)
+      throws InterruptedException, LifecycleException, EventDeliveryException, IOException {
     LOG.debug("Starting...");
     String newPath = testPath + "/retryBucket";
 
@@ -1441,8 +1440,8 @@ public class TestHDFSEventSink {
     Collection<BucketWriter> writers = sink.getSfWriters().values();
 
     int totalRenameAttempts = 0;
-    for(BucketWriter writer: writers) {
-      LOG.info("Rename tries = "+ writer.renameTries.get());
+    for (BucketWriter writer : writers) {
+      LOG.info("Rename tries = " + writer.renameTries.get());
       totalRenameAttempts += writer.renameTries.get();
     }
     // stop clears the sfWriters map, so we need to compute the

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestSequenceFileSerializerFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestSequenceFileSerializerFactory.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestSequenceFileSerializerFactory.java
index 6381edc..974e857 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestSequenceFileSerializerFactory.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestSequenceFileSerializerFactory.java
@@ -47,8 +47,7 @@ public class TestSequenceFileSerializerFactory {
 
   @Test
   public void getCustomFormatter() {
-    SequenceFileSerializer formatter = SequenceFileSerializerFactory
-      .getSerializer(
+    SequenceFileSerializer formatter = SequenceFileSerializerFactory.getSerializer(
         "org.apache.flume.sink.hdfs.MyCustomSerializer$Builder", new Context());
 
     assertTrue(formatter != null);

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveSink.java b/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveSink.java
index 46724f2..c417404 100644
--- a/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveSink.java
+++ b/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveSink.java
@@ -55,8 +55,8 @@ import java.util.UUID;
 
 public class TestHiveSink {
   // 1)  partitioned table
-  final static String dbName = "testing";
-  final static String tblName = "alerts";
+  static final String dbName = "testing";
+  static final String tblName = "alerts";
 
   public static final String PART1_NAME = "continent";
   public static final String PART2_NAME = "country";
@@ -72,8 +72,8 @@ public class TestHiveSink {
   private final ArrayList<String> partitionVals;
 
   // 2) un-partitioned table
-  final static String dbName2 = "testing2";
-  final static String tblName2 = "alerts2";
+  static final String dbName2 = "testing2";
+  static final String tblName2 = "alerts2";
   final String[] colNames2 = {COL1,COL2};
   private String[] colTypes2 = { "int", "string" };
 
@@ -88,7 +88,6 @@ public class TestHiveSink {
   @Rule
   public TemporaryFolder dbFolder = new TemporaryFolder();
 
-
   private static final Logger LOG = LoggerFactory.getLogger(HiveSink.class);
 
   public TestHiveSink() throws Exception {
@@ -182,8 +181,8 @@ public class TestHiveSink {
     TestUtil.dropDB(conf, dbName2);
     String dbLocation = dbFolder.newFolder(dbName2).getCanonicalPath() + ".db";
     dbLocation = dbLocation.replaceAll("\\\\","/"); // for windows paths
-    TestUtil.createDbAndTable(driver, dbName2, tblName2, null, colNames2, colTypes2
-            , null, dbLocation);
+    TestUtil.createDbAndTable(driver, dbName2, tblName2, null, colNames2, colTypes2,
+                              null, dbLocation);
 
     try {
       int totalRecords = 4;
@@ -282,7 +281,7 @@ public class TestHiveSink {
       String body = j + ",blah,This is a log message,other stuff";
       event.setBody(body.getBytes());
       eventDate.clear();
-      eventDate.set(2014, 03, 03, j%batchCount, 1); // yy mm dd hh mm
+      eventDate.set(2014, 03, 03, j % batchCount, 1); // yy mm dd hh mm
       event.getHeaders().put( "timestamp",
               String.valueOf(eventDate.getTimeInMillis()) );
       event.getHeaders().put( PART1_NAME, "Asia" );
@@ -317,7 +316,7 @@ public class TestHiveSink {
           throws EventDeliveryException, IOException, CommandNeedRetryException {
     int batchSize = 2;
     int batchCount = 3;
-    int totalRecords = batchCount*batchSize;
+    int totalRecords = batchCount * batchSize;
     Context context = new Context();
     context.put("hive.metastore", metaStoreURI);
     context.put("hive.database", dbName);
@@ -340,7 +339,7 @@ public class TestHiveSink {
       txn.begin();
       for (int j = 1; j <= batchSize; j++) {
         Event event = new SimpleEvent();
-        String body = i*j + ",blah,This is a log message,other stuff";
+        String body = i * j + ",blah,This is a log message,other stuff";
         event.setBody(body.getBytes());
         bodies.add(body);
         channel.put(event);
@@ -361,7 +360,7 @@ public class TestHiveSink {
   public void testJsonSerializer() throws Exception {
     int batchSize = 2;
     int batchCount = 2;
-    int totalRecords = batchCount*batchSize;
+    int totalRecords = batchCount * batchSize;
     Context context = new Context();
     context.put("hive.metastore",metaStoreURI);
     context.put("hive.database",dbName);

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveWriter.java b/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveWriter.java
index 41bf0f6..4d7c9bb 100644
--- a/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveWriter.java
+++ b/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveWriter.java
@@ -42,8 +42,8 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
 public class TestHiveWriter {
-  final static String dbName = "testing";
-  final static String tblName = "alerts";
+  static final String dbName = "testing";
+  static final String tblName = "alerts";
 
   public static final String PART1_NAME = "continent";
   public static final String PART2_NAME = "country";
@@ -106,8 +106,8 @@ public class TestHiveWriter {
     TestUtil.dropDB(conf, dbName);
     String dbLocation = dbFolder.newFolder(dbName).getCanonicalPath() + ".db";
     dbLocation = dbLocation.replaceAll("\\\\","/"); // for windows paths
-    TestUtil.createDbAndTable(driver, dbName, tblName, partVals, colNames, colTypes
-            , partNames, dbLocation);
+    TestUtil.createDbAndTable(driver, dbName, tblName, partVals, colNames, colTypes, partNames,
+                              dbLocation);
 
     // 2) Setup serializer
     Context ctx = new Context();
@@ -120,8 +120,8 @@ public class TestHiveWriter {
   public void testInstantiate() throws Exception {
     HiveEndPoint endPoint = new HiveEndPoint(metaStoreURI, dbName, tblName, partVals);
     SinkCounter sinkCounter = new SinkCounter(this.getClass().getName());
-    HiveWriter writer = new HiveWriter(endPoint, 10, true, timeout
-            , callTimeoutPool, "flumetest", serializer, sinkCounter);
+    HiveWriter writer = new HiveWriter(endPoint, 10, true, timeout, callTimeoutPool, "flumetest",
+                                       serializer, sinkCounter);
 
     writer.close();
   }
@@ -130,8 +130,8 @@ public class TestHiveWriter {
   public void testWriteBasic() throws Exception {
     HiveEndPoint endPoint = new HiveEndPoint(metaStoreURI, dbName, tblName, partVals);
     SinkCounter sinkCounter = new SinkCounter(this.getClass().getName());
-    HiveWriter writer = new HiveWriter(endPoint, 10, true, timeout
-            , callTimeoutPool, "flumetest", serializer, sinkCounter);
+    HiveWriter writer = new HiveWriter(endPoint, 10, true, timeout, callTimeoutPool, "flumetest",
+                                       serializer, sinkCounter);
 
     writeEvents(writer,3);
     writer.flush(false);
@@ -144,8 +144,8 @@ public class TestHiveWriter {
     HiveEndPoint endPoint = new HiveEndPoint(metaStoreURI, dbName, tblName, partVals);
     SinkCounter sinkCounter = new SinkCounter(this.getClass().getName());
 
-    HiveWriter writer = new HiveWriter(endPoint, 10, true, timeout
-            , callTimeoutPool, "flumetest", serializer, sinkCounter);
+    HiveWriter writer = new HiveWriter(endPoint, 10, true, timeout, callTimeoutPool, "flumetest",
+                                       serializer, sinkCounter);
 
     checkRecordCountInTable(0);
     SimpleEvent event = new SimpleEvent();
@@ -184,8 +184,8 @@ public class TestHiveWriter {
 
     int txnPerBatch = 3;
 
-    HiveWriter writer = new HiveWriter(endPoint, txnPerBatch, true, timeout
-            , callTimeoutPool, "flumetest", serializer, sinkCounter);
+    HiveWriter writer = new HiveWriter(endPoint, txnPerBatch, true, timeout, callTimeoutPool,
+                                       "flumetest", serializer, sinkCounter);
 
     Assert.assertEquals(writer.getRemainingTxns(),2);
     writer.flush(true);
@@ -275,14 +275,13 @@ public class TestHiveWriter {
     ctx.put("serializer.serdeSeparator", "ab");
     try {
       serializer3.configure(ctx);
-      Assert.assertTrue("Bad serdeSeparator character was accepted" ,false);
-    } catch (Exception e){
+      Assert.assertTrue("Bad serdeSeparator character was accepted", false);
+    } catch (Exception e) {
       // expect an exception
     }
 
   }
 
-
   @Test
   public void testSecondWriterBeforeFirstCommits() throws Exception {
     // here we open a new writer while the first is still writing (not committed)
@@ -295,13 +294,13 @@ public class TestHiveWriter {
     SinkCounter sinkCounter1 = new SinkCounter(this.getClass().getName());
     SinkCounter sinkCounter2 = new SinkCounter(this.getClass().getName());
 
-    HiveWriter writer1 = new HiveWriter(endPoint1, 10, true, timeout
-            , callTimeoutPool, "flumetest", serializer, sinkCounter1);
+    HiveWriter writer1 = new HiveWriter(endPoint1, 10, true, timeout, callTimeoutPool, "flumetest",
+                                        serializer, sinkCounter1);
 
     writeEvents(writer1, 3);
 
-    HiveWriter writer2 = new HiveWriter(endPoint2, 10, true, timeout
-            , callTimeoutPool, "flumetest", serializer, sinkCounter2);
+    HiveWriter writer2 = new HiveWriter(endPoint2, 10, true, timeout, callTimeoutPool, "flumetest",
+                                        serializer, sinkCounter2);
     writeEvents(writer2, 3);
     writer2.flush(false); // commit
 
@@ -311,7 +310,6 @@ public class TestHiveWriter {
     writer2.close();
   }
 
-
   @Test
   public void testSecondWriterAfterFirstCommits() throws Exception {
     // here we open a new writer after the first writer has committed one txn
@@ -324,16 +322,16 @@ public class TestHiveWriter {
     SinkCounter sinkCounter1 = new SinkCounter(this.getClass().getName());
     SinkCounter sinkCounter2 = new SinkCounter(this.getClass().getName());
 
-    HiveWriter writer1 = new HiveWriter(endPoint1, 10, true, timeout
-            , callTimeoutPool, "flumetest", serializer, sinkCounter1);
+    HiveWriter writer1 = new HiveWriter(endPoint1, 10, true, timeout, callTimeoutPool, "flumetest",
+                                        serializer, sinkCounter1);
 
     writeEvents(writer1, 3);
 
     writer1.flush(false); // commit
 
 
-    HiveWriter writer2 = new HiveWriter(endPoint2, 10, true, timeout
-            , callTimeoutPool, "flumetest", serializer, sinkCounter2);
+    HiveWriter writer2 = new HiveWriter(endPoint2, 10, true, timeout, callTimeoutPool, "flumetest",
+                                        serializer, sinkCounter2);
     writeEvents(writer2, 3);
     writer2.flush(false); // commit
 
@@ -342,8 +340,8 @@ public class TestHiveWriter {
     writer2.close();
   }
 
-
-  private void writeEvents(HiveWriter writer, int count) throws InterruptedException, HiveWriter.WriteException {
+  private void writeEvents(HiveWriter writer, int count)
+      throws InterruptedException, HiveWriter.WriteException {
     SimpleEvent event = new SimpleEvent();
     for (int i = 1; i <= count; i++) {
       event.setBody((i + ",xyz,Hello world,abc").getBytes());

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestUtil.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestUtil.java b/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestUtil.java
index 107789f..1fcb4eb 100644
--- a/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestUtil.java
+++ b/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestUtil.java
@@ -46,7 +46,7 @@ import java.util.List;
 
 public class TestUtil {
 
-  private final static String txnMgr = "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager";
+  private static final String txnMgr = "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager";
 
   /**
    * Set up the configuration so it will use the DbTxnManager, concurrency will be set to true,
@@ -81,7 +81,7 @@ public class TestUtil {
 
     runDDL(driver, crtTbl);
     System.out.println("crtTbl = " + crtTbl);
-    if (partNames!=null && partNames.length!=0) {
+    if (partNames != null && partNames.length != 0) {
       String addPart = "alter table " + tableName + " add partition ( " +
               getTablePartsStr2(partNames, partVals) + " )";
       runDDL(driver, addPart);
@@ -96,7 +96,8 @@ public class TestUtil {
   }
 
   // delete db and all tables in it
-  public static void dropDB(HiveConf conf, String databaseName) throws HiveException, MetaException {
+  public static void dropDB(HiveConf conf, String databaseName)
+      throws HiveException, MetaException {
     IMetaStoreClient client = new HiveMetaStoreClient(conf);
     try {
       for (String table : client.listTableNamesByFilter(databaseName, "", (short)-1)) {
@@ -110,9 +111,9 @@ public class TestUtil {
 
   private static String getTableColumnsStr(String[] colNames, String[] colTypes) {
     StringBuffer sb = new StringBuffer();
-    for (int i=0; i < colNames.length; ++i) {
+    for (int i = 0; i < colNames.length; ++i) {
       sb.append(colNames[i] + " " + colTypes[i]);
-      if (i<colNames.length-1) {
+      if (i < colNames.length - 1) {
         sb.append(",");
       }
     }
@@ -121,13 +122,13 @@ public class TestUtil {
 
   // converts partNames into "partName1 string, partName2 string"
   private static String getTablePartsStr(String[] partNames) {
-    if (partNames==null || partNames.length==0) {
+    if (partNames == null || partNames.length == 0) {
       return "";
     }
     StringBuffer sb = new StringBuffer();
-    for (int i=0; i < partNames.length; ++i) {
+    for (int i = 0; i < partNames.length; ++i) {
       sb.append(partNames[i] + " string");
-      if (i < partNames.length-1) {
+      if (i < partNames.length - 1) {
         sb.append(",");
       }
     }
@@ -137,9 +138,9 @@ public class TestUtil {
   // converts partNames,partVals into "partName1=val1, partName2=val2"
   private static String getTablePartsStr2(String[] partNames, List<String> partVals) {
     StringBuffer sb = new StringBuffer();
-    for (int i=0; i < partVals.size(); ++i) {
+    for (int i = 0; i < partVals.size(); ++i) {
       sb.append(partNames[i] + " = '" + partVals.get(i) + "'");
-      if (i < partVals.size()-1) {
+      if (i < partVals.size() - 1) {
         sb.append(",");
       }
     }
@@ -147,7 +148,7 @@ public class TestUtil {
   }
 
   public static ArrayList<String> listRecordsInTable(Driver driver, String dbName, String tblName)
-          throws CommandNeedRetryException, IOException {
+      throws CommandNeedRetryException, IOException {
     driver.run("select * from " + dbName + "." + tblName);
     ArrayList<String> res = new ArrayList<String>();
     driver.getResults(res);
@@ -155,8 +156,9 @@ public class TestUtil {
   }
 
   public static ArrayList<String> listRecordsInPartition(Driver driver, String dbName,
-                               String tblName, String continent, String country)
-          throws CommandNeedRetryException, IOException {
+                                                         String tblName, String continent,
+                                                         String country)
+      throws CommandNeedRetryException, IOException {
     driver.run("select * from " + dbName + "." + tblName + " where continent='"
             + continent + "' and country='" + country + "'");
     ArrayList<String> res = new ArrayList<String>();
@@ -164,9 +166,9 @@ public class TestUtil {
     return res;
   }
 
-
   public static class RawFileSystem extends RawLocalFileSystem {
     private static final URI NAME;
+
     static {
       try {
         NAME = new URI("raw:///");
@@ -211,9 +213,10 @@ public class TestUtil {
               FsPermission.createImmutable(mod), "owen", "users", path);
     }
   }
+
   private static boolean runDDL(Driver driver, String sql) throws QueryFailedException {
     int retryCount = 1; // # of times to retry if first attempt fails
-    for (int attempt=0; attempt <= retryCount; ++attempt) {
+    for (int attempt = 0; attempt <= retryCount; ++attempt) {
       try {
         driver.run(sql);
         return true;

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-irc-sink/src/test/java/org/apache/flume/sink/irc/TestIRCSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-irc-sink/src/test/java/org/apache/flume/sink/irc/TestIRCSink.java b/flume-ng-sinks/flume-irc-sink/src/test/java/org/apache/flume/sink/irc/TestIRCSink.java
index e6c065e..32517d1 100644
--- a/flume-ng-sinks/flume-irc-sink/src/test/java/org/apache/flume/sink/irc/TestIRCSink.java
+++ b/flume-ng-sinks/flume-irc-sink/src/test/java/org/apache/flume/sink/irc/TestIRCSink.java
@@ -19,7 +19,12 @@ package org.apache.flume.sink.irc;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
-import org.apache.flume.*;
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Sink;
+import org.apache.flume.Transaction;
 import org.apache.flume.channel.MemoryChannel;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.event.EventBuilder;
@@ -116,7 +121,9 @@ public class TestIRCSink {
           try {
             Socket socket = ss.accept();
             process(socket);
-          } catch (Exception ex) {/* noop */ }
+          } catch (Exception ex) {
+            /* noop */
+          }
         }
       } catch (IOException e) {
         // noop

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchSinkTest.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchSinkTest.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchSinkTest.java
index f9272fa..9fbd747 100644
--- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchSinkTest.java
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchSinkTest.java
@@ -18,17 +18,6 @@
  */
 package org.apache.flume.sink.elasticsearch;
 
-import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.BATCH_SIZE;
-import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.CLUSTER_NAME;
-import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.INDEX_NAME;
-import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.INDEX_TYPE;
-import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.TTL;
-import static org.junit.Assert.assertEquals;
-
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.Map;
-
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
@@ -51,6 +40,17 @@ import org.joda.time.DateTimeUtils;
 import org.junit.After;
 import org.junit.Before;
 
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Map;
+
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.BATCH_SIZE;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.CLUSTER_NAME;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.INDEX_NAME;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.INDEX_TYPE;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.TTL;
+import static org.junit.Assert.assertEquals;
+
 public abstract class AbstractElasticSearchSinkTest {
 
   static final String DEFAULT_INDEX_NAME = "flume";
@@ -136,7 +136,8 @@ public abstract class AbstractElasticSearchSinkTest {
         .setTypes(DEFAULT_INDEX_TYPE).setQuery(query).execute().actionGet();
   }
 
-  void assertSearch(int expectedHits, SearchResponse response, Map<String, Object> expectedBody, Event... events) {
+  void assertSearch(int expectedHits, SearchResponse response, Map<String, Object> expectedBody,
+                    Event... events) {
     SearchHits hitResponse = response.getHits();
     assertEquals(expectedHits, hitResponse.getTotalHits());
 

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchIndexRequestBuilderFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchIndexRequestBuilderFactory.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchIndexRequestBuilderFactory.java
index 8022111..b62254e 100644
--- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchIndexRequestBuilderFactory.java
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchIndexRequestBuilderFactory.java
@@ -18,15 +18,7 @@
  */
 package org.apache.flume.sink.elasticsearch;
 
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.Map;
-
+import com.google.common.collect.Maps;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.conf.ComponentConfiguration;
@@ -39,7 +31,14 @@ import org.elasticsearch.common.io.FastByteArrayOutputStream;
 import org.junit.Before;
 import org.junit.Test;
 
-import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.Map;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 public class TestElasticSearchIndexRequestBuilderFactory
     extends AbstractElasticSearchSinkTest {
@@ -70,7 +69,7 @@ public class TestElasticSearchIndexRequestBuilderFactory
   @Test
   public void indexNameShouldBePrefixDashFormattedTimestamp() {
     long millis = 987654321L;
-    assertEquals("prefix-"+factory.fastDateFormat.format(millis),
+    assertEquals("prefix-" + factory.fastDateFormat.format(millis),
         factory.getIndexName("prefix", millis));
   }
 
@@ -135,7 +134,7 @@ public class TestElasticSearchIndexRequestBuilderFactory
 
     assertEquals(indexPrefix + '-'
         + ElasticSearchIndexRequestBuilderFactory.df.format(FIXED_TIME_MILLIS),
-      indexRequestBuilder.request().index());
+        indexRequestBuilder.request().index());
     assertEquals(indexType, indexRequestBuilder.request().type());
     assertArrayEquals(FakeEventSerializer.FAKE_BYTES,
         indexRequestBuilder.request().source().array());
@@ -154,7 +153,7 @@ public class TestElasticSearchIndexRequestBuilderFactory
 
     assertEquals(indexPrefix + '-'
         + ElasticSearchIndexRequestBuilderFactory.df.format(1213141516L),
-      indexRequestBuilder.request().index());
+        indexRequestBuilder.request().index());
   }
 
   @Test
@@ -174,7 +173,7 @@ public class TestElasticSearchIndexRequestBuilderFactory
 
     assertEquals(indexValue + '-'
         + ElasticSearchIndexRequestBuilderFactory.df.format(FIXED_TIME_MILLIS),
-      indexRequestBuilder.request().index());
+        indexRequestBuilder.request().index());
     assertEquals(typeValue, indexRequestBuilder.request().type());
   }
 
@@ -192,7 +191,8 @@ public class TestElasticSearchIndexRequestBuilderFactory
   static class FakeEventSerializer implements ElasticSearchEventSerializer {
 
     static final byte[] FAKE_BYTES = new byte[]{9, 8, 7, 6};
-    boolean configuredWithContext, configuredWithComponentConfiguration;
+    boolean configuredWithContext;
+    boolean configuredWithComponentConfiguration;
 
     @Override
     public BytesStream getContentBuilder(Event event) throws IOException {

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchLogStashEventSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchLogStashEventSerializer.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchLogStashEventSerializer.java
index ab9587d..65b4dab 100644
--- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchLogStashEventSerializer.java
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchLogStashEventSerializer.java
@@ -18,6 +18,7 @@
  */
 package org.apache.flume.sink.elasticsearch;
 
+import com.google.gson.JsonParser;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.event.EventBuilder;
@@ -28,9 +29,6 @@ import org.junit.Test;
 import java.util.Date;
 import java.util.Map;
 
-import com.google.gson.JsonParser;
-import com.google.gson.JsonElement;
-
 import static org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer.charset;
 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 import static org.junit.Assert.assertEquals;
@@ -56,26 +54,25 @@ public class TestElasticSearchLogStashEventSerializer {
     Event event = EventBuilder.withBody(message.getBytes(charset));
     event.setHeaders(headers);
 
-    XContentBuilder expected = jsonBuilder()
-        .startObject();
-            expected.field("@message", new String(message.getBytes(), charset));
-            expected.field("@timestamp", new Date(timestamp));
-            expected.field("@source", "flume_tail_src");
-            expected.field("@type", "sometype");
-            expected.field("@source_host", "test@localhost");
-            expected.field("@source_path", "/tmp/test");
-
-            expected.startObject("@fields");
-                expected.field("timestamp", String.valueOf(timestamp));
-                expected.field("src_path", "/tmp/test");
-                expected.field("host", "test@localhost");
-                expected.field("headerNameTwo", "headerValueTwo");
-                expected.field("source", "flume_tail_src");
-                expected.field("headerNameOne", "headerValueOne");
-                expected.field("type", "sometype");
-            expected.endObject();
-
-        expected.endObject();
+    XContentBuilder expected = jsonBuilder().startObject();
+    expected.field("@message", new String(message.getBytes(), charset));
+    expected.field("@timestamp", new Date(timestamp));
+    expected.field("@source", "flume_tail_src");
+    expected.field("@type", "sometype");
+    expected.field("@source_host", "test@localhost");
+    expected.field("@source_path", "/tmp/test");
+
+    expected.startObject("@fields");
+    expected.field("timestamp", String.valueOf(timestamp));
+    expected.field("src_path", "/tmp/test");
+    expected.field("host", "test@localhost");
+    expected.field("headerNameTwo", "headerValueTwo");
+    expected.field("source", "flume_tail_src");
+    expected.field("headerNameOne", "headerValueOne");
+    expected.field("type", "sometype");
+    expected.endObject();
+
+    expected.endObject();
 
     XContentBuilder actual = fixture.getContentBuilder(event);
     
@@ -102,26 +99,25 @@ public class TestElasticSearchLogStashEventSerializer {
     Event event = EventBuilder.withBody(message.getBytes(charset));
     event.setHeaders(headers);
 
-    XContentBuilder expected = jsonBuilder().
-        startObject();
-            expected.field("@message", new String(message.getBytes(), charset));
-            expected.field("@timestamp", new Date(timestamp));
-            expected.field("@source", "flume_tail_src");
-            expected.field("@type", "sometype");
-            expected.field("@source_host", "test@localhost");
-            expected.field("@source_path", "/tmp/test");
-
-            expected.startObject("@fields");
-                expected.field("timestamp", String.valueOf(timestamp));
-                expected.field("src_path", "/tmp/test");
-                expected.field("host", "test@localhost");
-                expected.field("headerNameTwo", "headerValueTwo");
-                expected.field("source", "flume_tail_src");
-                expected.field("headerNameOne", "headerValueOne");
-                expected.field("type", "sometype");
-            expected.endObject();
-
-        expected.endObject();
+    XContentBuilder expected = jsonBuilder().startObject();
+    expected.field("@message", new String(message.getBytes(), charset));
+    expected.field("@timestamp", new Date(timestamp));
+    expected.field("@source", "flume_tail_src");
+    expected.field("@type", "sometype");
+    expected.field("@source_host", "test@localhost");
+    expected.field("@source_path", "/tmp/test");
+
+    expected.startObject("@fields");
+    expected.field("timestamp", String.valueOf(timestamp));
+    expected.field("src_path", "/tmp/test");
+    expected.field("host", "test@localhost");
+    expected.field("headerNameTwo", "headerValueTwo");
+    expected.field("source", "flume_tail_src");
+    expected.field("headerNameOne", "headerValueOne");
+    expected.field("type", "sometype");
+    expected.endObject();
+
+    expected.endObject();
 
     XContentBuilder actual = fixture.getContentBuilder(event);