You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by mp...@apache.org on 2016/07/08 22:50:59 UTC

[7/9] flume git commit: FLUME-2941. Integrate checkstyle for test classes

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/EncryptionTestUtils.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/EncryptionTestUtils.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/EncryptionTestUtils.java
index 6ca3246..f290035 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/EncryptionTestUtils.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/EncryptionTestUtils.java
@@ -18,18 +18,6 @@
  */
 package org.apache.flume.channel.file.encryption;
 
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.security.Key;
-import java.security.KeyStore;
-import java.util.List;
-import java.util.Map;
-
-import javax.crypto.KeyGenerator;
-
-import org.apache.flume.channel.file.TestUtils;
-
 import com.google.common.base.Charsets;
 import com.google.common.base.Joiner;
 import com.google.common.base.Throwables;
@@ -37,6 +25,16 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.io.Files;
 import com.google.common.io.Resources;
+import org.apache.flume.channel.file.TestUtils;
+
+import javax.crypto.KeyGenerator;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.security.Key;
+import java.security.KeyStore;
+import java.util.List;
+import java.util.Map;
 
 public class EncryptionTestUtils {
 
@@ -50,33 +48,32 @@ public class EncryptionTestUtils {
       throw Throwables.propagate(e);
     }
   }
-  public static void createKeyStore(File keyStoreFile,
-      File keyStorePasswordFile, Map<String, File> keyAliasPassword)
-          throws Exception {
+
+  public static void createKeyStore(File keyStoreFile, File keyStorePasswordFile,
+                                    Map<String, File> keyAliasPassword) throws Exception {
     KeyStore ks = KeyStore.getInstance("jceks");
     ks.load(null);
     List<String> keysWithSeperatePasswords = Lists.newArrayList();
-    for(String alias : keyAliasPassword.keySet()) {
+    for (String alias : keyAliasPassword.keySet()) {
       Key key = newKey();
       char[] password = null;
       File passwordFile = keyAliasPassword.get(alias);
-      if(passwordFile == null) {
-        password = Files.toString(keyStorePasswordFile, Charsets.UTF_8)
-            .toCharArray();
+      if (passwordFile == null) {
+        password = Files.toString(keyStorePasswordFile, Charsets.UTF_8).toCharArray();
       } else {
         keysWithSeperatePasswords.add(alias);
         password = Files.toString(passwordFile, Charsets.UTF_8).toCharArray();
       }
       ks.setKeyEntry(alias, key, password, null);
     }
-    char[] keyStorePassword = Files.
-        toString(keyStorePasswordFile, Charsets.UTF_8).toCharArray();
+    char[] keyStorePassword = Files.toString(keyStorePasswordFile, Charsets.UTF_8).toCharArray();
     FileOutputStream outputStream = new FileOutputStream(keyStoreFile);
     ks.store(outputStream, keyStorePassword);
     outputStream.close();
   }
-  public static Map<String, File> configureTestKeyStore(File baseDir,
-      File keyStoreFile) throws IOException {
+
+  public static Map<String, File> configureTestKeyStore(File baseDir, File keyStoreFile)
+      throws IOException {
     Map<String, File> result = Maps.newHashMap();
 
     if (System.getProperty("java.vendor").contains("IBM")) {
@@ -86,50 +83,52 @@ public class EncryptionTestUtils {
       Resources.copy(Resources.getResource("sun-test.keystore"),
           new FileOutputStream(keyStoreFile));
     }
-    /*
-    Commands below:
-    keytool -genseckey -alias key-0 -keypass keyPassword -keyalg AES \
-      -keysize 128 -validity 9000 -keystore src/test/resources/test.keystore \
-      -storetype jceks -storepass keyStorePassword
-    keytool -genseckey -alias key-1 -keyalg AES -keysize 128 -validity 9000 \
-      -keystore src/test/resources/test.keystore -storetype jceks \
-      -storepass keyStorePassword
+
+    /* Commands below:
+     * keytool -genseckey -alias key-0 -keypass keyPassword -keyalg AES \
+     *   -keysize 128 -validity 9000 -keystore src/test/resources/test.keystore \
+     *   -storetype jceks -storepass keyStorePassword
+     * keytool -genseckey -alias key-1 -keyalg AES -keysize 128 -validity 9000 \
+     *   -keystore src/test/resources/test.keystore -storetype jceks \
+     *   -storepass keyStorePassword
      */
-//  key-0 has own password, key-1 used key store password
-    result.put("key-0",
-        TestUtils.writeStringToFile(baseDir, "key-0", "keyPassword"));
+    // key-0 has own password, key-1 used key store password
+    result.put("key-0", TestUtils.writeStringToFile(baseDir, "key-0", "keyPassword"));
     result.put("key-1", null);
     return result;
   }
-  public static Map<String,String> configureForKeyStore(File keyStoreFile,
-      File keyStorePasswordFile, Map<String, File> keyAliasPassword)
-          throws Exception {
+
+  public static Map<String, String> configureForKeyStore(File keyStoreFile,
+                                                         File keyStorePasswordFile,
+                                                         Map<String, File> keyAliasPassword)
+      throws Exception {
     Map<String, String> context = Maps.newHashMap();
     List<String> keys = Lists.newArrayList();
     Joiner joiner = Joiner.on(".");
-    for(String alias : keyAliasPassword.keySet()) {
+    for (String alias : keyAliasPassword.keySet()) {
       File passwordFile = keyAliasPassword.get(alias);
-      if(passwordFile == null) {
+      if (passwordFile == null) {
         keys.add(alias);
       } else {
         String propertyName = joiner.join(EncryptionConfiguration.KEY_PROVIDER,
-            EncryptionConfiguration.JCE_FILE_KEYS, alias,
-            EncryptionConfiguration.JCE_FILE_KEY_PASSWORD_FILE);
+                                          EncryptionConfiguration.JCE_FILE_KEYS,
+                                          alias,
+                                          EncryptionConfiguration.JCE_FILE_KEY_PASSWORD_FILE);
         keys.add(alias);
         context.put(propertyName, passwordFile.getAbsolutePath());
       }
     }
     context.put(joiner.join(EncryptionConfiguration.KEY_PROVIDER,
-        EncryptionConfiguration.JCE_FILE_KEY_STORE_FILE),
-        keyStoreFile.getAbsolutePath());
-    if(keyStorePasswordFile != null) {
+                            EncryptionConfiguration.JCE_FILE_KEY_STORE_FILE),
+                keyStoreFile.getAbsolutePath());
+    if (keyStorePasswordFile != null) {
       context.put(joiner.join(EncryptionConfiguration.KEY_PROVIDER,
-          EncryptionConfiguration.JCE_FILE_KEY_STORE_PASSWORD_FILE),
-          keyStorePasswordFile.getAbsolutePath());
+                              EncryptionConfiguration.JCE_FILE_KEY_STORE_PASSWORD_FILE),
+                  keyStorePasswordFile.getAbsolutePath());
     }
     context.put(joiner.join(EncryptionConfiguration.KEY_PROVIDER,
-        EncryptionConfiguration.JCE_FILE_KEYS),
-        Joiner.on(" ").join(keys));
+                            EncryptionConfiguration.JCE_FILE_KEYS),
+                Joiner.on(" ").join(keys));
     return context;
   }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestAESCTRNoPaddingProvider.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestAESCTRNoPaddingProvider.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestAESCTRNoPaddingProvider.java
index a7c7cb2..d483fcc 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestAESCTRNoPaddingProvider.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestAESCTRNoPaddingProvider.java
@@ -35,13 +35,13 @@ public class TestAESCTRNoPaddingProvider {
   public void setup() throws Exception {
     KeyGenerator keyGen = KeyGenerator.getInstance("AES");
     key = keyGen.generateKey();
-    encryptor = CipherProviderFactory.
-        getEncrypter(CipherProviderType.AESCTRNOPADDING.name(), key);
-    decryptor = CipherProviderFactory.
-        getDecrypter(CipherProviderType.AESCTRNOPADDING.name(), key,
-            encryptor.getParameters());
+    encryptor = CipherProviderFactory.getEncrypter(
+        CipherProviderType.AESCTRNOPADDING.name(), key);
+    decryptor = CipherProviderFactory.getDecrypter(
+        CipherProviderType.AESCTRNOPADDING.name(), key, encryptor.getParameters());
     cipherProviderTestSuite = new CipherProviderTestSuite(encryptor, decryptor);
   }
+
   @Test
   public void test() throws Exception {
     cipherProviderTestSuite.test();

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestFileChannelEncryption.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestFileChannelEncryption.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestFileChannelEncryption.java
index d4537a8..4e5ab6f 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestFileChannelEncryption.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestFileChannelEncryption.java
@@ -18,18 +18,10 @@
  */
 package org.apache.flume.channel.file.encryption;
 
-import static org.apache.flume.channel.file.TestUtils.*;
-
-import java.io.File;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicBoolean;
-
+import com.google.common.base.Charsets;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Maps;
+import com.google.common.io.Files;
 import org.apache.flume.ChannelException;
 import org.apache.flume.FlumeException;
 import org.apache.flume.channel.file.FileChannelConfiguration;
@@ -42,10 +34,21 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Charsets;
-import com.google.common.base.Joiner;
-import com.google.common.collect.Maps;
-import com.google.common.io.Files;
+import java.io.File;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.flume.channel.file.TestUtils.compareInputAndOut;
+import static org.apache.flume.channel.file.TestUtils.consumeChannel;
+import static org.apache.flume.channel.file.TestUtils.fillChannel;
+import static org.apache.flume.channel.file.TestUtils.putEvents;
+import static org.apache.flume.channel.file.TestUtils.takeEvents;
 
 public class TestFileChannelEncryption extends TestFileChannelBase {
   protected static final Logger LOGGER =
@@ -62,36 +65,39 @@ public class TestFileChannelEncryption extends TestFileChannelBase {
     keyStoreFile = new File(baseDir, "keyStoreFile");
     Assert.assertTrue(keyStoreFile.createNewFile());
     keyAliasPassword = Maps.newHashMap();
-    keyAliasPassword.putAll(EncryptionTestUtils.
-        configureTestKeyStore(baseDir, keyStoreFile));
+    keyAliasPassword.putAll(EncryptionTestUtils.configureTestKeyStore(baseDir, keyStoreFile));
   }
+
   @After
   public void teardown() {
     super.teardown();
   }
+
   private Map<String, String> getOverrides() throws Exception {
     Map<String, String> overrides = Maps.newHashMap();
     overrides.put(FileChannelConfiguration.CAPACITY, String.valueOf(100));
-    overrides.put(FileChannelConfiguration.TRANSACTION_CAPACITY,
-        String.valueOf(100));
+    overrides.put(FileChannelConfiguration.TRANSACTION_CAPACITY, String.valueOf(100));
     return overrides;
   }
+
   private Map<String, String> getOverridesForEncryption() throws Exception {
     Map<String, String> overrides = getOverrides();
-    Map<String, String> encryptionProps = EncryptionTestUtils.
-        configureForKeyStore(keyStoreFile,
-            keyStorePasswordFile, keyAliasPassword);
+    Map<String, String> encryptionProps =
+        EncryptionTestUtils.configureForKeyStore(keyStoreFile,
+                                                 keyStorePasswordFile,
+                                                 keyAliasPassword);
     encryptionProps.put(EncryptionConfiguration.KEY_PROVIDER,
-        KeyProviderType.JCEKSFILE.name());
+                        KeyProviderType.JCEKSFILE.name());
     encryptionProps.put(EncryptionConfiguration.CIPHER_PROVIDER,
-        CipherProviderType.AESCTRNOPADDING.name());
+                        CipherProviderType.AESCTRNOPADDING.name());
     encryptionProps.put(EncryptionConfiguration.ACTIVE_KEY, "key-1");
-    for(String key : encryptionProps.keySet()) {
+    for (String key : encryptionProps.keySet()) {
       overrides.put(EncryptionConfiguration.ENCRYPTION_PREFIX + "." + key,
-          encryptionProps.get(key));
+                    encryptionProps.get(key));
     }
     return overrides;
   }
+
   /**
    * Test fails without FLUME-1565
    */
@@ -222,15 +228,16 @@ public class TestFileChannelEncryption extends TestFileChannelBase {
     channel = createFileChannel(noEncryptionOverrides);
     channel.start();
 
-    if(channel.isOpen()) {
+    if (channel.isOpen()) {
       try {
         takeEvents(channel, 1, 1);
         Assert.fail("Channel was opened and take did not throw exception");
-      } catch(ChannelException ex) {
+      } catch (ChannelException ex) {
         // expected
       }
     }
   }
+
   @Test
   public void testUnencyrptedAndEncryptedLogs() throws Exception {
     Map<String, String> noEncryptionOverrides = getOverrides();
@@ -252,41 +259,46 @@ public class TestFileChannelEncryption extends TestFileChannelBase {
     Set<String> out = consumeChannel(channel);
     compareInputAndOut(in, out);
   }
+
   @Test
   public void testBadKeyProviderInvalidValue() throws Exception {
     Map<String, String> overrides = getOverridesForEncryption();
     overrides.put(Joiner.on(".").join(EncryptionConfiguration.ENCRYPTION_PREFIX,
-        EncryptionConfiguration.KEY_PROVIDER), "invalid");
+                                      EncryptionConfiguration.KEY_PROVIDER),
+                  "invalid");
     try {
       channel = createFileChannel(overrides);
       Assert.fail();
-    } catch(FlumeException ex) {
-      Assert.assertEquals("java.lang.ClassNotFoundException: invalid",
-          ex.getMessage());
+    } catch (FlumeException ex) {
+      Assert.assertEquals("java.lang.ClassNotFoundException: invalid", ex.getMessage());
     }
   }
+
   @Test
   public void testBadKeyProviderInvalidClass() throws Exception {
     Map<String, String> overrides = getOverridesForEncryption();
     overrides.put(Joiner.on(".").join(EncryptionConfiguration.ENCRYPTION_PREFIX,
-        EncryptionConfiguration.KEY_PROVIDER), String.class.getName());
+                                      EncryptionConfiguration.KEY_PROVIDER),
+                  String.class.getName());
     try {
       channel = createFileChannel(overrides);
       Assert.fail();
-    } catch(FlumeException ex) {
-      Assert.assertEquals("Unable to instantiate Builder from java.lang.String",
-          ex.getMessage());
+    } catch (FlumeException ex) {
+      Assert.assertEquals("Unable to instantiate Builder from java.lang.String", ex.getMessage());
     }
   }
+
   @Test
   public void testBadCipherProviderInvalidValue() throws Exception {
     Map<String, String> overrides = getOverridesForEncryption();
     overrides.put(Joiner.on(".").join(EncryptionConfiguration.ENCRYPTION_PREFIX,
-        EncryptionConfiguration.CIPHER_PROVIDER), "invalid");
+                                      EncryptionConfiguration.CIPHER_PROVIDER),
+                  "invalid");
     channel = createFileChannel(overrides);
     channel.start();
     Assert.assertFalse(channel.isOpen());
   }
+
   @Test
   public void testBadCipherProviderInvalidClass() throws Exception {
     Map<String, String> overrides = getOverridesForEncryption();
@@ -296,6 +308,7 @@ public class TestFileChannelEncryption extends TestFileChannelBase {
     channel.start();
     Assert.assertFalse(channel.isOpen());
   }
+
   @Test
   public void testMissingKeyStoreFile() throws Exception {
     Map<String, String> overrides = getOverridesForEncryption();
@@ -306,11 +319,12 @@ public class TestFileChannelEncryption extends TestFileChannelBase {
     try {
       channel = createFileChannel(overrides);
       Assert.fail();
-    } catch(RuntimeException ex) {
+    } catch (RuntimeException ex) {
       Assert.assertTrue("Exception message is incorrect: " + ex.getMessage(),
           ex.getMessage().startsWith("java.io.FileNotFoundException: /path/does/not/exist "));
     }
   }
+
   @Test
   public void testMissingKeyStorePasswordFile() throws Exception {
     Map<String, String> overrides = getOverridesForEncryption();
@@ -321,11 +335,12 @@ public class TestFileChannelEncryption extends TestFileChannelBase {
     try {
       channel = createFileChannel(overrides);
       Assert.fail();
-    } catch(RuntimeException ex) {
+    } catch (RuntimeException ex) {
       Assert.assertTrue("Exception message is incorrect: " + ex.getMessage(),
           ex.getMessage().startsWith("java.io.FileNotFoundException: /path/does/not/exist "));
     }
   }
+
   @Test
   public void testBadKeyStorePassword() throws Exception {
     Files.write("invalid", keyStorePasswordFile, Charsets.UTF_8);
@@ -334,11 +349,12 @@ public class TestFileChannelEncryption extends TestFileChannelBase {
       channel = TestUtils.createFileChannel(checkpointDir.getAbsolutePath(),
           dataDir, overrides);
       Assert.fail();
-    } catch(RuntimeException ex) {
+    } catch (RuntimeException ex) {
       Assert.assertEquals("java.io.IOException: Keystore was tampered with, or " +
           "password was incorrect", ex.getMessage());
     }
   }
+
   @Test
   public void testBadKeyAlias() throws Exception {
     Map<String, String> overrides = getOverridesForEncryption();

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestJCEFileKeyProvider.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestJCEFileKeyProvider.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestJCEFileKeyProvider.java
index f33cada..8214a05 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestJCEFileKeyProvider.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestJCEFileKeyProvider.java
@@ -18,12 +18,10 @@
  */
 package org.apache.flume.channel.file.encryption;
 
-import java.io.File;
-import java.security.Key;
-import java.util.Map;
-
+import com.google.common.base.Charsets;
+import com.google.common.collect.Maps;
+import com.google.common.io.Files;
 import junit.framework.Assert;
-
 import org.apache.commons.io.FileUtils;
 import org.apache.flume.Context;
 import org.apache.flume.channel.file.TestUtils;
@@ -31,9 +29,9 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import com.google.common.base.Charsets;
-import com.google.common.collect.Maps;
-import com.google.common.io.Files;
+import java.io.File;
+import java.security.Key;
+import java.util.Map;
 
 public class TestJCEFileKeyProvider {
   private CipherProvider.Encryptor encryptor;
@@ -53,62 +51,69 @@ public class TestJCEFileKeyProvider {
     Assert.assertTrue(keyStoreFile.createNewFile());
 
   }
+
   @After
   public void cleanup() {
     FileUtils.deleteQuietly(baseDir);
   }
+
   private void initializeForKey(Key key) {
-    encryptor =
-        new AESCTRNoPaddingProvider.EncryptorBuilder().setKey(key).build();
-    decryptor =
-        new AESCTRNoPaddingProvider.DecryptorBuilder()
-        .setKey(key).setParameters(encryptor.getParameters()).build();
+    encryptor = new AESCTRNoPaddingProvider.EncryptorBuilder()
+                                           .setKey(key)
+                                           .build();
+    decryptor = new AESCTRNoPaddingProvider.DecryptorBuilder()
+                                           .setKey(key)
+                                           .setParameters(encryptor.getParameters())
+                                           .build();
   }
+
   @Test
   public void testWithNewKeyStore() throws Exception {
     createNewKeyStore();
     EncryptionTestUtils.createKeyStore(keyStoreFile, keyStorePasswordFile,
         keyAliasPassword);
-    Context context = new Context(EncryptionTestUtils.
-        configureForKeyStore(keyStoreFile,
-            keyStorePasswordFile, keyAliasPassword));
+    Context context = new Context(
+        EncryptionTestUtils.configureForKeyStore(keyStoreFile,
+                                                 keyStorePasswordFile,
+                                                 keyAliasPassword));
     Context keyProviderContext = new Context(
         context.getSubProperties(EncryptionConfiguration.KEY_PROVIDER + "."));
-    KeyProvider keyProvider = KeyProviderFactory.
-        getInstance(KeyProviderType.JCEKSFILE.name(), keyProviderContext);
+    KeyProvider keyProvider =
+        KeyProviderFactory.getInstance(KeyProviderType.JCEKSFILE.name(), keyProviderContext);
     testKeyProvider(keyProvider);
   }
+
   @Test
   public void testWithExistingKeyStore() throws Exception {
-    keyAliasPassword.putAll(EncryptionTestUtils.
-        configureTestKeyStore(baseDir, keyStoreFile));
-    Context context = new Context(EncryptionTestUtils.
-        configureForKeyStore(keyStoreFile,
-            keyStorePasswordFile, keyAliasPassword));
+    keyAliasPassword.putAll(EncryptionTestUtils.configureTestKeyStore(baseDir, keyStoreFile));
+    Context context = new Context(
+        EncryptionTestUtils.configureForKeyStore(keyStoreFile,
+                                                 keyStorePasswordFile,
+                                                 keyAliasPassword));
     Context keyProviderContext = new Context(
         context.getSubProperties(EncryptionConfiguration.KEY_PROVIDER + "."));
-    KeyProvider keyProvider = KeyProviderFactory.
-        getInstance(KeyProviderType.JCEKSFILE.name(), keyProviderContext);
+    KeyProvider keyProvider =
+        KeyProviderFactory.getInstance(KeyProviderType.JCEKSFILE.name(), keyProviderContext);
     testKeyProvider(keyProvider);
   }
+
   private void createNewKeyStore() throws Exception {
-    for(int i = 0; i < 10; i++) {
+    for (int i = 0; i < 10; i++) {
       // create some with passwords, some without
-      if(i % 2 == 0) {
+      if (i % 2 == 0) {
         String alias = "test-" + i;
         String password = String.valueOf(i);
-        keyAliasPassword.put(alias,
-            TestUtils.writeStringToFile(baseDir, alias, password));
+        keyAliasPassword.put(alias, TestUtils.writeStringToFile(baseDir, alias, password));
       }
     }
   }
+
   private void testKeyProvider(KeyProvider keyProvider) {
-    for(String alias : keyAliasPassword.keySet()) {
+    for (String alias : keyAliasPassword.keySet()) {
       Key key = keyProvider.getKey(alias);
       initializeForKey(key);
       String expected = "some text here " + alias;
-      byte[] cipherText = encryptor.
-          encrypt(expected.getBytes(Charsets.UTF_8));
+      byte[] cipherText = encryptor.encrypt(expected.getBytes(Charsets.UTF_8));
       byte[] clearText = decryptor.decrypt(cipherText);
       Assert.assertEquals(expected, new String(clearText, Charsets.UTF_8));
     }

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/BaseJdbcChannelProviderTest.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/BaseJdbcChannelProviderTest.java b/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/BaseJdbcChannelProviderTest.java
index 85ad7fe..7031eb7 100644
--- a/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/BaseJdbcChannelProviderTest.java
+++ b/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/BaseJdbcChannelProviderTest.java
@@ -17,6 +17,17 @@
  */
 package org.apache.flume.channel.jdbc;
 
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.jdbc.impl.JdbcChannelProviderImpl;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -32,17 +43,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.Transaction;
-import org.apache.flume.channel.jdbc.impl.JdbcChannelProviderImpl;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 public abstract class BaseJdbcChannelProviderTest {
   private static final Logger LOGGER =
       LoggerFactory.getLogger(BaseJdbcChannelProviderTest.class);
@@ -103,7 +103,7 @@ public abstract class BaseJdbcChannelProviderTest {
 
     Set<MockEvent> events = new HashSet<MockEvent>();
     for (int i = 1; i < 12; i++) {
-      events.add(MockEventUtils.generateMockEvent(i, i, i, 61%i, 1));
+      events.add(MockEventUtils.generateMockEvent(i, i, i, 61 % i, 1));
     }
 
     Iterator<MockEvent> meIt = events.iterator();
@@ -170,7 +170,7 @@ public abstract class BaseJdbcChannelProviderTest {
         new HashMap<String, List<MockEvent>>();
 
     for (int i = 1; i < 121; i++) {
-      MockEvent me = MockEventUtils.generateMockEvent(i, i, i, 61%i, 10);
+      MockEvent me = MockEventUtils.generateMockEvent(i, i, i, 61 % i, 10);
       List<MockEvent> meList = eventMap.get(me.getChannel());
       if (meList == null) {
         meList = new ArrayList<MockEvent>();
@@ -227,7 +227,7 @@ public abstract class BaseJdbcChannelProviderTest {
 
     Set<MockEvent> events = new HashSet<MockEvent>();
     for (int i = 1; i < 81; i++) {
-      events.add(MockEventUtils.generateMockEvent(i, i, i, 61%i, 5));
+      events.add(MockEventUtils.generateMockEvent(i, i, i, 61 % i, 5));
     }
 
     Iterator<MockEvent> meIt = events.iterator();

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEvent.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEvent.java b/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEvent.java
index 1e412c5..6804a9f 100644
--- a/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEvent.java
+++ b/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEvent.java
@@ -27,8 +27,7 @@ public class MockEvent implements Event {
   private final Map<String, String> headers;
   private final String channel;
 
-  public MockEvent(byte[] payload, Map<String, String> headers, String channel)
-  {
+  public MockEvent(byte[] payload, Map<String, String> headers, String channel) {
     this.payload = payload;
     this.headers = headers;
     this.channel = channel;

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEventUtils.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEventUtils.java b/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEventUtils.java
index 10d8b51..e5ee324 100644
--- a/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEventUtils.java
+++ b/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEventUtils.java
@@ -17,13 +17,13 @@
  */
 package org.apache.flume.channel.jdbc;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 public final class MockEventUtils {
 
   public static final Logger LOGGER =
@@ -70,20 +70,20 @@ public final class MockEventUtils {
    * @param numChannels
    * @return
    */
-  public static MockEvent generateMockEvent(int payloadMargin,
-      int headerNameMargin,	int headerValueMargin, int numHeaders,
-      int numChannels) {
+  public static MockEvent generateMockEvent(int payloadMargin, int headerNameMargin,
+                                            int headerValueMargin, int numHeaders,
+                                            int numChannels) {
 
     int chIndex = 0;
     if (numChannels > 1) {
-      chIndex = Math.abs(RANDOM.nextInt())%numChannels;
+      chIndex = Math.abs(RANDOM.nextInt()) % numChannels;
     }
-    String channel = "test-"+chIndex;
+    String channel = "test-" + chIndex;
 
     StringBuilder sb = new StringBuilder("New Event[payload size:");
 
     int plTh = ConfigurationConstants.PAYLOAD_LENGTH_THRESHOLD;
-    int plSize = Math.abs(RANDOM.nextInt())%plTh + payloadMargin;
+    int plSize = Math.abs(RANDOM.nextInt()) % plTh + payloadMargin;
     sb.append(plSize).append(", numHeaders:").append(numHeaders);
     sb.append(", channel:").append(channel);
 
@@ -93,8 +93,8 @@ public final class MockEventUtils {
 
     Map<String, String> headers = new HashMap<String, String>();
     for (int i = 0; i < numHeaders; i++) {
-      int nmSize = Math.abs(RANDOM.nextInt())%nmTh + headerNameMargin;
-      int vlSize = Math.abs(RANDOM.nextInt())%vlTh + headerValueMargin;
+      int nmSize = Math.abs(RANDOM.nextInt()) % nmTh + headerNameMargin;
+      int vlSize = Math.abs(RANDOM.nextInt()) % vlTh + headerValueMargin;
 
       String name = generateHeaderString(nmSize);
       String value = generateHeaderString(vlSize);

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestDerbySchemaHandlerQueries.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestDerbySchemaHandlerQueries.java b/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestDerbySchemaHandlerQueries.java
index 362bcfa..cad972c 100644
--- a/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestDerbySchemaHandlerQueries.java
+++ b/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestDerbySchemaHandlerQueries.java
@@ -107,7 +107,6 @@ public class TestDerbySchemaHandlerQueries {
       = "INSERT INTO FLUME.FL_HEADER (FLH_EVENT, FLH_NAME, FLH_VALUE, "
           + "FLH_NMSPILL, FLH_VLSPILL) VALUES ( ?, ?, ?, ?, ?)";
 
-
   public static final String EXPECTED_STMT_INSERT_HEADER_NAME_SPILL
       = "INSERT INTO FLUME.FL_NMSPILL (FLN_HEADER, FLN_SPILL) VALUES ( ?, ?)";
 
@@ -119,7 +118,6 @@ public class TestDerbySchemaHandlerQueries {
           + "FLE_ID = (SELECT MIN(FLE_ID) FROM FLUME.FL_EVENT WHERE "
           + "FLE_CHANNEL = ?)";
 
-
   public static final String EXPECTED_STMT_FETCH_PAYLOAD_SPILL
       = "SELECT FLP_SPILL FROM FLUME.FL_PLSPILL WHERE FLP_EVENT = ?";
 

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
index d01346a..b63ac9b 100644
--- a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
+++ b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
@@ -130,19 +130,19 @@ public class TestKafkaChannel {
     Properties consumerProps = channel.getConsumerProps();
     Properties producerProps = channel.getProducerProps();
 
-    Assert.assertEquals(producerProps.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG),testUtil.getKafkaServerUrl());
-    Assert.assertEquals(consumerProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG), "flume-something");
-    Assert.assertEquals(consumerProps.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
-
+    Assert.assertEquals(producerProps.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG),
+                        testUtil.getKafkaServerUrl());
+    Assert.assertEquals(consumerProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG),
+                        "flume-something");
+    Assert.assertEquals(consumerProps.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG),
+                        "earliest");
   }
 
-
   @Test
   public void testSuccess() throws Exception {
     doTestSuccessRollback(false, false);
   }
 
-
   @Test
   public void testSuccessInterleave() throws Exception {
     doTestSuccessRollback(false, true);
@@ -212,8 +212,10 @@ public class TestKafkaChannel {
 
     KafkaChannel channel = startChannel(false);
 
-    KafkaProducer<String, byte[]> producer = new KafkaProducer<String, byte[]>(channel.getProducerProps());
-    ProducerRecord<String, byte[]> data = new ProducerRecord<String, byte[]>(topic, "header-" + message, message.getBytes());
+    KafkaProducer<String, byte[]> producer =
+        new KafkaProducer<String, byte[]>(channel.getProducerProps());
+    ProducerRecord<String, byte[]> data =
+        new ProducerRecord<String, byte[]>(topic, "header-" + message, message.getBytes());
     producer.send(data).get();
     producer.flush();
     producer.close();
@@ -234,7 +236,7 @@ public class TestKafkaChannel {
   }
 
   private Event takeEventWithoutCommittingTxn(KafkaChannel channel) {
-    for (int i=0; i < 5; i++) {
+    for (int i = 0; i < 5; i++) {
       Transaction txn = channel.getTransaction();
       txn.begin();
 
@@ -255,17 +257,19 @@ public class TestKafkaChannel {
     KafkaProducer<String, byte[]> producer = new KafkaProducer<String, byte[]>(props);
 
     for (int i = 0; i < 50; i++) {
-      ProducerRecord<String, byte[]> data = new ProducerRecord<String, byte[]>(topic, String.valueOf(i) + "-header", String.valueOf(i).getBytes());
+      ProducerRecord<String, byte[]> data =
+          new ProducerRecord<String, byte[]>(topic, String.valueOf(i) + "-header",
+                                             String.valueOf(i).getBytes());
       producer.send(data).get();
     }
     ExecutorCompletionService<Void> submitterSvc = new
-            ExecutorCompletionService<Void>(Executors.newCachedThreadPool());
-    List<Event> events = pullEvents(channel, submitterSvc,
-            50, false, false);
+        ExecutorCompletionService<Void>(Executors.newCachedThreadPool());
+    List<Event> events = pullEvents(channel, submitterSvc, 50, false, false);
     wait(submitterSvc, 5);
     Map<Integer, String> finals = new HashMap<Integer, String>();
     for (int i = 0; i < 50; i++) {
-      finals.put(Integer.parseInt(new String(events.get(i).getBody())), events.get(i).getHeaders().get(KEY_HEADER));
+      finals.put(Integer.parseInt(new String(events.get(i).getBody())),
+                 events.get(i).getHeaders().get(KEY_HEADER));
     }
     for (int i = 0; i < 50; i++) {
       Assert.assertTrue(finals.keySet().contains(i));
@@ -284,7 +288,8 @@ public class TestKafkaChannel {
     KafkaProducer<String, byte[]> producer = new KafkaProducer<String, byte[]>(props);
 
     for (int i = 0; i < 50; i++) {
-      ProducerRecord<String, byte[]> data = new ProducerRecord<String, byte[]>(topic, null, String.valueOf(i).getBytes());
+      ProducerRecord<String, byte[]> data =
+          new ProducerRecord<String, byte[]>(topic, null, String.valueOf(i).getBytes());
       producer.send(data).get();
     }
     ExecutorCompletionService<Void> submitterSvc = new
@@ -323,14 +328,14 @@ public class TestKafkaChannel {
       channel.put(EventBuilder.withBody(msgs.get(i).getBytes(), headers));
     }
     tx.commit();
-    ExecutorCompletionService<Void> submitterSvc = new
-            ExecutorCompletionService<Void>(Executors.newCachedThreadPool());
-    List<Event> events = pullEvents(channel, submitterSvc,
-            50, false, false);
+    ExecutorCompletionService<Void> submitterSvc =
+        new ExecutorCompletionService<Void>(Executors.newCachedThreadPool());
+    List<Event> events = pullEvents(channel, submitterSvc, 50, false, false);
     wait(submitterSvc, 5);
     Map<Integer, String> finals = new HashMap<Integer, String>();
     for (int i = 0; i < 50; i++) {
-      finals.put(Integer.parseInt(new String(events.get(i).getBody())), events.get(i).getHeaders().get(KEY_HEADER));
+      finals.put(Integer.parseInt(new String(events.get(i).getBody())),
+                 events.get(i).getHeaders().get(KEY_HEADER));
     }
     for (int i = 0; i < 50; i++) {
       Assert.assertTrue(finals.keySet().contains(i));
@@ -403,13 +408,13 @@ public class TestKafkaChannel {
   }
 
   private void writeAndVerify(final boolean testRollbacks,
-                              final KafkaChannel channel, final boolean interleave) throws Exception {
+                              final KafkaChannel channel, final boolean interleave)
+      throws Exception {
 
     final List<List<Event>> events = createBaseList();
 
     ExecutorCompletionService<Void> submitterSvc =
-            new ExecutorCompletionService<Void>(Executors
-                    .newCachedThreadPool());
+        new ExecutorCompletionService<Void>(Executors.newCachedThreadPool());
 
     putEvents(channel, events, submitterSvc);
 
@@ -418,11 +423,9 @@ public class TestKafkaChannel {
     }
 
     ExecutorCompletionService<Void> submitterSvc2 =
-            new ExecutorCompletionService<Void>(Executors
-                    .newCachedThreadPool());
+        new ExecutorCompletionService<Void>(Executors.newCachedThreadPool());
 
-    final List<Event> eventsPulled =
-            pullEvents(channel, submitterSvc2, 50, testRollbacks, true);
+    final List<Event> eventsPulled = pullEvents(channel, submitterSvc2, 50, testRollbacks, true);
 
     if (!interleave) {
       wait(submitterSvc, 5);
@@ -586,18 +589,18 @@ public class TestKafkaChannel {
     int numPartitions = 5;
     int sessionTimeoutMs = 10000;
     int connectionTimeoutMs = 10000;
-    ZkUtils zkUtils = ZkUtils.apply(testUtil.getZkUrl(), sessionTimeoutMs, connectionTimeoutMs, false);
-
+    ZkUtils zkUtils =
+        ZkUtils.apply(testUtil.getZkUrl(), sessionTimeoutMs, connectionTimeoutMs, false);
     int replicationFactor = 1;
     Properties topicConfig = new Properties();
-    AdminUtils.createTopic(zkUtils, topicName, numPartitions,
-            replicationFactor, topicConfig);
+    AdminUtils.createTopic(zkUtils, topicName, numPartitions, replicationFactor, topicConfig);
   }
 
   public static void deleteTopic(String topicName) {
     int sessionTimeoutMs = 10000;
     int connectionTimeoutMs = 10000;
-    ZkUtils zkUtils = ZkUtils.apply(testUtil.getZkUrl(), sessionTimeoutMs, connectionTimeoutMs, false);
+    ZkUtils zkUtils =
+        ZkUtils.apply(testUtil.getZkUrl(), sessionTimeoutMs, connectionTimeoutMs, false);
     AdminUtils.deleteTopic(zkUtils, topicName);
   }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-channels/flume-spillable-memory-channel/src/test/java/org/apache/flume/channel/TestSpillableMemoryChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-spillable-memory-channel/src/test/java/org/apache/flume/channel/TestSpillableMemoryChannel.java b/flume-ng-channels/flume-spillable-memory-channel/src/test/java/org/apache/flume/channel/TestSpillableMemoryChannel.java
index 1e4e819..848636b 100644
--- a/flume-ng-channels/flume-spillable-memory-channel/src/test/java/org/apache/flume/channel/TestSpillableMemoryChannel.java
+++ b/flume-ng-channels/flume-spillable-memory-channel/src/test/java/org/apache/flume/channel/TestSpillableMemoryChannel.java
@@ -24,10 +24,13 @@ import java.io.File;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
-
 import java.util.UUID;
 
-import org.apache.flume.*;
+import org.apache.flume.ChannelException;
+import org.apache.flume.ChannelFullException;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.event.EventBuilder;
 import org.apache.flume.channel.file.FileChannelConfiguration;
@@ -39,7 +42,6 @@ import org.junit.Test;
 import org.junit.Rule;
 import org.junit.rules.TemporaryFolder;
 
-
 public class TestSpillableMemoryChannel {
 
   private SpillableMemoryChannel channel;
@@ -51,14 +53,14 @@ public class TestSpillableMemoryChannel {
     Context context = new Context();
     File checkPointDir = fileChannelDir.newFolder("checkpoint");
     File dataDir = fileChannelDir.newFolder("data");
-    context.put(FileChannelConfiguration.CHECKPOINT_DIR
-            , checkPointDir.getAbsolutePath());
+    context.put(FileChannelConfiguration.CHECKPOINT_DIR, checkPointDir.getAbsolutePath());
     context.put(FileChannelConfiguration.DATA_DIRS, dataDir.getAbsolutePath());
     // Set checkpoint for 5 seconds otherwise test will run out of memory
     context.put(FileChannelConfiguration.CHECKPOINT_INTERVAL, "5000");
 
-    if (overrides != null)
+    if (overrides != null) {
       context.putAll(overrides);
+    }
 
     Configurables.configure(channel, context);
   }
@@ -81,9 +83,9 @@ public class TestSpillableMemoryChannel {
     startChannel(params);
   }
 
-
   static class NullFound extends RuntimeException {
     public int expectedValue;
+
     public NullFound(int expected) {
       super("Expected " + expected + ",  but null found");
       expectedValue = expected;
@@ -92,9 +94,10 @@ public class TestSpillableMemoryChannel {
 
   static class TooManyNulls extends RuntimeException {
     private int nullsFound;
+
     public TooManyNulls(int count) {
       super("Total nulls found in thread ("
-              + Thread.currentThread().getName() + ") : " + count);
+            + Thread.currentThread().getName() + ") : " + count);
       nullsFound = count;
     }
   }
@@ -102,7 +105,7 @@ public class TestSpillableMemoryChannel {
   @Before
   public void setUp() {
     channel = new SpillableMemoryChannel();
-    channel.setName("spillChannel-" + UUID.randomUUID() );
+    channel.setName("spillChannel-" + UUID.randomUUID());
   }
 
   @After
@@ -117,7 +120,7 @@ public class TestSpillableMemoryChannel {
   }
 
   private static void takeNull(AbstractChannel channel) {
-      channel.take();
+    channel.take();
   }
 
   private static void takeN(int first, int count, AbstractChannel channel) {
@@ -127,7 +130,7 @@ public class TestSpillableMemoryChannel {
       if (e == null) {
         throw new NullFound(i);
       }
-      Event expected = EventBuilder.withBody( String.valueOf(i).getBytes() );
+      Event expected = EventBuilder.withBody(String.valueOf(i).getBytes());
       Assert.assertArrayEquals(e.getBody(), expected.getBody());
     }
   }
@@ -140,16 +143,14 @@ public class TestSpillableMemoryChannel {
       if (e == null) {
         try {
           Thread.sleep(0);
-        } catch (InterruptedException ex)
-        { /* ignore */ }
+        } catch (InterruptedException ex) { /* ignore */ }
         return i;
       }
     }
     return i;
   }
 
-  private static void transactionalPutN(int first, int count,
-                                        AbstractChannel channel) {
+  private static void transactionalPutN(int first, int count, AbstractChannel channel) {
     Transaction tx = channel.getTransaction();
     tx.begin();
     try {
@@ -163,8 +164,7 @@ public class TestSpillableMemoryChannel {
     }
   }
 
-  private static void transactionalTakeN(int first, int count,
-                                         AbstractChannel channel) {
+  private static void transactionalTakeN(int first, int count, AbstractChannel channel) {
     Transaction tx = channel.getTransaction();
     tx.begin();
     try {
@@ -184,14 +184,13 @@ public class TestSpillableMemoryChannel {
     }
   }
 
-  private static int transactionalTakeN_NoCheck(int count
-          , AbstractChannel channel)  {
+  private static int transactionalTakeN_NoCheck(int count, AbstractChannel channel) {
     Transaction tx = channel.getTransaction();
     tx.begin();
     try {
       int eventCount = takeN_NoCheck(count, channel);
       tx.commit();
-      return  eventCount;
+      return eventCount;
     } catch (RuntimeException e) {
       tx.rollback();
       throw e;
@@ -204,8 +203,9 @@ public class TestSpillableMemoryChannel {
     Transaction tx = channel.getTransaction();
     tx.begin();
     try {
-      for (int i = 0; i < count; ++i)
+      for (int i = 0; i < count; ++i) {
         takeNull(channel);
+      }
       tx.commit();
     } catch (AssertionError e) {
       tx.rollback();
@@ -218,68 +218,63 @@ public class TestSpillableMemoryChannel {
     }
   }
 
-  private Thread makePutThread(String threadName
-          , final int first, final int count, final int batchSize
-          , final AbstractChannel channel) {
-    return
-      new Thread(threadName) {
-        public void run() {
-          int maxdepth = 0;
-          StopWatch watch = new StopWatch();
-          for (int i = first; i<first+count; i=i+batchSize) {
-            transactionalPutN(i, batchSize, channel);
+  private Thread makePutThread(String threadName, final int first, final int count,
+                               final int batchSize, final AbstractChannel channel) {
+    return new Thread(threadName) {
+      public void run() {
+        int maxdepth = 0;
+        StopWatch watch = new StopWatch();
+        for (int i = first; i < first + count; i = i + batchSize) {
+          transactionalPutN(i, batchSize, channel);
+        }
+        watch.elapsed();
+      }
+    };
+  }
+
+  private static Thread makeTakeThread(String threadName, final int first, final int count,
+                                       final int batchSize, final AbstractChannel channel) {
+    return new Thread(threadName) {
+      public void run() {
+        StopWatch watch = new StopWatch();
+        for (int i = first; i < first + count; ) {
+          try {
+            transactionalTakeN(i, batchSize, channel);
+            i = i + batchSize;
+          } catch (NullFound e) {
+            i = e.expectedValue;
           }
-          watch.elapsed();
         }
-      };
-  }
-
-  private static Thread makeTakeThread(String threadName, final int first
-        , final int count, final int batchSize, final AbstractChannel channel) {
-    return
-      new Thread(threadName) {
-        public void run() {
-          StopWatch watch = new StopWatch();
-          for (int i = first; i < first+count; ) {
+        watch.elapsed();
+      }
+    };
+  }
+
+  private static Thread makeTakeThread_noCheck(String threadName, final int totalEvents,
+                                               final int batchSize, final AbstractChannel channel) {
+    return new Thread(threadName) {
+      public void run() {
+        int batchSz = batchSize;
+        StopWatch watch = new StopWatch();
+        int i = 0, attempts = 0;
+        while (i < totalEvents) {
+          int remaining = totalEvents - i;
+          batchSz = (remaining > batchSz) ? batchSz : remaining;
+          int takenCount = transactionalTakeN_NoCheck(batchSz, channel);
+          if (takenCount < batchSz) {
             try {
-              transactionalTakeN(i, batchSize, channel);
-              i = i + batchSize;
-            } catch (NullFound e) {
-              i = e.expectedValue;
-            }
+              Thread.sleep(20);
+            } catch (InterruptedException ex) { /* ignore */ }
           }
-          watch.elapsed();
-        }
-      };
-  }
-
-  private static Thread makeTakeThread_noCheck(String threadName
-        , final int totalEvents, final int batchSize, final AbstractChannel channel) {
-    return
-      new Thread(threadName) {
-        public void run() {
-          int batchSz = batchSize;
-          StopWatch watch = new StopWatch();
-          int i = 0, attempts = 0 ;
-          while(i < totalEvents) {
-              int remaining = totalEvents - i;
-              batchSz = (remaining > batchSz) ? batchSz : remaining;
-              int takenCount = transactionalTakeN_NoCheck(batchSz, channel);
-              if(takenCount < batchSz) {
-                try {
-                  Thread.sleep(20);
-                } catch (InterruptedException ex)
-                { /* ignore */ }
-              }
-              i += takenCount;
-              ++attempts;
-              if(attempts  >  totalEvents * 3 ) {
-                throw new TooManyNulls(attempts);
-              }
+          i += takenCount;
+          ++attempts;
+          if (attempts > totalEvents * 3) {
+            throw new TooManyNulls(attempts);
           }
-          watch.elapsed(" items = " + i + ", attempts = " + attempts);
         }
-      };
+        watch.elapsed(" items = " + i + ", attempts = " + attempts);
+      }
+    };
   }
 
   @Test
@@ -292,37 +287,36 @@ public class TestSpillableMemoryChannel {
 
     Transaction tx = channel.getTransaction();
     tx.begin();
-    putN(0,2,channel);
+    putN(0, 2, channel);
     tx.commit();
     tx.close();
 
     tx = channel.getTransaction();
     tx.begin();
-    takeN(0,2,channel);
+    takeN(0, 2, channel);
     tx.commit();
     tx.close();
   }
 
-
   @Test
-  public void testCapacityDisableOverflow()  {
+  public void testCapacityDisableOverflow() {
     Map<String, String> params = new HashMap<String, String>();
     params.put("memoryCapacity", "2");
     params.put("overflowCapacity", "0");   // overflow is disabled effectively
-    params.put("overflowTimeout", "0" );
+    params.put("overflowTimeout", "0");
     startChannel(params);
 
-    transactionalPutN(0,2,channel);
+    transactionalPutN(0, 2, channel);
 
     boolean threw = false;
     try {
-      transactionalPutN(2,1,channel);
+      transactionalPutN(2, 1, channel);
     } catch (ChannelException e) {
       threw = true;
     }
     Assert.assertTrue("Expecting ChannelFullException to be thrown", threw);
 
-    transactionalTakeN(0,2, channel);
+    transactionalTakeN(0, 2, channel);
 
     Transaction tx = channel.getTransaction();
     tx.begin();
@@ -332,7 +326,7 @@ public class TestSpillableMemoryChannel {
   }
 
   @Test
-  public void testCapacityWithOverflow()  {
+  public void testCapacityWithOverflow() {
     Map<String, String> params = new HashMap<String, String>();
     params.put("memoryCapacity", "2");
     params.put("overflowCapacity", "4");
@@ -346,19 +340,19 @@ public class TestSpillableMemoryChannel {
 
     boolean threw = false;
     try {
-      transactionalPutN(7,2,channel);   // cannot fit in channel
+      transactionalPutN(7, 2, channel);   // cannot fit in channel
     } catch (ChannelFullException e) {
       threw = true;
     }
     Assert.assertTrue("Expecting ChannelFullException to be thrown", threw);
 
-    transactionalTakeN(1,2, channel);
-    transactionalTakeN(3,2, channel);
-    transactionalTakeN(5,2, channel);
+    transactionalTakeN(1, 2, channel);
+    transactionalTakeN(3, 2, channel);
+    transactionalTakeN(5, 2, channel);
   }
 
   @Test
-  public void testRestart()  {
+  public void testRestart() {
     Map<String, String> params = new HashMap<String, String>();
     params.put("memoryCapacity", "2");
     params.put("overflowCapacity", "10");
@@ -372,8 +366,7 @@ public class TestSpillableMemoryChannel {
     restartChannel(params);
 
     // from overflow, as in memory stuff should be lost
-    transactionalTakeN(3,2, channel);
-
+    transactionalTakeN(3, 2, channel);
   }
 
   @Test
@@ -382,35 +375,34 @@ public class TestSpillableMemoryChannel {
     params.put("memoryCapacity", "10000000");
     params.put("overflowCapacity", "20000000");
     params.put(FileChannelConfiguration.TRANSACTION_CAPACITY, "10");
-    params.put("overflowTimeout", "1" );
+    params.put("overflowTimeout", "1");
 
     startChannel(params);
 
-    transactionalPutN( 1,5,channel);
-    transactionalPutN( 6,5,channel);
-    transactionalPutN(11,5,channel); // these should go to overflow
+    transactionalPutN(1, 5, channel);
+    transactionalPutN(6, 5, channel);
+    transactionalPutN(11, 5, channel); // these should go to overflow
 
-    transactionalTakeN(1,10, channel);
-    transactionalTakeN(11,5, channel);
+    transactionalTakeN(1, 10, channel);
+    transactionalTakeN(11, 5, channel);
   }
 
   @Test
   public void testOverflow() {
-
     Map<String, String> params = new HashMap<String, String>();
     params.put("memoryCapacity", "10");
     params.put("overflowCapacity", "20");
     params.put(FileChannelConfiguration.TRANSACTION_CAPACITY, "10");
-    params.put("overflowTimeout", "1" );
+    params.put("overflowTimeout", "1");
 
     startChannel(params);
 
-    transactionalPutN( 1,5,channel);
-    transactionalPutN( 6,5,channel);
-    transactionalPutN(11,5,channel); // these should go to overflow
+    transactionalPutN(1, 5, channel);
+    transactionalPutN(6, 5, channel);
+    transactionalPutN(11, 5, channel); // these should go to overflow
 
-    transactionalTakeN(1,10, channel);
-    transactionalTakeN(11,5, channel);
+    transactionalTakeN(1, 10, channel);
+    transactionalTakeN(11, 5, channel);
   }
 
   @Test
@@ -419,29 +411,29 @@ public class TestSpillableMemoryChannel {
     params.put("memoryCapacity", "10");
     params.put("overflowCapacity", "10");
     params.put(FileChannelConfiguration.TRANSACTION_CAPACITY, "5");
-    params.put("overflowTimeout", "1" );
+    params.put("overflowTimeout", "1");
 
     startChannel(params);
 
-    transactionalPutN( 1,5,channel);
-    transactionalPutN( 6,5,channel);
-    transactionalPutN(11,5,channel); // into overflow
-    transactionalPutN(16,5,channel); // into overflow
+    transactionalPutN(1, 5, channel);
+    transactionalPutN(6, 5, channel);
+    transactionalPutN(11, 5, channel); // into overflow
+    transactionalPutN(16, 5, channel); // into overflow
 
     transactionalTakeN(1, 1, channel);
-    transactionalTakeN(2, 5,channel);
-    transactionalTakeN(7, 4,channel);
+    transactionalTakeN(2, 5, channel);
+    transactionalTakeN(7, 4, channel);
 
-    transactionalPutN( 20,2,channel);
-    transactionalPutN( 22,3,channel);
+    transactionalPutN(20, 2, channel);
+    transactionalPutN(22, 3, channel);
 
-    transactionalTakeN( 11,3,channel); // from overflow
-    transactionalTakeN( 14,5,channel); // from overflow
-    transactionalTakeN( 19,2,channel); // from overflow
+    transactionalTakeN(11, 3, channel); // from overflow
+    transactionalTakeN(14, 5, channel); // from overflow
+    transactionalTakeN(19, 2, channel); // from overflow
   }
 
   @Test
-  public void testByteCapacity()  {
+  public void testByteCapacity() {
     Map<String, String> params = new HashMap<String, String>();
     params.put("memoryCapacity", "1000");
     // configure to hold 8 events of 10 bytes each (plus 20% event header space)
@@ -449,12 +441,12 @@ public class TestSpillableMemoryChannel {
     params.put("avgEventSize", "10");
     params.put("overflowCapacity", "20");
     params.put(FileChannelConfiguration.TRANSACTION_CAPACITY, "10");
-    params.put("overflowTimeout", "1" );
+    params.put("overflowTimeout", "1");
     startChannel(params);
 
     transactionalPutN(1, 8, channel);   // this wil max the byteCapacity
     transactionalPutN(9, 10, channel);
-    transactionalPutN(19,10, channel);  // this will fill up the overflow
+    transactionalPutN(19, 10, channel);  // this will fill up the overflow
 
     boolean threw = false;
     try {
@@ -463,7 +455,6 @@ public class TestSpillableMemoryChannel {
       threw = true;
     }
     Assert.assertTrue("byteCapacity did not throw as expected", threw);
-
   }
 
   @Test
@@ -473,7 +464,7 @@ public class TestSpillableMemoryChannel {
     params.put("memoryCapacity", "5");
     params.put("overflowCapacity", "15");
     params.put(FileChannelConfiguration.TRANSACTION_CAPACITY, "10");
-    params.put("overflowTimeout", "1" );
+    params.put("overflowTimeout", "1");
     startChannel(params);
 
     transactionalPutN(1, 5, channel);
@@ -493,13 +484,13 @@ public class TestSpillableMemoryChannel {
     transactionalTakeN(6, 5, channel);  // from overflow
 
     transactionalTakeN(11, 5, channel); // from overflow
-    transactionalTakeN(16, 2,channel); // from overflow
+    transactionalTakeN(16, 2, channel); // from overflow
 
     transactionalPutN(21, 5, channel);
 
     tx = channel.getTransaction();
     tx.begin();
-    takeN(18,3, channel);              // from overflow
+    takeN(18, 3, channel);              // from overflow
     takeNull(channel);  // expect null since next event is in primary
     tx.commit();
     tx.close();
@@ -516,9 +507,8 @@ public class TestSpillableMemoryChannel {
     params.put("overflowTimeout", "0");
     startChannel(params);
 
-
     //1 Rollback for Puts
-    transactionalPutN(1,5, channel);
+    transactionalPutN(1, 5, channel);
     Transaction tx = channel.getTransaction();
     tx.begin();
     putN(6, 5, channel);
@@ -530,8 +520,7 @@ public class TestSpillableMemoryChannel {
 
     //2.  verify things back to normal after put rollback
     transactionalPutN(11, 5, channel);
-    transactionalTakeN(11,5,channel);
-
+    transactionalTakeN(11, 5, channel);
 
     //3 Rollback for Takes
     transactionalPutN(16, 5, channel);
@@ -545,13 +534,12 @@ public class TestSpillableMemoryChannel {
     transactionalTakeN_NoCheck(5, channel);
 
     //4.  verify things back to normal after take rollback
-    transactionalPutN(21,5, channel);
-    transactionalTakeN(21,5,channel);
+    transactionalPutN(21, 5, channel);
+    transactionalTakeN(21, 5, channel);
   }
 
-
   @Test
-  public void testReconfigure()  {
+  public void testReconfigure() {
     //1) bring up with small capacity
     Map<String, String> params = new HashMap<String, String>();
     params.put("memoryCapacity", "10");
@@ -559,12 +547,12 @@ public class TestSpillableMemoryChannel {
     params.put("overflowTimeout", "0");
     startChannel(params);
 
-    Assert.assertTrue("overflowTimeout setting did not reconfigure correctly"
-            , channel.getOverflowTimeout() == 0);
-    Assert.assertTrue("memoryCapacity did not reconfigure correctly"
-            , channel.getMemoryCapacity() == 10);
-    Assert.assertTrue("overflowCapacity did not reconfigure correctly"
-            , channel.isOverflowDisabled() );
+    Assert.assertTrue("overflowTimeout setting did not reconfigure correctly",
+                      channel.getOverflowTimeout() == 0);
+    Assert.assertTrue("memoryCapacity did not reconfigure correctly",
+                      channel.getMemoryCapacity() == 10);
+    Assert.assertTrue("overflowCapacity did not reconfigure correctly",
+                      channel.isOverflowDisabled());
 
     transactionalPutN(1, 10, channel);
     boolean threw = false;
@@ -574,8 +562,7 @@ public class TestSpillableMemoryChannel {
       threw = true;
     }
     Assert.assertTrue("Expected the channel to fill up and throw an exception, "
-            + "but it did not throw", threw);
-
+                      + "but it did not throw", threw);
 
     //2) Resize and verify
     params = new HashMap<String, String>();
@@ -583,12 +570,13 @@ public class TestSpillableMemoryChannel {
     params.put("overflowCapacity", "0");
     reconfigureChannel(params);
 
-    Assert.assertTrue("overflowTimeout setting did not reconfigure correctly"
-        , channel.getOverflowTimeout() == SpillableMemoryChannel.defaultOverflowTimeout);
-    Assert.assertTrue("memoryCapacity did not reconfigure correctly"
-        , channel.getMemoryCapacity() == 20);
-    Assert.assertTrue("overflowCapacity did not reconfigure correctly"
-        , channel.isOverflowDisabled() );
+    Assert.assertTrue("overflowTimeout setting did not reconfigure correctly",
+                      channel.getOverflowTimeout() ==
+                      SpillableMemoryChannel.defaultOverflowTimeout);
+    Assert.assertTrue("memoryCapacity did not reconfigure correctly",
+                      channel.getMemoryCapacity() == 20);
+    Assert.assertTrue("overflowCapacity did not reconfigure correctly",
+                      channel.isOverflowDisabled());
 
     // pull out the values inserted prior to reconfiguration
     transactionalTakeN(1, 10, channel);
@@ -603,25 +591,25 @@ public class TestSpillableMemoryChannel {
       threw = true;
     }
     Assert.assertTrue("Expected the channel to fill up and throw an exception, "
-            + "but it did not throw", threw);
+                      + "but it did not throw", threw);
 
     transactionalTakeN(11, 10, channel);
     transactionalTakeN(21, 10, channel);
 
-
     // 3) Reconfigure with empty config and verify settings revert to default
     params = new HashMap<String, String>();
     reconfigureChannel(params);
 
-    Assert.assertTrue("overflowTimeout setting did not reconfigure correctly"
-      , channel.getOverflowTimeout() == SpillableMemoryChannel.defaultOverflowTimeout);
-    Assert.assertTrue("memoryCapacity did not reconfigure correctly"
-      , channel.getMemoryCapacity() == SpillableMemoryChannel.defaultMemoryCapacity);
-    Assert.assertTrue("overflowCapacity did not reconfigure correctly"
-      , channel.getOverflowCapacity() == SpillableMemoryChannel.defaultOverflowCapacity);
-    Assert.assertFalse("overflowCapacity did not reconfigure correctly"
-      , channel.isOverflowDisabled());
-
+    Assert.assertTrue("overflowTimeout setting did not reconfigure correctly",
+                      channel.getOverflowTimeout() ==
+                      SpillableMemoryChannel.defaultOverflowTimeout);
+    Assert.assertTrue("memoryCapacity did not reconfigure correctly",
+                      channel.getMemoryCapacity() == SpillableMemoryChannel.defaultMemoryCapacity);
+    Assert.assertTrue("overflowCapacity did not reconfigure correctly",
+                      channel.getOverflowCapacity() ==
+                      SpillableMemoryChannel.defaultOverflowCapacity);
+    Assert.assertFalse("overflowCapacity did not reconfigure correctly",
+                       channel.isOverflowDisabled());
 
     // 4) Reconfiguring of  overflow
     params = new HashMap<String, String>();
@@ -631,19 +619,18 @@ public class TestSpillableMemoryChannel {
     params.put("overflowTimeout", "1");
     reconfigureChannel(params);
 
-    transactionalPutN( 1,5, channel);
-    transactionalPutN( 6,5, channel);
-    transactionalPutN(11,5, channel);
-    transactionalPutN(16,5, channel);
-    threw=false;
+    transactionalPutN(1, 5, channel);
+    transactionalPutN(6, 5, channel);
+    transactionalPutN(11, 5, channel);
+    transactionalPutN(16, 5, channel);
+    threw = false;
     try {
       // should error out as both primary & overflow are full
-      transactionalPutN(21,5, channel);
+      transactionalPutN(21, 5, channel);
     } catch (ChannelException e) {
       threw = true;
     }
-    Assert.assertTrue("Expected the last insertion to fail, but it didn't."
-            , threw);
+    Assert.assertTrue("Expected the last insertion to fail, but it didn't.", threw);
 
     // reconfig the overflow
     params = new HashMap<String, String>();
@@ -654,10 +641,10 @@ public class TestSpillableMemoryChannel {
     reconfigureChannel(params);
 
     // should succeed now as we have made room in the overflow
-    transactionalPutN(21,5, channel);
+    transactionalPutN(21, 5, channel);
 
-    transactionalTakeN(1,10, channel);
-    transactionalTakeN(11,5, channel);
+    transactionalTakeN(1, 10, channel);
+    transactionalTakeN(11, 5, channel);
     transactionalTakeN(16, 5, channel);
     transactionalTakeN(21, 5, channel);
   }
@@ -666,13 +653,13 @@ public class TestSpillableMemoryChannel {
   public void testParallelSingleSourceAndSink() throws InterruptedException {
     Map<String, String> params = new HashMap<String, String>();
     params.put("memoryCapacity", "1000020");
-    params.put("overflowCapacity",   "0");
+    params.put("overflowCapacity", "0");
     params.put("overflowTimeout", "3");
     startChannel(params);
 
     // run source and sink concurrently
     Thread sourceThd = makePutThread("src", 1, 500000, 100, channel);
-    Thread sinkThd = makeTakeThread("sink",  1, 500000, 100, channel);
+    Thread sinkThd = makeTakeThread("sink", 1, 500000, 100, channel);
 
     StopWatch watch = new StopWatch();
 
@@ -683,15 +670,15 @@ public class TestSpillableMemoryChannel {
     sinkThd.join();
 
     watch.elapsed();
-    System.out.println("Max Queue size " + channel.getMaxMemQueueSize() );
+    System.out.println("Max Queue size " + channel.getMaxMemQueueSize());
   }
 
   @Test
   public void testCounters() throws InterruptedException {
     Map<String, String> params = new HashMap<String, String>();
-    params.put("memoryCapacity",  "5000");
-    params.put("overflowCapacity","5000");
-    params.put("transactionCapacity","5000");
+    params.put("memoryCapacity", "5000");
+    params.put("overflowCapacity", "5000");
+    params.put("transactionCapacity", "5000");
     params.put("overflowTimeout", "0");
     startChannel(params);
 
@@ -706,7 +693,7 @@ public class TestSpillableMemoryChannel {
     Assert.assertEquals(5000, channel.channelCounter.getEventPutSuccessCount());
 
     //2. empty mem queue
-    Thread sinkThd =  makeTakeThread("sink",  1, 5000, 1000, channel);
+    Thread sinkThd = makeTakeThread("sink", 1, 5000, 1000, channel);
     sinkThd.start();
     sinkThd.join();
     Assert.assertEquals(0, channel.getTotalStored());
@@ -714,7 +701,6 @@ public class TestSpillableMemoryChannel {
     Assert.assertEquals(5000, channel.channelCounter.getEventTakeAttemptCount());
     Assert.assertEquals(5000, channel.channelCounter.getEventTakeSuccessCount());
 
-
     //3. fill up mem & overflow
     sourceThd = makePutThread("src", 1, 10000, 1000, channel);
     sourceThd.start();
@@ -724,9 +710,8 @@ public class TestSpillableMemoryChannel {
     Assert.assertEquals(15000, channel.channelCounter.getEventPutAttemptCount());
     Assert.assertEquals(15000, channel.channelCounter.getEventPutSuccessCount());
 
-
     //4. empty memory
-    sinkThd = makeTakeThread("sink",  1, 5000, 1000, channel);
+    sinkThd = makeTakeThread("sink", 1, 5000, 1000, channel);
     sinkThd.start();
     sinkThd.join();
     Assert.assertEquals(5000, channel.getTotalStored());
@@ -745,12 +730,10 @@ public class TestSpillableMemoryChannel {
     Assert.assertEquals(15000, channel.channelCounter.getEventTakeAttemptCount());
     Assert.assertEquals(15000, channel.channelCounter.getEventTakeSuccessCount());
 
-
-
     //6. now do it concurrently
     sourceThd = makePutThread("src1", 1, 5000, 1000, channel);
     Thread sourceThd2 = makePutThread("src2", 1, 5000, 500, channel);
-    sinkThd =  makeTakeThread_noCheck("sink1", 5000, 1000, channel);
+    sinkThd = makeTakeThread_noCheck("sink1", 5000, 1000, channel);
     sourceThd.start();
     sourceThd2.start();
     sinkThd.start();
@@ -759,8 +742,8 @@ public class TestSpillableMemoryChannel {
     sinkThd.join();
     Assert.assertEquals(5000, channel.getTotalStored());
     Assert.assertEquals(5000, channel.channelCounter.getChannelSize());
-    Thread sinkThd2 =  makeTakeThread_noCheck("sink2", 2500, 500, channel);
-    Thread sinkThd3 =  makeTakeThread_noCheck("sink3", 2500, 1000, channel);
+    Thread sinkThd2 = makeTakeThread_noCheck("sink2", 2500, 500, channel);
+    Thread sinkThd3 = makeTakeThread_noCheck("sink3", 2500, 1000, channel);
     sinkThd2.start();
     sinkThd3.start();
     sinkThd2.join();
@@ -769,30 +752,26 @@ public class TestSpillableMemoryChannel {
     Assert.assertEquals(0, channel.channelCounter.getChannelSize());
     Assert.assertEquals(25000, channel.channelCounter.getEventTakeSuccessCount());
     Assert.assertEquals(25000, channel.channelCounter.getEventPutSuccessCount());
-    Assert.assertTrue("TakeAttempt channel counter value larger than expected" ,
-            25000 <= channel.channelCounter.getEventTakeAttemptCount());
+    Assert.assertTrue("TakeAttempt channel counter value larger than expected",
+                      25000 <= channel.channelCounter.getEventTakeAttemptCount());
     Assert.assertTrue("PutAttempt channel counter value larger than expected",
-            25000 <= channel.channelCounter.getEventPutAttemptCount());
+                      25000 <= channel.channelCounter.getEventPutAttemptCount());
   }
 
-  public ArrayList<Thread> createSourceThreads(int count, int totalEvents
-          , int batchSize) {
+  public ArrayList<Thread> createSourceThreads(int count, int totalEvents, int batchSize) {
     ArrayList<Thread> sourceThds = new ArrayList<Thread>();
 
     for (int i = 0; i < count; ++i) {
-      sourceThds.add(  makePutThread("src" + i, 1, totalEvents/count
-              , batchSize, channel) );
+      sourceThds.add(makePutThread("src" + i, 1, totalEvents / count, batchSize, channel));
     }
     return sourceThds;
   }
 
-  public ArrayList<Thread> createSinkThreads(int count, int totalEvents
-          , int batchSize) {
+  public ArrayList<Thread> createSinkThreads(int count, int totalEvents, int batchSize) {
     ArrayList<Thread> sinkThreads = new ArrayList<Thread>(count);
 
     for (int i = 0; i < count; ++i) {
-      sinkThreads.add( makeTakeThread_noCheck("sink"+i, totalEvents/count
-              , batchSize, channel) );
+      sinkThreads.add(makeTakeThread_noCheck("sink" + i, totalEvents / count, batchSize, channel));
     }
     return sinkThreads;
   }
@@ -803,13 +782,12 @@ public class TestSpillableMemoryChannel {
     }
   }
 
-  public void joinThreads(ArrayList<Thread> threads)
-          throws InterruptedException {
+  public void joinThreads(ArrayList<Thread> threads) throws InterruptedException {
     for (Thread thread : threads) {
       try {
         thread.join();
       } catch (InterruptedException e) {
-        System.out.println("Interrupted while waiting on " + thread.getName() );
+        System.out.println("Interrupted while waiting on " + thread.getName());
         throw e;
       }
     }
@@ -824,15 +802,13 @@ public class TestSpillableMemoryChannel {
 
     Map<String, String> params = new HashMap<String, String>();
     params.put("memoryCapacity", "0");
-    params.put("overflowCapacity",  "500020");
+    params.put("overflowCapacity", "500020");
     params.put("overflowTimeout", "3");
     startChannel(params);
 
     ArrayList<Thread> sinks = createSinkThreads(sinkCount, eventCount, batchSize);
 
-    ArrayList<Thread> sources = createSourceThreads(sourceCount
-            , eventCount, batchSize);
-
+    ArrayList<Thread> sources = createSourceThreads(sourceCount, eventCount, batchSize);
 
     StopWatch watch = new StopWatch();
     startThreads(sinks);
@@ -845,7 +821,7 @@ public class TestSpillableMemoryChannel {
 
     System.out.println("Total puts " + channel.drainOrder.totalPuts);
 
-    System.out.println("Max Queue size " + channel.getMaxMemQueueSize() );
+    System.out.println("Max Queue size " + channel.getMaxMemQueueSize());
     System.out.println(channel.memQueue.size());
 
     System.out.println("done");
@@ -872,10 +848,10 @@ public class TestSpillableMemoryChannel {
 
       if (elapsed < 10000) {
         System.out.println(Thread.currentThread().getName()
-                +  " : [ " + elapsed + " ms ].        " + suffix);
+                           + " : [ " + elapsed + " ms ].        " + suffix);
       } else {
         System.out.println(Thread.currentThread().getName()
-                +  " : [ " + elapsed / 1000 + " sec ].       " + suffix);
+                           + " : [ " + elapsed / 1000 + " sec ].       " + suffix);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLoadBalancingLog4jAppender.java
----------------------------------------------------------------------
diff --git a/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLoadBalancingLog4jAppender.java b/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLoadBalancingLog4jAppender.java
index 267ac1d..53795fb 100644
--- a/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLoadBalancingLog4jAppender.java
+++ b/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLoadBalancingLog4jAppender.java
@@ -65,7 +65,7 @@ public class TestLoadBalancingLog4jAppender {
   private boolean slowDown = false;
 
   @Before
-  public void initiate() throws InterruptedException{
+  public void initiate() throws InterruptedException {
     ch = new MemoryChannel();
     configureChannel();
 
@@ -164,9 +164,9 @@ public class TestLoadBalancingLog4jAppender {
   @Test
   public void testRandomBackoffUnsafeMode() throws Exception {
     File TESTFILE = new File(TestLoadBalancingLog4jAppender.class
-      .getClassLoader()
-      .getResource("flume-loadbalancing-backoff-log4jtest.properties")
-      .getFile());
+        .getClassLoader()
+        .getResource("flume-loadbalancing-backoff-log4jtest.properties")
+        .getFile());
     startSources(TESTFILE, true, new int[]{25430, 25431, 25432});
 
     sources.get(0).setFail();
@@ -179,9 +179,9 @@ public class TestLoadBalancingLog4jAppender {
   @Test (expected = EventDeliveryException.class)
   public void testTimeout() throws Throwable {
     File TESTFILE = new File(TestLoadBalancingLog4jAppender.class
-      .getClassLoader()
-      .getResource("flume-loadbalancinglog4jtest.properties")
-      .getFile());
+        .getClassLoader()
+        .getResource("flume-loadbalancinglog4jtest.properties")
+        .getFile());
 
     ch = new TestLog4jAppender.SlowMemoryChannel(2000);
     configureChannel();
@@ -200,9 +200,9 @@ public class TestLoadBalancingLog4jAppender {
   @Test(expected = EventDeliveryException.class)
   public void testRandomBackoffNotUnsafeMode() throws Throwable {
     File TESTFILE = new File(TestLoadBalancingLog4jAppender.class
-      .getClassLoader()
-      .getResource("flume-loadbalancing-backoff-log4jtest.properties")
-      .getFile());
+        .getClassLoader()
+        .getResource("flume-loadbalancing-backoff-log4jtest.properties")
+        .getFile());
     startSources(TESTFILE, false, new int[]{25430, 25431, 25432});
 
     sources.get(0).setFail();
@@ -224,17 +224,17 @@ public class TestLoadBalancingLog4jAppender {
   }
 
   private void sendAndAssertFail() throws IOException {
-      int level = 20000;
-      String msg = "This is log message number" + String.valueOf(level);
-      fixture.log(Level.toLevel(level), msg);
+    int level = 20000;
+    String msg = "This is log message number" + String.valueOf(level);
+    fixture.log(Level.toLevel(level), msg);
 
-      Transaction transaction = ch.getTransaction();
-      transaction.begin();
-      Event event = ch.take();
-      Assert.assertNull(event);
+    Transaction transaction = ch.getTransaction();
+    transaction.begin();
+    Event event = ch.take();
+    Assert.assertNull(event);
 
-      transaction.commit();
-      transaction.close();
+    transaction.commit();
+    transaction.close();
 
   }
 
@@ -271,8 +271,7 @@ public class TestLoadBalancingLog4jAppender {
   }
 
   private void startSources(File log4jProps, boolean unsafeMode, int... ports)
-    throws
-    IOException {
+      throws IOException {
     for (int port : ports) {
       CountingAvroSource source = new CountingAvroSource(port);
       Context context = new Context();
@@ -297,8 +296,8 @@ public class TestLoadBalancingLog4jAppender {
     Properties props = new Properties();
     props.load(reader);
     props.setProperty("log4j.appender.out2.UnsafeMode",
-      String.valueOf(unsafeMode));
-    if(slowDown) {
+        String.valueOf(unsafeMode));
+    if (slowDown) {
       props.setProperty("log4j.appender.out2.Timeout", String.valueOf(1000));
     }
     PropertyConfigurator.configure(props);
@@ -308,13 +307,13 @@ public class TestLoadBalancingLog4jAppender {
   static class CountingAvroSource extends AvroSource {
     AtomicInteger appendCount = new AtomicInteger();
     volatile boolean isFail = false;
-	private final int port2;
+    private final int port2;
 
     public CountingAvroSource(int port) {
-		port2 = port;
+      port2 = port;
     }
 
-	public void setOk() {
+    public void setOk() {
       this.isFail = false;
     }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppender.java
----------------------------------------------------------------------
diff --git a/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppender.java b/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppender.java
index 1b840f3..c087b67 100644
--- a/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppender.java
+++ b/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppender.java
@@ -21,7 +21,10 @@ package org.apache.flume.clients.log4jappender;
 import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
 import junit.framework.Assert;
@@ -46,13 +49,13 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-public class TestLog4jAppender{
+public class TestLog4jAppender {
   private AvroSource source;
   private Channel ch;
   private Properties props;
 
   @Before
-  public void initiate() throws Exception{
+  public void initiate() throws Exception {
     int port = 25430;
     source = new AvroSource();
     ch = new MemoryChannel();
@@ -88,13 +91,13 @@ public class TestLog4jAppender{
     configureSource();
     PropertyConfigurator.configure(props);
     Logger logger = LogManager.getLogger(TestLog4jAppender.class);
-    for(int count = 0; count <= 1000; count++){
+    for (int count = 0; count <= 1000; count++) {
       /*
        * Log4j internally defines levels as multiples of 10000. So if we
        * create levels directly using count, the level will be set as the
        * default.
        */
-      int level = ((count % 5)+1)*10000;
+      int level = ((count % 5) + 1) * 10000;
       String msg = "This is log message number" + String.valueOf(count);
 
       logger.log(Level.toLevel(level), msg);
@@ -146,11 +149,11 @@ public class TestLog4jAppender{
   }
 
   private void sendAndAssertFail(Logger logger) throws Throwable {
-      /*
-       * Log4j internally defines levels as multiples of 10000. So if we
-       * create levels directly using count, the level will be set as the
-       * default.
-       */
+    /*
+     * Log4j internally defines levels as multiples of 10000. So if we
+     * create levels directly using count, the level will be set as the
+     * default.
+     */
     int level = 20000;
     try {
       logger.log(Level.toLevel(level), "Test Msg");
@@ -177,13 +180,13 @@ public class TestLog4jAppender{
     PropertyConfigurator.configure(props);
     Logger logger = LogManager.getLogger(TestLog4jAppender.class);
     Thread.currentThread().setName("Log4jAppenderTest");
-    for(int count = 0; count <= 100; count++){
+    for (int count = 0; count <= 100; count++) {
       /*
        * Log4j internally defines levels as multiples of 10000. So if we
        * create levels directly using count, the level will be set as the
        * default.
        */
-      int level = ((count % 5)+1)*10000;
+      int level = ((count % 5) + 1) * 10000;
       String msg = "This is log message number" + String.valueOf(count);
 
       logger.log(Level.toLevel(level), msg);
@@ -230,7 +233,7 @@ public class TestLog4jAppender{
     props.put("log4j.appender.out2.Timeout", "1000");
     props.put("log4j.appender.out2.layout", "org.apache.log4j.PatternLayout");
     props.put("log4j.appender.out2.layout.ConversionPattern",
-      "%-5p [%t]: %m%n");
+        "%-5p [%t]: %m%n");
     PropertyConfigurator.configure(props);
     Logger logger = LogManager.getLogger(TestLog4jAppender.class);
     Thread.currentThread().setName("Log4jAppenderTest");
@@ -251,13 +254,12 @@ public class TestLog4jAppender{
 
 
   @After
-  public void cleanUp(){
+  public void cleanUp() {
     source.stop();
     ch.stop();
     props.clear();
   }
 
-
   static class SlowMemoryChannel extends MemoryChannel {
     private final int slowTime;
 

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppenderWithAvro.java
----------------------------------------------------------------------
diff --git a/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppenderWithAvro.java b/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppenderWithAvro.java
index 5899c62..0607e3a 100644
--- a/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppenderWithAvro.java
+++ b/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppenderWithAvro.java
@@ -125,8 +125,7 @@ public class TestLog4jAppenderWithAvro {
     Assert.assertNull(hdrs.get(Log4jAvroHeaders.MESSAGE_ENCODING.toString()));
 
     Assert.assertEquals("Schema URL should be set",
-        "file:///tmp/myrecord.avsc", hdrs.get(Log4jAvroHeaders.AVRO_SCHEMA_URL.toString
-        ()));
+        "file:///tmp/myrecord.avsc", hdrs.get(Log4jAvroHeaders.AVRO_SCHEMA_URL.toString()));
     Assert.assertNull("Schema string should not be set",
         hdrs.get(Log4jAvroHeaders.AVRO_SCHEMA_LITERAL.toString()));
 
@@ -174,7 +173,7 @@ public class TestLog4jAppenderWithAvro {
   }
 
   @After
-  public void cleanUp(){
+  public void cleanUp() {
     source.stop();
     ch.stop();
     props.clear();

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-core/src/test/java/org/apache/flume/channel/AbstractBasicChannelSemanticsTest.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/channel/AbstractBasicChannelSemanticsTest.java b/flume-ng-core/src/test/java/org/apache/flume/channel/AbstractBasicChannelSemanticsTest.java
index 59a804c..7600856 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/channel/AbstractBasicChannelSemanticsTest.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/channel/AbstractBasicChannelSemanticsTest.java
@@ -40,6 +40,7 @@ import com.google.common.base.Preconditions;
 public abstract class AbstractBasicChannelSemanticsTest {
 
   protected static List<Event> events;
+
   static {
     Event[] array = new Event[7];
     for (int i = 0; i < array.length; ++i) {
@@ -61,7 +62,7 @@ public abstract class AbstractBasicChannelSemanticsTest {
       THROW_RUNTIME,
       THROW_CHANNEL,
       SLEEP
-    };
+    }
 
     private Mode mode = Mode.NORMAL;
     private boolean lastTransactionCommitted = false;
@@ -158,11 +159,11 @@ public abstract class AbstractBasicChannelSemanticsTest {
 
   protected static class TestError extends Error {
     static final long serialVersionUID = -1;
-  };
+  }
 
   protected static class TestRuntimeException extends RuntimeException {
     static final long serialVersionUID = -1;
-  };
+  }
 
   protected void testException(Class<? extends Throwable> exceptionClass,
       Runnable test) {

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelProcessor.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelProcessor.java b/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelProcessor.java
index b37b823..93bc0cf 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelProcessor.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelProcessor.java
@@ -20,16 +20,22 @@ package org.apache.flume.channel;
 
 import com.google.common.base.Charsets;
 import com.google.common.collect.Lists;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flume.*;
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
+import org.apache.flume.ChannelSelector;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.event.EventBuilder;
 import org.junit.Assert;
 import org.junit.Test;
-import static org.mockito.Mockito.*;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class TestChannelProcessor {
 
@@ -88,9 +94,9 @@ public class TestChannelProcessor {
   public void testRequiredAndOptionalChannels() {
     Context context = new Context();
     ArrayList<Channel> channels = new ArrayList<Channel>();
-    for(int i = 0; i < 4; i++) {
+    for (int i = 0; i < 4; i++) {
       Channel ch = new MemoryChannel();
-      ch.setName("ch"+i);
+      ch.setName("ch" + i);
       Configurables.configure(ch, context);
       channels.add(ch);
     }
@@ -114,7 +120,7 @@ public class TestChannelProcessor {
     } catch (InterruptedException e) {
     }
 
-    for(Channel channel : channels) {
+    for (Channel channel : channels) {
       Transaction transaction = channel.getTransaction();
       transaction.begin();
       Event event_ch = channel.take();
@@ -124,18 +130,18 @@ public class TestChannelProcessor {
     }
 
     List<Event> events = Lists.newArrayList();
-    for(int i = 0; i < 100; i ++) {
-      events.add(EventBuilder.withBody("event "+i, Charsets.UTF_8));
+    for (int i = 0; i < 100; i++) {
+      events.add(EventBuilder.withBody("event " + i, Charsets.UTF_8));
     }
     processor.processEventBatch(events);
     try {
       Thread.sleep(3000);
     } catch (InterruptedException e) {
     }
-    for(Channel channel : channels) {
+    for (Channel channel : channels) {
       Transaction transaction = channel.getTransaction();
       transaction.begin();
-      for(int i = 0; i < 100; i ++) {
+      for (int i = 0; i < 100; i++) {
         Event event_ch = channel.take();
         Assert.assertNotNull(event_ch);
       }