Posted to dev@apex.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2016/07/12 04:25:11 UTC

[jira] [Commented] (APEXMALHAR-2019) Creation of S3 Input Module

    [ https://issues.apache.org/jira/browse/APEXMALHAR-2019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372154#comment-15372154 ] 

ASF GitHub Bot commented on APEXMALHAR-2019:
--------------------------------------------

Github user yogidevendra commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/263#discussion_r70375602
  
    --- Diff: library/src/main/java/com/datatorrent/lib/io/fs/S3BlockReader.java ---
    @@ -0,0 +1,89 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.io.fs;
    +
    +import java.io.IOException;
    +import org.apache.hadoop.fs.FSDataInputStream;
    +import org.apache.hadoop.fs.Path;
    +
    +import com.google.common.annotations.VisibleForTesting;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.lib.io.block.BlockMetadata;
    +import com.datatorrent.lib.io.block.BlockReader;
    +import com.datatorrent.lib.io.block.ReaderContext;
    +
    +/**
    + * S3BlockReader extends BlockReader to read S3 objects and
    + * parse block metadata.
    + */
    +public class S3BlockReader extends BlockReader
    +{
    +  protected transient String s3bucketUri;
    +
    +  public S3BlockReader()
    +  {
    +    this.readerContext = new S3BlockReaderContext();
    +  }
    +
    +  @Override
    +  public void setup(Context.OperatorContext context)
    +  {
    +    super.setup(context);
    +    s3bucketUri = fs.getScheme() + "://" + extractBucket(uri);
    +  }
    +
    +  @VisibleForTesting
    +  protected String extractBucket(String s3uri)
    +  {
    +    return s3uri.substring(s3uri.indexOf('@') + 1, s3uri.indexOf("/", s3uri.indexOf('@')));
    +  }
    +
    +  @Override
    +  protected FSDataInputStream setupStream(BlockMetadata.FileBlockMetadata block) throws IOException
    +  {
    +    FSDataInputStream ins = fs.open(new Path(s3bucketUri + block.getFilePath()));
    +    ins.seek(block.getOffset());
    +    return ins;
    +  }
    +
    +  /**
    +   * BlockReaderContext for reading S3 blocks.<br/>
    +   * This should use read API without offset.
    +   */
    +  private static class S3BlockReaderContext extends ReaderContext.FixedBytesReaderContext<FSDataInputStream>
    +  {
    +    @Override
    +    protected Entity readEntity() throws IOException
    --- End diff --
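
A side note on extractBucket() in the diff above: the standalone sketch below reproduces its substring logic so the behaviour can be checked outside the operator. The class name ExtractBucketSketch and the sample URIs are hypothetical, and the "scheme://accessKey:secretKey@bucket/path" shape is only an assumption about what `uri` holds, inferred from the use of '@'.

```java
public class ExtractBucketSketch
{
  // Same substring logic as extractBucket(...) in the diff: take the text
  // between the first '@' and the first '/' that follows it.
  public static String extractBucket(String s3uri)
  {
    int at = s3uri.indexOf('@');
    return s3uri.substring(at + 1, s3uri.indexOf("/", at));
  }

  public static void main(String[] args)
  {
    // Hypothetical URI with placeholder credentials (assumed format).
    System.out.println(extractBucket("s3n://ACCESS:SECRET@my-bucket/input/dir"));
    // Hypothetical URI without credentials: there is no '@', so indexOf returns -1
    // and the substring starts at 0 and stops at the first '/'.
    System.out.println(extractBucket("s3n://my-bucket/dir"));
  }
}
```

With this logic the first call yields "my-bucket", while the credential-less URI yields "s3n:" rather than the bucket name. If URIs without an '@' are expected, a guard for that case may be worth considering.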
    
    @chaithu14 Could you please add javadoc to readEntity() explaining how it differs from the super implementation?
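
The diff excerpt ends at the signature of readEntity(), so its body is not visible here. The class comment only says the context should use the read API without an offset. As a rough illustration of that idea, and not the PR's implementation, the sketch below reads a fixed number of bytes sequentially from a stream's current position using plain java.io. SequentialReadSketch, readSequential and readSlice are hypothetical names, and skip() merely stands in for the seek(block.getOffset()) done in setupStream().

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class SequentialReadSketch
{
  /**
   * Reads up to length bytes from the stream's current position using only
   * read(byte[], int, int); no positional offset is passed to the stream.
   * Returns fewer bytes if the stream ends first.
   */
  public static byte[] readSequential(InputStream in, int length) throws IOException
  {
    byte[] buffer = new byte[length];
    int total = 0;
    while (total < length) {
      // read() may return fewer bytes than requested, so loop until done.
      int n = in.read(buffer, total, length - total);
      if (n < 0) {
        break; // end of stream
      }
      total += n;
    }
    return total == length ? buffer : Arrays.copyOf(buffer, total);
  }

  /** Convenience wrapper for the demo: position once, then read sequentially. */
  public static String readSlice(String data, int offset, int length)
  {
    byte[] bytes = data.getBytes(StandardCharsets.US_ASCII);
    try (InputStream in = new ByteArrayInputStream(bytes)) {
      in.skip(offset); // stands in for seek(block.getOffset())
      return new String(readSequential(in, length), StandardCharsets.US_ASCII);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args)
  {
    System.out.println(readSlice("0123456789", 4, 3)); // full read inside the data
    System.out.println(readSlice("0123456789", 8, 5)); // read cut short by end of data
  }
}
```

The point of the sketch is that the stream is positioned once and then consumed in order, so each read continues where the previous one stopped. A javadoc on readEntity() could state this contrast with the super implementation explicitly.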


> Creation of S3 Input Module
> ---------------------------
>
>                 Key: APEXMALHAR-2019
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2019
>             Project: Apache Apex Malhar
>          Issue Type: Task
>            Reporter: Chaitanya
>            Assignee: Chaitanya
>            Priority: Minor
>
> This module provides parallel reading of files from an S3 bucket.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)