You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2018/10/23 19:16:07 UTC
samza-hello-samza git commit: Update Event Hubs Example
Repository: samza-hello-samza
Updated Branches:
refs/heads/latest 4a73e0d96 -> a096c7533
Update Event Hubs Example
Update the Event Hubs Connector example for 1.0 release.
Author: Daniel Chen <dc...@linkedin.com>
Reviewers: Prateek Maheshwari
Closes #38 from dxichen/latest
Project: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/commit/a096c753
Tree: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/tree/a096c753
Diff: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/diff/a096c753
Branch: refs/heads/latest
Commit: a096c7533987aa34128d4c61d6ad5b2da05b8039
Parents: 4a73e0d
Author: Daniel Chen <dc...@linkedin.com>
Authored: Tue Oct 23 12:15:58 2018 -0700
Committer: Prateek Maheshwari <pm...@apache.org>
Committed: Tue Oct 23 12:15:58 2018 -0700
----------------------------------------------------------------------
bin/run-azure-application.sh | 30 -----------
bin/run-event-hubs-zk-application.sh | 30 +++++++++++
src/main/assembly/src.xml | 2 +-
.../azure-application-local-runner.properties | 26 ++--------
.../samza/examples/azure/AzureApplication.java | 53 ++++++++++++++------
5 files changed, 71 insertions(+), 70 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/a096c753/bin/run-azure-application.sh
----------------------------------------------------------------------
diff --git a/bin/run-azure-application.sh b/bin/run-azure-application.sh
deleted file mode 100755
index 8cd2463..0000000
--- a/bin/run-azure-application.sh
+++ /dev/null
@@ -1,30 +0,0 @@
-#!/bin/bash
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-home_dir=`pwd`
-base_dir=$(dirname $0)/..
-cd $base_dir
-base_dir=`pwd`
-cd $home_dir
-
-export EXECUTION_PLAN_DIR="$base_dir/plan"
-mkdir -p $EXECUTION_PLAN_DIR
-
-[[ $JAVA_OPTS != *-Dlog4j.configuration* ]] && export JAVA_OPTS="$JAVA_OPTS -Dlog4j.configuration=file:$(dirname $0)/log4j-console.xml"
-
-exec $(dirname $0)/run-class.sh samza.examples.azure.AzureZKLocalApplication --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/azure-application-local-runner.properties
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/a096c753/bin/run-event-hubs-zk-application.sh
----------------------------------------------------------------------
diff --git a/bin/run-event-hubs-zk-application.sh b/bin/run-event-hubs-zk-application.sh
new file mode 100755
index 0000000..8cd2463
--- /dev/null
+++ b/bin/run-event-hubs-zk-application.sh
@@ -0,0 +1,30 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+home_dir=`pwd`
+base_dir=$(dirname $0)/..
+cd $base_dir
+base_dir=`pwd`
+cd $home_dir
+
+export EXECUTION_PLAN_DIR="$base_dir/plan"
+mkdir -p $EXECUTION_PLAN_DIR
+
+[[ $JAVA_OPTS != *-Dlog4j.configuration* ]] && export JAVA_OPTS="$JAVA_OPTS -Dlog4j.configuration=file:$(dirname $0)/log4j-console.xml"
+
+exec $(dirname $0)/run-class.sh samza.examples.azure.AzureZKLocalApplication --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/azure-application-local-runner.properties
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/a096c753/src/main/assembly/src.xml
----------------------------------------------------------------------
diff --git a/src/main/assembly/src.xml b/src/main/assembly/src.xml
index 8f3694e..1614aaf 100644
--- a/src/main/assembly/src.xml
+++ b/src/main/assembly/src.xml
@@ -49,7 +49,7 @@
<outputDirectory>bin</outputDirectory>
</file>
<file>
- <source>${basedir}/bin/run-azure-application.sh</source>
+ <source>${basedir}/bin/run-event-hubs-zk-application.sh</source>
<outputDirectory>bin</outputDirectory>
</file>
</files>
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/a096c753/src/main/config/azure-application-local-runner.properties
----------------------------------------------------------------------
diff --git a/src/main/config/azure-application-local-runner.properties b/src/main/config/azure-application-local-runner.properties
index e440fd8..3b7618e 100644
--- a/src/main/config/azure-application-local-runner.properties
+++ b/src/main/config/azure-application-local-runner.properties
@@ -21,29 +21,9 @@ job.coordinator.factory=org.apache.samza.zk.ZkJobCoordinatorFactory
job.default.system=eventhubs
job.coordinator.zk.connect=localhost:2181
-# Azure EventHubs System
-systems.eventhubs.samza.factory=org.apache.samza.system.eventhub.EventHubSystemFactory
-systems.eventhubs.stream.list=output-stream,input-stream
-
-# Add your EventHubs input stream credentials here
-systems.eventhubs.streams.input-stream.eventhubs.namespace=YOUR-STREAM-NAMESPACE
-systems.eventhubs.streams.input-stream.eventhubs.entitypath=YOUR-ENTITY-NAME
-systems.eventhubs.streams.input-stream.eventhubs.sas.keyname=YOUR-SAS-KEY-NAME
-systems.eventhubs.streams.input-stream.eventhubs.sas.token=YOUR-SAS-KEY-TOKEN
-
-# Add your EventHubs output stream credentials here
-systems.eventhubs.streams.output-stream.eventhubs.namespace=YOUR-STREAM-NAMESPACE
-systems.eventhubs.streams.output-stream.eventhubs.entitypath=YOUR-ENTITY-NAME
-systems.eventhubs.streams.output-stream.eventhubs.sas.keyname=YOUR-SAS-KEY-NAME
-systems.eventhubs.streams.output-stream.eventhubs.sas.token=YOUR-SAS-KEY-TOKEN
-
-# Azure Table Checkpoint Manager
-task.checkpoint.factory=org.apache.samza.checkpoint.azure.AzureCheckpointManagerFactory
-azure.storage.connect=YOUR-STORAGE-ACCOUNT-CONNECTION-STRING
+# Define the key and name configurations with property names of your choice, starting with 'sensitive.'
+sensitive.eventhubs.sas.key.name=my-sas-key-name
+sensitive.eventhubs.sas.token=my-sas-token
# Task/Application
task.name.grouper.factory=org.apache.samza.container.grouper.task.GroupByContainerIdsFactory
-
-# Streams
-streams.input-stream.samza.system=eventhubs
-streams.output-stream.samza.system=eventhubs
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/a096c753/src/main/java/samza/examples/azure/AzureApplication.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/azure/AzureApplication.java b/src/main/java/samza/examples/azure/AzureApplication.java
index e2c337f..454787f 100644
--- a/src/main/java/samza/examples/azure/AzureApplication.java
+++ b/src/main/java/samza/examples/azure/AzureApplication.java
@@ -24,39 +24,60 @@ import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
-import org.apache.samza.serializers.ByteSerde;
-import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.system.descriptors.GenericInputDescriptor;
-import org.apache.samza.system.descriptors.GenericOutputDescriptor;
-import org.apache.samza.system.descriptors.GenericSystemDescriptor;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.eventhub.descriptors.EventHubsInputDescriptor;
+import org.apache.samza.system.eventhub.descriptors.EventHubsOutputDescriptor;
+import org.apache.samza.system.eventhub.descriptors.EventHubsSystemDescriptor;
+
public class AzureApplication implements StreamApplication {
+ // Stream names
private static final String INPUT_STREAM_ID = "input-stream";
private static final String OUTPUT_STREAM_ID = "output-stream";
+ // These properties could be configured here or in azure-application-local-runner.properties
+ // Keep in mind that the .properties file will be overwrite properties defined here with Descriptors
+ private static final String EVENTHUBS_NAMESPACE = "my-eventhubs-namespace";
+
+ // Upstream and downstream Event Hubs entity names
+ private static final String EVENTHUBS_INPUT_ENTITY = "my-input-entity";
+ private static final String EVENTHUBS_OUTPUT_ENTITY = "my-output-entity";
+
+ // You may define your own config properties in azure-application-local-runner.properties and retrieve them
+ // in the StreamApplicationDescriptor. Prefix them with 'sensitive.' to avoid logging them.
+ private static final String EVENTHUBS_SAS_KEY_NAME_CONFIG = "sensitive.eventhubs.sas.key.name";
+ private static final String EVENTHUBS_SAS_KEY_TOKEN_CONFIG = "sensitive.eventhubs.sas.token";
+
@Override
public void describe(StreamApplicationDescriptor appDescriptor) {
- GenericSystemDescriptor systemDescriptor =
- new GenericSystemDescriptor("eventhubs", "org.apache.samza.system.eventhub.EventHubSystemFactory");
+ // Define your system here
+ EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor("eventhubs");
- KVSerde<String, byte[]> serde = KVSerde.of(new StringSerde(), new ByteSerde());
+ // Choose your serializer/deserializer for the EventData payload
+ StringSerde serde = new StringSerde();
- GenericInputDescriptor<KV<String, byte[]>> inputDescriptor =
- systemDescriptor.getInputDescriptor(INPUT_STREAM_ID, serde);
+ // Define the input and output descriptors with respective configs
+ EventHubsInputDescriptor<KV<String, String>> inputDescriptor =
+ systemDescriptor.getInputDescriptor(INPUT_STREAM_ID, EVENTHUBS_NAMESPACE, EVENTHUBS_INPUT_ENTITY, serde)
+ .withSasKeyName(appDescriptor.getConfig().get(EVENTHUBS_SAS_KEY_NAME_CONFIG))
+ .withSasKey(appDescriptor.getConfig().get(EVENTHUBS_SAS_KEY_TOKEN_CONFIG));
- GenericOutputDescriptor<KV<String, byte[]>> outputDescriptor =
- systemDescriptor.getOutputDescriptor(OUTPUT_STREAM_ID, serde);
+ EventHubsOutputDescriptor<KV<String, String>> outputDescriptor =
+ systemDescriptor.getOutputDescriptor(OUTPUT_STREAM_ID, EVENTHUBS_NAMESPACE, EVENTHUBS_OUTPUT_ENTITY, serde)
+ .withSasKeyName(appDescriptor.getConfig().get(EVENTHUBS_SAS_KEY_NAME_CONFIG))
+ .withSasKey(appDescriptor.getConfig().get(EVENTHUBS_SAS_KEY_TOKEN_CONFIG));
- MessageStream<KV<String, byte[]>> eventhubInput = appDescriptor.getInputStream(inputDescriptor);
- OutputStream<KV<String, byte[]>> eventhubOutput = appDescriptor.getOutputStream(outputDescriptor);
+ // Define the input and output streams with descriptors
+ MessageStream<KV<String, String>> eventhubInput = appDescriptor.getInputStream(inputDescriptor);
+ OutputStream<KV<String, String>> eventhubOutput = appDescriptor.getOutputStream(outputDescriptor);
+ // Define the execution flow with the high-level API
eventhubInput
- .filter((message) -> message.getKey() != null)
.map((message) -> {
System.out.println("Sending: ");
System.out.println("Received Key: " + message.getKey());
- System.out.println("Received Message: " + new String(message.getValue()));
+ System.out.println("Received Message: " + message.getValue());
return message;
})
.sendTo(eventhubOutput);