Posted to dev@apex.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2016/07/12 04:25:11 UTC
[jira] [Commented] (APEXMALHAR-2019) Creation of S3 Input Module
[ https://issues.apache.org/jira/browse/APEXMALHAR-2019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372154#comment-15372154 ]
ASF GitHub Bot commented on APEXMALHAR-2019:
--------------------------------------------
Github user yogidevendra commented on a diff in the pull request:
https://github.com/apache/apex-malhar/pull/263#discussion_r70375602
--- Diff: library/src/main/java/com/datatorrent/lib/io/fs/S3BlockReader.java ---
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.io.fs;
+
+import java.io.IOException;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.datatorrent.api.Context;
+import com.datatorrent.lib.io.block.BlockMetadata;
+import com.datatorrent.lib.io.block.BlockReader;
+import com.datatorrent.lib.io.block.ReaderContext;
+
+/**
+ * S3BlockReader extends BlockReader to read S3 objects and parse
+ * block metadata.
+ */
+public class S3BlockReader extends BlockReader
+{
+ protected transient String s3bucketUri;
+
+ public S3BlockReader()
+ {
+ this.readerContext = new S3BlockReaderContext();
+ }
+
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ super.setup(context);
+ s3bucketUri = fs.getScheme() + "://" + extractBucket(uri);
+ }
+
+ @VisibleForTesting
+ protected String extractBucket(String s3uri)
+ {
+ return s3uri.substring(s3uri.indexOf('@') + 1, s3uri.indexOf("/", s3uri.indexOf('@')));
+ }
+
+ @Override
+ protected FSDataInputStream setupStream(BlockMetadata.FileBlockMetadata block) throws IOException
+ {
+ FSDataInputStream ins = fs.open(new Path(s3bucketUri + block.getFilePath()));
+ ins.seek(block.getOffset());
+ return ins;
+ }
+
+ /**
+ * BlockReaderContext for reading S3 blocks.<br/>
+ * This should use the read API without an offset.
+ */
+ private static class S3BlockReaderContext extends ReaderContext.FixedBytesReaderContext<FSDataInputStream>
+ {
+ @Override
+ protected Entity readEntity() throws IOException
--- End diff --
@chaithu14 Could you please add javadoc to readEntity() describing how it differs from the super implementation?
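
For context on the diff above: the `extractBucket()` method pulls the bucket name out of an S3 URI by taking the substring between the '@' that terminates the embedded credentials and the next '/'. A minimal standalone sketch of that logic (the class name `ExtractBucketDemo` and the sample URI are hypothetical, for illustration only):

```java
public class ExtractBucketDemo
{
  // Mirrors the substring logic in S3BlockReader.extractBucket():
  // the bucket name sits between the '@' that ends the
  // accessKey:secretKey pair and the first '/' that follows it.
  static String extractBucket(String s3uri)
  {
    int at = s3uri.indexOf('@');
    return s3uri.substring(at + 1, s3uri.indexOf('/', at));
  }

  public static void main(String[] args)
  {
    // Hypothetical URI of the form scheme://accessKey:secretKey@bucket/path
    String uri = "s3n://ACCESS:SECRET@my-bucket/input/logs";
    System.out.println(extractBucket(uri)); // prints "my-bucket"
  }
}
```

Note that this sketch, like the code under review, assumes the URI always carries inline credentials; a URI without an '@' would make `indexOf('@')` return -1 and the substring bounds would be wrong.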
> Creation of S3 Input Module
> ---------------------------
>
> Key: APEXMALHAR-2019
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2019
> Project: Apache Apex Malhar
> Issue Type: Task
> Reporter: Chaitanya
> Assignee: Chaitanya
> Priority: Minor
>
> This module provides the functionality of reading files in parallel from an S3 bucket.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)