You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by yo...@apache.org on 2016/07/14 06:28:56 UTC

[1/2] apex-malhar git commit: APEXMALHAR-2019 Implemented S3 Input Module

Repository: apex-malhar
Updated Branches:
  refs/heads/master 0b66f19d1 -> 23970382c


APEXMALHAR-2019 Implemented S3 Input Module


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/5de26e4a
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/5de26e4a
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/5de26e4a

Branch: refs/heads/master
Commit: 5de26e4a0d65652e81007445991f88075089fb0c
Parents: 67b84dd
Author: Chaitanya <ch...@datatorrent.com>
Authored: Wed Jul 13 15:46:13 2016 +0530
Committer: Chaitanya <ch...@datatorrent.com>
Committed: Wed Jul 13 15:46:13 2016 +0530

----------------------------------------------------------------------
 library/pom.xml                                 |  16 ++
 .../datatorrent/lib/io/fs/S3BlockReader.java    | 126 +++++++++
 .../datatorrent/lib/io/fs/S3InputModule.java    |  68 +++++
 .../lib/io/fs/S3InputModuleAppTest.java         | 257 +++++++++++++++++++
 4 files changed, 467 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5de26e4a/library/pom.xml
----------------------------------------------------------------------
diff --git a/library/pom.xml b/library/pom.xml
index 7242027..e9f64c8 100644
--- a/library/pom.xml
+++ b/library/pom.xml
@@ -186,6 +186,16 @@
           <suppressionsLocation>library-checkstyle-suppressions.xml</suppressionsLocation>
         </configuration>
       </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <version>2.19.1</version>
+        <configuration>
+          <excludes>
+            <exclude>**/S3InputModuleAppTest.java</exclude>
+          </excludes>
+        </configuration>
+      </plugin>
     </plugins>
   </build>
 
@@ -335,6 +345,12 @@
       <version>1.8.5</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>com.amazonaws</groupId>
+      <artifactId>aws-java-sdk-s3</artifactId>
+      <version>1.10.73</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <profiles>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5de26e4a/library/src/main/java/com/datatorrent/lib/io/fs/S3BlockReader.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/S3BlockReader.java b/library/src/main/java/com/datatorrent/lib/io/fs/S3BlockReader.java
new file mode 100644
index 0000000..34f64ed
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/S3BlockReader.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.io.fs;
+
+import java.io.IOException;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.datatorrent.api.Context;
+import com.datatorrent.lib.io.block.BlockMetadata;
+import com.datatorrent.lib.io.block.FSSliceReader;
+import com.datatorrent.lib.io.block.ReaderContext;
+
+/**
+ * S3BlockReader extends from BlockReader and serves the functionality of read objects and
+ * parse Block metadata
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public class S3BlockReader extends FSSliceReader
+{
+  protected transient String s3bucketUri;
+  private String bucketName;
+
+  public S3BlockReader()
+  {
+    this.readerContext = new S3BlockReaderContext();
+  }
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    super.setup(context);
+    s3bucketUri = fs.getScheme() + "://" + bucketName;
+  }
+
+  /**
+   * Extracts the bucket name from the given uri
+   * @param s3uri s3 uri
+   * @return name of the bucket
+   */
+  @VisibleForTesting
+  protected static String extractBucket(String s3uri)
+  {
+    return s3uri.substring(s3uri.indexOf('@') + 1, s3uri.indexOf("/", s3uri.indexOf('@')));
+  }
+
+  /**
+   * Create the stream from the bucket uri and block path.
+   * @param block block metadata
+   * @return stream
+   * @throws IOException
+   */
+  @Override
+  protected FSDataInputStream setupStream(BlockMetadata.FileBlockMetadata block) throws IOException
+  {
+    FSDataInputStream ins = fs.open(new Path(s3bucketUri + block.getFilePath()));
+    ins.seek(block.getOffset());
+    return ins;
+  }
+
+  /**
+   * BlockReadeContext for reading S3 Blocks. Stream could't able be read the complete block.
+   * This will wait till the block reads completely.
+   */
+  private static class S3BlockReaderContext extends ReaderContext.FixedBytesReaderContext<FSDataInputStream>
+  {
+    /**
+     * S3 File systems doesn't read the specified block completely while using readFully API.
+     * This will read small chunks continuously until will reach the specified block size.
+     * @return the block entity
+     * @throws IOException
+     */
+    @Override
+    protected Entity readEntity() throws IOException
+    {
+      entity.clear();
+      int bytesToRead = length;
+      if (offset + length >= blockMetadata.getLength()) {
+        bytesToRead = (int)(blockMetadata.getLength() - offset);
+      }
+      byte[] record = new byte[bytesToRead];
+      int bytesRead = 0;
+      while (bytesRead < bytesToRead) {
+        bytesRead += stream.read(record, bytesRead, bytesToRead - bytesRead);
+      }
+      entity.setUsedBytes(bytesRead);
+      entity.setRecord(record);
+      return entity;
+    }
+  }
+
+  /**
+   * Get the S3 bucket name
+   * @return bucket
+   */
+  public String getBucketName()
+  {
+    return bucketName;
+  }
+
+  /**
+   * Set the bucket name
+   * @param bucketName bucket name
+   */
+  public void setBucketName(String bucketName)
+  {
+    this.bucketName = bucketName;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5de26e4a/library/src/main/java/com/datatorrent/lib/io/fs/S3InputModule.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/S3InputModule.java b/library/src/main/java/com/datatorrent/lib/io/fs/S3InputModule.java
new file mode 100644
index 0000000..50c40ec
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/S3InputModule.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.io.fs;
+
+import com.datatorrent.lib.io.block.FSSliceReader;
+
+/**
+ * S3InputModule is used to read files/list of files (or directory) from S3 bucket. <br/>
+ * Module emits, <br/>
+ * 1. FileMetadata 2. BlockMetadata 3. Block Bytes.<br/><br/>
+ * Parallel read will work only if the scheme is "s3a" and the Hadoop version is 2.7+.
+ * Parallel read doesn't work in the case of the scheme is "s3n/s3". In this case, this operator explicitly
+ * disables the parallel read functionality.
+ * For more info about S3 scheme protocals, please have a look at
+ * <a href="https://wiki.apache.org/hadoop/AmazonS3">https://wiki.apache.org/hadoop/AmazonS3.</a>
+ *
+ * The module reads data in parallel, following parameters can be configured<br/>
+ * 1. files: List of file(s)/directories to read. files would be in the form of
+ *           SCHEME://AccessKey:SecretKey@BucketName/FileOrDirectory ,
+ *           SCHEME://AccessKey:SecretKey@BucketName/FileOrDirectory , ....
+ *           where SCHEME is the protocal scheme for the file system.
+ *                 AccessKey is the AWS access key and SecretKey is the AWS Secret Key<br/>
+ * 2. filePatternRegularExp: Files names matching given regex will be read<br/>
+ * 3. scanIntervalMillis: interval between two scans to discover new files in input directory<br/>
+ * 4. recursive: if scan recursively input directories<br/>
+ * 5. blockSize: block size used to read input blocks of file<br/>
+ * 6. readersCount: count of readers to read input file<br/>
+ * 7. sequencialFileRead: Is emit file blocks in sequence?
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public class S3InputModule extends FSInputModule
+{
+  /**
+   * Creates the block reader for reading s3 blocks
+   * @return S3BlockReader
+   */
+  @Override
+  public FSSliceReader createBlockReader()
+  {
+    //Extract the scheme from the files
+    String s3input = getFiles();
+    String scheme =  s3input.substring(0, s3input.indexOf("://"));
+    // Parallel read doesn't support, if the scheme is s3 (or) s3n.
+    if (scheme.equals("s3") || scheme.equals("s3n")) {
+      setSequencialFileRead(true);
+    }
+    // Set the s3 bucket name to the block reader
+    S3BlockReader reader = new S3BlockReader();
+    reader.setBucketName(S3BlockReader.extractBucket(getFiles()));
+    return reader;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5de26e4a/library/src/test/java/com/datatorrent/lib/io/fs/S3InputModuleAppTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/S3InputModuleAppTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/S3InputModuleAppTest.java
new file mode 100644
index 0000000..3d7aef0
--- /dev/null
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/S3InputModuleAppTest.java
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.io.fs;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.filefilter.WildcardFileFilter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.lib.io.block.AbstractBlockReader;
+import com.datatorrent.lib.io.block.BlockMetadata;
+import com.datatorrent.lib.stream.DevNull;
+import com.datatorrent.netlet.util.Slice;
+
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public class S3InputModuleAppTest
+{
+  private String inputDir;
+  static String outputDir;
+  private StreamingApplication app;
+  private String accessKey = "*************";
+  private String secretKey = "**************";
+  private AmazonS3 client;
+  private String files;
+  private static final String SCHEME = "s3n";
+  private static final String FILE_1 = "file1.txt";
+  private static final String FILE_2 = "file2.txt";
+  private static final String FILE_1_DATA = "File one data";
+  private static final String FILE_2_DATA = "File two data. This has more data hence more blocks.";
+  static final String OUT_DATA_FILE = "fileData.txt";
+  static final String OUT_METADATA_FILE = "fileMetaData.txt";
+
+  public static class TestMeta extends TestWatcher
+  {
+    public String baseDirectory;
+    public String bucketKey;
+
+    @Override
+    protected void starting(org.junit.runner.Description description)
+    {
+      this.baseDirectory = "target/" + description.getClassName() + "/" + description.getMethodName();
+      this.bucketKey = new String("target-" + description.getMethodName()).toLowerCase();
+    }
+
+  }
+
+  @Rule
+  public S3InputModuleAppTest.TestMeta testMeta = new S3InputModuleAppTest.TestMeta();
+
+  @Before
+  public void setup() throws Exception
+  {
+    client = new AmazonS3Client(new BasicAWSCredentials(accessKey, secretKey));
+    client.createBucket(testMeta.bucketKey);
+
+    inputDir = testMeta.baseDirectory + File.separator + "input";
+    outputDir = testMeta.baseDirectory + File.separator + "output";
+
+    File file1 = new File(inputDir + File.separator + FILE_1);
+    File file2 = new File(inputDir + File.separator + FILE_2);
+
+    FileUtils.writeStringToFile(file1, FILE_1_DATA);
+    FileUtils.writeStringToFile(file2, FILE_2_DATA);
+    client.putObject(new PutObjectRequest(testMeta.bucketKey, "input/" + FILE_1, file1));
+    client.putObject(new PutObjectRequest(testMeta.bucketKey, "input/" + FILE_2, file2));
+    files = SCHEME + "://" + accessKey + ":" + secretKey + "@" + testMeta.bucketKey + "/input";
+  }
+
+  @After
+  public void tearDown() throws IOException
+  {
+    FileUtils.deleteDirectory(new File(inputDir));
+    FileUtils.deleteDirectory(new File(outputDir));
+    deleteBucketAndContent();
+    //client.deleteBucket(testMeta.bucketKey);
+  }
+
+  public void deleteBucketAndContent()
+  {
+    //Get the list of objects
+    ObjectListing objectListing = client.listObjects(testMeta.bucketKey);
+    for ( Iterator<?> iterator = objectListing.getObjectSummaries().iterator(); iterator.hasNext(); ) {
+      S3ObjectSummary objectSummary = (S3ObjectSummary)iterator.next();
+      LOG.info("Deleting an object: {}",objectSummary.getKey());
+      client.deleteObject(testMeta.bucketKey, objectSummary.getKey());
+    }
+    client.deleteBucket(testMeta.bucketKey);
+  }
+
+  @Test
+  public void testS3Application() throws Exception
+  {
+    app = new S3InputModuleAppTest.Application();
+    Configuration conf = new Configuration(false);
+    conf.set("dt.operator.s3InputModule.prop.files", files);
+    conf.set("dt.operator.s3InputModule.prop.blockSize", "10");
+    conf.set("dt.operator.s3InputModule.prop.scanIntervalMillis", "10000");
+
+    LocalMode lma = LocalMode.newInstance();
+    lma.prepareDAG(app, conf);
+    LocalMode.Controller lc = lma.getController();
+    lc.setHeartbeatMonitoringEnabled(true);
+    lc.runAsync();
+
+    long now = System.currentTimeMillis();
+    Path outDir = new Path("file://" + new File(outputDir).getAbsolutePath());
+    FileSystem fs = FileSystem.newInstance(outDir.toUri(), new Configuration());
+    while (!fs.exists(outDir) && System.currentTimeMillis() - now < 20000) {
+      Thread.sleep(500);
+      LOG.debug("Waiting for {}", outDir);
+    }
+
+    Thread.sleep(10000);
+    lc.shutdown();
+
+    Assert.assertTrue("output dir does not exist", fs.exists(outDir));
+
+    File dir = new File(outputDir);
+    FileFilter fileFilter = new WildcardFileFilter(OUT_METADATA_FILE + "*");
+    verifyFileContents(dir.listFiles(fileFilter), "[fileName=file1.txt, numberOfBlocks=2, isDirectory=false, relativePath=input/file1.txt]");
+    verifyFileContents(dir.listFiles(fileFilter), "[fileName=file2.txt, numberOfBlocks=6, isDirectory=false, relativePath=input/file2.txt]");
+
+    fileFilter = new WildcardFileFilter(OUT_DATA_FILE + "*");
+    verifyFileContents(dir.listFiles(fileFilter), FILE_1_DATA);
+    verifyFileContents(dir.listFiles(fileFilter), FILE_2_DATA);
+  }
+
+  private void verifyFileContents(File[] files, String expectedData) throws IOException
+  {
+    StringBuilder filesData = new StringBuilder();
+    for (File file : files) {
+      filesData.append(FileUtils.readFileToString(file));
+    }
+    Assert.assertTrue("File data doesn't contain expected text", filesData.indexOf(expectedData) > -1);
+  }
+
+  private static Logger LOG = LoggerFactory.getLogger(S3InputModuleAppTest.class);
+
+  private static class Application implements StreamingApplication
+  {
+    public void populateDAG(DAG dag, Configuration conf)
+    {
+      S3InputModule module = dag.addModule("s3InputModule", S3InputModule.class);
+
+      AbstractFileOutputOperator<AbstractFileSplitter.FileMetadata> metadataWriter = new S3InputModuleAppTest.MetadataWriter(S3InputModuleAppTest.OUT_METADATA_FILE);
+      metadataWriter.setFilePath(S3InputModuleAppTest.outputDir);
+      dag.addOperator("FileMetadataWriter", metadataWriter);
+
+      AbstractFileOutputOperator<AbstractBlockReader.ReaderRecord<Slice>> dataWriter = new S3InputModuleAppTest.HDFSFileWriter(S3InputModuleAppTest.OUT_DATA_FILE);
+      dataWriter.setFilePath(S3InputModuleAppTest.outputDir);
+      dag.addOperator("FileDataWriter", dataWriter);
+
+      DevNull<BlockMetadata.FileBlockMetadata> devNull = dag.addOperator("devNull", DevNull.class);
+
+      dag.addStream("FileMetaData", module.filesMetadataOutput, metadataWriter.input);
+      dag.addStream("data", module.messages, dataWriter.input);
+      dag.addStream("blockMetadata", module.blocksMetadataOutput, devNull.data);
+    }
+  }
+
+  private static class MetadataWriter extends AbstractFileOutputOperator<AbstractFileSplitter.FileMetadata>
+  {
+    String fileName;
+
+    @SuppressWarnings("unused")
+    private MetadataWriter()
+    {
+
+    }
+
+    public MetadataWriter(String fileName)
+    {
+      this.fileName = fileName;
+    }
+
+    @Override
+    protected String getFileName(AbstractFileSplitter.FileMetadata tuple)
+    {
+      return fileName;
+    }
+
+    @Override
+    protected byte[] getBytesForTuple(AbstractFileSplitter.FileMetadata tuple)
+    {
+      return (tuple).toString().getBytes();
+    }
+  }
+
+  private static class HDFSFileWriter extends AbstractFileOutputOperator<AbstractBlockReader.ReaderRecord<Slice>>
+  {
+    String fileName;
+
+    @SuppressWarnings("unused")
+    private HDFSFileWriter()
+    {
+    }
+
+    public HDFSFileWriter(String fileName)
+    {
+      this.fileName = fileName;
+    }
+
+    @Override
+    protected String getFileName(AbstractBlockReader.ReaderRecord<Slice> tuple)
+    {
+      return fileName;
+    }
+
+    @Override
+    protected byte[] getBytesForTuple(AbstractBlockReader.ReaderRecord<Slice> tuple)
+    {
+      return tuple.getRecord().buffer;
+    }
+  }
+
+}


[2/2] apex-malhar git commit: Merge branch 'APEXMALHAR-2019-S3-Input' of https://github.com/chaithu14/incubator-apex-malhar

Posted by yo...@apache.org.
Merge branch 'APEXMALHAR-2019-S3-Input' of https://github.com/chaithu14/incubator-apex-malhar


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/23970382
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/23970382
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/23970382

Branch: refs/heads/master
Commit: 23970382c646afad7af8ac1133171e63ff11d7c1
Parents: 0b66f19 5de26e4
Author: yogidevendra <yo...@apache.org>
Authored: Thu Jul 14 11:44:57 2016 +0530
Committer: yogidevendra <yo...@apache.org>
Committed: Thu Jul 14 11:44:57 2016 +0530

----------------------------------------------------------------------
 library/pom.xml                                 |  16 ++
 .../datatorrent/lib/io/fs/S3BlockReader.java    | 126 +++++++++
 .../datatorrent/lib/io/fs/S3InputModule.java    |  68 +++++
 .../lib/io/fs/S3InputModuleAppTest.java         | 257 +++++++++++++++++++
 4 files changed, 467 insertions(+)
----------------------------------------------------------------------