You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/03/18 06:54:19 UTC

[01/18] incubator-apex-malhar git commit: APEXMALHAR-2003 NPE in blockMetaDataIterator after recovery

Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/devel-3 fd2f42bd9 -> 5373a3cb6


APEXMALHAR-2003 NPE in blockMetaDataIterator after recovery


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/289dad74
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/289dad74
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/289dad74

Branch: refs/heads/devel-3
Commit: 289dad7426ade1779733cead614d52946154211f
Parents: d23e283
Author: Chandni Singh <cs...@apache.org>
Authored: Tue Feb 23 17:46:10 2016 -0800
Committer: Chandni Singh <cs...@apache.org>
Committed: Wed Feb 24 18:54:05 2016 -0800

----------------------------------------------------------------------
 .../lib/io/fs/AbstractFileSplitter.java         |  6 +--
 .../lib/io/fs/FileSplitterInputTest.java        | 44 ++++++++++++++++++++
 2 files changed, 47 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/289dad74/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java
index afd86cf..cd47d48 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java
@@ -285,7 +285,7 @@ public abstract class AbstractFileSplitter extends BaseOperator
     private long pos;
     private int blockNumber;
 
-    private final transient AbstractFileSplitter splitter;
+    private final AbstractFileSplitter splitter;
 
     protected BlockMetadataIterator()
     {
@@ -319,8 +319,8 @@ public abstract class AbstractFileSplitter extends BaseOperator
       }
       boolean isLast = length >= fileMetadata.getFileLength();
       long lengthOfFileInBlock = isLast ? fileMetadata.getFileLength() : length;
-      BlockMetadata.FileBlockMetadata fileBlock = splitter
-          .buildBlockMetadata(pos, lengthOfFileInBlock, blockNumber, fileMetadata, isLast);
+      BlockMetadata.FileBlockMetadata fileBlock = splitter.buildBlockMetadata(pos, lengthOfFileInBlock, blockNumber,
+          fileMetadata, isLast);
       pos = lengthOfFileInBlock;
       return fileBlock;
     }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/289dad74/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
index 2f605eb..cd0de2d 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
@@ -51,6 +51,7 @@ import com.datatorrent.lib.helper.OperatorContextTestHelper;
 import com.datatorrent.lib.io.IdempotentStorageManager;
 import com.datatorrent.lib.io.block.BlockMetadata;
 import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.KryoCloneUtils;
 import com.datatorrent.lib.util.TestUtils;
 
 /**
@@ -455,6 +456,49 @@ public class FileSplitterInputTest
         testMeta.fileMetadataSink.collectedTuples.get(0).getFilePath());
   }
 
+  @Test
+  public void testRecoveryOfBlockMetadataIterator() throws InterruptedException
+  {
+    IdempotentStorageManager.FSIdempotentStorageManager fsIdempotentStorageManager =
+        new IdempotentStorageManager.FSIdempotentStorageManager();
+
+    testMeta.fileSplitterInput.setIdempotentStorageManager(fsIdempotentStorageManager);
+    testMeta.fileSplitterInput.setBlockSize(2L);
+    testMeta.fileSplitterInput.setBlocksThreshold(2);
+    testMeta.fileSplitterInput.getScanner().setScanIntervalMillis(500);
+
+
+    testMeta.fileSplitterInput.setup(testMeta.context);
+
+    testMeta.fileSplitterInput.beginWindow(1);
+
+    ((MockScanner)testMeta.fileSplitterInput.getScanner()).semaphore.acquire();
+    testMeta.fileSplitterInput.emitTuples();
+    testMeta.fileSplitterInput.endWindow();
+
+    //file0.txt has just 5 blocks. Since blocks threshold is 2, only 2 are emitted.
+    Assert.assertEquals("Files", 1, testMeta.fileMetadataSink.collectedTuples.size());
+    Assert.assertEquals("Blocks", 2, testMeta.blockMetadataSink.collectedTuples.size());
+
+    testMeta.fileMetadataSink.clear();
+    testMeta.blockMetadataSink.clear();
+
+    //At this point the operator was check-pointed and then there was a failure.
+    testMeta.fileSplitterInput.teardown();
+
+    //The operator was restored from persisted state and re-deployed.
+    testMeta.fileSplitterInput = KryoCloneUtils.cloneObject(testMeta.fileSplitterInput);
+    TestUtils.setSink(testMeta.fileSplitterInput.blocksMetadataOutput, testMeta.blockMetadataSink);
+    TestUtils.setSink(testMeta.fileSplitterInput.filesMetadataOutput, testMeta.fileMetadataSink);
+
+    testMeta.fileSplitterInput.setup(testMeta.context);
+    testMeta.fileSplitterInput.beginWindow(1);
+
+    Assert.assertEquals("Recovered Files", 1, testMeta.fileMetadataSink.collectedTuples.size());
+    Assert.assertEquals("Recovered Blocks", 2, testMeta.blockMetadataSink.collectedTuples.size());
+  }
+
+
   private static class MockScanner extends FileSplitterInput.TimeBasedDirectoryScanner
   {
     transient Semaphore semaphore;


[17/18] incubator-apex-malhar git commit: Merge branch 'APEXMALHAR-2008-hdfs-input-module' of https://github.com/DT-Priyanka/incubator-apex-malhar

Posted by th...@apache.org.
Merge branch 'APEXMALHAR-2008-hdfs-input-module' of https://github.com/DT-Priyanka/incubator-apex-malhar


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/becee7f8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/becee7f8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/becee7f8

Branch: refs/heads/devel-3
Commit: becee7f82dbca5d975d92dc45ba5f771f8682ea8
Parents: 51a19e1 f9fe3d5
Author: ishark <is...@datatorrent.com>
Authored: Wed Mar 16 15:30:26 2016 -0700
Committer: ishark <is...@datatorrent.com>
Committed: Wed Mar 16 15:30:26 2016 -0700

----------------------------------------------------------------------
 .../datatorrent/lib/io/block/BlockMetadata.java |  31 ++-
 .../datatorrent/lib/io/block/BlockReader.java   |  66 +++++
 .../lib/io/fs/AbstractFileSplitter.java         |  45 +++-
 .../lib/io/fs/FileSplitterInput.java            |  81 +++++--
 .../datatorrent/lib/io/fs/HDFSFileSplitter.java | 120 +++++++++
 .../datatorrent/lib/io/fs/HDFSInputModule.java  | 243 +++++++++++++++++++
 .../lib/io/fs/FileSplitterInputTest.java        |   2 +-
 .../lib/io/fs/HDFSInputModuleAppTest.java       | 221 +++++++++++++++++
 8 files changed, 786 insertions(+), 23 deletions(-)
----------------------------------------------------------------------



[15/18] incubator-apex-malhar git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/incubator-apex-malhar

Posted by th...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/incubator-apex-malhar


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/51a19e1b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/51a19e1b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/51a19e1b

Branch: refs/heads/devel-3
Commit: 51a19e1bea5ebe630161b990fe3df75e3ac017c9
Parents: 23a1b10 da6bf54
Author: Pramod Immaneni <pr...@datatorrent.com>
Authored: Tue Mar 15 18:06:44 2016 -0700
Committer: Pramod Immaneni <pr...@datatorrent.com>
Committed: Tue Mar 15 18:06:44 2016 -0700

----------------------------------------------------------------------
 .../db/jdbc/JDBCDimensionalOutputOperator.java  | 464 +++++++++++++++++++
 1 file changed, 464 insertions(+)
----------------------------------------------------------------------



[06/18] incubator-apex-malhar git commit: APEXMALHAR-2004: Add file's modification time in referenceTimes map instead of parent's

Posted by th...@apache.org.
APEXMALHAR-2004: Add file's modification time in referenceTimes map
instead of parent's


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/327a3999
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/327a3999
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/327a3999

Branch: refs/heads/devel-3
Commit: 327a3999c6c443894bc1e085d91a1f030a848bdd
Parents: d3a7063
Author: Tushar R. Gosavi <tu...@apache.org>
Authored: Sat Feb 27 18:34:38 2016 +0530
Committer: Tushar R. Gosavi <tu...@apache.org>
Committed: Fri Mar 11 14:38:34 2016 +0530

----------------------------------------------------------------------
 .../lib/io/fs/FileSplitterInput.java            |  8 ++--
 .../lib/io/fs/FileSplitterInputTest.java        | 47 ++++++++++++++++++++
 2 files changed, 51 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/327a3999/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
index ab70047..234650d 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
@@ -434,17 +434,17 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
     }
 
     protected ScannedFileInfo createScannedFileInfo(Path parentPath, FileStatus parentStatus, Path childPath,
-        @SuppressWarnings("UnusedParameters") FileStatus childStatus, Path rootPath)
+        FileStatus childStatus, Path rootPath)
     {
       ScannedFileInfo info;
       if (rootPath == null) {
         info = parentStatus.isDirectory() ?
-          new ScannedFileInfo(parentPath.toUri().getPath(), childPath.getName(), parentStatus.getModificationTime()) :
-          new ScannedFileInfo(null, childPath.toUri().getPath(), parentStatus.getModificationTime());
+          new ScannedFileInfo(parentPath.toUri().getPath(), childPath.getName(), childStatus.getModificationTime()) :
+          new ScannedFileInfo(null, childPath.toUri().getPath(), childStatus.getModificationTime());
       } else {
         URI relativeChildURI = rootPath.toUri().relativize(childPath.toUri());
         info = new ScannedFileInfo(rootPath.toUri().getPath(), relativeChildURI.getPath(),
-            parentStatus.getModificationTime());
+          childStatus.getModificationTime());
       }
       return info;
     }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/327a3999/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
index cd0de2d..c5d2ae7 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
@@ -20,6 +20,7 @@ package com.datatorrent.lib.io.fs;
 
 import java.io.ByteArrayOutputStream;
 import java.io.File;
+import java.io.FileWriter;
 import java.io.IOException;
 import java.util.HashSet;
 import java.util.Set;
@@ -498,6 +499,52 @@ public class FileSplitterInputTest
     Assert.assertEquals("Recovered Blocks", 2, testMeta.blockMetadataSink.collectedTuples.size());
   }
 
+  @Test
+  public void testFileModificationTest() throws InterruptedException, IOException, TimeoutException
+  {
+    testMeta.fileSplitterInput.getScanner().setScanIntervalMillis(60 * 1000);
+    testFileMetadata();
+    testMeta.fileMetadataSink.clear();
+    testMeta.blockMetadataSink.clear();
+
+    Thread.sleep(1000);
+    //change a file , this will not change mtime of the file.
+    File f12 = new File(testMeta.dataDirectory, "file11" + ".txt");
+    HashSet<String> lines = Sets.newHashSet();
+    for (int line = 0; line < 2; line++) {
+      lines.add("f13" + "l" + line);
+    }
+    /* Need to use FileWriter, FileUtils changes the directory timestamp when
+       file is changed. */
+    FileWriter fout = new FileWriter(f12, true);
+    fout.write(StringUtils.join(lines, '\n').toCharArray());
+    fout.close();
+    testMeta.fileSplitterInput.getScanner().setTrigger(true);
+
+    //window 2
+    testMeta.fileSplitterInput.beginWindow(2);
+    testMeta.scanner.semaphore.acquire();
+    testMeta.fileSplitterInput.emitTuples();
+    testMeta.fileSplitterInput.endWindow();
+
+    Assert.assertEquals("window 2: files", 1, testMeta.fileMetadataSink.collectedTuples.size());
+    Assert.assertEquals("window 2: blocks", 1, testMeta.blockMetadataSink.collectedTuples.size());
+
+    //window 3
+    testMeta.fileMetadataSink.clear();
+    testMeta.blockMetadataSink.clear();
+    testMeta.scanner.setTrigger(true);
+    testMeta.scanner.semaphore.release();
+    testMeta.fileSplitterInput.beginWindow(3);
+    Thread.sleep(1000);
+    testMeta.scanner.semaphore.acquire();
+    testMeta.fileSplitterInput.emitTuples();
+    testMeta.fileSplitterInput.endWindow();
+
+    Assert.assertEquals("window 2: files", 0, testMeta.fileMetadataSink.collectedTuples.size());
+    Assert.assertEquals("window 2: blocks", 0, testMeta.blockMetadataSink.collectedTuples.size());
+
+  }
 
   private static class MockScanner extends FileSplitterInput.TimeBasedDirectoryScanner
   {


[07/18] incubator-apex-malhar git commit: Documentation for FileSplitter, BlockReader and FileOutput operators.

Posted by th...@apache.org.
Documentation for FileSplitter, BlockReader and FileOutput operators.


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/afbcfc21
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/afbcfc21
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/afbcfc21

Branch: refs/heads/devel-3
Commit: afbcfc21beb5c25735c7ef64adad302afbbc4ef3
Parents: 7b1a757
Author: Chandni Singh <ch...@datatorrent.com>
Authored: Mon Nov 9 18:48:45 2015 -0800
Committer: Thomas Weise <th...@datatorrent.com>
Committed: Fri Mar 11 19:22:33 2016 -0800

----------------------------------------------------------------------
 docs/operators/block_reader.md                  | 226 +++++++++++++++++++
 docs/operators/file_output.md                   | 180 +++++++++++++++
 docs/operators/file_splitter.md                 | 163 +++++++++++++
 .../images/blockreader/classdiagram.png         | Bin 0 -> 48613 bytes
 .../images/blockreader/flowdiagram.png          | Bin 0 -> 48160 bytes
 .../images/blockreader/fsreaderexample.png      | Bin 0 -> 29927 bytes
 .../blockreader/totalBacklogProcessing.png      | Bin 0 -> 55944 bytes
 .../images/fileoutput/FileRotation.png          | Bin 0 -> 26067 bytes
 docs/operators/images/fileoutput/diagram1.png   | Bin 0 -> 30754 bytes
 .../images/filesplitter/baseexample.png         | Bin 0 -> 14493 bytes
 .../images/filesplitter/classdiagram.png        | Bin 0 -> 14513 bytes
 .../images/filesplitter/inputexample.png        | Bin 0 -> 16012 bytes
 docs/operators/images/filesplitter/sequence.png | Bin 0 -> 17020 bytes
 13 files changed, 569 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/afbcfc21/docs/operators/block_reader.md
----------------------------------------------------------------------
diff --git a/docs/operators/block_reader.md b/docs/operators/block_reader.md
new file mode 100644
index 0000000..9b7628a
--- /dev/null
+++ b/docs/operators/block_reader.md
@@ -0,0 +1,226 @@
+Block Reader
+=============
+
+This is a scalable operator that reads and parses blocks of data sources into records. A data source can be a file or a message bus that contains records and a block defines a chunk of data in the source by specifying the block offset and the length of the source belonging to the block. 
+
+## Why is it needed?
+
+A Block Reader is needed to parallelize reading and parsing of a single data source, for example a file. Simple parallelism of reading data sources can be achieved by multiple partitions reading different sources of the same type (for files see [AbstractFileInputOperator](https://github.com/apache/incubator-apex-malhar/blob/devel-3/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java)) but Block Reader partitions can read blocks of the same source in parallel and parse them for records, ensuring that no record is duplicated or missed.
+
+## Class Diagram
+
+![BlockReader class diagram](images/blockreader/classdiagram.png)
+
+## AbstractBlockReader
+This is the abstract implementation that serves as the base for different types of data sources. It defines how a block metadata is processed. The flow diagram below describes the processing of a block metadata.
+
+![BlockReader flow diagram](images/blockreader/flowdiagram.png)
+
+### Ports
+
+- blocksMetadataInput: input port on which block metadata are received.
+
+- blocksMetadataOutput: output port on which block metadata are emitted if the port is connected. This port is useful when a downstream operator that receives records from block reader may also be interested to know the details of the corresponding blocks.
+
+- messages: output port on which tuples of type `com.datatorrent.lib.io.block.AbstractBlockReader.ReaderRecord` are emitted. This class encapsulates a `record` and the `blockId` of the corresponding block.
+
+### readerContext
+
+This is one of the most important fields in the block reader. It is of type `com.datatorrent.lib.io.block.ReaderContext` and is responsible for fetching bytes that make a record. It also lets the reader know how many total bytes were consumed which may not be equal to the total bytes in a record because consumed bytes also include bytes for the record delimiter which may not be a part of the actual record.
+ 
+Once the reader creates an input stream for the block (or uses the previous opened stream if the current block is successor of the previous block) it initializes the reader context by invoking `readerContext.initialize(stream, blockMetadata, consecutiveBlock);`. Initialize method is where any implementation of `ReaderContext` can perform all the operations which have to be executed just before reading the block or create states which are used during the lifetime of reading the block.
+
+Once the initialization is done, `readerContext.next()` is called repeatedly until it returns `null`. It is left to the `ReaderContext` implementations to decide when a block is completely processed. In cases when a record is split across adjacent blocks, reader context may decide to read ahead of the current block boundary to completely fetch the split record (examples- `LineReaderContext` and `ReadAheadLineReaderContext`). In other cases when there isn't a possibility of split record (example- `FixedBytesReaderContext`), it returns `null` immediately when the block boundary is reached. The return type of `readerContext.next()` is of type `com.datatorrent.lib.io.block.ReaderContext.Entity` which is just a wrapper for a `byte[]` that represents the record and total bytes used in fetching the record.
+
+### Abstract methods
+
+- `STREAM setupStream(B block)`: creating a stream for a block is dependent on the type of source which is not known to AbstractBlockReader. Sub-classes which deal with a specific data source provide this implementation.
+
+- `R convertToRecord(byte[] bytes)`<a name="convertToRecord"></a>: this converts the array of bytes into the actual instance of record type.
+
+### Auto-scalability
+
+Block reader can auto-scale, that is, depending on the backlog (total number of all the blocks which are waiting in the `blocksMetadataInput` port queue of all partitions) it can create more partitions or reduce them. Details are discussed in the last section which covers the [partitioner and stats-listener](#partitioning).
+
+### Configuration
+
+1.  <a name="maxReaders"></a>**maxReaders**: when auto-scaling is enabled, this controls the maximum number of block reader partitions that can be created.
+2. <a name="minReaders"></a>**minReaders**: when auto-scaling is enabled, this controls the minimum number of block reader partitions that should always exist.
+3. <a name="collectStats"></a>**collectStats**: this enables or disables auto-scaling. When it is set to `true` the stats (number of blocks in the queue) are collected and this triggers partitioning; otherwise auto-scaling is disabled.
+4. **intervalMillis**: when auto-scaling is enabled, this specifies the interval at which the reader will trigger the logic of computing the backlog and auto-scale.
+
+## <a name="AbstractFSBlockReader"></a> AbstractFSBlockReader
+
+This abstract implementation deals with files. Different types of file systems that are implementations of `org.apache.hadoop.fs.FileSystem` are supported. The user can override the `getFSInstance()` method to create an instance of a specific `FileSystem`. By default, the filesystem instance is created from the filesystem URI that comes from the default Hadoop configuration.
+
+```java
+protected FileSystem getFSInstance() throws IOException
+{
+  return FileSystem.newInstance(configuration);
+}
+```
+It uses this filesystem instance to setup a stream of type `org.apache.hadoop.fs.FSDataInputStream` to read the block.
+
+```java
+@Override
+protected FSDataInputStream setupStream(BlockMetadata.FileBlockMetadata block) throws IOException
+{
+  return fs.open(new Path(block.getFilePath()));
+}
+```
+All the ports and configurations are derived from the super class. It doesn't provide an implementation of [`convertToRecord(byte[] bytes)`](#convertToRecord) method which is delegated to concrete sub-classes.
+
+### Example Application
+This simple dag demonstrates how any concrete implementation of `AbstractFSBlockReader` can be plugged into an application. 
+
+![Application with FSBlockReader](images/blockreader/fsreaderexample.png)
+
+In the above application, file splitter creates block metadata for files which are sent to block reader. Partitions of the block reader parse the file blocks for records which are filtered, transformed and then persisted to a file (created per block). Therefore block reader is parallel partitioned with the 2 downstream operators - filter/converter and record output operator. The code which implements this dag is below.
+
+```java
+public class ExampleApplication implements StreamingApplication
+{
+  @Override
+  public void populateDAG(DAG dag, Configuration configuration)
+  {
+    FileSplitterInput input = dag.addOperator("File-splitter", new FileSplitterInput());
+    //any concrete implementation of AbstractFSBlockReader based on the use-case can be added here.
+    LineReader blockReader = dag.addOperator("Block-reader", new LineReader());
+    Filter filter = dag.addOperator("Filter", new Filter());
+    RecordOutputOperator recordOutputOperator = dag.addOperator("Record-writer", new RecordOutputOperator());
+
+    dag.addStream("file-block metadata", input.blocksMetadataOutput, blockReader.blocksMetadataInput);
+    dag.addStream("records", blockReader.messages, filter.input);
+    dag.addStream("filtered-records", filter.output, recordOutputOperator.input);
+  }
+
+  /**
+   * Concrete implementation of {@link AbstractFSBlockReader} for which a record is a line in the file.
+   */
+  public static class LineReader extends AbstractFSBlockReader.AbstractFSReadAheadLineReader<String>
+  {
+
+    @Override
+    protected String convertToRecord(byte[] bytes)
+    {
+      return new String(bytes);
+    }
+  }
+
+  /**
+   * Considers any line starting with a '.' as invalid. Emits the valid records.
+   */
+  public static class Filter extends BaseOperator
+  {
+    public final transient DefaultOutputPort<AbstractBlockReader.ReaderRecord<String>> output = new DefaultOutputPort<>();
+    public final transient DefaultInputPort<AbstractBlockReader.ReaderRecord<String>> input = new DefaultInputPort<AbstractBlockReader.ReaderRecord<String>>()
+    {
+      @Override
+      public void process(AbstractBlockReader.ReaderRecord<String> stringRecord)
+      {
+        //filter records and transform
+        //if the string starts with a '.' ignore the string.
+        if (!StringUtils.startsWith(stringRecord.getRecord(), ".")) {
+          output.emit(stringRecord);
+        }
+      }
+    };
+  }
+
+  /**
+   * Persists the valid records to corresponding block files.
+   */
+  public static class RecordOutputOperator extends AbstractFileOutputOperator<AbstractBlockReader.ReaderRecord<String>>
+  {
+    @Override
+    protected String getFileName(AbstractBlockReader.ReaderRecord<String> tuple)
+    {
+      return Long.toHexString(tuple.getBlockId());
+    }
+
+    @Override
+    protected byte[] getBytesForTuple(AbstractBlockReader.ReaderRecord<String> tuple)
+    {
+      return tuple.getRecord().getBytes();
+    }
+  }
+}
+```
+Configuration to parallel partition block reader with its downstream operators.
+
+```xml
+  <property>
+    <name>dt.operator.Filter.port.input.attr.PARTITION_PARALLEL</name>
+    <value>true</value>
+  </property>
+  <property>
+    <name>dt.operator.Record-writer.port.input.attr.PARTITION_PARALLEL</name>
+    <value>true</value>
+  </property>
+```
+
+## AbstractFSReadAheadLineReader
+
+This extension of [`AbstractFSBlockReader`](#AbstractFSBlockReader) parses lines from a block and binds the `readerContext` field to an instance of `ReaderContext.ReadAheadLineReaderContext`.
+
+It is abstract because it doesn't provide an implementation of [`convertToRecord(byte[] bytes)`](#convertToRecord) since the user may want to convert the bytes that make a line into some other type. 
+
+### ReadAheadLineReaderContext
+
+In order to handle a line split across adjacent blocks, ReadAheadLineReaderContext always reads beyond the block boundary and ignores the bytes till the first end-of-line character of all the blocks except the first block of the file. This ensures that no line is missed or incomplete.
+
+This is one of the most common ways of handling a split record. It doesn't require any further information to decide if a line is complete. However, the cost of this consistent way to handle a line split is that it always reads from the next block.
+
+## AbstractFSLineReader
+
+Like `AbstractFSReadAheadLineReader`, this reader also parses lines from a block. However, it binds the `readerContext` field to an instance of `ReaderContext.LineReaderContext`.
+
+### LineReaderContext
+
+This handles the line split differently from `ReadAheadLineReaderContext`. It doesn't always read from the next block. If the end of the last line is aligned with the block boundary then it stops processing the block. It does read from the next block when the boundaries are not aligned, that is, last line extends beyond the block boundary. The result of this is an inconsistency in reading the next block.
+
+When the boundary of the last line of the previous block was aligned with its block, then the first line of the current block is a valid line. However, in the other case the bytes from the block start offset to the first end-of-line character should be ignored. Therefore, this means that any record formed by this reader context has to be validated. For example, if the lines are of fixed size then size of each record can be validated or if each line begins with a special field then that knowledge can be used to check if a record is complete.
+
+If the validations of completeness fails for a line then [`convertToRecord(byte[] bytes)`](#convertToRecord) should return null.
+
+## FSSliceReader
+
+A concrete extension of [`AbstractFSBlockReader`](#AbstractFSBlockReader) that reads fixed-size `byte[]` from a block and emits the byte array wrapped in `com.datatorrent.netlet.util.Slice`.
+
+This operator binds the `readerContext` to an instance of `ReaderContext.FixedBytesReaderContext`.
+
+### FixedBytesReaderContext
+
+This implementation of `ReaderContext` never reads beyond a block boundary, which can result in the last `byte[]` of a block being shorter than the rest of the records.
+
+### Configuration
+
+**readerContext.length**: length of each record. By default, this is initialized to the default hdfs block size.
+
+## Partitioner and StatsListener
+
+The logical instance of the block reader acts as the Partitioner (unless a custom partitioner is set using the operator attribute - `PARTITIONER`) as well as a StatsListener. This is because the 
+`AbstractBlockReader` implements both the `com.datatorrent.api.Partitioner` and `com.datatorrent.api.StatsListener` interfaces and provides an implementation of `definePartitions(...)` and `processStats(...)` which make it auto-scalable.
+
+### processStats <a name="processStats"></a>
+
+The application master invokes `Response processStats(BatchedOperatorStats stats)` method on the logical instance with the stats (`tuplesProcessedPSMA`, `tuplesEmittedPSMA`, `latencyMA`, etc.) of each partition. The data which this operator is interested in is the `queueSize` of the input port `blocksMetadataInput`.
+
+Usually the `queueSize` of an input port gives the count of waiting control tuples plus data tuples. However, if a stats listener is interested only in the count of data tuples then that can be expressed by annotating the class with `@DataQueueSize`. In this case `AbstractBlockReader` itself is the `StatsListener` which is why it is annotated with `@DataQueueSize`.
+
+The logical instance caches the queue size per partition and at regular intervals (configured by `intervalMillis`) sums these values to find the total backlog which is then used to decide whether re-partitioning is needed. The flow-diagram below describes this logic.
+
+![Processing of total-backlog](images/blockreader/totalBacklogProcessing.png)
+
+The goal of this logic is to create as many partitions within bounds (see [`maxReaders`](#maxReaders) and [`minReaders`](#minReaders) above) to quickly reduce this backlog or if the backlog is small then remove any idle partitions.
+
+### definePartitions
+
+Based on the `repartitionRequired` field of the `Response` object which is returned by *[processStats](#processStats)* method, the application master invokes 
+
+```java
+Collection<Partition<AbstractBlockReader<...>>> definePartitions(Collection<Partition<AbstractBlockReader<...>>> partitions, PartitioningContext context)
+```
+on the logical instance which is also the partitioner instance. The implementation calculates the difference between required partitions and the existing count of partitions. If this difference is negative, then an equivalent number of partitions is removed; otherwise new partitions are created. 
+
+Please note auto-scaling can be disabled by setting [`collectStats`](#collectStats) to `false`. If the use-case requires only static partitioning, then that can be achieved by setting [`StatelessPartitioner`](https://github.com/chandnisingh/incubator-apex-core/blob/master/common/src/main/java/com/datatorrent/common/partitioner/StatelessPartitioner.java) as the operator attribute- `PARTITIONER` on the block reader.

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/afbcfc21/docs/operators/file_output.md
----------------------------------------------------------------------
diff --git a/docs/operators/file_output.md b/docs/operators/file_output.md
new file mode 100644
index 0000000..81f9482
--- /dev/null
+++ b/docs/operators/file_output.md
@@ -0,0 +1,180 @@
+AbstractFileOutputOperator
+===========================
+
+The abstract file output operator in Apache Apex Malhar library &mdash; [`AbstractFileOutputOperator`](https://github.com/apache/incubator-apex-malhar/blob/devel-3/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java) writes streaming data to files. The main features of this operator are:
+
+1. Persisting data to files.
+2. Automatic rotation of files based on:  
+  a. maximum length of a file.  
+  b. time-based rotation where time is specified using a count of application windows.
+3. Fault-tolerance.
+4. Compression and encryption of data before it is persisted.
+
+In this tutorial we will cover the details of the basic structure and implementation of all the above features in `AbstractFileOutputOperator`. Configuration items related to each feature are discussed as they are introduced in the section of that feature.
+
+## Persisting data to files
+The principal function of this operator is to persist tuples to files efficiently. These files are created under a specific directory on the file system. The relevant configuration item is:
+
+**filePath**: path specifying the directory where files are written.
+
+Different types of file system that are implementations of `org.apache.hadoop.fs.FileSystem` are supported. The file system instance which is used for creating streams is constructed from the `filePath` URI.
+
+```java
+FileSystem.newInstance(new Path(filePath).toUri(), new Configuration())
+```
+
+Tuples may belong to different files therefore expensive IO operations like creating multiple output streams, flushing of data to disk, and closing streams are handled carefully.
+
+### Ports
+- `input`: the input port on which tuples to be persisted are received.
+
+### `streamsCache`
+This transient state caches output streams per file in memory. The file to which the data is appended may change with incoming tuples. It will be highly inefficient to keep re-opening streams for a file just because tuples for that file are interleaved with tuples for another file. Therefore, the operator maintains a cache of limited size with open output streams.
+
+ `streamsCache` is of type `com.google.common.cache.LoadingCache`. A `LoadingCache` has an attached `CacheLoader` which is responsible to load value of a key when the key is not present in the cache. Details are explained here- [CachesExplained](https://github.com/google/guava/wiki/CachesExplained).
+
+The operator constructs this cache in `setup(...)`. It is built with the following configuration items:
+
+- **maxOpenFiles**: maximum size of the cache. The cache evicts entries that haven't been used recently when the cache size is approaching this limit. *Default*: 100
+- **expireStreamAfterAccessMillis**: expires streams after the specified duration has passed since the stream was last accessed. *Default*: value of attribute- `OperatorContext.SPIN_MILLIS`.
+
+An important point to note here is that the guava cache does not perform cleanup and evict values asynchronously, that is, instantly after a value expires. Instead, it performs small amounts of maintenance during write operations, or during occasional read operations if writes are rare.
+
+#### CacheLoader
+`streamsCache` is created with a `CacheLoader` that opens an `FSDataOutputStream` for a file which is not in the cache. The output stream is opened in either `append` or `create` mode and the basic logic to determine this is explained by the simple diagram below.
+
+![Opening an output stream](images/fileoutput/diagram1.png)
+
+This process gets complicated when fault-tolerance (writing to temporary files) and rotation are added.
+
+Following are few configuration items used for opening the streams:
+
+- **replication**: specifies the replication factor of the output files. *Default*: `fs.getDefaultReplication(new Path(filePath))`
+- **filePermission**: specifies the permission of the output files. The permission is an octal number similar to that used by the Unix chmod command. *Default*: 0777
+
+#### RemovalListener
+A `Guava` cache also allows specification of removal listener which can perform some operation when an entry is removed from the cache. Since `streamsCache` is of limited size and also has time-based expiry enabled, it is imperative that when a stream is evicted from the cache it is closed properly. Therefore, we attach a removal listener to `streamsCache` which closes the stream when it is evicted.
+
+### `setup(OperatorContext context)`
+During setup the following main tasks are performed:
+
+1. FileSystem instance is created.
+2. The cache of streams is created.
+3. Files are recovered (see Fault-tolerance section).
+4. Stray part files are cleaned (see Automatic rotation section).
+
+### <a name="processTuple"></a>`processTuple(INPUT tuple)`
+The code snippet below highlights the basic steps of processing a tuple.
+
+```java
+protected void processTuple(INPUT tuple)
+{  
+  //which file to write to is derived from the tuple.
+  String fileName = getFileName(tuple);  
+
+  //streamsCache is queried for the output stream. If the stream is already opened then it is returned immediately otherwise the cache loader creates one.
+  FilterOutputStream fsOutput = streamsCache.get(fileName).getFilterStream();
+
+  byte[] tupleBytes = getBytesForTuple(tuple);
+
+  fsOutput.write(tupleBytes);
+}
+```
+
+### <a name="endWindow"></a>endWindow()
+It should be noted that while processing a tuple we do not flush the stream after every write. Since flushing is expensive it is done periodically for all the open streams in the operator's `endWindow()`.
+
+```java
+Map<String, FSFilterStreamContext> openStreams = streamsCache.asMap();
+for (FSFilterStreamContext streamContext: openStreams.values()) {
+  ...
+  //this flushes the stream
+  streamContext.finalizeContext();
+  ...
+}
+```
+`FSFilterStreamContext` will be explained with compression and encryption.
+
+### <a name="teardown"></a>teardown()
+When any operator in a DAG fails then the application master invokes `teardown()` for that operator and its downstream operators. In `AbstractFileOutputOperator` we have a bunch of open streams in the cache and the operator (acting as HDFS client) holds leases for all the corresponding files. It is important to release these leases for clean re-deployment. Therefore, we try to close all the open streams in `teardown()`.
+
+## Automatic rotation
+
+In a streaming application where data is being continuously processed, when this output operator is used, data will be continuously written to an output file. The users may want to be able to take the data from time to time to use it, copy it out of Hadoop or do some other processing. Having all the data in a single file makes it difficult as the user needs to keep track of how much data has been read from the file each time so that the same data is not read again. Also users may already have processes and scripts in place that work with full files and not partial data from a file.
+
+To help solve these problems the operator supports creating many smaller files instead of writing to just one big file. Data is written to a file and when some condition is met the file is finalized and data is written to a new file. This is called file rotation. The user can determine when the file gets rotated. Each of these files is called a part file as it contains a portion of the data.
+
+### Part filename
+
+The filename for a part file is formed by using the original file name and the part number. The part number starts from 0 and is incremented each time a new part file is created. The default filename has the format, assuming origfile represents the original filename and partnum represents the part number,
+
+`origfile.partnum`
+
+This naming scheme can be changed by the user. It can be done so by overriding the following method
+
+```java
+protected String getPartFileName(String fileName, int part)
+```
+
+This method is passed the original filename and part number as arguments and should return the part filename.
+
+### Mechanisms
+
+The user has a couple of ways to specify when a file gets rotated. First is based on size and second on time. In the first case the files are limited by size and in the second they are rotated by time.
+
+#### Size Based
+
+With size based rotation the user specifies a size limit. Once the size of the current file reaches this limit the file is rotated. The size limit can be specified by setting the following property
+
+`maxLength`
+
+Like any other property this can be set in Java application code or in the property file.
+
+#### Time Based
+
+In time based rotation user specifies a time interval. This interval is specified as number of application windows. The files are rotated periodically once the specified number of application windows have elapsed. Since the interval is application window based it is not always exactly constant time. The interval can be specified using the following property
+
+`rotationWindows`
+
+### `setup(OperatorContext context)`
+
+When an operator is being started there may be stray part files and they need to be cleaned up. One common scenario, when these could be present, is in the case of failure, where a node running the operator failed and a previous instance of the operator was killed. This cleanup and other initial processing for the part files happens in the operator setup. The following diagram describes this process
+
+![Rotation setup](images/fileoutput/FileRotation.png)
+
+
+## Fault-tolerance
+There are two issues that should be addressed in order to make the operator fault-tolerant:
+
+1. The operator flushes data to the filesystem every application window. This implies that after a failure when the operator is re-deployed and tuples of a window are replayed, then duplicate data will be saved to the files. This is handled by recording how much the operator has written to each file every window in a state that is checkpointed and truncating files back to the recovery checkpoint after re-deployment.
+
+2. While writing to HDFS, if the operator gets killed and didn't have the opportunity to close a file, then later when it is redeployed it will attempt to truncate/restore that file. Restoring a file may fail because the lease that the previous process (operator instance before failure) had acquired from namenode to write to a file may still linger and therefore there can be exceptions in acquiring the lease again by the new process (operator instance after failure). This is handled by always writing data to temporary files and renaming these files to actual files when a file is finalized (closed) for writing, that is, we are sure that no more data will be written to it. The relevant configuration item is:  
+  - **alwaysWriteToTmp**: enables/disables writing to a temporary file. *Default*: true.
+
+Most of the complexity in the code comes from making this operator fault-tolerant.
+
+### Checkpointed states needed for fault-tolerance
+
+- `endOffsets`: contains the size of each file as it is being updated by the operator. It helps the operator to restore a file during recovery in operator `setup(...)` and is also used while loading a stream to find out if the operator has seen a file before.
+
+- `fileNameToTmpName`: contains the name of the temporary file per actual file. It is needed because the name of a temporary file is random. They are named based on the timestamp when the stream is created. During recovery the operator needs to know the temp file which it was writing to and if it needs restoration then it creates a new temp file and updates this mapping.
+
+- `finalizedFiles`: contains set of files which were requested to be finalized per window id.
+
+- `finalizedPart`: contains the latest `part` of each file which was requested to be finalized.
+
+The use of `finalizedFiles` and `finalizedPart` are explained in detail under [`requestFinalize(...)`](#requestFinalize) method.
+
+### Recovering files
+When the operator is re-deployed, it checks in its `setup(...)` method if the state of a file which it has seen before the failure is consistent with the file's state on the file system, that is, the size of the file on the file system should match the size in the `endOffsets`. When it doesn't the operator truncates the file.
+
+For example, let's say the operator wrote 100 bytes to test1.txt by the end of window 10. It wrote another 20 bytes by the end of window 12 but failed in window 13. When the operator gets re-deployed it is restored with window 10 (recovery checkpoint) state. In the previous run, by the end of window 10, the size of file on the filesystem was 100 bytes but now it is 120 bytes. Tuples for windows 11 and 12 are going to be replayed. Therefore, in order to avoid writing duplicates to test1.txt, the operator truncates the file to 100 bytes (size at the end of window 10) discarding the last 20 bytes.
+
+### <a name="requestFinalize"></a>`requestFinalize(String fileName)`
+When the operator is always writing to temporary files (in order to avoid HDFS Lease exceptions), then it is necessary to rename the temporary files to the actual files once it has been determined that the files are closed. This is referred to as *finalization* of files and the method allows the user code to specify when a file is ready for finalization.
+
+In this method, the requested file (or in the case of rotation &mdash; all the file parts including the latest open part which have not yet been requested for finalization) are registered for finalization. Registration is basically adding the file names to `finalizedFiles` state and updating `finalizedPart`.
+
+The process of *finalization* of all the files which were requested till the window *w* is deferred till window *w* is committed. This is because until a window is committed it can be replayed after a failure which means that a file can be open for writing even after it was requested for finalization.
+
+When rotation is enabled, part files are requested for finalization as and when they get completed. However, when rotation is not enabled, user code needs to invoke this method because this abstract operator has no way of knowing when a file is closed.

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/afbcfc21/docs/operators/file_splitter.md
----------------------------------------------------------------------
diff --git a/docs/operators/file_splitter.md b/docs/operators/file_splitter.md
new file mode 100644
index 0000000..777e7b6
--- /dev/null
+++ b/docs/operators/file_splitter.md
@@ -0,0 +1,163 @@
+File Splitter
+===================
+
+This is a simple operator whose main function is to split a file virtually and create metadata describing the files and the splits. 
+
+## Why is it needed?
+It is a common operation to read a file and parse it. This operation can be parallelized by having multiple partitions of such operators and each partition operating on different files. However, at times when a file is large then a single partition reading it can become a bottleneck.
+In these cases, throughput can be increased if instances of the partitioned operator can read and parse non-overlapping sets of file blocks. This is where file splitter comes in handy. It creates metadata of blocks of file which serves as tasks handed out to downstream operator partitions. 
+The downstream partitions can read/parse the block without the need of interacting with other partitions.
+
+## Class Diagram
+![FileSplitter class hierarchy](images/filesplitter/classdiagram.png)
+
+## AbstractFileSplitter
+The abstract implementation defines the logic of processing `FileInfo`. This comprises the following tasks -  
+
+- building `FileMetadata` per file and emitting it. This metadata contains the file information such as filepath, no. of blocks in it, length of the file, all the block ids, etc.
+  
+- creating `BlockMetadataIterator` from `FileMetadata`. The iterator lazy-loads the block metadata when needed. We use an iterator because the no. of blocks in a file can be huge if the block size is small and loading all of them at once in memory may cause out of memory errors.
+ 
+- retrieving `BlockMetadata.FileBlockMetadata` from the block metadata iterator and emitting it. The FileBlockMetadata contains the block id, start offset of the block, length of file in the block, etc. The number of block metadata emitted per window are controlled by `blocksThreshold` setting which by default is 1.  
+
+The main utility method that performs all the above tasks is the [`process()`](#process_method) method. Concrete implementations can invoke this method whenever they have data to process.
+
+### Ports
+Declares only output ports on which file metadata and block metadata are emitted.
+
+- filesMetadataOutput: metadata for each file is emitted on this port. 
+- blocksMetadataOutput: metadata for each block is emitted on this port. 
+
+### <a name="process_method"></a>`process()` method
+When process() is invoked, any pending blocks from the current file are emitted on the 'blocksMetadataOutput' port. If the threshold for blocks per window is still not met then a new input file is processed - corresponding metadata is emitted on 'filesMetadataOutput' and more of its blocks are emitted. This operation is repeated until the `blocksThreshold` is reached or there are no more new files.
+
+```java
+  protected void process()
+  {
+    if (blockMetadataIterator != null && blockCount < blocksThreshold) {
+      emitBlockMetadata();
+    }
+
+    FileInfo fileInfo;
+    while (blockCount < blocksThreshold && (fileInfo = getFileInfo()) != null) {
+      if (!processFileInfo(fileInfo)) {
+        break;
+      }
+    }
+  }
+```
+### Abstract methods
+
+- `FileInfo getFileInfo()`: called from within the `process()` and provides the next file to process.
+
+- `long getDefaultBlockSize()`: provides the block size which is used when user hasn't configured the size.
+
+- `FileStatus getFileStatus(Path path)`: provides the `org.apache.hadoop.fs.FileStatus` instance for a path.   
+
+### Configuration
+1. **blockSize**: size of a block.
+2. **blocksThreshold**<a name="blocksThreshold"></a>: threshold on the number of blocks emitted by file splitter every window. This setting is used for throttling the work for downstream operators.
+
+
+## FileSplitterBase
+Simple operator that receives tuples of type `FileInfo` on its `input` port. `FileInfo` contains the information (currently just the file path) about the file which this operator uses to create file metadata and block metadata.
+### Example application
+This is a simple sub-dag that demonstrates how FileSplitterBase can be plugged into an application.
+![Application with FileSplitterBase](images/filesplitter/baseexample.png)
+
+The upstream operator emits tuples of type `FileInfo` on its output port which is connected to splitter input port. The downstream receives tuples of type `BlockMetadata.FileBlockMetadata` from the splitter's block metadata output port.
+
+```java
+public class ApplicationWithBaseSplitter implements StreamingApplication
+{
+  @Override
+  public void populateDAG(DAG dag, Configuration configuration)
+  {
+    JMSInput input = dag.addOperator("Input", new JMSInput());
+    FileSplitterBase splitter = dag.addOperator("Splitter", new FileSplitterBase());
+    FSSliceReader blockReader = dag.addOperator("BlockReader", new FSSliceReader());
+    ...
+    dag.addStream("file-info", input.output, splitter.input);
+    dag.addStream("block-metadata", splitter.blocksMetadataOutput, blockReader.blocksMetadataInput);
+    ...
+  }
+
+  public static class JMSInput extends AbstractJMSInputOperator<AbstractFileSplitter.FileInfo>
+  {
+
+    public final transient DefaultOutputPort<AbstractFileSplitter.FileInfo> output = new DefaultOutputPort<>();
+
+    @Override
+    protected AbstractFileSplitter.FileInfo convert(Message message) throws JMSException
+    {
+      //assuming the message is a text message containing the absolute path of the file.
+      return new AbstractFileSplitter.FileInfo(null, ((TextMessage)message).getText());
+    }
+
+    @Override
+    protected void emit(AbstractFileSplitter.FileInfo payload)
+    {
+      output.emit(payload);
+    }
+  }
+}
+```
+
+### Ports
+Declares an input port on which it receives tuples from the upstream operator. Output ports are inherited from AbstractFileSplitter.
+ 
+- input: non optional port on which tuples of type `FileInfo` are received.
+
+### Configuration
+1. **file**: path of the file from which the filesystem is inferred. FileSplitter creates an instance of `org.apache.hadoop.fs.FileSystem` which is why this path is needed.  
+```
+FileSystem.newInstance(new Path(file).toUri(), new Configuration());
+```
+The fs instance is then used to fetch the default block size and `org.apache.hadoop.fs.FileStatus` for each file path.
+
+## FileSplitterInput
+This is an input operator that discovers files itself. The scanning of the directories for new files is asynchronous which is handled by `TimeBasedDirectoryScanner`. The function of TimeBasedDirectoryScanner is to periodically scan specified directories and find files which were newly added or modified. The interaction between the operator and the scanner is depicted in the diagram below.
+
+![Interaction between operator and scanner](images/filesplitter/sequence.png)
+
+### Example application
+This is a simple sub-dag that demonstrates how FileSplitterInput can be plugged into an application.
+
+![Application with FileSplitterInput](images/filesplitter/inputexample.png)
+
+Splitter is the input operator here that sends block metadata to the downstream BlockReader.
+
+```java
+  @Override
+  public void populateDAG(DAG dag, Configuration configuration)
+  {
+    FileSplitterInput input = dag.addOperator("Input", new FileSplitterInput());
+    FSSliceReader reader = dag.addOperator("Block Reader", new FSSliceReader());
+    ...
+    dag.addStream("block-metadata", input.blocksMetadataOutput, reader.blocksMetadataInput);
+    ...
+  }
+
+```
+### Ports
+Since it is an input operator there are no input ports and output ports are inherited from AbstractFileSplitter.
+
+### Configuration
+1. **scanner**: the component that scans directories asynchronously. It is of type `com.datatorrent.lib.io.fs.FileSplitter.TimeBasedDirectoryScanner`. The basic implementation of TimeBasedDirectoryScanner can be customized by users.  
+  
+  a. **files**: comma separated list of directories to scan.  
+  
+  b. **recursive**: flag that controls whether the directories should be scanned recursively.  
+ 
+  c. **scanIntervalMillis**: interval specified in milliseconds after which another scan iteration is triggered.  
+  
+  d. **filePatternRegularExp**: regular expression for accepted file names.  
+  
+  e. **trigger**: a flag that triggers a scan iteration instantly. If the scanner thread is idling then it will initiate a scan immediately otherwise if a scan is in progress, then the new iteration will be triggered immediately after the completion of current one.
+2. **idempotentStorageManager**: by default FileSplitterInput is idempotent. 
+Idempotency ensures that the operator will process the same set of files/blocks in a window if it has seen that window previously, i.e., before a failure. For example, let's say the operator completed window 10 and failed somewhere in window 11. If the operator gets restored at window 10 then it will process the same file/block again in window 10 which it did in the previous run before the failure. Idempotency is important but comes with higher cost because at the end of each window the operator needs to persist some state with respect to that window. Therefore, if one doesn't care about idempotency then they can set this property to be an instance of `com.datatorrent.lib.io.IdempotentStorageManager.NoopIdempotentStorageManager`.
+
+## Handling of split records
+Splitting of files to create tasks for downstream operator needs to be a simple operation that doesn't consume a lot of resources and is fast. This is why the file splitter doesn't open files to read. The downside of that is if the file contains records then a record may split across adjacent blocks. Handling of this is left to the downstream operator.
+
+We have created Block readers in Apex-malhar library that handle line splits efficiently. The 2 line readers- `AbstractFSLineReader` and `AbstractFSReadAheadLineReader` can be found here [AbstractFSBlockReader](https://github.com/apache/incubator-apex-malhar/blob/master/library/src/main/java/com/datatorrent/lib/io/block/AbstractFSBlockReader.java).

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/afbcfc21/docs/operators/images/blockreader/classdiagram.png
----------------------------------------------------------------------
diff --git a/docs/operators/images/blockreader/classdiagram.png b/docs/operators/images/blockreader/classdiagram.png
new file mode 100644
index 0000000..8fbd6fc
Binary files /dev/null and b/docs/operators/images/blockreader/classdiagram.png differ

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/afbcfc21/docs/operators/images/blockreader/flowdiagram.png
----------------------------------------------------------------------
diff --git a/docs/operators/images/blockreader/flowdiagram.png b/docs/operators/images/blockreader/flowdiagram.png
new file mode 100644
index 0000000..1b2897d
Binary files /dev/null and b/docs/operators/images/blockreader/flowdiagram.png differ

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/afbcfc21/docs/operators/images/blockreader/fsreaderexample.png
----------------------------------------------------------------------
diff --git a/docs/operators/images/blockreader/fsreaderexample.png b/docs/operators/images/blockreader/fsreaderexample.png
new file mode 100644
index 0000000..571b60a
Binary files /dev/null and b/docs/operators/images/blockreader/fsreaderexample.png differ

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/afbcfc21/docs/operators/images/blockreader/totalBacklogProcessing.png
----------------------------------------------------------------------
diff --git a/docs/operators/images/blockreader/totalBacklogProcessing.png b/docs/operators/images/blockreader/totalBacklogProcessing.png
new file mode 100644
index 0000000..2ed481f
Binary files /dev/null and b/docs/operators/images/blockreader/totalBacklogProcessing.png differ

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/afbcfc21/docs/operators/images/fileoutput/FileRotation.png
----------------------------------------------------------------------
diff --git a/docs/operators/images/fileoutput/FileRotation.png b/docs/operators/images/fileoutput/FileRotation.png
new file mode 100644
index 0000000..624c96e
Binary files /dev/null and b/docs/operators/images/fileoutput/FileRotation.png differ

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/afbcfc21/docs/operators/images/fileoutput/diagram1.png
----------------------------------------------------------------------
diff --git a/docs/operators/images/fileoutput/diagram1.png b/docs/operators/images/fileoutput/diagram1.png
new file mode 100644
index 0000000..0a260de
Binary files /dev/null and b/docs/operators/images/fileoutput/diagram1.png differ

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/afbcfc21/docs/operators/images/filesplitter/baseexample.png
----------------------------------------------------------------------
diff --git a/docs/operators/images/filesplitter/baseexample.png b/docs/operators/images/filesplitter/baseexample.png
new file mode 100644
index 0000000..6af2b44
Binary files /dev/null and b/docs/operators/images/filesplitter/baseexample.png differ

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/afbcfc21/docs/operators/images/filesplitter/classdiagram.png
----------------------------------------------------------------------
diff --git a/docs/operators/images/filesplitter/classdiagram.png b/docs/operators/images/filesplitter/classdiagram.png
new file mode 100644
index 0000000..6490368
Binary files /dev/null and b/docs/operators/images/filesplitter/classdiagram.png differ

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/afbcfc21/docs/operators/images/filesplitter/inputexample.png
----------------------------------------------------------------------
diff --git a/docs/operators/images/filesplitter/inputexample.png b/docs/operators/images/filesplitter/inputexample.png
new file mode 100644
index 0000000..65e199f
Binary files /dev/null and b/docs/operators/images/filesplitter/inputexample.png differ

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/afbcfc21/docs/operators/images/filesplitter/sequence.png
----------------------------------------------------------------------
diff --git a/docs/operators/images/filesplitter/sequence.png b/docs/operators/images/filesplitter/sequence.png
new file mode 100644
index 0000000..85cf702
Binary files /dev/null and b/docs/operators/images/filesplitter/sequence.png differ



[14/18] incubator-apex-malhar git commit: APEXCORE-382 #resolve Adding new documentation steps to avoid build artifacts on in site master branch

Posted by th...@apache.org.
APEXCORE-382 #resolve Adding new documentation steps to avoid build artifacts on in site master branch


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/23a1b100
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/23a1b100
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/23a1b100

Branch: refs/heads/devel-3
Commit: 23a1b100611da9bddce14d6b8ee6aa5c9c5b9102
Parents: 5cecce4
Author: sashadt <sa...@datatorrent.com>
Authored: Mon Mar 14 16:35:23 2016 -0700
Committer: sashadt <sa...@datatorrent.com>
Committed: Tue Mar 15 18:03:43 2016 -0700

----------------------------------------------------------------------
 README.md      |  2 ++
 docs/README.md | 45 +++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 47 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/23a1b100/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index e7e3000..c5b47d0 100644
--- a/README.md
+++ b/README.md
@@ -15,6 +15,8 @@ Documentation
 
 Please visit the [documentation section](http://apex.incubator.apache.org/docs.html).
 
+Documentation build and hosting process is explained in [docs README](docs/README.md).
+
 Discussion group
 --------------------
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/23a1b100/docs/README.md
----------------------------------------------------------------------
diff --git a/docs/README.md b/docs/README.md
new file mode 100644
index 0000000..3eea1ed
--- /dev/null
+++ b/docs/README.md
@@ -0,0 +1,45 @@
+# Apex Malhar Documentation
+
+Repository for Malhar docs available on http://apex.incubator.apache.org/docs.html
+
+Documentation is written in [Markdown](https://guides.github.com/features/mastering-markdown/) format and statically generated into HTML using [MkDocs](http://www.mkdocs.org/).  All documentation is located in the [docs](docs) directory, and [mkdocs.yml](mkdocs.yml) file describes the navigation structure of the published documentation.
+
+## Authoring
+
+New pages can be added under [docs](docs) or related sub-category, and a reference to the new page must be added to the [mkdocs.yml](mkdocs.yml) file to make it available in the navigation.  Embedded images are typically added to the images folder at the same level as the new page.
+
+When creating or editing pages, it can be useful to see the live results, and how the documents will appear when published.  Live preview feature is available by running the following command at the root of the repository:
+
+```bash
+mkdocs serve
+```
+
+For additional details see [writing your docs](http://www.mkdocs.org/user-guide/writing-your-docs/) guide.
+
+## Site Configuration
+
+Guides on applying site-wide [configuration](http://www.mkdocs.org/user-guide/configuration/) and [theming](http://www.mkdocs.org/user-guide/styling-your-docs/) are available on the MkDocs site.
+
+## Deployment
+
+Deployment is done in two steps.  First all documentation is statically generated into HTML files and then it is deployed to the apex website.  For more details on how conversion to HTML works see [MkDocs documentation](http://www.mkdocs.org/).
+
+1.  Go to release branch of the repository and execute the following command to build the docs:
+
+```bash
+# set project version
+APEX_MALHAR_VERSION=3.4
+
+# build docs under site folder
+mkdocs build --clean
+
+# copy docs from site into target folder on apex-site
+cd ../incubator-apex-site
+git checkout asf-site
+cp -r ../incubator-apex-malhar/site docs/malhar-${APEX_MALHAR_VERSION}
+git add -A
+git commit -m "Adding apex-${APEX_MALHAR_VERSION} documentation"
+git push
+```
+
+2.  Go to [apex-site repository](https://github.com/apache/incubator-apex-site#contributing) and add the new link to the [docs.md](https://github.com/apache/incubator-apex-site/blob/master/src/md/docs.md) and follow committer steps to commit and push these changes, and deploy the site.


[05/18] incubator-apex-malhar git commit: Updating .gitignore and pom.xml to support mkdocs and markdown

Posted by th...@apache.org.
Updating .gitignore and pom.xml to support mkdocs and markdown


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/7b1a757b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/7b1a757b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/7b1a757b

Branch: refs/heads/devel-3
Commit: 7b1a757be433465f7b354614581501f590a8a6f6
Parents: d3a7063
Author: sashadt <sa...@datatorrent.com>
Authored: Wed Mar 9 23:53:32 2016 -0800
Committer: sashadt <sa...@datatorrent.com>
Committed: Wed Mar 9 23:53:32 2016 -0800

----------------------------------------------------------------------
 .gitignore | 1 +
 pom.xml    | 3 +++
 2 files changed, 4 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7b1a757b/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index b6ecf7e..541471a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -10,3 +10,4 @@ target/
 npm-debug.log
 nb-configuration.xml
 hadoop.log
+site/
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7b1a757b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 0adeb20..249a8da 100644
--- a/pom.xml
+++ b/pom.xml
@@ -67,6 +67,8 @@
             <exclude>**/src/main/resources/com/datatorrent/apps/logstream/**</exclude>
             <exclude>src/main/c/zmq_push/Makefile</exclude>
             <exclude>src/test/resources/com/datatorrent/contrib/romesyndication/*.rss</exclude>
+            <exclude>**/*.md</exclude>
+            <exclude>**/*.yml</exclude>
           </excludes>
           <mapping combine.children="append">
             <R>SCRIPT_STYLE</R>
@@ -87,6 +89,7 @@
             <exclude>src/main/resources/**/*.txt</exclude>
             <exclude>**/*.json</exclude>
             <exclude>**/*.md</exclude>
+            <exclude>**/*.yml</exclude>
           </excludes>
         </configuration>
       </plugin>


[13/18] incubator-apex-malhar git commit: Merge branch 'bright-APEXMALHAR-1920' into APEXMALHAR-1920

Posted by th...@apache.org.
Merge branch 'bright-APEXMALHAR-1920' into APEXMALHAR-1920


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/da6bf541
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/da6bf541
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/da6bf541

Branch: refs/heads/devel-3
Commit: da6bf541c493aa8251723a04643a57d24b4ab28a
Parents: 5cecce4 6937c20
Author: Timothy Farkas <ti...@datatorrent.com>
Authored: Tue Mar 15 01:59:57 2016 -0700
Committer: Timothy Farkas <ti...@datatorrent.com>
Committed: Tue Mar 15 01:59:57 2016 -0700

----------------------------------------------------------------------
 .../db/jdbc/JDBCDimensionalOutputOperator.java  | 464 +++++++++++++++++++
 1 file changed, 464 insertions(+)
----------------------------------------------------------------------



[09/18] incubator-apex-malhar git commit: Fix error in configuration and add alternative forms

Posted by th...@apache.org.
Fix error in configuration and add alternative forms


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/69bf67bb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/69bf67bb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/69bf67bb

Branch: refs/heads/devel-3
Commit: 69bf67bbf25f352a4cbe5fab011769afa2654341
Parents: 8c53862
Author: Munagala V. Ramanath <ra...@datatorrent.com>
Authored: Tue Mar 1 16:29:30 2016 -0800
Committer: Thomas Weise <th...@datatorrent.com>
Committed: Fri Mar 11 19:22:48 2016 -0800

----------------------------------------------------------------------
 docs/operators/kafkaInputOperator.md | 34 ++++++++++++++++++++++++++++---
 1 file changed, 31 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/69bf67bb/docs/operators/kafkaInputOperator.md
----------------------------------------------------------------------
diff --git a/docs/operators/kafkaInputOperator.md b/docs/operators/kafkaInputOperator.md
index 1d2258e..793e255 100644
--- a/docs/operators/kafkaInputOperator.md
+++ b/docs/operators/kafkaInputOperator.md
@@ -266,17 +266,45 @@ public void populateDAG(DAG dag, Configuration entries)
 }
 }
 ```
-Below is the configuration for “test” Kafka topic name and
-“localhost:2181” is the zookeeper forum:
+Below is the configuration for using the earliest offset, “test” as the topic name and
+“localhost:2181” as the zookeeper forum:
 
 ```xml
 <property>
+  <name>dt.operator.MessageReader.prop.initialOffset</name>
+  <value>earliest</value>
+</property>
+
+<property>
 <name>dt.operator.MessageReader.prop.topic</name>
 <value>test</value>
 </property>
 
 <property>
-<name>dt.operator.KafkaInputOperator.prop.zookeeper</nam>
+<name>dt.operator.MessageReader.prop.zookeeper</nam>
 <value>localhost:2181</value>
 </property>
 ```
+
+Please note that `MessageReader` is the string passed as the first argument to the
+`addOperator()` call. The above stanza sets these parameters for this operator
+regardless of the application it resides in; if you want to set them on a
+per-application basis, you can use this instead (where `KafkaApp` is the name of
+the application):
+
+```xml
+<property>
+  <name>dt.application.KafkaApp.operator.MessageReader.prop.initialOffset</name>
+  <value>earliest</value>
+</property>
+
+<property>
+  <name>dt.application.KafkaApp.operator.MessageReader.prop.topic</name>
+  <value>test-topic</value>
+</property>
+
+<property>
+  <name>dt.application.KafkaApp.operator.MessageReader.prop.zookeeper</name>
+  <value>node21:2181</value>
+</property>
+```


[11/18] incubator-apex-malhar git commit: Splitting out Apex Malhar docs

Posted by th...@apache.org.
Splitting out Apex Malhar docs


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/e89f57e3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/e89f57e3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/e89f57e3

Branch: refs/heads/devel-3
Commit: e89f57e3773167d5cbff298e6f4e07ec5c692e9d
Parents: 69bf67b
Author: sashadt <sa...@datatorrent.com>
Authored: Wed Mar 9 20:08:55 2016 -0800
Committer: Thomas Weise <th...@datatorrent.com>
Committed: Fri Mar 11 19:22:48 2016 -0800

----------------------------------------------------------------------
 docs/images/malhar-operators.png     | Bin 0 -> 109734 bytes
 docs/operators/kafkaInputOperator.md |  34 +++---------------------------
 2 files changed, 3 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/e89f57e3/docs/images/malhar-operators.png
----------------------------------------------------------------------
diff --git a/docs/images/malhar-operators.png b/docs/images/malhar-operators.png
new file mode 100644
index 0000000..ac09622
Binary files /dev/null and b/docs/images/malhar-operators.png differ

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/e89f57e3/docs/operators/kafkaInputOperator.md
----------------------------------------------------------------------
diff --git a/docs/operators/kafkaInputOperator.md b/docs/operators/kafkaInputOperator.md
index 793e255..1d2258e 100644
--- a/docs/operators/kafkaInputOperator.md
+++ b/docs/operators/kafkaInputOperator.md
@@ -266,45 +266,17 @@ public void populateDAG(DAG dag, Configuration entries)
 }
 }
 ```
-Below is the configuration for using the earliest offset, “test” as the topic name and
-“localhost:2181” as the zookeeper forum:
+Below is the configuration for “test” Kafka topic name and
+“localhost:2181” is the zookeeper forum:
 
 ```xml
 <property>
-  <name>dt.operator.MessageReader.prop.initialOffset</name>
-  <value>earliest</value>
-</property>
-
-<property>
 <name>dt.operator.MessageReader.prop.topic</name>
 <value>test</value>
 </property>
 
 <property>
-<name>dt.operator.MessageReader.prop.zookeeper</nam>
+<name>dt.operator.KafkaInputOperator.prop.zookeeper</nam>
 <value>localhost:2181</value>
 </property>
 ```
-
-Please note that `MessageReader` is the string passed as the first argument to the
-`addOperator()` call. The above stanza sets these parameters for this operator
-regardless of the application it resides in; if you want to set them on a
-per-application basis, you can use this instead (where `KafkaApp` is the name of
-the application):
-
-```xml
-<property>
-  <name>dt.application.KafkaApp.operator.MessageReader.prop.initialOffset</name>
-  <value>earliest</value>
-</property>
-
-<property>
-  <name>dt.application.KafkaApp.operator.MessageReader.prop.topic</name>
-  <value>test-topic</value>
-</property>
-
-<property>
-  <name>dt.application.KafkaApp.operator.MessageReader.prop.zookeeper</name>
-  <value>node21:2181</value>
-</property>
-```


[12/18] incubator-apex-malhar git commit: APEXMALHAR-1920 #resolve #comment Add JDBC dimension output operator to Malhar

Posted by th...@apache.org.
APEXMALHAR-1920 #resolve #comment Add JDBC dimension output operator to Malhar


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/6937c20b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/6937c20b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/6937c20b

Branch: refs/heads/devel-3
Commit: 6937c20b100f57583ecc9fbcefc2fc77a1076fae
Parents: 5cecce4
Author: Timothy Farkas <ti...@datatorrent.com>
Authored: Sun Feb 21 20:10:54 2016 +0530
Committer: Timothy Farkas <ti...@datatorrent.com>
Committed: Tue Mar 15 01:59:01 2016 -0700

----------------------------------------------------------------------
 .../db/jdbc/JDBCDimensionalOutputOperator.java  | 464 +++++++++++++++++++
 1 file changed, 464 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/6937c20b/library/src/main/java/com/datatorrent/lib/db/jdbc/JDBCDimensionalOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JDBCDimensionalOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JDBCDimensionalOutputOperator.java
new file mode 100644
index 0000000..3021521
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JDBCDimensionalOutputOperator.java
@@ -0,0 +1,464 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.db.jdbc;
+
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+import java.util.List;
+import java.util.Map;
+
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import com.datatorrent.api.Context;
+
+import com.datatorrent.lib.appdata.gpo.GPOMutable;
+import com.datatorrent.lib.appdata.schemas.DimensionalConfigurationSchema;
+import com.datatorrent.lib.appdata.schemas.FieldsDescriptor;
+import com.datatorrent.lib.appdata.schemas.Type;
+import com.datatorrent.lib.db.AbstractPassThruTransactionableStoreOutputOperator;
+import com.datatorrent.lib.dimensions.DimensionsDescriptor;
+import com.datatorrent.lib.dimensions.DimensionsEvent.Aggregate;
+import com.datatorrent.lib.dimensions.DimensionsEvent.EventKey;
+import com.datatorrent.lib.dimensions.aggregator.AggregatorRegistry;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+
+/**
+ * This operator writes updates emitted by a {@link DimensionsStoreHDHT}
+ * operator to a MySQL database. Updates are written to the database in the
+ * following fashion: <br/>
+ * <br/>
+ * <ol>
+ * <li>Aggregates are received from an upstream
+ * {@link AbstractDimensionsComputationFlexibleSingleSchema} operator.</li>
+ * <li>Each aggregate is written to a different table based on its dimension
+ * combination, time bucket, and corresponding aggregation.</li>
+ * <li>Rows are written with <code>INSERT ... ON DUPLICATE KEY UPDATE</code>, so an
+ * aggregate whose key already exists overwrites the stored aggregate columns.</li>
+ * </ol>
+ * Tuples are buffered and sent to the database as JDBC batches: a batch is
+ * flushed as soon as <code>batchSize</code> tuples are pending, and whatever
+ * remains is flushed in {@link #endWindow()}.
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public class JDBCDimensionalOutputOperator
+    extends AbstractPassThruTransactionableStoreOutputOperator<Aggregate, JdbcTransactionalStore>
+{
+  protected static int DEFAULT_BATCH_SIZE = 1000;
+
+  /** Number of pending tuples that triggers a flush to the database. */
+  @Min(1)
+  private int batchSize;
+  /** Tuples received in the current window; cleared in {@link #endWindow()}. */
+  private final List<Aggregate> tuples;
+
+  /** Index into {@link #tuples} of the first tuple that has not been flushed yet. */
+  private transient int batchStartIdx;
+
+  /** Dimensions descriptor id -> aggregator name -> output table name. */
+  @NotNull
+  private Map<Integer, Map<String, String>> tableNames;
+  /** JSON used to build the {@link DimensionalConfigurationSchema} in {@link #setup}. */
+  @NotNull
+  private String eventSchema;
+  @NotNull
+  private AggregatorRegistry aggregatorRegistry = AggregatorRegistry.DEFAULT_AGGREGATOR_REGISTRY;
+  /** Built from {@link #eventSchema} and {@link #aggregatorRegistry} in {@link #setup}. */
+  private DimensionalConfigurationSchema schema;
+
+  /** Dimensions descriptor id -> aggregator id -> insert statement, populated in {@link #setup}. */
+  private transient Map<Integer, Map<Integer, PreparedStatement>> ddIDToAggIDToStatement = Maps.newHashMap();
+
+  public JDBCDimensionalOutputOperator()
+  {
+    tuples = Lists.newArrayList();
+    batchSize = DEFAULT_BATCH_SIZE;
+    batchStartIdx = 0;
+    store = new JdbcTransactionalStore();
+  }
+
+  /**
+   * Builds the schema from the configured event schema JSON and prepares one
+   * insert statement per (dimensions descriptor id, aggregator) pair listed in
+   * the configured table names.
+   *
+   * @param context
+   *          The operator context, passed through to the parent class.
+   * @throws NullPointerException
+   *           if no table names are configured for a dimensions descriptor id
+   *           of the schema, or if a configured aggregator name is not a
+   *           registered incremental aggregator.
+   * @throws RuntimeException
+   *           wrapping the {@link SQLException} if a statement cannot be
+   *           prepared.
+   */
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    super.setup(context);
+
+    LOG.info("Done setting up super");
+    aggregatorRegistry.setup();
+
+    //Create prepared statements
+    schema = new DimensionalConfigurationSchema(eventSchema, aggregatorRegistry);
+
+    List<FieldsDescriptor> keyFDs = schema.getDimensionsDescriptorIDToKeyDescriptor();
+
+    for (int ddID = 0; ddID < keyFDs.size(); ddID++) {
+
+      LOG.info("ddID {}", ddID);
+      FieldsDescriptor keyFD = keyFDs.get(ddID);
+      Int2ObjectMap<FieldsDescriptor> aggIDToAggFD = schema
+          .getDimensionsDescriptorIDToAggregatorIDToOutputAggregatorDescriptor().get(ddID);
+
+      Map<Integer, PreparedStatement> aggIDToStatement = ddIDToAggIDToStatement.get(ddID);
+
+      if (aggIDToStatement == null) {
+        aggIDToStatement = Maps.newHashMap();
+        ddIDToAggIDToStatement.put(ddID, aggIDToStatement);
+      }
+
+      // Fail with a descriptive message instead of a bare NPE on incomplete configuration.
+      Map<String, String> aggNameToTable = Preconditions.checkNotNull(tableNames.get(ddID),
+          "No table names configured for dimensions descriptor id %s", ddID);
+
+      for (Map.Entry<String, String> aggTable : aggNameToTable.entrySet()) {
+        String aggName = aggTable.getKey();
+        Integer registeredAggID = aggregatorRegistry.getIncrementalAggregatorNameToID().get(aggName);
+        int aggID = Preconditions.checkNotNull(registeredAggID,
+            "Unknown incremental aggregator %s", aggName);
+
+        LOG.info("aggID {}", aggID);
+        FieldsDescriptor aggFD = aggIDToAggFD.get(aggID);
+
+        // Remove the time bucket from a copy: the list returned by getFieldList() may be
+        // the descriptor's own list, and setParams() already skips the time bucket by name.
+        List<String> keyNames = Lists.newArrayList(keyFD.getFieldList());
+        keyNames.remove(DimensionsDescriptor.DIMENSION_TIME_BUCKET);
+
+        LOG.info("List fields {}", keyNames);
+        List<String> aggregateNames = aggFD.getFieldList();
+        LOG.info("List fields {}", aggregateNames);
+        String tableName = aggTable.getValue();
+
+        String statementString = buildStatement(tableName, keyNames, aggregateNames);
+
+        try {
+          aggIDToStatement.put(aggID, store.getConnection().prepareStatement(statementString));
+        } catch (SQLException ex) {
+          throw new RuntimeException(ex);
+        }
+      }
+    }
+  }
+
+  /**
+   * Builds the statement
+   * <code>INSERT INTO table (keys,aggregates) VALUES (?,...) ON DUPLICATE KEY UPDATE agg=VALUES(agg),...</code>.
+   *
+   * @param tableName
+   *          The table to insert into.
+   * @param keyNames
+   *          The key column names; may be empty.
+   * @param aggregateNames
+   *          The aggregate column names; must not be empty because the update
+   *          clause needs at least one assignment.
+   * @return The SQL text with one placeholder per key and aggregate column.
+   * @throws IllegalArgumentException
+   *           if aggregateNames is empty.
+   */
+  private String buildStatement(String tableName, List<String> keyNames, List<String> aggregateNames)
+  {
+    LOG.info("building statement");
+    Preconditions.checkArgument(!aggregateNames.isEmpty(),
+        "At least one aggregate column is required for table %s", tableName);
+
+    // Joining keys and aggregates as one list avoids a dangling comma when there are no keys.
+    List<String> columnNames = Lists.newArrayListWithCapacity(keyNames.size() + aggregateNames.size());
+    columnNames.addAll(keyNames);
+    columnNames.addAll(aggregateNames);
+
+    StringBuilder sb = new StringBuilder();
+    sb.append("INSERT INTO ");
+    sb.append(tableName);
+    sb.append(" (");
+    addList(sb, columnNames);
+    sb.append(") VALUES (");
+
+    for (int qCounter = 0; qCounter < columnNames.size(); qCounter++) {
+      if (qCounter > 0) {
+        sb.append(",");
+      }
+      sb.append("?");
+    }
+
+    sb.append(") ON DUPLICATE KEY UPDATE ");
+
+    addOnDuplicate(sb, aggregateNames);
+
+    return sb.toString();
+  }
+
+  /**
+   * Appends a comma separated list of <code>name=VALUES(name)</code>
+   * assignments, one per given column name. Appends nothing for an empty list.
+   *
+   * @param sb
+   *          The builder to append to.
+   * @param names
+   *          The column names to assign.
+   */
+  private void addOnDuplicate(StringBuilder sb, List<String> names)
+  {
+    LOG.info("add Duplicate");
+    String separator = "";
+
+    for (String name : names) {
+      sb.append(separator);
+      sb.append(name);
+      sb.append("=");
+      sb.append("VALUES(");
+      sb.append(name);
+      sb.append(")");
+      separator = ",";
+    }
+  }
+
+  /**
+   * Appends the given names separated by commas. Appends nothing for an empty
+   * list.
+   *
+   * @param sb
+   *          The builder to append to.
+   * @param names
+   *          The names to append.
+   */
+  private void addList(StringBuilder sb, List<String> names)
+  {
+    String separator = "";
+
+    for (String name : names) {
+      sb.append(separator);
+      sb.append(name);
+      separator = ",";
+    }
+  }
+
+  /**
+   * This sets the table names that correspond to the dimensions combinations
+   * specified in your {@link DimensionalConfigurationSchema}. The structure of
+   * this property is as follows: <br/>
+   * <br/>
+   * <ol>
+   * <li>The first key is the dimension combination id assigned to a dimension
+   * combination in your {@link DimensionalConfigurationSchema}. <br/>
+   * <br/>
+   * The dimensions descriptor id is determined by the following factors:
+   * <ul>
+   * <li>The dimensions combination specified in the
+   * {@link DimensionalConfigurationSchema}.</li>
+   * <li>The Time Buckets defined in your
+   * {@link DimensionalConfigurationSchema}.</li>
+   * </ul>
+   * The dimensions descriptor id is computed in the following way:
+   * <ol>
+   * <li>The first dimensions descriptor id is 0</li>
+   * <li>A dimension combination is selected</li>
+   * <li>A time bucket is selected</li>
+   * <li>The current dimension combination and time bucket pair is assigned a
+   * dimensions descriptor id</li>
+   * <li>The current dimensions descriptor id is incremented</li>
+   * <li>Steps 3 - 5 are repeated until all the time buckets are done</li>
+   * <li>Steps 2 - 6 are repeated until all the dimension combinations are done.
+   * </li>
+   * </ol>
+   * </li>
+   * <li>The second key is the name of an aggregation being performed for that
+   * dimensions combination.</li>
+   * <li>The value is the name of the output MySQL table</li>
+   * </ol>
+   *
+   * @param tableNames
+   *          The table names that correspond to the dimensions combinations
+   *          specified in your {@link DimensionalConfigurationSchema}.
+   * @throws NullPointerException
+   *           if tableNames is null.
+   */
+  public void setTableNames(Map<Integer, Map<String, String>> tableNames)
+  {
+    this.tableNames = Preconditions.checkNotNull(tableNames);
+  }
+
+  /**
+   * Sets the JSON corresponding to the {@link DimensionalConfigurationSchema}
+   * which was set on the upstream {@link AppDataSingleSchemaDimensionStoreHDHT}
+   * and {@link AbstractDimensionsComputationFlexibleSingleSchema} operators.
+   *
+   * @param eventSchema
+   *          The JSON corresponding to the
+   *          {@link DimensionalConfigurationSchema} which was set on the
+   *          upstream {@link AppDataSingleSchemaDimensionStoreHDHT} and
+   *          {@link AbstractDimensionsComputationFlexibleSingleSchema}
+   *          operators.
+   */
+  public void setEventSchema(String eventSchema)
+  {
+    this.eventSchema = eventSchema;
+  }
+
+  /**
+   * Sets the {@link AggregatorRegistry} that is used to determine what
+   * aggregators correspond to what ids.
+   *
+   * @param aggregatorRegistry
+   *          The {@link AggregatorRegistry} that is used to determine what
+   *          aggregators correspond to what ids.
+   */
+  public void setAggregatorRegistry(AggregatorRegistry aggregatorRegistry)
+  {
+    this.aggregatorRegistry = aggregatorRegistry;
+  }
+
+  /**
+   * Flushes the tuples that have not been written yet, lets the parent class
+   * finish the window and then resets the tuple buffer.
+   */
+  @Override
+  public void endWindow()
+  {
+    //Process any remaining tuples.
+    if (batchStartIdx < tuples.size()) {
+      processBatch();
+    }
+    super.endWindow();
+    tuples.clear();
+    batchStartIdx = 0;
+  }
+
+  /**
+   * Buffers the tuple and flushes the pending tuples once the batch size is
+   * reached.
+   *
+   * @param tuple
+   *          The aggregate to write.
+   */
+  @Override
+  public void processTuple(Aggregate tuple)
+  {
+    tuples.add(tuple);
+    if ((tuples.size() - batchStartIdx) >= batchSize) {
+      processBatch();
+    }
+  }
+
+  /**
+   * Adds every pending tuple to the batch of its prepared statement and then
+   * executes and clears the batch of every prepared statement. All tuples
+   * buffered so far are marked as processed afterwards, also when the batch
+   * fails.
+   *
+   * @throws RuntimeException
+   *           wrapping the {@link SQLException} if a batch cannot be executed.
+   */
+  private void processBatch()
+  {
+    LOG.debug("start {} end {}", batchStartIdx, tuples.size());
+    try {
+      for (int i = batchStartIdx; i < tuples.size(); i++) {
+        setStatementParameters(tuples.get(i));
+      }
+
+      for (Map<Integer, PreparedStatement> aggIDToStatement : ddIDToAggIDToStatement.values()) {
+        for (PreparedStatement statement : aggIDToStatement.values()) {
+          statement.executeBatch();
+          statement.clearBatch();
+        }
+      }
+    } catch (SQLException e) {
+      throw new RuntimeException("processing batch", e);
+    } finally {
+      batchStartIdx = tuples.size();
+    }
+  }
+
+  /**
+   * Sets the parameters on the {@link java.sql.PreparedStatement} based on the
+   * values in the given {@link Aggregate} and adds them to the statement's
+   * batch. Key values are bound first, followed by the aggregate values, which
+   * is the column order used by {@link #buildStatement}.
+   *
+   * @param aggregate
+   *          The {@link Aggregate} whose values will be set on the
+   *          corresponding {@link java.sql.PreparedStatement}.
+   */
+  private void setStatementParameters(Aggregate aggregate)
+  {
+    EventKey eventKey = aggregate.getEventKey();
+
+    int ddID = eventKey.getDimensionDescriptorID();
+    int aggID = eventKey.getAggregatorID();
+
+    LOG.debug("Setting statement params {} {}", ddID, aggID);
+
+    FieldsDescriptor keyFD = schema.getDimensionsDescriptorIDToKeyDescriptor().get(ddID);
+    FieldsDescriptor aggFD = schema.getDimensionsDescriptorIDToAggregatorIDToOutputAggregatorDescriptor().get(ddID)
+        .get(aggID);
+
+    GPOMutable key = eventKey.getKey();
+    key.setFieldDescriptor(keyFD);
+
+    GPOMutable value = aggregate.getAggregates();
+    value.setFieldDescriptor(aggFD);
+
+    // JDBC parameter indexes start at 1.
+    int qCounter = 1;
+
+    PreparedStatement ps = ddIDToAggIDToStatement.get(ddID).get(aggID);
+
+    try {
+      qCounter = setParams(ps, key, qCounter, true);
+      setParams(ps, value, qCounter, false);
+      ps.addBatch();
+    } catch (SQLException ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  /**
+   * Binds the fields of the given object to consecutive parameters of the
+   * statement, starting at the given index. The time bucket field is skipped
+   * and does not consume a parameter index.
+   *
+   * @param ps
+   *          The {@link java.sql.PreparedStatement} which will do an insert
+   *          into the MySQL database.
+   * @param gpo
+   *          The {@link GPOMutable} object whose values need to be set in the
+   *          prepared statement.
+   * @param qCounter
+   *          The index of the first parameter to set.
+   * @param isKey
+   *          Currently unused. TODO use this
+   * @return The index following the last parameter that was set.
+   * @throws SQLException
+   *           if a parameter cannot be set.
+   * @throws UnsupportedOperationException
+   *           if a field has a type that is not handled here.
+   */
+  private int setParams(PreparedStatement ps, GPOMutable gpo, int qCounter, boolean isKey) throws SQLException
+  {
+    FieldsDescriptor fd = gpo.getFieldDescriptor();
+
+    Map<String, Type> fieldToType = fd.getFieldToType();
+    List<String> fields = fd.getFieldList();
+
+    for (String fieldName : fields) {
+      if (fieldName.equals(DimensionsDescriptor.DIMENSION_TIME_BUCKET)) {
+        // The time bucket has no column in the statement, see setup().
+        continue;
+      }
+
+      Type type = fieldToType.get(fieldName);
+
+      LOG.debug("Field Name {} {}", fieldName, qCounter);
+
+      switch (type) {
+        case BOOLEAN: {
+          ps.setByte(qCounter, (byte)(gpo.getFieldBool(fieldName) ? 1 : 0));
+          break;
+        }
+        case BYTE: {
+          ps.setByte(qCounter, gpo.getFieldByte(fieldName));
+          break;
+        }
+        case CHAR: {
+          ps.setString(qCounter, Character.toString(gpo.getFieldChar(fieldName)));
+          break;
+        }
+        case STRING: {
+          ps.setString(qCounter, gpo.getFieldString(fieldName));
+          break;
+        }
+        case SHORT: {
+          ps.setInt(qCounter, gpo.getFieldShort(fieldName));
+          break;
+        }
+        case INTEGER: {
+          ps.setInt(qCounter, gpo.getFieldInt(fieldName));
+          break;
+        }
+        case LONG: {
+          ps.setLong(qCounter, gpo.getFieldLong(fieldName));
+          break;
+        }
+        case FLOAT: {
+          ps.setFloat(qCounter, gpo.getFieldFloat(fieldName));
+          break;
+        }
+        case DOUBLE: {
+          ps.setDouble(qCounter, gpo.getFieldDouble(fieldName));
+          break;
+        }
+        default: {
+          throw new UnsupportedOperationException("The type: " + type + " is not supported.");
+        }
+      }
+
+      qCounter++;
+    }
+
+    return qCounter;
+  }
+
+  /**
+   * Sets the size of a batch operation.<br/>
+   * <b>Default:</b> {@value #DEFAULT_BATCH_SIZE}
+   *
+   * @param batchSize
+   *          size of a batch
+   */
+  public void setBatchSize(int batchSize)
+  {
+    this.batchSize = batchSize;
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(JDBCDimensionalOutputOperator.class);
+}
+


[04/18] incubator-apex-malhar git commit: Add @since tags and update change log for release 3.3.1

Posted by th...@apache.org.
Add @since tags and update change log for release 3.3.1


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/d3a7063d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/d3a7063d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/d3a7063d

Branch: refs/heads/devel-3
Commit: d3a7063d690e63984c5c93d977dbc0a16293e0c5
Parents: 4d0f8fc
Author: bhupesh <bh...@gmail.com>
Authored: Wed Feb 24 20:36:52 2016 +0530
Committer: bhupesh <bh...@gmail.com>
Committed: Thu Mar 3 21:40:42 2016 +0530

----------------------------------------------------------------------
 CHANGELOG.md | 20 ++++++++++++++++++++
 1 file changed, 20 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/d3a7063d/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index d4698c0..00ea297 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,6 +1,26 @@
 Apex Malhar Changelog
 ========================================================================================================================
 
+Version 3.3.1-incubating - 2016-02-27
+------------------------------------------------------------------------------------------------------------------------
+
+### Bug
+* [APEXMALHAR-1970] - ArrayOutOfBoundary error in One_To_Many Partitioner for 0.9 kafka input operator
+* [APEXMALHAR-1973] - InitialOffset bug and duplication caused by offset checkpoint
+* [APEXMALHAR-1984] - Operators that use Kryo directly would throw exception in local mode
+* [APEXMALHAR-1990] - Occasional concurrent modification exceptions from IdempotentStorageManager
+* [APEXMALHAR-1993] - Committed offsets are not present in offset manager storage for kafka input operator
+* [APEXMALHAR-1994] - Operator partitions are reporting offsets for kafka partitions they don't subscribe to
+* [APEXMALHAR-1998] - Kafka unit test memory requirement breaks Travis CI build
+* [APEXMALHAR-2003] - NPE in FileSplitterInput
+
+### Improvement
+* [APEXMALHAR-1983] - Support special chars in topics setting for new Kafka Input Operator
+
+### Task
+* [APEXMALHAR-1968] - Update NOTICE copyright year
+* [APEXMALHAR-1986] - Change semantic version check to use 3.3 release
+
 Version 3.3.0-incubating - 2016-01-10
 ------------------------------------------------------------------------------------------------------------------------
 


[02/18] incubator-apex-malhar git commit: Merge branch 'APEXMALHAR-2003' of https://github.com/chandnisingh/incubator-apex-malhar

Posted by th...@apache.org.
Merge branch 'APEXMALHAR-2003' of https://github.com/chandnisingh/incubator-apex-malhar


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/de8eabe7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/de8eabe7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/de8eabe7

Branch: refs/heads/devel-3
Commit: de8eabe730c11ed3ae8208dab7da9d3c614ddeaf
Parents: 8331f56 289dad7
Author: Thomas Weise <th...@datatorrent.com>
Authored: Thu Feb 25 13:46:51 2016 -0800
Committer: Thomas Weise <th...@datatorrent.com>
Committed: Thu Feb 25 13:46:51 2016 -0800

----------------------------------------------------------------------
 .../lib/io/fs/AbstractFileSplitter.java         |  6 +--
 .../lib/io/fs/FileSplitterInputTest.java        | 44 ++++++++++++++++++++
 2 files changed, 47 insertions(+), 3 deletions(-)
----------------------------------------------------------------------



[03/18] incubator-apex-malhar git commit: Merge branch 'devel-3'

Posted by th...@apache.org.
Merge branch 'devel-3'


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/4d0f8fc0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/4d0f8fc0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/4d0f8fc0

Branch: refs/heads/devel-3
Commit: 4d0f8fc0119309fbfc43b9f2989e4b32f9c0908d
Parents: de8eabe fd2f42b
Author: Thomas Weise <th...@datatorrent.com>
Authored: Fri Feb 26 15:43:33 2016 -0800
Committer: Thomas Weise <th...@datatorrent.com>
Committed: Fri Feb 26 15:43:33 2016 -0800

----------------------------------------------------------------------
 .../lib/join/AbstractJoinOperator.java          | 435 +++++++++++++++++++
 .../java/com/datatorrent/lib/join/Bucket.java   |  91 ++++
 .../com/datatorrent/lib/join/InMemoryStore.java | 107 +++++
 .../com/datatorrent/lib/join/JoinStore.java     |  91 ++++
 .../datatorrent/lib/join/MapJoinOperator.java   | 100 +++++
 .../datatorrent/lib/join/POJOJoinOperator.java  | 266 ++++++++++++
 .../datatorrent/lib/join/TimeBasedStore.java    | 333 ++++++++++++++
 .../com/datatorrent/lib/join/TimeEvent.java     |  50 +++
 .../com/datatorrent/lib/join/TimeEventImpl.java | 121 ++++++
 .../lib/join/MapTimeBasedJoinOperator.java      | 118 +++++
 .../lib/join/POJOTimeBasedJoinOperatorTest.java | 385 ++++++++++++++++
 11 files changed, 2097 insertions(+)
----------------------------------------------------------------------



[16/18] incubator-apex-malhar git commit: APEXMALHAR-2008: HDFS File Input module

Posted by th...@apache.org.
APEXMALHAR-2008: HDFS File Input module


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/f9fe3d5e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/f9fe3d5e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/f9fe3d5e

Branch: refs/heads/devel-3
Commit: f9fe3d5e9e5ef06ebba313a9a09fa268ec9ead7c
Parents: d3a7063
Author: Priyanka Gugale <pr...@datatorrent.com>
Authored: Fri Mar 11 13:22:10 2016 +0530
Committer: Priyanka Gugale <pr...@datatorrent.com>
Committed: Wed Mar 16 15:07:08 2016 +0530

----------------------------------------------------------------------
 .../datatorrent/lib/io/block/BlockMetadata.java |  31 ++-
 .../datatorrent/lib/io/block/BlockReader.java   |  66 +++++
 .../lib/io/fs/AbstractFileSplitter.java         |  45 +++-
 .../lib/io/fs/FileSplitterInput.java            |  81 +++++--
 .../datatorrent/lib/io/fs/HDFSFileSplitter.java | 120 +++++++++
 .../datatorrent/lib/io/fs/HDFSInputModule.java  | 243 +++++++++++++++++++
 .../lib/io/fs/FileSplitterInputTest.java        |   2 +-
 .../lib/io/fs/HDFSInputModuleAppTest.java       | 221 +++++++++++++++++
 8 files changed, 786 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/f9fe3d5e/library/src/main/java/com/datatorrent/lib/io/block/BlockMetadata.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/block/BlockMetadata.java b/library/src/main/java/com/datatorrent/lib/io/block/BlockMetadata.java
index 534024d..6e38e45 100644
--- a/library/src/main/java/com/datatorrent/lib/io/block/BlockMetadata.java
+++ b/library/src/main/java/com/datatorrent/lib/io/block/BlockMetadata.java
@@ -196,11 +196,13 @@ public interface BlockMetadata
   }
 
   /**
-   * A block of file which contains file path adn other block properties.
+   * A block of file which contains file path and other block properties.
+   * It also controls if blocks should be read in sequence
    */
   class FileBlockMetadata extends AbstractBlockMetadata
   {
     private final String filePath;
+    private boolean readBlockInSequence = false;
 
     protected FileBlockMetadata()
     {
@@ -225,10 +227,37 @@ public interface BlockMetadata
       return filePath;
     }
 
+    /**
+     * Gets whether blocks should be read in sequence
+     * @return readBlockInSequence
+     */
+    public boolean isReadBlockInSequence()
+    {
+      return readBlockInSequence;
+    }
+
+    /**
+     * Sets whether blocks should be read in sequence
+     * @param readBlockInSequence
+     */
+    public void setReadBlockInSequence(boolean readBlockInSequence)
+    {
+      this.readBlockInSequence = readBlockInSequence;
+    }
+
     public FileBlockMetadata newInstance(@NotNull String filePath)
     {
       Preconditions.checkNotNull(filePath);
       return new FileBlockMetadata(filePath);
     }
+
+    @Override
+    public int hashCode()
+    {
+      if (isReadBlockInSequence()) {
+        return getFilePath().hashCode();
+      }
+      return super.hashCode();
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/f9fe3d5e/library/src/main/java/com/datatorrent/lib/io/block/BlockReader.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/block/BlockReader.java b/library/src/main/java/com/datatorrent/lib/io/block/BlockReader.java
new file mode 100644
index 0000000..f4f7d76
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/io/block/BlockReader.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.io.block;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.fs.FileSystem;
+
+import com.datatorrent.api.AutoMetric;
+
+/**
+ * BlockReader extends {@link FSSliceReader} to obtain its FileSystem instance from a configurable uri
+ */
+public class BlockReader extends FSSliceReader
+{
+  @AutoMetric
+  private long bytesRead;
+
+  protected String uri;
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    super.beginWindow(windowId);
+    bytesRead = 0;
+  }
+
+  @Override
+  protected FileSystem getFSInstance() throws IOException
+  {
+    return FileSystem.newInstance(URI.create(uri), configuration);
+  }
+
+  /**
+   * Sets the uri
+   *
+   * @param uri of form hdfs://hostname:port/path/to/input
+   */
+  public void setUri(String uri)
+  {
+    this.uri = uri;
+  }
+
+  public String getUri()
+  {
+    return uri;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/f9fe3d5e/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java
index cd47d48..b39168c 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java
@@ -18,6 +18,7 @@
  */
 package com.datatorrent.lib.io.fs;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.Iterator;
 
@@ -27,12 +28,10 @@ import javax.validation.constraints.NotNull;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 
 import com.google.common.base.Preconditions;
-
 import com.datatorrent.api.AutoMetric;
 import com.datatorrent.api.Context;
 import com.datatorrent.api.DefaultOutputPort;
@@ -206,6 +205,13 @@ public abstract class AbstractFileSplitter extends BaseOperator
     fileMetadata.setDirectory(status.isDirectory());
     fileMetadata.setFileLength(status.getLen());
 
+    if (fileInfo.getDirectoryPath() == null) { // Direct filename is given as input.
+      fileMetadata.setRelativePath(status.getPath().getName());
+    } else {
+      String relativePath = getRelativePathWithFolderName(fileInfo);
+      fileMetadata.setRelativePath(relativePath);
+    }
+
     if (!status.isDirectory()) {
       int noOfBlocks = (int)((status.getLen() / blockSize) + (((status.getLen() % blockSize) == 0) ? 0 : 1));
       if (fileMetadata.getDataOffset() >= status.getLen()) {
@@ -217,6 +223,15 @@ public abstract class AbstractFileSplitter extends BaseOperator
     return fileMetadata;
   }
 
+  /*
+   * Since a folder name was given as input for the copy, prefix that folder name to the relative path of each sub-item.
+   */
+  private String getRelativePathWithFolderName(FileInfo fileInfo)
+  {
+    String parentDir = new Path(fileInfo.getDirectoryPath()).getName();
+    return parentDir + File.separator + fileInfo.getRelativeFilePath();
+  }
+
   /**
    * This can be over-ridden to create file metadata of type that extends {@link FileSplitterInput.FileMetadata}
    *
@@ -346,6 +361,7 @@ public abstract class AbstractFileSplitter extends BaseOperator
     private long discoverTime;
     private long[] blockIds;
     private boolean isDirectory;
+    private String relativePath;
 
     @SuppressWarnings("unused")
     protected FileMetadata()
@@ -493,6 +509,31 @@ public abstract class AbstractFileSplitter extends BaseOperator
     {
       return isDirectory;
     }
+
+    /**
+     * Gets relative file path
+     * @return relativePath
+     */
+    public String getRelativePath()
+    {
+      return relativePath;
+    }
+
+    /**
+     * Sets relative file path
+     * @param relativePath
+     */
+    public void setRelativePath(String relativePath)
+    {
+      this.relativePath = relativePath;
+    }
+
+    @Override
+    public String toString()
+    {
+      return "FileMetadata [fileName=" + fileName + ", numberOfBlocks=" + numberOfBlocks + ", isDirectory=" + isDirectory + ", relativePath=" + relativePath + "]";
+    }
+
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/f9fe3d5e/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
index ab70047..1d8248f 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
@@ -292,6 +292,9 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
     private transient ScannedFileInfo lastScannedInfo;
     private transient int numDiscoveredPerIteration;
 
+    @NotNull
+    protected final Map<String, Map<String, Long>> inputDirTolastModifiedTimes;
+
     public TimeBasedDirectoryScanner()
     {
       recursive = true;
@@ -301,6 +304,7 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
       discoveredFiles = new LinkedBlockingDeque<>();
       atomicThrowable = new AtomicReference<>();
       ignoredFiles = Sets.newHashSet();
+      inputDirTolastModifiedTimes = Maps.newHashMap();
     }
 
     @Override
@@ -360,7 +364,9 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
             lastScannedInfo = null;
             numDiscoveredPerIteration = 0;
             for (String afile : files) {
-              scan(new Path(afile), null);
+              Map<String, Long> lastModifiedTimesForInputDir;
+              lastModifiedTimesForInputDir = getLastModifiedTimeMap(afile);
+              scan(new Path(afile), null, lastModifiedTimesForInputDir);
             }
             scanIterationComplete();
           } else {
@@ -375,6 +381,15 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
       }
     }
 
+    private Map<String, Long> getLastModifiedTimeMap(String key)
+    {
+      if (inputDirTolastModifiedTimes.get(key) == null) {
+        Map<String, Long> modifiedTimeMap = Maps.newHashMap();
+        inputDirTolastModifiedTimes.put(key, modifiedTimeMap);
+      }
+      return inputDirTolastModifiedTimes.get(key);
+    }
+
     /**
      * Operations that need to be done once a scan is complete.
      */
@@ -386,6 +401,13 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
 
     protected void scan(@NotNull Path filePath, Path rootPath)
     {
+      Map<String, Long> lastModifiedTimesForInputDir;
+      lastModifiedTimesForInputDir = getLastModifiedTimeMap(filePath.toUri().getPath());
+      scan(filePath, rootPath, lastModifiedTimesForInputDir);
+    }
+
+    private void scan(Path filePath, Path rootPath, Map<String, Long> lastModifiedTimesForInputDir)
+    {
       try {
         FileStatus parentStatus = fs.getFileStatus(filePath);
         String parentPathStr = filePath.toUri().getPath();
@@ -393,27 +415,22 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
         LOG.debug("scan {}", parentPathStr);
 
         FileStatus[] childStatuses = fs.listStatus(filePath);
-        for (FileStatus status : childStatuses) {
-          Path childPath = status.getPath();
-          ScannedFileInfo info = createScannedFileInfo(filePath, parentStatus, childPath, status, rootPath);
-
-          if (skipFile(childPath, status.getModificationTime(), referenceTimes.get(info.getFilePath()))) {
-            continue;
-          }
 
-          if (status.isDirectory()) {
-            if (recursive) {
-              scan(childPath, rootPath == null ? parentStatus.getPath() : rootPath);
-            }
-          }
+        if (childStatuses.length == 0 && rootPath == null && lastModifiedTimesForInputDir.get(parentPathStr) == null) { // empty input directory copy as is
+          ScannedFileInfo info = new ScannedFileInfo(null, filePath.toString(), parentStatus.getModificationTime());
+          processDiscoveredFile(info);
+          lastModifiedTimesForInputDir.put(parentPathStr, parentStatus.getModificationTime());
+        }
 
+        for (FileStatus childStatus : childStatuses) {
+          Path childPath = childStatus.getPath();
           String childPathStr = childPath.toUri().getPath();
-          if (ignoredFiles.contains(childPathStr)) {
-            continue;
-          }
-          if (acceptFile(childPathStr)) {
-            LOG.debug("found {}", childPathStr);
-            processDiscoveredFile(info);
+
+          if (childStatus.isDirectory() && isRecursive()) {
+            addToDiscoveredFiles(rootPath, parentStatus, childStatus, lastModifiedTimesForInputDir);
+            scan(childPath, rootPath == null ? parentStatus.getPath() : rootPath, lastModifiedTimesForInputDir);
+          } else if (acceptFile(childPathStr)) {
+            addToDiscoveredFiles(rootPath, parentStatus, childStatus, lastModifiedTimesForInputDir);
           } else {
             // don't look at it again
             ignoredFiles.add(childPathStr);
@@ -426,6 +443,31 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
       }
     }
 
+    private void addToDiscoveredFiles(Path rootPath, FileStatus parentStatus, FileStatus childStatus,
+        Map<String, Long> lastModifiedTimesForInputDir) throws IOException
+    {
+      Path childPath = childStatus.getPath();
+      String childPathStr = childPath.toUri().getPath();
+      // Directory by now is scanned forcibly. Now check for whether file/directory needs to be added to discoveredFiles.
+      Long oldModificationTime = lastModifiedTimesForInputDir.get(childPathStr);
+      lastModifiedTimesForInputDir.put(childPathStr, childStatus.getModificationTime());
+
+      if (skipFile(childPath, childStatus.getModificationTime(), oldModificationTime) || // Skip dir or file if no timestamp modification
+          (childStatus.isDirectory() && (oldModificationTime != null))) { // If timestamp modified but if its a directory and already present in map, then skip.
+        return;
+      }
+
+      if (ignoredFiles.contains(childPathStr)) {
+        return;
+      }
+
+      ScannedFileInfo info = createScannedFileInfo(parentStatus.getPath(), parentStatus, childPath, childStatus,
+          rootPath);
+
+      LOG.debug("Processing file: " + info.getFilePath());
+      processDiscoveredFile(info);
+    }
+
     protected void processDiscoveredFile(ScannedFileInfo info)
     {
       numDiscoveredPerIteration++;
@@ -619,4 +661,5 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
   }
 
   private static final Logger LOG = LoggerFactory.getLogger(FileSplitterInput.class);
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/f9fe3d5e/library/src/main/java/com/datatorrent/lib/io/fs/HDFSFileSplitter.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/HDFSFileSplitter.java b/library/src/main/java/com/datatorrent/lib/io/fs/HDFSFileSplitter.java
new file mode 100644
index 0000000..24466d5
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/HDFSFileSplitter.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.io.fs;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.fs.Path;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.lib.io.block.BlockMetadata.FileBlockMetadata;
+
+/**
+ * HDFSFileSplitter extends {@link FileSplitterInput} to,
+ * 1. Add relative path to file metadata.
+ * 2. Ignore HDFS temp files (files with extensions _COPYING_).
+ * 3. Set the sequential read option on the block metadata it creates.
+ */
+public class HDFSFileSplitter extends FileSplitterInput
+{
+  private boolean sequencialFileRead;
+
+  public HDFSFileSplitter()
+  {
+    super();
+    super.setScanner(new HDFSScanner());
+  }
+
+
+  @Override
+  protected FileBlockMetadata createBlockMetadata(FileMetadata fileMetadata)
+  {
+    FileBlockMetadata blockMetadta = new FileBlockMetadata(fileMetadata.getFilePath());
+    blockMetadta.setReadBlockInSequence(sequencialFileRead);
+    return blockMetadta;
+  }
+
+  public boolean isSequencialFileRead()
+  {
+    return sequencialFileRead;
+  }
+
+  public void setSequencialFileRead(boolean sequencialFileRead)
+  {
+    this.sequencialFileRead = sequencialFileRead;
+  }
+
+  /**
+   * HDFSScanner extends {@link TimeBasedDirectoryScanner} to ignore HDFS temporary files
+   * and files containing unsupported characters. 
+   */
+  public static class HDFSScanner extends TimeBasedDirectoryScanner
+  {
+    protected static final String UNSUPPORTED_CHARACTOR = ":";
+    private String ignoreFilePatternRegularExp = ".*._COPYING_";
+    private transient Pattern ignoreRegex;
+
+    @Override
+    public void setup(OperatorContext context)
+    {
+      super.setup(context);
+      ignoreRegex = Pattern.compile(this.ignoreFilePatternRegularExp);
+    }
+
+    @Override
+    protected boolean acceptFile(String filePathStr)
+    {
+      boolean accepted = super.acceptFile(filePathStr);
+      if (containsUnsupportedCharacters(filePathStr) || isIgnoredFile(filePathStr)) {
+        return false;
+      }
+      return accepted;
+    }
+
+    private boolean isIgnoredFile(String filePathStr)
+    {
+      String fileName = new Path(filePathStr).getName();
+      if (ignoreRegex != null) {
+        Matcher matcher = ignoreRegex.matcher(fileName);
+        if (matcher.matches()) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+    private boolean containsUnsupportedCharacters(String filePathStr)
+    {
+      return new Path(filePathStr).toUri().getPath().contains(UNSUPPORTED_CHARACTOR);
+    }
+
+    public String getIgnoreFilePatternRegularExp()
+    {
+      return ignoreFilePatternRegularExp;
+    }
+
+    public void setIgnoreFilePatternRegularExp(String ignoreFilePatternRegularExp)
+    {
+      this.ignoreFilePatternRegularExp = ignoreFilePatternRegularExp;
+      this.ignoreRegex = Pattern.compile(ignoreFilePatternRegularExp);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/f9fe3d5e/library/src/main/java/com/datatorrent/lib/io/fs/HDFSInputModule.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/HDFSInputModule.java b/library/src/main/java/com/datatorrent/lib/io/fs/HDFSInputModule.java
new file mode 100644
index 0000000..2b914f1
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/HDFSInputModule.java
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.io.fs;
+
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotNull;
+import javax.validation.constraints.Size;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Module;
+import com.datatorrent.common.partitioner.StatelessPartitioner;
+import com.datatorrent.lib.io.block.AbstractBlockReader.ReaderRecord;
+import com.datatorrent.lib.io.block.BlockMetadata.FileBlockMetadata;
+import com.datatorrent.lib.io.block.BlockReader;
+import com.datatorrent.lib.io.fs.AbstractFileSplitter.FileMetadata;
+import com.datatorrent.lib.io.fs.HDFSFileSplitter.HDFSScanner;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * HDFSInputModule is used to read files/list of files (or directory) from HDFS. <br/>
+ * Module emits, <br/>
+ * 1. FileMetadata 2. BlockMetadata 3. Block Bytes.<br/><br/>
+ * The module reads data in parallel, following parameters can be configured<br/>
+ * 1. files: list of file(s)/directories to read<br/>
+ * 2. filePatternRegularExp: files whose names match the given regex will be read<br/>
+ * 3. scanIntervalMillis: interval between two scans to discover new files in input directory<br/>
+ * 4. recursive: whether to scan input directories recursively<br/>
+ * 5. blockSize: block size used to read input blocks of file<br/>
+ * 6. readersCount: count of readers to read input file<br/>
+ * 7. sequencialFileRead: whether to emit file blocks in sequence
+ */
+public class HDFSInputModule implements Module
+{
+
+  @NotNull
+  @Size(min = 1)
+  private String files;
+  private String filePatternRegularExp;
+  @Min(0)
+  private long scanIntervalMillis;
+  private boolean recursive = true;
+  private long blockSize;
+  private boolean sequencialFileRead = false;
+  private int readersCount;
+
+  public final transient ProxyOutputPort<FileMetadata> filesMetadataOutput = new ProxyOutputPort<>();
+  public final transient ProxyOutputPort<FileBlockMetadata> blocksMetadataOutput = new ProxyOutputPort<>();
+  public final transient ProxyOutputPort<ReaderRecord<Slice>> messages = new ProxyOutputPort<>();
+
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    HDFSFileSplitter fileSplitter = dag.addOperator("FileSplitter", new HDFSFileSplitter());
+    BlockReader blockReader = dag.addOperator("BlockReader", new BlockReader());
+
+    dag.addStream("BlockMetadata", fileSplitter.blocksMetadataOutput, blockReader.blocksMetadataInput);
+
+    filesMetadataOutput.set(fileSplitter.filesMetadataOutput);
+    blocksMetadataOutput.set(blockReader.blocksMetadataOutput);
+    messages.set(blockReader.messages);
+
+    fileSplitter.setSequencialFileRead(sequencialFileRead);
+    if (blockSize != 0) {
+      fileSplitter.setBlockSize(blockSize);
+    }
+
+    HDFSScanner fileScanner = (HDFSScanner)fileSplitter.getScanner();
+    fileScanner.setFiles(files);
+    if (scanIntervalMillis != 0) {
+      fileScanner.setScanIntervalMillis(scanIntervalMillis);
+    }
+    fileScanner.setRecursive(recursive);
+    if (filePatternRegularExp != null) {
+      fileSplitter.getScanner().setFilePatternRegularExp(filePatternRegularExp);
+    }
+
+    blockReader.setUri(files);
+    if (readersCount != 0) {
+      dag.setAttribute(blockReader, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<BlockReader>(readersCount));
+    }
+  }
+
+  /**
+   * A comma separated list of directories to scan. If the path is not fully qualified the default file system is used.
+   * A fully qualified path can be provided to scan directories in other filesystems.
+   *
+   * @param files
+   *          files
+   */
+  public void setFiles(String files)
+  {
+    this.files = files;
+  }
+
+  /**
+   * Gets the files to be scanned.
+   *
+   * @return files to be scanned.
+   */
+  public String getFiles()
+  {
+    return files;
+  }
+
+  /**
+   * Gets the regular expression for file names to split
+   *
+   * @return regular expression
+   */
+  public String getFilePatternRegularExp()
+  {
+    return filePatternRegularExp;
+  }
+
+  /**
+   * Only files with names matching the given java regular expression are split
+   *
+   * @param filePatternRegexp
+   *          regular expression
+   */
+  public void setFilePatternRegularExp(String filePatternRegexp)
+  {
+    this.filePatternRegularExp = filePatternRegexp;
+  }
+
+  /**
+   * Gets scan interval in milliseconds, interval between two scans to discover new files in input directory
+   *
+   * @return scanInterval milliseconds
+   */
+  public long getScanIntervalMillis()
+  {
+    return scanIntervalMillis;
+  }
+
+  /**
+   * Sets scan interval in milliseconds, interval between two scans to discover new files in input directory
+   *
+   * @param scanIntervalMillis
+   */
+  public void setScanIntervalMillis(long scanIntervalMillis)
+  {
+    this.scanIntervalMillis = scanIntervalMillis;
+  }
+
+  /**
+   * Gets whether the scan is recursive
+   *
+   * @return isRecursive
+   */
+  public boolean isRecursive()
+  {
+    return recursive;
+  }
+
+  /**
+   * Sets whether the scan is recursive
+   *
+   * @param recursive
+   */
+  public void setRecursive(boolean recursive)
+  {
+    this.recursive = recursive;
+  }
+
+  /**
+   * Get block size used to read input blocks of file
+   *
+   * @return blockSize
+   */
+  public long getBlockSize()
+  {
+    return blockSize;
+  }
+
+  /**
+   * Sets block size used to read input blocks of file
+   *
+   * @param blockSize
+   */
+  public void setBlockSize(long blockSize)
+  {
+    this.blockSize = blockSize;
+  }
+
+  /**
+   * Gets readers count
+   * @return readersCount
+   */
+  public int getReadersCount()
+  {
+    return readersCount;
+  }
+
+  /**
+   * Static count of readers to read input file
+   * @param readersCount
+   */
+  public void setReadersCount(int readersCount)
+  {
+    this.readersCount = readersCount;
+  }
+
+  /**
+   * Gets whether file blocks are read in sequence
+   * 
+   * @return sequencialFileRead
+   */
+  public boolean isSequencialFileRead()
+  {
+    return sequencialFileRead;
+  }
+
+  /**
+   * Sets whether file blocks are read in sequence
+   *
+   * @param sequencialFileRead
+   */
+  public void setSequencialFileRead(boolean sequencialFileRead)
+  {
+    this.sequencialFileRead = sequencialFileRead;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/f9fe3d5e/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
index cd0de2d..1d6cf03 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
@@ -71,7 +71,7 @@ public class FileSplitterInputTest
       }
       allLines.addAll(lines);
       File created = new File(dataDirectory, "file" + file + ".txt");
-      filePaths.add(new Path(dataDirectory, created.getName()).toUri().toString());
+      filePaths.add(created.getAbsolutePath());
       FileUtils.write(created, StringUtils.join(lines, '\n'));
     }
     return filePaths;

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/f9fe3d5e/library/src/test/java/com/datatorrent/lib/io/fs/HDFSInputModuleAppTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/HDFSInputModuleAppTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/HDFSInputModuleAppTest.java
new file mode 100644
index 0000000..8bb1e26
--- /dev/null
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/HDFSInputModuleAppTest.java
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.io.fs;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.filefilter.WildcardFileFilter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.lib.io.block.AbstractBlockReader.ReaderRecord;
+import com.datatorrent.lib.io.block.BlockMetadata.FileBlockMetadata;
+import com.datatorrent.lib.io.fs.AbstractFileSplitter.FileMetadata;
+import com.datatorrent.lib.stream.DevNull;
+import com.datatorrent.netlet.util.Slice;
+
+public class HDFSInputModuleAppTest
+{
+  private String inputDir;
+  static String outputDir;
+  private StreamingApplication app;
+  private static final String FILE_1 = "file1.txt";
+  private static final String FILE_2 = "file2.txt";
+  private static final String FILE_1_DATA = "File one data";
+  private static final String FILE_2_DATA = "File two data. This has more data hence more blocks.";
+  static final String OUT_DATA_FILE = "fileData.txt";
+  static final String OUT_METADATA_FILE = "fileMetaData.txt";
+
+  public static class TestMeta extends TestWatcher
+  {
+    public String baseDirectory;
+
+    @Override
+    protected void starting(org.junit.runner.Description description)
+    {
+      this.baseDirectory = "target/" + description.getClassName() + "/" + description.getMethodName();
+    }
+
+  }
+
+  @Rule
+  public TestMeta testMeta = new TestMeta();
+
+  @Before
+  public void setup() throws Exception
+  {
+    inputDir = testMeta.baseDirectory + File.separator + "input";
+    outputDir = testMeta.baseDirectory + File.separator + "output";
+
+    FileUtils.writeStringToFile(new File(inputDir + File.separator + FILE_1), FILE_1_DATA);
+    FileUtils.writeStringToFile(new File(inputDir + File.separator + FILE_2), FILE_2_DATA);
+    FileUtils.forceMkdir(new File(inputDir + File.separator + "dir"));
+    FileUtils.writeStringToFile(new File(inputDir + File.separator + "dir/inner.txt"), FILE_1_DATA);
+  }
+
+  @After
+  public void tearDown() throws IOException
+  {
+    FileUtils.deleteDirectory(new File(inputDir));
+  }
+
+  @Test
+  public void testApplication() throws Exception
+  {
+    app = new Application();
+    Configuration conf = new Configuration(false);
+    conf.set("dt.operator.hdfsInputModule.prop.files", inputDir);
+    conf.set("dt.operator.hdfsInputModule.prop.blockSize", "10");
+    conf.set("dt.operator.hdfsInputModule.prop.scanIntervalMillis", "10000");
+
+    LocalMode lma = LocalMode.newInstance();
+    lma.prepareDAG(app, conf);
+    LocalMode.Controller lc = lma.getController();
+    lc.setHeartbeatMonitoringEnabled(true);
+    lc.runAsync();
+
+    long now = System.currentTimeMillis();
+    Path outDir = new Path("file://" + new File(outputDir).getAbsolutePath());
+    FileSystem fs = FileSystem.newInstance(outDir.toUri(), new Configuration());
+    while (!fs.exists(outDir) && System.currentTimeMillis() - now < 20000) {
+      Thread.sleep(500);
+      LOG.debug("Waiting for {}", outDir);
+    }
+
+    Thread.sleep(10000);
+    lc.shutdown();
+
+    Assert.assertTrue("output dir does not exist", fs.exists(outDir));
+
+    File dir = new File(outputDir);
+    FileFilter fileFilter = new WildcardFileFilter(OUT_METADATA_FILE + "*");
+    verifyFileContents(dir.listFiles(fileFilter), "[fileName=file1.txt, numberOfBlocks=2, isDirectory=false, relativePath=input/file1.txt]");
+    verifyFileContents(dir.listFiles(fileFilter), "[fileName=file2.txt, numberOfBlocks=6, isDirectory=false, relativePath=input/file2.txt]");
+    verifyFileContents(dir.listFiles(fileFilter), "[fileName=dir, numberOfBlocks=0, isDirectory=true, relativePath=input/dir]");
+    verifyFileContents(dir.listFiles(fileFilter), "[fileName=inner.txt, numberOfBlocks=2, isDirectory=false, relativePath=input/dir/inner.txt]");
+
+    fileFilter = new WildcardFileFilter(OUT_DATA_FILE + "*");
+    verifyFileContents(dir.listFiles(fileFilter), FILE_1_DATA);
+    verifyFileContents(dir.listFiles(fileFilter), FILE_2_DATA);
+  }
+
+  private void verifyFileContents(File[] files, String expectedData) throws IOException
+  {
+    StringBuilder filesData = new StringBuilder();
+    for (File file : files) {
+      filesData.append(FileUtils.readFileToString(file));
+    }
+    Assert.assertTrue("File data doesn't contain expected text" , filesData.indexOf(expectedData) > -1);
+  }
+
+  private static Logger LOG = LoggerFactory.getLogger(HDFSInputModuleAppTest.class);
+
+  private static class Application implements StreamingApplication
+  {
+    public void populateDAG(DAG dag, Configuration conf)
+    {
+      HDFSInputModule module = dag.addModule("hdfsInputModule", HDFSInputModule.class);
+
+      AbstractFileOutputOperator<FileMetadata> metadataWriter = new MetadataWriter(HDFSInputModuleAppTest.OUT_METADATA_FILE);
+      metadataWriter.setFilePath(HDFSInputModuleAppTest.outputDir);
+      dag.addOperator("FileMetadataWriter", metadataWriter);
+
+      AbstractFileOutputOperator<ReaderRecord<Slice>> dataWriter = new HDFSFileWriter(HDFSInputModuleAppTest.OUT_DATA_FILE);
+      dataWriter.setFilePath(HDFSInputModuleAppTest.outputDir);
+      dag.addOperator("FileDataWriter", dataWriter);
+
+      DevNull<FileBlockMetadata> devNull = dag.addOperator("devNull", DevNull.class);
+
+      dag.addStream("FileMetaData", module.filesMetadataOutput, metadataWriter.input);
+      dag.addStream("data", module.messages, dataWriter.input);
+      dag.addStream("blockMetadata", module.blocksMetadataOutput, devNull.data);
+    }
+  }
+
+  private static class MetadataWriter extends AbstractFileOutputOperator<FileMetadata>
+  {
+    String fileName;
+
+    @SuppressWarnings("unused")
+    private MetadataWriter()
+    {
+
+    }
+
+    public MetadataWriter(String fileName)
+    {
+      this.fileName = fileName;
+    }
+
+    @Override
+    protected String getFileName(FileMetadata tuple)
+    {
+      return fileName;
+    }
+
+    @Override
+    protected byte[] getBytesForTuple(FileMetadata tuple)
+    {
+      return (tuple).toString().getBytes();
+    }
+  }
+
+  private static class HDFSFileWriter extends AbstractFileOutputOperator<ReaderRecord<Slice>>
+  {
+    String fileName;
+
+    @SuppressWarnings("unused")
+    private HDFSFileWriter()
+    {
+    }
+
+    public HDFSFileWriter(String fileName)
+    {
+      this.fileName = fileName;
+    }
+
+    @Override
+    protected String getFileName(ReaderRecord<Slice> tuple)
+    {
+      return fileName;
+    }
+
+    @Override
+    protected byte[] getBytesForTuple(ReaderRecord<Slice> tuple)
+    {
+      return tuple.getRecord().buffer;
+    }
+  }
+
+}


[10/18] incubator-apex-malhar git commit: Tutorial for Kafka Input Operator

Posted by th...@apache.org.
Tutorial for Kafka Input Operator


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/8c538621
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/8c538621
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/8c538621

Branch: refs/heads/devel-3
Commit: 8c538621f02dca0e148e2dcb5c08793ec0f8bfda
Parents: afbcfc2
Author: Chaitanya <ch...@datatorrent.com>
Authored: Tue Dec 8 11:34:36 2015 +0530
Committer: Thomas Weise <th...@datatorrent.com>
Committed: Fri Mar 11 19:22:48 2016 -0800

----------------------------------------------------------------------
 docs/operators/images/kafkainput/image00.png | Bin 0 -> 36143 bytes
 docs/operators/kafkaInputOperator.md         | 282 ++++++++++++++++++++++
 2 files changed, 282 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/8c538621/docs/operators/images/kafkainput/image00.png
----------------------------------------------------------------------
diff --git a/docs/operators/images/kafkainput/image00.png b/docs/operators/images/kafkainput/image00.png
new file mode 100644
index 0000000..0fa00e8
Binary files /dev/null and b/docs/operators/images/kafkainput/image00.png differ

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/8c538621/docs/operators/kafkaInputOperator.md
----------------------------------------------------------------------
diff --git a/docs/operators/kafkaInputOperator.md b/docs/operators/kafkaInputOperator.md
new file mode 100644
index 0000000..1d2258e
--- /dev/null
+++ b/docs/operators/kafkaInputOperator.md
@@ -0,0 +1,282 @@
+KAFKA INPUT OPERATOR
+=====================
+
+### Introduction: About Kafka Input Operator
+
+This is an input operator that consumes data from the Kafka messaging system for further processing in Apex. The Kafka Input Operator is a fault-tolerant and scalable Malhar operator.
+
+### Why is it needed ?
+
+Kafka is a pull-based, distributed publish-subscribe messaging system in which topics are partitioned and replicated across
+nodes. The Kafka input operator is needed when you want to read data from multiple
+partitions of a Kafka topic in parallel in an Apex application.
+
+### AbstractKafkaInputOperator
+
+This is the abstract implementation that serves as base class for consuming messages from Kafka messaging system. This class doesn’t have any ports.
+
+![AbstractKafkaInput.png](images/kafkainput/image00.png)
+
+#### Configuration Parameters
+<table>
+<col width="25%" />
+<col width="75%" />
+<tbody>
+<tr class="odd">
+<td align="left"><p>Parameter</p></td>
+<td align="left"><p>Description</p></td>
+</tr>
+<tr class="even">
+<td align="left"><p>maxTuplesPerWindow</p></td>
+<td align="left"><p>Controls the maximum number of messages emitted in each streaming window from this operator. Minimum value is 1. Default value = MAX_VALUE </p></td>
+</tr>
+<tr class="odd">
+<td align="left"><p>idempotentStorageManager</p></td>
+<td align="left"><p>This is an instance of IdempotentStorageManager. Idempotency ensures that the operator will process the same set of messages in a window before and after a failure. For example, let's say the operator completed window 10 and failed somewhere between window 11. If the operator gets restored at window 10 then it will process the same messages again in window 10 which it did in the previous run before the failure. Idempotency is important but comes with higher cost because at the end of each window the operator needs to persist some state with respect to that window. Default Value = com.datatorrent.lib.io.IdempotentStorageManager.<br>NoopIdempotentStorageManager</p></td>
+</tr>
+<tr class="even">
+<td align="left"><p>strategy</p></td>
+<td align="left"><p>Operator supports two types of partitioning strategies, ONE_TO_ONE and ONE_TO_MANY.</p>
+<p>ONE_TO_ONE: If this is enabled, the AppMaster creates one input operator instance per Kafka topic partition. So the number of Kafka topic partitions equals the number of operator instances.</p>
+<p>ONE_TO_MANY: The AppMaster creates K = min(initialPartitionCount, N) Kafka input operator instances where N is the number of Kafka topic partitions. If K is less than N, the remaining topic partitions are assigned to the K operator instances in round-robin fashion. If K is less than initialPartitionCount, the AppMaster creates one input operator instance per Kafka topic partition. For example, if initialPartitionCount = 5 and number of Kafka partitions(N) = 2 then AppMaster creates 2 Kafka input operator instances.
+Default Value = ONE_TO_ONE</p></td>
+</tr>
+<tr class="odd">
+<td align="left"><p>msgRateUpperBound</p></td>
+<td align="left"><p>Maximum messages upper bound. Operator repartitions when the *msgProcessedPS* exceeds this bound. *msgProcessedPS* is the average number of messages processed per second by this operator.</p></td>
+</tr>
+<tr class="even">
+<td align="left"><p>byteRateUpperBound</p></td>
+<td align="left"><p>Maximum bytes upper bound. Operator repartitions when the *bytesPS* exceeds this bound. *bytesPS* is the average number of bytes processed per second by this operator.</p>
+<p></p></td>
+</tr>
+<tr class="odd">
+<td align="left"><p>offsetManager</p></td>
+<td align="left"><p>This is an optional parameter that is useful when the application restarts or needs to start at specific offsets (offsets are explained below)</p></td>
+</tr>
+<tr class="even">
+<td align="left"><p>repartitionInterval</p></td>
+<td align="left"><p>Interval specified in milliseconds. This value specifies the minimum time required between two repartition actions. Default Value = 30 Seconds</p></td>
+</tr>
+<tr class="odd">
+<td align="left"><p>repartitionCheckInterval</p></td>
+<td align="left"><p>Interval specified in milliseconds. This value specifies the minimum interval between two offset updates. Default Value = 5 Seconds</p></td>
+</tr>
+<tr class="even">
+<td align="left"><p>initialPartitionCount</p></td>
+<td align="left"><p>When the ONE_TO_MANY partition strategy is enabled, this value indicates the number of Kafka input operator instances. Default Value = 1</p></td>
+</tr>
+<tr class="odd">
+<td align="left"><p>consumer</p></td>
+<td align="left"><p>This is an instance of com.datatorrent.contrib.kafka.KafkaConsumer. Default Value = Instance of SimpleKafkaConsumer.</p></td>
+</tr>
+</tbody>
+</table>
+
+#### Abstract Methods
+
+void emitTuple(Message message): Abstract method that emits tuples
+extracted from Kafka message.
+
+### KafkaConsumer
+
+This is an abstract implementation of a Kafka consumer. It sends fetch
+requests to the leading brokers of the Kafka partitions. For each request,
+it receives a set of messages and stores them in a buffer, which is an
+ArrayBlockingQueue. There are two implementations: SimpleKafkaConsumer, which extends
+KafkaConsumer and provides the functionality of the Simple Consumer API, and
+HighLevelKafkaConsumer, which extends KafkaConsumer and provides the
+functionality of the High Level Consumer API.
+
+### Pre-requisites
+
+This operator uses the Kafka Consumer API of version
+0.8.1.1, so it works with any 0.8.x and 0.7.x version of Apache Kafka.
+
+#### Configuration Parameters
+
+<table>
+<col width="15%" />
+<col width="15%" />
+<col width="15%" />
+<col width="55%" />
+<tbody>
+<tr class="odd">
+<td align="left"><p>Parameter</p></td>
+<td align="left"><p>Type</p></td>
+<td align="left"><p>Default</p></td>
+<td align="left"><p>Description</p></td>
+</tr>
+<tr class="even">
+<td align="left"><p>zookeeper</p></td>
+<td align="left"><p>String</p></td>
+<td align="left"><p></p></td>
+<td align="left"><p>Specifies the zookeeper quorum of the Kafka clusters that you want to consume messages from. zookeeper is a string in the form of hostname1:port1,hostname2:port2,hostname3:port3 where hostname1,hostname2,hostname3 are hosts and port1,port2,port3 are the ports of the zookeeper servers. If the topic name is the same across the Kafka clusters and you want to consume data from all of these clusters, then configure zookeeper as follows: c1::hs1:p1,hs2:p2,hs3:p3;c2::hs4:p4,hs5:p5;c3::hs6:p6</p>
+<p>where</p>
+<p>c1,c2,c3 indicates the cluster names, hs1,hs2,hs3,hs4,hs5,hs6 are zookeeper hosts and p1,p2,p3,p4,p5,p6 are corresponding ports. Here, cluster name is optional in case of single cluster</p></td>
+</tr>
+<tr class="odd">
+<td align="left"><p>cacheSize</p></td>
+<td align="left"><p>int</p></td>
+<td align="left"><p>1024</p></td>
+<td align="left"><p>Maximum number of buffered messages held in memory.</p></td>
+</tr>
+<tr class="even">
+<td align="left"><p>topic</p></td>
+<td align="left"><p>String</p></td>
+<td align="left"><p>default_topic</p></td>
+<td align="left"><p>Indicates the name of the topic.</p></td>
+</tr>
+<tr class="odd">
+<td align="left"><p>initialOffset</p></td>
+<td align="left"><p>String</p></td>
+<td align="left"><p>latest</p></td>
+<td align="left"><p>Indicates the type of offset, i.e., “earliest” or “latest”. If initialOffset is “latest”, then the operator consumes messages from the latest point of the Kafka queue. If initialOffset is “earliest”, then the operator consumes messages starting from the beginning of the message queue. This can be overridden by OffsetManager.</p></td>
+</tr>
+</tbody>
+</table>
+
+#### Abstract Methods
+
+1.   void commitOffset(): Commit the offsets at checkpoint.
+2.  Map &lt;KafkaPartition, Long&gt; getCurrentOffsets(): Return the current
+    offset status.
+3.  resetPartitionsAndOffset(Set &lt;KafkaPartition&gt; partitionIds,
+    Map &lt;KafkaPartition, Long&gt; startOffset): Reset the partitions with
+    partitionIds and offsets with startOffset.
+
+#### Configuration Parameters for SimpleKafkaConsumer
+
+<table>
+<col width="25%" />
+<col width="15%" />
+<col width="15%" />
+<col width="45%" />
+<tbody>
+<tr class="odd">
+<td align="left"><p>Parameter</p></td>
+<td align="left"><p>Type</p></td>
+<td align="left"><p>Default</p></td>
+<td align="left"><p>Description</p></td>
+</tr>
+<tr class="even">
+<td align="left"><p>bufferSize</p></td>
+<td align="left"><p>int</p></td>
+<td align="left"><p>1 MB</p></td>
+<td align="left"><p>Specifies the maximum total size of messages for each fetch request.</p></td>
+</tr>
+<tr class="odd">
+<td align="left"><p>metadataRefreshInterval</p></td>
+<td align="left"><p>int</p></td>
+<td align="left"><p>30 Seconds</p></td>
+<td align="left"><p>Interval, in milliseconds, between refreshes of the metadata (to detect broker changes). Enabling metadata refresh guarantees an automatic reconnect when a new broker is elected as the host. A value of -1 disables this feature.</p></td>
+</tr>
+<tr class="even">
+<td align="left"><p>metadataRefreshRetryLimit</p></td>
+<td align="left"><p>int</p></td>
+<td align="left"><p>-1</p></td>
+<td align="left"><p>Specifies the maximum brokers' metadata refresh retry limit. -1 means unlimited retry.</p></td>
+</tr>
+</tbody>
+</table>
+
+### OffsetManager
+
+This is an interface for offset management and is useful when consuming data
+from specified offsets. Updates the offsets for all the Kafka partitions
+periodically. Below is the code snippet:        
+
+```java
+public interface OffsetManager
+{
+  public Map<KafkaPartition, Long> loadInitialOffsets();
+  public void updateOffsets(Map<KafkaPartition, Long> offsetsOfPartitions);
+}
+```
+#### Abstract Methods                 
+
+Map &lt;KafkaPartition, Long&gt; loadInitialOffsets(): Specifies the initial offset for consuming messages; called at the activation stage.
+
+updateOffsets(Map &lt;KafkaPartition, Long&gt; offsetsOfPartitions):  This
+method is called at every repartitionCheckInterval to update offsets.
+
+### Partitioning
+
+The logical instance of the KafkaInputOperator acts as the Partitioner
+as well as a StatsListener. This is because the
+AbstractKafkaInputOperator implements both the
+com.datatorrent.api.Partitioner and com.datatorrent.api.StatsListener
+interfaces and provides an implementation of definePartitions(...) and
+processStats(...) which makes it auto-scalable.
+
+#### Response processStats(BatchedOperatorStats stats)
+
+The application master invokes this method on the logical instance with
+the stats (tuplesProcessedPS, bytesPS, etc.) of each partition.
+Re-partitioning happens based on whether any new Kafka partitions were added for
+the topic or bytesPS and msgPS cross their respective upper bounds.
+
+#### DefinePartitions
+
+Based on the repartitionRequired field of the Response object which is
+returned by processStats(...) method, the application master invokes
+definePartitions(...) on the logical instance which is also the
+partitioner instance. Dynamic partition can be disabled by setting the
+parameter repartitionInterval value to a negative value.
+
+### AbstractSinglePortKafkaInputOperator
+
+This class extends AbstractKafkaInputOperator and has a single output
+port through which it emits the messages.
+
+#### Ports
+
+outputPort &lt;T&gt;: Tuples extracted from Kafka messages are emitted through
+this port.
+
+#### Abstract Methods
+
+T getTuple(Message msg) : Converts the Kafka message to tuple.
+
+### Concrete Classes
+
+1.  KafkaSinglePortStringInputOperator :
+This class extends AbstractSinglePortKafkaInputOperator and getTuple() method extracts string from Kafka message.
+
+2.  KafkaSinglePortByteArrayInputOperator:
+This class extends AbstractSinglePortKafkaInputOperator and getTuple() method extracts byte array from Kafka message.
+
+### Application Example
+
+This section builds an Apex application using Kafka input operator.
+Below is the code snippet:
+
+```java
+@ApplicationAnnotation(name = "KafkaApp")
+public class ExampleKafkaApplication implements StreamingApplication
+{
+@Override
+public void populateDAG(DAG dag, Configuration entries)
+{
+  KafkaSinglePortByteArrayInputOperator input =  dag.addOperator("MessageReader", new KafkaSinglePortByteArrayInputOperator());
+
+  ConsoleOutputOperator output = dag.addOperator("Output", new ConsoleOutputOperator());
+
+  dag.addStream("MessageData", input.outputPort, output.input);
+}
+}
+```
+Below is the configuration, where “test” is the Kafka topic name and
+“localhost:2181” is the zookeeper quorum:
+
+```xml
+<property>
+<name>dt.operator.MessageReader.prop.topic</name>
+<value>test</value>
+</property>
+
+<property>
+<name>dt.operator.MessageReader.prop.zookeeper</name>
+<value>localhost:2181</value>
+</property>
+```


[08/18] incubator-apex-malhar git commit: Adding mkdocs index closes #208

Posted by th...@apache.org.
Adding mkdocs index
closes #208


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/5cecce47
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/5cecce47
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/5cecce47

Branch: refs/heads/devel-3
Commit: 5cecce47ea83ae4ff07be5d73edf801db83699cd
Parents: e89f57e
Author: Sasha <sa...@datatorrent.com>
Authored: Fri Mar 11 11:37:17 2016 -0800
Committer: Thomas Weise <th...@datatorrent.com>
Committed: Fri Mar 11 19:22:48 2016 -0800

----------------------------------------------------------------------
 docs/favicon.ico | Bin 0 -> 25597 bytes
 docs/index.md    |  59 ++++++++++++++++++++++++++++++++++++++++++++++++++
 mkdocs.yml       |  10 +++++++++
 3 files changed, 69 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5cecce47/docs/favicon.ico
----------------------------------------------------------------------
diff --git a/docs/favicon.ico b/docs/favicon.ico
new file mode 100644
index 0000000..c0b3dae
Binary files /dev/null and b/docs/favicon.ico differ

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5cecce47/docs/index.md
----------------------------------------------------------------------
diff --git a/docs/index.md b/docs/index.md
new file mode 100644
index 0000000..7b6a441
--- /dev/null
+++ b/docs/index.md
@@ -0,0 +1,59 @@
+Apache Apex Malhar
+================================================================================
+
+Apache Apex Malhar is an open source operator and codec library that can be used with the [Apache Apex](http://apex.apache.org/) platform to build real-time streaming applications.  Enabling users to extract value quickly, Malhar operators help get data in, analyze it in real-time, and get data out of Hadoop.  In addition to the operators, the library contains a number of demo applications, demonstrating operator features and capabilities.
+
+![MalharDiagram](images/malhar-operators.png)
+
+# Capabilities common across Malhar operators
+
+For most streaming platforms, connectors are afterthoughts and often end up being simple ‘bolt-ons’ to the platform. As a result they often cause performance issues or data loss when put through failure scenarios and scalability requirements. Malhar operators do not face these issues as they were designed to be integral parts of Apex. Hence, they have the following core streaming runtime capabilities:
+
+1.  **Fault tolerance** – Malhar operators where applicable have fault tolerance built in. They use the checkpoint capability provided by the framework to ensure that there is no data loss under ANY failure scenario.
+2.  **Processing guarantees** – Malhar operators where applicable provide out of the box support for ALL three processing guarantees – exactly once, at-least once, and at-most once WITHOUT requiring the user to write any additional code.  Some operators, like MQTT operator, deal with source systems that can not track processed data and hence need the operators to keep track of the data.  Malhar has support for a generic operator that uses alternate storage like HDFS to facilitate this.  Finally for databases that support transactions or support any sort of atomic batch operations Malhar operators can do exactly once down to the tuple level.
+3.  **Dynamic updates** – Based on changing business conditions you often have to tweak several parameters used by the operators in your streaming application without incurring any application downtime. You can also change properties of a Malhar operator at runtime without having to bring down the application.
+4.  **Ease of extensibility** – Malhar operators are based on templates that are easy to extend.
+5.  **Partitioning support** – In streaming applications the input data stream often needs to be partitioned based on the contents of the stream. Also for operators that ingest data from external systems partitioning needs to be done based on the capabilities of the external system.  For example with Kafka, the operator can automatically scale up or down based on the changes in the number of Kafka partitions.
+
+# Operator Library Overview
+
+## Input/output connectors
+
+Below is a summary of the various sub categories of input and output operators. Input operators also have a corresponding output operator
+
+*   **File Systems** – Most streaming analytics use cases require the data to be stored in HDFS or perhaps S3 if the application is running in AWS.  Users often need to re-run their streaming analytical applications against historical data or consume data from upstream processes that are perhaps writing to some NFS share.  Apex supports input & output operators for HDFS, S3, NFS & Local Files.  There are also File Splitter and Block Reader operators, which can accelerate processing of large files by splitting and parallelizing the work across non-overlapping sets of file blocks.
+*   **Relational Databases** – Most stream processing use cases require some reference data lookups to enrich, tag or filter streaming data. There is also a need to save results of the streaming analytical computation to a database so an operational dashboard can see them. Apex supports a JDBC operator so you can read/write data from any JDBC compliant RDBMS like Oracle, MySQL, Sqlite, etc.
+*   **NoSQL Databases** – NoSQL key-value pair databases like Cassandra & HBase are a common part of streaming analytics application architectures to lookup reference data or store results.  Malhar has operators for HBase, Cassandra, Accumulo, Aerospike, MongoDB, and CouchDB.
+*   **Messaging Systems** – Kafka, JMS, and similar systems are the workhorses of messaging infrastructure in most enterprises.  Malhar has a robust, industry-tested set of operators to read and write Kafka, JMS, ZeroMQ, and RabbitMQ messages.
+*   **Notification Systems** – Malhar includes an operator for sending notifications via SMTP.
+*   **In-memory Databases & Caching platforms** - Some streaming use cases need instantaneous access to shared state across the application. Caching platforms and in-memory databases serve this purpose really well. To support these use cases, Malhar has operators for memcached and Redis.
+*   **Social Media** - Malhar includes an operator to connect to the popular Twitter stream fire hose.
+*   **Protocols** - Malhar provides connectors that can communicate in HTTP, RSS, Socket, WebSocket, FTP, and MQTT.
+
+## Parsers
+
+There are many industry vertical specific data formats that a streaming application developer might need to parse. Often there are existing parsers available for these that can be directly plugged into an Apache Apex application. For example in the Telco space, a Java based CDR parser can be directly plugged into Apache Apex operator. To further simplify development experience, Malhar also provides some operators for parsing common formats like XML (DOM & SAX), JSON (flat map converter), Apache log files, syslog, etc.
+
+## Stream manipulation
+
+Streaming data inevitably needs processing to clean, filter, tag, summarize, etc. The goal of Malhar is to enable the application developer to focus on WHAT needs to be done to the stream to get it in the right format and not worry about the HOW.  Malhar has several operators to perform the common stream manipulation actions like – GroupBy, Join, Distinct/Unique, Limit, OrderBy, Split, Sample, Inner join, Outer join, Select, Update etc.
+
+## Compute
+
+One of the most important promises of a streaming analytics platform like Apache Apex is the ability to do analytics in real-time. However delivering on the promise becomes really difficult when the platform does not provide out of the box operators to support variety of common compute functions as the user then has to worry about making these scalable, fault tolerant, stateful, etc.  Malhar takes this responsibility away from the application developer by providing a variety of out of the box computational operators.
+
+Below is just a snapshot of the compute operators available in Malhar
+
+*   Statistics and math - Various mathematical and statistical computations over application defined time windows.
+*   Filtering and pattern matching
+*   Sorting, maps, frequency, TopN, BottomN
+*   Random data generators
+
+## Languages Support
+
+Migrating to a new platform often requires re-use of the existing code that would be difficult or time-consuming to re-write.  With this in mind, Malhar supports invocation of code written in other languages by wrapping them in one of the library operators, and allows execution of software written in:
+
+* JavaScript
+* Python
+* R
+* Ruby
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5cecce47/mkdocs.yml
----------------------------------------------------------------------
diff --git a/mkdocs.yml b/mkdocs.yml
new file mode 100644
index 0000000..fdbc375
--- /dev/null
+++ b/mkdocs.yml
@@ -0,0 +1,10 @@
+site_name: Apache Apex Malhar Documentation
+site_favicon: favicon.ico
+theme: readthedocs
+pages:
+- Apache Apex Malhar: index.md
+- Operators:
+    - Kafka Input: operators/kafkaInputOperator.md
+    - File Splitter: operators/file_splitter.md
+    - Block Reader: operators/block_reader.md
+    - File Output: operators/file_output.md


[18/18] incubator-apex-malhar git commit: Merge branch 'APEXMALHAR-2004'

Posted by th...@apache.org.
Merge branch 'APEXMALHAR-2004'


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/5373a3cb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/5373a3cb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/5373a3cb

Branch: refs/heads/devel-3
Commit: 5373a3cb65f61704d5f1ce96415a2a5c0193a52e
Parents: becee7f 327a399
Author: Chandni Singh <cs...@apache.org>
Authored: Thu Mar 17 13:29:21 2016 -0700
Committer: Chandni Singh <cs...@apache.org>
Committed: Thu Mar 17 13:29:21 2016 -0700

----------------------------------------------------------------------
 .../lib/io/fs/FileSplitterInput.java            |  8 ++--
 .../lib/io/fs/FileSplitterInputTest.java        | 47 ++++++++++++++++++++
 2 files changed, 51 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5373a3cb/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5373a3cb/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
----------------------------------------------------------------------