You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2018/10/23 19:16:07 UTC

samza-hello-samza git commit: Update Event Hubs Example

Repository: samza-hello-samza
Updated Branches:
  refs/heads/latest 4a73e0d96 -> a096c7533


Update Event Hubs Example

Update the Event Hubs Connector example for the 1.0 release.

Author: Daniel Chen <dc...@linkedin.com>

Reviewers: Prateek Maheshwari

Closes #38 from dxichen/latest


Project: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/commit/a096c753
Tree: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/tree/a096c753
Diff: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/diff/a096c753

Branch: refs/heads/latest
Commit: a096c7533987aa34128d4c61d6ad5b2da05b8039
Parents: 4a73e0d
Author: Daniel Chen <dc...@linkedin.com>
Authored: Tue Oct 23 12:15:58 2018 -0700
Committer: Prateek Maheshwari <pm...@apache.org>
Committed: Tue Oct 23 12:15:58 2018 -0700

----------------------------------------------------------------------
 bin/run-azure-application.sh                    | 30 -----------
 bin/run-event-hubs-zk-application.sh            | 30 +++++++++++
 src/main/assembly/src.xml                       |  2 +-
 .../azure-application-local-runner.properties   | 26 ++--------
 .../samza/examples/azure/AzureApplication.java  | 53 ++++++++++++++------
 5 files changed, 71 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/a096c753/bin/run-azure-application.sh
----------------------------------------------------------------------
diff --git a/bin/run-azure-application.sh b/bin/run-azure-application.sh
deleted file mode 100755
index 8cd2463..0000000
--- a/bin/run-azure-application.sh
+++ /dev/null
@@ -1,30 +0,0 @@
-#!/bin/bash
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-home_dir=`pwd`
-base_dir=$(dirname $0)/..
-cd $base_dir
-base_dir=`pwd`
-cd $home_dir
-
-export EXECUTION_PLAN_DIR="$base_dir/plan"
-mkdir -p $EXECUTION_PLAN_DIR
-
-[[ $JAVA_OPTS != *-Dlog4j.configuration* ]] && export JAVA_OPTS="$JAVA_OPTS -Dlog4j.configuration=file:$(dirname $0)/log4j-console.xml"
-
-exec $(dirname $0)/run-class.sh samza.examples.azure.AzureZKLocalApplication --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/azure-application-local-runner.properties

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/a096c753/bin/run-event-hubs-zk-application.sh
----------------------------------------------------------------------
diff --git a/bin/run-event-hubs-zk-application.sh b/bin/run-event-hubs-zk-application.sh
new file mode 100755
index 0000000..8cd2463
--- /dev/null
+++ b/bin/run-event-hubs-zk-application.sh
@@ -0,0 +1,30 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+home_dir=`pwd`
+base_dir=$(dirname $0)/..
+cd $base_dir
+base_dir=`pwd`
+cd $home_dir
+
+export EXECUTION_PLAN_DIR="$base_dir/plan"
+mkdir -p $EXECUTION_PLAN_DIR
+
+[[ $JAVA_OPTS != *-Dlog4j.configuration* ]] && export JAVA_OPTS="$JAVA_OPTS -Dlog4j.configuration=file:$(dirname $0)/log4j-console.xml"
+
+exec $(dirname $0)/run-class.sh samza.examples.azure.AzureZKLocalApplication --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/azure-application-local-runner.properties

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/a096c753/src/main/assembly/src.xml
----------------------------------------------------------------------
diff --git a/src/main/assembly/src.xml b/src/main/assembly/src.xml
index 8f3694e..1614aaf 100644
--- a/src/main/assembly/src.xml
+++ b/src/main/assembly/src.xml
@@ -49,7 +49,7 @@
       <outputDirectory>bin</outputDirectory>
     </file>
     <file>
-      <source>${basedir}/bin/run-azure-application.sh</source>
+      <source>${basedir}/bin/run-event-hubs-zk-application.sh</source>
       <outputDirectory>bin</outputDirectory>
     </file>
   </files>

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/a096c753/src/main/config/azure-application-local-runner.properties
----------------------------------------------------------------------
diff --git a/src/main/config/azure-application-local-runner.properties b/src/main/config/azure-application-local-runner.properties
index e440fd8..3b7618e 100644
--- a/src/main/config/azure-application-local-runner.properties
+++ b/src/main/config/azure-application-local-runner.properties
@@ -21,29 +21,9 @@ job.coordinator.factory=org.apache.samza.zk.ZkJobCoordinatorFactory
 job.default.system=eventhubs
 job.coordinator.zk.connect=localhost:2181
 
-# Azure EventHubs System
-systems.eventhubs.samza.factory=org.apache.samza.system.eventhub.EventHubSystemFactory
-systems.eventhubs.stream.list=output-stream,input-stream
-
-# Add your EventHubs input stream credentials here
-systems.eventhubs.streams.input-stream.eventhubs.namespace=YOUR-STREAM-NAMESPACE
-systems.eventhubs.streams.input-stream.eventhubs.entitypath=YOUR-ENTITY-NAME
-systems.eventhubs.streams.input-stream.eventhubs.sas.keyname=YOUR-SAS-KEY-NAME
-systems.eventhubs.streams.input-stream.eventhubs.sas.token=YOUR-SAS-KEY-TOKEN
-
-# Add your EventHubs output stream credentials here
-systems.eventhubs.streams.output-stream.eventhubs.namespace=YOUR-STREAM-NAMESPACE
-systems.eventhubs.streams.output-stream.eventhubs.entitypath=YOUR-ENTITY-NAME
-systems.eventhubs.streams.output-stream.eventhubs.sas.keyname=YOUR-SAS-KEY-NAME
-systems.eventhubs.streams.output-stream.eventhubs.sas.token=YOUR-SAS-KEY-TOKEN
-
-# Azure Table Checkpoint Manager
-task.checkpoint.factory=org.apache.samza.checkpoint.azure.AzureCheckpointManagerFactory
-azure.storage.connect=YOUR-STORAGE-ACCOUNT-CONNECTION-STRING
+# Define the key and name configurations with property names of your choice, starting with 'sensitive.'
+sensitive.eventhubs.sas.key.name=my-sas-key-name
+sensitive.eventhubs.sas.token=my-sas-token
 
 # Task/Application
 task.name.grouper.factory=org.apache.samza.container.grouper.task.GroupByContainerIdsFactory
-
-# Streams
-streams.input-stream.samza.system=eventhubs
-streams.output-stream.samza.system=eventhubs

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/a096c753/src/main/java/samza/examples/azure/AzureApplication.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/azure/AzureApplication.java b/src/main/java/samza/examples/azure/AzureApplication.java
index e2c337f..454787f 100644
--- a/src/main/java/samza/examples/azure/AzureApplication.java
+++ b/src/main/java/samza/examples/azure/AzureApplication.java
@@ -24,39 +24,60 @@ import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OutputStream;
-import org.apache.samza.serializers.ByteSerde;
-import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.system.descriptors.GenericInputDescriptor;
-import org.apache.samza.system.descriptors.GenericOutputDescriptor;
-import org.apache.samza.system.descriptors.GenericSystemDescriptor;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.eventhub.descriptors.EventHubsInputDescriptor;
+import org.apache.samza.system.eventhub.descriptors.EventHubsOutputDescriptor;
+import org.apache.samza.system.eventhub.descriptors.EventHubsSystemDescriptor;
+
 
 public class AzureApplication implements StreamApplication {
+  // Stream names
   private static final String INPUT_STREAM_ID = "input-stream";
   private static final String OUTPUT_STREAM_ID = "output-stream";
 
+  // These properties could be configured here or in azure-application-local-runner.properties
+  // Keep in mind that values in the .properties file will overwrite properties defined here with Descriptors
+  private static final String EVENTHUBS_NAMESPACE = "my-eventhubs-namespace";
+
+  // Upstream and downstream Event Hubs entity names
+  private static final String EVENTHUBS_INPUT_ENTITY = "my-input-entity";
+  private static final String EVENTHUBS_OUTPUT_ENTITY = "my-output-entity";
+
+  // You may define your own config properties in azure-application-local-runner.properties and retrieve them
+  // in the StreamApplicationDescriptor. Prefix them with 'sensitive.' to avoid logging them.
+  private static final String EVENTHUBS_SAS_KEY_NAME_CONFIG = "sensitive.eventhubs.sas.key.name";
+  private static final String EVENTHUBS_SAS_KEY_TOKEN_CONFIG = "sensitive.eventhubs.sas.token";
+
   @Override
   public void describe(StreamApplicationDescriptor appDescriptor) {
-    GenericSystemDescriptor systemDescriptor =
-        new GenericSystemDescriptor("eventhubs", "org.apache.samza.system.eventhub.EventHubSystemFactory");
+    // Define your system here
+    EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor("eventhubs");
 
-    KVSerde<String, byte[]> serde = KVSerde.of(new StringSerde(), new ByteSerde());
+    // Choose your serializer/deserializer for the EventData payload
+    StringSerde serde = new StringSerde();
 
-    GenericInputDescriptor<KV<String, byte[]>> inputDescriptor =
-        systemDescriptor.getInputDescriptor(INPUT_STREAM_ID, serde);
+    // Define the input and output descriptors with respective configs
+    EventHubsInputDescriptor<KV<String, String>> inputDescriptor =
+        systemDescriptor.getInputDescriptor(INPUT_STREAM_ID, EVENTHUBS_NAMESPACE, EVENTHUBS_INPUT_ENTITY, serde)
+            .withSasKeyName(appDescriptor.getConfig().get(EVENTHUBS_SAS_KEY_NAME_CONFIG))
+            .withSasKey(appDescriptor.getConfig().get(EVENTHUBS_SAS_KEY_TOKEN_CONFIG));
 
-    GenericOutputDescriptor<KV<String, byte[]>> outputDescriptor =
-        systemDescriptor.getOutputDescriptor(OUTPUT_STREAM_ID, serde);
+    EventHubsOutputDescriptor<KV<String, String>> outputDescriptor =
+        systemDescriptor.getOutputDescriptor(OUTPUT_STREAM_ID, EVENTHUBS_NAMESPACE, EVENTHUBS_OUTPUT_ENTITY, serde)
+            .withSasKeyName(appDescriptor.getConfig().get(EVENTHUBS_SAS_KEY_NAME_CONFIG))
+            .withSasKey(appDescriptor.getConfig().get(EVENTHUBS_SAS_KEY_TOKEN_CONFIG));
 
-    MessageStream<KV<String, byte[]>> eventhubInput = appDescriptor.getInputStream(inputDescriptor);
-    OutputStream<KV<String, byte[]>> eventhubOutput = appDescriptor.getOutputStream(outputDescriptor);
+    // Define the input and output streams with descriptors
+    MessageStream<KV<String, String>> eventhubInput = appDescriptor.getInputStream(inputDescriptor);
+    OutputStream<KV<String, String>> eventhubOutput = appDescriptor.getOutputStream(outputDescriptor);
 
+    // Define the execution flow with the high-level API
     eventhubInput
-        .filter((message) -> message.getKey() != null)
         .map((message) -> {
           System.out.println("Sending: ");
           System.out.println("Received Key: " + message.getKey());
-          System.out.println("Received Message: " + new String(message.getValue()));
+          System.out.println("Received Message: " + message.getValue());
           return message;
         })
         .sendTo(eventhubOutput);