You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2018/01/09 01:26:09 UTC
[28/33] samza-hello-samza git commit: SAMZA-1524: Azure checkpoint
and eventhubs fluent api standalone example
SAMZA-1524: Azure checkpoint and eventhubs fluent api standalone example
Tutorial and docs coming soon
Author: Daniel Chen <29...@users.noreply.github.com>
Reviewers: Jagadish<ja...@apache.org>
Closes #29 from dxichen/azure-examples
Project: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/commit/f48c7f9f
Tree: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/tree/f48c7f9f
Diff: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/diff/f48c7f9f
Branch: refs/heads/master
Commit: f48c7f9f3419e69821b9685dcffc3fd89803f8fa
Parents: e7811db
Author: Daniel Chen <29...@users.noreply.github.com>
Authored: Mon Dec 11 17:56:18 2017 -0800
Committer: Jagadish <jv...@linkedin.com>
Committed: Mon Dec 11 17:56:18 2017 -0800
----------------------------------------------------------------------
bin/run-azure-application.sh | 30 ++++++++++
pom.xml | 5 ++
src/main/assembly/src.xml | 4 ++
.../azure-application-local-runner.properties | 49 ++++++++++++++++
.../samza/examples/azure/AzureApplication.java | 61 ++++++++++++++++++++
.../examples/azure/AzureZKLocalApplication.java | 42 ++++++++++++++
6 files changed, 191 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/f48c7f9f/bin/run-azure-application.sh
----------------------------------------------------------------------
diff --git a/bin/run-azure-application.sh b/bin/run-azure-application.sh
new file mode 100755
index 0000000..8cd2463
--- /dev/null
+++ b/bin/run-azure-application.sh
@@ -0,0 +1,30 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+home_dir=`pwd`
+base_dir=$(dirname $0)/..
+cd $base_dir
+base_dir=`pwd`
+cd $home_dir
+
+export EXECUTION_PLAN_DIR="$base_dir/plan"
+mkdir -p $EXECUTION_PLAN_DIR
+
+[[ $JAVA_OPTS != *-Dlog4j.configuration* ]] && export JAVA_OPTS="$JAVA_OPTS -Dlog4j.configuration=file:$(dirname $0)/log4j-console.xml"
+
+exec $(dirname $0)/run-class.sh samza.examples.azure.AzureZKLocalApplication --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/azure-application-local-runner.properties
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/f48c7f9f/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index ada4c2b..5b2eb55 100644
--- a/pom.xml
+++ b/pom.xml
@@ -43,6 +43,11 @@ under the License.
</dependency>
<dependency>
<groupId>org.apache.samza</groupId>
+ <artifactId>samza-azure</artifactId>
+ <version>${samza.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.samza</groupId>
<artifactId>samza-core_2.11</artifactId>
<version>${samza.version}</version>
</dependency>
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/f48c7f9f/src/main/assembly/src.xml
----------------------------------------------------------------------
diff --git a/src/main/assembly/src.xml b/src/main/assembly/src.xml
index c04ace0..8f3694e 100644
--- a/src/main/assembly/src.xml
+++ b/src/main/assembly/src.xml
@@ -48,6 +48,10 @@
<source>${basedir}/bin/run-wikipedia-zk-application.sh</source>
<outputDirectory>bin</outputDirectory>
</file>
+ <file>
+ <source>${basedir}/bin/run-azure-application.sh</source>
+ <outputDirectory>bin</outputDirectory>
+ </file>
</files>
<dependencySets>
<dependencySet>
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/f48c7f9f/src/main/config/azure-application-local-runner.properties
----------------------------------------------------------------------
diff --git a/src/main/config/azure-application-local-runner.properties b/src/main/config/azure-application-local-runner.properties
new file mode 100644
index 0000000..e440fd8
--- /dev/null
+++ b/src/main/config/azure-application-local-runner.properties
@@ -0,0 +1,49 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Job
+job.name=azure-application-local-runner
+job.coordinator.factory=org.apache.samza.zk.ZkJobCoordinatorFactory
+job.default.system=eventhubs
+job.coordinator.zk.connect=localhost:2181
+
+# Azure EventHubs System
+systems.eventhubs.samza.factory=org.apache.samza.system.eventhub.EventHubSystemFactory
+systems.eventhubs.stream.list=output-stream,input-stream
+
+# Replace the YOUR-* placeholders below with your EventHubs input stream credentials
+systems.eventhubs.streams.input-stream.eventhubs.namespace=YOUR-STREAM-NAMESPACE
+systems.eventhubs.streams.input-stream.eventhubs.entitypath=YOUR-ENTITY-NAME
+systems.eventhubs.streams.input-stream.eventhubs.sas.keyname=YOUR-SAS-KEY-NAME
+systems.eventhubs.streams.input-stream.eventhubs.sas.token=YOUR-SAS-KEY-TOKEN
+
+# Replace the YOUR-* placeholders below with your EventHubs output stream credentials
+systems.eventhubs.streams.output-stream.eventhubs.namespace=YOUR-STREAM-NAMESPACE
+systems.eventhubs.streams.output-stream.eventhubs.entitypath=YOUR-ENTITY-NAME
+systems.eventhubs.streams.output-stream.eventhubs.sas.keyname=YOUR-SAS-KEY-NAME
+systems.eventhubs.streams.output-stream.eventhubs.sas.token=YOUR-SAS-KEY-TOKEN
+
+# Azure Table Checkpoint Manager
+task.checkpoint.factory=org.apache.samza.checkpoint.azure.AzureCheckpointManagerFactory
+azure.storage.connect=YOUR-STORAGE-ACCOUNT-CONNECTION-STRING
+
+# Task/Application
+task.name.grouper.factory=org.apache.samza.container.grouper.task.GroupByContainerIdsFactory
+
+# Streams
+streams.input-stream.samza.system=eventhubs
+streams.output-stream.samza.system=eventhubs
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/f48c7f9f/src/main/java/samza/examples/azure/AzureApplication.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/azure/AzureApplication.java b/src/main/java/samza/examples/azure/AzureApplication.java
new file mode 100644
index 0000000..9f565fe
--- /dev/null
+++ b/src/main/java/samza/examples/azure/AzureApplication.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package samza.examples.azure;
+
+import java.nio.charset.StandardCharsets;
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.serializers.ByteSerde;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
+
+public class AzureApplication implements StreamApplication {
+
+ // Inputs
+ private static final String INPUT_STREAM_ID = "input-stream";
+
+ // Outputs
+ private static final String OUTPUT_STREAM_ID = "output-stream";
+
+ @Override
+ public void init(StreamGraph graph, Config config) {
+
+ // Input
+ MessageStream<KV<String, byte[]>> eventhubInput = graph.getInputStream(INPUT_STREAM_ID);
+
+ // Output
+ OutputStream<KV<String, byte[]>> eventhubOutput =
+ graph.getOutputStream(OUTPUT_STREAM_ID, KVSerde.of(new StringSerde(), new ByteSerde()));
+
+ // Send
+ eventhubInput
+ .filter((message) -> message.getKey() != null)
+ .map((message) -> {
+ System.out.println("Sending: ");
+ System.out.println("Received Key: " + message.getKey());
+ System.out.println("Received Message: " + new String(message.getValue()));
+ return message;
+ })
+ .sendTo(eventhubOutput);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/f48c7f9f/src/main/java/samza/examples/azure/AzureZKLocalApplication.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/azure/AzureZKLocalApplication.java b/src/main/java/samza/examples/azure/AzureZKLocalApplication.java
new file mode 100644
index 0000000..3d4f8b0
--- /dev/null
+++ b/src/main/java/samza/examples/azure/AzureZKLocalApplication.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package samza.examples.azure;
+
+import joptsimple.OptionSet;
+import org.apache.samza.config.Config;
+import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.util.CommandLine;
+import samza.examples.azure.AzureApplication;
+
+/**
+ * Command-line entry point that runs {@link AzureApplication} through a
+ * {@link LocalApplicationRunner}, using the configuration loaded from the
+ * command-line arguments.
+ */
+public class AzureZKLocalApplication {
+
+  /**
+   * Parses the arguments into a {@link Config}, starts the application and
+   * blocks until the runner reports that it has finished.
+   *
+   * @param args command-line options understood by {@link CommandLine}
+   */
+  public static void main(String[] args) {
+    CommandLine commandLine = new CommandLine();
+    Config jobConfig = commandLine.loadConfig(commandLine.parser().parse(args));
+
+    LocalApplicationRunner localRunner = new LocalApplicationRunner(jobConfig);
+    localRunner.run(new AzureApplication());
+    localRunner.waitForFinish();
+  }
+
+}