Posted to commits@samza.apache.org by xi...@apache.org on 2016/10/05 18:39:43 UTC

[1/2] samza git commit: SAMZA-967: HDFS System Consumer

Repository: samza
Updated Branches:
  refs/heads/master f4d924fd2 -> 2216fe0b7


http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/TestHdfsSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/TestHdfsSystemConsumer.java b/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/TestHdfsSystemConsumer.java
new file mode 100644
index 0000000..ef5ab00
--- /dev/null
+++ b/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/TestHdfsSystemConsumer.java
@@ -0,0 +1,136 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.hdfs;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.samza.Partition;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.system.hdfs.reader.TestAvroFileHdfsReader;
+import org.apache.samza.util.NoOpMetricsRegistry;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+
+
+public class TestHdfsSystemConsumer {
+
+  private static final String SYSTEM_NAME = "hdfs";
+  private static final String FIELD_1 = "field1";
+  private static final String FIELD_2 = "field2";
+  private static final String WORKING_DIRECTORY = TestHdfsSystemConsumer.class.getResource("/integTest").getPath();
+  private static final String AVRO_FILE_1 = WORKING_DIRECTORY + "/TestHdfsSystemConsumer-01.avro";
+  private static final String AVRO_FILE_2 = WORKING_DIRECTORY + "/TestHdfsSystemConsumer-02.avro";
+  private static final String AVRO_FILE_3 = WORKING_DIRECTORY + "/TestHdfsSystemConsumer-03.avro";
+  private static final int NUM_FILES = 3;
+  private static final int NUM_EVENTS = 100;
+
+  private Config generateDefaultConfig() throws IOException {
+    Map<String, String> properties = new HashMap<>();
+    properties.put(String.format(HdfsConfig.CONSUMER_PARTITIONER_WHITELIST(), SYSTEM_NAME), ".*TestHdfsSystemConsumer.*avro");
+    Path stagingDirectory = Files.createTempDirectory("staging");
+    stagingDirectory.toFile().deleteOnExit();
+    properties.put(HdfsConfig.STAGING_DIRECTORY(), stagingDirectory.toString());
+    return new MapConfig(properties);
+  }
+
+  private void generateAvroDataFiles() throws Exception {
+    TestAvroFileHdfsReader.writeTestEventsToFile(AVRO_FILE_1, NUM_EVENTS);
+    TestAvroFileHdfsReader.writeTestEventsToFile(AVRO_FILE_2, NUM_EVENTS);
+    TestAvroFileHdfsReader.writeTestEventsToFile(AVRO_FILE_3, NUM_EVENTS);
+  }
+
+  /*
+   * A simple end-to-end test that covers the workflow from system admin to
+   * partitioner to system consumer, making sure the basic functionality
+   * works as expected.
+   */
+  @Test
+  public void testHdfsSystemConsumerE2E() throws Exception {
+    Config config = generateDefaultConfig();
+    HdfsSystemFactory systemFactory = new HdfsSystemFactory();
+
+    // create admin and do partitioning
+    HdfsSystemAdmin systemAdmin = systemFactory.getAdmin(SYSTEM_NAME, config);
+    String streamName = WORKING_DIRECTORY;
+    Set<String> streamNames = new HashSet<>();
+    streamNames.add(streamName);
+    generateAvroDataFiles();
+    Map<String, SystemStreamMetadata> streamMetadataMap = systemAdmin.getSystemStreamMetadata(streamNames);
+    SystemStreamMetadata systemStreamMetadata = streamMetadataMap.get(streamName);
+    Assert.assertEquals(NUM_FILES, systemStreamMetadata.getSystemStreamPartitionMetadata().size());
+
+    // create consumer and read from files
+    HdfsSystemConsumer systemConsumer = systemFactory.getConsumer(SYSTEM_NAME, config, new NoOpMetricsRegistry());
+    Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> metadataMap = systemStreamMetadata.getSystemStreamPartitionMetadata();
+    Set<SystemStreamPartition> systemStreamPartitionSet = new HashSet<>();
+    metadataMap.forEach((partition, metadata) -> {
+      SystemStreamPartition ssp = new SystemStreamPartition(SYSTEM_NAME, streamName, partition);
+      systemStreamPartitionSet.add(ssp);
+      String offset = metadata.getOldestOffset();
+      systemConsumer.register(ssp, offset);
+    });
+    systemConsumer.start();
+
+    // verify events read from consumer
+    int eventsReceived = 0;
+    int totalEvents = (NUM_EVENTS + 1) * NUM_FILES; // each partition emits one extra "End of Stream" envelope at the end
+    int remainingRetries = 100;
+    Map<SystemStreamPartition, List<IncomingMessageEnvelope>> overallResults = new HashMap<>();
+    while (eventsReceived < totalEvents && remainingRetries > 0) {
+      remainingRetries--;
+      Map<SystemStreamPartition, List<IncomingMessageEnvelope>> result = systemConsumer.poll(systemStreamPartitionSet, 200);
+      for(SystemStreamPartition ssp : result.keySet()) {
+        List<IncomingMessageEnvelope> messageEnvelopeList = result.get(ssp);
+        overallResults.putIfAbsent(ssp, new ArrayList<>());
+        overallResults.get(ssp).addAll(messageEnvelopeList);
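+        // once a partition has delivered all its events plus the end-of-stream envelope, stop polling it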
+        if (overallResults.get(ssp).size() >= NUM_EVENTS + 1) {
+          systemStreamPartitionSet.remove(ssp);
+        }
+        eventsReceived += messageEnvelopeList.size();
+      }
+    }
+    Assert.assertEquals(totalEvents, eventsReceived);
+    Assert.assertEquals(NUM_FILES, overallResults.size());
+    overallResults.values().forEach(messages -> {
+      Assert.assertEquals(NUM_EVENTS + 1, messages.size());
+      for (int index = 0; index < NUM_EVENTS; index++) {
+        GenericRecord record = (GenericRecord) messages.get(index).getMessage();
+        Assert.assertEquals(index % NUM_EVENTS, record.get(FIELD_1));
+        Assert.assertEquals("string_" + (index % NUM_EVENTS), record.get(FIELD_2).toString());
+      }
+      Assert.assertEquals(IncomingMessageEnvelope.END_OF_STREAM_OFFSET, messages.get(NUM_EVENTS).getOffset());
+    });
+  }
+}
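
For readers of this patch: the test above drives the consumer through HdfsSystemFactory directly. A minimal sketch of the same wiring outside the test harness might look like the following (the input directory and staging path are placeholders; only APIs that appear elsewhere in this diff are used):

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    import org.apache.samza.config.Config;
    import org.apache.samza.config.MapConfig;
    import org.apache.samza.system.SystemStreamMetadata;
    import org.apache.samza.system.SystemStreamPartition;
    import org.apache.samza.system.hdfs.HdfsConfig;
    import org.apache.samza.system.hdfs.HdfsSystemAdmin;
    import org.apache.samza.system.hdfs.HdfsSystemConsumer;
    import org.apache.samza.system.hdfs.HdfsSystemFactory;
    import org.apache.samza.util.NoOpMetricsRegistry;

    public class HdfsConsumerSketch {
      public static void main(String[] args) throws Exception {
        String systemName = "hdfs";
        String streamName = "/data/myjob/input"; // placeholder HDFS directory

        Map<String, String> props = new HashMap<>();
        props.put(String.format(HdfsConfig.CONSUMER_PARTITIONER_WHITELIST(), systemName), ".*\\.avro");
        props.put(HdfsConfig.STAGING_DIRECTORY(), "/tmp/hdfs-consumer-staging"); // placeholder
        Config config = new MapConfig(props);

        HdfsSystemFactory factory = new HdfsSystemFactory();
        HdfsSystemAdmin admin = factory.getAdmin(systemName, config);
        HdfsSystemConsumer consumer = factory.getConsumer(systemName, config, new NoOpMetricsRegistry());

        // one partition per file (or file group) as computed by the directory partitioner
        Set<String> streamNames = new HashSet<>();
        streamNames.add(streamName);
        SystemStreamMetadata metadata = admin.getSystemStreamMetadata(streamNames).get(streamName);
        metadata.getSystemStreamPartitionMetadata().forEach((partition, partitionMetadata) ->
            consumer.register(new SystemStreamPartition(systemName, streamName, partition),
                partitionMetadata.getOldestOffset()));
        consumer.start();
        // ... poll the registered partitions in a loop, then stop the consumer
      }
    }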

http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/TestPartitionDesctiptorUtil.java
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/TestPartitionDesctiptorUtil.java b/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/TestPartitionDesctiptorUtil.java
new file mode 100644
index 0000000..c8ddbe0
--- /dev/null
+++ b/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/TestPartitionDesctiptorUtil.java
@@ -0,0 +1,92 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.hdfs;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class TestPartitionDesctiptorUtil {
+
+  @Test
+  public void testBasicEncodeDecode() {
+    Map<Partition, List<String>> input = new HashMap<>();
+    input.put(new Partition(0), Collections.singletonList("path_0"));
+    String[] array = {"path_1", "path_2"};
+    input.put(new Partition(1), Arrays.asList(array));
+    input.put(new Partition(3), Collections.singletonList("path_3"));
+    String json = PartitionDescriptorUtil.getJsonFromDescriptorMap(input);
+    Map<Partition, List<String>> output = PartitionDescriptorUtil.getDescriptorMapFromJson(json);
+    Assert.assertEquals(3, output.entrySet().size());
+    Assert.assertTrue(output.containsKey(new Partition(0)));
+    Assert.assertEquals("path_0", output.get(new Partition(0)).get(0));
+    Assert.assertTrue(output.containsKey(new Partition(1)));
+    Assert.assertEquals("path_1", output.get(new Partition(1)).get(0));
+    Assert.assertEquals("path_2", output.get(new Partition(1)).get(1));
+    Assert.assertTrue(output.containsKey(new Partition(3)));
+    Assert.assertEquals("path_3", output.get(new Partition(3)).get(0));
+  }
+
+  @Test
+  public void testSingleEntry() {
+    Map<Partition, List<String>> input = new HashMap<>();
+    input.put(new Partition(1), Collections.singletonList("random_path_1"));
+    String json = PartitionDescriptorUtil.getJsonFromDescriptorMap(input);
+    Map<Partition, List<String>> output = PartitionDescriptorUtil.getDescriptorMapFromJson(json);
+    Assert.assertEquals(1, output.entrySet().size());
+    Assert.assertTrue(output.containsKey(new Partition(1)));
+    Assert.assertEquals("random_path_1", output.get(new Partition(1)).get(0));
+  }
+
+  @Test
+  public void testKeyOverriding() {
+    Map<Partition, List<String>> input = new HashMap<>();
+    input.put(new Partition(0), Collections.singletonList("path_0"));
+    input.put(new Partition(0), Collections.singletonList("new_path_0"));
+    String json = PartitionDescriptorUtil.getJsonFromDescriptorMap(input);
+    Map<Partition, List<String>> output = PartitionDescriptorUtil.getDescriptorMapFromJson(json);
+    Assert.assertEquals(1, output.entrySet().size());
+    Assert.assertTrue(output.containsKey(new Partition(0)));
+    Assert.assertEquals("new_path_0", output.get(new Partition(0)).get(0));
+  }
+
+  @Test
+  public void testEmptyInput() {
+    Map<Partition, List<String>> input = new HashMap<>();
+    String json = PartitionDescriptorUtil.getJsonFromDescriptorMap(input);
+    Assert.assertNotNull(json);
+    Map<Partition, List<String>> output = PartitionDescriptorUtil.getDescriptorMapFromJson(json);
+    Assert.assertTrue(output.isEmpty());
+  }
+
+  @Test(expected = SamzaException.class)
+  public void testInvalidInput() {
+    String json = "invalidStr";
+    PartitionDescriptorUtil.getDescriptorMapFromJson(json);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/partitioner/TestDirectoryPartitioner.java
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/partitioner/TestDirectoryPartitioner.java b/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/partitioner/TestDirectoryPartitioner.java
new file mode 100644
index 0000000..aea32ff
--- /dev/null
+++ b/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/partitioner/TestDirectoryPartitioner.java
@@ -0,0 +1,304 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.hdfs.partitioner;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+import static org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata;
+import static org.apache.samza.system.hdfs.partitioner.FileSystemAdapter.FileMetadata;
+
+
+public class TestDirectoryPartitioner {
+
+  class TestFileSystemAdapter implements FileSystemAdapter {
+    private List<FileMetadata> expectedList;
+
+    public TestFileSystemAdapter(List<FileMetadata> expectedList) {
+      this.expectedList = expectedList;
+    }
+
+    public List<FileMetadata> getAllFiles(String streamName) {
+      return expectedList;
+    }
+  }
+
+  private void verifyPartitionDescriptor(String[] inputFiles, int[][] expectedPartitioning, int expectedNumPartition,
+    Map<Partition, List<String>> actualPartitioning) {
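+    // collapse each actual partition's file list into one comma-separated string, then check that every expected grouping is present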
+    Assert.assertEquals(expectedNumPartition, actualPartitioning.size());
+    Set<String> actualPartitioningPath = new HashSet<>();
+    actualPartitioning.values().forEach(list -> actualPartitioningPath.add(String.join(",", list)));
+    for (int i = 0; i < expectedNumPartition; i++) {
+      int[] indexes = expectedPartitioning[i];
+      List<String> files = new ArrayList<>();
+      for (int j : indexes) {
+        files.add(inputFiles[j]);
+      }
+      files.sort(Comparator.<String>naturalOrder());
+      String expectedCombinedPath = String.join(",", files);
+      Assert.assertTrue(actualPartitioningPath.contains(expectedCombinedPath));
+    }
+  }
+
+  @Test
+  public void testBasicWhiteListFiltering() {
+    List<FileMetadata> testList = new ArrayList<>();
+    int NUM_INPUT = 9;
+    String[] inputFiles = {
+      "part-001.avro",
+      "part-002.avro",
+      "part-003.avro",
+      "delta-01.avro",
+      "part-005.avro",
+      "delta-03.avro",
+      "part-004.avro",
+      "delta-02.avro",
+      "part-006.avro"};
+    long[] fileLength = {150582, 138132, 214005, 205738, 158273, 982345, 313245, 234212, 413232};
+    for (int i = 0; i < NUM_INPUT; i++) {
+      testList.add(new FileMetadata(inputFiles[i], fileLength[i]));
+    }
+    String whiteList = "part-.*\\.avro";
+    String blackList = "";
+    String groupPattern = "";
+    int EXPECTED_NUM_PARTITION = 6;
+    int[][] EXPECTED_PARTITIONING = {{0}, {1}, {2}, {4}, {6}, {8}};
+
+    DirectoryPartitioner directoryPartitioner =
+      new DirectoryPartitioner(whiteList, blackList, groupPattern, new TestFileSystemAdapter(testList));
+    Map<Partition, SystemStreamPartitionMetadata> metadataMap = directoryPartitioner.getPartitionMetadataMap("hdfs", null);
+    Assert.assertEquals(EXPECTED_NUM_PARTITION, metadataMap.size());
+    Map<Partition, List<String>> descriptorMap = directoryPartitioner.getPartitionDescriptor("hdfs");
+    verifyPartitionDescriptor(inputFiles, EXPECTED_PARTITIONING, EXPECTED_NUM_PARTITION, descriptorMap);
+  }
+
+  @Test
+  public void testBasicBlackListFiltering() {
+    List<FileMetadata> testList = new ArrayList<>();
+    int NUM_INPUT = 9;
+    String[] inputFiles = {
+      "part-001.avro",
+      "part-002.avro",
+      "part-003.avro",
+      "delta-01.avro",
+      "part-005.avro",
+      "delta-03.avro",
+      "part-004.avro",
+      "delta-02.avro",
+      "part-006.avro"};
+    long[] fileLength = {150582, 138132, 214005, 205738, 158273, 982345, 313245, 234212, 413232};
+    for (int i = 0; i < NUM_INPUT; i++) {
+      testList.add(new FileMetadata(inputFiles[i], fileLength[i]));
+    }
+    String whiteList = ".*";
+    String blackList = "delta-.*\\.avro";
+    String groupPattern = "";
+    int EXPECTED_NUM_PARTITION = 6;
+    int[][] EXPECTED_PARTITIONING = {{0}, {1}, {2}, {4}, {6}, {8}};
+
+    DirectoryPartitioner directoryPartitioner =
+      new DirectoryPartitioner(whiteList, blackList, groupPattern, new TestFileSystemAdapter(testList));
+    Map<Partition, SystemStreamPartitionMetadata> metadataMap = directoryPartitioner.getPartitionMetadataMap("hdfs", null);
+    Assert.assertEquals(EXPECTED_NUM_PARTITION, metadataMap.size());
+    Map<Partition, List<String>> descriptorMap = directoryPartitioner.getPartitionDescriptor("hdfs");
+    verifyPartitionDescriptor(inputFiles, EXPECTED_PARTITIONING, EXPECTED_NUM_PARTITION, descriptorMap);
+  }
+
+  @Test
+  public void testWhiteListBlackListFiltering() {
+    List<FileMetadata> testList = new ArrayList<>();
+    int NUM_INPUT = 9;
+    String[] inputFiles = {
+      "part-001.avro",
+      "part-002.avro",
+      "part-003.avro",
+      "delta-01.avro",
+      "part-005.avro",
+      "delta-03.avro",
+      "part-004.avro",
+      "delta-02.avro",
+      "part-006.avro"};
+    long[] fileLength = {150582, 138132, 214005, 205738, 158273, 982345, 313245, 234212, 413232};
+    for (int i = 0; i < NUM_INPUT; i++) {
+      testList.add(new FileMetadata(inputFiles[i], fileLength[i]));
+    }
+    String whiteList = "part-.*\\.avro";
+    String blackList = "part-002.avro";
+    String groupPattern = "";
+    int EXPECTED_NUM_PARTITION = 5;
+    int[][] EXPECTED_PARTITIONING = {{0}, {2}, {4}, {6}, {8}};
+
+    DirectoryPartitioner directoryPartitioner =
+      new DirectoryPartitioner(whiteList, blackList, groupPattern, new TestFileSystemAdapter(testList));
+    Map<Partition, SystemStreamPartitionMetadata> metadataMap = directoryPartitioner.getPartitionMetadataMap("hdfs", null);
+    Assert.assertEquals(EXPECTED_NUM_PARTITION, metadataMap.size());
+    Map<Partition, List<String>> descriptorMap = directoryPartitioner.getPartitionDescriptor("hdfs");
+    verifyPartitionDescriptor(inputFiles, EXPECTED_PARTITIONING, EXPECTED_NUM_PARTITION, descriptorMap);
+  }
+
+  @Test
+  public void testBasicGrouping() {
+    List<FileMetadata> testList = new ArrayList<>();
+    int NUM_INPUT = 9;
+    String[] inputFiles = {
+      "00_10-run_2016-08-15-13-04-part.0.150582.avro",
+      "00_10-run_2016-08-15-13-04-part.1.138132.avro",
+      "00_10-run_2016-08-15-13-04-part.2.214005.avro",
+      "00_10-run_2016-08-15-13-05-part.0.205738.avro",
+      "00_10-run_2016-08-15-13-05-part.1.158273.avro",
+      "00_10-run_2016-08-15-13-05-part.2.982345.avro",
+      "00_10-run_2016-08-15-13-06-part.0.313245.avro",
+      "00_10-run_2016-08-15-13-06-part.1.234212.avro",
+      "00_10-run_2016-08-15-13-06-part.2.413232.avro"};
+    long[] fileLength = {150582, 138132, 214005, 205738, 158273, 982345, 313245, 234212, 413232};
+    for (int i = 0; i < NUM_INPUT; i++) {
+      testList.add(new FileMetadata(inputFiles[i], fileLength[i]));
+    }
+
+    String whiteList = ".*\\.avro";
+    String blackList = "";
+    String groupPattern = ".*part\\.[id]\\..*\\.avro"; // 00_10-run_2016-08-15-13-04-part.[id].138132.avro
+    int EXPECTED_NUM_PARTITION = 3;
+    int[][] EXPECTED_PARTITIONING = {
+      {0, 3, 6}, // files from index 0, 3, 6 should be grouped into one partition
+      {1, 4, 7}, // similar as above
+      {2, 5, 8}};
+
+    DirectoryPartitioner directoryPartitioner =
+      new DirectoryPartitioner(whiteList, blackList, groupPattern, new TestFileSystemAdapter(testList));
+    Map<Partition, SystemStreamPartitionMetadata> metadataMap = directoryPartitioner.getPartitionMetadataMap("hdfs", null);
+    Assert.assertEquals(EXPECTED_NUM_PARTITION, metadataMap.size());
+    Map<Partition, List<String>> descriptorMap = directoryPartitioner.getPartitionDescriptor("hdfs");
+    verifyPartitionDescriptor(inputFiles, EXPECTED_PARTITIONING, EXPECTED_NUM_PARTITION, descriptorMap);
+  }
+
+  @Test
+  public void testValidDirectoryUpdating() {
+    // the update is valid when only new files are added to the directory
+    // and the old files remain unchanged
+    List<FileMetadata> testList = new ArrayList<>();
+    int NUM_INPUT = 6;
+    String[] inputFiles = {
+      "part-001.avro",
+      "part-002.avro",
+      "part-003.avro",
+      "part-005.avro",
+      "part-004.avro",
+      "part-006.avro"};
+    long[] fileLength = {150582, 138132, 214005, 205738, 158273, 982345};
+    for (int i = 0; i < NUM_INPUT; i++) {
+      testList.add(new FileMetadata(inputFiles[i], fileLength[i]));
+    }
+    String whiteList = ".*";
+    String blackList = "";
+    String groupPattern = "";
+    int EXPECTED_NUM_PARTITION = 6;
+    int[][] EXPECTED_PARTITIONING = {{0}, {1}, {2}, {3}, {4}, {5}};
+
+    DirectoryPartitioner directoryPartitioner =
+      new DirectoryPartitioner(whiteList, blackList, groupPattern, new TestFileSystemAdapter(testList));
+    Map<Partition, SystemStreamPartitionMetadata> metadataMap = directoryPartitioner.getPartitionMetadataMap("hdfs", null);
+    Assert.assertEquals(EXPECTED_NUM_PARTITION, metadataMap.size());
+    Map<Partition, List<String>> descriptorMap = directoryPartitioner.getPartitionDescriptor("hdfs");
+    verifyPartitionDescriptor(inputFiles, EXPECTED_PARTITIONING, EXPECTED_NUM_PARTITION, descriptorMap);
+
+    NUM_INPUT = 7;
+    String[] updatedInputFiles = {
+      "part-001.avro",
+      "part-002.avro",
+      "part-003.avro",
+      "part-005.avro",
+      "part-004.avro",
+      "part-007.avro", // add a new file to the directory
+      "part-006.avro"};
+    long[] updatedFileLength = {150582, 138132, 214005, 205738, 158273, 2513454, 982345};
+    testList.clear();
+    for (int i = 0; i < NUM_INPUT; i++) {
+      testList.add(new FileMetadata(updatedInputFiles[i], updatedFileLength[i]));
+    }
+    directoryPartitioner =
+      new DirectoryPartitioner(whiteList, blackList, groupPattern, new TestFileSystemAdapter(testList));
+    metadataMap = directoryPartitioner.getPartitionMetadataMap("hdfs", descriptorMap);
+    Assert.assertEquals(EXPECTED_NUM_PARTITION, metadataMap.size()); // still expect only 6 partitions instead of 7
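+    // the descriptor map from the previous run pins the file-to-partition assignment, so the newly added part-007.avro is not picked up here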
+    Map<Partition, List<String>> updatedDescriptorMap = directoryPartitioner.getPartitionDescriptor("hdfs");
+    verifyPartitionDescriptor(inputFiles, EXPECTED_PARTITIONING, EXPECTED_NUM_PARTITION, updatedDescriptorMap);
+  }
+
+  @Test
+  public void testInvalidDirectoryUpdating() {
+    // the update is invalid when at least one old file is removed
+    List<FileMetadata> testList = new ArrayList<>();
+    int NUM_INPUT = 6;
+    String[] inputFiles = {
+      "part-001.avro",
+      "part-002.avro",
+      "part-003.avro",
+      "part-005.avro",
+      "part-004.avro",
+      "part-006.avro"};
+    long[] fileLength = {150582, 138132, 214005, 205738, 158273, 982345};
+    for (int i = 0; i < NUM_INPUT; i++) {
+      testList.add(new FileMetadata(inputFiles[i], fileLength[i]));
+    }
+    String whiteList = ".*";
+    String blackList = "";
+    String groupPattern = "";
+    int EXPECTED_NUM_PARTITION = 6;
+    int[][] EXPECTED_PARTITIONING = {{0}, {1}, {2}, {3}, {4}, {5}};
+
+    DirectoryPartitioner directoryPartitioner =
+      new DirectoryPartitioner(whiteList, blackList, groupPattern, new TestFileSystemAdapter(testList));
+    Map<Partition, SystemStreamPartitionMetadata> metadataMap = directoryPartitioner.getPartitionMetadataMap("hdfs", null);
+    Assert.assertEquals(EXPECTED_NUM_PARTITION, metadataMap.size());
+    Map<Partition, List<String>> descriptorMap = directoryPartitioner.getPartitionDescriptor("hdfs");
+    verifyPartitionDescriptor(inputFiles, EXPECTED_PARTITIONING, EXPECTED_NUM_PARTITION, descriptorMap);
+
+    String[] updatedInputFiles = {
+      "part-001.avro",
+      "part-002.avro",
+      "part-003.avro",
+      "part-005.avro",
+      "part-007.avro", // remove part-004 and replace it with 007
+      "part-006.avro"};
+    long[] updatedFileLength = {150582, 138132, 214005, 205738, 158273, 982345};
+    testList.clear();
+    for (int i = 0; i < NUM_INPUT; i++) {
+      testList.add(new FileMetadata(updatedInputFiles[i], updatedFileLength[i]));
+    }
+    directoryPartitioner =
+      new DirectoryPartitioner(whiteList, blackList, groupPattern, new TestFileSystemAdapter(testList));
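+    // part-004.avro backs an existing partition descriptor; with it gone, the previous partitioning can no longer be resolved and metadata lookup should fail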
+    try {
+      directoryPartitioner.getPartitionMetadataMap("hdfs", descriptorMap);
+      Assert.fail("Expected an exception when getting metadata; should not reach this point.");
+    } catch (SamzaException e) {
+      // expect exception to be thrown
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/partitioner/TestHdfsFileSystemAdapter.java
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/partitioner/TestHdfsFileSystemAdapter.java b/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/partitioner/TestHdfsFileSystemAdapter.java
new file mode 100644
index 0000000..0fb461f
--- /dev/null
+++ b/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/partitioner/TestHdfsFileSystemAdapter.java
@@ -0,0 +1,59 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.hdfs.partitioner;
+
+import java.net.URL;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.samza.Partition;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class TestHdfsFileSystemAdapter {
+
+  @Test
+  public void testGetAllFiles()
+    throws Exception {
+    URL url = this.getClass().getResource("/partitioner");
+    FileSystemAdapter adapter = new HdfsFileSystemAdapter();
+    List<FileSystemAdapter.FileMetadata> result =
+      adapter.getAllFiles(url.getPath());
+    Assert.assertEquals(2, result.size());
+  }
+
+  @Test
+  public void testIntegrationWithPartitioner() throws Exception {
+    URL url = this.getClass().getResource("/partitioner");
+    String whiteList = ".*";
+    String blackList = ".*02";
+    String groupPattern = "";
+    String streamName = url.getPath();
+    DirectoryPartitioner directoryPartitioner =
+      new DirectoryPartitioner(whiteList, blackList, groupPattern, new HdfsFileSystemAdapter());
+    Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> metadataMap = directoryPartitioner.getPartitionMetadataMap(streamName, null);
+    Assert.assertEquals(1, metadataMap.size());
+    Map<Partition, List<String>> descriptorMap = directoryPartitioner.getPartitionDescriptor(streamName);
+    Assert.assertEquals(1, descriptorMap.values().size());
+    Assert.assertTrue(descriptorMap.get(new Partition(0)).get(0).endsWith("testfile01"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/reader/TestAvroFileHdfsReader.java
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/reader/TestAvroFileHdfsReader.java b/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/reader/TestAvroFileHdfsReader.java
new file mode 100644
index 0000000..aa828d9
--- /dev/null
+++ b/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/reader/TestAvroFileHdfsReader.java
@@ -0,0 +1,169 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.hdfs.reader;
+
+
+import java.io.File;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.samza.Partition;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+public class TestAvroFileHdfsReader {
+
+  private static final String FIELD_1 = "field1";
+  private static final String FIELD_2 = "field2";
+  private static final String WORKING_DIRECTORY = TestAvroFileHdfsReader.class.getResource("/reader").getPath();
+  private static final String AVRO_FILE = WORKING_DIRECTORY + "/TestAvroFileHdfsReader-01.avro";
+  private static final int NUM_EVENTS = 500;
+
+  public static void writeTestEventsToFile(String path, int numEvents)
+    throws Exception {
+    Schema schema = Schema.parse(TestAvroFileHdfsReader.class.getResourceAsStream("/reader/TestEvent.avsc"));
+    File file = new File(path);
+    DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema);
+    DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(writer);
+    dataFileWriter.create(schema, file);
+    for (int i = 0; i < numEvents; i++) {
+      GenericRecord datum = new GenericData.Record(schema);
+      datum.put(FIELD_1, i);
+      datum.put(FIELD_2, "string_" + i);
+      dataFileWriter.append(datum);
+    }
+    dataFileWriter.close();
+  }
+
+  @BeforeClass
+  public static void writeAvroEvents() throws Exception {
+    writeTestEventsToFile(AVRO_FILE, NUM_EVENTS);
+  }
+
+  @Test
+  public void testSequentialRead() throws Exception {
+    SystemStreamPartition ssp = new SystemStreamPartition("hdfs", "testStream", new Partition(0));
+    SingleFileHdfsReader reader = new AvroFileHdfsReader(ssp);
+    reader.open(AVRO_FILE, "0");
+    int index = 0;
+    while (reader.hasNext()) {
+      GenericRecord record = (GenericRecord) reader.readNext().getMessage();
+      Assert.assertEquals(index, record.get(FIELD_1));
+      Assert.assertEquals("string_" + index, record.get(FIELD_2).toString());
+      index++;
+    }
+    Assert.assertEquals(NUM_EVENTS, index);
+    reader.close();
+  }
+
+  @Test
+  public void testFileReopen() throws Exception {
+    SystemStreamPartition ssp = new SystemStreamPartition("hdfs", "testStream", new Partition(0));
+    SingleFileHdfsReader reader = new AvroFileHdfsReader(ssp);
+    reader.open(AVRO_FILE, "0");
+    int index = 0;
+    for (; index < NUM_EVENTS / 2; index++) {
+      GenericRecord record = (GenericRecord) reader.readNext().getMessage();
+      Assert.assertEquals(index, record.get(FIELD_1));
+      Assert.assertEquals("string_" + index, record.get(FIELD_2).toString());
+    }
+    String offset = reader.nextOffset();
+    reader.close();
+    reader = new AvroFileHdfsReader(ssp);
+    reader.open(AVRO_FILE, offset);
+    for (; index < NUM_EVENTS; index++) {
+      GenericRecord record = (GenericRecord) reader.readNext().getMessage();
+      Assert.assertEquals(index, record.get(FIELD_1));
+      Assert.assertEquals("string_" + index, record.get(FIELD_2).toString());
+    }
+    Assert.assertEquals(NUM_EVENTS, index);
+    reader.close();
+  }
+
+  @Test
+  public void testRandomRead() throws Exception {
+    SystemStreamPartition ssp = new SystemStreamPartition("hdfs", "testStream", new Partition(0));
+    SingleFileHdfsReader reader = new AvroFileHdfsReader(ssp);
+    reader.open(AVRO_FILE, "0");
+    for (int i = 0; i < NUM_EVENTS / 2; i++) {
+      reader.readNext();
+    }
+    String offset = reader.nextOffset();
+    IncomingMessageEnvelope envelope = reader.readNext();
+    Assert.assertEquals(offset, envelope.getOffset());
+
+    GenericRecord record1 = (GenericRecord) envelope.getMessage();
+
+    for (int i = 0; i < 5; i++) reader.readNext();
+
+    // seek to the offset within the same reader
+    reader.seek(offset);
+    Assert.assertEquals(offset, reader.nextOffset());
+    envelope = reader.readNext();
+    Assert.assertEquals(offset, envelope.getOffset());
+    GenericRecord record2 = (GenericRecord) envelope.getMessage();
+    Assert.assertEquals(record1, record2);
+    reader.close();
+
+    // open a new reader and initialize it with the offset
+    reader = new AvroFileHdfsReader(ssp);
+    reader.open(AVRO_FILE, offset);
+    envelope = reader.readNext();
+    Assert.assertEquals(offset, envelope.getOffset());
+    GenericRecord record3 = (GenericRecord) envelope.getMessage();
+    Assert.assertEquals(record1, record3);
+    reader.close();
+  }
+
+  @Test
+  public void testOffsetComparator() {
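+    // offsets take the form "<block position>[@<record offset within the block>]": block positions are compared first, then record offsets, and a missing record offset behaves like 0 (e.g. "1989" equals "1989@0")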
+    Assert.assertEquals(-1, AvroFileHdfsReader.offsetComparator("0", "1452"));
+    Assert.assertEquals(-1, AvroFileHdfsReader.offsetComparator("2001@3", "2001@4"));
+    Assert.assertEquals(-1, AvroFileHdfsReader.offsetComparator("2001@4", "2010@1"));
+    Assert.assertEquals(-1, AvroFileHdfsReader.offsetComparator("2001@3", "2011@3"));
+    Assert.assertEquals(-1, AvroFileHdfsReader.offsetComparator("2001", "2001@4"));
+    Assert.assertEquals(-1, AvroFileHdfsReader.offsetComparator("2001", "2010@1"));
+    Assert.assertEquals(-1, AvroFileHdfsReader.offsetComparator("2001@3", "2010"));
+    Assert.assertEquals(1, AvroFileHdfsReader.offsetComparator("1984", "0"));
+    Assert.assertEquals(1, AvroFileHdfsReader.offsetComparator("1984@2", "1984@1"));
+    Assert.assertEquals(1, AvroFileHdfsReader.offsetComparator("14341@2", "1984@2"));
+    Assert.assertEquals(1, AvroFileHdfsReader.offsetComparator("14341@1", "1984@10"));
+    Assert.assertEquals(1, AvroFileHdfsReader.offsetComparator("14341", "1984@10"));
+    Assert.assertEquals(1, AvroFileHdfsReader.offsetComparator("14341@1", "1984"));
+    Assert.assertEquals(0, AvroFileHdfsReader.offsetComparator("1989", "1989"));
+    Assert.assertEquals(0, AvroFileHdfsReader.offsetComparator("1989@0", "1989"));
+    Assert.assertEquals(0, AvroFileHdfsReader.offsetComparator("1989", "1989@0"));
+    Assert.assertEquals(0, AvroFileHdfsReader.offsetComparator("0", "0"));
+    Assert.assertEquals(0, AvroFileHdfsReader.offsetComparator("1989@1", "1989@1"));
+  }
+
+  @Test(expected = Exception.class)
+  public void testOffsetComparator_InvalidInput() {
+    AvroFileHdfsReader.offsetComparator("1982,13", "1930,1");
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/reader/TestMultiFileHdfsReader.java
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/reader/TestMultiFileHdfsReader.java b/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/reader/TestMultiFileHdfsReader.java
new file mode 100644
index 0000000..5682a7c
--- /dev/null
+++ b/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/reader/TestMultiFileHdfsReader.java
@@ -0,0 +1,182 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.hdfs.reader;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+public class TestMultiFileHdfsReader {
+  private static final String FIELD_1 = "field1";
+  private static final String FIELD_2 = "field2";
+  private static final String WORKING_DIRECTORY = TestMultiFileHdfsReader.class.getResource("/reader").getPath();
+  private static final String AVRO_FILE_1 = WORKING_DIRECTORY + "/TestMultiFileHdfsReader-01.avro";
+  private static final String AVRO_FILE_2 = WORKING_DIRECTORY + "/TestMultiFileHdfsReader-02.avro";
+  private static final String AVRO_FILE_3 = WORKING_DIRECTORY + "/TestMultiFileHdfsReader-03.avro";
+  private static String[] descriptors = {AVRO_FILE_1, AVRO_FILE_2, AVRO_FILE_3};
+  private static final int NUM_EVENTS = 100;
+
+  @BeforeClass
+  public static void writeAvroEvents()
+    throws Exception {
+    TestAvroFileHdfsReader.writeTestEventsToFile(AVRO_FILE_1, NUM_EVENTS);
+    TestAvroFileHdfsReader.writeTestEventsToFile(AVRO_FILE_2, NUM_EVENTS);
+    TestAvroFileHdfsReader.writeTestEventsToFile(AVRO_FILE_3, NUM_EVENTS);
+  }
+
+  @Test
+  public void testSequentialRead()
+    throws Exception {
+    SystemStreamPartition ssp = new SystemStreamPartition("hdfs", "testStream", new Partition(0));
+    MultiFileHdfsReader multiReader = new MultiFileHdfsReader(HdfsReaderFactory.ReaderType.AVRO, ssp, Arrays.asList(descriptors), "0:0");
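+    // multi-file offsets take the form "<file index>:<offset within that file>"; "0:0" starts at the beginning of the first descriptor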
+    int index = 0;
+    while (multiReader.hasNext()) {
+      GenericRecord record = (GenericRecord) multiReader.readNext().getMessage();
+      Assert.assertEquals(index % NUM_EVENTS, record.get(FIELD_1));
+      Assert.assertEquals("string_" + (index % NUM_EVENTS), record.get(FIELD_2).toString());
+      index++;
+    }
+    Assert.assertEquals(3 * NUM_EVENTS, index);
+    multiReader.close();
+  }
+
+  @Test
+  public void testReaderReopen()
+    throws Exception {
+    SystemStreamPartition ssp = new SystemStreamPartition("hdfs", "testStream", new Partition(0));
+
+    // read until the middle of the first file
+    MultiFileHdfsReader multiReader = new MultiFileHdfsReader(HdfsReaderFactory.ReaderType.AVRO, ssp,
+      Arrays.asList(descriptors), "0:0");
+    int index = 0;
+    String offset = "0:0";
+    for (; index < NUM_EVENTS / 2; index++) {
+      IncomingMessageEnvelope envelope = multiReader.readNext();
+      GenericRecord record = (GenericRecord) envelope.getMessage();
+      Assert.assertEquals(index % NUM_EVENTS, record.get(FIELD_1));
+      Assert.assertEquals("string_" + (index % NUM_EVENTS), record.get(FIELD_2).toString());
+      offset = envelope.getOffset();
+    }
+    multiReader.close();
+
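+    // reopening from an offset re-delivers the record at that offset, hence one duplicate event is skipped after each reopen below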
+    // read until the middle of the second file
+    multiReader = new MultiFileHdfsReader(HdfsReaderFactory.ReaderType.AVRO, ssp,
+      Arrays.asList(descriptors), offset);
+    multiReader.readNext(); // skip one duplicate event
+    for (; index < NUM_EVENTS + NUM_EVENTS / 2; index++) {
+      IncomingMessageEnvelope envelope = multiReader.readNext();
+      GenericRecord record = (GenericRecord) envelope.getMessage();
+      Assert.assertEquals(index % NUM_EVENTS, record.get(FIELD_1));
+      Assert.assertEquals("string_" + (index % NUM_EVENTS), record.get(FIELD_2).toString());
+      offset = envelope.getOffset();
+    }
+    multiReader.close();
+
+    // read the rest of all files
+    multiReader = new MultiFileHdfsReader(HdfsReaderFactory.ReaderType.AVRO, ssp,
+      Arrays.asList(descriptors), offset);
+    multiReader.readNext(); // skip one duplicate event
+    while (multiReader.hasNext()) {
+      IncomingMessageEnvelope envelope = multiReader.readNext();
+      GenericRecord record = (GenericRecord) envelope.getMessage();
+      Assert.assertEquals(index % NUM_EVENTS, record.get(FIELD_1));
+      Assert.assertEquals("string_" + (index % NUM_EVENTS), record.get(FIELD_2).toString());
+      index++;
+      offset = envelope.getOffset();
+    }
+    Assert.assertEquals(3 * NUM_EVENTS, index);
+    multiReader.close();
+
+    // reopen with the offset of the last record
+    multiReader = new MultiFileHdfsReader(HdfsReaderFactory.ReaderType.AVRO, ssp,
+      Arrays.asList(descriptors), offset);
+    multiReader.readNext(); // skip one duplicate event
+    Assert.assertFalse(multiReader.hasNext());
+    multiReader.close();
+  }
+
+  @Test(expected = SamzaException.class)
+  public void testOutOfRangeSingleFileOffset() {
+    SystemStreamPartition ssp = new SystemStreamPartition("hdfs", "testStream", new Partition(0));
+    new MultiFileHdfsReader(HdfsReaderFactory.ReaderType.AVRO, ssp,
+      Arrays.asList(descriptors), "0:1000000&0");
+    Assert.fail();
+  }
+
+  @Test(expected = SamzaException.class)
+  public void testOutOfRangeFileIndex() {
+    SystemStreamPartition ssp = new SystemStreamPartition("hdfs", "testStream", new Partition(0));
+    new MultiFileHdfsReader(HdfsReaderFactory.ReaderType.AVRO, ssp,
+      Arrays.asList(descriptors), "3:0");
+    Assert.fail();
+  }
+
+  @Test(expected = SamzaException.class)
+  public void testInvalidPartitionDescriptor() {
+    SystemStreamPartition ssp = new SystemStreamPartition("hdfs", "testStream", new Partition(0));
+    new MultiFileHdfsReader(HdfsReaderFactory.ReaderType.AVRO, ssp,
+      new ArrayList<>(), "0:0");
+    Assert.fail();
+  }
+
+  @Test
+  public void testReconnect() {
+    SystemStreamPartition ssp = new SystemStreamPartition("hdfs", "testStream", new Partition(0));
+    MultiFileHdfsReader multiReader = new MultiFileHdfsReader(HdfsReaderFactory.ReaderType.AVRO, ssp, Arrays.asList(descriptors), "0:0");
+    // first read a few events, and then reconnect
+    for (int i = 0; i < NUM_EVENTS / 2; i++) {
+      multiReader.readNext();
+    }
+    IncomingMessageEnvelope envelope = multiReader.readNext();
+    multiReader.reconnect();
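+    // reconnect() reopens the underlying reader at the last delivered offset, so the next read repeats the envelope returned just before reconnecting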
+    IncomingMessageEnvelope envelopeAfterReconnect = multiReader.readNext();
+    Assert.assertEquals(envelope, envelopeAfterReconnect);
+    multiReader.close();
+  }
+
+  @Test(expected = SamzaException.class)
+  public void testReachingMaxReconnect() {
+    int numMaxRetries = 3;
+    SystemStreamPartition ssp = new SystemStreamPartition("hdfs", "testStream", new Partition(0));
+    MultiFileHdfsReader multiReader = new MultiFileHdfsReader(HdfsReaderFactory.ReaderType.AVRO, ssp, Arrays.asList(descriptors), "0:0", numMaxRetries);
+    // first read a few events, and then reconnect
+    for (int i = 0; i < NUM_EVENTS / 2; i++) {
+      multiReader.readNext();
+    }
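+    // each reconnect consumes one retry; once numMaxRetries is exhausted, the next reconnect should raise a SamzaException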
+    for (int i = 0; i < numMaxRetries; i++) {
+      IncomingMessageEnvelope envelope = multiReader.readNext();
+      multiReader.reconnect();
+      IncomingMessageEnvelope envelopeAfterReconnect = multiReader.readNext();
+      Assert.assertEquals(envelope, envelopeAfterReconnect);
+    }
+    multiReader.readNext();
+    multiReader.reconnect();
+    Assert.fail();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/test/resources/integTest/emptyTestFile
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/test/resources/integTest/emptyTestFile b/samza-hdfs/src/test/resources/integTest/emptyTestFile
new file mode 100644
index 0000000..d216be4
--- /dev/null
+++ b/samza-hdfs/src/test/resources/integTest/emptyTestFile
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/test/resources/partitioner/testfile01
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/test/resources/partitioner/testfile01 b/samza-hdfs/src/test/resources/partitioner/testfile01
new file mode 100644
index 0000000..d216be4
--- /dev/null
+++ b/samza-hdfs/src/test/resources/partitioner/testfile01
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/test/resources/partitioner/testfile02
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/test/resources/partitioner/testfile02 b/samza-hdfs/src/test/resources/partitioner/testfile02
new file mode 100644
index 0000000..d216be4
--- /dev/null
+++ b/samza-hdfs/src/test/resources/partitioner/testfile02
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/test/resources/reader/TestEvent.avsc
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/test/resources/reader/TestEvent.avsc b/samza-hdfs/src/test/resources/reader/TestEvent.avsc
new file mode 100644
index 0000000..289a589
--- /dev/null
+++ b/samza-hdfs/src/test/resources/reader/TestEvent.avsc
@@ -0,0 +1,33 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+{
+  "type" : "record",
+  "name" : "TestEvent",
+  "namespace" : "HDFSConsumer",
+  "fields" : [
+    {
+      "name" : "field1",
+      "type" : "int"
+    },
+    {
+      "name" : "field2",
+      "type" : "string"
+    }]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/test/scala/org/apache/samza/system/hdfs/TestHdfsSystemProducerTestSuite.scala
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/test/scala/org/apache/samza/system/hdfs/TestHdfsSystemProducerTestSuite.scala b/samza-hdfs/src/test/scala/org/apache/samza/system/hdfs/TestHdfsSystemProducerTestSuite.scala
index 261310d..db7036a 100644
--- a/samza-hdfs/src/test/scala/org/apache/samza/system/hdfs/TestHdfsSystemProducerTestSuite.scala
+++ b/samza-hdfs/src/test/scala/org/apache/samza/system/hdfs/TestHdfsSystemProducerTestSuite.scala
@@ -20,31 +20,25 @@
 package org.apache.samza.system.hdfs
 
 
-import java.io.{InputStreamReader, File, IOException}
+import java.io.File
 import java.net.URI
 import java.text.SimpleDateFormat
 import java.util.Date
 
-import org.apache.avro.file.{SeekableFileInput, CodecFactory, DataFileWriter, DataFileReader}
-import org.apache.avro.reflect.{ReflectDatumReader, ReflectDatumWriter, ReflectData}
-import org.apache.avro.specific.SpecificDatumReader
+import org.apache.avro.file.DataFileReader
+import org.apache.avro.reflect.{ReflectData, ReflectDatumReader}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
-import org.apache.hadoop.hdfs.{DFSConfigKeys,MiniDFSCluster}
-import org.apache.hadoop.io.{SequenceFile, BytesWritable, LongWritable, Text}
+import org.apache.hadoop.hdfs.{DFSConfigKeys, MiniDFSCluster}
 import org.apache.hadoop.io.SequenceFile.Reader
-
+import org.apache.hadoop.io.{BytesWritable, LongWritable, SequenceFile, Text}
 import org.apache.samza.config.Config
-import org.apache.samza.system.hdfs.HdfsConfig._
 import org.apache.samza.config.factories.PropertiesConfigFactory
-import org.apache.samza.metrics.MetricsRegistryMap
+import org.apache.samza.system.hdfs.HdfsConfig._
 import org.apache.samza.system.{OutgoingMessageEnvelope, SystemStream}
 import org.apache.samza.util.Logging
-
-import org.junit.{AfterClass, BeforeClass, Test}
 import org.junit.Assert._
-
-import scala.collection.JavaConverters._
+import org.junit.{AfterClass, Test}
 
 
 object TestHdfsSystemProducerTestSuite {
@@ -88,7 +82,7 @@ object TestHdfsSystemProducerTestSuite {
   }
 
   def buildProducer(name: String, cluster: MiniDFSCluster): Option[HdfsSystemProducer] @unchecked = {
-   Some(
+    Some(
       hdfsFactory.getProducer(
         name,
         propsFactory.getConfig(URI.create(RESOURCE_PATH_FORMAT format (new File(".").getCanonicalPath, name))),
@@ -277,7 +271,7 @@ class TestHdfsSystemProducerTestSuite extends Logging {
       Thread.sleep(PAUSE)
 
       val systemStream = new SystemStream(AVRO_JOB_NAME, TEST)
-      val atc = new AvroTestClass(1280382045923456789L, "alkjdsfafloiqulkjasoiuqlklakdsflkja")
+      val atc = new AvroTestClass(128038204592345678L, "alkjdsfafloiqulkjasoiuqlklakdsflkja")
       producer.get.send(TEST, new OutgoingMessageEnvelope(systemStream, atc))
 
       producer.get.stop
@@ -292,7 +286,7 @@ class TestHdfsSystemProducerTestSuite extends Logging {
       val atf = new AvroFSInput(FileContext.getFileContext(), results.head.getPath)
       val schema = ReflectData.get().getSchema(atc.getClass)
       val datumReader = new ReflectDatumReader[Object](schema)
-      val tfReader = DataFileReader.openReader(atf, datumReader)
+      val tfReader = new DataFileReader[Object](atf, datumReader)
       val atc2 = tfReader.next().asInstanceOf[AvroTestClass]
 
       assertTrue(atc == atc2)
@@ -314,7 +308,7 @@ class TestHdfsSystemProducerTestSuite extends Logging {
       Thread.sleep(PAUSE)
 
       val systemStream = new SystemStream(AVRO_BATCH_JOB_NAME, TEST)
-      val atc = new AvroTestClass(1280382045923456789L, "alkjdsfafloiqulkjasoiuqlklakdsflkja")
+      val atc = new AvroTestClass(128038204592345678L, "alkjdsfafloiqulkjasoiuqlklakdsflkja")
 
       (1 to 20).map {
         i => producer.get.send(TEST, new OutgoingMessageEnvelope(systemStream, atc))
@@ -332,7 +326,7 @@ class TestHdfsSystemProducerTestSuite extends Logging {
         val atf = new AvroFSInput(FileContext.getFileContext(), r.getPath)
         val schema = ReflectData.get().getSchema(atc.getClass)
         val datumReader = new ReflectDatumReader[Object](schema)
-        val tfReader = DataFileReader.openReader(atf, datumReader)
+        val tfReader = new DataFileReader[Object](atf, datumReader)
         val atc2 = tfReader.next().asInstanceOf[AvroTestClass]
         assertTrue(atc == atc2)
       }
@@ -347,12 +341,12 @@ class TestHdfsSystemProducerTestSuite extends Logging {
 
 class TestHdfsSystemProducer(systemName: String, config: HdfsConfig, clientId: String, metrics: HdfsSystemProducerMetrics, mini: MiniDFSCluster)
   extends HdfsSystemProducer(systemName, clientId, config, metrics) {
-    override val dfs = mini.getFileSystem
+  override val dfs = mini.getFileSystem
 }
 
 
 class TestHdfsSystemFactory extends HdfsSystemFactory {
-    def getProducer(systemName: String, config: Config, metrics: HdfsSystemProducerMetrics, cluster: MiniDFSCluster) = {
-      new TestHdfsSystemProducer(systemName, config, "test", metrics, cluster)
-    }
+  def getProducer(systemName: String, config: Config, metrics: HdfsSystemProducerMetrics, cluster: MiniDFSCluster) = {
+    new TestHdfsSystemProducer(systemName, config, "test", metrics, cluster)
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnContainerRunner.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnContainerRunner.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnContainerRunner.java
index dacc52d..181102b 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnContainerRunner.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnContainerRunner.java
@@ -179,7 +179,7 @@ public class YarnContainerRunner {
       Iterator iter = credentials.getAllTokens().iterator();
       while (iter.hasNext()) {
         TokenIdentifier token = ((Token) iter.next()).decodeIdentifier();
-        if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
+        if (token != null && token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
           iter.remove();
         }
       }

http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJobFactory.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJobFactory.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJobFactory.scala
index 4e328a5..625d3bb 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJobFactory.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJobFactory.scala
@@ -18,6 +18,9 @@
  */
 
 package org.apache.samza.job.yarn
+
+
+import org.apache.hadoop.hdfs.DistributedFileSystem
 import org.apache.samza.job.StreamJobFactory
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.samza.config.Config
@@ -29,7 +32,11 @@ class YarnJobFactory extends StreamJobFactory {
     val hConfig = new YarnConfiguration
     hConfig.set("fs.http.impl", classOf[HttpFileSystem].getName)
     hConfig.set("fs.https.impl", classOf[HttpFileSystem].getName)
-
+    hConfig.set("fs.hdfs.impl", classOf[DistributedFileSystem].getName)
+    // pass along the RM config if there is any
+    if (config.containsKey(YarnConfiguration.RM_ADDRESS)) {
+      hConfig.set(YarnConfiguration.RM_ADDRESS, config.get(YarnConfiguration.RM_ADDRESS, "0.0.0.0:8032"))
+    }
     new YarnJob(config, hConfig)
   }
 }
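
The hunk above registers DistributedFileSystem for the hdfs:// scheme and copies the job's yarn.resourcemanager.address, when present, onto the Hadoop configuration. A minimal sketch of how a job config value flows through that pass-through; the host and port below are made-up example values, not part of the commit.

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;
    import org.apache.samza.config.Config;
    import org.apache.samza.config.MapConfig;

    public class RmAddressPassThroughSketch {
      public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        // YarnConfiguration.RM_ADDRESS == "yarn.resourcemanager.address"
        props.put(YarnConfiguration.RM_ADDRESS, "rm.example.com:8032");
        Config config = new MapConfig(props);

        YarnConfiguration hConfig = new YarnConfiguration();
        // Same logic as the factory method above: only set it when the job config has it.
        if (config.containsKey(YarnConfiguration.RM_ADDRESS)) {
          hConfig.set(YarnConfiguration.RM_ADDRESS, config.get(YarnConfiguration.RM_ADDRESS, "0.0.0.0:8032"));
        }
        System.out.println(hConfig.get(YarnConfiguration.RM_ADDRESS)); // rm.example.com:8032
      }
    }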


[2/2] samza git commit: SAMZA-967: HDFS System Consumer

Posted by xi...@apache.org.
SAMZA-967: HDFS System Consumer


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/2216fe0b
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/2216fe0b
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/2216fe0b

Branch: refs/heads/master
Commit: 2216fe0b7b250e2edfc14d150e59f84884e8cba4
Parents: f4d924f
Author: Hai Lu <lh...@gmail.com>
Authored: Wed Oct 5 11:39:17 2016 -0700
Committer: Xinyu Liu <xi...@apache.org>
Committed: Wed Oct 5 11:39:17 2016 -0700

----------------------------------------------------------------------
 build.gradle                                    |  16 +-
 .../samza/system/hdfs/HdfsSystemAdmin.java      | 221 ++++++++++++++
 .../samza/system/hdfs/HdfsSystemConsumer.java   | 281 +++++++++++++++++
 .../system/hdfs/PartitionDescriptorUtil.java    |  97 ++++++
 .../hdfs/partitioner/DirectoryPartitioner.java  | 235 ++++++++++++++
 .../hdfs/partitioner/FileSystemAdapter.java     |  60 ++++
 .../hdfs/partitioner/HdfsFileSystemAdapter.java |  55 ++++
 .../system/hdfs/reader/AvroFileHdfsReader.java  | 216 +++++++++++++
 .../system/hdfs/reader/HdfsReaderFactory.java   |  59 ++++
 .../system/hdfs/reader/MultiFileHdfsReader.java | 204 +++++++++++++
 .../hdfs/reader/SingleFileHdfsReader.java       |  62 ++++
 .../apache/samza/system/hdfs/HdfsConfig.scala   |  79 ++++-
 .../samza/system/hdfs/HdfsSystemAdmin.scala     |  52 ----
 .../samza/system/hdfs/HdfsSystemFactory.scala   |  46 +--
 .../system/hdfs/TestHdfsSystemConsumer.java     | 136 +++++++++
 .../hdfs/TestPartitionDesctiptorUtil.java       |  92 ++++++
 .../partitioner/TestDirectoryPartitioner.java   | 304 +++++++++++++++++++
 .../partitioner/TestHdfsFileSystemAdapter.java  |  59 ++++
 .../hdfs/reader/TestAvroFileHdfsReader.java     | 169 +++++++++++
 .../hdfs/reader/TestMultiFileHdfsReader.java    | 182 +++++++++++
 .../src/test/resources/integTest/emptyTestFile  |  16 +
 .../src/test/resources/partitioner/testfile01   |  16 +
 .../src/test/resources/partitioner/testfile02   |  16 +
 .../src/test/resources/reader/TestEvent.avsc    |  33 ++
 .../hdfs/TestHdfsSystemProducerTestSuite.scala  |  38 +--
 .../samza/job/yarn/YarnContainerRunner.java     |   2 +-
 .../apache/samza/job/yarn/YarnJobFactory.scala  |   9 +-
 27 files changed, 2654 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 2bea27b..98839f2 100644
--- a/build.gradle
+++ b/build.gradle
@@ -84,7 +84,8 @@ rat {
     'RELEASE.md',
     'samza-test/src/main/resources/**',
     'samza-hdfs/src/main/resources/**',
-    'samza-hdfs/src/test/resources/**'
+    'samza-hdfs/src/test/resources/**',
+    'out/**'
   ]
 }
 
@@ -333,6 +334,10 @@ project(":samza-yarn_$scalaVersion") {
      // Exclude because YARN's 3.4.5 ZK version is incompatible with Kafka's 3.3.4.
       exclude module: 'zookeeper'
     }
+    compile("org.apache.hadoop:hadoop-hdfs:$yarnVersion") {
+      exclude module: 'slf4j-log4j12'
+      exclude module: 'servlet-api'
+    }
     compile("org.scalatra:scalatra_$scalaVersion:$scalatraVersion") {
       exclude module: 'scala-compiler'
       exclude module: 'slf4j-api'
@@ -494,10 +499,19 @@ project(":samza-kv-rocksdb_$scalaVersion") {
 project(":samza-hdfs_$scalaVersion") {
   apply plugin: 'scala'
 
+  // Force scala joint compilation
+  sourceSets.main.scala.srcDir "src/main/java"
+  sourceSets.test.scala.srcDir "src/test/java"
+  sourceSets.main.java.srcDirs = []
+
   dependencies {
     compile project(':samza-api')
     compile project(":samza-core_$scalaVersion")
     compile project(":samza-kafka_$scalaVersion")
+    // currently hdfs system producer/consumer do depend on yarn for two things:
+    // 1. staging directory 2. security
+    // SAMZA-1032 to solve the staging directory dependency
+    compile project(":samza-yarn_$scalaVersion")
     compile "org.scala-lang:scala-library:$scalaLibVersion"
     compile("org.apache.hadoop:hadoop-hdfs:$yarnVersion") {
       exclude module: 'slf4j-log4j12'

http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java
new file mode 100644
index 0000000..8bf31c5
--- /dev/null
+++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java
@@ -0,0 +1,221 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.hdfs;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.system.hdfs.partitioner.DirectoryPartitioner;
+import org.apache.samza.system.hdfs.partitioner.HdfsFileSystemAdapter;
+import org.apache.samza.system.hdfs.reader.HdfsReaderFactory;
+import org.apache.samza.system.hdfs.reader.MultiFileHdfsReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The HDFS system admin for {@link org.apache.samza.system.hdfs.HdfsSystemConsumer} and
+ * {@link org.apache.samza.system.hdfs.HdfsSystemProducer}
+ *
+ * A high level overview of the HDFS producer/consumer architecture:
+ *
+ * - HDFSSystemConsumer obtains partition descriptors from HDFS and reads events back
+ *   through the HDFSAvroFileReader / IFileReader
+ * - HDFSSystemAdmin performs the filtering/grouping of files via the Directory Partitioner
+ *   and persists the resulting partition descriptors to HDFS
+ * - HDFSSystemProducer writes events to HDFS through the HDFSAvroWriter
+ * - HDFSSystemFactory wires the consumer, admin and producer together
+ */
+public class HdfsSystemAdmin implements SystemAdmin {
+  private static final Logger LOG = LoggerFactory.getLogger(HdfsSystemAdmin.class);
+
+  private HdfsConfig hdfsConfig;
+  private DirectoryPartitioner directoryPartitioner;
+  private String stagingDirectory; // directory that contains the partition description
+  private HdfsReaderFactory.ReaderType readerType;
+
+  public HdfsSystemAdmin(String systemName, Config config) {
+    hdfsConfig = new HdfsConfig(config);
+    directoryPartitioner = new DirectoryPartitioner(hdfsConfig.getPartitionerWhiteList(systemName),
+      hdfsConfig.getPartitionerBlackList(systemName), hdfsConfig.getPartitionerGroupPattern(systemName),
+      new HdfsFileSystemAdapter());
+    stagingDirectory = hdfsConfig.getStagingDirectory();
+    readerType = HdfsReaderFactory.getType(hdfsConfig.getFileReaderType(systemName));
+  }
+
+  @Override
+  public Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> offsets) {
+    /*
+     * To actually get the "after" offset we would have to seek to that offset in the file and
+     * read that record to find the location of the next one, which is a much more expensive
+     * operation than in KafkaSystemAdmin. So simply return the same offsets. This always incurs
+     * some re-processing, but such at-least-once semantics are legitimate in Samza.
+     */
+    return offsets;
+  }
+
+  static Map<Partition, List<String>> obtainPartitionDescriptorMap(String stagingDirectory, String streamName) {
+    Path path = PartitionDescriptorUtil.getPartitionDescriptorPath(stagingDirectory, streamName);
+    try (FileSystem fs = path.getFileSystem(new Configuration())) {
+      if (!fs.exists(path)) {
+        return null;
+      }
+      try (FSDataInputStream fis = fs.open(path)) {
+        String json = IOUtils.toString(fis, StandardCharsets.UTF_8);
+        return PartitionDescriptorUtil.getDescriptorMapFromJson(json);
+      }
+    } catch (IOException e) {
+      throw new SamzaException("Failed to read partition description from: " + path);
+    }
+  }
+
+  /*
+   * Persist the partition descriptor only when it doesn't exist already on HDFS.
+   */
+  private void persistPartitionDescriptor(String streamName,
+    Map<Partition, List<String>> partitionDescriptorMap) {
+    Path targetPath = PartitionDescriptorUtil.getPartitionDescriptorPath(stagingDirectory, streamName);
+    try (FileSystem fs = targetPath.getFileSystem(new Configuration())) {
+      // Partition descriptor is supposed to be immutable. So don't overwrite it if it already exists.
+      if (fs.exists(targetPath)) {
+        LOG.warn(targetPath.toString() + " exists. Skip persisting partition descriptor.");
+      } else {
+        LOG.info("About to persist partition descriptors to path: " + targetPath.toString());
+        try (FSDataOutputStream fos = fs.create(targetPath)) {
+          fos.write(
+            PartitionDescriptorUtil.getJsonFromDescriptorMap(partitionDescriptorMap).getBytes(StandardCharsets.UTF_8));
+        }
+      }
+    } catch (IOException e) {
+      throw new SamzaException("Failed to validate/persist partition description on hdfs.", e);
+    }
+  }
+
+  private boolean partitionDescriptorExists(String streamName) {
+    Path targetPath = PartitionDescriptorUtil.getPartitionDescriptorPath(stagingDirectory, streamName);
+    try (FileSystem fs = targetPath.getFileSystem(new Configuration())) {
+      return fs.exists(targetPath);
+    } catch (IOException e) {
+      throw new SamzaException("Failed to obtain information about path: " + targetPath);
+    }
+  }
+
+  @Override
+  public Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> streamNames) {
+    Map<String, SystemStreamMetadata> systemStreamMetadataMap = new HashMap<>();
+    streamNames.forEach(streamName -> {
+      systemStreamMetadataMap.put(streamName, new SystemStreamMetadata(streamName, directoryPartitioner
+        .getPartitionMetadataMap(streamName, obtainPartitionDescriptorMap(stagingDirectory, streamName))));
+      if (!partitionDescriptorExists(streamName)) {
+        persistPartitionDescriptor(streamName, directoryPartitioner.getPartitionDescriptor(streamName));
+      }
+    });
+    return systemStreamMetadataMap;
+  }
+
+  @Override
+  public void createChangelogStream(String streamName, int numOfPartitions) {
+    throw new UnsupportedOperationException("HDFS doesn't support change log stream.");
+  }
+
+  @Override
+  public void validateChangelogStream(String streamName, int numOfPartitions) {
+    throw new UnsupportedOperationException("HDFS doesn't support change log stream.");
+  }
+
+  @Override
+  public void createCoordinatorStream(String streamName) {
+    throw new UnsupportedOperationException("HDFS doesn't support coordinator stream.");
+  }
+
+  /**
+   * Compare two multi-file style offsets. A multi-file style offset consists of both
+   * the file index and the offset within that file. The format is:
+   * "fileIndex:offsetWithinFile"
+   * For example, "2:0", "3:127"
+   * The format of the offset within a file is defined by the implementation of
+   * {@link org.apache.samza.system.hdfs.reader.SingleFileHdfsReader} itself.
+   *
+   * @param offset1 First offset for comparison.
+   * @param offset2 Second offset for comparison.
+   * @return -1, if offset1 &lt; offset2
+   *          0, if offset1 == offset2
+   *          1, if offset1 &gt; offset2
+   *          null, if not comparable
+   */
+  @Override
+  public Integer offsetComparator(String offset1, String offset2) {
+    if (StringUtils.isBlank(offset1) || StringUtils.isBlank(offset2)) {
+      return null;
+    }
+    int fileIndex1 = MultiFileHdfsReader.getCurFileIndex(offset1);
+    int fileIndex2 = MultiFileHdfsReader.getCurFileIndex(offset2);
+    if (fileIndex1 == fileIndex2) {
+      String offsetWithinFile1 = MultiFileHdfsReader.getCurSingleFileOffset(offset1);
+      String offsetWithinFile2 = MultiFileHdfsReader.getCurSingleFileOffset(offset2);
+      return HdfsReaderFactory.offsetComparator(readerType, offsetWithinFile1, offsetWithinFile2);
+    }
+    return Integer.compare(fileIndex1, fileIndex2);
+  }
+}
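
A hedged illustration of the ordering offsetComparator above is meant to produce on the "fileIndex:offsetWithinFile" format. The standalone sketch below parses the offsets itself and assumes numeric within-file offsets (as the Avro reader uses), rather than going through MultiFileHdfsReader; it is illustrative, not the committed implementation.

    public class MultiFileOffsetOrderingSketch {
      // Parses "2:0"-style offsets and compares by file index first, then by offset within the file.
      static int compare(String offset1, String offset2) {
        String[] a = offset1.split(":");
        String[] b = offset2.split(":");
        int fileIndex = Integer.compare(Integer.parseInt(a[0]), Integer.parseInt(b[0]));
        if (fileIndex != 0) {
          return fileIndex;
        }
        return Long.compare(Long.parseLong(a[1]), Long.parseLong(b[1]));
      }

      public static void main(String[] args) {
        System.out.println(compare("2:0", "3:127"));   // -1: file 2 comes before file 3
        System.out.println(compare("3:64", "3:127"));  // -1: same file, smaller offset first
        System.out.println(compare("3:127", "3:127")); //  0: identical offsets
      }
    }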

http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java
new file mode 100644
index 0000000..13a7102
--- /dev/null
+++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java
@@ -0,0 +1,281 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.hdfs;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.commons.lang.Validate;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.system.hdfs.reader.HdfsReaderFactory;
+import org.apache.samza.system.hdfs.reader.MultiFileHdfsReader;
+import org.apache.samza.util.BlockingEnvelopeMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+
+
+/**
+ * The system consumer for HDFS, extending the {@link org.apache.samza.util.BlockingEnvelopeMap}.
+ * Events will be parsed from HDFS and placed into a blocking queue in {@link org.apache.samza.util.BlockingEnvelopeMap}.
+ * There will be one {@link org.apache.samza.system.hdfs.reader.MultiFileHdfsReader} for each {@link org.apache.samza.system.SystemStreamPartition},
+ * and each {@link org.apache.samza.system.hdfs.reader.MultiFileHdfsReader} runs within its own thread.
+ *
+ *
+ *   MultiFileHdfsReader_1 (Thread1) --> SSP1-BlockingQueue --+
+ *   MultiFileHdfsReader_2 (Thread2) --> SSP2-BlockingQueue --+--> SystemConsumer.poll()
+ *                ...                          ...            |
+ *   MultiFileHdfsReader_N (ThreadN) --> SSPN-BlockingQueue --+
+ *
+ * Since each thread has exactly one reader and its own blocking queue, there is essentially no
+ * communication among reader threads.
+ * Thread safety between the reader threads and the Samza main thread is guaranteed by the blocking queues in between.
+ */
+public class HdfsSystemConsumer extends BlockingEnvelopeMap {
+  private static final Logger LOG = LoggerFactory.getLogger(HdfsSystemConsumer.class);
+
+  private static final String METRICS_GROUP_NAME = HdfsSystemConsumer.class.getName();
+
+  private final HdfsReaderFactory.ReaderType readerType;
+  private final String stagingDirectory; // directory that contains the partition description
+  private final int bufferCapacity;
+  private final int numMaxRetires;
+  private ExecutorService executorService;
+
+  /**
+   * The cached mapping from stream partition to partition descriptor. The partition descriptor
+   * is the actual file path (or the set of file paths if the partition contains multiple files)
+   * of the stream partition.
+   * For example,
+   * (stream1) -> (P0) -> "hdfs://user/samzauser/1/datafile01.avro"
+   * (stream1) -> (P1) -> "hdfs://user/samzauser/1/datafile02.avro"
+   * (stream2) -> (P0) -> "hdfs://user/samzauser/2/datafile01.avro"
+   * ...
+   */
+  private LoadingCache<String, Map<Partition, List<String>>> cachedPartitionDescriptorMap;
+  private Map<SystemStreamPartition, MultiFileHdfsReader> readers;
+  private Map<SystemStreamPartition, Future> readerRunnableStatus;
+
+  /**
+   * Whether the {@link org.apache.samza.system.hdfs.HdfsSystemConsumer} is notified
+   * to be shutdown. {@link org.apache.samza.system.hdfs.HdfsSystemConsumer.ReaderRunnable} on
+   * each thread will be checking this variable to determine whether it should stop.
+   */
+  private volatile boolean isShutdown;
+
+  private final HdfsSystemConsumerMetrics consumerMetrics;
+  private final HdfsConfig hdfsConfig;
+
+  public HdfsSystemConsumer(String systemName, Config config, HdfsSystemConsumerMetrics consumerMetrics) {
+    super(consumerMetrics.getMetricsRegistry());
+    hdfsConfig = new HdfsConfig(config);
+    readerType = HdfsReaderFactory.getType(hdfsConfig.getFileReaderType(systemName));
+    stagingDirectory = hdfsConfig.getStagingDirectory();
+    bufferCapacity = hdfsConfig.getConsumerBufferCapacity(systemName);
+    numMaxRetires = hdfsConfig.getConsumerNumMaxRetries(systemName);
+    readers = new ConcurrentHashMap<>();
+    readerRunnableStatus = new ConcurrentHashMap<>();
+    isShutdown = false;
+    this.consumerMetrics = consumerMetrics;
+    cachedPartitionDescriptorMap = CacheBuilder.newBuilder().build(new CacheLoader<String, Map<Partition, List<String>>>() {
+        @Override
+        public Map<Partition, List<String>> load(String streamName)
+          throws Exception {
+          Validate.notEmpty(streamName);
+          return HdfsSystemAdmin.obtainPartitionDescriptorMap(stagingDirectory, streamName);
+        }
+      });
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void start() {
+    LOG.info(String.format("HdfsSystemConsumer started with %d readers", readers.size()));
+    executorService = Executors.newCachedThreadPool();
+    readers.entrySet().forEach(
+      entry -> readerRunnableStatus.put(entry.getKey(), executorService.submit(new ReaderRunnable(entry.getValue()))));
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void stop() {
+    LOG.info("Received request to stop HdfsSystemConsumer.");
+    isShutdown = true;
+    executorService.shutdown();
+    LOG.info("HdfsSystemConsumer stopped.");
+  }
+
+  private List<String> getPartitionDescriptor(SystemStreamPartition systemStreamPartition) {
+    String streamName = systemStreamPartition.getStream();
+    Partition partition = systemStreamPartition.getPartition();
+    try {
+      return cachedPartitionDescriptorMap.get(streamName).get(partition);
+    } catch (ExecutionException e) {
+      throw new SamzaException("Failed to obtain descriptor for " + systemStreamPartition, e);
+    }
+  }
+
+  @Override
+  protected BlockingQueue<IncomingMessageEnvelope> newBlockingQueue() {
+    return new LinkedBlockingQueue<>(bufferCapacity);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void register(SystemStreamPartition systemStreamPartition, String offset) {
+    LOG.info("HdfsSystemConsumer register with partition: " + systemStreamPartition + " and offset " + offset);
+    super.register(systemStreamPartition, offset);
+    MultiFileHdfsReader reader =
+      new MultiFileHdfsReader(readerType, systemStreamPartition, getPartitionDescriptor(systemStreamPartition), offset,
+        numMaxRetires);
+    readers.put(systemStreamPartition, reader);
+    consumerMetrics.registerSystemStreamPartition(systemStreamPartition);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(
+    Set<SystemStreamPartition> systemStreamPartitions, long timeout)
+    throws InterruptedException {
+    systemStreamPartitions.forEach(systemStreamPartition -> {
+      Future status = readerRunnableStatus.get(systemStreamPartition);
+      if (status.isDone()) {
+        try {
+          status.get();
+        } catch (ExecutionException | InterruptedException e) {
+          MultiFileHdfsReader reader = readers.get(systemStreamPartition);
+          LOG.warn(
+            String.format("Detect failure in ReaderRunnable for ssp: %s. Try to reconnect now.", systemStreamPartition),
+            e);
+          reader.reconnect();
+          readerRunnableStatus.put(systemStreamPartition, executorService.submit(new ReaderRunnable(reader)));
+        }
+      }
+    });
+    return super.poll(systemStreamPartitions, timeout);
+  }
+
+  private void offerMessage(SystemStreamPartition systemStreamPartition, IncomingMessageEnvelope envelope) {
+    try {
+      super.put(systemStreamPartition, envelope);
+    } catch (InterruptedException e) {
+      throw new SamzaException("ReaderRunnable interrupted for ssp: " + systemStreamPartition);
+    }
+  }
+
+  private void doPoll(MultiFileHdfsReader reader) {
+    SystemStreamPartition systemStreamPartition = reader.getSystemStreamPartition();
+    while (reader.hasNext() && !isShutdown) {
+      IncomingMessageEnvelope messageEnvelope = reader.readNext();
+      offerMessage(systemStreamPartition, messageEnvelope);
+      consumerMetrics.incNumEvents(systemStreamPartition);
+      consumerMetrics.incTotalNumEvents();
+    }
+    offerMessage(systemStreamPartition, IncomingMessageEnvelope.buildEndOfStreamEnvelope(systemStreamPartition));
+    reader.close();
+  }
+
+  public static class HdfsSystemConsumerMetrics {
+
+    private final MetricsRegistry metricsRegistry;
+    private final Map<SystemStreamPartition, Counter> numEventsCounterMap;
+    private final Counter numTotalEventsCounter;
+
+    public HdfsSystemConsumerMetrics(MetricsRegistry metricsRegistry) {
+      this.metricsRegistry = metricsRegistry;
+      this.numEventsCounterMap = new ConcurrentHashMap<>();
+      this.numTotalEventsCounter = metricsRegistry.newCounter(METRICS_GROUP_NAME, "num-total-events");
+    }
+
+    public void registerSystemStreamPartition(SystemStreamPartition systemStreamPartition) {
+      numEventsCounterMap.putIfAbsent(systemStreamPartition,
+        metricsRegistry.newCounter(METRICS_GROUP_NAME, "num-events-" + systemStreamPartition));
+    }
+
+    public void incNumEvents(SystemStreamPartition systemStreamPartition) {
+      if (!numEventsCounterMap.containsKey(systemStreamPartition)) {
+        registerSystemStreamPartition(systemStreamPartition);
+      }
+      numEventsCounterMap.get(systemStreamPartition).inc();
+    }
+
+    public void incTotalNumEvents() {
+      numTotalEventsCounter.inc();
+    }
+
+    public MetricsRegistry getMetricsRegistry() {
+      return metricsRegistry;
+    }
+  }
+
+  private class ReaderRunnable implements Runnable {
+    public MultiFileHdfsReader reader;
+
+    public ReaderRunnable(MultiFileHdfsReader reader) {
+      this.reader = reader;
+    }
+
+    @Override
+    public void run() {
+      doPoll(reader);
+    }
+  }
+}
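
A hedged sketch of the consumer lifecycle implied by the class above: register the stream partitions with a starting offset, start the reader threads, poll, then stop. The system name, directory path and starting offset here are illustrative assumptions, and the HDFS consumer configs a real job would set (partitioner whitelist, staging directory, and so on) are omitted.

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import org.apache.samza.Partition;
    import org.apache.samza.config.MapConfig;
    import org.apache.samza.system.IncomingMessageEnvelope;
    import org.apache.samza.system.SystemStreamPartition;
    import org.apache.samza.system.hdfs.HdfsSystemConsumer;
    import org.apache.samza.util.NoOpMetricsRegistry;

    public class HdfsConsumerLifecycleSketch {
      public static void main(String[] args) throws InterruptedException {
        // A real job would populate the MapConfig with the HDFS consumer settings.
        HdfsSystemConsumer consumer = new HdfsSystemConsumer("hdfs", new MapConfig(),
            new HdfsSystemConsumer.HdfsSystemConsumerMetrics(new NoOpMetricsRegistry()));

        // One MultiFileHdfsReader (and one reader thread) is created per registered partition.
        SystemStreamPartition ssp =
            new SystemStreamPartition("hdfs", "hdfs://namenode/user/samzauser/1", new Partition(0));
        consumer.register(ssp, "0:0"); // "fileIndex:offsetWithinFile"
        consumer.start();

        Map<SystemStreamPartition, List<IncomingMessageEnvelope>> envelopes =
            consumer.poll(Collections.singleton(ssp), 1000);
        envelopes.getOrDefault(ssp, Collections.emptyList())
            .forEach(envelope -> System.out.println(envelope.getOffset()));

        consumer.stop();
      }
    }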

http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/PartitionDescriptorUtil.java
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/PartitionDescriptorUtil.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/PartitionDescriptorUtil.java
new file mode 100644
index 0000000..5abdbbc
--- /dev/null
+++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/PartitionDescriptorUtil.java
@@ -0,0 +1,97 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.hdfs;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+
+/**
+ * Util class for methods around partition descriptor.
+ *
+ * Partition descriptor is rich information about a partition: the set
+ * of files that are associated with the partition.
+ *
+ * Partition descriptor map, or descriptor map, is the map between the
+ * {@link org.apache.samza.Partition} and the descriptor
+ */
+public class PartitionDescriptorUtil {
+
+  private PartitionDescriptorUtil() {
+
+  }
+
+  private static final int INDENT_FACTOR = 2;
+  private static final String DELIMITER = ",";
+
+  private static String getStringFromPaths(List<String> paths) {
+    return String.join(DELIMITER, paths);
+  }
+
+  private static List<String> getPathsFromString(String descriptor) {
+    return Arrays.asList(descriptor.split(DELIMITER));
+  }
+
+  public static String getJsonFromDescriptorMap(Map<Partition, List<String>> descriptorMap) {
+    JSONObject out = new JSONObject();
+    descriptorMap.forEach((partition, paths) -> {
+      String descriptorStr = getStringFromPaths(paths);
+      try {
+        out.put(String.valueOf(partition.getPartitionId()), descriptorStr);
+      } catch (JSONException e) {
+        throw new SamzaException(
+          String.format("Invalid description to encode. partition=%s, descriptor=%s", partition, descriptorStr), e);
+      }
+    });
+    try {
+      return out.toString(INDENT_FACTOR);
+    } catch (JSONException e) {
+      throw new SamzaException("Failed to generate json string.", e);
+    }
+  }
+
+  public static Map<Partition, List<String>> getDescriptorMapFromJson(String json) {
+    try {
+      @SuppressWarnings("unchecked")
+      Map<String, String> rawMap = new ObjectMapper().readValue(json, HashMap.class);
+      Map<Partition, List<String>> descriptorMap = new HashMap<>();
+      rawMap.forEach((key, value) -> descriptorMap.put(new Partition(Integer.valueOf(key)), getPathsFromString(value)));
+      return descriptorMap;
+    } catch (IOException | NumberFormatException e) {
+      throw new SamzaException("Failed to convert json: " + json, e);
+    }
+  }
+
+  public static Path getPartitionDescriptorPath(String base, String streamName) {
+    Path basePath = new Path(base);
+    Path relativePath = new Path(streamName.replaceAll("\\W", "_") + "_partition_description");
+    return new Path(basePath, relativePath);
+  }
+}
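
A hedged sketch of the JSON round trip these helpers provide; the HDFS paths below are made-up example values, not part of the commit.

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import org.apache.samza.Partition;
    import org.apache.samza.system.hdfs.PartitionDescriptorUtil;

    public class DescriptorMapRoundTripSketch {
      public static void main(String[] args) {
        Map<Partition, List<String>> descriptorMap = new HashMap<>();
        descriptorMap.put(new Partition(0), Arrays.asList("hdfs://user/samzauser/1/datafile01.avro"));
        descriptorMap.put(new Partition(1), Arrays.asList(
            "hdfs://user/samzauser/1/datafile02.avro",
            "hdfs://user/samzauser/1/datafile03.avro")); // multiple files are joined with ","

        String json = PartitionDescriptorUtil.getJsonFromDescriptorMap(descriptorMap);
        Map<Partition, List<String>> restored = PartitionDescriptorUtil.getDescriptorMapFromJson(json);
        System.out.println(restored.equals(descriptorMap)); // true: the mapping survives the round trip
      }
    }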

http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/DirectoryPartitioner.java
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/DirectoryPartitioner.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/DirectoryPartitioner.java
new file mode 100644
index 0000000..5cad1e4
--- /dev/null
+++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/DirectoryPartitioner.java
@@ -0,0 +1,235 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.hdfs.partitioner;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import javax.annotation.Nullable;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.system.hdfs.reader.MultiFileHdfsReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata;
+import static org.apache.samza.system.hdfs.partitioner.FileSystemAdapter.FileMetadata;
+
+
+/**
+ * The partitioner that takes a directory as input and does
+ * 1. Filtering, based on a white list and a black list
+ * 2. Grouping, based on the grouping pattern
+ *
+ * and then generates the partition metadata and partition descriptors.
+ *
+ * This class assumes that the directory remains immutable.
+ * If the directory does change:
+ * new files showing up in the directory are ignored when operating on an old version of the partition descriptor;
+ * a {@link org.apache.samza.SamzaException} is thrown if at least one old file no longer exists
+ */
+public class DirectoryPartitioner {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DirectoryPartitioner.class);
+  private static final String GROUP_IDENTIFIER = "\\[id]";
+
+  private String whiteListRegex;
+  private String blackListRegex;
+  private String groupPattern;
+  private FileSystemAdapter fileSystemAdapter;
+
+  // stream name => partition => partition descriptor
+  private Map<String, Map<Partition, List<String>>> partitionDescriptorMap = new HashMap<>();
+
+  public DirectoryPartitioner(String whiteList, String blackList, String groupPattern,
+    FileSystemAdapter fileSystemAdapter) {
+    this.whiteListRegex = whiteList;
+    this.blackListRegex = blackList;
+    this.groupPattern = groupPattern;
+    this.fileSystemAdapter = fileSystemAdapter;
+    LOG.info(String
+      .format("Creating DirectoryPartitioner with whiteList=%s, blackList=%s, groupPattern=%s", whiteList, blackList,
+        groupPattern));
+  }
+
+  /*
+   * Based on the stream name, get the list of all files and filter out unneeded ones given
+   * the white list and black list
+   */
+  private List<FileMetadata> getFilteredFiles(String streamName) {
+    List<FileMetadata> filteredFiles = new ArrayList<>();
+    List<FileMetadata> allFiles = fileSystemAdapter.getAllFiles(streamName);
+    LOG.info(String.format("List of all files for %s: %s", streamName, allFiles));
+    allFiles.stream().filter(file -> file.getPath().matches(whiteListRegex) && !file.getPath().matches(blackListRegex))
+      .forEach(filteredFiles::add);
+    // sort the files to have a consistent order
+    filteredFiles.sort((f1, f2) -> f1.getPath().compareTo(f2.getPath()));
+    LOG.info(String.format("List of filtered files for %s: %s", streamName, filteredFiles));
+    return filteredFiles;
+  }
+
+  /*
+   * Algorithm to extract the group identifier from the path based on
+   * the group pattern.
+   * 1. Split the group pattern into two parts (prefix, suffix)
+   * 2. Match both the prefix and the suffix against the input
+   * 3. Strip the prefix and suffix to get the group identifier
+   *
+   * For example,
+   * input = run_2016-08-01-part-3.avro
+   * group pattern = ".*part-[id]\.avro"
+   *
+   * 1. Split: prefix pattern = ".*part-" suffix pattern = "\.avro"
+   * 2. Match: prefix string = "run_2016-08-01-part-" suffix string = ".avro"
+   * 3. Extract: output = "3"
+   *
+   * If we can't extract a group identifier, return the original input
+   */
+  private String extractGroupIdentifier(String input) {
+    if (StringUtils.isBlank(groupPattern)) {
+      return input;
+    }
+    String[] patterns = groupPattern.split(GROUP_IDENTIFIER);
+    if (patterns.length != 2) {
+      return input;
+    }
+
+    Pattern p1 = Pattern.compile(patterns[0]);
+    Pattern p2 = Pattern.compile(patterns[1]);
+    Matcher m1 = p1.matcher(input);
+    Matcher m2 = p2.matcher(input);
+    if (!m1.find()) {
+      return input;
+    }
+    int s1 = m1.end();
+    if (!m2.find(s1)) {
+      return input;
+    }
+    int s2 = m2.start();
+    return input.substring(s1, s2);
+  }
+
+  /*
+   * Group partitions based on the group identifier extracted from the file path
+   */
+  private List<List<FileMetadata>> generatePartitionGroups(List<FileMetadata> filteredFiles) {
+    Map<String, List<FileMetadata>> map = new HashMap<>();
+    for (FileMetadata fileMetadata : filteredFiles) {
+      String groupId = extractGroupIdentifier(fileMetadata.getPath());
+      map.putIfAbsent(groupId, new ArrayList<>());
+      map.get(groupId).add(fileMetadata);
+    }
+    List<List<FileMetadata>> ret = new ArrayList<>();
+    // sort the map to guarantee consistent ordering
+    List<String> sortedKeys = new ArrayList<>(map.keySet());
+    sortedKeys.sort(Comparator.<String>naturalOrder());
+    sortedKeys.stream().forEach(key -> ret.add(map.get(key)));
+    return ret;
+  }
+
+   /*
+    * This class assumes that the directory remains immutable.
+    * If the directory does change:
+    * new files showing up in the directory are ignored when operating on an old version of the partition descriptor;
+    * a {@link org.apache.samza.SamzaException} is thrown if at least one old file no longer exists
+    */
+  private List<FileMetadata> validateAndGetOriginalFilteredFiles(List<FileMetadata> newFileList,
+    Map<Partition, List<String>> existingPartitionDescriptor) {
+    assert newFileList != null;
+    assert existingPartitionDescriptor != null;
+    Set<String> oldFileSet = new HashSet<>();
+    existingPartitionDescriptor.values().forEach(oldFileSet::addAll);
+    Set<String> newFileSet = new HashSet<>();
+    newFileList.forEach(file -> newFileSet.add(file.getPath()));
+    if (!newFileSet.containsAll(oldFileSet)) {
+      throw new SamzaException("The list of new files is not a super set of the old files. diff = "
+        + oldFileSet.removeAll(newFileSet));
+    }
+    Iterator<FileMetadata> iterator = newFileList.iterator();
+    while (iterator.hasNext()) {
+      FileMetadata file = iterator.next();
+      if (!oldFileSet.contains(file.getPath())) {
+        iterator.remove();
+      }
+    }
+    return newFileList;
+  }
+
+  /**
+   * Get partition metadata for a stream
+   * @param streamName name of the stream; should contain the information about the path of the
+   *                   root directory
+   * @param existingPartitionDescriptorMap map of the existing partition descriptor
+   * @return map of SSP metadata
+   */
+  public Map<Partition, SystemStreamPartitionMetadata> getPartitionMetadataMap(String streamName,
+    @Nullable Map<Partition, List<String>> existingPartitionDescriptorMap) {
+    LOG.info("Trying to obtain metadata for " + streamName);
+    LOG.info("Existing partition descriptor: " + (existingPartitionDescriptorMap == null ? "empty"
+      : existingPartitionDescriptorMap));
+    Map<Partition, SystemStreamPartitionMetadata> partitionMetadataMap = new HashMap<>();
+    partitionDescriptorMap.putIfAbsent(streamName, new HashMap<>());
+    List<FileMetadata> filteredFiles = getFilteredFiles(streamName);
+    if (existingPartitionDescriptorMap != null) {
+      filteredFiles = validateAndGetOriginalFilteredFiles(filteredFiles, existingPartitionDescriptorMap);
+    }
+    List<List<FileMetadata>> groupedPartitions = generatePartitionGroups(filteredFiles);
+    int partitionId = 0;
+    for (List<FileMetadata> fileGroup : groupedPartitions) {
+      Partition partition = new Partition(partitionId);
+      List<String> pathList = new ArrayList<>();
+      List<String> lengthList = new ArrayList<>();
+      fileGroup.forEach(fileMetadata -> {
+        pathList.add(fileMetadata.getPath());
+        lengthList.add(String.valueOf(fileMetadata.getLen()));
+      });
+      String oldestOffset = MultiFileHdfsReader.generateOffset(0, "0");
+      String newestOffset = MultiFileHdfsReader.generateOffset(lengthList.size() - 1, String.valueOf(lengthList.get(lengthList.size() - 1)));
+      SystemStreamPartitionMetadata metadata =
+        new SystemStreamPartitionMetadata(oldestOffset, newestOffset, null);
+      partitionMetadataMap.put(partition, metadata);
+      partitionDescriptorMap.get(streamName).put(partition, pathList);
+      partitionId++;
+    }
+    LOG.info("Obtained metadata map as: " + partitionMetadataMap);
+    LOG.info("Computed partition description as: " + partitionDescriptorMap);
+    return partitionMetadataMap;
+  }
+
+  /**
+   * Get partition descriptors for a stream
+   * @param streamName name of the stream; should contain the information about the path of the
+   *                   root directory
+   * @return map of the partition descriptor
+   */
+  public Map<Partition, List<String>> getPartitionDescriptor(String streamName) {
+    return partitionDescriptorMap.get(streamName);
+  }
+}
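
A hedged sketch of the filtering and grouping described above, using an in-memory FileSystemAdapter stub instead of HDFS. The file names, sizes and stream name are made-up example values, and the class name is illustrative.

    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    import org.apache.samza.Partition;
    import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata;
    import org.apache.samza.system.hdfs.partitioner.DirectoryPartitioner;
    import org.apache.samza.system.hdfs.partitioner.FileSystemAdapter;

    public class DirectoryPartitionerGroupingSketch {
      public static void main(String[] args) {
        // Stub adapter returning a fixed file listing instead of talking to HDFS.
        FileSystemAdapter stub = streamName -> Arrays.asList(
            new FileSystemAdapter.FileMetadata("run_2016-08-01-part-1.avro", 1024L),
            new FileSystemAdapter.FileMetadata("run_2016-08-02-part-1.avro", 2048L),
            new FileSystemAdapter.FileMetadata("run_2016-08-01-part-2.avro", 4096L));

        // Whitelist all avro files, blacklist nothing, group on the "part-[id]" suffix:
        // files sharing the same id end up in the same partition.
        DirectoryPartitioner partitioner =
            new DirectoryPartitioner(".*\\.avro", "", ".*part-[id]\\.avro", stub);

        Map<Partition, SystemStreamPartitionMetadata> metadata =
            partitioner.getPartitionMetadataMap("hdfs://example/dir", null);
        Map<Partition, List<String>> descriptors = partitioner.getPartitionDescriptor("hdfs://example/dir");
        System.out.println(metadata.size() + " partitions"); // expected: 2 partitions
        System.out.println(descriptors);                     // partition 0 holds the two part-1 files
      }
    }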

http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/FileSystemAdapter.java
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/FileSystemAdapter.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/FileSystemAdapter.java
new file mode 100644
index 0000000..5fec4bf
--- /dev/null
+++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/FileSystemAdapter.java
@@ -0,0 +1,60 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.hdfs.partitioner;
+
+import java.util.List;
+
+
+/**
+ * An adapter between directory partitioner and the actual file systems or
+ * file system like systems.
+ */
+public interface FileSystemAdapter {
+
+  /**
+   * Return the list of all files given the stream name
+   * @param streamName name of the stream
+   * @return list of <code>FileMetadata</code> for all files associated with the given stream
+   */
+  public List<FileMetadata> getAllFiles(String streamName);
+
+  public class FileMetadata {
+    private String path;
+    private long length;
+
+    public FileMetadata(String path, long length) {
+      this.path = path;
+      this.length = length;
+    }
+
+    public String getPath() {
+      return path;
+    }
+
+    public long getLen() {
+      return length;
+    }
+
+    @Override
+    public String toString() {
+      return String.format("[path = %s, length = %s]", path, length);
+    }
+  }
+}
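
As a hedged sketch of how small an adapter can be, here is an illustrative local-filesystem implementation (not part of the commit) that treats the stream name as a local directory; something like it can be handy for exercising the DirectoryPartitioner without a cluster.

    import java.io.File;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.samza.system.hdfs.partitioner.FileSystemAdapter;

    public class LocalFileSystemAdapter implements FileSystemAdapter {
      // Treats the stream name as a local directory path and lists the files in it.
      @Override
      public List<FileMetadata> getAllFiles(String streamName) {
        List<FileMetadata> result = new ArrayList<>();
        File[] files = new File(streamName).listFiles();
        if (files != null) {
          for (File file : files) {
            result.add(new FileMetadata(file.getAbsolutePath(), file.length()));
          }
        }
        return result;
      }
    }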

http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/HdfsFileSystemAdapter.java
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/HdfsFileSystemAdapter.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/HdfsFileSystemAdapter.java
new file mode 100644
index 0000000..bb7b3fa
--- /dev/null
+++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/HdfsFileSystemAdapter.java
@@ -0,0 +1,55 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.hdfs.partitioner;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class HdfsFileSystemAdapter implements FileSystemAdapter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(HdfsFileSystemAdapter.class);
+
+  public List<FileMetadata> getAllFiles(String streamName) {
+    List<FileMetadata> ret = new ArrayList<>();
+    try {
+      Path streamPath = new Path(streamName);
+      FileSystem fileSystem = streamPath.getFileSystem(new Configuration());
+      FileStatus[] fileStatuses = fileSystem.listStatus(streamPath);
+      for (FileStatus fileStatus : fileStatuses) {
+        ret.add(new FileMetadata(fileStatus.getPath().toString(), fileStatus.getLen()));
+      }
+    } catch (IOException e) {
+      LOG.error("Failed to get the list of files for " + streamName, e);
+      throw new SamzaException(e);
+    }
+    return ret;
+  }
+}

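A usage sketch for the adapter above; the HDFS URI is a placeholder. Note that getAllFiles lists only the direct children of the stream directory via FileSystem.listStatus.

import java.util.List;
import org.apache.samza.system.hdfs.partitioner.FileSystemAdapter;
import org.apache.samza.system.hdfs.partitioner.FileSystemAdapter.FileMetadata;
import org.apache.samza.system.hdfs.partitioner.HdfsFileSystemAdapter;

public class ListStreamFiles {
  public static void main(String[] args) {
    FileSystemAdapter adapter = new HdfsFileSystemAdapter();
    List<FileMetadata> files = adapter.getAllFiles("hdfs://namenode:8020/data/events");
    for (FileMetadata metadata : files) {
      System.out.println(metadata); // e.g. [path = hdfs://.../part-00000.avro, length = 1048576]
    }
  }
}
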
http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/AvroFileHdfsReader.java
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/AvroFileHdfsReader.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/AvroFileHdfsReader.java
new file mode 100644
index 0000000..757b40b
--- /dev/null
+++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/AvroFileHdfsReader.java
@@ -0,0 +1,216 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.hdfs.reader;
+
+import java.io.IOException;
+
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.AvroFSInput;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.samza.SamzaException;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * An implementation of {@link SingleFileHdfsReader} that reads and processes
+ * Avro format files.
+ */
+public class AvroFileHdfsReader implements SingleFileHdfsReader {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AvroFileHdfsReader.class);
+
+  private final SystemStreamPartition systemStreamPartition;
+  private DataFileReader<GenericRecord> fileReader;
+  private long curBlockStart;
+  private long curRecordOffset;
+
+  public AvroFileHdfsReader(SystemStreamPartition systemStreamPartition) {
+    this.systemStreamPartition = systemStreamPartition;
+    this.fileReader = null;
+  }
+
+  @Override
+  public void open(String pathStr, String singleFileOffset) {
+    LOG.info(String.format("%s: Open file [%s] with file offset [%s] for read", systemStreamPartition, pathStr, singleFileOffset));
+    Path path = new Path(pathStr);
+    try {
+      AvroFSInput input = new AvroFSInput(FileContext.getFileContext(path.toUri()), path);
+      fileReader = new DataFileReader<>(input, new GenericDatumReader<>());
+      seek(singleFileOffset);
+    } catch (IOException e) {
+      throw new SamzaException(e);
+    }
+  }
+
+  @Override
+  public void seek(String singleFileOffset) {
+    try {
+      // See comments for AvroFileCheckpoint to understand the behavior below
+      AvroFileCheckpoint checkpoint = new AvroFileCheckpoint(singleFileOffset);
+      if (checkpoint.isStartingOffset()) {
+        // seek to the beginning of the first block
+        fileReader.sync(0);
+        curBlockStart = fileReader.previousSync();
+        curRecordOffset = 0;
+        return;
+      }
+      fileReader.seek(checkpoint.getBlockStart());
+      for (int i = 0; i < checkpoint.getRecordOffset(); i++) {
+        if (fileReader.hasNext()) {
+          fileReader.next();
+        }
+      }
+      curBlockStart = checkpoint.getBlockStart();
+      curRecordOffset = checkpoint.getRecordOffset();
+    } catch (IOException e) {
+      throw new SamzaException(e);
+    }
+  }
+
+  @Override
+  public IncomingMessageEnvelope readNext() {
+    // get checkpoint for THIS record
+    String checkpoint = nextOffset();
+    GenericRecord record = fileReader.next();
+    if (fileReader.previousSync() != curBlockStart) {
+      curBlockStart = fileReader.previousSync();
+      curRecordOffset = 0;
+    } else {
+      curRecordOffset++;
+    }
+    // an Avro schema doesn't necessarily have a key field
+    return new IncomingMessageEnvelope(systemStreamPartition, checkpoint, null, record);
+  }
+
+  @Override
+  public boolean hasNext() {
+    return fileReader.hasNext();
+  }
+
+  @Override
+  public void close() {
+    LOG.info("About to close file reader for " + systemStreamPartition);
+    try {
+      fileReader.close();
+    } catch (IOException e) {
+      throw new SamzaException(e);
+    }
+    LOG.info("File reader closed for " + systemStreamPartition);
+  }
+
+  @Override
+  public String nextOffset() {
+    return AvroFileCheckpoint.generateCheckpointStr(curBlockStart, curRecordOffset);
+  }
+
+  public static int offsetComparator(String offset1, String offset2) {
+    AvroFileCheckpoint cp1 = new AvroFileCheckpoint(offset1);
+    AvroFileCheckpoint cp2 = new AvroFileCheckpoint(offset2);
+    return cp1.compareTo(cp2);
+  }
+
+  /**
+   * An avro file looks something like this:
+   *
+   * Byte offset: 0       103            271         391
+   *              ┌────────┬──────────────┬───────────┬───────────┐
+   * Avro file:   │ Header │    Block 1   │  Block 2  │  Block 3  │ ...
+   *              └────────┴──────────────┴───────────┴───────────┘
+   *
+   * Each block contains multiple records. The start of a block is defined as a valid
+   * synchronization point. A file reader can only seek to a synchronization point, i.e.
+   * the start of a block. Thus, to precisely describe the location of a record, we need
+   * to use the pair (blockStart, recordOffset). Here "blockStart" means the start of the
+   * block and "recordOffset" means the index of the record within the block.
+   * Taking the example above and supposing block 1 has 4 records, the record sequence is:
+   * (103, 0), (103, 1), (103, 2), (103, 3), (271, 0), ...
+   * where (271, 0) represents the first record in block 2.
+   *
+   * With the CP_DELIM being '@', the actual checkpoint string would look like "103@1",
+   * "271@0" or "271", etc. For convenience, a checkpoint with only the blockStart but no
+   * recordOffset within the block simply means the first record in that block. Thus,
+   * "271@0" is equal to "271".
+   */
+  public static class AvroFileCheckpoint {
+    private static final String CP_DELIM = "@";
+    private long blockStart; // start position of the block
+    private long recordOffset; // record offset within the block
+    String checkpointStr;
+
+    public static String generateCheckpointStr(long blockStart, long recordOffset) {
+      return blockStart + CP_DELIM + recordOffset;
+    }
+
+    public AvroFileCheckpoint(String checkpointStr) {
+      String[] elements = checkpointStr.replaceAll("\\s", "").split(CP_DELIM);
+      if (elements.length > 2 || elements.length < 1) {
+        throw new SamzaException("Invalid checkpoint for AvroFileHdfsReader: " + checkpointStr);
+      }
+      try {
+        blockStart = Long.parseLong(elements[0]);
+        recordOffset = elements.length == 2 ? Long.parseLong(elements[1]) : 0;
+      } catch (NumberFormatException e) {
+        throw new SamzaException("Invalid checkpoint for AvroFileHdfsReader: " + checkpointStr, e);
+      }
+      this.checkpointStr = checkpointStr;
+    }
+
+    public AvroFileCheckpoint(long blockStart, long recordOffset) {
+      this.blockStart = blockStart;
+      this.recordOffset = recordOffset;
+      this.checkpointStr = generateCheckpointStr(blockStart, recordOffset);
+    }
+
+    public long getBlockStart() {
+      return blockStart;
+    }
+
+    public long getRecordOffset() {
+      return recordOffset;
+    }
+
+    public String getCheckpointStr() {
+      return checkpointStr;
+    }
+
+    public boolean isStartingOffset() {
+      return blockStart == 0;
+    }
+
+    public int compareTo(AvroFileCheckpoint other) {
+      if (this.blockStart < other.blockStart) {
+        return -1;
+      } else if (this.blockStart > other.blockStart) {
+        return 1;
+      } else return Long.compare(this.recordOffset, other.recordOffset);
+    }
+
+    @Override
+    public String toString() {
+      return getCheckpointStr();
+    }
+  }
+}

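The checkpoint semantics described above can be exercised directly. A small sketch, using the byte offsets from the block diagram in the class comment:

import org.apache.samza.system.hdfs.reader.AvroFileHdfsReader;
import org.apache.samza.system.hdfs.reader.AvroFileHdfsReader.AvroFileCheckpoint;

public class CheckpointExample {
  public static void main(String[] args) {
    // "271@3" = the 4th record of the block starting at byte offset 271
    String checkpoint = AvroFileCheckpoint.generateCheckpointStr(271, 3);
    System.out.println(checkpoint); // 271@3

    // "271" and "271@0" both mean the first record of that block
    System.out.println(AvroFileHdfsReader.offsetComparator("271", "271@0")); // 0
    // ordering is by block start first, then by record offset within the block
    System.out.println(AvroFileHdfsReader.offsetComparator("103@3", "271@0") < 0); // true
  }
}
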
http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/HdfsReaderFactory.java
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/HdfsReaderFactory.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/HdfsReaderFactory.java
new file mode 100644
index 0000000..4efdfd7
--- /dev/null
+++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/HdfsReaderFactory.java
@@ -0,0 +1,59 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.hdfs.reader;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.system.SystemStreamPartition;
+
+
+public class HdfsReaderFactory {
+  public static SingleFileHdfsReader getHdfsReader(ReaderType readerType, SystemStreamPartition systemStreamPartition) {
+    switch (readerType) {
+      case AVRO: return new AvroFileHdfsReader(systemStreamPartition);
+      default:
+        throw new SamzaException("Unsupported reader type: " + readerType);
+    }
+  }
+
+  public static ReaderType getType(String readerTypeStr) {
+    try {
+      return ReaderType.valueOf(readerTypeStr.toUpperCase());
+    } catch (IllegalArgumentException e) {
+      throw new SamzaException("Invalid hdfs reader type string: " + readerTypeStr, e);
+    }
+  }
+
+  public static int offsetComparator(ReaderType readerType, String offset1, String offset2) {
+    switch (readerType) {
+      case AVRO: return AvroFileHdfsReader.offsetComparator(offset1, offset2);
+      default:
+        throw new SamzaException("Unsupported reader type: " + readerType);
+    }
+  }
+
+  /*
+   * Only AVRO is supported so far. Implement <code>SingleFileHdfsReader</code> to support a variety of
+   * file formats. Plain text could easily be supported in the future (with each line of the
+   * file representing a record, for example).
+   */
+  public enum ReaderType {
+    AVRO
+  }
+}

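A sketch of how the factory is typically used: the reader type string comes from configuration (systems.%s.consumer.reader), and the stream name below is a placeholder.

import org.apache.samza.Partition;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.system.hdfs.reader.HdfsReaderFactory;
import org.apache.samza.system.hdfs.reader.SingleFileHdfsReader;

public class ReaderFactoryExample {
  public static void main(String[] args) {
    SystemStreamPartition ssp =
        new SystemStreamPartition("hdfs", "hdfs://namenode:8020/data/events", new Partition(0));
    // "avro" is the default reader type; the lookup is case-insensitive
    HdfsReaderFactory.ReaderType type = HdfsReaderFactory.getType("avro");
    SingleFileHdfsReader reader = HdfsReaderFactory.getHdfsReader(type, ssp);
  }
}
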
http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/MultiFileHdfsReader.java
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/MultiFileHdfsReader.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/MultiFileHdfsReader.java
new file mode 100644
index 0000000..7870713
--- /dev/null
+++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/MultiFileHdfsReader.java
@@ -0,0 +1,204 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.hdfs.reader;
+
+import java.util.List;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.system.hdfs.HdfsConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A wrapper on top of {@link org.apache.samza.system.hdfs.reader.SingleFileHdfsReader}
+ * to manage the situation of multiple files per partition.
+ *
+ * The offset for MultiFileHdfsReader, which is also the offset that gets
+ * committed to and used by Samza, consists of two parts: the file index and
+ * the actual offset within that file. For example, 3:127
+ *
+ * The format of the offset within a file is defined by the implementation of
+ * {@link org.apache.samza.system.hdfs.reader.SingleFileHdfsReader} itself.
+ */
+public class MultiFileHdfsReader {
+  private static final Logger LOG = LoggerFactory.getLogger(MultiFileHdfsReader.class);
+  private static final String DELIMITER = ":";
+
+  private final HdfsReaderFactory.ReaderType readerType;
+  private final SystemStreamPartition systemStreamPartition;
+  private List<String> filePaths;
+  private SingleFileHdfsReader curReader;
+  private int curFileIndex = 0;
+  private String curSingleFileOffset;
+  private int numRetries;
+  private int numMaxRetries;
+
+  /**
+   * Get the current file index from the offset string
+   * @param offset offset string that contains both file index and offset within file
+   * @return the file index part
+   */
+  public static int getCurFileIndex(String offset) {
+    String[] elements = offset.split(DELIMITER);
+    if (elements.length < 2) {
+      throw new SamzaException("Invalid offset for MultiFileHdfsReader: " + offset);
+    }
+    return Integer.parseInt(elements[0]);
+  }
+
+  /**
+   * Get the offset within file from the offset string
+   * @param offset offset string that contains both file index and offset within file
+   * @return the single file offset part
+   */
+  public static String getCurSingleFileOffset(String offset) {
+    String[] elements = offset.split(DELIMITER);
+    if (elements.length < 2) {
+      throw new SamzaException("Invalid offset for MultiFileHdfsReader: " + offset);
+    }
+    // Return the remainder of the offset string, in case the single-file
+    // offset itself uses the same delimiter.
+    return offset.substring(elements[0].length() + 1);
+  }
+
+  /**
+   * Generate the offset based on file index and offset within single file
+   * @param fileIndex index of the file
+   * @param singleFileOffset offset within single file
+   * @return the complete offset
+   */
+  public static String generateOffset(int fileIndex, String singleFileOffset) {
+    return fileIndex + DELIMITER + singleFileOffset;
+  }
+
+  /*
+   * Get the current offset: the offset of the LAST message successfully read. If no messages
+   * have ever been read, return the offset of the first event.
+   */
+  private String getCurOffset() {
+    return generateOffset(curFileIndex, curSingleFileOffset);
+  }
+
+  public MultiFileHdfsReader(HdfsReaderFactory.ReaderType readerType, SystemStreamPartition systemStreamPartition,
+    List<String> partitionDescriptors, String offset) {
+    this(readerType, systemStreamPartition, partitionDescriptors, offset,
+      Integer.parseInt(HdfsConfig.CONSUMER_NUM_MAX_RETRIES_DEFAULT()));
+  }
+
+  private void init(String offset) {
+    if (curReader != null) {
+      curReader.close();
+      curReader = null;
+    }
+    curFileIndex = getCurFileIndex(offset);
+    if (curFileIndex >= filePaths.size()) {
+      throw new SamzaException(
+        String.format("Invalid file index %d. Number of files is %d", curFileIndex, filePaths.size()));
+    }
+    curSingleFileOffset = getCurSingleFileOffset(offset);
+    curReader = HdfsReaderFactory.getHdfsReader(readerType, systemStreamPartition);
+    curReader.open(filePaths.get(curFileIndex), curSingleFileOffset);
+  }
+
+  public MultiFileHdfsReader(HdfsReaderFactory.ReaderType readerType, SystemStreamPartition systemStreamPartition,
+    List<String> partitionDescriptors, String offset, int numMaxRetries) {
+    this.readerType = readerType;
+    this.systemStreamPartition = systemStreamPartition;
+    this.filePaths = partitionDescriptors;
+    this.numMaxRetries = numMaxRetries;
+    this.numRetries = 0;
+    if (partitionDescriptors.size() <= 0) {
+      throw new SamzaException(
+        "Invalid number of files based on partition descriptors: " + partitionDescriptors.size());
+    }
+    init(offset);
+  }
+
+  public boolean hasNext() {
+    while (curFileIndex < filePaths.size()) {
+      if (curReader.hasNext()) {
+        return true;
+      }
+      curReader.close();
+      curFileIndex++;
+      if (curFileIndex < filePaths.size()) {
+        curReader = HdfsReaderFactory.getHdfsReader(readerType, systemStreamPartition);
+        curReader.open(filePaths.get(curFileIndex), "0");
+      }
+    }
+    return false;
+  }
+
+  public IncomingMessageEnvelope readNext() {
+    if (!hasNext()) {
+      LOG.warn("Attempting to read more data when there aren't any. ssp=" + systemStreamPartition);
+      return null;
+    }
+    // Record the next offset before we read, so that if the read fails and we reconnect,
+    // we seek back to the same offset that we attempt to read below.
+    curSingleFileOffset = curReader.nextOffset();
+    IncomingMessageEnvelope messageEnvelope = curReader.readNext();
+    // Copy everything except for the offset. Turn the single-file style offset into a multi-file one
+    return new IncomingMessageEnvelope(messageEnvelope.getSystemStreamPartition(), getCurOffset(),
+      messageEnvelope.getKey(), messageEnvelope.getMessage(), messageEnvelope.getSize());
+  }
+
+  /**
+   * Reconnect to the file systems in case of failure.
+   * Reset offset to the last checkpoint (last successfully read message).
+   * Throws {@link org.apache.samza.SamzaException} once the max number of
+   * retries has been reached.
+   */
+  public void reconnect() {
+    reconnect(getCurOffset());
+  }
+
+  /**
+   * Reconnect to the file systems in case of failures.
+   * @param offset the offset to reset the reader to
+   * Throws {@link org.apache.samza.SamzaException} once the max number of
+   * retries has been reached.
+   */
+  public void reconnect(String offset) {
+    if (numRetries >= numMaxRetries) {
+      throw new SamzaException(
+        String.format("Give up reconnecting. numRetries: %d; numMaxRetries: %d", numRetries, numMaxRetries));
+    }
+    LOG.info(String
+      .format("Reconnecting with offset: %s numRetries: %d numMaxRetries: %d", offset, numRetries, numMaxRetries));
+    numRetries++;
+    init(offset);
+  }
+
+  public void close() {
+    LOG.info(String.format("MiltiFileHdfsReader shutdown requested for %s. Current offset = %s", systemStreamPartition,
+      getCurOffset()));
+    if (curReader != null) {
+      curReader.close();
+    }
+  }
+
+  public SystemStreamPartition getSystemStreamPartition() {
+    return systemStreamPartition;
+  }
+}

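A read-loop sketch, assuming the ordered list of file paths for this partition (the partition descriptors) has already been produced by the directory partitioner; the paths are placeholders.

import java.util.Arrays;
import java.util.List;
import org.apache.samza.Partition;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.system.hdfs.reader.HdfsReaderFactory;
import org.apache.samza.system.hdfs.reader.MultiFileHdfsReader;

public class MultiFileReadLoop {
  public static void main(String[] args) {
    SystemStreamPartition ssp =
        new SystemStreamPartition("hdfs", "hdfs://namenode:8020/data/events", new Partition(0));
    List<String> partitionDescriptors = Arrays.asList(
        "hdfs://namenode:8020/data/events/part-00000.avro",
        "hdfs://namenode:8020/data/events/part-00001.avro");
    // "0:0" = first record of the first file; the general format is "<fileIndex>:<offsetWithinFile>"
    MultiFileHdfsReader reader =
        new MultiFileHdfsReader(HdfsReaderFactory.ReaderType.AVRO, ssp, partitionDescriptors, "0:0");
    while (reader.hasNext()) {
      IncomingMessageEnvelope envelope = reader.readNext();
      // envelope.getOffset() carries the multi-file offset to checkpoint, e.g. "0:103@1"
    }
    reader.close();
  }
}
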
http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/SingleFileHdfsReader.java
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/SingleFileHdfsReader.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/SingleFileHdfsReader.java
new file mode 100644
index 0000000..eb8a70d
--- /dev/null
+++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/SingleFileHdfsReader.java
@@ -0,0 +1,62 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.hdfs.reader;
+
+import org.apache.samza.system.IncomingMessageEnvelope;
+
+
+public interface SingleFileHdfsReader {
+  /**
+   * Open the file and seek to specific offset for reading.
+   * @param path path of the file to be read
+   * @param offset offset the reader should start from
+   */
+  public void open(String path, String offset);
+
+  /**
+   * Seek to a specific offset
+   * @param offset offset the reader should seek to
+   */
+  public void seek(String offset);
+
+  /**
+   * Construct and return the next message envelope
+   * @return the constructed IncomingMessageEnvelope
+   */
+  public IncomingMessageEnvelope readNext();
+
+  /**
+   * Get the next offset, which is the offset for the next message
+   * that will be returned by readNext
+   * @return next offset
+   */
+  public String nextOffset();
+
+  /**
+   * Whether there are still records to be read
+   * @return true if there are more records to read, false if the reader has reached the end of the file
+   */
+  public boolean hasNext();
+
+  /**
+   * Close the reader.
+   */
+  public void close();
+}

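The contract in a nutshell: nextOffset() returns the offset of the record that the following readNext() call will return, so a caller can record it before reading and later seek() back to retry exactly that record. A minimal sketch against the Avro implementation; the path is a placeholder.

import org.apache.samza.Partition;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.system.hdfs.reader.AvroFileHdfsReader;
import org.apache.samza.system.hdfs.reader.SingleFileHdfsReader;

public class SingleFileReadLoop {
  public static void main(String[] args) {
    SystemStreamPartition ssp =
        new SystemStreamPartition("hdfs", "hdfs://namenode:8020/data/events", new Partition(0));
    SingleFileHdfsReader reader = new AvroFileHdfsReader(ssp);
    reader.open("hdfs://namenode:8020/data/events/part-00000.avro", "0");
    while (reader.hasNext()) {
      String offsetOfNext = reader.nextOffset(); // offset of the record about to be read
      IncomingMessageEnvelope envelope = reader.readNext();
      // on failure, reader.seek(offsetOfNext) would re-read the same record
    }
    reader.close();
  }
}
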
http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala
index 61b7570..53ff372 100644
--- a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala
+++ b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala
@@ -25,7 +25,7 @@ import java.util.UUID
 
 import org.apache.samza.SamzaException
 import org.apache.samza.config.SystemConfig.Config2System
-import org.apache.samza.config.{Config, ScalaMapConfig}
+import org.apache.samza.config.{YarnConfig, Config, ScalaMapConfig}
 import org.apache.samza.util.{Logging, Util}
 
 import scala.collection.JavaConversions._
@@ -62,6 +62,34 @@ object HdfsConfig {
   val BUCKETER_CLASS = "systems.%s.producer.hdfs.bucketer.class"
   val BUCKETER_CLASS_DEFAULT = "org.apache.samza.system.hdfs.writer.JobNameDateTimeBucketer"
 
+  // capacity of the hdfs consumer buffer - the blocking queue used for storing messages
+  val CONSUMER_BUFFER_CAPACITY = "systems.%s.consumer.bufferCapacity"
+  val CONSUMER_BUFFER_CAPACITY_DEFAULT = 10.toString
+
+  // number of max retries for the hdfs consumer readers per partition
+  val CONSUMER_NUM_MAX_RETRIES = "system.%s.consumer.numMaxRetries"
+  val CONSUMER_NUM_MAX_RETRIES_DEFAULT = 10.toString
+
+  // white list used by directory partitioner to filter out unwanted files in a hdfs directory
+  val CONSUMER_PARTITIONER_WHITELIST = "systems.%s.partitioner.defaultPartitioner.whitelist"
+  val CONSUMER_PARTITIONER_WHITELIST_DEFAULT = ".*"
+
+  // black list used by directory partitioner to filter out unwanted files in a hdfs directory
+  val CONSUMER_PARTITIONER_BLACKLIST = "systems.%s.partitioner.defaultPartitioner.blacklist"
+  val CONSUMER_PARTITIONER_BLACKLIST_DEFAULT = ""
+
+  // group pattern used by directory partitioner for advanced partitioning
+  val CONSUMER_PARTITIONER_GROUP_PATTERN = "systems.%s.partitioner.defaultPartitioner.groupPattern"
+  val CONSUMER_PARTITIONER_GROUP_PATTERN_DEFAULT = ""
+
+  // type of the file reader (avro, plain, etc.)
+  val FILE_READER_TYPE = "systems.%s.consumer.reader"
+  val FILE_READER_TYPE_DEFAULT = "avro"
+
+  // staging directory for storing partition descriptions
+  val STAGING_DIRECTORY = "systems.%s.stagingDirectory"
+  val STAGING_DIRECTORY_DEFAULT = ""
+
   implicit def Hdfs2Kafka(config: Config) = new HdfsConfig(config)
 
 }
@@ -130,4 +158,53 @@ class HdfsConfig(config: Config) extends ScalaMapConfig(config) {
     getOrElse(HdfsConfig.COMPRESSION_TYPE format systemName, HdfsConfig.COMPRESSION_TYPE_DEFAULT)
   }
 
+  /**
+   * Get the capacity of the hdfs consumer buffer - the blocking queue used for storing messages
+   */
+  def getConsumerBufferCapacity(systemName: String): Int = {
+    getOrElse(HdfsConfig.CONSUMER_BUFFER_CAPACITY format systemName, HdfsConfig.CONSUMER_BUFFER_CAPACITY_DEFAULT).toInt
+  }
+
+  /**
+   * Get the max number of retries for the hdfs consumer readers, per partition
+   */
+  def getConsumerNumMaxRetries(systemName: String): Int = {
+    getOrElse(HdfsConfig.CONSUMER_NUM_MAX_RETRIES format systemName, HdfsConfig.CONSUMER_NUM_MAX_RETRIES_DEFAULT).toInt
+  }
+
+  /**
+   * White list used by directory partitioner to filter out unwanted files in a hdfs directory
+   */
+  def getPartitionerWhiteList(systemName: String): String = {
+    getOrElse(HdfsConfig.CONSUMER_PARTITIONER_WHITELIST format systemName, HdfsConfig.CONSUMER_PARTITIONER_WHITELIST_DEFAULT)
+  }
+
+  /**
+   * Black list used by directory partitioner to filter out unwanted files in a hdfs directory
+   */
+  def getPartitionerBlackList(systemName: String): String = {
+    getOrElse(HdfsConfig.CONSUMER_PARTITIONER_BLACKLIST format systemName, HdfsConfig.CONSUMER_PARTITIONER_BLACKLIST_DEFAULT)
+  }
+
+  /**
+   * Group pattern used by directory partitioner for advanced partitioning
+   */
+  def getPartitionerGroupPattern(systemName: String): String = {
+    getOrElse(HdfsConfig.CONSUMER_PARTITIONER_GROUP_PATTERN format systemName, HdfsConfig.CONSUMER_PARTITIONER_GROUP_PATTERN_DEFAULT)
+  }
+
+  /**
+   * Get the type of the file reader (avro, plain, etc.)
+   */
+  def getFileReaderType(systemName: String): String = {
+    getOrElse(HdfsConfig.FILE_READER_TYPE format systemName, HdfsConfig.FILE_READER_TYPE_DEFAULT)
+  }
+
+  /**
+   * Staging directory for storing partition descriptions. If not set, the staging directory
+   * configured for the YARN job will be used.
+   */
+  def getStagingDirectory(): String = {
+    getOrElse(HdfsConfig.STAGING_DIRECTORY, getOrElse(YarnConfig.YARN_JOB_STAGING_DIRECTORY, HdfsConfig.STAGING_DIRECTORY_DEFAULT))
+  }
 }

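A sketch of how the new consumer-side settings would be set for a system named "hdfs" and resolved through the accessors above; the values are placeholders, and unset keys fall back to the defaults listed in the companion object.

import java.util.HashMap;
import java.util.Map;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.system.hdfs.HdfsConfig;

public class HdfsConsumerConfigExample {
  public static void main(String[] args) {
    Map<String, String> props = new HashMap<>();
    props.put("systems.hdfs.consumer.bufferCapacity", "50");
    props.put("systems.hdfs.consumer.reader", "avro");
    props.put("systems.hdfs.partitioner.defaultPartitioner.whitelist", ".*\\.avro");
    props.put("systems.hdfs.partitioner.defaultPartitioner.blacklist", ".*tmp.*");
    Config config = new MapConfig(props);

    HdfsConfig hdfsConfig = new HdfsConfig(config);
    int bufferCapacity = hdfsConfig.getConsumerBufferCapacity("hdfs"); // 50
    String readerType = hdfsConfig.getFileReaderType("hdfs");          // "avro"
    String whitelist = hdfsConfig.getPartitionerWhiteList("hdfs");     // ".*\.avro"
    String blacklist = hdfsConfig.getPartitionerBlackList("hdfs");     // ".*tmp.*"
  }
}
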
http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemAdmin.scala b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemAdmin.scala
deleted file mode 100644
index 92eb447..0000000
--- a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemAdmin.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.system.hdfs
-
-
-import org.apache.samza.util.Logging
-import org.apache.samza.system.{SystemAdmin, SystemStreamMetadata, SystemStreamPartition}
-
-
-class HdfsSystemAdmin extends SystemAdmin with Logging {
-
-  def getOffsetsAfter(offsets: java.util.Map[SystemStreamPartition, String]) = {
-    new java.util.HashMap[SystemStreamPartition, String]()
-  }
-
-  def getSystemStreamMetadata(streamNames: java.util.Set[String]) = {
-    new java.util.HashMap[String, SystemStreamMetadata]()
-  }
-
-  def createChangelogStream(topicName: String, numOfChangeLogPartitions: Int) = {
-    throw new UnsupportedOperationException("Method not implemented.")
-  }
-
-  def validateChangelogStream(topicName: String, numOfChangeLogPartitions: Int) = {
-    throw new UnsupportedOperationException("Method not implemented.")
-  }
-
-  def createCoordinatorStream(streamName: String) {
-    throw new UnsupportedOperationException("Method not implemented.")
-  }
-
-  def offsetComparator(offset1: String, offset2: String) = {
-    throw new UnsupportedOperationException("Method not implemented.")
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemFactory.scala b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemFactory.scala
index ef3c20a..3673431 100644
--- a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemFactory.scala
+++ b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemFactory.scala
@@ -1,45 +1,45 @@
- /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
 
 package org.apache.samza.system.hdfs
 
 
-import org.apache.samza.SamzaException
-
 import org.apache.samza.config.Config
 import org.apache.samza.metrics.MetricsRegistry
 import org.apache.samza.system.SystemFactory
-import org.apache.samza.util.{KafkaUtil,Logging}
+import org.apache.samza.system.hdfs.HdfsSystemConsumer.HdfsSystemConsumerMetrics
+import org.apache.samza.util.{KafkaUtil, Logging}
 
 
 class HdfsSystemFactory extends SystemFactory with Logging {
   def getConsumer(systemName: String, config: Config, registry: MetricsRegistry) = {
-    throw new SamzaException("HdfsSystemFactory does not implement a consumer")
+    new HdfsSystemConsumer(systemName, config, new HdfsSystemConsumerMetrics(registry))
   }
 
   def getProducer(systemName: String, config: Config, registry: MetricsRegistry) = {
+    // TODO: SAMZA-1026: should remove Kafka dependency below
     val clientId = KafkaUtil.getClientId("samza-producer", config)
     val metrics = new HdfsSystemProducerMetrics(systemName, registry)
     new HdfsSystemProducer(systemName, clientId, config, metrics)
   }
 
   def getAdmin(systemName: String, config: Config) = {
-    new HdfsSystemAdmin
+    new HdfsSystemAdmin(systemName, config)
   }
 }
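
With the consumer wired in, an HDFS input system is constructed like any other Samza system. A sketch under the assumption that the config carries the consumer settings described in HdfsConfig (a real job would also set systems.hdfs.samza.factory and list the input directories under task.inputs):

import java.util.HashMap;
import java.util.Map;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemConsumer;
import org.apache.samza.system.hdfs.HdfsSystemFactory;
import org.apache.samza.util.NoOpMetricsRegistry;

public class HdfsSystemWiring {
  public static void main(String[] args) {
    Map<String, String> props = new HashMap<>();
    props.put("systems.hdfs.consumer.reader", "avro"); // plus the other consumer settings as needed
    Config config = new MapConfig(props);

    HdfsSystemFactory factory = new HdfsSystemFactory();
    SystemConsumer consumer = factory.getConsumer("hdfs", config, new NoOpMetricsRegistry());
    SystemAdmin admin = factory.getAdmin("hdfs", config);
  }
}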