You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ca...@apache.org on 2020/03/18 23:25:44 UTC

[samza-hello-samza] branch latest updated: Sync latest branch with master (#76)

This is an automated email from the ASF dual-hosted git repository.

cameronlee pushed a commit to branch latest
in repository https://gitbox.apache.org/repos/asf/samza-hello-samza.git


The following commit(s) were added to refs/heads/latest by this push:
     new a3a8fe1  Sync latest branch with master (#76)
a3a8fe1 is described below

commit a3a8fe1743729ffaab70c419e6682350f175448e
Author: Cameron Lee <ca...@linkedin.com>
AuthorDate: Wed Mar 18 16:25:33 2020 -0700

    Sync latest branch with master (#76)
---
 README.md                                          |  2 +-
 build.gradle                                       |  1 +
 conf/yarn-site.xml                                 |  4 ++
 pom.xml                                            | 12 +++--
 src/main/config/kinesis-hello-samza.properties     | 54 ++++++++++++++++++++++
 .../samza/examples/kinesis/KinesisHelloSamza.java  | 42 +++++++++++++++++
 .../sql/samza-sql-casewhen/src/main/sql/samza.sql  | 12 +++++
 .../sql/samza-sql-filter/src/main/sql/samza.sql    |  9 ++++
 .../main/java/samza/sql/PageViewGroupByOutput.json | 18 ++++++++
 .../sql/samza-sql-groupby/src/main/sql/samza.sql   | 12 +++++
 .../src/main/sql/samza.sql                         | 11 +++++
 11 files changed, 173 insertions(+), 4 deletions(-)

diff --git a/README.md b/README.md
index 3f2a7b5..ea5113a 100644
--- a/README.md
+++ b/README.md
@@ -67,7 +67,7 @@ Package [samza.examples.wikipedia.application](https://github.com/apache/samza-h
 Once the job is started, we can tail the kafka topic by:
 
 ```
-./deploy/kafka/bin/kafka-console-consumer.sh  --zookeeper localhost:2181 --topic wikipedia-stats
+./deploy/kafka/bin/kafka-console-consumer.sh  --bootstrap-server localhost:9092 --topic wikipedia-stats
 ```
 
 A code walkthrough of this application can be found [here](http://samza.apache.org/learn/tutorials/latest/hello-samza-high-level-code.html).
diff --git a/build.gradle b/build.gradle
index 427409c..fa3a063 100644
--- a/build.gradle
+++ b/build.gradle
@@ -61,6 +61,7 @@ dependencies {
     compile(group: 'org.slf4j', name: 'slf4j-api', version: "$SLF4J_VERSION")
     compile(group: 'org.schwering', name: 'irclib', version: '1.10')
     compile(group: 'org.apache.samza', name: 'samza-api', version: "$SAMZA_VERSION")
+    compile(group: 'org.apache.samza', name: 'samza-aws_2.11', version: "$SAMZA_VERSION")
     compile(group: 'org.apache.samza', name: 'samza-azure_2.11', version: "$SAMZA_VERSION")
     compile(group: 'org.apache.samza', name: 'samza-kafka_2.11', version: "$SAMZA_VERSION")
     compile(group: 'org.apache.samza', name: 'samza-kv_2.11', version: "$SAMZA_VERSION")
diff --git a/conf/yarn-site.xml b/conf/yarn-site.xml
index 9028590..6c48260 100644
--- a/conf/yarn-site.xml
+++ b/conf/yarn-site.xml
@@ -30,4 +30,8 @@ under the License.
     <name>yarn.resourcemanager.hostname</name>
     <value>127.0.0.1</value>
   </property>
+  <property>
+    <name>yarn.nodemanager.delete.debug-delay.sec</name>
+    <value>86400</value>
+  </property>
 </configuration>
diff --git a/pom.xml b/pom.xml
index 49f4eff..a118cd0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -148,6 +148,11 @@ under the License.
       <version>${samza.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.samza</groupId>
+      <artifactId>samza-aws_2.11</artifactId>
+      <version>${samza.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.schwering</groupId>
       <artifactId>irclib</artifactId>
       <version>1.10</version>
@@ -358,10 +363,11 @@ under the License.
           <version>0.9</version>
           <configuration>
             <excludes>
-              <exclude>*.patch</exclude>
-              <exclude>*.txt</exclude>
+              <exclude>**/*.patch</exclude>
+              <exclude>**/*.txt</exclude>
               <exclude>**/target/**</exclude>
-              <exclude>*.json</exclude>
+              <exclude>**/*.json</exclude>
+              <exclude>**/*.sql</exclude>
               <exclude>.vagrant/**</exclude>
               <exclude>.git/**</exclude>
               <exclude>*.md</exclude>
diff --git a/src/main/config/kinesis-hello-samza.properties b/src/main/config/kinesis-hello-samza.properties
new file mode 100644
index 0000000..17203df
--- /dev/null
+++ b/src/main/config/kinesis-hello-samza.properties
@@ -0,0 +1,54 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Job
+job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
+job.name=kinesis-hello-samza
+
+job.systemstreampartition.grouper.factory=org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory
+
+# YARN
+yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
+yarn.container.count=2
+
+# Task
+task.class=samza.examples.kinesis.KinesisHelloSamza
+# Please replace the below input stream with the stream you plan to consume from.
+task.inputs=kinesis.kinesis-samza-sample-stream
+
+# Serializers
+serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
+
+# Kinesis System
+systems.kinesis.samza.factory=org.apache.samza.system.kinesis.KinesisSystemFactory
+# Please replace the below with the region of your Kinesis data stream.
+systems.kinesis.streams.kinesis-samza-sample-stream.aws.region=us-west-1
+# Access key below is a dummy key for instructional purposes. Please replace with your own key.
+systems.kinesis.streams.kinesis-samza-sample-stream.aws.accessKey=AKIAIHSMRK3Q72O8TEXQ
+# Secret key below is a dummy key for instructional purposes. Please replace with your own key.
+sensitive.systems.kinesis.streams.kinesis-samza-sample-stream.aws.secretKey=9GuEqdY+gNXXGrOQyev8XKziY+sRB1ht91jloEyP
+systems.kinesis.streams.kinesis-samza-sample-stream.aws.kcl.TableName=kinesis-hello-samza
+
+# Kafka System
+systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+systems.kafka.samza.msg.serde=json
+systems.kafka.consumer.zookeeper.connect=localhost:2181/
+systems.kafka.producer.bootstrap.servers=localhost:9092
+
+# Job Coordinator
+job.coordinator.system=kafka
+job.coordinator.replication.factor=1
diff --git a/src/main/java/samza/examples/kinesis/KinesisHelloSamza.java b/src/main/java/samza/examples/kinesis/KinesisHelloSamza.java
new file mode 100644
index 0000000..4fadf78
--- /dev/null
+++ b/src/main/java/samza/examples/kinesis/KinesisHelloSamza.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package samza.examples.kinesis;
+
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.kinesis.consumer.KinesisIncomingMessageEnvelope;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.StreamTask;
+import org.apache.samza.task.TaskCoordinator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A sample task which consumes messages from a Kinesis stream and, for each message, logs its key and lag in ms.
+ */
+public class KinesisHelloSamza implements StreamTask {
+  private static final Logger LOG = LoggerFactory.getLogger(KinesisHelloSamza.class);
+
+  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
+    KinesisIncomingMessageEnvelope kEnvelope = (KinesisIncomingMessageEnvelope) envelope; // only valid for Kinesis inputs; others throw ClassCastException
+    long lagMs = System.currentTimeMillis() - kEnvelope.getApproximateArrivalTimestamp().getTime(); // wall-clock now minus approximate arrival time
+    LOG.info("Kinesis message key: {} Lag: {} ms", envelope.getKey(), lagMs); // SLF4J placeholders: message is only built when INFO is enabled
+  }
+}
\ No newline at end of file
diff --git a/src/main/java/samza/examples/sql/samza-sql-casewhen/src/main/sql/samza.sql b/src/main/java/samza/examples/sql/samza-sql-casewhen/src/main/sql/samza.sql
new file mode 100644
index 0000000..ca9f854
--- /dev/null
+++ b/src/main/java/samza/examples/sql/samza-sql-casewhen/src/main/sql/samza.sql
@@ -0,0 +1,12 @@
+-- For each profile in the Kafka ProfileChanges change-capture stream, flag whether it is a
+-- quality profile (1 when profilePicture, industryName and the positions company name are
+-- all present, else 0) and insert the result into the QualityProfile kafka topic. GetSqlField
+-- extracts the company name from a nested record. Use IS NOT NULL: `x <> null` is never TRUE.
+
+INSERT INTO kafka.QualityProfile
+SELECT id, status, CASE WHEN profilePicture IS NOT NULL AND industryName IS NOT NULL
+    AND GetSqlField(positions, 'Position.companyName') IS NOT NULL
+  THEN 1 ELSE 0 END AS quality
+FROM kafka.ProfileChanges
+
+-- you can add additional SQL statements here
diff --git a/src/main/java/samza/examples/sql/samza-sql-filter/src/main/sql/samza.sql b/src/main/java/samza/examples/sql/samza-sql-filter/src/main/sql/samza.sql
new file mode 100644
index 0000000..9e7960c
--- /dev/null
+++ b/src/main/java/samza/examples/sql/samza-sql-filter/src/main/sql/samza.sql
@@ -0,0 +1,9 @@
+-- Keep only ProfileChanges records whose title, after the standardize UDF, equals
+-- 'Product Manager', and write their basic profile fields to a kafka topic.
+
+INSERT INTO kafka.ProductManagerProfiles
+SELECT p.memberId, p.firstName, p.lastName, p.company
+FROM kafka.ProfileChanges AS p
+WHERE standardize(p.title) = 'Product Manager'
+
+-- you can add additional SQL statements here
diff --git a/src/main/java/samza/examples/sql/samza-sql-groupby/src/main/java/samza/sql/PageViewGroupByOutput.json b/src/main/java/samza/examples/sql/samza-sql-groupby/src/main/java/samza/sql/PageViewGroupByOutput.json
new file mode 100644
index 0000000..11680d1
--- /dev/null
+++ b/src/main/java/samza/examples/sql/samza-sql-groupby/src/main/java/samza/sql/PageViewGroupByOutput.json
@@ -0,0 +1,18 @@
+{
+    "name": "PageViewGroupByOutput",
+    "version" : 1,
+    "namespace": "org.apache.samza.sql.system.avro",
+    "type": "record",
+    "fields": [
+        {
+            "name":"pageKey",
+            "doc":"The page key of the page being viewed.",
+            "type":["string","null"]
+        },
+        {
+            "name": "Views",
+            "doc" : "Number of views in 5 minute window.",
+            "type": ["long", "null"]
+        }
+    ]
+}
diff --git a/src/main/java/samza/examples/sql/samza-sql-groupby/src/main/sql/samza.sql b/src/main/java/samza/examples/sql/samza-sql-groupby/src/main/sql/samza.sql
new file mode 100644
index 0000000..716bfaa
--- /dev/null
+++ b/src/main/java/samza/examples/sql/samza-sql-groupby/src/main/sql/samza.sql
@@ -0,0 +1,12 @@
+-- NOTE: Groupby Operator is currently not fully stable,
+--       we are actively working on stabilizing it.
+
+-- Emit page view counts grouped by page key for the last 5 minutes, at a 5 minute
+-- interval, and send the result to a kafka topic. The GetPageKey UDF extracts the page
+-- key from requestHeader; the same expression feeds __key__, pageKey and the GROUP BY.
+insert into kafka.groupbyTopic
+  select GetPageKey(pv.requestHeader) as __key__, GetPageKey(pv.requestHeader) as pageKey, count(*) as Views
+  from kafka.`PageViewEvent` as pv
+  group by GetPageKey(pv.requestHeader)
+
+-- You can add additional SQL statements here
diff --git a/src/main/java/samza/examples/sql/samza-sql-stream-table-join/src/main/sql/samza.sql b/src/main/java/samza/examples/sql/samza-sql-stream-table-join/src/main/sql/samza.sql
new file mode 100644
index 0000000..c1b9be0
--- /dev/null
+++ b/src/main/java/samza/examples/sql/samza-sql-stream-table-join/src/main/sql/samza.sql
@@ -0,0 +1,11 @@
+-- NOTE: Join Operator is currently not fully stable,
+--       we are actively working on stabilizing it.
+
+-- Enrich each PageViewEvent with member profile data by joining on memberid against ProfileChanges read via `$table`
+INSERT INTO kafka.tracking.EnrichedPageViewEvent
+SELECT *
+FROM kafka.PageViewEvent as pv
+  JOIN kafka.ProfileChanges.`$table` as p
+    ON pv.memberid = p.memberid
+
+-- You can add additional SQL statements here