You are viewing a plain-text version of this content. The canonical (HTML) link for it was provided in the original message but is not preserved in this plain-text rendering.
Posted to commits@flume.apache.org by mp...@apache.org on 2016/06/30 02:21:34 UTC

[08/11] flume git commit: FLUME-2937. Integrate checkstyle for non-test classes

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/CipherProvider.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/CipherProvider.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/CipherProvider.java
index 2c71bc9..2c516f9 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/CipherProvider.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/CipherProvider.java
@@ -27,44 +27,45 @@ public abstract class CipherProvider {
   public abstract Encryptor.Builder<?> newEncryptorBuilder();
   public abstract Decryptor.Builder<?> newDecryptorBuilder();
 
-  public static abstract class Encryptor {
+  public abstract static class Encryptor {
 
     public abstract byte[] encrypt(byte[] clearText);
     public abstract byte[] getParameters();
     public abstract String getCodec();
 
     /** Builder implementations MUST have a no-arg constructor */
-    public static abstract class Builder<T extends Encryptor> {
+    public abstract static class Builder<T extends Encryptor> {
       protected Key key;
+
       public Builder<T> setKey(Key key) {
         this.key = Preconditions.checkNotNull(key, "key cannot be null");
         return this;
       }
+
       public abstract T build();
     }
-
   }
 
-  public static abstract class Decryptor {
+  public abstract static class Decryptor {
     public abstract byte[] decrypt(byte[] cipherText);
     public abstract String getCodec();
 
     /** Builder implementations MUST have a no-arg constructor */
-    public static abstract class Builder<T extends Decryptor> {
+    public abstract static class Builder<T extends Decryptor> {
       protected byte[] parameters;
       protected Key key;
+
       public Builder<T> setKey(Key key) {
         this.key = Preconditions.checkNotNull(key, "key cannot be null");
         return this;
       }
+
       public Builder<T> setParameters(byte[] parameters) {
         this.parameters = parameters;
         return this;
       }
+
       public abstract T build();
     }
-
   }
-
-
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/CipherProviderFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/CipherProviderFactory.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/CipherProviderFactory.java
index ca11f6b..85b0fbb 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/CipherProviderFactory.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/CipherProviderFactory.java
@@ -33,7 +33,7 @@ public class CipherProviderFactory {
 
   public static CipherProvider.Encryptor getEncrypter(String cipherProviderType,
       Key key) {
-    if(cipherProviderType == null) {
+    if (cipherProviderType == null) {
       return null;
     }
     CipherProvider provider = getProvider(cipherProviderType);
@@ -41,7 +41,7 @@ public class CipherProviderFactory {
   }
   public static CipherProvider.Decryptor getDecrypter(String cipherProviderType,
       Key key, byte[] parameters) {
-    if(cipherProviderType == null) {
+    if (cipherProviderType == null) {
       return null;
     }
     CipherProvider provider = getProvider(cipherProviderType);

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/CipherProviderType.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/CipherProviderType.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/CipherProviderType.java
index 87834d7..73bc720 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/CipherProviderType.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/CipherProviderType.java
@@ -18,7 +18,6 @@
  */
 package org.apache.flume.channel.file.encryption;
 
-
 public enum CipherProviderType {
   AESCTRNOPADDING(AESCTRNoPaddingProvider.class),
   OTHER(null);

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/DecryptionFailureException.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/DecryptionFailureException.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/DecryptionFailureException.java
index 0155c39..beffd9e 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/DecryptionFailureException.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/DecryptionFailureException.java
@@ -27,7 +27,6 @@ import org.apache.flume.FlumeException;
 public class DecryptionFailureException extends FlumeException {
   private static final long serialVersionUID = 6646810195384793646L;
 
-
   public DecryptionFailureException(String msg) {
     super(msg);
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/JCEFileKeyProvider.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/JCEFileKeyProvider.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/JCEFileKeyProvider.java
index f961ef9..c96cf0a 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/JCEFileKeyProvider.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/JCEFileKeyProvider.java
@@ -55,7 +55,7 @@ public class JCEFileKeyProvider extends KeyProvider {
       keyStorePassword = Files.toString(keyStorePasswordFile, Charsets.UTF_8)
           .trim().toCharArray();
       ks.load(new FileInputStream(keyStoreFile), keyStorePassword);
-    } catch(Exception ex) {
+    } catch (Exception ex) {
       throw Throwables.propagate(ex);
     }
   }
@@ -65,14 +65,14 @@ public class JCEFileKeyProvider extends KeyProvider {
     String passwordFile = keyStorePasswordFile.getAbsolutePath();
     try {
       char[] keyPassword = keyStorePassword;
-      if(aliasPasswordFileMap.containsKey(alias)) {
+      if (aliasPasswordFileMap.containsKey(alias)) {
         File keyPasswordFile = aliasPasswordFileMap.get(alias);
         keyPassword = Files.toString(keyPasswordFile,
             Charsets.UTF_8).trim().toCharArray();
         passwordFile = keyPasswordFile.getAbsolutePath();
       }
       Key key = ks.getKey(alias, keyPassword);
-      if(key == null) {
+      if (key == null) {
         throw new IllegalStateException("KeyStore returned null for " + alias);
       }
       return key;
@@ -99,13 +99,13 @@ public class JCEFileKeyProvider extends KeyProvider {
           EncryptionConfiguration.JCE_FILE_KEYS);
       Preconditions.checkState(!Strings.isNullOrEmpty(passwordProtectedKeys),
           "Keys available to KeyStore was not specified or empty");
-      for(String passwordName : passwordProtectedKeys.trim().split("\\s+")) {
+      for (String passwordName : passwordProtectedKeys.trim().split("\\s+")) {
         String propertyName = Joiner.on(".").join(EncryptionConfiguration.JCE_FILE_KEYS,
             passwordName, EncryptionConfiguration.JCE_FILE_KEY_PASSWORD_FILE);
         String passwordFileName = context.getString(propertyName,
             keyStorePasswordFileName);
         File passwordFile = new File(passwordFileName.trim());
-        if(passwordFile.isFile()) {
+        if (passwordFile.isFile()) {
           aliasPasswordFileMap.put(passwordName, passwordFile);
         } else {
           logger.warn("Password file for alias " + passwordName +

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/KeyProviderType.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/KeyProviderType.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/KeyProviderType.java
index 0fef6dc..3263615 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/KeyProviderType.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/KeyProviderType.java
@@ -18,7 +18,6 @@
  */
 package org.apache.flume.channel.file.encryption;
 
-
 public enum KeyProviderType {
   JCEKSFILE(JCEFileKeyProvider.Builder.class),
   OTHER(null);

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/proto/ProtosFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/proto/ProtosFactory.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/proto/ProtosFactory.java
index 34f93d9..50492cc 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/proto/ProtosFactory.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/proto/ProtosFactory.java
@@ -5010,18 +5010,21 @@ public final class ProtosFactory {
   public interface RollbackOrBuilder
       extends com.google.protobuf.MessageOrBuilder {
   }
+
   /**
    * Protobuf type {@code Rollback}
    */
-  public static final class Rollback extends
-      com.google.protobuf.GeneratedMessage
-      implements RollbackOrBuilder {
+  public static final class Rollback extends com.google.protobuf.GeneratedMessage
+                                     implements RollbackOrBuilder {
     // Use Rollback.newBuilder() to construct.
     private Rollback(com.google.protobuf.GeneratedMessage.Builder<?> builder) {
       super(builder);
       this.unknownFields = builder.getUnknownFields();
     }
-    private Rollback(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); }
+
+    private Rollback(boolean noInit) {
+      this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance();
+    }
 
     private static final Rollback defaultInstance;
     public static Rollback getDefaultInstance() {

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/ConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/ConfigurationConstants.java b/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/ConfigurationConstants.java
index 3b97684..7138b41 100644
--- a/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/ConfigurationConstants.java
+++ b/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/ConfigurationConstants.java
@@ -89,7 +89,6 @@ public final class ConfigurationConstants {
   public static final String OLD_CONFIG_CREATE_SCHEMA =
       PREFIX + CONFIG_CREATE_SCHEMA;
 
-
   public static final String CONFIG_CREATE_INDEX = "create.index";
 
   /**

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannel.java b/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannel.java
index 1192452..fba6e7b 100644
--- a/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannel.java
+++ b/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannel.java
@@ -27,6 +27,7 @@ import org.apache.flume.annotations.Disposable;
 import org.apache.flume.channel.AbstractChannel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 /**
  * <p>
  * A JDBC based channel implementation.

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannelProvider.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannelProvider.java b/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannelProvider.java
index e445d61..76bc627 100644
--- a/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannelProvider.java
+++ b/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannelProvider.java
@@ -29,7 +29,7 @@ public interface JdbcChannelProvider {
   /**
    * Initializes the channel provider. This method must be called before
    * the channel can be used in any way.
-   * @param properties the configuration for the system
+   * @param context the configuration for the system
    */
   public void initialize(Context context);
 

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java b/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java
index 2dc3fcc..56eebfd 100644
--- a/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java
+++ b/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java
@@ -127,7 +127,6 @@ public class DerbySchemaHandler implements SchemaHandler {
   private static final Logger LOGGER =
       LoggerFactory.getLogger(DerbySchemaHandler.class);
 
-
   private static final String QUREY_SYSCHEMA_FLUME
       = "SELECT SCHEMAID FROM SYS.SYSSCHEMAS WHERE SCHEMANAME = 'FLUME'";
 
@@ -613,15 +612,15 @@ public class DerbySchemaHandler implements SchemaHandler {
 
       // Persist the payload spill
       if (hasSpillPayload) {
-         spillEventStmt = connection.prepareStatement(STMT_INSERT_EVENT_SPILL);
-         spillEventStmt.setLong(1, eventId);
-         spillEventStmt.setBinaryStream(2,
-             new ByteArrayInputStream(spillPayload), spillPayload.length);
-         int spillEventCount = spillEventStmt.executeUpdate();
-         if (spillEventCount != 1) {
-           throw new JdbcChannelException("Invalid update count on spill "
-               + "event insert: " + spillEventCount);
-         }
+        spillEventStmt = connection.prepareStatement(STMT_INSERT_EVENT_SPILL);
+        spillEventStmt.setLong(1, eventId);
+        spillEventStmt.setBinaryStream(2,
+            new ByteArrayInputStream(spillPayload), spillPayload.length);
+        int spillEventCount = spillEventStmt.executeUpdate();
+        if (spillEventCount != 1) {
+          throw new JdbcChannelException("Invalid update count on spill "
+              + "event insert: " + spillEventCount);
+        }
       }
 
       // Persist the headers
@@ -645,8 +644,7 @@ public class DerbySchemaHandler implements SchemaHandler {
 
           int updateCount = baseHeaderStmt.executeUpdate();
           if (updateCount != 1) {
-             throw new JdbcChannelException("Unexpected update header count: "
-                 + updateCount);
+            throw new JdbcChannelException("Unexpected update header count: " + updateCount);
           }
           ResultSet headerIdResultSet = baseHeaderStmt.getGeneratedKeys();
           if (!headerIdResultSet.next()) {
@@ -705,7 +703,7 @@ public class DerbySchemaHandler implements SchemaHandler {
           headerValueSpillStmt =
               connection.prepareStatement(STMT_INSERT_HEADER_VALUE_SPILL);
 
-          for(HeaderEntry entry : headerWithValueSpill) {
+          for (HeaderEntry entry : headerWithValueSpill) {
             String valueSpill = entry.getValue().getSpill();
 
             headerValueSpillStmt.setLong(1, entry.getId());

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java b/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java
index f42b4dd..845b794 100644
--- a/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java
+++ b/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java
@@ -54,12 +54,10 @@ public class JdbcChannelProviderImpl implements JdbcChannelProvider {
   private static final Logger LOGGER =
       LoggerFactory.getLogger(JdbcChannelProviderImpl.class);
 
-
   private static final String EMBEDDED_DERBY_DRIVER_CLASSNAME
-        = "org.apache.derby.jdbc.EmbeddedDriver";
+      = "org.apache.derby.jdbc.EmbeddedDriver";
 
-  private static final String DEFAULT_DRIVER_CLASSNAME
-         = EMBEDDED_DERBY_DRIVER_CLASSNAME;
+  private static final String DEFAULT_DRIVER_CLASSNAME = EMBEDDED_DERBY_DRIVER_CLASSNAME;
   private static final String DEFAULT_USERNAME = "sa";
   private static final String DEFAULT_PASSWORD = "";
   private static final String DEFAULT_DBTYPE = "DERBY";
@@ -133,7 +131,7 @@ public class JdbcChannelProviderImpl implements JdbcChannelProvider {
 
     for (String key: sysProps.keySet()) {
       String value = sysProps.get(key);
-      if(key != null && value != null) {
+      if (key != null && value != null) {
         System.setProperty(key, value);
       }
     }
@@ -254,7 +252,7 @@ public class JdbcChannelProviderImpl implements JdbcChannelProvider {
         int index = connectUrl.indexOf(";");
         String baseUrl = null;
         if (index != -1) {
-          baseUrl = connectUrl.substring(0, index+1);
+          baseUrl = connectUrl.substring(0, index + 1);
         } else {
           baseUrl = connectUrl + ";";
         }
@@ -440,12 +438,12 @@ public class JdbcChannelProviderImpl implements JdbcChannelProvider {
     databaseType = DatabaseType.getByName(dbTypeName);
 
     switch (databaseType) {
-    case DERBY:
-    case MYSQL:
-      break;
-    default:
-      throw new JdbcChannelException("Database " + databaseType
-          + " not supported at this time");
+      case DERBY:
+      case MYSQL:
+        break;
+      default:
+        throw new JdbcChannelException("Database " + databaseType
+            + " not supported at this time");
     }
 
     // Register driver

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionImpl.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionImpl.java b/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionImpl.java
index 13b14f5..6f3aecd 100644
--- a/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionImpl.java
+++ b/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionImpl.java
@@ -28,7 +28,6 @@ import org.apache.flume.channel.jdbc.JdbcChannelException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 public class JdbcTransactionImpl implements Transaction {
 
   private static final Logger LOGGER =

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandler.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandler.java b/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandler.java
index dba96fc..9bfc227 100644
--- a/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandler.java
+++ b/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandler.java
@@ -26,14 +26,12 @@ import java.sql.Connection;
 public interface SchemaHandler {
 
   /**
-   * @param connection the connection to check for schema.
    * @return true if the schema exists. False otherwise.
    */
   public boolean schemaExists();
 
   /**
    * Validates the schema.
-   * @param connection
    */
   public void validateSchema();
 
@@ -74,8 +72,6 @@ public interface SchemaHandler {
    * must have an active transaction ongoing. This allows the provider impl to
    * enforce channel capacity limits when persisting events.
    * @return the current size of the channel.
-   * @param connection
-   * @return
    */
   public long getChannelSize(Connection connection);
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandlerFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandlerFactory.java b/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandlerFactory.java
index 2543848..35f4c61 100644
--- a/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandlerFactory.java
+++ b/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandlerFactory.java
@@ -27,19 +27,17 @@ import org.apache.flume.channel.jdbc.JdbcChannelException;
  */
 public final class SchemaHandlerFactory {
 
-  public static SchemaHandler getHandler(DatabaseType dbType,
-      DataSource dataSource) {
+  public static SchemaHandler getHandler(DatabaseType dbType, DataSource dataSource) {
     SchemaHandler handler = null;
-    switch(dbType) {
-    case DERBY:
-      handler = new DerbySchemaHandler(dataSource);
-      break;
-    case MYSQL:
-      handler = new MySQLSchemaHandler(dataSource);
-      break;
-    default:
-      throw new JdbcChannelException("Database " + dbType
-          + " not supported yet");
+    switch (dbType) {
+      case DERBY:
+        handler = new DerbySchemaHandler(dataSource);
+        break;
+      case MYSQL:
+        handler = new MySQLSchemaHandler(dataSource);
+        break;
+      default:
+        throw new JdbcChannelException("Database " + dbType + " not supported yet");
     }
 
     return handler;

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
index dfc95bc..90e3288 100644
--- a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
+++ b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
@@ -73,7 +73,7 @@ import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.*;
 
 public class KafkaChannel extends BasicChannelSemantics {
 
-  private final static Logger logger =
+  private static final Logger logger =
           LoggerFactory.getLogger(KafkaChannel.class);
 
   private final Properties consumerProps = new Properties();
@@ -97,27 +97,27 @@ public class KafkaChannel extends BasicChannelSemantics {
 
   private KafkaChannelCounter counter;
 
-   /* Each Consumer commit will commit all partitions owned by it. To
-   * ensure that each partition is only committed when all events are
-   * actually done, we will need to keep a Consumer per thread.
-   */
+  /* Each Consumer commit will commit all partitions owned by it. To
+  * ensure that each partition is only committed when all events are
+  * actually done, we will need to keep a Consumer per thread.
+  */
 
-  private final ThreadLocal<ConsumerAndRecords> consumerAndRecords = new
-          ThreadLocal<ConsumerAndRecords>() {
-            @Override
-            public ConsumerAndRecords initialValue() {
-              return createConsumerAndRecords();
-            }
-          };
+  private final ThreadLocal<ConsumerAndRecords> consumerAndRecords =
+      new ThreadLocal<ConsumerAndRecords>() {
+        @Override
+        public ConsumerAndRecords initialValue() {
+          return createConsumerAndRecords();
+        }
+      };
 
   @Override
   public void start() {
-      logger.info("Starting Kafka Channel: {}", getName());
-      producer = new KafkaProducer<String, byte[]>(producerProps);
-      // We always have just one topic being read by one thread
-      logger.info("Topic = {}", topic.get());
-      counter.start();
-      super.start();
+    logger.info("Starting Kafka Channel: {}", getName());
+    producer = new KafkaProducer<String, byte[]>(producerProps);
+    // We always have just one topic being read by one thread
+    logger.info("Topic = {}", topic.get());
+    counter.start();
+    super.start();
   }
 
   @Override
@@ -185,17 +185,19 @@ public class KafkaChannel extends BasicChannelSemantics {
         throw new ConfigurationException("Bootstrap Servers must be specified");
       } else {
         ctx.put(BOOTSTRAP_SERVERS_CONFIG, brokerList);
-        logger.warn("{} is deprecated. Please use the parameter {}", BROKER_LIST_FLUME_KEY, BOOTSTRAP_SERVERS_CONFIG);
+        logger.warn("{} is deprecated. Please use the parameter {}",
+                    BROKER_LIST_FLUME_KEY, BOOTSTRAP_SERVERS_CONFIG);
       }
     }
 
     //GroupId
     // If there is an old Group Id set, then use that if no groupId is set.
     if (!(ctx.containsKey(KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG))) {
-    String oldGroupId = ctx.getString(GROUP_ID_FLUME);
-      if ( oldGroupId != null  && !oldGroupId.isEmpty()) {
+      String oldGroupId = ctx.getString(GROUP_ID_FLUME);
+      if (oldGroupId != null  && !oldGroupId.isEmpty()) {
         ctx.put(KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG, oldGroupId);
-        logger.warn("{} is deprecated. Please use the parameter {}", GROUP_ID_FLUME, KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG);
+        logger.warn("{} is deprecated. Please use the parameter {}",
+                    GROUP_ID_FLUME, KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG);
       }
     }
 
@@ -209,7 +211,9 @@ public class KafkaChannel extends BasicChannelSemantics {
           auto = "latest";
         }
         ctx.put(KAFKA_CONSUMER_PREFIX + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,auto);
-        logger.warn("{} is deprecated. Please use the parameter {}", READ_SMALLEST_OFFSET,KAFKA_CONSUMER_PREFIX + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
+        logger.warn("{} is deprecated. Please use the parameter {}",
+                    READ_SMALLEST_OFFSET,
+                    KAFKA_CONSUMER_PREFIX + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
       }
 
     }
@@ -249,15 +253,17 @@ public class KafkaChannel extends BasicChannelSemantics {
     logger.info(consumerProps.toString());
   }
 
-  protected Properties getConsumerProps() { return consumerProps; }
-
+  protected Properties getConsumerProps() {
+    return consumerProps;
+  }
 
   private synchronized ConsumerAndRecords createConsumerAndRecords() {
     try {
       KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<String, byte[]>(consumerProps);
       ConsumerAndRecords car = new ConsumerAndRecords(consumer, channelUUID);
       logger.info("Created new consumer to connect to Kafka");
-      car.consumer.subscribe(Arrays.asList(topic.get()), new ChannelRebalanceListener(rebalanceFlag));
+      car.consumer.subscribe(Arrays.asList(topic.get()),
+                             new ChannelRebalanceListener(rebalanceFlag));
       car.offsets = new HashMap<TopicPartition, OffsetAndMetadata>();
       consumers.add(car);
       return car;
@@ -286,14 +292,14 @@ public class KafkaChannel extends BasicChannelSemantics {
     NONE
   }
 
-
   private class KafkaTransaction extends BasicTransactionSemantics {
 
     private TransactionType type = TransactionType.NONE;
     private Optional<ByteArrayOutputStream> tempOutStream = Optional
             .absent();
     // For put transactions, serialize the events and hold them until the commit goes is requested.
-    private Optional<LinkedList<ProducerRecord<String, byte[]>>> producerRecords = Optional.absent();
+    private Optional<LinkedList<ProducerRecord<String, byte[]>>> producerRecords =
+        Optional.absent();
     // For take transactions, deserialize and hold them till commit goes through
     private Optional<LinkedList<Event>> events = Optional.absent();
     private Optional<SpecificDatumWriter<AvroFlumeEvent>> writer =
@@ -323,8 +329,9 @@ public class KafkaChannel extends BasicChannelSemantics {
       }
       String key = event.getHeaders().get(KEY_HEADER);
       try {
-        producerRecords.get().add(new ProducerRecord<String, byte[]>
-                (topic.get(), key, serializeValue(event, parseAsFlumeEvent)));
+        producerRecords.get().add(
+            new ProducerRecord<String, byte[]>(topic.get(), key,
+                                               serializeValue(event, parseAsFlumeEvent)));
       } catch (Exception e) {
         throw new ChannelException("Error while serializing event", e);
       }
@@ -382,7 +389,8 @@ public class KafkaChannel extends BasicChannelSemantics {
             }
 
             if (logger.isDebugEnabled()) {
-              logger.debug("Processed output from partition {} offset {}", record.partition(), record.offset());
+              logger.debug("Processed output from partition {} offset {}",
+                           record.partition(), record.offset());
             }
 
             long endTime = System.nanoTime();
@@ -391,10 +399,10 @@ public class KafkaChannel extends BasicChannelSemantics {
             return null;
           }
         } catch (Exception ex) {
-          logger.warn("Error while getting events from Kafka. This is usually caused by trying to read " +
-                  "a non-flume event. Ensure the setting for parseAsFlumeEvent is correct", ex);
-          throw new ChannelException("Error while getting events from Kafka",
-                  ex);
+          logger.warn("Error while getting events from Kafka. This is usually caused by " +
+                      "trying to read a non-flume event. Ensure the setting for " +
+                      "parseAsFlumeEvent is correct", ex);
+          throw new ChannelException("Error while getting events from Kafka", ex);
         }
       }
       eventTaken = true;
@@ -564,8 +572,9 @@ public class KafkaChannel extends BasicChannelSemantics {
       StringBuilder sb = new StringBuilder();
       for (TopicPartition tp : this.consumer.assignment()) {
         try {
-          sb.append("Committed: [").append(tp).append(",").append(this.consumer.committed(tp).offset())
-                  .append(",").append(this.consumer.committed(tp).metadata()).append("]");
+          sb.append("Committed: [").append(tp).append(",")
+              .append(this.consumer.committed(tp).offset())
+              .append(",").append(this.consumer.committed(tp).metadata()).append("]");
           if (logger.isDebugEnabled()) {
             logger.debug(sb.toString());
           }
@@ -596,8 +605,8 @@ class ChannelCallback implements Callback {
     }
     if (log.isDebugEnabled()) {
       long batchElapsedTime = System.currentTimeMillis() - startTime;
-      log.debug("Acked message_no " + index + ": " + metadata.topic() + "-" + metadata.partition() + "-" +
-              metadata.offset() + "-" + batchElapsedTime);
+      log.debug("Acked message_no " + index + ": " + metadata.topic() + "-" +
+                metadata.partition() + "-" + metadata.offset() + "-" + batchElapsedTime);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java
index faf46b6..ccf46d9 100644
--- a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java
+++ b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java
@@ -26,12 +26,17 @@ public class KafkaChannelConfiguration {
   public static final String KAFKA_CONSUMER_PREFIX = KAFKA_PREFIX + "consumer.";
   public static final String KAFKA_PRODUCER_PREFIX = KAFKA_PREFIX + "producer.";
   public static final String DEFAULT_ACKS = "all";
-  public static final String DEFAULT_KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
-  public static final String DEFAULT_VALUE_SERIAIZER = "org.apache.kafka.common.serialization.ByteArraySerializer";
-  public static final String DEFAULT_KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
-  public static final String DEFAULT_VALUE_DESERIAIZER = "org.apache.kafka.common.serialization.ByteArrayDeserializer";
+  public static final String DEFAULT_KEY_SERIALIZER =
+      "org.apache.kafka.common.serialization.StringSerializer";
+  public static final String DEFAULT_VALUE_SERIAIZER =
+      "org.apache.kafka.common.serialization.ByteArraySerializer";
+  public static final String DEFAULT_KEY_DESERIALIZER =
+      "org.apache.kafka.common.serialization.StringDeserializer";
+  public static final String DEFAULT_VALUE_DESERIAIZER =
+      "org.apache.kafka.common.serialization.ByteArrayDeserializer";
   public static final String TOPIC_CONFIG = KAFKA_PREFIX + "topic";
-  public static final String BOOTSTRAP_SERVERS_CONFIG = KAFKA_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+  public static final String BOOTSTRAP_SERVERS_CONFIG =
+      KAFKA_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
   public static final String DEFAULT_TOPIC = "flume-channel";
   public static final String DEFAULT_GROUP_ID = "flume";
   public static final String POLL_TIMEOUT = KAFKA_PREFIX + "pollTimeout";

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-spillable-memory-channel/src/main/java/org/apache/flume/channel/SpillableMemoryChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-spillable-memory-channel/src/main/java/org/apache/flume/channel/SpillableMemoryChannel.java b/flume-ng-channels/flume-spillable-memory-channel/src/main/java/org/apache/flume/channel/SpillableMemoryChannel.java
index bdf42cd..b46d646 100644
--- a/flume-ng-channels/flume-spillable-memory-channel/src/main/java/org/apache/flume/channel/SpillableMemoryChannel.java
+++ b/flume-ng-channels/flume-spillable-memory-channel/src/main/java/org/apache/flume/channel/SpillableMemoryChannel.java
@@ -18,26 +18,26 @@
  */
 package org.apache.flume.channel;
 
-import java.util.ArrayDeque;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-
-import javax.annotation.concurrent.GuardedBy;
-
 import com.google.common.annotations.VisibleForTesting;
-import org.apache.flume.*;
-import org.apache.flume.annotations.Recyclable;
-
+import com.google.common.base.Preconditions;
+import org.apache.flume.ChannelException;
+import org.apache.flume.ChannelFullException;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
 import org.apache.flume.annotations.InterfaceAudience;
 import org.apache.flume.annotations.InterfaceStability;
+import org.apache.flume.annotations.Recyclable;
 import org.apache.flume.channel.file.FileChannel;
 import org.apache.flume.instrumentation.ChannelCounter;
-
 import org.apache.flume.lifecycle.LifecycleState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Preconditions;
+import javax.annotation.concurrent.GuardedBy;
+import java.util.ArrayDeque;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 
 /**
  * <p>
@@ -50,28 +50,45 @@ import com.google.common.base.Preconditions;
 @Recyclable
 public class SpillableMemoryChannel extends FileChannel {
   // config settings
-  /** Max number of events to be stored in memory */
+  /**
+   * Max number of events to be stored in memory
+   */
   public static final String MEMORY_CAPACITY = "memoryCapacity";
-  /** Seconds to wait before enabling disk overflow when memory fills up */
+  /**
+   * Seconds to wait before enabling disk overflow when memory fills up
+   */
   public static final String OVERFLOW_TIMEOUT = "overflowTimeout";
-  /** Internal use only. To remain undocumented in User guide. Determines the
+  /**
+   * Internal use only. To remain undocumented in User guide. Determines the
    * percent free space available in mem queue when we stop spilling to overflow
    */
   public static final String OVERFLOW_DEACTIVATION_THRESHOLD
-          = "overflowDeactivationThreshold";
-  /** percent of buffer between byteCapacity and the estimated event size. */
+      = "overflowDeactivationThreshold";
+  /**
+   * percent of buffer between byteCapacity and the estimated event size.
+   */
   public static final String BYTE_CAPACITY_BUFFER_PERCENTAGE
-          = "byteCapacityBufferPercentage";
+      = "byteCapacityBufferPercentage";
 
-  /** max number of bytes used for all events in the queue. */
+  /**
+   * max number of bytes used for all events in the queue.
+   */
   public static final String BYTE_CAPACITY = "byteCapacity";
-  /** max number of events in overflow. */
+  /**
+   * max number of events in overflow.
+   */
   public static final String OVERFLOW_CAPACITY = "overflowCapacity";
-  /** file channel setting that is overriden by Spillable Channel */
+  /**
+   * file channel setting that is overridden by Spillable Channel
+   */
   public static final String KEEP_ALIVE = "keep-alive";
-  /** file channel capacity overridden by Spillable Channel */
+  /**
+   * file channel capacity overridden by Spillable Channel
+   */
   public static final String CAPACITY = "capacity";
-  /** Estimated average size of events expected to be in the channel */
+  /**
+   * Estimated average size of events expected to be in the channel
+   */
   public static final String AVG_EVENT_SIZE = "avgEventSize";
 
   private static Logger LOGGER = LoggerFactory.getLogger(SpillableMemoryChannel.class);
@@ -84,7 +101,7 @@ public class SpillableMemoryChannel extends FileChannel {
   // memory consumption control
   private static final int defaultAvgEventSize = 500;
   private static final Long defaultByteCapacity
-          = (long)(Runtime.getRuntime().maxMemory() * .80);
+      = (long) (Runtime.getRuntime().maxMemory() * .80);
   private static final int defaultByteCapacityBufferPercentage = 20;
 
   private volatile int byteCapacity;
@@ -94,7 +111,7 @@ public class SpillableMemoryChannel extends FileChannel {
   private Semaphore bytesRemaining;
 
   // for synchronizing access to primary/overflow channels & drain order
-  final private Object queueLock = new Object();
+  private final Object queueLock = new Object();
 
   @GuardedBy(value = "queueLock")
   public ArrayDeque<Event> memQueue;
@@ -109,8 +126,10 @@ public class SpillableMemoryChannel extends FileChannel {
 
   private int maxMemQueueSize = 0;     // max size of memory Queue
 
-  private boolean overflowDisabled;         // if true indicates the overflow should not be used at all.
-  private boolean overflowActivated=false;  // indicates if overflow can be used. invariant: false if overflowDisabled is true.
+  private boolean overflowDisabled; // if true indicates the overflow should not be used at all.
+
+  // indicates if overflow can be used. invariant: false if overflowDisabled is true.
+  private boolean overflowActivated = false;
 
   // if true overflow can be used. invariant: false if overflowDisabled is true.
   private int memoryCapacity = -1;     // max events that the channel can hold  in memory
@@ -120,7 +139,7 @@ public class SpillableMemoryChannel extends FileChannel {
 
   // mem full % at which we stop spill to overflow
   private double overflowDeactivationThreshold
-          = defaultOverflowDeactivationThreshold / 100;
+      = defaultOverflowDeactivationThreshold / 100;
 
   public SpillableMemoryChannel() {
     super();
@@ -133,6 +152,7 @@ public class SpillableMemoryChannel extends FileChannel {
   public int getMemoryCapacity() {
     return memoryCapacity;
   }
+
   public int getOverflowTimeout() {
     return overflowTimeout;
   }
@@ -160,7 +180,6 @@ public class SpillableMemoryChannel extends FileChannel {
     }
   }
 
-
   private static class MutableInteger {
     private int value;
 
@@ -186,7 +205,7 @@ public class SpillableMemoryChannel extends FileChannel {
     public int totalPuts = 0;  // for debugging only
     private long overflowCounter = 0; // # of items in overflow channel
 
-    public  String dump() {
+    public String dump() {
       StringBuilder sb = new StringBuilder();
 
       sb.append("  [ ");
@@ -195,12 +214,12 @@ public class SpillableMemoryChannel extends FileChannel {
         sb.append(" ");
       }
       sb.append("]");
-      return  sb.toString();
+      return sb.toString();
     }
 
     public void putPrimary(Integer eventCount) {
       totalPuts += eventCount;
-      if (  (queue.peekLast() == null) || queue.getLast().intValue() < 0) {
+      if ((queue.peekLast() == null) || queue.getLast().intValue() < 0) {
         queue.addLast(new MutableInteger(eventCount));
       } else {
         queue.getLast().add(eventCount);
@@ -208,7 +227,7 @@ public class SpillableMemoryChannel extends FileChannel {
     }
 
     public void putFirstPrimary(Integer eventCount) {
-      if ( (queue.peekFirst() == null) || queue.getFirst().intValue() < 0) {
+      if ((queue.peekFirst() == null) || queue.getFirst().intValue() < 0) {
         queue.addFirst(new MutableInteger(eventCount));
       } else {
         queue.getFirst().add(eventCount);
@@ -217,7 +236,7 @@ public class SpillableMemoryChannel extends FileChannel {
 
     public void putOverflow(Integer eventCount) {
       totalPuts += eventCount;
-      if ( (queue.peekLast() == null) ||  queue.getLast().intValue() > 0) {
+      if ((queue.peekLast() == null) || queue.getLast().intValue() > 0) {
         queue.addLast(new MutableInteger(-eventCount));
       } else {
         queue.getLast().add(-eventCount);
@@ -226,9 +245,9 @@ public class SpillableMemoryChannel extends FileChannel {
     }
 
     public void putFirstOverflow(Integer eventCount) {
-      if ( (queue.peekFirst() == null) ||  queue.getFirst().intValue() > 0) {
+      if ((queue.peekFirst() == null) || queue.getFirst().intValue() > 0) {
         queue.addFirst(new MutableInteger(-eventCount));
-      }  else {
+      } else {
         queue.getFirst().add(-eventCount);
       }
       overflowCounter += eventCount;
@@ -247,9 +266,9 @@ public class SpillableMemoryChannel extends FileChannel {
 
       // this condition is optimization to avoid redundant conversions of
       // int -> Integer -> string in hot path
-      if (headValue.intValue() < takeCount)  {
+      if (headValue.intValue() < takeCount) {
         throw new IllegalStateException("Cannot take " + takeCount +
-                " from " + headValue.intValue() + " in DrainOrder Queue");
+            " from " + headValue.intValue() + " in DrainOrder Queue");
       }
 
       headValue.add(-takeCount);
@@ -260,9 +279,9 @@ public class SpillableMemoryChannel extends FileChannel {
 
     public void takeOverflow(int takeCount) {
       MutableInteger headValue = queue.getFirst();
-      if(headValue.intValue() > -takeCount) {
+      if (headValue.intValue() > -takeCount) {
         throw new IllegalStateException("Cannot take " + takeCount + " from "
-                + headValue.intValue() + " in DrainOrder Queue head " );
+            + headValue.intValue() + " in DrainOrder Queue head ");
       }
 
       headValue.add(takeCount);
@@ -293,7 +312,6 @@ public class SpillableMemoryChannel extends FileChannel {
     ArrayDeque<Event> putList;
     private final ChannelCounter channelCounter;
 
-
     public SpillableMemoryTransaction(ChannelCounter counter) {
       takeList = new ArrayDeque<Event>(largestTakeTxSize);
       putList = new ArrayDeque<Event>(largestPutTxSize);
@@ -307,26 +325,25 @@ public class SpillableMemoryChannel extends FileChannel {
 
     @Override
     public void close() {
-      if (overflowTakeTx!=null) {
+      if (overflowTakeTx != null) {
         overflowTakeTx.close();
       }
-      if (overflowPutTx!=null) {
+      if (overflowPutTx != null) {
         overflowPutTx.close();
       }
       super.close();
     }
 
-
     @Override
     protected void doPut(Event event) throws InterruptedException {
       channelCounter.incrementEventPutAttemptCount();
 
       putCalled = true;
-      int eventByteSize = (int)Math.ceil(estimateEventSize(event)/ avgEventSize);
+      int eventByteSize = (int) Math.ceil(estimateEventSize(event) / avgEventSize);
       if (!putList.offer(event)) {
         throw new ChannelFullException("Put queue in " + getName() +
-                " channel's Transaction having capacity " + putList.size() +
-                " full, consider reducing batch size of sources");
+            " channel's Transaction having capacity " + putList.size() +
+            " full, consider reducing batch size of sources");
       }
       putListByteCount += eventByteSize;
     }
@@ -344,7 +361,7 @@ public class SpillableMemoryChannel extends FileChannel {
       boolean takeSuceeded = false;
       try {
         Event event;
-        synchronized(queueLock) {
+        synchronized (queueLock) {
           int drainOrderTop = drainOrder.front();
 
           if (!takeCalled) {
@@ -375,11 +392,11 @@ public class SpillableMemoryChannel extends FileChannel {
             ++takeCount;
             drainOrder.takePrimary(1);
             Preconditions.checkNotNull(event, "Queue.poll returned NULL despite"
-                    + " semaphore signalling existence of entry");
+                + " semaphore signalling existence of entry");
           }
         }
 
-        int eventByteSize = (int)Math.ceil(estimateEventSize(event)/ avgEventSize);
+        int eventByteSize = (int) Math.ceil(estimateEventSize(event) / avgEventSize);
         if (!useOverflow) {
          // takeList is thread-private, so no need to do this in synchronized block
           takeList.offer(event);
@@ -389,7 +406,7 @@ public class SpillableMemoryChannel extends FileChannel {
         takeSuceeded = true;
         return event;
       } finally {
-        if(!takeSuceeded) {
+        if (!takeSuceeded) {
           totalStored.release();
         }
       }
@@ -400,37 +417,35 @@ public class SpillableMemoryChannel extends FileChannel {
       if (putCalled) {
         putCommit();
         if (LOGGER.isDebugEnabled()) {
-          LOGGER.debug("Put Committed. Drain Order Queue state : "
-                  + drainOrder.dump());
+          LOGGER.debug("Put Committed. Drain Order Queue state : " + drainOrder.dump());
         }
       } else if (takeCalled) {
         takeCommit();
         if (LOGGER.isDebugEnabled()) {
-          LOGGER.debug("Take Committed. Drain Order Queue state : "
-                  + drainOrder.dump());
+          LOGGER.debug("Take Committed. Drain Order Queue state : " + drainOrder.dump());
         }
       }
     }
 
     private void takeCommit() {
-      if (takeCount > largestTakeTxSize)
+      if (takeCount > largestTakeTxSize) {
         largestTakeTxSize = takeCount;
+      }
 
       synchronized (queueLock) {
-        if (overflowTakeTx!=null) {
+        if (overflowTakeTx != null) {
           overflowTakeTx.commit();
         }
-        double memoryPercentFree = (memoryCapacity == 0) ?  0
-           :  (memoryCapacity - memQueue.size() + takeCount ) / (double)memoryCapacity ;
+        double memoryPercentFree = (memoryCapacity == 0) ? 0 :
+            (memoryCapacity - memQueue.size() + takeCount) / (double) memoryCapacity;
 
-        if (overflowActivated
-                &&  memoryPercentFree >= overflowDeactivationThreshold) {
+        if (overflowActivated && memoryPercentFree >= overflowDeactivationThreshold) {
           overflowActivated = false;
           LOGGER.info("Overflow Deactivated");
         }
         channelCounter.setChannelSize(getTotalStored());
       }
-      if (!useOverflow)  {
+      if (!useOverflow) {
         memQueRemaining.release(takeCount);
         bytesRemaining.release(takeListByteCount);
       }
@@ -440,30 +455,29 @@ public class SpillableMemoryChannel extends FileChannel {
 
     private void putCommit() throws InterruptedException {
       // decide if overflow needs to be used
-      int timeout = overflowActivated  ? 0 : overflowTimeout;
+      int timeout = overflowActivated ? 0 : overflowTimeout;
 
       if (memoryCapacity != 0) {
         // check for enough event slots(memoryCapacity) for using memory queue
         if (!memQueRemaining.tryAcquire(putList.size(), timeout,
-          TimeUnit.SECONDS)) {
+            TimeUnit.SECONDS)) {
           if (overflowDisabled) {
             throw new ChannelFullException("Spillable Memory Channel's " +
-              "memory capacity has been reached and overflow is " +
-              "disabled. Consider increasing memoryCapacity.");
+                "memory capacity has been reached and overflow is " +
+                "disabled. Consider increasing memoryCapacity.");
           }
           overflowActivated = true;
           useOverflow = true;
-        }
         // check if we have enough byteCapacity for using memory queue
-        else if (!bytesRemaining.tryAcquire(putListByteCount, overflowTimeout
-          , TimeUnit.SECONDS)) {
+        } else if (!bytesRemaining.tryAcquire(putListByteCount,
+                                              overflowTimeout, TimeUnit.SECONDS)) {
           memQueRemaining.release(putList.size());
           if (overflowDisabled) {
             throw new ChannelFullException("Spillable Memory Channel's "
-              + "memory capacity has been reached.  "
-              + (bytesRemaining.availablePermits() * (int) avgEventSize)
-              + " bytes are free and overflow is disabled. Consider "
-              + "increasing byteCapacity or capacity.");
+                + "memory capacity has been reached.  "
+                + (bytesRemaining.availablePermits() * (int) avgEventSize)
+                + " bytes are free and overflow is disabled. Consider "
+                + "increasing byteCapacity or capacity.");
           }
           overflowActivated = true;
           useOverflow = true;
@@ -496,22 +510,21 @@ public class SpillableMemoryChannel extends FileChannel {
     }
 
     private void commitPutsToOverflow_core(Transaction overflowPutTx)
-            throws InterruptedException {
+        throws InterruptedException {
       // reattempt only once if overflow is full first time around
-      for (int i = 0; i < 2; ++i)  {
+      for (int i = 0; i < 2; ++i) {
         try {
-          synchronized(queueLock) {
+          synchronized (queueLock) {
             overflowPutTx.commit();
             drainOrder.putOverflow(putList.size());
             channelCounter.setChannelSize(memQueue.size()
-                    + drainOrder.overflowCounter);
+                + drainOrder.overflowCounter);
             break;
           }
-        } catch (ChannelFullException e)  { // drop lock & reattempt
-          if (i==0) {
-            Thread.sleep(overflowTimeout *1000);
-          }
-          else {
+        } catch (ChannelFullException e) { // drop lock & reattempt
+          if (i == 0) {
+            Thread.sleep(overflowTimeout * 1000);
+          } else {
             throw e;
           }
         }
@@ -523,14 +536,14 @@ public class SpillableMemoryChannel extends FileChannel {
         for (Event e : putList) {
           if (!memQueue.offer(e)) {
             throw new ChannelException("Unable to insert event into memory " +
-                    "queue in spite of spare capacity, this is very unexpected");
+                "queue in spite of spare capacity, this is very unexpected");
           }
         }
         drainOrder.putPrimary(putList.size());
-        maxMemQueueSize = (memQueue.size() > maxMemQueueSize) ?  memQueue.size()
-                                                              : maxMemQueueSize;
+        maxMemQueueSize = (memQueue.size() > maxMemQueueSize) ? memQueue.size()
+            : maxMemQueueSize;
         channelCounter.setChannelSize(memQueue.size()
-                + drainOrder.overflowCounter);
+            + drainOrder.overflowCounter);
       }
       // update counters and semaphores
       totalStored.release(putList.size());
@@ -540,10 +553,10 @@ public class SpillableMemoryChannel extends FileChannel {
     @Override
     protected void doRollback() {
       LOGGER.debug("Rollback() of " +
-              (takeCalled ? " Take Tx" : (putCalled ? " Put Tx" : "Empty Tx")));
+          (takeCalled ? " Take Tx" : (putCalled ? " Put Tx" : "Empty Tx")));
 
       if (putCalled) {
-        if (overflowPutTx!=null) {
+        if (overflowPutTx != null) {
           overflowPutTx.rollback();
         }
         if (!useOverflow) {
@@ -552,8 +565,8 @@ public class SpillableMemoryChannel extends FileChannel {
         }
         putListByteCount = 0;
       } else if (takeCalled) {
-        synchronized(queueLock) {
-          if (overflowTakeTx!=null) {
+        synchronized (queueLock) {
+          if (overflowTakeTx != null) {
             overflowTakeTx.rollback();
           }
           if (useOverflow) {
@@ -561,8 +574,8 @@ public class SpillableMemoryChannel extends FileChannel {
           } else {
             int remainingCapacity = memoryCapacity - memQueue.size();
             Preconditions.checkState(remainingCapacity >= takeCount,
-                    "Not enough space in memory queue to rollback takes. This" +
-                            " should never happen, please report");
+                "Not enough space in memory queue to rollback takes. This" +
+                    " should never happen, please report");
             while (!takeList.isEmpty()) {
               memQueue.addFirst(takeList.removeLast());
             }
@@ -582,15 +595,18 @@ public class SpillableMemoryChannel extends FileChannel {
    * <li>memoryCapacity = total number of events allowed at one time in the memory queue.
    * <li>overflowCapacity = total number of events allowed at one time in the overflow file channel.
    * <li>byteCapacity = the max number of bytes used for events in the memory queue.
-   * <li>byteCapacityBufferPercentage = type int. Defines the percent of buffer between byteCapacity and the estimated event size.
-   * <li>overflowTimeout = type int. Number of seconds to wait on a full memory before deciding to enable overflow
+   * <li>byteCapacityBufferPercentage = type int. Defines the percent of buffer between byteCapacity
+   *     and the estimated event size.
+   * <li>overflowTimeout = type int. Number of seconds to wait on a full memory before deciding to
+   *     enable overflow
    */
   @Override
   public void configure(Context context) {
 
-    if (getLifecycleState() == LifecycleState.START    // does not support reconfig when running
-            || getLifecycleState() == LifecycleState.ERROR)
+    if (getLifecycleState() == LifecycleState.START ||  // does not support reconfig when running
+        getLifecycleState() == LifecycleState.ERROR) {
       stop();
+    }
 
     if (totalStored == null) {
       totalStored = new Semaphore(0);
@@ -603,8 +619,7 @@ public class SpillableMemoryChannel extends FileChannel {
     // 1) Memory Capacity
     Integer newMemoryCapacity;
     try {
-      newMemoryCapacity = context.getInteger(MEMORY_CAPACITY
-              , defaultMemoryCapacity);
+      newMemoryCapacity = context.getInteger(MEMORY_CAPACITY, defaultMemoryCapacity);
       if (newMemoryCapacity == null) {
         newMemoryCapacity = defaultMemoryCapacity;
       }
@@ -612,7 +627,7 @@ public class SpillableMemoryChannel extends FileChannel {
         throw new NumberFormatException(MEMORY_CAPACITY + " must be >= 0");
       }
 
-    } catch(NumberFormatException e) {
+    } catch (NumberFormatException e) {
       newMemoryCapacity = defaultMemoryCapacity;
       LOGGER.warn("Invalid " + MEMORY_CAPACITY + " specified, initializing " +
           getName() + " channel to default value of {}", defaultMemoryCapacity);
@@ -626,60 +641,60 @@ public class SpillableMemoryChannel extends FileChannel {
     // overflowTimeout - wait time before switching to overflow when mem is full
     try {
       Integer newOverflowTimeout =
-              context.getInteger(OVERFLOW_TIMEOUT, defaultOverflowTimeout);
+          context.getInteger(OVERFLOW_TIMEOUT, defaultOverflowTimeout);
       overflowTimeout = (newOverflowTimeout != null) ? newOverflowTimeout
-                                                     : defaultOverflowTimeout;
-    } catch(NumberFormatException e) {
+          : defaultOverflowTimeout;
+    } catch (NumberFormatException e) {
       LOGGER.warn("Incorrect value for " + getName() + "'s " + OVERFLOW_TIMEOUT
-            + " setting. Using default value {}", defaultOverflowTimeout);
+          + " setting. Using default value {}", defaultOverflowTimeout);
       overflowTimeout = defaultOverflowTimeout;
     }
 
     try {
       Integer newThreshold = context.getInteger(OVERFLOW_DEACTIVATION_THRESHOLD);
-      overflowDeactivationThreshold =  (newThreshold != null) ?
-                                   newThreshold/100.0
-                                 : defaultOverflowDeactivationThreshold / 100.0;
-    } catch(NumberFormatException e) {
+      overflowDeactivationThreshold = (newThreshold != null) ?
+          newThreshold / 100.0
+          : defaultOverflowDeactivationThreshold / 100.0;
+    } catch (NumberFormatException e) {
       LOGGER.warn("Incorrect value for " + getName() + "'s " +
               OVERFLOW_DEACTIVATION_THRESHOLD + ". Using default value {} %",
-              defaultOverflowDeactivationThreshold);
+          defaultOverflowDeactivationThreshold);
       overflowDeactivationThreshold = defaultOverflowDeactivationThreshold / 100.0;
     }
 
     // 3) Memory consumption control
     try {
       byteCapacityBufferPercentage =
-              context.getInteger(BYTE_CAPACITY_BUFFER_PERCENTAGE
-                      , defaultByteCapacityBufferPercentage);
-    } catch(NumberFormatException e) {
+          context.getInteger(BYTE_CAPACITY_BUFFER_PERCENTAGE, defaultByteCapacityBufferPercentage);
+    } catch (NumberFormatException e) {
       LOGGER.warn("Error parsing " + BYTE_CAPACITY_BUFFER_PERCENTAGE + " for "
-              + getName() + ". Using default="
-              + defaultByteCapacityBufferPercentage + ". " + e.getMessage());
+          + getName() + ". Using default="
+          + defaultByteCapacityBufferPercentage + ". " + e.getMessage());
       byteCapacityBufferPercentage = defaultByteCapacityBufferPercentage;
     }
 
     try {
       avgEventSize = context.getInteger(AVG_EVENT_SIZE, defaultAvgEventSize);
-    } catch ( NumberFormatException e) {
+    } catch (NumberFormatException e) {
       LOGGER.warn("Error parsing " + AVG_EVENT_SIZE + " for " + getName()
-              + ". Using default = " + defaultAvgEventSize + ". "
-              + e.getMessage());
+          + ". Using default = " + defaultAvgEventSize + ". "
+          + e.getMessage());
       avgEventSize = defaultAvgEventSize;
     }
 
     try {
-      byteCapacity = (int) ((context.getLong(BYTE_CAPACITY, defaultByteCapacity) * (1 - byteCapacityBufferPercentage * .01 )) / avgEventSize);
+      byteCapacity = (int) ((context.getLong(BYTE_CAPACITY, defaultByteCapacity) *
+                            (1 - byteCapacityBufferPercentage * .01)) / avgEventSize);
       if (byteCapacity < 1) {
         byteCapacity = Integer.MAX_VALUE;
       }
-    } catch(NumberFormatException e) {
+    } catch (NumberFormatException e) {
       LOGGER.warn("Error parsing " + BYTE_CAPACITY + " setting for " + getName()
-              + ". Using default = " + defaultByteCapacity + ". "
-              + e.getMessage());
+          + ". Using default = " + defaultByteCapacity + ". "
+          + e.getMessage());
       byteCapacity = (int)
-            ( (defaultByteCapacity * (1 - byteCapacityBufferPercentage * .01 ))
-                    / avgEventSize);
+          ((defaultByteCapacity * (1 - byteCapacityBufferPercentage * .01))
+              / avgEventSize);
     }
 
 
@@ -692,8 +707,10 @@ public class SpillableMemoryChannel extends FileChannel {
         lastByteCapacity = byteCapacity;
       } else {
         try {
-          if (!bytesRemaining.tryAcquire(lastByteCapacity - byteCapacity, overflowTimeout, TimeUnit.SECONDS)) {
-            LOGGER.warn("Couldn't acquire permits to downsize the byte capacity, resizing has been aborted");
+          if (!bytesRemaining.tryAcquire(lastByteCapacity - byteCapacity,
+                                         overflowTimeout, TimeUnit.SECONDS)) {
+            LOGGER.warn("Couldn't acquire permits to downsize the byte capacity, " +
+                        "resizing has been aborted");
           } else {
             lastByteCapacity = byteCapacity;
           }
@@ -704,51 +721,53 @@ public class SpillableMemoryChannel extends FileChannel {
     }
 
     try {
-      overflowCapacity = context.getInteger(OVERFLOW_CAPACITY, defaultOverflowCapacity);  // file channel capacity
+      // file channel capacity
+      overflowCapacity = context.getInteger(OVERFLOW_CAPACITY, defaultOverflowCapacity);
       // Determine if File channel needs to be disabled
-        if ( memoryCapacity < 1   &&   overflowCapacity < 1) {
-          LOGGER.warn("For channel " + getName() + OVERFLOW_CAPACITY +
-                  " cannot be set to 0 if " + MEMORY_CAPACITY + " is also 0. " +
-                  "Using default value " + OVERFLOW_CAPACITY + " = " +
-                  defaultOverflowCapacity);
-          overflowCapacity = defaultOverflowCapacity;
-        }
-        overflowDisabled = (overflowCapacity < 1) ;
-        if (overflowDisabled) {
-          overflowActivated = false;
-        }
-    } catch(NumberFormatException e) {
+      if (memoryCapacity < 1 && overflowCapacity < 1) {
+        LOGGER.warn("For channel " + getName() + OVERFLOW_CAPACITY +
+            " cannot be set to 0 if " + MEMORY_CAPACITY + " is also 0. " +
+            "Using default value " + OVERFLOW_CAPACITY + " = " +
+            defaultOverflowCapacity);
+        overflowCapacity = defaultOverflowCapacity;
+      }
+      overflowDisabled = (overflowCapacity < 1);
+      if (overflowDisabled) {
+        overflowActivated = false;
+      }
+    } catch (NumberFormatException e) {
       overflowCapacity = defaultOverflowCapacity;
     }
 
     // Configure File channel
-    context.put(KEEP_ALIVE,"0"); // override keep-alive for  File channel
-    context.put(CAPACITY, Integer.toString(overflowCapacity) );  // file channel capacity
+    context.put(KEEP_ALIVE, "0"); // override keep-alive for  File channel
+    context.put(CAPACITY, Integer.toString(overflowCapacity));  // file channel capacity
     super.configure(context);
   }
 
 
   private void resizePrimaryQueue(int newMemoryCapacity) throws InterruptedException {
-    if (memQueue != null   &&   memoryCapacity == newMemoryCapacity) {
+    if (memQueue != null && memoryCapacity == newMemoryCapacity) {
       return;
     }
 
     if (memoryCapacity > newMemoryCapacity) {
       int diff = memoryCapacity - newMemoryCapacity;
       if (!memQueRemaining.tryAcquire(diff, overflowTimeout, TimeUnit.SECONDS)) {
-        LOGGER.warn("Memory buffer currently contains more events than the new size. Downsizing has been aborted.");
+        LOGGER.warn("Memory buffer currently contains more events than the new size. " +
+                    "Downsizing has been aborted.");
         return;
       }
-      synchronized(queueLock) {
+      synchronized (queueLock) {
         ArrayDeque<Event> newQueue = new ArrayDeque<Event>(newMemoryCapacity);
         newQueue.addAll(memQueue);
         memQueue = newQueue;
         memoryCapacity = newMemoryCapacity;
       }
-    } else  {   // if (memoryCapacity <= newMemoryCapacity)
-      synchronized(queueLock) {
+    } else {   // if (memoryCapacity <= newMemoryCapacity)
+      synchronized (queueLock) {
         ArrayDeque<Event> newQueue = new ArrayDeque<Event>(newMemoryCapacity);
-        if (memQueue !=null) {
+        if (memQueue != null) {
           newQueue.addAll(memQueue);
         }
         memQueue = newQueue;
@@ -771,14 +790,14 @@ public class SpillableMemoryChannel extends FileChannel {
       drainOrder.putOverflow(overFlowCount);
       totalStored.release(overFlowCount);
     }
-    int totalCount =  overFlowCount + memQueue.size();
+    int totalCount = overFlowCount + memQueue.size();
     channelCounter.setChannelCapacity(memoryCapacity + getOverflowCapacity());
     channelCounter.setChannelSize(totalCount);
   }
 
   @Override
   public synchronized void stop() {
-    if (getLifecycleState()==LifecycleState.STOP) {
+    if (getLifecycleState() == LifecycleState.STOP) {
       return;
     }
     channelCounter.setChannelSize(memQueue.size() + drainOrder.overflowCounter);

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/LoadBalancingLog4jAppender.java
----------------------------------------------------------------------
diff --git a/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/LoadBalancingLog4jAppender.java b/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/LoadBalancingLog4jAppender.java
index 713234f..ae31916 100644
--- a/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/LoadBalancingLog4jAppender.java
+++ b/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/LoadBalancingLog4jAppender.java
@@ -99,11 +99,11 @@ public class LoadBalancingLog4jAppender extends Log4jAppender {
 
   @Override
   public synchronized void append(LoggingEvent event) {
-    if(!configured) {
+    if (!configured) {
       String errorMsg = "Flume Log4jAppender not configured correctly! Cannot" +
-        " send events to Flume.";
+          " send events to Flume.";
       LogLog.error(errorMsg);
-      if(getUnsafeMode()) {
+      if (getUnsafeMode()) {
         return;
       }
       throw new FlumeException(errorMsg);
@@ -121,10 +121,9 @@ public class LoadBalancingLog4jAppender extends Log4jAppender {
   @Override
   public void activateOptions() throws FlumeException {
     try {
-      final Properties properties = getProperties(hosts, selector,
-        maxBackoff, getTimeout());
+      final Properties properties = getProperties(hosts, selector, maxBackoff, getTimeout());
       rpcClient = RpcClientFactory.getInstance(properties);
-      if(layout != null) {
+      if (layout != null) {
         layout.activateOptions();
       }
       configured = true;
@@ -169,14 +168,13 @@ public class LoadBalancingLog4jAppender extends Log4jAppender {
         throw new FlumeException(
             "Misconfigured max backoff, value must be greater than 0");
       }
-      props.put(RpcClientConfigurationConstants.CONFIG_BACKOFF,
-          String.valueOf(true));
+      props.put(RpcClientConfigurationConstants.CONFIG_BACKOFF, String.valueOf(true));
       props.put(RpcClientConfigurationConstants.CONFIG_MAX_BACKOFF, maxBackoff);
     }
     props.setProperty(RpcClientConfigurationConstants.CONFIG_CONNECT_TIMEOUT,
-      String.valueOf(timeout));
+                      String.valueOf(timeout));
     props.setProperty(RpcClientConfigurationConstants.CONFIG_REQUEST_TIMEOUT,
-      String.valueOf(timeout));
+                      String.valueOf(timeout));
     return props;
   }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java
----------------------------------------------------------------------
diff --git a/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java b/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java
index 7c483db..f9803e4 100644
--- a/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java
+++ b/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java
@@ -75,8 +75,7 @@ public class Log4jAppender extends AppenderSkeleton {
   private String hostname;
   private int port;
   private boolean unsafeMode = false;
-  private long timeout = RpcClientConfigurationConstants
-    .DEFAULT_REQUEST_TIMEOUT_MILLIS;
+  private long timeout = RpcClientConfigurationConstants.DEFAULT_REQUEST_TIMEOUT_MILLIS;
   private boolean avroReflectionEnabled;
   private String avroSchemaUrl;
 
@@ -99,7 +98,7 @@ public class Log4jAppender extends AppenderSkeleton {
    * @param port The port to connect on the host.
    *
    */
-  public Log4jAppender(String hostname, int port){
+  public Log4jAppender(String hostname, int port) {
     this.hostname = hostname;
     this.port = port;
   }
@@ -112,14 +111,14 @@ public class Log4jAppender extends AppenderSkeleton {
    * was a connection error.
    */
   @Override
-  public synchronized void append(LoggingEvent event) throws FlumeException{
+  public synchronized void append(LoggingEvent event) throws FlumeException {
     //If rpcClient is null, it means either this appender object was never
     //setup by setting hostname and port and then calling activateOptions
     //or this appender object was closed by calling close(), so we throw an
     //exception to show the appender is no longer accessible.
     if (rpcClient == null) {
       String errorMsg = "Cannot Append to Appender! Appender either closed or" +
-        " not setup correctly!";
+          " not setup correctly!";
       LogLog.error(errorMsg);
       if (unsafeMode) {
         return;
@@ -127,7 +126,7 @@ public class Log4jAppender extends AppenderSkeleton {
       throw new FlumeException(errorMsg);
     }
 
-    if(!rpcClient.isActive()){
+    if (!rpcClient.isActive()) {
       reconnect();
     }
 
@@ -231,7 +230,7 @@ public class Log4jAppender extends AppenderSkeleton {
     } else {
       String errorMsg = "Flume log4jappender already closed!";
       LogLog.error(errorMsg);
-      if(unsafeMode) {
+      if (unsafeMode) {
         return;
       }
       throw new FlumeException(errorMsg);
@@ -251,7 +250,7 @@ public class Log4jAppender extends AppenderSkeleton {
    * Set the first flume hop hostname.
    * @param hostname The first hop where the client should connect to.
    */
-  public void setHostname(String hostname){
+  public void setHostname(String hostname) {
     this.hostname = hostname;
   }
 
@@ -259,7 +258,7 @@ public class Log4jAppender extends AppenderSkeleton {
    * Set the port on the hostname to connect to.
    * @param port The port to connect on the host.
    */
-  public void setPort(int port){
+  public void setPort(int port) {
     this.port = port;
   }
 
@@ -299,19 +298,18 @@ public class Log4jAppender extends AppenderSkeleton {
     Properties props = new Properties();
     props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS, "h1");
     props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS_PREFIX + "h1",
-      hostname + ":" + port);
+        hostname + ":" + port);
     props.setProperty(RpcClientConfigurationConstants.CONFIG_CONNECT_TIMEOUT,
-     String.valueOf(timeout));
+        String.valueOf(timeout));
     props.setProperty(RpcClientConfigurationConstants.CONFIG_REQUEST_TIMEOUT,
-      String.valueOf(timeout));
+        String.valueOf(timeout));
     try {
       rpcClient = RpcClientFactory.getInstance(props);
       if (layout != null) {
         layout.activateOptions();
       }
     } catch (FlumeException e) {
-      String errormsg = "RPC client creation failed! " +
-        e.getMessage();
+      String errormsg = "RPC client creation failed! " + e.getMessage();
       LogLog.error(errormsg);
       if (unsafeMode) {
         return;

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAvroHeaders.java
----------------------------------------------------------------------
diff --git a/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAvroHeaders.java b/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAvroHeaders.java
index b68e749..22983d3 100644
--- a/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAvroHeaders.java
+++ b/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAvroHeaders.java
@@ -30,24 +30,23 @@ public enum Log4jAvroHeaders {
   AVRO_SCHEMA_URL("flume.avro.schema.url");
 
   private String headerName;
-  private Log4jAvroHeaders(String headerName){
+  private Log4jAvroHeaders(String headerName) {
     this.headerName = headerName;
   }
 
-  public String getName(){
+  public String getName() {
     return headerName;
   }
 
-  public String toString(){
+  public String toString() {
     return getName();
   }
 
-  public static Log4jAvroHeaders getByName(String headerName){
+  public static Log4jAvroHeaders getByName(String headerName) {
     Log4jAvroHeaders hdrs = null;
-    try{
+    try {
       hdrs = Log4jAvroHeaders.valueOf(headerName.toLowerCase(Locale.ENGLISH).trim());
-    }
-    catch(IllegalArgumentException e){
+    } catch (IllegalArgumentException e) {
       hdrs = Log4jAvroHeaders.OTHER;
     }
     return hdrs;

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-configuration/src/main/java/org/apache/flume/Context.java
----------------------------------------------------------------------
diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/Context.java b/flume-ng-configuration/src/main/java/org/apache/flume/Context.java
index c0460d2..f00b571 100644
--- a/flume-ng-configuration/src/main/java/org/apache/flume/Context.java
+++ b/flume-ng-configuration/src/main/java/org/apache/flume/Context.java
@@ -86,7 +86,7 @@ public class Context {
     Preconditions.checkArgument(prefix.endsWith("."),
         "The given prefix does not end with a period (" + prefix + ")");
     Map<String, String> result = Maps.newHashMap();
-    synchronized(parameters) {
+    synchronized (parameters) {
       for (String key : parameters.keySet()) {
         if (key.startsWith(prefix)) {
           String name = key.substring(prefix.length());
@@ -129,7 +129,7 @@ public class Context {
    */
   public Boolean getBoolean(String key, Boolean defaultValue) {
     String value = get(key);
-    if(value != null) {
+    if (value != null) {
       return Boolean.parseBoolean(value.trim());
     }
     return defaultValue;
@@ -158,7 +158,7 @@ public class Context {
    */
   public Integer getInteger(String key, Integer defaultValue) {
     String value = get(key);
-    if(value != null) {
+    if (value != null) {
       return Integer.parseInt(value.trim());
     }
     return defaultValue;
@@ -187,7 +187,7 @@ public class Context {
    */
   public Long getLong(String key, Long defaultValue) {
     String value = get(key);
-    if(value != null) {
+    if (value != null) {
       return Long.parseLong(value.trim());
     }
     return defaultValue;
@@ -227,7 +227,7 @@ public class Context {
   }
   private String get(String key, String defaultValue) {
     String result = parameters.get(key);
-    if(result != null) {
+    if (result != null) {
       return result;
     }
     return defaultValue;

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-configuration/src/main/java/org/apache/flume/conf/BasicConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/BasicConfigurationConstants.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/BasicConfigurationConstants.java
index d6aa33a..9089122 100644
--- a/flume-ng-configuration/src/main/java/org/apache/flume/conf/BasicConfigurationConstants.java
+++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/BasicConfigurationConstants.java
@@ -22,7 +22,6 @@ public final class BasicConfigurationConstants {
   public static final String CONFIG_SOURCES_PREFIX = CONFIG_SOURCES + ".";
   public static final String CONFIG_SOURCE_CHANNELSELECTOR_PREFIX = "selector.";
 
-
   public static final String CONFIG_SINKS = "sinks";
   public static final String CONFIG_SINKS_PREFIX = CONFIG_SINKS + ".";
   public static final String CONFIG_SINK_PROCESSOR_PREFIX = "processor.";

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-configuration/src/main/java/org/apache/flume/conf/ComponentConfiguration.java
----------------------------------------------------------------------
diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/ComponentConfiguration.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/ComponentConfiguration.java
index 0e0614e..477a3e6 100644
--- a/flume-ng-configuration/src/main/java/org/apache/flume/conf/ComponentConfiguration.java
+++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/ComponentConfiguration.java
@@ -64,7 +64,7 @@ public abstract class ComponentConfiguration {
     failIfConfigured();
     String confType = context.getString(
         BasicConfigurationConstants.CONFIG_TYPE);
-    if (confType != null && !confType.isEmpty()){
+    if (confType != null && !confType.isEmpty()) {
       this.type = confType;
     }
     // Type can be set by child class constructors, so check if it was.
@@ -74,12 +74,12 @@ public abstract class ComponentConfiguration {
           FlumeConfigurationErrorType.ATTRS_MISSING, ErrorOrWarning.ERROR));
 
       throw new ConfigurationException(
-          "Component has no type. Cannot configure. "+ componentName);
+          "Component has no type. Cannot configure. " + componentName);
     }
   }
 
   protected void failIfConfigured() throws ConfigurationException {
-    if (configured){
+    if (configured) {
       throw new ConfigurationException("Already configured component."
           + componentName);
     }
@@ -134,12 +134,13 @@ public abstract class ComponentConfiguration {
     CHANNELSELECTOR("ChannelSelector");
 
     private final String componentType;
-    private ComponentType(String type){
+
+    private ComponentType(String type) {
       componentType = type;
     }
+
     public String getComponentType() {
       return componentType;
     }
-
   }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-configuration/src/main/java/org/apache/flume/conf/ComponentConfigurationFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/ComponentConfigurationFactory.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/ComponentConfigurationFactory.java
index 0433c9c..16860c3 100644
--- a/flume-ng-configuration/src/main/java/org/apache/flume/conf/ComponentConfigurationFactory.java
+++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/ComponentConfigurationFactory.java
@@ -27,9 +27,9 @@ import org.apache.flume.conf.sink.SinkProcessorConfiguration.SinkProcessorConfig
 import org.apache.flume.conf.source.SourceConfiguration.SourceConfigurationType;
 
 public class ComponentConfigurationFactory {
+
   @SuppressWarnings("unchecked")
-  public static ComponentConfiguration
-  create(String name, String type, ComponentType component)
+  public static ComponentConfiguration create(String name, String type, ComponentType component)
       throws ConfigurationException {
     Class<? extends ComponentConfiguration> confType = null;
 
@@ -43,7 +43,7 @@ public class ComponentConfigurationFactory {
     } catch (Exception ignored) {
       try {
         type = type.toUpperCase(Locale.ENGLISH);
-        switch(component){
+        switch (component) {
           case SOURCE:
             return SourceConfigurationType.valueOf(type.toUpperCase(Locale.ENGLISH))
                 .getConfiguration(name);
@@ -63,8 +63,7 @@ public class ComponentConfigurationFactory {
             return new SinkGroupConfiguration(name);
           default:
             throw new ConfigurationException(
-                "Cannot create configuration. Unknown Type specified: " +
-                    type);
+                "Cannot create configuration. Unknown Type specified: " + type);
         }
       } catch (ConfigurationException e) {
         throw e;