You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@samza.apache.org by GitBox <gi...@apache.org> on 2019/05/24 16:27:34 UTC

[GitHub] [samza-hello-samza] dengpanyin commented on a change in pull request #64: SAMZA-2218: Add a Couchbase example to samza-hello-samza

dengpanyin commented on a change in pull request #64: SAMZA-2218: Add a Couchbase example to samza-hello-samza
URL: https://github.com/apache/samza-hello-samza/pull/64#discussion_r287431784
 
 

 ##########
 File path: src/main/java/samza/examples/cookbook/CouchbaseTableExample.java
 ##########
 @@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package samza.examples.cookbook;
+
+import com.couchbase.client.java.document.json.JsonObject;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import java.text.SimpleDateFormat;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.samza.SamzaException;
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
+import org.apache.samza.context.Context;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.functions.MapFunction;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
+import org.apache.samza.table.descriptors.RemoteTableDescriptor;
+import org.apache.samza.table.remote.BaseTableFunction;
+import org.apache.samza.table.remote.RemoteTable;
+import org.apache.samza.table.remote.TableReadFunction;
+import org.apache.samza.table.remote.couchbase.CouchbaseTableWriteFunction;
+import org.apache.samza.table.retry.TableRetryPolicy;
+
+
+/**
+ * This is a simple word count example using a remote store.
+ *
+ * In this example, we use Couchbase to demonstrate how to invoke API's on a remote store other than get, put or delete
+ * as defined in {@link org.apache.samza.table.remote.AsyncRemoteTable}. Input messages are collected from user through
+ * a Kafka console producer, and tokenized using space. For each word, we increment a counter for this word
+ * as well as a counter for all words on Couchbase. We also output the current value of both counters to Kafka console
+ * consumer.
+ *
+ * A rate limit of 4 requests/second to Couchbase is set of the entire job, internally Samza uses an embedded
+ * rate limiter, which evenly distributes the total rate limit among tasks. As we invoke 2 calls on Couchbase
+ * for each word, you should see roughly 2 messages per second in the Kafka console consumer
+ * window.
+ *
+ * A retry policy with 1 second fixed backoff time and max 3 retries is attached to the remote table.
+ *
+ * <p> Concepts covered: remote table, rate limiter, retry, arbitrary operation on remote store.
+ *
+ * To run the below example:
+ *
+ * <ol>
+ *   <li>
+ *     Create a Couchbase instance using docker; Log into the admin UI at http://localhost:8091 (Administrator/password) <br/>
+ *     create a bucket called "my-bucket" <br/>
+ *     Under Security tab, create a user with the same name, set 123456 as the password, and give it "Data Reader"
+ *     and "Data Writer" privilege for this bucket. <br/>
+ *     More information can be found at https://docs.couchbase.com/server/current/getting-started/do-a-quick-install.html
+ *   </li>
+ *   <li>
+ *     Create Kafka topics "word-input" and "count-output" <br/>
+ *     ./deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic word-input --partitions 2 --replication-factor 1
+ *     ./deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic count-output --partitions 2 --replication-factor 1
+ *   </li>
+ *   <li>
+ *     Run the application using the run-app.sh script <br/>
+ *     ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/couchbase-table-example.properties
+ *   </li>
+ *   <li>
+ *     Consume messages from the output topic <br/>
+ *     ./deploy/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic count-output
+ *   </li>
+ *   <li>
+ *     Produce some messages to the input topic <br/>
+ *     ./deploy/kafka/bin/kafka-console-producer.sh --topic word-input --broker-list localhost:9092
+ *
+ *     After the console producer is started, type
+ *     1
+ *     2
+ *     3
+ *     4
+ *     5
+ *     4
+ *     3
+ *     2
+ *     1
+ *
+ *     You should see messages like below from the console consumer window
+ *
+ *     2019-05-23 21:18:07 2019-05-23 21:18:07 word=2, count=1, total-count=1
+ *     2019-05-23 21:18:07 2019-05-23 21:18:07 word=1, count=1, total-count=2
+ *     2019-05-23 21:18:07 2019-05-23 21:18:07 word=4, count=1, total-count=3
+ *     2019-05-23 21:18:07 2019-05-23 21:18:07 word=3, count=1, total-count=4
+ *     2019-05-23 21:18:08 2019-05-23 21:18:08 word=4, count=2, total-count=5
+ *     2019-05-23 21:18:08 2019-05-23 21:18:08 word=5, count=1, total-count=6
+ *     2019-05-23 21:18:09 2019-05-23 21:18:09 word=2, count=2, total-count=7
+ *     2019-05-23 21:18:09 2019-05-23 21:18:09 word=3, count=2, total-count=8
+ *     2019-05-23 21:18:10 2019-05-23 21:18:10 word=1, count=2, total-count=9
+ *
+ *     You can examine the result on Couchbase Admin GUI as well.
+ *
+ *     Note:
+ *       - If you enter "1 2 3 4 5 4 3 2 1", you should see roughly 1 QPS as
+ *         the input is processed by only one task
+ *
+ *
+ *   </li>
+ * </ol>
+ *
+ */
+public class CouchbaseTableExample implements StreamApplication {
+
+  private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+
+  private static final String KAFKA_SYSTEM_NAME = "kafka";
+  private static final List<String> KAFKA_CONSUMER_ZK_CONNECT = ImmutableList.of("localhost:2181");
+  private static final List<String> KAFKA_PRODUCER_BOOTSTRAP_SERVERS = ImmutableList.of("localhost:9092");
+  private static final Map<String, String> KAFKA_DEFAULT_STREAM_CONFIGS = ImmutableMap.of("replication.factor", "1");
+
+  private static final String INPUT_STREAM_ID = "word-input";
+  private static final String OUTPUT_STREAM_ID = "count-output";
+
+  private static final String CLUSTER_NODES = "couchbase://127.0.0.1";
+  private static final int COUCHBASE_PORT = 11210;
+  private static final String BUCKET_NAME = "my-bucket";
+  private static final String BUCKET_PASSWORD = "123456";
+  private static final String TOTAL_COUNT_ID = "total-count";
+
+  @Override
+  public void describe(StreamApplicationDescriptor app) {
+
+    KafkaSystemDescriptor kafkaSystemDescriptor = new KafkaSystemDescriptor(KAFKA_SYSTEM_NAME)
+        .withConsumerZkConnect(KAFKA_CONSUMER_ZK_CONNECT)
+        .withProducerBootstrapServers(KAFKA_PRODUCER_BOOTSTRAP_SERVERS)
+        .withDefaultStreamConfigs(KAFKA_DEFAULT_STREAM_CONFIGS);
+
+    KafkaInputDescriptor<String> wordInputDescriptor =
+        kafkaSystemDescriptor.getInputDescriptor(INPUT_STREAM_ID, new StringSerde());
+
+    KafkaOutputDescriptor<String> countOutputDescriptor =
+        kafkaSystemDescriptor.getOutputDescriptor(OUTPUT_STREAM_ID, new StringSerde());
+
+    MyCouchbaseTableWriteFunction writeFn = new MyCouchbaseTableWriteFunction(BUCKET_NAME, CLUSTER_NODES)
+        .withBootstrapCarrierDirectPort(COUCHBASE_PORT)
+        .withUsernameAndPassword(BUCKET_NAME, BUCKET_PASSWORD)
+        .withTimeout(Duration.ofSeconds(5));
+
+    TableRetryPolicy retryPolicy = new TableRetryPolicy()
+        .withFixedBackoff(Duration.ofSeconds(1))
+        .withStopAfterAttempts(3);
+
+    RemoteTableDescriptor couchbaseTableDescriptor = new RemoteTableDescriptor("couchbase-table")
+        .withReadFunction(new DummyTableReadFunction())
 
 Review comment:
   Is there any reason we don't allow a write only table descriptor? 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services