Posted to commits@samza.apache.org by ja...@apache.org on 2018/11/14 03:40:47 UTC

[4/9] samza git commit: Cleanup the connectors-overview and Kafka-connector sections. Use System/StreamDescriptors

Cleanup the connectors-overview and Kafka-connector sections. Use System/StreamDescriptors

Author: Jagadish <jv...@linkedin.com>

Reviewers: Jagadish <ja...@apache.org>

Closes #778 from vjagadish1989/website-reorg26


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/e4b60e73
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/e4b60e73
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/e4b60e73

Branch: refs/heads/1.0.0
Commit: e4b60e732a063c50736599fb0c6e0cba31a6e45c
Parents: 93a2542
Author: Jagadish <jv...@linkedin.com>
Authored: Sun Oct 28 11:29:07 2018 -0700
Committer: Jagadish <jv...@linkedin.com>
Committed: Tue Nov 13 19:32:00 2018 -0800

----------------------------------------------------------------------
 .../documentation/versioned/connectors/kafka.md | 168 +++++++++++--------
 .../versioned/connectors/overview.md            |  38 ++---
 2 files changed, 112 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/e4b60e73/docs/learn/documentation/versioned/connectors/kafka.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/connectors/kafka.md b/docs/learn/documentation/versioned/connectors/kafka.md
index b6e78e4..447bfdc 100644
--- a/docs/learn/documentation/versioned/connectors/kafka.md
+++ b/docs/learn/documentation/versioned/connectors/kafka.md
@@ -19,108 +19,132 @@ title: Kafka Connector
    limitations under the License.
 -->
 
-## Overview
-Samza offers built-in integration with Apache Kafka for stream processing. A common pattern in Samza applications is to read messages from one or more Kafka topics, process it and emit results to other Kafka topics or external systems.
+### Kafka I/O: QuickStart
+Samza offers built-in integration with Apache Kafka for stream processing. A common pattern in Samza applications is to read messages from one or more Kafka topics, process them and emit results to other Kafka topics or databases.
 
-## Consuming from Kafka
+The `hello-samza` project includes multiple examples of interacting with Kafka from your Samza jobs. Each example also includes instructions on how to run it and view the results.
 
-### <a name="kafka-basic-configuration"></a>Basic Configuration
+- [High-level API Example](https://github.com/apache/samza-hello-samza/blob/latest/src/main/java/samza/examples/cookbook/FilterExample.java) with a corresponding [tutorial](/learn/documentation/{{site.version}}/deployment/yarn.html#starting-your-application-on-yarn)
 
-The example below provides a basic example for configuring a system called `kafka-cluster-1` that uses the provided KafkaSystemFactory.
+- [Low-level API Example](https://github.com/apache/samza-hello-samza/blob/latest/src/main/java/samza/examples/wikipedia/task/application/WikipediaParserTaskApplication.java) with a corresponding [tutorial](https://github.com/apache/samza-hello-samza#hello-samza)
 
-{% highlight jproperties %}
-# Set the SystemFactory implementation to instantiate KafkaSystemConsumer, KafkaSystemProducer and KafkaSystemAdmin
-systems.kafka-cluster-1.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
 
-# Define the default key and message SerDe.
-systems.kafka-cluster-1.default.stream.samza.key.serde=string
-systems.kafka-cluster-1.default.stream.samza.msg.serde=json
+### Concepts
 
-# Zookeeper nodes of the Kafka cluster
-systems.kafka-cluster-1.consumer.zookeeper.connect=localhost:2181
+#### KafkaSystemDescriptor
 
-# List of network endpoints where Kafka brokers are running. Also needed by consumers for querying metadata.
-systems.kafka-cluster-1.producer.bootstrap.servers=localhost:9092,localhost:9093
+Samza refers to any I/O source (e.g., Kafka) it interacts with as a _system_, whose properties are set using a corresponding `SystemDescriptor`. The `KafkaSystemDescriptor` allows you to describe the Kafka cluster you are interacting with and specify its properties.
+
+{% highlight java %}
+    KafkaSystemDescriptor kafkaSystemDescriptor =
+        new KafkaSystemDescriptor("kafka").withConsumerZkConnect(KAFKA_CONSUMER_ZK_CONNECT)
+            .withProducerBootstrapServers(KAFKA_PRODUCER_BOOTSTRAP_SERVERS)
+            .withDefaultStreamConfigs(KAFKA_DEFAULT_STREAM_CONFIGS);
 {% endhighlight %}
 
-Samza provides a built-in KafkaSystemDescriptor to consume from and produce to Kafka from the StreamApplication (High-level API) or the TaskApplication (Low-level API).
 
-Below is an example of how to use the descriptors in the describe method of a StreamApplication.
+#### KafkaInputDescriptor
 
+A Kafka cluster usually has multiple topics (a.k.a. _streams_). The `KafkaInputDescriptor` allows you to specify the properties of each Kafka topic your application should read from. For each of your input topics, you should create a corresponding instance of `KafkaInputDescriptor`
+by providing a topic name and a serializer.
 {% highlight java %}
-public class PageViewFilter implements StreamApplication {
-  @Override
-  public void describe(StreamApplicationDescriptor appDesc) {
-    // add input and output streams
-    KafkaSystemDescriptor ksd = new KafkaSystemDescriptor("kafka-cluster-1");
-    KafkaInputDescriptor<PageView> isd = ksd.getInputDescriptor("myinput", new JsonSerdeV2<>(PageView.class));
-    KafkaOutputDescriptor<DecoratedPageView> osd = ksd.getOutputDescriptor("myout", new JsonSerdeV2<>(DecordatedPageView.class));
-
-    MessageStream<PageView> ms = appDesc.getInputStream(isd);
-    OutputStream<DecoratedPageView> os = appDesc.getOutputStream(osd);
-
-    ms.filter(this::isValidPageView)
-      .map(this::addProfileInformation)
-      .sendTo(os);
-  }
-}
+    KafkaInputDescriptor<PageView> pageViewStreamDescriptor = kafkaSystemDescriptor.getInputDescriptor("page-view-topic", new JsonSerdeV2<>(PageView.class));
 {% endhighlight %}
 
-Below is an example of how to use the descriptors in the describe method of a TaskApplication
+The above example describes an input Kafka stream from the "page-view-topic", which Samza deserializes from JSON into a `PageView`. Samza provides default serializers for common data types such as string, Avro, bytes and integer.
+ 
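+For instance, here is a minimal sketch of an input descriptor built entirely from built-in serdes (the `member-counts` topic and its `KV<String, Integer>` payload are hypothetical, for illustration only):
+
+{% highlight java %}
+    // A key-value serde composed of built-in serdes: string keys, integer values
+    KVSerde<String, Integer> countSerde = KVSerde.of(new StringSerde(), new IntegerSerde());
+    KafkaInputDescriptor<KV<String, Integer>> countsDescriptor =
+        kafkaSystemDescriptor.getInputDescriptor("member-counts", countSerde);
+{% endhighlight %}
+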
+#### KafkaOutputDescriptor
+
+Similarly, the `KafkaOutputDescriptor` allows you to specify the output streams for your application. For each output topic you write to, you should create an instance of `KafkaOutputDescriptor`.
 
 {% highlight java %}
-public class PageViewFilterTask implements TaskApplication {
-  @Override
-  public void describe(TaskApplicationDescriptor appDesc) {
-    // add input and output streams
-    KafkaSystemDescriptor ksd = new KafkaSystemDescriptor("kafka-cluster-1");
-    KafkaInputDescriptor<String> isd = ksd.getInputDescriptor("myinput", new StringSerde());
-    KafkaOutputDescriptor<String> osd = ksd.getOutputDescriptor("myout", new StringSerde());
-
-    appDesc.addInputStream(isd);
-    appDesc.addOutputStream(osd);
-    appDesc.addTable(td);
-
-    appDesc.withTaskFactory((StreamTaskFactory) () -> new MyStreamTask());
-  }
-}
+    KafkaOutputDescriptor<DecoratedPageView> decoratedPageView = kafkaSystemDescriptor.getOutputDescriptor("my-output-topic", new JsonSerdeV2<>(DecoratedPageView.class));
 {% endhighlight %}
 
-### Advanced Configuration
 
-Prefix the configuration with `systems.system-name.consumer.` followed by any of the Kafka consumer configurations. See [Kafka Consumer Configuration Documentation](http://kafka.apache.org/documentation.html#consumerconfigs)
+### Configuration
 
-{% highlight jproperties %}
-systems.kafka-cluster-1.consumer.security.protocol=SSL
-systems.kafka-cluster-1.consumer.max.partition.fetch.bytes=524288
+#### Configuring the Kafka producer and consumer
+ 
+The `KafkaSystemDescriptor` allows you to specify any [Kafka producer](https://kafka.apache.org/documentation/#producerconfigs) or [Kafka consumer](https://kafka.apache.org/documentation/#consumerconfigs) property, which is passed directly to the underlying Kafka client. This allows for
+precise control over the KafkaProducer and KafkaConsumer instances used by Samza.
+
+{% highlight java %}
+    KafkaSystemDescriptor kafkaSystemDescriptor =
+        new KafkaSystemDescriptor("kafka").withConsumerZkConnect(..)
+            .withProducerBootstrapServers(..)
+            .withConsumerConfigs(..)
+            .withProducerConfigs(..);
 {% endhighlight %}
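+
+As an illustration, here is a sketch that passes concrete properties (the property names come from the Kafka producer/consumer documentation linked above; the values are hypothetical):
+
+{% highlight java %}
+    // Plain java.util.Map instances holding Kafka client properties
+    Map<String, String> consumerConfigs = new HashMap<>();
+    consumerConfigs.put("security.protocol", "SSL");
+    consumerConfigs.put("max.partition.fetch.bytes", "524288");
+
+    Map<String, String> producerConfigs = new HashMap<>();
+    producerConfigs.put("compression.type", "lz4");
+
+    KafkaSystemDescriptor kafkaSystemDescriptor =
+        new KafkaSystemDescriptor("kafka").withConsumerZkConnect(KAFKA_CONSUMER_ZK_CONNECT)
+            .withProducerBootstrapServers(KAFKA_PRODUCER_BOOTSTRAP_SERVERS)
+            .withConsumerConfigs(consumerConfigs)
+            .withProducerConfigs(producerConfigs);
+{% endhighlight %}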
 
-## Producing to Kafka
 
-### Basic Configuration
+#### Accessing an offset which is out-of-range
+The `withConsumerAutoOffsetReset` setting determines the behavior when a consumer attempts to read an offset that is outside the valid range maintained by the broker. This could happen if the topic does not exist, or if a checkpoint is older than the maximum message history retained by the brokers.
 
-The basic configuration is the same as [Consuming from Kafka](#kafka-basic-configuration).
+{% highlight java %}
+    KafkaSystemDescriptor kafkaSystemDescriptor =
+        new KafkaSystemDescriptor("kafka").withConsumerZkConnect(..)
+            .withProducerBootstrapServers(..)
+            .withConsumerAutoOffsetReset("largest");
+{% endhighlight %}
 
-### Advanced Configuration
 
-#### Changelog to Kafka for State Stores
+#### Ignoring checkpointed offsets
+Samza periodically persists the last processed Kafka offsets as part of its checkpoint. During startup, Samza resumes consumption from the previously checkpointed offsets by default. You can override this behavior and configure Samza to ignore checkpoints with `KafkaInputDescriptor#shouldResetOffset()`.
+When there are no checkpoints for a stream, `#withOffsetDefault(..)` determines whether to start consumption from the oldest or newest offset.
 
-For Samza processors that have local state and is configured with a changelog for durability, if the changelog is configured to use Kafka, there are Kafka specific configuration parameters.
-See section on `TODO: link to state management section` State Management `\TODO` for more details.
+{% highlight java %}
+KafkaInputDescriptor<PageView> pageViewStreamDescriptor = 
+    kafkaSystemDescriptor.getInputDescriptor("page-view-topic", new JsonSerdeV2<>(PageView.class)) 
+        .shouldResetOffset()
+        .withOffsetDefault(OffsetType.OLDEST);
 
-{% highlight jproperties %}
-stores.store-name.changelog=kafka-cluster-2.changelog-topic-name
-stores.store-name.changelog.replication.factor=3
-stores.store-name.changelog.kafka.cleanup.policy=compact
 {% endhighlight %}
 
-#### Performance Tuning
+The above example configures Samza to ignore checkpointed offsets for `page-view-topic` and consume from the oldest available offset during startup. You can configure this behavior to apply to all topics in the Kafka cluster by using `KafkaSystemDescriptor#withDefaultStreamOffsetDefault`.
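+
+For instance, a minimal sketch of setting a cluster-wide default start position (reusing the `OffsetType` shown above):
+
+{% highlight java %}
+    KafkaSystemDescriptor kafkaSystemDescriptor =
+        new KafkaSystemDescriptor("kafka").withConsumerZkConnect(..)
+            .withProducerBootstrapServers(..)
+            .withDefaultStreamOffsetDefault(OffsetType.OLDEST);
+{% endhighlight %}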
+
+ 
 
-Increasing the consumer fetch buffer thresholds may improve throughput at the expense of memory by buffering more messages. Run some performance analysis to find the optimal values.
+### Code walkthrough
+
+In this section, we walk through a complete example.
+
+#### High-level API
+{% highlight java %}
+// Define coordinates of the Kafka cluster using the KafkaSystemDescriptor
+1    KafkaSystemDescriptor kafkaSystemDescriptor = new KafkaSystemDescriptor("kafka")
+2        .withConsumerZkConnect(KAFKA_CONSUMER_ZK_CONNECT)
+3        .withProducerBootstrapServers(KAFKA_PRODUCER_BOOTSTRAP_SERVERS);
+
+// Create a KafkaInputDescriptor for the input topic and a KafkaOutputDescriptor for the output topic
+4    KVSerde<String, PageView> serde = KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageView.class));
+5    KafkaInputDescriptor<KV<String, PageView>> inputDescriptor =
+6        kafkaSystemDescriptor.getInputDescriptor("page-views", serde);
+7    KafkaOutputDescriptor<KV<String, PageView>> outputDescriptor =
+8        kafkaSystemDescriptor.getOutputDescriptor("filtered-page-views", serde);
+
+
+// Obtain a message stream for the input topic
+9    MessageStream<KV<String, PageView>> pageViews = appDescriptor.getInputStream(inputDescriptor);
+
+// Obtain an output stream for the output topic
+10    OutputStream<KV<String, PageView>> filteredPageViews = appDescriptor.getOutputStream(outputDescriptor);
+
+// Filter the input stream and write results to the output topic
+11    pageViews
+12       .filter(kv -> !INVALID_USER_ID.equals(kv.value.userId))
+13       .sendTo(filteredPageViews);
 
-{% highlight jproperties %}
-# Max number of messages to buffer across all Kafka input topic partitions per container. Default is 50000 messages.
-systems.kafka-cluster-1.samza.fetch.threshold=10000
-# Max buffer size by bytes. This configuration takes precedence over the above configuration if value is not -1. Default is -1.
-systems.kafka-cluster-1.samza.fetch.threshold.bytes=-1
 {% endhighlight %}
+
+- Lines 1-3 create a KafkaSystemDescriptor defining the coordinates of our Kafka cluster
+
+- Lines 4-6 define a serde and a KafkaInputDescriptor for our input topic - `page-views`
+
+- Lines 7-8 define a KafkaOutputDescriptor for our output topic - `filtered-page-views`
+
+- Line 9 creates a MessageStream for the input topic so that you can chain operations on it later
+
+- Line 10 creates an OutputStream for the output topic
+
+- Lines 11-13 define a simple pipeline that reads from the input stream and writes filtered results to the output stream
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/e4b60e73/docs/learn/documentation/versioned/connectors/overview.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/connectors/overview.md b/docs/learn/documentation/versioned/connectors/overview.md
index 5b6ba39..6697b9c 100644
--- a/docs/learn/documentation/versioned/connectors/overview.md
+++ b/docs/learn/documentation/versioned/connectors/overview.md
@@ -20,33 +20,27 @@ title: Connectors overview
 -->
 
 Stream processing applications often read data from external sources like Kafka or HDFS. Likewise, they require processed
-results to be written to external system or data stores. As of the 1.0 release, Samza integrates with the following systems
-out-of-the-box:
+results to be written to external systems or data stores. Samza is pluggable and designed to support a variety of [producers](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/system/SystemProducer.html) and [consumers](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/system/SystemConsumer.html) for your data. You can
+integrate Samza with any streaming system by implementing the [SystemFactory](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/system/SystemFactory.html) interface. 
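+
+As a sketch, a skeleton `SystemFactory` implementation might look like the following (the class name is hypothetical, and each method body is a placeholder for your own consumer, producer and admin):
+
+{% highlight java %}
+// Assumes org.apache.samza.config.Config, org.apache.samza.metrics.MetricsRegistry
+// and org.apache.samza.system.* are imported
+public class MySystemFactory implements SystemFactory {
+  @Override
+  public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) {
+    // Return a SystemConsumer that polls your external system for messages
+    throw new UnsupportedOperationException("TODO: wire in your consumer");
+  }
+
+  @Override
+  public SystemProducer getProducer(String systemName, Config config, MetricsRegistry registry) {
+    // Return a SystemProducer that writes messages to your external system
+    throw new UnsupportedOperationException("TODO: wire in your producer");
+  }
+
+  @Override
+  public SystemAdmin getAdmin(String systemName, Config config) {
+    // Return a SystemAdmin that manages streams and fetches their metadata
+    throw new UnsupportedOperationException("TODO: wire in your admin");
+  }
+}
+{% endhighlight %}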
 
-- [Apache Kafka](kafka) (consumer/producer)
-- [Microsoft Azure Eventhubs](eventhubs) (consumer/producer)
-- [Amazon AWS Kinesis Streams](kinesis) (consumer)
-- [Hadoop Filesystem](hdfs) (consumer/producer)
-- [Elasticsearch](https://github.com/apache/samza/blob/master/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java) (producer)
+The following integrations are supported out-of-the-box:
 
-Instructions on how to use these connectors can be found in the corresponding subsections. Please note that the
-connector API is different from [Samza Table API](../api/table-api), where the data could be read from and written to
-data stores.
+Consumers:
 
-Samza is pluggable and designed to support a variety of producers and consumers. You can provide your own producer or
-consumer by implementing the SystemFactory interface.
+- [Apache Kafka](kafka) 
 
-To associate a system with a Samza Connector, the user needs to set the following config:
+- [Microsoft Azure Eventhubs](eventhubs) 
 
-{% highlight jproperties %}
-systems.<system-name>.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
-{% endhighlight %}
+- [Amazon AWS Kinesis Streams](kinesis) 
 
-Any system specific configs, could be defined as below:
+- [Hadoop Filesystem](hdfs) 
 
-{% highlight jproperties %}
-systems.<system-name>.param1=value1
-systems.<system-name>.consumer.param2=value2
-systems.<system-name>.producer.param3=value3
-{% endhighlight %}
+Producers:
 
+- [Apache Kafka](kafka) 
+
+- [Microsoft Azure Eventhubs](eventhubs) 
+
+- [Hadoop Filesystem](hdfs) 
+
+- [Elasticsearch](https://github.com/apache/samza/blob/master/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java)