You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2016/10/05 18:39:44 UTC
[2/2] samza git commit: SAMZA-967: HDFS System Consumer
SAMZA-967: HDFS System Consumer
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/2216fe0b
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/2216fe0b
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/2216fe0b
Branch: refs/heads/master
Commit: 2216fe0b7b250e2edfc14d150e59f84884e8cba4
Parents: f4d924f
Author: Hai Lu <lh...@gmail.com>
Authored: Wed Oct 5 11:39:17 2016 -0700
Committer: Xinyu Liu <xi...@apache.org>
Committed: Wed Oct 5 11:39:17 2016 -0700
----------------------------------------------------------------------
build.gradle | 16 +-
.../samza/system/hdfs/HdfsSystemAdmin.java | 221 ++++++++++++++
.../samza/system/hdfs/HdfsSystemConsumer.java | 281 +++++++++++++++++
.../system/hdfs/PartitionDescriptorUtil.java | 97 ++++++
.../hdfs/partitioner/DirectoryPartitioner.java | 235 ++++++++++++++
.../hdfs/partitioner/FileSystemAdapter.java | 60 ++++
.../hdfs/partitioner/HdfsFileSystemAdapter.java | 55 ++++
.../system/hdfs/reader/AvroFileHdfsReader.java | 216 +++++++++++++
.../system/hdfs/reader/HdfsReaderFactory.java | 59 ++++
.../system/hdfs/reader/MultiFileHdfsReader.java | 204 +++++++++++++
.../hdfs/reader/SingleFileHdfsReader.java | 62 ++++
.../apache/samza/system/hdfs/HdfsConfig.scala | 79 ++++-
.../samza/system/hdfs/HdfsSystemAdmin.scala | 52 ----
.../samza/system/hdfs/HdfsSystemFactory.scala | 46 +--
.../system/hdfs/TestHdfsSystemConsumer.java | 136 +++++++++
.../hdfs/TestPartitionDesctiptorUtil.java | 92 ++++++
.../partitioner/TestDirectoryPartitioner.java | 304 +++++++++++++++++++
.../partitioner/TestHdfsFileSystemAdapter.java | 59 ++++
.../hdfs/reader/TestAvroFileHdfsReader.java | 169 +++++++++++
.../hdfs/reader/TestMultiFileHdfsReader.java | 182 +++++++++++
.../src/test/resources/integTest/emptyTestFile | 16 +
.../src/test/resources/partitioner/testfile01 | 16 +
.../src/test/resources/partitioner/testfile02 | 16 +
.../src/test/resources/reader/TestEvent.avsc | 33 ++
.../hdfs/TestHdfsSystemProducerTestSuite.scala | 38 +--
.../samza/job/yarn/YarnContainerRunner.java | 2 +-
.../apache/samza/job/yarn/YarnJobFactory.scala | 9 +-
27 files changed, 2654 insertions(+), 101 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 2bea27b..98839f2 100644
--- a/build.gradle
+++ b/build.gradle
@@ -84,7 +84,8 @@ rat {
'RELEASE.md',
'samza-test/src/main/resources/**',
'samza-hdfs/src/main/resources/**',
- 'samza-hdfs/src/test/resources/**'
+ 'samza-hdfs/src/test/resources/**',
+ 'out/**'
]
}
@@ -333,6 +334,10 @@ project(":samza-yarn_$scalaVersion") {
// Exclude because YARN's 3.4.5 ZK version is incompatbile with Kafka's 3.3.4.
exclude module: 'zookeeper'
}
+ compile("org.apache.hadoop:hadoop-hdfs:$yarnVersion") {
+ exclude module: 'slf4j-log4j12'
+ exclude module: 'servlet-api'
+ }
compile("org.scalatra:scalatra_$scalaVersion:$scalatraVersion") {
exclude module: 'scala-compiler'
exclude module: 'slf4j-api'
@@ -494,10 +499,19 @@ project(":samza-kv-rocksdb_$scalaVersion") {
project(":samza-hdfs_$scalaVersion") {
apply plugin: 'scala'
+ // Force scala joint compilation
+ sourceSets.main.scala.srcDir "src/main/java"
+ sourceSets.test.scala.srcDir "src/test/java"
+ sourceSets.main.java.srcDirs = []
+
dependencies {
compile project(':samza-api')
compile project(":samza-core_$scalaVersion")
compile project(":samza-kafka_$scalaVersion")
+ // currently hdfs system producer/consumer do depend on yarn for two things:
+ // 1. staging directory 2. security
+ // SAMZA-1032 to solve the staging directory dependency
+ compile project(":samza-yarn_$scalaVersion")
compile "org.scala-lang:scala-library:$scalaLibVersion"
compile("org.apache.hadoop:hadoop-hdfs:$yarnVersion") {
exclude module: 'slf4j-log4j12'
http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java
new file mode 100644
index 0000000..8bf31c5
--- /dev/null
+++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java
@@ -0,0 +1,221 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.hdfs;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.system.hdfs.partitioner.DirectoryPartitioner;
+import org.apache.samza.system.hdfs.partitioner.HdfsFileSystemAdapter;
+import org.apache.samza.system.hdfs.reader.HdfsReaderFactory;
+import org.apache.samza.system.hdfs.reader.MultiFileHdfsReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The HDFS system admin for {@link org.apache.samza.system.hdfs.HdfsSystemConsumer} and
+ * {@link org.apache.samza.system.hdfs.HdfsSystemProducer}
+ *
+ * A high level overview of the HDFS producer/consumer architecture:
+ * \u250c\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2510
+ * \u2502 \u2502
+ * \u250c\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2524 HDFS \u2502
+ * \u2502 Obtain \u2502 \u2502
+ * \u2502 Partition \u2514\u2500\u2500\u2500\u2500\u2500\u2500\u252c\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u25b2\u2500\u2500\u2500\u2500\u2500\u2500\u252c\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u25b2\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2518
+ * \u2502 Descriptors \u2502 \u2502 \u2502 \u2502
+ * \u2502 \u2502 \u2502 \u2502 \u2502
+ * \u2502 \u250c\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u25bc\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2510 \u2502 \u2502 Filtering/ \u2502
+ * \u2502 \u2502 \u2502 \u2502 \u2514\u2500\u2500\u2500\u2510 Grouping \u2514\u2500\u2500\u2500\u2500\u2500\u2510
+ * \u2502 \u2502 HDFSAvroFileReader \u2502 \u2502 \u2502 \u2502
+ * \u2502 \u2502 \u2502 Persist \u2502 \u2502 \u2502
+ * \u2502 \u2514\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u252c\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2518 Partition \u2502 \u2502 \u2502
+ * \u2502 \u2502 Descriptors \u2502 \u250c\u2500\u2500\u2500\u2500\u2500\u2500\u25bc\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2510 \u250c\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2534\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2510
+ * \u2502 \u2502 \u2502 \u2502 \u2502 \u2502 \u2502
+ * \u2502 \u250c\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2534\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2510 \u2502 \u2502Directory Partitioner\u2502 \u2502 HDFSAvroWriter \u2502
+ * \u2502 \u2502 IFileReader \u2502 \u2502 \u2502 \u2502 \u2502 \u2502
+ * \u2502 \u2502 \u2502 \u2502 \u2514\u2500\u2500\u2500\u2500\u2500\u2500\u252c\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2518 \u2514\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u252c\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2518
+ * \u2502 \u2514\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u252c\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2518 \u2502 \u2502 \u2502
+ * \u2502 \u2502 \u2502 \u2502 \u2502
+ * \u2502 \u2502 \u2502 \u2502 \u2502
+ * \u2502 \u250c\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2534\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2510 \u250c\u2500\u2534\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2534\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2510 \u250c\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2534\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2510
+ * \u2502 \u2502 \u2502 \u2502 \u2502 \u2502 \u2502
+ * \u2502 \u2502 HDFSSystemConsumer \u2502 \u2502 HDFSSystemAdmin \u2502 \u2502 HDFSSystemProducer \u2502
+ * \u2514\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u25b6 \u2502 \u2502 \u2502 \u2502 \u2502
+ * \u2514\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u252c\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2518 \u2514\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u252c\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2518 \u2514\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u252c\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2518
+ * \u2502 \u2502 \u2502
+ * \u2514\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u253c\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2518
+ * \u2502
+ * \u250c\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2534\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2510
+ * \u2502 \u2502
+ * \u2502 HDFSSystemFactory \u2502
+ * \u2502 \u2502
+ * \u2514\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2518
+ */
+public class HdfsSystemAdmin implements SystemAdmin {
+  private static final Logger LOG = LoggerFactory.getLogger(HdfsSystemAdmin.class);
+
+  private final HdfsConfig hdfsConfig;
+  private final DirectoryPartitioner directoryPartitioner;
+  private final String stagingDirectory; // directory that contains the partition description
+  private final HdfsReaderFactory.ReaderType readerType;
+
+  /**
+   * Creates the admin for one HDFS system.
+   *
+   * @param systemName name of the system whose partitioner and reader settings are read from the config
+   * @param config job configuration, wrapped in an {@link HdfsConfig}
+   */
+  public HdfsSystemAdmin(String systemName, Config config) {
+    hdfsConfig = new HdfsConfig(config);
+    directoryPartitioner = new DirectoryPartitioner(hdfsConfig.getPartitionerWhiteList(systemName),
+        hdfsConfig.getPartitionerBlackList(systemName), hdfsConfig.getPartitionerGroupPattern(systemName),
+        new HdfsFileSystemAdapter());
+    stagingDirectory = hdfsConfig.getStagingDirectory();
+    readerType = HdfsReaderFactory.getType(hdfsConfig.getFileReaderType(systemName));
+  }
+
+  @Override
+  public Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> offsets) {
+    /*
+     * To actually get the "after" offset we have to seek to that specific offset in the file and
+     * read that record to figure out the location of next record. This is much more expensive operation
+     * compared to the case in KafkaSystemAdmin.
+     * So simply return the same offsets. This will always incur re-processing but such semantics are legit
+     * in Samza.
+     */
+    return offsets;
+  }
+
+  /**
+   * Reads the persisted partition descriptor map of a stream from the staging directory.
+   *
+   * @param stagingDirectory directory under which partition descriptors are persisted
+   * @param streamName name of the stream whose descriptor map is requested
+   * @return the descriptor map parsed from the persisted JSON, or {@code null} if no descriptor
+   *         file exists for the stream yet
+   * @throws SamzaException if the descriptor file cannot be checked or read
+   */
+  static Map<Partition, List<String>> obtainPartitionDescriptorMap(String stagingDirectory, String streamName) {
+    Path path = PartitionDescriptorUtil.getPartitionDescriptorPath(stagingDirectory, streamName);
+    try {
+      // Path#getFileSystem hands out Hadoop's JVM-wide cached FileSystem instance by default.
+      // Closing it here would invalidate it for every other user of the same file system, so
+      // only the stream opened below is closed.
+      FileSystem fs = path.getFileSystem(new Configuration());
+      if (!fs.exists(path)) {
+        return null;
+      }
+      try (FSDataInputStream fis = fs.open(path)) {
+        String json = IOUtils.toString(fis, StandardCharsets.UTF_8);
+        return PartitionDescriptorUtil.getDescriptorMapFromJson(json);
+      }
+    } catch (IOException e) {
+      throw new SamzaException("Failed to read partition description from: " + path, e);
+    }
+  }
+
+  /**
+   * Persists the partition descriptor of a stream, but only when it does not exist on HDFS already.
+   * Partition descriptors are treated as immutable, so an existing file is never overwritten.
+   *
+   * @param streamName stream whose descriptor is persisted
+   * @param partitionDescriptorMap descriptor map to serialize as JSON
+   * @throws SamzaException if the target path cannot be checked or written
+   */
+  private void persistPartitionDescriptor(String streamName,
+      Map<Partition, List<String>> partitionDescriptorMap) {
+    Path targetPath = PartitionDescriptorUtil.getPartitionDescriptorPath(stagingDirectory, streamName);
+    try {
+      // Shared (cached) FileSystem instance: intentionally not closed, see obtainPartitionDescriptorMap.
+      FileSystem fs = targetPath.getFileSystem(new Configuration());
+      if (fs.exists(targetPath)) {
+        LOG.warn(targetPath.toString() + " exists. Skip persisting partition descriptor.");
+      } else {
+        LOG.info("About to persist partition descriptors to path: " + targetPath.toString());
+        try (FSDataOutputStream fos = fs.create(targetPath)) {
+          fos.write(
+              PartitionDescriptorUtil.getJsonFromDescriptorMap(partitionDescriptorMap).getBytes(StandardCharsets.UTF_8));
+        }
+      }
+    } catch (IOException e) {
+      throw new SamzaException("Failed to validate/persist partition description on hdfs.", e);
+    }
+  }
+
+  /**
+   * Builds the metadata of each requested stream from the directory partitioner. The first time a
+   * stream is seen (no descriptor persisted yet), its freshly computed partition descriptor is persisted
+   * so that later calls and the consumers see the same partitioning.
+   */
+  @Override
+  public Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> streamNames) {
+    Map<String, SystemStreamMetadata> systemStreamMetadataMap = new HashMap<>();
+    streamNames.forEach(streamName -> {
+      // A null result means no descriptor file exists, so the same read also serves as the existence check.
+      Map<Partition, List<String>> existingDescriptorMap = obtainPartitionDescriptorMap(stagingDirectory, streamName);
+      systemStreamMetadataMap.put(streamName, new SystemStreamMetadata(streamName,
+          directoryPartitioner.getPartitionMetadataMap(streamName, existingDescriptorMap)));
+      if (existingDescriptorMap == null) {
+        persistPartitionDescriptor(streamName, directoryPartitioner.getPartitionDescriptor(streamName));
+      }
+    });
+    return systemStreamMetadataMap;
+  }
+
+  @Override
+  public void createChangelogStream(String streamName, int numOfPartitions) {
+    throw new UnsupportedOperationException("HDFS doesn't support change log stream.");
+  }
+
+  @Override
+  public void validateChangelogStream(String streamName, int numOfPartitions) {
+    throw new UnsupportedOperationException("HDFS doesn't support change log stream.");
+  }
+
+  @Override
+  public void createCoordinatorStream(String streamName) {
+    throw new UnsupportedOperationException("HDFS doesn't support coordinator stream.");
+  }
+
+  /**
+   * Compares two multi-file style offsets. A multi-file style offset consists of both
+   * the file index and the offset within that file, in the format
+   * "fileIndex:offsetWithinFile", for example "2:0" or "3:127".
+   * The format of the offset within a file is defined by the implementation of
+   * {@link org.apache.samza.system.hdfs.reader.SingleFileHdfsReader} itself.
+   *
+   * @param offset1 First offset for comparison.
+   * @param offset2 Second offset for comparison.
+   * @return a negative value if offset1 &lt; offset2,
+   *         0 if offset1 == offset2,
+   *         a positive value if offset1 &gt; offset2,
+   *         null if the offsets are not comparable (e.g. either one is blank)
+   */
+  @Override
+  public Integer offsetComparator(String offset1, String offset2) {
+    if (StringUtils.isBlank(offset1) || StringUtils.isBlank(offset2)) {
+      return null;
+    }
+    int fileIndex1 = MultiFileHdfsReader.getCurFileIndex(offset1);
+    int fileIndex2 = MultiFileHdfsReader.getCurFileIndex(offset2);
+    if (fileIndex1 == fileIndex2) {
+      // Same file: defer to the reader-type specific comparison of the in-file offsets.
+      String offsetWithinFile1 = MultiFileHdfsReader.getCurSingleFileOffset(offset1);
+      String offsetWithinFile2 = MultiFileHdfsReader.getCurSingleFileOffset(offset2);
+      return HdfsReaderFactory.offsetComparator(readerType, offsetWithinFile1, offsetWithinFile2);
+    }
+    return Integer.compare(fileIndex1, fileIndex2);
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java
new file mode 100644
index 0000000..13a7102
--- /dev/null
+++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java
@@ -0,0 +1,281 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.hdfs;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.commons.lang.Validate;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.system.hdfs.reader.HdfsReaderFactory;
+import org.apache.samza.system.hdfs.reader.MultiFileHdfsReader;
+import org.apache.samza.util.BlockingEnvelopeMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+
+
+/**
+ * The system consumer for HDFS, extending the {@link org.apache.samza.util.BlockingEnvelopeMap}.
+ * Events will be parsed from HDFS and placed into a blocking queue in {@link org.apache.samza.util.BlockingEnvelopeMap}.
+ * There will be one {@link org.apache.samza.system.hdfs.reader.MultiFileHdfsReader} for each {@link org.apache.samza.system.SystemStreamPartition},
+ * each {@link org.apache.samza.system.hdfs.reader.MultiFileHdfsReader} is running within its own thread.
+ *
+ * \u250c\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2510 \u250c\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2510
+ * \u2502 \u2502 \u2502 \u2502
+ * \u2502 MultiFileHdfsReader_1 - Thread1 \u2502\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u25b6\u2502 SSP1-BlockingQueue \u251c\u2500\u2500\u2500\u2500\u2500\u2500\u2510
+ * \u2502 \u2502 \u2502 \u2502 \u2502
+ * \u2514\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2518 \u2514\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2518 \u2502
+ * \u2502
+ * \u250c\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2510 \u250c\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2510 \u2502
+ * \u2502 \u2502 \u2502 \u2502 \u2502
+ * \u2502 MultiFileHdfsReader_2 - Thread2 \u2502\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u25b6\u2502 SSP2-BlockingQueue \u251c\u2500\u2500\u2500\u2500\u2500\u2500\u2524 \u250c\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2510
+ * \u2502 \u2502 \u2502 \u2502 \u251c\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u25b6\u2502 \u2502
+ * \u2514\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2518 \u2514\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2518 \u2514\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u25b6\u2502 SystemConsumer.poll() \u2502
+ * \u250c\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u25b6\u2502 \u2502
+ * \u2502 \u2514\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2518
+ * ... ... \u2502
+ * \u2502
+ * \u2502
+ * \u250c\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2510 \u250c\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2510 \u2502
+ * \u2502 \u2502 \u2502 \u2502 \u2502
+ * \u2502 MultiFileHdfsReader_N - ThreadN \u2502\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u25b6\u2502 SSPN-BlockingQueue \u251c\u2500\u2500\u2500\u2500\u2500\u2500\u2518
+ * \u2502 \u2502 \u2502 \u2502
+ * \u2514\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2518 \u2514\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2518
+ * Since each thread has only one reader and its own blocking queue, there is essentially no communication
+ * among reader threads.
+ * Thread safety between the reader threads and the Samza main thread is guaranteed by the blocking queues that sit in between.
+ */
+public class HdfsSystemConsumer extends BlockingEnvelopeMap {
+  private static final Logger LOG = LoggerFactory.getLogger(HdfsSystemConsumer.class);
+
+  private static final String METRICS_GROUP_NAME = HdfsSystemConsumer.class.getName();
+
+  private final HdfsReaderFactory.ReaderType readerType;
+  private final String stagingDirectory; // directory that contains the partition description
+  private final int bufferCapacity;
+  private final int numMaxRetries;
+  private ExecutorService executorService; // created in start(); null until then
+
+  /**
+   * The cached map collection from stream partition to partition descriptor. The partition descriptor
+   * is the actual file path (or the set of file paths if the partition contains multiple files)
+   * of the stream partition.
+   * For example,
+   * (stream1) -> (P0) -> "hdfs://user/samzauser/1/datafile01.avro"
+   * (stream1) -> (P1) -> "hdfs://user/samzauser/1/datafile02.avro"
+   * (stream2) -> (P0) -> "hdfs://user/samzauser/2/datafile01.avro"
+   * ...
+   */
+  private final LoadingCache<String, Map<Partition, List<String>>> cachedPartitionDescriptorMap;
+  /** One reader per registered partition, created in {@link #register}. */
+  private final Map<SystemStreamPartition, MultiFileHdfsReader> readers;
+  /** Future of the most recently submitted {@link ReaderRunnable} of each partition. */
+  private final Map<SystemStreamPartition, Future<?>> readerRunnableStatus;
+
+  /**
+   * Whether the {@link org.apache.samza.system.hdfs.HdfsSystemConsumer} is notified
+   * to be shutdown. {@link org.apache.samza.system.hdfs.HdfsSystemConsumer.ReaderRunnable} on
+   * each thread will be checking this variable to determine whether it should stop.
+   */
+  private volatile boolean isShutdown;
+
+  private final HdfsSystemConsumerMetrics consumerMetrics;
+  private final HdfsConfig hdfsConfig;
+
+  /**
+   * Creates a consumer for one HDFS system. Readers are created per partition in {@link #register}
+   * and begin reading only once {@link #start()} submits them to the thread pool.
+   *
+   * @param systemName name of the system whose reader/buffer/retry settings are read from the config
+   * @param config job configuration, wrapped in an {@link HdfsConfig}
+   * @param consumerMetrics metrics holder; its registry is also handed to {@link BlockingEnvelopeMap}
+   */
+  public HdfsSystemConsumer(String systemName, Config config, HdfsSystemConsumerMetrics consumerMetrics) {
+    super(consumerMetrics.getMetricsRegistry());
+    hdfsConfig = new HdfsConfig(config);
+    readerType = HdfsReaderFactory.getType(hdfsConfig.getFileReaderType(systemName));
+    stagingDirectory = hdfsConfig.getStagingDirectory();
+    bufferCapacity = hdfsConfig.getConsumerBufferCapacity(systemName);
+    numMaxRetries = hdfsConfig.getConsumerNumMaxRetries(systemName);
+    readers = new ConcurrentHashMap<>();
+    readerRunnableStatus = new ConcurrentHashMap<>();
+    isShutdown = false;
+    this.consumerMetrics = consumerMetrics;
+    cachedPartitionDescriptorMap = CacheBuilder.newBuilder().build(new CacheLoader<String, Map<Partition, List<String>>>() {
+      @Override
+      public Map<Partition, List<String>> load(String streamName)
+          throws Exception {
+        Validate.notEmpty(streamName);
+        Map<Partition, List<String>> descriptorMap =
+            HdfsSystemAdmin.obtainPartitionDescriptorMap(stagingDirectory, streamName);
+        if (descriptorMap == null) {
+          // A CacheLoader must not return null (Guava would throw an unchecked InvalidCacheLoadException).
+          // A checked exception surfaces as ExecutionException, which getPartitionDescriptor() reports.
+          throw new Exception("No partition descriptor found for stream " + streamName + " in " + stagingDirectory);
+        }
+        return descriptorMap;
+      }
+    });
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void start() {
+    LOG.info(String.format("HdfsSystemConsumer started with %d readers", readers.size()));
+    executorService = Executors.newCachedThreadPool();
+    readers.forEach((ssp, reader) -> readerRunnableStatus.put(ssp, executorService.submit(new ReaderRunnable(reader))));
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void stop() {
+    LOG.info("Received request to stop HdfsSystemConsumer.");
+    isShutdown = true;
+    if (executorService != null) {
+      // Interrupt reader threads as well: a thread blocked in put() on a full queue would otherwise
+      // wait until the queue is drained, which may never happen once the consumer is stopped.
+      executorService.shutdownNow();
+    }
+    LOG.info("HdfsSystemConsumer stopped.");
+  }
+
+  /**
+   * Looks up the file paths that make up the given partition, loading and caching the stream's
+   * descriptor map on first use.
+   *
+   * @throws SamzaException if the descriptor map of the stream cannot be loaded
+   */
+  private List<String> getPartitionDescriptor(SystemStreamPartition systemStreamPartition) {
+    String streamName = systemStreamPartition.getStream();
+    Partition partition = systemStreamPartition.getPartition();
+    try {
+      return cachedPartitionDescriptorMap.get(streamName).get(partition);
+    } catch (ExecutionException e) {
+      throw new SamzaException("Failed to obtain descriptor for " + systemStreamPartition, e);
+    }
+  }
+
+  /** Creates the per-partition buffer, bounded by the configured consumer buffer capacity. */
+  @Override
+  protected BlockingQueue<IncomingMessageEnvelope> newBlockingQueue() {
+    return new LinkedBlockingQueue<>(bufferCapacity);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void register(SystemStreamPartition systemStreamPartition, String offset) {
+    LOG.info("HdfsSystemConsumer register with partition: " + systemStreamPartition + " and offset " + offset);
+    super.register(systemStreamPartition, offset);
+    MultiFileHdfsReader reader =
+        new MultiFileHdfsReader(readerType, systemStreamPartition, getPartitionDescriptor(systemStreamPartition), offset,
+            numMaxRetries);
+    readers.put(systemStreamPartition, reader);
+    consumerMetrics.registerSystemStreamPartition(systemStreamPartition);
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * Before polling, any reader whose thread terminated with a failure is reconnected and resubmitted.
+   */
+  @Override
+  public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(
+      Set<SystemStreamPartition> systemStreamPartitions, long timeout)
+      throws InterruptedException {
+    systemStreamPartitions.forEach(this::restartReaderIfFailed);
+    return super.poll(systemStreamPartitions, timeout);
+  }
+
+  /** Reconnects and resubmits the reader of a partition whose last run ended with an exception. */
+  private void restartReaderIfFailed(SystemStreamPartition systemStreamPartition) {
+    Future<?> status = readerRunnableStatus.get(systemStreamPartition);
+    // Nothing to do before start(), while the reader is still running, or once stopped
+    // (the executor no longer accepts tasks after stop()).
+    if (isShutdown || status == null || !status.isDone()) {
+      return;
+    }
+    try {
+      status.get();
+    } catch (InterruptedException e) {
+      // Not a reader failure: preserve the interrupt for the caller instead of reconnecting.
+      Thread.currentThread().interrupt();
+    } catch (ExecutionException e) {
+      MultiFileHdfsReader reader = readers.get(systemStreamPartition);
+      LOG.warn(
+          String.format("Detect failure in ReaderRunnable for ssp: %s. Try to reconnect now.", systemStreamPartition),
+          e);
+      reader.reconnect();
+      readerRunnableStatus.put(systemStreamPartition, executorService.submit(new ReaderRunnable(reader)));
+    }
+  }
+
+  /** Hands an envelope to the partition's blocking queue, waiting while the queue is full. */
+  private void offerMessage(SystemStreamPartition systemStreamPartition, IncomingMessageEnvelope envelope) {
+    try {
+      super.put(systemStreamPartition, envelope);
+    } catch (InterruptedException e) {
+      // Keep the interrupt status visible to the executor thread running this reader.
+      Thread.currentThread().interrupt();
+      throw new SamzaException("ReaderRunnable interrupted for ssp: " + systemStreamPartition, e);
+    }
+  }
+
+  /**
+   * Reader-thread loop: pushes every record of the reader into its partition's queue, followed by an
+   * end-of-stream envelope once the reader is exhausted. On shutdown no end-of-stream envelope is sent.
+   * The reader is closed when it completes or when the consumer is shutting down. After any other
+   * failure it stays open so that {@link #poll} can reconnect it.
+   */
+  private void doPoll(MultiFileHdfsReader reader) {
+    SystemStreamPartition systemStreamPartition = reader.getSystemStreamPartition();
+    boolean completed = false;
+    try {
+      while (reader.hasNext() && !isShutdown) {
+        IncomingMessageEnvelope messageEnvelope = reader.readNext();
+        offerMessage(systemStreamPartition, messageEnvelope);
+        consumerMetrics.incNumEvents(systemStreamPartition);
+        consumerMetrics.incTotalNumEvents();
+      }
+      if (!isShutdown) {
+        offerMessage(systemStreamPartition, IncomingMessageEnvelope.buildEndOfStreamEnvelope(systemStreamPartition));
+      }
+      completed = true;
+    } finally {
+      if (completed || isShutdown) {
+        reader.close();
+      }
+    }
+  }
+
+  public static class HdfsSystemConsumerMetrics {
+
+    private final MetricsRegistry metricsRegistry;
+    private final Map<SystemStreamPartition, Counter> numEventsCounterMap;
+    private final Counter numTotalEventsCounter;
+
+    public HdfsSystemConsumerMetrics(MetricsRegistry metricsRegistry) {
+      this.metricsRegistry = metricsRegistry;
+      this.numEventsCounterMap = new ConcurrentHashMap<>();
+      this.numTotalEventsCounter = metricsRegistry.newCounter(METRICS_GROUP_NAME, "num-total-events");
+    }
+
+    /** Ensures a "num-events-&lt;ssp&gt;" counter exists for the partition. */
+    public void registerSystemStreamPartition(SystemStreamPartition systemStreamPartition) {
+      getOrCreateCounter(systemStreamPartition);
+    }
+
+    /** Increments the partition's event counter, registering it first if necessary. */
+    public void incNumEvents(SystemStreamPartition systemStreamPartition) {
+      getOrCreateCounter(systemStreamPartition).inc();
+    }
+
+    public void incTotalNumEvents() {
+      numTotalEventsCounter.inc();
+    }
+
+    public MetricsRegistry getMetricsRegistry() {
+      return metricsRegistry;
+    }
+
+    /**
+     * Returns the partition's counter. The counter is created in the registry only when absent,
+     * atomically via ConcurrentHashMap#computeIfAbsent.
+     */
+    private Counter getOrCreateCounter(SystemStreamPartition systemStreamPartition) {
+      return numEventsCounterMap.computeIfAbsent(systemStreamPartition,
+          ssp -> metricsRegistry.newCounter(METRICS_GROUP_NAME, "num-events-" + ssp));
+    }
+  }
+
+  /** Runs {@link #doPoll} for a single reader on an executor thread. */
+  private class ReaderRunnable implements Runnable {
+    private final MultiFileHdfsReader reader;
+
+    ReaderRunnable(MultiFileHdfsReader reader) {
+      this.reader = reader;
+    }
+
+    @Override
+    public void run() {
+      doPoll(reader);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/PartitionDescriptorUtil.java
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/PartitionDescriptorUtil.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/PartitionDescriptorUtil.java
new file mode 100644
index 0000000..5abdbbc
--- /dev/null
+++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/PartitionDescriptorUtil.java
@@ -0,0 +1,97 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.hdfs;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+
+/**
+ * Util class for methods around partition descriptor.
+ *
+ * Partition descriptor is rich information about a partition: the set
+ * of files�that are associated with the�partition.
+ *
+ * Partition descriptor map, or descriptor map, is the map between the
+ * {@link org.apache.samza.Partition} and the descriptor
+ */
+public class PartitionDescriptorUtil {
+
+ private PartitionDescriptorUtil() {
+
+ }
+
+ private static final int INDENT_FACTOR = 2;
+ private static final String DELIMITER = ",";
+
+ private static String getStringFromPaths(List<String> paths) {
+ return String.join(DELIMITER, paths);
+ }
+
+ private static List<String> getPathsFromString(String descriptor) {
+ return Arrays.asList(descriptor.split(DELIMITER));
+ }
+
+ public static String getJsonFromDescriptorMap(Map<Partition, List<String>> descriptorMap) {
+ JSONObject out = new JSONObject();
+ descriptorMap.forEach((partition, paths) -> {
+ String descriptorStr = getStringFromPaths(paths);
+ try {
+ out.put(String.valueOf(partition.getPartitionId()), descriptorStr);
+ } catch (JSONException e) {
+ throw new SamzaException(
+ String.format("Invalid description to encode. partition=%s, descriptor=%s", partition, descriptorStr), e);
+ }
+ });
+ try {
+ return out.toString(INDENT_FACTOR);
+ } catch (JSONException e) {
+ throw new SamzaException("Failed to generate json string.", e);
+ }
+ }
+
+ public static Map<Partition, List<String>> getDescriptorMapFromJson(String json) {
+ try {
+ @SuppressWarnings("unchecked")
+ Map<String, String> rawMap = new ObjectMapper().readValue(json, HashMap.class);
+ Map<Partition, List<String>> descriptorMap = new HashMap<>();
+ rawMap.forEach((key, value) -> descriptorMap.put(new Partition(Integer.valueOf(key)), getPathsFromString(value)));
+ return descriptorMap;
+ } catch (IOException | NumberFormatException e) {
+ throw new SamzaException("Failed to convert json: " + json, e);
+ }
+ }
+
+ public static Path getPartitionDescriptorPath(String base, String streamName) {
+ Path basePath = new Path(base);
+ Path relativePath = new Path(streamName.replaceAll("\\W", "_") + "_partition_description");
+ return new Path(basePath, relativePath);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/DirectoryPartitioner.java
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/DirectoryPartitioner.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/DirectoryPartitioner.java
new file mode 100644
index 0000000..5cad1e4
--- /dev/null
+++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/DirectoryPartitioner.java
@@ -0,0 +1,235 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.hdfs.partitioner;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import javax.annotation.Nullable;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.system.hdfs.reader.MultiFileHdfsReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata;
+import static org.apache.samza.system.hdfs.partitioner.FileSystemAdapter.FileMetadata;
+
+
+/**
+ * The partitioner that takes a directory as an input and does
+ * 1. Filtering, based on a white list and a black list
+ * 2. Grouping, based on grouping pattern
+ *
+ * And then generate the partition metadata and partition descriptors
+ *
+ * This class holds the assumption that the directory remains immutable.
+ * If the directory does changes:
+ * ignore new files showing up in the directory based on an old version of partition descriptor;
+ * throw {@link org.apache.samza.SamzaException} if at least one old file doesn't exist anymore
+ */
+public class DirectoryPartitioner {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DirectoryPartitioner.class);
+ private static final String GROUP_IDENTIFIER = "\\[id]";
+
+ private String whiteListRegex;
+ private String blackListRegex;
+ private String groupPattern;
+ private FileSystemAdapter fileSystemAdapter;
+
+ // stream name => partition => partition descriptor
+ private Map<String, Map<Partition, List<String>>> partitionDescriptorMap = new HashMap<>();
+
+ public DirectoryPartitioner(String whiteList, String blackList, String groupPattern,
+ FileSystemAdapter fileSystemAdapter) {
+ this.whiteListRegex = whiteList;
+ this.blackListRegex = blackList;
+ this.groupPattern = groupPattern;
+ this.fileSystemAdapter = fileSystemAdapter;
+ LOG.info(String
+ .format("Creating DirectoryPartitioner with whiteList=%s, blackList=%s, groupPattern=%s", whiteList, blackList,
+ groupPattern));
+ }
+
+ /*
+ * Based on the stream name, get the list of all files and filter out unneeded ones given
+ * the white list and black list
+ */
+ private List<FileMetadata> getFilteredFiles(String streamName) {
+ List<FileMetadata> filteredFiles = new ArrayList<>();
+ List<FileMetadata> allFiles = fileSystemAdapter.getAllFiles(streamName);
+ LOG.info(String.format("List of all files for %s: %s", streamName, allFiles));
+ allFiles.stream().filter(file -> file.getPath().matches(whiteListRegex) && !file.getPath().matches(blackListRegex))
+ .forEach(filteredFiles::add);
+ // sort the files to have a consistent order
+ filteredFiles.sort((f1, f2) -> f1.getPath().compareTo(f2.getPath()));
+ LOG.info(String.format("List of filtered files for %s: %s", streamName, filteredFiles));
+ return filteredFiles;
+ }
+
+ /*
+ * Algorithm to extract the group identifier from the path based on
+ * the group pattern.
+ * 1. Split the group pattern into two parts (prefix, suffix)
+ * 2. Match the both prefix and suffix against the input
+ * 3. Strip the prefix and suffix and then we can get the group identifier
+ *
+ * For example,
+ * input = run_2016-08-01-part-3.avro
+ * group pattern = ".*part-[id]/.avro"
+ *
+ * 1. Split: prefix pattern = ".*part-" suffix pattern = "/.avro"
+ * 2. Match: prefix string = "run_2016-08-01-part-" suffix string = ".avro"
+ * 3. Extract: output = "3"
+ *
+ * If we can't extract a group identifier, return the original input
+ */
+ private String extractGroupIdentifier(String input) {
+ if (StringUtils.isBlank(GROUP_IDENTIFIER)) {
+ return input;
+ }
+ String[] patterns = groupPattern.split(GROUP_IDENTIFIER);
+ if (patterns.length != 2) {
+ return input;
+ }
+
+ Pattern p1 = Pattern.compile(patterns[0]);
+ Pattern p2 = Pattern.compile(patterns[1]);
+ Matcher m1 = p1.matcher(input);
+ Matcher m2 = p2.matcher(input);
+ if (!m1.find()) {
+ return input;
+ }
+ int s1 = m1.end();
+ if (!m2.find(s1)) {
+ return input;
+ }
+ int s2 = m2.start();
+ return input.substring(s1, s2);
+ }
+
+ /*
+ * Group partitions based on the group identifier extracted from the file path
+ */
+ private List<List<FileMetadata>> generatePartitionGroups(List<FileMetadata> filteredFiles) {
+ Map<String, List<FileMetadata>> map = new HashMap<>();
+ for (FileMetadata fileMetadata : filteredFiles) {
+ String groupId = extractGroupIdentifier(fileMetadata.getPath());
+ map.putIfAbsent(groupId, new ArrayList<>());
+ map.get(groupId).add(fileMetadata);
+ }
+ List<List<FileMetadata>> ret = new ArrayList<>();
+ // sort the map to guarantee consistent ordering
+ List<String> sortedKeys = new ArrayList<>(map.keySet());
+ sortedKeys.sort(Comparator.<String>naturalOrder());
+ sortedKeys.stream().forEach(key -> ret.add(map.get(key)));
+ return ret;
+ }
+
+ /*
+ * This class holds the assumption that the directory remains immutable.
+ * If the directory does changes:
+ * ignore new files showing up in the directory based on an old version of partition descriptor;
+ * throw {@link org.apache.samza.SamzaException} if at least one old file doesn't exist anymore
+ */
+ private List<FileMetadata> validateAndGetOriginalFilteredFiles(List<FileMetadata> newFileList,
+ Map<Partition, List<String>> existingPartitionDescriptor) {
+ assert newFileList != null;
+ assert existingPartitionDescriptor != null;
+ Set<String> oldFileSet = new HashSet<>();
+ existingPartitionDescriptor.values().forEach(oldFileSet::addAll);
+ Set<String> newFileSet = new HashSet<>();
+ newFileList.forEach(file -> newFileSet.add(file.getPath()));
+ if (!newFileSet.containsAll(oldFileSet)) {
+ throw new SamzaException("The list of new files is not a super set of the old files. diff = "
+ + oldFileSet.removeAll(newFileSet));
+ }
+ Iterator<FileMetadata> iterator = newFileList.iterator();
+ while (iterator.hasNext()) {
+ FileMetadata file = iterator.next();
+ if (!oldFileSet.contains(file.getPath())) {
+ iterator.remove();
+ }
+ }
+ return newFileList;
+ }
+
+ /**
+ * Get partition metadata for a stream
+ * @param streamName name of the stream; should contain the information about the path of the
+ * root directory
+ * @param existingPartitionDescriptorMap map of the existing partition descriptor
+ * @return map of SSP metadata
+ */
+ public Map<Partition, SystemStreamPartitionMetadata> getPartitionMetadataMap(String streamName,
+ @Nullable Map<Partition, List<String>> existingPartitionDescriptorMap) {
+ LOG.info("Trying to obtain metadata for " + streamName);
+ LOG.info("Existing partition descriptor: " + (existingPartitionDescriptorMap == null ? "empty"
+ : existingPartitionDescriptorMap));
+ Map<Partition, SystemStreamPartitionMetadata> partitionMetadataMap = new HashMap<>();
+ partitionDescriptorMap.putIfAbsent(streamName, new HashMap<>());
+ List<FileMetadata> filteredFiles = getFilteredFiles(streamName);
+ if (existingPartitionDescriptorMap != null) {
+ filteredFiles = validateAndGetOriginalFilteredFiles(filteredFiles, existingPartitionDescriptorMap);
+ }
+ List<List<FileMetadata>> groupedPartitions = generatePartitionGroups(filteredFiles);
+ int partitionId = 0;
+ for (List<FileMetadata> fileGroup : groupedPartitions) {
+ Partition partition = new Partition(partitionId);
+ List<String> pathList = new ArrayList<>();
+ List<String> lengthList = new ArrayList<>();
+ fileGroup.forEach(fileMetadata -> {
+ pathList.add(fileMetadata.getPath());
+ lengthList.add(String.valueOf(fileMetadata.getLen()));
+ });
+ String oldestOffset = MultiFileHdfsReader.generateOffset(0, "0");
+ String newestOffset = MultiFileHdfsReader.generateOffset(lengthList.size() - 1, String.valueOf(lengthList.get(lengthList.size() - 1)));
+ SystemStreamPartitionMetadata metadata =
+ new SystemStreamPartitionMetadata(oldestOffset, newestOffset, null);
+ partitionMetadataMap.put(partition, metadata);
+ partitionDescriptorMap.get(streamName).put(partition, pathList);
+ partitionId++;
+ }
+ LOG.info("Obtained metadata map as: " + partitionMetadataMap);
+ LOG.info("Computed partition description as: " + partitionDescriptorMap);
+ return partitionMetadataMap;
+ }
+
+ /**
+ * Get partition descriptors for a stream
+ * @param streamName name of the stream; should contain the information about the path of the
+ * root directory
+ * @return map of the partition descriptor
+ */
+ public Map<Partition, List<String>> getPartitionDescriptor(String streamName) {
+ return partitionDescriptorMap.get(streamName);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/FileSystemAdapter.java
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/FileSystemAdapter.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/FileSystemAdapter.java
new file mode 100644
index 0000000..5fec4bf
--- /dev/null
+++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/FileSystemAdapter.java
@@ -0,0 +1,60 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.hdfs.partitioner;
+
+import java.util.List;
+
+
+/**
+ * An adapter between directory partitioner and the actual file systems or
+ * file system like systems.
+ */
+public interface FileSystemAdapter {
+
+ /**
+ * Return the list of all files given the stream name
+ * @param streamName name of the stream
+ * @return list of <code>FileMetadata</code> for all files associated to the given stream
+ */
+ public List<FileMetadata> getAllFiles(String streamName);
+
+ public class FileMetadata {
+ private String path;
+ private long length;
+
+ public FileMetadata(String path, long length) {
+ this.path = path;
+ this.length = length;
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ public long getLen() {
+ return length;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("[path = %s, length = %s]", path, length);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/HdfsFileSystemAdapter.java
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/HdfsFileSystemAdapter.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/HdfsFileSystemAdapter.java
new file mode 100644
index 0000000..bb7b3fa
--- /dev/null
+++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/HdfsFileSystemAdapter.java
@@ -0,0 +1,55 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.hdfs.partitioner;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+ /**
+  * {@link FileSystemAdapter} backed by the Hadoop FileSystem API. The stream name is used as
+  * a path; the file system is resolved from that path with a default Configuration, and one
+  * FileMetadata is returned per entry reported by FileSystem#listStatus.
+  */
+ public class HdfsFileSystemAdapter implements FileSystemAdapter {
+
+   private static final Logger LOG = LoggerFactory.getLogger(HdfsFileSystemAdapter.class);
+
+   /**
+    * @throws SamzaException wrapping the IOException if the listing fails
+    */
+   @Override
+   public List<FileMetadata> getAllFiles(String streamName) {
+     List<FileMetadata> ret = new ArrayList<>();
+     try {
+       Path streamPath = new Path(streamName);
+       FileSystem fileSystem = streamPath.getFileSystem(new Configuration());
+       // NOTE(review): entries are not filtered by type, so sub-directories (if any) are included
+       for (FileStatus fileStatus : fileSystem.listStatus(streamPath)) {
+         ret.add(new FileMetadata(fileStatus.getPath().toString(), fileStatus.getLen()));
+       }
+     } catch (IOException e) {
+       LOG.error("Failed to get the list of files for " + streamName, e);
+       // keep the failing path in the exception so callers that only see the exception have context
+       throw new SamzaException("Failed to get the list of files for " + streamName, e);
+     }
+     return ret;
+   }
+ }
http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/AvroFileHdfsReader.java
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/AvroFileHdfsReader.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/AvroFileHdfsReader.java
new file mode 100644
index 0000000..757b40b
--- /dev/null
+++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/AvroFileHdfsReader.java
@@ -0,0 +1,216 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.hdfs.reader;
+
+import java.io.IOException;
+
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.AvroFSInput;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.samza.SamzaException;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * An implementation of the HdfsReader that reads and processes avro format
+ * files.
+ */
+public class AvroFileHdfsReader implements SingleFileHdfsReader {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AvroFileHdfsReader.class);
+
+ private final SystemStreamPartition systemStreamPartition;
+ private DataFileReader<GenericRecord> fileReader;
+ private long curBlockStart;
+ private long curRecordOffset;
+
+ public AvroFileHdfsReader(SystemStreamPartition systemStreamPartition) {
+ this.systemStreamPartition = systemStreamPartition;
+ this.fileReader = null;
+ }
+
+ @Override
+ public void open(String pathStr, String singleFileOffset) {
+ LOG.info(String.format("%s: Open file [%s] with file offset [%s] for read", systemStreamPartition, pathStr, singleFileOffset));
+ Path path = new Path(pathStr);
+ try {
+ AvroFSInput input = new AvroFSInput(FileContext.getFileContext(path.toUri()), path);
+ fileReader = new DataFileReader<>(input, new GenericDatumReader<>());
+ seek(singleFileOffset);
+ } catch (IOException e) {
+ throw new SamzaException(e);
+ }
+ }
+
+ @Override
+ public void seek(String singleFileOffset) {
+ try {
+ // See comments for AvroFileCheckpoint to understand the behavior below
+ AvroFileCheckpoint checkpoint = new AvroFileCheckpoint(singleFileOffset);
+ if (checkpoint.isStartingOffset()) {
+ // seek to the beginning of the first block
+ fileReader.sync(0);
+ curBlockStart = fileReader.previousSync();
+ curRecordOffset = 0;
+ return;
+ }
+ fileReader.seek(checkpoint.getBlockStart());
+ for (int i = 0; i < checkpoint.getRecordOffset(); i++) {
+ if (fileReader.hasNext()) {
+ fileReader.next();
+ }
+ }
+ curBlockStart = checkpoint.getBlockStart();
+ curRecordOffset = checkpoint.getRecordOffset();
+ } catch (IOException e) {
+ throw new SamzaException(e);
+ }
+ }
+
+ @Override
+ public IncomingMessageEnvelope readNext() {
+ // get checkpoint for THIS record
+ String checkpoint = nextOffset();
+ GenericRecord record = fileReader.next();
+ if (fileReader.previousSync() != curBlockStart) {
+ curBlockStart = fileReader.previousSync();
+ curRecordOffset = 0;
+ } else {
+ curRecordOffset++;
+ }
+ // avro schema doesn't necessarily have key field
+ return new IncomingMessageEnvelope(systemStreamPartition, checkpoint, null, record);
+ }
+
+ @Override
+ public boolean hasNext() {
+ return fileReader.hasNext();
+ }
+
+ @Override
+ public void close() {
+ LOG.info("About to close file reader for " + systemStreamPartition);
+ try {
+ fileReader.close();
+ } catch (IOException e) {
+ throw new SamzaException(e);
+ }
+ LOG.info("File reader closed for " + systemStreamPartition);
+ }
+
+ @Override
+ public String nextOffset() {
+ return AvroFileCheckpoint.generateCheckpointStr(curBlockStart, curRecordOffset);
+ }
+
+ public static int offsetComparator(String offset1, String offset2) {
+ AvroFileCheckpoint cp1 = new AvroFileCheckpoint(offset1);
+ AvroFileCheckpoint cp2 = new AvroFileCheckpoint(offset2);
+ return cp1.compareTo(cp2);
+ }
+
+ /**
+ * An avro file looks something like this:
+ *
+ * Byte offset: 0 103 271 391
+ * \u250c\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u252c\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u252c\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u252c\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2510
+ * Avro file: \u2502 Header \u2502 Block 1 \u2502 Block 2 \u2502 Block 3 \u2502 ...
+ * \u2514\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2534\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2534\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2534\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2518
+ *
+ * Each block contains multiple records. The start of a block is defined as a valid
+ * synchronization point. A file reader can only seek to a synchronization point, i.e.
+ * the start of blocks. Thus, to precisely describe the location of a record, we need
+ * to use the pair (blockStart, recordOffset). Here "blockStart" means the start of the
+ * block and "recordOffset" means the index of the record within the block.
+ * Take the example above, and suppose block 1 has 4 records, we have record sequences as:
+ * (103, 0), (103, 1), (103, 2), (103, 3), (271, 0), ...
+ * where (271, 0) represents the first event in block 2
+ *
+ * With the CP_DELIM being '@', the actual checkpoint string would look like "103@1",
+ * "271@0" or "271", etc. For convenience, a checkpoint with only the blockStart but no
+ * recordOffset within the block simply means the first record in that block. Thus,
+ * "271@0" is equal to "271".
+ */
+ public static class AvroFileCheckpoint {
+ private static final String CP_DELIM = "@";
+ private long blockStart; // start position of the block
+ private long recordOffset; // record offset within the block
+ String checkpointStr;
+
+ public static String generateCheckpointStr(long blockStart, long recordOffset) {
+ return blockStart + CP_DELIM + recordOffset;
+ }
+
+ public AvroFileCheckpoint(String checkpointStr) {
+ String[] elements = checkpointStr.replaceAll("\\s", "").split(CP_DELIM);
+ if (elements.length > 2 || elements.length < 1) {
+ throw new SamzaException("Invalid checkpoint for AvroFileHdfsReader: " + checkpointStr);
+ }
+ try {
+ blockStart = Long.parseLong(elements[0]);
+ recordOffset = elements.length == 2 ? Long.parseLong(elements[1]) : 0;
+ } catch (NumberFormatException e) {
+ throw new SamzaException("Invalid checkpoint for AvroFileHdfsReader: " + checkpointStr, e);
+ }
+ this.checkpointStr = checkpointStr;
+ }
+
+ public AvroFileCheckpoint(long blockStart, long recordOffset) {
+ this.blockStart = blockStart;
+ this.recordOffset = recordOffset;
+ this.checkpointStr = generateCheckpointStr(blockStart, recordOffset);
+ }
+
+ public long getBlockStart() {
+ return blockStart;
+ }
+
+ public long getRecordOffset() {
+ return recordOffset;
+ }
+
+ public String getCheckpointStr() {
+ return checkpointStr;
+ }
+
+ public boolean isStartingOffset() {
+ return blockStart == 0;
+ }
+
+ public int compareTo(AvroFileCheckpoint other) {
+ if (this.blockStart < other.blockStart) {
+ return -1;
+ } else if (this.blockStart > other.blockStart) {
+ return 1;
+ } else return Long.compare(this.recordOffset, other.recordOffset);
+ }
+
+ @Override
+ public String toString() {
+ return getCheckpointStr();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/HdfsReaderFactory.java
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/HdfsReaderFactory.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/HdfsReaderFactory.java
new file mode 100644
index 0000000..4efdfd7
--- /dev/null
+++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/HdfsReaderFactory.java
@@ -0,0 +1,59 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.hdfs.reader;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.system.SystemStreamPartition;
+
+
+ /**
+  * Factory and helper methods for {@link SingleFileHdfsReader} implementations,
+  * selected by {@link ReaderType}.
+  */
+ public class HdfsReaderFactory {
+
+   /**
+    * @param readerType file format to read
+    * @param systemStreamPartition partition the reader produces envelopes for
+    * @return a new reader for readerType; it must be opened before use
+    * @throws SamzaException if readerType is not supported
+    */
+   public static SingleFileHdfsReader getHdfsReader(ReaderType readerType, SystemStreamPartition systemStreamPartition) {
+     switch (readerType) {
+       case AVRO:
+         return new AvroFileHdfsReader(systemStreamPartition);
+       default:
+         throw new SamzaException("Unsupported reader type: " + readerType);
+     }
+   }
+
+   /**
+    * Parses a reader type name case-insensitively (e.g. "avro" or "AVRO").
+    * @throws SamzaException if readerTypeStr is null or names no known type
+    */
+   public static ReaderType getType(String readerTypeStr) {
+     if (readerTypeStr == null) {
+       throw new SamzaException("Invalid hdfs reader type string: " + readerTypeStr);
+     }
+     try {
+       // Locale.ROOT keeps the upper-casing independent of the JVM's default locale
+       return ReaderType.valueOf(readerTypeStr.toUpperCase(java.util.Locale.ROOT));
+     } catch (IllegalArgumentException e) {
+       throw new SamzaException("Invalid hdfs reader type string: " + readerTypeStr, e);
+     }
+   }
+
+   /**
+    * Compares two single-file offsets using the ordering of the given reader type.
+    * @throws SamzaException if readerType is not supported
+    */
+   public static int offsetComparator(ReaderType readerType, String offset1, String offset2) {
+     switch (readerType) {
+       case AVRO:
+         return AvroFileHdfsReader.offsetComparator(offset1, offset2);
+       default:
+         throw new SamzaException("Unsupported reader type: " + readerType);
+     }
+   }
+
+   /*
+    * Support AVRO only so far. Implement <code>SingleFileHdfsReader</code> to support a variety of
+    * file parsers. Can easily support "plain" text in the future (each line of the
+    * text representing a record for example)
+    */
+   public enum ReaderType {
+     AVRO
+   }
+ }
http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/MultiFileHdfsReader.java
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/MultiFileHdfsReader.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/MultiFileHdfsReader.java
new file mode 100644
index 0000000..7870713
--- /dev/null
+++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/MultiFileHdfsReader.java
@@ -0,0 +1,204 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.hdfs.reader;
+
+import java.util.List;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.system.hdfs.HdfsConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A wrapper on top of {@link org.apache.samza.system.hdfs.reader.SingleFileHdfsReader}
+ * to manage the situation of multiple files per partition.
+ *
+ * The offset for MultiFileHdfsReader, which is also the offset that gets
+ * committed in and used by Samza, consists of two parts: file index,
+ * actual offset within file. For example, 3:127
+ *
+ * Format of the offset within file is defined by the implementation of
+ * {@link org.apache.samza.system.hdfs.reader.SingleFileHdfsReader} itself.
+ */
+public class MultiFileHdfsReader {
+  private static final Logger LOG = LoggerFactory.getLogger(MultiFileHdfsReader.class);
+  private static final String DELIMITER = ":";
+  // In-file offset used when moving on to the next file, which is always read from its start.
+  private static final String START_OF_FILE_OFFSET = "0";
+
+  private final HdfsReaderFactory.ReaderType readerType;
+  private final SystemStreamPartition systemStreamPartition;
+  private final List<String> filePaths;
+  private final int numMaxRetries;
+  // Reader for filePaths.get(curFileIndex); set to null once the last file has been exhausted.
+  private SingleFileHdfsReader curReader;
+  private int curFileIndex = 0;
+  // In-file offset of the last message handed out, or the starting offset if none was read yet.
+  private String curSingleFileOffset;
+  private int numRetries;
+
+  /**
+   * Get the current file index from the offset string
+   * @param offset offset string that contains both file index and offset within file
+   * @return the file index part
+   * @throws SamzaException if the offset is null, has no delimiter, or the index is not an integer
+   */
+  public static int getCurFileIndex(String offset) {
+    String[] elements = splitOffset(offset);
+    try {
+      return Integer.parseInt(elements[0]);
+    } catch (NumberFormatException e) {
+      throw new SamzaException("Invalid offset for MultiFileHdfsReader: " + offset, e);
+    }
+  }
+
+  /**
+   * Get the offset within file from the offset string
+   * @param offset offset string that contains both file index and offset within file
+   * @return the single file offset part
+   * @throws SamzaException if the offset is null or has no delimiter
+   */
+  public static String getCurSingleFileOffset(String offset) {
+    String[] elements = splitOffset(offset);
+    // Getting the remaining of the offset string in case the single file
+    // offset uses the same delimiter.
+    return offset.substring(elements[0].length() + 1);
+  }
+
+  /**
+   * Generate the offset based on file index and offset within single file
+   * @param fileIndex index of the file
+   * @param singleFileOffset offset within single file
+   * @return the complete offset
+   */
+  public static String generateOffset(int fileIndex, String singleFileOffset) {
+    return fileIndex + DELIMITER + singleFileOffset;
+  }
+
+  /**
+   * Split a multi-file offset on the delimiter, rejecting offsets with fewer than two parts.
+   */
+  private static String[] splitOffset(String offset) {
+    String[] elements = offset == null ? new String[0] : offset.split(DELIMITER);
+    if (elements.length < 2) {
+      throw new SamzaException("Invalid offset for MultiFileHdfsReader: " + offset);
+    }
+    return elements;
+  }
+
+  /*
+   * Get current offset: offset of the LAST message being successfully read. If no messages have
+   * ever been read, return the offset of first event.
+   */
+  private String getCurOffset() {
+    return generateOffset(curFileIndex, curSingleFileOffset);
+  }
+
+  /**
+   * Create a reader using the default number of max retries
+   * (HdfsConfig.CONSUMER_NUM_MAX_RETRIES_DEFAULT).
+   */
+  public MultiFileHdfsReader(HdfsReaderFactory.ReaderType readerType, SystemStreamPartition systemStreamPartition,
+      List<String> partitionDescriptors, String offset) {
+    this(readerType, systemStreamPartition, partitionDescriptors, offset,
+        Integer.parseInt(HdfsConfig.CONSUMER_NUM_MAX_RETRIES_DEFAULT()));
+  }
+
+  /**
+   * @param readerType type of the per-file reader to create
+   * @param systemStreamPartition partition this reader serves
+   * @param partitionDescriptors ordered list of file paths making up the partition; must be non-empty
+   * @param offset multi-file offset ("fileIndex:singleFileOffset") to start reading from
+   * @param numMaxRetries maximum number of reconnect attempts before giving up
+   * @throws SamzaException if the file list is null/empty or the offset is invalid
+   */
+  public MultiFileHdfsReader(HdfsReaderFactory.ReaderType readerType, SystemStreamPartition systemStreamPartition,
+      List<String> partitionDescriptors, String offset, int numMaxRetries) {
+    this.readerType = readerType;
+    this.systemStreamPartition = systemStreamPartition;
+    this.filePaths = partitionDescriptors;
+    this.numMaxRetries = numMaxRetries;
+    this.numRetries = 0;
+    if (partitionDescriptors == null) {
+      throw new SamzaException("Invalid partition descriptors: null");
+    }
+    if (partitionDescriptors.isEmpty()) {
+      throw new SamzaException(
+          "Invalid number of files based on partition descriptors: " + partitionDescriptors.size());
+    }
+    init(offset);
+  }
+
+  /*
+   * (Re)open the reader at the given multi-file offset, closing any reader that is currently open.
+   */
+  private void init(String offset) {
+    if (curReader != null) {
+      curReader.close();
+      curReader = null;
+    }
+    curFileIndex = getCurFileIndex(offset);
+    if (curFileIndex < 0 || curFileIndex >= filePaths.size()) {
+      throw new SamzaException(
+          String.format("Invalid file index %d. Number of files is %d", curFileIndex, filePaths.size()));
+    }
+    curSingleFileOffset = getCurSingleFileOffset(offset);
+    curReader = HdfsReaderFactory.getHdfsReader(readerType, systemStreamPartition);
+    curReader.open(filePaths.get(curFileIndex), curSingleFileOffset);
+  }
+
+  /**
+   * Whether another message is available, advancing to the next file whenever the current one is exhausted.
+   */
+  public boolean hasNext() {
+    while (curFileIndex < filePaths.size()) {
+      if (curReader.hasNext()) {
+        return true;
+      }
+      curReader.close();
+      // Drop the closed reader so close() does not close it a second time after the last file.
+      curReader = null;
+      curFileIndex++;
+      if (curFileIndex < filePaths.size()) {
+        // Nothing has been read from the new file yet; without this reset, getCurOffset() (and so
+        // reconnect()) would pair the new file index with the previous file's in-file offset.
+        curSingleFileOffset = START_OF_FILE_OFFSET;
+        curReader = HdfsReaderFactory.getHdfsReader(readerType, systemStreamPartition);
+        curReader.open(filePaths.get(curFileIndex), curSingleFileOffset);
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Read the next message, rewriting its offset into the multi-file format.
+   * @return the next envelope, or null (with a warning) when there is no more data
+   */
+  public IncomingMessageEnvelope readNext() {
+    if (!hasNext()) {
+      LOG.warn("Attempting to read more data when there aren't any. ssp=" + systemStreamPartition);
+      return null;
+    }
+    // record the next offset before we read, so when the read fails and we reconnect,
+    // we seek to the same offset that we try below
+    curSingleFileOffset = curReader.nextOffset();
+    IncomingMessageEnvelope messageEnvelope = curReader.readNext();
+    // Copy everything except for the offset. Turn the single-file style offset into a multi-file one
+    return new IncomingMessageEnvelope(messageEnvelope.getSystemStreamPartition(), getCurOffset(),
+        messageEnvelope.getKey(), messageEnvelope.getMessage(), messageEnvelope.getSize());
+  }
+
+  /**
+   * Reconnect to the file systems in case of failure.
+   * Reset offset to the last checkpoint (last successfully read message).
+   * Throw {@link org.apache.samza.SamzaException} if reaches max number of
+   * retries.
+   */
+  public void reconnect() {
+    reconnect(getCurOffset());
+  }
+
+  /**
+   * Reconnect to the file systems in case of failures.
+   * @param offset reset offset to the specified offset
+   * Throw {@link org.apache.samza.SamzaException} if reaches max number of
+   * retries.
+   */
+  public void reconnect(String offset) {
+    if (numRetries >= numMaxRetries) {
+      throw new SamzaException(
+          String.format("Give up reconnecting. numRetries: %d; numMaxRetries: %d", numRetries, numMaxRetries));
+    }
+    LOG.info(String
+        .format("Reconnecting with offset: %s numRetries: %d numMaxRetries: %d", offset, numRetries, numMaxRetries));
+    numRetries++;
+    init(offset);
+  }
+
+  /**
+   * Close the current file reader, if any.
+   */
+  public void close() {
+    LOG.info(String.format("MultiFileHdfsReader shutdown requested for %s. Current offset = %s", systemStreamPartition,
+        getCurOffset()));
+    if (curReader != null) {
+      curReader.close();
+    }
+  }
+
+  public SystemStreamPartition getSystemStreamPartition() {
+    return systemStreamPartition;
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/SingleFileHdfsReader.java
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/SingleFileHdfsReader.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/SingleFileHdfsReader.java
new file mode 100644
index 0000000..eb8a70d
--- /dev/null
+++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/SingleFileHdfsReader.java
@@ -0,0 +1,62 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.hdfs.reader;
+
+import org.apache.samza.system.IncomingMessageEnvelope;
+
+
+/**
+ * Reader for one HDFS file. MultiFileHdfsReader chains several of these to cover a partition.
+ * The format of the offset strings is defined by each implementation.
+ */
+public interface SingleFileHdfsReader {
+  /**
+   * Open the file and seek to a specific offset for reading.
+   *
+   * @param path path of the file to be read
+   * @param offset offset the reader should start from
+   */
+  void open(String path, String offset);
+
+  /**
+   * Seek to a specific offset.
+   *
+   * @param offset offset the reader should seek to
+   */
+  void seek(String offset);
+
+  /**
+   * Construct and return the next message envelope.
+   *
+   * @return constructed IncomingMessageEnvelope
+   */
+  IncomingMessageEnvelope readNext();
+
+  /**
+   * Get the next offset, which is the offset for the next message
+   * that will be returned by readNext.
+   *
+   * @return next offset
+   */
+  String nextOffset();
+
+  /**
+   * Whether there are still records to be read.
+   *
+   * @return true or false based on whether the reader has hit end of file
+   */
+  boolean hasNext();
+
+  /**
+   * Close the reader.
+   */
+  void close();
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala
index 61b7570..53ff372 100644
--- a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala
+++ b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala
@@ -25,7 +25,7 @@ import java.util.UUID
import org.apache.samza.SamzaException
import org.apache.samza.config.SystemConfig.Config2System
-import org.apache.samza.config.{Config, ScalaMapConfig}
+import org.apache.samza.config.{YarnConfig, Config, ScalaMapConfig}
import org.apache.samza.util.{Logging, Util}
import scala.collection.JavaConversions._
@@ -62,6 +62,34 @@ object HdfsConfig {
val BUCKETER_CLASS = "systems.%s.producer.hdfs.bucketer.class"
val BUCKETER_CLASS_DEFAULT = "org.apache.samza.system.hdfs.writer.JobNameDateTimeBucketer"
+ // capacity of the hdfs consumer buffer - the blocking queue used for storing messages
+ val CONSUMER_BUFFER_CAPACITY = "systems.%s.consumer.bufferCapacity"
+ val CONSUMER_BUFFER_CAPACITY_DEFAULT = 10.toString
+
+ // number of max retries for the hdfs consumer readers per partition
+ val CONSUMER_NUM_MAX_RETRIES = "system.%s.consumer.numMaxRetries"
+ val CONSUMER_NUM_MAX_RETRIES_DEFAULT = 10.toString
+
+ // white list used by directory partitioner to filter out unwanted files in a hdfs directory
+ val CONSUMER_PARTITIONER_WHITELIST = "systems.%s.partitioner.defaultPartitioner.whitelist"
+ val CONSUMER_PARTITIONER_WHITELIST_DEFAULT = ".*"
+
+ // black list used by directory partitioner to filter out unwanted files in a hdfs directory
+ val CONSUMER_PARTITIONER_BLACKLIST = "systems.%s.partitioner.defaultPartitioner.blacklist"
+ val CONSUMER_PARTITIONER_BLACKLIST_DEFAULT = ""
+
+ // group pattern used by directory partitioner for advanced partitioning
+ val CONSUMER_PARTITIONER_GROUP_PATTERN = "systems.%s.partitioner.defaultPartitioner.groupPattern"
+ val CONSUMER_PARTITIONER_GROUP_PATTERN_DEFAULT = ""
+
+ // type of the file reader (avro, plain, etc.)
+ val FILE_READER_TYPE = "systems.%s.consumer.reader"
+ val FILE_READER_TYPE_DEFAULT = "avro"
+
+ // staging directory for storing partition description
+ val STAGING_DIRECTORY = "systems.%s.stagingDirectory"
+ val STAGING_DIRECTORY_DEFAULT = ""
+
implicit def Hdfs2Kafka(config: Config) = new HdfsConfig(config)
}
@@ -130,4 +158,53 @@ class HdfsConfig(config: Config) extends ScalaMapConfig(config) {
getOrElse(HdfsConfig.COMPRESSION_TYPE format systemName, HdfsConfig.COMPRESSION_TYPE_DEFAULT)
}
+  /**
+   * Capacity of the hdfs consumer buffer (the blocking queue used for storing messages) for the
+   * given system, falling back to CONSUMER_BUFFER_CAPACITY_DEFAULT when unset.
+   */
+  def getConsumerBufferCapacity(systemName: String): Int = {
+    val capacityKey = HdfsConfig.CONSUMER_BUFFER_CAPACITY.format(systemName)
+    val rawCapacity = getOrElse(capacityKey, HdfsConfig.CONSUMER_BUFFER_CAPACITY_DEFAULT)
+    rawCapacity.toInt
+  }
+
+  /**
+   * Maximum number of reconnect retries for each per-partition hdfs consumer reader, falling back
+   * to CONSUMER_NUM_MAX_RETRIES_DEFAULT when unset.
+   */
+  def getConsumerNumMaxRetries(systemName: String): Int = {
+    val retriesKey = HdfsConfig.CONSUMER_NUM_MAX_RETRIES.format(systemName)
+    val rawRetries = getOrElse(retriesKey, HdfsConfig.CONSUMER_NUM_MAX_RETRIES_DEFAULT)
+    rawRetries.toInt
+  }
+
+  /**
+   * Whitelist the directory partitioner uses to keep wanted files in a hdfs directory;
+   * defaults to CONSUMER_PARTITIONER_WHITELIST_DEFAULT.
+   */
+  def getPartitionerWhiteList(systemName: String): String = {
+    val whitelistKey = HdfsConfig.CONSUMER_PARTITIONER_WHITELIST.format(systemName)
+    getOrElse(whitelistKey, HdfsConfig.CONSUMER_PARTITIONER_WHITELIST_DEFAULT)
+  }
+
+  /**
+   * Blacklist the directory partitioner uses to drop unwanted files in a hdfs directory;
+   * defaults to CONSUMER_PARTITIONER_BLACKLIST_DEFAULT.
+   */
+  def getPartitionerBlackList(systemName: String): String = {
+    val blacklistKey = HdfsConfig.CONSUMER_PARTITIONER_BLACKLIST.format(systemName)
+    getOrElse(blacklistKey, HdfsConfig.CONSUMER_PARTITIONER_BLACKLIST_DEFAULT)
+  }
+
+  /**
+   * Group pattern the directory partitioner uses for advanced partitioning;
+   * defaults to CONSUMER_PARTITIONER_GROUP_PATTERN_DEFAULT.
+   */
+  def getPartitionerGroupPattern(systemName: String): String = {
+    val patternKey = HdfsConfig.CONSUMER_PARTITIONER_GROUP_PATTERN.format(systemName)
+    getOrElse(patternKey, HdfsConfig.CONSUMER_PARTITIONER_GROUP_PATTERN_DEFAULT)
+  }
+
+  /**
+   * Name of the file reader type (avro, plain, etc.) configured for the system;
+   * defaults to FILE_READER_TYPE_DEFAULT.
+   */
+  def getFileReaderType(systemName: String): String = {
+    val readerKey = HdfsConfig.FILE_READER_TYPE.format(systemName)
+    getOrElse(readerKey, HdfsConfig.FILE_READER_TYPE_DEFAULT)
+  }
+
+  /**
+   * Staging directory for storing partition description. If not set, will use the staging directory set
+   * by yarn job.
+   *
+   * NOTE: HdfsConfig.STAGING_DIRECTORY is a "systems.%s.stagingDirectory" template, but this overload looks
+   * it up without substituting a system name, so it only matches a key that literally contains "%s".
+   * Prefer getStagingDirectory(systemName). Behavior kept unchanged for existing callers.
+   */
+  def getStagingDirectory(): String = {
+    getOrElse(HdfsConfig.STAGING_DIRECTORY, getOrElse(YarnConfig.YARN_JOB_STAGING_DIRECTORY, HdfsConfig.STAGING_DIRECTORY_DEFAULT))
+  }
+
+  /**
+   * Staging directory for storing partition description of the given system
+   * (systems.<systemName>.stagingDirectory). If not set, falls back to the staging directory set by
+   * the yarn job, and then to STAGING_DIRECTORY_DEFAULT.
+   */
+  def getStagingDirectory(systemName: String): String = {
+    getOrElse(HdfsConfig.STAGING_DIRECTORY format systemName,
+      getOrElse(YarnConfig.YARN_JOB_STAGING_DIRECTORY, HdfsConfig.STAGING_DIRECTORY_DEFAULT))
+  }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemAdmin.scala b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemAdmin.scala
deleted file mode 100644
index 92eb447..0000000
--- a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemAdmin.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.system.hdfs
-
-
-import org.apache.samza.util.Logging
-import org.apache.samza.system.{SystemAdmin, SystemStreamMetadata, SystemStreamPartition}
-
-
-class HdfsSystemAdmin extends SystemAdmin with Logging {
-  // Stub admin: offset and metadata queries return empty maps; every other operation
-  // throws UnsupportedOperationException.
-
- def getOffsetsAfter(offsets: java.util.Map[SystemStreamPartition, String]) = {
- new java.util.HashMap[SystemStreamPartition, String]()
- }
-
- def getSystemStreamMetadata(streamNames: java.util.Set[String]) = {
- new java.util.HashMap[String, SystemStreamMetadata]()
- }
-
- def createChangelogStream(topicName: String, numOfChangeLogPartitions: Int) = {
- throw new UnsupportedOperationException("Method not implemented.")
- }
-
- def validateChangelogStream(topicName: String, numOfChangeLogPartitions: Int) = {
- throw new UnsupportedOperationException("Method not implemented.")
- }
-
- def createCoordinatorStream(streamName: String) {
- throw new UnsupportedOperationException("Method not implemented.")
- }
-
-  // Offsets cannot be compared by this stub admin.
- def offsetComparator(offset1: String, offset2: String) = {
- throw new UnsupportedOperationException("Method not implemented.")
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/2216fe0b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemFactory.scala b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemFactory.scala
index ef3c20a..3673431 100644
--- a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemFactory.scala
+++ b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemFactory.scala
@@ -1,45 +1,45 @@
- /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
package org.apache.samza.system.hdfs
-import org.apache.samza.SamzaException
-
import org.apache.samza.config.Config
import org.apache.samza.metrics.MetricsRegistry
import org.apache.samza.system.SystemFactory
-import org.apache.samza.util.{KafkaUtil,Logging}
+import org.apache.samza.system.hdfs.HdfsSystemConsumer.HdfsSystemConsumerMetrics
+import org.apache.samza.util.{KafkaUtil, Logging}
 class HdfsSystemFactory extends SystemFactory with Logging {
+  /** Creates the HDFS system consumer, registering its metrics in the given registry. */
 def getConsumer(systemName: String, config: Config, registry: MetricsRegistry) = {
- throw new SamzaException("HdfsSystemFactory does not implement a consumer")
+ new HdfsSystemConsumer(systemName, config, new HdfsSystemConsumerMetrics(registry))
 }
+  /** Creates the HDFS system producer; its client id is currently obtained through KafkaUtil. */
 def getProducer(systemName: String, config: Config, registry: MetricsRegistry) = {
+ // TODO: SAMZA-1026: should remove Kafka dependency below
 val clientId = KafkaUtil.getClientId("samza-producer", config)
 val metrics = new HdfsSystemProducerMetrics(systemName, registry)
 new HdfsSystemProducer(systemName, clientId, config, metrics)
 }
+  /** Creates the HDFS system admin for the given system. */
 def getAdmin(systemName: String, config: Config) = {
- new HdfsSystemAdmin
+ new HdfsSystemAdmin(systemName, config)
 }
 }