You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2018/01/09 01:26:09 UTC

[28/33] samza-hello-samza git commit: SAMZA-1524: Azure checkpoint and eventhubs fluent api standalone example

SAMZA-1524: Azure checkpoint and eventhubs fluent api standalone example

Tutorial and docs coming soon

Author: Daniel Chen <29...@users.noreply.github.com>

Reviewers: Jagadish <ja...@apache.org>

Closes #29 from dxichen/azure-examples


Project: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/commit/f48c7f9f
Tree: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/tree/f48c7f9f
Diff: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/diff/f48c7f9f

Branch: refs/heads/master
Commit: f48c7f9f3419e69821b9685dcffc3fd89803f8fa
Parents: e7811db
Author: Daniel Chen <29...@users.noreply.github.com>
Authored: Mon Dec 11 17:56:18 2017 -0800
Committer: Jagadish <jv...@linkedin.com>
Committed: Mon Dec 11 17:56:18 2017 -0800

----------------------------------------------------------------------
 bin/run-azure-application.sh                    | 30 ++++++++++
 pom.xml                                         |  5 ++
 src/main/assembly/src.xml                       |  4 ++
 .../azure-application-local-runner.properties   | 49 ++++++++++++++++
 .../samza/examples/azure/AzureApplication.java  | 61 ++++++++++++++++++++
 .../examples/azure/AzureZKLocalApplication.java | 42 ++++++++++++++
 6 files changed, 191 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/f48c7f9f/bin/run-azure-application.sh
----------------------------------------------------------------------
diff --git a/bin/run-azure-application.sh b/bin/run-azure-application.sh
new file mode 100755
index 0000000..8cd2463
--- /dev/null
+++ b/bin/run-azure-application.sh
@@ -0,0 +1,30 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+home_dir=`pwd`
+base_dir=$(dirname $0)/..
+cd $base_dir
+base_dir=`pwd`
+cd $home_dir
+
+export EXECUTION_PLAN_DIR="$base_dir/plan"
+mkdir -p $EXECUTION_PLAN_DIR
+
+[[ $JAVA_OPTS != *-Dlog4j.configuration* ]] && export JAVA_OPTS="$JAVA_OPTS -Dlog4j.configuration=file:$(dirname $0)/log4j-console.xml"
+
+exec $(dirname $0)/run-class.sh samza.examples.azure.AzureZKLocalApplication --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/azure-application-local-runner.properties

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/f48c7f9f/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index ada4c2b..5b2eb55 100644
--- a/pom.xml
+++ b/pom.xml
@@ -43,6 +43,11 @@ under the License.
     </dependency>
     <dependency>
       <groupId>org.apache.samza</groupId>
+      <artifactId>samza-azure</artifactId>
+      <version>${samza.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.samza</groupId>
       <artifactId>samza-core_2.11</artifactId>
       <version>${samza.version}</version>
     </dependency>

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/f48c7f9f/src/main/assembly/src.xml
----------------------------------------------------------------------
diff --git a/src/main/assembly/src.xml b/src/main/assembly/src.xml
index c04ace0..8f3694e 100644
--- a/src/main/assembly/src.xml
+++ b/src/main/assembly/src.xml
@@ -48,6 +48,10 @@
       <source>${basedir}/bin/run-wikipedia-zk-application.sh</source>
       <outputDirectory>bin</outputDirectory>
     </file>
+    <file>
+      <source>${basedir}/bin/run-azure-application.sh</source>
+      <outputDirectory>bin</outputDirectory>
+    </file>
   </files>
   <dependencySets>
     <dependencySet>

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/f48c7f9f/src/main/config/azure-application-local-runner.properties
----------------------------------------------------------------------
diff --git a/src/main/config/azure-application-local-runner.properties b/src/main/config/azure-application-local-runner.properties
new file mode 100644
index 0000000..e440fd8
--- /dev/null
+++ b/src/main/config/azure-application-local-runner.properties
@@ -0,0 +1,49 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Job
+# Names the job, selects the job coordinator factory and the default system.
+# The coordinator's ZooKeeper connection string points at localhost:2181, so a
+# ZooKeeper instance is presumably expected to be running locally.
+job.name=azure-application-local-runner
+job.coordinator.factory=org.apache.samza.zk.ZkJobCoordinatorFactory
+job.default.system=eventhubs
+job.coordinator.zk.connect=localhost:2181
+
+# Azure EventHubs System
+# Declares the "eventhubs" system and lists the stream ids it serves; each
+# listed id has its own credential block below.
+systems.eventhubs.samza.factory=org.apache.samza.system.eventhub.EventHubSystemFactory
+systems.eventhubs.stream.list=output-stream,input-stream
+
+# Add your EventHubs input stream credentials here
+# (replace every YOUR-* placeholder with a real value before running)
+systems.eventhubs.streams.input-stream.eventhubs.namespace=YOUR-STREAM-NAMESPACE
+systems.eventhubs.streams.input-stream.eventhubs.entitypath=YOUR-ENTITY-NAME
+systems.eventhubs.streams.input-stream.eventhubs.sas.keyname=YOUR-SAS-KEY-NAME
+systems.eventhubs.streams.input-stream.eventhubs.sas.token=YOUR-SAS-KEY-TOKEN
+
+# Add your EventHubs output stream credentials here
+# (replace every YOUR-* placeholder with a real value before running)
+systems.eventhubs.streams.output-stream.eventhubs.namespace=YOUR-STREAM-NAMESPACE
+systems.eventhubs.streams.output-stream.eventhubs.entitypath=YOUR-ENTITY-NAME
+systems.eventhubs.streams.output-stream.eventhubs.sas.keyname=YOUR-SAS-KEY-NAME
+systems.eventhubs.streams.output-stream.eventhubs.sas.token=YOUR-SAS-KEY-TOKEN
+
+# Azure Table Checkpoint Manager
+# Selects the Azure checkpoint manager factory; the storage connection string
+# is a placeholder that must be replaced as well.
+task.checkpoint.factory=org.apache.samza.checkpoint.azure.AzureCheckpointManagerFactory
+azure.storage.connect=YOUR-STORAGE-ACCOUNT-CONNECTION-STRING
+
+# Task/Application
+task.name.grouper.factory=org.apache.samza.container.grouper.task.GroupByContainerIdsFactory
+
+# Streams
+# Binds both stream ids to the "eventhubs" system declared above. The ids
+# match the INPUT_STREAM_ID / OUTPUT_STREAM_ID constants in AzureApplication.
+streams.input-stream.samza.system=eventhubs
+streams.output-stream.samza.system=eventhubs

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/f48c7f9f/src/main/java/samza/examples/azure/AzureApplication.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/azure/AzureApplication.java b/src/main/java/samza/examples/azure/AzureApplication.java
new file mode 100644
index 0000000..9f565fe
--- /dev/null
+++ b/src/main/java/samza/examples/azure/AzureApplication.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package samza.examples.azure;
+
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.serializers.ByteSerde;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
+
+/**
+ * Example stream application that copies messages from one stream to another.
+ *
+ * <p>Messages are read from the {@code input-stream} stream, messages whose key is
+ * {@code null} are dropped, each remaining message is printed to standard output and
+ * then forwarded unchanged to the {@code output-stream} stream.
+ */
+public class AzureApplication implements StreamApplication {
+
+  // Inputs
+  private static final String INPUT_STREAM_ID = "input-stream";
+
+  // Outputs
+  private static final String OUTPUT_STREAM_ID = "output-stream";
+
+  /**
+   * Builds the processing graph: input stream, null-key filter, console trace, output stream.
+   *
+   * @param graph the stream graph on which the input and output streams are obtained
+   * @param config the job configuration; not read by this method
+   */
+  @Override
+  public void init(StreamGraph graph, Config config) {
+
+    // Input
+    MessageStream<KV<String, byte[]>> eventhubInput = graph.getInputStream(INPUT_STREAM_ID);
+
+    // Output: keys are written with a StringSerde, values with a ByteSerde
+    OutputStream<KV<String, byte[]>> eventhubOutput =
+        graph.getOutputStream(OUTPUT_STREAM_ID, KVSerde.of(new StringSerde(), new ByteSerde()));
+
+    // Send
+    eventhubInput
+        .filter((message) -> message.getKey() != null)
+        .map((message) -> {
+          System.out.println("Sending: ");
+          System.out.println("Received Key: " + message.getKey());
+          // Decode explicitly as UTF-8 so the printed text does not depend on the
+          // platform default charset of the host running the job.
+          System.out.println("Received Message: "
+              + new String(message.getValue(), java.nio.charset.StandardCharsets.UTF_8));
+          return message;
+        })
+        .sendTo(eventhubOutput);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/f48c7f9f/src/main/java/samza/examples/azure/AzureZKLocalApplication.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/azure/AzureZKLocalApplication.java b/src/main/java/samza/examples/azure/AzureZKLocalApplication.java
new file mode 100644
index 0000000..3d4f8b0
--- /dev/null
+++ b/src/main/java/samza/examples/azure/AzureZKLocalApplication.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package samza.examples.azure;
+
+import joptsimple.OptionSet;
+import org.apache.samza.config.Config;
+import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.util.CommandLine;
+import samza.examples.azure.AzureApplication;
+
+/**
+ * Command-line launcher that runs {@link AzureApplication} through a
+ * {@link LocalApplicationRunner}.
+ */
+public class AzureZKLocalApplication {
+
+  /**
+   * Parses the command-line arguments into a job configuration, starts the
+   * application and blocks until the runner reports that it has finished.
+   *
+   * @param args command-line arguments handed to the {@link CommandLine} parser
+   */
+  public static void main(String[] args) {
+    CommandLine commandLine = new CommandLine();
+    Config jobConfig = commandLine.loadConfig(commandLine.parser().parse(args));
+
+    LocalApplicationRunner applicationRunner = new LocalApplicationRunner(jobConfig);
+    applicationRunner.run(new AzureApplication());
+    applicationRunner.waitForFinish();
+  }
+
+}