You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by mp...@apache.org on 2016/06/30 02:21:28 UTC

[02/11] flume git commit: FLUME-2937. Integrate checkstyle for non-test classes

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
index e659ada..4c8b52b 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
@@ -18,15 +18,10 @@
  */
 package org.apache.flume.sink.hbase;
 
-import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.flume.Channel;
@@ -52,14 +47,16 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Charsets;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.security.PrivilegedExceptionAction;
-
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
 
 /**
- *
  * A simple sink which reads events from a channel and writes them to HBase.
  * The Hbase configuration is picked up from the first <tt>hbase-site.xml</tt>
  * encountered in the classpath. This sink supports batch reading of
@@ -73,7 +70,7 @@ import java.security.PrivilegedExceptionAction;
  * batch size, whichever comes first.<p>
  * Other optional parameters are:<p>
  * <tt>serializer:</tt> A class implementing {@link HbaseEventSerializer}.
- *  An instance of
+ * An instance of
  * this class will be used to write out events to hbase.<p>
  * <tt>serializer.*:</tt> Passed in the configure() method to serializer
  * as an object of {@link org.apache.flume.Context}.<p>
@@ -81,7 +78,7 @@ import java.security.PrivilegedExceptionAction;
  * maximum number of events the sink will commit per transaction. The default
  * batch size is 100 events.
  * <p>
- *
+ * <p>
  * <strong>Note: </strong> While this sink flushes all events in a transaction
  * to HBase in one shot, Hbase does not guarantee atomic commits on multiple
  * rows. So if a subset of events in a batch are written to disk by Hbase and
@@ -113,11 +110,11 @@ public class HBaseSink extends AbstractSink implements Configurable {
   // Internal hooks used for unit testing.
   private DebugIncrementsCallback debugIncrCallback = null;
 
-  public HBaseSink(){
+  public HBaseSink() {
     this(HBaseConfiguration.create());
   }
 
-  public HBaseSink(Configuration conf){
+  public HBaseSink(Configuration conf) {
     this.config = conf;
   }
 
@@ -129,15 +126,16 @@ public class HBaseSink extends AbstractSink implements Configurable {
   }
 
   @Override
-  public void start(){
+  public void start() {
     Preconditions.checkArgument(table == null, "Please call stop " +
         "before calling start on an old instance.");
     try {
-      privilegedExecutor = FlumeAuthenticationUtil.getAuthenticator(kerberosPrincipal, kerberosKeytab);
+      privilegedExecutor =
+          FlumeAuthenticationUtil.getAuthenticator(kerberosPrincipal, kerberosKeytab);
     } catch (Exception ex) {
       sinkCounter.incrementConnectionFailedCount();
       throw new FlumeException("Failed to login to HBase using "
-        + "provided credentials.", ex);
+          + "provided credentials.", ex);
     }
     try {
       table = privilegedExecutor.execute(new PrivilegedExceptionAction<HTable>() {
@@ -165,16 +163,16 @@ public class HBaseSink extends AbstractSink implements Configurable {
         }
       })) {
         throw new IOException("Table " + tableName
-                + " has no such column family " + Bytes.toString(columnFamily));
+            + " has no such column family " + Bytes.toString(columnFamily));
       }
     } catch (Exception e) {
       //Get getTableDescriptor also throws IOException, so catch the IOException
       //thrown above or by the getTableDescriptor() call.
       sinkCounter.incrementConnectionFailedCount();
       throw new FlumeException("Error getting column family from HBase."
-              + "Please verify that the table " + tableName + " and Column Family, "
-              + Bytes.toString(columnFamily) + " exists in HBase, and the"
-              + " current user has permissions to access that table.", e);
+          + "Please verify that the table " + tableName + " and Column Family, "
+          + Bytes.toString(columnFamily) + " exists in HBase, and the"
+          + " current user has permissions to access that table.", e);
     }
 
     super.start();
@@ -183,7 +181,7 @@ public class HBaseSink extends AbstractSink implements Configurable {
   }
 
   @Override
-  public void stop(){
+  public void stop() {
     try {
       if (table != null) {
         table.close();
@@ -198,7 +196,7 @@ public class HBaseSink extends AbstractSink implements Configurable {
 
   @SuppressWarnings("unchecked")
   @Override
-  public void configure(Context context){
+  public void configure(Context context) {
     tableName = context.getString(HBaseSinkConfigurationConstants.CONFIG_TABLE);
     String cf = context.getString(
         HBaseSinkConfigurationConstants.CONFIG_COLUMN_FAMILY);
@@ -213,48 +211,48 @@ public class HBaseSink extends AbstractSink implements Configurable {
     Preconditions.checkNotNull(cf,
         "Column family cannot be empty, please specify in configuration file");
     //Check foe event serializer, if null set event serializer type
-    if(eventSerializerType == null || eventSerializerType.isEmpty()) {
+    if (eventSerializerType == null || eventSerializerType.isEmpty()) {
       eventSerializerType =
           "org.apache.flume.sink.hbase.SimpleHbaseEventSerializer";
       logger.info("No serializer defined, Will use default");
     }
     serializerContext.putAll(context.getSubProperties(
-            HBaseSinkConfigurationConstants.CONFIG_SERIALIZER_PREFIX));
+        HBaseSinkConfigurationConstants.CONFIG_SERIALIZER_PREFIX));
     columnFamily = cf.getBytes(Charsets.UTF_8);
     try {
       Class<? extends HbaseEventSerializer> clazz =
           (Class<? extends HbaseEventSerializer>)
-          Class.forName(eventSerializerType);
+              Class.forName(eventSerializerType);
       serializer = clazz.newInstance();
       serializer.configure(serializerContext);
     } catch (Exception e) {
-      logger.error("Could not instantiate event serializer." , e);
+      logger.error("Could not instantiate event serializer.", e);
       Throwables.propagate(e);
     }
     kerberosKeytab = context.getString(HBaseSinkConfigurationConstants.CONFIG_KEYTAB);
     kerberosPrincipal = context.getString(HBaseSinkConfigurationConstants.CONFIG_PRINCIPAL);
 
     enableWal = context.getBoolean(HBaseSinkConfigurationConstants
-      .CONFIG_ENABLE_WAL, HBaseSinkConfigurationConstants.DEFAULT_ENABLE_WAL);
+        .CONFIG_ENABLE_WAL, HBaseSinkConfigurationConstants.DEFAULT_ENABLE_WAL);
     logger.info("The write to WAL option is set to: " + String.valueOf(enableWal));
-    if(!enableWal) {
+    if (!enableWal) {
       logger.warn("HBase Sink's enableWal configuration is set to false. All " +
-        "writes to HBase will have WAL disabled, and any data in the " +
-        "memstore of this region in the Region Server could be lost!");
+          "writes to HBase will have WAL disabled, and any data in the " +
+          "memstore of this region in the Region Server could be lost!");
     }
 
     batchIncrements = context.getBoolean(
-      HBaseSinkConfigurationConstants.CONFIG_COALESCE_INCREMENTS,
-      HBaseSinkConfigurationConstants.DEFAULT_COALESCE_INCREMENTS);
+        HBaseSinkConfigurationConstants.CONFIG_COALESCE_INCREMENTS,
+        HBaseSinkConfigurationConstants.DEFAULT_COALESCE_INCREMENTS);
 
     if (batchIncrements) {
       logger.info("Increment coalescing is enabled. Increments will be " +
-        "buffered.");
+          "buffered.");
       refGetFamilyMap = reflectLookupGetFamilyMap();
     }
 
     String zkQuorum = context.getString(HBaseSinkConfigurationConstants
-      .ZK_QUORUM);
+        .ZK_QUORUM);
     Integer port = null;
     /**
      * HBase allows multiple nodes in the quorum, but all need to use the
@@ -267,10 +265,10 @@ public class HBaseSink extends AbstractSink implements Configurable {
       logger.info("Using ZK Quorum: " + zkQuorum);
       String[] zkHosts = zkQuorum.split(",");
       int length = zkHosts.length;
-      for(int i = 0; i < length; i++) {
+      for (int i = 0; i < length; i++) {
         String[] zkHostAndPort = zkHosts[i].split(":");
         zkBuilder.append(zkHostAndPort[0].trim());
-        if(i != length-1) {
+        if (i != length - 1) {
           zkBuilder.append(",");
         } else {
           zkQuorum = zkBuilder.toString();
@@ -282,18 +280,18 @@ public class HBaseSink extends AbstractSink implements Configurable {
           port = Integer.parseInt(zkHostAndPort[1].trim());
         } else if (!port.equals(Integer.parseInt(zkHostAndPort[1].trim()))) {
           throw new FlumeException("All Zookeeper nodes in the quorum must " +
-            "use the same client port.");
+              "use the same client port.");
         }
       }
-      if(port == null) {
+      if (port == null) {
         port = HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT;
       }
       this.config.set(HConstants.ZOOKEEPER_QUORUM, zkQuorum);
       this.config.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, port);
     }
     String hbaseZnode = context.getString(
-      HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT);
-    if(hbaseZnode != null && !hbaseZnode.isEmpty()) {
+        HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT);
+    if (hbaseZnode != null && !hbaseZnode.isEmpty()) {
       this.config.set(HConstants.ZOOKEEPER_ZNODE_PARENT, hbaseZnode);
     }
     sinkCounter = new SinkCounter(this.getName());
@@ -314,7 +312,7 @@ public class HBaseSink extends AbstractSink implements Configurable {
       txn.begin();
 
       if (serializer instanceof BatchAware) {
-        ((BatchAware)serializer).onBatchStart();
+        ((BatchAware) serializer).onBatchStart();
       }
 
       long i = 0;
@@ -342,15 +340,15 @@ public class HBaseSink extends AbstractSink implements Configurable {
       putEventsAndCommit(actions, incs, txn);
 
     } catch (Throwable e) {
-      try{
+      try {
         txn.rollback();
       } catch (Exception e2) {
         logger.error("Exception in rollback. Rollback might not have been " +
-            "successful." , e2);
+            "successful.", e2);
       }
       logger.error("Failed to commit transaction." +
           "Transaction rolled back.", e);
-      if(e instanceof Error || e instanceof RuntimeException){
+      if (e instanceof Error || e instanceof RuntimeException) {
         logger.error("Failed to commit transaction." +
             "Transaction rolled back.", e);
         Throwables.propagate(e);
@@ -367,7 +365,7 @@ public class HBaseSink extends AbstractSink implements Configurable {
   }
 
   private void putEventsAndCommit(final List<Row> actions,
-      final List<Increment> incs, Transaction txn) throws Exception {
+                                  final List<Increment> incs, Transaction txn) throws Exception {
 
     privilegedExecutor.execute(new PrivilegedExceptionAction<Void>() {
       @Override
@@ -421,7 +419,7 @@ public class HBaseSink extends AbstractSink implements Configurable {
   @VisibleForTesting
   static Method reflectLookupGetFamilyMap() {
     Method m = null;
-    String[] methodNames = { "getFamilyMapOfLongs", "getFamilyMap" };
+    String[] methodNames = {"getFamilyMapOfLongs", "getFamilyMap"};
     for (String methodName : methodNames) {
       try {
         m = Increment.class.getMethod(methodName);
@@ -447,7 +445,7 @@ public class HBaseSink extends AbstractSink implements Configurable {
   @SuppressWarnings("unchecked")
   private Map<byte[], NavigableMap<byte[], Long>> getFamilyMap(Increment inc) {
     Preconditions.checkNotNull(refGetFamilyMap,
-                               "Increment.getFamilymap() not found");
+        "Increment.getFamilymap() not found");
     Preconditions.checkNotNull(inc, "Increment required");
     Map<byte[], NavigableMap<byte[], Long>> familyMap = null;
     try {
@@ -466,6 +464,7 @@ public class HBaseSink extends AbstractSink implements Configurable {
   /**
    * Perform "compression" on the given set of increments so that Flume sends
    * the minimum possible number of RPC operations to HBase per batch.
+   *
    * @param incs Input: Increment objects to coalesce.
    * @return List of new Increment objects after coalescing the unique counts.
    */
@@ -478,7 +477,7 @@ public class HBaseSink extends AbstractSink implements Configurable {
     for (Increment inc : incs) {
       byte[] row = inc.getRow();
       Map<byte[], NavigableMap<byte[], Long>> families = getFamilyMap(inc);
-      for (Map.Entry<byte[], NavigableMap<byte[],Long>> familyEntry : families.entrySet()) {
+      for (Map.Entry<byte[], NavigableMap<byte[], Long>> familyEntry : families.entrySet()) {
         byte[] family = familyEntry.getKey();
         NavigableMap<byte[], Long> qualifiers = familyEntry.getValue();
         for (Map.Entry<byte[], Long> qualifierEntry : qualifiers.entrySet()) {
@@ -491,9 +490,10 @@ public class HBaseSink extends AbstractSink implements Configurable {
 
     // Reconstruct list of Increments per unique row/family/qualifier.
     List<Increment> coalesced = Lists.newLinkedList();
-    for (Map.Entry<byte[], Map<byte[],NavigableMap<byte[], Long>>> rowEntry : counters.entrySet()) {
+    for (Map.Entry<byte[], Map<byte[], NavigableMap<byte[], Long>>> rowEntry :
+         counters.entrySet()) {
       byte[] row = rowEntry.getKey();
-      Map <byte[], NavigableMap<byte[], Long>> families = rowEntry.getValue();
+      Map<byte[], NavigableMap<byte[], Long>> families = rowEntry.getValue();
       Increment inc = new Increment(row);
       for (Map.Entry<byte[], NavigableMap<byte[], Long>> familyEntry : families.entrySet()) {
         byte[] family = familyEntry.getKey();
@@ -513,11 +513,12 @@ public class HBaseSink extends AbstractSink implements Configurable {
   /**
    * Helper function for {@link #coalesceIncrements} to increment a counter
    * value in the passed data structure.
-   * @param counters Nested data structure containing the counters.
-   * @param row Row key to increment.
-   * @param family Column family to increment.
+   *
+   * @param counters  Nested data structure containing the counters.
+   * @param row       Row key to increment.
+   * @param family    Column family to increment.
    * @param qualifier Column qualifier to increment.
-   * @param count Amount to increment by.
+   * @param count     Amount to increment by.
    */
   private void incrementCounter(
       Map<byte[], Map<byte[], NavigableMap<byte[], Long>>> counters,

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HbaseEventSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HbaseEventSerializer.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HbaseEventSerializer.java
index 2c0f0e6..d4e3f84 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HbaseEventSerializer.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HbaseEventSerializer.java
@@ -32,13 +32,12 @@ import org.apache.hadoop.hbase.client.Row;
  * params required should be taken through this. Only the column family is
  * passed in. The columns should exist in the table and column family
  * specified in the configuration for the HbaseSink.
- *
  */
-public interface HbaseEventSerializer extends Configurable,
-    ConfigurableComponent {
+public interface HbaseEventSerializer extends Configurable, ConfigurableComponent {
   /**
    * Initialize the event serializer.
-   * @param Event to be written to HBase.
+   * @param event Event to be written to HBase
+   * @param columnFamily Column family to write to
    */
   public void initialize(Event event, byte[] columnFamily);
 
@@ -54,10 +53,9 @@ public interface HbaseEventSerializer extends Configurable,
   public List<Row> getActions();
 
   public List<Increment> getIncrements();
+
   /*
    * Clean up any state. This will be called when the sink is being stopped.
    */
   public void close();
-
-
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/RegexHbaseEventSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/RegexHbaseEventSerializer.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/RegexHbaseEventSerializer.java
index 7d2b8b7..8342d67 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/RegexHbaseEventSerializer.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/RegexHbaseEventSerializer.java
@@ -18,14 +18,8 @@
  */
 package org.apache.flume.sink.hbase;
 
-import java.nio.charset.Charset;
-import java.util.Calendar;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
+import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
@@ -35,20 +29,25 @@ import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Row;
 
-import com.google.common.base.Charsets;
-import com.google.common.collect.Lists;
+import java.nio.charset.Charset;
+import java.util.Calendar;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 /**
- * An {@link HbaseEventSerializer} which parses columns based on a supplied 
- * regular expression and column name list. 
- * 
+ * An {@link HbaseEventSerializer} which parses columns based on a supplied
+ * regular expression and column name list.
+ * <p>
  * Note that if the regular expression does not return the correct number of
  * groups for a particular event, or it does not correctly match an event,
  * the event is silently dropped.
- * 
+ * <p>
  * Row keys for each event consist of a timestamp concatenated with an
  * identifier which enforces uniqueness of keys across flume agents.
- * 
+ * <p>
  * See static constant variables for configuration options.
  */
 public class RegexHbaseEventSerializer implements HbaseEventSerializer {
@@ -108,21 +107,21 @@ public class RegexHbaseEventSerializer implements HbaseEventSerializer {
 
     String colNameStr = context.getString(COL_NAME_CONFIG, COLUMN_NAME_DEFAULT);
     String[] columnNames = colNameStr.split(",");
-    for (String s: columnNames) {
+    for (String s : columnNames) {
       colNames.add(s.getBytes(charset));
     }
 
     //Rowkey is optional, default is -1
     rowKeyIndex = context.getInteger(ROW_KEY_INDEX_CONFIG, -1);
     //if row key is being used, make sure it is specified correct
-    if(rowKeyIndex >=0){
-      if(rowKeyIndex >= columnNames.length) {
+    if (rowKeyIndex >= 0) {
+      if (rowKeyIndex >= columnNames.length) {
         throw new IllegalArgumentException(ROW_KEY_INDEX_CONFIG + " must be " +
-          "less than num columns " + columnNames.length);
+            "less than num columns " + columnNames.length);
       }
-      if(!ROW_KEY_NAME.equalsIgnoreCase(columnNames[rowKeyIndex])) {
+      if (!ROW_KEY_NAME.equalsIgnoreCase(columnNames[rowKeyIndex])) {
         throw new IllegalArgumentException("Column at " + rowKeyIndex + " must be "
-        + ROW_KEY_NAME + " and is " + columnNames[rowKeyIndex]);
+            + ROW_KEY_NAME + " and is " + columnNames[rowKeyIndex]);
       }
     }
   }
@@ -181,15 +180,15 @@ public class RegexHbaseEventSerializer implements HbaseEventSerializer {
     }
 
     try {
-      if(rowKeyIndex < 0){
+      if (rowKeyIndex < 0) {
         rowKey = getRowKey();
-      }else{
+      } else {
         rowKey = m.group(rowKeyIndex + 1).getBytes(Charsets.UTF_8);
       }
       Put put = new Put(rowKey);
 
       for (int i = 0; i < colNames.size(); i++) {
-        if(i != rowKeyIndex) {
+        if (i != rowKeyIndex) {
           put.add(cf, colNames.get(i), m.group(i + 1).getBytes(Charsets.UTF_8));
         }
       }
@@ -211,5 +210,6 @@ public class RegexHbaseEventSerializer implements HbaseEventSerializer {
   }
 
   @Override
-  public void close() {  }
+  public void close() {
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleAsyncHbaseEventSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleAsyncHbaseEventSerializer.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleAsyncHbaseEventSerializer.java
index 96095d1..3f442e8 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleAsyncHbaseEventSerializer.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleAsyncHbaseEventSerializer.java
@@ -18,18 +18,17 @@
  */
 package org.apache.flume.sink.hbase;
 
-import java.util.ArrayList;
-import java.util.List;
-
+import com.google.common.base.Charsets;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.FlumeException;
-import org.hbase.async.AtomicIncrementRequest;
-import org.hbase.async.PutRequest;
 import org.apache.flume.conf.ComponentConfiguration;
 import org.apache.flume.sink.hbase.SimpleHbaseEventSerializer.KeyType;
+import org.hbase.async.AtomicIncrementRequest;
+import org.hbase.async.PutRequest;
 
-import com.google.common.base.Charsets;
+import java.util.ArrayList;
+import java.util.List;
 
 /**
  * A simple serializer to be used with the AsyncHBaseSink
@@ -69,7 +68,7 @@ public class SimpleAsyncHbaseEventSerializer implements AsyncHbaseEventSerialize
   @Override
   public List<PutRequest> getActions() {
     List<PutRequest> actions = new ArrayList<PutRequest>();
-    if(payloadColumn != null){
+    if (payloadColumn != null) {
       byte[] rowKey;
       try {
         switch (keyType) {
@@ -89,17 +88,16 @@ public class SimpleAsyncHbaseEventSerializer implements AsyncHbaseEventSerialize
         PutRequest putRequest =  new PutRequest(table, rowKey, cf,
             payloadColumn, payload);
         actions.add(putRequest);
-      } catch (Exception e){
+      } catch (Exception e) {
         throw new FlumeException("Could not get row key!", e);
       }
     }
     return actions;
   }
 
-  public List<AtomicIncrementRequest> getIncrements(){
-    List<AtomicIncrementRequest> actions = new
-        ArrayList<AtomicIncrementRequest>();
-    if(incrementColumn != null) {
+  public List<AtomicIncrementRequest> getIncrements() {
+    List<AtomicIncrementRequest> actions = new ArrayList<AtomicIncrementRequest>();
+    if (incrementColumn != null) {
       AtomicIncrementRequest inc = new AtomicIncrementRequest(table,
           incrementRow, cf, incrementColumn);
       actions.add(inc);
@@ -119,23 +117,22 @@ public class SimpleAsyncHbaseEventSerializer implements AsyncHbaseEventSerialize
     String iCol = context.getString("incrementColumn", "iCol");
     rowPrefix = context.getString("rowPrefix", "default");
     String suffix = context.getString("suffix", "uuid");
-    if(pCol != null && !pCol.isEmpty()) {
-      if(suffix.equals("timestamp")){
+    if (pCol != null && !pCol.isEmpty()) {
+      if (suffix.equals("timestamp")) {
         keyType = KeyType.TS;
       } else if (suffix.equals("random")) {
         keyType = KeyType.RANDOM;
-      } else if(suffix.equals("nano")){
+      } else if (suffix.equals("nano")) {
         keyType = KeyType.TSNANO;
       } else {
         keyType = KeyType.UUID;
       }
       payloadColumn = pCol.getBytes(Charsets.UTF_8);
     }
-    if(iCol != null && !iCol.isEmpty()) {
+    if (iCol != null && !iCol.isEmpty()) {
       incrementColumn = iCol.getBytes(Charsets.UTF_8);
     }
-    incrementRow =
-        context.getString("incrementRow", "incRow").getBytes(Charsets.UTF_8);
+    incrementRow = context.getString("incrementRow", "incRow").getBytes(Charsets.UTF_8);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleHbaseEventSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleHbaseEventSerializer.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleHbaseEventSerializer.java
index 758252b..dc89fd7 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleHbaseEventSerializer.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleHbaseEventSerializer.java
@@ -19,9 +19,7 @@
 
 package org.apache.flume.sink.hbase;
 
-import java.util.LinkedList;
-import java.util.List;
-
+import com.google.common.base.Charsets;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.FlumeException;
@@ -30,19 +28,18 @@ import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Row;
 
-import com.google.common.base.Charsets;
+import java.util.LinkedList;
+import java.util.List;
 
 /**
  * A simple serializer that returns puts from an event, by writing the event
  * body into it. The headers are discarded. It also updates a row in hbase
  * which acts as an event counter.
- *
- * Takes optional parameters:<p>
+ * <p>Takes optional parameters:<p>
  * <tt>rowPrefix:</tt> The prefix to be used. Default: <i>default</i><p>
  * <tt>incrementRow</tt> The row to increment. Default: <i>incRow</i><p>
  * <tt>suffix:</tt> <i>uuid/random/timestamp.</i>Default: <i>uuid</i><p>
- *
- * Mandatory parameters: <p>
+ * <p>Mandatory parameters: <p>
  * <tt>cf:</tt>Column family.<p>
  * Components that have no defaults and will not be used if null:
  * <tt>payloadColumn:</tt> Which column to put payload in. If it is null,
@@ -59,8 +56,7 @@ public class SimpleHbaseEventSerializer implements HbaseEventSerializer {
   private KeyType keyType;
   private byte[] payload;
 
-  public SimpleHbaseEventSerializer(){
-
+  public SimpleHbaseEventSerializer() {
   }
 
   @Override
@@ -70,21 +66,21 @@ public class SimpleHbaseEventSerializer implements HbaseEventSerializer {
         context.getString("incrementRow", "incRow").getBytes(Charsets.UTF_8);
     String suffix = context.getString("suffix", "uuid");
 
-    String payloadColumn = context.getString("payloadColumn","pCol");
-    String incColumn = context.getString("incrementColumn","iCol");
-    if(payloadColumn != null && !payloadColumn.isEmpty()) {
-      if(suffix.equals("timestamp")){
+    String payloadColumn = context.getString("payloadColumn", "pCol");
+    String incColumn = context.getString("incrementColumn", "iCol");
+    if (payloadColumn != null && !payloadColumn.isEmpty()) {
+      if (suffix.equals("timestamp")) {
         keyType = KeyType.TS;
       } else if (suffix.equals("random")) {
         keyType = KeyType.RANDOM;
-      } else if(suffix.equals("nano")){
+      } else if (suffix.equals("nano")) {
         keyType = KeyType.TSNANO;
       } else {
         keyType = KeyType.UUID;
       }
       plCol = payloadColumn.getBytes(Charsets.UTF_8);
     }
-    if(incColumn != null && !incColumn.isEmpty()) {
+    if (incColumn != null && !incColumn.isEmpty()) {
       incCol = incColumn.getBytes(Charsets.UTF_8);
     }
   }
@@ -102,14 +98,14 @@ public class SimpleHbaseEventSerializer implements HbaseEventSerializer {
   @Override
   public List<Row> getActions() throws FlumeException {
     List<Row> actions = new LinkedList<Row>();
-    if(plCol != null){
+    if (plCol != null) {
       byte[] rowKey;
       try {
         if (keyType == KeyType.TS) {
           rowKey = SimpleRowKeyGenerator.getTimestampKey(rowPrefix);
-        } else if(keyType == KeyType.RANDOM) {
+        } else if (keyType == KeyType.RANDOM) {
           rowKey = SimpleRowKeyGenerator.getRandomKey(rowPrefix);
-        } else if(keyType == KeyType.TSNANO) {
+        } else if (keyType == KeyType.TSNANO) {
           rowKey = SimpleRowKeyGenerator.getNanoTimestampKey(rowPrefix);
         } else {
           rowKey = SimpleRowKeyGenerator.getUUIDKey(rowPrefix);
@@ -117,33 +113,34 @@ public class SimpleHbaseEventSerializer implements HbaseEventSerializer {
         Put put = new Put(rowKey);
         put.add(cf, plCol, payload);
         actions.add(put);
-      } catch (Exception e){
+      } catch (Exception e) {
         throw new FlumeException("Could not get row key!", e);
       }
 
     }
     return actions;
   }
-    @Override
-    public List<Increment> getIncrements(){
-      List<Increment> incs = new LinkedList<Increment>();
-      if(incCol != null) {
-        Increment inc = new Increment(incrementRow);
-        inc.addColumn(cf, incCol, 1);
-        incs.add(inc);
-      }
-      return incs;
-    }
 
-    @Override
-    public void close() {
+  @Override
+  public List<Increment> getIncrements() {
+    List<Increment> incs = new LinkedList<Increment>();
+    if (incCol != null) {
+      Increment inc = new Increment(incrementRow);
+      inc.addColumn(cf, incCol, 1);
+      incs.add(inc);
     }
+    return incs;
+  }
 
-    public enum KeyType{
-      UUID,
-      RANDOM,
-      TS,
-      TSNANO;
-    }
+  @Override
+  public void close() {
+  }
 
+  public enum KeyType {
+    UUID,
+    RANDOM,
+    TS,
+    TSNANO;
   }
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleRowKeyGenerator.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleRowKeyGenerator.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleRowKeyGenerator.java
index b25eb6a..2d654f2 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleRowKeyGenerator.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleRowKeyGenerator.java
@@ -25,28 +25,22 @@ import java.util.UUID;
 /**
  * Utility class for users to generate their own keys. Any key can be used,
  * this is just a utility that provides a set of simple keys.
- *
- *
  */
 public class SimpleRowKeyGenerator {
 
-  public static byte[] getUUIDKey(String prefix)
-      throws UnsupportedEncodingException{
+  public static byte[] getUUIDKey(String prefix) throws UnsupportedEncodingException {
     return (prefix + UUID.randomUUID().toString()).getBytes("UTF8");
   }
 
-  public static byte[] getRandomKey(String prefix)
-      throws UnsupportedEncodingException{
+  public static byte[] getRandomKey(String prefix) throws UnsupportedEncodingException {
     return (prefix + String.valueOf(new Random().nextLong())).getBytes("UTF8");
   }
-  public static byte[] getTimestampKey(String prefix)
-      throws UnsupportedEncodingException {
-    return (prefix + String.valueOf(
-        System.currentTimeMillis())).getBytes("UTF8");
+
+  public static byte[] getTimestampKey(String prefix) throws UnsupportedEncodingException {
+    return (prefix + String.valueOf(System.currentTimeMillis())).getBytes("UTF8");
   }
-  public static byte[] getNanoTimestampKey(String prefix)
-      throws UnsupportedEncodingException{
-    return (prefix + String.valueOf(
-        System.nanoTime())).getBytes("UTF8");
+
+  public static byte[] getNanoTimestampKey(String prefix) throws UnsupportedEncodingException {
+    return (prefix + String.valueOf(System.nanoTime())).getBytes("UTF8");
   }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
index 7bef7f3..9453546 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
@@ -157,7 +157,7 @@ public class KafkaSink extends AbstractSink implements Configurable {
 
         if (event == null) {
           // no events available in channel
-          if(processedEvents == 0) {
+          if (processedEvents == 0) {
             result = Status.BACKOFF;
             counter.incrementBatchEmptyCount();
           } else {
@@ -177,7 +177,7 @@ public class KafkaSink extends AbstractSink implements Configurable {
 
         if (logger.isDebugEnabled()) {
           logger.debug("{Event} " + eventTopic + " : " + eventKey + " : "
-            + new String(eventBody, "UTF-8"));
+              + new String(eventBody, "UTF-8"));
           logger.debug("event #{}", processedEvents);
         }
 
@@ -185,8 +185,10 @@ public class KafkaSink extends AbstractSink implements Configurable {
         long startTime = System.currentTimeMillis();
 
         try {
-          kafkaFutures.add(producer.send(new ProducerRecord<String, byte[]> (eventTopic, eventKey, serializeEvent(event, useAvroEventFormat)),
-                                         new SinkCallback(startTime)));
+          kafkaFutures.add(producer.send(
+              new ProducerRecord<String, byte[]>(eventTopic, eventKey,
+                                                 serializeEvent(event, useAvroEventFormat)),
+              new SinkCallback(startTime)));
         } catch (IOException ex) {
           throw new EventDeliveryException("Could not serialize event", ex);
         }
@@ -197,11 +199,11 @@ public class KafkaSink extends AbstractSink implements Configurable {
 
       // publish batch and commit.
       if (processedEvents > 0) {
-          for (Future<RecordMetadata> future : kafkaFutures) {
-            future.get();
-          }
+        for (Future<RecordMetadata> future : kafkaFutures) {
+          future.get();
+        }
         long endTime = System.nanoTime();
-        counter.addToKafkaEventSendTimer((endTime-batchStartTime)/(1000*1000));
+        counter.addToKafkaEventSendTimer((endTime - batchStartTime) / (1000 * 1000));
         counter.addToEventDrainSuccessCount(Long.valueOf(kafkaFutures.size()));
       }
 
@@ -270,8 +272,7 @@ public class KafkaSink extends AbstractSink implements Configurable {
     if (topicStr == null || topicStr.isEmpty()) {
       topicStr = DEFAULT_TOPIC;
       logger.warn("Topic was not specified. Using {} as the topic.", topicStr);
-    }
-    else {
+    } else {
       logger.info("Using the static topic {}. This may be overridden by event headers", topicStr);
     }
 
@@ -283,7 +284,8 @@ public class KafkaSink extends AbstractSink implements Configurable {
       logger.debug("Using batch size: {}", batchSize);
     }
 
-    useAvroEventFormat = context.getBoolean(KafkaSinkConstants.AVRO_EVENT, KafkaSinkConstants.DEFAULT_AVRO_EVENT);
+    useAvroEventFormat = context.getBoolean(KafkaSinkConstants.AVRO_EVENT,
+                                            KafkaSinkConstants.DEFAULT_AVRO_EVENT);
 
     if (logger.isDebugEnabled()) {
       logger.debug(KafkaSinkConstants.AVRO_EVENT + " set to: {}", useAvroEventFormat);
@@ -322,7 +324,8 @@ public class KafkaSink extends AbstractSink implements Configurable {
         throw new ConfigurationException("Bootstrap Servers must be specified");
       } else {
         ctx.put(BOOTSTRAP_SERVERS_CONFIG, brokerList);
-        logger.warn("{} is deprecated. Please use the parameter {}", BROKER_LIST_FLUME_KEY, BOOTSTRAP_SERVERS_CONFIG);
+        logger.warn("{} is deprecated. Please use the parameter {}",
+                    BROKER_LIST_FLUME_KEY, BOOTSTRAP_SERVERS_CONFIG);
       }
     }
 
@@ -348,21 +351,18 @@ public class KafkaSink extends AbstractSink implements Configurable {
 
     if (ctx.containsKey(KEY_SERIALIZER_KEY )) {
       logger.warn("{} is deprecated. Flume now uses the latest Kafka producer which implements " +
-              "a different interface for serializers. Please use the parameter {}",
-              KEY_SERIALIZER_KEY,KAFKA_PRODUCER_PREFIX + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
+          "a different interface for serializers. Please use the parameter {}",
+          KEY_SERIALIZER_KEY,KAFKA_PRODUCER_PREFIX + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
     }
 
     if (ctx.containsKey(MESSAGE_SERIALIZER_KEY)) {
       logger.warn("{} is deprecated. Flume now uses the latest Kafka producer which implements " +
-                      "a different interface for serializers. Please use the parameter {}",
-              MESSAGE_SERIALIZER_KEY,KAFKA_PRODUCER_PREFIX + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
+                  "a different interface for serializers. Please use the parameter {}",
+                  MESSAGE_SERIALIZER_KEY,
+                  KAFKA_PRODUCER_PREFIX + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
     }
-
-
-
-
-
   }
+
   private void setProducerProps(Context context, String bootStrapServers) {
     kafkaProps.put(ProducerConfig.ACKS_CONFIG, DEFAULT_ACKS);
     //Defaults overridden based on config
@@ -387,7 +387,8 @@ public class KafkaSink extends AbstractSink implements Configurable {
         writer = Optional.of(new SpecificDatumWriter<AvroFlumeEvent>(AvroFlumeEvent.class));
       }
       tempOutStream.get().reset();
-      AvroFlumeEvent e = new AvroFlumeEvent(toCharSeqMap(event.getHeaders()), ByteBuffer.wrap(event.getBody()));
+      AvroFlumeEvent e = new AvroFlumeEvent(toCharSeqMap(event.getHeaders()),
+                                            ByteBuffer.wrap(event.getBody()));
       encoder = EncoderFactory.get().directBinaryEncoder(tempOutStream.get(), encoder);
       writer.get().write(e, encoder);
       encoder.flush();

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
index 6b64bc1..1bf380c 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
@@ -29,7 +29,8 @@ public class KafkaSinkConstants {
 
   public static final String TOPIC_CONFIG = KAFKA_PREFIX + "topic";
   public static final String BATCH_SIZE = "flumeBatchSize";
-  public static final String BOOTSTRAP_SERVERS_CONFIG = KAFKA_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+  public static final String BOOTSTRAP_SERVERS_CONFIG =
+      KAFKA_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
 
   public static final String KEY_HEADER = "key";
   public static final String TOPIC_HEADER = "topic";
@@ -37,25 +38,23 @@ public class KafkaSinkConstants {
   public static final String AVRO_EVENT = "useFlumeEventFormat";
   public static final boolean DEFAULT_AVRO_EVENT = false;
 
-  public static final String DEFAULT_KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
-  public static final String DEFAULT_VALUE_SERIAIZER = "org.apache.kafka.common.serialization.ByteArraySerializer";
+  public static final String DEFAULT_KEY_SERIALIZER =
+      "org.apache.kafka.common.serialization.StringSerializer";
+  public static final String DEFAULT_VALUE_SERIAIZER =
+      "org.apache.kafka.common.serialization.ByteArraySerializer";
 
   public static final int DEFAULT_BATCH_SIZE = 100;
   public static final String DEFAULT_TOPIC = "default-flume-topic";
   public static final String DEFAULT_ACKS = "1";
 
-
   /* Old Properties */
 
-   /* Properties */
+  /* Properties */
 
   public static final String OLD_BATCH_SIZE = "batchSize";
   public static final String MESSAGE_SERIALIZER_KEY = "serializer.class";
   public static final String KEY_SERIALIZER_KEY = "key.serializer.class";
   public static final String BROKER_LIST_FLUME_KEY = "brokerList";
   public static final String REQUIRED_ACKS_FLUME_KEY = "requiredAcks";
-
-
-
 }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/BlobDeserializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/BlobDeserializer.java b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/BlobDeserializer.java
index 12bdc40..095f889 100644
--- a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/BlobDeserializer.java
+++ b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/BlobDeserializer.java
@@ -84,7 +84,8 @@ public class BlobDeserializer implements EventDeserializer {
       blob.write(buf, 0, n);
       blobLength += n;
       if (blobLength >= maxBlobLength) {
-        LOGGER.warn("File length exceeds maxBlobLength ({}), truncating BLOB event!", maxBlobLength);
+        LOGGER.warn("File length exceeds maxBlobLength ({}), truncating BLOB event!",
+                    maxBlobLength);
         break;
       }
     }

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/BlobHandler.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/BlobHandler.java b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/BlobHandler.java
index e84dec1..ca7614a 100644
--- a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/BlobHandler.java
+++ b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/BlobHandler.java
@@ -87,7 +87,8 @@ public class BlobHandler implements HTTPSourceHandler {
         blob.write(buf, 0, n);
         blobLength += n;
         if (blobLength >= maxBlobLength) {
-          LOGGER.warn("Request length exceeds maxBlobLength ({}), truncating BLOB event!", maxBlobLength);
+          LOGGER.warn("Request length exceeds maxBlobLength ({}), truncating BLOB event!",
+                      maxBlobLength);
           break;
         }
       }

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineHandlerImpl.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineHandlerImpl.java b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineHandlerImpl.java
index d3154af..d877814 100644
--- a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineHandlerImpl.java
+++ b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineHandlerImpl.java
@@ -97,8 +97,10 @@ public class MorphlineHandlerImpl implements MorphlineHandler {
         .build();
     }
     
-    Config override = ConfigFactory.parseMap(context.getSubProperties(MORPHLINE_VARIABLE_PARAM + "."));
-    morphline = new Compiler().compile(new File(morphlineFile), morphlineId, morphlineContext, finalChild, override);      
+    Config override = ConfigFactory.parseMap(
+        context.getSubProperties(MORPHLINE_VARIABLE_PARAM + "."));
+    morphline = new Compiler().compile(
+        new File(morphlineFile), morphlineId, morphlineContext, finalChild, override);
     
     this.mappingTimer = morphlineContext.getMetricRegistry().timer(
         MetricRegistry.name("morphline.app", Metrics.ELAPSED_TIME));

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineInterceptor.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineInterceptor.java b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineInterceptor.java
index ef8f716..3b94133 100644
--- a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineInterceptor.java
+++ b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineInterceptor.java
@@ -47,12 +47,13 @@ import com.google.common.io.ByteStreams;
 public class MorphlineInterceptor implements Interceptor {
 
   private final Context context;
-  private final Queue<LocalMorphlineInterceptor> pool = new ConcurrentLinkedQueue<LocalMorphlineInterceptor>();
+  private final Queue<LocalMorphlineInterceptor> pool = new ConcurrentLinkedQueue<>();
   
   protected MorphlineInterceptor(Context context) {
     Preconditions.checkNotNull(context);
     this.context = context;
-    returnToPool(new LocalMorphlineInterceptor(context)); // fail fast on morphline compilation exception
+    // fail fast on morphline compilation exception
+    returnToPool(new LocalMorphlineInterceptor(context));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSink.java b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSink.java
index 9c4dc25..f7a73f3 100644
--- a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSink.java
+++ b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSink.java
@@ -160,15 +160,15 @@ public class MorphlineSink extends AbstractSink implements Configurable {
       return numEventsTaken == 0 ? Status.BACKOFF : Status.READY;
     } catch (Throwable t) {
       // Ooops - need to rollback and back off
-      LOGGER.error("Morphline Sink " + getName() + ": Unable to process event from channel " + myChannel.getName()
-            + ". Exception follows.", t);
+      LOGGER.error("Morphline Sink " + getName() + ": Unable to process event from channel " +
+          myChannel.getName() + ". Exception follows.", t);
       try {
         if (!isMorphlineTransactionCommitted) {
           handler.rollbackTransaction();
         }
       } catch (Throwable t2) {
-        LOGGER.error("Morphline Sink " + getName() + ": Unable to rollback morphline transaction. " +
-        		"Exception follows.", t2);
+        LOGGER.error("Morphline Sink " + getName() +
+            ": Unable to rollback morphline transaction. Exception follows.", t2);
       } finally {
         try {
           txn.rollback();

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/DefaultJMSMessageConverter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/DefaultJMSMessageConverter.java b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/DefaultJMSMessageConverter.java
index 6b327ce..acb5118 100644
--- a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/DefaultJMSMessageConverter.java
+++ b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/DefaultJMSMessageConverter.java
@@ -57,15 +57,17 @@ import org.apache.flume.event.SimpleEvent;
 public class DefaultJMSMessageConverter implements JMSMessageConverter {
 
   private final Charset charset;
+
   private DefaultJMSMessageConverter(String charset) {
     this.charset = Charset.forName(charset);
   }
+
   public static class Builder implements JMSMessageConverter.Builder {
     @Override
     public JMSMessageConverter build(Context context) {
-      return new DefaultJMSMessageConverter(context.
-          getString(JMSSourceConfiguration.CONVERTER_CHARSET,
-              JMSSourceConfiguration.CONVERTER_CHARSET_DEFAULT).trim());
+      return new DefaultJMSMessageConverter(context.getString(
+          JMSSourceConfiguration.CONVERTER_CHARSET,
+          JMSSourceConfiguration.CONVERTER_CHARSET_DEFAULT).trim());
     }
   }
 
@@ -75,52 +77,52 @@ public class DefaultJMSMessageConverter implements JMSMessageConverter {
     Map<String, String> headers = event.getHeaders();
     @SuppressWarnings("rawtypes")
     Enumeration propertyNames = message.getPropertyNames();
-    while(propertyNames.hasMoreElements()) {
+    while (propertyNames.hasMoreElements()) {
       String name = propertyNames.nextElement().toString();
       String value = message.getStringProperty(name);
       headers.put(name, value);
     }
-    if(message instanceof BytesMessage) {
+    if (message instanceof BytesMessage) {
       BytesMessage bytesMessage = (BytesMessage)message;
       long length = bytesMessage.getBodyLength();
-      if(length > 0L) {
+      if (length > 0L) {
         if (length > Integer.MAX_VALUE) {
           throw new JMSException("Unable to process message " + "of size "
               + length);
         }
         byte[] body = new byte[(int)length];
         int count = bytesMessage.readBytes(body);
-        if(count != length) {
+        if (count != length) {
           throw new JMSException("Unable to read full message. " +
               "Read " + count + " of total " + length);
         }
         event.setBody(body);
       }
-    } else if(message instanceof TextMessage) {
+    } else if (message instanceof TextMessage) {
       TextMessage textMessage = (TextMessage)message;
       event.setBody(textMessage.getText().getBytes(charset));
-    } else if(message instanceof ObjectMessage) {
+    } else if (message instanceof ObjectMessage) {
       ObjectMessage objectMessage = (ObjectMessage)message;
       Object object = objectMessage.getObject();
-      if(object != null) {
+      if (object != null) {
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         ObjectOutput out = null;
         try {
           out = new ObjectOutputStream(bos);
           out.writeObject(object);
           event.setBody(bos.toByteArray());
-        } catch(IOException e) {
+        } catch (IOException e) {
           throw new FlumeException("Error serializing object", e);
         } finally {
           try {
-            if(out != null) {
+            if (out != null) {
               out.close();
             }
           } catch (IOException e) {
             throw new FlumeException("Error closing ObjectOutputStream", e);
           }
           try {
-            if(bos != null) {
+            if (bos != null) {
               bos.close();
             }
           } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/InitialContextFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/InitialContextFactory.java b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/InitialContextFactory.java
index 2f0220a..8874dd1 100644
--- a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/InitialContextFactory.java
+++ b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/InitialContextFactory.java
@@ -22,7 +22,6 @@ import java.util.Properties;
 import javax.naming.InitialContext;
 import javax.naming.NamingException;
 
-
 public class InitialContextFactory {
 
   public InitialContext create(Properties properties) throws NamingException {

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java
index 7a9461b..6b3a1cf 100644
--- a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java
+++ b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java
@@ -18,8 +18,12 @@
  */
 package org.apache.flume.source.jms;
 
-import java.util.ArrayList;
-import java.util.List;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import org.apache.flume.Event;
+import org.apache.flume.FlumeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
@@ -30,14 +34,8 @@ import javax.jms.MessageConsumer;
 import javax.jms.Session;
 import javax.naming.InitialContext;
 import javax.naming.NamingException;
-
-import org.apache.flume.Event;
-import org.apache.flume.FlumeException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.List;
 
 class JMSMessageConsumer {
   private static final Logger logger = LoggerFactory
@@ -52,11 +50,11 @@ class JMSMessageConsumer {
   private final Destination destination;
   private final MessageConsumer messageConsumer;
 
-  JMSMessageConsumer(InitialContext initialContext, ConnectionFactory connectionFactory, String destinationName,
-    JMSDestinationLocator destinationLocator, JMSDestinationType destinationType,
-    String messageSelector, int batchSize, long pollTimeout,
-    JMSMessageConverter messageConverter,
-    Optional<String> userName, Optional<String> password) {
+  JMSMessageConsumer(InitialContext initialContext, ConnectionFactory connectionFactory,
+                     String destinationName, JMSDestinationLocator destinationLocator,
+                     JMSDestinationType destinationType, String messageSelector, int batchSize,
+                     long pollTimeout, JMSMessageConverter messageConverter,
+                     Optional<String> userName, Optional<String> password) {
     this.batchSize = batchSize;
     this.pollTimeout = pollTimeout;
     this.messageConverter = messageConverter;
@@ -65,7 +63,7 @@ class JMSMessageConsumer {
     Preconditions.checkArgument(pollTimeout >= 0, "Poll timeout cannot be " +
         "negative");
     try {
-      if(userName.isPresent()) {
+      if (userName.isPresent()) {
         connection = connectionFactory.createConnection(userName.get(),
             password.get());
       } else {
@@ -82,37 +80,37 @@ class JMSMessageConsumer {
       throw new FlumeException("Could not create session", e);
     }
 
-  try {
-    if (destinationLocator.equals(JMSDestinationLocator.CDI)) {
-      switch (destinationType) {
-        case QUEUE:
-          destination = session.createQueue(destinationName);
-          break;
-        case TOPIC:
-          destination = session.createTopic(destinationName);
-          break;
-        default:
-          throw new IllegalStateException(String.valueOf(destinationType));
+    try {
+      if (destinationLocator.equals(JMSDestinationLocator.CDI)) {
+        switch (destinationType) {
+          case QUEUE:
+            destination = session.createQueue(destinationName);
+            break;
+          case TOPIC:
+            destination = session.createTopic(destinationName);
+            break;
+          default:
+            throw new IllegalStateException(String.valueOf(destinationType));
+        }
+      } else {
+        destination = (Destination) initialContext.lookup(destinationName);
       }
-    } else {
-      destination = (Destination) initialContext.lookup(destinationName);
+    } catch (JMSException e) {
+      throw new FlumeException("Could not create destination " + destinationName, e);
+    } catch (NamingException e) {
+      throw new FlumeException("Could not find destination " + destinationName, e);
     }
-  } catch (JMSException e) {
-    throw new FlumeException("Could not create destination " + destinationName, e);
-  } catch (NamingException e) {
-    throw new FlumeException("Could not find destination " + destinationName, e);
-  }
 
-  try {
+    try {
       messageConsumer = session.createConsumer(destination,
-          messageSelector.isEmpty() ? null: messageSelector);
+          messageSelector.isEmpty() ? null : messageSelector);
     } catch (JMSException e) {
       throw new FlumeException("Could not create consumer", e);
     }
     String startupMsg = String.format("Connected to '%s' of type '%s' with " +
-        "user '%s', batch size '%d', selector '%s' ", destinationName,
+            "user '%s', batch size '%d', selector '%s' ", destinationName,
         destinationType, userName.isPresent() ? userName.get() : "null",
-            batchSize, messageSelector.isEmpty() ? null : messageSelector);
+        batchSize, messageSelector.isEmpty() ? null : messageSelector);
     logger.info(startupMsg);
   }
 
@@ -120,23 +118,23 @@ class JMSMessageConsumer {
     List<Event> result = new ArrayList<Event>(batchSize);
     Message message;
     message = messageConsumer.receive(pollTimeout);
-    if(message != null) {
+    if (message != null) {
       result.addAll(messageConverter.convert(message));
       int max = batchSize - 1;
       for (int i = 0; i < max; i++) {
         message = messageConsumer.receiveNoWait();
-        if(message == null) {
+        if (message == null) {
           break;
         }
         result.addAll(messageConverter.convert(message));
       }
     }
-    if(logger.isDebugEnabled()) {
-      logger.debug(String.format("Took batch of %s from %s", result.size(),
-          destination));
+    if (logger.isDebugEnabled()) {
+      logger.debug(String.format("Took batch of %s from %s", result.size(), destination));
     }
     return result;
   }
+
   void commit() {
     try {
       session.commit();

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumerFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumerFactory.java b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumerFactory.java
index af74bf4..9747a31 100644
--- a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumerFactory.java
+++ b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumerFactory.java
@@ -22,15 +22,15 @@ import javax.naming.InitialContext;
 
 import com.google.common.base.Optional;
 
-
 public class JMSMessageConsumerFactory {
 
   JMSMessageConsumer create(InitialContext initialContext, ConnectionFactory connectionFactory,
-    String destinationName, JMSDestinationType destinationType, JMSDestinationLocator destinationLocator,
-    String messageSelector, int batchSize, long pollTimeout, JMSMessageConverter messageConverter,
-    Optional<String> userName, Optional<String> password) {
+      String destinationName, JMSDestinationType destinationType,
+      JMSDestinationLocator destinationLocator, String messageSelector, int batchSize,
+      long pollTimeout, JMSMessageConverter messageConverter,
+      Optional<String> userName, Optional<String> password) {
     return new JMSMessageConsumer(initialContext, connectionFactory, destinationName,
-      destinationLocator, destinationType, messageSelector, batchSize, pollTimeout,
+        destinationLocator, destinationType, messageSelector, batchSize, pollTimeout,
         messageConverter, userName, password);
   }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java
index c1cc9cf..7631827 100644
--- a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java
+++ b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java
@@ -76,40 +76,39 @@ public class JMSSource extends AbstractPollableSource {
   private int jmsExceptionCounter;
   private InitialContext initialContext;
 
-
   public JMSSource() {
     this(new JMSMessageConsumerFactory(), new InitialContextFactory());
   }
+
   @VisibleForTesting
-  public JMSSource(JMSMessageConsumerFactory consumerFactory, InitialContextFactory initialContextFactory) {
+  public JMSSource(JMSMessageConsumerFactory consumerFactory,
+                   InitialContextFactory initialContextFactory) {
     super();
     this.consumerFactory = consumerFactory;
     this.initialContextFactory = initialContextFactory;
-
   }
 
   @Override
   protected void doConfigure(Context context) throws FlumeException {
     sourceCounter = new SourceCounter(getName());
 
-    initialContextFactoryName = context.getString(JMSSourceConfiguration.
-        INITIAL_CONTEXT_FACTORY, "").trim();
+    initialContextFactoryName = context.getString(
+        JMSSourceConfiguration.INITIAL_CONTEXT_FACTORY, "").trim();
 
-    providerUrl = context.getString(JMSSourceConfiguration.PROVIDER_URL, "")
-        .trim();
+    providerUrl = context.getString(JMSSourceConfiguration.PROVIDER_URL, "").trim();
 
-    destinationName = context.getString(JMSSourceConfiguration.
-        DESTINATION_NAME, "").trim();
+    destinationName = context.getString(JMSSourceConfiguration.DESTINATION_NAME, "").trim();
 
-    String destinationTypeName = context.getString(JMSSourceConfiguration.
-        DESTINATION_TYPE, "").trim().toUpperCase(Locale.ENGLISH);
+    String destinationTypeName = context.getString(
+        JMSSourceConfiguration.DESTINATION_TYPE, "").trim().toUpperCase(Locale.ENGLISH);
 
-    String destinationLocatorName = context.getString(JMSSourceConfiguration.
-        DESTINATION_LOCATOR, JMSSourceConfiguration.DESTINATION_LOCATOR_DEFAULT)
-      .trim().toUpperCase(Locale.ENGLISH);
+    String destinationLocatorName = context.getString(
+        JMSSourceConfiguration.DESTINATION_LOCATOR,
+        JMSSourceConfiguration.DESTINATION_LOCATOR_DEFAULT)
+        .trim().toUpperCase(Locale.ENGLISH);
 
-    messageSelector = context.getString(JMSSourceConfiguration.
-        MESSAGE_SELECTOR, "").trim();
+    messageSelector = context.getString(
+        JMSSourceConfiguration.MESSAGE_SELECTOR, "").trim();
 
     batchSize = context.getInteger(JMSSourceConfiguration.BATCH_SIZE,
         JMSSourceConfiguration.BATCH_SIZE_DEFAULT);
@@ -117,16 +116,14 @@ public class JMSSource extends AbstractPollableSource {
     errorThreshold = context.getInteger(JMSSourceConfiguration.ERROR_THRESHOLD,
         JMSSourceConfiguration.ERROR_THRESHOLD_DEFAULT);
 
-    userName = Optional.fromNullable(context.getString(JMSSourceConfiguration.
-        USERNAME));
+    userName = Optional.fromNullable(context.getString(JMSSourceConfiguration.USERNAME));
 
     pollTimeout = context.getLong(JMSSourceConfiguration.POLL_TIMEOUT,
         JMSSourceConfiguration.POLL_TIMEOUT_DEFAULT);
 
-    String passwordFile = context.getString(JMSSourceConfiguration.
-        PASSWORD_FILE, "").trim();
+    String passwordFile = context.getString(JMSSourceConfiguration.PASSWORD_FILE, "").trim();
 
-    if(passwordFile.isEmpty()) {
+    if (passwordFile.isEmpty()) {
       password = Optional.of("");
     } else {
       try {
@@ -140,45 +137,38 @@ public class JMSSource extends AbstractPollableSource {
 
     String converterClassName = context.getString(
         JMSSourceConfiguration.CONVERTER_TYPE,
-        JMSSourceConfiguration.CONVERTER_TYPE_DEFAULT)
-        .trim();
-    if(JMSSourceConfiguration.CONVERTER_TYPE_DEFAULT.
-        equalsIgnoreCase(converterClassName)) {
+        JMSSourceConfiguration.CONVERTER_TYPE_DEFAULT).trim();
+    if (JMSSourceConfiguration.CONVERTER_TYPE_DEFAULT.equalsIgnoreCase(converterClassName)) {
       converterClassName = DefaultJMSMessageConverter.Builder.class.getName();
     }
-    Context converterContext = new Context(context.
-        getSubProperties(JMSSourceConfiguration.CONVERTER + "."));
+    Context converterContext = new Context(context.getSubProperties(
+        JMSSourceConfiguration.CONVERTER + "."));
     try {
       @SuppressWarnings("rawtypes")
       Class clazz = Class.forName(converterClassName);
       boolean isBuilder = JMSMessageConverter.Builder.class
           .isAssignableFrom(clazz);
-      if(isBuilder) {
-        JMSMessageConverter.Builder builder = (JMSMessageConverter.Builder)
-            clazz.newInstance();
+      if (isBuilder) {
+        JMSMessageConverter.Builder builder = (JMSMessageConverter.Builder)clazz.newInstance();
         converter = builder.build(converterContext);
       } else {
-        Preconditions.checkState(JMSMessageConverter.class.
-            isAssignableFrom(clazz), String.
-            format("Class %s is not a subclass of JMSMessageConverter",
-                clazz.getName()));
+        Preconditions.checkState(JMSMessageConverter.class.isAssignableFrom(clazz),
+            String.format("Class %s is not a subclass of JMSMessageConverter", clazz.getName()));
         converter = (JMSMessageConverter)clazz.newInstance();
-        boolean configured = Configurables.configure(converter,
-            converterContext);
-        if(logger.isDebugEnabled()) {
-          logger.debug(String.
-              format("Attempted configuration of %s, result = %s",
-                  converterClassName, String.valueOf(configured)));
+        boolean configured = Configurables.configure(converter, converterContext);
+        if (logger.isDebugEnabled()) {
+          logger.debug(String.format("Attempted configuration of %s, result = %s",
+                                     converterClassName, String.valueOf(configured)));
         }
       }
-    } catch(Exception e) {
+    } catch (Exception e) {
       throw new FlumeException(String.format(
           "Unable to create instance of converter %s", converterClassName), e);
     }
 
-    String connectionFactoryName = context.getString(JMSSourceConfiguration.
-        CONNECTION_FACTORY, JMSSourceConfiguration.CONNECTION_FACTORY_DEFAULT)
-        .trim();
+    String connectionFactoryName = context.getString(
+        JMSSourceConfiguration.CONNECTION_FACTORY,
+        JMSSourceConfiguration.CONNECTION_FACTORY_DEFAULT).trim();
 
     assertNotEmpty(initialContextFactoryName, String.format(
         "Initial Context Factory is empty. This is specified by %s",
@@ -210,8 +200,7 @@ public class JMSSource extends AbstractPollableSource {
           "invalid.", destinationLocatorName), e);
     }
 
-    Preconditions.checkArgument(batchSize > 0, "Batch size must be greater " +
-        "than 0");
+    Preconditions.checkArgument(batchSize > 0, "Batch size must be greater than 0");
 
     try {
       Properties contextProperties = new Properties();
@@ -223,12 +212,12 @@ public class JMSSource extends AbstractPollableSource {
 
       // Provide properties for connecting via JNDI
       if (this.userName.isPresent()) {
-        contextProperties.setProperty(
-    	    javax.naming.Context.SECURITY_PRINCIPAL, this.userName.get());
+        contextProperties.setProperty(javax.naming.Context.SECURITY_PRINCIPAL,
+                                      this.userName.get());
       }
       if (this.password.isPresent()) {
-        contextProperties.setProperty(
-    	    javax.naming.Context.SECURITY_CREDENTIALS, this.password.get());
+        contextProperties.setProperty(javax.naming.Context.SECURITY_CREDENTIALS,
+                                      this.password.get());
       }
 
       initialContext = initialContextFactory.create(contextProperties);
@@ -239,28 +228,26 @@ public class JMSSource extends AbstractPollableSource {
     }
 
     try {
-      connectionFactory = (ConnectionFactory) initialContext.
-          lookup(connectionFactoryName);
+      connectionFactory = (ConnectionFactory) initialContext.lookup(connectionFactoryName);
     } catch (NamingException e) {
       throw new FlumeException("Could not lookup ConnectionFactory", e);
     }
   }
 
   private void assertNotEmpty(String arg, String msg) {
-    Preconditions.checkArgument(!arg.isEmpty(),
-        msg);
+    Preconditions.checkArgument(!arg.isEmpty(), msg);
   }
 
   @Override
   protected synchronized Status doProcess() throws EventDeliveryException {
     boolean error = true;
     try {
-      if(consumer == null) {
+      if (consumer == null) {
         consumer = createConsumer();
       }
       List<Event> events = consumer.take();
       int size = events.size();
-      if(size == 0) {
+      if (size == 0) {
         error = false;
         return Status.BACKOFF;
       }
@@ -275,28 +262,28 @@ public class JMSSource extends AbstractPollableSource {
       logger.warn("Error appending event to channel. "
           + "Channel might be full. Consider increasing the channel "
           + "capacity or make sure the sinks perform faster.", channelException);
-    } catch(JMSException jmsException) {
+    } catch (JMSException jmsException) {
       logger.warn("JMSException consuming events", jmsException);
-      if(++jmsExceptionCounter > errorThreshold) {
-        if(consumer != null) {
+      if (++jmsExceptionCounter > errorThreshold) {
+        if (consumer != null) {
           logger.warn("Exceeded JMSException threshold, closing consumer");
           consumer.rollback();
           consumer.close();
           consumer = null;
         }
       }
-    } catch(Throwable throwable) {
+    } catch (Throwable throwable) {
       logger.error("Unexpected error processing events", throwable);
-      if(throwable instanceof Error) {
+      if (throwable instanceof Error) {
         throw (Error) throwable;
       }
     } finally {
-      if(error) {
-        if(consumer != null) {
+      if (error) {
+        if (consumer != null) {
           consumer.rollback();
         }
       } else {
-        if(consumer != null) {
+        if (consumer != null) {
           consumer.commit();
           jmsExceptionCounter = 0;
         }
@@ -304,6 +291,7 @@ public class JMSSource extends AbstractPollableSource {
     }
     return Status.BACKOFF;
   }
+
   @Override
   protected synchronized void doStart() {
     try {
@@ -317,18 +305,18 @@ public class JMSSource extends AbstractPollableSource {
 
   @Override
   protected synchronized void doStop() {
-    if(consumer != null) {
+    if (consumer != null) {
       consumer.close();
       consumer = null;
     }
     sourceCounter.stop();
   }
+
   private JMSMessageConsumer createConsumer() throws JMSException {
     logger.info("Creating new consumer for " + destinationName);
     JMSMessageConsumer consumer = consumerFactory.create(initialContext,
-    connectionFactory, destinationName, destinationType, destinationLocator,
-    messageSelector, batchSize,
-        pollTimeout, converter, userName, password);
+        connectionFactory, destinationName, destinationType, destinationLocator,
+        messageSelector, batchSize, pollTimeout, converter, userName, password);
     jmsExceptionCounter = 0;
     return consumer;
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
index 84fef52..90e4715 100644
--- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
+++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
@@ -112,18 +112,24 @@ public class KafkaSource extends AbstractPollableSource
    */
   public abstract class Subscriber<T> {
     public abstract void subscribe(KafkaConsumer<?, ?> consumer, SourceRebalanceListener listener);
-    public T get() {return null;}
+
+    public T get() {
+      return null;
+    }
   }
 
   private class TopicListSubscriber extends Subscriber<List<String>> {
     private List<String> topicList;
+
     public TopicListSubscriber(String commaSeparatedTopics) {
       this.topicList = Arrays.asList(commaSeparatedTopics.split("^\\s+|\\s*,\\s*|\\s+$"));
     }
+
     @Override
     public void subscribe(KafkaConsumer<?, ?> consumer, SourceRebalanceListener listener) {
       consumer.subscribe(topicList, listener);
     }
+
     @Override
     public List<String> get() {
       return topicList;
@@ -132,13 +138,16 @@ public class KafkaSource extends AbstractPollableSource
 
   private class PatternSubscriber extends Subscriber<Pattern> {
     private Pattern pattern;
+
     public PatternSubscriber(String regex) {
       this.pattern = Pattern.compile(regex);
     }
+
     @Override
     public void subscribe(KafkaConsumer<?, ?> consumer, SourceRebalanceListener listener) {
       consumer.subscribe(pattern, listener);
     }
+
     @Override
     public Pattern get() {
       return pattern;
@@ -232,10 +241,11 @@ public class KafkaSource extends AbstractPollableSource
         }
 
         if (log.isDebugEnabled()) {
-          log.debug("Topic: {} Partition: {} Message: {}", new String[]{
-                  message.topic(),
-                  String.valueOf(message.partition()),
-                  new String(eventBody)});
+          log.debug("Topic: {} Partition: {} Message: {}", new String[] {
+              message.topic(),
+              String.valueOf(message.partition()),
+              new String(eventBody)
+          });
         }
 
         event = EventBuilder.withBody(eventBody, headers);
@@ -305,21 +315,21 @@ public class KafkaSource extends AbstractPollableSource
     if (topicProperty != null && !topicProperty.isEmpty()) {
       // create subscriber that uses pattern-based subscription
       subscriber = new PatternSubscriber(topicProperty);
-    } else
-    if((topicProperty = context.getString(KafkaSourceConstants.TOPICS)) != null && !topicProperty.isEmpty()) {
+    } else if ((topicProperty = context.getString(KafkaSourceConstants.TOPICS)) != null &&
+               !topicProperty.isEmpty()) {
       // create subscriber that uses topic list subscription
       subscriber = new TopicListSubscriber(topicProperty);
-    } else
-    if (subscriber == null) {
+    } else if (subscriber == null) {
       throw new ConfigurationException("At least one Kafka topic must be specified.");
     }
 
     batchUpperLimit = context.getInteger(KafkaSourceConstants.BATCH_SIZE,
-            KafkaSourceConstants.DEFAULT_BATCH_SIZE);
+                                         KafkaSourceConstants.DEFAULT_BATCH_SIZE);
     maxBatchDurationMillis = context.getInteger(KafkaSourceConstants.BATCH_DURATION_MS,
-            KafkaSourceConstants.DEFAULT_BATCH_DURATION);
+                                                KafkaSourceConstants.DEFAULT_BATCH_DURATION);
 
-    useAvroEventFormat = context.getBoolean(KafkaSourceConstants.AVRO_EVENT, KafkaSourceConstants.DEFAULT_AVRO_EVENT);
+    useAvroEventFormat = context.getBoolean(KafkaSourceConstants.AVRO_EVENT,
+                                            KafkaSourceConstants.DEFAULT_AVRO_EVENT);
 
     if (log.isDebugEnabled()) {
       log.debug(KafkaSourceConstants.AVRO_EVENT + " set to: {}", useAvroEventFormat);
@@ -337,7 +347,6 @@ public class KafkaSource extends AbstractPollableSource
     }
   }
 
-
   // We can remove this once the properties are officially deprecated
   private void translateOldProperties(Context ctx) {
     // topic
@@ -358,16 +367,18 @@ public class KafkaSource extends AbstractPollableSource
     }
   }
 
-
   private void setConsumerProps(Context ctx, String bootStrapServers) {
-    String groupId = ctx.getString(KafkaSourceConstants.KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG);
+    String groupId = ctx.getString(
+        KafkaSourceConstants.KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG);
     if ((groupId == null || groupId.isEmpty()) &&
-            kafkaProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG) == null) {
-        groupId = KafkaSourceConstants.DEFAULT_GROUP_ID;
-        log.info("Group ID was not specified. Using " + groupId + " as the group id.");
+        kafkaProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG) == null) {
+      groupId = KafkaSourceConstants.DEFAULT_GROUP_ID;
+      log.info("Group ID was not specified. Using " + groupId + " as the group id.");
     }
-    kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaSourceConstants.DEFAULT_KEY_DESERIALIZER);
-    kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaSourceConstants.DEFAULT_VALUE_DESERIALIZER);
+    kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+                   KafkaSourceConstants.DEFAULT_KEY_DESERIALIZER);
+    kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+                   KafkaSourceConstants.DEFAULT_VALUE_DESERIALIZER);
     //Defaults overridden based on config
     kafkaProps.putAll(ctx.getSubProperties(KafkaSourceConstants.KAFKA_CONSUMER_PREFIX));
     //These always take precedence over config
@@ -375,7 +386,8 @@ public class KafkaSource extends AbstractPollableSource
     if (groupId != null) {
       kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
     }
-    kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, KafkaSourceConstants.DEFAULT_AUTO_COMMIT);
+    kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
+                   KafkaSourceConstants.DEFAULT_AUTO_COMMIT);
 
     log.info(kafkaProps.toString());
   }
@@ -426,7 +438,6 @@ public class KafkaSource extends AbstractPollableSource
   }
 }
 
-
 class SourceRebalanceListener implements ConsumerRebalanceListener {
   private static final Logger log = LoggerFactory.getLogger(SourceRebalanceListener.class);
   private AtomicBoolean rebalanceFlag;

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
index 9f20f61..1f255f9 100644
--- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
+++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
@@ -22,9 +22,12 @@ public class KafkaSourceConstants {
 
   public static final String KAFKA_PREFIX = "kafka.";
   public static final String KAFKA_CONSUMER_PREFIX = KAFKA_PREFIX + "consumer.";
-  public static final String DEFAULT_KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
-  public static final String DEFAULT_VALUE_DESERIALIZER = "org.apache.kafka.common.serialization.ByteArrayDeserializer";
-  public static final String BOOTSTRAP_SERVERS = KAFKA_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+  public static final String DEFAULT_KEY_DESERIALIZER =
+      "org.apache.kafka.common.serialization.StringDeserializer";
+  public static final String DEFAULT_VALUE_DESERIALIZER =
+      "org.apache.kafka.common.serialization.ByteArrayDeserializer";
+  public static final String BOOTSTRAP_SERVERS =
+      KAFKA_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
   public static final String TOPICS = KAFKA_PREFIX + "topics";
   public static final String TOPICS_REGEX = TOPICS + "." + "regex";
   public static final String DEFAULT_AUTO_COMMIT =  "false";

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java
index 8128df4..1409f25 100644
--- a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java
+++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java
@@ -19,6 +19,20 @@
 
 package org.apache.flume.source.taildir;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Table;
+import com.google.gson.stream.JsonReader;
+import org.apache.flume.Event;
+import org.apache.flume.FlumeException;
+import org.apache.flume.annotations.InterfaceAudience;
+import org.apache.flume.annotations.InterfaceStability;
+import org.apache.flume.client.avro.ReliableEventReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.FileReader;
@@ -29,21 +43,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 
-import org.apache.flume.Event;
-import org.apache.flume.FlumeException;
-import org.apache.flume.annotations.InterfaceAudience;
-import org.apache.flume.annotations.InterfaceStability;
-import org.apache.flume.client.avro.ReliableEventReader;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Table;
-import com.google.gson.stream.JsonReader;
-
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public class ReliableTaildirEventReader implements ReliableEventReader {
@@ -111,15 +110,15 @@ public class ReliableTaildirEventReader implements ReliableEventReader {
         jr.beginObject();
         while (jr.hasNext()) {
           switch (jr.nextName()) {
-          case "inode":
-            inode = jr.nextLong();
-            break;
-          case "pos":
-            pos = jr.nextLong();
-            break;
-          case "file":
-            path = jr.nextString();
-            break;
+            case "inode":
+              inode = jr.nextLong();
+              break;
+            case "pos":
+              pos = jr.nextLong();
+              break;
+            case "file":
+              path = jr.nextString();
+              break;
           }
         }
         jr.endObject();
@@ -238,7 +237,7 @@ public class ReliableTaildirEventReader implements ReliableEventReader {
         if (tf == null || !tf.getPath().equals(f.getAbsolutePath())) {
           long startPos = skipToEnd ? f.length() : 0;
           tf = openFile(f, headers, inode, startPos);
-        } else{
+        } else {
           boolean updated = tf.getLastUpdated() < f.lastModified();
           if (updated) {
             if (tf.getRaf() == null) {
@@ -320,7 +319,8 @@ public class ReliableTaildirEventReader implements ReliableEventReader {
     }
 
     public ReliableTaildirEventReader build() throws IOException {
-      return new ReliableTaildirEventReader(filePaths, headerTable, positionFilePath, skipToEnd, addByteOffset, cachePatternMatching);
+      return new ReliableTaildirEventReader(filePaths, headerTable, positionFilePath, skipToEnd,
+                                            addByteOffset, cachePatternMatching);
     }
   }