You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ti...@apache.org on 2015/11/17 21:06:05 UTC
[1/2] incubator-apex-malhar git commit: MLHR-1910 #resolve #comment
fixed style violations and warnings in Block Reader and Splitter classes
Repository: incubator-apex-malhar
Updated Branches:
refs/heads/devel-3 78033594b -> 1bfd2d15a
MLHR-1910 #resolve #comment fixed style violations and warnings in Block Reader and Splitter classes
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/17d05763
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/17d05763
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/17d05763
Branch: refs/heads/devel-3
Commit: 17d05763ddd48ccd9c3b58d44fff153952fdb537
Parents: 7803359
Author: Chandni Singh <cs...@apache.org>
Authored: Mon Nov 16 23:15:27 2015 -0800
Committer: Chandni Singh <cs...@apache.org>
Committed: Mon Nov 16 23:45:01 2015 -0800
----------------------------------------------------------------------
.../lib/io/block/AbstractBlockReader.java | 90 +++++++++++---------
.../lib/io/block/AbstractFSBlockReader.java | 22 ++---
.../datatorrent/lib/io/block/BlockMetadata.java | 7 +-
.../datatorrent/lib/io/block/FSSliceReader.java | 5 +-
.../datatorrent/lib/io/block/ReaderContext.java | 40 ++++-----
.../datatorrent/lib/io/block/package-info.java | 2 +-
.../lib/io/block/AbstractBlockReaderTest.java | 76 +++++++++--------
.../lib/io/block/FSLineReaderTest.java | 49 ++++++-----
.../lib/io/block/FSSliceReaderTest.java | 30 +++----
.../lib/io/block/ReadAheadLineReaderTest.java | 2 +-
.../lib/io/fs/FileSplitterBaseTest.java | 26 ++++--
.../datatorrent/lib/io/fs/FileSplitterTest.java | 54 ++++++------
12 files changed, 215 insertions(+), 188 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/17d05763/library/src/main/java/com/datatorrent/lib/io/block/AbstractBlockReader.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/block/AbstractBlockReader.java b/library/src/main/java/com/datatorrent/lib/io/block/AbstractBlockReader.java
index 1839e68..aada298 100644
--- a/library/src/main/java/com/datatorrent/lib/io/block/AbstractBlockReader.java
+++ b/library/src/main/java/com/datatorrent/lib/io/block/AbstractBlockReader.java
@@ -21,27 +21,37 @@ package com.datatorrent.lib.io.block;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.util.*;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import javax.validation.constraints.NotNull;
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.lang.mutable.MutableLong;
import org.apache.hadoop.fs.PositionedReadable;
-import com.datatorrent.lib.counters.BasicCounters;
-
-import com.datatorrent.api.*;
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.DefaultPartition;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Partitioner;
+import com.datatorrent.api.Stats;
+import com.datatorrent.api.StatsListener;
import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.counters.BasicCounters;
/**
* AbstractBlockReader processes a block of data from a stream.<br/>
@@ -54,7 +64,8 @@ import com.datatorrent.common.util.BaseOperator;
*
* <p/>
* Properties that can be set on AbstractBlockReader:<br/>
- * {@link #collectStats}: the operator is dynamically partition-able which is influenced by the backlog and the port queue size. This property disables
+ * {@link #collectStats}: the operator is dynamically partition-able which is influenced by the backlog and the port
+ * queue size. This property disables
* collecting stats and thus partitioning.<br/>
* {@link #maxReaders}: Maximum number of readers when dynamic partitioning is on.<br/>
* {@link #minReaders}: Minimum number of readers when dynamic partitioning is on.<br/>
@@ -71,8 +82,9 @@ import com.datatorrent.common.util.BaseOperator;
*/
@StatsListener.DataQueueSize
-public abstract class AbstractBlockReader<R, B extends BlockMetadata, STREAM extends InputStream & PositionedReadable> extends BaseOperator implements
- Partitioner<AbstractBlockReader<R, B, STREAM>>, StatsListener, Operator.IdleTimeHandler
+public abstract class AbstractBlockReader<R, B extends BlockMetadata, STREAM extends InputStream & PositionedReadable>
+ extends BaseOperator
+ implements Partitioner<AbstractBlockReader<R, B, STREAM>>, StatsListener, Operator.IdleTimeHandler
{
protected int operatorId;
protected transient long windowId;
@@ -110,17 +122,17 @@ public abstract class AbstractBlockReader<R, B extends BlockMetadata, STREAM ext
*/
protected long intervalMillis;
- protected transient final StatsListener.Response response;
+ protected final transient StatsListener.Response response;
protected transient int partitionCount;
- protected transient final Map<Integer, Integer> backlogPerOperator;
+ protected final transient Map<Integer, Integer> backlogPerOperator;
private transient long nextMillis;
protected transient B lastProcessedBlock;
protected transient long lastBlockOpenTime;
protected transient boolean consecutiveBlock;
- public final transient DefaultOutputPort<B> blocksMetadataOutput = new DefaultOutputPort<B>();
- public final transient DefaultOutputPort<ReaderRecord<R>> messages = new DefaultOutputPort<ReaderRecord<R>>();
+ public final transient DefaultOutputPort<B> blocksMetadataOutput = new DefaultOutputPort<>();
+ public final transient DefaultOutputPort<ReaderRecord<R>> messages = new DefaultOutputPort<>();
public final transient DefaultInputPort<B> blocksMetadataInput = new DefaultInputPort<B>()
{
@@ -139,7 +151,7 @@ public abstract class AbstractBlockReader<R, B extends BlockMetadata, STREAM ext
response = new StatsListener.Response();
backlogPerOperator = Maps.newHashMap();
partitionCount = 1;
- counters = new BasicCounters<MutableLong>(MutableLong.class);
+ counters = new BasicCounters<>(MutableLong.class);
collectStats = true;
lastBlockOpenTime = -1;
}
@@ -172,17 +184,14 @@ public abstract class AbstractBlockReader<R, B extends BlockMetadata, STREAM ext
try {
teardownStream(lastProcessedBlock);
lastProcessedBlock = null;
- }
- catch (IOException e) {
+ } catch (IOException e) {
throw new RuntimeException(e);
}
- }
- else {
+ } else {
/* nothing to do here, so sleep for a while to avoid busy loop */
try {
Thread.sleep(sleepTimeMillis);
- }
- catch (InterruptedException ie) {
+ } catch (InterruptedException ie) {
throw new RuntimeException(ie);
}
}
@@ -199,13 +208,13 @@ public abstract class AbstractBlockReader<R, B extends BlockMetadata, STREAM ext
{
try {
long blockStartTime = System.currentTimeMillis();
- if (block.getPreviousBlockId() == -1 || lastProcessedBlock == null || block.getPreviousBlockId() != lastProcessedBlock.getBlockId()) {
+ if (block.getPreviousBlockId() == -1 || lastProcessedBlock == null
+ || block.getPreviousBlockId() != lastProcessedBlock.getBlockId()) {
teardownStream(lastProcessedBlock);
consecutiveBlock = false;
lastBlockOpenTime = System.currentTimeMillis();
stream = setupStream(block);
- }
- else {
+ } else {
consecutiveBlock = true;
}
readBlock(block);
@@ -216,15 +225,13 @@ public abstract class AbstractBlockReader<R, B extends BlockMetadata, STREAM ext
blocksMetadataOutput.emit(block);
}
blocksPerWindow++;
- }
- catch (IOException ie) {
+ } catch (IOException ie) {
try {
if (lastProcessedBlock != null) {
teardownStream(lastProcessedBlock);
lastProcessedBlock = null;
}
- }
- catch (IOException ioe) {
+ } catch (IOException ioe) {
throw new RuntimeException("closing last", ie);
}
throw new RuntimeException(ie);
@@ -250,7 +257,7 @@ public abstract class AbstractBlockReader<R, B extends BlockMetadata, STREAM ext
//If the record is partial then ignore the record.
if (record != null) {
counters.getCounter(ReaderCounterKeys.RECORDS).increment();
- messages.emit(new ReaderRecord<R>(blockMetadata.getBlockId(), record));
+ messages.emit(new ReaderRecord<>(blockMetadata.getBlockId(), record));
}
}
}
@@ -261,7 +268,8 @@ public abstract class AbstractBlockReader<R, B extends BlockMetadata, STREAM ext
*/
@SuppressWarnings("unchecked")
@Override
- public Collection<Partition<AbstractBlockReader<R, B, STREAM>>> definePartitions(Collection<Partition<AbstractBlockReader<R, B, STREAM>>> partitions, PartitioningContext context)
+ public Collection<Partition<AbstractBlockReader<R, B, STREAM>>> definePartitions(
+ Collection<Partition<AbstractBlockReader<R, B, STREAM>>> partitions, PartitioningContext context)
{
if (partitions.iterator().next().getStats() == null) {
//First time when define partitions is called
@@ -271,7 +279,7 @@ public abstract class AbstractBlockReader<R, B extends BlockMetadata, STREAM ext
//Create new partitions
for (Partition<AbstractBlockReader<R, B, STREAM>> partition : partitions) {
- newPartitions.add(new DefaultPartition<AbstractBlockReader<R, B, STREAM>>(partition.getPartitionedInstance()));
+ newPartitions.add(new DefaultPartition<>(partition.getPartitionedInstance()));
}
partitions.clear();
int morePartitionsToCreate = partitionCount - newPartitions.size();
@@ -287,8 +295,7 @@ public abstract class AbstractBlockReader<R, B extends BlockMetadata, STREAM ext
LOG.debug("partition removed {}", toRemove.getPartitionedInstance().operatorId);
partitionIterator.remove();
}
- }
- else {
+ } else {
//Add more partitions
Kryo kryo = new Kryo();
while (morePartitionsToCreate-- > 0) {
@@ -301,7 +308,8 @@ public abstract class AbstractBlockReader<R, B extends BlockMetadata, STREAM ext
@SuppressWarnings("unchecked")
AbstractBlockReader<R, B, STREAM> blockReader = kryo.readObject(lInput, this.getClass());
- DefaultPartition<AbstractBlockReader<R, B, STREAM>> partition = new DefaultPartition<AbstractBlockReader<R, B, STREAM>>(blockReader);
+ DefaultPartition<AbstractBlockReader<R, B, STREAM>> partition = new DefaultPartition<>(
+ blockReader);
newPartitions.add(partition);
}
}
@@ -389,12 +397,10 @@ public abstract class AbstractBlockReader<R, B extends BlockMetadata, STREAM ext
if (totalBacklog > maxReaders) {
LOG.debug("large backlog {}", totalBacklog);
newPartitionCount = maxReaders;
- }
- else if (totalBacklog < minReaders) {
+ } else if (totalBacklog < minReaders) {
LOG.debug("small backlog {}", totalBacklog);
newPartitionCount = minReaders;
- }
- else {
+ } else {
newPartitionCount = getAdjustedCount(totalBacklog);
LOG.debug("moderate backlog {}", totalBacklog);
}
@@ -574,7 +580,7 @@ public abstract class AbstractBlockReader<R, B extends BlockMetadata, STREAM ext
}
- public static enum ReaderCounterKeys
+ public enum ReaderCounterKeys
{
RECORDS, BLOCKS, BYTES, TIME
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/17d05763/library/src/main/java/com/datatorrent/lib/io/block/AbstractFSBlockReader.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/block/AbstractFSBlockReader.java b/library/src/main/java/com/datatorrent/lib/io/block/AbstractFSBlockReader.java
index 006df45..d74c9c9 100644
--- a/library/src/main/java/com/datatorrent/lib/io/block/AbstractFSBlockReader.java
+++ b/library/src/main/java/com/datatorrent/lib/io/block/AbstractFSBlockReader.java
@@ -36,7 +36,8 @@ import com.datatorrent.api.StatsListener;
* @since 2.1.0
*/
@StatsListener.DataQueueSize
-public abstract class AbstractFSBlockReader<R> extends AbstractBlockReader<R, BlockMetadata.FileBlockMetadata, FSDataInputStream>
+public abstract class AbstractFSBlockReader<R>
+ extends AbstractBlockReader<R, BlockMetadata.FileBlockMetadata, FSDataInputStream>
{
protected transient FileSystem fs;
protected transient Configuration configuration;
@@ -48,8 +49,7 @@ public abstract class AbstractFSBlockReader<R> extends AbstractBlockReader<R, Bl
configuration = new Configuration();
try {
fs = getFSInstance();
- }
- catch (IOException e) {
+ } catch (IOException e) {
throw new RuntimeException("creating fs", e);
}
}
@@ -60,8 +60,7 @@ public abstract class AbstractFSBlockReader<R> extends AbstractBlockReader<R, Bl
super.teardown();
try {
fs.close();
- }
- catch (IOException e) {
+ } catch (IOException e) {
throw new RuntimeException(e);
}
}
@@ -88,27 +87,28 @@ public abstract class AbstractFSBlockReader<R> extends AbstractBlockReader<R, Bl
*
* @param <R> type of records
*/
- public static abstract class AbstractFSLineReader<R> extends AbstractFSBlockReader<R>
+ public abstract static class AbstractFSLineReader<R> extends AbstractFSBlockReader<R>
{
public AbstractFSLineReader()
{
super();
- this.readerContext = new ReaderContext.LineReaderContext<FSDataInputStream>();
+ this.readerContext = new ReaderContext.LineReaderContext<>();
}
}
/**
- * An {@link AbstractFSBlockReader} which reads lines from the block using {@link ReaderContext.ReadAheadLineReaderContext}
+ * An {@link AbstractFSBlockReader} which reads lines from the block using
+ * {@link ReaderContext.ReadAheadLineReaderContext}
*
- * @param <R>
+ * @param <R> type of record.
*/
- public static abstract class AbstractFSReadAheadLineReader<R> extends AbstractFSBlockReader<R>
+ public abstract static class AbstractFSReadAheadLineReader<R> extends AbstractFSBlockReader<R>
{
public AbstractFSReadAheadLineReader()
{
super();
- this.readerContext = new ReaderContext.ReadAheadLineReaderContext<FSDataInputStream>();
+ this.readerContext = new ReaderContext.ReadAheadLineReaderContext<>();
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/17d05763/library/src/main/java/com/datatorrent/lib/io/block/BlockMetadata.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/block/BlockMetadata.java b/library/src/main/java/com/datatorrent/lib/io/block/BlockMetadata.java
index 24e9623..534024d 100644
--- a/library/src/main/java/com/datatorrent/lib/io/block/BlockMetadata.java
+++ b/library/src/main/java/com/datatorrent/lib/io/block/BlockMetadata.java
@@ -54,7 +54,7 @@ public interface BlockMetadata
*/
long getPreviousBlockId();
- public abstract class AbstractBlockMetadata implements BlockMetadata
+ abstract class AbstractBlockMetadata implements BlockMetadata
{
private long offset;
private long length;
@@ -198,7 +198,7 @@ public interface BlockMetadata
/**
* A block of file which contains file path and other block properties.
*/
- public static class FileBlockMetadata extends AbstractBlockMetadata
+ class FileBlockMetadata extends AbstractBlockMetadata
{
private final String filePath;
@@ -208,7 +208,8 @@ public interface BlockMetadata
filePath = null;
}
- public FileBlockMetadata(String filePath, long blockId, long offset, long length, boolean isLastBlock, long previousBlockId)
+ public FileBlockMetadata(String filePath, long blockId, long offset, long length, boolean isLastBlock,
+ long previousBlockId)
{
super(blockId, offset, length, isLastBlock, previousBlockId);
this.filePath = filePath;
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/17d05763/library/src/main/java/com/datatorrent/lib/io/block/FSSliceReader.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/block/FSSliceReader.java b/library/src/main/java/com/datatorrent/lib/io/block/FSSliceReader.java
index 948fc3e..ad55358 100644
--- a/library/src/main/java/com/datatorrent/lib/io/block/FSSliceReader.java
+++ b/library/src/main/java/com/datatorrent/lib/io/block/FSSliceReader.java
@@ -18,10 +18,7 @@
*/
package com.datatorrent.lib.io.block;
-import org.apache.hadoop.fs.FSDataInputStream;
-
import com.datatorrent.api.StatsListener;
-
import com.datatorrent.netlet.util.Slice;
/**
@@ -38,7 +35,7 @@ public class FSSliceReader extends AbstractFSBlockReader<Slice>
public FSSliceReader()
{
super();
- this.readerContext = new ReaderContext.FixedBytesReaderContext<FSDataInputStream>();
+ this.readerContext = new ReaderContext.FixedBytesReaderContext<>();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/17d05763/library/src/main/java/com/datatorrent/lib/io/block/ReaderContext.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/block/ReaderContext.java b/library/src/main/java/com/datatorrent/lib/io/block/ReaderContext.java
index b69b1ee..1ee6c53 100644
--- a/library/src/main/java/com/datatorrent/lib/io/block/ReaderContext.java
+++ b/library/src/main/java/com/datatorrent/lib/io/block/ReaderContext.java
@@ -22,16 +22,16 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.PositionedReadable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.PositionedReadable;
+
/**
* This controls how an {@link AbstractBlockReader} reads a {@link BlockMetadata}.
*
* @param <STREAM> type of stream
- *
* @since 2.1.0
*/
public interface ReaderContext<STREAM extends InputStream & PositionedReadable>
@@ -59,7 +59,7 @@ public interface ReaderContext<STREAM extends InputStream & PositionedReadable>
* Represents the total bytes used to construct the record.<br/>
* Used bytes can be different from the bytes in the record.
*/
- public static class Entity
+ class Entity
{
private byte[] record;
private long usedBytes;
@@ -97,9 +97,8 @@ public interface ReaderContext<STREAM extends InputStream & PositionedReadable>
*
* @param <STREAM> type of stream.
*/
- public static abstract class AbstractReaderContext<STREAM extends InputStream & PositionedReadable> implements ReaderContext<STREAM>
+ abstract class AbstractReaderContext<STREAM extends InputStream & PositionedReadable> implements ReaderContext<STREAM>
{
-
protected transient long offset;
protected transient STREAM stream;
protected transient BlockMetadata blockMetadata;
@@ -143,8 +142,7 @@ public interface ReaderContext<STREAM extends InputStream & PositionedReadable>
*
* @param <STREAM> type of stream.
*/
-
- public static class LineReaderContext<STREAM extends InputStream & PositionedReadable> extends AbstractReaderContext<STREAM>
+ class LineReaderContext<STREAM extends InputStream & PositionedReadable> extends AbstractReaderContext<STREAM>
{
protected int bufferSize;
@@ -178,8 +176,8 @@ public interface ReaderContext<STREAM extends InputStream & PositionedReadable>
@Override
protected Entity readEntity() throws IOException
{
- //Implemented a buffered reader instead of using java's BufferedReader because it was reading much ahead of block boundary
- //and faced issues with duplicate records. Controlling the buffer size didn't help either.
+ //Implemented a buffered reader instead of using java's BufferedReader because it was reading much ahead of block
+ // boundary and faced issues with duplicate records. Controlling the buffer size didn't help either.
boolean foundEOL = false;
int bytesRead = 0;
@@ -200,8 +198,7 @@ public interface ReaderContext<STREAM extends InputStream & PositionedReadable>
if (c != '\r' && c != '\n') {
tmpBuilder.write(c);
posInStr++;
- }
- else {
+ } else {
foundEOL = true;
break;
}
@@ -216,14 +213,12 @@ public interface ReaderContext<STREAM extends InputStream & PositionedReadable>
if (c == '\r' || c == '\n') {
emptyBuilder.write(c);
posInStr++;
- }
- else {
+ } else {
break;
}
}
usedBytes += emptyBuilder.toByteArray().length;
- }
- else {
+ } else {
//read more bytes from the input stream
posInStr = 0;
}
@@ -266,14 +261,14 @@ public interface ReaderContext<STREAM extends InputStream & PositionedReadable>
/**
* Another reader context that splits the block into records on '\n' or '\r'.<br/>
* This implementation doesn't need a way to validate the start of a record.<br/>
- *
+ * <p/>
* This starts parsing the block (except the first block of the file) from the first eol character.
* It is a less optimized version of a {@link LineReaderContext} which always reads beyond the block
* boundary.
*
* @param <STREAM> type of stream.
*/
- public static class ReadAheadLineReaderContext<STREAM extends InputStream & PositionedReadable> extends LineReaderContext<STREAM>
+ class ReadAheadLineReaderContext<STREAM extends InputStream & PositionedReadable> extends LineReaderContext<STREAM>
{
@Override
public void initialize(STREAM stream, BlockMetadata blockMetadata, boolean consecutiveBlock)
@@ -285,8 +280,7 @@ public interface ReaderContext<STREAM extends InputStream & PositionedReadable>
try {
Entity entity = readEntity();
offset += entity.usedBytes;
- }
- catch (IOException e) {
+ } catch (IOException e) {
throw new RuntimeException("when reading first entity", e);
}
}
@@ -310,7 +304,7 @@ public interface ReaderContext<STREAM extends InputStream & PositionedReadable>
*
* @param <STREAM> type of stream.
*/
- public static class FixedBytesReaderContext<STREAM extends InputStream & PositionedReadable> extends AbstractReaderContext<STREAM>
+ class FixedBytesReaderContext<STREAM extends InputStream & PositionedReadable> extends AbstractReaderContext<STREAM>
{
//When this field is null, it is initialized to default fs block size in setup.
protected Integer length;
@@ -319,7 +313,7 @@ public interface ReaderContext<STREAM extends InputStream & PositionedReadable>
public void initialize(STREAM stream, BlockMetadata blockMetadata, boolean consecutiveBlock)
{
if (length == null) {
- length = (int) new Configuration().getLong("fs.local.block.size", 32 * 1024 * 1024);
+ length = (int)new Configuration().getLong("fs.local.block.size", 32 * 1024 * 1024);
LOG.debug("length init {}", length);
}
super.initialize(stream, blockMetadata, consecutiveBlock);
@@ -331,7 +325,7 @@ public interface ReaderContext<STREAM extends InputStream & PositionedReadable>
entity.clear();
int bytesToRead = length;
if (offset + length >= blockMetadata.getLength()) {
- bytesToRead = (int) (blockMetadata.getLength() - offset);
+ bytesToRead = (int)(blockMetadata.getLength() - offset);
}
byte[] record = new byte[bytesToRead];
stream.readFully(offset, record, 0, bytesToRead);
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/17d05763/library/src/main/java/com/datatorrent/lib/io/block/package-info.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/block/package-info.java b/library/src/main/java/com/datatorrent/lib/io/block/package-info.java
index 14cfd47..bc89b6e 100644
--- a/library/src/main/java/com/datatorrent/lib/io/block/package-info.java
+++ b/library/src/main/java/com/datatorrent/lib/io/block/package-info.java
@@ -16,4 +16,4 @@
* specific language governing permissions and limitations
* under the License.
*/
-package com.datatorrent.lib.io.block;
\ No newline at end of file
+package com.datatorrent.lib.io.block;
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/17d05763/library/src/test/java/com/datatorrent/lib/io/block/AbstractBlockReaderTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/block/AbstractBlockReaderTest.java b/library/src/test/java/com/datatorrent/lib/io/block/AbstractBlockReaderTest.java
index 3610191..b2b99e6 100644
--- a/library/src/test/java/com/datatorrent/lib/io/block/AbstractBlockReaderTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/block/AbstractBlockReaderTest.java
@@ -21,21 +21,21 @@ package com.datatorrent.lib.io.block;
import java.util.Collection;
import java.util.List;
-import org.apache.commons.lang.mutable.MutableLong;
-import org.apache.hadoop.fs.FSDataInputStream;
import org.junit.Assert;
import org.junit.Test;
+import org.apache.commons.lang.mutable.MutableLong;
+import org.apache.hadoop.fs.FSDataInputStream;
+
import com.google.common.collect.Lists;
import com.datatorrent.api.DefaultPartition;
import com.datatorrent.api.Partitioner;
import com.datatorrent.api.Stats;
import com.datatorrent.api.StatsListener;
-
-import com.datatorrent.netlet.util.Slice;
import com.datatorrent.lib.counters.BasicCounters;
import com.datatorrent.lib.util.TestUtils;
+import com.datatorrent.netlet.util.Slice;
/**
* Stats and partitioning tests for {@link AbstractBlockReader}
@@ -105,19 +105,23 @@ public class AbstractBlockReaderTest
Assert.assertTrue("partition needed", response.repartitionRequired);
Assert.assertEquals("partition count changed", 8, sliceReader.getPartitionCount());
- List<Partitioner.Partition<AbstractBlockReader<Slice,
- BlockMetadata.FileBlockMetadata, FSDataInputStream>>> partitions = Lists.newArrayList();
+ List<Partitioner.Partition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>>>
+ partitions = Lists
+ .newArrayList();
- DefaultPartition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>> apartition =
- new DefaultPartition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>>(sliceReader);
+ DefaultPartition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>> apartition = new
+ DefaultPartition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>>(
+ sliceReader);
- TestUtils.MockPartition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>> pseudoParttion =
- new TestUtils.MockPartition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>>(apartition, readerStats);
+ TestUtils.MockPartition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>>
+ pseudoParttion = new TestUtils.MockPartition<>(
+ apartition, readerStats);
partitions.add(pseudoParttion);
- Collection<Partitioner.Partition<AbstractBlockReader<Slice,
- BlockMetadata.FileBlockMetadata, FSDataInputStream>>> newPartitions = sliceReader.definePartitions(partitions, null);
+ Collection<Partitioner.Partition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>>>
+ newPartitions = sliceReader
+ .definePartitions(partitions, null);
Assert.assertEquals(8, newPartitions.size());
}
@@ -131,39 +135,43 @@ public class AbstractBlockReaderTest
TestReader sliceReader = new TestReader();
sliceReader.processStats(readerStats);
- List<Partitioner.Partition<AbstractBlockReader<Slice,
- BlockMetadata.FileBlockMetadata, FSDataInputStream>>> partitions = Lists.newArrayList();
+ List<Partitioner.Partition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>>>
+ partitions = Lists
+ .newArrayList();
- DefaultPartition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>> apartition =
- new DefaultPartition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>>(sliceReader);
+ DefaultPartition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>> apartition = new
+ DefaultPartition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>>(
+ sliceReader);
- TestUtils.MockPartition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>> pseudoParttion =
- new TestUtils.MockPartition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>>(apartition, readerStats);
+ TestUtils.MockPartition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>>
+ pseudoParttion = new TestUtils.MockPartition<>(apartition, readerStats);
partitions.add(pseudoParttion);
- Collection<Partitioner.Partition<AbstractBlockReader<Slice,
- BlockMetadata.FileBlockMetadata, FSDataInputStream>>> newPartitions = sliceReader.definePartitions(partitions, null);
+ Collection<Partitioner.Partition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>>>
+ newPartitions = sliceReader.definePartitions(partitions, null);
+
+ List<Partitioner.Partition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>>>
+ newMocks = Lists.newArrayList();
- List<Partitioner.Partition<AbstractBlockReader<Slice,
- BlockMetadata.FileBlockMetadata, FSDataInputStream>>> newMocks = Lists.newArrayList();
+ for (Partitioner.Partition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>> partition
+ : newPartitions) {
- for (Partitioner.Partition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>> partition :
- newPartitions) {
- partition.getPartitionedInstance().counters.setCounter(AbstractBlockReader.ReaderCounterKeys.BLOCKS, new MutableLong(1));
+ partition.getPartitionedInstance().counters
+ .setCounter(AbstractBlockReader.ReaderCounterKeys.BLOCKS, new MutableLong(1));
- newMocks.add(
- new TestUtils.MockPartition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>>(
- (DefaultPartition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>>) partition,
- readerStats)
- );
+ newMocks.add(new TestUtils.MockPartition<>(
+ (DefaultPartition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>>)partition,
+ readerStats));
}
sliceReader.partitionCount = 1;
newPartitions = sliceReader.definePartitions(newMocks, null);
Assert.assertEquals(1, newPartitions.size());
- AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream> last = newPartitions.iterator().next().getPartitionedInstance();
- Assert.assertEquals("num blocks", 8, last.counters.getCounter(AbstractBlockReader.ReaderCounterKeys.BLOCKS).longValue());
+ AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream> last = newPartitions.iterator()
+ .next().getPartitionedInstance();
+ Assert.assertEquals("num blocks", 8,
+ last.counters.getCounter(AbstractBlockReader.ReaderCounterKeys.BLOCKS).longValue());
}
static class ReaderStats extends Stats.OperatorStats
@@ -171,7 +179,7 @@ public class AbstractBlockReaderTest
ReaderStats(int backlog, long readBlocks, long bytes, long time)
{
- BasicCounters<MutableLong> bc = new BasicCounters<MutableLong>(MutableLong.class);
+ BasicCounters<MutableLong> bc = new BasicCounters<>(MutableLong.class);
bc.setCounter(AbstractBlockReader.ReaderCounterKeys.BLOCKS, new MutableLong(readBlocks));
bc.setCounter(AbstractBlockReader.ReaderCounterKeys.BYTES, new MutableLong(bytes));
bc.setCounter(AbstractBlockReader.ReaderCounterKeys.TIME, new MutableLong(time));
@@ -191,4 +199,4 @@ public class AbstractBlockReaderTest
return partitionCount;
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/17d05763/library/src/test/java/com/datatorrent/lib/io/block/FSLineReaderTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/block/FSLineReaderTest.java b/library/src/test/java/com/datatorrent/lib/io/block/FSLineReaderTest.java
index 32ac536..5ddc8a9 100644
--- a/library/src/test/java/com/datatorrent/lib/io/block/FSLineReaderTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/block/FSLineReaderTest.java
@@ -18,7 +18,11 @@
*/
package com.datatorrent.lib.io.block;
-import java.io.*;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
@@ -36,7 +40,6 @@ import com.google.common.collect.Lists;
import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
-
import com.datatorrent.lib.helper.OperatorContextTestHelper;
import com.datatorrent.lib.testbench.CollectorTestSink;
@@ -74,10 +77,10 @@ public class FSLineReaderTest
blockReader.setup(readerContext);
- messageSink = new CollectorTestSink<Object>();
+ messageSink = new CollectorTestSink<>();
blockReader.messages.setSink(messageSink);
- blockMetadataSink = new CollectorTestSink<Object>();
+ blockMetadataSink = new CollectorTestSink<>();
blockReader.blocksMetadataOutput.setSink(blockMetadataSink);
BufferedReader reader;
@@ -88,11 +91,7 @@ public class FSLineReaderTest
messages.add(line.split(","));
}
reader.close();
- }
- catch (FileNotFoundException e) {
- throw new RuntimeException(e);
- }
- catch (IOException e) {
+ } catch (IOException e) {
throw new RuntimeException(e);
}
}
@@ -111,8 +110,9 @@ public class FSLineReaderTest
public void test()
{
- BlockMetadata.FileBlockMetadata block = new BlockMetadata.FileBlockMetadata(testMeta.dataFile.getAbsolutePath(), 0L, 0L, testMeta.dataFile.length(),
- true, -1);
+ BlockMetadata.FileBlockMetadata block = new BlockMetadata.FileBlockMetadata(testMeta.dataFile.getAbsolutePath(), 0L,
+ 0L, testMeta.dataFile.length(),
+ true, -1);
testMeta.blockReader.beginWindow(1);
testMeta.blockReader.blocksMetadataInput.process(block);
@@ -123,7 +123,7 @@ public class FSLineReaderTest
for (int i = 0; i < messages.size(); i++) {
@SuppressWarnings("unchecked")
- AbstractBlockReader.ReaderRecord<String> msg = (AbstractBlockReader.ReaderRecord<String>) messages.get(i);
+ AbstractBlockReader.ReaderRecord<String> msg = (AbstractBlockReader.ReaderRecord<String>)messages.get(i);
Assert.assertTrue("line " + i, Arrays.equals(msg.getRecord().split(","), testMeta.messages.get(i)));
}
}
@@ -132,13 +132,16 @@ public class FSLineReaderTest
public void testMultipleBlocks()
{
long blockSize = 1000;
- int noOfBlocks = (int) ((testMeta.dataFile.length() / blockSize) + (((testMeta.dataFile.length() % blockSize) == 0) ? 0 : 1));
+ int noOfBlocks = (int)((testMeta.dataFile.length() / blockSize) + (((testMeta.dataFile.length() % blockSize) == 0) ?
+ 0 :
+ 1));
testMeta.blockReader.beginWindow(1);
for (int i = 0; i < noOfBlocks; i++) {
- BlockMetadata.FileBlockMetadata blockMetadata = new BlockMetadata.FileBlockMetadata(testMeta.dataFile.getAbsolutePath(), i, i * blockSize,
- i == noOfBlocks - 1 ? testMeta.dataFile.length() : (i + 1) * blockSize, i == noOfBlocks - 1, i - 1);
+ BlockMetadata.FileBlockMetadata blockMetadata = new BlockMetadata.FileBlockMetadata(
+ testMeta.dataFile.getAbsolutePath(), i, i * blockSize,
+ i == noOfBlocks - 1 ? testMeta.dataFile.length() : (i + 1) * blockSize, i == noOfBlocks - 1, i - 1);
testMeta.blockReader.blocksMetadataInput.process(blockMetadata);
}
@@ -148,7 +151,7 @@ public class FSLineReaderTest
Assert.assertEquals("No of records", testMeta.messages.size(), messages.size());
for (int i = 0; i < messages.size(); i++) {
@SuppressWarnings("unchecked")
- AbstractBlockReader.ReaderRecord<String> msg = (AbstractBlockReader.ReaderRecord<String>) messages.get(i);
+ AbstractBlockReader.ReaderRecord<String> msg = (AbstractBlockReader.ReaderRecord<String>)messages.get(i);
Assert.assertTrue("line " + i, Arrays.equals(msg.getRecord().split(","), testMeta.messages.get(i)));
}
}
@@ -157,13 +160,17 @@ public class FSLineReaderTest
public void testNonConsecutiveBlocks()
{
long blockSize = 1000;
- int noOfBlocks = (int) ((testMeta.dataFile.length() / blockSize) + (((testMeta.dataFile.length() % blockSize) == 0) ? 0 : 1));
+ int noOfBlocks = (int)((testMeta.dataFile.length() / blockSize) + (((testMeta.dataFile.length() % blockSize) == 0) ?
+ 0 :
+ 1));
testMeta.blockReader.beginWindow(1);
for (int i = 0; i < noOfBlocks; i++) {
- BlockMetadata.FileBlockMetadata blockMetadata = new BlockMetadata.FileBlockMetadata(testMeta.dataFile.getAbsolutePath(), i,
- i * blockSize, i == noOfBlocks - 1 ? testMeta.dataFile.length() : (i + 1) * blockSize, i == noOfBlocks - 1, -1);
+ BlockMetadata.FileBlockMetadata blockMetadata = new BlockMetadata.FileBlockMetadata(
+ testMeta.dataFile.getAbsolutePath(), i,
+ i * blockSize, i == noOfBlocks - 1 ? testMeta.dataFile.length() : (i + 1) * blockSize, i == noOfBlocks - 1,
+ -1);
testMeta.blockReader.blocksMetadataInput.process(blockMetadata);
}
@@ -173,7 +180,7 @@ public class FSLineReaderTest
Assert.assertEquals("No of records", testMeta.messages.size(), messages.size());
for (int i = 0; i < messages.size(); i++) {
@SuppressWarnings("unchecked")
- AbstractBlockReader.ReaderRecord<String> msg = (AbstractBlockReader.ReaderRecord<String>) messages.get(i);
+ AbstractBlockReader.ReaderRecord<String> msg = (AbstractBlockReader.ReaderRecord<String>)messages.get(i);
Assert.assertTrue("line " + i, Arrays.equals(msg.getRecord().split(","), testMeta.messages.get(i)));
}
}
@@ -193,4 +200,4 @@ public class FSLineReaderTest
@SuppressWarnings("unused")
private static final Logger LOG = LoggerFactory.getLogger(FSLineReaderTest.class);
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/17d05763/library/src/test/java/com/datatorrent/lib/io/block/FSSliceReaderTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/block/FSSliceReaderTest.java b/library/src/test/java/com/datatorrent/lib/io/block/FSSliceReaderTest.java
index 23aa1af..37222a7 100644
--- a/library/src/test/java/com/datatorrent/lib/io/block/FSSliceReaderTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/block/FSSliceReaderTest.java
@@ -23,20 +23,20 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.util.List;
-import org.apache.commons.io.FileUtils;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
+import org.apache.commons.io.FileUtils;
+
import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
-
-import com.datatorrent.netlet.util.Slice;
import com.datatorrent.lib.helper.OperatorContextTestHelper;
import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.netlet.util.Slice;
/**
* Tests for {@link FSSliceReader}.
@@ -63,8 +63,7 @@ public class FSSliceReaderTest
output = "target/" + description.getClassName() + "/" + description.getMethodName();
try {
FileUtils.forceMkdir(new File(output));
- }
- catch (IOException e) {
+ } catch (IOException e) {
throw new RuntimeException(e);
}
dataFile = new File("src/test/resources/reader_test_data.csv");
@@ -77,10 +76,10 @@ public class FSSliceReaderTest
blockReader.setup(readerContext);
- messageSink = new CollectorTestSink<Object>();
+ messageSink = new CollectorTestSink<>();
blockReader.messages.setSink(messageSink);
- blockMetadataSink = new CollectorTestSink<Object>();
+ blockMetadataSink = new CollectorTestSink<>();
blockReader.blocksMetadataOutput.setSink(blockMetadataSink);
}
@@ -90,8 +89,7 @@ public class FSSliceReaderTest
blockReader.teardown();
try {
FileUtils.forceDelete(new File("target/" + description.getClassName()));
- }
- catch (IOException e) {
+ } catch (IOException e) {
throw new RuntimeException(e);
}
}
@@ -104,14 +102,16 @@ public class FSSliceReaderTest
public void testBytesReceived() throws IOException
{
long blockSize = 1500;
- int noOfBlocks = (int) ((testMeta.dataFile.length() / blockSize) + (((testMeta.dataFile.length() % blockSize) == 0) ? 0 : 1));
+ int noOfBlocks = (int)((testMeta.dataFile.length() / blockSize) + (((testMeta.dataFile.length() % blockSize) == 0) ?
+ 0 : 1));
testMeta.blockReader.beginWindow(1);
for (int i = 0; i < noOfBlocks; i++) {
- BlockMetadata.FileBlockMetadata blockMetadata = new BlockMetadata.FileBlockMetadata(testMeta.dataFile.getAbsolutePath(), i, i * blockSize,
- i == noOfBlocks - 1 ? testMeta.dataFile.length() : (i + 1) * blockSize,
- i == noOfBlocks - 1, i - 1);
+ BlockMetadata.FileBlockMetadata blockMetadata = new BlockMetadata.FileBlockMetadata(
+ testMeta.dataFile.getAbsolutePath(), i, i * blockSize,
+ i == noOfBlocks - 1 ? testMeta.dataFile.length() : (i + 1) * blockSize,
+ i == noOfBlocks - 1, i - 1);
testMeta.blockReader.blocksMetadataInput.process(blockMetadata);
}
@@ -125,7 +125,7 @@ public class FSSliceReaderTest
for (Object message : messages) {
@SuppressWarnings("unchecked")
- AbstractBlockReader.ReaderRecord<Slice> msg = (AbstractBlockReader.ReaderRecord<Slice>) message;
+ AbstractBlockReader.ReaderRecord<Slice> msg = (AbstractBlockReader.ReaderRecord<Slice>)message;
totatBytesReceived += msg.getRecord().length;
outputStream.write(msg.getRecord().buffer);
}
@@ -135,4 +135,4 @@ public class FSSliceReaderTest
FileUtils.contentEquals(testMeta.dataFile, outputFile);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/17d05763/library/src/test/java/com/datatorrent/lib/io/block/ReadAheadLineReaderTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/block/ReadAheadLineReaderTest.java b/library/src/test/java/com/datatorrent/lib/io/block/ReadAheadLineReaderTest.java
index 056e7b2..5e0aa22 100644
--- a/library/src/test/java/com/datatorrent/lib/io/block/ReadAheadLineReaderTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/block/ReadAheadLineReaderTest.java
@@ -34,4 +34,4 @@ public class ReadAheadLineReaderTest extends FSLineReaderTest
return new String(bytes);
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/17d05763/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterBaseTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterBaseTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterBaseTest.java
index b47315b..9e110be 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterBaseTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterBaseTest.java
@@ -23,18 +23,29 @@ import java.io.IOException;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-
-import org.junit.*;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.datatorrent.api.*;
-import com.datatorrent.api.annotation.ApplicationAnnotation;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.Stats;
+import com.datatorrent.api.StatsListener;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.lib.helper.OperatorContextTestHelper;
import com.datatorrent.lib.io.block.BlockMetadata;
@@ -218,7 +229,8 @@ public class FileSplitterBaseTest
int count;
transient CountDownLatch latch = new CountDownLatch(1);
- public final transient DefaultInputPort<FileSplitterInput.FileMetadata> fileMetadata = new DefaultInputPort<FileSplitterInput.FileMetadata>()
+ public final transient DefaultInputPort<FileSplitterInput.FileMetadata> fileMetadata = new
+ DefaultInputPort<FileSplitterInput.FileMetadata>()
{
@Override
public void process(FileSplitterInput.FileMetadata fileMetadata)
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/17d05763/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterTest.java
index 09e3b9e..e62f643 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterTest.java
@@ -25,10 +25,6 @@ import java.util.Set;
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeoutException;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.Path;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Ignore;
@@ -39,11 +35,15 @@ import org.junit.runner.Description;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+
import com.google.common.collect.Sets;
import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context;
-
import com.datatorrent.lib.helper.OperatorContextTestHelper;
import com.datatorrent.lib.io.IdempotentStorageManager;
import com.datatorrent.lib.io.block.BlockMetadata;
@@ -59,9 +59,9 @@ public class FileSplitterTest
protected void finished(Description description)
{
try {
- FileContext.getLocalFSFileContext().delete(new Path(new File("target/" + description.getClassName()).getAbsolutePath()), true);
- }
- catch (IOException e) {
+ FileContext.getLocalFSFileContext()
+ .delete(new Path(new File("target/" + description.getClassName()).getAbsolutePath()), true);
+ } catch (IOException e) {
throw new RuntimeException(e);
}
}
@@ -77,7 +77,7 @@ public class FileSplitterTest
Set<String> filePaths = Sets.newHashSet();
Context.OperatorContext context;
- Exchanger<Integer> exchanger = new Exchanger<Integer>();
+ Exchanger<Integer> exchanger = new Exchanger<>();
@Override
protected void starting(org.junit.runner.Description description)
@@ -100,8 +100,7 @@ public class FileSplitterTest
filePaths.add(new Path(this.dataDirectory, created.getName()).toUri().toString());
FileUtils.write(created, StringUtils.join(lines, '\n'));
}
- }
- catch (IOException e) {
+ } catch (IOException e) {
throw new RuntimeException(e);
}
@@ -118,10 +117,10 @@ public class FileSplitterTest
context = new OperatorContextTestHelper.TestIdOperatorContext(0, attributes);
fileSplitter.setup(context);
- fileMetadataSink = new CollectorTestSink<FileSplitter.FileMetadata>();
+ fileMetadataSink = new CollectorTestSink<>();
TestUtils.setSink(fileSplitter.filesMetadataOutput, fileMetadataSink);
- blockMetadataSink = new CollectorTestSink<BlockMetadata.FileBlockMetadata>();
+ blockMetadataSink = new CollectorTestSink<>();
TestUtils.setSink(fileSplitter.blocksMetadataOutput, blockMetadataSink);
}
@@ -148,7 +147,7 @@ public class FileSplitterTest
testMeta.fileSplitter.endWindow();
Assert.assertEquals("File metadata", 12, testMeta.fileMetadataSink.collectedTuples.size());
for (Object fileMetadata : testMeta.fileMetadataSink.collectedTuples) {
- FileSplitter.FileMetadata metadata = (FileSplitter.FileMetadata) fileMetadata;
+ FileSplitter.FileMetadata metadata = (FileSplitter.FileMetadata)fileMetadata;
Assert.assertTrue("path: " + metadata.getFilePath(), testMeta.filePaths.contains(metadata.getFilePath()));
Assert.assertNotNull("name: ", metadata.getFileName());
}
@@ -165,7 +164,7 @@ public class FileSplitterTest
testMeta.fileSplitter.emitTuples();
Assert.assertEquals("Blocks", 12, testMeta.blockMetadataSink.collectedTuples.size());
for (Object blockMetadata : testMeta.blockMetadataSink.collectedTuples) {
- BlockMetadata.FileBlockMetadata metadata = (BlockMetadata.FileBlockMetadata) blockMetadata;
+ BlockMetadata.FileBlockMetadata metadata = (BlockMetadata.FileBlockMetadata)blockMetadata;
Assert.assertTrue("path: " + metadata.getFilePath(), testMeta.filePaths.contains(metadata.getFilePath()));
}
}
@@ -184,7 +183,7 @@ public class FileSplitterTest
for (int i = 0; i < 12; i++) {
FileSplitter.FileMetadata fm = testMeta.fileMetadataSink.collectedTuples.get(i);
File testFile = new File(testMeta.dataDirectory, fm.getFileName());
- noOfBlocks += (int) Math.ceil(testFile.length() / (2 * 1.0));
+ noOfBlocks += (int)Math.ceil(testFile.length() / (2 * 1.0));
}
Assert.assertEquals("Blocks", noOfBlocks, testMeta.blockMetadataSink.collectedTuples.size());
}
@@ -193,7 +192,7 @@ public class FileSplitterTest
public void testIdempotency() throws InterruptedException
{
IdempotentStorageManager.FSIdempotentStorageManager fsIdempotentStorageManager =
- new IdempotentStorageManager.FSIdempotentStorageManager();
+ new IdempotentStorageManager.FSIdempotentStorageManager();
testMeta.fileSplitter.setIdempotentStorageManager(fsIdempotentStorageManager);
testMeta.fileSplitter.setup(testMeta.context);
@@ -206,7 +205,7 @@ public class FileSplitterTest
testMeta.fileSplitter.beginWindow(1);
Assert.assertEquals("Blocks", 12, testMeta.blockMetadataSink.collectedTuples.size());
for (Object blockMetadata : testMeta.blockMetadataSink.collectedTuples) {
- BlockMetadata.FileBlockMetadata metadata = (BlockMetadata.FileBlockMetadata) blockMetadata;
+ BlockMetadata.FileBlockMetadata metadata = (BlockMetadata.FileBlockMetadata)blockMetadata;
Assert.assertTrue("path: " + metadata.getFilePath(), testMeta.filePaths.contains(metadata.getFilePath()));
}
}
@@ -271,7 +270,7 @@ public class FileSplitterTest
int noOfBlocks = 0;
for (int i = 0; i < 12; i++) {
File testFile = new File(testMeta.dataDirectory, "file" + i + ".txt");
- noOfBlocks += (int) Math.ceil(testFile.length() / (2 * 1.0));
+ noOfBlocks += (int)Math.ceil(testFile.length() / (2 * 1.0));
}
testMeta.fileSplitter.setBlockSize(2L);
@@ -297,7 +296,8 @@ public class FileSplitterTest
@Test
public void testIdempotencyWithBlocksThreshold() throws InterruptedException
{
- IdempotentStorageManager.FSIdempotentStorageManager fsIdempotentStorageManager = new IdempotentStorageManager.FSIdempotentStorageManager();
+ IdempotentStorageManager.FSIdempotentStorageManager fsIdempotentStorageManager = new IdempotentStorageManager
+ .FSIdempotentStorageManager();
testMeta.fileSplitter.setIdempotentStorageManager(fsIdempotentStorageManager);
testMeta.fileSplitter.setBlocksThreshold(10);
testMeta.fileSplitter.scanner.setScanIntervalMillis(500);
@@ -346,7 +346,8 @@ public class FileSplitterTest
@Ignore
public void testRecoveryOfPartialFile() throws InterruptedException
{
- IdempotentStorageManager.FSIdempotentStorageManager fsIdempotentStorageManager = new IdempotentStorageManager.FSIdempotentStorageManager();
+ IdempotentStorageManager.FSIdempotentStorageManager fsIdempotentStorageManager = new IdempotentStorageManager
+ .FSIdempotentStorageManager();
fsIdempotentStorageManager.setRecoveryPath(testMeta.dataDirectory + '/' + "recovery");
testMeta.fileSplitter.setIdempotentStorageManager(fsIdempotentStorageManager);
testMeta.fileSplitter.setBlockSize(2L);
@@ -395,8 +396,10 @@ public class FileSplitterTest
String file2 = testMeta.fileMetadataSink.collectedTuples.get(0).getFileName();
- Assert.assertTrue("Block file name 0", testMeta.blockMetadataSink.collectedTuples.get(0).getFilePath().endsWith(file1));
- Assert.assertTrue("Block file name 1", testMeta.blockMetadataSink.collectedTuples.get(1).getFilePath().endsWith(file2));
+ Assert.assertTrue("Block file name 0",
+ testMeta.blockMetadataSink.collectedTuples.get(0).getFilePath().endsWith(file1));
+ Assert.assertTrue("Block file name 1",
+ testMeta.blockMetadataSink.collectedTuples.get(1).getFilePath().endsWith(file2));
}
@Test
@@ -442,7 +445,7 @@ public class FileSplitterTest
testMeta.fileSplitter.endWindow();
Assert.assertEquals("File metadata count", 1, testMeta.fileMetadataSink.collectedTuples.size());
Assert.assertEquals("File metadata", new File(testMeta.dataDirectory + "/file1.txt").getAbsolutePath(),
- testMeta.fileMetadataSink.collectedTuples.get(0).getFilePath());
+ testMeta.fileMetadataSink.collectedTuples.get(0).getFilePath());
}
private static class MockScanner extends FileSplitter.TimeBasedDirectoryScanner
@@ -463,8 +466,7 @@ public class FileSplitterTest
LOG.debug("discovered {}", discoveredFiles.size());
testMeta.exchanger.exchange(discoveredFiles.size());
}
- }
- catch (InterruptedException e) {
+ } catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
[2/2] incubator-apex-malhar git commit: Merge branch 'MLHR-1910' into devel-3
Posted by ti...@apache.org.
Merge branch 'MLHR-1910' into devel-3
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/1bfd2d15
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/1bfd2d15
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/1bfd2d15
Branch: refs/heads/devel-3
Commit: 1bfd2d15a1cde24ea5a5fc96034c45df258dbb1d
Parents: 7803359 17d0576
Author: Timothy Farkas <ti...@datatorrent.com>
Authored: Tue Nov 17 11:46:03 2015 -0800
Committer: Timothy Farkas <ti...@datatorrent.com>
Committed: Tue Nov 17 11:46:03 2015 -0800
----------------------------------------------------------------------
.../lib/io/block/AbstractBlockReader.java | 90 +++++++++++---------
.../lib/io/block/AbstractFSBlockReader.java | 22 ++---
.../datatorrent/lib/io/block/BlockMetadata.java | 7 +-
.../datatorrent/lib/io/block/FSSliceReader.java | 5 +-
.../datatorrent/lib/io/block/ReaderContext.java | 40 ++++-----
.../datatorrent/lib/io/block/package-info.java | 2 +-
.../lib/io/block/AbstractBlockReaderTest.java | 76 +++++++++--------
.../lib/io/block/FSLineReaderTest.java | 49 ++++++-----
.../lib/io/block/FSSliceReaderTest.java | 30 +++----
.../lib/io/block/ReadAheadLineReaderTest.java | 2 +-
.../lib/io/fs/FileSplitterBaseTest.java | 26 ++++--
.../datatorrent/lib/io/fs/FileSplitterTest.java | 54 ++++++------
12 files changed, 215 insertions(+), 188 deletions(-)
----------------------------------------------------------------------