Posted to dev@apex.apache.org by DT-Priyanka <gi...@git.apache.org> on 2016/03/09 09:28:24 UTC

[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2008: Create HDFS F...

GitHub user DT-Priyanka opened a pull request:

    https://github.com/apache/incubator-apex-malhar/pull/207

    APEXMALHAR-2008: Create HDFS File Reader module

    Adds an HDFS file reader module.
    1. The module reads a file or a list of files (a directory is also accepted) and emits the file blocks.
    2. The module can be configured to emit blocks in order or out of order.
    3. The module reads file blocks in parallel. The number of parallel readers is configurable; if it is not configured, the module increases or decreases the number of readers dynamically according to the input data rate.
    
    Also updated FileSplitterInput with some improvements:
    1. The last file reference times are now tracked separately for each folder to avoid duplicates (duplicates could arise when multiple files or subdirectories share the same relative path).
    2. Small code improvements.
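
The per-folder tracking described for FileSplitterInput can be pictured with a small sketch. This is only an illustration under assumptions: the class and method names below are hypothetical, and the thread does not show how the pull request actually stores reference times. The idea is that keying the reference times by input directory first keeps identical relative paths under different directories from sharing one entry.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: class and method names are hypothetical, not FileSplitterInput code.
final class PerDirectoryReferenceTimes
{
  // input directory -> (relative file path -> last seen modification time)
  private final Map<String, Map<String, Long>> referenceTimes = new HashMap<>();

  /**
   * Returns true if the file is new, or modified since it was last seen
   * under the given input directory, and records the new reference time.
   */
  boolean shouldProcess(String directory, String relativePath, long modificationTime)
  {
    Map<String, Long> perDirectory = referenceTimes.computeIfAbsent(directory, d -> new HashMap<>());
    Long lastSeen = perDirectory.get(relativePath);
    if (lastSeen != null && lastSeen >= modificationTime) {
      return false; // already handled for this directory
    }
    perDirectory.put(relativePath, modificationTime);
    return true;
  }

  public static void main(String[] args)
  {
    PerDirectoryReferenceTimes times = new PerDirectoryReferenceTimes();
    // Same relative path under two different input directories: both are processed once.
    System.out.println(times.shouldProcess("/in/a", "data/part-0", 100L));
    System.out.println(times.shouldProcess("/in/b", "data/part-0", 100L));
    // A second scan of the unchanged file is skipped.
    System.out.println(times.shouldProcess("/in/a", "data/part-0", 100L));
  }
}
```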

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/DT-Priyanka/incubator-apex-malhar APEXMALHAR-2008-hdfs-input-module

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-apex-malhar/pull/207.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #207
    
----
commit 8ffb34abe48f525d401c3932d79ada6c71214e88
Author: Priyanka Gugale <pr...@datatorrent.com>
Date:   2016-03-08T08:42:13Z

    APEXMALHAR-2008: Create HDFS File Reader module

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2008: Create HDFS F...

Posted by yogidevendra <gi...@git.apache.org>.
Github user yogidevendra commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/207#discussion_r55525483
  
    --- Diff: library/src/main/java/com/datatorrent/lib/io/fs/HDFSInputModule.java ---
    @@ -0,0 +1,236 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.io.fs;
    +
    +import javax.validation.constraints.Min;
    +import javax.validation.constraints.NotNull;
    +import javax.validation.constraints.Size;
    +
    +import org.apache.commons.lang.mutable.MutableLong;
    +import org.apache.hadoop.conf.Configuration;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DAG;
    +import com.datatorrent.api.Module;
    +import com.datatorrent.common.metric.MetricsAggregator;
    +import com.datatorrent.common.metric.SingleMetricAggregator;
    +import com.datatorrent.common.metric.sum.LongSumAggregator;
    +import com.datatorrent.common.partitioner.StatelessPartitioner;
    +import com.datatorrent.lib.counters.BasicCounters;
    +import com.datatorrent.lib.io.block.AbstractBlockReader.ReaderRecord;
    +import com.datatorrent.lib.io.block.BlockMetadata.FileBlockMetadata;
    +import com.datatorrent.lib.io.block.HDFSBlockReader;
    +import com.datatorrent.lib.io.fs.AbstractFileSplitter.FileMetadata;
    +import com.datatorrent.lib.io.fs.HDFSFileSplitter.HDFSScanner;
    +import com.datatorrent.netlet.util.Slice;
    +
    +/**
    + * HDFSInputModule is used to read files from HDFS. <br/>
    + * Module emits FileMetadata, BlockMetadata and the block bytes.
    + */
    +public class HDFSInputModule implements Module
    +{
    +
    +  @NotNull
    +  @Size(min = 1)
    +  private String files;
    --- End diff --
    
    Splitting the property into host, port, and directory should first be discussed on dev@apex. Otherwise, stick to the original convention; we can split it later, only if required.



[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2008: Create HDFS F...

Posted by sandeepdeshmukh <gi...@git.apache.org>.
Github user sandeepdeshmukh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/207#discussion_r55511718
  
    --- Diff: library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java ---
    @@ -384,7 +399,14 @@ protected void scanIterationComplete()
           lastScanMillis = System.currentTimeMillis();
         }
     
    -    protected void scan(@NotNull Path filePath, Path rootPath)
    +    protected void scan(Path filePath, Path rootPath)
    --- End diff --
    
    Why was @NotNull removed?



[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2008: Create HDFS F...

Posted by chaithu14 <gi...@git.apache.org>.
Github user chaithu14 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/207#discussion_r55511560
  
    --- Diff: library/src/main/java/com/datatorrent/lib/io/block/HDFSBlockMetadata.java ---
    @@ -0,0 +1,63 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.io.block;
    +
    +/**
    + * HDFSBlockMetadata extends {@link BlockMetadata} to provide an option if blocks of a file should be read in-sequence
    + * or in-parallel
    + */
    +public class HDFSBlockMetadata extends BlockMetadata.FileBlockMetadata
    +{
    --- End diff --
    
    Is this class specific to HDFS only? If not, change the class name.



[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2008: Create HDFS F...

Posted by chaithu14 <gi...@git.apache.org>.
Github user chaithu14 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/207#discussion_r55512047
  
    --- Diff: library/src/main/java/com/datatorrent/lib/io/fs/HDFSInputModule.java ---
    @@ -0,0 +1,236 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.io.fs;
    +
    +import javax.validation.constraints.Min;
    +import javax.validation.constraints.NotNull;
    +import javax.validation.constraints.Size;
    +
    +import org.apache.commons.lang.mutable.MutableLong;
    +import org.apache.hadoop.conf.Configuration;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DAG;
    +import com.datatorrent.api.Module;
    +import com.datatorrent.common.metric.MetricsAggregator;
    +import com.datatorrent.common.metric.SingleMetricAggregator;
    +import com.datatorrent.common.metric.sum.LongSumAggregator;
    +import com.datatorrent.common.partitioner.StatelessPartitioner;
    +import com.datatorrent.lib.counters.BasicCounters;
    +import com.datatorrent.lib.io.block.AbstractBlockReader.ReaderRecord;
    +import com.datatorrent.lib.io.block.BlockMetadata.FileBlockMetadata;
    +import com.datatorrent.lib.io.block.HDFSBlockReader;
    +import com.datatorrent.lib.io.fs.AbstractFileSplitter.FileMetadata;
    +import com.datatorrent.lib.io.fs.HDFSFileSplitter.HDFSScanner;
    +import com.datatorrent.netlet.util.Slice;
    +
    +/**
    + * HDFSInputModule is used to read files from HDFS. <br/>
    + * Module emits FileMetadata, BlockMetadata and the block bytes.
    + */
    +public class HDFSInputModule implements Module
    +{
    +
    +  @NotNull
    +  @Size(min = 1)
    +  private String files;
    --- End diff --
    
    This is specific to HDFS, so split the files property into host, port, and the files/directories to copy.
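
A rough sketch of what the split suggested in this comment could look like, using only `java.net.URI`. The class `HdfsLocation` and its `host`, `port`, and `directory` fields are assumptions taken from the wording of the comment; the `HDFSInputModule` in the diff exposes a single `files` string instead.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical holder class for illustration; not part of the pull request.
final class HdfsLocation
{
  private final String host;
  private final int port;
  private final String directory;

  HdfsLocation(String host, int port, String directory)
  {
    this.host = host;
    this.port = port;
    this.directory = directory;
  }

  /** Builds hdfs://host:port/directory from the separately configured parts. */
  String toUri()
  {
    try {
      // The multi-argument URI constructor requires an absolute path when a scheme is given.
      return new URI("hdfs", null, host, port, directory, null, null).toString();
    } catch (URISyntaxException e) {
      throw new IllegalArgumentException("Invalid HDFS location", e);
    }
  }

  public static void main(String[] args)
  {
    System.out.println(new HdfsLocation("namenode", 8020, "/user/input").toUri());
  }
}
```

Composing the URI in one place means a user configures three simple values and never has to get the `hdfs://host:port/path` syntax right by hand, which is the ease-of-configuration argument made for the split.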



[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2008: Create HDFS F...

Posted by ishark <gi...@git.apache.org>.
Github user ishark commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/207#discussion_r55781098
  
    --- Diff: library/src/main/java/com/datatorrent/lib/io/block/BlockReader.java ---
    @@ -0,0 +1,81 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.io.block;
    +
    +import java.io.IOException;
    +import java.net.URI;
    +
    +import org.apache.hadoop.fs.FileSystem;
    +
    +import com.google.common.base.Splitter;
    +
    +/**
    + * BlockReader extends {@link FSSliceReader} to accept case insensitive uri
    + */
    +public class BlockReader extends FSSliceReader
    +{
    +  protected String uri;
    +
    +  @Override
    +  protected FileSystem getFSInstance() throws IOException
    +  {
    +    return FileSystem.newInstance(URI.create(uri), configuration);
    +  }
    +
    +  /**
    +   * Sets the uri
    +   *
    +   * @param uri of form hdfs://hostname:port/path/to/input
    +   */
    +  public void setUri(String uri)
    +  {
    +    this.uri = convertSchemeToLowerCase(uri);
    +  }
    +
    +  public String getUri()
    +  {
    +    return uri;
    +  }
    +
    +  /**
    +   * Converts Scheme part of the URI to lower case. Multiple URI can be comma separated. If no scheme is there, no
    +   * change is made.
    +   * 
    +   * @param uri
    +   * @return String with scheme part as lower case
    +   */
    +  private String convertSchemeToLowerCase(String uri)
    +  {
    +    if (uri == null) {
    +      return null;
    +    }
    +    StringBuilder uriList = new StringBuilder();
    +    for (String f : Splitter.on(",").omitEmptyStrings().split(uri)) {
    +      String scheme = URI.create(f).getScheme();
    +      if (scheme != null) {
    +        uriList.append(f.replaceFirst(scheme, scheme.toLowerCase()));
    --- End diff --
    
    Just a thought: instead of converting the scheme to lower case, can we throw an exception to tell the user to provide a correct URI?
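
A minimal sketch of the alternative raised in this comment: reject a URI whose scheme is not lower case rather than rewriting it. The class `StrictUriList` and its `parse` method are hypothetical names, and it uses plain `String.split` instead of Guava's `Splitter` so that it runs with the JDK alone.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Hypothetical helper for illustration; not part of the pull request.
final class StrictUriList
{
  private StrictUriList()
  {
  }

  /**
   * Splits a comma separated list of URIs and throws if any entry has a
   * scheme containing upper case characters. Entries without a scheme
   * are accepted unchanged, as in the diff.
   */
  static List<String> parse(String uris)
  {
    List<String> result = new ArrayList<>();
    if (uris == null) {
      return result;
    }
    for (String entry : uris.split(",")) {
      if (entry.isEmpty()) {
        continue; // skip empty entries, similar to omitEmptyStrings() in the diff
      }
      String scheme = URI.create(entry).getScheme();
      if (scheme != null && !scheme.equals(scheme.toLowerCase(Locale.ROOT))) {
        throw new IllegalArgumentException("URI scheme must be lower case: " + entry);
      }
      result.add(entry);
    }
    return result;
  }

  public static void main(String[] args)
  {
    System.out.println(parse("hdfs://localhost:8020/input,/relative/path"));
    try {
      parse("HDFS://localhost:8020/input");
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

Failing fast keeps the configured value and the value actually used identical, at the cost of being stricter with users than the conversion in the diff.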



[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2008: Create HDFS F...

Posted by ishark <gi...@git.apache.org>.
Github user ishark commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/207#discussion_r56096295
  
    --- Diff: library/src/main/java/com/datatorrent/lib/io/fs/HDFSFileSplitter.java ---
    @@ -0,0 +1,184 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.io.fs;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.regex.Matcher;
    +import java.util.regex.Pattern;
    +
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.hadoop.fs.FileStatus;
    +import org.apache.hadoop.fs.Path;
    +
    +import com.datatorrent.api.Context.OperatorContext;
    +import com.datatorrent.lib.io.block.BlockMetadata.FileBlockMetadata;
    +
    +/**
    + * HDFSFileSplitter extends {@link FileSplitterInput} to,
    + * 1. Add relative path to file metadata.
    + * 2. Ignore HDFS temp files (files with extensions _COPYING_).
    + * 3. Set sequencial read option on readers.
    + */
    +public class HDFSFileSplitter extends FileSplitterInput
    +{
    +  private boolean sequencialFileRead;
    +
    +  public HDFSFileSplitter()
    +  {
    +    super();
    +    super.setScanner(new HDFSScanner());
    +  }
    +
    +  @Override
    +  protected FileMetadata createFileMetadata(FileInfo fileInfo)
    +  {
    +    return new HDFSFileMetaData(fileInfo.getFilePath());
    +  }
    +
    +  @Override
    +  protected HDFSFileMetaData buildFileMetadata(FileInfo fileInfo) throws IOException
    +  {
    +    FileMetadata metadata = super.buildFileMetadata(fileInfo);
    +    HDFSFileMetaData hdfsFileMetaData = (HDFSFileMetaData)metadata;
    +
    +    Path path = new Path(fileInfo.getFilePath());
    +    FileStatus status = getFileStatus(path);
    +    if (fileInfo.getDirectoryPath() == null) { // Direct filename is given as input.
    +      hdfsFileMetaData.setRelativePath(status.getPath().getName());
    +    } else {
    +      String relativePath = getRelativePathWithFolderName(fileInfo);
    +      hdfsFileMetaData.setRelativePath(relativePath);
    +    }
    +    return hdfsFileMetaData;
    +  }
    +
    +  /*
    +   * As folder name was given to input for copy, prefix folder name to the sub items to copy.
    +   */
    +  private String getRelativePathWithFolderName(FileInfo fileInfo)
    +  {
    +    String parentDir = new Path(fileInfo.getDirectoryPath()).getName();
    +    return parentDir + File.separator + fileInfo.getRelativeFilePath();
    +  }
    +
    +  @Override
    +  protected FileBlockMetadata createBlockMetadata(FileMetadata fileMetadata)
    +  {
    +    FileBlockMetadata blockMetadta = new FileBlockMetadata(fileMetadata.getFilePath());
    +    blockMetadta.setReadBlockInSequence(sequencialFileRead);
    +    return blockMetadta;
    +  }
    +
    +  public boolean isSequencialFileRead()
    +  {
    +    return sequencialFileRead;
    +  }
    +
    +  public void setSequencialFileRead(boolean sequencialFileRead)
    +  {
    +    this.sequencialFileRead = sequencialFileRead;
    +  }
    +
    +  /**
    +   * HDFSScanner extends {@link TimeBasedDirectoryScanner} to ignore HDFS temporary files
    +   * and files containing unsupported characters. 
    +   */
    +  public static class HDFSScanner extends TimeBasedDirectoryScanner
    +  {
    +    protected static final String HDFS_TEMP_FILE = ".*._COPYING_";
    +    protected static final String UNSUPPORTED_CHARACTOR = ":";
    +    private transient Pattern ignoreRegex;
    --- End diff --
    
    I think ignoreFileRegex could be useful for non-HDFS file inputs as well.
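
To illustrate why such a property need not be tied to HDFS, here is a small file-system-agnostic sketch. `IgnoreFileFilter` is a hypothetical class, not code from the pull request; only the `.*._COPYING_` pattern is taken from the diff, and the filter works on plain file names.

```java
import java.util.regex.Pattern;

// Hypothetical class for illustration; the name and API are not from the pull request.
final class IgnoreFileFilter
{
  private final Pattern ignorePattern; // null means "ignore nothing"

  IgnoreFileFilter(String ignoreFileRegex)
  {
    this.ignorePattern = (ignoreFileRegex == null || ignoreFileRegex.isEmpty())
        ? null
        : Pattern.compile(ignoreFileRegex);
  }

  /** Returns true when a scan should pick up the file with this name. */
  boolean accept(String fileName)
  {
    return ignorePattern == null || !ignorePattern.matcher(fileName).matches();
  }

  public static void main(String[] args)
  {
    // Pattern taken from HDFS_TEMP_FILE in the diff; any other regex could be configured.
    IgnoreFileFilter filter = new IgnoreFileFilter(".*._COPYING_");
    System.out.println(filter.accept("part-0000"));           // accepted
    System.out.println(filter.accept("part-0000._COPYING_")); // ignored
  }
}
```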



[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2008: Create HDFS F...

Posted by DT-Priyanka <gi...@git.apache.org>.
Github user DT-Priyanka commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/207#discussion_r56120966
  
    --- Diff: library/src/main/java/com/datatorrent/lib/io/fs/HDFSFileSplitter.java ---
    @@ -0,0 +1,184 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.io.fs;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.regex.Matcher;
    +import java.util.regex.Pattern;
    +
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.hadoop.fs.FileStatus;
    +import org.apache.hadoop.fs.Path;
    +
    +import com.datatorrent.api.Context.OperatorContext;
    +import com.datatorrent.lib.io.block.BlockMetadata.FileBlockMetadata;
    +
    +/**
    + * HDFSFileSplitter extends {@link FileSplitterInput} to,
    + * 1. Add relative path to file metadata.
    + * 2. Ignore HDFS temp files (files with extensions _COPYING_).
    + * 3. Set sequencial read option on readers.
    + */
    +public class HDFSFileSplitter extends FileSplitterInput
    +{
    +  private boolean sequencialFileRead;
    +
    +  public HDFSFileSplitter()
    +  {
    +    super();
    +    super.setScanner(new HDFSScanner());
    +  }
    +
    +  @Override
    +  protected FileMetadata createFileMetadata(FileInfo fileInfo)
    +  {
    +    return new HDFSFileMetaData(fileInfo.getFilePath());
    +  }
    +
    +  @Override
    +  protected HDFSFileMetaData buildFileMetadata(FileInfo fileInfo) throws IOException
    +  {
    +    FileMetadata metadata = super.buildFileMetadata(fileInfo);
    +    HDFSFileMetaData hdfsFileMetaData = (HDFSFileMetaData)metadata;
    +
    +    Path path = new Path(fileInfo.getFilePath());
    +    FileStatus status = getFileStatus(path);
    +    if (fileInfo.getDirectoryPath() == null) { // Direct filename is given as input.
    +      hdfsFileMetaData.setRelativePath(status.getPath().getName());
    +    } else {
    +      String relativePath = getRelativePathWithFolderName(fileInfo);
    +      hdfsFileMetaData.setRelativePath(relativePath);
    +    }
    +    return hdfsFileMetaData;
    +  }
    +
    +  /*
    +   * As folder name was given to input for copy, prefix folder name to the sub items to copy.
    +   */
    +  private String getRelativePathWithFolderName(FileInfo fileInfo)
    +  {
    +    String parentDir = new Path(fileInfo.getDirectoryPath()).getName();
    +    return parentDir + File.separator + fileInfo.getRelativeFilePath();
    +  }
    +
    +  @Override
    +  protected FileBlockMetadata createBlockMetadata(FileMetadata fileMetadata)
    +  {
    +    FileBlockMetadata blockMetadta = new FileBlockMetadata(fileMetadata.getFilePath());
    +    blockMetadta.setReadBlockInSequence(sequencialFileRead);
    +    return blockMetadta;
    +  }
    +
    +  public boolean isSequencialFileRead()
    +  {
    +    return sequencialFileRead;
    +  }
    +
    +  public void setSequencialFileRead(boolean sequencialFileRead)
    +  {
    +    this.sequencialFileRead = sequencialFileRead;
    +  }
    +
    +  /**
    +   * HDFSScanner extends {@link TimeBasedDirectoryScanner} to ignore HDFS temporary files
    +   * and files containing unsupported characters. 
    +   */
    +  public static class HDFSScanner extends TimeBasedDirectoryScanner
    +  {
    +    protected static final String HDFS_TEMP_FILE = ".*._COPYING_";
    +    protected static final String UNSUPPORTED_CHARACTOR = ":";
    +    private transient Pattern ignoreRegex;
    --- End diff --
    
    Okay, I will expose this as a property.



[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2008: Create HDFS F...

Posted by DT-Priyanka <gi...@git.apache.org>.
Github user DT-Priyanka commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/207#discussion_r55639373
  
    --- Diff: library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java ---
    @@ -384,7 +399,14 @@ protected void scanIterationComplete()
           lastScanMillis = System.currentTimeMillis();
         }
     
    -    protected void scan(@NotNull Path filePath, Path rootPath)
    +    protected void scan(Path filePath, Path rootPath)
    --- End diff --
    
    That was a mistake; I will revert this.



[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2008: Create HDFS F...

Posted by sandeepdeshmukh <gi...@git.apache.org>.
Github user sandeepdeshmukh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/207#discussion_r55633512
  
    --- Diff: library/src/main/java/com/datatorrent/lib/io/fs/HDFSInputModule.java ---
    @@ -0,0 +1,236 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.io.fs;
    +
    +import javax.validation.constraints.Min;
    +import javax.validation.constraints.NotNull;
    +import javax.validation.constraints.Size;
    +
    +import org.apache.commons.lang.mutable.MutableLong;
    +import org.apache.hadoop.conf.Configuration;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DAG;
    +import com.datatorrent.api.Module;
    +import com.datatorrent.common.metric.MetricsAggregator;
    +import com.datatorrent.common.metric.SingleMetricAggregator;
    +import com.datatorrent.common.metric.sum.LongSumAggregator;
    +import com.datatorrent.common.partitioner.StatelessPartitioner;
    +import com.datatorrent.lib.counters.BasicCounters;
    +import com.datatorrent.lib.io.block.AbstractBlockReader.ReaderRecord;
    +import com.datatorrent.lib.io.block.BlockMetadata.FileBlockMetadata;
    +import com.datatorrent.lib.io.block.HDFSBlockReader;
    +import com.datatorrent.lib.io.fs.AbstractFileSplitter.FileMetadata;
    +import com.datatorrent.lib.io.fs.HDFSFileSplitter.HDFSScanner;
    +import com.datatorrent.netlet.util.Slice;
    +
    +/**
    + * HDFSInputModule is used to read files from HDFS. <br/>
    + * Module emits FileMetadata, BlockMetadata and the block bytes.
    + */
    +public class HDFSInputModule implements Module
    +{
    +
    +  @NotNull
    +  @Size(min = 1)
    +  private String files;
    --- End diff --
    
    The purpose of modules is to ease the configuration of operators, so it is good to separate host, port, directory, etc.



[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2008: Create HDFS F...

Posted by ishark <gi...@git.apache.org>.
Github user ishark commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/207#discussion_r55777007
  
    --- Diff: library/src/main/java/com/datatorrent/lib/io/fs/HDFSFileSplitter.java ---
    @@ -0,0 +1,184 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.io.fs;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.regex.Matcher;
    +import java.util.regex.Pattern;
    +
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.hadoop.fs.FileStatus;
    +import org.apache.hadoop.fs.Path;
    +
    +import com.datatorrent.api.Context.OperatorContext;
    +import com.datatorrent.lib.io.block.BlockMetadata.FileBlockMetadata;
    +
    +/**
    + * HDFSFileSplitter extends {@link FileSplitterInput} to,
    + * 1. Add relative path to file metadata.
    + * 2. Ignore HDFS temp files (files with extensions _COPYING_).
    + * 3. Set sequencial read option on readers.
    + */
    +public class HDFSFileSplitter extends FileSplitterInput
    +{
    +  private boolean sequencialFileRead;
    --- End diff --
    
    Does it make sense to have this property in the FileSplitterInput operator itself if it is not specific to HDFS?



[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2008: Create HDFS F...

Posted by chinmaykolhatkar <gi...@git.apache.org>.
Github user chinmaykolhatkar commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/207#discussion_r55639760
  
    --- Diff: library/src/main/java/com/datatorrent/lib/io/fs/HDFSFileSplitter.java ---
    @@ -0,0 +1,180 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.io.fs;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.regex.Matcher;
    +import java.util.regex.Pattern;
    +
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.hadoop.fs.FileStatus;
    +import org.apache.hadoop.fs.Path;
    +
    +import com.datatorrent.api.Context.OperatorContext;
    +import com.datatorrent.lib.io.block.BlockMetadata.FileBlockMetadata;
    +import com.datatorrent.lib.io.block.HDFSBlockMetadata;
    +
    +public class HDFSFileSplitter extends FileSplitterInput
    +{
    +  private boolean sequencialFileRead;
    +
    +  public HDFSFileSplitter()
    +  {
    +    super();
    +    super.setScanner(new HDFSScanner());
    +  }
    +
    +  @Override
    +  protected FileMetadata createFileMetadata(FileInfo fileInfo)
    +  {
    +    return new HDFSFileMetaData(fileInfo.getFilePath());
    +  }
    +
    +  @Override
    +  protected HDFSFileMetaData buildFileMetadata(FileInfo fileInfo) throws IOException
    +  {
    +    FileMetadata metadata = super.buildFileMetadata(fileInfo);
    +    HDFSFileMetaData hdfsFileMetaData = (HDFSFileMetaData)metadata;
    +
    +    Path path = new Path(fileInfo.getFilePath());
    +    FileStatus status = getFileStatus(path);
    +    if (fileInfo.getDirectoryPath() == null) { // Direct filename is given as input.
    +      hdfsFileMetaData.setRelativePath(status.getPath().getName());
    +    } else {
    +      String relativePath = getRelativePathWithFolderName(fileInfo);
    +      hdfsFileMetaData.setRelativePath(relativePath);
    +    }
    +    return hdfsFileMetaData;
    +  }
    +
    +  /*
    +   * As folder name was given to input for copy, prefix folder name to the sub items to copy.
    +   */
    +  private String getRelativePathWithFolderName(FileInfo fileInfo)
    +  {
    +    String parentDir = new Path(fileInfo.getDirectoryPath()).getName();
    +    return parentDir + File.separator + fileInfo.getRelativeFilePath();
    +  }
    +
    +  @Override
    +  protected HDFSBlockMetadata createBlockMetadata(FileMetadata fileMetadata)
    +  {
    +    HDFSBlockMetadata blockMetadta = new HDFSBlockMetadata(fileMetadata.getFilePath());
    +    blockMetadta.setReadBlockInSequence(sequencialFileRead);
    +    return blockMetadta;
    +  }
    +
    +  @Override
    +  protected HDFSBlockMetadata buildBlockMetadata(long pos, long lengthOfFileInBlock, int blockNumber, FileMetadata fileMetadata, boolean isLast)
    +  {
    +    FileBlockMetadata metadata = super.buildBlockMetadata(pos, lengthOfFileInBlock, blockNumber, fileMetadata, isLast);
    +    HDFSBlockMetadata blockMetadata = (HDFSBlockMetadata)metadata;
    +    return blockMetadata;
    +  }
    +
    +  public boolean isSequencialFileRead()
    +  {
    +    return sequencialFileRead;
    +  }
    +
    +  public void setSequencialFileRead(boolean sequencialFileRead)
    +  {
    +    this.sequencialFileRead = sequencialFileRead;
    +  }
    +
    +  public static class HDFSScanner extends TimeBasedDirectoryScanner
    --- End diff --
    
    Can you please provide class level javadoc?
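
For context on what such a class-level javadoc might say, here is a minimal, self-contained sketch of the scanner's filtering rule as it appears elsewhere in this thread: skip in-flight HDFS copies (names ending in _COPYING_) and paths containing ':'. The class name TempFileFilterSketch and the plain-string path handling are assumptions made for illustration. The actual HDFSScanner extends TimeBasedDirectoryScanner and works on Hadoop Path objects.

```java
import java.util.regex.Pattern;

/**
 * Illustrative stand-in for the scanner's file filter (not the Malhar class).
 * Rejects files that HDFS is still copying (names ending in _COPYING_)
 * and paths that contain the unsupported character ':'.
 */
final class TempFileFilterSketch
{
  // Same pattern string as HDFS_TEMP_FILE in the diff. The '.' before
  // _COPYING_ is unescaped, so it matches any single character.
  private static final Pattern HDFS_TEMP_FILE = Pattern.compile(".*._COPYING_");
  private static final String UNSUPPORTED_CHARACTER = ":";

  // Assumption: 'path' is a scheme-less, '/'-separated path string.
  static boolean accept(String path)
  {
    if (path.contains(UNSUPPORTED_CHARACTER)) {
      return false;
    }
    String fileName = path.substring(path.lastIndexOf('/') + 1);
    return !HDFS_TEMP_FILE.matcher(fileName).matches();
  }

  public static void main(String[] args)
  {
    System.out.println(accept("/data/in/part-0001"));           // true
    System.out.println(accept("/data/in/part-0001._COPYING_")); // false
    System.out.println(accept("/data/in/a:b.txt"));             // false
  }
}
```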


---

[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2008: Create HDFS F...

Posted by DT-Priyanka <gi...@git.apache.org>.
Github user DT-Priyanka commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/207#discussion_r55799956
  
    --- Diff: library/src/main/java/com/datatorrent/lib/io/fs/HDFSFileSplitter.java ---
    @@ -0,0 +1,184 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.io.fs;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.regex.Matcher;
    +import java.util.regex.Pattern;
    +
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.hadoop.fs.FileStatus;
    +import org.apache.hadoop.fs.Path;
    +
    +import com.datatorrent.api.Context.OperatorContext;
    +import com.datatorrent.lib.io.block.BlockMetadata.FileBlockMetadata;
    +
    +/**
    + * HDFSFileSplitter extends {@link FileSplitterInput} to,
    + * 1. Add relative path to file metadata.
    + * 2. Ignore HDFS temp files (files with extensions _COPYING_).
    + * 3. Set sequencial read option on readers.
    + */
    +public class HDFSFileSplitter extends FileSplitterInput
    +{
    +  private boolean sequencialFileRead;
    --- End diff --
    
    It is not HDFS-specific, but FileSplitterInput is too generic. If we decide this is a good property for all purposes, I can move it to the parent class.
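
A rough sketch of what moving the property to the parent class could look like. BaseSplitterSketch and HdfsSplitterSketch are hypothetical, simplified stand-ins, not the Malhar FileSplitterInput or HDFSFileSplitter classes. The property name keeps the spelling used in the diff.

```java
/** Hypothetical generic splitter that owns the read-order property. */
class BaseSplitterSketch
{
  // Defaults to false, i.e. blocks may be read out of order.
  private boolean sequencialFileRead;

  public boolean isSequencialFileRead()
  {
    return sequencialFileRead;
  }

  public void setSequencialFileRead(boolean sequencialFileRead)
  {
    this.sequencialFileRead = sequencialFileRead;
  }
}

/** Hypothetical HDFS subclass: inherits the property, keeps only HDFS specifics. */
class HdfsSplitterSketch extends BaseSplitterSketch
{
  // HDFS-only concerns (temp-file filtering, relative paths) would live here.
}
```

With this layout any other subclass of the generic splitter would get the setter for free, which is the trade-off under discussion: convenience for all splitters versus a wider public surface on a generic class.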


---

[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2008: Create HDFS F...

Posted by chinmaykolhatkar <gi...@git.apache.org>.
Github user chinmaykolhatkar commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/207#discussion_r55640357
  
    --- Diff: library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java ---
    @@ -110,8 +110,7 @@ protected void starting(org.junit.runner.Description description)
           fileSplitterInput.setIdempotentStorageManager(new IdempotentStorageManager.NoopIdempotentStorageManager());
     
           Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
    -      attributes.put(Context.DAGContext.APPLICATION_PATH,
    -          "target/" + className + "/" + methodName + "/" + Long.toHexString(System.currentTimeMillis()));
    +      attributes.put(Context.DAGContext.APPLICATION_PATH, "target/" + className + "/" + methodName + "/" + Long.toHexString(System.currentTimeMillis()));
    --- End diff --
    
    Please revert all the changes in this file that are not related to this PR. Only fix the Checkstyle violations that appear in new code.


---

[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2008: Create HDFS F...

Posted by chinmaykolhatkar <gi...@git.apache.org>.
Github user chinmaykolhatkar commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/207#discussion_r55639379
  
    --- Diff: library/src/main/java/com/datatorrent/lib/io/fs/HDFSFileSplitter.java ---
    @@ -0,0 +1,180 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.io.fs;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.regex.Matcher;
    +import java.util.regex.Pattern;
    +
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.hadoop.fs.FileStatus;
    +import org.apache.hadoop.fs.Path;
    +
    +import com.datatorrent.api.Context.OperatorContext;
    +import com.datatorrent.lib.io.block.BlockMetadata.FileBlockMetadata;
    +import com.datatorrent.lib.io.block.HDFSBlockMetadata;
    +
    +public class HDFSFileSplitter extends FileSplitterInput
    --- End diff --
    
    Class-level javadoc is missing. Not having it causes problems in the release process. Please add it.


---

[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2008: Create HDFS F...

Posted by chinmaykolhatkar <gi...@git.apache.org>.
Github user chinmaykolhatkar commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/207#discussion_r55640050
  
    --- Diff: library/src/main/java/com/datatorrent/lib/io/fs/HDFSInputModule.java ---
    @@ -0,0 +1,236 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.io.fs;
    +
    +import javax.validation.constraints.Min;
    +import javax.validation.constraints.NotNull;
    +import javax.validation.constraints.Size;
    +
    +import org.apache.commons.lang.mutable.MutableLong;
    +import org.apache.hadoop.conf.Configuration;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DAG;
    +import com.datatorrent.api.Module;
    +import com.datatorrent.common.metric.MetricsAggregator;
    +import com.datatorrent.common.metric.SingleMetricAggregator;
    +import com.datatorrent.common.metric.sum.LongSumAggregator;
    +import com.datatorrent.common.partitioner.StatelessPartitioner;
    +import com.datatorrent.lib.counters.BasicCounters;
    +import com.datatorrent.lib.io.block.AbstractBlockReader.ReaderRecord;
    +import com.datatorrent.lib.io.block.BlockMetadata.FileBlockMetadata;
    +import com.datatorrent.lib.io.block.HDFSBlockReader;
    +import com.datatorrent.lib.io.fs.AbstractFileSplitter.FileMetadata;
    +import com.datatorrent.lib.io.fs.HDFSFileSplitter.HDFSScanner;
    +import com.datatorrent.netlet.util.Slice;
    +
    +/**
    + * HDFSInputModule is used to read files from HDFS. <br/>
    + * Module emits FileMetadata, BlockMetadata and the block bytes.
    + */
    +public class HDFSInputModule implements Module
    +{
    +
    +  @NotNull
    +  @Size(min = 1)
    +  private String files;
    +  private String filePatternRegularExp;
    +  @Min(0)
    +  private long scanIntervalMillis;
    +  private boolean recursive = true;
    +  private long blockSize;
    +  private boolean sequencialFileRead = false;
    +  private int readersCount;
    +
    +  public final transient ProxyOutputPort<FileMetadata> filesMetadataOutput = new ProxyOutputPort();
    +  public final transient ProxyOutputPort<FileBlockMetadata> blocksMetadataOutput = new ProxyOutputPort();
    +  public final transient ProxyOutputPort<ReaderRecord<Slice>> messages = new ProxyOutputPort();
    +
    +  @Override
    +  public void populateDAG(DAG dag, Configuration conf)
    +  {
    +    HDFSFileSplitter fileSplitter = dag.addOperator("FileSplitter", new HDFSFileSplitter());
    +    HDFSBlockReader blockReader = dag.addOperator("BlockReader", new HDFSBlockReader());
    +
    +    dag.addStream("BlockMetadata", fileSplitter.blocksMetadataOutput, blockReader.blocksMetadataInput);
    +
    +    filesMetadataOutput.set(fileSplitter.filesMetadataOutput);
    +    blocksMetadataOutput.set(blockReader.blocksMetadataOutput);
    +    messages.set(blockReader.messages);
    +
    +    fileSplitter.setSequencialFileRead(sequencialFileRead);
    +    if (blockSize != 0) {
    +      fileSplitter.setBlockSize(blockSize);
    +    }
    +
    +    HDFSScanner fileScanner = (HDFSScanner)fileSplitter.getScanner();
    +    fileScanner.setFiles(files);
    +    if (scanIntervalMillis != 0) {
    +      fileScanner.setScanIntervalMillis(scanIntervalMillis);
    +    }
    +    fileScanner.setRecursive(recursive);
    +    if (filePatternRegularExp != null) {
    +      fileSplitter.getScanner().setFilePatternRegularExp(filePatternRegularExp);
    +    }
    +
    +    blockReader.setUri(files);
    +    if (readersCount != 0) {
    +      dag.setAttribute(blockReader, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<HDFSBlockReader>(readersCount));
    +    }
    +
    +    MetricsAggregator blockReaderMetrics = new MetricsAggregator();
    +    blockReaderMetrics.addAggregators("bytesReadPerSec", new SingleMetricAggregator[] {new LongSumAggregator()});
    +    dag.setAttribute(blockReader, Context.OperatorContext.METRICS_AGGREGATOR, blockReaderMetrics);
    +    dag.setAttribute(blockReader, Context.OperatorContext.COUNTERS_AGGREGATOR, new BasicCounters.LongAggregator<MutableLong>());
    +  }
    +
    +  /**
    +   * A comma separated list of directories to scan. If the path is not fully qualified the default file system is used.
    +   * A fully qualified path can be provided to scan directories in other filesystems.
    +   *
    +   * @param files
    +   *          files
    --- End diff --
    
    Can we please have javadoc comments in proper format?
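
One possible shape for the requested format, shown on a stand-alone class: the description first, then a one-line @param that says what the value means instead of repeating the parameter name on the next line. FilesPropertySketch is a hypothetical holder used only so the snippet compiles on its own. The description wording is taken from the diff.

```java
/** Hypothetical holder for the 'files' property, used only to show javadoc layout. */
final class FilesPropertySketch
{
  private String files;

  /**
   * Sets the comma-separated list of directories to scan. If a path is not
   * fully qualified, the default file system is used. A fully qualified path
   * can be provided to scan directories in other file systems.
   *
   * @param files comma-separated list of paths to scan
   */
  public void setFiles(String files)
  {
    this.files = files;
  }

  /**
   * Returns the comma-separated list of directories to scan.
   *
   * @return comma-separated list of paths to scan
   */
  public String getFiles()
  {
    return files;
  }
}
```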


---

[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2008: Create HDFS F...

Posted by DT-Priyanka <gi...@git.apache.org>.
Github user DT-Priyanka commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/207#discussion_r55650568
  
    --- Diff: library/src/main/java/com/datatorrent/lib/io/fs/HDFSFileSplitter.java ---
    @@ -0,0 +1,180 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.io.fs;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.regex.Matcher;
    +import java.util.regex.Pattern;
    +
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.hadoop.fs.FileStatus;
    +import org.apache.hadoop.fs.Path;
    +
    +import com.datatorrent.api.Context.OperatorContext;
    +import com.datatorrent.lib.io.block.BlockMetadata.FileBlockMetadata;
    +import com.datatorrent.lib.io.block.HDFSBlockMetadata;
    +
    +public class HDFSFileSplitter extends FileSplitterInput
    +{
    +  private boolean sequencialFileRead;
    +
    +  public HDFSFileSplitter()
    +  {
    +    super();
    +    super.setScanner(new HDFSScanner());
    +  }
    +
    +  @Override
    +  protected FileMetadata createFileMetadata(FileInfo fileInfo)
    +  {
    +    return new HDFSFileMetaData(fileInfo.getFilePath());
    +  }
    +
    +  @Override
    +  protected HDFSFileMetaData buildFileMetadata(FileInfo fileInfo) throws IOException
    +  {
    +    FileMetadata metadata = super.buildFileMetadata(fileInfo);
    +    HDFSFileMetaData hdfsFileMetaData = (HDFSFileMetaData)metadata;
    +
    +    Path path = new Path(fileInfo.getFilePath());
    +    FileStatus status = getFileStatus(path);
    +    if (fileInfo.getDirectoryPath() == null) { // Direct filename is given as input.
    +      hdfsFileMetaData.setRelativePath(status.getPath().getName());
    +    } else {
    +      String relativePath = getRelativePathWithFolderName(fileInfo);
    +      hdfsFileMetaData.setRelativePath(relativePath);
    +    }
    +    return hdfsFileMetaData;
    +  }
    +
    +  /*
    +   * As folder name was given to input for copy, prefix folder name to the sub items to copy.
    +   */
    +  private String getRelativePathWithFolderName(FileInfo fileInfo)
    +  {
    +    String parentDir = new Path(fileInfo.getDirectoryPath()).getName();
    +    return parentDir + File.separator + fileInfo.getRelativeFilePath();
    +  }
    +
    +  @Override
    +  protected HDFSBlockMetadata createBlockMetadata(FileMetadata fileMetadata)
    +  {
    +    HDFSBlockMetadata blockMetadta = new HDFSBlockMetadata(fileMetadata.getFilePath());
    +    blockMetadta.setReadBlockInSequence(sequencialFileRead);
    +    return blockMetadta;
    +  }
    +
    +  @Override
    +  protected HDFSBlockMetadata buildBlockMetadata(long pos, long lengthOfFileInBlock, int blockNumber, FileMetadata fileMetadata, boolean isLast)
    +  {
    +    FileBlockMetadata metadata = super.buildBlockMetadata(pos, lengthOfFileInBlock, blockNumber, fileMetadata, isLast);
    +    HDFSBlockMetadata blockMetadata = (HDFSBlockMetadata)metadata;
    +    return blockMetadata;
    +  }
    +
    +  public boolean isSequencialFileRead()
    +  {
    +    return sequencialFileRead;
    +  }
    +
    +  public void setSequencialFileRead(boolean sequencialFileRead)
    +  {
    +    this.sequencialFileRead = sequencialFileRead;
    +  }
    +
    +  public static class HDFSScanner extends TimeBasedDirectoryScanner
    +  {
    +    protected static final String HDFS_TEMP_FILE = ".*._COPYING_";
    +    protected static final String UNSUPPORTED_CHARACTOR = ":";
    +    private transient Pattern ignoreRegex;
    +
    +    @Override
    +    public void setup(OperatorContext context)
    +    {
    +      super.setup(context);
    +      ignoreRegex = Pattern.compile(HDFS_TEMP_FILE);
    +    }
    +
    +    @Override
    +    protected boolean acceptFile(String filePathStr)
    +    {
    +      boolean accepted = super.acceptFile(filePathStr);
    +      if (containsUnsupportedCharacters(filePathStr) || isTempFile(filePathStr)) {
    +        return false;
    +      }
    +      return accepted;
    +    }
    +
    +    private boolean isTempFile(String filePathStr)
    +    {
    +      String fileName = new Path(filePathStr).getName();
    +      if (ignoreRegex != null) {
    +        Matcher matcher = ignoreRegex.matcher(fileName);
    +        if (matcher.matches()) {
    +          return true;
    +        }
    +      }
    +      return false;
    +    }
    +
    +    private boolean containsUnsupportedCharacters(String filePathStr)
    +    {
    +      return new Path(filePathStr).toUri().getPath().contains(UNSUPPORTED_CHARACTOR);
    +    }
    +  }
    +
    +  public static class HDFSFileMetaData extends FileMetadata
    +  {
    +    private String relativePath;
    +
    +    protected HDFSFileMetaData()
    +    {
    +      super();
    +    }
    +
    +    public HDFSFileMetaData(@NotNull String filePath)
    +    {
    +      super(filePath);
    +    }
    +
    +    public String getRelativePath()
    +    {
    +      return relativePath;
    +    }
    +
    +    public void setRelativePath(String relativePath)
    +    {
    +      this.relativePath = relativePath;
    +    }
    +
    +    @Override
    +    public String toString()
    +    {
    +      return "HDFSFileMetaData [relativePath=" + relativePath + ", getNumberOfBlocks()=" + getNumberOfBlocks() + ", getFileName()=" + getFileName() + ", getFileLength()=" + getFileLength() + ", isDirectory()=" + isDirectory() + "]";
    --- End diff --
    
    This is auto-generated code; I would prefer not to change it.


---

[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2008: Create HDFS F...

Posted by chinmaykolhatkar <gi...@git.apache.org>.
Github user chinmaykolhatkar commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/207#discussion_r55640205
  
    --- Diff: library/src/main/java/com/datatorrent/lib/io/fs/HDFSInputModule.java ---
    @@ -0,0 +1,236 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.io.fs;
    +
    +import javax.validation.constraints.Min;
    +import javax.validation.constraints.NotNull;
    +import javax.validation.constraints.Size;
    +
    +import org.apache.commons.lang.mutable.MutableLong;
    +import org.apache.hadoop.conf.Configuration;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DAG;
    +import com.datatorrent.api.Module;
    +import com.datatorrent.common.metric.MetricsAggregator;
    +import com.datatorrent.common.metric.SingleMetricAggregator;
    +import com.datatorrent.common.metric.sum.LongSumAggregator;
    +import com.datatorrent.common.partitioner.StatelessPartitioner;
    +import com.datatorrent.lib.counters.BasicCounters;
    +import com.datatorrent.lib.io.block.AbstractBlockReader.ReaderRecord;
    +import com.datatorrent.lib.io.block.BlockMetadata.FileBlockMetadata;
    +import com.datatorrent.lib.io.block.HDFSBlockReader;
    +import com.datatorrent.lib.io.fs.AbstractFileSplitter.FileMetadata;
    +import com.datatorrent.lib.io.fs.HDFSFileSplitter.HDFSScanner;
    +import com.datatorrent.netlet.util.Slice;
    +
    +/**
    + * HDFSInputModule is used to read files from HDFS. <br/>
    + * Module emits FileMetadata, BlockMetadata and the block bytes.
    + */
    +public class HDFSInputModule implements Module
    --- End diff --
    
    As part of this comment, can you state which properties of this module are mandatory and which are optional?
    Or at least document that in the javadoc of the respective setter methods.
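
A sketch of how such a class-level comment might read, based only on the fields and the populateDAG logic quoted in this thread. InputModulePropertiesSketch is a hypothetical stand-in, and validate() is a hand-rolled substitute for the @NotNull and @Size(min = 1) constraints on files, not the Bean Validation mechanism the module actually relies on.

```java
/**
 * Reads files from HDFS and emits file metadata, block metadata and block bytes.
 *
 * Mandatory property:
 *   files - comma-separated list of files or directories to scan.
 *
 * Optional properties:
 *   filePatternRegularExp - regular expression applied by the scanner; unset by default.
 *   scanIntervalMillis    - scan interval; 0 leaves the scanner's own default in place.
 *   recursive             - whether to scan sub-directories; defaults to true.
 *   blockSize             - block size; 0 leaves the splitter's own default in place.
 *   sequencialFileRead    - emit blocks of a file in order; defaults to false.
 *   readersCount          - fixed number of block readers; 0 means no static partitioner is set.
 */
final class InputModulePropertiesSketch
{
  String files;
  String filePatternRegularExp;
  long scanIntervalMillis;
  boolean recursive = true;
  long blockSize;
  boolean sequencialFileRead = false;
  int readersCount;

  /** Stand-in for the declarative constraints on 'files' shown in the diff. */
  void validate()
  {
    if (files == null || files.isEmpty()) {
      throw new IllegalStateException("files is mandatory");
    }
  }
}
```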


---

[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2008: Create HDFS F...

Posted by DT-Priyanka <gi...@git.apache.org>.
Github user DT-Priyanka commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/207#discussion_r55650677
  
    --- Diff: library/src/main/java/com/datatorrent/lib/io/fs/HDFSInputModule.java ---
    @@ -0,0 +1,236 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.io.fs;
    +
    +import javax.validation.constraints.Min;
    +import javax.validation.constraints.NotNull;
    +import javax.validation.constraints.Size;
    +
    +import org.apache.commons.lang.mutable.MutableLong;
    +import org.apache.hadoop.conf.Configuration;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DAG;
    +import com.datatorrent.api.Module;
    +import com.datatorrent.common.metric.MetricsAggregator;
    +import com.datatorrent.common.metric.SingleMetricAggregator;
    +import com.datatorrent.common.metric.sum.LongSumAggregator;
    +import com.datatorrent.common.partitioner.StatelessPartitioner;
    +import com.datatorrent.lib.counters.BasicCounters;
    +import com.datatorrent.lib.io.block.AbstractBlockReader.ReaderRecord;
    +import com.datatorrent.lib.io.block.BlockMetadata.FileBlockMetadata;
    +import com.datatorrent.lib.io.block.HDFSBlockReader;
    +import com.datatorrent.lib.io.fs.AbstractFileSplitter.FileMetadata;
    +import com.datatorrent.lib.io.fs.HDFSFileSplitter.HDFSScanner;
    +import com.datatorrent.netlet.util.Slice;
    +
    +/**
    + * HDFSInputModule is used to read files from HDFS. <br/>
    + * Module emits FileMetadata, BlockMetadata and the block bytes.
    + */
    +public class HDFSInputModule implements Module
    +{
    +
    +  @NotNull
    +  @Size(min = 1)
    +  private String files;
    --- End diff --
    
    I won't split the properties for now; doing so could create problems if someone integrates operators/modules with an external UI.


---

[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2008: Create HDFS F...

Posted by chinmaykolhatkar <gi...@git.apache.org>.
Github user chinmaykolhatkar commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/207#discussion_r55667219
  
    --- Diff: library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java ---
    @@ -110,8 +110,7 @@ protected void starting(org.junit.runner.Description description)
           fileSplitterInput.setIdempotentStorageManager(new IdempotentStorageManager.NoopIdempotentStorageManager());
     
           Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
    -      attributes.put(Context.DAGContext.APPLICATION_PATH,
    -          "target/" + className + "/" + methodName + "/" + Long.toHexString(System.currentTimeMillis()));
    +      attributes.put(Context.DAGContext.APPLICATION_PATH, "target/" + className + "/" + methodName + "/" + Long.toHexString(System.currentTimeMillis()));
    --- End diff --
    
    That's very weird, considering that, AFAIK, no code gets into master without a RAT check.
    So does Checkstyle validation fail without these changes?


---

[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2008: Create HDFS F...

Posted by chinmaykolhatkar <gi...@git.apache.org>.
Github user chinmaykolhatkar commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/207#discussion_r55639806
  
    --- Diff: library/src/main/java/com/datatorrent/lib/io/fs/HDFSFileSplitter.java ---
    @@ -0,0 +1,180 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.io.fs;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.regex.Matcher;
    +import java.util.regex.Pattern;
    +
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.hadoop.fs.FileStatus;
    +import org.apache.hadoop.fs.Path;
    +
    +import com.datatorrent.api.Context.OperatorContext;
    +import com.datatorrent.lib.io.block.BlockMetadata.FileBlockMetadata;
    +import com.datatorrent.lib.io.block.HDFSBlockMetadata;
    +
    +public class HDFSFileSplitter extends FileSplitterInput
    +{
    +  private boolean sequencialFileRead;
    +
    +  public HDFSFileSplitter()
    +  {
    +    super();
    +    super.setScanner(new HDFSScanner());
    +  }
    +
    +  @Override
    +  protected FileMetadata createFileMetadata(FileInfo fileInfo)
    +  {
    +    return new HDFSFileMetaData(fileInfo.getFilePath());
    +  }
    +
    +  @Override
    +  protected HDFSFileMetaData buildFileMetadata(FileInfo fileInfo) throws IOException
    +  {
    +    FileMetadata metadata = super.buildFileMetadata(fileInfo);
    +    HDFSFileMetaData hdfsFileMetaData = (HDFSFileMetaData)metadata;
    +
    +    Path path = new Path(fileInfo.getFilePath());
    +    FileStatus status = getFileStatus(path);
    +    if (fileInfo.getDirectoryPath() == null) { // Direct filename is given as input.
    +      hdfsFileMetaData.setRelativePath(status.getPath().getName());
    +    } else {
    +      String relativePath = getRelativePathWithFolderName(fileInfo);
    +      hdfsFileMetaData.setRelativePath(relativePath);
    +    }
    +    return hdfsFileMetaData;
    +  }
    +
    +  /*
    +   * As folder name was given to input for copy, prefix folder name to the sub items to copy.
    +   */
    +  private String getRelativePathWithFolderName(FileInfo fileInfo)
    +  {
    +    String parentDir = new Path(fileInfo.getDirectoryPath()).getName();
    +    return parentDir + File.separator + fileInfo.getRelativeFilePath();
    +  }
    +
    +  @Override
    +  protected HDFSBlockMetadata createBlockMetadata(FileMetadata fileMetadata)
    +  {
    +    HDFSBlockMetadata blockMetadta = new HDFSBlockMetadata(fileMetadata.getFilePath());
    +    blockMetadta.setReadBlockInSequence(sequencialFileRead);
    +    return blockMetadta;
    +  }
    +
    +  @Override
    +  protected HDFSBlockMetadata buildBlockMetadata(long pos, long lengthOfFileInBlock, int blockNumber, FileMetadata fileMetadata, boolean isLast)
    +  {
    +    FileBlockMetadata metadata = super.buildBlockMetadata(pos, lengthOfFileInBlock, blockNumber, fileMetadata, isLast);
    +    HDFSBlockMetadata blockMetadata = (HDFSBlockMetadata)metadata;
    +    return blockMetadata;
    +  }
    +
    +  public boolean isSequencialFileRead()
    +  {
    +    return sequencialFileRead;
    +  }
    +
    +  public void setSequencialFileRead(boolean sequencialFileRead)
    +  {
    +    this.sequencialFileRead = sequencialFileRead;
    +  }
    +
    +  public static class HDFSScanner extends TimeBasedDirectoryScanner
    +  {
    +    protected static final String HDFS_TEMP_FILE = ".*._COPYING_";
    +    protected static final String UNSUPPORTED_CHARACTOR = ":";
    +    private transient Pattern ignoreRegex;
    +
    +    @Override
    +    public void setup(OperatorContext context)
    +    {
    +      super.setup(context);
    +      ignoreRegex = Pattern.compile(HDFS_TEMP_FILE);
    +    }
    +
    +    @Override
    +    protected boolean acceptFile(String filePathStr)
    +    {
    +      boolean accepted = super.acceptFile(filePathStr);
    +      if (containsUnsupportedCharacters(filePathStr) || isTempFile(filePathStr)) {
    +        return false;
    +      }
    +      return accepted;
    +    }
    +
    +    private boolean isTempFile(String filePathStr)
    +    {
    +      String fileName = new Path(filePathStr).getName();
    +      if (ignoreRegex != null) {
    +        Matcher matcher = ignoreRegex.matcher(fileName);
    +        if (matcher.matches()) {
    +          return true;
    +        }
    +      }
    +      return false;
    +    }
    +
    +    private boolean containsUnsupportedCharacters(String filePathStr)
    +    {
    +      return new Path(filePathStr).toUri().getPath().contains(UNSUPPORTED_CHARACTOR);
    +    }
    +  }
    +
    +  public static class HDFSFileMetaData extends FileMetadata
    +  {
    +    private String relativePath;
    +
    +    protected HDFSFileMetaData()
    --- End diff --
    
    Any reason why the empty constructor is protected but the other constructor is private?


---

[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2008: Create HDFS F...

Posted by DT-Priyanka <gi...@git.apache.org>.
Github user DT-Priyanka commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/207#discussion_r55800097
  
    --- Diff: library/src/main/java/com/datatorrent/lib/io/fs/HDFSFileSplitter.java ---
    @@ -0,0 +1,184 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.io.fs;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.regex.Matcher;
    +import java.util.regex.Pattern;
    +
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.hadoop.fs.FileStatus;
    +import org.apache.hadoop.fs.Path;
    +
    +import com.datatorrent.api.Context.OperatorContext;
    +import com.datatorrent.lib.io.block.BlockMetadata.FileBlockMetadata;
    +
    +/**
    + * HDFSFileSplitter extends {@link FileSplitterInput} to,
    + * 1. Add relative path to file metadata.
    + * 2. Ignore HDFS temp files (files with extensions _COPYING_).
    + * 3. Set sequencial read option on readers.
    + */
    +public class HDFSFileSplitter extends FileSplitterInput
    +{
    +  private boolean sequencialFileRead;
    +
    +  public HDFSFileSplitter()
    +  {
    +    super();
    +    super.setScanner(new HDFSScanner());
    +  }
    +
    +  @Override
    +  protected FileMetadata createFileMetadata(FileInfo fileInfo)
    +  {
    +    return new HDFSFileMetaData(fileInfo.getFilePath());
    +  }
    +
    +  @Override
    +  protected HDFSFileMetaData buildFileMetadata(FileInfo fileInfo) throws IOException
    +  {
    +    FileMetadata metadata = super.buildFileMetadata(fileInfo);
    +    HDFSFileMetaData hdfsFileMetaData = (HDFSFileMetaData)metadata;
    +
    +    Path path = new Path(fileInfo.getFilePath());
    +    FileStatus status = getFileStatus(path);
    +    if (fileInfo.getDirectoryPath() == null) { // Direct filename is given as input.
    +      hdfsFileMetaData.setRelativePath(status.getPath().getName());
    +    } else {
    +      String relativePath = getRelativePathWithFolderName(fileInfo);
    +      hdfsFileMetaData.setRelativePath(relativePath);
    +    }
    +    return hdfsFileMetaData;
    +  }
    +
    +  /*
    +   * As folder name was given to input for copy, prefix folder name to the sub items to copy.
    +   */
    +  private String getRelativePathWithFolderName(FileInfo fileInfo)
    +  {
    +    String parentDir = new Path(fileInfo.getDirectoryPath()).getName();
    +    return parentDir + File.separator + fileInfo.getRelativeFilePath();
    +  }
    +
    +  @Override
    +  protected FileBlockMetadata createBlockMetadata(FileMetadata fileMetadata)
    +  {
    +    FileBlockMetadata blockMetadta = new FileBlockMetadata(fileMetadata.getFilePath());
    +    blockMetadta.setReadBlockInSequence(sequencialFileRead);
    +    return blockMetadta;
    +  }
    +
    +  public boolean isSequencialFileRead()
    +  {
    +    return sequencialFileRead;
    +  }
    +
    +  public void setSequencialFileRead(boolean sequencialFileRead)
    +  {
    +    this.sequencialFileRead = sequencialFileRead;
    +  }
    +
    +  /**
    +   * HDFSScanner extends {@link TimeBasedDirectoryScanner} to ignore HDFS temporary files
    +   * and files containing unsupported characters. 
    +   */
    +  public static class HDFSScanner extends TimeBasedDirectoryScanner
    +  {
    +    protected static final String HDFS_TEMP_FILE = ".*._COPYING_";
    +    protected static final String UNSUPPORTED_CHARACTOR = ":";
    +    private transient Pattern ignoreRegex;
    --- End diff --
    
    We already expose a property called "filePatternRegularExp" for patterns to accept. Having one more property for patterns to reject could be confusing, so I didn't expose it.
    Is that reason good enough not to expose it as a property, or should I expose it?
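
For reference, a minimal sketch of how the hard-coded reject pattern behaves when applied to a file name, as isTempFile does in the diff. The class name TempFileFilterSketch and the sample file names are hypothetical; only the pattern string is taken from the diff. Note that the '.' before _COPYING_ is unescaped, so it matches any single character, not just a literal dot.

```java
import java.util.regex.Pattern;

class TempFileFilterSketch
{
  // Same pattern string as HDFS_TEMP_FILE in the diff.
  static final Pattern IGNORE = Pattern.compile(".*._COPYING_");

  // Hypothetical helper: true when the file name (not the full path)
  // matches the reject pattern in full.
  static boolean isTempFile(String fileName)
  {
    return IGNORE.matcher(fileName).matches();
  }

  public static void main(String[] args)
  {
    System.out.println(isTempFile("part-0001._COPYING_")); // true
    System.out.println(isTempFile("part-0001"));           // false
    System.out.println(isTempFile("part-0001X_COPYING_")); // true, '.' is unescaped
    System.out.println(isTempFile("_COPYING_"));           // false, needs a preceding character
  }
}
```

If only a literal dot should be accepted before the suffix, the pattern would need to escape it (".*\\._COPYING_"); whether that is the intent here is an open question for the author.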


---

[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2008: Create HDFS F...

Posted by chinmaykolhatkar <gi...@git.apache.org>.
Github user chinmaykolhatkar commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/207#discussion_r55639847
  
    --- Diff: library/src/main/java/com/datatorrent/lib/io/fs/HDFSFileSplitter.java ---
    @@ -0,0 +1,180 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.io.fs;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.regex.Matcher;
    +import java.util.regex.Pattern;
    +
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.hadoop.fs.FileStatus;
    +import org.apache.hadoop.fs.Path;
    +
    +import com.datatorrent.api.Context.OperatorContext;
    +import com.datatorrent.lib.io.block.BlockMetadata.FileBlockMetadata;
    +import com.datatorrent.lib.io.block.HDFSBlockMetadata;
    +
    +public class HDFSFileSplitter extends FileSplitterInput
    +{
    +  private boolean sequencialFileRead;
    +
    +  public HDFSFileSplitter()
    +  {
    +    super();
    +    super.setScanner(new HDFSScanner());
    +  }
    +
    +  @Override
    +  protected FileMetadata createFileMetadata(FileInfo fileInfo)
    +  {
    +    return new HDFSFileMetaData(fileInfo.getFilePath());
    +  }
    +
    +  @Override
    +  protected HDFSFileMetaData buildFileMetadata(FileInfo fileInfo) throws IOException
    +  {
    +    FileMetadata metadata = super.buildFileMetadata(fileInfo);
    +    HDFSFileMetaData hdfsFileMetaData = (HDFSFileMetaData)metadata;
    +
    +    Path path = new Path(fileInfo.getFilePath());
    +    FileStatus status = getFileStatus(path);
    +    if (fileInfo.getDirectoryPath() == null) { // Direct filename is given as input.
    +      hdfsFileMetaData.setRelativePath(status.getPath().getName());
    +    } else {
    +      String relativePath = getRelativePathWithFolderName(fileInfo);
    +      hdfsFileMetaData.setRelativePath(relativePath);
    +    }
    +    return hdfsFileMetaData;
    +  }
    +
    +  /*
    +   * As folder name was given to input for copy, prefix folder name to the sub items to copy.
    +   */
    +  private String getRelativePathWithFolderName(FileInfo fileInfo)
    +  {
    +    String parentDir = new Path(fileInfo.getDirectoryPath()).getName();
    +    return parentDir + File.separator + fileInfo.getRelativeFilePath();
    +  }
    +
    +  @Override
    +  protected HDFSBlockMetadata createBlockMetadata(FileMetadata fileMetadata)
    +  {
    +    HDFSBlockMetadata blockMetadta = new HDFSBlockMetadata(fileMetadata.getFilePath());
    +    blockMetadta.setReadBlockInSequence(sequencialFileRead);
    +    return blockMetadta;
    +  }
    +
    +  @Override
    +  protected HDFSBlockMetadata buildBlockMetadata(long pos, long lengthOfFileInBlock, int blockNumber, FileMetadata fileMetadata, boolean isLast)
    +  {
    +    FileBlockMetadata metadata = super.buildBlockMetadata(pos, lengthOfFileInBlock, blockNumber, fileMetadata, isLast);
    +    HDFSBlockMetadata blockMetadata = (HDFSBlockMetadata)metadata;
    +    return blockMetadata;
    +  }
    +
    +  public boolean isSequencialFileRead()
    +  {
    +    return sequencialFileRead;
    +  }
    +
    +  public void setSequencialFileRead(boolean sequencialFileRead)
    +  {
    +    this.sequencialFileRead = sequencialFileRead;
    +  }
    +
    +  public static class HDFSScanner extends TimeBasedDirectoryScanner
    +  {
    +    protected static final String HDFS_TEMP_FILE = ".*._COPYING_";
    +    protected static final String UNSUPPORTED_CHARACTOR = ":";
    +    private transient Pattern ignoreRegex;
    +
    +    @Override
    +    public void setup(OperatorContext context)
    +    {
    +      super.setup(context);
    +      ignoreRegex = Pattern.compile(HDFS_TEMP_FILE);
    +    }
    +
    +    @Override
    +    protected boolean acceptFile(String filePathStr)
    +    {
    +      boolean accepted = super.acceptFile(filePathStr);
    +      if (containsUnsupportedCharacters(filePathStr) || isTempFile(filePathStr)) {
    +        return false;
    +      }
    +      return accepted;
    +    }
    +
    +    private boolean isTempFile(String filePathStr)
    +    {
    +      String fileName = new Path(filePathStr).getName();
    +      if (ignoreRegex != null) {
    +        Matcher matcher = ignoreRegex.matcher(fileName);
    +        if (matcher.matches()) {
    +          return true;
    +        }
    +      }
    +      return false;
    +    }
    +
    +    private boolean containsUnsupportedCharacters(String filePathStr)
    +    {
    +      return new Path(filePathStr).toUri().getPath().contains(UNSUPPORTED_CHARACTOR);
    +    }
    +  }
    +
    +  public static class HDFSFileMetaData extends FileMetadata
    +  {
    +    private String relativePath;
    +
    +    protected HDFSFileMetaData()
    +    {
    +      super();
    +    }
    +
    +    public HDFSFileMetaData(@NotNull String filePath)
    +    {
    +      super(filePath);
    +    }
    +
    +    public String getRelativePath()
    +    {
    +      return relativePath;
    +    }
    +
    +    public void setRelativePath(String relativePath)
    +    {
    +      this.relativePath = relativePath;
    +    }
    +
    +    @Override
    +    public String toString()
    +    {
    +      return "HDFSFileMetaData [relativePath=" + relativePath + ", getNumberOfBlocks()=" + getNumberOfBlocks() + ", getFileName()=" + getFileName() + ", getFileLength()=" + getFileLength() + ", isDirectory()=" + isDirectory() + "]";
    --- End diff --
    
    Please use variable names in toString, not method names.
    For example, change getNumberOfBlocks() to numberOfBlocks, etc.
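
A minimal sketch of the suggested form, using a hypothetical stand-in class (FileMetaSketch) rather than the real HDFSFileMetaData, whose parent fields are not shown in this diff. The labels are plain property names instead of getter names:

```java
class FileMetaSketch
{
  // Hypothetical fields mirroring the values printed by the toString in the diff.
  private final String relativePath;
  private final int numberOfBlocks;
  private final String fileName;
  private final long fileLength;
  private final boolean isDirectory;

  FileMetaSketch(String relativePath, int numberOfBlocks, String fileName, long fileLength, boolean isDirectory)
  {
    this.relativePath = relativePath;
    this.numberOfBlocks = numberOfBlocks;
    this.fileName = fileName;
    this.fileLength = fileLength;
    this.isDirectory = isDirectory;
  }

  @Override
  public String toString()
  {
    // Labels are property names, not method names such as getNumberOfBlocks().
    return "FileMetaSketch [relativePath=" + relativePath + ", numberOfBlocks=" + numberOfBlocks
        + ", fileName=" + fileName + ", fileLength=" + fileLength + ", isDirectory=" + isDirectory + "]";
  }
}
```

In the real class the values would presumably still come from the inherited getters; only the labels in the string change.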


---

[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2008: Create HDFS F...

Posted by chinmaykolhatkar <gi...@git.apache.org>.
Github user chinmaykolhatkar commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/207#discussion_r55638987
  
    --- Diff: library/src/main/java/com/datatorrent/lib/io/block/HDFSBlockReader.java ---
    @@ -0,0 +1,78 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.io.block;
    +
    +import java.io.IOException;
    +import java.net.URI;
    +
    +import org.apache.hadoop.fs.FileSystem;
    +
    +import com.google.common.base.Splitter;
    +
    +public class HDFSBlockReader extends FSSliceReader
    +{
    +  protected String uri;
    +
    +  @Override
    +  protected FileSystem getFSInstance() throws IOException
    +  {
    +    return FileSystem.newInstance(URI.create(uri), configuration);
    +  }
    +
    +  /**
    +   * Sets the uri
    +   *
    +   * @param uri
    +   */
    +  public void setUri(String uri)
    +  {
    +    this.uri = convertSchemeToLowerCase(uri);
    +  }
    +
    +  public String getUri()
    +  {
    +    return uri;
    +  }
    +
    +  /**
    +   * Converts Scheme part of the URI to lower case. Multiple URI can be comma separated. If no scheme is there, no
    +   * change is made.
    +   * 
    +   * @param
    --- End diff --
    
    Please add a description for the uri param in the javadoc.


---

[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2008: Create HDFS F...

Posted by chinmaykolhatkar <gi...@git.apache.org>.
Github user chinmaykolhatkar commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/207#discussion_r55639986
  
    --- Diff: library/src/main/java/com/datatorrent/lib/io/fs/HDFSInputModule.java ---
    @@ -0,0 +1,236 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.io.fs;
    +
    +import javax.validation.constraints.Min;
    +import javax.validation.constraints.NotNull;
    +import javax.validation.constraints.Size;
    +
    +import org.apache.commons.lang.mutable.MutableLong;
    +import org.apache.hadoop.conf.Configuration;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DAG;
    +import com.datatorrent.api.Module;
    +import com.datatorrent.common.metric.MetricsAggregator;
    +import com.datatorrent.common.metric.SingleMetricAggregator;
    +import com.datatorrent.common.metric.sum.LongSumAggregator;
    +import com.datatorrent.common.partitioner.StatelessPartitioner;
    +import com.datatorrent.lib.counters.BasicCounters;
    +import com.datatorrent.lib.io.block.AbstractBlockReader.ReaderRecord;
    +import com.datatorrent.lib.io.block.BlockMetadata.FileBlockMetadata;
    +import com.datatorrent.lib.io.block.HDFSBlockReader;
    +import com.datatorrent.lib.io.fs.AbstractFileSplitter.FileMetadata;
    +import com.datatorrent.lib.io.fs.HDFSFileSplitter.HDFSScanner;
    +import com.datatorrent.netlet.util.Slice;
    +
    +/**
    + * HDFSInputModule is used to read files from HDFS. <br/>
    + * Module emits FileMetadata, BlockMetadata and the block bytes.
    + */
    +public class HDFSInputModule implements Module
    --- End diff --
    
    Can you please add what you've described in this PR description as a javadoc on HDFSInputModule?
    It's good to document what this module is responsible for and how it works.


---

[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2008: Create HDFS F...

Posted by DT-Priyanka <gi...@git.apache.org>.
Github user DT-Priyanka commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/207#discussion_r55799890
  
    --- Diff: library/src/main/java/com/datatorrent/lib/io/block/BlockReader.java ---
    @@ -0,0 +1,81 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.io.block;
    +
    +import java.io.IOException;
    +import java.net.URI;
    +
    +import org.apache.hadoop.fs.FileSystem;
    +
    +import com.google.common.base.Splitter;
    +
    +/**
    + * BlockReader extends {@link FSSliceReader} to accept case insensitive uri
    + */
    +public class BlockReader extends FSSliceReader
    +{
    +  protected String uri;
    +
    +  @Override
    +  protected FileSystem getFSInstance() throws IOException
    +  {
    +    return FileSystem.newInstance(URI.create(uri), configuration);
    +  }
    +
    +  /**
    +   * Sets the uri
    +   *
    +   * @param uri of form hdfs://hostname:port/path/to/input
    +   */
    +  public void setUri(String uri)
    +  {
    +    this.uri = convertSchemeToLowerCase(uri);
    +  }
    +
    +  public String getUri()
    +  {
    +    return uri;
    +  }
    +
    +  /**
    +   * Converts Scheme part of the URI to lower case. Multiple URI can be comma separated. If no scheme is there, no
    +   * change is made.
    +   * 
    +   * @param uri
    +   * @return String with scheme part as lower case
    +   */
    +  private String convertSchemeToLowerCase(String uri)
    +  {
    +    if (uri == null) {
    +      return null;
    +    }
    +    StringBuilder uriList = new StringBuilder();
    +    for (String f : Splitter.on(",").omitEmptyStrings().split(uri)) {
    --- End diff --
    
    We expect users to configure input file(s)/directories separated by ",". Should we parse it into a list in the set method? Does that have any advantage?
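
To make the question concrete, here is a rough sketch of what parsing at set time could look like. It is not the code in this PR: the class UriSchemeSketch and the method parseUris are hypothetical, it uses String.split instead of Guava's Splitter to stay self-contained, and it assumes a scheme is whatever precedes the first "://". One possible advantage is that the splitting and normalization happen once, and the result can be reused or validated early.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

class UriSchemeSketch
{
  // Splits a comma-separated list of URIs, drops empty entries, and lower-cases
  // the scheme of each entry. Entries without "://" are kept unchanged.
  static List<String> parseUris(String uris)
  {
    List<String> result = new ArrayList<>();
    if (uris == null) {
      return result;
    }
    for (String part : uris.split(",")) {
      if (part.isEmpty()) {
        continue;
      }
      int idx = part.indexOf("://");
      if (idx > 0) {
        part = part.substring(0, idx).toLowerCase(Locale.ROOT) + part.substring(idx);
      }
      result.add(part);
    }
    return result;
  }

  public static void main(String[] args)
  {
    // prints [hdfs://host:8020/a, file:///tmp/b, /no/scheme]
    System.out.println(parseUris("HDFS://host:8020/a,,File:///tmp/b,/no/scheme"));
  }
}
```

Whether the property should stay a single String for configuration purposes is a separate design choice; the sketch only shows the parsing step.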


---

[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2008: Create HDFS F...

Posted by chinmaykolhatkar <gi...@git.apache.org>.
Github user chinmaykolhatkar commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/207#discussion_r55666817
  
    --- Diff: library/src/main/java/com/datatorrent/lib/io/fs/HDFSFileSplitter.java ---
    @@ -0,0 +1,180 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.io.fs;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.regex.Matcher;
    +import java.util.regex.Pattern;
    +
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.hadoop.fs.FileStatus;
    +import org.apache.hadoop.fs.Path;
    +
    +import com.datatorrent.api.Context.OperatorContext;
    +import com.datatorrent.lib.io.block.BlockMetadata.FileBlockMetadata;
    +import com.datatorrent.lib.io.block.HDFSBlockMetadata;
    +
    +public class HDFSFileSplitter extends FileSplitterInput
    +{
    +  private boolean sequencialFileRead;
    +
    +  public HDFSFileSplitter()
    +  {
    +    super();
    +    super.setScanner(new HDFSScanner());
    +  }
    +
    +  @Override
    +  protected FileMetadata createFileMetadata(FileInfo fileInfo)
    +  {
    +    return new HDFSFileMetaData(fileInfo.getFilePath());
    +  }
    +
    +  @Override
    +  protected HDFSFileMetaData buildFileMetadata(FileInfo fileInfo) throws IOException
    +  {
    +    FileMetadata metadata = super.buildFileMetadata(fileInfo);
    +    HDFSFileMetaData hdfsFileMetaData = (HDFSFileMetaData)metadata;
    +
    +    Path path = new Path(fileInfo.getFilePath());
    +    FileStatus status = getFileStatus(path);
    +    if (fileInfo.getDirectoryPath() == null) { // Direct filename is given as input.
    +      hdfsFileMetaData.setRelativePath(status.getPath().getName());
    +    } else {
    +      String relativePath = getRelativePathWithFolderName(fileInfo);
    +      hdfsFileMetaData.setRelativePath(relativePath);
    +    }
    +    return hdfsFileMetaData;
    +  }
    +
    +  /*
    +   * As folder name was given to input for copy, prefix folder name to the sub items to copy.
    +   */
    +  private String getRelativePathWithFolderName(FileInfo fileInfo)
    +  {
    +    String parentDir = new Path(fileInfo.getDirectoryPath()).getName();
    +    return parentDir + File.separator + fileInfo.getRelativeFilePath();
    +  }
    +
    +  @Override
    +  protected HDFSBlockMetadata createBlockMetadata(FileMetadata fileMetadata)
    +  {
    +    HDFSBlockMetadata blockMetadta = new HDFSBlockMetadata(fileMetadata.getFilePath());
    +    blockMetadta.setReadBlockInSequence(sequencialFileRead);
    +    return blockMetadta;
    +  }
    +
    +  @Override
    +  protected HDFSBlockMetadata buildBlockMetadata(long pos, long lengthOfFileInBlock, int blockNumber, FileMetadata fileMetadata, boolean isLast)
    +  {
    +    FileBlockMetadata metadata = super.buildBlockMetadata(pos, lengthOfFileInBlock, blockNumber, fileMetadata, isLast);
    +    HDFSBlockMetadata blockMetadata = (HDFSBlockMetadata)metadata;
    +    return blockMetadata;
    +  }
    +
    +  public boolean isSequencialFileRead()
    +  {
    +    return sequencialFileRead;
    +  }
    +
    +  public void setSequencialFileRead(boolean sequencialFileRead)
    +  {
    +    this.sequencialFileRead = sequencialFileRead;
    +  }
    +
    +  public static class HDFSScanner extends TimeBasedDirectoryScanner
    +  {
    +    protected static final String HDFS_TEMP_FILE = ".*._COPYING_";
    +    protected static final String UNSUPPORTED_CHARACTOR = ":";
    +    private transient Pattern ignoreRegex;
    +
    +    @Override
    +    public void setup(OperatorContext context)
    +    {
    +      super.setup(context);
    +      ignoreRegex = Pattern.compile(HDFS_TEMP_FILE);
    +    }
    +
    +    @Override
    +    protected boolean acceptFile(String filePathStr)
    +    {
    +      boolean accepted = super.acceptFile(filePathStr);
    +      if (containsUnsupportedCharacters(filePathStr) || isTempFile(filePathStr)) {
    +        return false;
    +      }
    +      return accepted;
    +    }
    +
    +    private boolean isTempFile(String filePathStr)
    +    {
    +      String fileName = new Path(filePathStr).getName();
    +      if (ignoreRegex != null) {
    +        Matcher matcher = ignoreRegex.matcher(fileName);
    +        if (matcher.matches()) {
    +          return true;
    +        }
    +      }
    +      return false;
    +    }
    +
    +    private boolean containsUnsupportedCharacters(String filePathStr)
    +    {
    +      return new Path(filePathStr).toUri().getPath().contains(UNSUPPORTED_CHARACTOR);
    +    }
    +  }
    +
    +  public static class HDFSFileMetaData extends FileMetadata
    +  {
    +    private String relativePath;
    +
    +    protected HDFSFileMetaData()
    +    {
    +      super();
    +    }
    +
    +    public HDFSFileMetaData(@NotNull String filePath)
    +    {
    +      super(filePath);
    +    }
    +
    +    public String getRelativePath()
    +    {
    +      return relativePath;
    +    }
    +
    +    public void setRelativePath(String relativePath)
    +    {
    +      this.relativePath = relativePath;
    +    }
    +
    +    @Override
    +    public String toString()
    +    {
    +      return "HDFSFileMetaData [relativePath=" + relativePath + ", getNumberOfBlocks()=" + getNumberOfBlocks() + ", getFileName()=" + getFileName() + ", getFileLength()=" + getFileLength() + ", isDirectory()=" + isDirectory() + "]";
    --- End diff --
    
    auto generated by what?


---

[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2008: Create HDFS F...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-apex-malhar/pull/207


---

[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2008: Create HDFS F...

Posted by chinmaykolhatkar <gi...@git.apache.org>.
Github user chinmaykolhatkar commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/207#discussion_r55640120
  
    --- Diff: library/src/main/java/com/datatorrent/lib/io/fs/HDFSInputModule.java ---
    @@ -0,0 +1,236 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.io.fs;
    +
    +import javax.validation.constraints.Min;
    +import javax.validation.constraints.NotNull;
    +import javax.validation.constraints.Size;
    +
    +import org.apache.commons.lang.mutable.MutableLong;
    +import org.apache.hadoop.conf.Configuration;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DAG;
    +import com.datatorrent.api.Module;
    +import com.datatorrent.common.metric.MetricsAggregator;
    +import com.datatorrent.common.metric.SingleMetricAggregator;
    +import com.datatorrent.common.metric.sum.LongSumAggregator;
    +import com.datatorrent.common.partitioner.StatelessPartitioner;
    +import com.datatorrent.lib.counters.BasicCounters;
    +import com.datatorrent.lib.io.block.AbstractBlockReader.ReaderRecord;
    +import com.datatorrent.lib.io.block.BlockMetadata.FileBlockMetadata;
    +import com.datatorrent.lib.io.block.HDFSBlockReader;
    +import com.datatorrent.lib.io.fs.AbstractFileSplitter.FileMetadata;
    +import com.datatorrent.lib.io.fs.HDFSFileSplitter.HDFSScanner;
    +import com.datatorrent.netlet.util.Slice;
    +
    +/**
    + * HDFSInputModule is used to read files from HDFS. <br/>
    + * Module emits FileMetadata, BlockMetadata and the block bytes.
    + */
    +public class HDFSInputModule implements Module
    +{
    +
    +  @NotNull
    +  @Size(min = 1)
    +  private String files;
    +  private String filePatternRegularExp;
    +  @Min(0)
    +  private long scanIntervalMillis;
    +  private boolean recursive = true;
    +  private long blockSize;
    --- End diff --
    
    Any default for the blockSize?


---

[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2008: Create HDFS F...

Posted by ishark <gi...@git.apache.org>.
Github user ishark commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/207#discussion_r55778161
  
    --- Diff: library/src/main/java/com/datatorrent/lib/io/fs/HDFSFileSplitter.java ---
    @@ -0,0 +1,184 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.io.fs;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.regex.Matcher;
    +import java.util.regex.Pattern;
    +
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.hadoop.fs.FileStatus;
    +import org.apache.hadoop.fs.Path;
    +
    +import com.datatorrent.api.Context.OperatorContext;
    +import com.datatorrent.lib.io.block.BlockMetadata.FileBlockMetadata;
    +
    +/**
    + * HDFSFileSplitter extends {@link FileSplitterInput} to,
    + * 1. Add relative path to file metadata.
    + * 2. Ignore HDFS temp files (files with extensions _COPYING_).
    + * 3. Set sequencial read option on readers.
    + */
    +public class HDFSFileSplitter extends FileSplitterInput
    +{
    +  private boolean sequencialFileRead;
    +
    +  public HDFSFileSplitter()
    +  {
    +    super();
    +    super.setScanner(new HDFSScanner());
    +  }
    +
    +  @Override
    +  protected FileMetadata createFileMetadata(FileInfo fileInfo)
    +  {
    +    return new HDFSFileMetaData(fileInfo.getFilePath());
    +  }
    +
    +  @Override
    +  protected HDFSFileMetaData buildFileMetadata(FileInfo fileInfo) throws IOException
    +  {
    +    FileMetadata metadata = super.buildFileMetadata(fileInfo);
    +    HDFSFileMetaData hdfsFileMetaData = (HDFSFileMetaData)metadata;
    +
    +    Path path = new Path(fileInfo.getFilePath());
    +    FileStatus status = getFileStatus(path);
    +    if (fileInfo.getDirectoryPath() == null) { // Direct filename is given as input.
    +      hdfsFileMetaData.setRelativePath(status.getPath().getName());
    +    } else {
    +      String relativePath = getRelativePathWithFolderName(fileInfo);
    +      hdfsFileMetaData.setRelativePath(relativePath);
    +    }
    +    return hdfsFileMetaData;
    +  }
    +
    +  /*
    +   * As folder name was given to input for copy, prefix folder name to the sub items to copy.
    +   */
    +  private String getRelativePathWithFolderName(FileInfo fileInfo)
    +  {
    +    String parentDir = new Path(fileInfo.getDirectoryPath()).getName();
    +    return parentDir + File.separator + fileInfo.getRelativeFilePath();
    +  }
    +
    +  @Override
    +  protected FileBlockMetadata createBlockMetadata(FileMetadata fileMetadata)
    +  {
    +    FileBlockMetadata blockMetadta = new FileBlockMetadata(fileMetadata.getFilePath());
    +    blockMetadta.setReadBlockInSequence(sequencialFileRead);
    +    return blockMetadta;
    +  }
    +
    +  public boolean isSequencialFileRead()
    +  {
    +    return sequencialFileRead;
    +  }
    +
    +  public void setSequencialFileRead(boolean sequencialFileRead)
    +  {
    +    this.sequencialFileRead = sequencialFileRead;
    +  }
    +
    +  /**
    +   * HDFSScanner extends {@link TimeBasedDirectoryScanner} to ignore HDFS temporary files
    +   * and files containing unsupported characters. 
    +   */
    +  public static class HDFSScanner extends TimeBasedDirectoryScanner
    +  {
    +    protected static final String HDFS_TEMP_FILE = ".*._COPYING_";
    +    protected static final String UNSUPPORTED_CHARACTOR = ":";
    +    private transient Pattern ignoreRegex;
    --- End diff --
    
    One way to handle this could be to expose ignoreRegex as a property of the module, with the default being _COPYING_. That could be useful for ignoring other file extensions as well. It could also be part of the superclass if it applies to other types of file readers.
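A minimal sketch of that suggestion, under stated assumptions: `ConfigurableIgnoreFilter` and the property name `ignoreFilePatternRegularExp` are hypothetical and do not appear in the PR. Only the default pattern is taken from `HDFS_TEMP_FILE` in the diff above. The class is a plain stand-in and does not extend `TimeBasedDirectoryScanner`.

```java
import java.util.regex.Pattern;

// Hypothetical stand-in for a scanner; NOT the HDFSScanner from the PR.
final class ConfigurableIgnoreFilter {
  // Default copied from HDFS_TEMP_FILE in the diff under review.
  static final String DEFAULT_IGNORE_REGEX = ".*._COPYING_";

  // Assumed property name; a module could expose it through a getter/setter pair.
  private String ignoreFilePatternRegularExp = DEFAULT_IGNORE_REGEX;
  private transient Pattern ignoreRegex;

  String getIgnoreFilePatternRegularExp() {
    return ignoreFilePatternRegularExp;
  }

  void setIgnoreFilePatternRegularExp(String regex) {
    this.ignoreFilePatternRegularExp = regex;
    this.ignoreRegex = null; // force recompilation on next use
  }

  // Returns false for file names that match the ignore pattern.
  boolean acceptFile(String fileName) {
    if (ignoreFilePatternRegularExp == null || ignoreFilePatternRegularExp.isEmpty()) {
      return true; // nothing to ignore
    }
    if (ignoreRegex == null) {
      ignoreRegex = Pattern.compile(ignoreFilePatternRegularExp);
    }
    return !ignoreRegex.matcher(fileName).matches();
  }
}
```

With the default in place, existing behaviour for `_COPYING_` files stays the same, while a user could override the pattern from a properties file to skip other temporary extensions.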


---

[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2008: Create HDFS F...

Posted by chinmaykolhatkar <gi...@git.apache.org>.
Github user chinmaykolhatkar commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/207#discussion_r55639110
  
    --- Diff: library/src/main/java/com/datatorrent/lib/io/block/HDFSBlockReader.java ---
    @@ -0,0 +1,78 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.io.block;
    +
    +import java.io.IOException;
    +import java.net.URI;
    +
    +import org.apache.hadoop.fs.FileSystem;
    +
    +import com.google.common.base.Splitter;
    +
    +public class HDFSBlockReader extends FSSliceReader
    +{
    +  protected String uri;
    +
    +  @Override
    +  protected FileSystem getFSInstance() throws IOException
    +  {
    +    return FileSystem.newInstance(URI.create(uri), configuration);
    +  }
    +
    +  /**
    +   * Sets the uri
    +   *
    +   * @param uri
    +   */
    +  public void setUri(String uri)
    +  {
    +    this.uri = convertSchemeToLowerCase(uri);
    +  }
    +
    +  public String getUri()
    +  {
    +    return uri;
    +  }
    +
    +  /**
    +   * Converts Scheme part of the URI to lower case. Multiple URI can be comma separated. If no scheme is there, no
    +   * change is made.
    +   * 
    +   * @param
    +   * @return String with scheme part as lower case
    +   */
    +  private static String convertSchemeToLowerCase(String uri)
    +  {
    +    if (uri == null) {
    +      return null;
    +    }
    +    StringBuilder inputMod = new StringBuilder();
    --- End diff --
    
    A better name than "inputMod"?


---

[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2008: Create HDFS F...

Posted by DT-Priyanka <gi...@git.apache.org>.
Github user DT-Priyanka commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/207#discussion_r56288758
  
    --- Diff: library/src/main/java/com/datatorrent/lib/io/fs/HDFSInputModule.java ---
    @@ -0,0 +1,236 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.io.fs;
    +
    +import javax.validation.constraints.Min;
    +import javax.validation.constraints.NotNull;
    +import javax.validation.constraints.Size;
    +
    +import org.apache.commons.lang.mutable.MutableLong;
    +import org.apache.hadoop.conf.Configuration;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DAG;
    +import com.datatorrent.api.Module;
    +import com.datatorrent.common.metric.MetricsAggregator;
    +import com.datatorrent.common.metric.SingleMetricAggregator;
    +import com.datatorrent.common.metric.sum.LongSumAggregator;
    +import com.datatorrent.common.partitioner.StatelessPartitioner;
    +import com.datatorrent.lib.counters.BasicCounters;
    +import com.datatorrent.lib.io.block.AbstractBlockReader.ReaderRecord;
    +import com.datatorrent.lib.io.block.BlockMetadata.FileBlockMetadata;
    +import com.datatorrent.lib.io.block.HDFSBlockReader;
    +import com.datatorrent.lib.io.fs.AbstractFileSplitter.FileMetadata;
    +import com.datatorrent.lib.io.fs.HDFSFileSplitter.HDFSScanner;
    +import com.datatorrent.netlet.util.Slice;
    +
    +/**
    + * HDFSInputModule is used to read files from HDFS. <br/>
    + * Module emits FileMetadata, BlockMetadata and the block bytes.
    + */
    +public class HDFSInputModule implements Module
    +{
    +
    +  @NotNull
    +  @Size(min = 1)
    +  private String files;
    --- End diff --
    
    I have created a Malhar JIRA for this: APEXMALHAR-2016. We will do it later, not as part of this review request.


---

[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2008: Create HDFS F...

Posted by chinmaykolhatkar <gi...@git.apache.org>.
Github user chinmaykolhatkar commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/207#discussion_r55639221
  
    --- Diff: library/src/main/java/com/datatorrent/lib/io/block/HDFSBlockReader.java ---
    @@ -0,0 +1,78 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.io.block;
    +
    +import java.io.IOException;
    +import java.net.URI;
    +
    +import org.apache.hadoop.fs.FileSystem;
    +
    +import com.google.common.base.Splitter;
    +
    +public class HDFSBlockReader extends FSSliceReader
    +{
    +  protected String uri;
    +
    +  @Override
    +  protected FileSystem getFSInstance() throws IOException
    +  {
    +    return FileSystem.newInstance(URI.create(uri), configuration);
    +  }
    +
    +  /**
    +   * Sets the uri
    +   *
    +   * @param uri
    +   */
    +  public void setUri(String uri)
    +  {
    +    this.uri = convertSchemeToLowerCase(uri);
    +  }
    +
    +  public String getUri()
    +  {
    +    return uri;
    +  }
    +
    +  /**
    +   * Converts Scheme part of the URI to lower case. Multiple URI can be comma separated. If no scheme is there, no
    +   * change is made.
    +   * 
    +   * @param
    +   * @return String with scheme part as lower case
    +   */
    +  private static String convertSchemeToLowerCase(String uri)
    +  {
    +    if (uri == null) {
    +      return null;
    +    }
    +    StringBuilder inputMod = new StringBuilder();
    +    for (String f : Splitter.on(",").omitEmptyStrings().split(uri)) {
    --- End diff --
    
    Considering this is a utility method, I suggest not accepting comma-separated URIs here. What if someone uses some other delimiter?
    From a design perspective, I think it is better for this method to take a single URI and let the caller handle the separator.
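One way the proposed split of responsibilities might look, as a sketch only: `UriSchemeUtil` and `convertAll` are hypothetical names, and the scheme detection (text before the first `://`, containing no `/`) is an assumption, since the body of the PR's `convertSchemeToLowerCase` is not shown in full here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.regex.Pattern;

// Hypothetical helper; not part of the PR.
final class UriSchemeUtil {

  // Handles exactly one URI. If no scheme is present, the input is returned unchanged.
  static String convertSchemeToLowerCase(String uri) {
    if (uri == null) {
      return null;
    }
    int schemeEnd = uri.indexOf("://");
    // No scheme, or the "://" sits inside a path segment: leave untouched.
    if (schemeEnd <= 0 || uri.lastIndexOf('/', schemeEnd - 1) >= 0) {
      return uri;
    }
    return uri.substring(0, schemeEnd).toLowerCase(Locale.ROOT) + uri.substring(schemeEnd);
  }

  // The caller owns the separator and skips empty entries.
  static String convertAll(String uris, String separator) {
    List<String> converted = new ArrayList<>();
    for (String part : uris.split(Pattern.quote(separator))) {
      if (!part.isEmpty()) {
        converted.add(convertSchemeToLowerCase(part));
      }
    }
    return String.join(separator, converted);
  }
}
```

Keeping the single-URI method free of any delimiter knowledge means a caller that reads comma-separated values from a properties file and a caller that uses another delimiter can share it.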


---

[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2008: Create HDFS F...

Posted by ishark <gi...@git.apache.org>.
Github user ishark commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/207#discussion_r55776427
  
    --- Diff: library/src/main/java/com/datatorrent/lib/io/fs/HDFSInputModule.java ---
    @@ -0,0 +1,253 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.io.fs;
    +
    +import javax.validation.constraints.Min;
    +import javax.validation.constraints.NotNull;
    +import javax.validation.constraints.Size;
    +
    +import org.apache.commons.lang.mutable.MutableLong;
    +import org.apache.hadoop.conf.Configuration;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DAG;
    +import com.datatorrent.api.Module;
    +import com.datatorrent.common.metric.MetricsAggregator;
    +import com.datatorrent.common.metric.SingleMetricAggregator;
    +import com.datatorrent.common.metric.sum.LongSumAggregator;
    +import com.datatorrent.common.partitioner.StatelessPartitioner;
    +import com.datatorrent.lib.counters.BasicCounters;
    +import com.datatorrent.lib.io.block.AbstractBlockReader.ReaderRecord;
    +import com.datatorrent.lib.io.block.BlockMetadata.FileBlockMetadata;
    +import com.datatorrent.lib.io.block.BlockReader;
    +import com.datatorrent.lib.io.fs.AbstractFileSplitter.FileMetadata;
    +import com.datatorrent.lib.io.fs.HDFSFileSplitter.HDFSScanner;
    +import com.datatorrent.netlet.util.Slice;
    +
    +/**
    + * HDFSInputModule is used to read files/list of files (or directory) from HDFS. <br/>
    + * Module emits, <br/>
    + * 1. FileMetadata 2. BlockMetadata 3. Block Bytes.<br/><br/>
    + * The module reads data in parallel, following parameters can be configured<br/>
    + * 1. files: list of file(s)/directories to read<br/>
    + * 2. filePatternRegularExp: Files names matching given regex will be read<br/>
    + * 3. scanIntervalMillis: interval between two scans to discover new files in input directory<br/>
    + * 4. recursive: if scan recursively input directories<br/>
    + * 5. blockSize: block size used to read input blocks of file<br/>
    + * 6. readersCount: count of readers to read input file<br/>
    + * 7. sequencialFileRead: If emit file blocks in sequence?
    + */
    +public class HDFSInputModule implements Module
    +{
    +
    +  @NotNull
    +  @Size(min = 1)
    +  private String files;
    +  private String filePatternRegularExp;
    +  @Min(0)
    +  private long scanIntervalMillis;
    +  private boolean recursive = true;
    +  private long blockSize;
    +  private boolean sequencialFileRead = false;
    +  private int readersCount;
    +
    +  public final transient ProxyOutputPort<FileMetadata> filesMetadataOutput = new ProxyOutputPort();
    --- End diff --
    
    Please use <> for generic type in instantiation: new ProxyOutputPort<>()
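For illustration, a self-contained sketch of the difference. `SketchPort` is a hypothetical generic class standing in for `ProxyOutputPort`, whose real definition is not reproduced here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical generic port; a stand-in for ProxyOutputPort<T>.
final class SketchPort<T> {
  private final List<T> emitted = new ArrayList<>();

  void emit(T tuple) {
    emitted.add(tuple);
  }

  List<T> getEmitted() {
    return emitted;
  }
}

final class DiamondExample {
  // Raw-type instantiation: compiles, but produces an unchecked-conversion warning.
  // final SketchPort<String> port = new SketchPort();

  // Diamond operator: the compiler infers <String> from the declared type.
  final SketchPort<String> filesMetadataOutput = new SketchPort<>();
}
```

The diamond form keeps the declaration type-safe without repeating the type argument on the right-hand side.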


---

[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2008: Create HDFS F...

Posted by DT-Priyanka <gi...@git.apache.org>.
Github user DT-Priyanka commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/207#discussion_r56121034
  
    --- Diff: library/src/main/java/com/datatorrent/lib/io/block/BlockReader.java ---
    @@ -0,0 +1,81 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.io.block;
    +
    +import java.io.IOException;
    +import java.net.URI;
    +
    +import org.apache.hadoop.fs.FileSystem;
    +
    +import com.google.common.base.Splitter;
    +
    +/**
    + * BlockReader extends {@link FSSliceReader} to accept case insensitive uri
    + */
    +public class BlockReader extends FSSliceReader
    +{
    +  protected String uri;
    +
    +  @Override
    +  protected FileSystem getFSInstance() throws IOException
    +  {
    +    return FileSystem.newInstance(URI.create(uri), configuration);
    +  }
    +
    +  /**
    +   * Sets the uri
    +   *
    +   * @param uri of form hdfs://hostname:port/path/to/input
    +   */
    +  public void setUri(String uri)
    +  {
    +    this.uri = convertSchemeToLowerCase(uri);
    +  }
    +
    +  public String getUri()
    +  {
    +    return uri;
    +  }
    +
    +  /**
    +   * Converts Scheme part of the URI to lower case. Multiple URI can be comma separated. If no scheme is there, no
    +   * change is made.
    +   * 
    +   * @param uri
    +   * @return String with scheme part as lower case
    +   */
    +  private String convertSchemeToLowerCase(String uri)
    +  {
    +    if (uri == null) {
    +      return null;
    +    }
    +    StringBuilder uriList = new StringBuilder();
    +    for (String f : Splitter.on(",").omitEmptyStrings().split(uri)) {
    --- End diff --
    
    Good catch. How do we normally accept multiple inputs, then, given that our primary source of properties is file based? I am trying to figure out what else could be used as a separator.


---

[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2008: Create HDFS F...

Posted by yogidevendra <gi...@git.apache.org>.
Github user yogidevendra commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/207#discussion_r55635731
  
    --- Diff: library/src/main/java/com/datatorrent/lib/io/fs/HDFSInputModule.java ---
    @@ -0,0 +1,236 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.io.fs;
    +
    +import javax.validation.constraints.Min;
    +import javax.validation.constraints.NotNull;
    +import javax.validation.constraints.Size;
    +
    +import org.apache.commons.lang.mutable.MutableLong;
    +import org.apache.hadoop.conf.Configuration;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DAG;
    +import com.datatorrent.api.Module;
    +import com.datatorrent.common.metric.MetricsAggregator;
    +import com.datatorrent.common.metric.SingleMetricAggregator;
    +import com.datatorrent.common.metric.sum.LongSumAggregator;
    +import com.datatorrent.common.partitioner.StatelessPartitioner;
    +import com.datatorrent.lib.counters.BasicCounters;
    +import com.datatorrent.lib.io.block.AbstractBlockReader.ReaderRecord;
    +import com.datatorrent.lib.io.block.BlockMetadata.FileBlockMetadata;
    +import com.datatorrent.lib.io.block.HDFSBlockReader;
    +import com.datatorrent.lib.io.fs.AbstractFileSplitter.FileMetadata;
    +import com.datatorrent.lib.io.fs.HDFSFileSplitter.HDFSScanner;
    +import com.datatorrent.netlet.util.Slice;
    +
    +/**
    + * HDFSInputModule is used to read files from HDFS. <br/>
    + * Module emits FileMetadata, BlockMetadata and the block bytes.
    + */
    +public class HDFSInputModule implements Module
    +{
    +
    +  @NotNull
    +  @Size(min = 1)
    +  private String files;
    --- End diff --
    
    @sandeepdeshmukh If we decide to do this, it should be discussed explicitly on the discussion forum, because the conclusion may impact many other modules. This point is not specific to this PR and should be conveyed to a wider audience.


---

[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2008: Create HDFS F...

Posted by chaithu14 <gi...@git.apache.org>.
Github user chaithu14 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/207#discussion_r55511575
  
    --- Diff: library/src/main/java/com/datatorrent/lib/io/block/HDFSBlockMetadata.java ---
    @@ -0,0 +1,63 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.io.block;
    +
    +/**
    + * HDFSBlockMetadata extends {@link BlockMetadata} to provide an option if blocks of a file should be read in-sequence
    + * or in-parallel
    + */
    +public class HDFSBlockMetadata extends BlockMetadata.FileBlockMetadata
    +{
    +  boolean readBlockInSequence;
    +
    +  protected HDFSBlockMetadata()
    +  {
    +    super();
    +  }
    +
    +  public HDFSBlockMetadata(String filePath, long blockId, long offset, long length, boolean isLastBlock, long previousBlockId)
    +  {
    +    super(filePath, blockId, offset, length, isLastBlock, previousBlockId);
    +  }
    +
    +  public HDFSBlockMetadata(String filePath)
    +  {
    +    super(filePath);
    +  }
    +
    +  @Override
    +  public int hashCode()
    +  {
    +    if (isReadBlockInSequence()) {
    +      return getFilePath().hashCode();
    +    }
    +    return super.hashCode();
    +  }
    +
    +  public boolean isReadBlockInSequence()
    +  {
    +    return readBlockInSequence;
    +  }
    +
    +  public void setReadBlockInSequence(boolean readBlockInSequence)
    +  {
    +    this.readBlockInSequence = readBlockInSequence;
    +  }
    +
    --- End diff --
    
    Add Javadoc for the setters and getters.
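A minimal sketch of what documented accessors might look like for the flag in the diff above. The class name `SequencedBlockSketch`, its constructor, and the fallback hash are assumptions made for illustration; the real `HDFSBlockMetadata` extends `BlockMetadata.FileBlockMetadata` and delegates to `super.hashCode()`, whose behavior is not shown in this thread.

```java
import java.util.Objects;

// Hypothetical stand-in for HDFSBlockMetadata; not the class from the PR.
public class SequencedBlockSketch
{
  private final String filePath;
  private final long blockId;
  private boolean readBlockInSequence;

  public SequencedBlockSketch(String filePath, long blockId)
  {
    this.filePath = filePath;
    this.blockId = blockId;
  }

  /**
   * Tells whether the blocks of this file should be read in sequence.
   *
   * @return true if blocks are read in sequence, false if they may be read in parallel
   */
  public boolean isReadBlockInSequence()
  {
    return readBlockInSequence;
  }

  /**
   * Sets whether the blocks of this file should be read in sequence.
   *
   * @param readBlockInSequence true to read blocks in sequence, false to allow parallel reads
   */
  public void setReadBlockInSequence(boolean readBlockInSequence)
  {
    this.readBlockInSequence = readBlockInSequence;
  }

  /**
   * When sequential reading is on, every block of a file hashes to the same value,
   * mirroring the hashCode override in the diff. The fallback below is an assumption.
   */
  @Override
  public int hashCode()
  {
    if (readBlockInSequence) {
      return filePath.hashCode();
    }
    return Objects.hash(filePath, blockId);
  }

  public static void main(String[] args)
  {
    SequencedBlockSketch first = new SequencedBlockSketch("/data/input.txt", 1L);
    SequencedBlockSketch second = new SequencedBlockSketch("/data/input.txt", 2L);
    first.setReadBlockInSequence(true);
    second.setReadBlockInSequence(true);
    System.out.println(first.hashCode() == second.hashCode()); // prints "true"
  }
}
```

The Javadoc on the accessors states what the flag means for a reader of the API, which is presumably what the review comment is asking for.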


---

[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2008: Create HDFS F...

Posted by DT-Priyanka <gi...@git.apache.org>.
Github user DT-Priyanka commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/207#discussion_r55650442
  
    --- Diff: library/src/main/java/com/datatorrent/lib/io/fs/HDFSFileSplitter.java ---
    @@ -0,0 +1,180 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.io.fs;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.regex.Matcher;
    +import java.util.regex.Pattern;
    +
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.hadoop.fs.FileStatus;
    +import org.apache.hadoop.fs.Path;
    +
    +import com.datatorrent.api.Context.OperatorContext;
    +import com.datatorrent.lib.io.block.BlockMetadata.FileBlockMetadata;
    +import com.datatorrent.lib.io.block.HDFSBlockMetadata;
    +
    +public class HDFSFileSplitter extends FileSplitterInput
    +{
    +  private boolean sequencialFileRead;
    +
    +  public HDFSFileSplitter()
    +  {
    +    super();
    +    super.setScanner(new HDFSScanner());
    +  }
    +
    +  @Override
    +  protected FileMetadata createFileMetadata(FileInfo fileInfo)
    +  {
    +    return new HDFSFileMetaData(fileInfo.getFilePath());
    +  }
    +
    +  @Override
    +  protected HDFSFileMetaData buildFileMetadata(FileInfo fileInfo) throws IOException
    +  {
    +    FileMetadata metadata = super.buildFileMetadata(fileInfo);
    +    HDFSFileMetaData hdfsFileMetaData = (HDFSFileMetaData)metadata;
    +
    +    Path path = new Path(fileInfo.getFilePath());
    +    FileStatus status = getFileStatus(path);
    +    if (fileInfo.getDirectoryPath() == null) { // Direct filename is given as input.
    +      hdfsFileMetaData.setRelativePath(status.getPath().getName());
    +    } else {
    +      String relativePath = getRelativePathWithFolderName(fileInfo);
    +      hdfsFileMetaData.setRelativePath(relativePath);
    +    }
    +    return hdfsFileMetaData;
    +  }
    +
    +  /*
    +   * As folder name was given to input for copy, prefix folder name to the sub items to copy.
    +   */
    +  private String getRelativePathWithFolderName(FileInfo fileInfo)
    +  {
    +    String parentDir = new Path(fileInfo.getDirectoryPath()).getName();
    +    return parentDir + File.separator + fileInfo.getRelativeFilePath();
    +  }
    +
    +  @Override
    +  protected HDFSBlockMetadata createBlockMetadata(FileMetadata fileMetadata)
    +  {
    +    HDFSBlockMetadata blockMetadta = new HDFSBlockMetadata(fileMetadata.getFilePath());
    +    blockMetadta.setReadBlockInSequence(sequencialFileRead);
    +    return blockMetadta;
    +  }
    +
    +  @Override
    +  protected HDFSBlockMetadata buildBlockMetadata(long pos, long lengthOfFileInBlock, int blockNumber, FileMetadata fileMetadata, boolean isLast)
    +  {
    +    FileBlockMetadata metadata = super.buildBlockMetadata(pos, lengthOfFileInBlock, blockNumber, fileMetadata, isLast);
    +    HDFSBlockMetadata blockMetadata = (HDFSBlockMetadata)metadata;
    +    return blockMetadata;
    +  }
    +
    +  public boolean isSequencialFileRead()
    +  {
    +    return sequencialFileRead;
    +  }
    +
    +  public void setSequencialFileRead(boolean sequencialFileRead)
    +  {
    +    this.sequencialFileRead = sequencialFileRead;
    +  }
    +
    +  public static class HDFSScanner extends TimeBasedDirectoryScanner
    +  {
    +    protected static final String HDFS_TEMP_FILE = ".*._COPYING_";
    +    protected static final String UNSUPPORTED_CHARACTOR = ":";
    +    private transient Pattern ignoreRegex;
    +
    +    @Override
    +    public void setup(OperatorContext context)
    +    {
    +      super.setup(context);
    +      ignoreRegex = Pattern.compile(HDFS_TEMP_FILE);
    +    }
    +
    +    @Override
    +    protected boolean acceptFile(String filePathStr)
    +    {
    +      boolean accepted = super.acceptFile(filePathStr);
    +      if (containsUnsupportedCharacters(filePathStr) || isTempFile(filePathStr)) {
    +        return false;
    +      }
    +      return accepted;
    +    }
    +
    +    private boolean isTempFile(String filePathStr)
    +    {
    +      String fileName = new Path(filePathStr).getName();
    +      if (ignoreRegex != null) {
    +        Matcher matcher = ignoreRegex.matcher(fileName);
    +        if (matcher.matches()) {
    +          return true;
    +        }
    +      }
    +      return false;
    +    }
    +
    +    private boolean containsUnsupportedCharacters(String filePathStr)
    +    {
    +      return new Path(filePathStr).toUri().getPath().contains(UNSUPPORTED_CHARACTOR);
    +    }
    +  }
    +
    +  public static class HDFSFileMetaData extends FileMetadata
    +  {
    +    private String relativePath;
    +
    +    protected HDFSFileMetaData()
    --- End diff --
    
    The superclass does that; I am just carrying over the same thing.


---

[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2008: Create HDFS F...

Posted by DT-Priyanka <gi...@git.apache.org>.
Github user DT-Priyanka commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/207#discussion_r55646438
  
    --- Diff: library/src/main/java/com/datatorrent/lib/io/block/HDFSBlockReader.java ---
    @@ -0,0 +1,78 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.io.block;
    +
    +import java.io.IOException;
    +import java.net.URI;
    +
    +import org.apache.hadoop.fs.FileSystem;
    +
    +import com.google.common.base.Splitter;
    +
    +public class HDFSBlockReader extends FSSliceReader
    +{
    +  protected String uri;
    +
    +  @Override
    +  protected FileSystem getFSInstance() throws IOException
    +  {
    +    return FileSystem.newInstance(URI.create(uri), configuration);
    +  }
    +
    +  /**
    +   * Sets the uri
    +   *
    +   * @param uri
    +   */
    +  public void setUri(String uri)
    +  {
    +    this.uri = convertSchemeToLowerCase(uri);
    +  }
    +
    +  public String getUri()
    +  {
    +    return uri;
    +  }
    +
    +  /**
    +   * Converts Scheme part of the URI to lower case. Multiple URI can be comma separated. If no scheme is there, no
    +   * change is made.
    +   * 
    +   * @param
    +   * @return String with scheme part as lower case
    +   */
    +  private static String convertSchemeToLowerCase(String uri)
    +  {
    +    if (uri == null) {
    +      return null;
    +    }
    +    StringBuilder inputMod = new StringBuilder();
    +    for (String f : Splitter.on(",").omitEmptyStrings().split(uri)) {
    --- End diff --
    
    I won't keep it as a utility method, since no one else is going to use it as of now.


---

[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2008: Create HDFS F...

Posted by DT-Priyanka <gi...@git.apache.org>.
Github user DT-Priyanka commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/207#discussion_r55801007
  
    --- Diff: library/src/main/java/com/datatorrent/lib/io/fs/HDFSInputModule.java ---
    @@ -0,0 +1,253 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.io.fs;
    +
    +import javax.validation.constraints.Min;
    +import javax.validation.constraints.NotNull;
    +import javax.validation.constraints.Size;
    +
    +import org.apache.commons.lang.mutable.MutableLong;
    +import org.apache.hadoop.conf.Configuration;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DAG;
    +import com.datatorrent.api.Module;
    +import com.datatorrent.common.metric.MetricsAggregator;
    +import com.datatorrent.common.metric.SingleMetricAggregator;
    +import com.datatorrent.common.metric.sum.LongSumAggregator;
    +import com.datatorrent.common.partitioner.StatelessPartitioner;
    +import com.datatorrent.lib.counters.BasicCounters;
    +import com.datatorrent.lib.io.block.AbstractBlockReader.ReaderRecord;
    +import com.datatorrent.lib.io.block.BlockMetadata.FileBlockMetadata;
    +import com.datatorrent.lib.io.block.BlockReader;
    +import com.datatorrent.lib.io.fs.AbstractFileSplitter.FileMetadata;
    +import com.datatorrent.lib.io.fs.HDFSFileSplitter.HDFSScanner;
    +import com.datatorrent.netlet.util.Slice;
    +
    +/**
    + * HDFSInputModule is used to read files/list of files (or directory) from HDFS. <br/>
    + * Module emits, <br/>
    + * 1. FileMetadata 2. BlockMetadata 3. Block Bytes.<br/><br/>
    + * The module reads data in parallel, following parameters can be configured<br/>
    + * 1. files: list of file(s)/directories to read<br/>
    + * 2. filePatternRegularExp: Files names matching given regex will be read<br/>
    + * 3. scanIntervalMillis: interval between two scans to discover new files in input directory<br/>
    + * 4. recursive: if scan recursively input directories<br/>
    + * 5. blockSize: block size used to read input blocks of file<br/>
    + * 6. readersCount: count of readers to read input file<br/>
    + * 7. sequencialFileRead: If emit file blocks in sequence?
    + */
    +public class HDFSInputModule implements Module
    +{
    +
    +  @NotNull
    +  @Size(min = 1)
    +  private String files;
    +  private String filePatternRegularExp;
    +  @Min(0)
    +  private long scanIntervalMillis;
    +  private boolean recursive = true;
    +  private long blockSize;
    +  private boolean sequencialFileRead = false;
    +  private int readersCount;
    +
    +  public final transient ProxyOutputPort<FileMetadata> filesMetadataOutput = new ProxyOutputPort();
    +  public final transient ProxyOutputPort<FileBlockMetadata> blocksMetadataOutput = new ProxyOutputPort();
    +  public final transient ProxyOutputPort<ReaderRecord<Slice>> messages = new ProxyOutputPort();
    +
    +  @Override
    +  public void populateDAG(DAG dag, Configuration conf)
    +  {
    +    HDFSFileSplitter fileSplitter = dag.addOperator("FileSplitter", new HDFSFileSplitter());
    +    BlockReader blockReader = dag.addOperator("BlockReader", new BlockReader());
    +
    +    dag.addStream("BlockMetadata", fileSplitter.blocksMetadataOutput, blockReader.blocksMetadataInput);
    +
    +    filesMetadataOutput.set(fileSplitter.filesMetadataOutput);
    +    blocksMetadataOutput.set(blockReader.blocksMetadataOutput);
    +    messages.set(blockReader.messages);
    +
    +    fileSplitter.setSequencialFileRead(sequencialFileRead);
    +    if (blockSize != 0) {
    +      fileSplitter.setBlockSize(blockSize);
    +    }
    +
    +    HDFSScanner fileScanner = (HDFSScanner)fileSplitter.getScanner();
    +    fileScanner.setFiles(files);
    +    if (scanIntervalMillis != 0) {
    +      fileScanner.setScanIntervalMillis(scanIntervalMillis);
    +    }
    +    fileScanner.setRecursive(recursive);
    +    if (filePatternRegularExp != null) {
    +      fileSplitter.getScanner().setFilePatternRegularExp(filePatternRegularExp);
    +    }
    +
    +    blockReader.setUri(files);
    +    if (readersCount != 0) {
    +      dag.setAttribute(blockReader, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<BlockReader>(readersCount));
    +    }
    +
    +    MetricsAggregator blockReaderMetrics = new MetricsAggregator();
    +    blockReaderMetrics.addAggregators("bytesReadPerSec", new SingleMetricAggregator[] {new LongSumAggregator()});
    +    dag.setAttribute(blockReader, Context.OperatorContext.METRICS_AGGREGATOR, blockReaderMetrics);
    +    dag.setAttribute(blockReader, Context.OperatorContext.COUNTERS_AGGREGATOR, new BasicCounters.LongAggregator<MutableLong>());
    --- End diff --
    
    Will update.


---

[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2008: Create HDFS F...

Posted by DT-Priyanka <gi...@git.apache.org>.
Github user DT-Priyanka commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/207#discussion_r55800249
  
    --- Diff: library/src/main/java/com/datatorrent/lib/io/fs/HDFSInputModule.java ---
    @@ -0,0 +1,253 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.io.fs;
    +
    +import javax.validation.constraints.Min;
    +import javax.validation.constraints.NotNull;
    +import javax.validation.constraints.Size;
    +
    +import org.apache.commons.lang.mutable.MutableLong;
    +import org.apache.hadoop.conf.Configuration;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DAG;
    +import com.datatorrent.api.Module;
    +import com.datatorrent.common.metric.MetricsAggregator;
    +import com.datatorrent.common.metric.SingleMetricAggregator;
    +import com.datatorrent.common.metric.sum.LongSumAggregator;
    +import com.datatorrent.common.partitioner.StatelessPartitioner;
    +import com.datatorrent.lib.counters.BasicCounters;
    +import com.datatorrent.lib.io.block.AbstractBlockReader.ReaderRecord;
    +import com.datatorrent.lib.io.block.BlockMetadata.FileBlockMetadata;
    +import com.datatorrent.lib.io.block.BlockReader;
    +import com.datatorrent.lib.io.fs.AbstractFileSplitter.FileMetadata;
    +import com.datatorrent.lib.io.fs.HDFSFileSplitter.HDFSScanner;
    +import com.datatorrent.netlet.util.Slice;
    +
    +/**
    + * HDFSInputModule is used to read files/list of files (or directory) from HDFS. <br/>
    + * Module emits, <br/>
    + * 1. FileMetadata 2. BlockMetadata 3. Block Bytes.<br/><br/>
    + * The module reads data in parallel, following parameters can be configured<br/>
    + * 1. files: list of file(s)/directories to read<br/>
    + * 2. filePatternRegularExp: Files names matching given regex will be read<br/>
    + * 3. scanIntervalMillis: interval between two scans to discover new files in input directory<br/>
    + * 4. recursive: if scan recursively input directories<br/>
    + * 5. blockSize: block size used to read input blocks of file<br/>
    + * 6. readersCount: count of readers to read input file<br/>
    + * 7. sequencialFileRead: If emit file blocks in sequence?
    + */
    +public class HDFSInputModule implements Module
    +{
    +
    +  @NotNull
    +  @Size(min = 1)
    +  private String files;
    +  private String filePatternRegularExp;
    +  @Min(0)
    +  private long scanIntervalMillis;
    +  private boolean recursive = true;
    +  private long blockSize;
    +  private boolean sequencialFileRead = false;
    +  private int readersCount;
    +
    +  public final transient ProxyOutputPort<FileMetadata> filesMetadataOutput = new ProxyOutputPort();
    --- End diff --
    
    okay.



---

[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2008: Create HDFS F...

Posted by DT-Priyanka <gi...@git.apache.org>.
Github user DT-Priyanka commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/207#discussion_r55799843
  
    --- Diff: library/src/main/java/com/datatorrent/lib/io/block/BlockReader.java ---
    @@ -0,0 +1,81 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.io.block;
    +
    +import java.io.IOException;
    +import java.net.URI;
    +
    +import org.apache.hadoop.fs.FileSystem;
    +
    +import com.google.common.base.Splitter;
    +
    +/**
    + * BlockReader extends {@link FSSliceReader} to accept case insensitive uri
    + */
    +public class BlockReader extends FSSliceReader
    +{
    +  protected String uri;
    +
    +  @Override
    +  protected FileSystem getFSInstance() throws IOException
    +  {
    +    return FileSystem.newInstance(URI.create(uri), configuration);
    +  }
    +
    +  /**
    +   * Sets the uri
    +   *
    +   * @param uri of form hdfs://hostname:port/path/to/input
    +   */
    +  public void setUri(String uri)
    +  {
    +    this.uri = convertSchemeToLowerCase(uri);
    +  }
    +
    +  public String getUri()
    +  {
    +    return uri;
    +  }
    +
    +  /**
    +   * Converts Scheme part of the URI to lower case. Multiple URI can be comma separated. If no scheme is there, no
    +   * change is made.
    +   * 
    +   * @param uri
    +   * @return String with scheme part as lower case
    +   */
    +  private String convertSchemeToLowerCase(String uri)
    +  {
    +    if (uri == null) {
    +      return null;
    +    }
    +    StringBuilder uriList = new StringBuilder();
    +    for (String f : Splitter.on(",").omitEmptyStrings().split(uri)) {
    +      String scheme = URI.create(f).getScheme();
    +      if (scheme != null) {
    +        uriList.append(f.replaceFirst(scheme, scheme.toLowerCase()));
    --- End diff --
    
    That's what happens by default when we don't do anything. But there was a JIRA suggesting that we should not error out on such a problem; we should be able to accept the scheme irrespective of its case.
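The scheme handling discussed here can be sketched as a standalone helper. This is an illustration under assumptions, not the PR's implementation: the class name `SchemeNormalizer` and method name `lowerCaseSchemes` are hypothetical, and it uses `substring` where the diff uses `replaceFirst`.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Hypothetical helper; the PR keeps this logic as a private method of the block reader.
public class SchemeNormalizer
{
  /**
   * Lower-cases the scheme of every URI in a comma-separated list.
   * Entries without a scheme are returned unchanged; empty entries are dropped.
   */
  public static String lowerCaseSchemes(String uris)
  {
    if (uris == null) {
      return null;
    }
    List<String> normalized = new ArrayList<>();
    for (String part : uris.split(",")) {
      if (part.isEmpty()) {
        continue; // skip empty entries such as the gap in "a,,b"
      }
      String scheme = URI.create(part).getScheme();
      if (scheme == null) {
        normalized.add(part);
      } else {
        // The scheme is always the leading part of the string, so replace it by position.
        normalized.add(scheme.toLowerCase(Locale.ROOT) + part.substring(scheme.length()));
      }
    }
    return String.join(",", normalized);
  }

  public static void main(String[] args)
  {
    // prints "hdfs://localhost:8020/user/in,file:///tmp/in,/relative/in"
    System.out.println(lowerCaseSchemes("HDFS://localhost:8020/user/in,File:///tmp/in,/relative/in"));
  }
}
```

Replacing by position avoids one subtlety of `String.replaceFirst`, which treats its first argument as a regular expression: a scheme containing `+` or `.` would not be matched literally.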


---

[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2008: Create HDFS F...

Posted by sandeepdeshmukh <gi...@git.apache.org>.
Github user sandeepdeshmukh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/207#discussion_r55511509
  
    --- Diff: library/src/main/java/com/datatorrent/lib/io/block/HDFSBlockReader.java ---
    @@ -0,0 +1,78 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.io.block;
    +
    +import java.io.IOException;
    +import java.net.URI;
    +
    +import org.apache.hadoop.fs.FileSystem;
    +
    +import com.google.common.base.Splitter;
    +
    +public class HDFSBlockReader extends FSSliceReader
    +{
    +  protected String uri;
    +
    +  @Override
    +  protected FileSystem getFSInstance() throws IOException
    +  {
    +    return FileSystem.newInstance(URI.create(uri), configuration);
    +  }
    +
    +  /**
    +   * Sets the uri
    +   *
    +   * @param uri
    +   */
    +  public void setUri(String uri)
    +  {
    +    this.uri = convertSchemeToLowerCase(uri);
    +  }
    +
    +  public String getUri()
    +  {
    +    return uri;
    +  }
    +
    +  /**
    +   * Converts Scheme part of the URI to lower case. Multiple URI can be comma separated. If no scheme is there, no
    +   * change is made.
    +   * 
    +   * @param
    +   * @return String with scheme part as lower case
    +   */
    +  private static String convertSchemeToLowerCase(String uri)
    --- End diff --
    
    Is there a test case covering this?


---

[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2008: Create HDFS F...

Posted by DT-Priyanka <gi...@git.apache.org>.
Github user DT-Priyanka commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/207#discussion_r55651031
  
    --- Diff: library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java ---
    @@ -110,8 +110,7 @@ protected void starting(org.junit.runner.Description description)
           fileSplitterInput.setIdempotentStorageManager(new IdempotentStorageManager.NoopIdempotentStorageManager());
     
           Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
    -      attributes.put(Context.DAGContext.APPLICATION_PATH,
    -          "target/" + className + "/" + methodName + "/" + Long.toHexString(System.currentTimeMillis()));
    +      attributes.put(Context.DAGContext.APPLICATION_PATH, "target/" + className + "/" + methodName + "/" + Long.toHexString(System.currentTimeMillis()));
    --- End diff --
    
    There are no checkstyle violations in the new code, but the violation count was still above the limit, so I fixed some violations in this file.


---

[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2008: Create HDFS F...

Posted by sandeepdeshmukh <gi...@git.apache.org>.
Github user sandeepdeshmukh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/207#discussion_r55511842
  
    --- Diff: library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java ---
    @@ -215,7 +213,7 @@ public void testTimeScan() throws InterruptedException, IOException, TimeoutExce
         testMeta.blockMetadataSink.clear();
     
         Thread.sleep(1000);
    -    //added a new relativeFilePath
    +    // added a new relativeFilePath
    --- End diff --
    
    Example of a whitespace change. Please revert this and all such changes.


---

[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2008: Create HDFS F...

Posted by DT-Priyanka <gi...@git.apache.org>.
Github user DT-Priyanka commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/207#discussion_r55651056
  
    --- Diff: library/src/main/java/com/datatorrent/lib/io/block/HDFSBlockMetadata.java ---
    @@ -0,0 +1,63 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.io.block;
    +
    +/**
    + * HDFSBlockMetadata extends {@link BlockMetadata} to provide an option if blocks of a file should be read in-sequence
    + * or in-parallel
    + */
    +public class HDFSBlockMetadata extends BlockMetadata.FileBlockMetadata
    +{
    --- End diff --
    
    Removing the class and moving the property to the parent class.


---

[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2008: Create HDFS F...

Posted by ishark <gi...@git.apache.org>.
Github user ishark commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/207#discussion_r56096099
  
    --- Diff: library/src/main/java/com/datatorrent/lib/io/block/BlockReader.java ---
    @@ -0,0 +1,81 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.io.block;
    +
    +import java.io.IOException;
    +import java.net.URI;
    +
    +import org.apache.hadoop.fs.FileSystem;
    +
    +import com.google.common.base.Splitter;
    +
    +/**
    + * BlockReader extends {@link FSSliceReader} to accept case insensitive uri
    + */
    +public class BlockReader extends FSSliceReader
    +{
    +  protected String uri;
    +
    +  @Override
    +  protected FileSystem getFSInstance() throws IOException
    +  {
    +    return FileSystem.newInstance(URI.create(uri), configuration);
    +  }
    +
    +  /**
    +   * Sets the uri
    +   *
    +   * @param uri of form hdfs://hostname:port/path/to/input
    +   */
    +  public void setUri(String uri)
    +  {
    +    this.uri = convertSchemeToLowerCase(uri);
    +  }
    +
    +  public String getUri()
    +  {
    +    return uri;
    +  }
    +
    +  /**
    +   * Converts Scheme part of the URI to lower case. Multiple URI can be comma separated. If no scheme is there, no
    +   * change is made.
    +   * 
    +   * @param uri
    +   * @return String with scheme part as lower case
    +   */
    +  private String convertSchemeToLowerCase(String uri)
    +  {
    +    if (uri == null) {
    +      return null;
    +    }
    +    StringBuilder uriList = new StringBuilder();
    +    for (String f : Splitter.on(",").omitEmptyStrings().split(uri)) {
    --- End diff --
    
    This is a corner case, but what happens when the input directory name contains a comma, as below?
    hdfs://node18.morado.com:8020/user/isha/trial/comma,name
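
To make the corner case concrete, here is a minimal sketch using only the JDK. It assumes the comma-separated value is split naively on ','. `String.split` stands in for the Guava `Splitter` call in the diff, and the class and method names (`CommaSplitDemo`, `splitOnComma`) are hypothetical, not part of the PR.

```java
import java.util.ArrayList;
import java.util.List;

class CommaSplitDemo
{
  // Naive split on ',' that skips empty pieces, similar in spirit to
  // Splitter.on(",").omitEmptyStrings() in the diff (a stand-in, not the PR code).
  static List<String> splitOnComma(String files)
  {
    List<String> parts = new ArrayList<>();
    for (String part : files.split(",")) {
      if (!part.isEmpty()) {
        parts.add(part);
      }
    }
    return parts;
  }

  public static void main(String[] args)
  {
    String input = "hdfs://node18.morado.com:8020/user/isha/trial/comma,name";
    // The single directory is broken into two entries; the second has no scheme.
    for (String part : splitOnComma(input)) {
      System.out.println(part);
    }
  }
}
```

With this input the one directory is treated as two separate entries, which is the hazard the comment points at.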



[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2008: Create HDFS F...

Posted by ishark <gi...@git.apache.org>.
Github user ishark commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/207#discussion_r55776429
  
    --- Diff: library/src/main/java/com/datatorrent/lib/io/fs/HDFSInputModule.java ---
    @@ -0,0 +1,253 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.io.fs;
    +
    +import javax.validation.constraints.Min;
    +import javax.validation.constraints.NotNull;
    +import javax.validation.constraints.Size;
    +
    +import org.apache.commons.lang.mutable.MutableLong;
    +import org.apache.hadoop.conf.Configuration;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DAG;
    +import com.datatorrent.api.Module;
    +import com.datatorrent.common.metric.MetricsAggregator;
    +import com.datatorrent.common.metric.SingleMetricAggregator;
    +import com.datatorrent.common.metric.sum.LongSumAggregator;
    +import com.datatorrent.common.partitioner.StatelessPartitioner;
    +import com.datatorrent.lib.counters.BasicCounters;
    +import com.datatorrent.lib.io.block.AbstractBlockReader.ReaderRecord;
    +import com.datatorrent.lib.io.block.BlockMetadata.FileBlockMetadata;
    +import com.datatorrent.lib.io.block.BlockReader;
    +import com.datatorrent.lib.io.fs.AbstractFileSplitter.FileMetadata;
    +import com.datatorrent.lib.io.fs.HDFSFileSplitter.HDFSScanner;
    +import com.datatorrent.netlet.util.Slice;
    +
    +/**
    + * HDFSInputModule is used to read files/list of files (or directory) from HDFS. <br/>
    + * Module emits, <br/>
    + * 1. FileMetadata 2. BlockMetadata 3. Block Bytes.<br/><br/>
    + * The module reads data in parallel, following parameters can be configured<br/>
    + * 1. files: list of file(s)/directories to read<br/>
    + * 2. filePatternRegularExp: Files names matching given regex will be read<br/>
    + * 3. scanIntervalMillis: interval between two scans to discover new files in input directory<br/>
    + * 4. recursive: if scan recursively input directories<br/>
    + * 5. blockSize: block size used to read input blocks of file<br/>
    + * 6. readersCount: count of readers to read input file<br/>
    + * 7. sequencialFileRead: If emit file blocks in sequence?
    + */
    +public class HDFSInputModule implements Module
    +{
    +
    +  @NotNull
    +  @Size(min = 1)
    +  private String files;
    +  private String filePatternRegularExp;
    +  @Min(0)
    +  private long scanIntervalMillis;
    +  private boolean recursive = true;
    +  private long blockSize;
    +  private boolean sequencialFileRead = false;
    +  private int readersCount;
    +
    +  public final transient ProxyOutputPort<FileMetadata> filesMetadataOutput = new ProxyOutputPort();
    +  public final transient ProxyOutputPort<FileBlockMetadata> blocksMetadataOutput = new ProxyOutputPort();
    +  public final transient ProxyOutputPort<ReaderRecord<Slice>> messages = new ProxyOutputPort();
    +
    +  @Override
    +  public void populateDAG(DAG dag, Configuration conf)
    +  {
    +    HDFSFileSplitter fileSplitter = dag.addOperator("FileSplitter", new HDFSFileSplitter());
    +    BlockReader blockReader = dag.addOperator("BlockReader", new BlockReader());
    +
    +    dag.addStream("BlockMetadata", fileSplitter.blocksMetadataOutput, blockReader.blocksMetadataInput);
    +
    +    filesMetadataOutput.set(fileSplitter.filesMetadataOutput);
    +    blocksMetadataOutput.set(blockReader.blocksMetadataOutput);
    +    messages.set(blockReader.messages);
    +
    +    fileSplitter.setSequencialFileRead(sequencialFileRead);
    +    if (blockSize != 0) {
    +      fileSplitter.setBlockSize(blockSize);
    +    }
    +
    +    HDFSScanner fileScanner = (HDFSScanner)fileSplitter.getScanner();
    +    fileScanner.setFiles(files);
    +    if (scanIntervalMillis != 0) {
    +      fileScanner.setScanIntervalMillis(scanIntervalMillis);
    +    }
    +    fileScanner.setRecursive(recursive);
    +    if (filePatternRegularExp != null) {
    +      fileSplitter.getScanner().setFilePatternRegularExp(filePatternRegularExp);
    +    }
    +
    +    blockReader.setUri(files);
    +    if (readersCount != 0) {
    +      dag.setAttribute(blockReader, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<BlockReader>(readersCount));
    +    }
    +
    +    MetricsAggregator blockReaderMetrics = new MetricsAggregator();
    +    blockReaderMetrics.addAggregators("bytesReadPerSec", new SingleMetricAggregator[] {new LongSumAggregator()});
    +    dag.setAttribute(blockReader, Context.OperatorContext.METRICS_AGGREGATOR, blockReaderMetrics);
    +    dag.setAttribute(blockReader, Context.OperatorContext.COUNTERS_AGGREGATOR, new BasicCounters.LongAggregator<MutableLong>());
    --- End diff --
    
    Can you use METRICS_AGGREGATOR here? COUNTERS_AGGREGATOR is marked deprecated.



[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2008: Create HDFS F...

Posted by DT-Priyanka <gi...@git.apache.org>.
Github user DT-Priyanka commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/207#discussion_r55651968
  
    --- Diff: library/src/main/java/com/datatorrent/lib/io/block/HDFSBlockReader.java ---
    @@ -0,0 +1,78 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.io.block;
    +
    +import java.io.IOException;
    +import java.net.URI;
    +
    +import org.apache.hadoop.fs.FileSystem;
    +
    +import com.google.common.base.Splitter;
    +
    +public class HDFSBlockReader extends FSSliceReader
    +{
    +  protected String uri;
    +
    +  @Override
    +  protected FileSystem getFSInstance() throws IOException
    +  {
    +    return FileSystem.newInstance(URI.create(uri), configuration);
    +  }
    +
    +  /**
    +   * Sets the uri
    +   *
    +   * @param uri
    +   */
    +  public void setUri(String uri)
    +  {
    +    this.uri = convertSchemeToLowerCase(uri);
    +  }
    +
    +  public String getUri()
    +  {
    +    return uri;
    +  }
    +
    +  /**
    +   * Converts Scheme part of the URI to lower case. Multiple URI can be comma separated. If no scheme is there, no
    +   * change is made.
    +   * 
    +   * @param
    +   * @return String with scheme part as lower case
    +   */
    +  private static String convertSchemeToLowerCase(String uri)
    --- End diff --
    
    Adding a unit test for this method.



[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2008: Create HDFS F...

Posted by ishark <gi...@git.apache.org>.
Github user ishark commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/207#discussion_r55781008
  
    --- Diff: library/src/main/java/com/datatorrent/lib/io/block/BlockReader.java ---
    @@ -0,0 +1,81 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.io.block;
    +
    +import java.io.IOException;
    +import java.net.URI;
    +
    +import org.apache.hadoop.fs.FileSystem;
    +
    +import com.google.common.base.Splitter;
    +
    +/**
    + * BlockReader extends {@link FSSliceReader} to accept case insensitive uri
    + */
    +public class BlockReader extends FSSliceReader
    +{
    +  protected String uri;
    +
    +  @Override
    +  protected FileSystem getFSInstance() throws IOException
    +  {
    +    return FileSystem.newInstance(URI.create(uri), configuration);
    +  }
    +
    +  /**
    +   * Sets the uri
    +   *
    +   * @param uri of form hdfs://hostname:port/path/to/input
    +   */
    +  public void setUri(String uri)
    +  {
    +    this.uri = convertSchemeToLowerCase(uri);
    +  }
    +
    +  public String getUri()
    +  {
    +    return uri;
    +  }
    +
    +  /**
    +   * Converts Scheme part of the URI to lower case. Multiple URI can be comma separated. If no scheme is there, no
    +   * change is made.
    +   * 
    +   * @param uri
    +   * @return String with scheme part as lower case
    +   */
    +  private String convertSchemeToLowerCase(String uri)
    +  {
    +    if (uri == null) {
    +      return null;
    +    }
    +    StringBuilder uriList = new StringBuilder();
    +    for (String f : Splitter.on(",").omitEmptyStrings().split(uri)) {
    --- End diff --
    
    Is it safe to assume that a directory name in a URL cannot contain ','? Could a list of URIs be the parameter instead?
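
A rough sketch of the list-based alternative, using only the JDK. Passing each URI as its own list element means no separator has to be reserved, so a comma inside a path survives. The names `UriListSketch`, `lowerCaseScheme` and `normalize` are hypothetical, and the sketch assumes URIs of the `scheme://host:port/path` form mentioned in the Javadoc; it is not the PR's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

class UriListSketch
{
  // Lower-cases only the scheme of one URI string; the input is returned
  // unchanged when it has no "scheme://" prefix.
  static String lowerCaseScheme(String uri)
  {
    int sep = uri.indexOf("://");
    if (sep <= 0) {
      return uri;
    }
    String scheme = uri.substring(0, sep);
    // RFC 3986: scheme = ALPHA *( ALPHA / DIGIT / "+" / "-" / "." )
    if (!scheme.matches("[A-Za-z][A-Za-z0-9+.-]*")) {
      return uri;
    }
    return scheme.toLowerCase(Locale.ROOT) + uri.substring(sep);
  }

  // Each list element is one URI, so commas inside a path are left alone.
  static List<String> normalize(List<String> uris)
  {
    List<String> result = new ArrayList<>();
    for (String uri : uris) {
      result.add(lowerCaseScheme(uri));
    }
    return result;
  }

  public static void main(String[] args)
  {
    List<String> uris = Arrays.asList(
        "HDFS://node18.morado.com:8020/user/isha/trial/comma,name",
        "/user/isha/no/scheme");
    for (String uri : normalize(uris)) {
      System.out.println(uri);
    }
  }
}
```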



[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2008: Create HDFS F...

Posted by DT-Priyanka <gi...@git.apache.org>.
Github user DT-Priyanka commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/207#discussion_r55800146
  
    --- Diff: library/src/main/java/com/datatorrent/lib/io/fs/HDFSFileSplitter.java ---
    @@ -0,0 +1,180 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.io.fs;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.regex.Matcher;
    +import java.util.regex.Pattern;
    +
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.hadoop.fs.FileStatus;
    +import org.apache.hadoop.fs.Path;
    +
    +import com.datatorrent.api.Context.OperatorContext;
    +import com.datatorrent.lib.io.block.BlockMetadata.FileBlockMetadata;
    +import com.datatorrent.lib.io.block.HDFSBlockMetadata;
    +
    +public class HDFSFileSplitter extends FileSplitterInput
    +{
    +  private boolean sequencialFileRead;
    +
    +  public HDFSFileSplitter()
    +  {
    +    super();
    +    super.setScanner(new HDFSScanner());
    +  }
    +
    +  @Override
    +  protected FileMetadata createFileMetadata(FileInfo fileInfo)
    +  {
    +    return new HDFSFileMetaData(fileInfo.getFilePath());
    +  }
    +
    +  @Override
    +  protected HDFSFileMetaData buildFileMetadata(FileInfo fileInfo) throws IOException
    +  {
    +    FileMetadata metadata = super.buildFileMetadata(fileInfo);
    +    HDFSFileMetaData hdfsFileMetaData = (HDFSFileMetaData)metadata;
    +
    +    Path path = new Path(fileInfo.getFilePath());
    +    FileStatus status = getFileStatus(path);
    +    if (fileInfo.getDirectoryPath() == null) { // Direct filename is given as input.
    +      hdfsFileMetaData.setRelativePath(status.getPath().getName());
    +    } else {
    +      String relativePath = getRelativePathWithFolderName(fileInfo);
    +      hdfsFileMetaData.setRelativePath(relativePath);
    +    }
    +    return hdfsFileMetaData;
    +  }
    +
    +  /*
    +   * As folder name was given to input for copy, prefix folder name to the sub items to copy.
    +   */
    +  private String getRelativePathWithFolderName(FileInfo fileInfo)
    +  {
    +    String parentDir = new Path(fileInfo.getDirectoryPath()).getName();
    +    return parentDir + File.separator + fileInfo.getRelativeFilePath();
    +  }
    +
    +  @Override
    +  protected HDFSBlockMetadata createBlockMetadata(FileMetadata fileMetadata)
    +  {
    +    HDFSBlockMetadata blockMetadta = new HDFSBlockMetadata(fileMetadata.getFilePath());
    +    blockMetadta.setReadBlockInSequence(sequencialFileRead);
    +    return blockMetadta;
    +  }
    +
    +  @Override
    +  protected HDFSBlockMetadata buildBlockMetadata(long pos, long lengthOfFileInBlock, int blockNumber, FileMetadata fileMetadata, boolean isLast)
    +  {
    +    FileBlockMetadata metadata = super.buildBlockMetadata(pos, lengthOfFileInBlock, blockNumber, fileMetadata, isLast);
    +    HDFSBlockMetadata blockMetadata = (HDFSBlockMetadata)metadata;
    +    return blockMetadata;
    +  }
    +
    +  public boolean isSequencialFileRead()
    +  {
    +    return sequencialFileRead;
    +  }
    +
    +  public void setSequencialFileRead(boolean sequencialFileRead)
    +  {
    +    this.sequencialFileRead = sequencialFileRead;
    +  }
    +
    +  public static class HDFSScanner extends TimeBasedDirectoryScanner
    +  {
    +    protected static final String HDFS_TEMP_FILE = ".*._COPYING_";
    +    protected static final String UNSUPPORTED_CHARACTOR = ":";
    +    private transient Pattern ignoreRegex;
    +
    +    @Override
    +    public void setup(OperatorContext context)
    +    {
    +      super.setup(context);
    +      ignoreRegex = Pattern.compile(HDFS_TEMP_FILE);
    +    }
    +
    +    @Override
    +    protected boolean acceptFile(String filePathStr)
    +    {
    +      boolean accepted = super.acceptFile(filePathStr);
    +      if (containsUnsupportedCharacters(filePathStr) || isTempFile(filePathStr)) {
    +        return false;
    +      }
    +      return accepted;
    +    }
    +
    +    private boolean isTempFile(String filePathStr)
    +    {
    +      String fileName = new Path(filePathStr).getName();
    +      if (ignoreRegex != null) {
    +        Matcher matcher = ignoreRegex.matcher(fileName);
    +        if (matcher.matches()) {
    +          return true;
    +        }
    +      }
    +      return false;
    +    }
    +
    +    private boolean containsUnsupportedCharacters(String filePathStr)
    +    {
    +      return new Path(filePathStr).toUri().getPath().contains(UNSUPPORTED_CHARACTOR);
    +    }
    +  }
    +
    +  public static class HDFSFileMetaData extends FileMetadata
    +  {
    +    private String relativePath;
    +
    +    protected HDFSFileMetaData()
    --- End diff --
    
    Yes, definitely. I was just concerned about whether that would be accepted in a generic class.



[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2008: Create HDFS F...

Posted by chinmaykolhatkar <gi...@git.apache.org>.
Github user chinmaykolhatkar commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/207#discussion_r55638951
  
    --- Diff: library/src/main/java/com/datatorrent/lib/io/block/HDFSBlockReader.java ---
    @@ -0,0 +1,78 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.io.block;
    +
    +import java.io.IOException;
    +import java.net.URI;
    +
    +import org.apache.hadoop.fs.FileSystem;
    +
    +import com.google.common.base.Splitter;
    +
    +public class HDFSBlockReader extends FSSliceReader
    --- End diff --
    
    The class-level Javadoc is missing. Please add it.



[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2008: Create HDFS F...

Posted by DT-Priyanka <gi...@git.apache.org>.
Github user DT-Priyanka commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/207#discussion_r55800213
  
    --- Diff: library/src/main/java/com/datatorrent/lib/io/fs/HDFSFileSplitter.java ---
    @@ -0,0 +1,180 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.io.fs;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.regex.Matcher;
    +import java.util.regex.Pattern;
    +
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.hadoop.fs.FileStatus;
    +import org.apache.hadoop.fs.Path;
    +
    +import com.datatorrent.api.Context.OperatorContext;
    +import com.datatorrent.lib.io.block.BlockMetadata.FileBlockMetadata;
    +import com.datatorrent.lib.io.block.HDFSBlockMetadata;
    +
    +public class HDFSFileSplitter extends FileSplitterInput
    +{
    +  private boolean sequencialFileRead;
    +
    +  public HDFSFileSplitter()
    +  {
    +    super();
    +    super.setScanner(new HDFSScanner());
    +  }
    +
    +  @Override
    +  protected FileMetadata createFileMetadata(FileInfo fileInfo)
    +  {
    +    return new HDFSFileMetaData(fileInfo.getFilePath());
    +  }
    +
    +  @Override
    +  protected HDFSFileMetaData buildFileMetadata(FileInfo fileInfo) throws IOException
    +  {
    +    FileMetadata metadata = super.buildFileMetadata(fileInfo);
    +    HDFSFileMetaData hdfsFileMetaData = (HDFSFileMetaData)metadata;
    +
    +    Path path = new Path(fileInfo.getFilePath());
    +    FileStatus status = getFileStatus(path);
    +    if (fileInfo.getDirectoryPath() == null) { // Direct filename is given as input.
    +      hdfsFileMetaData.setRelativePath(status.getPath().getName());
    +    } else {
    +      String relativePath = getRelativePathWithFolderName(fileInfo);
    +      hdfsFileMetaData.setRelativePath(relativePath);
    +    }
    +    return hdfsFileMetaData;
    +  }
    +
    +  /*
    +   * As folder name was given to input for copy, prefix folder name to the sub items to copy.
    +   */
    +  private String getRelativePathWithFolderName(FileInfo fileInfo)
    +  {
    +    String parentDir = new Path(fileInfo.getDirectoryPath()).getName();
    +    return parentDir + File.separator + fileInfo.getRelativeFilePath();
    +  }
    +
    +  @Override
    +  protected HDFSBlockMetadata createBlockMetadata(FileMetadata fileMetadata)
    +  {
    +    HDFSBlockMetadata blockMetadta = new HDFSBlockMetadata(fileMetadata.getFilePath());
    +    blockMetadta.setReadBlockInSequence(sequencialFileRead);
    +    return blockMetadta;
    +  }
    +
    +  @Override
    +  protected HDFSBlockMetadata buildBlockMetadata(long pos, long lengthOfFileInBlock, int blockNumber, FileMetadata fileMetadata, boolean isLast)
    +  {
    +    FileBlockMetadata metadata = super.buildBlockMetadata(pos, lengthOfFileInBlock, blockNumber, fileMetadata, isLast);
    +    HDFSBlockMetadata blockMetadata = (HDFSBlockMetadata)metadata;
    +    return blockMetadata;
    +  }
    +
    +  public boolean isSequencialFileRead()
    +  {
    +    return sequencialFileRead;
    +  }
    +
    +  public void setSequencialFileRead(boolean sequencialFileRead)
    +  {
    +    this.sequencialFileRead = sequencialFileRead;
    +  }
    +
    +  public static class HDFSScanner extends TimeBasedDirectoryScanner
    +  {
    +    protected static final String HDFS_TEMP_FILE = ".*._COPYING_";
    +    protected static final String UNSUPPORTED_CHARACTOR = ":";
    +    private transient Pattern ignoreRegex;
    +
    +    @Override
    +    public void setup(OperatorContext context)
    +    {
    +      super.setup(context);
    +      ignoreRegex = Pattern.compile(HDFS_TEMP_FILE);
    +    }
    +
    +    @Override
    +    protected boolean acceptFile(String filePathStr)
    +    {
    +      boolean accepted = super.acceptFile(filePathStr);
    +      if (containsUnsupportedCharacters(filePathStr) || isTempFile(filePathStr)) {
    +        return false;
    +      }
    +      return accepted;
    +    }
    +
    +    private boolean isTempFile(String filePathStr)
    +    {
    +      String fileName = new Path(filePathStr).getName();
    +      if (ignoreRegex != null) {
    +        Matcher matcher = ignoreRegex.matcher(fileName);
    +        if (matcher.matches()) {
    +          return true;
    +        }
    +      }
    +      return false;
    +    }
    +
    +    private boolean containsUnsupportedCharacters(String filePathStr)
    +    {
    +      return new Path(filePathStr).toUri().getPath().contains(UNSUPPORTED_CHARACTOR);
    +    }
    +  }
    +
    +  public static class HDFSFileMetaData extends FileMetadata
    +  {
    +    private String relativePath;
    +
    +    protected HDFSFileMetaData()
    +    {
    +      super();
    +    }
    +
    +    public HDFSFileMetaData(@NotNull String filePath)
    +    {
    +      super(filePath);
    +    }
    +
    +    public String getRelativePath()
    +    {
    +      return relativePath;
    +    }
    +
    +    public void setRelativePath(String relativePath)
    +    {
    +      this.relativePath = relativePath;
    +    }
    +
    +    @Override
    +    public String toString()
    +    {
    +      return "HDFSFileMetaData [relativePath=" + relativePath + ", getNumberOfBlocks()=" + getNumberOfBlocks() + ", getFileName()=" + getFileName() + ", getFileLength()=" + getFileLength() + ", isDirectory()=" + isDirectory() + "]";
    --- End diff --
    
    It was generated by the editor. In a previous PR I got a comment that it is better to use auto-generated code for such methods.



[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2008: Create HDFS F...

Posted by DT-Priyanka <gi...@git.apache.org>.
Github user DT-Priyanka commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/207#discussion_r55650890
  
    --- Diff: library/src/main/java/com/datatorrent/lib/io/fs/HDFSInputModule.java ---
    @@ -0,0 +1,236 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.io.fs;
    +
    +import javax.validation.constraints.Min;
    +import javax.validation.constraints.NotNull;
    +import javax.validation.constraints.Size;
    +
    +import org.apache.commons.lang.mutable.MutableLong;
    +import org.apache.hadoop.conf.Configuration;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DAG;
    +import com.datatorrent.api.Module;
    +import com.datatorrent.common.metric.MetricsAggregator;
    +import com.datatorrent.common.metric.SingleMetricAggregator;
    +import com.datatorrent.common.metric.sum.LongSumAggregator;
    +import com.datatorrent.common.partitioner.StatelessPartitioner;
    +import com.datatorrent.lib.counters.BasicCounters;
    +import com.datatorrent.lib.io.block.AbstractBlockReader.ReaderRecord;
    +import com.datatorrent.lib.io.block.BlockMetadata.FileBlockMetadata;
    +import com.datatorrent.lib.io.block.HDFSBlockReader;
    +import com.datatorrent.lib.io.fs.AbstractFileSplitter.FileMetadata;
    +import com.datatorrent.lib.io.fs.HDFSFileSplitter.HDFSScanner;
    +import com.datatorrent.netlet.util.Slice;
    +
    +/**
    + * HDFSInputModule is used to read files from HDFS. <br/>
    + * Module emits FileMetadata, BlockMetadata and the block bytes.
    + */
    +public class HDFSInputModule implements Module
    +{
    +
    +  @NotNull
    +  @Size(min = 1)
    +  private String files;
    +  private String filePatternRegularExp;
    +  @Min(0)
    +  private long scanIntervalMillis;
    +  private boolean recursive = true;
    +  private long blockSize;
    --- End diff --
    
    No default is set here; the operator sets it based on the HDFS block size.
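
    As a rough illustration of that fallback, here is a minimal sketch. The class name `BlockSizeDefaults`, the method `resolveBlockSize`, and the "non-positive means not configured" convention are all hypothetical; in a real operator the supplier would presumably wrap something like Hadoop's `FileSystem.getDefaultBlockSize(Path)`, which is an assumption about the PR's code and not something shown in this diff.

```java
import java.util.function.LongSupplier;

// Hypothetical helper, not part of the PR: choose the block size to use.
final class BlockSizeDefaults
{
  private BlockSizeDefaults()
  {
  }

  /**
   * Returns the configured block size if one was set (assumed here to mean > 0);
   * otherwise asks the supplier for the file system's default block size.
   */
  static long resolveBlockSize(long configuredBlockSize, LongSupplier fileSystemDefault)
  {
    if (configuredBlockSize > 0) {
      return configuredBlockSize;
    }
    // The supplier is only consulted when no explicit value was configured.
    return fileSystemDefault.getAsLong();
  }
}
```

    With this shape the module can leave `blockSize` unset and let the downstream operator supply the value once it has access to the file system.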



[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2008: Create HDFS F...

Posted by ishark <gi...@git.apache.org>.
GitHub user ishark commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/207#discussion_r55777663
  
    --- Diff: library/src/main/java/com/datatorrent/lib/io/fs/HDFSFileSplitter.java ---
    @@ -0,0 +1,180 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.io.fs;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.regex.Matcher;
    +import java.util.regex.Pattern;
    +
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.hadoop.fs.FileStatus;
    +import org.apache.hadoop.fs.Path;
    +
    +import com.datatorrent.api.Context.OperatorContext;
    +import com.datatorrent.lib.io.block.BlockMetadata.FileBlockMetadata;
    +import com.datatorrent.lib.io.block.HDFSBlockMetadata;
    +
    +public class HDFSFileSplitter extends FileSplitterInput
    +{
    +  private boolean sequencialFileRead;
    +
    +  public HDFSFileSplitter()
    +  {
    +    super();
    +    super.setScanner(new HDFSScanner());
    +  }
    +
    +  @Override
    +  protected FileMetadata createFileMetadata(FileInfo fileInfo)
    +  {
    +    return new HDFSFileMetaData(fileInfo.getFilePath());
    +  }
    +
    +  @Override
    +  protected HDFSFileMetaData buildFileMetadata(FileInfo fileInfo) throws IOException
    +  {
    +    FileMetadata metadata = super.buildFileMetadata(fileInfo);
    +    HDFSFileMetaData hdfsFileMetaData = (HDFSFileMetaData)metadata;
    +
    +    Path path = new Path(fileInfo.getFilePath());
    +    FileStatus status = getFileStatus(path);
    +    if (fileInfo.getDirectoryPath() == null) { // Direct filename is given as input.
    +      hdfsFileMetaData.setRelativePath(status.getPath().getName());
    +    } else {
    +      String relativePath = getRelativePathWithFolderName(fileInfo);
    +      hdfsFileMetaData.setRelativePath(relativePath);
    +    }
    +    return hdfsFileMetaData;
    +  }
    +
    +  /*
    +   * As folder name was given to input for copy, prefix folder name to the sub items to copy.
    +   */
    +  private String getRelativePathWithFolderName(FileInfo fileInfo)
    +  {
    +    String parentDir = new Path(fileInfo.getDirectoryPath()).getName();
    +    return parentDir + File.separator + fileInfo.getRelativeFilePath();
    +  }
    +
    +  @Override
    +  protected HDFSBlockMetadata createBlockMetadata(FileMetadata fileMetadata)
    +  {
    +    HDFSBlockMetadata blockMetadta = new HDFSBlockMetadata(fileMetadata.getFilePath());
    +    blockMetadta.setReadBlockInSequence(sequencialFileRead);
    +    return blockMetadta;
    +  }
    +
    +  @Override
    +  protected HDFSBlockMetadata buildBlockMetadata(long pos, long lengthOfFileInBlock, int blockNumber, FileMetadata fileMetadata, boolean isLast)
    +  {
    +    FileBlockMetadata metadata = super.buildBlockMetadata(pos, lengthOfFileInBlock, blockNumber, fileMetadata, isLast);
    +    HDFSBlockMetadata blockMetadata = (HDFSBlockMetadata)metadata;
    +    return blockMetadata;
    +  }
    +
    +  public boolean isSequencialFileRead()
    +  {
    +    return sequencialFileRead;
    +  }
    +
    +  public void setSequencialFileRead(boolean sequencialFileRead)
    +  {
    +    this.sequencialFileRead = sequencialFileRead;
    +  }
    +
    +  public static class HDFSScanner extends TimeBasedDirectoryScanner
    +  {
    +    protected static final String HDFS_TEMP_FILE = ".*._COPYING_";
    +    protected static final String UNSUPPORTED_CHARACTOR = ":";
    +    private transient Pattern ignoreRegex;
    +
    +    @Override
    +    public void setup(OperatorContext context)
    +    {
    +      super.setup(context);
    +      ignoreRegex = Pattern.compile(HDFS_TEMP_FILE);
    +    }
    +
    +    @Override
    +    protected boolean acceptFile(String filePathStr)
    +    {
    +      boolean accepted = super.acceptFile(filePathStr);
    +      if (containsUnsupportedCharacters(filePathStr) || isTempFile(filePathStr)) {
    +        return false;
    +      }
    +      return accepted;
    +    }
    +
    +    private boolean isTempFile(String filePathStr)
    +    {
    +      String fileName = new Path(filePathStr).getName();
    +      if (ignoreRegex != null) {
    +        Matcher matcher = ignoreRegex.matcher(fileName);
    +        if (matcher.matches()) {
    +          return true;
    +        }
    +      }
    +      return false;
    +    }
    +
    +    private boolean containsUnsupportedCharacters(String filePathStr)
    +    {
    +      return new Path(filePathStr).toUri().getPath().contains(UNSUPPORTED_CHARACTOR);
    +    }
    +  }
    +
    +  public static class HDFSFileMetaData extends FileMetadata
    +  {
    +    private String relativePath;
    +
    +    protected HDFSFileMetaData()
    --- End diff --
    
    Can the relativePath property be moved to the superclass (FileMetadata)? It seems generic enough.
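
    One possible reading of this suggestion, as a minimal sketch. `SketchFileMetadata` and `SketchHdfsFileMetadata` are simplified, hypothetical stand-ins for `FileMetadata` and `HDFSFileMetaData`; the real classes carry more fields than shown here.

```java
// Hypothetical stand-in for the generic metadata class: it owns relativePath,
// so every file-system-specific subclass inherits the property.
class SketchFileMetadata
{
  private final String filePath;
  private String relativePath;

  SketchFileMetadata(String filePath)
  {
    this.filePath = filePath;
  }

  String getFilePath()
  {
    return filePath;
  }

  String getRelativePath()
  {
    return relativePath;
  }

  void setRelativePath(String relativePath)
  {
    this.relativePath = relativePath;
  }
}

// Hypothetical stand-in for the HDFS-specific subclass: it no longer needs
// its own relativePath field or accessors.
class SketchHdfsFileMetadata extends SketchFileMetadata
{
  SketchHdfsFileMetadata(String filePath)
  {
    super(filePath);
  }
}
```

    Code that only needs the relative path can then work against the base type without casting to the HDFS-specific class.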



[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2008: Create HDFS F...

Posted by chinmaykolhatkar <gi...@git.apache.org>.
GitHub user chinmaykolhatkar commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/207#discussion_r55639045
  
    --- Diff: library/src/main/java/com/datatorrent/lib/io/block/HDFSBlockReader.java ---
    @@ -0,0 +1,78 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.io.block;
    +
    +import java.io.IOException;
    +import java.net.URI;
    +
    +import org.apache.hadoop.fs.FileSystem;
    +
    +import com.google.common.base.Splitter;
    +
    +public class HDFSBlockReader extends FSSliceReader
    +{
    +  protected String uri;
    +
    +  @Override
    +  protected FileSystem getFSInstance() throws IOException
    +  {
    +    return FileSystem.newInstance(URI.create(uri), configuration);
    +  }
    +
    +  /**
    +   * Sets the uri
    +   *
    +   * @param uri
    +   */
    +  public void setUri(String uri)
    +  {
    +    this.uri = convertSchemeToLowerCase(uri);
    +  }
    +
    +  public String getUri()
    +  {
    +    return uri;
    +  }
    +
    +  /**
    +   * Converts Scheme part of the URI to lower case. Multiple URI can be comma separated. If no scheme is there, no
    +   * change is made.
    +   * 
    +   * @param
    +   * @return String with scheme part as lower case
    +   */
    +  private static String convertSchemeToLowerCase(String uri)
    --- End diff --
    
    Why is this method static? If it is a utility-type method, could it be moved somewhere more accessible?
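
    For illustration, a shared utility along these lines might look like the sketch below. The method body is not visible in this diff, so this is not the PR's implementation: the class name `UriSchemeUtil` is hypothetical, and the sketch uses plain JDK string handling rather than the Guava `Splitter` that the PR imports. It follows the Javadoc quoted above: comma-separated URIs are accepted, and entries without a scheme are left unchanged.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical utility class, not part of the PR.
final class UriSchemeUtil
{
  // A scheme as defined by RFC 3986: a letter followed by letters, digits, '+', '.' or '-'.
  private static final Pattern SCHEME = Pattern.compile("^([A-Za-z][A-Za-z0-9+.-]*):");

  private UriSchemeUtil()
  {
  }

  /**
   * Lower-cases the scheme of each URI in a comma-separated list.
   * Entries are not trimmed, and entries without a scheme are returned as-is.
   */
  static String convertSchemeToLowerCase(String uris)
  {
    if (uris == null) {
      return null;
    }
    List<String> converted = new ArrayList<>();
    for (String uri : uris.split(",", -1)) {
      Matcher matcher = SCHEME.matcher(uri);
      if (matcher.find()) {
        // Keep everything from the ':' onwards untouched; only the scheme changes.
        String scheme = matcher.group(1).toLowerCase(Locale.ROOT);
        converted.add(scheme + uri.substring(matcher.end(1)));
      } else {
        converted.add(uri);
      }
    }
    return String.join(",", converted);
  }
}
```

    Keeping the method static and stateless is what makes it easy to relocate: `setUri` would simply delegate to the shared helper.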

