Posted to commits@samza.apache.org by xi...@apache.org on 2016/10/05 18:39:43 UTC
[1/2] samza git commit: SAMZA-967: HDFS System Consumer
Repository: samza
Updated Branches:
refs/heads/master f4d924fd2 -> 2216fe0b7
http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/TestHdfsSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/TestHdfsSystemConsumer.java b/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/TestHdfsSystemConsumer.java
new file mode 100644
index 0000000..ef5ab00
--- /dev/null
+++ b/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/TestHdfsSystemConsumer.java
@@ -0,0 +1,136 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.hdfs;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.samza.Partition;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.system.hdfs.reader.TestAvroFileHdfsReader;
+import org.apache.samza.util.NoOpMetricsRegistry;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+
+
+public class TestHdfsSystemConsumer {
+
+ private static final String SYSTEM_NAME = "hdfs";
+ private static final String FIELD_1 = "field1";
+ private static final String FIELD_2 = "field2";
+ private static final String WORKING_DIRECTORY = TestHdfsSystemConsumer.class.getResource("/integTest").getPath();
+ private static final String AVRO_FILE_1 = WORKING_DIRECTORY + "/TestHdfsSystemConsumer-01.avro";
+ private static final String AVRO_FILE_2 = WORKING_DIRECTORY + "/TestHdfsSystemConsumer-02.avro";
+ private static final String AVRO_FILE_3 = WORKING_DIRECTORY + "/TestHdfsSystemConsumer-03.avro";
+ private static final int NUM_FILES = 3;
+ private static final int NUM_EVENTS = 100;
+
+ private Config generateDefaultConfig() throws IOException {
+ Map<String, String> properties = new HashMap<>();
+ properties.put(String.format(HdfsConfig.CONSUMER_PARTITIONER_WHITELIST(), SYSTEM_NAME), ".*TestHdfsSystemConsumer.*avro");
+ Path stagingDirectory = Files.createTempDirectory("staging");
+ stagingDirectory.toFile().deleteOnExit();
+ properties.put(HdfsConfig.STAGING_DIRECTORY(), stagingDirectory.toString());
+ return new MapConfig(properties);
+ }
+
+ private void generateAvroDataFiles() throws Exception {
+ TestAvroFileHdfsReader.writeTestEventsToFile(AVRO_FILE_1, NUM_EVENTS);
+ TestAvroFileHdfsReader.writeTestEventsToFile(AVRO_FILE_2, NUM_EVENTS);
+ TestAvroFileHdfsReader.writeTestEventsToFile(AVRO_FILE_3, NUM_EVENTS);
+ }
+
+ /*
+ * A simple end to end test that covers the workflow from system admin to
+ * partitioner, system consumer, and so on, making sure the basic functionality
+ * works as expected.
+ */
+ @Test
+ public void testHdfsSystemConsumerE2E() throws Exception {
+ Config config = generateDefaultConfig();
+ HdfsSystemFactory systemFactory = new HdfsSystemFactory();
+
+ // create admin and do partitioning
+ HdfsSystemAdmin systemAdmin = systemFactory.getAdmin(SYSTEM_NAME, config);
+ String streamName = WORKING_DIRECTORY;
+ Set<String> streamNames = new HashSet<>();
+ streamNames.add(streamName);
+ generateAvroDataFiles();
+ Map<String, SystemStreamMetadata> streamMetadataMap = systemAdmin.getSystemStreamMetadata(streamNames);
+ SystemStreamMetadata systemStreamMetadata = streamMetadataMap.get(streamName);
+ Assert.assertEquals(NUM_FILES, systemStreamMetadata.getSystemStreamPartitionMetadata().size());
+
+ // create consumer and read from files
+ HdfsSystemConsumer systemConsumer = systemFactory.getConsumer(SYSTEM_NAME, config, new NoOpMetricsRegistry());
+ Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> metadataMap = systemStreamMetadata.getSystemStreamPartitionMetadata();
+ Set<SystemStreamPartition> systemStreamPartitionSet = new HashSet<>();
+ metadataMap.forEach((partition, metadata) -> {
+ SystemStreamPartition ssp = new SystemStreamPartition(SYSTEM_NAME, streamName, partition);
+ systemStreamPartitionSet.add(ssp);
+ String offset = metadata.getOldestOffset();
+ systemConsumer.register(ssp, offset);
+ });
+ systemConsumer.start();
+
+ // verify events read from consumer
+ int eventsReceived = 0;
+ int totalEvents = (NUM_EVENTS + 1) * NUM_FILES; // plus one "End of Stream" event per partition
+ int remainingRetries = 100;
+ Map<SystemStreamPartition, List<IncomingMessageEnvelope>> overallResults = new HashMap<>();
+ while (eventsReceived < totalEvents && remainingRetries > 0) {
+ remainingRetries--;
+ Map<SystemStreamPartition, List<IncomingMessageEnvelope>> result = systemConsumer.poll(systemStreamPartitionSet, 200);
+ for(SystemStreamPartition ssp : result.keySet()) {
+ List<IncomingMessageEnvelope> messageEnvelopeList = result.get(ssp);
+ overallResults.putIfAbsent(ssp, new ArrayList<>());
+ overallResults.get(ssp).addAll(messageEnvelopeList);
+ if (overallResults.get(ssp).size() >= NUM_EVENTS + 1) {
+ systemStreamPartitionSet.remove(ssp);
+ }
+ eventsReceived += messageEnvelopeList.size();
+ }
+ }
+ Assert.assertEquals(totalEvents, eventsReceived);
+ Assert.assertEquals(NUM_FILES, overallResults.size());
+ overallResults.values().forEach(messages -> {
+ Assert.assertEquals(NUM_EVENTS + 1, messages.size());
+ for (int index = 0; index < NUM_EVENTS; index++) {
+ GenericRecord record = (GenericRecord) messages.get(index).getMessage();
+ Assert.assertEquals(index % NUM_EVENTS, record.get(FIELD_1));
+ Assert.assertEquals("string_" + (index % NUM_EVENTS), record.get(FIELD_2).toString());
+ }
+ Assert.assertEquals(IncomingMessageEnvelope.END_OF_STREAM_OFFSET, messages.get(NUM_EVENTS).getOffset());
+ });
+ }
+}
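
For orientation, the end-to-end flow exercised by the test above boils down to the sketch below. It is a hedged illustration, not code from this patch; the system name "hdfs", the directory path, and the poll timeout are placeholders, and the imports follow the test file (plus java.util.Collections).

// Minimal admin -> partitioner -> consumer flow, using only APIs that appear in this commit.
class HdfsConsumeSketch {
  static void consumeOnce(Config config, String directory) throws Exception {
    HdfsSystemFactory factory = new HdfsSystemFactory();
    HdfsSystemAdmin admin = factory.getAdmin("hdfs", config);
    // each HDFS directory is treated as a "stream"; the admin partitions its files
    SystemStreamMetadata metadata =
        admin.getSystemStreamMetadata(Collections.singleton(directory)).get(directory);
    HdfsSystemConsumer consumer = factory.getConsumer("hdfs", config, new NoOpMetricsRegistry());
    Set<SystemStreamPartition> ssps = new HashSet<>();
    metadata.getSystemStreamPartitionMetadata().forEach((partition, partitionMetadata) -> {
      SystemStreamPartition ssp = new SystemStreamPartition("hdfs", directory, partition);
      ssps.add(ssp);
      consumer.register(ssp, partitionMetadata.getOldestOffset());
    });
    consumer.start();
    // each poll returns envelopes per registered partition; an envelope whose offset is
    // IncomingMessageEnvelope.END_OF_STREAM_OFFSET marks the end of a partition's files
    Map<SystemStreamPartition, List<IncomingMessageEnvelope>> polled = consumer.poll(ssps, 200);
  }
}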
http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/TestPartitionDesctiptorUtil.java
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/TestPartitionDesctiptorUtil.java b/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/TestPartitionDesctiptorUtil.java
new file mode 100644
index 0000000..c8ddbe0
--- /dev/null
+++ b/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/TestPartitionDesctiptorUtil.java
@@ -0,0 +1,92 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.hdfs;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class TestPartitionDesctiptorUtil {
+
+ @Test
+ public void testBasicEncodeDecode() {
+ Map<Partition, List<String>> input = new HashMap<>();
+ input.put(new Partition(0), Collections.singletonList("path_0"));
+ String[] array = {"path_1", "path_2"};
+ input.put(new Partition(1), Arrays.asList(array));
+ input.put(new Partition(3), Collections.singletonList("path_3"));
+ String json = PartitionDescriptorUtil.getJsonFromDescriptorMap(input);
+ Map<Partition, List<String>> output = PartitionDescriptorUtil.getDescriptorMapFromJson(json);
+ Assert.assertEquals(3, output.entrySet().size());
+ Assert.assertTrue(output.containsKey(new Partition(0)));
+ Assert.assertEquals("path_0", output.get(new Partition(0)).get(0));
+ Assert.assertTrue(output.containsKey(new Partition(1)));
+ Assert.assertEquals("path_1", output.get(new Partition(1)).get(0));
+ Assert.assertEquals("path_2", output.get(new Partition(1)).get(1));
+ Assert.assertTrue(output.containsKey(new Partition(3)));
+ Assert.assertEquals("path_3", output.get(new Partition(3)).get(0));
+ }
+
+ @Test
+ public void testSingleEntry() {
+ Map<Partition, List<String>> input = new HashMap<>();
+ input.put(new Partition(1), Collections.singletonList("random_path_1"));
+ String json = PartitionDescriptorUtil.getJsonFromDescriptorMap(input);
+ Map<Partition, List<String>> output = PartitionDescriptorUtil.getDescriptorMapFromJson(json);
+ Assert.assertEquals(1, output.entrySet().size());
+ Assert.assertTrue(output.containsKey(new Partition(1)));
+ Assert.assertEquals("random_path_1", output.get(new Partition(1)).get(0));
+ }
+
+ @Test
+ public void testKeyOverriding() {
+ Map<Partition, List<String>> input = new HashMap<>();
+ input.put(new Partition(0), Collections.singletonList("path_0"));
+ input.put(new Partition(0), Collections.singletonList("new_path_0"));
+ String json = PartitionDescriptorUtil.getJsonFromDescriptorMap(input);
+ Map<Partition, List<String>> output = PartitionDescriptorUtil.getDescriptorMapFromJson(json);
+ Assert.assertEquals(1, output.entrySet().size());
+ Assert.assertTrue(output.containsKey(new Partition(0)));
+ Assert.assertEquals("new_path_0", output.get(new Partition(0)).get(0));
+ }
+
+ @Test
+ public void testEmptyInput() {
+ Map<Partition, List<String>> input = new HashMap<>();
+ String json = PartitionDescriptorUtil.getJsonFromDescriptorMap(input);
+ Assert.assertNotNull(json);
+ Map<Partition, List<String>> output = PartitionDescriptorUtil.getDescriptorMapFromJson(json);
+ Assert.assertTrue(output.isEmpty());
+ }
+
+ @Test(expected = SamzaException.class)
+ public void testInvalidInput() {
+ String json = "invalidStr";
+ PartitionDescriptorUtil.getDescriptorMapFromJson(json);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/partitioner/TestDirectoryPartitioner.java
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/partitioner/TestDirectoryPartitioner.java b/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/partitioner/TestDirectoryPartitioner.java
new file mode 100644
index 0000000..aea32ff
--- /dev/null
+++ b/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/partitioner/TestDirectoryPartitioner.java
@@ -0,0 +1,304 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.hdfs.partitioner;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+import static org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata;
+import static org.apache.samza.system.hdfs.partitioner.FileSystemAdapter.FileMetadata;
+
+
+public class TestDirectoryPartitioner {
+
+ class TestFileSystemAdapter implements FileSystemAdapter {
+ private List<FileMetadata> expectedList;
+
+ public TestFileSystemAdapter(List<FileMetadata> expectedList) {
+ this.expectedList = expectedList;
+ }
+
+ public List<FileMetadata> getAllFiles(String streamName) {
+ return expectedList;
+ }
+ }
+
+ private void verifyPartitionDescriptor(String[] inputFiles, int[][] expectedPartitioning, int expectedNumPartition,
+ Map<Partition, List<String>> actualPartitioning) {
+ Assert.assertEquals(expectedNumPartition, actualPartitioning.size());
+ Set<String> actualPartitioningPath = new HashSet<>();
+ actualPartitioning.values().forEach(list -> actualPartitioningPath.add(String.join(",", list)));
+ for (int i = 0; i < expectedNumPartition; i++) {
+ int[] indexes = expectedPartitioning[i];
+ List<String> files = new ArrayList<>();
+ for (int j : indexes) {
+ files.add(inputFiles[j]);
+ }
+ files.sort(Comparator.<String>naturalOrder());
+ String expectedCombinedPath = String.join(",", files);
+ Assert.assertTrue(actualPartitioningPath.contains(expectedCombinedPath));
+ }
+ }
+
+ @Test
+ public void testBasicWhiteListFiltering() {
+ List<FileMetadata> testList = new ArrayList<>();
+ int NUM_INPUT = 9;
+ String[] inputFiles = {
+ "part-001.avro",
+ "part-002.avro",
+ "part-003.avro",
+ "delta-01.avro",
+ "part-005.avro",
+ "delta-03.avro",
+ "part-004.avro",
+ "delta-02.avro",
+ "part-006.avro"};
+ long[] fileLength = {150582, 138132, 214005, 205738, 158273, 982345, 313245, 234212, 413232};
+ for (int i = 0; i < NUM_INPUT; i++) {
+ testList.add(new FileMetadata(inputFiles[i], fileLength[i]));
+ }
+ String whiteList = "part-.*\\.avro";
+ String blackList = "";
+ String groupPattern = "";
+ int EXPECTED_NUM_PARTITION = 6;
+ int[][] EXPECTED_PARTITIONING = {{0}, {1}, {2}, {4}, {6}, {8}};
+
+ DirectoryPartitioner directoryPartitioner =
+ new DirectoryPartitioner(whiteList, blackList, groupPattern, new TestFileSystemAdapter(testList));
+ Map<Partition, SystemStreamPartitionMetadata> metadataMap = directoryPartitioner.getPartitionMetadataMap("hdfs", null);
+ Assert.assertEquals(EXPECTED_NUM_PARTITION, metadataMap.size());
+ Map<Partition, List<String>> descriptorMap = directoryPartitioner.getPartitionDescriptor("hdfs");
+ verifyPartitionDescriptor(inputFiles, EXPECTED_PARTITIONING, EXPECTED_NUM_PARTITION, descriptorMap);
+ }
+
+ @Test
+ public void testBasicBlackListFiltering() {
+ List<FileMetadata> testList = new ArrayList<>();
+ int NUM_INPUT = 9;
+ String[] inputFiles = {
+ "part-001.avro",
+ "part-002.avro",
+ "part-003.avro",
+ "delta-01.avro",
+ "part-005.avro",
+ "delta-03.avro",
+ "part-004.avro",
+ "delta-02.avro",
+ "part-006.avro"};
+ long[] fileLength = {150582, 138132, 214005, 205738, 158273, 982345, 313245, 234212, 413232};
+ for (int i = 0; i < NUM_INPUT; i++) {
+ testList.add(new FileMetadata(inputFiles[i], fileLength[i]));
+ }
+ String whiteList = ".*";
+ String blackList = "delta-.*\\.avro";
+ String groupPattern = "";
+ int EXPECTED_NUM_PARTITION = 6;
+ int[][] EXPECTED_PARTITIONING = {{0}, {1}, {2}, {4}, {6}, {8}};
+
+ DirectoryPartitioner directoryPartitioner =
+ new DirectoryPartitioner(whiteList, blackList, groupPattern, new TestFileSystemAdapter(testList));
+ Map<Partition, SystemStreamPartitionMetadata> metadataMap = directoryPartitioner.getPartitionMetadataMap("hdfs", null);
+ Assert.assertEquals(EXPECTED_NUM_PARTITION, metadataMap.size());
+ Map<Partition, List<String>> descriptorMap = directoryPartitioner.getPartitionDescriptor("hdfs");
+ verifyPartitionDescriptor(inputFiles, EXPECTED_PARTITIONING, EXPECTED_NUM_PARTITION, descriptorMap);
+ }
+
+ @Test
+ public void testWhiteListBlackListFiltering() {
+ List<FileMetadata> testList = new ArrayList<>();
+ int NUM_INPUT = 9;
+ String[] inputFiles = {
+ "part-001.avro",
+ "part-002.avro",
+ "part-003.avro",
+ "delta-01.avro",
+ "part-005.avro",
+ "delta-03.avro",
+ "part-004.avro",
+ "delta-02.avro",
+ "part-006.avro"};
+ long[] fileLength = {150582, 138132, 214005, 205738, 158273, 982345, 313245, 234212, 413232};
+ for (int i = 0; i < NUM_INPUT; i++) {
+ testList.add(new FileMetadata(inputFiles[i], fileLength[i]));
+ }
+ String whiteList = "part-.*\\.avro";
+ String blackList = "part-002.avro";
+ String groupPattern = "";
+ int EXPECTED_NUM_PARTITION = 5;
+ int[][] EXPECTED_PARTITIONING = {{0}, {2}, {4}, {6}, {8}};
+
+ DirectoryPartitioner directoryPartitioner =
+ new DirectoryPartitioner(whiteList, blackList, groupPattern, new TestFileSystemAdapter(testList));
+ Map<Partition, SystemStreamPartitionMetadata> metadataMap = directoryPartitioner.getPartitionMetadataMap("hdfs", null);
+ Assert.assertEquals(EXPECTED_NUM_PARTITION, metadataMap.size());
+ Map<Partition, List<String>> descriptorMap = directoryPartitioner.getPartitionDescriptor("hdfs");
+ verifyPartitionDescriptor(inputFiles, EXPECTED_PARTITIONING, EXPECTED_NUM_PARTITION, descriptorMap);
+ }
+
+ @Test
+ public void testBasicGrouping() {
+ List<FileMetadata> testList = new ArrayList<>();
+ int NUM_INPUT = 9;
+ String[] inputFiles = {
+ "00_10-run_2016-08-15-13-04-part.0.150582.avro",
+ "00_10-run_2016-08-15-13-04-part.1.138132.avro",
+ "00_10-run_2016-08-15-13-04-part.2.214005.avro",
+ "00_10-run_2016-08-15-13-05-part.0.205738.avro",
+ "00_10-run_2016-08-15-13-05-part.1.158273.avro",
+ "00_10-run_2016-08-15-13-05-part.2.982345.avro",
+ "00_10-run_2016-08-15-13-06-part.0.313245.avro",
+ "00_10-run_2016-08-15-13-06-part.1.234212.avro",
+ "00_10-run_2016-08-15-13-06-part.2.413232.avro"};
+ long[] fileLength = {150582, 138132, 214005, 205738, 158273, 982345, 313245, 234212, 413232};
+ for (int i = 0; i < NUM_INPUT; i++) {
+ testList.add(new FileMetadata(inputFiles[i], fileLength[i]));
+ }
+
+ String whiteList = ".*\\.avro";
+ String blackList = "";
+ String groupPattern = ".*part\\.[id]\\..*\\.avro"; // 00_10-run_2016-08-15-13-04-part.[id].138132.avro
+ int EXPECTED_NUM_PARTITION = 3;
+ int[][] EXPECTED_PARTITIONING = {
+ {0, 3, 6}, // files from index 0, 3, 6 should be grouped into one partition
+ {1, 4, 7}, // same as above
+ {2, 5, 8}};
+
+ DirectoryPartitioner directoryPartitioner =
+ new DirectoryPartitioner(whiteList, blackList, groupPattern, new TestFileSystemAdapter(testList));
+ Map<Partition, SystemStreamPartitionMetadata> metadataMap = directoryPartitioner.getPartitionMetadataMap("hdfs", null);
+ Assert.assertEquals(EXPECTED_NUM_PARTITION, metadataMap.size());
+ Map<Partition, List<String>> descriptorMap = directoryPartitioner.getPartitionDescriptor("hdfs");
+ verifyPartitionDescriptor(inputFiles, EXPECTED_PARTITIONING, EXPECTED_NUM_PARTITION, descriptorMap);
+ }
+
+ @Test
+ public void testValidDirectoryUpdating() {
+ // the update is valid when only new files are added to the directory
+ // and no existing files are changed
+ List<FileMetadata> testList = new ArrayList<>();
+ int NUM_INPUT = 6;
+ String[] inputFiles = {
+ "part-001.avro",
+ "part-002.avro",
+ "part-003.avro",
+ "part-005.avro",
+ "part-004.avro",
+ "part-006.avro"};
+ long[] fileLength = {150582, 138132, 214005, 205738, 158273, 982345};
+ for (int i = 0; i < NUM_INPUT; i++) {
+ testList.add(new FileMetadata(inputFiles[i], fileLength[i]));
+ }
+ String whiteList = ".*";
+ String blackList = "";
+ String groupPattern = "";
+ int EXPECTED_NUM_PARTITION = 6;
+ int[][] EXPECTED_PARTITIONING = {{0}, {1}, {2}, {3}, {4}, {5}};
+
+ DirectoryPartitioner directoryPartitioner =
+ new DirectoryPartitioner(whiteList, blackList, groupPattern, new TestFileSystemAdapter(testList));
+ Map<Partition, SystemStreamPartitionMetadata> metadataMap = directoryPartitioner.getPartitionMetadataMap("hdfs", null);
+ Assert.assertEquals(EXPECTED_NUM_PARTITION, metadataMap.size());
+ Map<Partition, List<String>> descriptorMap = directoryPartitioner.getPartitionDescriptor("hdfs");
+ verifyPartitionDescriptor(inputFiles, EXPECTED_PARTITIONING, EXPECTED_NUM_PARTITION, descriptorMap);
+
+ NUM_INPUT = 7;
+ String[] updatedInputFiles = {
+ "part-001.avro",
+ "part-002.avro",
+ "part-003.avro",
+ "part-005.avro",
+ "part-004.avro",
+ "part-007.avro", // add a new file to the directory
+ "part-006.avro"};
+ long[] updatedFileLength = {150582, 138132, 214005, 205738, 158273, 2513454, 982345};
+ testList.clear();
+ for (int i = 0; i < NUM_INPUT; i++) {
+ testList.add(new FileMetadata(updatedInputFiles[i], updatedFileLength[i]));
+ }
+ directoryPartitioner =
+ new DirectoryPartitioner(whiteList, blackList, groupPattern, new TestFileSystemAdapter(testList));
+ metadataMap = directoryPartitioner.getPartitionMetadataMap("hdfs", descriptorMap);
+ Assert.assertEquals(EXPECTED_NUM_PARTITION, metadataMap.size()); // still expect only 6 partitions instead of 7
+ Map<Partition, List<String>> updatedDescriptorMap = directoryPartitioner.getPartitionDescriptor("hdfs");
+ verifyPartitionDescriptor(inputFiles, EXPECTED_PARTITIONING, EXPECTED_NUM_PARTITION, updatedDescriptorMap);
+ }
+
+ @Test
+ public void testInvalidDirectoryUpdating() {
+ // the update is invalid when at least one old file is removed
+ List<FileMetadata> testList = new ArrayList<>();
+ int NUM_INPUT = 6;
+ String[] inputFiles = {
+ "part-001.avro",
+ "part-002.avro",
+ "part-003.avro",
+ "part-005.avro",
+ "part-004.avro",
+ "part-006.avro"};
+ long[] fileLength = {150582, 138132, 214005, 205738, 158273, 982345};
+ for (int i = 0; i < NUM_INPUT; i++) {
+ testList.add(new FileMetadata(inputFiles[i], fileLength[i]));
+ }
+ String whiteList = ".*";
+ String blackList = "";
+ String groupPattern = "";
+ int EXPECTED_NUM_PARTITION = 6;
+ int[][] EXPECTED_PARTITIONING = {{0}, {1}, {2}, {3}, {4}, {5}};
+
+ DirectoryPartitioner directoryPartitioner =
+ new DirectoryPartitioner(whiteList, blackList, groupPattern, new TestFileSystemAdapter(testList));
+ Map<Partition, SystemStreamPartitionMetadata> metadataMap = directoryPartitioner.getPartitionMetadataMap("hdfs", null);
+ Assert.assertEquals(EXPECTED_NUM_PARTITION, metadataMap.size());
+ Map<Partition, List<String>> descriptorMap = directoryPartitioner.getPartitionDescriptor("hdfs");
+ verifyPartitionDescriptor(inputFiles, EXPECTED_PARTITIONING, EXPECTED_NUM_PARTITION, descriptorMap);
+
+ String[] updatedInputFiles = {
+ "part-001.avro",
+ "part-002.avro",
+ "part-003.avro",
+ "part-005.avro",
+ "part-007.avro", // remove part-004 and replace it with 007
+ "part-006.avro"};
+ long[] updatedFileLength = {150582, 138132, 214005, 205738, 158273, 982345};
+ testList.clear();
+ for (int i = 0; i < NUM_INPUT; i++) {
+ testList.add(new FileMetadata(updatedInputFiles[i], updatedFileLength[i]));
+ }
+ directoryPartitioner =
+ new DirectoryPartitioner(whiteList, blackList, groupPattern, new TestFileSystemAdapter(testList));
+ try {
+ directoryPartitioner.getPartitionMetadataMap("hdfs", descriptorMap);
+ Assert.fail("Expect exception thrown from getting metadata. Should not reach this point.");
+ } catch (SamzaException e) {
+ // expect exception to be thrown
+ }
+ }
+}
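
The group pattern in testBasicGrouping() above, ".*part\.[id]\..*\.avro", groups files that carry the same value in the "[id]" position into one partition. The following is a plausible sketch of that grouping behaviour, reconstructed from the test's expectations rather than taken from the DirectoryPartitioner code, and it assumes "[id]" stands for a numeric id.

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class GroupPatternSketch {
  // Files whose names yield the same captured id fall into the same partition.
  static Map<String, List<String>> groupById(String groupPattern, List<String> paths) {
    // "[id]" marks the part of the file name that identifies the partition;
    // turning it into a capture group lets the id be extracted and compared.
    Pattern pattern = Pattern.compile(groupPattern.replace("[id]", "(\\d+)"));
    Map<String, List<String>> groups = new LinkedHashMap<>();
    for (String path : paths) {
      Matcher matcher = pattern.matcher(path);
      if (matcher.matches()) {
        groups.computeIfAbsent(matcher.group(1), k -> new ArrayList<>()).add(path);
      }
    }
    // With the nine file names from testBasicGrouping(), ids "0", "1" and "2" each
    // collect one file per run, matching EXPECTED_PARTITIONING.
    return groups;
  }
}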
http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/partitioner/TestHdfsFileSystemAdapter.java
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/partitioner/TestHdfsFileSystemAdapter.java b/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/partitioner/TestHdfsFileSystemAdapter.java
new file mode 100644
index 0000000..0fb461f
--- /dev/null
+++ b/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/partitioner/TestHdfsFileSystemAdapter.java
@@ -0,0 +1,59 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.hdfs.partitioner;
+
+import java.net.URL;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.samza.Partition;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class TestHdfsFileSystemAdapter {
+
+ @Test
+ public void testGetAllFiles()
+ throws Exception {
+ URL url = this.getClass().getResource("/partitioner");
+ FileSystemAdapter adapter = new HdfsFileSystemAdapter();
+ List<FileSystemAdapter.FileMetadata> result =
+ adapter.getAllFiles(url.getPath());
+ Assert.assertEquals(2, result.size());
+ }
+
+ @Test
+ public void testIntegrationWithPartitioner() throws Exception {
+ URL url = this.getClass().getResource("/partitioner");
+ String whiteList = ".*";
+ String blackList = ".*02";
+ String groupPattern = "";
+ String streamName = url.getPath();
+ DirectoryPartitioner directoryPartitioner =
+ new DirectoryPartitioner(whiteList, blackList, groupPattern, new HdfsFileSystemAdapter());
+ Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> metadataMap = directoryPartitioner.getPartitionMetadataMap(streamName, null);
+ Assert.assertEquals(1, metadataMap.size());
+ Map<Partition, List<String>> descriptorMap = directoryPartitioner.getPartitionDescriptor(streamName);
+ Assert.assertEquals(1, descriptorMap.values().size());
+ Assert.assertTrue(descriptorMap.get(new Partition(0)).get(0).endsWith("testfile01"));
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/reader/TestAvroFileHdfsReader.java
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/reader/TestAvroFileHdfsReader.java b/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/reader/TestAvroFileHdfsReader.java
new file mode 100644
index 0000000..aa828d9
--- /dev/null
+++ b/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/reader/TestAvroFileHdfsReader.java
@@ -0,0 +1,169 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.hdfs.reader;
+
+
+import java.io.File;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.samza.Partition;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+public class TestAvroFileHdfsReader {
+
+ private static final String FIELD_1 = "field1";
+ private static final String FIELD_2 = "field2";
+ private static final String WORKING_DIRECTORY = TestAvroFileHdfsReader.class.getResource("/reader").getPath();
+ private static final String AVRO_FILE = WORKING_DIRECTORY + "/TestAvroFileHdfsReader-01.avro";
+ private static final int NUM_EVENTS = 500;
+
+ public static void writeTestEventsToFile(String path, int numEvents)
+ throws Exception {
+ Schema schema = Schema.parse(TestAvroFileHdfsReader.class.getResourceAsStream("/reader/TestEvent.avsc"));
+ File file = new File(path);
+ DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema);
+ DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(writer);
+ dataFileWriter.create(schema, file);
+ for (int i = 0; i < numEvents; i++) {
+ GenericRecord datum = new GenericData.Record(schema);
+ datum.put(FIELD_1, i);
+ datum.put(FIELD_2, "string_" + i);
+ dataFileWriter.append(datum);
+ }
+ dataFileWriter.close();
+ }
+
+ @BeforeClass
+ public static void writeAvroEvents() throws Exception {
+ writeTestEventsToFile(AVRO_FILE, NUM_EVENTS);
+ }
+
+ @Test
+ public void testSequentialRead() throws Exception {
+ SystemStreamPartition ssp = new SystemStreamPartition("hdfs", "testStream", new Partition(0));
+ SingleFileHdfsReader reader = new AvroFileHdfsReader(ssp);
+ reader.open(AVRO_FILE, "0");
+ int index = 0;
+ while (reader.hasNext()) {
+ GenericRecord record = (GenericRecord) reader.readNext().getMessage();
+ Assert.assertEquals(index, record.get(FIELD_1));
+ Assert.assertEquals("string_" + index, record.get(FIELD_2).toString());
+ index++;
+ }
+ Assert.assertEquals(NUM_EVENTS, index);
+ reader.close();
+ }
+
+ @Test
+ public void testFileReopen() throws Exception {
+ SystemStreamPartition ssp = new SystemStreamPartition("hdfs", "testStream", new Partition(0));
+ SingleFileHdfsReader reader = new AvroFileHdfsReader(ssp);
+ reader.open(AVRO_FILE, "0");
+ int index = 0;
+ for (; index < NUM_EVENTS / 2; index++) {
+ GenericRecord record = (GenericRecord) reader.readNext().getMessage();
+ Assert.assertEquals(index, record.get(FIELD_1));
+ Assert.assertEquals("string_" + index, record.get(FIELD_2).toString());
+ }
+ String offset = reader.nextOffset();
+ reader.close();
+ reader = new AvroFileHdfsReader(ssp);
+ reader.open(AVRO_FILE, offset);
+ for (; index < NUM_EVENTS; index++) {
+ GenericRecord record = (GenericRecord) reader.readNext().getMessage();
+ Assert.assertEquals(index, record.get(FIELD_1));
+ Assert.assertEquals("string_" + index, record.get(FIELD_2).toString());
+ }
+ Assert.assertEquals(NUM_EVENTS, index);
+ reader.close();
+ }
+
+ @Test
+ public void testRandomRead() throws Exception {
+ SystemStreamPartition ssp = new SystemStreamPartition("hdfs", "testStream", new Partition(0));
+ SingleFileHdfsReader reader = new AvroFileHdfsReader(ssp);
+ reader.open(AVRO_FILE, "0");
+ for (int i = 0; i < NUM_EVENTS / 2; i++) {
+ reader.readNext();
+ }
+ String offset = reader.nextOffset();
+ IncomingMessageEnvelope envelope = reader.readNext();
+ Assert.assertEquals(offset, envelope.getOffset());
+
+ GenericRecord record1 = (GenericRecord) envelope.getMessage();
+
+ for (int i = 0; i < 5; i++) reader.readNext();
+
+ // seek to the offset within the same reader
+ reader.seek(offset);
+ Assert.assertEquals(offset, reader.nextOffset());
+ envelope = reader.readNext();
+ Assert.assertEquals(offset, envelope.getOffset());
+ GenericRecord record2 = (GenericRecord) envelope.getMessage();
+ Assert.assertEquals(record1, record2);
+ reader.close();
+
+ // open a new reader and initialize it with the offset
+ reader = new AvroFileHdfsReader(ssp);
+ reader.open(AVRO_FILE, offset);
+ envelope = reader.readNext();
+ Assert.assertEquals(offset, envelope.getOffset());
+ GenericRecord record3 = (GenericRecord) envelope.getMessage();
+ Assert.assertEquals(record1, record3);
+ reader.close();
+ }
+
+ @Test
+ public void testOffsetComparator() {
+ Assert.assertEquals(-1, AvroFileHdfsReader.offsetComparator("0", "1452"));
+ Assert.assertEquals(-1, AvroFileHdfsReader.offsetComparator("2001@3", "2001@4"));
+ Assert.assertEquals(-1, AvroFileHdfsReader.offsetComparator("2001@4", "2010@1"));
+ Assert.assertEquals(-1, AvroFileHdfsReader.offsetComparator("2001@3", "2011@3"));
+ Assert.assertEquals(-1, AvroFileHdfsReader.offsetComparator("2001", "2001@4"));
+ Assert.assertEquals(-1, AvroFileHdfsReader.offsetComparator("2001", "2010@1"));
+ Assert.assertEquals(-1, AvroFileHdfsReader.offsetComparator("2001@3", "2010"));
+ Assert.assertEquals(1, AvroFileHdfsReader.offsetComparator("1984", "0"));
+ Assert.assertEquals(1, AvroFileHdfsReader.offsetComparator("1984@2", "1984@1"));
+ Assert.assertEquals(1, AvroFileHdfsReader.offsetComparator("14341@2", "1984@2"));
+ Assert.assertEquals(1, AvroFileHdfsReader.offsetComparator("14341@1", "1984@10"));
+ Assert.assertEquals(1, AvroFileHdfsReader.offsetComparator("14341", "1984@10"));
+ Assert.assertEquals(1, AvroFileHdfsReader.offsetComparator("14341@1", "1984"));
+ Assert.assertEquals(0, AvroFileHdfsReader.offsetComparator("1989", "1989"));
+ Assert.assertEquals(0, AvroFileHdfsReader.offsetComparator("1989@0", "1989"));
+ Assert.assertEquals(0, AvroFileHdfsReader.offsetComparator("1989", "1989@0"));
+ Assert.assertEquals(0, AvroFileHdfsReader.offsetComparator("0", "0"));
+ Assert.assertEquals(0, AvroFileHdfsReader.offsetComparator("1989@1", "1989@1"));
+ }
+
+ @Test(expected = Exception.class)
+ public void testOffsetComparator_InvalidInput() {
+ AvroFileHdfsReader.offsetComparator("1982,13", "1930,1");
+ }
+}
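
testOffsetComparator() above pins down the ordering of single-file Avro offsets: the number before "@" is compared first, then the record index after it, and a missing "@..." suffix means index 0. The following sketch reproduces that ordering as encoded by the assertions; it is a reconstruction, not the actual AvroFileHdfsReader.offsetComparator implementation.

class OffsetOrderSketch {
  // Returns a negative, zero or positive value consistent with the assertions above.
  static int compare(String o1, String o2) {
    long[] a = parse(o1);
    long[] b = parse(o2);
    int byBlock = Long.compare(a[0], b[0]);
    return byBlock != 0 ? byBlock : Long.compare(a[1], b[1]);
  }

  private static long[] parse(String offset) {
    // "<blockStart>@<recordIndexInBlock>"; a bare "<blockStart>" means index 0.
    String[] parts = offset.split("@");
    long block = Long.parseLong(parts[0]);   // rejects malformed input such as "1982,13"
    long record = parts.length > 1 ? Long.parseLong(parts[1]) : 0L;
    return new long[]{block, record};
  }
}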
http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/reader/TestMultiFileHdfsReader.java
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/reader/TestMultiFileHdfsReader.java b/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/reader/TestMultiFileHdfsReader.java
new file mode 100644
index 0000000..5682a7c
--- /dev/null
+++ b/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/reader/TestMultiFileHdfsReader.java
@@ -0,0 +1,182 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.hdfs.reader;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+public class TestMultiFileHdfsReader {
+ private static final String FIELD_1 = "field1";
+ private static final String FIELD_2 = "field2";
+ private static final String WORKING_DIRECTORY = TestMultiFileHdfsReader.class.getResource("/reader").getPath();
+ private static final String AVRO_FILE_1 = WORKING_DIRECTORY + "/TestMultiFileHdfsReader-01.avro";
+ private static final String AVRO_FILE_2 = WORKING_DIRECTORY + "/TestMultiFileHdfsReader-02.avro";
+ private static final String AVRO_FILE_3 = WORKING_DIRECTORY + "/TestMultiFileHdfsReader-03.avro";
+ private static String[] descriptors = {AVRO_FILE_1, AVRO_FILE_2, AVRO_FILE_3};
+ private static final int NUM_EVENTS = 100;
+
+ @BeforeClass
+ public static void writeAvroEvents()
+ throws Exception {
+ TestAvroFileHdfsReader.writeTestEventsToFile(AVRO_FILE_1, NUM_EVENTS);
+ TestAvroFileHdfsReader.writeTestEventsToFile(AVRO_FILE_2, NUM_EVENTS);
+ TestAvroFileHdfsReader.writeTestEventsToFile(AVRO_FILE_3, NUM_EVENTS);
+ }
+
+ @Test
+ public void testSequentialRead()
+ throws Exception {
+ SystemStreamPartition ssp = new SystemStreamPartition("hdfs", "testStream", new Partition(0));
+ MultiFileHdfsReader multiReader = new MultiFileHdfsReader(HdfsReaderFactory.ReaderType.AVRO, ssp, Arrays.asList(descriptors), "0:0");
+ int index = 0;
+ while (multiReader.hasNext()) {
+ GenericRecord record = (GenericRecord) multiReader.readNext().getMessage();
+ Assert.assertEquals(index % NUM_EVENTS, record.get(FIELD_1));
+ Assert.assertEquals("string_" + (index % NUM_EVENTS), record.get(FIELD_2).toString());
+ index++;
+ }
+ Assert.assertEquals(3 * NUM_EVENTS, index);
+ multiReader.close();
+ }
+
+ @Test
+ public void testReaderReopen()
+ throws Exception {
+ SystemStreamPartition ssp = new SystemStreamPartition("hdfs", "testStream", new Partition(0));
+
+ // read until the middle of the first file
+ MultiFileHdfsReader multiReader = new MultiFileHdfsReader(HdfsReaderFactory.ReaderType.AVRO, ssp,
+ Arrays.asList(descriptors), "0:0");
+ int index = 0;
+ String offset = "0:0";
+ for (; index < NUM_EVENTS / 2; index++) {
+ IncomingMessageEnvelope envelope = multiReader.readNext();
+ GenericRecord record = (GenericRecord) envelope.getMessage();
+ Assert.assertEquals(index % NUM_EVENTS, record.get(FIELD_1));
+ Assert.assertEquals("string_" + (index % NUM_EVENTS), record.get(FIELD_2).toString());
+ offset = envelope.getOffset();
+ }
+ multiReader.close();
+
+ // read until the middle of the second file
+ multiReader = new MultiFileHdfsReader(HdfsReaderFactory.ReaderType.AVRO, ssp,
+ Arrays.asList(descriptors), offset);
+ multiReader.readNext(); // skip one duplicate event
+ for (; index < NUM_EVENTS + NUM_EVENTS / 2; index++) {
+ IncomingMessageEnvelope envelope = multiReader.readNext();
+ GenericRecord record = (GenericRecord) envelope.getMessage();
+ Assert.assertEquals(index % NUM_EVENTS, record.get(FIELD_1));
+ Assert.assertEquals("string_" + (index % NUM_EVENTS), record.get(FIELD_2).toString());
+ offset = envelope.getOffset();
+ }
+ multiReader.close();
+
+ // read the rest of all files
+ multiReader = new MultiFileHdfsReader(HdfsReaderFactory.ReaderType.AVRO, ssp,
+ Arrays.asList(descriptors), offset);
+ multiReader.readNext(); // skip one duplicate event
+ while (multiReader.hasNext()) {
+ IncomingMessageEnvelope envelope = multiReader.readNext();
+ GenericRecord record = (GenericRecord) envelope.getMessage();
+ Assert.assertEquals(index % NUM_EVENTS, record.get(FIELD_1));
+ Assert.assertEquals("string_" + (index % NUM_EVENTS), record.get(FIELD_2).toString());
+ index++;
+ offset = envelope.getOffset();
+ }
+ Assert.assertEquals(3 * NUM_EVENTS, index);
+ multiReader.close();
+
+ // reopen with the offset of the last record
+ multiReader = new MultiFileHdfsReader(HdfsReaderFactory.ReaderType.AVRO, ssp,
+ Arrays.asList(descriptors), offset);
+ multiReader.readNext(); // skip one duplicate event
+ Assert.assertFalse(multiReader.hasNext());
+ multiReader.close();
+ }
+
+ @Test(expected = SamzaException.class)
+ public void testOutOfRangeSingleFileOffset() {
+ SystemStreamPartition ssp = new SystemStreamPartition("hdfs", "testStream", new Partition(0));
+ new MultiFileHdfsReader(HdfsReaderFactory.ReaderType.AVRO, ssp,
+ Arrays.asList(descriptors), "0:1000000&0");
+ Assert.fail();
+ }
+
+ @Test(expected = SamzaException.class)
+ public void testOutOfRangeFileIndex() {
+ SystemStreamPartition ssp = new SystemStreamPartition("hdfs", "testStream", new Partition(0));
+ new MultiFileHdfsReader(HdfsReaderFactory.ReaderType.AVRO, ssp,
+ Arrays.asList(descriptors), "3:0");
+ Assert.fail();
+ }
+
+ @Test(expected = SamzaException.class)
+ public void testInvalidPartitionDescriptor() {
+ SystemStreamPartition ssp = new SystemStreamPartition("hdfs", "testStream", new Partition(0));
+ new MultiFileHdfsReader(HdfsReaderFactory.ReaderType.AVRO, ssp,
+ new ArrayList<>(), "0:0");
+ Assert.fail();
+ }
+
+ @Test
+ public void testReconnect() {
+ SystemStreamPartition ssp = new SystemStreamPartition("hdfs", "testStream", new Partition(0));
+ MultiFileHdfsReader multiReader = new MultiFileHdfsReader(HdfsReaderFactory.ReaderType.AVRO, ssp, Arrays.asList(descriptors), "0:0");
+ // first read a few events, and then reconnect
+ for (int i = 0; i < NUM_EVENTS / 2; i++) {
+ multiReader.readNext();
+ }
+ IncomingMessageEnvelope envelope = multiReader.readNext();
+ multiReader.reconnect();
+ IncomingMessageEnvelope envelopeAfterReconnect = multiReader.readNext();
+ Assert.assertEquals(envelope, envelopeAfterReconnect);
+ multiReader.close();
+ }
+
+ @Test(expected = SamzaException.class)
+ public void testReachingMaxReconnect() {
+ int numMaxRetries = 3;
+ SystemStreamPartition ssp = new SystemStreamPartition("hdfs", "testStream", new Partition(0));
+ MultiFileHdfsReader multiReader = new MultiFileHdfsReader(HdfsReaderFactory.ReaderType.AVRO, ssp, Arrays.asList(descriptors), "0:0", numMaxRetries);
+ // first read a few events, and then reconnect
+ for (int i = 0; i < NUM_EVENTS / 2; i++) {
+ multiReader.readNext();
+ }
+ for (int i = 0; i < numMaxRetries; i++) {
+ IncomingMessageEnvelope envelope = multiReader.readNext();
+ multiReader.reconnect();
+ IncomingMessageEnvelope envelopeAfterReconnect = multiReader.readNext();
+ Assert.assertEquals(envelope, envelopeAfterReconnect);
+ }
+ multiReader.readNext();
+ multiReader.reconnect();
+ Assert.fail();
+ }
+}
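
The multi-file tests above use offsets of the form "<fileIndex>:<offsetWithinFile>" (e.g. "0:0", "3:0"), and testReaderReopen()/testReconnect() show that reopening at a stored offset re-delivers the record that offset points to, so the caller drops one duplicate. Below is a hedged resume sketch following that pattern; the system, stream and partition names are placeholders, and the imports match TestMultiFileHdfsReader above.

class ResumeSketch {
  // lastOffset is the offset of the last envelope already processed;
  // the reader re-delivers that record once after reopening.
  static String resume(List<String> partitionDescriptors, String lastOffset) {
    SystemStreamPartition ssp = new SystemStreamPartition("hdfs", "testStream", new Partition(0));
    MultiFileHdfsReader reader = new MultiFileHdfsReader(
        HdfsReaderFactory.ReaderType.AVRO, ssp, partitionDescriptors, lastOffset);
    reader.readNext();                     // duplicate of the already-processed record
    while (reader.hasNext()) {
      IncomingMessageEnvelope envelope = reader.readNext();
      // hand the envelope to the caller here
      lastOffset = envelope.getOffset();   // checkpoint for the next resume
    }
    reader.close();
    return lastOffset;
  }
}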
http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/test/resources/integTest/emptyTestFile
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/test/resources/integTest/emptyTestFile b/samza-hdfs/src/test/resources/integTest/emptyTestFile
new file mode 100644
index 0000000..d216be4
--- /dev/null
+++ b/samza-hdfs/src/test/resources/integTest/emptyTestFile
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/test/resources/partitioner/testfile01
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/test/resources/partitioner/testfile01 b/samza-hdfs/src/test/resources/partitioner/testfile01
new file mode 100644
index 0000000..d216be4
--- /dev/null
+++ b/samza-hdfs/src/test/resources/partitioner/testfile01
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/test/resources/partitioner/testfile02
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/test/resources/partitioner/testfile02 b/samza-hdfs/src/test/resources/partitioner/testfile02
new file mode 100644
index 0000000..d216be4
--- /dev/null
+++ b/samza-hdfs/src/test/resources/partitioner/testfile02
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/test/resources/reader/TestEvent.avsc
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/test/resources/reader/TestEvent.avsc b/samza-hdfs/src/test/resources/reader/TestEvent.avsc
new file mode 100644
index 0000000..289a589
--- /dev/null
+++ b/samza-hdfs/src/test/resources/reader/TestEvent.avsc
@@ -0,0 +1,33 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+{
+ "type" : "record",
+ "name" : "TestEvent",
+ "namespace" : "HDFSConsumer",
+ "fields" : [
+ {
+ "name" : "field1",
+ "type" : "int"
+ },
+ {
+ "name" : "field2",
+ "type" : "string"
+ }]
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/test/scala/org/apache/samza/system/hdfs/TestHdfsSystemProducerTestSuite.scala
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/test/scala/org/apache/samza/system/hdfs/TestHdfsSystemProducerTestSuite.scala b/samza-hdfs/src/test/scala/org/apache/samza/system/hdfs/TestHdfsSystemProducerTestSuite.scala
index 261310d..db7036a 100644
--- a/samza-hdfs/src/test/scala/org/apache/samza/system/hdfs/TestHdfsSystemProducerTestSuite.scala
+++ b/samza-hdfs/src/test/scala/org/apache/samza/system/hdfs/TestHdfsSystemProducerTestSuite.scala
@@ -20,31 +20,25 @@
package org.apache.samza.system.hdfs
-import java.io.{InputStreamReader, File, IOException}
+import java.io.File
import java.net.URI
import java.text.SimpleDateFormat
import java.util.Date
-import org.apache.avro.file.{SeekableFileInput, CodecFactory, DataFileWriter, DataFileReader}
-import org.apache.avro.reflect.{ReflectDatumReader, ReflectDatumWriter, ReflectData}
-import org.apache.avro.specific.SpecificDatumReader
+import org.apache.avro.file.DataFileReader
+import org.apache.avro.reflect.{ReflectData, ReflectDatumReader}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
-import org.apache.hadoop.hdfs.{DFSConfigKeys,MiniDFSCluster}
-import org.apache.hadoop.io.{SequenceFile, BytesWritable, LongWritable, Text}
+import org.apache.hadoop.hdfs.{DFSConfigKeys, MiniDFSCluster}
import org.apache.hadoop.io.SequenceFile.Reader
-
+import org.apache.hadoop.io.{BytesWritable, LongWritable, SequenceFile, Text}
import org.apache.samza.config.Config
-import org.apache.samza.system.hdfs.HdfsConfig._
import org.apache.samza.config.factories.PropertiesConfigFactory
-import org.apache.samza.metrics.MetricsRegistryMap
+import org.apache.samza.system.hdfs.HdfsConfig._
import org.apache.samza.system.{OutgoingMessageEnvelope, SystemStream}
import org.apache.samza.util.Logging
-
-import org.junit.{AfterClass, BeforeClass, Test}
import org.junit.Assert._
-
-import scala.collection.JavaConverters._
+import org.junit.{AfterClass, Test}
object TestHdfsSystemProducerTestSuite {
@@ -88,7 +82,7 @@ object TestHdfsSystemProducerTestSuite {
}
def buildProducer(name: String, cluster: MiniDFSCluster): Option[HdfsSystemProducer] @unchecked = {
- Some(
+ Some(
hdfsFactory.getProducer(
name,
propsFactory.getConfig(URI.create(RESOURCE_PATH_FORMAT format (new File(".").getCanonicalPath, name))),
@@ -277,7 +271,7 @@ class TestHdfsSystemProducerTestSuite extends Logging {
Thread.sleep(PAUSE)
val systemStream = new SystemStream(AVRO_JOB_NAME, TEST)
- val atc = new AvroTestClass(1280382045923456789L, "alkjdsfafloiqulkjasoiuqlklakdsflkja")
+ val atc = new AvroTestClass(128038204592345678L, "alkjdsfafloiqulkjasoiuqlklakdsflkja")
producer.get.send(TEST, new OutgoingMessageEnvelope(systemStream, atc))
producer.get.stop
@@ -292,7 +286,7 @@ class TestHdfsSystemProducerTestSuite extends Logging {
val atf = new AvroFSInput(FileContext.getFileContext(), results.head.getPath)
val schema = ReflectData.get().getSchema(atc.getClass)
val datumReader = new ReflectDatumReader[Object](schema)
- val tfReader = DataFileReader.openReader(atf, datumReader)
+ val tfReader = new DataFileReader[Object](atf, datumReader)
val atc2 = tfReader.next().asInstanceOf[AvroTestClass]
assertTrue(atc == atc2)
@@ -314,7 +308,7 @@ class TestHdfsSystemProducerTestSuite extends Logging {
Thread.sleep(PAUSE)
val systemStream = new SystemStream(AVRO_BATCH_JOB_NAME, TEST)
- val atc = new AvroTestClass(1280382045923456789L, "alkjdsfafloiqulkjasoiuqlklakdsflkja")
+ val atc = new AvroTestClass(128038204592345678L, "alkjdsfafloiqulkjasoiuqlklakdsflkja")
(1 to 20).map {
i => producer.get.send(TEST, new OutgoingMessageEnvelope(systemStream, atc))
@@ -332,7 +326,7 @@ class TestHdfsSystemProducerTestSuite extends Logging {
val atf = new AvroFSInput(FileContext.getFileContext(), r.getPath)
val schema = ReflectData.get().getSchema(atc.getClass)
val datumReader = new ReflectDatumReader[Object](schema)
- val tfReader = DataFileReader.openReader(atf, datumReader)
+ val tfReader = new DataFileReader[Object](atf, datumReader)
val atc2 = tfReader.next().asInstanceOf[AvroTestClass]
assertTrue(atc == atc2)
}
@@ -347,12 +341,12 @@ class TestHdfsSystemProducerTestSuite extends Logging {
class TestHdfsSystemProducer(systemName: String, config: HdfsConfig, clientId: String, metrics: HdfsSystemProducerMetrics, mini: MiniDFSCluster)
extends HdfsSystemProducer(systemName, clientId, config, metrics) {
- override val dfs = mini.getFileSystem
+ override val dfs = mini.getFileSystem
}
class TestHdfsSystemFactory extends HdfsSystemFactory {
- def getProducer(systemName: String, config: Config, metrics: HdfsSystemProducerMetrics, cluster: MiniDFSCluster) = {
- new TestHdfsSystemProducer(systemName, config, "test", metrics, cluster)
- }
+ def getProducer(systemName: String, config: Config, metrics: HdfsSystemProducerMetrics, cluster: MiniDFSCluster) = {
+ new TestHdfsSystemProducer(systemName, config, "test", metrics, cluster)
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnContainerRunner.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnContainerRunner.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnContainerRunner.java
index dacc52d..181102b 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnContainerRunner.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnContainerRunner.java
@@ -179,7 +179,7 @@ public class YarnContainerRunner {
Iterator iter = credentials.getAllTokens().iterator();
while (iter.hasNext()) {
TokenIdentifier token = ((Token) iter.next()).decodeIdentifier();
- if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
+ if (token != null && token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
iter.remove();
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJobFactory.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJobFactory.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJobFactory.scala
index 4e328a5..625d3bb 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJobFactory.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJobFactory.scala
@@ -18,6 +18,9 @@
*/
package org.apache.samza.job.yarn
+
+
+import org.apache.hadoop.hdfs.DistributedFileSystem
import org.apache.samza.job.StreamJobFactory
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.samza.config.Config
@@ -29,7 +32,11 @@ class YarnJobFactory extends StreamJobFactory {
val hConfig = new YarnConfiguration
hConfig.set("fs.http.impl", classOf[HttpFileSystem].getName)
hConfig.set("fs.https.impl", classOf[HttpFileSystem].getName)
-
+ hConfig.set("fs.hdfs.impl", classOf[DistributedFileSystem].getName)
+ // pass along the RM config if there is one
+ if (config.containsKey(YarnConfiguration.RM_ADDRESS)) {
+ hConfig.set(YarnConfiguration.RM_ADDRESS, config.get(YarnConfiguration.RM_ADDRESS, "0.0.0.0:8032"))
+ }
new YarnJob(config, hConfig)
}
}
[2/2] samza git commit: SAMZA-967: HDFS System Consumer
Posted by xi...@apache.org.
SAMZA-967: HDFS System Consumer
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/2216fe0b
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/2216fe0b
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/2216fe0b
Branch: refs/heads/master
Commit: 2216fe0b7b250e2edfc14d150e59f84884e8cba4
Parents: f4d924f
Author: Hai Lu <lh...@gmail.com>
Authored: Wed Oct 5 11:39:17 2016 -0700
Committer: Xinyu Liu <xi...@apache.org>
Committed: Wed Oct 5 11:39:17 2016 -0700
----------------------------------------------------------------------
build.gradle | 16 +-
.../samza/system/hdfs/HdfsSystemAdmin.java | 221 ++++++++++++++
.../samza/system/hdfs/HdfsSystemConsumer.java | 281 +++++++++++++++++
.../system/hdfs/PartitionDescriptorUtil.java | 97 ++++++
.../hdfs/partitioner/DirectoryPartitioner.java | 235 ++++++++++++++
.../hdfs/partitioner/FileSystemAdapter.java | 60 ++++
.../hdfs/partitioner/HdfsFileSystemAdapter.java | 55 ++++
.../system/hdfs/reader/AvroFileHdfsReader.java | 216 +++++++++++++
.../system/hdfs/reader/HdfsReaderFactory.java | 59 ++++
.../system/hdfs/reader/MultiFileHdfsReader.java | 204 +++++++++++++
.../hdfs/reader/SingleFileHdfsReader.java | 62 ++++
.../apache/samza/system/hdfs/HdfsConfig.scala | 79 ++++-
.../samza/system/hdfs/HdfsSystemAdmin.scala | 52 ----
.../samza/system/hdfs/HdfsSystemFactory.scala | 46 +--
.../system/hdfs/TestHdfsSystemConsumer.java | 136 +++++++++
.../hdfs/TestPartitionDesctiptorUtil.java | 92 ++++++
.../partitioner/TestDirectoryPartitioner.java | 304 +++++++++++++++++++
.../partitioner/TestHdfsFileSystemAdapter.java | 59 ++++
.../hdfs/reader/TestAvroFileHdfsReader.java | 169 +++++++++++
.../hdfs/reader/TestMultiFileHdfsReader.java | 182 +++++++++++
.../src/test/resources/integTest/emptyTestFile | 16 +
.../src/test/resources/partitioner/testfile01 | 16 +
.../src/test/resources/partitioner/testfile02 | 16 +
.../src/test/resources/reader/TestEvent.avsc | 33 ++
.../hdfs/TestHdfsSystemProducerTestSuite.scala | 38 +--
.../samza/job/yarn/YarnContainerRunner.java | 2 +-
.../apache/samza/job/yarn/YarnJobFactory.scala | 9 +-
27 files changed, 2654 insertions(+), 101 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 2bea27b..98839f2 100644
--- a/build.gradle
+++ b/build.gradle
@@ -84,7 +84,8 @@ rat {
'RELEASE.md',
'samza-test/src/main/resources/**',
'samza-hdfs/src/main/resources/**',
- 'samza-hdfs/src/test/resources/**'
+ 'samza-hdfs/src/test/resources/**',
+ 'out/**'
]
}
@@ -333,6 +334,10 @@ project(":samza-yarn_$scalaVersion") {
// Exclude because YARN's 3.4.5 ZK version is incompatible with Kafka's 3.3.4.
exclude module: 'zookeeper'
}
+ compile("org.apache.hadoop:hadoop-hdfs:$yarnVersion") {
+ exclude module: 'slf4j-log4j12'
+ exclude module: 'servlet-api'
+ }
compile("org.scalatra:scalatra_$scalaVersion:$scalatraVersion") {
exclude module: 'scala-compiler'
exclude module: 'slf4j-api'
@@ -494,10 +499,19 @@ project(":samza-kv-rocksdb_$scalaVersion") {
project(":samza-hdfs_$scalaVersion") {
apply plugin: 'scala'
+ // Force scala joint compilation
+ sourceSets.main.scala.srcDir "src/main/java"
+ sourceSets.test.scala.srcDir "src/test/java"
+ sourceSets.main.java.srcDirs = []
+
dependencies {
compile project(':samza-api')
compile project(":samza-core_$scalaVersion")
compile project(":samza-kafka_$scalaVersion")
+ // currently the hdfs system producer/consumer depend on yarn for two things:
+ // 1. staging directory 2. security
+ // SAMZA-1032 tracks removing the staging directory dependency
+ compile project(":samza-yarn_$scalaVersion")
compile "org.scala-lang:scala-library:$scalaLibVersion"
compile("org.apache.hadoop:hadoop-hdfs:$yarnVersion") {
exclude module: 'slf4j-log4j12'
http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java
new file mode 100644
index 0000000..8bf31c5
--- /dev/null
+++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java
@@ -0,0 +1,221 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.hdfs;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.system.hdfs.partitioner.DirectoryPartitioner;
+import org.apache.samza.system.hdfs.partitioner.HdfsFileSystemAdapter;
+import org.apache.samza.system.hdfs.reader.HdfsReaderFactory;
+import org.apache.samza.system.hdfs.reader.MultiFileHdfsReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The HDFS system admin for {@link org.apache.samza.system.hdfs.HdfsSystemConsumer} and
+ * {@link org.apache.samza.system.hdfs.HdfsSystemProducer}
+ *
+ * A high level overview of the HDFS producer/consumer architecture:
+ * HDFSSystemFactory creates the HDFSSystemConsumer, HDFSSystemAdmin and HDFSSystemProducer.
+ * HDFSSystemAdmin uses the DirectoryPartitioner to do the filtering/grouping of files on HDFS,
+ * and persists the resulting partition descriptors back to HDFS.
+ * HDFSSystemConsumer obtains the partition descriptors and reads events from HDFS through an
+ * IFileReader implementation such as HDFSAvroFileReader.
+ * HDFSSystemProducer writes events to HDFS through HDFSAvroWriter.
+ */
+public class HdfsSystemAdmin implements SystemAdmin {
+ private static final Logger LOG = LoggerFactory.getLogger(HdfsSystemAdmin.class);
+
+ private HdfsConfig hdfsConfig;
+ private DirectoryPartitioner directoryPartitioner;
+ private String stagingDirectory; // directory that contains the partition description
+ private HdfsReaderFactory.ReaderType readerType;
+
+ public HdfsSystemAdmin(String systemName, Config config) {
+ hdfsConfig = new HdfsConfig(config);
+ directoryPartitioner = new DirectoryPartitioner(hdfsConfig.getPartitionerWhiteList(systemName),
+ hdfsConfig.getPartitionerBlackList(systemName), hdfsConfig.getPartitionerGroupPattern(systemName),
+ new HdfsFileSystemAdapter());
+ stagingDirectory = hdfsConfig.getStagingDirectory();
+ readerType = HdfsReaderFactory.getType(hdfsConfig.getFileReaderType(systemName));
+ }
+
+ @Override
+ public Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> offsets) {
+ /*
+ * To actually get the "after" offset we would have to seek to that specific offset in the file and
+ * read that record to figure out the location of the next record. This is a much more expensive
+ * operation than in the KafkaSystemAdmin case.
+ * So simply return the same offsets. This will always incur re-processing, but such semantics are
+ * legitimate in Samza.
+ */
+ return offsets;
+ }
+
+ static Map<Partition, List<String>> obtainPartitionDescriptorMap(String stagingDirectory, String streamName) {
+ Path path = PartitionDescriptorUtil.getPartitionDescriptorPath(stagingDirectory, streamName);
+ try (FileSystem fs = path.getFileSystem(new Configuration())) {
+ if (!fs.exists(path)) {
+ return null;
+ }
+ try (FSDataInputStream fis = fs.open(path)) {
+ String json = IOUtils.toString(fis, StandardCharsets.UTF_8);
+ return PartitionDescriptorUtil.getDescriptorMapFromJson(json);
+ }
+ } catch (IOException e) {
+ throw new SamzaException("Failed to read partition description from: " + path);
+ }
+ }
+
+ /*
+ * Persist the partition descriptor only when it doesn't exist already on HDFS.
+ */
+ private void persistPartitionDescriptor(String streamName,
+ Map<Partition, List<String>> partitionDescriptorMap) {
+ Path targetPath = PartitionDescriptorUtil.getPartitionDescriptorPath(stagingDirectory, streamName);
+ try (FileSystem fs = targetPath.getFileSystem(new Configuration())) {
+ // Partition descriptor is supposed to be immutable. So don't overwrite it if it already exists.
+ if (fs.exists(targetPath)) {
+ LOG.warn(targetPath.toString() + " exists. Skip persisting partition descriptor.");
+ } else {
+ LOG.info("About to persist partition descriptors to path: " + targetPath.toString());
+ try (FSDataOutputStream fos = fs.create(targetPath)) {
+ fos.write(
+ PartitionDescriptorUtil.getJsonFromDescriptorMap(partitionDescriptorMap).getBytes(StandardCharsets.UTF_8));
+ }
+ }
+ } catch (IOException e) {
+ throw new SamzaException("Failed to validate/persist partition description on hdfs.", e);
+ }
+ }
+
+ private boolean partitionDescriptorExists(String streamName) {
+ Path targetPath = PartitionDescriptorUtil.getPartitionDescriptorPath(stagingDirectory, streamName);
+ try (FileSystem fs = targetPath.getFileSystem(new Configuration())) {
+ return fs.exists(targetPath);
+ } catch (IOException e) {
+ throw new SamzaException("Failed to obtain information about path: " + targetPath);
+ }
+ }
+
+ @Override
+ public Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> streamNames) {
+ Map<String, SystemStreamMetadata> systemStreamMetadataMap = new HashMap<>();
+ streamNames.forEach(streamName -> {
+ systemStreamMetadataMap.put(streamName, new SystemStreamMetadata(streamName, directoryPartitioner
+ .getPartitionMetadataMap(streamName, obtainPartitionDescriptorMap(stagingDirectory, streamName))));
+ if (!partitionDescriptorExists(streamName)) {
+ persistPartitionDescriptor(streamName, directoryPartitioner.getPartitionDescriptor(streamName));
+ }
+ });
+ return systemStreamMetadataMap;
+ }
+
+ @Override
+ public void createChangelogStream(String streamName, int numOfPartitions) {
+ throw new UnsupportedOperationException("HDFS doesn't support change log stream.");
+ }
+
+ @Override
+ public void validateChangelogStream(String streamName, int numOfPartitions) {
+ throw new UnsupportedOperationException("HDFS doesn't support change log stream.");
+ }
+
+ @Override
+ public void createCoordinatorStream(String streamName) {
+ throw new UnsupportedOperationException("HDFS doesn't support coordinator stream.");
+ }
+
+ /**
+ * Compare two multi-file style offsets. A multi-file style offset consists of both
+ * the file index and the offset within that file, in the format:
+ * "fileIndex:offsetWithinFile"
+ * For example, "2:0", "3:127"
+ * The format of the offset within a file is defined by the implementation of
+ * {@link org.apache.samza.system.hdfs.reader.SingleFileHdfsReader} itself.
+ *
+ * @param offset1 First offset for comparison.
+ * @param offset2 Second offset for comparison.
+ * @return -1, if offset1 &lt; offset2
+ * 0, if offset1 == offset2
+ * 1, if offset1 &gt; offset2
+ * null, if not comparable
+ */
+ @Override
+ public Integer offsetComparator(String offset1, String offset2) {
+ if (StringUtils.isBlank(offset1) || StringUtils.isBlank(offset2)) {
+ return null;
+ }
+ int fileIndex1 = MultiFileHdfsReader.getCurFileIndex(offset1);
+ int fileIndex2 = MultiFileHdfsReader.getCurFileIndex(offset2);
+ if (fileIndex1 == fileIndex2) {
+ String offsetWithinFile1 = MultiFileHdfsReader.getCurSingleFileOffset(offset1);
+ String offsetWithinFile2 = MultiFileHdfsReader.getCurSingleFileOffset(offset2);
+ return HdfsReaderFactory.offsetComparator(readerType, offsetWithinFile1, offsetWithinFile2);
+ }
+ return Integer.compare(fileIndex1, fileIndex2);
+ }
+}
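To make the offset format concrete, here is a hedged, self-contained sketch of how the "fileIndex:offsetWithinFile" comparison described above behaves for the AVRO reader type, where the per-file offset is itself "blockStart@recordOffset". The helper methods are illustrative stand-ins and are not part of the patch.

    public class MultiFileOffsetComparisonSketch {

      /** Illustrative stand-in mirroring HdfsSystemAdmin#offsetComparator for the AVRO reader type. */
      static Integer compare(String offset1, String offset2) {
        if (offset1 == null || offset1.trim().isEmpty() || offset2 == null || offset2.trim().isEmpty()) {
          return null; // blank offsets are not comparable
        }
        String[] p1 = offset1.split(":", 2);
        String[] p2 = offset2.split(":", 2);
        int fileIndex1 = Integer.parseInt(p1[0]);
        int fileIndex2 = Integer.parseInt(p2[0]);
        if (fileIndex1 != fileIndex2) {
          return Integer.compare(fileIndex1, fileIndex2); // different files: the file index decides
        }
        long[] cp1 = parseAvroCheckpoint(p1[1]);
        long[] cp2 = parseAvroCheckpoint(p2[1]);
        return cp1[0] != cp2[0] ? Long.compare(cp1[0], cp2[0]) : Long.compare(cp1[1], cp2[1]);
      }

      /** Avro per-file offsets look like "blockStart@recordOffset"; recordOffset defaults to 0. */
      static long[] parseAvroCheckpoint(String s) {
        String[] parts = s.split("@");
        return new long[]{Long.parseLong(parts[0]), parts.length > 1 ? Long.parseLong(parts[1]) : 0L};
      }

      public static void main(String[] args) {
        System.out.println(compare("2:0", "3:127"));      // -1: file 2 comes before file 3
        System.out.println(compare("3:103@1", "3:103"));  // 1: same block, record 1 vs record 0
        System.out.println(compare("3:103@2", "3:271"));  // -1: block 103 comes before block 271
      }
    }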
http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java
new file mode 100644
index 0000000..13a7102
--- /dev/null
+++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java
@@ -0,0 +1,281 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.hdfs;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.commons.lang.Validate;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.system.hdfs.reader.HdfsReaderFactory;
+import org.apache.samza.system.hdfs.reader.MultiFileHdfsReader;
+import org.apache.samza.util.BlockingEnvelopeMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+
+
+/**
+ * The system consumer for HDFS, extending the {@link org.apache.samza.util.BlockingEnvelopeMap}.
+ * Events will be parsed from HDFS and placed into a blocking queue in {@link org.apache.samza.util.BlockingEnvelopeMap}.
+ * There will be one {@link org.apache.samza.system.hdfs.reader.MultiFileHdfsReader} for each {@link org.apache.samza.system.SystemStreamPartition},
+ * and each {@link org.apache.samza.system.hdfs.reader.MultiFileHdfsReader} runs within its own thread.
+ *
+ * MultiFileHdfsReader_1 (Thread1) ---&gt; SSP1-BlockingQueue ---+
+ * MultiFileHdfsReader_2 (Thread2) ---&gt; SSP2-BlockingQueue ---+---&gt; SystemConsumer.poll()
+ * ...                                                        |
+ * MultiFileHdfsReader_N (ThreadN) ---&gt; SSPN-BlockingQueue ---+
+ * Since each thread has only one reader and its own blocking queue, there is essentially no communication
+ * among reader threads.
+ * Thread safety between the reader threads and the Samza main thread is guaranteed by the blocking queues standing in between.
+ */
+public class HdfsSystemConsumer extends BlockingEnvelopeMap {
+ private static final Logger LOG = LoggerFactory.getLogger(HdfsSystemConsumer.class);
+
+ private static final String METRICS_GROUP_NAME = HdfsSystemConsumer.class.getName();
+
+ private final HdfsReaderFactory.ReaderType readerType;
+ private final String stagingDirectory; // directory that contains the partition description
+ private final int bufferCapacity;
+ private final int numMaxRetires;
+ private ExecutorService executorService;
+
+ /**
+ * The cached mapping from stream name to partition to partition descriptor. The partition descriptor
+ * is the actual file path (or the set of file paths if the partition contains multiple files)
+ * of the stream partition.
+ * For example,
+ * (stream1) -> (P0) -> "hdfs://user/samzauser/1/datafile01.avro"
+ * (stream1) -> (P1) -> "hdfs://user/samzauser/1/datafile02.avro"
+ * (stream2) -> (P0) -> "hdfs://user/samzauser/2/datafile01.avro"
+ * ...
+ */
+ private LoadingCache<String, Map<Partition, List<String>>> cachedPartitionDescriptorMap;
+ private Map<SystemStreamPartition, MultiFileHdfsReader> readers;
+ private Map<SystemStreamPartition, Future> readerRunnableStatus;
+
+ /**
+ * Whether the {@link org.apache.samza.system.hdfs.HdfsSystemConsumer} has been notified
+ * to shut down. The {@link org.apache.samza.system.hdfs.HdfsSystemConsumer.ReaderRunnable} on
+ * each thread checks this flag to determine whether it should stop.
+ */
+ private volatile boolean isShutdown;
+
+ private final HdfsSystemConsumerMetrics consumerMetrics;
+ private final HdfsConfig hdfsConfig;
+
+ public HdfsSystemConsumer(String systemName, Config config, HdfsSystemConsumerMetrics consumerMetrics) {
+ super(consumerMetrics.getMetricsRegistry());
+ hdfsConfig = new HdfsConfig(config);
+ readerType = HdfsReaderFactory.getType(hdfsConfig.getFileReaderType(systemName));
+ stagingDirectory = hdfsConfig.getStagingDirectory();
+ bufferCapacity = hdfsConfig.getConsumerBufferCapacity(systemName);
+ numMaxRetires = hdfsConfig.getConsumerNumMaxRetries(systemName);
+ readers = new ConcurrentHashMap<>();
+ readerRunnableStatus = new ConcurrentHashMap<>();
+ isShutdown = false;
+ this.consumerMetrics = consumerMetrics;
+ cachedPartitionDescriptorMap = CacheBuilder.newBuilder().build(new CacheLoader<String, Map<Partition, List<String>>>() {
+ @Override
+ public Map<Partition, List<String>> load(String streamName)
+ throws Exception {
+ Validate.notEmpty(streamName);
+ return HdfsSystemAdmin.obtainPartitionDescriptorMap(stagingDirectory, streamName);
+ }
+ });
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void start() {
+ LOG.info(String.format("HdfsSystemConsumer started with %d readers", readers.size()));
+ executorService = Executors.newCachedThreadPool();
+ readers.entrySet().forEach(
+ entry -> readerRunnableStatus.put(entry.getKey(), executorService.submit(new ReaderRunnable(entry.getValue()))));
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void stop() {
+ LOG.info("Received request to stop HdfsSystemConsumer.");
+ isShutdown = true;
+ executorService.shutdown();
+ LOG.info("HdfsSystemConsumer stopped.");
+ }
+
+ private List<String> getPartitionDescriptor(SystemStreamPartition systemStreamPartition) {
+ String streamName = systemStreamPartition.getStream();
+ Partition partition = systemStreamPartition.getPartition();
+ try {
+ return cachedPartitionDescriptorMap.get(streamName).get(partition);
+ } catch (ExecutionException e) {
+ throw new SamzaException("Failed to obtain descriptor for " + systemStreamPartition, e);
+ }
+ }
+
+ @Override
+ protected BlockingQueue<IncomingMessageEnvelope> newBlockingQueue() {
+ return new LinkedBlockingQueue<>(bufferCapacity);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void register(SystemStreamPartition systemStreamPartition, String offset) {
+ LOG.info("HdfsSystemConsumer register with partition: " + systemStreamPartition + " and offset " + offset);
+ super.register(systemStreamPartition, offset);
+ MultiFileHdfsReader reader =
+ new MultiFileHdfsReader(readerType, systemStreamPartition, getPartitionDescriptor(systemStreamPartition), offset,
+ numMaxRetires);
+ readers.put(systemStreamPartition, reader);
+ consumerMetrics.registerSystemStreamPartition(systemStreamPartition);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(
+ Set<SystemStreamPartition> systemStreamPartitions, long timeout)
+ throws InterruptedException {
+ systemStreamPartitions.forEach(systemStreamPartition -> {
+ Future status = readerRunnableStatus.get(systemStreamPartition);
+ if (status.isDone()) {
+ try {
+ status.get();
+ } catch (ExecutionException | InterruptedException e) {
+ MultiFileHdfsReader reader = readers.get(systemStreamPartition);
+ LOG.warn(
+ String.format("Detect failure in ReaderRunnable for ssp: %s. Try to reconnect now.", systemStreamPartition),
+ e);
+ reader.reconnect();
+ readerRunnableStatus.put(systemStreamPartition, executorService.submit(new ReaderRunnable(reader)));
+ }
+ }
+ });
+ return super.poll(systemStreamPartitions, timeout);
+ }
+
+ private void offerMessage(SystemStreamPartition systemStreamPartition, IncomingMessageEnvelope envelope) {
+ try {
+ super.put(systemStreamPartition, envelope);
+ } catch (InterruptedException e) {
+ throw new SamzaException("ReaderRunnable interrupted for ssp: " + systemStreamPartition);
+ }
+ }
+
+ private void doPoll(MultiFileHdfsReader reader) {
+ SystemStreamPartition systemStreamPartition = reader.getSystemStreamPartition();
+ while (reader.hasNext() && !isShutdown) {
+ IncomingMessageEnvelope messageEnvelope = reader.readNext();
+ offerMessage(systemStreamPartition, messageEnvelope);
+ consumerMetrics.incNumEvents(systemStreamPartition);
+ consumerMetrics.incTotalNumEvents();
+ }
+ offerMessage(systemStreamPartition, IncomingMessageEnvelope.buildEndOfStreamEnvelope(systemStreamPartition));
+ reader.close();
+ }
+
+ public static class HdfsSystemConsumerMetrics {
+
+ private final MetricsRegistry metricsRegistry;
+ private final Map<SystemStreamPartition, Counter> numEventsCounterMap;
+ private final Counter numTotalEventsCounter;
+
+ public HdfsSystemConsumerMetrics(MetricsRegistry metricsRegistry) {
+ this.metricsRegistry = metricsRegistry;
+ this.numEventsCounterMap = new ConcurrentHashMap<>();
+ this.numTotalEventsCounter = metricsRegistry.newCounter(METRICS_GROUP_NAME, "num-total-events");
+ }
+
+ public void registerSystemStreamPartition(SystemStreamPartition systemStreamPartition) {
+ numEventsCounterMap.putIfAbsent(systemStreamPartition,
+ metricsRegistry.newCounter(METRICS_GROUP_NAME, "num-events-" + systemStreamPartition));
+ }
+
+ public void incNumEvents(SystemStreamPartition systemStreamPartition) {
+ if (!numEventsCounterMap.containsKey(systemStreamPartition)) {
+ registerSystemStreamPartition(systemStreamPartition);
+ }
+ numEventsCounterMap.get(systemStreamPartition).inc();
+ }
+
+ public void incTotalNumEvents() {
+ numTotalEventsCounter.inc();
+ }
+
+ public MetricsRegistry getMetricsRegistry() {
+ return metricsRegistry;
+ }
+ }
+
+ private class ReaderRunnable implements Runnable {
+ public MultiFileHdfsReader reader;
+
+ public ReaderRunnable(MultiFileHdfsReader reader) {
+ this.reader = reader;
+ }
+
+ @Override
+ public void run() {
+ doPoll(reader);
+ }
+ }
+}
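A rough usage sketch of the consumer above, assuming the partition descriptors for the stream were already persisted to the staging directory by HdfsSystemAdmin.getSystemStreamMetadata. The stream name, HDFS path and the "0:0" starting offset are assumptions made for illustration; they are not taken from the patch.

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.samza.Partition;
    import org.apache.samza.config.Config;
    import org.apache.samza.config.MapConfig;
    import org.apache.samza.system.IncomingMessageEnvelope;
    import org.apache.samza.system.SystemStreamPartition;
    import org.apache.samza.system.hdfs.HdfsConfig;
    import org.apache.samza.system.hdfs.HdfsSystemConsumer;
    import org.apache.samza.util.NoOpMetricsRegistry;

    public class HdfsConsumerUsageSketch {
      public static void main(String[] args) throws InterruptedException {
        Map<String, String> props = new HashMap<>();
        props.put(String.format(HdfsConfig.CONSUMER_PARTITIONER_WHITELIST(), "hdfs"), ".*\\.avro");
        Config config = new MapConfig(props);

        HdfsSystemConsumer.HdfsSystemConsumerMetrics metrics =
            new HdfsSystemConsumer.HdfsSystemConsumerMetrics(new NoOpMetricsRegistry());
        HdfsSystemConsumer consumer = new HdfsSystemConsumer("hdfs", config, metrics);

        // The stream name is an HDFS directory path (hypothetical); partition 0 maps to one file group.
        SystemStreamPartition ssp =
            new SystemStreamPartition("hdfs", "hdfs://namenode/user/samzauser/1", new Partition(0));
        consumer.register(ssp, "0:0"); // assumed starting offset: first file, beginning of the file
        consumer.start();

        Map<SystemStreamPartition, List<IncomingMessageEnvelope>> polled =
            consumer.poll(Collections.singleton(ssp), 1000);
        polled.getOrDefault(ssp, Collections.emptyList())
            .forEach(envelope -> System.out.println(envelope.getOffset()));

        consumer.stop();
      }
    }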
http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/PartitionDescriptorUtil.java
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/PartitionDescriptorUtil.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/PartitionDescriptorUtil.java
new file mode 100644
index 0000000..5abdbbc
--- /dev/null
+++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/PartitionDescriptorUtil.java
@@ -0,0 +1,97 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.hdfs;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+
+/**
+ * Util class for methods around partition descriptor.
+ *
+ * A partition descriptor is rich information about a partition: the set
+ * of files that are associated with the partition.
+ *
+ * A partition descriptor map, or descriptor map, is the map between the
+ * {@link org.apache.samza.Partition} and the descriptor.
+ */
+public class PartitionDescriptorUtil {
+
+ private PartitionDescriptorUtil() {
+
+ }
+
+ private static final int INDENT_FACTOR = 2;
+ private static final String DELIMITER = ",";
+
+ private static String getStringFromPaths(List<String> paths) {
+ return String.join(DELIMITER, paths);
+ }
+
+ private static List<String> getPathsFromString(String descriptor) {
+ return Arrays.asList(descriptor.split(DELIMITER));
+ }
+
+ public static String getJsonFromDescriptorMap(Map<Partition, List<String>> descriptorMap) {
+ JSONObject out = new JSONObject();
+ descriptorMap.forEach((partition, paths) -> {
+ String descriptorStr = getStringFromPaths(paths);
+ try {
+ out.put(String.valueOf(partition.getPartitionId()), descriptorStr);
+ } catch (JSONException e) {
+ throw new SamzaException(
+ String.format("Invalid description to encode. partition=%s, descriptor=%s", partition, descriptorStr), e);
+ }
+ });
+ try {
+ return out.toString(INDENT_FACTOR);
+ } catch (JSONException e) {
+ throw new SamzaException("Failed to generate json string.", e);
+ }
+ }
+
+ public static Map<Partition, List<String>> getDescriptorMapFromJson(String json) {
+ try {
+ @SuppressWarnings("unchecked")
+ Map<String, String> rawMap = new ObjectMapper().readValue(json, HashMap.class);
+ Map<Partition, List<String>> descriptorMap = new HashMap<>();
+ rawMap.forEach((key, value) -> descriptorMap.put(new Partition(Integer.valueOf(key)), getPathsFromString(value)));
+ return descriptorMap;
+ } catch (IOException | NumberFormatException e) {
+ throw new SamzaException("Failed to convert json: " + json, e);
+ }
+ }
+
+ public static Path getPartitionDescriptorPath(String base, String streamName) {
+ Path basePath = new Path(base);
+ Path relativePath = new Path(streamName.replaceAll("\\W", "_") + "_partition_description");
+ return new Path(basePath, relativePath);
+ }
+}
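A hedged round-trip sketch for the utility above: serialize a descriptor map to JSON, read it back, and compute the descriptor path for a stream. The staging directory and file paths reuse the examples from the consumer javadoc and are illustrative only; note that the paths within a partition are joined with "," internally, so individual paths must not contain commas.

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.hadoop.fs.Path;
    import org.apache.samza.Partition;
    import org.apache.samza.system.hdfs.PartitionDescriptorUtil;

    public class PartitionDescriptorRoundTripSketch {
      public static void main(String[] args) {
        Map<Partition, List<String>> descriptorMap = new HashMap<>();
        descriptorMap.put(new Partition(0), Arrays.asList("hdfs://user/samzauser/1/datafile01.avro"));
        descriptorMap.put(new Partition(1), Arrays.asList("hdfs://user/samzauser/1/datafile02.avro"));

        // Partition id -> comma-joined file paths, rendered as indented JSON.
        String json = PartitionDescriptorUtil.getJsonFromDescriptorMap(descriptorMap);
        System.out.println(json);

        Map<Partition, List<String>> roundTripped = PartitionDescriptorUtil.getDescriptorMapFromJson(json);
        System.out.println(roundTripped.get(new Partition(0)));

        // Where the descriptor file would live for a given staging directory and stream name.
        Path descriptorPath =
            PartitionDescriptorUtil.getPartitionDescriptorPath("/tmp/staging", "hdfs://user/samzauser/1");
        System.out.println(descriptorPath);
      }
    }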
http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/DirectoryPartitioner.java
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/DirectoryPartitioner.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/DirectoryPartitioner.java
new file mode 100644
index 0000000..5cad1e4
--- /dev/null
+++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/DirectoryPartitioner.java
@@ -0,0 +1,235 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.hdfs.partitioner;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import javax.annotation.Nullable;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.system.hdfs.reader.MultiFileHdfsReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata;
+import static org.apache.samza.system.hdfs.partitioner.FileSystemAdapter.FileMetadata;
+
+
+/**
+ * The partitioner that takes a directory as its input and does
+ * 1. Filtering, based on a white list and a black list
+ * 2. Grouping, based on a grouping pattern
+ *
+ * It then generates the partition metadata and partition descriptors.
+ *
+ * This class holds the assumption that the directory remains immutable.
+ * If the directory does change:
+ * new files showing up in the directory are ignored, based on an old version of the partition descriptor;
+ * a {@link org.apache.samza.SamzaException} is thrown if at least one old file no longer exists
+ */
+public class DirectoryPartitioner {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DirectoryPartitioner.class);
+ private static final String GROUP_IDENTIFIER = "\\[id]";
+
+ private String whiteListRegex;
+ private String blackListRegex;
+ private String groupPattern;
+ private FileSystemAdapter fileSystemAdapter;
+
+ // stream name => partition => partition descriptor
+ private Map<String, Map<Partition, List<String>>> partitionDescriptorMap = new HashMap<>();
+
+ public DirectoryPartitioner(String whiteList, String blackList, String groupPattern,
+ FileSystemAdapter fileSystemAdapter) {
+ this.whiteListRegex = whiteList;
+ this.blackListRegex = blackList;
+ this.groupPattern = groupPattern;
+ this.fileSystemAdapter = fileSystemAdapter;
+ LOG.info(String
+ .format("Creating DirectoryPartitioner with whiteList=%s, blackList=%s, groupPattern=%s", whiteList, blackList,
+ groupPattern));
+ }
+
+ /*
+ * Based on the stream name, get the list of all files and filter out unneeded ones given
+ * the white list and black list
+ */
+ private List<FileMetadata> getFilteredFiles(String streamName) {
+ List<FileMetadata> filteredFiles = new ArrayList<>();
+ List<FileMetadata> allFiles = fileSystemAdapter.getAllFiles(streamName);
+ LOG.info(String.format("List of all files for %s: %s", streamName, allFiles));
+ allFiles.stream().filter(file -> file.getPath().matches(whiteListRegex) && !file.getPath().matches(blackListRegex))
+ .forEach(filteredFiles::add);
+ // sort the files to have a consistent order
+ filteredFiles.sort((f1, f2) -> f1.getPath().compareTo(f2.getPath()));
+ LOG.info(String.format("List of filtered files for %s: %s", streamName, filteredFiles));
+ return filteredFiles;
+ }
+
+ /*
+ * Algorithm to extract the group identifier from the path based on
+ * the group pattern.
+ * 1. Split the group pattern into two parts (prefix, suffix)
+ * 2. Match both the prefix and the suffix against the input
+ * 3. Strip the prefix and suffix and then we can get the group identifier
+ *
+ * For example,
+ * input = run_2016-08-01-part-3.avro
+ * group pattern = ".*part-[id]\.avro"
+ *
+ * 1. Split: prefix pattern = ".*part-" suffix pattern = "\.avro"
+ * 2. Match: prefix string = "run_2016-08-01-part-" suffix string = ".avro"
+ * 3. Extract: output = "3"
+ *
+ * If we can't extract a group identifier, return the original input
+ */
+ private String extractGroupIdentifier(String input) {
+ if (StringUtils.isBlank(groupPattern)) {
+ return input;
+ }
+ String[] patterns = groupPattern.split(GROUP_IDENTIFIER);
+ if (patterns.length != 2) {
+ return input;
+ }
+
+ Pattern p1 = Pattern.compile(patterns[0]);
+ Pattern p2 = Pattern.compile(patterns[1]);
+ Matcher m1 = p1.matcher(input);
+ Matcher m2 = p2.matcher(input);
+ if (!m1.find()) {
+ return input;
+ }
+ int s1 = m1.end();
+ if (!m2.find(s1)) {
+ return input;
+ }
+ int s2 = m2.start();
+ return input.substring(s1, s2);
+ }
+
+ /*
+ * Group partitions based on the group identifier extracted from the file path
+ */
+ private List<List<FileMetadata>> generatePartitionGroups(List<FileMetadata> filteredFiles) {
+ Map<String, List<FileMetadata>> map = new HashMap<>();
+ for (FileMetadata fileMetadata : filteredFiles) {
+ String groupId = extractGroupIdentifier(fileMetadata.getPath());
+ map.putIfAbsent(groupId, new ArrayList<>());
+ map.get(groupId).add(fileMetadata);
+ }
+ List<List<FileMetadata>> ret = new ArrayList<>();
+ // sort the map to guarantee consistent ordering
+ List<String> sortedKeys = new ArrayList<>(map.keySet());
+ sortedKeys.sort(Comparator.<String>naturalOrder());
+ sortedKeys.stream().forEach(key -> ret.add(map.get(key)));
+ return ret;
+ }
+
+ /*
+ * This class holds the assumption that the directory remains immutable.
+ * If the directory does change:
+ * ignore new files showing up in the directory based on an old version of partition descriptor;
+ * throw {@link org.apache.samza.SamzaException} if at least one old file doesn't exist anymore
+ */
+ private List<FileMetadata> validateAndGetOriginalFilteredFiles(List<FileMetadata> newFileList,
+ Map<Partition, List<String>> existingPartitionDescriptor) {
+ assert newFileList != null;
+ assert existingPartitionDescriptor != null;
+ Set<String> oldFileSet = new HashSet<>();
+ existingPartitionDescriptor.values().forEach(oldFileSet::addAll);
+ Set<String> newFileSet = new HashSet<>();
+ newFileList.forEach(file -> newFileSet.add(file.getPath()));
+ if (!newFileSet.containsAll(oldFileSet)) {
+ throw new SamzaException("The list of new files is not a super set of the old files. diff = "
+ + oldFileSet.removeAll(newFileSet));
+ }
+ Iterator<FileMetadata> iterator = newFileList.iterator();
+ while (iterator.hasNext()) {
+ FileMetadata file = iterator.next();
+ if (!oldFileSet.contains(file.getPath())) {
+ iterator.remove();
+ }
+ }
+ return newFileList;
+ }
+
+ /**
+ * Get partition metadata for a stream
+ * @param streamName name of the stream; should contain the information about the path of the
+ * root directory
+ * @param existingPartitionDescriptorMap map of the existing partition descriptor
+ * @return map of SSP metadata
+ */
+ public Map<Partition, SystemStreamPartitionMetadata> getPartitionMetadataMap(String streamName,
+ @Nullable Map<Partition, List<String>> existingPartitionDescriptorMap) {
+ LOG.info("Trying to obtain metadata for " + streamName);
+ LOG.info("Existing partition descriptor: " + (existingPartitionDescriptorMap == null ? "empty"
+ : existingPartitionDescriptorMap));
+ Map<Partition, SystemStreamPartitionMetadata> partitionMetadataMap = new HashMap<>();
+ partitionDescriptorMap.putIfAbsent(streamName, new HashMap<>());
+ List<FileMetadata> filteredFiles = getFilteredFiles(streamName);
+ if (existingPartitionDescriptorMap != null) {
+ filteredFiles = validateAndGetOriginalFilteredFiles(filteredFiles, existingPartitionDescriptorMap);
+ }
+ List<List<FileMetadata>> groupedPartitions = generatePartitionGroups(filteredFiles);
+ int partitionId = 0;
+ for (List<FileMetadata> fileGroup : groupedPartitions) {
+ Partition partition = new Partition(partitionId);
+ List<String> pathList = new ArrayList<>();
+ List<String> lengthList = new ArrayList<>();
+ fileGroup.forEach(fileMetadata -> {
+ pathList.add(fileMetadata.getPath());
+ lengthList.add(String.valueOf(fileMetadata.getLen()));
+ });
+ String oldestOffset = MultiFileHdfsReader.generateOffset(0, "0");
+ String newestOffset = MultiFileHdfsReader.generateOffset(lengthList.size() - 1, String.valueOf(lengthList.get(lengthList.size() - 1)));
+ SystemStreamPartitionMetadata metadata =
+ new SystemStreamPartitionMetadata(oldestOffset, newestOffset, null);
+ partitionMetadataMap.put(partition, metadata);
+ partitionDescriptorMap.get(streamName).put(partition, pathList);
+ partitionId++;
+ }
+ LOG.info("Obtained metadata map as: " + partitionMetadataMap);
+ LOG.info("Computed partition description as: " + partitionDescriptorMap);
+ return partitionMetadataMap;
+ }
+
+ /**
+ * Get partition descriptors for a stream
+ * @param streamName name of the stream; should contain the information about the path of the
+ * root directory
+ * @return map of the partition descriptor
+ */
+ public Map<Partition, List<String>> getPartitionDescriptor(String streamName) {
+ return partitionDescriptorMap.get(streamName);
+ }
+}
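A small self-contained sketch of the group-identifier extraction described in the comments above; it mirrors the private extractGroupIdentifier logic rather than calling it, since that method is not public. The file names and group pattern come from the comment's own example.

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class GroupIdExtractionSketch {
      // Mirrors the prefix/suffix split-and-match algorithm described above; illustrative only.
      static String extractGroupIdentifier(String groupPattern, String input) {
        String[] patterns = groupPattern.split("\\[id]");
        if (patterns.length != 2) {
          return input;
        }
        Matcher prefix = Pattern.compile(patterns[0]).matcher(input);
        if (!prefix.find()) {
          return input;
        }
        int start = prefix.end();
        Matcher suffix = Pattern.compile(patterns[1]).matcher(input);
        if (!suffix.find(start)) {
          return input;
        }
        return input.substring(start, suffix.start());
      }

      public static void main(String[] args) {
        // Files that share a group identifier end up in the same partition.
        System.out.println(extractGroupIdentifier(".*part-[id]\\.avro", "run_2016-08-01-part-3.avro")); // "3"
        System.out.println(extractGroupIdentifier(".*part-[id]\\.avro", "run_2016-08-02-part-3.avro")); // "3"
      }
    }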
http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/FileSystemAdapter.java
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/FileSystemAdapter.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/FileSystemAdapter.java
new file mode 100644
index 0000000..5fec4bf
--- /dev/null
+++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/FileSystemAdapter.java
@@ -0,0 +1,60 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.hdfs.partitioner;
+
+import java.util.List;
+
+
+/**
+ * An adapter between directory partitioner and the actual file systems or
+ * file system like systems.
+ */
+public interface FileSystemAdapter {
+
+ /**
+ * Return the list of all files given the stream name
+ * @param streamName name of the stream
+ * @return list of <code>FileMetadata</code> for all files associated with the given stream
+ */
+ public List<FileMetadata> getAllFiles(String streamName);
+
+ public class FileMetadata {
+ private String path;
+ private long length;
+
+ public FileMetadata(String path, long length) {
+ this.path = path;
+ this.length = length;
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ public long getLen() {
+ return length;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("[path = %s, length = %s]", path, length);
+ }
+ }
+}
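As a concrete illustration of the adapter interface above, here is a minimal, hypothetical implementation that treats the stream name as a local directory path. Something like this is handy for unit testing the DirectoryPartitioner; it is not part of the patch.

    import java.io.File;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.samza.system.hdfs.partitioner.FileSystemAdapter;

    /** Hypothetical adapter that lists files from a local directory named by the stream. */
    public class LocalFileSystemAdapter implements FileSystemAdapter {
      @Override
      public List<FileMetadata> getAllFiles(String streamName) {
        List<FileMetadata> ret = new ArrayList<>();
        File[] files = new File(streamName).listFiles();
        if (files != null) {
          for (File file : files) {
            // Path and length are all the partitioner needs from each file.
            ret.add(new FileMetadata(file.getAbsolutePath(), file.length()));
          }
        }
        return ret;
      }
    }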
http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/HdfsFileSystemAdapter.java
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/HdfsFileSystemAdapter.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/HdfsFileSystemAdapter.java
new file mode 100644
index 0000000..bb7b3fa
--- /dev/null
+++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/HdfsFileSystemAdapter.java
@@ -0,0 +1,55 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.hdfs.partitioner;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class HdfsFileSystemAdapter implements FileSystemAdapter {
+
+ private static final Logger LOG = LoggerFactory.getLogger(HdfsFileSystemAdapter.class);
+
+ public List<FileMetadata> getAllFiles(String streamName) {
+ List<FileMetadata> ret = new ArrayList<>();
+ try {
+ Path streamPath = new Path(streamName);
+ FileSystem fileSystem = streamPath.getFileSystem(new Configuration());
+ FileStatus[] fileStatuses = fileSystem.listStatus(streamPath);
+ for (FileStatus fileStatus : fileStatuses) {
+ ret.add(new FileMetadata(fileStatus.getPath().toString(), fileStatus.getLen()));
+ }
+ } catch (IOException e) {
+ LOG.error("Failed to get the list of files for " + streamName, e);
+ throw new SamzaException(e);
+ }
+ return ret;
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/AvroFileHdfsReader.java
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/AvroFileHdfsReader.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/AvroFileHdfsReader.java
new file mode 100644
index 0000000..757b40b
--- /dev/null
+++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/AvroFileHdfsReader.java
@@ -0,0 +1,216 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.hdfs.reader;
+
+import java.io.IOException;
+
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.AvroFSInput;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.samza.SamzaException;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * An implementation of the HdfsReader that reads and processes avro format
+ * files.
+ */
+public class AvroFileHdfsReader implements SingleFileHdfsReader {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AvroFileHdfsReader.class);
+
+ private final SystemStreamPartition systemStreamPartition;
+ private DataFileReader<GenericRecord> fileReader;
+ private long curBlockStart;
+ private long curRecordOffset;
+
+ public AvroFileHdfsReader(SystemStreamPartition systemStreamPartition) {
+ this.systemStreamPartition = systemStreamPartition;
+ this.fileReader = null;
+ }
+
+ @Override
+ public void open(String pathStr, String singleFileOffset) {
+ LOG.info(String.format("%s: Open file [%s] with file offset [%s] for read", systemStreamPartition, pathStr, singleFileOffset));
+ Path path = new Path(pathStr);
+ try {
+ AvroFSInput input = new AvroFSInput(FileContext.getFileContext(path.toUri()), path);
+ fileReader = new DataFileReader<>(input, new GenericDatumReader<>());
+ seek(singleFileOffset);
+ } catch (IOException e) {
+ throw new SamzaException(e);
+ }
+ }
+
+ @Override
+ public void seek(String singleFileOffset) {
+ try {
+ // See comments for AvroFileCheckpoint to understand the behavior below
+ AvroFileCheckpoint checkpoint = new AvroFileCheckpoint(singleFileOffset);
+ if (checkpoint.isStartingOffset()) {
+ // seek to the beginning of the first block
+ fileReader.sync(0);
+ curBlockStart = fileReader.previousSync();
+ curRecordOffset = 0;
+ return;
+ }
+ fileReader.seek(checkpoint.getBlockStart());
+ for (int i = 0; i < checkpoint.getRecordOffset(); i++) {
+ if (fileReader.hasNext()) {
+ fileReader.next();
+ }
+ }
+ curBlockStart = checkpoint.getBlockStart();
+ curRecordOffset = checkpoint.getRecordOffset();
+ } catch (IOException e) {
+ throw new SamzaException(e);
+ }
+ }
+
+ @Override
+ public IncomingMessageEnvelope readNext() {
+ // get checkpoint for THIS record
+ String checkpoint = nextOffset();
+ GenericRecord record = fileReader.next();
+ if (fileReader.previousSync() != curBlockStart) {
+ curBlockStart = fileReader.previousSync();
+ curRecordOffset = 0;
+ } else {
+ curRecordOffset++;
+ }
+ // avro schema doesn't necessarily have key field
+ return new IncomingMessageEnvelope(systemStreamPartition, checkpoint, null, record);
+ }
+
+ @Override
+ public boolean hasNext() {
+ return fileReader.hasNext();
+ }
+
+ @Override
+ public void close() {
+ LOG.info("About to close file reader for " + systemStreamPartition);
+ try {
+ fileReader.close();
+ } catch (IOException e) {
+ throw new SamzaException(e);
+ }
+ LOG.info("File reader closed for " + systemStreamPartition);
+ }
+
+ @Override
+ public String nextOffset() {
+ return AvroFileCheckpoint.generateCheckpointStr(curBlockStart, curRecordOffset);
+ }
+
+ public static int offsetComparator(String offset1, String offset2) {
+ AvroFileCheckpoint cp1 = new AvroFileCheckpoint(offset1);
+ AvroFileCheckpoint cp2 = new AvroFileCheckpoint(offset2);
+ return cp1.compareTo(cp2);
+ }
+
+ /**
+ * An avro file looks something like this:
+ *
+ * Byte offset:   0        103       271       391
+ * Avro file:   | Header | Block 1 | Block 2 | Block 3 | ...
+ *
+ * Each block contains multiple records. The start of a block is defined as a valid
+ * synchronization point. A file reader can only seek to a synchronization point, i.e.
+ * the start of blocks. Thus, to precisely describe the location of a record, we need
+ * to use the pair (blockStart, recordOffset). Here "blockStart" means the start of the
+ * block and "recordOffset" means the index of the record within the block.
+ * Taking the example above, and supposing block 1 has 4 records, the record sequence is:
+ * (103, 0), (103, 1), (103, 2), (103, 3), (271, 0), ...
+ * where (271, 0) represents the first record in block 2.
+ *
+ * With the CP_DELIM being '@', the actual checkpoint string would look like "103@1",
+ * "271@0" or "271", etc. For convenience, a checkpoint with only the blockStart but no
+ * recordOffset within the block simply means the first record in that block. Thus,
+ * "271@0" is equal to "271".
+ */
+ public static class AvroFileCheckpoint {
+ private static final String CP_DELIM = "@";
+ private long blockStart; // start position of the block
+ private long recordOffset; // record offset within the block
+ String checkpointStr;
+
+ public static String generateCheckpointStr(long blockStart, long recordOffset) {
+ return blockStart + CP_DELIM + recordOffset;
+ }
+
+ public AvroFileCheckpoint(String checkpointStr) {
+ String[] elements = checkpointStr.replaceAll("\\s", "").split(CP_DELIM);
+ if (elements.length > 2 || elements.length < 1) {
+ throw new SamzaException("Invalid checkpoint for AvroFileHdfsReader: " + checkpointStr);
+ }
+ try {
+ blockStart = Long.parseLong(elements[0]);
+ recordOffset = elements.length == 2 ? Long.parseLong(elements[1]) : 0;
+ } catch (NumberFormatException e) {
+ throw new SamzaException("Invalid checkpoint for AvroFileHdfsReader: " + checkpointStr, e);
+ }
+ this.checkpointStr = checkpointStr;
+ }
+
+ public AvroFileCheckpoint(long blockStart, long recordOffset) {
+ this.blockStart = blockStart;
+ this.recordOffset = recordOffset;
+ this.checkpointStr = generateCheckpointStr(blockStart, recordOffset);
+ }
+
+ public long getBlockStart() {
+ return blockStart;
+ }
+
+ public long getRecordOffset() {
+ return recordOffset;
+ }
+
+ public String getCheckpointStr() {
+ return checkpointStr;
+ }
+
+ public boolean isStartingOffset() {
+ return blockStart == 0;
+ }
+
+ public int compareTo(AvroFileCheckpoint other) {
+ if (this.blockStart < other.blockStart) {
+ return -1;
+ } else if (this.blockStart > other.blockStart) {
+ return 1;
+ } else {
+ return Long.compare(this.recordOffset, other.recordOffset);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return getCheckpointStr();
+ }
+ }
+}
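
For illustration, a minimal sketch (with made-up byte offsets) of how the (blockStart, recordOffset)
checkpoint described in the javadoc above round-trips through AvroFileCheckpoint:

    // hypothetical checkpoint: block starting at byte 271, third record within that block
    String offset = AvroFileHdfsReader.AvroFileCheckpoint.generateCheckpointStr(271L, 2L); // "271@2"
    AvroFileHdfsReader.AvroFileCheckpoint parsed = new AvroFileHdfsReader.AvroFileCheckpoint(offset);
    assert parsed.getBlockStart() == 271L && parsed.getRecordOffset() == 2L;
    // a bare block start means the first record of that block, so "271" == "271@0" < "271@2"
    assert new AvroFileHdfsReader.AvroFileCheckpoint("271").compareTo(parsed) < 0;
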
http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/HdfsReaderFactory.java
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/HdfsReaderFactory.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/HdfsReaderFactory.java
new file mode 100644
index 0000000..4efdfd7
--- /dev/null
+++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/HdfsReaderFactory.java
@@ -0,0 +1,59 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.hdfs.reader;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.system.SystemStreamPartition;
+
+
+public class HdfsReaderFactory {
+ public static SingleFileHdfsReader getHdfsReader(ReaderType readerType, SystemStreamPartition systemStreamPartition) {
+ switch (readerType) {
+ case AVRO: return new AvroFileHdfsReader(systemStreamPartition);
+ default:
+ throw new SamzaException("Unsupported reader type: " + readerType);
+ }
+ }
+
+ public static ReaderType getType(String readerTypeStr) {
+ try {
+ return ReaderType.valueOf(readerTypeStr.toUpperCase());
+ } catch (IllegalArgumentException e) {
+ throw new SamzaException("Invalid hdfs reader type string: " + readerTypeStr, e);
+ }
+ }
+
+ public static int offsetComparator(ReaderType readerType, String offset1, String offset2) {
+ switch (readerType) {
+ case AVRO: return AvroFileHdfsReader.offsetComparator(offset1, offset2);
+ default:
+ throw new SamzaException("Unsupported reader type: " + readerType);
+ }
+ }
+
+ /*
+ * Only AVRO is supported so far. Implement <code>SingleFileHdfsReader</code> to support
+ * other file formats; "plain" text (each line of the text representing a record, for
+ * example) could easily be added in the future.
+ */
+ public enum ReaderType {
+ AVRO
+ }
+}
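
A hedged usage sketch of the factory; the system and stream names below are illustrative only:

    SystemStreamPartition ssp = new SystemStreamPartition("hdfs", "someStream", new Partition(0));
    HdfsReaderFactory.ReaderType type = HdfsReaderFactory.getType("avro"); // case-insensitive, maps to AVRO
    SingleFileHdfsReader reader = HdfsReaderFactory.getHdfsReader(type, ssp);
    // offsets are compared per reader type, e.g. for avro "103@1" sorts before "271@0"
    int cmp = HdfsReaderFactory.offsetComparator(type, "103@1", "271@0"); // negative
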
http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/MultiFileHdfsReader.java
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/MultiFileHdfsReader.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/MultiFileHdfsReader.java
new file mode 100644
index 0000000..7870713
--- /dev/null
+++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/MultiFileHdfsReader.java
@@ -0,0 +1,204 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.hdfs.reader;
+
+import java.util.List;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.system.hdfs.HdfsConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A wrapper on top of {@link org.apache.samza.system.hdfs.reader.SingleFileHdfsReader}
+ * to manage the situation of multiple files per partition.
+ *
+ * The offset for MultiFileHdfsReader, which is also the offset that gets
+ * committed in and used by Samza, consists of two parts: file index,
+ * actual offset within file. For example, 3:127
+ *
+ * Format of the offset within file is defined by the implementation of
+ * {@link org.apache.samza.system.hdfs.reader.SingleFileHdfsReader} itself.
+ */
+public class MultiFileHdfsReader {
+ private static final Logger LOG = LoggerFactory.getLogger(MultiFileHdfsReader.class);
+ private static final String DELIMITER = ":";
+
+ private final HdfsReaderFactory.ReaderType readerType;
+ private final SystemStreamPartition systemStreamPartition;
+ private List<String> filePaths;
+ private SingleFileHdfsReader curReader;
+ private int curFileIndex = 0;
+ private String curSingleFileOffset;
+ private int numRetries;
+ private int numMaxRetries;
+
+ /**
+ * Get the current file index from the offset string
+ * @param offset offset string that contains both file index and offset within file
+ * @return the file index part
+ */
+ public static int getCurFileIndex(String offset) {
+ String[] elements = offset.split(DELIMITER);
+ if (elements.length < 2) {
+ throw new SamzaException("Invalid offset for MultiFileHdfsReader: " + offset);
+ }
+ return Integer.parseInt(elements[0]);
+ }
+
+ /**
+ * Get the offset within file from the offset string
+ * @param offset offset string that contains both file index and offset within file
+ * @return the single file offset part
+ */
+ public static String getCurSingleFileOffset(String offset) {
+ String[] elements = offset.split(DELIMITER);
+ if (elements.length < 2) {
+ throw new SamzaException("Invalid offset for MultiFileHdfsReader: " + offset);
+ }
+ // Getting the remaining of the offset string in case the single file
+ // offset uses the same delimiter.
+ return offset.substring(elements[0].length() + 1);
+ }
+
+ /**
+ * Generate the offset based on file index and offset within single file
+ * @param fileIndex index of the file
+ * @param singleFileOffset offset within single file
+ * @return the complete offset
+ */
+ public static String generateOffset(int fileIndex, String singleFileOffset) {
+ return fileIndex + DELIMITER + singleFileOffset;
+ }
+
+ /*
+ * Get the current offset: the offset of the LAST message successfully read. If no messages
+ * have ever been read, return the offset of the first event.
+ */
+ private String getCurOffset() {
+ return generateOffset(curFileIndex, curSingleFileOffset);
+ }
+
+ public MultiFileHdfsReader(HdfsReaderFactory.ReaderType readerType, SystemStreamPartition systemStreamPartition,
+ List<String> partitionDescriptors, String offset) {
+ this(readerType, systemStreamPartition, partitionDescriptors, offset,
+ Integer.parseInt(HdfsConfig.CONSUMER_NUM_MAX_RETRIES_DEFAULT()));
+ }
+
+ private void init(String offset) {
+ if (curReader != null) {
+ curReader.close();
+ curReader = null;
+ }
+ curFileIndex = getCurFileIndex(offset);
+ if (curFileIndex >= filePaths.size()) {
+ throw new SamzaException(
+ String.format("Invalid file index %d. Number of files is %d", curFileIndex, filePaths.size()));
+ }
+ curSingleFileOffset = getCurSingleFileOffset(offset);
+ curReader = HdfsReaderFactory.getHdfsReader(readerType, systemStreamPartition);
+ curReader.open(filePaths.get(curFileIndex), curSingleFileOffset);
+ }
+
+ public MultiFileHdfsReader(HdfsReaderFactory.ReaderType readerType, SystemStreamPartition systemStreamPartition,
+ List<String> partitionDescriptors, String offset, int numMaxRetries) {
+ this.readerType = readerType;
+ this.systemStreamPartition = systemStreamPartition;
+ this.filePaths = partitionDescriptors;
+ this.numMaxRetries = numMaxRetries;
+ this.numRetries = 0;
+ if (partitionDescriptors.size() <= 0) {
+ throw new SamzaException(
+ "Invalid number of files based on partition descriptors: " + partitionDescriptors.size());
+ }
+ init(offset);
+ }
+
+ public boolean hasNext() {
+ while (curFileIndex < filePaths.size()) {
+ if (curReader.hasNext()) {
+ return true;
+ }
+ curReader.close();
+ curFileIndex++;
+ if (curFileIndex < filePaths.size()) {
+ curReader = HdfsReaderFactory.getHdfsReader(readerType, systemStreamPartition);
+ curReader.open(filePaths.get(curFileIndex), "0");
+ }
+ }
+ return false;
+ }
+
+ public IncomingMessageEnvelope readNext() {
+ if (!hasNext()) {
+ LOG.warn("Attempting to read more data when there aren't any. ssp=" + systemStreamPartition);
+ return null;
+ }
+ // record the next offset before we read, so that if the read fails and we reconnect,
+ // we seek back to the same offset we attempt to read below
+ curSingleFileOffset = curReader.nextOffset();
+ IncomingMessageEnvelope messageEnvelope = curReader.readNext();
+ // Copy everything except for the offset. Turn the single-file style offset into a multi-file one
+ return new IncomingMessageEnvelope(messageEnvelope.getSystemStreamPartition(), getCurOffset(),
+ messageEnvelope.getKey(), messageEnvelope.getMessage(), messageEnvelope.getSize());
+ }
+
+ /**
+ * Reconnect to the file systems in case of failure.
+ * Reset offset to the last checkpoint (last successfully read message).
+ * Throw {@link org.apache.samza.SamzaException} if the max number of
+ * retries has been reached.
+ */
+ public void reconnect() {
+ reconnect(getCurOffset());
+ }
+
+ /**
+ * Reconnect to the file systems in case of failures and reset the reader to the specified offset.
+ * Throw {@link org.apache.samza.SamzaException} if the max number of
+ * retries has been reached.
+ * @param offset the offset the reader should be reset to
+ */
+ public void reconnect(String offset) {
+ if (numRetries >= numMaxRetries) {
+ throw new SamzaException(
+ String.format("Give up reconnecting. numRetries: %d; numMaxRetries: %d", numRetries, numMaxRetries));
+ }
+ LOG.info(String
+ .format("Reconnecting with offset: %s numRetries: %d numMaxRetries: %d", offset, numRetries, numMaxRetries));
+ numRetries++;
+ init(offset);
+ }
+
+ public void close() {
+ LOG.info(String.format("MultiFileHdfsReader shutdown requested for %s. Current offset = %s", systemStreamPartition,
+ getCurOffset()));
+ if (curReader != null) {
+ curReader.close();
+ }
+ }
+
+ public SystemStreamPartition getSystemStreamPartition() {
+ return systemStreamPartition;
+ }
+}
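
A short sketch of the composite offset handling described above, with made-up values (the
single-file part is an avro checkpoint from AvroFileHdfsReader):

    // "3:271@0" = 4th file of the partition, avro block starting at byte 271, first record
    String offset = MultiFileHdfsReader.generateOffset(3, "271@0");
    int fileIndex = MultiFileHdfsReader.getCurFileIndex(offset);                    // 3
    String singleFileOffset = MultiFileHdfsReader.getCurSingleFileOffset(offset);   // "271@0"

On read failures, callers are expected to invoke reconnect(), which reopens the underlying reader
at the offset of the last successfully read message and gives up with a SamzaException once
numMaxRetries attempts have been exhausted.
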
http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/SingleFileHdfsReader.java
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/SingleFileHdfsReader.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/SingleFileHdfsReader.java
new file mode 100644
index 0000000..eb8a70d
--- /dev/null
+++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/SingleFileHdfsReader.java
@@ -0,0 +1,62 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.hdfs.reader;
+
+import org.apache.samza.system.IncomingMessageEnvelope;
+
+
+public interface SingleFileHdfsReader {
+ /**
+ * Open the file and seek to a specific offset for reading.
+ * @param path path of the file to be read
+ * @param offset offset the reader should start from
+ */
+ public void open(String path, String offset);
+
+ /**
+ * Seek to a specific offset
+ * @param offset offset the reader should seek to
+ */
+ public void seek(String offset);
+
+ /**
+ * Construct and return the next message envelope
+ * @return the constructed IncomingMessageEnvelope
+ */
+ public IncomingMessageEnvelope readNext();
+
+ /**
+ * Get the next offset, which is the offset for the next message
+ * that will be returned by readNext
+ * @return next offset
+ */
+ public String nextOffset();
+
+ /**
+ * Whether there are still records to be read
+ * @return true or false based on whether the reader has hit the end of the file
+ */
+ public boolean hasNext();
+
+ /**
+ * Close the reader.
+ */
+ public void close();
+}
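
A hedged sketch of the intended calling pattern for this interface, inferred from how
MultiFileHdfsReader above drives it; the path and the ssp variable are illustrative:

    SingleFileHdfsReader reader = HdfsReaderFactory.getHdfsReader(HdfsReaderFactory.ReaderType.AVRO, ssp);
    reader.open("/data/part-00000.avro", "0"); // "0" starts from the beginning of the file
    while (reader.hasNext()) {
      String offsetOfNext = reader.nextOffset();            // offset of the record readNext() will return
      IncomingMessageEnvelope envelope = reader.readNext();
      // if this read fails, a fresh reader can open(path, offsetOfNext) to retry the same record
    }
    reader.close();
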
http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala
index 61b7570..53ff372 100644
--- a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala
+++ b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala
@@ -25,7 +25,7 @@ import java.util.UUID
import org.apache.samza.SamzaException
import org.apache.samza.config.SystemConfig.Config2System
-import org.apache.samza.config.{Config, ScalaMapConfig}
+import org.apache.samza.config.{YarnConfig, Config, ScalaMapConfig}
import org.apache.samza.util.{Logging, Util}
import scala.collection.JavaConversions._
@@ -62,6 +62,34 @@ object HdfsConfig {
val BUCKETER_CLASS = "systems.%s.producer.hdfs.bucketer.class"
val BUCKETER_CLASS_DEFAULT = "org.apache.samza.system.hdfs.writer.JobNameDateTimeBucketer"
+ // capacity of the hdfs consumer buffer - the blocking queue used for storing messages
+ val CONSUMER_BUFFER_CAPACITY = "systems.%s.consumer.bufferCapacity"
+ val CONSUMER_BUFFER_CAPACITY_DEFAULT = 10.toString
+
+ // number of max retries for the hdfs consumer readers per partition
+ val CONSUMER_NUM_MAX_RETRIES = "systems.%s.consumer.numMaxRetries"
+ val CONSUMER_NUM_MAX_RETRIES_DEFAULT = 10.toString
+
+ // white list used by directory partitioner to filter out unwanted files in an hdfs directory
+ val CONSUMER_PARTITIONER_WHITELIST = "systems.%s.partitioner.defaultPartitioner.whitelist"
+ val CONSUMER_PARTITIONER_WHITELIST_DEFAULT = ".*"
+
+ // black list used by directory partitioner to filter out unwanted files in an hdfs directory
+ val CONSUMER_PARTITIONER_BLACKLIST = "systems.%s.partitioner.defaultPartitioner.blacklist"
+ val CONSUMER_PARTITIONER_BLACKLIST_DEFAULT = ""
+
+ // group pattern used by directory partitioner for advanced partitioning
+ val CONSUMER_PARTITIONER_GROUP_PATTERN = "systems.%s.partitioner.defaultPartitioner.groupPattern"
+ val CONSUMER_PARTITIONER_GROUP_PATTERN_DEFAULT = ""
+
+ // type of the file reader (avro, plain, etc.)
+ val FILE_READER_TYPE = "systems.%s.consumer.reader"
+ val FILE_READER_TYPE_DEFAULT = "avro"
+
+ // staging directory for storing partition description
+ val STAGING_DIRECTORY = "systems.%s.stagingDirectory"
+ val STAGING_DIRECTORY_DEFAULT = ""
+
implicit def Hdfs2Kafka(config: Config) = new HdfsConfig(config)
}
@@ -130,4 +158,53 @@ class HdfsConfig(config: Config) extends ScalaMapConfig(config) {
getOrElse(HdfsConfig.COMPRESSION_TYPE format systemName, HdfsConfig.COMPRESSION_TYPE_DEFAULT)
}
+ /**
+ * Get the capacity of the hdfs consumer buffer - the blocking queue used for storing messages
+ */
+ def getConsumerBufferCapacity(systemName: String): Int = {
+ getOrElse(HdfsConfig.CONSUMER_BUFFER_CAPACITY format systemName, HdfsConfig.CONSUMER_BUFFER_CAPACITY_DEFAULT).toInt
+ }
+
+ /**
+ * Get number of max retries for the hdfs consumer readers per partition
+ */
+ def getConsumerNumMaxRetries(systemName: String): Int = {
+ getOrElse(HdfsConfig.CONSUMER_NUM_MAX_RETRIES format systemName, HdfsConfig.CONSUMER_NUM_MAX_RETRIES_DEFAULT).toInt
+ }
+
+ /**
+ * White list used by directory partitioner to filter out unwanted files in an hdfs directory
+ */
+ def getPartitionerWhiteList(systemName: String): String = {
+ getOrElse(HdfsConfig.CONSUMER_PARTITIONER_WHITELIST format systemName, HdfsConfig.CONSUMER_PARTITIONER_WHITELIST_DEFAULT)
+ }
+
+ /**
+ * Black list used by directory partitioner to filter out unwanted files in an hdfs directory
+ */
+ def getPartitionerBlackList(systemName: String): String = {
+ getOrElse(HdfsConfig.CONSUMER_PARTITIONER_BLACKLIST format systemName, HdfsConfig.CONSUMER_PARTITIONER_BLACKLIST_DEFAULT)
+ }
+
+ /**
+ * Group pattern used by directory partitioner for advanced partitioning
+ */
+ def getPartitionerGroupPattern(systemName: String): String = {
+ getOrElse(HdfsConfig.CONSUMER_PARTITIONER_GROUP_PATTERN format systemName, HdfsConfig.CONSUMER_PARTITIONER_GROUP_PATTERN_DEFAULT)
+ }
+
+ /**
+ * Get the type of the file reader (avro, plain, etc.)
+ */
+ def getFileReaderType(systemName: String): String = {
+ getOrElse(HdfsConfig.FILE_READER_TYPE format systemName, HdfsConfig.FILE_READER_TYPE_DEFAULT)
+ }
+
+ /**
+ * Staging directory for storing partition description. If not set, the staging directory
+ * configured for the yarn job will be used.
+ */
+ def getStagingDirectory(): String = {
+ getOrElse(HdfsConfig.STAGING_DIRECTORY, getOrElse(YarnConfig.YARN_JOB_STAGING_DIRECTORY, HdfsConfig.STAGING_DIRECTORY_DEFAULT))
+ }
}
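
A small Java sketch (assuming the usual Config/MapConfig imports) of how the new consumer settings
might be wired and read back; the system name "hdfs" and the values are illustrative:

    Map<String, String> props = new HashMap<>();
    props.put(String.format(HdfsConfig.CONSUMER_BUFFER_CAPACITY(), "hdfs"), "50");
    props.put(String.format(HdfsConfig.CONSUMER_PARTITIONER_WHITELIST(), "hdfs"), ".*\\.avro");
    props.put(String.format(HdfsConfig.FILE_READER_TYPE(), "hdfs"), "avro");
    HdfsConfig hdfsConfig = new HdfsConfig(new MapConfig(props));
    int bufferCapacity = hdfsConfig.getConsumerBufferCapacity("hdfs");   // 50
    String readerType = hdfsConfig.getFileReaderType("hdfs");            // "avro"
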
http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemAdmin.scala b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemAdmin.scala
deleted file mode 100644
index 92eb447..0000000
--- a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemAdmin.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.system.hdfs
-
-
-import org.apache.samza.util.Logging
-import org.apache.samza.system.{SystemAdmin, SystemStreamMetadata, SystemStreamPartition}
-
-
-class HdfsSystemAdmin extends SystemAdmin with Logging {
-
- def getOffsetsAfter(offsets: java.util.Map[SystemStreamPartition, String]) = {
- new java.util.HashMap[SystemStreamPartition, String]()
- }
-
- def getSystemStreamMetadata(streamNames: java.util.Set[String]) = {
- new java.util.HashMap[String, SystemStreamMetadata]()
- }
-
- def createChangelogStream(topicName: String, numOfChangeLogPartitions: Int) = {
- throw new UnsupportedOperationException("Method not implemented.")
- }
-
- def validateChangelogStream(topicName: String, numOfChangeLogPartitions: Int) = {
- throw new UnsupportedOperationException("Method not implemented.")
- }
-
- def createCoordinatorStream(streamName: String) {
- throw new UnsupportedOperationException("Method not implemented.")
- }
-
- def offsetComparator(offset1: String, offset2: String) = {
- throw new UnsupportedOperationException("Method not implemented.")
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemFactory.scala b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemFactory.scala
index ef3c20a..3673431 100644
--- a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemFactory.scala
+++ b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemFactory.scala
@@ -1,45 +1,45 @@
- /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
package org.apache.samza.system.hdfs
-import org.apache.samza.SamzaException
-
import org.apache.samza.config.Config
import org.apache.samza.metrics.MetricsRegistry
import org.apache.samza.system.SystemFactory
-import org.apache.samza.util.{KafkaUtil,Logging}
+import org.apache.samza.system.hdfs.HdfsSystemConsumer.HdfsSystemConsumerMetrics
+import org.apache.samza.util.{KafkaUtil, Logging}
class HdfsSystemFactory extends SystemFactory with Logging {
def getConsumer(systemName: String, config: Config, registry: MetricsRegistry) = {
- throw new SamzaException("HdfsSystemFactory does not implement a consumer")
+ new HdfsSystemConsumer(systemName, config, new HdfsSystemConsumerMetrics(registry))
}
def getProducer(systemName: String, config: Config, registry: MetricsRegistry) = {
+ // TODO: SAMZA-1026: should remove Kafka dependency below
val clientId = KafkaUtil.getClientId("samza-producer", config)
val metrics = new HdfsSystemProducerMetrics(systemName, registry)
new HdfsSystemProducer(systemName, clientId, config, metrics)
}
def getAdmin(systemName: String, config: Config) = {
- new HdfsSystemAdmin
+ new HdfsSystemAdmin(systemName, config)
}
}