You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by pr...@apache.org on 2016/10/17 10:24:30 UTC
apex-malhar git commit: APEXMALHAR-2272 : Fixed sequentialFileRead on
FSInputModule
Repository: apex-malhar
Updated Branches:
refs/heads/master 9eadce12c -> 2153cd6ba
APEXMALHAR-2272 : Fixed sequentialFileRead on FSInputModule
1. Fixed StreamCodec to route all blocks for a file to same partition.
2. Fixed spelling of sequentialFileRead in the code and javadocs.
3. Marked FSInputModule as evolving.
4. Added japicmp exclusion for FSInputModule.
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/2153cd6b
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/2153cd6b
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/2153cd6b
Branch: refs/heads/master
Commit: 2153cd6babadf11e7be63a137daa7dd2c3ef7230
Parents: 9eadce1
Author: yogidevendra <yo...@apache.org>
Authored: Sat Oct 1 08:19:56 2016 +0530
Committer: Priyanka Gugale <pr...@datatorrent.com>
Committed: Mon Oct 17 15:35:32 2016 +0530
----------------------------------------------------------------------
.../datatorrent/lib/io/fs/FSInputModule.java | 26 ++++-----
.../datatorrent/lib/io/fs/S3InputModule.java | 2 +-
.../lib/io/fs/FSInputModuleTest.java | 57 ++++++++++++++++++++
pom.xml | 1 +
4 files changed, 72 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2153cd6b/library/src/main/java/com/datatorrent/lib/io/fs/FSInputModule.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FSInputModule.java b/library/src/main/java/com/datatorrent/lib/io/fs/FSInputModule.java
index d276dd7..d71111c 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/FSInputModule.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/FSInputModule.java
@@ -42,12 +42,12 @@ import com.datatorrent.netlet.util.Slice;
* 4. recursive: if scan recursively input directories<br/>
* 5. blockSize: block size used to read input blocks of file<br/>
* 6. readersCount: count of readers to read input file<br/>
- * 7. sequencialFileRead: If emit file blocks in sequence?<br/>
+ * 7. sequentialFileRead: If emit file blocks in sequence?<br/>
* 8. blocksThreshold: number of blocks emitted per window
*
* @since 3.5.0
*/
-
+@org.apache.hadoop.classification.InterfaceStability.Evolving
public class FSInputModule implements Module
{
@NotNull
@@ -58,7 +58,7 @@ public class FSInputModule implements Module
private long scanIntervalMillis;
private boolean recursive = true;
private long blockSize;
- private boolean sequencialFileRead = false;
+ private boolean sequentialFileRead = false;
private int readersCount;
@Min(1)
protected int blocksThreshold;
@@ -89,7 +89,7 @@ public class FSInputModule implements Module
blocksMetadataOutput.set(blockReader.blocksMetadataOutput);
messages.set(blockReader.messages);
- if (sequencialFileRead) {
+ if (sequentialFileRead) {
dag.setInputPortAttribute(blockReader.blocksMetadataInput, Context.PortContext.STREAM_CODEC,
new SequentialFileBlockMetadataCodec());
}
@@ -236,23 +236,23 @@ public class FSInputModule implements Module
}
/**
- * Gets is sequencial file read
+ * Gets is sequential file read
*
- * @return sequencialFileRead
+ * @return sequentialFileRead
*/
- public boolean isSequencialFileRead()
+ public boolean isSequentialFileRead()
{
- return sequencialFileRead;
+ return sequentialFileRead;
}
/**
- * Sets is sequencial file read
+ * Sets is sequential file read
*
- * @param sequencialFileRead
+ * @param sequentialFileRead
*/
- public void setSequencialFileRead(boolean sequencialFileRead)
+ public void setSequentialFileRead(boolean sequentialFileRead)
{
- this.sequencialFileRead = sequencialFileRead;
+ this.sequentialFileRead = sequentialFileRead;
}
/**
@@ -283,7 +283,7 @@ public class FSInputModule implements Module
@Override
public int getPartition(BlockMetadata.FileBlockMetadata fileBlockMetadata)
{
- return fileBlockMetadata.hashCode();
+ return fileBlockMetadata.getFilePath().hashCode();
}
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2153cd6b/library/src/main/java/com/datatorrent/lib/io/fs/S3InputModule.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/S3InputModule.java b/library/src/main/java/com/datatorrent/lib/io/fs/S3InputModule.java
index bd55161..366eaa5 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/S3InputModule.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/S3InputModule.java
@@ -38,7 +38,7 @@ import com.datatorrent.lib.io.block.FSSliceReader;
* 4. recursive: if scan recursively input directories<br/>
* 5. blockSize: block size used to read input blocks of file<br/>
* 6. readersCount: count of readers to read input file<br/>
- * 7. sequencialFileRead: Is emit file blocks in sequence?
+ * 7. sequentialFileRead: Is emit file blocks in sequence?
*
* @since 3.5.0
*/
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2153cd6b/library/src/test/java/com/datatorrent/lib/io/fs/FSInputModuleTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/FSInputModuleTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/FSInputModuleTest.java
new file mode 100644
index 0000000..12301f6
--- /dev/null
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/FSInputModuleTest.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.datatorrent.lib.io.fs;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import com.datatorrent.lib.io.block.BlockMetadata;
+import com.datatorrent.lib.io.fs.FSInputModule.SequentialFileBlockMetadataCodec;
+
+import static org.mockito.Mockito.when;
+
+public class FSInputModuleTest
+{
+  @Mock
+  BlockMetadata.FileBlockMetadata file1block1;
+  @Mock
+  BlockMetadata.FileBlockMetadata file1block2;
+  @Mock
+  BlockMetadata.FileBlockMetadata file2block1;
+
+  @Test
+  public void testSequentialFileBlockMetadataCodec()
+  {
+    // Only getFilePath() is stubbed on the mocks: "file1" has two blocks, "file2" has one.
+    MockitoAnnotations.initMocks(this);
+    when(file1block1.getFilePath()).thenReturn("file1");
+    when(file1block2.getFilePath()).thenReturn("file1");
+    when(file2block1.getFilePath()).thenReturn("file2");
+
+    SequentialFileBlockMetadataCodec codec = new SequentialFileBlockMetadataCodec();
+    // Compare the int partition keys by value: assertNotSame would box them and compare references.
+    Assert.assertEquals("Blocks of same file distributed to different partitions", codec.getPartition(file1block1),
+        codec.getPartition(file1block2));
+    Assert.assertNotEquals("Blocks of different files distributed to same partition", codec.getPartition(file1block1),
+        codec.getPartition(file2block1));
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2153cd6b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index f9ce982..cb8bd93 100644
--- a/pom.xml
+++ b/pom.xml
@@ -134,6 +134,7 @@
<excludes>
<exclude>@org.apache.hadoop.classification.InterfaceStability$Evolving</exclude>
<exclude>@org.apache.hadoop.classification.InterfaceStability$Unstable</exclude>
+ <exclude>com.datatorrent.lib.io.fs.FSInputModule</exclude>
</excludes>
</parameter>
<skip>${semver.plugin.skip}</skip>