You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by mp...@apache.org on 2016/07/08 22:50:59 UTC
[7/9] flume git commit: FLUME-2941. Integrate checkstyle for test classes
http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/EncryptionTestUtils.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/EncryptionTestUtils.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/EncryptionTestUtils.java
index 6ca3246..f290035 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/EncryptionTestUtils.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/EncryptionTestUtils.java
@@ -18,18 +18,6 @@
*/
package org.apache.flume.channel.file.encryption;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.security.Key;
-import java.security.KeyStore;
-import java.util.List;
-import java.util.Map;
-
-import javax.crypto.KeyGenerator;
-
-import org.apache.flume.channel.file.TestUtils;
-
import com.google.common.base.Charsets;
import com.google.common.base.Joiner;
import com.google.common.base.Throwables;
@@ -37,6 +25,16 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.Files;
import com.google.common.io.Resources;
+import org.apache.flume.channel.file.TestUtils;
+
+import javax.crypto.KeyGenerator;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.security.Key;
+import java.security.KeyStore;
+import java.util.List;
+import java.util.Map;
public class EncryptionTestUtils {
@@ -50,33 +48,32 @@ public class EncryptionTestUtils {
throw Throwables.propagate(e);
}
}
- public static void createKeyStore(File keyStoreFile,
- File keyStorePasswordFile, Map<String, File> keyAliasPassword)
- throws Exception {
+
+ public static void createKeyStore(File keyStoreFile, File keyStorePasswordFile,
+ Map<String, File> keyAliasPassword) throws Exception {
KeyStore ks = KeyStore.getInstance("jceks");
ks.load(null);
List<String> keysWithSeperatePasswords = Lists.newArrayList();
- for(String alias : keyAliasPassword.keySet()) {
+ for (String alias : keyAliasPassword.keySet()) {
Key key = newKey();
char[] password = null;
File passwordFile = keyAliasPassword.get(alias);
- if(passwordFile == null) {
- password = Files.toString(keyStorePasswordFile, Charsets.UTF_8)
- .toCharArray();
+ if (passwordFile == null) {
+ password = Files.toString(keyStorePasswordFile, Charsets.UTF_8).toCharArray();
} else {
keysWithSeperatePasswords.add(alias);
password = Files.toString(passwordFile, Charsets.UTF_8).toCharArray();
}
ks.setKeyEntry(alias, key, password, null);
}
- char[] keyStorePassword = Files.
- toString(keyStorePasswordFile, Charsets.UTF_8).toCharArray();
+ char[] keyStorePassword = Files.toString(keyStorePasswordFile, Charsets.UTF_8).toCharArray();
FileOutputStream outputStream = new FileOutputStream(keyStoreFile);
ks.store(outputStream, keyStorePassword);
outputStream.close();
}
- public static Map<String, File> configureTestKeyStore(File baseDir,
- File keyStoreFile) throws IOException {
+
+ public static Map<String, File> configureTestKeyStore(File baseDir, File keyStoreFile)
+ throws IOException {
Map<String, File> result = Maps.newHashMap();
if (System.getProperty("java.vendor").contains("IBM")) {
@@ -86,50 +83,52 @@ public class EncryptionTestUtils {
Resources.copy(Resources.getResource("sun-test.keystore"),
new FileOutputStream(keyStoreFile));
}
- /*
- Commands below:
- keytool -genseckey -alias key-0 -keypass keyPassword -keyalg AES \
- -keysize 128 -validity 9000 -keystore src/test/resources/test.keystore \
- -storetype jceks -storepass keyStorePassword
- keytool -genseckey -alias key-1 -keyalg AES -keysize 128 -validity 9000 \
- -keystore src/test/resources/test.keystore -storetype jceks \
- -storepass keyStorePassword
+
+ /* Commands below:
+ * keytool -genseckey -alias key-0 -keypass keyPassword -keyalg AES \
+ * -keysize 128 -validity 9000 -keystore src/test/resources/test.keystore \
+ * -storetype jceks -storepass keyStorePassword
+ * keytool -genseckey -alias key-1 -keyalg AES -keysize 128 -validity 9000 \
+ * -keystore src/test/resources/test.keystore -storetype jceks \
+ * -storepass keyStorePassword
*/
-// key-0 has own password, key-1 used key store password
- result.put("key-0",
- TestUtils.writeStringToFile(baseDir, "key-0", "keyPassword"));
+ // key-0 has own password, key-1 used key store password
+ result.put("key-0", TestUtils.writeStringToFile(baseDir, "key-0", "keyPassword"));
result.put("key-1", null);
return result;
}
- public static Map<String,String> configureForKeyStore(File keyStoreFile,
- File keyStorePasswordFile, Map<String, File> keyAliasPassword)
- throws Exception {
+
+ public static Map<String, String> configureForKeyStore(File keyStoreFile,
+ File keyStorePasswordFile,
+ Map<String, File> keyAliasPassword)
+ throws Exception {
Map<String, String> context = Maps.newHashMap();
List<String> keys = Lists.newArrayList();
Joiner joiner = Joiner.on(".");
- for(String alias : keyAliasPassword.keySet()) {
+ for (String alias : keyAliasPassword.keySet()) {
File passwordFile = keyAliasPassword.get(alias);
- if(passwordFile == null) {
+ if (passwordFile == null) {
keys.add(alias);
} else {
String propertyName = joiner.join(EncryptionConfiguration.KEY_PROVIDER,
- EncryptionConfiguration.JCE_FILE_KEYS, alias,
- EncryptionConfiguration.JCE_FILE_KEY_PASSWORD_FILE);
+ EncryptionConfiguration.JCE_FILE_KEYS,
+ alias,
+ EncryptionConfiguration.JCE_FILE_KEY_PASSWORD_FILE);
keys.add(alias);
context.put(propertyName, passwordFile.getAbsolutePath());
}
}
context.put(joiner.join(EncryptionConfiguration.KEY_PROVIDER,
- EncryptionConfiguration.JCE_FILE_KEY_STORE_FILE),
- keyStoreFile.getAbsolutePath());
- if(keyStorePasswordFile != null) {
+ EncryptionConfiguration.JCE_FILE_KEY_STORE_FILE),
+ keyStoreFile.getAbsolutePath());
+ if (keyStorePasswordFile != null) {
context.put(joiner.join(EncryptionConfiguration.KEY_PROVIDER,
- EncryptionConfiguration.JCE_FILE_KEY_STORE_PASSWORD_FILE),
- keyStorePasswordFile.getAbsolutePath());
+ EncryptionConfiguration.JCE_FILE_KEY_STORE_PASSWORD_FILE),
+ keyStorePasswordFile.getAbsolutePath());
}
context.put(joiner.join(EncryptionConfiguration.KEY_PROVIDER,
- EncryptionConfiguration.JCE_FILE_KEYS),
- Joiner.on(" ").join(keys));
+ EncryptionConfiguration.JCE_FILE_KEYS),
+ Joiner.on(" ").join(keys));
return context;
}
}
http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestAESCTRNoPaddingProvider.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestAESCTRNoPaddingProvider.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestAESCTRNoPaddingProvider.java
index a7c7cb2..d483fcc 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestAESCTRNoPaddingProvider.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestAESCTRNoPaddingProvider.java
@@ -35,13 +35,13 @@ public class TestAESCTRNoPaddingProvider {
public void setup() throws Exception {
KeyGenerator keyGen = KeyGenerator.getInstance("AES");
key = keyGen.generateKey();
- encryptor = CipherProviderFactory.
- getEncrypter(CipherProviderType.AESCTRNOPADDING.name(), key);
- decryptor = CipherProviderFactory.
- getDecrypter(CipherProviderType.AESCTRNOPADDING.name(), key,
- encryptor.getParameters());
+ encryptor = CipherProviderFactory.getEncrypter(
+ CipherProviderType.AESCTRNOPADDING.name(), key);
+ decryptor = CipherProviderFactory.getDecrypter(
+ CipherProviderType.AESCTRNOPADDING.name(), key, encryptor.getParameters());
cipherProviderTestSuite = new CipherProviderTestSuite(encryptor, decryptor);
}
+
@Test
public void test() throws Exception {
cipherProviderTestSuite.test();
http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestFileChannelEncryption.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestFileChannelEncryption.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestFileChannelEncryption.java
index d4537a8..4e5ab6f 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestFileChannelEncryption.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestFileChannelEncryption.java
@@ -18,18 +18,10 @@
*/
package org.apache.flume.channel.file.encryption;
-import static org.apache.flume.channel.file.TestUtils.*;
-
-import java.io.File;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicBoolean;
-
+import com.google.common.base.Charsets;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Maps;
+import com.google.common.io.Files;
import org.apache.flume.ChannelException;
import org.apache.flume.FlumeException;
import org.apache.flume.channel.file.FileChannelConfiguration;
@@ -42,10 +34,21 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Charsets;
-import com.google.common.base.Joiner;
-import com.google.common.collect.Maps;
-import com.google.common.io.Files;
+import java.io.File;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.flume.channel.file.TestUtils.compareInputAndOut;
+import static org.apache.flume.channel.file.TestUtils.consumeChannel;
+import static org.apache.flume.channel.file.TestUtils.fillChannel;
+import static org.apache.flume.channel.file.TestUtils.putEvents;
+import static org.apache.flume.channel.file.TestUtils.takeEvents;
public class TestFileChannelEncryption extends TestFileChannelBase {
protected static final Logger LOGGER =
@@ -62,36 +65,39 @@ public class TestFileChannelEncryption extends TestFileChannelBase {
keyStoreFile = new File(baseDir, "keyStoreFile");
Assert.assertTrue(keyStoreFile.createNewFile());
keyAliasPassword = Maps.newHashMap();
- keyAliasPassword.putAll(EncryptionTestUtils.
- configureTestKeyStore(baseDir, keyStoreFile));
+ keyAliasPassword.putAll(EncryptionTestUtils.configureTestKeyStore(baseDir, keyStoreFile));
}
+
@After
public void teardown() {
super.teardown();
}
+
private Map<String, String> getOverrides() throws Exception {
Map<String, String> overrides = Maps.newHashMap();
overrides.put(FileChannelConfiguration.CAPACITY, String.valueOf(100));
- overrides.put(FileChannelConfiguration.TRANSACTION_CAPACITY,
- String.valueOf(100));
+ overrides.put(FileChannelConfiguration.TRANSACTION_CAPACITY, String.valueOf(100));
return overrides;
}
+
private Map<String, String> getOverridesForEncryption() throws Exception {
Map<String, String> overrides = getOverrides();
- Map<String, String> encryptionProps = EncryptionTestUtils.
- configureForKeyStore(keyStoreFile,
- keyStorePasswordFile, keyAliasPassword);
+ Map<String, String> encryptionProps =
+ EncryptionTestUtils.configureForKeyStore(keyStoreFile,
+ keyStorePasswordFile,
+ keyAliasPassword);
encryptionProps.put(EncryptionConfiguration.KEY_PROVIDER,
- KeyProviderType.JCEKSFILE.name());
+ KeyProviderType.JCEKSFILE.name());
encryptionProps.put(EncryptionConfiguration.CIPHER_PROVIDER,
- CipherProviderType.AESCTRNOPADDING.name());
+ CipherProviderType.AESCTRNOPADDING.name());
encryptionProps.put(EncryptionConfiguration.ACTIVE_KEY, "key-1");
- for(String key : encryptionProps.keySet()) {
+ for (String key : encryptionProps.keySet()) {
overrides.put(EncryptionConfiguration.ENCRYPTION_PREFIX + "." + key,
- encryptionProps.get(key));
+ encryptionProps.get(key));
}
return overrides;
}
+
/**
* Test fails without FLUME-1565
*/
@@ -222,15 +228,16 @@ public class TestFileChannelEncryption extends TestFileChannelBase {
channel = createFileChannel(noEncryptionOverrides);
channel.start();
- if(channel.isOpen()) {
+ if (channel.isOpen()) {
try {
takeEvents(channel, 1, 1);
Assert.fail("Channel was opened and take did not throw exception");
- } catch(ChannelException ex) {
+ } catch (ChannelException ex) {
// expected
}
}
}
+
@Test
public void testUnencyrptedAndEncryptedLogs() throws Exception {
Map<String, String> noEncryptionOverrides = getOverrides();
@@ -252,41 +259,46 @@ public class TestFileChannelEncryption extends TestFileChannelBase {
Set<String> out = consumeChannel(channel);
compareInputAndOut(in, out);
}
+
@Test
public void testBadKeyProviderInvalidValue() throws Exception {
Map<String, String> overrides = getOverridesForEncryption();
overrides.put(Joiner.on(".").join(EncryptionConfiguration.ENCRYPTION_PREFIX,
- EncryptionConfiguration.KEY_PROVIDER), "invalid");
+ EncryptionConfiguration.KEY_PROVIDER),
+ "invalid");
try {
channel = createFileChannel(overrides);
Assert.fail();
- } catch(FlumeException ex) {
- Assert.assertEquals("java.lang.ClassNotFoundException: invalid",
- ex.getMessage());
+ } catch (FlumeException ex) {
+ Assert.assertEquals("java.lang.ClassNotFoundException: invalid", ex.getMessage());
}
}
+
@Test
public void testBadKeyProviderInvalidClass() throws Exception {
Map<String, String> overrides = getOverridesForEncryption();
overrides.put(Joiner.on(".").join(EncryptionConfiguration.ENCRYPTION_PREFIX,
- EncryptionConfiguration.KEY_PROVIDER), String.class.getName());
+ EncryptionConfiguration.KEY_PROVIDER),
+ String.class.getName());
try {
channel = createFileChannel(overrides);
Assert.fail();
- } catch(FlumeException ex) {
- Assert.assertEquals("Unable to instantiate Builder from java.lang.String",
- ex.getMessage());
+ } catch (FlumeException ex) {
+ Assert.assertEquals("Unable to instantiate Builder from java.lang.String", ex.getMessage());
}
}
+
@Test
public void testBadCipherProviderInvalidValue() throws Exception {
Map<String, String> overrides = getOverridesForEncryption();
overrides.put(Joiner.on(".").join(EncryptionConfiguration.ENCRYPTION_PREFIX,
- EncryptionConfiguration.CIPHER_PROVIDER), "invalid");
+ EncryptionConfiguration.CIPHER_PROVIDER),
+ "invalid");
channel = createFileChannel(overrides);
channel.start();
Assert.assertFalse(channel.isOpen());
}
+
@Test
public void testBadCipherProviderInvalidClass() throws Exception {
Map<String, String> overrides = getOverridesForEncryption();
@@ -296,6 +308,7 @@ public class TestFileChannelEncryption extends TestFileChannelBase {
channel.start();
Assert.assertFalse(channel.isOpen());
}
+
@Test
public void testMissingKeyStoreFile() throws Exception {
Map<String, String> overrides = getOverridesForEncryption();
@@ -306,11 +319,12 @@ public class TestFileChannelEncryption extends TestFileChannelBase {
try {
channel = createFileChannel(overrides);
Assert.fail();
- } catch(RuntimeException ex) {
+ } catch (RuntimeException ex) {
Assert.assertTrue("Exception message is incorrect: " + ex.getMessage(),
ex.getMessage().startsWith("java.io.FileNotFoundException: /path/does/not/exist "));
}
}
+
@Test
public void testMissingKeyStorePasswordFile() throws Exception {
Map<String, String> overrides = getOverridesForEncryption();
@@ -321,11 +335,12 @@ public class TestFileChannelEncryption extends TestFileChannelBase {
try {
channel = createFileChannel(overrides);
Assert.fail();
- } catch(RuntimeException ex) {
+ } catch (RuntimeException ex) {
Assert.assertTrue("Exception message is incorrect: " + ex.getMessage(),
ex.getMessage().startsWith("java.io.FileNotFoundException: /path/does/not/exist "));
}
}
+
@Test
public void testBadKeyStorePassword() throws Exception {
Files.write("invalid", keyStorePasswordFile, Charsets.UTF_8);
@@ -334,11 +349,12 @@ public class TestFileChannelEncryption extends TestFileChannelBase {
channel = TestUtils.createFileChannel(checkpointDir.getAbsolutePath(),
dataDir, overrides);
Assert.fail();
- } catch(RuntimeException ex) {
+ } catch (RuntimeException ex) {
Assert.assertEquals("java.io.IOException: Keystore was tampered with, or " +
"password was incorrect", ex.getMessage());
}
}
+
@Test
public void testBadKeyAlias() throws Exception {
Map<String, String> overrides = getOverridesForEncryption();
http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestJCEFileKeyProvider.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestJCEFileKeyProvider.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestJCEFileKeyProvider.java
index f33cada..8214a05 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestJCEFileKeyProvider.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestJCEFileKeyProvider.java
@@ -18,12 +18,10 @@
*/
package org.apache.flume.channel.file.encryption;
-import java.io.File;
-import java.security.Key;
-import java.util.Map;
-
+import com.google.common.base.Charsets;
+import com.google.common.collect.Maps;
+import com.google.common.io.Files;
import junit.framework.Assert;
-
import org.apache.commons.io.FileUtils;
import org.apache.flume.Context;
import org.apache.flume.channel.file.TestUtils;
@@ -31,9 +29,9 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import com.google.common.base.Charsets;
-import com.google.common.collect.Maps;
-import com.google.common.io.Files;
+import java.io.File;
+import java.security.Key;
+import java.util.Map;
public class TestJCEFileKeyProvider {
private CipherProvider.Encryptor encryptor;
@@ -53,62 +51,69 @@ public class TestJCEFileKeyProvider {
Assert.assertTrue(keyStoreFile.createNewFile());
}
+
@After
public void cleanup() {
FileUtils.deleteQuietly(baseDir);
}
+
private void initializeForKey(Key key) {
- encryptor =
- new AESCTRNoPaddingProvider.EncryptorBuilder().setKey(key).build();
- decryptor =
- new AESCTRNoPaddingProvider.DecryptorBuilder()
- .setKey(key).setParameters(encryptor.getParameters()).build();
+ encryptor = new AESCTRNoPaddingProvider.EncryptorBuilder()
+ .setKey(key)
+ .build();
+ decryptor = new AESCTRNoPaddingProvider.DecryptorBuilder()
+ .setKey(key)
+ .setParameters(encryptor.getParameters())
+ .build();
}
+
@Test
public void testWithNewKeyStore() throws Exception {
createNewKeyStore();
EncryptionTestUtils.createKeyStore(keyStoreFile, keyStorePasswordFile,
keyAliasPassword);
- Context context = new Context(EncryptionTestUtils.
- configureForKeyStore(keyStoreFile,
- keyStorePasswordFile, keyAliasPassword));
+ Context context = new Context(
+ EncryptionTestUtils.configureForKeyStore(keyStoreFile,
+ keyStorePasswordFile,
+ keyAliasPassword));
Context keyProviderContext = new Context(
context.getSubProperties(EncryptionConfiguration.KEY_PROVIDER + "."));
- KeyProvider keyProvider = KeyProviderFactory.
- getInstance(KeyProviderType.JCEKSFILE.name(), keyProviderContext);
+ KeyProvider keyProvider =
+ KeyProviderFactory.getInstance(KeyProviderType.JCEKSFILE.name(), keyProviderContext);
testKeyProvider(keyProvider);
}
+
@Test
public void testWithExistingKeyStore() throws Exception {
- keyAliasPassword.putAll(EncryptionTestUtils.
- configureTestKeyStore(baseDir, keyStoreFile));
- Context context = new Context(EncryptionTestUtils.
- configureForKeyStore(keyStoreFile,
- keyStorePasswordFile, keyAliasPassword));
+ keyAliasPassword.putAll(EncryptionTestUtils.configureTestKeyStore(baseDir, keyStoreFile));
+ Context context = new Context(
+ EncryptionTestUtils.configureForKeyStore(keyStoreFile,
+ keyStorePasswordFile,
+ keyAliasPassword));
Context keyProviderContext = new Context(
context.getSubProperties(EncryptionConfiguration.KEY_PROVIDER + "."));
- KeyProvider keyProvider = KeyProviderFactory.
- getInstance(KeyProviderType.JCEKSFILE.name(), keyProviderContext);
+ KeyProvider keyProvider =
+ KeyProviderFactory.getInstance(KeyProviderType.JCEKSFILE.name(), keyProviderContext);
testKeyProvider(keyProvider);
}
+
private void createNewKeyStore() throws Exception {
- for(int i = 0; i < 10; i++) {
+ for (int i = 0; i < 10; i++) {
// create some with passwords, some without
- if(i % 2 == 0) {
+ if (i % 2 == 0) {
String alias = "test-" + i;
String password = String.valueOf(i);
- keyAliasPassword.put(alias,
- TestUtils.writeStringToFile(baseDir, alias, password));
+ keyAliasPassword.put(alias, TestUtils.writeStringToFile(baseDir, alias, password));
}
}
}
+
private void testKeyProvider(KeyProvider keyProvider) {
- for(String alias : keyAliasPassword.keySet()) {
+ for (String alias : keyAliasPassword.keySet()) {
Key key = keyProvider.getKey(alias);
initializeForKey(key);
String expected = "some text here " + alias;
- byte[] cipherText = encryptor.
- encrypt(expected.getBytes(Charsets.UTF_8));
+ byte[] cipherText = encryptor.encrypt(expected.getBytes(Charsets.UTF_8));
byte[] clearText = decryptor.decrypt(cipherText);
Assert.assertEquals(expected, new String(clearText, Charsets.UTF_8));
}
http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/BaseJdbcChannelProviderTest.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/BaseJdbcChannelProviderTest.java b/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/BaseJdbcChannelProviderTest.java
index 85ad7fe..7031eb7 100644
--- a/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/BaseJdbcChannelProviderTest.java
+++ b/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/BaseJdbcChannelProviderTest.java
@@ -17,6 +17,17 @@
*/
package org.apache.flume.channel.jdbc;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.jdbc.impl.JdbcChannelProviderImpl;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
@@ -32,17 +43,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.Transaction;
-import org.apache.flume.channel.jdbc.impl.JdbcChannelProviderImpl;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
public abstract class BaseJdbcChannelProviderTest {
private static final Logger LOGGER =
LoggerFactory.getLogger(BaseJdbcChannelProviderTest.class);
@@ -103,7 +103,7 @@ public abstract class BaseJdbcChannelProviderTest {
Set<MockEvent> events = new HashSet<MockEvent>();
for (int i = 1; i < 12; i++) {
- events.add(MockEventUtils.generateMockEvent(i, i, i, 61%i, 1));
+ events.add(MockEventUtils.generateMockEvent(i, i, i, 61 % i, 1));
}
Iterator<MockEvent> meIt = events.iterator();
@@ -170,7 +170,7 @@ public abstract class BaseJdbcChannelProviderTest {
new HashMap<String, List<MockEvent>>();
for (int i = 1; i < 121; i++) {
- MockEvent me = MockEventUtils.generateMockEvent(i, i, i, 61%i, 10);
+ MockEvent me = MockEventUtils.generateMockEvent(i, i, i, 61 % i, 10);
List<MockEvent> meList = eventMap.get(me.getChannel());
if (meList == null) {
meList = new ArrayList<MockEvent>();
@@ -227,7 +227,7 @@ public abstract class BaseJdbcChannelProviderTest {
Set<MockEvent> events = new HashSet<MockEvent>();
for (int i = 1; i < 81; i++) {
- events.add(MockEventUtils.generateMockEvent(i, i, i, 61%i, 5));
+ events.add(MockEventUtils.generateMockEvent(i, i, i, 61 % i, 5));
}
Iterator<MockEvent> meIt = events.iterator();
http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEvent.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEvent.java b/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEvent.java
index 1e412c5..6804a9f 100644
--- a/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEvent.java
+++ b/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEvent.java
@@ -27,8 +27,7 @@ public class MockEvent implements Event {
private final Map<String, String> headers;
private final String channel;
- public MockEvent(byte[] payload, Map<String, String> headers, String channel)
- {
+ public MockEvent(byte[] payload, Map<String, String> headers, String channel) {
this.payload = payload;
this.headers = headers;
this.channel = channel;
http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEventUtils.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEventUtils.java b/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEventUtils.java
index 10d8b51..e5ee324 100644
--- a/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEventUtils.java
+++ b/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEventUtils.java
@@ -17,13 +17,13 @@
*/
package org.apache.flume.channel.jdbc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
public final class MockEventUtils {
public static final Logger LOGGER =
@@ -70,20 +70,20 @@ public final class MockEventUtils {
* @param numChannels
* @return
*/
- public static MockEvent generateMockEvent(int payloadMargin,
- int headerNameMargin, int headerValueMargin, int numHeaders,
- int numChannels) {
+ public static MockEvent generateMockEvent(int payloadMargin, int headerNameMargin,
+ int headerValueMargin, int numHeaders,
+ int numChannels) {
int chIndex = 0;
if (numChannels > 1) {
- chIndex = Math.abs(RANDOM.nextInt())%numChannels;
+ chIndex = Math.abs(RANDOM.nextInt()) % numChannels;
}
- String channel = "test-"+chIndex;
+ String channel = "test-" + chIndex;
StringBuilder sb = new StringBuilder("New Event[payload size:");
int plTh = ConfigurationConstants.PAYLOAD_LENGTH_THRESHOLD;
- int plSize = Math.abs(RANDOM.nextInt())%plTh + payloadMargin;
+ int plSize = Math.abs(RANDOM.nextInt()) % plTh + payloadMargin;
sb.append(plSize).append(", numHeaders:").append(numHeaders);
sb.append(", channel:").append(channel);
@@ -93,8 +93,8 @@ public final class MockEventUtils {
Map<String, String> headers = new HashMap<String, String>();
for (int i = 0; i < numHeaders; i++) {
- int nmSize = Math.abs(RANDOM.nextInt())%nmTh + headerNameMargin;
- int vlSize = Math.abs(RANDOM.nextInt())%vlTh + headerValueMargin;
+ int nmSize = Math.abs(RANDOM.nextInt()) % nmTh + headerNameMargin;
+ int vlSize = Math.abs(RANDOM.nextInt()) % vlTh + headerValueMargin;
String name = generateHeaderString(nmSize);
String value = generateHeaderString(vlSize);
http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestDerbySchemaHandlerQueries.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestDerbySchemaHandlerQueries.java b/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestDerbySchemaHandlerQueries.java
index 362bcfa..cad972c 100644
--- a/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestDerbySchemaHandlerQueries.java
+++ b/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestDerbySchemaHandlerQueries.java
@@ -107,7 +107,6 @@ public class TestDerbySchemaHandlerQueries {
= "INSERT INTO FLUME.FL_HEADER (FLH_EVENT, FLH_NAME, FLH_VALUE, "
+ "FLH_NMSPILL, FLH_VLSPILL) VALUES ( ?, ?, ?, ?, ?)";
-
public static final String EXPECTED_STMT_INSERT_HEADER_NAME_SPILL
= "INSERT INTO FLUME.FL_NMSPILL (FLN_HEADER, FLN_SPILL) VALUES ( ?, ?)";
@@ -119,7 +118,6 @@ public class TestDerbySchemaHandlerQueries {
+ "FLE_ID = (SELECT MIN(FLE_ID) FROM FLUME.FL_EVENT WHERE "
+ "FLE_CHANNEL = ?)";
-
public static final String EXPECTED_STMT_FETCH_PAYLOAD_SPILL
= "SELECT FLP_SPILL FROM FLUME.FL_PLSPILL WHERE FLP_EVENT = ?";
http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
index d01346a..b63ac9b 100644
--- a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
+++ b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
@@ -130,19 +130,19 @@ public class TestKafkaChannel {
Properties consumerProps = channel.getConsumerProps();
Properties producerProps = channel.getProducerProps();
- Assert.assertEquals(producerProps.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG),testUtil.getKafkaServerUrl());
- Assert.assertEquals(consumerProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG), "flume-something");
- Assert.assertEquals(consumerProps.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
-
+ Assert.assertEquals(producerProps.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG),
+ testUtil.getKafkaServerUrl());
+ Assert.assertEquals(consumerProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG),
+ "flume-something");
+ Assert.assertEquals(consumerProps.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG),
+ "earliest");
}
-
@Test
public void testSuccess() throws Exception {
doTestSuccessRollback(false, false);
}
-
@Test
public void testSuccessInterleave() throws Exception {
doTestSuccessRollback(false, true);
@@ -212,8 +212,10 @@ public class TestKafkaChannel {
KafkaChannel channel = startChannel(false);
- KafkaProducer<String, byte[]> producer = new KafkaProducer<String, byte[]>(channel.getProducerProps());
- ProducerRecord<String, byte[]> data = new ProducerRecord<String, byte[]>(topic, "header-" + message, message.getBytes());
+ KafkaProducer<String, byte[]> producer =
+ new KafkaProducer<String, byte[]>(channel.getProducerProps());
+ ProducerRecord<String, byte[]> data =
+ new ProducerRecord<String, byte[]>(topic, "header-" + message, message.getBytes());
producer.send(data).get();
producer.flush();
producer.close();
@@ -234,7 +236,7 @@ public class TestKafkaChannel {
}
private Event takeEventWithoutCommittingTxn(KafkaChannel channel) {
- for (int i=0; i < 5; i++) {
+ for (int i = 0; i < 5; i++) {
Transaction txn = channel.getTransaction();
txn.begin();
@@ -255,17 +257,19 @@ public class TestKafkaChannel {
KafkaProducer<String, byte[]> producer = new KafkaProducer<String, byte[]>(props);
for (int i = 0; i < 50; i++) {
- ProducerRecord<String, byte[]> data = new ProducerRecord<String, byte[]>(topic, String.valueOf(i) + "-header", String.valueOf(i).getBytes());
+ ProducerRecord<String, byte[]> data =
+ new ProducerRecord<String, byte[]>(topic, String.valueOf(i) + "-header",
+ String.valueOf(i).getBytes());
producer.send(data).get();
}
ExecutorCompletionService<Void> submitterSvc = new
- ExecutorCompletionService<Void>(Executors.newCachedThreadPool());
- List<Event> events = pullEvents(channel, submitterSvc,
- 50, false, false);
+ ExecutorCompletionService<Void>(Executors.newCachedThreadPool());
+ List<Event> events = pullEvents(channel, submitterSvc, 50, false, false);
wait(submitterSvc, 5);
Map<Integer, String> finals = new HashMap<Integer, String>();
for (int i = 0; i < 50; i++) {
- finals.put(Integer.parseInt(new String(events.get(i).getBody())), events.get(i).getHeaders().get(KEY_HEADER));
+ finals.put(Integer.parseInt(new String(events.get(i).getBody())),
+ events.get(i).getHeaders().get(KEY_HEADER));
}
for (int i = 0; i < 50; i++) {
Assert.assertTrue(finals.keySet().contains(i));
@@ -284,7 +288,8 @@ public class TestKafkaChannel {
KafkaProducer<String, byte[]> producer = new KafkaProducer<String, byte[]>(props);
for (int i = 0; i < 50; i++) {
- ProducerRecord<String, byte[]> data = new ProducerRecord<String, byte[]>(topic, null, String.valueOf(i).getBytes());
+ ProducerRecord<String, byte[]> data =
+ new ProducerRecord<String, byte[]>(topic, null, String.valueOf(i).getBytes());
producer.send(data).get();
}
ExecutorCompletionService<Void> submitterSvc = new
@@ -323,14 +328,14 @@ public class TestKafkaChannel {
channel.put(EventBuilder.withBody(msgs.get(i).getBytes(), headers));
}
tx.commit();
- ExecutorCompletionService<Void> submitterSvc = new
- ExecutorCompletionService<Void>(Executors.newCachedThreadPool());
- List<Event> events = pullEvents(channel, submitterSvc,
- 50, false, false);
+ ExecutorCompletionService<Void> submitterSvc =
+ new ExecutorCompletionService<Void>(Executors.newCachedThreadPool());
+ List<Event> events = pullEvents(channel, submitterSvc, 50, false, false);
wait(submitterSvc, 5);
Map<Integer, String> finals = new HashMap<Integer, String>();
for (int i = 0; i < 50; i++) {
- finals.put(Integer.parseInt(new String(events.get(i).getBody())), events.get(i).getHeaders().get(KEY_HEADER));
+ finals.put(Integer.parseInt(new String(events.get(i).getBody())),
+ events.get(i).getHeaders().get(KEY_HEADER));
}
for (int i = 0; i < 50; i++) {
Assert.assertTrue(finals.keySet().contains(i));
@@ -403,13 +408,13 @@ public class TestKafkaChannel {
}
private void writeAndVerify(final boolean testRollbacks,
- final KafkaChannel channel, final boolean interleave) throws Exception {
+ final KafkaChannel channel, final boolean interleave)
+ throws Exception {
final List<List<Event>> events = createBaseList();
ExecutorCompletionService<Void> submitterSvc =
- new ExecutorCompletionService<Void>(Executors
- .newCachedThreadPool());
+ new ExecutorCompletionService<Void>(Executors.newCachedThreadPool());
putEvents(channel, events, submitterSvc);
@@ -418,11 +423,9 @@ public class TestKafkaChannel {
}
ExecutorCompletionService<Void> submitterSvc2 =
- new ExecutorCompletionService<Void>(Executors
- .newCachedThreadPool());
+ new ExecutorCompletionService<Void>(Executors.newCachedThreadPool());
- final List<Event> eventsPulled =
- pullEvents(channel, submitterSvc2, 50, testRollbacks, true);
+ final List<Event> eventsPulled = pullEvents(channel, submitterSvc2, 50, testRollbacks, true);
if (!interleave) {
wait(submitterSvc, 5);
@@ -586,18 +589,18 @@ public class TestKafkaChannel {
int numPartitions = 5;
int sessionTimeoutMs = 10000;
int connectionTimeoutMs = 10000;
- ZkUtils zkUtils = ZkUtils.apply(testUtil.getZkUrl(), sessionTimeoutMs, connectionTimeoutMs, false);
-
+ ZkUtils zkUtils =
+ ZkUtils.apply(testUtil.getZkUrl(), sessionTimeoutMs, connectionTimeoutMs, false);
int replicationFactor = 1;
Properties topicConfig = new Properties();
- AdminUtils.createTopic(zkUtils, topicName, numPartitions,
- replicationFactor, topicConfig);
+ AdminUtils.createTopic(zkUtils, topicName, numPartitions, replicationFactor, topicConfig);
}
public static void deleteTopic(String topicName) {
int sessionTimeoutMs = 10000;
int connectionTimeoutMs = 10000;
- ZkUtils zkUtils = ZkUtils.apply(testUtil.getZkUrl(), sessionTimeoutMs, connectionTimeoutMs, false);
+ ZkUtils zkUtils =
+ ZkUtils.apply(testUtil.getZkUrl(), sessionTimeoutMs, connectionTimeoutMs, false);
AdminUtils.deleteTopic(zkUtils, topicName);
}
}
http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-channels/flume-spillable-memory-channel/src/test/java/org/apache/flume/channel/TestSpillableMemoryChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-spillable-memory-channel/src/test/java/org/apache/flume/channel/TestSpillableMemoryChannel.java b/flume-ng-channels/flume-spillable-memory-channel/src/test/java/org/apache/flume/channel/TestSpillableMemoryChannel.java
index 1e4e819..848636b 100644
--- a/flume-ng-channels/flume-spillable-memory-channel/src/test/java/org/apache/flume/channel/TestSpillableMemoryChannel.java
+++ b/flume-ng-channels/flume-spillable-memory-channel/src/test/java/org/apache/flume/channel/TestSpillableMemoryChannel.java
@@ -24,10 +24,13 @@ import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
-
import java.util.UUID;
-import org.apache.flume.*;
+import org.apache.flume.ChannelException;
+import org.apache.flume.ChannelFullException;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurables;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.channel.file.FileChannelConfiguration;
@@ -39,7 +42,6 @@ import org.junit.Test;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;
-
public class TestSpillableMemoryChannel {
private SpillableMemoryChannel channel;
@@ -51,14 +53,14 @@ public class TestSpillableMemoryChannel {
Context context = new Context();
File checkPointDir = fileChannelDir.newFolder("checkpoint");
File dataDir = fileChannelDir.newFolder("data");
- context.put(FileChannelConfiguration.CHECKPOINT_DIR
- , checkPointDir.getAbsolutePath());
+ context.put(FileChannelConfiguration.CHECKPOINT_DIR, checkPointDir.getAbsolutePath());
context.put(FileChannelConfiguration.DATA_DIRS, dataDir.getAbsolutePath());
// Set checkpoint for 5 seconds otherwise test will run out of memory
context.put(FileChannelConfiguration.CHECKPOINT_INTERVAL, "5000");
- if (overrides != null)
+ if (overrides != null) {
context.putAll(overrides);
+ }
Configurables.configure(channel, context);
}
@@ -81,9 +83,9 @@ public class TestSpillableMemoryChannel {
startChannel(params);
}
-
static class NullFound extends RuntimeException {
public int expectedValue;
+
public NullFound(int expected) {
super("Expected " + expected + ", but null found");
expectedValue = expected;
@@ -92,9 +94,10 @@ public class TestSpillableMemoryChannel {
static class TooManyNulls extends RuntimeException {
private int nullsFound;
+
public TooManyNulls(int count) {
super("Total nulls found in thread ("
- + Thread.currentThread().getName() + ") : " + count);
+ + Thread.currentThread().getName() + ") : " + count);
nullsFound = count;
}
}
@@ -102,7 +105,7 @@ public class TestSpillableMemoryChannel {
@Before
public void setUp() {
channel = new SpillableMemoryChannel();
- channel.setName("spillChannel-" + UUID.randomUUID() );
+ channel.setName("spillChannel-" + UUID.randomUUID());
}
@After
@@ -117,7 +120,7 @@ public class TestSpillableMemoryChannel {
}
private static void takeNull(AbstractChannel channel) {
- channel.take();
+ channel.take();
}
private static void takeN(int first, int count, AbstractChannel channel) {
@@ -127,7 +130,7 @@ public class TestSpillableMemoryChannel {
if (e == null) {
throw new NullFound(i);
}
- Event expected = EventBuilder.withBody( String.valueOf(i).getBytes() );
+ Event expected = EventBuilder.withBody(String.valueOf(i).getBytes());
Assert.assertArrayEquals(e.getBody(), expected.getBody());
}
}
@@ -140,16 +143,14 @@ public class TestSpillableMemoryChannel {
if (e == null) {
try {
Thread.sleep(0);
- } catch (InterruptedException ex)
- { /* ignore */ }
+ } catch (InterruptedException ex) { /* ignore */ }
return i;
}
}
return i;
}
- private static void transactionalPutN(int first, int count,
- AbstractChannel channel) {
+ private static void transactionalPutN(int first, int count, AbstractChannel channel) {
Transaction tx = channel.getTransaction();
tx.begin();
try {
@@ -163,8 +164,7 @@ public class TestSpillableMemoryChannel {
}
}
- private static void transactionalTakeN(int first, int count,
- AbstractChannel channel) {
+ private static void transactionalTakeN(int first, int count, AbstractChannel channel) {
Transaction tx = channel.getTransaction();
tx.begin();
try {
@@ -184,14 +184,13 @@ public class TestSpillableMemoryChannel {
}
}
- private static int transactionalTakeN_NoCheck(int count
- , AbstractChannel channel) {
+ private static int transactionalTakeN_NoCheck(int count, AbstractChannel channel) {
Transaction tx = channel.getTransaction();
tx.begin();
try {
int eventCount = takeN_NoCheck(count, channel);
tx.commit();
- return eventCount;
+ return eventCount;
} catch (RuntimeException e) {
tx.rollback();
throw e;
@@ -204,8 +203,9 @@ public class TestSpillableMemoryChannel {
Transaction tx = channel.getTransaction();
tx.begin();
try {
- for (int i = 0; i < count; ++i)
+ for (int i = 0; i < count; ++i) {
takeNull(channel);
+ }
tx.commit();
} catch (AssertionError e) {
tx.rollback();
@@ -218,68 +218,63 @@ public class TestSpillableMemoryChannel {
}
}
- private Thread makePutThread(String threadName
- , final int first, final int count, final int batchSize
- , final AbstractChannel channel) {
- return
- new Thread(threadName) {
- public void run() {
- int maxdepth = 0;
- StopWatch watch = new StopWatch();
- for (int i = first; i<first+count; i=i+batchSize) {
- transactionalPutN(i, batchSize, channel);
+ private Thread makePutThread(String threadName, final int first, final int count,
+ final int batchSize, final AbstractChannel channel) {
+ return new Thread(threadName) {
+ public void run() {
+ int maxdepth = 0;
+ StopWatch watch = new StopWatch();
+ for (int i = first; i < first + count; i = i + batchSize) {
+ transactionalPutN(i, batchSize, channel);
+ }
+ watch.elapsed();
+ }
+ };
+ }
+
+ private static Thread makeTakeThread(String threadName, final int first, final int count,
+ final int batchSize, final AbstractChannel channel) {
+ return new Thread(threadName) {
+ public void run() {
+ StopWatch watch = new StopWatch();
+ for (int i = first; i < first + count; ) {
+ try {
+ transactionalTakeN(i, batchSize, channel);
+ i = i + batchSize;
+ } catch (NullFound e) {
+ i = e.expectedValue;
}
- watch.elapsed();
}
- };
- }
-
- private static Thread makeTakeThread(String threadName, final int first
- , final int count, final int batchSize, final AbstractChannel channel) {
- return
- new Thread(threadName) {
- public void run() {
- StopWatch watch = new StopWatch();
- for (int i = first; i < first+count; ) {
+ watch.elapsed();
+ }
+ };
+ }
+
+ private static Thread makeTakeThread_noCheck(String threadName, final int totalEvents,
+ final int batchSize, final AbstractChannel channel) {
+ return new Thread(threadName) {
+ public void run() {
+ int batchSz = batchSize;
+ StopWatch watch = new StopWatch();
+ int i = 0, attempts = 0;
+ while (i < totalEvents) {
+ int remaining = totalEvents - i;
+ batchSz = (remaining > batchSz) ? batchSz : remaining;
+ int takenCount = transactionalTakeN_NoCheck(batchSz, channel);
+ if (takenCount < batchSz) {
try {
- transactionalTakeN(i, batchSize, channel);
- i = i + batchSize;
- } catch (NullFound e) {
- i = e.expectedValue;
- }
+ Thread.sleep(20);
+ } catch (InterruptedException ex) { /* ignore */ }
}
- watch.elapsed();
- }
- };
- }
-
- private static Thread makeTakeThread_noCheck(String threadName
- , final int totalEvents, final int batchSize, final AbstractChannel channel) {
- return
- new Thread(threadName) {
- public void run() {
- int batchSz = batchSize;
- StopWatch watch = new StopWatch();
- int i = 0, attempts = 0 ;
- while(i < totalEvents) {
- int remaining = totalEvents - i;
- batchSz = (remaining > batchSz) ? batchSz : remaining;
- int takenCount = transactionalTakeN_NoCheck(batchSz, channel);
- if(takenCount < batchSz) {
- try {
- Thread.sleep(20);
- } catch (InterruptedException ex)
- { /* ignore */ }
- }
- i += takenCount;
- ++attempts;
- if(attempts > totalEvents * 3 ) {
- throw new TooManyNulls(attempts);
- }
+ i += takenCount;
+ ++attempts;
+ if (attempts > totalEvents * 3) {
+ throw new TooManyNulls(attempts);
}
- watch.elapsed(" items = " + i + ", attempts = " + attempts);
}
- };
+ watch.elapsed(" items = " + i + ", attempts = " + attempts);
+ }
+ };
}
@Test
@@ -292,37 +287,36 @@ public class TestSpillableMemoryChannel {
Transaction tx = channel.getTransaction();
tx.begin();
- putN(0,2,channel);
+ putN(0, 2, channel);
tx.commit();
tx.close();
tx = channel.getTransaction();
tx.begin();
- takeN(0,2,channel);
+ takeN(0, 2, channel);
tx.commit();
tx.close();
}
-
@Test
- public void testCapacityDisableOverflow() {
+ public void testCapacityDisableOverflow() {
Map<String, String> params = new HashMap<String, String>();
params.put("memoryCapacity", "2");
params.put("overflowCapacity", "0"); // overflow is disabled effectively
- params.put("overflowTimeout", "0" );
+ params.put("overflowTimeout", "0");
startChannel(params);
- transactionalPutN(0,2,channel);
+ transactionalPutN(0, 2, channel);
boolean threw = false;
try {
- transactionalPutN(2,1,channel);
+ transactionalPutN(2, 1, channel);
} catch (ChannelException e) {
threw = true;
}
Assert.assertTrue("Expecting ChannelFullException to be thrown", threw);
- transactionalTakeN(0,2, channel);
+ transactionalTakeN(0, 2, channel);
Transaction tx = channel.getTransaction();
tx.begin();
@@ -332,7 +326,7 @@ public class TestSpillableMemoryChannel {
}
@Test
- public void testCapacityWithOverflow() {
+ public void testCapacityWithOverflow() {
Map<String, String> params = new HashMap<String, String>();
params.put("memoryCapacity", "2");
params.put("overflowCapacity", "4");
@@ -346,19 +340,19 @@ public class TestSpillableMemoryChannel {
boolean threw = false;
try {
- transactionalPutN(7,2,channel); // cannot fit in channel
+ transactionalPutN(7, 2, channel); // cannot fit in channel
} catch (ChannelFullException e) {
threw = true;
}
Assert.assertTrue("Expecting ChannelFullException to be thrown", threw);
- transactionalTakeN(1,2, channel);
- transactionalTakeN(3,2, channel);
- transactionalTakeN(5,2, channel);
+ transactionalTakeN(1, 2, channel);
+ transactionalTakeN(3, 2, channel);
+ transactionalTakeN(5, 2, channel);
}
@Test
- public void testRestart() {
+ public void testRestart() {
Map<String, String> params = new HashMap<String, String>();
params.put("memoryCapacity", "2");
params.put("overflowCapacity", "10");
@@ -372,8 +366,7 @@ public class TestSpillableMemoryChannel {
restartChannel(params);
// from overflow, as in memory stuff should be lost
- transactionalTakeN(3,2, channel);
-
+ transactionalTakeN(3, 2, channel);
}
@Test
@@ -382,35 +375,34 @@ public class TestSpillableMemoryChannel {
params.put("memoryCapacity", "10000000");
params.put("overflowCapacity", "20000000");
params.put(FileChannelConfiguration.TRANSACTION_CAPACITY, "10");
- params.put("overflowTimeout", "1" );
+ params.put("overflowTimeout", "1");
startChannel(params);
- transactionalPutN( 1,5,channel);
- transactionalPutN( 6,5,channel);
- transactionalPutN(11,5,channel); // these should go to overflow
+ transactionalPutN(1, 5, channel);
+ transactionalPutN(6, 5, channel);
+ transactionalPutN(11, 5, channel); // these should go to overflow
- transactionalTakeN(1,10, channel);
- transactionalTakeN(11,5, channel);
+ transactionalTakeN(1, 10, channel);
+ transactionalTakeN(11, 5, channel);
}
@Test
public void testOverflow() {
-
Map<String, String> params = new HashMap<String, String>();
params.put("memoryCapacity", "10");
params.put("overflowCapacity", "20");
params.put(FileChannelConfiguration.TRANSACTION_CAPACITY, "10");
- params.put("overflowTimeout", "1" );
+ params.put("overflowTimeout", "1");
startChannel(params);
- transactionalPutN( 1,5,channel);
- transactionalPutN( 6,5,channel);
- transactionalPutN(11,5,channel); // these should go to overflow
+ transactionalPutN(1, 5, channel);
+ transactionalPutN(6, 5, channel);
+ transactionalPutN(11, 5, channel); // these should go to overflow
- transactionalTakeN(1,10, channel);
- transactionalTakeN(11,5, channel);
+ transactionalTakeN(1, 10, channel);
+ transactionalTakeN(11, 5, channel);
}
@Test
@@ -419,29 +411,29 @@ public class TestSpillableMemoryChannel {
params.put("memoryCapacity", "10");
params.put("overflowCapacity", "10");
params.put(FileChannelConfiguration.TRANSACTION_CAPACITY, "5");
- params.put("overflowTimeout", "1" );
+ params.put("overflowTimeout", "1");
startChannel(params);
- transactionalPutN( 1,5,channel);
- transactionalPutN( 6,5,channel);
- transactionalPutN(11,5,channel); // into overflow
- transactionalPutN(16,5,channel); // into overflow
+ transactionalPutN(1, 5, channel);
+ transactionalPutN(6, 5, channel);
+ transactionalPutN(11, 5, channel); // into overflow
+ transactionalPutN(16, 5, channel); // into overflow
transactionalTakeN(1, 1, channel);
- transactionalTakeN(2, 5,channel);
- transactionalTakeN(7, 4,channel);
+ transactionalTakeN(2, 5, channel);
+ transactionalTakeN(7, 4, channel);
- transactionalPutN( 20,2,channel);
- transactionalPutN( 22,3,channel);
+ transactionalPutN(20, 2, channel);
+ transactionalPutN(22, 3, channel);
- transactionalTakeN( 11,3,channel); // from overflow
- transactionalTakeN( 14,5,channel); // from overflow
- transactionalTakeN( 19,2,channel); // from overflow
+ transactionalTakeN(11, 3, channel); // from overflow
+ transactionalTakeN(14, 5, channel); // from overflow
+ transactionalTakeN(19, 2, channel); // from overflow
}
@Test
- public void testByteCapacity() {
+ public void testByteCapacity() {
Map<String, String> params = new HashMap<String, String>();
params.put("memoryCapacity", "1000");
// configure to hold 8 events of 10 bytes each (plus 20% event header space)
@@ -449,12 +441,12 @@ public class TestSpillableMemoryChannel {
params.put("avgEventSize", "10");
params.put("overflowCapacity", "20");
params.put(FileChannelConfiguration.TRANSACTION_CAPACITY, "10");
- params.put("overflowTimeout", "1" );
+ params.put("overflowTimeout", "1");
startChannel(params);
transactionalPutN(1, 8, channel); // this wil max the byteCapacity
transactionalPutN(9, 10, channel);
- transactionalPutN(19,10, channel); // this will fill up the overflow
+ transactionalPutN(19, 10, channel); // this will fill up the overflow
boolean threw = false;
try {
@@ -463,7 +455,6 @@ public class TestSpillableMemoryChannel {
threw = true;
}
Assert.assertTrue("byteCapacity did not throw as expected", threw);
-
}
@Test
@@ -473,7 +464,7 @@ public class TestSpillableMemoryChannel {
params.put("memoryCapacity", "5");
params.put("overflowCapacity", "15");
params.put(FileChannelConfiguration.TRANSACTION_CAPACITY, "10");
- params.put("overflowTimeout", "1" );
+ params.put("overflowTimeout", "1");
startChannel(params);
transactionalPutN(1, 5, channel);
@@ -493,13 +484,13 @@ public class TestSpillableMemoryChannel {
transactionalTakeN(6, 5, channel); // from overflow
transactionalTakeN(11, 5, channel); // from overflow
- transactionalTakeN(16, 2,channel); // from overflow
+ transactionalTakeN(16, 2, channel); // from overflow
transactionalPutN(21, 5, channel);
tx = channel.getTransaction();
tx.begin();
- takeN(18,3, channel); // from overflow
+ takeN(18, 3, channel); // from overflow
takeNull(channel); // expect null since next event is in primary
tx.commit();
tx.close();
@@ -516,9 +507,8 @@ public class TestSpillableMemoryChannel {
params.put("overflowTimeout", "0");
startChannel(params);
-
//1 Rollback for Puts
- transactionalPutN(1,5, channel);
+ transactionalPutN(1, 5, channel);
Transaction tx = channel.getTransaction();
tx.begin();
putN(6, 5, channel);
@@ -530,8 +520,7 @@ public class TestSpillableMemoryChannel {
//2. verify things back to normal after put rollback
transactionalPutN(11, 5, channel);
- transactionalTakeN(11,5,channel);
-
+ transactionalTakeN(11, 5, channel);
//3 Rollback for Takes
transactionalPutN(16, 5, channel);
@@ -545,13 +534,12 @@ public class TestSpillableMemoryChannel {
transactionalTakeN_NoCheck(5, channel);
//4. verify things back to normal after take rollback
- transactionalPutN(21,5, channel);
- transactionalTakeN(21,5,channel);
+ transactionalPutN(21, 5, channel);
+ transactionalTakeN(21, 5, channel);
}
-
@Test
- public void testReconfigure() {
+ public void testReconfigure() {
//1) bring up with small capacity
Map<String, String> params = new HashMap<String, String>();
params.put("memoryCapacity", "10");
@@ -559,12 +547,12 @@ public class TestSpillableMemoryChannel {
params.put("overflowTimeout", "0");
startChannel(params);
- Assert.assertTrue("overflowTimeout setting did not reconfigure correctly"
- , channel.getOverflowTimeout() == 0);
- Assert.assertTrue("memoryCapacity did not reconfigure correctly"
- , channel.getMemoryCapacity() == 10);
- Assert.assertTrue("overflowCapacity did not reconfigure correctly"
- , channel.isOverflowDisabled() );
+ Assert.assertTrue("overflowTimeout setting did not reconfigure correctly",
+ channel.getOverflowTimeout() == 0);
+ Assert.assertTrue("memoryCapacity did not reconfigure correctly",
+ channel.getMemoryCapacity() == 10);
+ Assert.assertTrue("overflowCapacity did not reconfigure correctly",
+ channel.isOverflowDisabled());
transactionalPutN(1, 10, channel);
boolean threw = false;
@@ -574,8 +562,7 @@ public class TestSpillableMemoryChannel {
threw = true;
}
Assert.assertTrue("Expected the channel to fill up and throw an exception, "
- + "but it did not throw", threw);
-
+ + "but it did not throw", threw);
//2) Resize and verify
params = new HashMap<String, String>();
@@ -583,12 +570,13 @@ public class TestSpillableMemoryChannel {
params.put("overflowCapacity", "0");
reconfigureChannel(params);
- Assert.assertTrue("overflowTimeout setting did not reconfigure correctly"
- , channel.getOverflowTimeout() == SpillableMemoryChannel.defaultOverflowTimeout);
- Assert.assertTrue("memoryCapacity did not reconfigure correctly"
- , channel.getMemoryCapacity() == 20);
- Assert.assertTrue("overflowCapacity did not reconfigure correctly"
- , channel.isOverflowDisabled() );
+ Assert.assertTrue("overflowTimeout setting did not reconfigure correctly",
+ channel.getOverflowTimeout() ==
+ SpillableMemoryChannel.defaultOverflowTimeout);
+ Assert.assertTrue("memoryCapacity did not reconfigure correctly",
+ channel.getMemoryCapacity() == 20);
+ Assert.assertTrue("overflowCapacity did not reconfigure correctly",
+ channel.isOverflowDisabled());
// pull out the values inserted prior to reconfiguration
transactionalTakeN(1, 10, channel);
@@ -603,25 +591,25 @@ public class TestSpillableMemoryChannel {
threw = true;
}
Assert.assertTrue("Expected the channel to fill up and throw an exception, "
- + "but it did not throw", threw);
+ + "but it did not throw", threw);
transactionalTakeN(11, 10, channel);
transactionalTakeN(21, 10, channel);
-
// 3) Reconfigure with empty config and verify settings revert to default
params = new HashMap<String, String>();
reconfigureChannel(params);
- Assert.assertTrue("overflowTimeout setting did not reconfigure correctly"
- , channel.getOverflowTimeout() == SpillableMemoryChannel.defaultOverflowTimeout);
- Assert.assertTrue("memoryCapacity did not reconfigure correctly"
- , channel.getMemoryCapacity() == SpillableMemoryChannel.defaultMemoryCapacity);
- Assert.assertTrue("overflowCapacity did not reconfigure correctly"
- , channel.getOverflowCapacity() == SpillableMemoryChannel.defaultOverflowCapacity);
- Assert.assertFalse("overflowCapacity did not reconfigure correctly"
- , channel.isOverflowDisabled());
-
+ Assert.assertTrue("overflowTimeout setting did not reconfigure correctly",
+ channel.getOverflowTimeout() ==
+ SpillableMemoryChannel.defaultOverflowTimeout);
+ Assert.assertTrue("memoryCapacity did not reconfigure correctly",
+ channel.getMemoryCapacity() == SpillableMemoryChannel.defaultMemoryCapacity);
+ Assert.assertTrue("overflowCapacity did not reconfigure correctly",
+ channel.getOverflowCapacity() ==
+ SpillableMemoryChannel.defaultOverflowCapacity);
+ Assert.assertFalse("overflowCapacity did not reconfigure correctly",
+ channel.isOverflowDisabled());
// 4) Reconfiguring of overflow
params = new HashMap<String, String>();
@@ -631,19 +619,18 @@ public class TestSpillableMemoryChannel {
params.put("overflowTimeout", "1");
reconfigureChannel(params);
- transactionalPutN( 1,5, channel);
- transactionalPutN( 6,5, channel);
- transactionalPutN(11,5, channel);
- transactionalPutN(16,5, channel);
- threw=false;
+ transactionalPutN(1, 5, channel);
+ transactionalPutN(6, 5, channel);
+ transactionalPutN(11, 5, channel);
+ transactionalPutN(16, 5, channel);
+ threw = false;
try {
// should error out as both primary & overflow are full
- transactionalPutN(21,5, channel);
+ transactionalPutN(21, 5, channel);
} catch (ChannelException e) {
threw = true;
}
- Assert.assertTrue("Expected the last insertion to fail, but it didn't."
- , threw);
+ Assert.assertTrue("Expected the last insertion to fail, but it didn't.", threw);
// reconfig the overflow
params = new HashMap<String, String>();
@@ -654,10 +641,10 @@ public class TestSpillableMemoryChannel {
reconfigureChannel(params);
// should succeed now as we have made room in the overflow
- transactionalPutN(21,5, channel);
+ transactionalPutN(21, 5, channel);
- transactionalTakeN(1,10, channel);
- transactionalTakeN(11,5, channel);
+ transactionalTakeN(1, 10, channel);
+ transactionalTakeN(11, 5, channel);
transactionalTakeN(16, 5, channel);
transactionalTakeN(21, 5, channel);
}
@@ -666,13 +653,13 @@ public class TestSpillableMemoryChannel {
public void testParallelSingleSourceAndSink() throws InterruptedException {
Map<String, String> params = new HashMap<String, String>();
params.put("memoryCapacity", "1000020");
- params.put("overflowCapacity", "0");
+ params.put("overflowCapacity", "0");
params.put("overflowTimeout", "3");
startChannel(params);
// run source and sink concurrently
Thread sourceThd = makePutThread("src", 1, 500000, 100, channel);
- Thread sinkThd = makeTakeThread("sink", 1, 500000, 100, channel);
+ Thread sinkThd = makeTakeThread("sink", 1, 500000, 100, channel);
StopWatch watch = new StopWatch();
@@ -683,15 +670,15 @@ public class TestSpillableMemoryChannel {
sinkThd.join();
watch.elapsed();
- System.out.println("Max Queue size " + channel.getMaxMemQueueSize() );
+ System.out.println("Max Queue size " + channel.getMaxMemQueueSize());
}
@Test
public void testCounters() throws InterruptedException {
Map<String, String> params = new HashMap<String, String>();
- params.put("memoryCapacity", "5000");
- params.put("overflowCapacity","5000");
- params.put("transactionCapacity","5000");
+ params.put("memoryCapacity", "5000");
+ params.put("overflowCapacity", "5000");
+ params.put("transactionCapacity", "5000");
params.put("overflowTimeout", "0");
startChannel(params);
@@ -706,7 +693,7 @@ public class TestSpillableMemoryChannel {
Assert.assertEquals(5000, channel.channelCounter.getEventPutSuccessCount());
//2. empty mem queue
- Thread sinkThd = makeTakeThread("sink", 1, 5000, 1000, channel);
+ Thread sinkThd = makeTakeThread("sink", 1, 5000, 1000, channel);
sinkThd.start();
sinkThd.join();
Assert.assertEquals(0, channel.getTotalStored());
@@ -714,7 +701,6 @@ public class TestSpillableMemoryChannel {
Assert.assertEquals(5000, channel.channelCounter.getEventTakeAttemptCount());
Assert.assertEquals(5000, channel.channelCounter.getEventTakeSuccessCount());
-
//3. fill up mem & overflow
sourceThd = makePutThread("src", 1, 10000, 1000, channel);
sourceThd.start();
@@ -724,9 +710,8 @@ public class TestSpillableMemoryChannel {
Assert.assertEquals(15000, channel.channelCounter.getEventPutAttemptCount());
Assert.assertEquals(15000, channel.channelCounter.getEventPutSuccessCount());
-
//4. empty memory
- sinkThd = makeTakeThread("sink", 1, 5000, 1000, channel);
+ sinkThd = makeTakeThread("sink", 1, 5000, 1000, channel);
sinkThd.start();
sinkThd.join();
Assert.assertEquals(5000, channel.getTotalStored());
@@ -745,12 +730,10 @@ public class TestSpillableMemoryChannel {
Assert.assertEquals(15000, channel.channelCounter.getEventTakeAttemptCount());
Assert.assertEquals(15000, channel.channelCounter.getEventTakeSuccessCount());
-
-
//6. now do it concurrently
sourceThd = makePutThread("src1", 1, 5000, 1000, channel);
Thread sourceThd2 = makePutThread("src2", 1, 5000, 500, channel);
- sinkThd = makeTakeThread_noCheck("sink1", 5000, 1000, channel);
+ sinkThd = makeTakeThread_noCheck("sink1", 5000, 1000, channel);
sourceThd.start();
sourceThd2.start();
sinkThd.start();
@@ -759,8 +742,8 @@ public class TestSpillableMemoryChannel {
sinkThd.join();
Assert.assertEquals(5000, channel.getTotalStored());
Assert.assertEquals(5000, channel.channelCounter.getChannelSize());
- Thread sinkThd2 = makeTakeThread_noCheck("sink2", 2500, 500, channel);
- Thread sinkThd3 = makeTakeThread_noCheck("sink3", 2500, 1000, channel);
+ Thread sinkThd2 = makeTakeThread_noCheck("sink2", 2500, 500, channel);
+ Thread sinkThd3 = makeTakeThread_noCheck("sink3", 2500, 1000, channel);
sinkThd2.start();
sinkThd3.start();
sinkThd2.join();
@@ -769,30 +752,26 @@ public class TestSpillableMemoryChannel {
Assert.assertEquals(0, channel.channelCounter.getChannelSize());
Assert.assertEquals(25000, channel.channelCounter.getEventTakeSuccessCount());
Assert.assertEquals(25000, channel.channelCounter.getEventPutSuccessCount());
- Assert.assertTrue("TakeAttempt channel counter value larger than expected" ,
- 25000 <= channel.channelCounter.getEventTakeAttemptCount());
+ Assert.assertTrue("TakeAttempt channel counter value larger than expected",
+ 25000 <= channel.channelCounter.getEventTakeAttemptCount());
Assert.assertTrue("PutAttempt channel counter value larger than expected",
- 25000 <= channel.channelCounter.getEventPutAttemptCount());
+ 25000 <= channel.channelCounter.getEventPutAttemptCount());
}
- public ArrayList<Thread> createSourceThreads(int count, int totalEvents
- , int batchSize) {
+ public ArrayList<Thread> createSourceThreads(int count, int totalEvents, int batchSize) {
ArrayList<Thread> sourceThds = new ArrayList<Thread>();
for (int i = 0; i < count; ++i) {
- sourceThds.add( makePutThread("src" + i, 1, totalEvents/count
- , batchSize, channel) );
+ sourceThds.add(makePutThread("src" + i, 1, totalEvents / count, batchSize, channel));
}
return sourceThds;
}
- public ArrayList<Thread> createSinkThreads(int count, int totalEvents
- , int batchSize) {
+ public ArrayList<Thread> createSinkThreads(int count, int totalEvents, int batchSize) {
ArrayList<Thread> sinkThreads = new ArrayList<Thread>(count);
for (int i = 0; i < count; ++i) {
- sinkThreads.add( makeTakeThread_noCheck("sink"+i, totalEvents/count
- , batchSize, channel) );
+ sinkThreads.add(makeTakeThread_noCheck("sink" + i, totalEvents / count, batchSize, channel));
}
return sinkThreads;
}
@@ -803,13 +782,12 @@ public class TestSpillableMemoryChannel {
}
}
- public void joinThreads(ArrayList<Thread> threads)
- throws InterruptedException {
+ public void joinThreads(ArrayList<Thread> threads) throws InterruptedException {
for (Thread thread : threads) {
try {
thread.join();
} catch (InterruptedException e) {
- System.out.println("Interrupted while waiting on " + thread.getName() );
+ System.out.println("Interrupted while waiting on " + thread.getName());
throw e;
}
}
@@ -824,15 +802,13 @@ public class TestSpillableMemoryChannel {
Map<String, String> params = new HashMap<String, String>();
params.put("memoryCapacity", "0");
- params.put("overflowCapacity", "500020");
+ params.put("overflowCapacity", "500020");
params.put("overflowTimeout", "3");
startChannel(params);
ArrayList<Thread> sinks = createSinkThreads(sinkCount, eventCount, batchSize);
- ArrayList<Thread> sources = createSourceThreads(sourceCount
- , eventCount, batchSize);
-
+ ArrayList<Thread> sources = createSourceThreads(sourceCount, eventCount, batchSize);
StopWatch watch = new StopWatch();
startThreads(sinks);
@@ -845,7 +821,7 @@ public class TestSpillableMemoryChannel {
System.out.println("Total puts " + channel.drainOrder.totalPuts);
- System.out.println("Max Queue size " + channel.getMaxMemQueueSize() );
+ System.out.println("Max Queue size " + channel.getMaxMemQueueSize());
System.out.println(channel.memQueue.size());
System.out.println("done");
@@ -872,10 +848,10 @@ public class TestSpillableMemoryChannel {
if (elapsed < 10000) {
System.out.println(Thread.currentThread().getName()
- + " : [ " + elapsed + " ms ]. " + suffix);
+ + " : [ " + elapsed + " ms ]. " + suffix);
} else {
System.out.println(Thread.currentThread().getName()
- + " : [ " + elapsed / 1000 + " sec ]. " + suffix);
+ + " : [ " + elapsed / 1000 + " sec ]. " + suffix);
}
}
}
http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLoadBalancingLog4jAppender.java
----------------------------------------------------------------------
diff --git a/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLoadBalancingLog4jAppender.java b/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLoadBalancingLog4jAppender.java
index 267ac1d..53795fb 100644
--- a/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLoadBalancingLog4jAppender.java
+++ b/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLoadBalancingLog4jAppender.java
@@ -65,7 +65,7 @@ public class TestLoadBalancingLog4jAppender {
private boolean slowDown = false;
@Before
- public void initiate() throws InterruptedException{
+ public void initiate() throws InterruptedException {
ch = new MemoryChannel();
configureChannel();
@@ -164,9 +164,9 @@ public class TestLoadBalancingLog4jAppender {
@Test
public void testRandomBackoffUnsafeMode() throws Exception {
File TESTFILE = new File(TestLoadBalancingLog4jAppender.class
- .getClassLoader()
- .getResource("flume-loadbalancing-backoff-log4jtest.properties")
- .getFile());
+ .getClassLoader()
+ .getResource("flume-loadbalancing-backoff-log4jtest.properties")
+ .getFile());
startSources(TESTFILE, true, new int[]{25430, 25431, 25432});
sources.get(0).setFail();
@@ -179,9 +179,9 @@ public class TestLoadBalancingLog4jAppender {
@Test (expected = EventDeliveryException.class)
public void testTimeout() throws Throwable {
File TESTFILE = new File(TestLoadBalancingLog4jAppender.class
- .getClassLoader()
- .getResource("flume-loadbalancinglog4jtest.properties")
- .getFile());
+ .getClassLoader()
+ .getResource("flume-loadbalancinglog4jtest.properties")
+ .getFile());
ch = new TestLog4jAppender.SlowMemoryChannel(2000);
configureChannel();
@@ -200,9 +200,9 @@ public class TestLoadBalancingLog4jAppender {
@Test(expected = EventDeliveryException.class)
public void testRandomBackoffNotUnsafeMode() throws Throwable {
File TESTFILE = new File(TestLoadBalancingLog4jAppender.class
- .getClassLoader()
- .getResource("flume-loadbalancing-backoff-log4jtest.properties")
- .getFile());
+ .getClassLoader()
+ .getResource("flume-loadbalancing-backoff-log4jtest.properties")
+ .getFile());
startSources(TESTFILE, false, new int[]{25430, 25431, 25432});
sources.get(0).setFail();
@@ -224,17 +224,17 @@ public class TestLoadBalancingLog4jAppender {
}
private void sendAndAssertFail() throws IOException {
- int level = 20000;
- String msg = "This is log message number" + String.valueOf(level);
- fixture.log(Level.toLevel(level), msg);
+ int level = 20000;
+ String msg = "This is log message number" + String.valueOf(level);
+ fixture.log(Level.toLevel(level), msg);
- Transaction transaction = ch.getTransaction();
- transaction.begin();
- Event event = ch.take();
- Assert.assertNull(event);
+ Transaction transaction = ch.getTransaction();
+ transaction.begin();
+ Event event = ch.take();
+ Assert.assertNull(event);
- transaction.commit();
- transaction.close();
+ transaction.commit();
+ transaction.close();
}
@@ -271,8 +271,7 @@ public class TestLoadBalancingLog4jAppender {
}
private void startSources(File log4jProps, boolean unsafeMode, int... ports)
- throws
- IOException {
+ throws IOException {
for (int port : ports) {
CountingAvroSource source = new CountingAvroSource(port);
Context context = new Context();
@@ -297,8 +296,8 @@ public class TestLoadBalancingLog4jAppender {
Properties props = new Properties();
props.load(reader);
props.setProperty("log4j.appender.out2.UnsafeMode",
- String.valueOf(unsafeMode));
- if(slowDown) {
+ String.valueOf(unsafeMode));
+ if (slowDown) {
props.setProperty("log4j.appender.out2.Timeout", String.valueOf(1000));
}
PropertyConfigurator.configure(props);
@@ -308,13 +307,13 @@ public class TestLoadBalancingLog4jAppender {
static class CountingAvroSource extends AvroSource {
AtomicInteger appendCount = new AtomicInteger();
volatile boolean isFail = false;
- private final int port2;
+ private final int port2;
public CountingAvroSource(int port) {
- port2 = port;
+ port2 = port;
}
- public void setOk() {
+ public void setOk() {
this.isFail = false;
}
http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppender.java
----------------------------------------------------------------------
diff --git a/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppender.java b/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppender.java
index 1b840f3..c087b67 100644
--- a/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppender.java
+++ b/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppender.java
@@ -21,7 +21,10 @@ package org.apache.flume.clients.log4jappender;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
import java.util.concurrent.TimeUnit;
import junit.framework.Assert;
@@ -46,13 +49,13 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-public class TestLog4jAppender{
+public class TestLog4jAppender {
private AvroSource source;
private Channel ch;
private Properties props;
@Before
- public void initiate() throws Exception{
+ public void initiate() throws Exception {
int port = 25430;
source = new AvroSource();
ch = new MemoryChannel();
@@ -88,13 +91,13 @@ public class TestLog4jAppender{
configureSource();
PropertyConfigurator.configure(props);
Logger logger = LogManager.getLogger(TestLog4jAppender.class);
- for(int count = 0; count <= 1000; count++){
+ for (int count = 0; count <= 1000; count++) {
/*
* Log4j internally defines levels as multiples of 10000. So if we
* create levels directly using count, the level will be set as the
* default.
*/
- int level = ((count % 5)+1)*10000;
+ int level = ((count % 5) + 1) * 10000;
String msg = "This is log message number" + String.valueOf(count);
logger.log(Level.toLevel(level), msg);
@@ -146,11 +149,11 @@ public class TestLog4jAppender{
}
private void sendAndAssertFail(Logger logger) throws Throwable {
- /*
- * Log4j internally defines levels as multiples of 10000. So if we
- * create levels directly using count, the level will be set as the
- * default.
- */
+ /*
+ * Log4j internally defines levels as multiples of 10000. So if we
+ * create levels directly using count, the level will be set as the
+ * default.
+ */
int level = 20000;
try {
logger.log(Level.toLevel(level), "Test Msg");
@@ -177,13 +180,13 @@ public class TestLog4jAppender{
PropertyConfigurator.configure(props);
Logger logger = LogManager.getLogger(TestLog4jAppender.class);
Thread.currentThread().setName("Log4jAppenderTest");
- for(int count = 0; count <= 100; count++){
+ for (int count = 0; count <= 100; count++) {
/*
* Log4j internally defines levels as multiples of 10000. So if we
* create levels directly using count, the level will be set as the
* default.
*/
- int level = ((count % 5)+1)*10000;
+ int level = ((count % 5) + 1) * 10000;
String msg = "This is log message number" + String.valueOf(count);
logger.log(Level.toLevel(level), msg);
@@ -230,7 +233,7 @@ public class TestLog4jAppender{
props.put("log4j.appender.out2.Timeout", "1000");
props.put("log4j.appender.out2.layout", "org.apache.log4j.PatternLayout");
props.put("log4j.appender.out2.layout.ConversionPattern",
- "%-5p [%t]: %m%n");
+ "%-5p [%t]: %m%n");
PropertyConfigurator.configure(props);
Logger logger = LogManager.getLogger(TestLog4jAppender.class);
Thread.currentThread().setName("Log4jAppenderTest");
@@ -251,13 +254,12 @@ public class TestLog4jAppender{
@After
- public void cleanUp(){
+ public void cleanUp() {
source.stop();
ch.stop();
props.clear();
}
-
static class SlowMemoryChannel extends MemoryChannel {
private final int slowTime;
http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppenderWithAvro.java
----------------------------------------------------------------------
diff --git a/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppenderWithAvro.java b/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppenderWithAvro.java
index 5899c62..0607e3a 100644
--- a/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppenderWithAvro.java
+++ b/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppenderWithAvro.java
@@ -125,8 +125,7 @@ public class TestLog4jAppenderWithAvro {
Assert.assertNull(hdrs.get(Log4jAvroHeaders.MESSAGE_ENCODING.toString()));
Assert.assertEquals("Schema URL should be set",
- "file:///tmp/myrecord.avsc", hdrs.get(Log4jAvroHeaders.AVRO_SCHEMA_URL.toString
- ()));
+ "file:///tmp/myrecord.avsc", hdrs.get(Log4jAvroHeaders.AVRO_SCHEMA_URL.toString()));
Assert.assertNull("Schema string should not be set",
hdrs.get(Log4jAvroHeaders.AVRO_SCHEMA_LITERAL.toString()));
@@ -174,7 +173,7 @@ public class TestLog4jAppenderWithAvro {
}
@After
- public void cleanUp(){
+ public void cleanUp() {
source.stop();
ch.stop();
props.clear();
http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-core/src/test/java/org/apache/flume/channel/AbstractBasicChannelSemanticsTest.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/channel/AbstractBasicChannelSemanticsTest.java b/flume-ng-core/src/test/java/org/apache/flume/channel/AbstractBasicChannelSemanticsTest.java
index 59a804c..7600856 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/channel/AbstractBasicChannelSemanticsTest.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/channel/AbstractBasicChannelSemanticsTest.java
@@ -40,6 +40,7 @@ import com.google.common.base.Preconditions;
public abstract class AbstractBasicChannelSemanticsTest {
protected static List<Event> events;
+
static {
Event[] array = new Event[7];
for (int i = 0; i < array.length; ++i) {
@@ -61,7 +62,7 @@ public abstract class AbstractBasicChannelSemanticsTest {
THROW_RUNTIME,
THROW_CHANNEL,
SLEEP
- };
+ }
private Mode mode = Mode.NORMAL;
private boolean lastTransactionCommitted = false;
@@ -158,11 +159,11 @@ public abstract class AbstractBasicChannelSemanticsTest {
protected static class TestError extends Error {
static final long serialVersionUID = -1;
- };
+ }
protected static class TestRuntimeException extends RuntimeException {
static final long serialVersionUID = -1;
- };
+ }
protected void testException(Class<? extends Throwable> exceptionClass,
Runnable test) {
http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelProcessor.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelProcessor.java b/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelProcessor.java
index b37b823..93bc0cf 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelProcessor.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelProcessor.java
@@ -20,16 +20,22 @@ package org.apache.flume.channel;
import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flume.*;
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
+import org.apache.flume.ChannelSelector;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurables;
import org.apache.flume.event.EventBuilder;
import org.junit.Assert;
import org.junit.Test;
-import static org.mockito.Mockito.*;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public class TestChannelProcessor {
@@ -88,9 +94,9 @@ public class TestChannelProcessor {
public void testRequiredAndOptionalChannels() {
Context context = new Context();
ArrayList<Channel> channels = new ArrayList<Channel>();
- for(int i = 0; i < 4; i++) {
+ for (int i = 0; i < 4; i++) {
Channel ch = new MemoryChannel();
- ch.setName("ch"+i);
+ ch.setName("ch" + i);
Configurables.configure(ch, context);
channels.add(ch);
}
@@ -114,7 +120,7 @@ public class TestChannelProcessor {
} catch (InterruptedException e) {
}
- for(Channel channel : channels) {
+ for (Channel channel : channels) {
Transaction transaction = channel.getTransaction();
transaction.begin();
Event event_ch = channel.take();
@@ -124,18 +130,18 @@ public class TestChannelProcessor {
}
List<Event> events = Lists.newArrayList();
- for(int i = 0; i < 100; i ++) {
- events.add(EventBuilder.withBody("event "+i, Charsets.UTF_8));
+ for (int i = 0; i < 100; i++) {
+ events.add(EventBuilder.withBody("event " + i, Charsets.UTF_8));
}
processor.processEventBatch(events);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
}
- for(Channel channel : channels) {
+ for (Channel channel : channels) {
Transaction transaction = channel.getTransaction();
transaction.begin();
- for(int i = 0; i < 100; i ++) {
+ for (int i = 0; i < 100; i++) {
Event event_ch = channel.take();
Assert.assertNotNull(event_ch);
}