You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by pr...@apache.org on 2016/11/16 10:19:59 UTC
apex-malhar git commit: APEXMALHAR-2325 1) Set the file system
default block size to the reader context. 2) Set the block size to the reader
in FSInputModule
Repository: apex-malhar
Updated Branches:
refs/heads/master 9db044741 -> 6b0e931ce
APEXMALHAR-2325 1) Set the file system default block size to the reader context. 2) Set the block size to the reader in FSInputModule
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/6b0e931c
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/6b0e931c
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/6b0e931c
Branch: refs/heads/master
Commit: 6b0e931ce6f676273ff1978c03be5f2e237c050d
Parents: 9db0447
Author: chaitanya <ch...@apache.org>
Authored: Thu Nov 10 15:09:27 2016 +0530
Committer: chaitanya <ch...@apache.org>
Committed: Wed Nov 16 15:09:26 2016 +0530
----------------------------------------------------------------------
.../datatorrent/lib/io/block/FSSliceReader.java | 12 ++++++
.../datatorrent/lib/io/fs/FSInputModule.java | 4 ++
.../lib/io/block/FSSliceReaderTest.java | 40 ++++++++++++++++++++
3 files changed, 56 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6b0e931c/library/src/main/java/com/datatorrent/lib/io/block/FSSliceReader.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/block/FSSliceReader.java b/library/src/main/java/com/datatorrent/lib/io/block/FSSliceReader.java
index 60fd93c..9ac2a61 100644
--- a/library/src/main/java/com/datatorrent/lib/io/block/FSSliceReader.java
+++ b/library/src/main/java/com/datatorrent/lib/io/block/FSSliceReader.java
@@ -18,6 +18,9 @@
*/
package com.datatorrent.lib.io.block;
+import org.apache.hadoop.fs.Path;
+
+import com.datatorrent.api.Context;
import com.datatorrent.api.StatsListener;
import com.datatorrent.netlet.util.Slice;
@@ -39,6 +42,15 @@ public class FSSliceReader extends AbstractFSBlockReader<Slice>
}
+ /**
+  * Sets up the reader and, when a base path is configured and the reader context is a
+  * {@link ReaderContext.FixedBytesReaderContext}, sets that context's length to the file
+  * system's default block size for the base path.
+  * <p>
+  * The file system reports the block size as a {@code long} while the length is narrowed to an
+  * {@code int}; values above {@link Integer#MAX_VALUE} are capped rather than truncated by a
+  * plain cast, which could otherwise yield a negative or arbitrary length.
+  *
+  * @param context the operator context, passed through to the superclass setup
+  */
@Override
+ public void setup(Context.OperatorContext context)
+ {
+   // The superclass setup runs first; fs is only used afterwards (presumably initialized there).
+   super.setup(context);
+   if (basePath != null && this.readerContext instanceof ReaderContext.FixedBytesReaderContext) {
+     long defaultBlockSize = fs.getDefaultBlockSize(new Path(basePath));
+     // Saturate instead of wrapping around when narrowing long -> int.
+     int length = (int)Math.min(defaultBlockSize, Integer.MAX_VALUE);
+     ((ReaderContext.FixedBytesReaderContext)this.readerContext).setLength(length);
+   }
+ }
+
+ @Override
protected Slice convertToRecord(byte[] bytes)
{
return new Slice(bytes);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6b0e931c/library/src/main/java/com/datatorrent/lib/io/fs/FSInputModule.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FSInputModule.java b/library/src/main/java/com/datatorrent/lib/io/fs/FSInputModule.java
index 6d54756..d4edb40 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/FSInputModule.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/FSInputModule.java
@@ -29,6 +29,7 @@ import com.datatorrent.lib.codec.KryoSerializableStreamCodec;
import com.datatorrent.lib.io.block.AbstractBlockReader;
import com.datatorrent.lib.io.block.BlockMetadata;
import com.datatorrent.lib.io.block.FSSliceReader;
+import com.datatorrent.lib.io.block.ReaderContext;
import com.datatorrent.netlet.util.Slice;
/**
@@ -98,6 +99,9 @@ public class FSInputModule implements Module
}
if (blockSize != 0) {
fileSplitter.setBlockSize(blockSize);
+ if (blockReader.getReaderContext() instanceof ReaderContext.FixedBytesReaderContext) {
+ ((ReaderContext.FixedBytesReaderContext)blockReader.getReaderContext()).setLength((int)blockSize);
+ }
}
FileSplitterInput.TimeBasedDirectoryScanner fileScanner = fileSplitter.getScanner();
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6b0e931c/library/src/test/java/com/datatorrent/lib/io/block/FSSliceReaderTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/block/FSSliceReaderTest.java b/library/src/test/java/com/datatorrent/lib/io/block/FSSliceReaderTest.java
index 37222a7..374617a 100644
--- a/library/src/test/java/com/datatorrent/lib/io/block/FSSliceReaderTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/block/FSSliceReaderTest.java
@@ -24,12 +24,17 @@ import java.io.IOException;
import java.util.List;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context;
@@ -38,6 +43,8 @@ import com.datatorrent.lib.helper.OperatorContextTestHelper;
import com.datatorrent.lib.testbench.CollectorTestSink;
import com.datatorrent.netlet.util.Slice;
+import static org.mockito.Mockito.when;
+
/**
* Tests for {@link FSSliceReader}.
*/
@@ -135,4 +142,37 @@ public class FSSliceReaderTest
FileUtils.contentEquals(testMeta.dataFile, outputFile);
}
+ @Mock
+ FileSystem fileSystem; // Mockito mock returned by FSTestReader#getFSInstance(); created by MockitoAnnotations.initMocks(this) in beforeTest()
+
+ public class FSTestReader extends FSSliceReader // FSSliceReader variant for tests; non-static so it can reach the enclosing test's fileSystem mock
+ {
+ @Override
+ protected FileSystem getFSInstance() throws IOException // overrides the inherited method to hand back the mocked file system
+ {
+ return fileSystem;
+ }
+ }
+
+ @Before
+ public void beforeTest() // JUnit @Before hook: runs before each test method
+ {
+ MockitoAnnotations.initMocks(this); // instantiates the @Mock-annotated fields of this test class (here: fileSystem)
+ }
+
+ /**
+  * Checks that {@code setup} propagates the file system's default block size for the base path
+  * into the reader's {@link ReaderContext.FixedBytesReaderContext} length.
+  */
+ @Test
+ public void testBlockSize() throws IOException
+ {
+   final long expectedBlockSize = 1000;
+   // Stub the mocked file system so the base path reports the expected block size.
+   when(fileSystem.getDefaultBlockSize(new Path(testMeta.output))).thenReturn(expectedBlockSize);
+
+   Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
+   attributes.put(Context.OperatorContext.SPIN_MILLIS, 10);
+   attributes.put(DAG.APPLICATION_ID, Long.toHexString(System.currentTimeMillis()));
+
+   FSTestReader sliceReader = new FSTestReader();
+   sliceReader.setBasePath(testMeta.output);
+   sliceReader.setup(new OperatorContextTestHelper.TestIdOperatorContext(1, attributes));
+
+   ReaderContext.FixedBytesReaderContext fixedBytesContext =
+       (ReaderContext.FixedBytesReaderContext)sliceReader.getReaderContext();
+   Assert.assertEquals("Block Size", expectedBlockSize, (long)fixedBytesContext.getLength());
+ }
}