You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by mp...@apache.org on 2016/07/08 22:51:00 UTC
[8/9] flume git commit: FLUME-2941. Integrate checkstyle for test classes
http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java
index 1adb21a..f1700f9 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java
@@ -55,6 +55,7 @@ public class TestFlumeEventQueue {
File inflightTakes;
File inflightPuts;
File queueSetDir;
+
EventQueueBackingStoreSupplier() {
baseDir = Files.createTempDir();
checkpoint = new File(baseDir, "checkpoint");
@@ -62,62 +63,73 @@ public class TestFlumeEventQueue {
inflightPuts = new File(baseDir, "inflighttakes");
queueSetDir = new File(baseDir, "queueset");
}
+
File getCheckpoint() {
return checkpoint;
}
+
File getInflightPuts() {
return inflightPuts;
}
+
File getInflightTakes() {
return inflightTakes;
}
+
File getQueueSetDir() {
return queueSetDir;
}
+
void delete() {
FileUtils.deleteQuietly(baseDir);
}
- abstract EventQueueBackingStore get() throws Exception ;
+
+ abstract EventQueueBackingStore get() throws Exception;
}
@Parameters
public static Collection<Object[]> data() throws Exception {
- Object[][] data = new Object[][] { {
- new EventQueueBackingStoreSupplier() {
- @Override
- public EventQueueBackingStore get() throws Exception {
- Assert.assertTrue(baseDir.isDirectory() || baseDir.mkdirs());
- return new EventQueueBackingStoreFileV2(getCheckpoint(), 1000,
- "test");
+ Object[][] data = new Object[][] {
+ {
+ new EventQueueBackingStoreSupplier() {
+ @Override
+ public EventQueueBackingStore get() throws Exception {
+ Assert.assertTrue(baseDir.isDirectory() || baseDir.mkdirs());
+ return new EventQueueBackingStoreFileV2(getCheckpoint(), 1000,
+ "test");
+ }
}
- }
- }, {
- new EventQueueBackingStoreSupplier() {
- @Override
- public EventQueueBackingStore get() throws Exception {
- Assert.assertTrue(baseDir.isDirectory() || baseDir.mkdirs());
- return new EventQueueBackingStoreFileV3(getCheckpoint(), 1000,
- "test");
+ },
+ {
+ new EventQueueBackingStoreSupplier() {
+ @Override
+ public EventQueueBackingStore get() throws Exception {
+ Assert.assertTrue(baseDir.isDirectory() || baseDir.mkdirs());
+ return new EventQueueBackingStoreFileV3(getCheckpoint(), 1000, "test");
+ }
}
}
- } };
+ };
return Arrays.asList(data);
}
public TestFlumeEventQueue(EventQueueBackingStoreSupplier backingStoreSupplier) {
this.backingStoreSupplier = backingStoreSupplier;
}
+
@Before
public void setup() throws Exception {
backingStore = backingStoreSupplier.get();
}
+
@After
public void cleanup() throws IOException {
- if(backingStore != null) {
+ if (backingStore != null) {
backingStore.close();
}
backingStoreSupplier.delete();
}
+
@Test
public void testCapacity() throws Exception {
backingStore.close();
@@ -125,70 +137,76 @@ public class TestFlumeEventQueue {
Assert.assertTrue(checkpoint.delete());
backingStore = new EventQueueBackingStoreFileV2(checkpoint, 1, "test");
queue = new FlumeEventQueue(backingStore,
- backingStoreSupplier.getInflightTakes(),
- backingStoreSupplier.getInflightPuts(),
- backingStoreSupplier.getQueueSetDir());
+ backingStoreSupplier.getInflightTakes(),
+ backingStoreSupplier.getInflightPuts(),
+ backingStoreSupplier.getQueueSetDir());
Assert.assertTrue(queue.addTail(pointer1));
Assert.assertFalse(queue.addTail(pointer2));
}
- @Test(expected=IllegalArgumentException.class)
+
+ @Test(expected = IllegalArgumentException.class)
public void testInvalidCapacityZero() throws Exception {
backingStore.close();
File checkpoint = backingStoreSupplier.getCheckpoint();
Assert.assertTrue(checkpoint.delete());
backingStore = new EventQueueBackingStoreFileV2(checkpoint, 0, "test");
queue = new FlumeEventQueue(backingStore,
- backingStoreSupplier.getInflightTakes(),
- backingStoreSupplier.getInflightPuts(),
- backingStoreSupplier.getQueueSetDir());
+ backingStoreSupplier.getInflightTakes(),
+ backingStoreSupplier.getInflightPuts(),
+ backingStoreSupplier.getQueueSetDir());
}
- @Test(expected=IllegalArgumentException.class)
+
+ @Test(expected = IllegalArgumentException.class)
public void testInvalidCapacityNegative() throws Exception {
backingStore.close();
File checkpoint = backingStoreSupplier.getCheckpoint();
Assert.assertTrue(checkpoint.delete());
backingStore = new EventQueueBackingStoreFileV2(checkpoint, -1, "test");
queue = new FlumeEventQueue(backingStore,
- backingStoreSupplier.getInflightTakes(),
- backingStoreSupplier.getInflightPuts(),
- backingStoreSupplier.getQueueSetDir());
+ backingStoreSupplier.getInflightTakes(),
+ backingStoreSupplier.getInflightPuts(),
+ backingStoreSupplier.getQueueSetDir());
}
+
@Test
public void testQueueIsEmptyAfterCreation() throws Exception {
queue = new FlumeEventQueue(backingStore,
- backingStoreSupplier.getInflightTakes(),
- backingStoreSupplier.getInflightPuts(),
- backingStoreSupplier.getQueueSetDir());
+ backingStoreSupplier.getInflightTakes(),
+ backingStoreSupplier.getInflightPuts(),
+ backingStoreSupplier.getQueueSetDir());
Assert.assertNull(queue.removeHead(0L));
}
+
@Test
public void addTail1() throws Exception {
queue = new FlumeEventQueue(backingStore,
- backingStoreSupplier.getInflightTakes(),
- backingStoreSupplier.getInflightPuts(),
- backingStoreSupplier.getQueueSetDir());
+ backingStoreSupplier.getInflightTakes(),
+ backingStoreSupplier.getInflightPuts(),
+ backingStoreSupplier.getQueueSetDir());
Assert.assertTrue(queue.addTail(pointer1));
Assert.assertEquals(pointer1, queue.removeHead(0));
Assert.assertEquals(Sets.newHashSet(), queue.getFileIDs());
}
+
@Test
public void addTail2() throws Exception {
queue = new FlumeEventQueue(backingStore,
- backingStoreSupplier.getInflightTakes(),
- backingStoreSupplier.getInflightPuts(),
- backingStoreSupplier.getQueueSetDir());
+ backingStoreSupplier.getInflightTakes(),
+ backingStoreSupplier.getInflightPuts(),
+ backingStoreSupplier.getQueueSetDir());
Assert.assertTrue(queue.addTail(pointer1));
Assert.assertTrue(queue.addTail(pointer2));
Assert.assertEquals(Sets.newHashSet(1, 2), queue.getFileIDs());
Assert.assertEquals(pointer1, queue.removeHead(0));
Assert.assertEquals(Sets.newHashSet(2), queue.getFileIDs());
}
+
@Test
public void addTailLarge() throws Exception {
queue = new FlumeEventQueue(backingStore,
- backingStoreSupplier.getInflightTakes(),
- backingStoreSupplier.getInflightPuts(),
- backingStoreSupplier.getQueueSetDir());
+ backingStoreSupplier.getInflightTakes(),
+ backingStoreSupplier.getInflightPuts(),
+ backingStoreSupplier.getQueueSetDir());
int size = 500;
Set<Integer> fileIDs = Sets.newHashSet();
for (int i = 1; i <= size; i++) {
@@ -203,23 +221,25 @@ public class TestFlumeEventQueue {
}
Assert.assertEquals(Sets.newHashSet(), queue.getFileIDs());
}
+
@Test
public void addHead1() throws Exception {
queue = new FlumeEventQueue(backingStore,
- backingStoreSupplier.getInflightTakes(),
- backingStoreSupplier.getInflightPuts(),
- backingStoreSupplier.getQueueSetDir());
+ backingStoreSupplier.getInflightTakes(),
+ backingStoreSupplier.getInflightPuts(),
+ backingStoreSupplier.getQueueSetDir());
Assert.assertTrue(queue.addHead(pointer1));
Assert.assertEquals(Sets.newHashSet(1), queue.getFileIDs());
Assert.assertEquals(pointer1, queue.removeHead(0));
Assert.assertEquals(Sets.newHashSet(), queue.getFileIDs());
}
+
@Test
public void addHead2() throws Exception {
queue = new FlumeEventQueue(backingStore,
- backingStoreSupplier.getInflightTakes(),
- backingStoreSupplier.getInflightPuts(),
- backingStoreSupplier.getQueueSetDir());
+ backingStoreSupplier.getInflightTakes(),
+ backingStoreSupplier.getInflightPuts(),
+ backingStoreSupplier.getQueueSetDir());
queue.replayComplete();
Assert.assertTrue(queue.addHead(pointer1));
Assert.assertTrue(queue.addHead(pointer2));
@@ -227,12 +247,13 @@ public class TestFlumeEventQueue {
Assert.assertEquals(pointer2, queue.removeHead(0));
Assert.assertEquals(Sets.newHashSet(1), queue.getFileIDs());
}
+
@Test
public void addHeadLarge() throws Exception {
queue = new FlumeEventQueue(backingStore,
- backingStoreSupplier.getInflightTakes(),
- backingStoreSupplier.getInflightPuts(),
- backingStoreSupplier.getQueueSetDir());
+ backingStoreSupplier.getInflightTakes(),
+ backingStoreSupplier.getInflightPuts(),
+ backingStoreSupplier.getQueueSetDir());
queue.replayComplete();
int size = 500;
Set<Integer> fileIDs = Sets.newHashSet();
@@ -248,12 +269,13 @@ public class TestFlumeEventQueue {
}
Assert.assertEquals(Sets.newHashSet(), queue.getFileIDs());
}
+
@Test
public void addTailRemove1() throws Exception {
queue = new FlumeEventQueue(backingStore,
- backingStoreSupplier.getInflightTakes(),
- backingStoreSupplier.getInflightPuts(),
- backingStoreSupplier.getQueueSetDir());
+ backingStoreSupplier.getInflightTakes(),
+ backingStoreSupplier.getInflightPuts(),
+ backingStoreSupplier.getQueueSetDir());
Assert.assertTrue(queue.addTail(pointer1));
Assert.assertEquals(Sets.newHashSet(1), queue.getFileIDs());
Assert.assertTrue(queue.remove(pointer1));
@@ -266,9 +288,9 @@ public class TestFlumeEventQueue {
@Test
public void addTailRemove2() throws Exception {
queue = new FlumeEventQueue(backingStore,
- backingStoreSupplier.getInflightTakes(),
- backingStoreSupplier.getInflightPuts(),
- backingStoreSupplier.getQueueSetDir());
+ backingStoreSupplier.getInflightTakes(),
+ backingStoreSupplier.getInflightPuts(),
+ backingStoreSupplier.getQueueSetDir());
Assert.assertTrue(queue.addTail(pointer1));
Assert.assertTrue(queue.addTail(pointer2));
Assert.assertTrue(queue.remove(pointer1));
@@ -279,31 +301,33 @@ public class TestFlumeEventQueue {
@Test
public void addHeadRemove1() throws Exception {
queue = new FlumeEventQueue(backingStore,
- backingStoreSupplier.getInflightTakes(),
- backingStoreSupplier.getInflightPuts(),
- backingStoreSupplier.getQueueSetDir());
+ backingStoreSupplier.getInflightTakes(),
+ backingStoreSupplier.getInflightPuts(),
+ backingStoreSupplier.getQueueSetDir());
queue.addHead(pointer1);
Assert.assertTrue(queue.remove(pointer1));
Assert.assertNull(queue.removeHead(0));
}
+
@Test
public void addHeadRemove2() throws Exception {
queue = new FlumeEventQueue(backingStore,
- backingStoreSupplier.getInflightTakes(),
- backingStoreSupplier.getInflightPuts(),
- backingStoreSupplier.getQueueSetDir());
+ backingStoreSupplier.getInflightTakes(),
+ backingStoreSupplier.getInflightPuts(),
+ backingStoreSupplier.getQueueSetDir());
Assert.assertTrue(queue.addHead(pointer1));
Assert.assertTrue(queue.addHead(pointer2));
Assert.assertTrue(queue.remove(pointer1));
queue.replayComplete();
Assert.assertEquals(pointer2, queue.removeHead(0));
}
+
@Test
public void testUnknownPointerDoesNotCauseSearch() throws Exception {
queue = new FlumeEventQueue(backingStore,
- backingStoreSupplier.getInflightTakes(),
- backingStoreSupplier.getInflightPuts(),
- backingStoreSupplier.getQueueSetDir());
+ backingStoreSupplier.getInflightTakes(),
+ backingStoreSupplier.getInflightPuts(),
+ backingStoreSupplier.getQueueSetDir());
Assert.assertTrue(queue.addHead(pointer1));
Assert.assertTrue(queue.addHead(pointer2));
Assert.assertFalse(queue.remove(pointer3)); // does search
@@ -312,44 +336,47 @@ public class TestFlumeEventQueue {
queue.replayComplete();
Assert.assertEquals(2, queue.getSearchCount());
}
- @Test(expected=IllegalStateException.class)
+
+ @Test(expected = IllegalStateException.class)
public void testRemoveAfterReplayComplete() throws Exception {
queue = new FlumeEventQueue(backingStore,
- backingStoreSupplier.getInflightTakes(),
- backingStoreSupplier.getInflightPuts(),
- backingStoreSupplier.getQueueSetDir());
+ backingStoreSupplier.getInflightTakes(),
+ backingStoreSupplier.getInflightPuts(),
+ backingStoreSupplier.getQueueSetDir());
queue.replayComplete();
queue.remove(pointer1);
}
+
@Test
public void testWrappingCorrectly() throws Exception {
queue = new FlumeEventQueue(backingStore,
- backingStoreSupplier.getInflightTakes(),
- backingStoreSupplier.getInflightPuts(),
- backingStoreSupplier.getQueueSetDir());
+ backingStoreSupplier.getInflightTakes(),
+ backingStoreSupplier.getInflightPuts(),
+ backingStoreSupplier.getQueueSetDir());
int size = Integer.MAX_VALUE;
for (int i = 1; i <= size; i++) {
- if(!queue.addHead(new FlumeEventPointer(i, i))) {
+ if (!queue.addHead(new FlumeEventPointer(i, i))) {
break;
}
}
- for (int i = queue.getSize()/2; i > 0; i--) {
+ for (int i = queue.getSize() / 2; i > 0; i--) {
Assert.assertNotNull(queue.removeHead(0));
}
// addHead below would throw an IndexOOBounds with
// bad version of FlumeEventQueue.convert
for (int i = 1; i <= size; i++) {
- if(!queue.addHead(new FlumeEventPointer(i, i))) {
+ if (!queue.addHead(new FlumeEventPointer(i, i))) {
break;
}
}
}
+
@Test
- public void testInflightPuts() throws Exception{
+ public void testInflightPuts() throws Exception {
queue = new FlumeEventQueue(backingStore,
- backingStoreSupplier.getInflightTakes(),
- backingStoreSupplier.getInflightPuts(),
- backingStoreSupplier.getQueueSetDir());
+ backingStoreSupplier.getInflightTakes(),
+ backingStoreSupplier.getInflightPuts(),
+ backingStoreSupplier.getQueueSetDir());
long txnID1 = new Random().nextInt(Integer.MAX_VALUE - 1);
long txnID2 = txnID1 + 1;
queue.addWithoutCommit(new FlumeEventPointer(1, 1), txnID1);
@@ -358,16 +385,13 @@ public class TestFlumeEventQueue {
queue.checkpoint(true);
TimeUnit.SECONDS.sleep(3L);
queue = new FlumeEventQueue(backingStore,
- backingStoreSupplier.getInflightTakes(),
- backingStoreSupplier.getInflightPuts(),
- backingStoreSupplier.getQueueSetDir());
+ backingStoreSupplier.getInflightTakes(),
+ backingStoreSupplier.getInflightPuts(),
+ backingStoreSupplier.getQueueSetDir());
SetMultimap<Long, Long> deserializedMap = queue.deserializeInflightPuts();
- Assert.assertTrue(deserializedMap.get(
- txnID1).contains(new FlumeEventPointer(1, 1).toLong()));
- Assert.assertTrue(deserializedMap.get(
- txnID1).contains(new FlumeEventPointer(2, 1).toLong()));
- Assert.assertTrue(deserializedMap.get(
- txnID2).contains(new FlumeEventPointer(2, 2).toLong()));
+ Assert.assertTrue(deserializedMap.get(txnID1).contains(new FlumeEventPointer(1, 1).toLong()));
+ Assert.assertTrue(deserializedMap.get(txnID1).contains(new FlumeEventPointer(2, 1).toLong()));
+ Assert.assertTrue(deserializedMap.get(txnID2).contains(new FlumeEventPointer(2, 2).toLong()));
}
@Test
http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestIntegration.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestIntegration.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestIntegration.java
index 2fbe116..a138ed4 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestIntegration.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestIntegration.java
@@ -18,13 +18,8 @@
*/
package org.apache.flume.channel.file;
-import java.io.File;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
import org.apache.commons.io.FileUtils;
import org.apache.flume.Context;
import org.apache.flume.conf.Configurables;
@@ -37,8 +32,12 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Lists;
-import com.google.common.io.Files;
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
public class TestIntegration {
@@ -58,19 +57,21 @@ public class TestIntegration {
dataDirs = new File[3];
dataDir = "";
for (int i = 0; i < dataDirs.length; i++) {
- dataDirs[i] = new File(baseDir, "data" + (i+1));
+ dataDirs[i] = new File(baseDir, "data" + (i + 1));
Assert.assertTrue(dataDirs[i].mkdirs() || dataDirs[i].isDirectory());
dataDir += dataDirs[i].getAbsolutePath() + ",";
}
dataDir = dataDir.substring(0, dataDir.length() - 1);
}
+
@After
public void teardown() {
- if(channel != null && channel.isOpen()) {
+ if (channel != null && channel.isOpen()) {
channel.stop();
}
FileUtils.deleteQuietly(baseDir);
}
+
@Test
public void testIntegration() throws IOException, InterruptedException {
// set shorter checkpoint and filesize to ensure
@@ -106,11 +107,11 @@ public class TestIntegration {
TimeUnit.SECONDS.sleep(30);
// shutdown source
sourceRunner.shutdown();
- while(sourceRunner.isAlive()) {
+ while (sourceRunner.isAlive()) {
Thread.sleep(10L);
}
// wait for queue to clear
- while(channel.getDepth() > 0) {
+ while (channel.getDepth() > 0) {
Thread.sleep(10L);
}
// shutdown size
@@ -122,15 +123,15 @@ public class TestIntegration {
logs.addAll(LogUtils.getLogs(dataDirs[i]));
}
LOG.info("Total Number of Logs = " + logs.size());
- for(File logFile : logs) {
+ for (File logFile : logs) {
LOG.info("LogFile = " + logFile);
}
LOG.info("Source processed " + sinkRunner.getCount());
LOG.info("Sink processed " + sourceRunner.getCount());
- for(Exception ex : sourceRunner.getErrors()) {
+ for (Exception ex : sourceRunner.getErrors()) {
LOG.warn("Source had error", ex);
}
- for(Exception ex : sinkRunner.getErrors()) {
+ for (Exception ex : sinkRunner.getErrors()) {
LOG.warn("Sink had error", ex);
}
Assert.assertEquals(sinkRunner.getCount(), sinkRunner.getCount());
http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java
index b1f59cd..f7f0950 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java
@@ -18,14 +18,8 @@
*/
package org.apache.flume.channel.file;
-import static org.mockito.Mockito.*;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.nio.channels.*;
-import java.util.Collection;
-import java.util.List;
-
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
import org.apache.commons.io.FileUtils;
import org.junit.After;
import org.junit.Assert;
@@ -34,8 +28,13 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Lists;
-import com.google.common.io.Files;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public class TestLog {
private static final Logger LOGGER = LoggerFactory.getLogger(TestLog.class);
@@ -45,6 +44,7 @@ public class TestLog {
private File checkpointDir;
private File[] dataDirs;
private long transactionID;
+
@Before
public void setup() throws IOException {
transactionID = 0;
@@ -56,15 +56,20 @@ public class TestLog {
dataDirs[i] = Files.createTempDir();
Assert.assertTrue(dataDirs[i].isDirectory());
}
- log = new Log.Builder().setCheckpointInterval(1L).setMaxFileSize(
- MAX_FILE_SIZE).setQueueSize(CAPACITY).setCheckpointDir(
- checkpointDir).setLogDirs(dataDirs).setCheckpointOnClose(false)
- .setChannelName("testlog").build();
+ log = new Log.Builder().setCheckpointInterval(1L)
+ .setMaxFileSize(MAX_FILE_SIZE)
+ .setQueueSize(CAPACITY)
+ .setCheckpointDir(checkpointDir)
+ .setLogDirs(dataDirs)
+ .setCheckpointOnClose(false)
+ .setChannelName("testlog")
+ .build();
log.replay();
}
+
@After
- public void cleanup() throws Exception{
- if(log != null) {
+ public void cleanup() throws Exception {
+ if (log != null) {
log.close();
}
FileUtils.deleteQuietly(checkpointDir);
@@ -72,13 +77,14 @@ public class TestLog {
FileUtils.deleteQuietly(dataDirs[i]);
}
}
+
/**
* Test that we can put, commit and then get. Note that get is
* not transactional so the commit is not required.
*/
@Test
public void testPutGet()
- throws IOException, InterruptedException, NoopRecordException, CorruptEventException {
+ throws IOException, InterruptedException, NoopRecordException, CorruptEventException {
FlumeEvent eventIn = TestUtils.newPersistableEvent();
long transactionID = ++this.transactionID;
FlumeEventPointer eventPointer = log.put(transactionID, eventIn);
@@ -89,9 +95,10 @@ public class TestLog {
Assert.assertEquals(eventIn.getHeaders(), eventOut.getHeaders());
Assert.assertArrayEquals(eventIn.getBody(), eventOut.getBody());
}
+
@Test
public void testRoll()
- throws IOException, InterruptedException, NoopRecordException, CorruptEventException {
+ throws IOException, InterruptedException, NoopRecordException, CorruptEventException {
log.shutdownWorker();
Thread.sleep(1000);
for (int i = 0; i < 1000; i++) {
@@ -105,9 +112,9 @@ public class TestLog {
Assert.assertArrayEquals(eventIn.getBody(), eventOut.getBody());
}
int logCount = 0;
- for(File dataDir : dataDirs) {
- for(File logFile : dataDir.listFiles()) {
- if(logFile.getName().startsWith("log-")) {
+ for (File dataDir : dataDirs) {
+ for (File logFile : dataDir.listFiles()) {
+ if (logFile.getName().startsWith("log-")) {
logCount++;
}
}
@@ -115,26 +122,30 @@ public class TestLog {
// 93 (*2 for meta) files with TestLog.MAX_FILE_SIZE=1000
Assert.assertEquals(186, logCount);
}
+
/**
* After replay of the log, we should find the event because the put
* was committed
*/
@Test
public void testPutCommit()
- throws IOException, InterruptedException, NoopRecordException, CorruptEventException {
+ throws IOException, InterruptedException, NoopRecordException, CorruptEventException {
FlumeEvent eventIn = TestUtils.newPersistableEvent();
long transactionID = ++this.transactionID;
FlumeEventPointer eventPointerIn = log.put(transactionID, eventIn);
log.commitPut(transactionID);
log.close();
- log = new Log.Builder().setCheckpointInterval(
- Long.MAX_VALUE).setMaxFileSize(
- FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE).setQueueSize(
- CAPACITY).setCheckpointDir(checkpointDir).setLogDirs(
- dataDirs).setChannelName("testlog").build();
+ log = new Log.Builder().setCheckpointInterval(Long.MAX_VALUE)
+ .setMaxFileSize(FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE)
+ .setQueueSize(CAPACITY)
+ .setCheckpointDir(checkpointDir)
+ .setLogDirs(dataDirs)
+ .setChannelName("testlog")
+ .build();
log.replay();
takeAndVerify(eventPointerIn, eventIn);
}
+
/**
* After replay of the log, we should not find the event because the
* put was rolled back
@@ -146,39 +157,44 @@ public class TestLog {
log.put(transactionID, eventIn);
log.rollback(transactionID); // rolled back so it should not be replayed
log.close();
- log = new Log.Builder().setCheckpointInterval(
- Long.MAX_VALUE).setMaxFileSize(
- FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE).setQueueSize(
- CAPACITY).setCheckpointDir(checkpointDir).setLogDirs(
- dataDirs).setChannelName("testlog").build();
+ log = new Log.Builder().setCheckpointInterval(Long.MAX_VALUE)
+ .setMaxFileSize(FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE)
+ .setQueueSize(CAPACITY)
+ .setCheckpointDir(checkpointDir)
+ .setLogDirs(dataDirs)
+ .setChannelName("testlog")
+ .build();
log.replay();
FlumeEventQueue queue = log.getFlumeEventQueue();
Assert.assertNull(queue.removeHead(transactionID));
}
+
@Test
public void testMinimumRequiredSpaceTooSmallOnStartup() throws IOException,
- InterruptedException {
+ InterruptedException {
log.close();
- log = new Log.Builder().setCheckpointInterval(
- Long.MAX_VALUE).setMaxFileSize(
- FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE).setQueueSize(
- CAPACITY).setCheckpointDir(checkpointDir).setLogDirs(
- dataDirs).setChannelName("testlog").
- setMinimumRequiredSpace(Long.MAX_VALUE).build();
+ log = new Log.Builder().setCheckpointInterval(Long.MAX_VALUE)
+ .setMaxFileSize(FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE)
+ .setQueueSize(CAPACITY)
+ .setCheckpointDir(checkpointDir)
+ .setLogDirs(dataDirs)
+ .setChannelName("testlog")
+ .setMinimumRequiredSpace(Long.MAX_VALUE)
+ .build();
try {
log.replay();
Assert.fail();
} catch (IOException e) {
- Assert.assertTrue(e.getMessage(), e.getMessage()
- .startsWith("Usable space exhausted"));
+ Assert.assertTrue(e.getMessage(),
+ e.getMessage().startsWith("Usable space exhausted"));
}
}
+
/**
* There is a race here in that someone could take up some space
*/
@Test
- public void testMinimumRequiredSpaceTooSmallForPut() throws IOException,
- InterruptedException {
+ public void testMinimumRequiredSpaceTooSmallForPut() throws IOException, InterruptedException {
try {
doTestMinimumRequiredSpaceTooSmallForPut();
} catch (IOException e) {
@@ -189,23 +205,26 @@ public class TestLog {
doTestMinimumRequiredSpaceTooSmallForPut();
}
}
+
public void doTestMinimumRequiredSpaceTooSmallForPut() throws IOException,
- InterruptedException {
+ InterruptedException {
long minimumRequiredSpace = checkpointDir.getUsableSpace() -
- (10L* 1024L * 1024L);
+ (10L * 1024L * 1024L);
log.close();
- log = new Log.Builder().setCheckpointInterval(
- Long.MAX_VALUE).setMaxFileSize(
- FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE).setQueueSize(
- CAPACITY).setCheckpointDir(checkpointDir).setLogDirs(
- dataDirs).setChannelName("testlog").
- setMinimumRequiredSpace(minimumRequiredSpace)
- .setUsableSpaceRefreshInterval(1L).build();
+ log = new Log.Builder().setCheckpointInterval(Long.MAX_VALUE)
+ .setMaxFileSize(FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE)
+ .setQueueSize(CAPACITY)
+ .setCheckpointDir(checkpointDir)
+ .setLogDirs(dataDirs)
+ .setChannelName("testlog")
+ .setMinimumRequiredSpace(minimumRequiredSpace)
+ .setUsableSpaceRefreshInterval(1L)
+ .build();
log.replay();
File filler = new File(checkpointDir, "filler");
byte[] buffer = new byte[64 * 1024];
FileOutputStream out = new FileOutputStream(filler);
- while(checkpointDir.getUsableSpace() > minimumRequiredSpace) {
+ while (checkpointDir.getUsableSpace() > minimumRequiredSpace) {
out.write(buffer);
}
out.close();
@@ -215,10 +234,11 @@ public class TestLog {
log.put(transactionID, eventIn);
Assert.fail();
} catch (IOException e) {
- Assert.assertTrue(e.getMessage(), e.getMessage()
- .startsWith("Usable space exhausted"));
+ Assert.assertTrue(e.getMessage(),
+ e.getMessage().startsWith("Usable space exhausted"));
}
}
+
/**
* After replay of the log, we should not find the event because the take
* was committed
@@ -233,11 +253,13 @@ public class TestLog {
log.take(takeTransactionID, eventPointer);
log.commitTake(takeTransactionID);
log.close();
- new Log.Builder().setCheckpointInterval(
- Long.MAX_VALUE).setMaxFileSize(
- FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE).setQueueSize(
- 1).setCheckpointDir(checkpointDir).setLogDirs(dataDirs)
- .setChannelName("testlog").build();
+ new Log.Builder().setCheckpointInterval(Long.MAX_VALUE)
+ .setMaxFileSize(FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE)
+ .setQueueSize(1)
+ .setCheckpointDir(checkpointDir)
+ .setLogDirs(dataDirs)
+ .setChannelName("testlog")
+ .build();
log.replay();
FlumeEventQueue queue = log.getFlumeEventQueue();
Assert.assertNull(queue.removeHead(0));
@@ -249,16 +271,18 @@ public class TestLog {
*/
@Test
public void testPutTakeRollbackLogReplayV1()
- throws IOException, InterruptedException, NoopRecordException, CorruptEventException {
+ throws IOException, InterruptedException, NoopRecordException, CorruptEventException {
doPutTakeRollback(true);
}
+
@Test
public void testPutTakeRollbackLogReplayV2()
- throws IOException, InterruptedException, NoopRecordException, CorruptEventException {
+ throws IOException, InterruptedException, NoopRecordException, CorruptEventException {
doPutTakeRollback(false);
}
+
public void doPutTakeRollback(boolean useLogReplayV1)
- throws IOException, InterruptedException, NoopRecordException, CorruptEventException {
+ throws IOException, InterruptedException, NoopRecordException, CorruptEventException {
FlumeEvent eventIn = TestUtils.newPersistableEvent();
long putTransactionID = ++transactionID;
FlumeEventPointer eventPointerIn = log.put(putTransactionID, eventIn);
@@ -267,11 +291,14 @@ public class TestLog {
log.take(takeTransactionID, eventPointerIn);
log.rollback(takeTransactionID);
log.close();
- new Log.Builder().setCheckpointInterval(
- Long.MAX_VALUE).setMaxFileSize(
- FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE).setQueueSize(
- 1).setCheckpointDir(checkpointDir).setLogDirs(dataDirs)
- .setChannelName("testlog").setUseLogReplayV1(useLogReplayV1).build();
+ new Log.Builder().setCheckpointInterval(Long.MAX_VALUE)
+ .setMaxFileSize(FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE)
+ .setQueueSize(1)
+ .setCheckpointDir(checkpointDir)
+ .setLogDirs(dataDirs)
+ .setChannelName("testlog")
+ .setUseLogReplayV1(useLogReplayV1)
+ .build();
log.replay();
takeAndVerify(eventPointerIn, eventIn);
}
@@ -281,11 +308,13 @@ public class TestLog {
long putTransactionID = ++transactionID;
log.commitPut(putTransactionID);
log.close();
- new Log.Builder().setCheckpointInterval(
- Long.MAX_VALUE).setMaxFileSize(
- FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE).setQueueSize(
- 1).setCheckpointDir(checkpointDir).setLogDirs(dataDirs)
- .setChannelName("testlog").build();
+ new Log.Builder().setCheckpointInterval(Long.MAX_VALUE)
+ .setMaxFileSize(FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE)
+ .setQueueSize(1)
+ .setCheckpointDir(checkpointDir)
+ .setLogDirs(dataDirs)
+ .setChannelName("testlog")
+ .build();
log.replay();
FlumeEventQueue queue = log.getFlumeEventQueue();
FlumeEventPointer eventPointerOut = queue.removeHead(0);
@@ -297,11 +326,13 @@ public class TestLog {
long putTransactionID = ++transactionID;
log.commitTake(putTransactionID);
log.close();
- new Log.Builder().setCheckpointInterval(
- Long.MAX_VALUE).setMaxFileSize(
- FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE).setQueueSize(
- 1).setCheckpointDir(checkpointDir).setLogDirs(dataDirs)
- .setChannelName("testlog").build();
+ new Log.Builder().setCheckpointInterval(Long.MAX_VALUE)
+ .setMaxFileSize(FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE)
+ .setQueueSize(1)
+ .setCheckpointDir(checkpointDir)
+ .setLogDirs(dataDirs)
+ .setChannelName("testlog")
+ .build();
log.replay();
FlumeEventQueue queue = log.getFlumeEventQueue();
FlumeEventPointer eventPointerOut = queue.removeHead(0);
@@ -313,11 +344,13 @@ public class TestLog {
long putTransactionID = ++transactionID;
log.rollback(putTransactionID);
log.close();
- new Log.Builder().setCheckpointInterval(
- Long.MAX_VALUE).setMaxFileSize(
- FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE).setQueueSize(
- 1).setCheckpointDir(checkpointDir).setLogDirs(dataDirs)
- .setChannelName("testlog").build();
+ new Log.Builder().setCheckpointInterval(Long.MAX_VALUE)
+ .setMaxFileSize(FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE)
+ .setQueueSize(1)
+ .setCheckpointDir(checkpointDir)
+ .setLogDirs(dataDirs)
+ .setChannelName("testlog")
+ .build();
log.replay();
FlumeEventQueue queue = log.getFlumeEventQueue();
FlumeEventPointer eventPointerOut = queue.removeHead(0);
@@ -337,7 +370,7 @@ public class TestLog {
File logGzip = new File(logDir, Log.PREFIX + i + ".gz");
Assert.assertTrue(metaDataFile.isFile() || metaDataFile.createNewFile());
Assert.assertTrue(metaDataTempFile.isFile() ||
- metaDataTempFile.createNewFile());
+ metaDataTempFile.createNewFile());
Assert.assertTrue(log.isFile() || logGzip.createNewFile());
}
List<File> actual = LogUtils.getLogs(logDir);
@@ -345,31 +378,38 @@ public class TestLog {
LogUtils.sort(expected);
Assert.assertEquals(expected, actual);
}
+
@Test
public void testReplayFailsWithAllEmptyLogMetaDataNormalReplay()
throws IOException, InterruptedException {
doTestReplayFailsWithAllEmptyLogMetaData(false);
}
+
@Test
public void testReplayFailsWithAllEmptyLogMetaDataFastReplay()
throws IOException, InterruptedException {
doTestReplayFailsWithAllEmptyLogMetaData(true);
}
+
public void doTestReplayFailsWithAllEmptyLogMetaData(boolean useFastReplay)
throws IOException, InterruptedException {
// setup log with correct fast replay parameter
log.close();
- log = new Log.Builder().setCheckpointInterval(1L).setMaxFileSize(
- MAX_FILE_SIZE).setQueueSize(CAPACITY).setCheckpointDir(
- checkpointDir).setLogDirs(dataDirs)
- .setChannelName("testlog").setUseFastReplay(useFastReplay).build();
+ log = new Log.Builder().setCheckpointInterval(1L)
+ .setMaxFileSize(MAX_FILE_SIZE)
+ .setQueueSize(CAPACITY)
+ .setCheckpointDir(checkpointDir)
+ .setLogDirs(dataDirs)
+ .setChannelName("testlog")
+ .setUseFastReplay(useFastReplay)
+ .build();
log.replay();
FlumeEvent eventIn = TestUtils.newPersistableEvent();
long transactionID = ++this.transactionID;
log.put(transactionID, eventIn);
log.commitPut(transactionID);
log.close();
- if(useFastReplay) {
+ if (useFastReplay) {
FileUtils.deleteQuietly(checkpointDir);
Assert.assertTrue(checkpointDir.mkdir());
}
@@ -378,41 +418,50 @@ public class TestLog {
logFiles.addAll(LogUtils.getLogs(dataDirs[i]));
}
Assert.assertTrue(logFiles.size() > 0);
- for(File logFile : logFiles) {
+ for (File logFile : logFiles) {
File logFileMeta = Serialization.getMetaDataFile(logFile);
Assert.assertTrue(logFileMeta.delete());
Assert.assertTrue(logFileMeta.createNewFile());
}
- log = new Log.Builder().setCheckpointInterval(1L).setMaxFileSize(
- MAX_FILE_SIZE).setQueueSize(CAPACITY).setCheckpointDir(
- checkpointDir).setLogDirs(dataDirs)
- .setChannelName("testlog").setUseFastReplay(useFastReplay).build();
+ log = new Log.Builder().setCheckpointInterval(1L)
+ .setMaxFileSize(MAX_FILE_SIZE)
+ .setQueueSize(CAPACITY)
+ .setCheckpointDir(checkpointDir)
+ .setLogDirs(dataDirs)
+ .setChannelName("testlog")
+ .setUseFastReplay(useFastReplay)
+ .build();
try {
log.replay();
Assert.fail();
- } catch(IllegalStateException expected) {
+ } catch (IllegalStateException expected) {
String msg = expected.getMessage();
Assert.assertNotNull(msg);
Assert.assertTrue(msg, msg.contains(".meta is empty, but log"));
}
}
+
@Test
public void testReplaySucceedsWithUnusedEmptyLogMetaDataNormalReplay()
- throws IOException, InterruptedException, NoopRecordException, CorruptEventException {
+ throws IOException, InterruptedException, NoopRecordException, CorruptEventException {
FlumeEvent eventIn = TestUtils.newPersistableEvent();
long transactionID = ++this.transactionID;
FlumeEventPointer eventPointer = log.put(transactionID, eventIn);
log.commitPut(transactionID); // this is not required since
log.close();
- log = new Log.Builder().setCheckpointInterval(1L).setMaxFileSize(
- MAX_FILE_SIZE).setQueueSize(CAPACITY).setCheckpointDir(
- checkpointDir).setLogDirs(dataDirs)
- .setChannelName("testlog").build();
+ log = new Log.Builder().setCheckpointInterval(1L)
+ .setMaxFileSize(MAX_FILE_SIZE)
+ .setQueueSize(CAPACITY)
+ .setCheckpointDir(checkpointDir)
+ .setLogDirs(dataDirs)
+ .setChannelName("testlog")
+ .build();
doTestReplaySucceedsWithUnusedEmptyLogMetaData(eventIn, eventPointer);
}
+
@Test
public void testReplaySucceedsWithUnusedEmptyLogMetaDataFastReplay()
- throws IOException, InterruptedException, NoopRecordException, CorruptEventException {
+ throws IOException, InterruptedException, NoopRecordException, CorruptEventException {
FlumeEvent eventIn = TestUtils.newPersistableEvent();
long transactionID = ++this.transactionID;
FlumeEventPointer eventPointer = log.put(transactionID, eventIn);
@@ -421,18 +470,23 @@ public class TestLog {
checkpointDir = Files.createTempDir();
FileUtils.forceDeleteOnExit(checkpointDir);
Assert.assertTrue(checkpointDir.isDirectory());
- log = new Log.Builder().setCheckpointInterval(1L).setMaxFileSize(
- MAX_FILE_SIZE).setQueueSize(CAPACITY).setCheckpointDir(
- checkpointDir).setLogDirs(dataDirs)
- .setChannelName("testlog").setUseFastReplay(true).build();
+ log = new Log.Builder().setCheckpointInterval(1L)
+ .setMaxFileSize(MAX_FILE_SIZE)
+ .setQueueSize(CAPACITY)
+ .setCheckpointDir(checkpointDir)
+ .setLogDirs(dataDirs)
+ .setChannelName("testlog")
+ .setUseFastReplay(true)
+ .build();
doTestReplaySucceedsWithUnusedEmptyLogMetaData(eventIn, eventPointer);
}
+
public void doTestReplaySucceedsWithUnusedEmptyLogMetaData(FlumeEvent eventIn,
- FlumeEventPointer eventPointer) throws IOException,
- InterruptedException, NoopRecordException, CorruptEventException {
+ FlumeEventPointer eventPointer)
+ throws IOException, InterruptedException, NoopRecordException, CorruptEventException {
for (int i = 0; i < dataDirs.length; i++) {
- for(File logFile : LogUtils.getLogs(dataDirs[i])) {
- if(logFile.length() == 0L) {
+ for (File logFile : LogUtils.getLogs(dataDirs[i])) {
+ if (logFile.length() == 0L) {
File logFileMeta = Serialization.getMetaDataFile(logFile);
Assert.assertTrue(logFileMeta.delete());
Assert.assertTrue(logFileMeta.createNewFile());
@@ -445,16 +499,15 @@ public class TestLog {
Assert.assertEquals(eventIn.getHeaders(), eventOut.getHeaders());
Assert.assertArrayEquals(eventIn.getBody(), eventOut.getBody());
}
+
@Test
public void testCachedFSUsableSpace() throws Exception {
File fs = mock(File.class);
when(fs.getUsableSpace()).thenReturn(Long.MAX_VALUE);
- LogFile.CachedFSUsableSpace cachedFS =
- new LogFile.CachedFSUsableSpace(fs, 1000L);
+ LogFile.CachedFSUsableSpace cachedFS = new LogFile.CachedFSUsableSpace(fs, 1000L);
Assert.assertEquals(cachedFS.getUsableSpace(), Long.MAX_VALUE);
cachedFS.decrement(Integer.MAX_VALUE);
- Assert.assertEquals(cachedFS.getUsableSpace(),
- Long.MAX_VALUE - Integer.MAX_VALUE);
+ Assert.assertEquals(cachedFS.getUsableSpace(), Long.MAX_VALUE - Integer.MAX_VALUE);
try {
cachedFS.decrement(-1);
Assert.fail();
@@ -463,20 +516,22 @@ public class TestLog {
}
when(fs.getUsableSpace()).thenReturn(Long.MAX_VALUE - 1L);
Thread.sleep(1100);
- Assert.assertEquals(cachedFS.getUsableSpace(),
- Long.MAX_VALUE - 1L);
+ Assert.assertEquals(cachedFS.getUsableSpace(), Long.MAX_VALUE - 1L);
}
@Test
public void testCheckpointOnClose() throws Exception {
log.close();
- log = new Log.Builder().setCheckpointInterval(1L).setMaxFileSize(
- MAX_FILE_SIZE).setQueueSize(CAPACITY).setCheckpointDir(
- checkpointDir).setLogDirs(dataDirs).setCheckpointOnClose(true)
- .setChannelName("testLog").build();
+ log = new Log.Builder().setCheckpointInterval(1L)
+ .setMaxFileSize(MAX_FILE_SIZE)
+ .setQueueSize(CAPACITY)
+ .setCheckpointDir(checkpointDir)
+ .setLogDirs(dataDirs)
+ .setCheckpointOnClose(true)
+ .setChannelName("testLog")
+ .build();
log.replay();
-
// 1 Write One Event
FlumeEvent eventIn = TestUtils.newPersistableEvent();
log.put(transactionID, eventIn);
@@ -484,20 +539,19 @@ public class TestLog {
// 2 Check state of checkpoint before close
File checkPointMetaFile =
- FileUtils.listFiles(checkpointDir,new String[]{"meta"},false).iterator().next();
- long before = FileUtils.checksumCRC32( checkPointMetaFile );
+ FileUtils.listFiles(checkpointDir, new String[] { "meta" }, false).iterator().next();
+ long before = FileUtils.checksumCRC32(checkPointMetaFile);
// 3 Close Log
log.close();
// 4 Verify that checkpoint was modified on close
- long after = FileUtils.checksumCRC32( checkPointMetaFile );
- Assert.assertFalse( before == after );
+ long after = FileUtils.checksumCRC32(checkPointMetaFile);
+ Assert.assertFalse(before == after);
}
- private void takeAndVerify(FlumeEventPointer eventPointerIn,
- FlumeEvent eventIn)
- throws IOException, InterruptedException, NoopRecordException, CorruptEventException {
+ private void takeAndVerify(FlumeEventPointer eventPointerIn, FlumeEvent eventIn)
+ throws IOException, InterruptedException, NoopRecordException, CorruptEventException {
FlumeEventQueue queue = log.getFlumeEventQueue();
FlumeEventPointer eventPointerOut = queue.removeHead(0);
Assert.assertNotNull(eventPointerOut);
http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java
index 976a112..d945c7f 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java
@@ -18,6 +18,16 @@
*/
package org.apache.flume.channel.file;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Maps;
+import com.google.common.io.Files;
+import org.apache.commons.io.FileUtils;
+import org.apache.flume.channel.file.proto.ProtosFactory;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
@@ -28,33 +38,21 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.commons.io.FileUtils;
-import org.apache.flume.channel.file.proto.ProtosFactory;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.google.common.base.Throwables;
-import com.google.common.collect.Maps;
-import com.google.common.io.Files;
-
public class TestLogFile {
private int fileID;
private long transactionID;
private LogFile.Writer logFileWriter;
private File dataDir;
private File dataFile;
+
@Before
public void setup() throws IOException {
fileID = 1;
@@ -65,28 +63,30 @@ public class TestLogFile {
logFileWriter = LogFileFactory.getWriter(dataFile, fileID,
Integer.MAX_VALUE, null, null, null, Long.MAX_VALUE, true, 0);
}
+
@After
public void cleanup() throws IOException {
try {
- if(logFileWriter != null) {
+ if (logFileWriter != null) {
logFileWriter.close();
}
} finally {
FileUtils.deleteQuietly(dataDir);
}
}
+
@Test
public void testWriterRefusesToOverwriteFile() throws IOException {
Assert.assertTrue(dataFile.isFile() || dataFile.createNewFile());
try {
LogFileFactory.getWriter(dataFile, fileID, Integer.MAX_VALUE, null, null,
- null, Long.MAX_VALUE, true, 0);
+ null, Long.MAX_VALUE, true, 0);
Assert.fail();
} catch (IllegalStateException e) {
- Assert.assertEquals("File already exists " + dataFile.getAbsolutePath(),
- e.getMessage());
+ Assert.assertEquals("File already exists " + dataFile.getAbsolutePath(), e.getMessage());
}
}
+
@Test
public void testWriterFailsWithDirectory() throws IOException {
FileUtils.deleteQuietly(dataFile);
@@ -94,30 +94,29 @@ public class TestLogFile {
Assert.assertTrue(dataFile.mkdirs());
try {
LogFileFactory.getWriter(dataFile, fileID, Integer.MAX_VALUE, null, null,
- null, Long.MAX_VALUE, true, 0);
+ null, Long.MAX_VALUE, true, 0);
Assert.fail();
} catch (IllegalStateException e) {
- Assert.assertEquals("File already exists " + dataFile.getAbsolutePath(),
- e.getMessage());
+ Assert.assertEquals("File already exists " + dataFile.getAbsolutePath(), e.getMessage());
}
}
+
@Test
public void testPutGet() throws InterruptedException, IOException {
final List<Throwable> errors =
Collections.synchronizedList(new ArrayList<Throwable>());
ExecutorService executorService = Executors.newFixedThreadPool(10);
CompletionService<Void> completionService = new ExecutorCompletionService
- <Void>(executorService);
- final LogFile.RandomReader logFileReader =
- LogFileFactory.getRandomReader(dataFile, null, true);
+ <Void>(executorService);
+ final LogFile.RandomReader logFileReader = LogFileFactory.getRandomReader(dataFile, null, true);
for (int i = 0; i < 1000; i++) {
// first try and throw failures
synchronized (errors) {
- for(Throwable throwable : errors) {
+ for (Throwable throwable : errors) {
Throwables.propagateIfInstanceOf(throwable, AssertionError.class);
}
// then throw errors
- for(Throwable throwable : errors) {
+ for (Throwable throwable : errors) {
Throwables.propagate(throwable);
}
}
@@ -134,7 +133,7 @@ public class TestLogFile {
FlumeEvent eventOut = logFileReader.get(offset);
Assert.assertEquals(eventIn.getHeaders(), eventOut.getHeaders());
Assert.assertTrue(Arrays.equals(eventIn.getBody(), eventOut.getBody()));
- } catch(Throwable throwable) {
+ } catch (Throwable throwable) {
synchronized (errors) {
errors.add(throwable);
}
@@ -143,26 +142,26 @@ public class TestLogFile {
}, null);
}
- for(int i = 0; i < 1000; i++) {
+ for (int i = 0; i < 1000; i++) {
completionService.take();
}
// first try and throw failures
- for(Throwable throwable : errors) {
+ for (Throwable throwable : errors) {
Throwables.propagateIfInstanceOf(throwable, AssertionError.class);
}
// then throw errors
- for(Throwable throwable : errors) {
+ for (Throwable throwable : errors) {
Throwables.propagate(throwable);
}
}
+
@Test
public void testReader() throws InterruptedException, IOException,
- CorruptEventException {
+ CorruptEventException {
Map<Integer, Put> puts = Maps.newHashMap();
for (int i = 0; i < 1000; i++) {
FlumeEvent eventIn = TestUtils.newPersistableEvent();
- Put put = new Put(++transactionID, WriteOrderOracle.next(),
- eventIn);
+ Put put = new Put(++transactionID, WriteOrderOracle.next(), eventIn);
ByteBuffer bytes = TransactionEventRecord.toByteBuffer(put);
FlumeEventPointer ptr = logFileWriter.put(bytes);
puts.put(ptr.getOffset(), put);
@@ -170,14 +169,14 @@ public class TestLogFile {
LogFile.SequentialReader reader =
LogFileFactory.getSequentialReader(dataFile, null, true);
LogRecord entry;
- while((entry = reader.next()) != null) {
+ while ((entry = reader.next()) != null) {
Integer offset = entry.getOffset();
TransactionEventRecord record = entry.getEvent();
Put put = puts.get(offset);
FlumeEvent eventIn = put.getEvent();
Assert.assertEquals(put.getTransactionID(), record.getTransactionID());
Assert.assertTrue(record instanceof Put);
- FlumeEvent eventOut = ((Put)record).getEvent();
+ FlumeEvent eventOut = ((Put) record).getEvent();
Assert.assertEquals(eventIn.getHeaders(), eventOut.getHeaders());
Assert.assertTrue(Arrays.equals(eventIn.getBody(), eventOut.getBody()));
}
@@ -185,12 +184,12 @@ public class TestLogFile {
@Test
public void testReaderOldMetaFile() throws InterruptedException,
- IOException, CorruptEventException {
+ IOException, CorruptEventException {
Map<Integer, Put> puts = Maps.newHashMap();
for (int i = 0; i < 1000; i++) {
FlumeEvent eventIn = TestUtils.newPersistableEvent();
Put put = new Put(++transactionID, WriteOrderOracle.next(),
- eventIn);
+ eventIn);
ByteBuffer bytes = TransactionEventRecord.toByteBuffer(put);
FlumeEventPointer ptr = logFileWriter.put(bytes);
puts.put(ptr.getOffset(), put);
@@ -202,7 +201,7 @@ public class TestLogFile {
Assert.fail("Renaming to meta.old failed");
}
LogFile.SequentialReader reader =
- LogFileFactory.getSequentialReader(dataFile, null, true);
+ LogFileFactory.getSequentialReader(dataFile, null, true);
Assert.assertTrue(metadataFile.exists());
Assert.assertFalse(oldMetadataFile.exists());
LogRecord entry;
@@ -219,14 +218,14 @@ public class TestLogFile {
}
}
- @Test
- public void testReaderTempMetaFile() throws InterruptedException,
- IOException, CorruptEventException {
+ @Test
+ public void testReaderTempMetaFile()
+ throws InterruptedException, IOException, CorruptEventException {
Map<Integer, Put> puts = Maps.newHashMap();
for (int i = 0; i < 1000; i++) {
FlumeEvent eventIn = TestUtils.newPersistableEvent();
Put put = new Put(++transactionID, WriteOrderOracle.next(),
- eventIn);
+ eventIn);
ByteBuffer bytes = TransactionEventRecord.toByteBuffer(put);
FlumeEventPointer ptr = logFileWriter.put(bytes);
puts.put(ptr.getOffset(), put);
@@ -240,7 +239,7 @@ public class TestLogFile {
Assert.fail("Renaming to meta.temp failed");
}
LogFile.SequentialReader reader =
- LogFileFactory.getSequentialReader(dataFile, null, true);
+ LogFileFactory.getSequentialReader(dataFile, null, true);
Assert.assertTrue(metadataFile.exists());
Assert.assertFalse(tempMetadataFile.exists());
Assert.assertFalse(oldMetadataFile.exists());
@@ -257,9 +256,10 @@ public class TestLogFile {
Assert.assertTrue(Arrays.equals(eventIn.getBody(), eventOut.getBody()));
}
}
+
@Test
public void testWriteDelimitedTo() throws IOException {
- if(dataFile.isFile()) {
+ if (dataFile.isFile()) {
Assert.assertTrue(dataFile.delete());
}
Assert.assertTrue(dataFile.createNewFile());
@@ -270,25 +270,24 @@ public class TestLogFile {
metaDataBuilder.setCheckpointPosition(3);
metaDataBuilder.setCheckpointWriteOrderID(4);
LogFileV3.writeDelimitedTo(metaDataBuilder.build(), dataFile);
- ProtosFactory.LogFileMetaData metaData = ProtosFactory.LogFileMetaData.
- parseDelimitedFrom(new FileInputStream(dataFile));
+ ProtosFactory.LogFileMetaData metaData =
+ ProtosFactory.LogFileMetaData.parseDelimitedFrom(new FileInputStream(dataFile));
Assert.assertEquals(1, metaData.getVersion());
Assert.assertEquals(2, metaData.getLogFileID());
Assert.assertEquals(3, metaData.getCheckpointPosition());
Assert.assertEquals(4, metaData.getCheckpointWriteOrderID());
}
- @Test (expected = CorruptEventException.class)
+ @Test(expected = CorruptEventException.class)
public void testPutGetCorruptEvent() throws Exception {
final LogFile.RandomReader logFileReader =
- LogFileFactory.getRandomReader(dataFile, null, true);
+ LogFileFactory.getRandomReader(dataFile, null, true);
final FlumeEvent eventIn = TestUtils.newPersistableEvent(2500);
- final Put put = new Put(++transactionID, WriteOrderOracle.next(),
- eventIn);
+ final Put put = new Put(++transactionID, WriteOrderOracle.next(), eventIn);
ByteBuffer bytes = TransactionEventRecord.toByteBuffer(put);
FlumeEventPointer ptr = logFileWriter.put(bytes);
- logFileWriter.commit(TransactionEventRecord.toByteBuffer(new Commit
- (transactionID, WriteOrderOracle.next())));
+ logFileWriter.commit(TransactionEventRecord.toByteBuffer(
+ new Commit(transactionID, WriteOrderOracle.next())));
logFileWriter.sync();
final int offset = ptr.getOffset();
RandomAccessFile writer = new RandomAccessFile(dataFile, "rw");
@@ -300,24 +299,22 @@ public class TestLogFile {
// Should have thrown an exception by now.
Assert.fail();
-
}
- @Test (expected = NoopRecordException.class)
+ @Test(expected = NoopRecordException.class)
public void testPutGetNoopEvent() throws Exception {
final LogFile.RandomReader logFileReader =
- LogFileFactory.getRandomReader(dataFile, null, true);
+ LogFileFactory.getRandomReader(dataFile, null, true);
final FlumeEvent eventIn = TestUtils.newPersistableEvent(2500);
- final Put put = new Put(++transactionID, WriteOrderOracle.next(),
- eventIn);
+ final Put put = new Put(++transactionID, WriteOrderOracle.next(), eventIn);
ByteBuffer bytes = TransactionEventRecord.toByteBuffer(put);
FlumeEventPointer ptr = logFileWriter.put(bytes);
- logFileWriter.commit(TransactionEventRecord.toByteBuffer(new Commit
- (transactionID, WriteOrderOracle.next())));
+ logFileWriter.commit(TransactionEventRecord.toByteBuffer(
+ new Commit(transactionID, WriteOrderOracle.next())));
logFileWriter.sync();
final int offset = ptr.getOffset();
- LogFile.OperationRecordUpdater updater = new LogFile
- .OperationRecordUpdater(dataFile);
+ LogFile.OperationRecordUpdater updater =
+ new LogFile.OperationRecordUpdater(dataFile);
updater.markRecordAsNoop(offset);
logFileReader.get(offset);
@@ -330,40 +327,38 @@ public class TestLogFile {
File tempDir = Files.createTempDir();
File temp = new File(tempDir, "temp");
final RandomAccessFile tempFile = new RandomAccessFile(temp, "rw");
- for(int i = 0; i < 5000; i++) {
+ for (int i = 0; i < 5000; i++) {
tempFile.write(LogFile.OP_RECORD);
}
tempFile.seek(0);
LogFile.OperationRecordUpdater recordUpdater = new LogFile
- .OperationRecordUpdater(temp);
+ .OperationRecordUpdater(temp);
//Convert every 10th byte into a noop byte
- for(int i = 0; i < 5000; i+=10) {
+ for (int i = 0; i < 5000; i += 10) {
recordUpdater.markRecordAsNoop(i);
}
recordUpdater.close();
tempFile.seek(0);
// Verify every 10th byte is actually a NOOP
- for(int i = 0; i < 5000; i+=10) {
+ for (int i = 0; i < 5000; i += 10) {
tempFile.seek(i);
Assert.assertEquals(LogFile.OP_NOOP, tempFile.readByte());
}
-
}
@Test
- public void testOpRecordUpdaterWithFlumeEvents() throws Exception{
+ public void testOpRecordUpdaterWithFlumeEvents() throws Exception {
final FlumeEvent eventIn = TestUtils.newPersistableEvent(2500);
- final Put put = new Put(++transactionID, WriteOrderOracle.next(),
- eventIn);
+ final Put put = new Put(++transactionID, WriteOrderOracle.next(), eventIn);
ByteBuffer bytes = TransactionEventRecord.toByteBuffer(put);
FlumeEventPointer ptr = logFileWriter.put(bytes);
- logFileWriter.commit(TransactionEventRecord.toByteBuffer(new Commit
- (transactionID, WriteOrderOracle.next())));
+ logFileWriter.commit(TransactionEventRecord.toByteBuffer(
+ new Commit(transactionID, WriteOrderOracle.next())));
logFileWriter.sync();
final int offset = ptr.getOffset();
- LogFile.OperationRecordUpdater updater = new LogFile
- .OperationRecordUpdater(dataFile);
+ LogFile.OperationRecordUpdater updater =
+ new LogFile.OperationRecordUpdater(dataFile);
updater.markRecordAsNoop(offset);
RandomAccessFile fileReader = new RandomAccessFile(dataFile, "rw");
Assert.assertEquals(LogFile.OP_NOOP, fileReader.readByte());
@@ -375,7 +370,7 @@ public class TestLogFile {
final CyclicBarrier barrier = new CyclicBarrier(20);
ExecutorService executorService = Executors.newFixedThreadPool(20);
ExecutorCompletionService<Void> completionService = new
- ExecutorCompletionService<Void>(executorService);
+ ExecutorCompletionService<Void>(executorService);
final LogFile.Writer writer = logFileWriter;
final AtomicLong txnId = new AtomicLong(++transactionID);
for (int i = 0; i < 20; i++) {
@@ -384,11 +379,11 @@ public class TestLogFile {
public Void call() {
try {
Put put = new Put(txnId.incrementAndGet(),
- WriteOrderOracle.next(), eventIn);
+ WriteOrderOracle.next(), eventIn);
ByteBuffer bytes = TransactionEventRecord.toByteBuffer(put);
writer.put(bytes);
writer.commit(TransactionEventRecord.toByteBuffer(
- new Commit(txnId.get(), WriteOrderOracle.next())));
+ new Commit(txnId.get(), WriteOrderOracle.next())));
barrier.await();
writer.sync();
} catch (Exception ex) {
@@ -399,17 +394,15 @@ public class TestLogFile {
});
}
- for(int i = 0; i < 20; i++) {
+ for (int i = 0; i < 20; i++) {
completionService.take().get();
}
- //At least 250*20, but can be higher due to serialization overhead
+ // At least 250*20, but can be higher due to serialization overhead
Assert.assertTrue(logFileWriter.position() >= 5000);
Assert.assertEquals(1, writer.getSyncCount());
- Assert.assertTrue(logFileWriter.getLastCommitPosition() ==
- logFileWriter.getLastSyncPosition());
+ Assert.assertTrue(logFileWriter.getLastCommitPosition() == logFileWriter.getLastSyncPosition());
executorService.shutdown();
-
}
}
http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecordV2.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecordV2.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecordV2.java
index 2356d90..1f07e1f 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecordV2.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecordV2.java
@@ -18,7 +18,8 @@
*/
package org.apache.flume.channel.file;
-import static org.mockito.Mockito.*;
+import junit.framework.Assert;
+import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -30,9 +31,8 @@ import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
-import junit.framework.Assert;
-
-import org.junit.Test;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
@SuppressWarnings("deprecation")
public class TestTransactionEventRecordV2 {
@@ -127,7 +127,7 @@ public class TestTransactionEventRecordV2 {
try {
TransactionEventRecord.fromDataInputV2(toDataInput(in));
Assert.fail();
- } catch(NullPointerException e) {
+ } catch (NullPointerException e) {
Assert.assertEquals("Unknown action ffff8000", e.getMessage());
}
}
http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecordV3.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecordV3.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecordV3.java
index eb0ce04..512d290 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecordV3.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecordV3.java
@@ -18,7 +18,8 @@
*/
package org.apache.flume.channel.file;
-import static org.mockito.Mockito.*;
+import junit.framework.Assert;
+import org.junit.Test;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -26,9 +27,8 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
-import junit.framework.Assert;
-
-import org.junit.Test;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public class TestTransactionEventRecordV3 {
@@ -52,6 +52,7 @@ public class TestTransactionEventRecordV3 {
Assert.assertEquals(TransactionEventRecord.Type.COMMIT.get(),
commit.getRecordType());
}
+
@Test
public void testPutSerialization() throws IOException, CorruptEventException {
Map<String, String> headers = new HashMap<String, String>();
@@ -69,9 +70,9 @@ public class TestTransactionEventRecordV3 {
Assert.assertEquals(headers, out.getEvent().getHeaders());
Assert.assertTrue(Arrays.equals(in.getEvent().getBody(), out.getEvent().getBody()));
}
+
@Test
- public void testPutSerializationNullHeader() throws IOException,
- CorruptEventException {
+ public void testPutSerializationNullHeader() throws IOException, CorruptEventException {
Put in = new Put(System.currentTimeMillis(),
WriteOrderOracle.next(),
new FlumeEvent(null, new byte[0]));
@@ -84,11 +85,10 @@ public class TestTransactionEventRecordV3 {
Assert.assertNotNull(out.getEvent().getHeaders());
Assert.assertTrue(Arrays.equals(in.getEvent().getBody(), out.getEvent().getBody()));
}
+
@Test
- public void testTakeSerialization() throws IOException,
- CorruptEventException {
- Take in = new Take(System.currentTimeMillis(),
- WriteOrderOracle.next(), 10, 20);
+ public void testTakeSerialization() throws IOException, CorruptEventException {
+ Take in = new Take(System.currentTimeMillis(), WriteOrderOracle.next(), 10, 20);
Take out = (Take)TransactionEventRecord.fromByteArray(toByteArray(in));
Assert.assertEquals(in.getClass(), out.getClass());
Assert.assertEquals(in.getRecordType(), out.getRecordType());
@@ -99,10 +99,8 @@ public class TestTransactionEventRecordV3 {
}
@Test
- public void testRollbackSerialization() throws IOException,
- CorruptEventException {
- Rollback in = new Rollback(System.currentTimeMillis(),
- WriteOrderOracle.next());
+ public void testRollbackSerialization() throws IOException, CorruptEventException {
+ Rollback in = new Rollback(System.currentTimeMillis(), WriteOrderOracle.next());
Rollback out = (Rollback)TransactionEventRecord.fromByteArray(toByteArray(in));
Assert.assertEquals(in.getClass(), out.getClass());
Assert.assertEquals(in.getRecordType(), out.getRecordType());
@@ -111,10 +109,8 @@ public class TestTransactionEventRecordV3 {
}
@Test
- public void testCommitSerialization() throws IOException,
- CorruptEventException {
- Commit in = new Commit(System.currentTimeMillis(),
- WriteOrderOracle.next());
+ public void testCommitSerialization() throws IOException, CorruptEventException {
+ Commit in = new Commit(System.currentTimeMillis(), WriteOrderOracle.next());
Commit out = (Commit)TransactionEventRecord.fromByteArray(toByteArray(in));
Assert.assertEquals(in.getClass(), out.getClass());
Assert.assertEquals(in.getRecordType(), out.getRecordType());
@@ -129,7 +125,7 @@ public class TestTransactionEventRecordV3 {
try {
TransactionEventRecord.fromByteArray(toByteArray(in));
Assert.fail();
- } catch(NullPointerException e) {
+ } catch (NullPointerException e) {
Assert.assertEquals("Unknown action ffff8000", e.getMessage());
}
}
http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java
index 61f38d2..0ec1831 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java
@@ -18,7 +18,21 @@
*/
package org.apache.flume.channel.file;
-import static org.fest.reflect.core.Reflection.*;
+import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.io.ByteStreams;
+import com.google.common.io.Files;
+import com.google.common.io.Resources;
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.event.EventBuilder;
+import org.junit.Assert;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -36,22 +50,8 @@ import java.util.Set;
import java.util.UUID;
import java.util.zip.GZIPInputStream;
-import org.apache.flume.Channel;
-import org.apache.flume.ChannelException;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.Transaction;
-import org.apache.flume.conf.Configurables;
-import org.apache.flume.event.EventBuilder;
-import org.junit.Assert;
-
-import com.google.common.base.Charsets;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.common.io.ByteStreams;
-import com.google.common.io.Files;
-import com.google.common.io.Resources;
+import static org.fest.reflect.core.Reflection.field;
+import static org.fest.reflect.core.Reflection.method;
public class TestUtils {
@@ -119,7 +119,7 @@ public class TestUtils {
public static List<File> getAllLogs(File[] dataDirs) {
List<File> result = Lists.newArrayList();
- for(File dataDir : dataDirs) {
+ for (File dataDir : dataDirs) {
result.addAll(LogUtils.getLogs(dataDir));
}
return result;
@@ -139,24 +139,22 @@ public class TestUtils {
.invoke(true));
}
- public static Set<String> takeEvents(Channel channel, int batchSize)
- throws Exception {
+ public static Set<String> takeEvents(Channel channel, int batchSize) throws Exception {
return takeEvents(channel, batchSize, false);
}
- public static Set<String> takeEvents(Channel channel,
- int batchSize, boolean checkForCorruption) throws Exception {
+ public static Set<String> takeEvents(Channel channel, int batchSize, boolean checkForCorruption)
+ throws Exception {
return takeEvents(channel, batchSize, Integer.MAX_VALUE, checkForCorruption);
}
- public static Set<String> takeEvents(Channel channel,
- int batchSize, int numEvents) throws Exception {
+ public static Set<String> takeEvents(Channel channel, int batchSize, int numEvents)
+ throws Exception {
return takeEvents(channel, batchSize, numEvents, false);
}
- public static Set<String> takeEvents(Channel channel,
- int batchSize, int numEvents, boolean checkForCorruption) throws
- Exception {
+ public static Set<String> takeEvents(Channel channel, int batchSize, int numEvents,
+ boolean checkForCorruption) throws Exception {
Set<String> result = Sets.newHashSet();
for (int i = 0; i < numEvents; i += batchSize) {
Transaction transaction = channel.getTransaction();
@@ -169,16 +167,15 @@ public class TestUtils {
} catch (ChannelException ex) {
Throwable th = ex;
String msg;
- if(checkForCorruption) {
+ if (checkForCorruption) {
msg = "Corrupt event found. Please run File Channel";
th = ex.getCause();
} else {
msg = "Take list for FileBackedTransaction, capacity";
}
- Assert.assertTrue(th.getMessage().startsWith(
- msg));
- if(checkForCorruption) {
- throw (Exception) th;
+ Assert.assertTrue(th.getMessage().startsWith(msg));
+ if (checkForCorruption) {
+ throw (Exception)th;
}
transaction.commit();
return result;
@@ -204,16 +201,16 @@ public class TestUtils {
public static Set<String> consumeChannel(Channel channel) throws Exception {
return consumeChannel(channel, false);
}
- public static Set<String> consumeChannel(Channel channel,
- boolean checkForCorruption) throws Exception {
+ public static Set<String> consumeChannel(Channel channel, boolean checkForCorruption)
+ throws Exception {
Set<String> result = Sets.newHashSet();
int[] batchSizes = new int[] {
1000, 100, 10, 1
};
for (int i = 0; i < batchSizes.length; i++) {
- while(true) {
+ while (true) {
Set<String> batch = takeEvents(channel, batchSizes[i], checkForCorruption);
- if(batch.isEmpty()) {
+ if (batch.isEmpty()) {
break;
}
result.addAll(batch);
@@ -221,18 +218,16 @@ public class TestUtils {
}
return result;
}
- public static Set<String> fillChannel(Channel channel, String prefix)
- throws Exception {
+ public static Set<String> fillChannel(Channel channel, String prefix) throws Exception {
Set<String> result = Sets.newHashSet();
int[] batchSizes = new int[] {
1000, 100, 10, 1
};
for (int i = 0; i < batchSizes.length; i++) {
try {
- while(true) {
- Set<String> batch = putEvents(channel, prefix, batchSizes[i],
- Integer.MAX_VALUE, true);
- if(batch.isEmpty()) {
+ while (true) {
+ Set<String> batch = putEvents(channel, prefix, batchSizes[i], Integer.MAX_VALUE, true);
+ if (batch.isEmpty()) {
break;
}
result.addAll(batch);
@@ -243,19 +238,17 @@ public class TestUtils {
+ "size, a downstream system running slower than normal, or that "
+ "the channel capacity is just too low. [channel="
+ channel.getName() + "]").equals(e.getMessage())
- || e.getMessage().startsWith("Put queue for FileBackedTransaction " +
- "of capacity "));
+ || e.getMessage().startsWith("Put queue for FileBackedTransaction of capacity "));
}
}
return result;
}
- public static Set<String> putEvents(Channel channel, String prefix,
- int batchSize, int numEvents) throws Exception {
+ public static Set<String> putEvents(Channel channel, String prefix, int batchSize, int numEvents)
+ throws Exception {
return putEvents(channel, prefix, batchSize, numEvents, false);
}
- public static Set<String> putEvents(Channel channel, String prefix,
- int batchSize, int numEvents, boolean untilCapacityIsReached)
- throws Exception {
+ public static Set<String> putEvents(Channel channel, String prefix, int batchSize, int numEvents,
+ boolean untilCapacityIsReached) throws Exception {
Set<String> result = Sets.newHashSet();
for (int i = 0; i < numEvents; i += batchSize) {
Transaction transaction = channel.getTransaction();
@@ -272,13 +265,12 @@ public class TestUtils {
result.addAll(batch);
} catch (Exception ex) {
transaction.rollback();
- if(untilCapacityIsReached && ex instanceof ChannelException &&
+ if (untilCapacityIsReached && ex instanceof ChannelException &&
("The channel has reached it's capacity. "
+ "This might be the result of a sink on the channel having too "
+ "low of batch size, a downstream system running slower than "
+ "normal, or that the channel capacity is just too low. "
- + "[channel=" +channel.getName() + "]").
- equals(ex.getMessage())) {
+ + "[channel=" + channel.getName() + "]").equals(ex.getMessage())) {
break;
}
throw ex;
@@ -288,6 +280,7 @@ public class TestUtils {
}
return result;
}
+
public static void copyDecompressed(String resource, File output)
throws IOException {
URL input = Resources.getResource(resource);
@@ -298,12 +291,11 @@ public class TestUtils {
gzis.close();
}
- public static Context createFileChannelContext(String checkpointDir,
- String dataDir, String backupDir, Map<String, String> overrides) {
+ public static Context createFileChannelContext(String checkpointDir, String dataDir,
+ String backupDir, Map<String, String> overrides) {
Context context = new Context();
- context.put(FileChannelConfiguration.CHECKPOINT_DIR,
- checkpointDir);
- if(backupDir != null) {
+ context.put(FileChannelConfiguration.CHECKPOINT_DIR, checkpointDir);
+ if (backupDir != null) {
context.put(FileChannelConfiguration.BACKUP_CHECKPOINT_DIR, backupDir);
}
context.put(FileChannelConfiguration.DATA_DIRS, dataDir);
@@ -312,22 +304,22 @@ public class TestUtils {
context.putAll(overrides);
return context;
}
- public static FileChannel createFileChannel(String checkpointDir,
- String dataDir, Map<String, String> overrides) {
+
+ public static FileChannel createFileChannel(String checkpointDir, String dataDir,
+ Map<String, String> overrides) {
return createFileChannel(checkpointDir, dataDir, null, overrides);
}
- public static FileChannel createFileChannel(String checkpointDir,
- String dataDir, String backupDir, Map<String, String> overrides) {
+ public static FileChannel createFileChannel(String checkpointDir, String dataDir,
+ String backupDir, Map<String, String> overrides) {
FileChannel channel = new FileChannel();
channel.setName("FileChannel-" + UUID.randomUUID());
- Context context = createFileChannelContext(checkpointDir, dataDir,
- backupDir, overrides);
+ Context context = createFileChannelContext(checkpointDir, dataDir, backupDir, overrides);
Configurables.configure(channel, context);
return channel;
}
- public static File writeStringToFile(File baseDir, String name,
- String text) throws IOException {
+
+ public static File writeStringToFile(File baseDir, String name, String text) throws IOException {
File passwordFile = new File(baseDir, name);
Files.write(text, passwordFile, Charsets.UTF_8);
return passwordFile;
http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/CipherProviderTestSuite.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/CipherProviderTestSuite.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/CipherProviderTestSuite.java
index 530ccf6..22848d2 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/CipherProviderTestSuite.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/CipherProviderTestSuite.java
@@ -32,24 +32,28 @@ public class CipherProviderTestSuite {
this.encryptor = encryptor;
this.decryptor = decryptor;
}
+
public void test() throws Exception {
testBasic();
testEmpty();
testNullPlainText();
testNullCipherText();
}
+
public void testBasic() throws Exception {
String expected = "mn state fair is the place to be";
byte[] cipherText = encryptor.encrypt(expected.getBytes(Charsets.UTF_8));
byte[] clearText = decryptor.decrypt(cipherText);
Assert.assertEquals(expected, new String(clearText, Charsets.UTF_8));
}
+
public void testEmpty() throws Exception {
String expected = "";
byte[] cipherText = encryptor.encrypt(new byte[]{});
byte[] clearText = decryptor.decrypt(cipherText);
Assert.assertEquals(expected, new String(clearText));
}
+
public void testNullPlainText() throws Exception {
try {
encryptor.encrypt(null);
@@ -58,6 +62,7 @@ public class CipherProviderTestSuite {
// expected
}
}
+
public void testNullCipherText() throws Exception {
try {
decryptor.decrypt(null);