Posted to commits@samza.apache.org by xi...@apache.org on 2016/10/05 18:39:44 UTC

[2/2] samza git commit: SAMZA-967: HDFS System Consumer

SAMZA-967: HDFS System Consumer


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/2216fe0b
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/2216fe0b
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/2216fe0b

Branch: refs/heads/master
Commit: 2216fe0b7b250e2edfc14d150e59f84884e8cba4
Parents: f4d924f
Author: Hai Lu <lh...@gmail.com>
Authored: Wed Oct 5 11:39:17 2016 -0700
Committer: Xinyu Liu <xi...@apache.org>
Committed: Wed Oct 5 11:39:17 2016 -0700

----------------------------------------------------------------------
 build.gradle                                    |  16 +-
 .../samza/system/hdfs/HdfsSystemAdmin.java      | 221 ++++++++++++++
 .../samza/system/hdfs/HdfsSystemConsumer.java   | 281 +++++++++++++++++
 .../system/hdfs/PartitionDescriptorUtil.java    |  97 ++++++
 .../hdfs/partitioner/DirectoryPartitioner.java  | 235 ++++++++++++++
 .../hdfs/partitioner/FileSystemAdapter.java     |  60 ++++
 .../hdfs/partitioner/HdfsFileSystemAdapter.java |  55 ++++
 .../system/hdfs/reader/AvroFileHdfsReader.java  | 216 +++++++++++++
 .../system/hdfs/reader/HdfsReaderFactory.java   |  59 ++++
 .../system/hdfs/reader/MultiFileHdfsReader.java | 204 +++++++++++++
 .../hdfs/reader/SingleFileHdfsReader.java       |  62 ++++
 .../apache/samza/system/hdfs/HdfsConfig.scala   |  79 ++++-
 .../samza/system/hdfs/HdfsSystemAdmin.scala     |  52 ----
 .../samza/system/hdfs/HdfsSystemFactory.scala   |  46 +--
 .../system/hdfs/TestHdfsSystemConsumer.java     | 136 +++++++++
 .../hdfs/TestPartitionDesctiptorUtil.java       |  92 ++++++
 .../partitioner/TestDirectoryPartitioner.java   | 304 +++++++++++++++++++
 .../partitioner/TestHdfsFileSystemAdapter.java  |  59 ++++
 .../hdfs/reader/TestAvroFileHdfsReader.java     | 169 +++++++++++
 .../hdfs/reader/TestMultiFileHdfsReader.java    | 182 +++++++++++
 .../src/test/resources/integTest/emptyTestFile  |  16 +
 .../src/test/resources/partitioner/testfile01   |  16 +
 .../src/test/resources/partitioner/testfile02   |  16 +
 .../src/test/resources/reader/TestEvent.avsc    |  33 ++
 .../hdfs/TestHdfsSystemProducerTestSuite.scala  |  38 +--
 .../samza/job/yarn/YarnContainerRunner.java     |   2 +-
 .../apache/samza/job/yarn/YarnJobFactory.scala  |   9 +-
 27 files changed, 2654 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 2bea27b..98839f2 100644
--- a/build.gradle
+++ b/build.gradle
@@ -84,7 +84,8 @@ rat {
     'RELEASE.md',
     'samza-test/src/main/resources/**',
     'samza-hdfs/src/main/resources/**',
-    'samza-hdfs/src/test/resources/**'
+    'samza-hdfs/src/test/resources/**',
+    'out/**'
   ]
 }
 
@@ -333,6 +334,10 @@ project(":samza-yarn_$scalaVersion") {
      // Exclude because YARN's 3.4.5 ZK version is incompatible with Kafka's 3.3.4.
       exclude module: 'zookeeper'
     }
+    compile("org.apache.hadoop:hadoop-hdfs:$yarnVersion") {
+      exclude module: 'slf4j-log4j12'
+      exclude module: 'servlet-api'
+    }
     compile("org.scalatra:scalatra_$scalaVersion:$scalatraVersion") {
       exclude module: 'scala-compiler'
       exclude module: 'slf4j-api'
@@ -494,10 +499,19 @@ project(":samza-kv-rocksdb_$scalaVersion") {
 project(":samza-hdfs_$scalaVersion") {
   apply plugin: 'scala'
 
+  // Force scala joint compilation
+  sourceSets.main.scala.srcDir "src/main/java"
+  sourceSets.test.scala.srcDir "src/test/java"
+  sourceSets.main.java.srcDirs = []
+
   dependencies {
     compile project(':samza-api')
     compile project(":samza-core_$scalaVersion")
     compile project(":samza-kafka_$scalaVersion")
+    // Currently the HDFS system producer/consumer depend on YARN for two things:
+    // 1. staging directory 2. security
+    // SAMZA-1032 tracks removing the staging directory dependency
+    compile project(":samza-yarn_$scalaVersion")
     compile "org.scala-lang:scala-library:$scalaLibVersion"
     compile("org.apache.hadoop:hadoop-hdfs:$yarnVersion") {
       exclude module: 'slf4j-log4j12'

http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java
new file mode 100644
index 0000000..8bf31c5
--- /dev/null
+++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java
@@ -0,0 +1,221 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.hdfs;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.system.hdfs.partitioner.DirectoryPartitioner;
+import org.apache.samza.system.hdfs.partitioner.HdfsFileSystemAdapter;
+import org.apache.samza.system.hdfs.reader.HdfsReaderFactory;
+import org.apache.samza.system.hdfs.reader.MultiFileHdfsReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The HDFS system admin for {@link org.apache.samza.system.hdfs.HdfsSystemConsumer} and
+ * {@link org.apache.samza.system.hdfs.HdfsSystemProducer}
+ *
+ * A high level overview of the HDFS producer/consumer architecture:
+ *
+ * - HDFSSystemAdmin runs the Directory Partitioner, which filters and groups the files
+ *   on HDFS into partitions, and persists the resulting partition descriptors back to HDFS.
+ * - HDFSSystemConsumer obtains the partition descriptors and reads events through a file
+ *   reader (IFileReader, e.g. HDFSAvroFileReader).
+ * - HDFSSystemProducer writes events to HDFS through HDFSAvroWriter.
+ * - HDFSSystemFactory creates the consumer, the admin and the producer.
+ */
+public class HdfsSystemAdmin implements SystemAdmin {
+  private static final Logger LOG = LoggerFactory.getLogger(HdfsSystemAdmin.class);
+
+  private HdfsConfig hdfsConfig;
+  private DirectoryPartitioner directoryPartitioner;
+  private String stagingDirectory; // directory that contains the partition description
+  private HdfsReaderFactory.ReaderType readerType;
+
+  public HdfsSystemAdmin(String systemName, Config config) {
+    hdfsConfig = new HdfsConfig(config);
+    directoryPartitioner = new DirectoryPartitioner(hdfsConfig.getPartitionerWhiteList(systemName),
+      hdfsConfig.getPartitionerBlackList(systemName), hdfsConfig.getPartitionerGroupPattern(systemName),
+      new HdfsFileSystemAdapter());
+    stagingDirectory = hdfsConfig.getStagingDirectory();
+    readerType = HdfsReaderFactory.getType(hdfsConfig.getFileReaderType(systemName));
+  }
+
+  @Override
+  public Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> offsets) {
+    /*
+     * To actually get the "after" offset we would have to seek to that specific offset in the file and
+     * read that record to figure out the location of the next record. This is a much more expensive
+     * operation than in KafkaSystemAdmin, so we simply return the same offsets. This always incurs
+     * some re-processing, but such at-least-once semantics are legitimate in Samza.
+     */
+    return offsets;
+  }
+
+  static Map<Partition, List<String>> obtainPartitionDescriptorMap(String stagingDirectory, String streamName) {
+    Path path = PartitionDescriptorUtil.getPartitionDescriptorPath(stagingDirectory, streamName);
+    try (FileSystem fs = path.getFileSystem(new Configuration())) {
+      if (!fs.exists(path)) {
+        return null;
+      }
+      try (FSDataInputStream fis = fs.open(path)) {
+        String json = IOUtils.toString(fis, StandardCharsets.UTF_8);
+        return PartitionDescriptorUtil.getDescriptorMapFromJson(json);
+      }
+    } catch (IOException e) {
+      throw new SamzaException("Failed to read partition description from: " + path, e);
+    }
+  }
+
+  /*
+   * Persist the partition descriptor only when it doesn't exist already on HDFS.
+   */
+  private void persistPartitionDescriptor(String streamName,
+    Map<Partition, List<String>> partitionDescriptorMap) {
+    Path targetPath = PartitionDescriptorUtil.getPartitionDescriptorPath(stagingDirectory, streamName);
+    try (FileSystem fs = targetPath.getFileSystem(new Configuration())) {
+      // The partition descriptor is supposed to be immutable, so don't overwrite it if it already exists.
+      if (fs.exists(targetPath)) {
+        LOG.warn(targetPath.toString() + " exists. Skip persisting partition descriptor.");
+      } else {
+        LOG.info("About to persist partition descriptors to path: " + targetPath.toString());
+        try (FSDataOutputStream fos = fs.create(targetPath)) {
+          fos.write(
+            PartitionDescriptorUtil.getJsonFromDescriptorMap(partitionDescriptorMap).getBytes(StandardCharsets.UTF_8));
+        }
+      }
+    } catch (IOException e) {
+      throw new SamzaException("Failed to validate/persist partition description on hdfs.", e);
+    }
+  }
+
+  private boolean partitionDescriptorExists(String streamName) {
+    Path targetPath = PartitionDescriptorUtil.getPartitionDescriptorPath(stagingDirectory, streamName);
+    try (FileSystem fs = targetPath.getFileSystem(new Configuration())) {
+      return fs.exists(targetPath);
+    } catch (IOException e) {
+      throw new SamzaException("Failed to obtain information about path: " + targetPath, e);
+    }
+  }
+
+  @Override
+  public Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> streamNames) {
+    Map<String, SystemStreamMetadata> systemStreamMetadataMap = new HashMap<>();
+    streamNames.forEach(streamName -> {
+      systemStreamMetadataMap.put(streamName, new SystemStreamMetadata(streamName, directoryPartitioner
+        .getPartitionMetadataMap(streamName, obtainPartitionDescriptorMap(stagingDirectory, streamName))));
+      if (!partitionDescriptorExists(streamName)) {
+        persistPartitionDescriptor(streamName, directoryPartitioner.getPartitionDescriptor(streamName));
+      }
+    });
+    return systemStreamMetadataMap;
+  }
+
+  @Override
+  public void createChangelogStream(String streamName, int numOfPartitions) {
+    throw new UnsupportedOperationException("HDFS doesn't support changelog streams.");
+  }
+
+  @Override
+  public void validateChangelogStream(String streamName, int numOfPartitions) {
+    throw new UnsupportedOperationException("HDFS doesn't support changelog streams.");
+  }
+
+  @Override
+  public void createCoordinatorStream(String streamName) {
+    throw new UnsupportedOperationException("HDFS doesn't support coordinator stream.");
+  }
+
+  /**
+   * Compare two multi-file style offsets. A multi-file style offset consists of both
+   * the file index and the offset within that file, in the format
+   * "fileIndex:offsetWithinFile", for example "2:0" or "3:127".
+   * The format of the offset within a file is defined by the
+   * {@link org.apache.samza.system.hdfs.reader.SingleFileHdfsReader} implementation itself.
+   *
+   * @param offset1 First offset for comparison.
+   * @param offset2 Second offset for comparison.
+   * @return -1, if offset1 is less than offset2;
+   *          0, if offset1 equals offset2;
+   *          1, if offset1 is greater than offset2;
+   *          null, if the offsets are not comparable
+   */
+  @Override
+  public Integer offsetComparator(String offset1, String offset2) {
+    if (StringUtils.isBlank(offset1) || StringUtils.isBlank(offset2)) {
+      return null;
+    }
+    int fileIndex1 = MultiFileHdfsReader.getCurFileIndex(offset1);
+    int fileIndex2 = MultiFileHdfsReader.getCurFileIndex(offset2);
+    if (fileIndex1 == fileIndex2) {
+      String offsetWithinFile1 = MultiFileHdfsReader.getCurSingleFileOffset(offset1);
+      String offsetWithinFile2 = MultiFileHdfsReader.getCurSingleFileOffset(offset2);
+      return HdfsReaderFactory.offsetComparator(readerType, offsetWithinFile1, offsetWithinFile2);
+    }
+    return Integer.compare(fileIndex1, fileIndex2);
+  }
+}
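
For illustration, a minimal sketch of the multi-file offset format described above ("fileIndex:offsetWithinFile"); the offset values and the surrounding snippet are hypothetical, only the static MultiFileHdfsReader helpers come from this patch:

    // Sketch: compare two hypothetical multi-file offsets the way offsetComparator does.
    String offset1 = "2:0";    // file index 2, beginning of that file
    String offset2 = "3:127";  // file index 3, offset 127 within that file
    int fileIndex1 = MultiFileHdfsReader.getCurFileIndex(offset1);  // 2
    int fileIndex2 = MultiFileHdfsReader.getCurFileIndex(offset2);  // 3
    // Different file indexes decide the order directly; equal indexes would delegate
    // to HdfsReaderFactory.offsetComparator on the within-file offsets.
    System.out.println(Integer.compare(fileIndex1, fileIndex2));    // -1: offset1 is older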

http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java
new file mode 100644
index 0000000..13a7102
--- /dev/null
+++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java
@@ -0,0 +1,281 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.hdfs;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.commons.lang.Validate;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.system.hdfs.reader.HdfsReaderFactory;
+import org.apache.samza.system.hdfs.reader.MultiFileHdfsReader;
+import org.apache.samza.util.BlockingEnvelopeMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+
+
+/**
+ * The system consumer for HDFS, extending {@link org.apache.samza.util.BlockingEnvelopeMap}.
+ * Events are read from HDFS and placed into the blocking queues provided by {@link org.apache.samza.util.BlockingEnvelopeMap}.
+ * There is one {@link org.apache.samza.system.hdfs.reader.MultiFileHdfsReader} per {@link org.apache.samza.system.SystemStreamPartition},
+ * and each {@link org.apache.samza.system.hdfs.reader.MultiFileHdfsReader} runs in its own thread.
+ *
+ *   MultiFileHdfsReader_1 (Thread1) --> SSP1-BlockingQueue --+
+ *   MultiFileHdfsReader_2 (Thread2) --> SSP2-BlockingQueue --+--> SystemConsumer.poll()
+ *   ...                                                      |
+ *   MultiFileHdfsReader_N (ThreadN) --> SSPN-BlockingQueue --+
+ *
+ * Since each thread has its own reader and its own blocking queue, there is essentially no communication
+ * among reader threads.
+ * Thread safety between the reader threads and the Samza main thread is guaranteed by the blocking queues in between.
+ */
+public class HdfsSystemConsumer extends BlockingEnvelopeMap {
+  private static final Logger LOG = LoggerFactory.getLogger(HdfsSystemConsumer.class);
+
+  private static final String METRICS_GROUP_NAME = HdfsSystemConsumer.class.getName();
+
+  private final HdfsReaderFactory.ReaderType readerType;
+  private final String stagingDirectory; // directory that contains the partition description
+  private final int bufferCapacity;
+  private final int numMaxRetries;
+  private ExecutorService executorService;
+
+  /**
+   * The cached mapping from stream name to the partition descriptor map. The partition descriptor
+   * is the actual file path (or the set of file paths if the partition contains multiple files)
+   * of the stream partition.
+   * For example,
+   * (stream1) -> (P0) -> "hdfs://user/samzauser/1/datafile01.avro"
+   * (stream1) -> (P1) -> "hdfs://user/samzauser/1/datafile02.avro"
+   * (stream2) -> (P0) -> "hdfs://user/samzauser/2/datafile01.avro"
+   * ...
+   */
+  private LoadingCache<String, Map<Partition, List<String>>> cachedPartitionDescriptorMap;
+  private Map<SystemStreamPartition, MultiFileHdfsReader> readers;
+  private Map<SystemStreamPartition, Future> readerRunnableStatus;
+
+  /**
+   * Whether the {@link org.apache.samza.system.hdfs.HdfsSystemConsumer} has been notified
+   * to shut down. The {@link org.apache.samza.system.hdfs.HdfsSystemConsumer.ReaderRunnable} on
+   * each thread checks this variable to determine whether it should stop.
+   */
+  private volatile boolean isShutdown;
+
+  private final HdfsSystemConsumerMetrics consumerMetrics;
+  private final HdfsConfig hdfsConfig;
+
+  public HdfsSystemConsumer(String systemName, Config config, HdfsSystemConsumerMetrics consumerMetrics) {
+    super(consumerMetrics.getMetricsRegistry());
+    hdfsConfig = new HdfsConfig(config);
+    readerType = HdfsReaderFactory.getType(hdfsConfig.getFileReaderType(systemName));
+    stagingDirectory = hdfsConfig.getStagingDirectory();
+    bufferCapacity = hdfsConfig.getConsumerBufferCapacity(systemName);
+    numMaxRetries = hdfsConfig.getConsumerNumMaxRetries(systemName);
+    readers = new ConcurrentHashMap<>();
+    readerRunnableStatus = new ConcurrentHashMap<>();
+    isShutdown = false;
+    this.consumerMetrics = consumerMetrics;
+    cachedPartitionDescriptorMap = CacheBuilder.newBuilder().build(new CacheLoader<String, Map<Partition, List<String>>>() {
+        @Override
+        public Map<Partition, List<String>> load(String streamName)
+          throws Exception {
+          Validate.notEmpty(streamName);
+          return HdfsSystemAdmin.obtainPartitionDescriptorMap(stagingDirectory, streamName);
+        }
+      });
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void start() {
+    LOG.info(String.format("HdfsSystemConsumer started with %d readers", readers.size()));
+    executorService = Executors.newCachedThreadPool();
+    readers.entrySet().forEach(
+      entry -> readerRunnableStatus.put(entry.getKey(), executorService.submit(new ReaderRunnable(entry.getValue()))));
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void stop() {
+    LOG.info("Received request to stop HdfsSystemConsumer.");
+    isShutdown = true;
+    executorService.shutdown();
+    LOG.info("HdfsSystemConsumer stopped.");
+  }
+
+  private List<String> getPartitionDescriptor(SystemStreamPartition systemStreamPartition) {
+    String streamName = systemStreamPartition.getStream();
+    Partition partition = systemStreamPartition.getPartition();
+    try {
+      return cachedPartitionDescriptorMap.get(streamName).get(partition);
+    } catch (ExecutionException e) {
+      throw new SamzaException("Failed to obtain descriptor for " + systemStreamPartition, e);
+    }
+  }
+
+  @Override
+  protected BlockingQueue<IncomingMessageEnvelope> newBlockingQueue() {
+    return new LinkedBlockingQueue<>(bufferCapacity);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void register(SystemStreamPartition systemStreamPartition, String offset) {
+    LOG.info("HdfsSystemConsumer registered partition: " + systemStreamPartition + " with offset " + offset);
+    super.register(systemStreamPartition, offset);
+    MultiFileHdfsReader reader =
+      new MultiFileHdfsReader(readerType, systemStreamPartition, getPartitionDescriptor(systemStreamPartition), offset,
+        numMaxRetries);
+    readers.put(systemStreamPartition, reader);
+    consumerMetrics.registerSystemStreamPartition(systemStreamPartition);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(
+    Set<SystemStreamPartition> systemStreamPartitions, long timeout)
+    throws InterruptedException {
+    systemStreamPartitions.forEach(systemStreamPartition -> {
+      Future status = readerRunnableStatus.get(systemStreamPartition);
+      if (status.isDone()) {
+        try {
+          status.get();
+        } catch (ExecutionException | InterruptedException e) {
+          MultiFileHdfsReader reader = readers.get(systemStreamPartition);
+          LOG.warn(
+            String.format("Detected failure in ReaderRunnable for ssp: %s. Trying to reconnect now.", systemStreamPartition),
+            e);
+          reader.reconnect();
+          readerRunnableStatus.put(systemStreamPartition, executorService.submit(new ReaderRunnable(reader)));
+        }
+      }
+    });
+    return super.poll(systemStreamPartitions, timeout);
+  }
+
+  private void offerMessage(SystemStreamPartition systemStreamPartition, IncomingMessageEnvelope envelope) {
+    try {
+      super.put(systemStreamPartition, envelope);
+    } catch (InterruptedException e) {
+      throw new SamzaException("ReaderRunnable interrupted for ssp: " + systemStreamPartition, e);
+    }
+  }
+
+  private void doPoll(MultiFileHdfsReader reader) {
+    SystemStreamPartition systemStreamPartition = reader.getSystemStreamPartition();
+    while (reader.hasNext() && !isShutdown) {
+      IncomingMessageEnvelope messageEnvelope = reader.readNext();
+      offerMessage(systemStreamPartition, messageEnvelope);
+      consumerMetrics.incNumEvents(systemStreamPartition);
+      consumerMetrics.incTotalNumEvents();
+    }
+    offerMessage(systemStreamPartition, IncomingMessageEnvelope.buildEndOfStreamEnvelope(systemStreamPartition));
+    reader.close();
+  }
+
+  public static class HdfsSystemConsumerMetrics {
+
+    private final MetricsRegistry metricsRegistry;
+    private final Map<SystemStreamPartition, Counter> numEventsCounterMap;
+    private final Counter numTotalEventsCounter;
+
+    public HdfsSystemConsumerMetrics(MetricsRegistry metricsRegistry) {
+      this.metricsRegistry = metricsRegistry;
+      this.numEventsCounterMap = new ConcurrentHashMap<>();
+      this.numTotalEventsCounter = metricsRegistry.newCounter(METRICS_GROUP_NAME, "num-total-events");
+    }
+
+    public void registerSystemStreamPartition(SystemStreamPartition systemStreamPartition) {
+      numEventsCounterMap.putIfAbsent(systemStreamPartition,
+        metricsRegistry.newCounter(METRICS_GROUP_NAME, "num-events-" + systemStreamPartition));
+    }
+
+    public void incNumEvents(SystemStreamPartition systemStreamPartition) {
+      if (!numEventsCounterMap.containsKey(systemStreamPartition)) {
+        registerSystemStreamPartition(systemStreamPartition);
+      }
+      numEventsCounterMap.get(systemStreamPartition).inc();
+    }
+
+    public void incTotalNumEvents() {
+      numTotalEventsCounter.inc();
+    }
+
+    public MetricsRegistry getMetricsRegistry() {
+      return metricsRegistry;
+    }
+  }
+
+  private class ReaderRunnable implements Runnable {
+    public MultiFileHdfsReader reader;
+
+    public ReaderRunnable(MultiFileHdfsReader reader) {
+      this.reader = reader;
+    }
+
+    @Override
+    public void run() {
+      doPoll(reader);
+    }
+  }
+}
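
For illustration, a minimal, hedged sketch of driving the consumer directly; the system name, config contents and the HDFS path are hypothetical, MetricsRegistryMap and MapConfig are assumed from samza-core, and the usual org.apache.samza.* and java.util.* imports are assumed:

    // Sketch: register one partition, start the reader thread, poll, then stop.
    HdfsSystemConsumer.HdfsSystemConsumerMetrics metrics =
        new HdfsSystemConsumer.HdfsSystemConsumerMetrics(new MetricsRegistryMap());
    Config config = new MapConfig();  // hypothetical; a real job would carry systems.hdfs.* settings
    HdfsSystemConsumer consumer = new HdfsSystemConsumer("hdfs", config, metrics);
    SystemStreamPartition ssp =
        new SystemStreamPartition("hdfs", "hdfs://user/samzauser/1", new Partition(0));
    consumer.register(ssp, "0:0");  // multi-file offset: first file, beginning
    consumer.start();
    Map<SystemStreamPartition, List<IncomingMessageEnvelope>> envelopes =
        consumer.poll(Collections.singleton(ssp), 1000);
    consumer.stop();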

http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/PartitionDescriptorUtil.java
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/PartitionDescriptorUtil.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/PartitionDescriptorUtil.java
new file mode 100644
index 0000000..5abdbbc
--- /dev/null
+++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/PartitionDescriptorUtil.java
@@ -0,0 +1,97 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.hdfs;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+
+/**
+ * Utility class for methods around partition descriptors.
+ *
+ * A partition descriptor is rich information about a partition: the set
+ * of files that are associated with the partition.
+ *
+ * A partition descriptor map, or descriptor map, is the mapping between a
+ * {@link org.apache.samza.Partition} and its descriptor.
+ */
+public class PartitionDescriptorUtil {
+
+  private PartitionDescriptorUtil() {
+
+  }
+
+  private static final int INDENT_FACTOR = 2;
+  private static final String DELIMITER = ",";
+
+  private static String getStringFromPaths(List<String> paths) {
+    return String.join(DELIMITER, paths);
+  }
+
+  private static List<String> getPathsFromString(String descriptor) {
+    return Arrays.asList(descriptor.split(DELIMITER));
+  }
+
+  public static String getJsonFromDescriptorMap(Map<Partition, List<String>> descriptorMap) {
+    JSONObject out = new JSONObject();
+    descriptorMap.forEach((partition, paths) -> {
+      String descriptorStr = getStringFromPaths(paths);
+      try {
+        out.put(String.valueOf(partition.getPartitionId()), descriptorStr);
+      } catch (JSONException e) {
+        throw new SamzaException(
+          String.format("Invalid description to encode. partition=%s, descriptor=%s", partition, descriptorStr), e);
+      }
+    });
+    try {
+      return out.toString(INDENT_FACTOR);
+    } catch (JSONException e) {
+      throw new SamzaException("Failed to generate json string.", e);
+    }
+  }
+
+  public static Map<Partition, List<String>> getDescriptorMapFromJson(String json) {
+    try {
+      @SuppressWarnings("unchecked")
+      Map<String, String> rawMap = new ObjectMapper().readValue(json, HashMap.class);
+      Map<Partition, List<String>> descriptorMap = new HashMap<>();
+      rawMap.forEach((key, value) -> descriptorMap.put(new Partition(Integer.valueOf(key)), getPathsFromString(value)));
+      return descriptorMap;
+    } catch (IOException | NumberFormatException e) {
+      throw new SamzaException("Failed to convert json: " + json, e);
+    }
+  }
+
+  public static Path getPartitionDescriptorPath(String base, String streamName) {
+    Path basePath = new Path(base);
+    Path relativePath = new Path(streamName.replaceAll("\\W", "_") + "_partition_description");
+    return new Path(basePath, relativePath);
+  }
+}
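
For illustration, a hedged sketch of the descriptor JSON round trip; the staging directory and the file paths are hypothetical (the paths reuse the examples from the consumer javadoc):

    // Sketch: build a descriptor map, serialize it to JSON, and read it back.
    Map<Partition, List<String>> descriptors = new HashMap<>();
    descriptors.put(new Partition(0), Arrays.asList("hdfs://user/samzauser/1/datafile01.avro"));
    descriptors.put(new Partition(1), Arrays.asList("hdfs://user/samzauser/1/datafile02.avro"));
    String json = PartitionDescriptorUtil.getJsonFromDescriptorMap(descriptors);
    Map<Partition, List<String>> restored = PartitionDescriptorUtil.getDescriptorMapFromJson(json);
    // Where the admin would persist the descriptor under a (hypothetical) staging directory:
    Path descriptorPath = PartitionDescriptorUtil.getPartitionDescriptorPath(
        "hdfs://tmp/samza-staging", "hdfs://user/samzauser/1");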

http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/DirectoryPartitioner.java
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/DirectoryPartitioner.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/DirectoryPartitioner.java
new file mode 100644
index 0000000..5cad1e4
--- /dev/null
+++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/DirectoryPartitioner.java
@@ -0,0 +1,235 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.hdfs.partitioner;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import javax.annotation.Nullable;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.system.hdfs.reader.MultiFileHdfsReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata;
+import static org.apache.samza.system.hdfs.partitioner.FileSystemAdapter.FileMetadata;
+
+
+/**
+ * The partitioner that takes a directory as input and does
+ * 1. Filtering, based on a white list and a black list
+ * 2. Grouping, based on a grouping pattern
+ *
+ * and then generates the partition metadata and partition descriptors.
+ *
+ * This class assumes that the directory remains immutable.
+ * If the directory does change:
+ * new files showing up in the directory are ignored when an old version of the partition descriptor exists;
+ * a {@link org.apache.samza.SamzaException} is thrown if at least one old file no longer exists.
+ */
+public class DirectoryPartitioner {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DirectoryPartitioner.class);
+  private static final String GROUP_IDENTIFIER = "\\[id]";
+
+  private String whiteListRegex;
+  private String blackListRegex;
+  private String groupPattern;
+  private FileSystemAdapter fileSystemAdapter;
+
+  // stream name => partition => partition descriptor
+  private Map<String, Map<Partition, List<String>>> partitionDescriptorMap = new HashMap<>();
+
+  public DirectoryPartitioner(String whiteList, String blackList, String groupPattern,
+    FileSystemAdapter fileSystemAdapter) {
+    this.whiteListRegex = whiteList;
+    this.blackListRegex = blackList;
+    this.groupPattern = groupPattern;
+    this.fileSystemAdapter = fileSystemAdapter;
+    LOG.info(String
+      .format("Creating DirectoryPartitioner with whiteList=%s, blackList=%s, groupPattern=%s", whiteList, blackList,
+        groupPattern));
+  }
+
+  /*
+   * Based on the stream name, get the list of all files and filter out unneeded ones given
+   * the white list and black list
+   */
+  private List<FileMetadata> getFilteredFiles(String streamName) {
+    List<FileMetadata> filteredFiles = new ArrayList<>();
+    List<FileMetadata> allFiles = fileSystemAdapter.getAllFiles(streamName);
+    LOG.info(String.format("List of all files for %s: %s", streamName, allFiles));
+    allFiles.stream().filter(file -> file.getPath().matches(whiteListRegex) && !file.getPath().matches(blackListRegex))
+      .forEach(filteredFiles::add);
+    // sort the files to have a consistent order
+    filteredFiles.sort((f1, f2) -> f1.getPath().compareTo(f2.getPath()));
+    LOG.info(String.format("List of filtered files for %s: %s", streamName, filteredFiles));
+    return filteredFiles;
+  }
+
+  /*
+   * Algorithm to extract the group identifier from the path based on
+   * the group pattern.
+   * 1. Split the group pattern into two parts (prefix, suffix)
+   * 2. Match the both prefix and suffix against the input
+   * 3. Strip the prefix and suffix and then we can get the group identifier
+   *
+   * For example,
+   * input = run_2016-08-01-part-3.avro
+   * group pattern = ".*part-[id]\.avro"
+   *
+   * 1. Split: prefix pattern = ".*part-" suffix pattern = "\.avro"
+   * 2. Match: prefix string = "run_2016-08-01-part-" suffix string = ".avro"
+   * 3. Extract: output = "3"
+   *
+   * If we can't extract a group identifier, return the original input
+   */
+  private String extractGroupIdentifier(String input) {
+    if (StringUtils.isBlank(groupPattern)) {
+      return input;
+    }
+    String[] patterns = groupPattern.split(GROUP_IDENTIFIER);
+    if (patterns.length != 2) {
+      return input;
+    }
+
+    Pattern p1 = Pattern.compile(patterns[0]);
+    Pattern p2 = Pattern.compile(patterns[1]);
+    Matcher m1 = p1.matcher(input);
+    Matcher m2 = p2.matcher(input);
+    if (!m1.find()) {
+      return input;
+    }
+    int s1 = m1.end();
+    if (!m2.find(s1)) {
+      return input;
+    }
+    int s2 = m2.start();
+    return input.substring(s1, s2);
+  }
+
+  /*
+   * Group partitions based on the group identifier extracted from the file path
+   */
+  private List<List<FileMetadata>> generatePartitionGroups(List<FileMetadata> filteredFiles) {
+    Map<String, List<FileMetadata>> map = new HashMap<>();
+    for (FileMetadata fileMetadata : filteredFiles) {
+      String groupId = extractGroupIdentifier(fileMetadata.getPath());
+      map.putIfAbsent(groupId, new ArrayList<>());
+      map.get(groupId).add(fileMetadata);
+    }
+    List<List<FileMetadata>> ret = new ArrayList<>();
+    // sort the map to guarantee consistent ordering
+    List<String> sortedKeys = new ArrayList<>(map.keySet());
+    sortedKeys.sort(Comparator.<String>naturalOrder());
+    sortedKeys.stream().forEach(key -> ret.add(map.get(key)));
+    return ret;
+  }
+
+   /*
+    * This method assumes that the directory remains immutable.
+    * If the directory does change:
+    * new files showing up in the directory are ignored, based on the old version of the partition descriptor;
+    * a {@link org.apache.samza.SamzaException} is thrown if at least one old file no longer exists.
+    */
+  private List<FileMetadata> validateAndGetOriginalFilteredFiles(List<FileMetadata> newFileList,
+    Map<Partition, List<String>> existingPartitionDescriptor) {
+    assert newFileList != null;
+    assert existingPartitionDescriptor != null;
+    Set<String> oldFileSet = new HashSet<>();
+    existingPartitionDescriptor.values().forEach(oldFileSet::addAll);
+    Set<String> newFileSet = new HashSet<>();
+    newFileList.forEach(file -> newFileSet.add(file.getPath()));
+    if (!newFileSet.containsAll(oldFileSet)) {
+      // Set.removeAll returns a boolean, so compute the set of missing files before building the message.
+      oldFileSet.removeAll(newFileSet);
+      throw new SamzaException("The list of new files is not a superset of the old files. Missing files: " + oldFileSet);
+    }
+    Iterator<FileMetadata> iterator = newFileList.iterator();
+    while (iterator.hasNext()) {
+      FileMetadata file = iterator.next();
+      if (!oldFileSet.contains(file.getPath())) {
+        iterator.remove();
+      }
+    }
+    return newFileList;
+  }
+
+  /**
+   * Get partition metadata for a stream
+   * @param streamName name of the stream; should contain the information about the path of the
+   *                   root directory
+   * @param existingPartitionDescriptorMap map of the existing partition descriptor
+   * @return map of SSP metadata
+   */
+  public Map<Partition, SystemStreamPartitionMetadata> getPartitionMetadataMap(String streamName,
+    @Nullable Map<Partition, List<String>> existingPartitionDescriptorMap) {
+    LOG.info("Trying to obtain metadata for " + streamName);
+    LOG.info("Existing partition descriptor: " + (existingPartitionDescriptorMap == null ? "empty"
+      : existingPartitionDescriptorMap));
+    Map<Partition, SystemStreamPartitionMetadata> partitionMetadataMap = new HashMap<>();
+    partitionDescriptorMap.putIfAbsent(streamName, new HashMap<>());
+    List<FileMetadata> filteredFiles = getFilteredFiles(streamName);
+    if (existingPartitionDescriptorMap != null) {
+      filteredFiles = validateAndGetOriginalFilteredFiles(filteredFiles, existingPartitionDescriptorMap);
+    }
+    List<List<FileMetadata>> groupedPartitions = generatePartitionGroups(filteredFiles);
+    int partitionId = 0;
+    for (List<FileMetadata> fileGroup : groupedPartitions) {
+      Partition partition = new Partition(partitionId);
+      List<String> pathList = new ArrayList<>();
+      List<String> lengthList = new ArrayList<>();
+      fileGroup.forEach(fileMetadata -> {
+        pathList.add(fileMetadata.getPath());
+        lengthList.add(String.valueOf(fileMetadata.getLen()));
+      });
+      String oldestOffset = MultiFileHdfsReader.generateOffset(0, "0");
+      String newestOffset = MultiFileHdfsReader.generateOffset(lengthList.size() - 1, String.valueOf(lengthList.get(lengthList.size() - 1)));
+      SystemStreamPartitionMetadata metadata =
+        new SystemStreamPartitionMetadata(oldestOffset, newestOffset, null);
+      partitionMetadataMap.put(partition, metadata);
+      partitionDescriptorMap.get(streamName).put(partition, pathList);
+      partitionId++;
+    }
+    LOG.info("Obtained metadata map as: " + partitionMetadataMap);
+    LOG.info("Computed partition description as: " + partitionDescriptorMap);
+    return partitionMetadataMap;
+  }
+
+  /**
+   * Get partition descriptors for a stream
+   * @param streamName name of the stream; should contain the information about the path of the
+   *                   root directory
+   * @return map of the partition descriptor
+   */
+  public Map<Partition, List<String>> getPartitionDescriptor(String streamName) {
+    return partitionDescriptorMap.get(streamName);
+  }
+}
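
For illustration, a hedged sketch of how the partitioner groups files; the white list, black list, group pattern and stream path are all hypothetical values:

    // Sketch: group Avro part files that share the same "[id]" token into one partition.
    DirectoryPartitioner partitioner = new DirectoryPartitioner(
        ".*", "", ".*part-[id]\\.avro", new HdfsFileSystemAdapter());
    Map<Partition, SystemStreamPartitionMetadata> metadata =
        partitioner.getPartitionMetadataMap("hdfs://user/samzauser/1", null);
    // After the metadata call, the descriptor map records which files back each partition.
    Map<Partition, List<String>> descriptors =
        partitioner.getPartitionDescriptor("hdfs://user/samzauser/1");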

http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/FileSystemAdapter.java
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/FileSystemAdapter.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/FileSystemAdapter.java
new file mode 100644
index 0000000..5fec4bf
--- /dev/null
+++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/FileSystemAdapter.java
@@ -0,0 +1,60 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.hdfs.partitioner;
+
+import java.util.List;
+
+
+/**
+ * An adapter between directory partitioner and the actual file systems or
+ * file system like systems.
+ */
+public interface FileSystemAdapter {
+
+  /**
+   * Return the list of all files given the stream name
+   * @param streamName name of the stream
+   * @return list of <code>FileMetadata</code> for all files associated with the given stream
+   */
+  public List<FileMetadata> getAllFiles(String streamName);
+
+  public class FileMetadata {
+    private String path;
+    private long length;
+
+    public FileMetadata(String path, long length) {
+      this.path = path;
+      this.length = length;
+    }
+
+    public String getPath() {
+      return path;
+    }
+
+    public long getLen() {
+      return length;
+    }
+
+    @Override
+    public String toString() {
+      return String.format("[path = %s, length = %s]", path, length);
+    }
+  }
+}
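
For illustration, a hedged sketch of a custom adapter; the class below is hypothetical and mainly useful for exercising the directory partitioner without a real file system:

    // Sketch: an adapter backed by a fixed, in-memory file listing.
    public class StaticFileSystemAdapter implements FileSystemAdapter {
      private final List<FileMetadata> files;

      public StaticFileSystemAdapter(List<FileMetadata> files) {
        this.files = files;
      }

      @Override
      public List<FileMetadata> getAllFiles(String streamName) {
        // Ignore the stream name and always return the fixed listing.
        return files;
      }
    }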

http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/HdfsFileSystemAdapter.java
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/HdfsFileSystemAdapter.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/HdfsFileSystemAdapter.java
new file mode 100644
index 0000000..bb7b3fa
--- /dev/null
+++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/HdfsFileSystemAdapter.java
@@ -0,0 +1,55 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.hdfs.partitioner;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class HdfsFileSystemAdapter implements FileSystemAdapter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(HdfsFileSystemAdapter.class);
+
+  public List<FileMetadata> getAllFiles(String streamName) {
+    List<FileMetadata> ret = new ArrayList<>();
+    try {
+      Path streamPath = new Path(streamName);
+      FileSystem fileSystem = streamPath.getFileSystem(new Configuration());
+      FileStatus[] fileStatuses = fileSystem.listStatus(streamPath);
+      for (FileStatus fileStatus : fileStatuses) {
+        ret.add(new FileMetadata(fileStatus.getPath().toString(), fileStatus.getLen()));
+      }
+    } catch (IOException e) {
+      LOG.error("Failed to get the list of files for " + streamName, e);
+      throw new SamzaException(e);
+    }
+    return ret;
+  }
+}
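
As a rough usage sketch (not part of this patch), the directory partitioner hands the adapter a stream name, which for this consumer is just an HDFS directory path, and gets back the files with their sizes. The path and the wrapper class below are made up for illustration:

    import java.util.List;

    import org.apache.samza.system.hdfs.partitioner.FileSystemAdapter;
    import org.apache.samza.system.hdfs.partitioner.FileSystemAdapter.FileMetadata;
    import org.apache.samza.system.hdfs.partitioner.HdfsFileSystemAdapter;

    public class FileSystemAdapterExample {
      public static void main(String[] args) {
        // Hypothetical stream name: the HDFS directory holding the files for this stream.
        String streamName = "hdfs://namenode:8020/data/events/2016-10-05";
        FileSystemAdapter adapter = new HdfsFileSystemAdapter();
        List<FileMetadata> files = adapter.getAllFiles(streamName);
        for (FileMetadata file : files) {
          System.out.println(file); // prints "[path = ..., length = ...]"
        }
      }
    }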

http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/AvroFileHdfsReader.java
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/AvroFileHdfsReader.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/AvroFileHdfsReader.java
new file mode 100644
index 0000000..757b40b
--- /dev/null
+++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/AvroFileHdfsReader.java
@@ -0,0 +1,216 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.hdfs.reader;
+
+import java.io.IOException;
+
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.AvroFSInput;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.samza.SamzaException;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * An implementation of the HdfsReader that reads and processes avro format
+ * files.
+ */
+public class AvroFileHdfsReader implements SingleFileHdfsReader {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AvroFileHdfsReader.class);
+
+  private final SystemStreamPartition systemStreamPartition;
+  private DataFileReader<GenericRecord> fileReader;
+  private long curBlockStart;
+  private long curRecordOffset;
+
+  public AvroFileHdfsReader(SystemStreamPartition systemStreamPartition) {
+    this.systemStreamPartition = systemStreamPartition;
+    this.fileReader = null;
+  }
+
+  @Override
+  public void open(String pathStr, String singleFileOffset) {
+    LOG.info(String.format("%s: Open file [%s] with file offset [%s] for read", systemStreamPartition, pathStr, singleFileOffset));
+    Path path = new Path(pathStr);
+    try {
+      AvroFSInput input = new AvroFSInput(FileContext.getFileContext(path.toUri()), path);
+      fileReader = new DataFileReader<>(input, new GenericDatumReader<>());
+      seek(singleFileOffset);
+    } catch (IOException e) {
+      throw new SamzaException(e);
+    }
+  }
+
+  @Override
+  public void seek(String singleFileOffset) {
+    try {
+      // See comments for AvroFileCheckpoint to understand the behavior below
+      AvroFileCheckpoint checkpoint = new AvroFileCheckpoint(singleFileOffset);
+      if (checkpoint.isStartingOffset()) {
+        // seek to the beginning of the first block
+        fileReader.sync(0);
+        curBlockStart = fileReader.previousSync();
+        curRecordOffset = 0;
+        return;
+      }
+      fileReader.seek(checkpoint.getBlockStart());
+      for (int i = 0; i < checkpoint.getRecordOffset(); i++) {
+        if (fileReader.hasNext()) {
+          fileReader.next();
+        }
+      }
+      curBlockStart = checkpoint.getBlockStart();
+      curRecordOffset = checkpoint.getRecordOffset();
+    } catch (IOException e) {
+      throw new SamzaException(e);
+    }
+  }
+
+  @Override
+  public IncomingMessageEnvelope readNext() {
+    // get checkpoint for THIS record
+    String checkpoint = nextOffset();
+    GenericRecord record = fileReader.next();
+    if (fileReader.previousSync() != curBlockStart) {
+      curBlockStart = fileReader.previousSync();
+      curRecordOffset = 0;
+    } else {
+      curRecordOffset++;
+    }
+    // an avro schema doesn't necessarily have a key field
+    return new IncomingMessageEnvelope(systemStreamPartition, checkpoint, null, record);
+  }
+
+  @Override
+  public boolean hasNext() {
+    return fileReader.hasNext();
+  }
+
+  @Override
+  public void close() {
+    LOG.info("About to close file reader for " + systemStreamPartition);
+    try {
+      fileReader.close();
+    } catch (IOException e) {
+      throw new SamzaException(e);
+    }
+    LOG.info("File reader closed for " + systemStreamPartition);
+  }
+
+  @Override
+  public String nextOffset() {
+    return AvroFileCheckpoint.generateCheckpointStr(curBlockStart, curRecordOffset);
+  }
+
+  public static int offsetComparator(String offset1, String offset2) {
+    AvroFileCheckpoint cp1 = new AvroFileCheckpoint(offset1);
+    AvroFileCheckpoint cp2 = new AvroFileCheckpoint(offset2);
+    return cp1.compareTo(cp2);
+  }
+
+  /**
+   * An avro file looks something like this:
+   *
+   * Byte offset: 0       103            271         391
+   *              ┌────────┬──────────────┬───────────┬───────────┐
+   * Avro file:   │ Header │    Block 1   │  Block 2  │  Block 3  │ ...
+   *              └────────┴──────────────┴───────────┴───────────┘
+   *
+   * Each block contains multiple records. The start of a block is defined as a valid
+   * synchronization point. A file reader can only seek to a synchronization point, i.e.
+   * the start of blocks. Thus, to precisely describe the location of a record, we need
+   * to use the pair (blockStart, recordOffset). Here "blockStart" means the start of the
+   * block and "recordOffset" means the index of the record within the block.
+   * Taking the example above, and supposing block 1 has 4 records, the record sequence is:
+   * (103, 0), (103, 1), (103, 2), (103, 3), (271, 0), ...
+   * where (271, 0) represents the first record in block 2.
+   *
+   * With the CP_DELIM being '@', the actual checkpoint string would look like "103@1",
+   * "271@0" or "271", etc. For convenience, a checkpoint with only the blockStart but no
+   * recordOffset within the block simply means the first record in that block. Thus,
+   * "271@0" is equal to "271".
+   */
+  public static class AvroFileCheckpoint {
+    private static final String CP_DELIM = "@";
+    private long blockStart; // start position of the block
+    private long recordOffset; // record offset within the block
+    String checkpointStr;
+
+    public static String generateCheckpointStr(long blockStart, long recordOffset) {
+      return blockStart + CP_DELIM + recordOffset;
+    }
+
+    public AvroFileCheckpoint(String checkpointStr) {
+      String[] elements = checkpointStr.replaceAll("\\s", "").split(CP_DELIM);
+      if (elements.length > 2 || elements.length < 1) {
+        throw new SamzaException("Invalid checkpoint for AvroFileHdfsReader: " + checkpointStr);
+      }
+      try {
+        blockStart = Long.parseLong(elements[0]);
+        recordOffset = elements.length == 2 ? Long.parseLong(elements[1]) : 0;
+      } catch (NumberFormatException e) {
+        throw new SamzaException("Invalid checkpoint for AvroFileHdfsReader: " + checkpointStr, e);
+      }
+      this.checkpointStr = checkpointStr;
+    }
+
+    public AvroFileCheckpoint(long blockStart, long recordOffset) {
+      this.blockStart = blockStart;
+      this.recordOffset = recordOffset;
+      this.checkpointStr = generateCheckpointStr(blockStart, recordOffset);
+    }
+
+    public long getBlockStart() {
+      return blockStart;
+    }
+
+    public long getRecordOffset() {
+      return recordOffset;
+    }
+
+    public String getCheckpointStr() {
+      return checkpointStr;
+    }
+
+    public boolean isStartingOffset() {
+      return blockStart == 0;
+    }
+
+    public int compareTo(AvroFileCheckpoint other) {
+      if (this.blockStart < other.blockStart) {
+        return -1;
+      } else if (this.blockStart > other.blockStart) {
+        return 1;
+      } else return Long.compare(this.recordOffset, other.recordOffset);
+    }
+
+    @Override
+    public String toString() {
+      return getCheckpointStr();
+    }
+  }
+}
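
To make the "blockStart@recordOffset" checkpoint format above concrete, a hedged usage sketch follows (not part of the patch); the system name, stream and file path are placeholder values:

    import org.apache.samza.Partition;
    import org.apache.samza.system.IncomingMessageEnvelope;
    import org.apache.samza.system.SystemStreamPartition;
    import org.apache.samza.system.hdfs.reader.AvroFileHdfsReader;

    public class AvroReaderExample {
      public static void main(String[] args) {
        // Placeholder system/stream/partition and file path.
        SystemStreamPartition ssp =
            new SystemStreamPartition("hdfs", "hdfs://namenode:8020/data/events", new Partition(0));
        AvroFileHdfsReader reader = new AvroFileHdfsReader(ssp);
        // "0" means: start from the first record of the first block.
        reader.open("hdfs://namenode:8020/data/events/part-00000.avro", "0");
        while (reader.hasNext()) {
          IncomingMessageEnvelope envelope = reader.readNext();
          // The offset is a "blockStart@recordOffset" checkpoint string, e.g. "103@2".
          System.out.println(envelope.getOffset() + " -> " + envelope.getMessage());
        }
        reader.close();
        // Checkpoints order first by block start, then by record offset within the block.
        System.out.println(AvroFileHdfsReader.offsetComparator("103@3", "271@0")); // negative
      }
    }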

http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/HdfsReaderFactory.java
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/HdfsReaderFactory.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/HdfsReaderFactory.java
new file mode 100644
index 0000000..4efdfd7
--- /dev/null
+++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/HdfsReaderFactory.java
@@ -0,0 +1,59 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.hdfs.reader;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.system.SystemStreamPartition;
+
+
+public class HdfsReaderFactory {
+  public static SingleFileHdfsReader getHdfsReader(ReaderType readerType, SystemStreamPartition systemStreamPartition) {
+    switch (readerType) {
+      case AVRO: return new AvroFileHdfsReader(systemStreamPartition);
+      default:
+        throw new SamzaException("Unsupported reader type: " + readerType);
+    }
+  }
+
+  public static ReaderType getType(String readerTypeStr) {
+    try {
+      return ReaderType.valueOf(readerTypeStr.toUpperCase());
+    } catch (IllegalArgumentException e) {
+      throw new SamzaException("Invalid hdfs reader type string: " + readerTypeStr, e);
+    }
+  }
+
+  public static int offsetComparator(ReaderType readerType, String offset1, String offset2) {
+    switch (readerType) {
+      case AVRO: return AvroFileHdfsReader.offsetComparator(offset1, offset2);
+      default:
+        throw new SamzaException("Unsupported reader type: " + readerType);
+    }
+  }
+
+  /*
+   * Only AVRO is supported so far. Implement <code>SingleFileHdfsReader</code> to support a variety
+   * of file formats. Plain text (each line of the text representing a record, for
+   * example) could easily be supported in the future.
+   */
+  public enum ReaderType {
+    AVRO
+  }
+}
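
A small, hypothetical snippet showing how the factory resolves a reader type string (for example the "avro" default from HdfsConfig) into a concrete reader; the system and stream names are placeholders:

    import org.apache.samza.Partition;
    import org.apache.samza.system.SystemStreamPartition;
    import org.apache.samza.system.hdfs.reader.HdfsReaderFactory;
    import org.apache.samza.system.hdfs.reader.SingleFileHdfsReader;

    public class ReaderFactoryExample {
      public static void main(String[] args) {
        // "avro" mirrors FILE_READER_TYPE_DEFAULT in HdfsConfig; getType upper-cases the string.
        HdfsReaderFactory.ReaderType type = HdfsReaderFactory.getType("avro");
        SystemStreamPartition ssp =
            new SystemStreamPartition("hdfs", "example-stream", new Partition(0));
        SingleFileHdfsReader reader = HdfsReaderFactory.getHdfsReader(type, ssp);
      }
    }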

http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/MultiFileHdfsReader.java
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/MultiFileHdfsReader.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/MultiFileHdfsReader.java
new file mode 100644
index 0000000..7870713
--- /dev/null
+++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/MultiFileHdfsReader.java
@@ -0,0 +1,204 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.hdfs.reader;
+
+import java.util.List;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.system.hdfs.HdfsConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A wrapper on top of {@link org.apache.samza.system.hdfs.reader.SingleFileHdfsReader}
+ * to handle the case of multiple files per partition.
+ *
+ * The offset for MultiFileHdfsReader, which is also the offset that gets
+ * committed and used by Samza, consists of two parts: the file index and
+ * the offset within that file. For example, "3:127".
+ *
+ * The format of the offset within a file is defined by the implementation of
+ * {@link org.apache.samza.system.hdfs.reader.SingleFileHdfsReader} itself.
+ */
+public class MultiFileHdfsReader {
+  private static final Logger LOG = LoggerFactory.getLogger(MultiFileHdfsReader.class);
+  private static final String DELIMITER = ":";
+
+  private final HdfsReaderFactory.ReaderType readerType;
+  private final SystemStreamPartition systemStreamPartition;
+  private List<String> filePaths;
+  private SingleFileHdfsReader curReader;
+  private int curFileIndex = 0;
+  private String curSingleFileOffset;
+  private int numRetries;
+  private int numMaxRetries;
+
+  /**
+   * Get the current file index from the offset string
+   * @param offset offset string that contains both file index and offset within file
+   * @return the file index part
+   */
+  public static int getCurFileIndex(String offset) {
+    String[] elements = offset.split(DELIMITER);
+    if (elements.length < 2) {
+      throw new SamzaException("Invalid offset for MultiFileHdfsReader: " + offset);
+    }
+    return Integer.parseInt(elements[0]);
+  }
+
+  /**
+   * Get the offset within file from the offset string
+   * @param offset offset string that contains both file index and offset within file
+   * @return the single file offset part
+   */
+  public static String getCurSingleFileOffset(String offset) {
+    String[] elements = offset.split(DELIMITER);
+    if (elements.length < 2) {
+      throw new SamzaException("Invalid offset for MultiFileHdfsReader: " + offset);
+    }
+    // Return the remainder of the offset string, in case the single-file
+    // offset itself uses the same delimiter.
+    return offset.substring(elements[0].length() + 1);
+  }
+
+  /**
+   * Generate the offset based on file index and offset within single file
+   * @param fileIndex index of the file
+   * @param singleFileOffset offset within single file
+   * @return the complete offset
+   */
+  public static String generateOffset(int fileIndex, String singleFileOffset) {
+    return fileIndex + DELIMITER + singleFileOffset;
+  }
+
+  /*
+   * Get the current offset: the offset of the LAST message successfully read. If no messages have
+   * ever been read, return the offset of the first event.
+   */
+  private String getCurOffset() {
+    return generateOffset(curFileIndex, curSingleFileOffset);
+  }
+
+  public MultiFileHdfsReader(HdfsReaderFactory.ReaderType readerType, SystemStreamPartition systemStreamPartition,
+    List<String> partitionDescriptors, String offset) {
+    this(readerType, systemStreamPartition, partitionDescriptors, offset,
+      Integer.parseInt(HdfsConfig.CONSUMER_NUM_MAX_RETRIES_DEFAULT()));
+  }
+
+  private void init(String offset) {
+    if (curReader != null) {
+      curReader.close();
+      curReader = null;
+    }
+    curFileIndex = getCurFileIndex(offset);
+    if (curFileIndex >= filePaths.size()) {
+      throw new SamzaException(
+        String.format("Invalid file index %d. Number of files is %d", curFileIndex, filePaths.size()));
+    }
+    curSingleFileOffset = getCurSingleFileOffset(offset);
+    curReader = HdfsReaderFactory.getHdfsReader(readerType, systemStreamPartition);
+    curReader.open(filePaths.get(curFileIndex), curSingleFileOffset);
+  }
+
+  public MultiFileHdfsReader(HdfsReaderFactory.ReaderType readerType, SystemStreamPartition systemStreamPartition,
+    List<String> partitionDescriptors, String offset, int numMaxRetries) {
+    this.readerType = readerType;
+    this.systemStreamPartition = systemStreamPartition;
+    this.filePaths = partitionDescriptors;
+    this.numMaxRetries = numMaxRetries;
+    this.numRetries = 0;
+    if (partitionDescriptors.size() <= 0) {
+      throw new SamzaException(
+        "Invalid number of files based on partition descriptors: " + partitionDescriptors.size());
+    }
+    init(offset);
+  }
+
+  public boolean hasNext() {
+    while (curFileIndex < filePaths.size()) {
+      if (curReader.hasNext()) {
+        return true;
+      }
+      curReader.close();
+      curFileIndex++;
+      if (curFileIndex < filePaths.size()) {
+        curReader = HdfsReaderFactory.getHdfsReader(readerType, systemStreamPartition);
+        curReader.open(filePaths.get(curFileIndex), "0");
+      }
+    }
+    return false;
+  }
+
+  public IncomingMessageEnvelope readNext() {
+    if (!hasNext()) {
+      LOG.warn("Attempting to read more data when there aren't any. ssp=" + systemStreamPartition);
+      return null;
+    }
+    // record the next offset before we read, so that if the read fails and we reconnect,
+    // we seek back to the same offset that we attempt to read below
+    curSingleFileOffset = curReader.nextOffset();
+    IncomingMessageEnvelope messageEnvelope = curReader.readNext();
+    // Copy everything except for the offset. Turn the single-file style offset into a multi-file one
+    return new IncomingMessageEnvelope(messageEnvelope.getSystemStreamPartition(), getCurOffset(),
+      messageEnvelope.getKey(), messageEnvelope.getMessage(), messageEnvelope.getSize());
+  }
+
+  /**
+   * Reconnect to the file systems in case of failure.
+   * Reset offset to the last checkpoint (last successfully read message).
+   * Throws {@link org.apache.samza.SamzaException} if the maximum number of
+   * retries is reached.
+   */
+  public void reconnect() {
+    reconnect(getCurOffset());
+  }
+
+  /**
+   * Reconnect to the file systems in case of failures.
+   * @param offset the offset to reset the reader to
+   * Throws {@link org.apache.samza.SamzaException} if the maximum number of
+   * retries is reached.
+   */
+  public void reconnect(String offset) {
+    if (numRetries >= numMaxRetries) {
+      throw new SamzaException(
+        String.format("Give up reconnecting. numRetries: %d; numMaxRetries: %d", numRetries, numMaxRetries));
+    }
+    LOG.info(String
+      .format("Reconnecting with offset: %s numRetries: %d numMaxRetries: %d", offset, numRetries, numMaxRetries));
+    numRetries++;
+    init(offset);
+  }
+
+  public void close() {
+    LOG.info(String.format("MiltiFileHdfsReader shutdown requested for %s. Current offset = %s", systemStreamPartition,
+      getCurOffset()));
+    if (curReader != null) {
+      curReader.close();
+    }
+  }
+
+  public SystemStreamPartition getSystemStreamPartition() {
+    return systemStreamPartition;
+  }
+}
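
A tiny illustrative sketch (not part of the patch) of the two-part offset handled by the static helpers above; the offset values are made up:

    import org.apache.samza.system.hdfs.reader.MultiFileHdfsReader;

    public class MultiFileOffsetExample {
      public static void main(String[] args) {
        // A MultiFileHdfsReader offset is "<fileIndex>:<singleFileOffset>". With the avro reader
        // the single-file part is itself "blockStart@recordOffset", giving e.g. "3:271@4".
        String offset = MultiFileHdfsReader.generateOffset(3, "271@4");
        int fileIndex = MultiFileHdfsReader.getCurFileIndex(offset);            // 3
        String fileOffset = MultiFileHdfsReader.getCurSingleFileOffset(offset); // "271@4"
        System.out.println(offset + " -> file " + fileIndex + ", file offset " + fileOffset);
      }
    }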

http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/SingleFileHdfsReader.java
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/SingleFileHdfsReader.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/SingleFileHdfsReader.java
new file mode 100644
index 0000000..eb8a70d
--- /dev/null
+++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/SingleFileHdfsReader.java
@@ -0,0 +1,62 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.hdfs.reader;
+
+import org.apache.samza.system.IncomingMessageEnvelope;
+
+
+public interface SingleFileHdfsReader {
+  /**
+   * Open the file and seek to specific offset for reading.
+   * @param path path of the file to be read
+   * @param offset offset the reader should start from
+   */
+  public void open(String path, String offset);
+
+  /**
+   * Seek to a specific offset
+   * @param offset offset the reader should seek to
+   */
+  public void seek(String offset);
+
+  /**
+   * Construct and return the next message envelope
+   * @return the constructed IncomingMessageEnvelope
+   */
+  public IncomingMessageEnvelope readNext();
+
+  /**
+   * Get the next offset, which is the offset for the next message
+   * that will be returned by readNext
+   * @return next offset
+   */
+  public String nextOffset();
+
+  /**
+   * Whether there are still records to be read
+   * @return true or false based on whether the reader has reached the end of the file
+   */
+  public boolean hasNext();
+
+  /**
+   * Close the reader.
+   */
+  public void close();
+}
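
The HdfsReaderFactory earlier in this patch notes that plain text could be added by implementing this interface. Purely as a hypothetical sketch under that assumption (this class does not exist in the patch), a line-oriented reader could use the line number as its offset:

    // Hypothetical PlainTextHdfsReader: NOT part of this patch, only a sketch of how the
    // interface might be implemented for line-oriented text, with the line number as offset.
    package org.apache.samza.system.hdfs.reader;

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.nio.charset.StandardCharsets;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.samza.SamzaException;
    import org.apache.samza.system.IncomingMessageEnvelope;
    import org.apache.samza.system.SystemStreamPartition;

    public class PlainTextHdfsReader implements SingleFileHdfsReader {
      private final SystemStreamPartition ssp;
      private BufferedReader reader;
      private String bufferedLine; // look-ahead buffer holding the next unread line
      private long nextLineNumber; // offset of the line currently held in bufferedLine

      public PlainTextHdfsReader(SystemStreamPartition ssp) {
        this.ssp = ssp;
      }

      @Override
      public void open(String path, String offset) {
        try {
          Path p = new Path(path);
          FileSystem fs = p.getFileSystem(new Configuration());
          reader = new BufferedReader(new InputStreamReader(fs.open(p), StandardCharsets.UTF_8));
          nextLineNumber = 0;
          bufferedLine = reader.readLine();
          seek(offset);
        } catch (IOException e) {
          throw new SamzaException(e);
        }
      }

      @Override
      public void seek(String offset) {
        try {
          long target = Long.parseLong(offset);
          // skip forward until the look-ahead buffer holds the target line
          while (nextLineNumber < target && bufferedLine != null) {
            bufferedLine = reader.readLine();
            nextLineNumber++;
          }
        } catch (IOException e) {
          throw new SamzaException(e);
        }
      }

      @Override
      public IncomingMessageEnvelope readNext() {
        String offset = nextOffset(); // offset of THIS record, mirroring the avro reader
        String line = bufferedLine;
        try {
          bufferedLine = reader.readLine();
        } catch (IOException e) {
          throw new SamzaException(e);
        }
        nextLineNumber++;
        // plain text has no key, so the key is null
        return new IncomingMessageEnvelope(ssp, offset, null, line);
      }

      @Override
      public String nextOffset() {
        return String.valueOf(nextLineNumber);
      }

      @Override
      public boolean hasNext() {
        return bufferedLine != null;
      }

      @Override
      public void close() {
        try {
          reader.close();
        } catch (IOException e) {
          throw new SamzaException(e);
        }
      }
    }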

http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala
index 61b7570..53ff372 100644
--- a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala
+++ b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala
@@ -25,7 +25,7 @@ import java.util.UUID
 
 import org.apache.samza.SamzaException
 import org.apache.samza.config.SystemConfig.Config2System
-import org.apache.samza.config.{Config, ScalaMapConfig}
+import org.apache.samza.config.{YarnConfig, Config, ScalaMapConfig}
 import org.apache.samza.util.{Logging, Util}
 
 import scala.collection.JavaConversions._
@@ -62,6 +62,34 @@ object HdfsConfig {
   val BUCKETER_CLASS = "systems.%s.producer.hdfs.bucketer.class"
   val BUCKETER_CLASS_DEFAULT = "org.apache.samza.system.hdfs.writer.JobNameDateTimeBucketer"
 
+  // capacity of the hdfs consumer buffer - the blocking queue used for storing messages
+  val CONSUMER_BUFFER_CAPACITY = "systems.%s.consumer.bufferCapacity"
+  val CONSUMER_BUFFER_CAPACITY_DEFAULT = 10.toString
+
+  // maximum number of retries per partition for the hdfs consumer readers
+  val CONSUMER_NUM_MAX_RETRIES = "systems.%s.consumer.numMaxRetries"
+  val CONSUMER_NUM_MAX_RETRIES_DEFAULT = 10.toString
+
+  // white list used by the directory partitioner to filter out unwanted files in an HDFS directory
+  val CONSUMER_PARTITIONER_WHITELIST = "systems.%s.partitioner.defaultPartitioner.whitelist"
+  val CONSUMER_PARTITIONER_WHITELIST_DEFAULT = ".*"
+
+  // black list used by the directory partitioner to filter out unwanted files in an HDFS directory
+  val CONSUMER_PARTITIONER_BLACKLIST = "systems.%s.partitioner.defaultPartitioner.blacklist"
+  val CONSUMER_PARTITIONER_BLACKLIST_DEFAULT = ""
+
+  // group pattern used by directory partitioner for advanced partitioning
+  val CONSUMER_PARTITIONER_GROUP_PATTERN = "systems.%s.partitioner.defaultPartitioner.groupPattern"
+  val CONSUMER_PARTITIONER_GROUP_PATTERN_DEFAULT = ""
+
+  // type of the file reader (avro, plain, etc.)
+  val FILE_READER_TYPE = "systems.%s.consumer.reader"
+  val FILE_READER_TYPE_DEFAULT = "avro"
+
+  // staging directory for storing partition descriptions
+  val STAGING_DIRECTORY = "systems.%s.stagingDirectory"
+  val STAGING_DIRECTORY_DEFAULT = ""
+
   implicit def Hdfs2Kafka(config: Config) = new HdfsConfig(config)
 
 }
@@ -130,4 +158,53 @@ class HdfsConfig(config: Config) extends ScalaMapConfig(config) {
     getOrElse(HdfsConfig.COMPRESSION_TYPE format systemName, HdfsConfig.COMPRESSION_TYPE_DEFAULT)
   }
 
+  /**
+   * Get the capacity of the hdfs consumer buffer - the blocking queue used for storing messages
+   */
+  def getConsumerBufferCapacity(systemName: String): Int = {
+    getOrElse(HdfsConfig.CONSUMER_BUFFER_CAPACITY format systemName, HdfsConfig.CONSUMER_BUFFER_CAPACITY_DEFAULT).toInt
+  }
+
+  /**
+   * Get the maximum number of retries per partition for the hdfs consumer readers
+   */
+  def getConsumerNumMaxRetries(systemName: String): Int = {
+    getOrElse(HdfsConfig.CONSUMER_NUM_MAX_RETRIES format systemName, HdfsConfig.CONSUMER_NUM_MAX_RETRIES_DEFAULT).toInt
+  }
+
+  /**
+   * White list used by the directory partitioner to filter out unwanted files in an HDFS directory
+   */
+  def getPartitionerWhiteList(systemName: String): String = {
+    getOrElse(HdfsConfig.CONSUMER_PARTITIONER_WHITELIST format systemName, HdfsConfig.CONSUMER_PARTITIONER_WHITELIST_DEFAULT)
+  }
+
+  /**
+   * Black list used by the directory partitioner to filter out unwanted files in an HDFS directory
+   */
+  def getPartitionerBlackList(systemName: String): String = {
+    getOrElse(HdfsConfig.CONSUMER_PARTITIONER_BLACKLIST format systemName, HdfsConfig.CONSUMER_PARTITIONER_BLACKLIST_DEFAULT)
+  }
+
+  /**
+   * Group pattern used by directory partitioner for advanced partitioning
+   */
+  def getPartitionerGroupPattern(systemName: String): String = {
+    getOrElse(HdfsConfig.CONSUMER_PARTITIONER_GROUP_PATTERN format systemName, HdfsConfig.CONSUMER_PARTITIONER_GROUP_PATTERN_DEFAULT)
+  }
+
+  /**
+   * Get the type of the file reader (avro, plain, etc.)
+   */
+  def getFileReaderType(systemName: String): String = {
+    getOrElse(HdfsConfig.FILE_READER_TYPE format systemName, HdfsConfig.FILE_READER_TYPE_DEFAULT)
+  }
+
+  /**
+   * Staging directory for storing partition descriptions. If not set, the staging directory set
+   * by the yarn job will be used.
+   */
+  def getStagingDirectory(): String = {
+    getOrElse(HdfsConfig.STAGING_DIRECTORY, getOrElse(YarnConfig.YARN_JOB_STAGING_DIRECTORY, HdfsConfig.STAGING_DIRECTORY_DEFAULT))
+  }
 }
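
For reference, a hypothetical job configuration wiring the new consumer settings together could look like the snippet below; the system name "hdfs-clickstream" and all values are illustrative, and any unspecified key falls back to the defaults above:

    # Register the HDFS system (see HdfsSystemFactory later in this patch).
    systems.hdfs-clickstream.samza.factory=org.apache.samza.system.hdfs.HdfsSystemFactory

    # Reader type; "avro" is the default (FILE_READER_TYPE_DEFAULT).
    systems.hdfs-clickstream.consumer.reader=avro

    # Capacity of the per-partition blocking queue (default 10).
    systems.hdfs-clickstream.consumer.bufferCapacity=10

    # Directory partitioner filters: only pick up avro files, skip anything temporary.
    systems.hdfs-clickstream.partitioner.defaultPartitioner.whitelist=.*avro
    systems.hdfs-clickstream.partitioner.defaultPartitioner.blacklist=.*tmp.*

A staging directory for partition descriptions can also be set explicitly; otherwise the yarn job staging directory (YarnConfig.YARN_JOB_STAGING_DIRECTORY) is used, per getStagingDirectory above.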

http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemAdmin.scala b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemAdmin.scala
deleted file mode 100644
index 92eb447..0000000
--- a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemAdmin.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.system.hdfs
-
-
-import org.apache.samza.util.Logging
-import org.apache.samza.system.{SystemAdmin, SystemStreamMetadata, SystemStreamPartition}
-
-
-class HdfsSystemAdmin extends SystemAdmin with Logging {
-
-  def getOffsetsAfter(offsets: java.util.Map[SystemStreamPartition, String]) = {
-    new java.util.HashMap[SystemStreamPartition, String]()
-  }
-
-  def getSystemStreamMetadata(streamNames: java.util.Set[String]) = {
-    new java.util.HashMap[String, SystemStreamMetadata]()
-  }
-
-  def createChangelogStream(topicName: String, numOfChangeLogPartitions: Int) = {
-    throw new UnsupportedOperationException("Method not implemented.")
-  }
-
-  def validateChangelogStream(topicName: String, numOfChangeLogPartitions: Int) = {
-    throw new UnsupportedOperationException("Method not implemented.")
-  }
-
-  def createCoordinatorStream(streamName: String) {
-    throw new UnsupportedOperationException("Method not implemented.")
-  }
-
-  def offsetComparator(offset1: String, offset2: String) = {
-    throw new UnsupportedOperationException("Method not implemented.")
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemFactory.scala b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemFactory.scala
index ef3c20a..3673431 100644
--- a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemFactory.scala
+++ b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemFactory.scala
@@ -1,45 +1,45 @@
- /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
 
 package org.apache.samza.system.hdfs
 
 
-import org.apache.samza.SamzaException
-
 import org.apache.samza.config.Config
 import org.apache.samza.metrics.MetricsRegistry
 import org.apache.samza.system.SystemFactory
-import org.apache.samza.util.{KafkaUtil,Logging}
+import org.apache.samza.system.hdfs.HdfsSystemConsumer.HdfsSystemConsumerMetrics
+import org.apache.samza.util.{KafkaUtil, Logging}
 
 
 class HdfsSystemFactory extends SystemFactory with Logging {
   def getConsumer(systemName: String, config: Config, registry: MetricsRegistry) = {
-    throw new SamzaException("HdfsSystemFactory does not implement a consumer")
+    new HdfsSystemConsumer(systemName, config, new HdfsSystemConsumerMetrics(registry))
   }
 
   def getProducer(systemName: String, config: Config, registry: MetricsRegistry) = {
+    // TODO: SAMZA-1026: should remove Kafka dependency below
     val clientId = KafkaUtil.getClientId("samza-producer", config)
     val metrics = new HdfsSystemProducerMetrics(systemName, registry)
     new HdfsSystemProducer(systemName, clientId, config, metrics)
   }
 
   def getAdmin(systemName: String, config: Config) = {
-    new HdfsSystemAdmin
+    new HdfsSystemAdmin(systemName, config)
   }
 }