You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by pr...@apache.org on 2016/11/16 10:19:59 UTC

apex-malhar git commit: APEXMALHAR-2325 1) Set the file system default block size to the reader context. 2) Set the block size to the reader in FSInputModule

Repository: apex-malhar
Updated Branches:
  refs/heads/master 9db044741 -> 6b0e931ce


APEXMALHAR-2325 1) Set the file system default block size to the reader context. 2) Set the block size to the reader in FSInputModule


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/6b0e931c
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/6b0e931c
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/6b0e931c

Branch: refs/heads/master
Commit: 6b0e931ce6f676273ff1978c03be5f2e237c050d
Parents: 9db0447
Author: chaitanya <ch...@apache.org>
Authored: Thu Nov 10 15:09:27 2016 +0530
Committer: chaitanya <ch...@apache.org>
Committed: Wed Nov 16 15:09:26 2016 +0530

----------------------------------------------------------------------
 .../datatorrent/lib/io/block/FSSliceReader.java | 12 ++++++
 .../datatorrent/lib/io/fs/FSInputModule.java    |  4 ++
 .../lib/io/block/FSSliceReaderTest.java         | 40 ++++++++++++++++++++
 3 files changed, 56 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6b0e931c/library/src/main/java/com/datatorrent/lib/io/block/FSSliceReader.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/block/FSSliceReader.java b/library/src/main/java/com/datatorrent/lib/io/block/FSSliceReader.java
index 60fd93c..9ac2a61 100644
--- a/library/src/main/java/com/datatorrent/lib/io/block/FSSliceReader.java
+++ b/library/src/main/java/com/datatorrent/lib/io/block/FSSliceReader.java
@@ -18,6 +18,9 @@
  */
 package com.datatorrent.lib.io.block;
 
+import org.apache.hadoop.fs.Path;
+
+import com.datatorrent.api.Context;
 import com.datatorrent.api.StatsListener;
 import com.datatorrent.netlet.util.Slice;
 
@@ -39,6 +42,15 @@ public class FSSliceReader extends AbstractFSBlockReader<Slice>
   }
 
   @Override
+  public void setup(Context.OperatorContext context) // sizes a fixed-bytes reader context from the file system's default block size
+  {
+    super.setup(context); // NOTE(review): fs is dereferenced below, so it is presumably initialized by the parent setup - confirm
+    if (basePath != null && this.readerContext instanceof ReaderContext.FixedBytesReaderContext) { // otherwise the context is left untouched
+      ((ReaderContext.FixedBytesReaderContext)this.readerContext).setLength((int)fs.getDefaultBlockSize(new Path(basePath)));
+    } // NOTE(review): the (int) cast above narrows the block size; the call also replaces any length already set on the context
+  }
+
+  @Override
   protected Slice convertToRecord(byte[] bytes)
   {
     return new Slice(bytes);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6b0e931c/library/src/main/java/com/datatorrent/lib/io/fs/FSInputModule.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FSInputModule.java b/library/src/main/java/com/datatorrent/lib/io/fs/FSInputModule.java
index 6d54756..d4edb40 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/FSInputModule.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/FSInputModule.java
@@ -29,6 +29,7 @@ import com.datatorrent.lib.codec.KryoSerializableStreamCodec;
 import com.datatorrent.lib.io.block.AbstractBlockReader;
 import com.datatorrent.lib.io.block.BlockMetadata;
 import com.datatorrent.lib.io.block.FSSliceReader;
+import com.datatorrent.lib.io.block.ReaderContext;
 import com.datatorrent.netlet.util.Slice;
 
 /**
@@ -98,6 +99,9 @@ public class FSInputModule implements Module
     }
     if (blockSize != 0) {
       fileSplitter.setBlockSize(blockSize);
+      if (blockReader.getReaderContext() instanceof ReaderContext.FixedBytesReaderContext) {
+        ((ReaderContext.FixedBytesReaderContext)blockReader.getReaderContext()).setLength((int)blockSize);
+      }
     }
 
     FileSplitterInput.TimeBasedDirectoryScanner fileScanner = fileSplitter.getScanner();

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6b0e931c/library/src/test/java/com/datatorrent/lib/io/block/FSSliceReaderTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/block/FSSliceReaderTest.java b/library/src/test/java/com/datatorrent/lib/io/block/FSSliceReaderTest.java
index 37222a7..374617a 100644
--- a/library/src/test/java/com/datatorrent/lib/io/block/FSSliceReaderTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/block/FSSliceReaderTest.java
@@ -24,12 +24,17 @@ import java.io.IOException;
 import java.util.List;
 
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestWatcher;
 import org.junit.runner.Description;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 
 import com.datatorrent.api.Attribute;
 import com.datatorrent.api.Context;
@@ -38,6 +43,8 @@ import com.datatorrent.lib.helper.OperatorContextTestHelper;
 import com.datatorrent.lib.testbench.CollectorTestSink;
 import com.datatorrent.netlet.util.Slice;
 
+import static org.mockito.Mockito.when;
+
 /**
  * Tests for {@link FSSliceReader}.
  */
@@ -135,4 +142,37 @@ public class FSSliceReaderTest
     FileUtils.contentEquals(testMeta.dataFile, outputFile);
   }
 
+  @Mock
+  FileSystem fileSystem;
+
+  public class FSTestReader extends FSSliceReader // test-only reader that hands back the mocked fileSystem field
+  {
+    @Override
+    protected FileSystem getFSInstance() throws IOException
+    {
+      return fileSystem; // the @Mock-annotated field of the enclosing test class
+    }
+  }
+
+  @Before
+  public void beforeTest() // JUnit @Before hook
+  {
+    MockitoAnnotations.initMocks(this); // initializes the @Mock-annotated fields of this test instance
+  }
+
+  @Test
+  public void testBlockSize() throws IOException // checks that setup() copies the file system's default block size into the reader context
+  {
+    long blockSize = 1000;
+    Path path = new Path(testMeta.output); // built from the same string that is passed to setBasePath below
+    when(fileSystem.getDefaultBlockSize(path)).thenReturn(blockSize); // stub the mock to report blockSize for that path
+    Attribute.AttributeMap.DefaultAttributeMap readerAttr = new Attribute.AttributeMap.DefaultAttributeMap();
+    readerAttr.put(DAG.APPLICATION_ID, Long.toHexString(System.currentTimeMillis()));
+    readerAttr.put(Context.OperatorContext.SPIN_MILLIS, 10);
+
+    FSTestReader reader = new FSTestReader(); // subclass whose getFSInstance() returns the mock
+    reader.setBasePath(testMeta.output);
+    reader.setup(new OperatorContextTestHelper.TestIdOperatorContext(1, readerAttr)); // the call under test
+    Assert.assertEquals("Block Size", blockSize, (long)((ReaderContext.FixedBytesReaderContext)reader.getReaderContext()).getLength());
+  }
 }