Posted to commits@flume.apache.org by mp...@apache.org on 2016/07/08 22:50:55 UTC
[3/9] flume git commit: FLUME-2941. Integrate checkstyle for test classes
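The hunks below are mechanical checkstyle cleanups rather than behavior changes: long lines re-wrapped (the wrap points suggest a 100-character limit), wildcard and out-of-order imports replaced with explicit, sorted ones, combined field declarations split, modifier order normalized, and whitespace added after control-flow keywords. As a minimal sketch of the target style (a hypothetical StyleExample class, not code from this patch):

    import java.io.IOException;

    public class StyleExample {
      // "static final", not "final static" (modifier-order rule)
      static final String NAME = "example";

      // one field per declaration, instead of "boolean closed, opened;"
      private volatile boolean closed;
      private volatile boolean opened;

      public void open(String filePath, String codec)
          throws IOException {  // "throws" wrapped so the signature stays under the limit
        if (closed) {           // space after "if", instead of "if(closed)"
          opened = true;
        }
      }
    }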
http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sdk/src/test/java/org/apache/flume/api/ThriftTestingSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/test/java/org/apache/flume/api/ThriftTestingSource.java b/flume-ng-sdk/src/test/java/org/apache/flume/api/ThriftTestingSource.java
index 70d2c1b..724f093 100644
--- a/flume-ng-sdk/src/test/java/org/apache/flume/api/ThriftTestingSource.java
+++ b/flume-ng-sdk/src/test/java/org/apache/flume/api/ThriftTestingSource.java
@@ -67,8 +67,7 @@ public class ThriftTestingSource {
@Override
public Status append(ThriftFlumeEvent event) throws TException {
- flumeEvents.add(EventBuilder.withBody(event.getBody(),
- event.getHeaders()));
+ flumeEvents.add(EventBuilder.withBody(event.getBody(), event.getHeaders()));
individualCount++;
return Status.OK;
}
@@ -80,8 +79,7 @@ public class ThriftTestingSource {
incompleteBatches++;
}
for (ThriftFlumeEvent event : events) {
- flumeEvents.add(EventBuilder.withBody(event.getBody(),
- event.getHeaders()));
+ flumeEvents.add(EventBuilder.withBody(event.getBody(), event.getHeaders()));
}
return Status.OK;
}
@@ -175,8 +173,7 @@ public class ThriftTestingSource {
}
@Override
- public Status appendBatch(List<ThriftFlumeEvent> events) throws
- TException {
+ public Status appendBatch(List<ThriftFlumeEvent> events) throws TException {
try {
if (delay != null) {
TimeUnit.MILLISECONDS.sleep(delay.get());
@@ -207,8 +204,8 @@ public class ThriftTestingSource {
}
public ThriftTestingSource(String handlerName, int port, String protocol) throws Exception {
- TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(new
- InetSocketAddress("0.0.0.0", port));
+ TNonblockingServerTransport serverTransport =
+ new TNonblockingServerSocket(new InetSocketAddress("0.0.0.0", port));
ThriftSourceProtocol.Iface handler = getHandler(handlerName);
TProtocolFactory transportProtocolFactory = null;
@@ -217,10 +214,9 @@ public class ThriftTestingSource {
} else {
transportProtocolFactory = new TCompactProtocol.Factory();
}
- server = new THsHaServer(new THsHaServer.Args
- (serverTransport).processor(
- new ThriftSourceProtocol.Processor(handler)).protocolFactory(
- transportProtocolFactory));
+ server = new THsHaServer(new THsHaServer.Args(serverTransport).processor(
+ new ThriftSourceProtocol.Processor(handler)).protocolFactory(
+ transportProtocolFactory));
Executors.newSingleThreadExecutor().submit(new Runnable() {
@Override
public void run() {
@@ -260,10 +256,8 @@ public class ThriftTestingSource {
args.protocolFactory(transportProtocolFactory);
args.inputTransportFactory(new TFastFramedTransport.Factory());
args.outputTransportFactory(new TFastFramedTransport.Factory());
- args.processor(new ThriftSourceProtocol
- .Processor<ThriftSourceProtocol.Iface>(handler));
- server = (TServer) serverClass.getConstructor(argsClass).newInstance
- (args);
+ args.processor(new ThriftSourceProtocol.Processor<ThriftSourceProtocol.Iface>(handler));
+ server = (TServer) serverClass.getConstructor(argsClass).newInstance(args);
Executors.newSingleThreadExecutor().submit(new Runnable() {
@Override
public void run() {
@@ -285,5 +279,4 @@ public class ThriftTestingSource {
server.stop();
}
-
}
http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java b/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java
index 621920d..3709577 100644
--- a/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java
+++ b/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java
@@ -18,27 +18,12 @@
package org.apache.flume.sink.kite;
-import org.apache.flume.sink.kite.parser.EntityParser;
-import org.apache.flume.sink.kite.policy.FailurePolicy;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import javax.annotation.Nullable;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
@@ -57,6 +42,8 @@ import org.apache.flume.Transaction;
import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.conf.Configurables;
import org.apache.flume.event.SimpleEvent;
+import org.apache.flume.sink.kite.parser.EntityParser;
+import org.apache.flume.sink.kite.policy.FailurePolicy;
import org.apache.flume.source.avro.AvroFlumeEvent;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -74,7 +61,28 @@ import org.kitesdk.data.DatasetWriter;
import org.kitesdk.data.Datasets;
import org.kitesdk.data.PartitionStrategy;
import org.kitesdk.data.View;
-import static org.mockito.Mockito.*;
+
+import javax.annotation.Nullable;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
public class TestDatasetSink {
@@ -206,7 +214,8 @@ public class TestDatasetSink {
}
@Test
- public void testFileStore() throws EventDeliveryException, NonRecoverableEventException, NonRecoverableEventException {
+ public void testFileStore()
+ throws EventDeliveryException, NonRecoverableEventException, NonRecoverableEventException {
DatasetSink sink = sink(in, config);
// run the sink
@@ -307,31 +316,28 @@ public class TestDatasetSink {
sink.process();
sink.stop();
- Assert.assertEquals(
- Sets.newHashSet(expected),
- read(Datasets.load(FILE_DATASET_URI)));
+ Assert.assertEquals(Sets.newHashSet(expected), read(Datasets.load(FILE_DATASET_URI)));
Assert.assertEquals("Should have committed", 0, remaining(in));
}
@Test
public void testDatasetUpdate() throws EventDeliveryException {
// add an updated record that is missing the msg field
- GenericRecordBuilder updatedBuilder = new GenericRecordBuilder(
- UPDATED_SCHEMA);
+ GenericRecordBuilder updatedBuilder = new GenericRecordBuilder(UPDATED_SCHEMA);
GenericData.Record updatedRecord = updatedBuilder
- .set("id", "0")
- .set("priority", 1)
- .set("msg", "Priority 1 message!")
- .build();
+ .set("id", "0")
+ .set("priority", 1)
+ .set("msg", "Priority 1 message!")
+ .build();
// make a set of the expected records with the new schema
Set<GenericRecord> expectedAsUpdated = Sets.newHashSet();
for (GenericRecord record : expected) {
expectedAsUpdated.add(updatedBuilder
- .clear("priority")
- .set("id", record.get("id"))
- .set("msg", record.get("msg"))
- .build());
+ .clear("priority")
+ .set("id", record.get("id"))
+ .set("msg", record.get("msg"))
+ .build());
}
expectedAsUpdated.add(updatedRecord);
@@ -343,9 +349,9 @@ public class TestDatasetSink {
// update the dataset's schema
DatasetDescriptor updated = new DatasetDescriptor
- .Builder(Datasets.load(FILE_DATASET_URI).getDataset().getDescriptor())
- .schema(UPDATED_SCHEMA)
- .build();
+ .Builder(Datasets.load(FILE_DATASET_URI).getDataset().getDescriptor())
+ .schema(UPDATED_SCHEMA)
+ .build();
Datasets.update(FILE_DATASET_URI, updated);
// trigger a roll on the next process call to refresh the writer
@@ -358,15 +364,12 @@ public class TestDatasetSink {
sink.process();
sink.stop();
- Assert.assertEquals(
- expectedAsUpdated,
- read(Datasets.load(FILE_DATASET_URI)));
+ Assert.assertEquals(expectedAsUpdated, read(Datasets.load(FILE_DATASET_URI)));
Assert.assertEquals("Should have committed", 0, remaining(in));
}
@Test
- public void testMiniClusterStore()
- throws EventDeliveryException, IOException {
+ public void testMiniClusterStore() throws EventDeliveryException, IOException {
// setup a minicluster
MiniDFSCluster cluster = new MiniDFSCluster
.Builder(new Configuration())
http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSTestSeqWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSTestSeqWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSTestSeqWriter.java
index 9c1cd09..f1dadf1 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSTestSeqWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSTestSeqWriter.java
@@ -19,25 +19,27 @@
package org.apache.flume.sink.hdfs;
-import java.io.IOException;
-
import org.apache.flume.Event;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.CompressionCodec;
+import java.io.IOException;
+
public class HDFSTestSeqWriter extends HDFSSequenceFile {
- protected volatile boolean closed, opened;
+ protected volatile boolean closed;
+ protected volatile boolean opened;
private int openCount = 0;
+
HDFSTestSeqWriter(int openCount) {
this.openCount = openCount;
}
@Override
- public void open(String filePath, CompressionCodec codeC,
- CompressionType compType) throws IOException {
+ public void open(String filePath, CompressionCodec codeC, CompressionType compType)
+ throws IOException {
super.open(filePath, codeC, compType);
- if(closed) {
+ if (closed) {
opened = true;
}
}
@@ -52,7 +54,7 @@ public class HDFSTestSeqWriter extends HDFSSequenceFile {
throw new IOException("Injected fault");
} else if (e.getHeaders().containsKey("fault-until-reopen")) {
// opening first time.
- if(openCount == 1) {
+ if (openCount == 1) {
throw new IOException("Injected fault-until-reopen");
}
} else if (e.getHeaders().containsKey("slow")) {
http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockDataStream.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockDataStream.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockDataStream.java
index f0c6e7e..a85a99f 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockDataStream.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockDataStream.java
@@ -30,9 +30,9 @@ class MockDataStream extends HDFSDataStream {
MockDataStream(FileSystem fs) {
this.fs = fs;
}
+
@Override
- protected FileSystem getDfs(Configuration conf,
- Path dstPath) throws IOException{
+ protected FileSystem getDfs(Configuration conf, Path dstPath) throws IOException {
return fs;
}
http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFileSystem.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFileSystem.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFileSystem.java
index 4443335..a079b83 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFileSystem.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFileSystem.java
@@ -42,8 +42,7 @@ public class MockFileSystem extends FileSystem {
int currentRenameAttempts;
boolean closeSucceed = true;
- public MockFileSystem(FileSystem fs,
- int numberOfRetriesRequired) {
+ public MockFileSystem(FileSystem fs, int numberOfRetriesRequired) {
this.fs = fs;
this.numberOfRetriesRequired = numberOfRetriesRequired;
}
@@ -67,17 +66,14 @@ public class MockFileSystem extends FileSystem {
@Override
public FSDataOutputStream create(Path arg0) throws IOException {
- //throw new IOException ("HI there2");
- latestOutputStream = new MockFsDataOutputStream(
- fs.create(arg0), closeSucceed);
-
+ latestOutputStream = new MockFsDataOutputStream(fs.create(arg0), closeSucceed);
return latestOutputStream;
}
@Override
- public FSDataOutputStream create(Path arg0, FsPermission arg1,
- boolean arg2, int arg3, short arg4, long arg5, Progressable arg6)
- throws IOException {
+ public FSDataOutputStream create(Path arg0, FsPermission arg1, boolean arg2, int arg3,
+ short arg4, long arg5, Progressable arg6)
+ throws IOException {
throw new IOException("Not a real file system");
}
@@ -126,11 +122,9 @@ public class MockFileSystem extends FileSystem {
@Override
public boolean rename(Path arg0, Path arg1) throws IOException {
currentRenameAttempts++;
- logger.info(
- "Attempting to Rename: '" + currentRenameAttempts + "' of '" +
- numberOfRetriesRequired + "'");
- if (currentRenameAttempts >= numberOfRetriesRequired ||
- numberOfRetriesRequired == 0) {
+ logger.info("Attempting to Rename: '" + currentRenameAttempts + "' of '" +
+ numberOfRetriesRequired + "'");
+ if (currentRenameAttempts >= numberOfRetriesRequired || numberOfRetriesRequired == 0) {
logger.info("Renaming file");
return fs.rename(arg0, arg1);
} else {
@@ -141,6 +135,5 @@ public class MockFileSystem extends FileSystem {
@Override
public void setWorkingDirectory(Path arg0) {
fs.setWorkingDirectory(arg0);
-
}
}
http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFsDataOutputStream.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFsDataOutputStream.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFsDataOutputStream.java
index 35b034e..f5d579c 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFsDataOutputStream.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFsDataOutputStream.java
@@ -17,21 +17,20 @@
*/
package org.apache.flume.sink.hdfs;
-import java.io.IOException;
-
import org.apache.hadoop.fs.FSDataOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class MockFsDataOutputStream extends FSDataOutputStream{
+import java.io.IOException;
+
+public class MockFsDataOutputStream extends FSDataOutputStream {
private static final Logger logger =
LoggerFactory.getLogger(MockFsDataOutputStream.class);
boolean closeSucceed;
- public MockFsDataOutputStream(FSDataOutputStream wrapMe,
- boolean closeSucceed)
+ public MockFsDataOutputStream(FSDataOutputStream wrapMe, boolean closeSucceed)
throws IOException {
super(wrapMe.getWrappedStream(), null);
this.closeSucceed = closeSucceed;
@@ -39,8 +38,7 @@ public class MockFsDataOutputStream extends FSDataOutputStream{
@Override
public void close() throws IOException {
- logger.info(
- "Close Succeeded - " + closeSucceed);
+ logger.info("Close Succeeded - " + closeSucceed);
if (closeSucceed) {
logger.info("closing file");
super.close();
http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockHDFSWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockHDFSWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockHDFSWriter.java
index ec49b97..05c4316 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockHDFSWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockHDFSWriter.java
@@ -68,7 +68,8 @@ public class MockHDFSWriter implements HDFSWriter {
filesOpened++;
}
- public void open(String filePath, CompressionCodec codec, CompressionType cType) throws IOException {
+ public void open(String filePath, CompressionCodec codec, CompressionType cType)
+ throws IOException {
this.filePath = filePath;
filesOpened++;
}
http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
index 2581f73..742deb0 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
@@ -18,14 +18,7 @@
*/
package org.apache.flume.sink.hdfs;
-import java.io.File;
-import java.io.IOException;
-import java.util.Calendar;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
+import com.google.common.base.Charsets;
import org.apache.flume.Clock;
import org.apache.flume.Context;
import org.apache.flume.Event;
@@ -46,12 +39,17 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Charsets;
+import java.io.File;
+import java.io.IOException;
+import java.util.Calendar;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
public class TestBucketWriter {
- private static Logger logger =
- LoggerFactory.getLogger(TestBucketWriter.class);
+ private static Logger logger = LoggerFactory.getLogger(TestBucketWriter.class);
private Context ctx = new Context();
private static ScheduledExecutorService timedRollerPool;
@@ -74,11 +72,11 @@ public class TestBucketWriter {
public void testEventCountingRoller() throws IOException, InterruptedException {
int maxEvents = 100;
MockHDFSWriter hdfsWriter = new MockHDFSWriter();
- BucketWriter bucketWriter = new BucketWriter(0, 0, maxEvents, 0, ctx,
- "/tmp", "file", "", ".tmp", null, null, SequenceFile.CompressionType.NONE,
- hdfsWriter, timedRollerPool, proxy,
- new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0,
- null, null, 30000, Executors.newSingleThreadExecutor(), 0, 0);
+ BucketWriter bucketWriter = new BucketWriter(
+ 0, 0, maxEvents, 0, ctx, "/tmp", "file", "", ".tmp", null, null,
+ SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy,
+ new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000,
+ Executors.newSingleThreadExecutor(), 0, 0);
Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
for (int i = 0; i < 1000; i++) {
@@ -98,12 +96,11 @@ public class TestBucketWriter {
public void testSizeRoller() throws IOException, InterruptedException {
int maxBytes = 300;
MockHDFSWriter hdfsWriter = new MockHDFSWriter();
- BucketWriter bucketWriter = new BucketWriter(0, maxBytes, 0, 0,
- ctx, "/tmp", "file", "", ".tmp", null, null,
- SequenceFile.CompressionType.NONE, hdfsWriter,timedRollerPool,
- proxy, new SinkCounter("test-bucket-writer-" +
- System.currentTimeMillis()),0, null, null, 30000,
- Executors.newSingleThreadExecutor(), 0, 0);
+ BucketWriter bucketWriter = new BucketWriter(
+ 0, maxBytes, 0, 0, ctx, "/tmp", "file", "", ".tmp", null, null,
+ SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy,
+ new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000,
+ Executors.newSingleThreadExecutor(), 0, 0);
Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
for (int i = 0; i < 1000; i++) {
@@ -126,16 +123,16 @@ public class TestBucketWriter {
final AtomicBoolean calledBack = new AtomicBoolean(false);
MockHDFSWriter hdfsWriter = new MockHDFSWriter();
- BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx,
- "/tmp", "file", "", ".tmp", null, null, SequenceFile.CompressionType.NONE,
- hdfsWriter, timedRollerPool, proxy,
- new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
- 0, new HDFSEventSink.WriterCallback() {
- @Override
- public void run(String filePath) {
- calledBack.set(true);
- }
- }, null, 30000, Executors.newSingleThreadExecutor(), 0, 0);
+ BucketWriter bucketWriter = new BucketWriter(
+ ROLL_INTERVAL, 0, 0, 0, ctx, "/tmp", "file", "", ".tmp", null, null,
+ SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy,
+ new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0,
+ new HDFSEventSink.WriterCallback() {
+ @Override
+ public void run(String filePath) {
+ calledBack.set(true);
+ }
+ }, null, 30000, Executors.newSingleThreadExecutor(), 0, 0);
Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
long startNanos = System.nanoTime();
@@ -149,12 +146,11 @@ public class TestBucketWriter {
Assert.assertTrue(bucketWriter.closed);
Assert.assertTrue(calledBack.get());
- bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx,
- "/tmp", "file", "", ".tmp", null, null, SequenceFile.CompressionType.NONE,
- hdfsWriter, timedRollerPool, proxy,
- new SinkCounter("test-bucket-writer-"
- + System.currentTimeMillis()), 0, null, null, 30000,
- Executors.newSingleThreadExecutor(), 0, 0);
+ bucketWriter = new BucketWriter(
+ ROLL_INTERVAL, 0, 0, 0, ctx, "/tmp", "file", "", ".tmp", null, null,
+ SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy,
+ new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000,
+ Executors.newSingleThreadExecutor(), 0, 0);
// write one more event (to reopen a new file so we will roll again later)
bucketWriter.append(e);
@@ -193,17 +189,16 @@ public class TestBucketWriter {
private volatile boolean open = false;
public void configure(Context context) {
-
}
public void sync() throws IOException {
- if(!open) {
+ if (!open) {
throw new IOException("closed");
}
}
- public void open(String filePath, CompressionCodec codec,
- CompressionType cType) throws IOException {
+ public void open(String filePath, CompressionCodec codec, CompressionType cType)
+ throws IOException {
open = true;
}
@@ -225,19 +220,18 @@ public class TestBucketWriter {
open = true;
}
};
+
HDFSTextSerializer serializer = new HDFSTextSerializer();
File tmpFile = File.createTempFile("flume", "test");
tmpFile.deleteOnExit();
String path = tmpFile.getParent();
String name = tmpFile.getName();
- BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0,
- 0, ctx, path, name, "", ".tmp", null, null,
- SequenceFile.CompressionType.NONE, hdfsWriter,
- timedRollerPool, proxy, new SinkCounter("test-bucket-writer-"
- + System.currentTimeMillis()),
- 0, null, null, 30000, Executors.newSingleThreadExecutor(),
- 0, 0);
+ BucketWriter bucketWriter = new BucketWriter(
+ ROLL_INTERVAL, 0, 0, 0, ctx, path, name, "", ".tmp", null, null,
+ SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy,
+ new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000,
+ Executors.newSingleThreadExecutor(), 0, 0);
Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
for (int i = 0; i < NUM_EVENTS - 1; i++) {
@@ -252,62 +246,61 @@ public class TestBucketWriter {
@Test
public void testFileSuffixNotGiven() throws IOException, InterruptedException {
- final int ROLL_INTERVAL = 1000; // seconds. Make sure it doesn't change in course of test
- final String suffix = null;
-
- MockHDFSWriter hdfsWriter = new MockHDFSWriter();
- BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0,
- 0, 0, ctx, "/tmp", "file", "", ".tmp", suffix, null,
- SequenceFile.CompressionType.NONE, hdfsWriter,
- timedRollerPool, proxy, new SinkCounter("test-bucket-writer-"
- + System.currentTimeMillis()), 0, null, null, 30000,
+ final int ROLL_INTERVAL = 1000; // seconds. Make sure it doesn't change in course of test
+ final String suffix = null;
+
+ MockHDFSWriter hdfsWriter = new MockHDFSWriter();
+ BucketWriter bucketWriter = new BucketWriter(
+ ROLL_INTERVAL, 0, 0, 0, ctx, "/tmp", "file", "", ".tmp", suffix, null,
+ SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy,
+ new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000,
Executors.newSingleThreadExecutor(), 0, 0);
- // Need to override system time use for test so we know what to expect
- final long testTime = System.currentTimeMillis();
- Clock testClock = new Clock() {
- public long currentTimeMillis() {
- return testTime;
- }
- };
- bucketWriter.setClock(testClock);
+ // Need to override system time use for test so we know what to expect
+ final long testTime = System.currentTimeMillis();
+ Clock testClock = new Clock() {
+ public long currentTimeMillis() {
+ return testTime;
+ }
+ };
+ bucketWriter.setClock(testClock);
- Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
- bucketWriter.append(e);
+ Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
+ bucketWriter.append(e);
- Assert.assertTrue("Incorrect suffix", hdfsWriter.getOpenedFilePath().endsWith(Long.toString(testTime+1) + ".tmp"));
+ Assert.assertTrue("Incorrect suffix", hdfsWriter.getOpenedFilePath().endsWith(
+ Long.toString(testTime + 1) + ".tmp"));
}
- @Test
- public void testFileSuffixGiven() throws IOException, InterruptedException {
- final int ROLL_INTERVAL = 1000; // seconds. Make sure it doesn't change in course of test
- final String suffix = ".avro";
+ @Test
+ public void testFileSuffixGiven() throws IOException, InterruptedException {
+ final int ROLL_INTERVAL = 1000; // seconds. Make sure it doesn't change in course of test
+ final String suffix = ".avro";
- MockHDFSWriter hdfsWriter = new MockHDFSWriter();
- BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0,
- 0, 0, ctx, "/tmp", "file", "", ".tmp", suffix, null,
- SequenceFile.CompressionType.NONE, hdfsWriter,
- timedRollerPool, proxy, new SinkCounter(
- "test-bucket-writer-" + System.currentTimeMillis()), 0,
- null, null, 30000, Executors.newSingleThreadExecutor(), 0, 0);
+ MockHDFSWriter hdfsWriter = new MockHDFSWriter();
+ BucketWriter bucketWriter = new BucketWriter(
+ ROLL_INTERVAL, 0, 0, 0, ctx, "/tmp", "file", "", ".tmp", suffix, null,
+ SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy,
+ new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000,
+ Executors.newSingleThreadExecutor(), 0, 0);
- // Need to override system time use for test so we know what to expect
+ // Need to override system time use for test so we know what to expect
- final long testTime = System.currentTimeMillis();
+ final long testTime = System.currentTimeMillis();
- Clock testClock = new Clock() {
- public long currentTimeMillis() {
- return testTime;
- }
- };
- bucketWriter.setClock(testClock);
+ Clock testClock = new Clock() {
+ public long currentTimeMillis() {
+ return testTime;
+ }
+ };
+ bucketWriter.setClock(testClock);
- Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
- bucketWriter.append(e);
+ Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
+ bucketWriter.append(e);
- Assert.assertTrue("Incorrect suffix", hdfsWriter.getOpenedFilePath().endsWith(
- Long.toString(testTime + 1) + suffix + ".tmp"));
- }
+ Assert.assertTrue("Incorrect suffix",hdfsWriter.getOpenedFilePath().endsWith(
+ Long.toString(testTime + 1) + suffix + ".tmp"));
+ }
@Test
public void testFileSuffixCompressed()
@@ -316,13 +309,11 @@ public class TestBucketWriter {
final String suffix = ".foo";
MockHDFSWriter hdfsWriter = new MockHDFSWriter();
- BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0,
- 0, ctx, "/tmp", "file", "", ".tmp", suffix,
- HDFSEventSink.getCodec("gzip"),
- SequenceFile.CompressionType.BLOCK, hdfsWriter,
- timedRollerPool, proxy, new SinkCounter("test-bucket-writer-"
- + System.currentTimeMillis()), 0, null, null, 30000,
- Executors.newSingleThreadExecutor(), 0, 0
+ BucketWriter bucketWriter = new BucketWriter(
+ ROLL_INTERVAL, 0, 0, 0, ctx, "/tmp", "file", "", ".tmp", suffix,
+ HDFSEventSink.getCodec("gzip"), SequenceFile.CompressionType.BLOCK, hdfsWriter,
+ timedRollerPool, proxy, new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
+ 0, null, null, 30000, Executors.newSingleThreadExecutor(), 0, 0
);
// Need to override system time use for test so we know what to expect
@@ -338,8 +329,8 @@ public class TestBucketWriter {
Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
bucketWriter.append(e);
- Assert.assertTrue("Incorrect suffix",hdfsWriter.getOpenedFilePath()
- .endsWith(Long.toString(testTime+1) + suffix + ".tmp"));
+ Assert.assertTrue("Incorrect suffix", hdfsWriter.getOpenedFilePath().endsWith(
+ Long.toString(testTime + 1) + suffix + ".tmp"));
}
@Test
@@ -349,12 +340,11 @@ public class TestBucketWriter {
MockHDFSWriter hdfsWriter = new MockHDFSWriter();
HDFSTextSerializer formatter = new HDFSTextSerializer();
- BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0,
- 0, ctx, "/tmp", "file", PREFIX, ".tmp", null, null,
- SequenceFile.CompressionType.NONE, hdfsWriter,
- timedRollerPool, proxy, new SinkCounter(
- "test-bucket-writer-" + System.currentTimeMillis()), 0,
- null, null, 30000, Executors.newSingleThreadExecutor(), 0, 0);
+ BucketWriter bucketWriter = new BucketWriter(
+ ROLL_INTERVAL, 0, 0, 0, ctx, "/tmp", "file", PREFIX, ".tmp", null, null,
+ SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy,
+ new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000,
+ Executors.newSingleThreadExecutor(), 0, 0);
Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
bucketWriter.append(e);
@@ -369,12 +359,11 @@ public class TestBucketWriter {
MockHDFSWriter hdfsWriter = new MockHDFSWriter();
HDFSTextSerializer serializer = new HDFSTextSerializer();
- BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0,
- 0, ctx, "/tmp", "file", "", SUFFIX, null, null,
- SequenceFile.CompressionType.NONE, hdfsWriter,
- timedRollerPool, proxy, new SinkCounter(
- "test-bucket-writer-" + System.currentTimeMillis()), 0,
- null, null, 30000, Executors.newSingleThreadExecutor(), 0, 0);
+ BucketWriter bucketWriter = new BucketWriter(
+ ROLL_INTERVAL, 0, 0, 0, ctx, "/tmp", "file", "", SUFFIX, null, null,
+ SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy,
+ new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000,
+ Executors.newSingleThreadExecutor(), 0, 0);
Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
bucketWriter.append(e);
@@ -389,18 +378,16 @@ public class TestBucketWriter {
final AtomicBoolean callbackCalled = new AtomicBoolean(false);
MockHDFSWriter hdfsWriter = new MockHDFSWriter();
- BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0,
- 0, ctx, "/tmp", "file", "", SUFFIX, null, null,
- SequenceFile.CompressionType.NONE,
- hdfsWriter, timedRollerPool, proxy,
- new SinkCounter(
- "test-bucket-writer-" + System.currentTimeMillis()), 0,
- new HDFSEventSink.WriterCallback() {
- @Override
- public void run(String filePath) {
- callbackCalled.set(true);
- }
- }, "blah", 30000, Executors.newSingleThreadExecutor(), 0, 0);
+ BucketWriter bucketWriter = new BucketWriter(
+ ROLL_INTERVAL, 0, 0, 0, ctx, "/tmp", "file", "", SUFFIX, null, null,
+ SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy,
+ new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0,
+ new HDFSEventSink.WriterCallback() {
+ @Override
+ public void run(String filePath) {
+ callbackCalled.set(true);
+ }
+ }, "blah", 30000, Executors.newSingleThreadExecutor(), 0, 0);
Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
bucketWriter.append(e);
@@ -420,13 +407,13 @@ public class TestBucketWriter {
SequenceFileRenameRetryCoreTest(1, false);
SequenceFileRenameRetryCoreTest(5, false);
SequenceFileRenameRetryCoreTest(2, false);
-
}
- public void SequenceFileRenameRetryCoreTest(int numberOfRetriesRequired, boolean closeSucceed) throws Exception {
- String hdfsPath = "file:///tmp/flume-test."
- + Calendar.getInstance().getTimeInMillis() + "."
- + Thread.currentThread().getId();
+ public void SequenceFileRenameRetryCoreTest(int numberOfRetriesRequired, boolean closeSucceed)
+ throws Exception {
+ String hdfsPath = "file:///tmp/flume-test." +
+ Calendar.getInstance().getTimeInMillis() +
+ "." + Thread.currentThread().getId();
Context context = new Context();
Configuration conf = new Configuration();
@@ -435,22 +422,16 @@ public class TestBucketWriter {
fs.delete(dirPath, true);
fs.mkdirs(dirPath);
context.put("hdfs.path", hdfsPath);
- context.put("hdfs.closeTries",
- String.valueOf(numberOfRetriesRequired));
+ context.put("hdfs.closeTries", String.valueOf(numberOfRetriesRequired));
context.put("hdfs.rollCount", "1");
context.put("hdfs.retryInterval", "1");
context.put("hdfs.callTimeout", Long.toString(1000));
- MockFileSystem mockFs = new
- MockFileSystem(fs,
- numberOfRetriesRequired, closeSucceed);
- BucketWriter bucketWriter = new BucketWriter(0, 0, 1, 1, ctx,
- hdfsPath, hdfsPath, "singleBucket", ".tmp", null, null,
- null, new MockDataStream(mockFs),
- timedRollerPool, proxy,
- new SinkCounter(
- "test-bucket-writer-" + System.currentTimeMillis()),
- 0, null, null, 30000, Executors.newSingleThreadExecutor(), 1,
- numberOfRetriesRequired);
+ MockFileSystem mockFs = new MockFileSystem(fs, numberOfRetriesRequired, closeSucceed);
+ BucketWriter bucketWriter = new BucketWriter(
+ 0, 0, 1, 1, ctx, hdfsPath, hdfsPath, "singleBucket", ".tmp", null, null,
+ null, new MockDataStream(mockFs), timedRollerPool, proxy,
+ new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000,
+ Executors.newSingleThreadExecutor(), 1, numberOfRetriesRequired);
bucketWriter.setFileSystem(mockFs);
// At this point, we checked if isFileClosed is available in
@@ -463,8 +444,7 @@ public class TestBucketWriter {
TimeUnit.SECONDS.sleep(numberOfRetriesRequired + 2);
Assert.assertTrue("Expected " + numberOfRetriesRequired + " " +
- "but got " + bucketWriter.renameTries.get(),
- bucketWriter.renameTries.get() ==
- numberOfRetriesRequired);
+ "but got " + bucketWriter.renameTries.get(),
+ bucketWriter.renameTries.get() == numberOfRetriesRequired);
}
}
http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
index 23862eb..73f016b 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
@@ -71,7 +71,6 @@ import org.slf4j.LoggerFactory;
import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
-
public class TestHDFSEventSink {
private HDFSEventSink sink;
@@ -118,8 +117,7 @@ public class TestHDFSEventSink {
@After
public void tearDown() {
- if (System.getenv("hdfs_keepFiles") == null)
- dirCleanup();
+ if (System.getenv("hdfs_keepFiles") == null) dirCleanup();
}
@Test
@@ -176,7 +174,7 @@ public class TestHDFSEventSink {
List<String> bodies = Lists.newArrayList();
// push the event batches into channel to roll twice
- for (i = 1; i <= (rollCount*10)/batchSize; i++) {
+ for (i = 1; i <= (rollCount * 10) / batchSize; i++) {
Transaction txn = channel.getTransaction();
txn.begin();
for (j = 1; j <= batchSize; j++) {
@@ -200,7 +198,7 @@ public class TestHDFSEventSink {
// loop through all the files generated and check their contains
FileStatus[] dirStat = fs.listStatus(dirPath);
- Path fList[] = FileUtil.stat2Paths(dirStat);
+ Path[] fList = FileUtil.stat2Paths(dirStat);
// check that the roll happened correctly for the given data
long expectedFiles = totalEvents / rollCount;
@@ -353,7 +351,7 @@ public class TestHDFSEventSink {
// loop through all the files generated and check their contains
FileStatus[] dirStat = fs.listStatus(dirPath);
- Path fList[] = FileUtil.stat2Paths(dirStat);
+ Path[] fList = FileUtil.stat2Paths(dirStat);
// check that the roll happened correctly for the given data
long expectedFiles = totalEvents / rollCount;
@@ -432,7 +430,7 @@ public class TestHDFSEventSink {
// loop through all the files generated and check their contains
FileStatus[] dirStat = fs.listStatus(dirPath);
- Path fList[] = FileUtil.stat2Paths(dirStat);
+ Path[] fList = FileUtil.stat2Paths(dirStat);
// check that the roll happened correctly for the given data
long expectedFiles = totalEvents / rollCount;
@@ -508,7 +506,7 @@ public class TestHDFSEventSink {
// loop through all the files generated and check their contains
FileStatus[] dirStat = fs.listStatus(dirPath);
- Path fList[] = FileUtil.stat2Paths(dirStat);
+ Path[] fList = FileUtil.stat2Paths(dirStat);
// check that the roll happened correctly for the given data
long expectedFiles = totalEvents / rollCount;
@@ -519,8 +517,8 @@ public class TestHDFSEventSink {
}
@Test
- public void testSimpleAppendLocalTime() throws InterruptedException,
- LifecycleException, EventDeliveryException, IOException {
+ public void testSimpleAppendLocalTime()
+ throws InterruptedException, LifecycleException, EventDeliveryException, IOException {
final long currentTime = System.currentTimeMillis();
Clock clk = new Clock() {
@Override
@@ -536,7 +534,7 @@ public class TestHDFSEventSink {
final int numBatches = 4;
String newPath = testPath + "/singleBucket/%s" ;
String expectedPath = testPath + "/singleBucket/" +
- String.valueOf(currentTime/1000);
+ String.valueOf(currentTime / 1000);
int totalEvents = 0;
int i = 1, j = 1;
@@ -576,7 +574,7 @@ public class TestHDFSEventSink {
eventDate.clear();
eventDate.set(2011, i, i, i, 0); // yy mm dd
event.getHeaders().put("timestamp",
- String.valueOf(eventDate.getTimeInMillis()));
+ String.valueOf(eventDate.getTimeInMillis()));
event.getHeaders().put("hostname", "Host" + i);
String body = "Test." + i + "." + j;
event.setBody(body.getBytes());
@@ -595,13 +593,13 @@ public class TestHDFSEventSink {
// loop through all the files generated and check their contains
FileStatus[] dirStat = fs.listStatus(dirPath);
- Path fList[] = FileUtil.stat2Paths(dirStat);
+ Path[] fList = FileUtil.stat2Paths(dirStat);
// check that the roll happened correctly for the given data
long expectedFiles = totalEvents / rollCount;
if (totalEvents % rollCount > 0) expectedFiles++;
Assert.assertEquals("num files wrong, found: " +
- Lists.newArrayList(fList), expectedFiles, fList.length);
+ Lists.newArrayList(fList), expectedFiles, fList.length);
verifyOutputSequenceFiles(fs, conf, dirPath.toUri().getPath(), fileName, bodies);
// The clock in bucketpath is static, so restore the real clock
sink.setBucketClock(new SystemClock());
@@ -750,10 +748,10 @@ public class TestHDFSEventSink {
private List<String> getAllFiles(String input) {
List<String> output = Lists.newArrayList();
File dir = new File(input);
- if(dir.isFile()) {
+ if (dir.isFile()) {
output.add(dir.getAbsolutePath());
- } else if(dir.isDirectory()) {
- for(String file : dir.list()) {
+ } else if (dir.isDirectory()) {
+ for (String file : dir.list()) {
File subDir = new File(dir, file);
output.addAll(getAllFiles(subDir.getAbsolutePath()));
}
@@ -761,16 +759,17 @@ public class TestHDFSEventSink {
return output;
}
- private void verifyOutputSequenceFiles(FileSystem fs, Configuration conf, String dir, String prefix, List<String> bodies) throws IOException {
+ private void verifyOutputSequenceFiles(FileSystem fs, Configuration conf, String dir,
+ String prefix, List<String> bodies) throws IOException {
int found = 0;
int expected = bodies.size();
- for(String outputFile : getAllFiles(dir)) {
+ for (String outputFile : getAllFiles(dir)) {
String name = (new File(outputFile)).getName();
- if(name.startsWith(prefix)) {
+ if (name.startsWith(prefix)) {
SequenceFile.Reader reader = new SequenceFile.Reader(fs, new Path(outputFile), conf);
LongWritable key = new LongWritable();
BytesWritable value = new BytesWritable();
- while(reader.next(key, value)) {
+ while (reader.next(key, value)) {
String body = new String(value.getBytes(), 0, value.getLength());
if (bodies.contains(body)) {
LOG.debug("Found event body: {}", body);
@@ -792,16 +791,17 @@ public class TestHDFSEventSink {
}
- private void verifyOutputTextFiles(FileSystem fs, Configuration conf, String dir, String prefix, List<String> bodies) throws IOException {
+ private void verifyOutputTextFiles(FileSystem fs, Configuration conf, String dir, String prefix,
+ List<String> bodies) throws IOException {
int found = 0;
int expected = bodies.size();
- for(String outputFile : getAllFiles(dir)) {
+ for (String outputFile : getAllFiles(dir)) {
String name = (new File(outputFile)).getName();
- if(name.startsWith(prefix)) {
+ if (name.startsWith(prefix)) {
FSDataInputStream input = fs.open(new Path(outputFile));
BufferedReader reader = new BufferedReader(new InputStreamReader(input));
String body = null;
- while((body = reader.readLine()) != null) {
+ while ((body = reader.readLine()) != null) {
bodies.remove(body);
found++;
}
@@ -814,12 +814,13 @@ public class TestHDFSEventSink {
}
- private void verifyOutputAvroFiles(FileSystem fs, Configuration conf, String dir, String prefix, List<String> bodies) throws IOException {
+ private void verifyOutputAvroFiles(FileSystem fs, Configuration conf, String dir, String prefix,
+ List<String> bodies) throws IOException {
int found = 0;
int expected = bodies.size();
- for(String outputFile : getAllFiles(dir)) {
+ for (String outputFile : getAllFiles(dir)) {
String name = (new File(outputFile)).getName();
- if(name.startsWith(prefix)) {
+ if (name.startsWith(prefix)) {
FSDataInputStream input = fs.open(new Path(outputFile));
DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>();
DataFileStream<GenericRecord> avroStream =
@@ -840,7 +841,7 @@ public class TestHDFSEventSink {
}
Assert.assertTrue("Found = " + found + ", Expected = " +
expected + ", Left = " + bodies.size() + " " + bodies,
- bodies.size() == 0);
+ bodies.size() == 0);
}
/**
@@ -849,9 +850,9 @@ public class TestHDFSEventSink {
* This relies on Transactional rollback semantics for durability and
* the behavior of the BucketWriter class of close()ing upon IOException.
*/
- @Test
- public void testCloseReopen() throws InterruptedException,
- LifecycleException, EventDeliveryException, IOException {
+ @Test
+ public void testCloseReopen()
+ throws InterruptedException, LifecycleException, EventDeliveryException, IOException {
LOG.debug("Starting...");
final int numBatches = 4;
@@ -924,8 +925,8 @@ public class TestHDFSEventSink {
* a new one is used for the next set of events.
*/
@Test
- public void testCloseReopenOnRollTime() throws InterruptedException,
- LifecycleException, EventDeliveryException, IOException {
+ public void testCloseReopenOnRollTime()
+ throws InterruptedException, LifecycleException, EventDeliveryException, IOException {
LOG.debug("Starting...");
final int numBatches = 4;
@@ -973,7 +974,7 @@ public class TestHDFSEventSink {
eventDate.clear();
eventDate.set(2011, i, i, i, 0); // yy mm dd
event.getHeaders().put("timestamp",
- String.valueOf(eventDate.getTimeInMillis()));
+ String.valueOf(eventDate.getTimeInMillis()));
event.getHeaders().put("hostname", "Host" + i);
String body = "Test." + i + "." + j;
event.setBody(body.getBytes());
@@ -997,9 +998,9 @@ public class TestHDFSEventSink {
Assert.assertTrue(badWriterFactory.openCount.get() >= 2);
LOG.info("Total number of bucket writers opened: {}",
- badWriterFactory.openCount.get());
+ badWriterFactory.openCount.get());
verifyOutputSequenceFiles(fs, conf, dirPath.toUri().getPath(), fileName,
- bodies);
+ bodies);
}
/**
@@ -1007,8 +1008,8 @@ public class TestHDFSEventSink {
* sfWriters map.
*/
@Test
- public void testCloseRemovesFromSFWriters() throws InterruptedException,
- LifecycleException, EventDeliveryException, IOException {
+ public void testCloseRemovesFromSFWriters()
+ throws InterruptedException, LifecycleException, EventDeliveryException, IOException {
LOG.debug("Starting...");
final String fileName = "FlumeData";
@@ -1055,7 +1056,7 @@ public class TestHDFSEventSink {
eventDate.clear();
eventDate.set(2011, i, i, i, 0); // yy mm dd
event.getHeaders().put("timestamp",
- String.valueOf(eventDate.getTimeInMillis()));
+ String.valueOf(eventDate.getTimeInMillis()));
event.getHeaders().put("hostname", "Host" + i);
String body = "Test." + i + "." + j;
event.setBody(body.getBytes());
@@ -1080,9 +1081,9 @@ public class TestHDFSEventSink {
sink.stop();
LOG.info("Total number of bucket writers opened: {}",
- badWriterFactory.openCount.get());
+ badWriterFactory.openCount.get());
verifyOutputSequenceFiles(fs, conf, dirPath.toUri().getPath(), fileName,
- bodies);
+ bodies);
}
@@ -1163,8 +1164,9 @@ public class TestHDFSEventSink {
* append using slow sink writer with specified append timeout
* verify that the data is written correctly to files
*/
- private void slowAppendTestHelper (long appendTimeout) throws InterruptedException, IOException,
- LifecycleException, EventDeliveryException, IOException {
+ private void slowAppendTestHelper(long appendTimeout)
+ throws InterruptedException, IOException, LifecycleException, EventDeliveryException,
+ IOException {
final String fileName = "FlumeData";
final long rollCount = 5;
final long batchSize = 2;
@@ -1230,7 +1232,7 @@ public class TestHDFSEventSink {
// loop through all the files generated and check their contains
FileStatus[] dirStat = fs.listStatus(dirPath);
- Path fList[] = FileUtil.stat2Paths(dirStat);
+ Path[] fList = FileUtil.stat2Paths(dirStat);
// check that the roll happened correctly for the given data
// Note that we'll end up with two files with only a head
@@ -1292,7 +1294,7 @@ public class TestHDFSEventSink {
Transaction txn = channel.getTransaction();
txn.begin();
- for(int i=0; i < 10; i++) {
+ for (int i = 0; i < 10; i++) {
Event event = new SimpleEvent();
event.setBody(("test event " + i).getBytes());
channel.put(event);
@@ -1317,9 +1319,9 @@ public class TestHDFSEventSink {
FileStatus[] dirStat = fs.listStatus(dirPath);
Path[] fList = FileUtil.stat2Paths(dirStat);
Assert.assertEquals("Incorrect content of the directory " + StringUtils.join(fList, ","),
- 2, fList.length);
+ 2, fList.length);
Assert.assertTrue(!fList[0].getName().endsWith(".tmp") &&
- !fList[1].getName().endsWith(".tmp"));
+ !fList[1].getName().endsWith(".tmp"));
fs.close();
}
@@ -1338,8 +1340,7 @@ public class TestHDFSEventSink {
}
@Test
- public void testBadConfigurationForRetryIntervalZero() throws
- Exception {
+ public void testBadConfigurationForRetryIntervalZero() throws Exception {
Context context = getContextForRetryTests();
context.put("hdfs.retryInterval", "0");
@@ -1348,43 +1349,41 @@ public class TestHDFSEventSink {
}
@Test
- public void testBadConfigurationForRetryIntervalNegative() throws
- Exception {
+ public void testBadConfigurationForRetryIntervalNegative() throws Exception {
Context context = getContextForRetryTests();
context.put("hdfs.retryInterval", "-1");
Configurables.configure(sink, context);
Assert.assertEquals(1, sink.getTryCount());
}
+
@Test
- public void testBadConfigurationForRetryCountZero() throws
- Exception {
+ public void testBadConfigurationForRetryCountZero() throws Exception {
Context context = getContextForRetryTests();
context.put("hdfs.closeTries" ,"0");
Configurables.configure(sink, context);
Assert.assertEquals(Integer.MAX_VALUE, sink.getTryCount());
}
+
@Test
- public void testBadConfigurationForRetryCountNegative() throws
- Exception {
+ public void testBadConfigurationForRetryCountNegative() throws Exception {
Context context = getContextForRetryTests();
context.put("hdfs.closeTries" ,"-4");
Configurables.configure(sink, context);
Assert.assertEquals(Integer.MAX_VALUE, sink.getTryCount());
}
+
@Test
- public void testRetryRename() throws InterruptedException,
- LifecycleException,
- EventDeliveryException, IOException {
+ public void testRetryRename()
+ throws InterruptedException, LifecycleException, EventDeliveryException, IOException {
testRetryRename(true);
testRetryRename(false);
}
- private void testRetryRename(boolean closeSucceed) throws InterruptedException,
- LifecycleException,
- EventDeliveryException, IOException {
+ private void testRetryRename(boolean closeSucceed)
+ throws InterruptedException, LifecycleException, EventDeliveryException, IOException {
LOG.debug("Starting...");
String newPath = testPath + "/retryBucket";
@@ -1441,8 +1440,8 @@ public class TestHDFSEventSink {
Collection<BucketWriter> writers = sink.getSfWriters().values();
int totalRenameAttempts = 0;
- for(BucketWriter writer: writers) {
- LOG.info("Rename tries = "+ writer.renameTries.get());
+ for (BucketWriter writer : writers) {
+ LOG.info("Rename tries = " + writer.renameTries.get());
totalRenameAttempts += writer.renameTries.get();
}
// stop clears the sfWriters map, so we need to compute the
http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestSequenceFileSerializerFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestSequenceFileSerializerFactory.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestSequenceFileSerializerFactory.java
index 6381edc..974e857 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestSequenceFileSerializerFactory.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestSequenceFileSerializerFactory.java
@@ -47,8 +47,7 @@ public class TestSequenceFileSerializerFactory {
@Test
public void getCustomFormatter() {
- SequenceFileSerializer formatter = SequenceFileSerializerFactory
- .getSerializer(
+ SequenceFileSerializer formatter = SequenceFileSerializerFactory.getSerializer(
"org.apache.flume.sink.hdfs.MyCustomSerializer$Builder", new Context());
assertTrue(formatter != null);
http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveSink.java b/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveSink.java
index 46724f2..c417404 100644
--- a/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveSink.java
+++ b/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveSink.java
@@ -55,8 +55,8 @@ import java.util.UUID;
public class TestHiveSink {
// 1) partitioned table
- final static String dbName = "testing";
- final static String tblName = "alerts";
+ static final String dbName = "testing";
+ static final String tblName = "alerts";
public static final String PART1_NAME = "continent";
public static final String PART2_NAME = "country";
@@ -72,8 +72,8 @@ public class TestHiveSink {
private final ArrayList<String> partitionVals;
// 2) un-partitioned table
- final static String dbName2 = "testing2";
- final static String tblName2 = "alerts2";
+ static final String dbName2 = "testing2";
+ static final String tblName2 = "alerts2";
final String[] colNames2 = {COL1,COL2};
private String[] colTypes2 = { "int", "string" };
@@ -88,7 +88,6 @@ public class TestHiveSink {
@Rule
public TemporaryFolder dbFolder = new TemporaryFolder();
-
private static final Logger LOG = LoggerFactory.getLogger(HiveSink.class);
public TestHiveSink() throws Exception {
@@ -182,8 +181,8 @@ public class TestHiveSink {
TestUtil.dropDB(conf, dbName2);
String dbLocation = dbFolder.newFolder(dbName2).getCanonicalPath() + ".db";
dbLocation = dbLocation.replaceAll("\\\\","/"); // for windows paths
- TestUtil.createDbAndTable(driver, dbName2, tblName2, null, colNames2, colTypes2
- , null, dbLocation);
+ TestUtil.createDbAndTable(driver, dbName2, tblName2, null, colNames2, colTypes2,
+ null, dbLocation);
try {
int totalRecords = 4;
@@ -282,7 +281,7 @@ public class TestHiveSink {
String body = j + ",blah,This is a log message,other stuff";
event.setBody(body.getBytes());
eventDate.clear();
- eventDate.set(2014, 03, 03, j%batchCount, 1); // yy mm dd hh mm
+ eventDate.set(2014, 03, 03, j % batchCount, 1); // yy mm dd hh mm
event.getHeaders().put( "timestamp",
String.valueOf(eventDate.getTimeInMillis()) );
event.getHeaders().put( PART1_NAME, "Asia" );
@@ -317,7 +316,7 @@ public class TestHiveSink {
throws EventDeliveryException, IOException, CommandNeedRetryException {
int batchSize = 2;
int batchCount = 3;
- int totalRecords = batchCount*batchSize;
+ int totalRecords = batchCount * batchSize;
Context context = new Context();
context.put("hive.metastore", metaStoreURI);
context.put("hive.database", dbName);
@@ -340,7 +339,7 @@ public class TestHiveSink {
txn.begin();
for (int j = 1; j <= batchSize; j++) {
Event event = new SimpleEvent();
- String body = i*j + ",blah,This is a log message,other stuff";
+ String body = i * j + ",blah,This is a log message,other stuff";
event.setBody(body.getBytes());
bodies.add(body);
channel.put(event);
@@ -361,7 +360,7 @@ public class TestHiveSink {
public void testJsonSerializer() throws Exception {
int batchSize = 2;
int batchCount = 2;
- int totalRecords = batchCount*batchSize;
+ int totalRecords = batchCount * batchSize;
Context context = new Context();
context.put("hive.metastore",metaStoreURI);
context.put("hive.database",dbName);
http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveWriter.java b/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveWriter.java
index 41bf0f6..4d7c9bb 100644
--- a/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveWriter.java
+++ b/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveWriter.java
@@ -42,8 +42,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class TestHiveWriter {
- final static String dbName = "testing";
- final static String tblName = "alerts";
+ static final String dbName = "testing";
+ static final String tblName = "alerts";
public static final String PART1_NAME = "continent";
public static final String PART2_NAME = "country";
@@ -106,8 +106,8 @@ public class TestHiveWriter {
TestUtil.dropDB(conf, dbName);
String dbLocation = dbFolder.newFolder(dbName).getCanonicalPath() + ".db";
dbLocation = dbLocation.replaceAll("\\\\","/"); // for windows paths
- TestUtil.createDbAndTable(driver, dbName, tblName, partVals, colNames, colTypes
- , partNames, dbLocation);
+ TestUtil.createDbAndTable(driver, dbName, tblName, partVals, colNames, colTypes, partNames,
+ dbLocation);
// 2) Setup serializer
Context ctx = new Context();
@@ -120,8 +120,8 @@ public class TestHiveWriter {
public void testInstantiate() throws Exception {
HiveEndPoint endPoint = new HiveEndPoint(metaStoreURI, dbName, tblName, partVals);
SinkCounter sinkCounter = new SinkCounter(this.getClass().getName());
- HiveWriter writer = new HiveWriter(endPoint, 10, true, timeout
- , callTimeoutPool, "flumetest", serializer, sinkCounter);
+ HiveWriter writer = new HiveWriter(endPoint, 10, true, timeout, callTimeoutPool, "flumetest",
+ serializer, sinkCounter);
writer.close();
}
@@ -130,8 +130,8 @@ public class TestHiveWriter {
public void testWriteBasic() throws Exception {
HiveEndPoint endPoint = new HiveEndPoint(metaStoreURI, dbName, tblName, partVals);
SinkCounter sinkCounter = new SinkCounter(this.getClass().getName());
- HiveWriter writer = new HiveWriter(endPoint, 10, true, timeout
- , callTimeoutPool, "flumetest", serializer, sinkCounter);
+ HiveWriter writer = new HiveWriter(endPoint, 10, true, timeout, callTimeoutPool, "flumetest",
+ serializer, sinkCounter);
writeEvents(writer,3);
writer.flush(false);
@@ -144,8 +144,8 @@ public class TestHiveWriter {
HiveEndPoint endPoint = new HiveEndPoint(metaStoreURI, dbName, tblName, partVals);
SinkCounter sinkCounter = new SinkCounter(this.getClass().getName());
- HiveWriter writer = new HiveWriter(endPoint, 10, true, timeout
- , callTimeoutPool, "flumetest", serializer, sinkCounter);
+ HiveWriter writer = new HiveWriter(endPoint, 10, true, timeout, callTimeoutPool, "flumetest",
+ serializer, sinkCounter);
checkRecordCountInTable(0);
SimpleEvent event = new SimpleEvent();
@@ -184,8 +184,8 @@ public class TestHiveWriter {
int txnPerBatch = 3;
- HiveWriter writer = new HiveWriter(endPoint, txnPerBatch, true, timeout
- , callTimeoutPool, "flumetest", serializer, sinkCounter);
+ HiveWriter writer = new HiveWriter(endPoint, txnPerBatch, true, timeout, callTimeoutPool,
+ "flumetest", serializer, sinkCounter);
Assert.assertEquals(writer.getRemainingTxns(),2);
writer.flush(true);
@@ -275,14 +275,13 @@ public class TestHiveWriter {
ctx.put("serializer.serdeSeparator", "ab");
try {
serializer3.configure(ctx);
- Assert.assertTrue("Bad serdeSeparator character was accepted" ,false);
- } catch (Exception e){
+ Assert.assertTrue("Bad serdeSeparator character was accepted", false);
+ } catch (Exception e) {
// expect an exception
}
}
-
@Test
public void testSecondWriterBeforeFirstCommits() throws Exception {
// here we open a new writer while the first is still writing (not committed)
@@ -295,13 +294,13 @@ public class TestHiveWriter {
SinkCounter sinkCounter1 = new SinkCounter(this.getClass().getName());
SinkCounter sinkCounter2 = new SinkCounter(this.getClass().getName());
- HiveWriter writer1 = new HiveWriter(endPoint1, 10, true, timeout
- , callTimeoutPool, "flumetest", serializer, sinkCounter1);
+ HiveWriter writer1 = new HiveWriter(endPoint1, 10, true, timeout, callTimeoutPool, "flumetest",
+ serializer, sinkCounter1);
writeEvents(writer1, 3);
- HiveWriter writer2 = new HiveWriter(endPoint2, 10, true, timeout
- , callTimeoutPool, "flumetest", serializer, sinkCounter2);
+ HiveWriter writer2 = new HiveWriter(endPoint2, 10, true, timeout, callTimeoutPool, "flumetest",
+ serializer, sinkCounter2);
writeEvents(writer2, 3);
writer2.flush(false); // commit
@@ -311,7 +310,6 @@ public class TestHiveWriter {
writer2.close();
}
-
@Test
public void testSecondWriterAfterFirstCommits() throws Exception {
// here we open a new writer after the first writer has committed one txn
@@ -324,16 +322,16 @@ public class TestHiveWriter {
SinkCounter sinkCounter1 = new SinkCounter(this.getClass().getName());
SinkCounter sinkCounter2 = new SinkCounter(this.getClass().getName());
- HiveWriter writer1 = new HiveWriter(endPoint1, 10, true, timeout
- , callTimeoutPool, "flumetest", serializer, sinkCounter1);
+ HiveWriter writer1 = new HiveWriter(endPoint1, 10, true, timeout, callTimeoutPool, "flumetest",
+ serializer, sinkCounter1);
writeEvents(writer1, 3);
writer1.flush(false); // commit
- HiveWriter writer2 = new HiveWriter(endPoint2, 10, true, timeout
- , callTimeoutPool, "flumetest", serializer, sinkCounter2);
+ HiveWriter writer2 = new HiveWriter(endPoint2, 10, true, timeout, callTimeoutPool, "flumetest",
+ serializer, sinkCounter2);
writeEvents(writer2, 3);
writer2.flush(false); // commit
@@ -342,8 +340,8 @@ public class TestHiveWriter {
writer2.close();
}
-
- private void writeEvents(HiveWriter writer, int count) throws InterruptedException, HiveWriter.WriteException {
+ private void writeEvents(HiveWriter writer, int count)
+ throws InterruptedException, HiveWriter.WriteException {
SimpleEvent event = new SimpleEvent();
for (int i = 1; i <= count; i++) {
event.setBody((i + ",xyz,Hello world,abc").getBytes());
http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestUtil.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestUtil.java b/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestUtil.java
index 107789f..1fcb4eb 100644
--- a/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestUtil.java
+++ b/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestUtil.java
@@ -46,7 +46,7 @@ import java.util.List;
public class TestUtil {
- private final static String txnMgr = "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager";
+ private static final String txnMgr = "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager";
/**
* Set up the configuration so it will use the DbTxnManager, concurrency will be set to true,
@@ -81,7 +81,7 @@ public class TestUtil {
runDDL(driver, crtTbl);
System.out.println("crtTbl = " + crtTbl);
- if (partNames!=null && partNames.length!=0) {
+ if (partNames != null && partNames.length != 0) {
String addPart = "alter table " + tableName + " add partition ( " +
getTablePartsStr2(partNames, partVals) + " )";
runDDL(driver, addPart);
@@ -96,7 +96,8 @@ public class TestUtil {
}
// delete db and all tables in it
- public static void dropDB(HiveConf conf, String databaseName) throws HiveException, MetaException {
+ public static void dropDB(HiveConf conf, String databaseName)
+ throws HiveException, MetaException {
IMetaStoreClient client = new HiveMetaStoreClient(conf);
try {
for (String table : client.listTableNamesByFilter(databaseName, "", (short)-1)) {
@@ -110,9 +111,9 @@ public class TestUtil {
private static String getTableColumnsStr(String[] colNames, String[] colTypes) {
StringBuffer sb = new StringBuffer();
- for (int i=0; i < colNames.length; ++i) {
+ for (int i = 0; i < colNames.length; ++i) {
sb.append(colNames[i] + " " + colTypes[i]);
- if (i<colNames.length-1) {
+ if (i < colNames.length - 1) {
sb.append(",");
}
}
@@ -121,13 +122,13 @@ public class TestUtil {
// converts partNames into "partName1 string, partName2 string"
private static String getTablePartsStr(String[] partNames) {
- if (partNames==null || partNames.length==0) {
+ if (partNames == null || partNames.length == 0) {
return "";
}
StringBuffer sb = new StringBuffer();
- for (int i=0; i < partNames.length; ++i) {
+ for (int i = 0; i < partNames.length; ++i) {
sb.append(partNames[i] + " string");
- if (i < partNames.length-1) {
+ if (i < partNames.length - 1) {
sb.append(",");
}
}
@@ -137,9 +138,9 @@ public class TestUtil {
// converts partNames,partVals into "partName1=val1, partName2=val2"
private static String getTablePartsStr2(String[] partNames, List<String> partVals) {
StringBuffer sb = new StringBuffer();
- for (int i=0; i < partVals.size(); ++i) {
+ for (int i = 0; i < partVals.size(); ++i) {
sb.append(partNames[i] + " = '" + partVals.get(i) + "'");
- if (i < partVals.size()-1) {
+ if (i < partVals.size() - 1) {
sb.append(",");
}
}
@@ -147,7 +148,7 @@ public class TestUtil {
}
public static ArrayList<String> listRecordsInTable(Driver driver, String dbName, String tblName)
- throws CommandNeedRetryException, IOException {
+ throws CommandNeedRetryException, IOException {
driver.run("select * from " + dbName + "." + tblName);
ArrayList<String> res = new ArrayList<String>();
driver.getResults(res);
@@ -155,8 +156,9 @@ public class TestUtil {
}
public static ArrayList<String> listRecordsInPartition(Driver driver, String dbName,
- String tblName, String continent, String country)
- throws CommandNeedRetryException, IOException {
+ String tblName, String continent,
+ String country)
+ throws CommandNeedRetryException, IOException {
driver.run("select * from " + dbName + "." + tblName + " where continent='"
+ continent + "' and country='" + country + "'");
ArrayList<String> res = new ArrayList<String>();
@@ -164,9 +166,9 @@ public class TestUtil {
return res;
}
-
public static class RawFileSystem extends RawLocalFileSystem {
private static final URI NAME;
+
static {
try {
NAME = new URI("raw:///");
@@ -211,9 +213,10 @@ public class TestUtil {
FsPermission.createImmutable(mod), "owen", "users", path);
}
}
+
private static boolean runDDL(Driver driver, String sql) throws QueryFailedException {
int retryCount = 1; // # of times to retry if first attempt fails
- for (int attempt=0; attempt <= retryCount; ++attempt) {
+ for (int attempt = 0; attempt <= retryCount; ++attempt) {
try {
driver.run(sql);
return true;
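
Two rules drive most of the TestUtil edits: modifier order (private static final, the JLS-recommended order, rather than private final static) and whitespace around = and < in for-loop headers. A small sketch combining both; the constant and loop body are illustrative:

public class ModifierOrderExample {
  // JLS-recommended order: access modifier, then 'static', then 'final'.
  private static final String SEPARATOR = ",";

  public static void main(String[] args) {
    String[] partNames = {"continent", "country"};
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < partNames.length; ++i) {  // spaces around '=' and '<'
      sb.append(partNames[i]).append(" string");
      if (i < partNames.length - 1) {             // and around '-'
        sb.append(SEPARATOR);
      }
    }
    System.out.println(sb);
  }
}
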
http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-irc-sink/src/test/java/org/apache/flume/sink/irc/TestIRCSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-irc-sink/src/test/java/org/apache/flume/sink/irc/TestIRCSink.java b/flume-ng-sinks/flume-irc-sink/src/test/java/org/apache/flume/sink/irc/TestIRCSink.java
index e6c065e..32517d1 100644
--- a/flume-ng-sinks/flume-irc-sink/src/test/java/org/apache/flume/sink/irc/TestIRCSink.java
+++ b/flume-ng-sinks/flume-irc-sink/src/test/java/org/apache/flume/sink/irc/TestIRCSink.java
@@ -19,7 +19,12 @@ package org.apache.flume.sink.irc;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
-import org.apache.flume.*;
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Sink;
+import org.apache.flume.Transaction;
import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.conf.Configurables;
import org.apache.flume.event.EventBuilder;
@@ -116,7 +121,9 @@ public class TestIRCSink {
try {
Socket socket = ss.accept();
process(socket);
- } catch (Exception ex) {/* noop */ }
+ } catch (Exception ex) {
+ /* noop */
+ }
}
} catch (IOException e) {
// noop
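
The TestIRCSink hunks apply two more conventions: explicit imports replace the org.apache.flume.* wildcard (an avoid-star-import rule), and an empty catch block gets braces on their own lines with a comment marking the swallow as intentional. A minimal sketch of the catch formatting:

public class EmptyCatchFormatExample {
  public static void main(String[] args) {
    try {
      Thread.sleep(10);
    } catch (InterruptedException ex) {
      /* noop - deliberately ignored; the comment documents the intent */
    }
  }
}
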
http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchSinkTest.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchSinkTest.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchSinkTest.java
index f9272fa..9fbd747 100644
--- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchSinkTest.java
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchSinkTest.java
@@ -18,17 +18,6 @@
*/
package org.apache.flume.sink.elasticsearch;
-import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.BATCH_SIZE;
-import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.CLUSTER_NAME;
-import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.INDEX_NAME;
-import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.INDEX_TYPE;
-import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.TTL;
-import static org.junit.Assert.assertEquals;
-
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.Map;
-
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
@@ -51,6 +40,17 @@ import org.joda.time.DateTimeUtils;
import org.junit.After;
import org.junit.Before;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Map;
+
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.BATCH_SIZE;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.CLUSTER_NAME;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.INDEX_NAME;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.INDEX_TYPE;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.TTL;
+import static org.junit.Assert.assertEquals;
+
public abstract class AbstractElasticSearchSinkTest {
static final String DEFAULT_INDEX_NAME = "flume";
@@ -136,7 +136,8 @@ public abstract class AbstractElasticSearchSinkTest {
.setTypes(DEFAULT_INDEX_TYPE).setQuery(query).execute().actionGet();
}
- void assertSearch(int expectedHits, SearchResponse response, Map<String, Object> expectedBody, Event... events) {
+ void assertSearch(int expectedHits, SearchResponse response, Map<String, Object> expectedBody,
+ Event... events) {
SearchHits hitResponse = response.getHits();
assertEquals(expectedHits, hitResponse.getTotalHits());
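
This hunk reorders imports into the layout the checkstyle configuration appears to expect: regular third-party and project imports first, java.* imports after them, static imports last. A skeletal file in that order (package and class are illustrative; third-party imports are omitted so the sketch stays dependency-free):

package org.example.style;

import java.util.ArrayList;
import java.util.List;

import static java.util.Collections.sort;

public class ImportOrderExample {
  public static void main(String[] args) {
    List<String> names = new ArrayList<String>();
    names.add("flume");
    names.add("checkstyle");
    sort(names);  // resolved through the static import
    System.out.println(names);
  }
}
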
http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchIndexRequestBuilderFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchIndexRequestBuilderFactory.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchIndexRequestBuilderFactory.java
index 8022111..b62254e 100644
--- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchIndexRequestBuilderFactory.java
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchIndexRequestBuilderFactory.java
@@ -18,15 +18,7 @@
*/
package org.apache.flume.sink.elasticsearch;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.Map;
-
+import com.google.common.collect.Maps;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.conf.ComponentConfiguration;
@@ -39,7 +31,14 @@ import org.elasticsearch.common.io.FastByteArrayOutputStream;
import org.junit.Before;
import org.junit.Test;
-import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.Map;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
public class TestElasticSearchIndexRequestBuilderFactory
extends AbstractElasticSearchSinkTest {
@@ -70,7 +69,7 @@ public class TestElasticSearchIndexRequestBuilderFactory
@Test
public void indexNameShouldBePrefixDashFormattedTimestamp() {
long millis = 987654321L;
- assertEquals("prefix-"+factory.fastDateFormat.format(millis),
+ assertEquals("prefix-" + factory.fastDateFormat.format(millis),
factory.getIndexName("prefix", millis));
}
@@ -135,7 +134,7 @@ public class TestElasticSearchIndexRequestBuilderFactory
assertEquals(indexPrefix + '-'
+ ElasticSearchIndexRequestBuilderFactory.df.format(FIXED_TIME_MILLIS),
- indexRequestBuilder.request().index());
+ indexRequestBuilder.request().index());
assertEquals(indexType, indexRequestBuilder.request().type());
assertArrayEquals(FakeEventSerializer.FAKE_BYTES,
indexRequestBuilder.request().source().array());
@@ -154,7 +153,7 @@ public class TestElasticSearchIndexRequestBuilderFactory
assertEquals(indexPrefix + '-'
+ ElasticSearchIndexRequestBuilderFactory.df.format(1213141516L),
- indexRequestBuilder.request().index());
+ indexRequestBuilder.request().index());
}
@Test
@@ -174,7 +173,7 @@ public class TestElasticSearchIndexRequestBuilderFactory
assertEquals(indexValue + '-'
+ ElasticSearchIndexRequestBuilderFactory.df.format(FIXED_TIME_MILLIS),
- indexRequestBuilder.request().index());
+ indexRequestBuilder.request().index());
assertEquals(typeValue, indexRequestBuilder.request().type());
}
@@ -192,7 +191,8 @@ public class TestElasticSearchIndexRequestBuilderFactory
static class FakeEventSerializer implements ElasticSearchEventSerializer {
static final byte[] FAKE_BYTES = new byte[]{9, 8, 7, 6};
- boolean configuredWithContext, configuredWithComponentConfiguration;
+ boolean configuredWithContext;
+ boolean configuredWithComponentConfiguration;
@Override
public BytesStream getContentBuilder(Event event) throws IOException {
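
Splitting the comma-joined field declaration into two separate ones is the one-variable-per-declaration fix (presumably checkstyle's MultipleVariableDeclarations check). A tiny sketch; the field names are copied from the hunk, the class around them is illustrative:

public class OneFieldPerDeclarationExample {
  // One field per declaration statement, instead of "boolean a, b;".
  boolean configuredWithContext;
  boolean configuredWithComponentConfiguration;

  public static void main(String[] args) {
    OneFieldPerDeclarationExample e = new OneFieldPerDeclarationExample();
    e.configuredWithContext = true;
    System.out.println(e.configuredWithContext + " "
        + e.configuredWithComponentConfiguration);
  }
}
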
http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchLogStashEventSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchLogStashEventSerializer.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchLogStashEventSerializer.java
index ab9587d..65b4dab 100644
--- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchLogStashEventSerializer.java
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchLogStashEventSerializer.java
@@ -18,6 +18,7 @@
*/
package org.apache.flume.sink.elasticsearch;
+import com.google.gson.JsonParser;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
@@ -28,9 +29,6 @@ import org.junit.Test;
import java.util.Date;
import java.util.Map;
-import com.google.gson.JsonParser;
-import com.google.gson.JsonElement;
-
import static org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer.charset;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.junit.Assert.assertEquals;
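
This hunk is an unused-import cleanup: JsonParser is kept (it moves up into the sorted import block) while JsonElement, evidently unreferenced, is dropped. A dependency-free sketch of the same idea:

import java.util.Arrays;
import java.util.List;

public class UnusedImportExample {
  public static void main(String[] args) {
    // Both imports above are referenced; an import with no reference
    // (say, java.util.Map) would be flagged as unused and deleted.
    List<String> items = Arrays.asList("a", "b");
    System.out.println(items);
  }
}
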
@@ -56,26 +54,25 @@ public class TestElasticSearchLogStashEventSerializer {
Event event = EventBuilder.withBody(message.getBytes(charset));
event.setHeaders(headers);
- XContentBuilder expected = jsonBuilder()
- .startObject();
- expected.field("@message", new String(message.getBytes(), charset));
- expected.field("@timestamp", new Date(timestamp));
- expected.field("@source", "flume_tail_src");
- expected.field("@type", "sometype");
- expected.field("@source_host", "test@localhost");
- expected.field("@source_path", "/tmp/test");
-
- expected.startObject("@fields");
- expected.field("timestamp", String.valueOf(timestamp));
- expected.field("src_path", "/tmp/test");
- expected.field("host", "test@localhost");
- expected.field("headerNameTwo", "headerValueTwo");
- expected.field("source", "flume_tail_src");
- expected.field("headerNameOne", "headerValueOne");
- expected.field("type", "sometype");
- expected.endObject();
-
- expected.endObject();
+ XContentBuilder expected = jsonBuilder().startObject();
+ expected.field("@message", new String(message.getBytes(), charset));
+ expected.field("@timestamp", new Date(timestamp));
+ expected.field("@source", "flume_tail_src");
+ expected.field("@type", "sometype");
+ expected.field("@source_host", "test@localhost");
+ expected.field("@source_path", "/tmp/test");
+
+ expected.startObject("@fields");
+ expected.field("timestamp", String.valueOf(timestamp));
+ expected.field("src_path", "/tmp/test");
+ expected.field("host", "test@localhost");
+ expected.field("headerNameTwo", "headerValueTwo");
+ expected.field("source", "flume_tail_src");
+ expected.field("headerNameOne", "headerValueOne");
+ expected.field("type", "sometype");
+ expected.endObject();
+
+ expected.endObject();
XContentBuilder actual = fixture.getContentBuilder(event);
@@ -102,26 +99,25 @@ public class TestElasticSearchLogStashEventSerializer {
Event event = EventBuilder.withBody(message.getBytes(charset));
event.setHeaders(headers);
- XContentBuilder expected = jsonBuilder().
- startObject();
- expected.field("@message", new String(message.getBytes(), charset));
- expected.field("@timestamp", new Date(timestamp));
- expected.field("@source", "flume_tail_src");
- expected.field("@type", "sometype");
- expected.field("@source_host", "test@localhost");
- expected.field("@source_path", "/tmp/test");
-
- expected.startObject("@fields");
- expected.field("timestamp", String.valueOf(timestamp));
- expected.field("src_path", "/tmp/test");
- expected.field("host", "test@localhost");
- expected.field("headerNameTwo", "headerValueTwo");
- expected.field("source", "flume_tail_src");
- expected.field("headerNameOne", "headerValueOne");
- expected.field("type", "sometype");
- expected.endObject();
-
- expected.endObject();
+ XContentBuilder expected = jsonBuilder().startObject();
+ expected.field("@message", new String(message.getBytes(), charset));
+ expected.field("@timestamp", new Date(timestamp));
+ expected.field("@source", "flume_tail_src");
+ expected.field("@type", "sometype");
+ expected.field("@source_host", "test@localhost");
+ expected.field("@source_path", "/tmp/test");
+
+ expected.startObject("@fields");
+ expected.field("timestamp", String.valueOf(timestamp));
+ expected.field("src_path", "/tmp/test");
+ expected.field("host", "test@localhost");
+ expected.field("headerNameTwo", "headerValueTwo");
+ expected.field("source", "flume_tail_src");
+ expected.field("headerNameOne", "headerValueOne");
+ expected.field("type", "sometype");
+ expected.endObject();
+
+ expected.endObject();
XContentBuilder actual = fixture.getContentBuilder(event);