Posted to commits@samza.apache.org by lh...@apache.org on 2018/12/14 22:36:05 UTC
samza git commit: SAMZA-2041: add hdfs and kinesis descriptor
Repository: samza
Updated Branches:
refs/heads/master 85830be9c -> c5348bf6b
SAMZA-2041: add hdfs and kinesis descriptor
Author: Hai Lu <ha...@linkedin.com>
Reviewers: Xinyu Liu <xi...@linkedin.com>
Closes #857 from lhaiesp/master
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/c5348bf6
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/c5348bf6
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/c5348bf6
Branch: refs/heads/master
Commit: c5348bf6b1fcaf7f9db841b815b6b5eeb9937395
Parents: 85830be
Author: Hai Lu <ha...@linkedin.com>
Authored: Fri Dec 14 14:35:45 2018 -0800
Committer: Hai Lu <ha...@linkedin.com>
Committed: Fri Dec 14 14:35:45 2018 -0800
----------------------------------------------------------------------
.../documentation/versioned/connectors/hdfs.md | 48 ++--
.../versioned/connectors/kinesis.md | 102 +++++---
.../samza/system/kinesis/KinesisConfig.java | 22 +-
.../descriptors/KinesisInputDescriptor.java | 123 +++++++++
.../descriptors/KinesisSystemDescriptor.java | 139 ++++++++++
.../descriptors/TestKinesisInputDescriptor.java | 58 +++++
.../TestKinesisSystemDescriptor.java | 57 +++++
.../hdfs/descriptors/HdfsInputDescriptor.java | 48 ++++
.../hdfs/descriptors/HdfsOutputDescriptor.java | 46 ++++
.../hdfs/descriptors/HdfsSystemDescriptor.java | 255 +++++++++++++++++++
.../descriptors/TestHdfsSystemDescriptor.java | 55 ++++
11 files changed, 883 insertions(+), 70 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/c5348bf6/docs/learn/documentation/versioned/connectors/hdfs.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/connectors/hdfs.md b/docs/learn/documentation/versioned/connectors/hdfs.md
index ece7bbf..822be7e 100644
--- a/docs/learn/documentation/versioned/connectors/hdfs.md
+++ b/docs/learn/documentation/versioned/connectors/hdfs.md
@@ -47,14 +47,11 @@ While streaming sources like Kafka are unbounded, files on HDFS have finite data
#### Defining streams
-Samza uses the notion of a _system_ to describe any I/O source it interacts with. To consume from HDFS, you should create a new system that points to - `HdfsSystemFactory`. You can then associate multiple streams with this _system_. Each stream should have a _physical name_, which should be set to the name of the directory on HDFS.
-
-{% highlight jproperties %}
-systems.hdfs.samza.factory=org.apache.samza.system.hdfs.HdfsSystemFactory
-
-streams.hdfs-clickstream.samza.system=hdfs
-streams.hdfs-clickstream.samza.physical.name=hdfs:/data/clickstream/2016/09/11
+In the Samza high-level API, you can use `HdfsSystemDescriptor` to create an HDFS system. The stream name should be set to the name of the directory on HDFS.
+{% highlight java %}
+HdfsSystemDescriptor hsd = new HdfsSystemDescriptor("hdfs-clickstream");
+HdfsInputDescriptor hid = hsd.getInputDescriptor("/data/clickstream/2016/09/11");
{% endhighlight %}
The above example defines a system called `hdfs-clickstream` with an input stream that reads data from the `/data/clickstream/2016/09/11` directory.
@@ -62,9 +59,10 @@ The above example defines a stream called `hdfs-clickstream` that reads data fro
#### Whitelists & Blacklists
If you only want to consume from files that match a certain pattern, you can configure a whitelist. Likewise, you can also blacklist consuming from certain files. When both are specified, the _whitelist_ selects the files to be filtered and the _blacklist_ is later applied on its results.
-{% highlight jproperties %}
-systems.hdfs.partitioner.defaultPartitioner.whitelist=.*avro
-systems.hdfs.partitioner.defaultPartitioner.blacklist=somefile.avro
+{% highlight java %}
+HdfsSystemDescriptor hsd = new HdfsSystemDescriptor("hdfs-clickstream")
+ .withConsumerWhiteList(".*avro")
+ .withConsumerBlackList("somefile.avro");
{% endhighlight %}
@@ -74,34 +72,34 @@ systems.hdfs.partitioner.defaultPartitioner.blacklist=somefile.avro
Samza allows writing your output results to HDFS in AVRO format. You can either use avro's GenericRecords or have Samza automatically infer the schema for your object using reflection.
-{% highlight jproperties %}
-# set the SystemFactory implementation to instantiate HdfsSystemProducer aliased to 'hdfs'
-systems.hdfs.samza.factory=org.apache.samza.system.hdfs.HdfsSystemFactory
-systems.hdfs.producer.hdfs.writer.class=org.apache.samza.system.hdfs.writer.AvroDataFileHdfsWriter
+{% highlight java %}
+HdfsSystemDescriptor hsd = new HdfsSystemDescriptor("hdfs-clickstream")
+ .withWriterClassName(AvroDataFileHdfsWriter.class.getName());
{% endhighlight %}
-If your output is non-avro, you can describe its format by implementing your own serializer.
-{% highlight jproperties %}
-systems.hdfs.producer.hdfs.writer.class=org.apache.samza.system.hdfs.writer.TextSequenceFileHdfsWriter
-serializers.registry.my-serde-name.class=MySerdeFactory
-systems.hdfs.samza.msg.serde=my-serde-name
+If your output is non-avro, use `TextSequenceFileHdfsWriter`.
+{% highlight java %}
+HdfsSystemDescriptor hsd = new HdfsSystemDescriptor("hdfs-clickstream")
+ .withWriterClassName(TextSequenceFileHdfsWriter.class.getName());
{% endhighlight %}
#### Output directory structure
Samza allows you to control the base HDFS directory to write your output. You can also organize the output into sub-directories depending on the time your application ran, by configuring a date-formatter.
-{% highlight jproperties %}
-systems.hdfs.producer.hdfs.base.output.dir=/user/me/analytics/clickstream_data
-systems.hdfs.producer.hdfs.bucketer.date.path.format=yyyy_MM_dd
+{% highlight java %}
+HdfsSystemDescriptor hsd = new HdfsSystemDescriptor("hdfs-clickstream")
+ .withOutputBaseDir("/user/me/analytics/clickstream_data")
+ .withDatePathFormat("yyyy_MM_dd");
{% endhighlight %}
You can configure the maximum size of each file or the maximum number of records per file. Once either limit has been reached, Samza will create a new file.
-{% highlight jproperties %}
-systems.hdfs.producer.hdfs.write.batch.size.bytes=134217728
-systems.hdfs.producer.hdfs.write.batch.size.records=10000
+{% highlight java %}
+HdfsSystemDescriptor hsd = new HdfsSystemDescriptor("hdfs-clickstream")
+ .withWriteBatchSizeBytes(134217728)
+ .withWriteBatchSizeRecords(10000);
{% endhighlight %}
### Security
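Note on the whitelist/blacklist wording in hdfs.md above: the sketch below illustrates "the whitelist selects, the blacklist is then applied to its results" with plain `java.util.regex`. It is not Samza's partitioner code. The class `FileFilterSketch`, the method `select`, and the sample file names are hypothetical, and full-string matching is an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

public class FileFilterSketch {
  // Keeps names that fully match the whitelist, then drops names that fully match the blacklist.
  // Full-string matching is an assumption made for this illustration.
  public static List<String> select(List<String> fileNames, String whitelist, String blacklist) {
    Pattern allow = Pattern.compile(whitelist);
    Pattern deny = Pattern.compile(blacklist);
    List<String> selected = new ArrayList<>();
    for (String name : fileNames) {
      if (allow.matcher(name).matches() && !deny.matcher(name).matches()) {
        selected.add(name);
      }
    }
    return selected;
  }

  public static void main(String[] args) {
    // Hypothetical directory listing.
    List<String> files = Arrays.asList("part-0.avro", "somefile.avro", "notes.txt");
    // Same patterns as the documentation example.
    System.out.println(select(files, ".*avro", "somefile.avro")); // prints [part-0.avro]
  }
}
```

With the patterns from the documentation, `notes.txt` never passes the whitelist and `somefile.avro` is removed by the blacklist.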
http://git-wip-us.apache.org/repos/asf/samza/blob/c5348bf6/docs/learn/documentation/versioned/connectors/kinesis.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/connectors/kinesis.md b/docs/learn/documentation/versioned/connectors/kinesis.md
index e319e92..57dae9c 100644
--- a/docs/learn/documentation/versioned/connectors/kinesis.md
+++ b/docs/learn/documentation/versioned/connectors/kinesis.md
@@ -36,22 +36,16 @@ wraps the Record into a [KinesisIncomingMessageEnvelope](https://github.com/apac
#### Basic Configuration
-Here is the required configuration for consuming messages from Kinesis.
-
-{% highlight jproperties %}
-// Define a Kinesis system factory with your identifier. eg: kinesis-system
-systems.kinesis-system.samza.factory=org.apache.samza.system.kinesis.KinesisSystemFactory
-
-// Kinesis consumer works with only AllSspToSingleTaskGrouperFactory
-job.systemstreampartition.grouper.factory=org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory
-
-// Define your streams
-task.inputs=kinesis-system.input0
-
-// Define required properties for your streams
-systems.kinesis-system.streams.input0.aws.region=YOUR-STREAM-REGION
-systems.kinesis-system.streams.input0.aws.accessKey=YOUR-ACCESS_KEY
-sensitive.systems.kinesis-system.streams.input0.aws.secretKey=YOUR-SECRET-KEY
+Here is the required configuration for consuming messages from Kinesis, through `KinesisSystemDescriptor` and `KinesisInputDescriptor`.
+
+{% highlight java %}
+KinesisSystemDescriptor ksd = new KinesisSystemDescriptor("kinesis");
+
+KinesisInputDescriptor<KV<String, byte[]>> kid =
+ ksd.getInputDescriptor("STREAM-NAME", new NoOpSerde<byte[]>())
+ .withRegion("STREAM-REGION")
+ .withAccessKey("YOUR-ACCESS_KEY")
+ .withSecretKey("YOUR-SECRET-KEY");
{% endhighlight %}
#### Coordination
@@ -66,10 +60,12 @@ job.systemstreampartition.grouper.factory=org.apache.samza.container.grouper.str
Each Kinesis stream in a given AWS [region](https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/Concepts.RegionsAndAvailabilityZones.html) can be accessed by providing an [access key](https://docs.aws.amazon.com/general/latest/gr/aws-sec-cred-types.html#access-keys-and-secret-access-keys). An Access key consists of two parts: an access key ID (for example, `AKIAIOSFODNN7EXAMPLE`) and a secret access key (for example, `wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY`) which you can use to send programmatic requests to AWS.
-{% highlight jproperties %}
-systems.kinesis-system.streams.input0.aws.region=YOUR-STREAM-REGION
-systems.kinesis-system.streams.input0.aws.accessKey=YOUR-ACCESS_KEY
-sensitive.systems.kinesis-system.streams.input0.aws.secretKey=YOUR-SECRET-KEY
+{% highlight java %}
+KinesisInputDescriptor<KV<String, byte[]>> kid =
+ ksd.getInputDescriptor("STREAM-NAME", new NoOpSerde<byte[]>())
+ .withRegion("STREAM-REGION")
+ .withAccessKey("YOUR-ACCESS_KEY")
+ .withSecretKey("YOUR-SECRET-KEY");
{% endhighlight %}
### Advanced Configuration
@@ -77,29 +73,44 @@ sensitive.systems.kinesis-system.streams.input0.aws.secretKey=YOUR-SECRET-KEY
#### Kinesis Client Library Configs
Samza Kinesis Connector uses the [Kinesis Client Library](https://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-kcl.html#kinesis-record-processor-overview-kcl)
(KCL) to access the Kinesis data streams. You can set any [KCL Configuration](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/coordinator/KinesisClientLibConfiguration.java)
-for a stream by configuring it with the **systems.system-name.streams.stream-name.aws.kcl.*** prefix.
+for a stream by configuring it through `KinesisInputDescriptor`.
-{% highlight jproperties %}
-systems.system-name.streams.stream-name.aws.kcl.CONFIG-PARAM=CONFIG-VALUE
+{% highlight java %}
+KinesisInputDescriptor<KV<String, byte[]>> kid = ...
+
+Map<String, String> kclConfig = new HashMap<>();
+kclConfig.put("CONFIG-PARAM", "CONFIG-VALUE");
+
+kid.withKCLConfig(kclConfig);
{% endhighlight %}
As an example, the below configuration is equivalent to invoking `kclClient#WithTableName(myTable)` on the KCL instance.
-{% highlight jproperties %}
-systems.system-name.streams.stream-name.aws.kcl.TableName=myTable
+{% highlight java %}
+KinesisInputDescriptor<KV<String, byte[]>> kid = ...
+
+Map<String, String> kclConfig = new HashMap<>();
+kclConfig.put("TableName", "myTable");
+
+kid.withKCLConfig(kclConfig);
{% endhighlight %}
#### AWS Client configs
Samza allows you to specify any [AWS client configs](http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/ClientConfiguration.html) to connect to your Kinesis instance.
-You can configure any [AWS client configuration](http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/ClientConfiguration.html) with the `systems.your-system-name.aws.clientConfig.*` prefix.
+You can configure any [AWS client configuration](http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/ClientConfiguration.html) through `KinesisSystemDescriptor`.
-{% highlight jproperties %}
-systems.system-name.aws.clientConfig.CONFIG-PARAM=CONFIG-VALUE
+{% highlight java %}
+Map<String, String> awsConfig = new HashMap<>();
+awsConfig.put("CONFIG-PARAM", "CONFIG-VALUE");
+
+KinesisSystemDescriptor sd = new KinesisSystemDescriptor(systemName)
+ .withAWSConfig(awsConfig);
{% endhighlight %}
-As an example, to set the *proxy host* and *proxy port* to be used by the Kinesis Client:
-{% highlight jproperties %}
-systems.system-name.aws.clientConfig.ProxyHost=my-proxy-host.com
-systems.system-name.aws.clientConfig.ProxyPort=my-proxy-port
+Through `KinesisSystemDescriptor` you can also set the *proxy host* and *proxy port* to be used by the Kinesis Client:
+{% highlight java %}
+KinesisSystemDescriptor sd = new KinesisSystemDescriptor(systemName)
+ .withProxyHost("YOUR-PROXY-HOST")
+ .withProxyPort(YOUR_PROXY_PORT); // int placeholder
{% endhighlight %}
### Resetting Offsets
@@ -109,14 +120,37 @@ These checkpoints are stored and managed by the KCL library internally. You can
{% highlight jproperties %}
// change the TableName to a unique name to reset checkpoints.
-systems.kinesis-system.streams.input0.aws.kcl.TableName=my-app-table-name
+systems.kinesis-system.streams.STREAM-NAME.aws.kcl.TableName=my-app-table-name
+{% endhighlight %}
+
+Or through `KinesisInputDescriptor`:
+
+{% highlight java %}
+KinesisInputDescriptor<KV<String, byte[]>> kid = ...
+
+Map<String, String> kclConfig = new HashMap<>();
+kclConfig.put("TableName", "my-new-app-table-name");
+
+kid.withKCLConfig(kclConfig);
{% endhighlight %}
+
When you reset checkpoints, you can configure your job to start consuming from either the earliest or latest offset in the stream.
{% highlight jproperties %}
// set the starting position to either TRIM_HORIZON (oldest) or LATEST (latest)
-systems.kinesis-system.streams.input0.aws.kcl.InitialPositionInStream=LATEST
+systems.kinesis-system.streams.STREAM-NAME.aws.kcl.InitialPositionInStream=LATEST
+{% endhighlight %}
+
+Or through `KinesisInputDescriptor`:
+
+{% highlight java %}
+KinesisInputDescriptor<KV<String, byte[]>> kid = ...
+
+Map<String, String> kclConfig = new HashMap<>();
+kclConfig.put("InitialPositionInStream", "LATEST");
+
+kid.withKCLConfig(kclConfig);
{% endhighlight %}
Alternatively, if you want to start from a particular offset in the Kinesis stream, you can log in to the [AWS console](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ConsoleDynamoDB.html) and edit the offsets in your DynamoDB Table.
http://git-wip-us.apache.org/repos/asf/samza/blob/c5348bf6/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisConfig.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisConfig.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisConfig.java
index d11096f..e0c9099 100644
--- a/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisConfig.java
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisConfig.java
@@ -54,20 +54,20 @@ import com.amazonaws.ClientConfiguration;
public class KinesisConfig extends MapConfig {
private static final Logger LOG = LoggerFactory.getLogger(KinesisConfig.class.getName());
- private static final String CONFIG_SYSTEM_REGION = "systems.%s.aws.region";
- private static final String CONFIG_STREAM_REGION = "systems.%s.streams.%s.aws.region";
+ public static final String CONFIG_SYSTEM_REGION = "systems.%s.aws.region";
+ public static final String CONFIG_STREAM_REGION = "systems.%s.streams.%s.aws.region";
- private static final String CONFIG_STREAM_ACCESS_KEY = "systems.%s.streams.%s.aws.accessKey";
- private static final String CONFIG_STREAM_SECRET_KEY = "sensitive.systems.%s.streams.%s.aws.secretKey";
+ public static final String CONFIG_STREAM_ACCESS_KEY = "systems.%s.streams.%s.aws.accessKey";
+ public static final String CONFIG_STREAM_SECRET_KEY = "sensitive.systems.%s.streams.%s.aws.secretKey";
- private static final String CONFIG_AWS_CLIENT_CONFIG = "systems.%s.aws.clientConfig.";
- private static final String CONFIG_PROXY_HOST = CONFIG_AWS_CLIENT_CONFIG + "ProxyHost";
- private static final String DEFAULT_CONFIG_PROXY_HOST = "";
- private static final String CONFIG_PROXY_PORT = CONFIG_AWS_CLIENT_CONFIG + "ProxyPort";
- private static final int DEFAULT_CONFIG_PROXY_PORT = 0;
+ public static final String CONFIG_AWS_CLIENT_CONFIG = "systems.%s.aws.clientConfig.";
+ public static final String CONFIG_PROXY_HOST = CONFIG_AWS_CLIENT_CONFIG + "ProxyHost";
+ public static final String DEFAULT_CONFIG_PROXY_HOST = "";
+ public static final String CONFIG_PROXY_PORT = CONFIG_AWS_CLIENT_CONFIG + "ProxyPort";
+ public static final int DEFAULT_CONFIG_PROXY_PORT = 0;
- private static final String CONFIG_SYSTEM_KINESIS_CLIENT_LIB_CONFIG = "systems.%s.aws.kcl.";
- private static final String CONFIG_STREAM_KINESIS_CLIENT_LIB_CONFIG = "systems.%s.streams.%s.aws.kcl.";
+ public static final String CONFIG_SYSTEM_KINESIS_CLIENT_LIB_CONFIG = "systems.%s.aws.kcl.";
+ public static final String CONFIG_STREAM_KINESIS_CLIENT_LIB_CONFIG = "systems.%s.streams.%s.aws.kcl.";
public KinesisConfig(Config config) {
super(config);
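Note on the constants made public above: they are `String.format` templates that take the system name and, for stream-level keys, the stream name. The sketch below copies three of the templates from the diff and expands them; the class `KinesisKeySketch` and the helper `streamKey` are hypothetical, and the names `kinesis-system` and `input0` are taken from the old properties example in kinesis.md.

```java
public class KinesisKeySketch {
  // Templates copied verbatim from the KinesisConfig constants shown in the diff.
  public static final String STREAM_REGION = "systems.%s.streams.%s.aws.region";
  public static final String STREAM_SECRET_KEY = "sensitive.systems.%s.streams.%s.aws.secretKey";
  public static final String STREAM_KCL_PREFIX = "systems.%s.streams.%s.aws.kcl.";

  // Hypothetical helper: fills a stream-level template with a system name and a stream name.
  public static String streamKey(String template, String system, String stream) {
    return String.format(template, system, stream);
  }

  public static void main(String[] args) {
    // prints systems.kinesis-system.streams.input0.aws.region
    System.out.println(streamKey(STREAM_REGION, "kinesis-system", "input0"));
    // prints sensitive.systems.kinesis-system.streams.input0.aws.secretKey
    System.out.println(streamKey(STREAM_SECRET_KEY, "kinesis-system", "input0"));
    // prints systems.kinesis-system.streams.input0.aws.kcl.TableName
    System.out.println(streamKey(STREAM_KCL_PREFIX, "kinesis-system", "input0") + "TableName");
  }
}
```

The expanded keys are the same property names that the removed `jproperties` examples in kinesis.md spelled out by hand.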
http://git-wip-us.apache.org/repos/asf/samza/blob/c5348bf6/samza-aws/src/main/java/org/apache/samza/system/kinesis/descriptors/KinesisInputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/descriptors/KinesisInputDescriptor.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/descriptors/KinesisInputDescriptor.java
new file mode 100644
index 0000000..1c2e0a2
--- /dev/null
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/descriptors/KinesisInputDescriptor.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.kinesis.descriptors;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.system.descriptors.InputDescriptor;
+import org.apache.samza.system.descriptors.SystemDescriptor;
+import org.apache.samza.system.kinesis.KinesisConfig;
+
+
+/**
+ * A {@link KinesisInputDescriptor} can be used for specifying Samza and Kinesis specific properties of Kinesis
+ * input streams.
+ * <p>
+ * Use {@link KinesisSystemDescriptor#getInputDescriptor} to obtain an instance of this descriptor.
+ * <p>
+ * Stream properties provided in configuration override corresponding properties specified using a descriptor.
+ *
+ * @param <StreamMessageType> type of messages in this stream
+ */
+public class KinesisInputDescriptor<StreamMessageType>
+ extends InputDescriptor<StreamMessageType, KinesisInputDescriptor<StreamMessageType>> {
+ private Optional<String> accessKey = Optional.empty();
+ private Optional<String> secretKey = Optional.empty();
+ private Optional<String> region = Optional.empty();
+ private Map<String, String> kclConfig = Collections.emptyMap();
+
+
+ /**
+ * Constructs an {@link InputDescriptor} instance.
+ *
+ * @param streamId id of the stream
+ * @param valueSerde serde for the values in the messages in the stream
+ * @param systemDescriptor system descriptor this stream descriptor was obtained from
+ */
+ <T> KinesisInputDescriptor(String streamId, Serde<T> valueSerde, SystemDescriptor systemDescriptor) {
+ super(streamId, KVSerde.of(new NoOpSerde<>(), valueSerde), systemDescriptor, null);
+ }
+
+ /**
+ * Kinesis region for the system stream.
+ * @param region Kinesis region
+ * @return this input descriptor
+ */
+ public KinesisInputDescriptor<StreamMessageType> withRegion(String region) {
+ this.region = Optional.of(StringUtils.stripToNull(region));
+ return this;
+ }
+
+ /**
+ * Kinesis access key name for the system stream.
+ * @param accessKey Kinesis access key name
+ * @return this input descriptor
+ */
+ public KinesisInputDescriptor<StreamMessageType> withAccessKey(String accessKey) {
+ this.accessKey = Optional.of(StringUtils.stripToNull(accessKey));
+ return this;
+ }
+
+ /**
+ * Kinesis secret key name for the system stream.
+ * @param secretKey Kinesis secret key
+ * @return this input descriptor
+ */
+ public KinesisInputDescriptor<StreamMessageType> withSecretKey(String secretKey) {
+ this.secretKey = Optional.of(StringUtils.stripToNull(secretKey));
+ return this;
+ }
+
+ /**
+ * KCL (Kinesis Client Library) config for the system stream. This is not required by default.
+ * @param kclConfig A map of specified KCL configs
+ * @return this input descriptor
+ */
+ public KinesisInputDescriptor<StreamMessageType> withKCLConfig(Map<String, String> kclConfig) {
+ this.kclConfig = kclConfig;
+ return this;
+ }
+
+ @Override
+ public Map<String, String> toConfig() {
+ Map<String, String> config = new HashMap<>(super.toConfig());
+
+ String systemName = getSystemName();
+ String streamId = getStreamId();
+ String clientConfigPrefix =
+ String.format(KinesisConfig.CONFIG_STREAM_KINESIS_CLIENT_LIB_CONFIG, systemName, streamId);
+
+ this.region.ifPresent(
+ val -> config.put(String.format(KinesisConfig.CONFIG_STREAM_REGION, systemName, streamId), val));
+ this.accessKey.ifPresent(
+ val -> config.put(String.format(KinesisConfig.CONFIG_STREAM_ACCESS_KEY, systemName, streamId), val));
+ this.secretKey.ifPresent(
+ val -> config.put(String.format(KinesisConfig.CONFIG_STREAM_SECRET_KEY, systemName, streamId), val));
+ this.kclConfig.forEach((k, v) -> config.put(clientConfigPrefix + k, v));
+
+ return config;
+ }
+}
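Note on the `withRegion`, `withAccessKey` and `withSecretKey` setters above: they wrap `StringUtils.stripToNull(...)` in `Optional.of(...)`. `stripToNull` returns null for null or blank input, and `Optional.of(null)` throws `NullPointerException`, so a blank argument would fail at the setter instead of being treated as unset. The sketch below shows that behaviour with the JDK only. The class `BlankArgumentSketch` is hypothetical, its `stripToNull` is a simplified stand-in for the commons-lang3 method (it uses `trim()`), and `us-west-2` is just a sample value.

```java
import java.util.Optional;

public class BlankArgumentSketch {
  // Simplified stand-in for commons-lang3 StringUtils.stripToNull:
  // returns null for null or blank input, otherwise the trimmed string.
  public static String stripToNull(String s) {
    if (s == null) {
      return null;
    }
    String trimmed = s.trim();
    return trimmed.isEmpty() ? null : trimmed;
  }

  // Mirrors the setter pattern Optional.of(stripToNull(value)) and reports whether it throws.
  public static boolean throwsOnValue(String value) {
    try {
      Optional.of(stripToNull(value));
      return false;
    } catch (NullPointerException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println(throwsOnValue("us-west-2")); // prints false
    System.out.println(throwsOnValue("   "));       // prints true
    // Optional.ofNullable maps the same blank input to an empty Optional instead.
    System.out.println(Optional.ofNullable(stripToNull("   ")).isPresent()); // prints false
  }
}
```

Whether failing fast on blank input is intended is not stated in the commit; `Optional.ofNullable` would be the alternative if blank values should simply be ignored.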
http://git-wip-us.apache.org/repos/asf/samza/blob/c5348bf6/samza-aws/src/main/java/org/apache/samza/system/kinesis/descriptors/KinesisSystemDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/descriptors/KinesisSystemDescriptor.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/descriptors/KinesisSystemDescriptor.java
new file mode 100644
index 0000000..ffeb667
--- /dev/null
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/descriptors/KinesisSystemDescriptor.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.kinesis.descriptors;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.operators.KV;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.system.descriptors.SystemDescriptor;
+import org.apache.samza.system.kinesis.KinesisConfig;
+import org.apache.samza.system.kinesis.KinesisSystemFactory;
+
+
+/**
+ * A {@link KinesisSystemDescriptor} can be used for specifying Samza and Kinesis-specific properties of a Kinesis
+ * input system. It can also be used for obtaining {@link KinesisInputDescriptor}s,
+ * which can be used for specifying Samza and system-specific properties of Kinesis input streams.
+ * <p>
+ * System properties provided in configuration override corresponding properties specified using a descriptor.
+ */
+public class KinesisSystemDescriptor extends SystemDescriptor<KinesisSystemDescriptor> {
+ private static final String FACTORY_CLASS_NAME = KinesisSystemFactory.class.getName();
+
+ private Optional<String> region = Optional.empty();
+ private Optional<String> proxyHost = Optional.empty();
+ private Optional<Integer> proxyPort = Optional.empty();
+ private Map<String, String> awsConfig = Collections.emptyMap();
+ private Map<String, String> kclConfig = Collections.emptyMap();
+
+ public KinesisSystemDescriptor(String systemName) {
+ super(systemName, FACTORY_CLASS_NAME, null, null);
+ }
+
+ /**
+ * Gets a {@link KinesisInputDescriptor} for the input stream of this system.
+ * <p>
+ * The message in the stream will have {@link String} keys and {@code ValueType} values.
+ *
+ * @param streamId id of the input stream
+ * @param valueSerde stream level serde for the values in the messages in the input stream
+ * @param <ValueType> type of the value in the messages in this stream
+ * @return a {@link KinesisInputDescriptor} for the Kinesis input stream
+ */
+ public <ValueType> KinesisInputDescriptor<KV<String, ValueType>> getInputDescriptor(String streamId,
+ Serde<ValueType> valueSerde) {
+ return new KinesisInputDescriptor<>(streamId, valueSerde, this);
+ }
+
+ /**
+ * Kinesis region for this system.
+ * @param region Kinesis region
+ * @return this system descriptor
+ */
+ public KinesisSystemDescriptor withRegion(String region) {
+ this.region = Optional.of(StringUtils.stripToNull(region));
+ return this;
+ }
+
+ /**
+ * AWS config for this system. This is not required by default.
+ * @param awsConfig A map of specified AWS configs
+ * @return this system descriptor
+ */
+ public KinesisSystemDescriptor withAWSConfig(Map<String, String> awsConfig) {
+ this.awsConfig = awsConfig;
+ return this;
+ }
+
+ /**
+ * KCL (Kinesis Client Library) config for this system. This is not required by default.
+ * @param kclConfig A map of specified KCL configs
+ * @return this system descriptor
+ */
+ public KinesisSystemDescriptor withKCLConfig(Map<String, String> kclConfig) {
+ this.kclConfig = kclConfig;
+ return this;
+ }
+
+ /**
+ * Proxy host to be used for this system.
+ * @param proxyHost Proxy host
+ * @return this system descriptor
+ */
+ public KinesisSystemDescriptor withProxyHost(String proxyHost) {
+ this.proxyHost = Optional.of(StringUtils.stripToNull(proxyHost));
+ return this;
+ }
+
+ /**
+ * Proxy port to be used for this system.
+ * @param proxyPort Proxy port
+ * @return this system descriptor
+ */
+ public KinesisSystemDescriptor withProxyPort(int proxyPort) {
+ this.proxyPort = Optional.of(proxyPort);
+ return this;
+ }
+
+ @Override
+ public Map<String, String> toConfig() {
+ Map<String, String> config = new HashMap<>(super.toConfig());
+ String systemName = getSystemName();
+
+ this.region.ifPresent(
+ val -> config.put(String.format(KinesisConfig.CONFIG_SYSTEM_REGION, systemName), val));
+ this.proxyHost.ifPresent(
+ val -> config.put(String.format(KinesisConfig.CONFIG_PROXY_HOST, systemName), val));
+ this.proxyPort.ifPresent(
+ val -> config.put(String.format(KinesisConfig.CONFIG_PROXY_PORT, systemName), String.valueOf(val)));
+
+ final String kclConfigPrefix = String.format(KinesisConfig.CONFIG_SYSTEM_KINESIS_CLIENT_LIB_CONFIG, systemName);
+ this.kclConfig.forEach((k, v) -> config.put(kclConfigPrefix + k, v));
+
+ final String awsConfigPrefix = String.format(KinesisConfig.CONFIG_AWS_CLIENT_CONFIG, systemName);
+ this.awsConfig.forEach((k, v) -> config.put(awsConfigPrefix + k, v));
+
+ return config;
+ }
+}
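Note on `KinesisSystemDescriptor#toConfig` above: the AWS and KCL maps are flattened by prepending a per-system prefix to each entry's key. The sketch below reproduces only that prefixing step with the JDK. The class `SystemConfigSketch` and the helper `prefixed` are hypothetical; the two templates are copied from `KinesisConfig`, and the proxy host value comes from the old properties example.

```java
import java.util.HashMap;
import java.util.Map;

public class SystemConfigSketch {
  // Templates copied from KinesisConfig in this commit.
  public static final String AWS_CLIENT_CONFIG = "systems.%s.aws.clientConfig.";
  public static final String KCL_CONFIG = "systems.%s.aws.kcl.";

  // Hypothetical helper: expands the template for one system and prepends it to every key.
  public static Map<String, String> prefixed(String template, String systemName, Map<String, String> entries) {
    String prefix = String.format(template, systemName);
    Map<String, String> out = new HashMap<>();
    entries.forEach((k, v) -> out.put(prefix + k, v));
    return out;
  }

  public static void main(String[] args) {
    Map<String, String> aws = new HashMap<>();
    aws.put("ProxyHost", "my-proxy-host.com");
    // prints {systems.kinesis.aws.clientConfig.ProxyHost=my-proxy-host.com}
    System.out.println(prefixed(AWS_CLIENT_CONFIG, "kinesis", aws));
  }
}
```

Because `CONFIG_PROXY_HOST` is defined as `CONFIG_AWS_CLIENT_CONFIG + "ProxyHost"`, a `ProxyHost` entry passed through `withAWSConfig` and a value passed through `withProxyHost` resolve to the same key. In `toConfig()` as shown above, the `awsConfig` entries are written last.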
http://git-wip-us.apache.org/repos/asf/samza/blob/c5348bf6/samza-aws/src/test/java/org/apache/samza/system/kinesis/descriptors/TestKinesisInputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/test/java/org/apache/samza/system/kinesis/descriptors/TestKinesisInputDescriptor.java b/samza-aws/src/test/java/org/apache/samza/system/kinesis/descriptors/TestKinesisInputDescriptor.java
new file mode 100644
index 0000000..f0a8a3b
--- /dev/null
+++ b/samza-aws/src/test/java/org/apache/samza/system/kinesis/descriptors/TestKinesisInputDescriptor.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.kinesis.descriptors;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.samza.operators.KV;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.system.kinesis.KinesisConfig;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class TestKinesisInputDescriptor {
+ @Test
+ public void testConfigGeneration() {
+ String systemName = "kinesis";
+ String streamName = "Seine";
+ KinesisSystemDescriptor sd = new KinesisSystemDescriptor(systemName);
+ Map<String, String> cliConfig = new HashMap<>();
+ cliConfig.put("key1", "value1");
+ KinesisInputDescriptor<KV<String, byte[]>> id = sd.getInputDescriptor(streamName, new NoOpSerde<byte[]>())
+ .withRegion("Paris")
+ .withAccessKey("accessKey")
+ .withSecretKey("secretKey")
+ .withKCLConfig(cliConfig);
+
+ Map<String, String> generatedConfig = id.toConfig();
+ Assert.assertEquals(5, generatedConfig.size());
+
+ Assert.assertEquals(systemName, generatedConfig.get("streams.Seine.samza.system"));
+ Assert.assertEquals("Paris",
+ generatedConfig.get(String.format(KinesisConfig.CONFIG_STREAM_REGION, systemName, streamName)));
+ Assert.assertEquals("accessKey",
+ generatedConfig.get(String.format(KinesisConfig.CONFIG_STREAM_ACCESS_KEY, systemName, streamName)));
+ Assert.assertEquals("secretKey",
+ generatedConfig.get(String.format(KinesisConfig.CONFIG_STREAM_SECRET_KEY, systemName, streamName)));
+ Assert.assertEquals("value1", generatedConfig.get(
+ String.format(KinesisConfig.CONFIG_STREAM_KINESIS_CLIENT_LIB_CONFIG, systemName, streamName) + "key1"));
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/c5348bf6/samza-aws/src/test/java/org/apache/samza/system/kinesis/descriptors/TestKinesisSystemDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/test/java/org/apache/samza/system/kinesis/descriptors/TestKinesisSystemDescriptor.java b/samza-aws/src/test/java/org/apache/samza/system/kinesis/descriptors/TestKinesisSystemDescriptor.java
new file mode 100644
index 0000000..f12dad1
--- /dev/null
+++ b/samza-aws/src/test/java/org/apache/samza/system/kinesis/descriptors/TestKinesisSystemDescriptor.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.kinesis.descriptors;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.samza.system.kinesis.KinesisConfig;
+import org.apache.samza.system.kinesis.KinesisSystemFactory;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class TestKinesisSystemDescriptor {
+ @Test
+ public void testConfigGeneration() {
+ String systemName = "kinesis";
+ Map<String, String> kclConfig = new HashMap<>();
+ kclConfig.put("key1", "value1");
+ Map<String, String> awsConfig = new HashMap<>();
+ awsConfig.put("key2", "value2");
+
+ KinesisSystemDescriptor sd = new KinesisSystemDescriptor(systemName).withRegion("London")
+ .withProxyHost("US")
+ .withProxyPort(1776)
+ .withAWSConfig(awsConfig)
+ .withKCLConfig(kclConfig);
+
+ Map<String, String> generatedConfig = sd.toConfig();
+ Assert.assertEquals(6, generatedConfig.size());
+
+ Assert.assertEquals(KinesisSystemFactory.class.getName(), generatedConfig.get("systems.kinesis.samza.factory"));
+ Assert.assertEquals("London", generatedConfig.get(String.format(KinesisConfig.CONFIG_SYSTEM_REGION, systemName)));
+ Assert.assertEquals("US", generatedConfig.get(String.format(KinesisConfig.CONFIG_PROXY_HOST, systemName)));
+ Assert.assertEquals("1776", generatedConfig.get(String.format(KinesisConfig.CONFIG_PROXY_PORT, systemName)));
+ Assert.assertEquals("value1",
+ generatedConfig.get(String.format(KinesisConfig.CONFIG_SYSTEM_KINESIS_CLIENT_LIB_CONFIG, systemName) + "key1"));
+ Assert.assertEquals("value2",
+ generatedConfig.get(String.format(KinesisConfig.CONFIG_AWS_CLIENT_CONFIG, systemName) + "key2"));
+ }
+}
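The two Kinesis tests above assert that pass-through entries (the KCL and AWS client maps) end up under a key built from a formatted prefix plus the user-supplied key. A minimal sketch of that expansion follows, under stated assumptions: the class name `PassthroughConfigSketch` and the prefix template are hypothetical stand-ins, not the actual values of the `KinesisConfig` constants, which are not shown in this part of the diff.

```java
import java.util.HashMap;
import java.util.Map;

public class PassthroughConfigSketch {
  // Hypothetical prefix template, standing in for constants such as
  // KinesisConfig.CONFIG_SYSTEM_KINESIS_CLIENT_LIB_CONFIG (real value not shown here).
  static final String EXAMPLE_KCL_PREFIX = "systems.%s.example.kcl.";

  // Copies each pass-through entry into a new map under "<formatted prefix><key>".
  static Map<String, String> expand(String systemName, Map<String, String> passthrough) {
    String prefix = String.format(EXAMPLE_KCL_PREFIX, systemName);
    Map<String, String> out = new HashMap<>();
    passthrough.forEach((key, value) -> out.put(prefix + key, value));
    return out;
  }

  public static void main(String[] args) {
    Map<String, String> kclConfig = new HashMap<>();
    kclConfig.put("key1", "value1");
    // Looks the value up the same way the tests do: formatted prefix + original key.
    Map<String, String> generated = expand("kinesis", kclConfig);
    System.out.println(generated.get(String.format(EXAMPLE_KCL_PREFIX, "kinesis") + "key1"));
  }
}
```

Keeping the prefix in one template means a test can rebuild the expected key with the same `String.format` call instead of hard-coding the full string.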
http://git-wip-us.apache.org/repos/asf/samza/blob/c5348bf6/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/descriptors/HdfsInputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/descriptors/HdfsInputDescriptor.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/descriptors/HdfsInputDescriptor.java
new file mode 100644
index 0000000..e3e3fa4
--- /dev/null
+++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/descriptors/HdfsInputDescriptor.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.hdfs.descriptors;
+
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.system.descriptors.InputDescriptor;
+import org.apache.samza.system.descriptors.SystemDescriptor;
+
+
+/**
+ * An {@link HdfsInputDescriptor} can be used for specifying Samza and HDFS-specific properties of HDFS
+ * input streams.
+ * <p>
+ * Use {@link HdfsSystemDescriptor#getInputDescriptor} to obtain an instance of this descriptor.
+ * <p>
+ * Stream properties provided in configuration override corresponding properties specified using a descriptor.
+ *
+ */
+public class HdfsInputDescriptor
+ extends InputDescriptor<Object, HdfsInputDescriptor> {
+
+ /**
+ * Constructs an {@link InputDescriptor} instance. HDFS input has no key. Value type is determined by
+ * reader type (see {@link HdfsSystemDescriptor#withReaderType}).
+ *
+ * @param streamId id of the stream
+ * @param systemDescriptor system descriptor this stream descriptor was obtained from
+ */
+ HdfsInputDescriptor(String streamId, SystemDescriptor systemDescriptor) {
+ super(streamId, new NoOpSerde(), systemDescriptor, null);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/c5348bf6/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/descriptors/HdfsOutputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/descriptors/HdfsOutputDescriptor.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/descriptors/HdfsOutputDescriptor.java
new file mode 100644
index 0000000..7b7e118
--- /dev/null
+++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/descriptors/HdfsOutputDescriptor.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.hdfs.descriptors;
+
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.system.descriptors.OutputDescriptor;
+import org.apache.samza.system.descriptors.SystemDescriptor;
+
+/**
+ * An {@link HdfsOutputDescriptor} can be used for specifying Samza and HDFS-specific properties of HDFS
+ * output streams.
+ * <p>
+ * Use {@link HdfsSystemDescriptor#getOutputDescriptor} to obtain an instance of this descriptor.
+ * <p>
+ * Stream properties provided in configuration override corresponding properties specified using a descriptor.
+ */
+public class HdfsOutputDescriptor
+ extends OutputDescriptor<Object, HdfsOutputDescriptor> {
+
+ /**
+ * Constructs an {@link OutputDescriptor} instance. HDFS output has no key. Value type is determined by
+ * writer class (see {@link HdfsSystemDescriptor#withWriterClassName}).
+ *
+ * @param streamId id of the stream
+ * @param systemDescriptor system descriptor this stream descriptor was obtained from
+ */
+ HdfsOutputDescriptor(String streamId, SystemDescriptor systemDescriptor) {
+ super(streamId, new NoOpSerde(), systemDescriptor);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/c5348bf6/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/descriptors/HdfsSystemDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/descriptors/HdfsSystemDescriptor.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/descriptors/HdfsSystemDescriptor.java
new file mode 100644
index 0000000..f4d8566
--- /dev/null
+++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/descriptors/HdfsSystemDescriptor.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.hdfs.descriptors;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.system.descriptors.SystemDescriptor;
+import org.apache.samza.system.hdfs.HdfsConfig;
+import org.apache.samza.system.hdfs.HdfsSystemFactory;
+
+
+/**
+ * An {@link HdfsSystemDescriptor} can be used for specifying Samza and HDFS-specific properties of an HDFS
+ * input/output system. It can also be used for obtaining {@link HdfsInputDescriptor}s and
+ * {@link HdfsOutputDescriptor}s, which can be used for specifying Samza and system-specific properties of
+ * HDFS input/output streams.
+ * <p>
+ * System properties provided in configuration override corresponding properties specified using a descriptor.
+ */
+public class HdfsSystemDescriptor extends SystemDescriptor<HdfsSystemDescriptor> {
+ private static final String FACTORY_CLASS_NAME = HdfsSystemFactory.class.getName();
+
+ private Optional<String> datePathFormat = Optional.empty();
+ private Optional<String> outputBaseDir = Optional.empty();
+ private Optional<Long> writeBatchSizeBytes = Optional.empty();
+ private Optional<Long> writeBatchSizeRecords = Optional.empty();
+ private Optional<String> writeCompressionType = Optional.empty();
+ private Optional<String> writerClass = Optional.empty();
+
+ private Optional<Long> consumerBufferCapacity = Optional.empty();
+ private Optional<Long> consumerMaxRetries = Optional.empty();
+ private Optional<String> consumerWhiteList = Optional.empty();
+ private Optional<String> consumerBlackList = Optional.empty();
+ private Optional<String> consumerGroupPattern = Optional.empty();
+ private Optional<String> consumerReader = Optional.empty();
+ private Optional<String> consumerStagingDirectory = Optional.empty();
+
+ public HdfsSystemDescriptor(String systemName) {
+ super(systemName, FACTORY_CLASS_NAME, null, null);
+ }
+
+ /**
+ * Gets an {@link HdfsInputDescriptor} for the input stream of this system.
+ * <p>
+ * The message in the stream has no key and the value type is determined by reader type.
+ *
+ * @param streamId id of the input stream
+ * @return an {@link HdfsInputDescriptor} for the hdfs input stream
+ */
+ public HdfsInputDescriptor getInputDescriptor(String streamId) {
+ return new HdfsInputDescriptor(streamId, this);
+ }
+
+ /**
+ * Gets an {@link HdfsOutputDescriptor} for the output stream of this system.
+ * <p>
+ * The message in the stream has no key and the value type is determined by writer class.
+ *
+ * @param streamId id of the output stream
+ * @return an {@link HdfsOutputDescriptor} for the hdfs output stream
+ */
+ public HdfsOutputDescriptor getOutputDescriptor(String streamId) {
+ return new HdfsOutputDescriptor(streamId, this);
+ }
+
+ /**
+ * In an HdfsWriter implementation that performs time-based output bucketing,
+ * the user may configure a date format (suitable for inclusion in a file path)
+ * using <code>SimpleDateFormat</code> formatting that the Bucketer implementation will
+ * use to generate HDFS paths and filenames. The more granular this date format, the more
+ * often a bucketing HdfsWriter will begin a new date-path bucket when creating the next output file.
+ * @param datePathFormat date path format
+ * @return this system descriptor
+ */
+ public HdfsSystemDescriptor withDatePathFormat(String datePathFormat) {
+ this.datePathFormat = Optional.ofNullable(StringUtils.stripToNull(datePathFormat));
+ return this;
+ }
+
+ /**
+ * The base output directory into which all HDFS output for this job will be written.
+ * @param outputBaseDir output base directory
+ * @return this system descriptor
+ */
+ public HdfsSystemDescriptor withOutputBaseDir(String outputBaseDir) {
+ this.outputBaseDir = Optional.ofNullable(StringUtils.stripToNull(outputBaseDir));
+ return this;
+ }
+
+ /**
+ * Split output files from all writer tasks based on # of bytes written to optimize
+ * MapReduce utilization for Hadoop jobs that will process the data later.
+ * @param writeBatchSizeBytes write batch size in bytes.
+ * @return this system descriptor
+ */
+ public HdfsSystemDescriptor withWriteBatchSizeBytes(long writeBatchSizeBytes) {
+ this.writeBatchSizeBytes = Optional.of(writeBatchSizeBytes);
+ return this;
+ }
+
+ /**
+ * Split output files from all writer tasks based on # of records written to optimize
+ * MapReduce utilization for Hadoop jobs that will process the data later.
+ * @param writeBatchSizeRecords write batch size in records.
+ * @return this system descriptor
+ */
+ public HdfsSystemDescriptor withWriteBatchSizeRecords(long writeBatchSizeRecords) {
+ this.writeBatchSizeRecords = Optional.of(writeBatchSizeRecords);
+ return this;
+ }
+
+ /**
+ * Simple, human-readable label for various compression options. HdfsWriter implementations
+ * can choose how to handle these individually, or throw an exception. Example: "none", "gzip", ...
+ * @param writeCompressionType compression type for writer.
+ * @return this system descriptor
+ */
+ public HdfsSystemDescriptor withWriteCompressionType(String writeCompressionType) {
+ this.writeCompressionType = Optional.ofNullable(StringUtils.stripToNull(writeCompressionType));
+ return this;
+ }
+
+ /**
+ * The fully-qualified class name of the HdfsWriter subclass that will write for this system.
+ * @param writerClassName writer class name.
+ * @return this system descriptor
+ */
+ public HdfsSystemDescriptor withWriterClassName(String writerClassName) {
+ this.writerClass = Optional.ofNullable(StringUtils.stripToNull(writerClassName));
+ return this;
+ }
+
+ /**
+ * The capacity of the hdfs consumer buffer - the blocking queue used for storing messages.
+ * @param bufferCapacity the buffer capacity for HDFS consumer.
+ * @return this system descriptor
+ */
+ public HdfsSystemDescriptor withConsumerBufferCapacity(long bufferCapacity) {
+ this.consumerBufferCapacity = Optional.of(bufferCapacity);
+ return this;
+ }
+
+ /**
+ * Number of max retries for the hdfs consumer readers per partition.
+ * @param maxRetries number of max retries for HDFS consumer.
+ * @return this system descriptor
+ */
+ public HdfsSystemDescriptor withConsumerNumMaxRetries(long maxRetries) {
+ this.consumerMaxRetries = Optional.of(maxRetries);
+ return this;
+ }
+
+ /**
+ * White list used by directory partitioner to filter out unwanted files in an HDFS directory.
+ * @param whiteList white list for HDFS consumer inputs.
+ * @return this system descriptor
+ */
+ public HdfsSystemDescriptor withConsumerWhiteList(String whiteList) {
+ this.consumerWhiteList = Optional.ofNullable(StringUtils.stripToNull(whiteList));
+ return this;
+ }
+
+ /**
+ * Black list used by directory partitioner to filter out unwanted files in an HDFS directory.
+ * @param blackList black list for HDFS consumer inputs.
+ * @return this system descriptor
+ */
+ public HdfsSystemDescriptor withConsumerBlackList(String blackList) {
+ this.consumerBlackList = Optional.ofNullable(StringUtils.stripToNull(blackList));
+ return this;
+ }
+
+ /**
+ * Group pattern used by directory partitioner for advanced partitioning.
+ * @param groupPattern group pattern for HDFS consumer inputs.
+ * @return this system descriptor
+ */
+ public HdfsSystemDescriptor withConsumerGroupPattern(String groupPattern) {
+ this.consumerGroupPattern = Optional.ofNullable(StringUtils.stripToNull(groupPattern));
+ return this;
+ }
+
+ /**
+ * The type of the file reader for consumer (avro, plain, etc.)
+ * @param readerType reader type for HDFS consumer inputs.
+ * @return this system descriptor
+ */
+ public HdfsSystemDescriptor withReaderType(String readerType) {
+ this.consumerReader = Optional.ofNullable(StringUtils.stripToNull(readerType));
+ return this;
+ }
+
+ /**
+ * Staging directory for storing partition description. If not set, the staging directory set
+ * by the YARN job is used.
+ * @param stagingDirectory staging directory for HDFS consumer inputs.
+ * @return this system descriptor
+ */
+ public HdfsSystemDescriptor withStagingDirectory(String stagingDirectory) {
+ this.consumerStagingDirectory = Optional.ofNullable(StringUtils.stripToNull(stagingDirectory));
+ return this;
+ }
+
+ @Override
+ public Map<String, String> toConfig() {
+ Map<String, String> config = new HashMap<>(super.toConfig());
+ String systemName = getSystemName();
+
+ this.datePathFormat.ifPresent(
+ val -> config.put(String.format(HdfsConfig.DATE_PATH_FORMAT_STRING(), systemName), val));
+ this.outputBaseDir.ifPresent(val -> config.put(String.format(HdfsConfig.BASE_OUTPUT_DIR(), systemName), val));
+ this.writeBatchSizeBytes.ifPresent(
+ val -> config.put(String.format(HdfsConfig.WRITE_BATCH_SIZE_BYTES(), systemName), String.valueOf(val)));
+ this.writeBatchSizeRecords.ifPresent(
+ val -> config.put(String.format(HdfsConfig.WRITE_BATCH_SIZE_RECORDS(), systemName), String.valueOf(val)));
+ this.writeCompressionType.ifPresent(
+ val -> config.put(String.format(HdfsConfig.COMPRESSION_TYPE(), systemName), val));
+ this.writerClass.ifPresent(val -> config.put(String.format(HdfsConfig.HDFS_WRITER_CLASS_NAME(), systemName), val));
+
+ this.consumerBufferCapacity.ifPresent(
+ val -> config.put(String.format(HdfsConfig.CONSUMER_BUFFER_CAPACITY(), systemName), String.valueOf(val)));
+ this.consumerMaxRetries.ifPresent(
+ val -> config.put(String.format(HdfsConfig.CONSUMER_NUM_MAX_RETRIES(), systemName), String.valueOf(val)));
+ this.consumerWhiteList.ifPresent(
+ val -> config.put(String.format(HdfsConfig.CONSUMER_PARTITIONER_WHITELIST(), systemName), val));
+ this.consumerBlackList.ifPresent(
+ val -> config.put(String.format(HdfsConfig.CONSUMER_PARTITIONER_BLACKLIST(), systemName), val));
+ this.consumerGroupPattern.ifPresent(
+ val -> config.put(String.format(HdfsConfig.CONSUMER_PARTITIONER_GROUP_PATTERN(), systemName), val));
+ this.consumerReader.ifPresent(val -> config.put(String.format(HdfsConfig.FILE_READER_TYPE(), systemName), val));
+ this.consumerStagingDirectory.ifPresent(
+ val -> config.put(String.format(HdfsConfig.STAGING_DIRECTORY(), systemName), val));
+
+ return config;
+ }
+}
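The descriptor above keeps every setting in an `Optional` field and emits a config entry in `toConfig()` only for values that were actually set. The following is a minimal, dependency-free sketch of that pattern, not the Samza implementation: the class `DescriptorSketch`, the `example.*` key templates and the factory value are hypothetical, and the real key templates live in `HdfsConfig`. It also illustrates why blank input matters: `StringUtils.stripToNull` returns `null` for a blank string, and `Optional.of(null)` throws a `NullPointerException`, whereas `Optional.ofNullable(null)` yields an empty `Optional`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class DescriptorSketch {
  // Hypothetical key templates; only the "samza.factory" key shape appears in the tests of this commit.
  static final String FACTORY_KEY = "systems.%s.samza.factory";
  static final String READER_KEY = "systems.%s.example.reader";
  static final String BUFFER_KEY = "systems.%s.example.buffer.capacity";

  private final String systemName;
  private Optional<String> readerType = Optional.empty();
  private Optional<Long> bufferCapacity = Optional.empty();

  DescriptorSketch(String systemName) {
    this.systemName = systemName;
  }

  // Plain-Java stand-in for StringUtils.stripToNull: null or blank input becomes null.
  private static String stripToNull(String s) {
    if (s == null) {
      return null;
    }
    String trimmed = s.trim();
    return trimmed.isEmpty() ? null : trimmed;
  }

  DescriptorSketch withReaderType(String readerType) {
    // ofNullable leaves the option unset for blank input; Optional.of would throw here.
    this.readerType = Optional.ofNullable(stripToNull(readerType));
    return this;
  }

  DescriptorSketch withBufferCapacity(long bufferCapacity) {
    this.bufferCapacity = Optional.of(bufferCapacity);
    return this;
  }

  Map<String, String> toConfig() {
    Map<String, String> config = new HashMap<>();
    config.put(String.format(FACTORY_KEY, systemName), "example.Factory"); // hypothetical factory name
    // Only options that were set contribute a config entry.
    readerType.ifPresent(val -> config.put(String.format(READER_KEY, systemName), val));
    bufferCapacity.ifPresent(val -> config.put(String.format(BUFFER_KEY, systemName), String.valueOf(val)));
    return config;
  }

  public static void main(String[] args) {
    System.out.println(new DescriptorSketch("hdfs").withReaderType("avro").withBufferCapacity(950).toConfig());
    System.out.println(new DescriptorSketch("hdfs").withReaderType("   ").toConfig());
  }
}
```

In this sketch a blank reader type simply leaves the key out of the generated config, so a value supplied elsewhere is not shadowed by an empty one.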
http://git-wip-us.apache.org/repos/asf/samza/blob/c5348bf6/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/descriptors/TestHdfsSystemDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/descriptors/TestHdfsSystemDescriptor.java b/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/descriptors/TestHdfsSystemDescriptor.java
new file mode 100644
index 0000000..78d85e9
--- /dev/null
+++ b/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/descriptors/TestHdfsSystemDescriptor.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.hdfs.descriptors;
+
+import java.util.Map;
+
+import org.apache.samza.system.hdfs.HdfsConfig;
+import org.apache.samza.system.hdfs.HdfsSystemFactory;
+import org.apache.samza.system.hdfs.writer.AvroDataFileHdfsWriter;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class TestHdfsSystemDescriptor {
+ @Test
+ public void testMajorConfigGeneration() {
+ String systemName = "hdfs";
+
+ HdfsSystemDescriptor sd = new HdfsSystemDescriptor(systemName).withConsumerBufferCapacity(950)
+ .withConsumerWhiteList(".*")
+ .withReaderType("avro")
+ .withOutputBaseDir("/home/output")
+ .withWriterClassName(AvroDataFileHdfsWriter.class.getName());
+ sd.getInputDescriptor("input");
+
+ Map<String, String> generatedConfig = sd.toConfig();
+ Assert.assertEquals(6, generatedConfig.size());
+ System.out.println(generatedConfig);
+
+ Assert.assertEquals(HdfsSystemFactory.class.getName(), generatedConfig.get("systems.hdfs.samza.factory"));
+ Assert.assertEquals("950", generatedConfig.get(String.format(HdfsConfig.CONSUMER_BUFFER_CAPACITY(), systemName)));
+ Assert.assertEquals(".*",
+ generatedConfig.get(String.format(HdfsConfig.CONSUMER_PARTITIONER_WHITELIST(), systemName)));
+ Assert.assertEquals("avro", generatedConfig.get(String.format(HdfsConfig.FILE_READER_TYPE(), systemName)));
+ Assert.assertEquals("/home/output", generatedConfig.get(String.format(HdfsConfig.BASE_OUTPUT_DIR(), systemName)));
+ Assert.assertEquals(AvroDataFileHdfsWriter.class.getName(),
+ generatedConfig.get(String.format(HdfsConfig.HDFS_WRITER_CLASS_NAME(), systemName)));
+ }
+}