Posted to commits@samza.apache.org by ja...@apache.org on 2018/10/03 05:17:18 UTC

samza git commit: samza documentation: hdfs and eventhubs connector

Repository: samza
Updated Branches:
  refs/heads/master 80f5f388f -> ad578e241


samza documentation: hdfs and eventhubs connector

Author: Hai Lu <ha...@linkedin.com>

Reviewers: Jagadish<ja...@apache.org>

Closes #685 from lhaiesp/master


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/ad578e24
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/ad578e24
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/ad578e24

Branch: refs/heads/master
Commit: ad578e24149f7d94465557dec852d9c36d9ee3b8
Parents: 80f5f38
Author: Hai Lu <ha...@linkedin.com>
Authored: Tue Oct 2 22:17:15 2018 -0700
Committer: Jagadish <jv...@linkedin.com>
Committed: Tue Oct 2 22:17:15 2018 -0700

----------------------------------------------------------------------
 .../versioned/connectors/eventhubs.md           |  95 ++++++++++++-
 .../documentation/versioned/connectors/hdfs.md  | 133 ++++++++++++++++++-
 docs/learn/documentation/versioned/index.html   |   2 +-
 3 files changed, 225 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/ad578e24/docs/learn/documentation/versioned/connectors/eventhubs.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/connectors/eventhubs.md b/docs/learn/documentation/versioned/connectors/eventhubs.md
index b99b46d..0f8766b 100644
--- a/docs/learn/documentation/versioned/connectors/eventhubs.md
+++ b/docs/learn/documentation/versioned/connectors/eventhubs.md
@@ -19,6 +19,97 @@ title: Eventhubs Connector
    limitations under the License.
 -->
 
-# Section 1
-# Section 2
+## Overview
 
+The Samza EventHubs connector provides access to [Azure Eventhubs](https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-features), Microsoft’s data streaming service on Azure. An event hub is similar to a Kafka topic and can have multiple partitions with producers and consumers. Each message produced or consumed from an event hub is an instance of [EventData](https://docs.microsoft.com/en-us/java/api/com.microsoft.azure.eventhubs._event_data).
+
+## Consuming from EventHubs
+
+Samza’s [EventHubSystemConsumer](https://github.com/apache/samza/blob/master/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java) wraps each EventData message read from EventHubs into an [EventHubIncomingMessageEnvelope](https://github.com/apache/samza/blob/master/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubIncomingMessageEnvelope.java). The envelope has two fields of interest: the key, which is set to the event's partition key, and the message, which is set to the actual data in the event.
+
+You can configure your Samza jobs to process data from Azure EventHubs. To consume from EventHubs streams, add the following to your job configuration:
+
+{% highlight jproperties %}
+# define an event hub system factory with your identifier. eg: eh-system
+systems.eh-system.samza.factory=org.apache.samza.system.eventhub.EventHubSystemFactory
+
+# define your streams
+systems.eh-system.stream.list=eh-input-stream
+streams.eh-input-stream.samza.system=eh-system
+
+# define required properties for your streams
+streams.eh-input-stream.eventhubs.namespace=YOUR-STREAM-NAMESPACE
+streams.eh-input-stream.eventhubs.entitypath=YOUR-ENTITY-NAME
+streams.eh-input-stream.eventhubs.sas.keyname=YOUR-SAS-KEY-NAME
+streams.eh-input-stream.eventhubs.sas.token=YOUR-SAS-KEY-TOKEN
+{% endhighlight %}
+
+You must provide values for YOUR-STREAM-NAMESPACE, YOUR-ENTITY-NAME, YOUR-SAS-KEY-NAME and YOUR-SAS-KEY-TOKEN to read from or write to the stream.
+
+## Producing to EventHubs
+
+Similarly, you can configure your Samza job to write to EventHubs. Use the same configs described in the Consuming from EventHubs section, applied to your output stream:
+
+{% highlight jproperties %}
+# define an event hub system factory with your identifier. eg: eh-system
+systems.eh-system.samza.factory=org.apache.samza.system.eventhub.EventHubSystemFactory
+
+# define your streams
+systems.eh-system.stream.list=eh-output-stream
+streams.eh-output-stream.samza.system=eh-system
+
+streams.eh-output-stream.eventhubs.namespace=YOUR-STREAM-NAMESPACE
+streams.eh-output-stream.eventhubs.entitypath=YOUR-ENTITY-NAME
+streams.eh-output-stream.eventhubs.sas.keyname=YOUR-SAS-KEY-NAME
+streams.eh-output-stream.eventhubs.sas.token=YOUR-SAS-KEY-TOKEN
+{% endhighlight %}
+
+Then you can create and produce a message to eventhubs in your code as below:
+
+{% highlight java %}
+OutgoingMessageEnvelope envelope = new OutgoingMessageEnvelope(new SystemStream("eh-system", "output0"), key, message); 
+collector.send(envelope);
+{% endhighlight %}
+
+Each [OutgoingMessageEnvelope](https://samza.apache.org/learn/documentation/latest/api/javadocs/org/apache/samza/system/OutgoingMessageEnvelope.html) is converted into an [EventData](https://docs.microsoft.com/en-us/java/api/com.microsoft.azure.eventhubs._event_data) instance whose body is set to the message in the envelope. Additionally, the key and the produce timestamp are set as properties in the EventData before sending it to EventHubs.
+
+## Advanced configuration
+
+### Producer partitioning
+
+The partition.method property determines how outgoing messages are partitioned. Valid values for this config are EVENT\_HUB\_HASHING, PARTITION\_KEY\_AS\_PARTITION or ROUND\_ROBIN.
+
+1. EVENT\_HUB\_HASHING: By default, Samza computes the partition for an outgoing message based on the hash of its partition-key. This ensures that events with the same key are sent to the same partition. If this option is chosen, the partition key should be a string. If the partition key is not set, the key in the message is used for partitioning.
+
+2. PARTITION\_KEY\_AS\_PARTITION: In this method, each message is sent to the partition specified by its partition key. This requires the partition key to be an integer. If the key is greater than the number of partitions, a modulo operation will be performed on the key. Similar to EVENT\_HUB\_HASHING, the key in the message is used if the partition key is not specified.
+
+3. ROUND\_ROBIN: In this method, outgoing messages are distributed across all partitions in round-robin order. The key and the partition key in the message are ignored.
+
+{% highlight jproperties %}
+systems.eh-system.partition.method = EVENT_HUB_HASHING
+{% endhighlight %}
+
+### Consumer groups
+
+EventHubs supports the notion of [consumer groups](https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-features#consumer-groups), which enable multiple applications to have their own view of the event stream. Each partition is exclusively consumed by one consumer in the consumer group. Each event hub stream has a pre-defined consumer group named $Default. You can define your own consumer group for your job by configuring the eventhubs.consumer.group property:
+
+{% highlight jproperties %}
+streams.eh-input-stream.eventhubs.consumer.group = my-group
+{% endhighlight %}
+
+### Serde
+
+By default, the messages from EventHubs are sent and received as byte arrays. You can configure a serializer and deserializer for your message by setting a value for msg.serde for your stream.
+
+{% highlight jproperties %}
+streams.input0.samza.msg.serde = json
+streams.output0.samza.msg.serde = json
+{% endhighlight %}
+
+### Consumer buffer size
+
+When the consumer reads messages from EventHubs, it appends them to a shared producer-consumer queue corresponding to their partition. This config determines the per-partition queue size. Setting a higher value for this config typically achieves higher throughput at the expense of increased on-heap memory usage.
+
+{% highlight jproperties %}
+systems.eh-system.eventhubs.receive.queue.size = 10
+{% endhighlight %}
\ No newline at end of file
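
Note on the producer partitioning options documented in eventhubs.md above: the three partition.method values can be illustrated with a small plain-Java sketch. This is not the connector's code. The class name `PartitionMethodSketch`, the `choosePartition` method, and the use of `String.hashCode()` as a stand-in hash are assumptions made purely for illustration; the actual hash function behind EVENT_HUB_HASHING is not described in this commit.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of the three partition.method options.
public class PartitionMethodSketch {
    public enum Method { EVENT_HUB_HASHING, PARTITION_KEY_AS_PARTITION, ROUND_ROBIN }

    // Counter used only by ROUND_ROBIN.
    private final AtomicInteger nextPartition = new AtomicInteger();

    /** Returns a partition index in the range [0, numPartitions). */
    public int choosePartition(Method method, Object partitionKey, int numPartitions) {
        switch (method) {
            case PARTITION_KEY_AS_PARTITION: {
                // The key must be an integer; out-of-range keys wrap via modulo.
                int requested = (Integer) partitionKey;
                return Math.floorMod(requested, numPartitions);
            }
            case ROUND_ROBIN:
                // The key is ignored; partitions are visited in turn.
                return Math.floorMod(nextPartition.getAndIncrement(), numPartitions);
            default:
                // EVENT_HUB_HASHING: equal keys always map to the same partition.
                // String.hashCode() is an assumed stand-in for the real hash.
                return Math.floorMod(String.valueOf(partitionKey).hashCode(), numPartitions);
        }
    }

    public static void main(String[] args) {
        PartitionMethodSketch sketch = new PartitionMethodSketch();
        System.out.println(sketch.choosePartition(Method.PARTITION_KEY_AS_PARTITION, 7, 4));
        System.out.println(sketch.choosePartition(Method.ROUND_ROBIN, null, 4));
        System.out.println(sketch.choosePartition(Method.EVENT_HUB_HASHING, "user-42", 4));
    }
}
```

The sketch keeps the round-robin counter per instance, so separate producers would each start from partition 0; whether the real producer behaves that way is not stated in the documentation.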

http://git-wip-us.apache.org/repos/asf/samza/blob/ad578e24/docs/learn/documentation/versioned/connectors/hdfs.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/connectors/hdfs.md b/docs/learn/documentation/versioned/connectors/hdfs.md
index a78c4aa..9692d18 100644
--- a/docs/learn/documentation/versioned/connectors/hdfs.md
+++ b/docs/learn/documentation/versioned/connectors/hdfs.md
@@ -19,6 +19,135 @@ title: HDFS Connector
    limitations under the License.
 -->
 
-# Section 1
-# Section 2
+## Overview
 
+Samza applications can read and process data stored in HDFS. Likewise, you can also write processed results to HDFS.
+
+### Environment Requirement
+
+Your job needs to run on the same YARN cluster which hosts the HDFS you want to consume from (or write into).
+
+## Consuming from HDFS
+
+You can configure your Samza job to read from HDFS files with the [HdfsSystemConsumer](https://github.com/apache/samza/blob/master/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java). Avro encoded records are supported out of the box, and the consumer can be extended to support other formats (plain text, csv, json etc). See the Event format section below.
+
+### Partitioning
+
+Partitioning works at the level of individual directories and files. Each directory is treated as its own stream, while each of its files is treated as a partition. For example, when reading from a directory on HDFS with 10 files, 10 partitions are created. This means that you can have up to 10 containers to process them. If you want to read from a single HDFS file, there is currently no way to break down the consumption: you can only have one container to process the file.
+
+### Event format
+
+Samza's HDFS consumer wraps each avro record read from HDFS into a message-envelope. The [Envelope](../api/javadocs/org/apache/samza/system/IncomingMessageEnvelope.html) contains three fields of interest:
+
+1. The key, which is empty
+2. The message, which is set to the avro [GenericRecord](https://avro.apache.org/docs/1.7.6/api/java/org/apache/avro/generic/GenericRecord.html)
+3. The stream partition, which is set to the name of the HDFS file
+
+To support input formats other than avro, you can implement the [SingleFileHdfsReader](https://github.com/apache/samza/blob/master/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/SingleFileHdfsReader.java) interface (example: [AvroFileHdfsReader](https://github.com/apache/samza/blob/master/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/AvroFileHdfsReader.java)).
+
+### End of stream support
+
+While streaming sources like Kafka are unbounded, files on HDFS have finite data and have a notion of end-of-file.
+
+When reading from HDFS, your Samza job automatically exits after consuming all the data. You can choose to implement [EndOfStreamListenerTask](../api/javadocs/org/apache/samza/task/EndOfStreamListenerTask.html) to receive a callback when reaching end of stream. 
+
+### Basic Configuration
+
+Here are a few of the basic configs required to set up HdfsSystemConsumer:
+
+{% highlight jproperties %}
+# The HDFS system consumer is implemented under the org.apache.samza.system.hdfs package,
+# so use HdfsSystemFactory as the system factory for your system
+systems.hdfs.samza.factory=org.apache.samza.system.hdfs.HdfsSystemFactory
+
+# Define the hdfs stream
+streams.hdfs-clickstream.samza.system=hdfs
+
+# You need to specify the path of files you want to consume
+streams.hdfs-clickstream.samza.physical.name=hdfs:/data/clickstream/2016/09/11
+
+# You can specify a white list of files you want your job to process (in Java Pattern style)
+systems.hdfs.partitioner.defaultPartitioner.whitelist=.*avro
+
+# You can specify a black list of files you don't want your job to process (in Java Pattern style),
+# by default it's empty.
+# Note that you can have both white list and black list, in which case both will be applied.
+systems.hdfs.partitioner.defaultPartitioner.blacklist=somefile.avro
+{% endhighlight %}
+
+### Security Configuration
+
+The following additional configs are required when accessing HDFS clusters that have kerberos enabled:
+
+{% highlight jproperties %}
+# When the job is running in a secure environment, use the SamzaYarnSecurityManagerFactory, which fetches and renews the Kerberos delegation tokens
+job.security.manager.factory=org.apache.samza.job.yarn.SamzaYarnSecurityManagerFactory
+
+# Kerberos principal
+yarn.kerberos.principal=your-principal-name
+
+# Path of the keytab file (local path)
+yarn.kerberos.keytab=/tmp/keytab
+{% endhighlight %}
+
+### Advanced Configuration
+
+Some of the advanced configuration you might need to set up:
+
+{% highlight jproperties %}
+# Specify the group pattern for advanced partitioning.
+systems.hdfs.partitioner.defaultPartitioner.groupPattern=part-[id]-.*
+
+# Specify the type of files your job wants to process (only avro is supported for now)
+systems.hdfs.consumer.reader=avro
+
+# Max number of retries (per-partition) before the container fails.
+systems.hdfs.consumer.numMaxRetries=10
+{% endhighlight %}
+
+Advanced partitioning goes beyond the basic assumption that each file is a partition. With advanced partitioning you can group files into partitions arbitrarily. For example, suppose you have the files [part-01-a.avro, part-01-b.avro, part-02-a.avro, part-02-b.avro, part-03-a.avro] and want to organize them into three partitions: (part-01-a.avro, part-01-b.avro), (part-02-a.avro, part-02-b.avro), (part-03-a.avro), where the numbers in the middle act as a “group identifier”. You can then set the groupPattern property to “part-[id]-.*” (note that "[id]" is a reserved term here, i.e. you have to put it literally as [id]). The partitioner applies this pattern to all file names, extracts the “group identifier” (“[id]” in the pattern), and uses it to group files into partitions.
+
+## Producing to HDFS
+
+The samza-hdfs module implements a Samza Producer to write to HDFS. The current implementation includes a ready-to-use HdfsSystemProducer and two HdfsWriters: one writes messages of raw bytes to a SequenceFile of BytesWritable keys and values, and the other writes out Avro data files, including the schema automatically reflected from the POJO objects fed to it.
+
+### Configuring an HdfsSystemProducer
+
+You can configure an HdfsSystemProducer like any other Samza system: using configuration keys and values set in a job.properties file. You might configure the system producer for use by your StreamTasks like this:
+
+{% highlight jproperties %}
+# set the SystemFactory implementation to instantiate HdfsSystemProducer aliased to 'hdfs'
+systems.hdfs.samza.factory=org.apache.samza.system.hdfs.HdfsSystemFactory
+
+# Assign the implementation class for this system's HdfsWriter
+systems.hdfs.producer.hdfs.writer.class=org.apache.samza.system.hdfs.writer.TextSequenceFileHdfsWriter
+#systems.hdfs.producer.hdfs.writer.class=org.apache.samza.system.hdfs.writer.AvroDataFileHdfsWriter
+# define a serializer/deserializer for the hdfs system
+# DO NOT define (i.e. comment out) a SerDe when using the AvroDataFileHdfsWriter so it can reflect the schema
+systems.hdfs.samza.msg.serde=some-serde-impl
+
+# Assign a serde implementation to be used for the stream called "metrics"
+systems.hdfs.streams.metrics.samza.msg.serde=some-metrics-impl
+
+# Set compression type supported by chosen Writer.
+# AvroDataFileHdfsWriter supports snappy, bzip2, deflate or none
+systems.hdfs.producer.hdfs.compression.type=snappy
+
+# The base dir for HDFS output. Output is structured into buckets. The default Bucketer for SequenceFile HdfsWriters
+# is currently /BASE/JOB_NAME/DATE_PATH/FILES, where BASE is set below
+systems.hdfs.producer.hdfs.base.output.dir=/user/me/analytics/clickstream_data
+
+# Assign the implementation class for the HdfsWriter's Bucketer
+systems.hdfs.producer.hdfs.bucketer.class=org.apache.samza.system.hdfs.writer.JobNameDateTimeBucketer
+
+# Configure the DATE_PATH the Bucketer will set to bucket output files by day for this job run.
+systems.hdfs.producer.hdfs.bucketer.date.path.format=yyyy_MM_dd
+
+# Optionally set the max output bytes (records for AvroDataFileHdfsWriter) per file.
+# A new file will be cut and output continued on the next write call each time this many bytes
+# (records for AvroDataFileHdfsWriter) have been written.
+systems.hdfs.producer.hdfs.write.batch.size.bytes=134217728
+#systems.hdfs.producer.hdfs.write.batch.size.records=10000
+{% endhighlight %}
+
+The above configuration assumes that Metrics and Serde implementations have been properly configured against the some-metrics-impl and some-serde-impl labels elsewhere in the same job.properties file. Each of these properties has a reasonable default, so you can leave out the ones you don’t need to customize for your job run.
\ No newline at end of file
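
Note on the HDFS consumer partitioning documented in hdfs.md above: the whitelist, blacklist and groupPattern settings can be illustrated with a plain-Java sketch. This is a hypothetical model, not the connector's partitioner. The class `HdfsGroupingSketch` and its methods are invented for the example, and two behaviors are assumptions: patterns are applied with a full match against the file name, and a file that does not match the group pattern becomes its own partition.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration of file selection and grouping into partitions.
public class HdfsGroupingSketch {

    /** Keeps file names that match the whitelist and do not match the blacklist. */
    public static List<String> select(List<String> names, String whitelist, String blacklist) {
        Pattern allow = Pattern.compile(whitelist);
        Pattern deny = Pattern.compile(blacklist);
        List<String> kept = new ArrayList<>();
        for (String name : names) {
            // Assumption: both patterns must match the whole file name.
            if (allow.matcher(name).matches() && !deny.matcher(name).matches()) {
                kept.add(name);
            }
        }
        return kept;
    }

    /** Groups file names by the text found where "[id]" appears in groupPattern. */
    public static Map<String, List<String>> group(List<String> names, String groupPattern) {
        // Turn the reserved literal "[id]" into a capturing group.
        // Assumes groupPattern contains "[id]" exactly once.
        Pattern pattern = Pattern.compile(groupPattern.replace("[id]", "(.+?)"));
        Map<String, List<String>> groups = new TreeMap<>();
        for (String name : names) {
            Matcher m = pattern.matcher(name);
            // Assumption: a non-matching file forms a group of its own.
            String id = m.matches() ? m.group(1) : name;
            groups.computeIfAbsent(id, k -> new ArrayList<>()).add(name);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<String> files = List.of("part-01-a.avro", "part-01-b.avro",
                "part-02-a.avro", "somefile.avro", "notes.txt");
        List<String> kept = select(files, ".*avro", "somefile.avro");
        System.out.println(group(kept, "part-[id]-.*"));
    }
}
```

Each key of the returned map corresponds to one partition, so the number of keys bounds how many containers could usefully process the directory.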

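Note on the HDFS producer output layout documented in hdfs.md above: the /BASE/JOB_NAME/DATE_PATH/FILES structure can be sketched as simple path assembly. The class `BucketPathSketch`, the `bucketDir` method and the job name used below are hypothetical, and formatting the date in UTC is an assumption chosen to keep the example deterministic; the time zone used by the actual Bucketer is not stated in this commit.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

// Hypothetical illustration of the /BASE/JOB_NAME/DATE_PATH directory layout.
public class BucketPathSketch {

    /** Builds the bucket directory for a job at the given point in time. */
    public static String bucketDir(String baseDir, String jobName,
                                   String datePathFormat, Date now) {
        SimpleDateFormat format = new SimpleDateFormat(datePathFormat);
        // Assumption: UTC is used so the result does not depend on the host.
        format.setTimeZone(TimeZone.getTimeZone("UTC"));
        return baseDir + "/" + jobName + "/" + format.format(now);
    }

    public static void main(String[] args) {
        // Values mirror the base.output.dir and date.path.format settings above.
        System.out.println(bucketDir("/user/me/analytics/clickstream_data",
                "my-job", "yyyy_MM_dd", new Date()));
    }
}
```

With a day-granularity format such as yyyy_MM_dd, all files written on the same day land in the same directory, which matches the "bucket output files by day" comment in the configuration.
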
http://git-wip-us.apache.org/repos/asf/samza/blob/ad578e24/docs/learn/documentation/versioned/index.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/index.html b/docs/learn/documentation/versioned/index.html
index 80035bb..94f7e18 100644
--- a/docs/learn/documentation/versioned/index.html
+++ b/docs/learn/documentation/versioned/index.html
@@ -51,7 +51,7 @@ title: Documentation
 <ul class="documentation-list">
   <li><a href="connectors/overview.html">Connectors overview</a></li>
   <li><a href="connectors/kafka.html">Apache Kafka</a></li>
-  <li><a href="connectors/hdfs.html">Apache Hadoop</a></li>
+  <li><a href="connectors/hdfs.html">HDFS</a></li>
   <li><a href="connectors/eventhubs.html">Azure EventHubs</a></li>
   <li><a href="connectors/kinesis.html">AWS Kinesis</a></li>
 </ul>