You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by pr...@apache.org on 2016/10/17 10:24:30 UTC

apex-malhar git commit: APEXMALHAR-2272 : Fixed sequentialFileRead on FSInputModule

Repository: apex-malhar
Updated Branches:
  refs/heads/master 9eadce12c -> 2153cd6ba


APEXMALHAR-2272 : Fixed sequentialFileRead on FSInputModule

1. Fixed StreamCodec to route all blocks of a file to the same partition.
2. Fixed the spelling of sequentialFileRead in the code and javadocs.
3. Marked FSInputModule as evolving.
4. Added a japicmp exclusion for FSInputModule.


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/2153cd6b
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/2153cd6b
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/2153cd6b

Branch: refs/heads/master
Commit: 2153cd6babadf11e7be63a137daa7dd2c3ef7230
Parents: 9eadce1
Author: yogidevendra <yo...@apache.org>
Authored: Sat Oct 1 08:19:56 2016 +0530
Committer: Priyanka Gugale <pr...@datatorrent.com>
Committed: Mon Oct 17 15:35:32 2016 +0530

----------------------------------------------------------------------
 .../datatorrent/lib/io/fs/FSInputModule.java    | 26 ++++-----
 .../datatorrent/lib/io/fs/S3InputModule.java    |  2 +-
 .../lib/io/fs/FSInputModuleTest.java            | 57 ++++++++++++++++++++
 pom.xml                                         |  1 +
 4 files changed, 72 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2153cd6b/library/src/main/java/com/datatorrent/lib/io/fs/FSInputModule.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FSInputModule.java b/library/src/main/java/com/datatorrent/lib/io/fs/FSInputModule.java
index d276dd7..d71111c 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/FSInputModule.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/FSInputModule.java
@@ -42,12 +42,12 @@ import com.datatorrent.netlet.util.Slice;
  * 4. recursive: if scan recursively input directories<br/>
  * 5. blockSize: block size used to read input blocks of file<br/>
  * 6. readersCount: count of readers to read input file<br/>
- * 7. sequencialFileRead: If emit file blocks in sequence?<br/>
+ * 7. sequentialFileRead: If emit file blocks in sequence?<br/>
  * 8. blocksThreshold: number of blocks emitted per window
  *
  * @since 3.5.0
  */
-
+@org.apache.hadoop.classification.InterfaceStability.Evolving
 public class FSInputModule implements Module
 {
   @NotNull
@@ -58,7 +58,7 @@ public class FSInputModule implements Module
   private long scanIntervalMillis;
   private boolean recursive = true;
   private long blockSize;
-  private boolean sequencialFileRead = false;
+  private boolean sequentialFileRead = false;
   private int readersCount;
   @Min(1)
   protected int blocksThreshold;
@@ -89,7 +89,7 @@ public class FSInputModule implements Module
     blocksMetadataOutput.set(blockReader.blocksMetadataOutput);
     messages.set(blockReader.messages);
 
-    if (sequencialFileRead) {
+    if (sequentialFileRead) {
       dag.setInputPortAttribute(blockReader.blocksMetadataInput, Context.PortContext.STREAM_CODEC,
           new SequentialFileBlockMetadataCodec());
     }
@@ -236,23 +236,23 @@ public class FSInputModule implements Module
   }
 
   /**
-   * Gets is sequencial file read
+   * Gets is sequential file read
    *
-   * @return sequencialFileRead
+   * @return sequentialFileRead
    */
-  public boolean isSequencialFileRead()
+  public boolean isSequentialFileRead()
   {
-    return sequencialFileRead;
+    return sequentialFileRead;
   }
 
   /**
-   * Sets is sequencial file read
+   * Sets is sequential file read
    *
-   * @param sequencialFileRead
+   * @param sequentialFileRead
    */
-  public void setSequencialFileRead(boolean sequencialFileRead)
+  public void setSequentialFileRead(boolean sequentialFileRead)
   {
-    this.sequencialFileRead = sequencialFileRead;
+    this.sequentialFileRead = sequentialFileRead;
   }
 
   /**
@@ -283,7 +283,7 @@ public class FSInputModule implements Module
     @Override
     public int getPartition(BlockMetadata.FileBlockMetadata fileBlockMetadata)
     {
-      return fileBlockMetadata.hashCode();
+      return fileBlockMetadata.getFilePath().hashCode();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2153cd6b/library/src/main/java/com/datatorrent/lib/io/fs/S3InputModule.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/S3InputModule.java b/library/src/main/java/com/datatorrent/lib/io/fs/S3InputModule.java
index bd55161..366eaa5 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/S3InputModule.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/S3InputModule.java
@@ -38,7 +38,7 @@ import com.datatorrent.lib.io.block.FSSliceReader;
  * 4. recursive: if scan recursively input directories<br/>
  * 5. blockSize: block size used to read input blocks of file<br/>
  * 6. readersCount: count of readers to read input file<br/>
- * 7. sequencialFileRead: Is emit file blocks in sequence?
+ * 7. sequentialFileRead: Is emit file blocks in sequence?
  *
  * @since 3.5.0
  */

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2153cd6b/library/src/test/java/com/datatorrent/lib/io/fs/FSInputModuleTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/FSInputModuleTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/FSInputModuleTest.java
new file mode 100644
index 0000000..12301f6
--- /dev/null
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/FSInputModuleTest.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.datatorrent.lib.io.fs;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import com.datatorrent.lib.io.block.BlockMetadata;
+import com.datatorrent.lib.io.fs.FSInputModule.SequentialFileBlockMetadataCodec;
+
+import static org.mockito.Mockito.when;
+
+public class FSInputModuleTest
+{
+  @Mock
+  BlockMetadata.FileBlockMetadata file1block1;
+  @Mock
+  BlockMetadata.FileBlockMetadata file1block2;
+  @Mock
+  BlockMetadata.FileBlockMetadata file2block1;
+
+  @Test
+  public void testSequentialFileBlockMetadataCodec()
+  {
+
+    MockitoAnnotations.initMocks(this);
+    when(file1block1.getFilePath()).thenReturn("file1");
+    when(file1block2.getFilePath()).thenReturn("file1");
+    when(file2block1.getFilePath()).thenReturn("file2");
+
+    SequentialFileBlockMetadataCodec codec = new SequentialFileBlockMetadataCodec();
+
+    Assert.assertEquals("Blocks of same file distributed to different partitions", codec.getPartition(file1block1),
+        codec.getPartition(file1block2));
+    Assert.assertNotSame("Blocks of different files distributed to same partition", codec.getPartition(file1block1),
+        codec.getPartition(file2block1));
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2153cd6b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index f9ce982..cb8bd93 100644
--- a/pom.xml
+++ b/pom.xml
@@ -134,6 +134,7 @@
               <excludes>
                 <exclude>@org.apache.hadoop.classification.InterfaceStability$Evolving</exclude>
                 <exclude>@org.apache.hadoop.classification.InterfaceStability$Unstable</exclude>
+                <exclude>com.datatorrent.lib.io.fs.FSInputModule</exclude>
               </excludes>
             </parameter>
             <skip>${semver.plugin.skip}</skip>