You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ti...@apache.org on 2015/11/17 21:06:05 UTC

[1/2] incubator-apex-malhar git commit: MLHR-1910 #resolve #comment fixed style violations and warnings in Block Reader and Splitter classes

Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/devel-3 78033594b -> 1bfd2d15a


MLHR-1910 #resolve #comment fixed style violations and warnings in Block Reader and Splitter classes


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/17d05763
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/17d05763
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/17d05763

Branch: refs/heads/devel-3
Commit: 17d05763ddd48ccd9c3b58d44fff153952fdb537
Parents: 7803359
Author: Chandni Singh <cs...@apache.org>
Authored: Mon Nov 16 23:15:27 2015 -0800
Committer: Chandni Singh <cs...@apache.org>
Committed: Mon Nov 16 23:45:01 2015 -0800

----------------------------------------------------------------------
 .../lib/io/block/AbstractBlockReader.java       | 90 +++++++++++---------
 .../lib/io/block/AbstractFSBlockReader.java     | 22 ++---
 .../datatorrent/lib/io/block/BlockMetadata.java |  7 +-
 .../datatorrent/lib/io/block/FSSliceReader.java |  5 +-
 .../datatorrent/lib/io/block/ReaderContext.java | 40 ++++-----
 .../datatorrent/lib/io/block/package-info.java  |  2 +-
 .../lib/io/block/AbstractBlockReaderTest.java   | 76 +++++++++--------
 .../lib/io/block/FSLineReaderTest.java          | 49 ++++++-----
 .../lib/io/block/FSSliceReaderTest.java         | 30 +++----
 .../lib/io/block/ReadAheadLineReaderTest.java   |  2 +-
 .../lib/io/fs/FileSplitterBaseTest.java         | 26 ++++--
 .../datatorrent/lib/io/fs/FileSplitterTest.java | 54 ++++++------
 12 files changed, 215 insertions(+), 188 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/17d05763/library/src/main/java/com/datatorrent/lib/io/block/AbstractBlockReader.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/block/AbstractBlockReader.java b/library/src/main/java/com/datatorrent/lib/io/block/AbstractBlockReader.java
index 1839e68..aada298 100644
--- a/library/src/main/java/com/datatorrent/lib/io/block/AbstractBlockReader.java
+++ b/library/src/main/java/com/datatorrent/lib/io/block/AbstractBlockReader.java
@@ -21,27 +21,37 @@ package com.datatorrent.lib.io.block;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.*;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 import javax.validation.constraints.NotNull;
 
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.commons.lang.mutable.MutableLong;
 import org.apache.hadoop.fs.PositionedReadable;
 
-import com.datatorrent.lib.counters.BasicCounters;
-
-import com.datatorrent.api.*;
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.DefaultPartition;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Partitioner;
+import com.datatorrent.api.Stats;
+import com.datatorrent.api.StatsListener;
 import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.counters.BasicCounters;
 
 /**
  * AbstractBlockReader processes a block of data from a stream.<br/>
@@ -54,7 +64,8 @@ import com.datatorrent.common.util.BaseOperator;
  *
  * <p/>
  * Properties that can be set on AbstractBlockReader:<br/>
- * {@link #collectStats}: the operator is dynamically partition-able which is influenced by the backlog and the port queue size. This property disables
+ * {@link #collectStats}: the operator is dynamically partition-able which is influenced by the backlog and the port
+ * queue size. This property disables
  * collecting stats and thus partitioning.<br/>
  * {@link #maxReaders}: Maximum number of readers when dynamic partitioning is on.<br/>
  * {@link #minReaders}: Minimum number of readers when dynamic partitioning is on.<br/>
@@ -71,8 +82,9 @@ import com.datatorrent.common.util.BaseOperator;
  */
 
 @StatsListener.DataQueueSize
-public abstract class AbstractBlockReader<R, B extends BlockMetadata, STREAM extends InputStream & PositionedReadable> extends BaseOperator implements
-  Partitioner<AbstractBlockReader<R, B, STREAM>>, StatsListener, Operator.IdleTimeHandler
+public abstract class AbstractBlockReader<R, B extends BlockMetadata, STREAM extends InputStream & PositionedReadable>
+    extends BaseOperator
+    implements Partitioner<AbstractBlockReader<R, B, STREAM>>, StatsListener, Operator.IdleTimeHandler
 {
   protected int operatorId;
   protected transient long windowId;
@@ -110,17 +122,17 @@ public abstract class AbstractBlockReader<R, B extends BlockMetadata, STREAM ext
    */
   protected long intervalMillis;
 
-  protected transient final StatsListener.Response response;
+  protected final transient StatsListener.Response response;
   protected transient int partitionCount;
-  protected transient final Map<Integer, Integer> backlogPerOperator;
+  protected final transient Map<Integer, Integer> backlogPerOperator;
   private transient long nextMillis;
 
   protected transient B lastProcessedBlock;
   protected transient long lastBlockOpenTime;
   protected transient boolean consecutiveBlock;
 
-  public final transient DefaultOutputPort<B> blocksMetadataOutput = new DefaultOutputPort<B>();
-  public final transient DefaultOutputPort<ReaderRecord<R>> messages = new DefaultOutputPort<ReaderRecord<R>>();
+  public final transient DefaultOutputPort<B> blocksMetadataOutput = new DefaultOutputPort<>();
+  public final transient DefaultOutputPort<ReaderRecord<R>> messages = new DefaultOutputPort<>();
 
   public final transient DefaultInputPort<B> blocksMetadataInput = new DefaultInputPort<B>()
   {
@@ -139,7 +151,7 @@ public abstract class AbstractBlockReader<R, B extends BlockMetadata, STREAM ext
     response = new StatsListener.Response();
     backlogPerOperator = Maps.newHashMap();
     partitionCount = 1;
-    counters = new BasicCounters<MutableLong>(MutableLong.class);
+    counters = new BasicCounters<>(MutableLong.class);
     collectStats = true;
     lastBlockOpenTime = -1;
   }
@@ -172,17 +184,14 @@ public abstract class AbstractBlockReader<R, B extends BlockMetadata, STREAM ext
       try {
         teardownStream(lastProcessedBlock);
         lastProcessedBlock = null;
-      }
-      catch (IOException e) {
+      } catch (IOException e) {
         throw new RuntimeException(e);
       }
-    }
-    else {
+    } else {
       /* nothing to do here, so sleep for a while to avoid busy loop */
       try {
         Thread.sleep(sleepTimeMillis);
-      }
-      catch (InterruptedException ie) {
+      } catch (InterruptedException ie) {
         throw new RuntimeException(ie);
       }
     }
@@ -199,13 +208,13 @@ public abstract class AbstractBlockReader<R, B extends BlockMetadata, STREAM ext
   {
     try {
       long blockStartTime = System.currentTimeMillis();
-      if (block.getPreviousBlockId() == -1 || lastProcessedBlock == null || block.getPreviousBlockId() != lastProcessedBlock.getBlockId()) {
+      if (block.getPreviousBlockId() == -1 || lastProcessedBlock == null
+          || block.getPreviousBlockId() != lastProcessedBlock.getBlockId()) {
         teardownStream(lastProcessedBlock);
         consecutiveBlock = false;
         lastBlockOpenTime = System.currentTimeMillis();
         stream = setupStream(block);
-      }
-      else {
+      } else {
         consecutiveBlock = true;
       }
       readBlock(block);
@@ -216,15 +225,13 @@ public abstract class AbstractBlockReader<R, B extends BlockMetadata, STREAM ext
         blocksMetadataOutput.emit(block);
       }
       blocksPerWindow++;
-    }
-    catch (IOException ie) {
+    } catch (IOException ie) {
       try {
         if (lastProcessedBlock != null) {
           teardownStream(lastProcessedBlock);
           lastProcessedBlock = null;
         }
-      }
-      catch (IOException ioe) {
+      } catch (IOException ioe) {
         throw new RuntimeException("closing last", ie);
       }
       throw new RuntimeException(ie);
@@ -250,7 +257,7 @@ public abstract class AbstractBlockReader<R, B extends BlockMetadata, STREAM ext
       //If the record is partial then ignore the record.
       if (record != null) {
         counters.getCounter(ReaderCounterKeys.RECORDS).increment();
-        messages.emit(new ReaderRecord<R>(blockMetadata.getBlockId(), record));
+        messages.emit(new ReaderRecord<>(blockMetadata.getBlockId(), record));
       }
     }
   }
@@ -261,7 +268,8 @@ public abstract class AbstractBlockReader<R, B extends BlockMetadata, STREAM ext
    */
   @SuppressWarnings("unchecked")
   @Override
-  public Collection<Partition<AbstractBlockReader<R, B, STREAM>>> definePartitions(Collection<Partition<AbstractBlockReader<R, B, STREAM>>> partitions, PartitioningContext context)
+  public Collection<Partition<AbstractBlockReader<R, B, STREAM>>> definePartitions(
+      Collection<Partition<AbstractBlockReader<R, B, STREAM>>> partitions, PartitioningContext context)
   {
     if (partitions.iterator().next().getStats() == null) {
       //First time when define partitions is called
@@ -271,7 +279,7 @@ public abstract class AbstractBlockReader<R, B extends BlockMetadata, STREAM ext
 
     //Create new partitions
     for (Partition<AbstractBlockReader<R, B, STREAM>> partition : partitions) {
-      newPartitions.add(new DefaultPartition<AbstractBlockReader<R, B, STREAM>>(partition.getPartitionedInstance()));
+      newPartitions.add(new DefaultPartition<>(partition.getPartitionedInstance()));
     }
     partitions.clear();
     int morePartitionsToCreate = partitionCount - newPartitions.size();
@@ -287,8 +295,7 @@ public abstract class AbstractBlockReader<R, B extends BlockMetadata, STREAM ext
         LOG.debug("partition removed {}", toRemove.getPartitionedInstance().operatorId);
         partitionIterator.remove();
       }
-    }
-    else {
+    } else {
       //Add more partitions
       Kryo kryo = new Kryo();
       while (morePartitionsToCreate-- > 0) {
@@ -301,7 +308,8 @@ public abstract class AbstractBlockReader<R, B extends BlockMetadata, STREAM ext
         @SuppressWarnings("unchecked")
         AbstractBlockReader<R, B, STREAM> blockReader = kryo.readObject(lInput, this.getClass());
 
-        DefaultPartition<AbstractBlockReader<R, B, STREAM>> partition = new DefaultPartition<AbstractBlockReader<R, B, STREAM>>(blockReader);
+        DefaultPartition<AbstractBlockReader<R, B, STREAM>> partition = new DefaultPartition<>(
+            blockReader);
         newPartitions.add(partition);
       }
     }
@@ -389,12 +397,10 @@ public abstract class AbstractBlockReader<R, B extends BlockMetadata, STREAM ext
     if (totalBacklog > maxReaders) {
       LOG.debug("large backlog {}", totalBacklog);
       newPartitionCount = maxReaders;
-    }
-    else if (totalBacklog < minReaders) {
+    } else if (totalBacklog < minReaders) {
       LOG.debug("small backlog {}", totalBacklog);
       newPartitionCount = minReaders;
-    }
-    else {
+    } else {
       newPartitionCount = getAdjustedCount(totalBacklog);
       LOG.debug("moderate backlog {}", totalBacklog);
     }
@@ -574,7 +580,7 @@ public abstract class AbstractBlockReader<R, B extends BlockMetadata, STREAM ext
 
   }
 
-  public static enum ReaderCounterKeys
+  public enum ReaderCounterKeys
   {
     RECORDS, BLOCKS, BYTES, TIME
   }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/17d05763/library/src/main/java/com/datatorrent/lib/io/block/AbstractFSBlockReader.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/block/AbstractFSBlockReader.java b/library/src/main/java/com/datatorrent/lib/io/block/AbstractFSBlockReader.java
index 006df45..d74c9c9 100644
--- a/library/src/main/java/com/datatorrent/lib/io/block/AbstractFSBlockReader.java
+++ b/library/src/main/java/com/datatorrent/lib/io/block/AbstractFSBlockReader.java
@@ -36,7 +36,8 @@ import com.datatorrent.api.StatsListener;
  * @since 2.1.0
  */
 @StatsListener.DataQueueSize
-public abstract class AbstractFSBlockReader<R> extends AbstractBlockReader<R, BlockMetadata.FileBlockMetadata, FSDataInputStream>
+public abstract class AbstractFSBlockReader<R>
+    extends AbstractBlockReader<R, BlockMetadata.FileBlockMetadata, FSDataInputStream>
 {
   protected transient FileSystem fs;
   protected transient Configuration configuration;
@@ -48,8 +49,7 @@ public abstract class AbstractFSBlockReader<R> extends AbstractBlockReader<R, Bl
     configuration = new Configuration();
     try {
       fs = getFSInstance();
-    }
-    catch (IOException e) {
+    } catch (IOException e) {
       throw new RuntimeException("creating fs", e);
     }
   }
@@ -60,8 +60,7 @@ public abstract class AbstractFSBlockReader<R> extends AbstractBlockReader<R, Bl
     super.teardown();
     try {
       fs.close();
-    }
-    catch (IOException e) {
+    } catch (IOException e) {
       throw new RuntimeException(e);
     }
   }
@@ -88,27 +87,28 @@ public abstract class AbstractFSBlockReader<R> extends AbstractBlockReader<R, Bl
    *
    * @param <R> type of records
    */
-  public static abstract class AbstractFSLineReader<R> extends AbstractFSBlockReader<R>
+  public abstract static class AbstractFSLineReader<R> extends AbstractFSBlockReader<R>
   {
 
     public AbstractFSLineReader()
     {
       super();
-      this.readerContext = new ReaderContext.LineReaderContext<FSDataInputStream>();
+      this.readerContext = new ReaderContext.LineReaderContext<>();
     }
   }
 
   /**
-   * An {@link AbstractFSBlockReader} which reads lines from the block using {@link ReaderContext.ReadAheadLineReaderContext}
+   * An {@link AbstractFSBlockReader} which reads lines from the block using
+   * {@link ReaderContext.ReadAheadLineReaderContext}
    *
-   * @param <R>
+   * @param <R> type of record.
    */
-  public static abstract class AbstractFSReadAheadLineReader<R> extends AbstractFSBlockReader<R>
+  public abstract static class AbstractFSReadAheadLineReader<R> extends AbstractFSBlockReader<R>
   {
     public AbstractFSReadAheadLineReader()
     {
       super();
-      this.readerContext = new ReaderContext.ReadAheadLineReaderContext<FSDataInputStream>();
+      this.readerContext = new ReaderContext.ReadAheadLineReaderContext<>();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/17d05763/library/src/main/java/com/datatorrent/lib/io/block/BlockMetadata.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/block/BlockMetadata.java b/library/src/main/java/com/datatorrent/lib/io/block/BlockMetadata.java
index 24e9623..534024d 100644
--- a/library/src/main/java/com/datatorrent/lib/io/block/BlockMetadata.java
+++ b/library/src/main/java/com/datatorrent/lib/io/block/BlockMetadata.java
@@ -54,7 +54,7 @@ public interface BlockMetadata
    */
   long getPreviousBlockId();
 
-  public abstract class AbstractBlockMetadata implements BlockMetadata
+  abstract class AbstractBlockMetadata implements BlockMetadata
   {
     private long offset;
     private long length;
@@ -198,7 +198,7 @@ public interface BlockMetadata
   /**
    * A block of file which contains file path adn other block properties.
    */
-  public static class FileBlockMetadata extends AbstractBlockMetadata
+  class FileBlockMetadata extends AbstractBlockMetadata
   {
     private final String filePath;
 
@@ -208,7 +208,8 @@ public interface BlockMetadata
       filePath = null;
     }
 
-    public FileBlockMetadata(String filePath, long blockId, long offset, long length, boolean isLastBlock, long previousBlockId)
+    public FileBlockMetadata(String filePath, long blockId, long offset, long length, boolean isLastBlock,
+        long previousBlockId)
     {
       super(blockId, offset, length, isLastBlock, previousBlockId);
       this.filePath = filePath;

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/17d05763/library/src/main/java/com/datatorrent/lib/io/block/FSSliceReader.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/block/FSSliceReader.java b/library/src/main/java/com/datatorrent/lib/io/block/FSSliceReader.java
index 948fc3e..ad55358 100644
--- a/library/src/main/java/com/datatorrent/lib/io/block/FSSliceReader.java
+++ b/library/src/main/java/com/datatorrent/lib/io/block/FSSliceReader.java
@@ -18,10 +18,7 @@
  */
 package com.datatorrent.lib.io.block;
 
-import org.apache.hadoop.fs.FSDataInputStream;
-
 import com.datatorrent.api.StatsListener;
-
 import com.datatorrent.netlet.util.Slice;
 
 /**
@@ -38,7 +35,7 @@ public class FSSliceReader extends AbstractFSBlockReader<Slice>
   public FSSliceReader()
   {
     super();
-    this.readerContext = new ReaderContext.FixedBytesReaderContext<FSDataInputStream>();
+    this.readerContext = new ReaderContext.FixedBytesReaderContext<>();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/17d05763/library/src/main/java/com/datatorrent/lib/io/block/ReaderContext.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/block/ReaderContext.java b/library/src/main/java/com/datatorrent/lib/io/block/ReaderContext.java
index b69b1ee..1ee6c53 100644
--- a/library/src/main/java/com/datatorrent/lib/io/block/ReaderContext.java
+++ b/library/src/main/java/com/datatorrent/lib/io/block/ReaderContext.java
@@ -22,16 +22,16 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.PositionedReadable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.PositionedReadable;
+
 /**
  * This controls how an {@link AbstractBlockReader} reads a {@link BlockMetadata}.
  *
  * @param <STREAM> type of stream
- *
  * @since 2.1.0
  */
 public interface ReaderContext<STREAM extends InputStream & PositionedReadable>
@@ -59,7 +59,7 @@ public interface ReaderContext<STREAM extends InputStream & PositionedReadable>
    * Represents the total bytes used to construct the record.<br/>
    * Used bytes can be different from the bytes in the record.
    */
-  public static class Entity
+  class Entity
   {
     private byte[] record;
     private long usedBytes;
@@ -97,9 +97,8 @@ public interface ReaderContext<STREAM extends InputStream & PositionedReadable>
    *
    * @param <STREAM> type of stream.
    */
-  public static abstract class AbstractReaderContext<STREAM extends InputStream & PositionedReadable> implements ReaderContext<STREAM>
+  abstract class AbstractReaderContext<STREAM extends InputStream & PositionedReadable> implements ReaderContext<STREAM>
   {
-
     protected transient long offset;
     protected transient STREAM stream;
     protected transient BlockMetadata blockMetadata;
@@ -143,8 +142,7 @@ public interface ReaderContext<STREAM extends InputStream & PositionedReadable>
    *
    * @param <STREAM> type of stream.
    */
-
-  public static class LineReaderContext<STREAM extends InputStream & PositionedReadable> extends AbstractReaderContext<STREAM>
+  class LineReaderContext<STREAM extends InputStream & PositionedReadable> extends AbstractReaderContext<STREAM>
   {
 
     protected int bufferSize;
@@ -178,8 +176,8 @@ public interface ReaderContext<STREAM extends InputStream & PositionedReadable>
     @Override
     protected Entity readEntity() throws IOException
     {
-      //Implemented a buffered reader instead of using java's BufferedReader because it was reading much ahead of block boundary
-      //and faced issues with duplicate records. Controlling the buffer size didn't help either.
+      //Implemented a buffered reader instead of using java's BufferedReader because it was reading much ahead of block
+      // boundary and faced issues with duplicate records. Controlling the buffer size didn't help either.
 
       boolean foundEOL = false;
       int bytesRead = 0;
@@ -200,8 +198,7 @@ public interface ReaderContext<STREAM extends InputStream & PositionedReadable>
           if (c != '\r' && c != '\n') {
             tmpBuilder.write(c);
             posInStr++;
-          }
-          else {
+          } else {
             foundEOL = true;
             break;
           }
@@ -216,14 +213,12 @@ public interface ReaderContext<STREAM extends InputStream & PositionedReadable>
             if (c == '\r' || c == '\n') {
               emptyBuilder.write(c);
               posInStr++;
-            }
-            else {
+            } else {
               break;
             }
           }
           usedBytes += emptyBuilder.toByteArray().length;
-        }
-        else {
+        } else {
           //read more bytes from the input stream
           posInStr = 0;
         }
@@ -266,14 +261,14 @@ public interface ReaderContext<STREAM extends InputStream & PositionedReadable>
   /**
    * Another reader context that splits the block into records on '\n' or '\r'.<br/>
    * This implementation doesn't need a way to validate the start of a record.<br/>
-   *
+   * <p/>
    * This  starts parsing the block (except the first block of the file) from the first eol character.
    * It is a less optimized version of an {@link LineReaderContext} which always reads beyond the block
    * boundary.
    *
    * @param <STREAM>
    */
-  public static class ReadAheadLineReaderContext<STREAM extends InputStream & PositionedReadable> extends LineReaderContext<STREAM>
+  class ReadAheadLineReaderContext<STREAM extends InputStream & PositionedReadable> extends LineReaderContext<STREAM>
   {
     @Override
     public void initialize(STREAM stream, BlockMetadata blockMetadata, boolean consecutiveBlock)
@@ -285,8 +280,7 @@ public interface ReaderContext<STREAM extends InputStream & PositionedReadable>
         try {
           Entity entity = readEntity();
           offset += entity.usedBytes;
-        }
-        catch (IOException e) {
+        } catch (IOException e) {
           throw new RuntimeException("when reading first entity", e);
         }
       }
@@ -310,7 +304,7 @@ public interface ReaderContext<STREAM extends InputStream & PositionedReadable>
    *
    * @param <STREAM> type of stream.
    */
-  public static class FixedBytesReaderContext<STREAM extends InputStream & PositionedReadable> extends AbstractReaderContext<STREAM>
+  class FixedBytesReaderContext<STREAM extends InputStream & PositionedReadable> extends AbstractReaderContext<STREAM>
   {
     //When this field is null, it is initialized to default fs block size in setup.
     protected Integer length;
@@ -319,7 +313,7 @@ public interface ReaderContext<STREAM extends InputStream & PositionedReadable>
     public void initialize(STREAM stream, BlockMetadata blockMetadata, boolean consecutiveBlock)
     {
       if (length == null) {
-        length = (int) new Configuration().getLong("fs.local.block.size", 32 * 1024 * 1024);
+        length = (int)new Configuration().getLong("fs.local.block.size", 32 * 1024 * 1024);
         LOG.debug("length init {}", length);
       }
       super.initialize(stream, blockMetadata, consecutiveBlock);
@@ -331,7 +325,7 @@ public interface ReaderContext<STREAM extends InputStream & PositionedReadable>
       entity.clear();
       int bytesToRead = length;
       if (offset + length >= blockMetadata.getLength()) {
-        bytesToRead = (int) (blockMetadata.getLength() - offset);
+        bytesToRead = (int)(blockMetadata.getLength() - offset);
       }
       byte[] record = new byte[bytesToRead];
       stream.readFully(offset, record, 0, bytesToRead);

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/17d05763/library/src/main/java/com/datatorrent/lib/io/block/package-info.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/block/package-info.java b/library/src/main/java/com/datatorrent/lib/io/block/package-info.java
index 14cfd47..bc89b6e 100644
--- a/library/src/main/java/com/datatorrent/lib/io/block/package-info.java
+++ b/library/src/main/java/com/datatorrent/lib/io/block/package-info.java
@@ -16,4 +16,4 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package com.datatorrent.lib.io.block;
\ No newline at end of file
+package com.datatorrent.lib.io.block;

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/17d05763/library/src/test/java/com/datatorrent/lib/io/block/AbstractBlockReaderTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/block/AbstractBlockReaderTest.java b/library/src/test/java/com/datatorrent/lib/io/block/AbstractBlockReaderTest.java
index 3610191..b2b99e6 100644
--- a/library/src/test/java/com/datatorrent/lib/io/block/AbstractBlockReaderTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/block/AbstractBlockReaderTest.java
@@ -21,21 +21,21 @@ package com.datatorrent.lib.io.block;
 import java.util.Collection;
 import java.util.List;
 
-import org.apache.commons.lang.mutable.MutableLong;
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.junit.Assert;
 import org.junit.Test;
 
+import org.apache.commons.lang.mutable.MutableLong;
+import org.apache.hadoop.fs.FSDataInputStream;
+
 import com.google.common.collect.Lists;
 
 import com.datatorrent.api.DefaultPartition;
 import com.datatorrent.api.Partitioner;
 import com.datatorrent.api.Stats;
 import com.datatorrent.api.StatsListener;
-
-import com.datatorrent.netlet.util.Slice;
 import com.datatorrent.lib.counters.BasicCounters;
 import com.datatorrent.lib.util.TestUtils;
+import com.datatorrent.netlet.util.Slice;
 
 /**
  * Stats and partitioning tests for {@link AbstractBlockReader}
@@ -105,19 +105,23 @@ public class AbstractBlockReaderTest
     Assert.assertTrue("partition needed", response.repartitionRequired);
     Assert.assertEquals("partition count changed", 8, sliceReader.getPartitionCount());
 
-    List<Partitioner.Partition<AbstractBlockReader<Slice,
-      BlockMetadata.FileBlockMetadata, FSDataInputStream>>> partitions = Lists.newArrayList();
+    List<Partitioner.Partition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>>>
+        partitions = Lists
+        .newArrayList();
 
-    DefaultPartition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>> apartition =
-      new DefaultPartition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>>(sliceReader);
+    DefaultPartition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>> apartition = new
+        DefaultPartition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>>(
+        sliceReader);
 
-    TestUtils.MockPartition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>> pseudoParttion =
-      new TestUtils.MockPartition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>>(apartition, readerStats);
+    TestUtils.MockPartition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>>
+        pseudoParttion = new TestUtils.MockPartition<>(
+        apartition, readerStats);
 
     partitions.add(pseudoParttion);
 
-    Collection<Partitioner.Partition<AbstractBlockReader<Slice,
-      BlockMetadata.FileBlockMetadata, FSDataInputStream>>> newPartitions = sliceReader.definePartitions(partitions, null);
+    Collection<Partitioner.Partition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>>>
+        newPartitions = sliceReader
+        .definePartitions(partitions, null);
     Assert.assertEquals(8, newPartitions.size());
   }
 
@@ -131,39 +135,43 @@ public class AbstractBlockReaderTest
     TestReader sliceReader = new TestReader();
     sliceReader.processStats(readerStats);
 
-    List<Partitioner.Partition<AbstractBlockReader<Slice,
-      BlockMetadata.FileBlockMetadata, FSDataInputStream>>> partitions = Lists.newArrayList();
+    List<Partitioner.Partition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>>>
+        partitions = Lists
+        .newArrayList();
 
-    DefaultPartition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>> apartition =
-      new DefaultPartition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>>(sliceReader);
+    DefaultPartition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>> apartition = new
+        DefaultPartition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>>(
+        sliceReader);
 
-    TestUtils.MockPartition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>> pseudoParttion =
-      new TestUtils.MockPartition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>>(apartition, readerStats);
+    TestUtils.MockPartition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>>
+        pseudoParttion = new TestUtils.MockPartition<>(apartition, readerStats);
 
     partitions.add(pseudoParttion);
 
-    Collection<Partitioner.Partition<AbstractBlockReader<Slice,
-      BlockMetadata.FileBlockMetadata, FSDataInputStream>>> newPartitions = sliceReader.definePartitions(partitions, null);
+    Collection<Partitioner.Partition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>>>
+        newPartitions = sliceReader.definePartitions(partitions, null);
+
+    List<Partitioner.Partition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>>>
+        newMocks = Lists.newArrayList();
 
-    List<Partitioner.Partition<AbstractBlockReader<Slice,
-      BlockMetadata.FileBlockMetadata, FSDataInputStream>>> newMocks = Lists.newArrayList();
+    for (Partitioner.Partition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>> partition
+        : newPartitions) {
 
-    for (Partitioner.Partition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>> partition :
-      newPartitions) {
-      partition.getPartitionedInstance().counters.setCounter(AbstractBlockReader.ReaderCounterKeys.BLOCKS, new MutableLong(1));
+      partition.getPartitionedInstance().counters
+          .setCounter(AbstractBlockReader.ReaderCounterKeys.BLOCKS, new MutableLong(1));
 
-      newMocks.add(
-        new TestUtils.MockPartition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>>(
-          (DefaultPartition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>>) partition,
-          readerStats)
-      );
+      newMocks.add(new TestUtils.MockPartition<>(
+          (DefaultPartition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>>)partition,
+          readerStats));
     }
     sliceReader.partitionCount = 1;
     newPartitions = sliceReader.definePartitions(newMocks, null);
     Assert.assertEquals(1, newPartitions.size());
 
-    AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream> last = newPartitions.iterator().next().getPartitionedInstance();
-    Assert.assertEquals("num blocks", 8, last.counters.getCounter(AbstractBlockReader.ReaderCounterKeys.BLOCKS).longValue());
+    AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream> last = newPartitions.iterator()
+        .next().getPartitionedInstance();
+    Assert.assertEquals("num blocks", 8,
+        last.counters.getCounter(AbstractBlockReader.ReaderCounterKeys.BLOCKS).longValue());
   }
 
   static class ReaderStats extends Stats.OperatorStats
@@ -171,7 +179,7 @@ public class AbstractBlockReaderTest
 
     ReaderStats(int backlog, long readBlocks, long bytes, long time)
     {
-      BasicCounters<MutableLong> bc = new BasicCounters<MutableLong>(MutableLong.class);
+      BasicCounters<MutableLong> bc = new BasicCounters<>(MutableLong.class);
       bc.setCounter(AbstractBlockReader.ReaderCounterKeys.BLOCKS, new MutableLong(readBlocks));
       bc.setCounter(AbstractBlockReader.ReaderCounterKeys.BYTES, new MutableLong(bytes));
       bc.setCounter(AbstractBlockReader.ReaderCounterKeys.TIME, new MutableLong(time));
@@ -191,4 +199,4 @@ public class AbstractBlockReaderTest
       return partitionCount;
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/17d05763/library/src/test/java/com/datatorrent/lib/io/block/FSLineReaderTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/block/FSLineReaderTest.java b/library/src/test/java/com/datatorrent/lib/io/block/FSLineReaderTest.java
index 32ac536..5ddc8a9 100644
--- a/library/src/test/java/com/datatorrent/lib/io/block/FSLineReaderTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/block/FSLineReaderTest.java
@@ -18,7 +18,11 @@
  */
 package com.datatorrent.lib.io.block;
 
-import java.io.*;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
 import java.util.Arrays;
 import java.util.List;
 import java.util.regex.Pattern;
@@ -36,7 +40,6 @@ import com.google.common.collect.Lists;
 import com.datatorrent.api.Attribute;
 import com.datatorrent.api.Context;
 import com.datatorrent.api.DAG;
-
 import com.datatorrent.lib.helper.OperatorContextTestHelper;
 import com.datatorrent.lib.testbench.CollectorTestSink;
 
@@ -74,10 +77,10 @@ public class FSLineReaderTest
 
       blockReader.setup(readerContext);
 
-      messageSink = new CollectorTestSink<Object>();
+      messageSink = new CollectorTestSink<>();
       blockReader.messages.setSink(messageSink);
 
-      blockMetadataSink = new CollectorTestSink<Object>();
+      blockMetadataSink = new CollectorTestSink<>();
       blockReader.blocksMetadataOutput.setSink(blockMetadataSink);
 
       BufferedReader reader;
@@ -88,11 +91,7 @@ public class FSLineReaderTest
           messages.add(line.split(","));
         }
         reader.close();
-      }
-      catch (FileNotFoundException e) {
-        throw new RuntimeException(e);
-      }
-      catch (IOException e) {
+      } catch (IOException e) {
         throw new RuntimeException(e);
       }
     }
@@ -111,8 +110,9 @@ public class FSLineReaderTest
   public void test()
   {
 
-    BlockMetadata.FileBlockMetadata block = new BlockMetadata.FileBlockMetadata(testMeta.dataFile.getAbsolutePath(), 0L, 0L, testMeta.dataFile.length(),
-      true, -1);
+    BlockMetadata.FileBlockMetadata block = new BlockMetadata.FileBlockMetadata(testMeta.dataFile.getAbsolutePath(), 0L,
+        0L, testMeta.dataFile.length(),
+        true, -1);
 
     testMeta.blockReader.beginWindow(1);
     testMeta.blockReader.blocksMetadataInput.process(block);
@@ -123,7 +123,7 @@ public class FSLineReaderTest
 
     for (int i = 0; i < messages.size(); i++) {
       @SuppressWarnings("unchecked")
-      AbstractBlockReader.ReaderRecord<String> msg = (AbstractBlockReader.ReaderRecord<String>) messages.get(i);
+      AbstractBlockReader.ReaderRecord<String> msg = (AbstractBlockReader.ReaderRecord<String>)messages.get(i);
       Assert.assertTrue("line " + i, Arrays.equals(msg.getRecord().split(","), testMeta.messages.get(i)));
     }
   }
@@ -132,13 +132,16 @@ public class FSLineReaderTest
   public void testMultipleBlocks()
   {
     long blockSize = 1000;
-    int noOfBlocks = (int) ((testMeta.dataFile.length() / blockSize) + (((testMeta.dataFile.length() % blockSize) == 0) ? 0 : 1));
+    int noOfBlocks = (int)((testMeta.dataFile.length() / blockSize) + (((testMeta.dataFile.length() % blockSize) == 0) ?
+        0 :
+        1));
 
     testMeta.blockReader.beginWindow(1);
 
     for (int i = 0; i < noOfBlocks; i++) {
-      BlockMetadata.FileBlockMetadata blockMetadata = new BlockMetadata.FileBlockMetadata(testMeta.dataFile.getAbsolutePath(), i, i * blockSize,
-        i == noOfBlocks - 1 ? testMeta.dataFile.length() : (i + 1) * blockSize, i == noOfBlocks - 1, i - 1);
+      BlockMetadata.FileBlockMetadata blockMetadata = new BlockMetadata.FileBlockMetadata(
+          testMeta.dataFile.getAbsolutePath(), i, i * blockSize,
+          i == noOfBlocks - 1 ? testMeta.dataFile.length() : (i + 1) * blockSize, i == noOfBlocks - 1, i - 1);
       testMeta.blockReader.blocksMetadataInput.process(blockMetadata);
     }
 
@@ -148,7 +151,7 @@ public class FSLineReaderTest
     Assert.assertEquals("No of records", testMeta.messages.size(), messages.size());
     for (int i = 0; i < messages.size(); i++) {
       @SuppressWarnings("unchecked")
-      AbstractBlockReader.ReaderRecord<String> msg = (AbstractBlockReader.ReaderRecord<String>) messages.get(i);
+      AbstractBlockReader.ReaderRecord<String> msg = (AbstractBlockReader.ReaderRecord<String>)messages.get(i);
       Assert.assertTrue("line " + i, Arrays.equals(msg.getRecord().split(","), testMeta.messages.get(i)));
     }
   }
@@ -157,13 +160,17 @@ public class FSLineReaderTest
   public void testNonConsecutiveBlocks()
   {
     long blockSize = 1000;
-    int noOfBlocks = (int) ((testMeta.dataFile.length() / blockSize) + (((testMeta.dataFile.length() % blockSize) == 0) ? 0 : 1));
+    int noOfBlocks = (int)((testMeta.dataFile.length() / blockSize) + (((testMeta.dataFile.length() % blockSize) == 0) ?
+        0 :
+        1));
 
     testMeta.blockReader.beginWindow(1);
 
     for (int i = 0; i < noOfBlocks; i++) {
-      BlockMetadata.FileBlockMetadata blockMetadata = new BlockMetadata.FileBlockMetadata(testMeta.dataFile.getAbsolutePath(), i,
-        i * blockSize, i == noOfBlocks - 1 ? testMeta.dataFile.length() : (i + 1) * blockSize, i == noOfBlocks - 1, -1);
+      BlockMetadata.FileBlockMetadata blockMetadata = new BlockMetadata.FileBlockMetadata(
+          testMeta.dataFile.getAbsolutePath(), i,
+          i * blockSize, i == noOfBlocks - 1 ? testMeta.dataFile.length() : (i + 1) * blockSize, i == noOfBlocks - 1,
+          -1);
       testMeta.blockReader.blocksMetadataInput.process(blockMetadata);
     }
 
@@ -173,7 +180,7 @@ public class FSLineReaderTest
     Assert.assertEquals("No of records", testMeta.messages.size(), messages.size());
     for (int i = 0; i < messages.size(); i++) {
       @SuppressWarnings("unchecked")
-      AbstractBlockReader.ReaderRecord<String> msg = (AbstractBlockReader.ReaderRecord<String>) messages.get(i);
+      AbstractBlockReader.ReaderRecord<String> msg = (AbstractBlockReader.ReaderRecord<String>)messages.get(i);
       Assert.assertTrue("line " + i, Arrays.equals(msg.getRecord().split(","), testMeta.messages.get(i)));
     }
   }
@@ -193,4 +200,4 @@ public class FSLineReaderTest
 
   @SuppressWarnings("unused")
   private static final Logger LOG = LoggerFactory.getLogger(FSLineReaderTest.class);
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/17d05763/library/src/test/java/com/datatorrent/lib/io/block/FSSliceReaderTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/block/FSSliceReaderTest.java b/library/src/test/java/com/datatorrent/lib/io/block/FSSliceReaderTest.java
index 23aa1af..37222a7 100644
--- a/library/src/test/java/com/datatorrent/lib/io/block/FSSliceReaderTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/block/FSSliceReaderTest.java
@@ -23,20 +23,20 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.List;
 
-import org.apache.commons.io.FileUtils;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestWatcher;
 import org.junit.runner.Description;
 
+import org.apache.commons.io.FileUtils;
+
 import com.datatorrent.api.Attribute;
 import com.datatorrent.api.Context;
 import com.datatorrent.api.DAG;
-
-import com.datatorrent.netlet.util.Slice;
 import com.datatorrent.lib.helper.OperatorContextTestHelper;
 import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.netlet.util.Slice;
 
 /**
  * Tests for {@link FSSliceReader}.
@@ -63,8 +63,7 @@ public class FSSliceReaderTest
       output = "target/" + description.getClassName() + "/" + description.getMethodName();
       try {
         FileUtils.forceMkdir(new File(output));
-      }
-      catch (IOException e) {
+      } catch (IOException e) {
         throw new RuntimeException(e);
       }
       dataFile = new File("src/test/resources/reader_test_data.csv");
@@ -77,10 +76,10 @@ public class FSSliceReaderTest
 
       blockReader.setup(readerContext);
 
-      messageSink = new CollectorTestSink<Object>();
+      messageSink = new CollectorTestSink<>();
       blockReader.messages.setSink(messageSink);
 
-      blockMetadataSink = new CollectorTestSink<Object>();
+      blockMetadataSink = new CollectorTestSink<>();
       blockReader.blocksMetadataOutput.setSink(blockMetadataSink);
     }
 
@@ -90,8 +89,7 @@ public class FSSliceReaderTest
       blockReader.teardown();
       try {
         FileUtils.forceDelete(new File("target/" + description.getClassName()));
-      }
-      catch (IOException e) {
+      } catch (IOException e) {
         throw new RuntimeException(e);
       }
     }
@@ -104,14 +102,16 @@ public class FSSliceReaderTest
   public void testBytesReceived() throws IOException
   {
     long blockSize = 1500;
-    int noOfBlocks = (int) ((testMeta.dataFile.length() / blockSize) + (((testMeta.dataFile.length() % blockSize) == 0) ? 0 : 1));
+    int noOfBlocks = (int)((testMeta.dataFile.length() / blockSize) + (((testMeta.dataFile.length() % blockSize) == 0) ?
+        0 : 1));
 
     testMeta.blockReader.beginWindow(1);
 
     for (int i = 0; i < noOfBlocks; i++) {
-      BlockMetadata.FileBlockMetadata blockMetadata = new BlockMetadata.FileBlockMetadata(testMeta.dataFile.getAbsolutePath(), i, i * blockSize,
-        i == noOfBlocks - 1 ? testMeta.dataFile.length() : (i + 1) * blockSize,
-        i == noOfBlocks - 1, i - 1);
+      BlockMetadata.FileBlockMetadata blockMetadata = new BlockMetadata.FileBlockMetadata(
+          testMeta.dataFile.getAbsolutePath(), i, i * blockSize,
+          i == noOfBlocks - 1 ? testMeta.dataFile.length() : (i + 1) * blockSize,
+          i == noOfBlocks - 1, i - 1);
       testMeta.blockReader.blocksMetadataInput.process(blockMetadata);
     }
 
@@ -125,7 +125,7 @@ public class FSSliceReaderTest
 
     for (Object message : messages) {
       @SuppressWarnings("unchecked")
-      AbstractBlockReader.ReaderRecord<Slice> msg = (AbstractBlockReader.ReaderRecord<Slice>) message;
+      AbstractBlockReader.ReaderRecord<Slice> msg = (AbstractBlockReader.ReaderRecord<Slice>)message;
       totatBytesReceived += msg.getRecord().length;
       outputStream.write(msg.getRecord().buffer);
     }
@@ -135,4 +135,4 @@ public class FSSliceReaderTest
     FileUtils.contentEquals(testMeta.dataFile, outputFile);
   }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/17d05763/library/src/test/java/com/datatorrent/lib/io/block/ReadAheadLineReaderTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/block/ReadAheadLineReaderTest.java b/library/src/test/java/com/datatorrent/lib/io/block/ReadAheadLineReaderTest.java
index 056e7b2..5e0aa22 100644
--- a/library/src/test/java/com/datatorrent/lib/io/block/ReadAheadLineReaderTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/block/ReadAheadLineReaderTest.java
@@ -34,4 +34,4 @@ public class ReadAheadLineReaderTest extends FSLineReaderTest
       return new String(bytes);
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/17d05763/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterBaseTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterBaseTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterBaseTest.java
index b47315b..9e110be 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterBaseTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterBaseTest.java
@@ -23,18 +23,29 @@ import java.io.IOException;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-
-import org.junit.*;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
 import org.junit.rules.TestWatcher;
 import org.junit.runner.Description;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.datatorrent.api.*;
-import com.datatorrent.api.annotation.ApplicationAnnotation;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
 
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.Stats;
+import com.datatorrent.api.StatsListener;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
 import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.lib.helper.OperatorContextTestHelper;
 import com.datatorrent.lib.io.block.BlockMetadata;
@@ -218,7 +229,8 @@ public class FileSplitterBaseTest
     int count;
 
     transient CountDownLatch latch = new CountDownLatch(1);
-    public final transient DefaultInputPort<FileSplitterInput.FileMetadata> fileMetadata = new DefaultInputPort<FileSplitterInput.FileMetadata>()
+    public final transient DefaultInputPort<FileSplitterInput.FileMetadata> fileMetadata = new
+        DefaultInputPort<FileSplitterInput.FileMetadata>()
     {
       @Override
       public void process(FileSplitterInput.FileMetadata fileMetadata)

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/17d05763/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterTest.java
index 09e3b9e..e62f643 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterTest.java
@@ -25,10 +25,6 @@ import java.util.Set;
 import java.util.concurrent.Exchanger;
 import java.util.concurrent.TimeoutException;
 
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.Path;
 import org.junit.Assert;
 import org.junit.ClassRule;
 import org.junit.Ignore;
@@ -39,11 +35,15 @@ import org.junit.runner.Description;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+
 import com.google.common.collect.Sets;
 
 import com.datatorrent.api.Attribute;
 import com.datatorrent.api.Context;
-
 import com.datatorrent.lib.helper.OperatorContextTestHelper;
 import com.datatorrent.lib.io.IdempotentStorageManager;
 import com.datatorrent.lib.io.block.BlockMetadata;
@@ -59,9 +59,9 @@ public class FileSplitterTest
     protected void finished(Description description)
     {
       try {
-        FileContext.getLocalFSFileContext().delete(new Path(new File("target/" + description.getClassName()).getAbsolutePath()), true);
-      }
-      catch (IOException e) {
+        FileContext.getLocalFSFileContext()
+            .delete(new Path(new File("target/" + description.getClassName()).getAbsolutePath()), true);
+      } catch (IOException e) {
         throw new RuntimeException(e);
       }
     }
@@ -77,7 +77,7 @@ public class FileSplitterTest
     Set<String> filePaths = Sets.newHashSet();
     Context.OperatorContext context;
 
-    Exchanger<Integer> exchanger = new Exchanger<Integer>();
+    Exchanger<Integer> exchanger = new Exchanger<>();
 
     @Override
     protected void starting(org.junit.runner.Description description)
@@ -100,8 +100,7 @@ public class FileSplitterTest
           filePaths.add(new Path(this.dataDirectory, created.getName()).toUri().toString());
           FileUtils.write(created, StringUtils.join(lines, '\n'));
         }
-      }
-      catch (IOException e) {
+      } catch (IOException e) {
         throw new RuntimeException(e);
       }
 
@@ -118,10 +117,10 @@ public class FileSplitterTest
       context = new OperatorContextTestHelper.TestIdOperatorContext(0, attributes);
       fileSplitter.setup(context);
 
-      fileMetadataSink = new CollectorTestSink<FileSplitter.FileMetadata>();
+      fileMetadataSink = new CollectorTestSink<>();
       TestUtils.setSink(fileSplitter.filesMetadataOutput, fileMetadataSink);
 
-      blockMetadataSink = new CollectorTestSink<BlockMetadata.FileBlockMetadata>();
+      blockMetadataSink = new CollectorTestSink<>();
       TestUtils.setSink(fileSplitter.blocksMetadataOutput, blockMetadataSink);
     }
 
@@ -148,7 +147,7 @@ public class FileSplitterTest
     testMeta.fileSplitter.endWindow();
     Assert.assertEquals("File metadata", 12, testMeta.fileMetadataSink.collectedTuples.size());
     for (Object fileMetadata : testMeta.fileMetadataSink.collectedTuples) {
-      FileSplitter.FileMetadata metadata = (FileSplitter.FileMetadata) fileMetadata;
+      FileSplitter.FileMetadata metadata = (FileSplitter.FileMetadata)fileMetadata;
       Assert.assertTrue("path: " + metadata.getFilePath(), testMeta.filePaths.contains(metadata.getFilePath()));
       Assert.assertNotNull("name: ", metadata.getFileName());
     }
@@ -165,7 +164,7 @@ public class FileSplitterTest
     testMeta.fileSplitter.emitTuples();
     Assert.assertEquals("Blocks", 12, testMeta.blockMetadataSink.collectedTuples.size());
     for (Object blockMetadata : testMeta.blockMetadataSink.collectedTuples) {
-      BlockMetadata.FileBlockMetadata metadata = (BlockMetadata.FileBlockMetadata) blockMetadata;
+      BlockMetadata.FileBlockMetadata metadata = (BlockMetadata.FileBlockMetadata)blockMetadata;
       Assert.assertTrue("path: " + metadata.getFilePath(), testMeta.filePaths.contains(metadata.getFilePath()));
     }
   }
@@ -184,7 +183,7 @@ public class FileSplitterTest
     for (int i = 0; i < 12; i++) {
       FileSplitter.FileMetadata fm = testMeta.fileMetadataSink.collectedTuples.get(i);
       File testFile = new File(testMeta.dataDirectory, fm.getFileName());
-      noOfBlocks += (int) Math.ceil(testFile.length() / (2 * 1.0));
+      noOfBlocks += (int)Math.ceil(testFile.length() / (2 * 1.0));
     }
     Assert.assertEquals("Blocks", noOfBlocks, testMeta.blockMetadataSink.collectedTuples.size());
   }
@@ -193,7 +192,7 @@ public class FileSplitterTest
   public void testIdempotency() throws InterruptedException
   {
     IdempotentStorageManager.FSIdempotentStorageManager fsIdempotentStorageManager =
-      new IdempotentStorageManager.FSIdempotentStorageManager();
+        new IdempotentStorageManager.FSIdempotentStorageManager();
     testMeta.fileSplitter.setIdempotentStorageManager(fsIdempotentStorageManager);
 
     testMeta.fileSplitter.setup(testMeta.context);
@@ -206,7 +205,7 @@ public class FileSplitterTest
     testMeta.fileSplitter.beginWindow(1);
     Assert.assertEquals("Blocks", 12, testMeta.blockMetadataSink.collectedTuples.size());
     for (Object blockMetadata : testMeta.blockMetadataSink.collectedTuples) {
-      BlockMetadata.FileBlockMetadata metadata = (BlockMetadata.FileBlockMetadata) blockMetadata;
+      BlockMetadata.FileBlockMetadata metadata = (BlockMetadata.FileBlockMetadata)blockMetadata;
       Assert.assertTrue("path: " + metadata.getFilePath(), testMeta.filePaths.contains(metadata.getFilePath()));
     }
   }
@@ -271,7 +270,7 @@ public class FileSplitterTest
     int noOfBlocks = 0;
     for (int i = 0; i < 12; i++) {
       File testFile = new File(testMeta.dataDirectory, "file" + i + ".txt");
-      noOfBlocks += (int) Math.ceil(testFile.length() / (2 * 1.0));
+      noOfBlocks += (int)Math.ceil(testFile.length() / (2 * 1.0));
     }
 
     testMeta.fileSplitter.setBlockSize(2L);
@@ -297,7 +296,8 @@ public class FileSplitterTest
   @Test
   public void testIdempotencyWithBlocksThreshold() throws InterruptedException
   {
-    IdempotentStorageManager.FSIdempotentStorageManager fsIdempotentStorageManager = new IdempotentStorageManager.FSIdempotentStorageManager();
+    IdempotentStorageManager.FSIdempotentStorageManager fsIdempotentStorageManager = new IdempotentStorageManager
+        .FSIdempotentStorageManager();
     testMeta.fileSplitter.setIdempotentStorageManager(fsIdempotentStorageManager);
     testMeta.fileSplitter.setBlocksThreshold(10);
     testMeta.fileSplitter.scanner.setScanIntervalMillis(500);
@@ -346,7 +346,8 @@ public class FileSplitterTest
   @Ignore
   public void testRecoveryOfPartialFile() throws InterruptedException
   {
-    IdempotentStorageManager.FSIdempotentStorageManager fsIdempotentStorageManager = new IdempotentStorageManager.FSIdempotentStorageManager();
+    IdempotentStorageManager.FSIdempotentStorageManager fsIdempotentStorageManager = new IdempotentStorageManager
+        .FSIdempotentStorageManager();
     fsIdempotentStorageManager.setRecoveryPath(testMeta.dataDirectory + '/' + "recovery");
     testMeta.fileSplitter.setIdempotentStorageManager(fsIdempotentStorageManager);
     testMeta.fileSplitter.setBlockSize(2L);
@@ -395,8 +396,10 @@ public class FileSplitterTest
 
     String file2 = testMeta.fileMetadataSink.collectedTuples.get(0).getFileName();
 
-    Assert.assertTrue("Block file name 0", testMeta.blockMetadataSink.collectedTuples.get(0).getFilePath().endsWith(file1));
-    Assert.assertTrue("Block file name 1", testMeta.blockMetadataSink.collectedTuples.get(1).getFilePath().endsWith(file2));
+    Assert.assertTrue("Block file name 0",
+        testMeta.blockMetadataSink.collectedTuples.get(0).getFilePath().endsWith(file1));
+    Assert.assertTrue("Block file name 1",
+        testMeta.blockMetadataSink.collectedTuples.get(1).getFilePath().endsWith(file2));
   }
 
   @Test
@@ -442,7 +445,7 @@ public class FileSplitterTest
     testMeta.fileSplitter.endWindow();
     Assert.assertEquals("File metadata count", 1, testMeta.fileMetadataSink.collectedTuples.size());
     Assert.assertEquals("File metadata", new File(testMeta.dataDirectory + "/file1.txt").getAbsolutePath(),
-      testMeta.fileMetadataSink.collectedTuples.get(0).getFilePath());
+        testMeta.fileMetadataSink.collectedTuples.get(0).getFilePath());
   }
 
   private static class MockScanner extends FileSplitter.TimeBasedDirectoryScanner
@@ -463,8 +466,7 @@ public class FileSplitterTest
           LOG.debug("discovered {}", discoveredFiles.size());
           testMeta.exchanger.exchange(discoveredFiles.size());
         }
-      }
-      catch (InterruptedException e) {
+      } catch (InterruptedException e) {
         throw new RuntimeException(e);
       }
     }


[2/2] incubator-apex-malhar git commit: Merge branch 'MLHR-1910' into devel-3

Posted by ti...@apache.org.
Merge branch 'MLHR-1910' into devel-3


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/1bfd2d15
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/1bfd2d15
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/1bfd2d15

Branch: refs/heads/devel-3
Commit: 1bfd2d15a1cde24ea5a5fc96034c45df258dbb1d
Parents: 7803359 17d0576
Author: Timothy Farkas <ti...@datatorrent.com>
Authored: Tue Nov 17 11:46:03 2015 -0800
Committer: Timothy Farkas <ti...@datatorrent.com>
Committed: Tue Nov 17 11:46:03 2015 -0800

----------------------------------------------------------------------
 .../lib/io/block/AbstractBlockReader.java       | 90 +++++++++++---------
 .../lib/io/block/AbstractFSBlockReader.java     | 22 ++---
 .../datatorrent/lib/io/block/BlockMetadata.java |  7 +-
 .../datatorrent/lib/io/block/FSSliceReader.java |  5 +-
 .../datatorrent/lib/io/block/ReaderContext.java | 40 ++++-----
 .../datatorrent/lib/io/block/package-info.java  |  2 +-
 .../lib/io/block/AbstractBlockReaderTest.java   | 76 +++++++++--------
 .../lib/io/block/FSLineReaderTest.java          | 49 ++++++-----
 .../lib/io/block/FSSliceReaderTest.java         | 30 +++----
 .../lib/io/block/ReadAheadLineReaderTest.java   |  2 +-
 .../lib/io/fs/FileSplitterBaseTest.java         | 26 ++++--
 .../datatorrent/lib/io/fs/FileSplitterTest.java | 54 ++++++------
 12 files changed, 215 insertions(+), 188 deletions(-)
----------------------------------------------------------------------