You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sr...@apache.org on 2018/10/23 17:28:52 UTC

samza-hello-samza git commit: Adding samza sql application

Repository: samza-hello-samza
Updated Branches:
  refs/heads/master f48892747 -> 428f61393


Adding samza sql application

Samza SQL application.

Author: Srinivasulu Punuru <sp...@linkedin.com>

Reviewers: Aditya Toomula <at...@linkedin.com>

Closes #41 from srinipunuru/sql.1


Project: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/commit/428f6139
Tree: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/tree/428f6139
Diff: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/diff/428f6139

Branch: refs/heads/master
Commit: 428f6139338a5967e90803b4bd2ac58883efef4e
Parents: f488927
Author: Srinivasulu Punuru <sp...@linkedin.com>
Authored: Tue Oct 23 10:28:48 2018 -0700
Committer: Srinivasulu Punuru <sp...@linkedin.com>
Committed: Tue Oct 23 10:28:48 2018 -0700

----------------------------------------------------------------------
 build.gradle                                    |  2 +
 src/main/config/pageview-filter-sql.properties  | 51 ++++++++++++++
 .../samza/examples/avro/AvroSerDeFactory.java   | 73 ++++++++++++++++++++
 .../examples/cookbook/PageViewFilterSqlApp.java | 72 +++++++++++++++++++
 src/main/schemas/OutputTopic.avsc               | 39 +++++++++++
 src/main/schemas/PageViewStream.avsc            | 32 +++++++++
 6 files changed, 269 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/428f6139/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 21793c4..cce7699 100644
--- a/build.gradle
+++ b/build.gradle
@@ -49,11 +49,13 @@ dependencies {
     compile(group: 'org.apache.samza', name: 'samza-api', version: "$SAMZA_VERSION")
     compile(group: 'org.apache.samza', name: 'samza-kv_2.11', version: "$SAMZA_VERSION")
     compile(group: 'org.apache.samza', name: 'samza-aws', version: "$SAMZA_VERSION")
+    compile(group: 'org.apache.samza', name: 'samza-sql', version: "$SAMZA_VERSION")
 
     explode (group: 'org.apache.samza', name: 'samza-shell',  ext: 'tgz', classifier: 'dist', version: "$SAMZA_VERSION")
 
     runtime(group: 'org.apache.samza', name: 'samza-core_2.11', version: "$SAMZA_VERSION")
     runtime(group: 'org.apache.samza', name: 'samza-log4j', version: "$SAMZA_VERSION")
+
     runtime(group: 'org.apache.samza', name: 'samza-shell', version: "$SAMZA_VERSION")
     runtime(group: 'org.apache.samza', name: 'samza-yarn_2.11', version: "$SAMZA_VERSION")
     runtime(group: 'org.apache.samza', name: 'samza-kv-rocksdb_2.11', version: "$SAMZA_VERSION")

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/428f6139/src/main/config/pageview-filter-sql.properties
----------------------------------------------------------------------
diff --git a/src/main/config/pageview-filter-sql.properties b/src/main/config/pageview-filter-sql.properties
new file mode 100644
index 0000000..49a4271
--- /dev/null
+++ b/src/main/config/pageview-filter-sql.properties
@@ -0,0 +1,51 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Job
+job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
+job.name=pageview-filter
+
+# YARN
+yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
+
+# Application: the SQL example app and the runner that launches it.
+app.class=samza.examples.cookbook.PageViewFilterSqlApp
+app.runner.class=org.apache.samza.sql.runner.SamzaSqlApplicationRunner
+
+# Avro schema files used in the sql command (comma separated, read by PageViewFilterSqlApp).
+schema.files=file://${basedir}/src/main/schemas/OutputTopic.avsc,file://${basedir}/src/main/schemas/PageViewStream.avsc
+
+# Samza sql configs
+# Copies the id and Name columns of every PageViewStream event into the outputTopic topic.
+samza.sql.stmt=insert into kafka.outputTopic select id, Name from PageViewStream
+
+# Serializers
+# "avro" is the message serde selected for the kafka system below, so it has to be registered here.
+# NOTE(review): AvroSerDeFactory reads its schema from "serializers.avro.schema", which is not set
+# in this file -- confirm where that value is supplied.
+serializers.registry.avro.class=samza.examples.avro.AvroSerDeFactory
+serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
+serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
+
+# Kafka System
+systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+systems.kafka.samza.msg.serde=avro
+systems.kafka.samza.key.serde=string
+systems.kafka.consumer.zookeeper.connect=localhost:2181
+systems.kafka.producer.bootstrap.servers=localhost:9092
+
+# Job Coordinator
+job.coordinator.system=kafka
+job.coordinator.replication.factor=1
+
+job.default.system=kafka
+job.container.count=1

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/428f6139/src/main/java/samza/examples/avro/AvroSerDeFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/avro/AvroSerDeFactory.java b/src/main/java/samza/examples/avro/AvroSerDeFactory.java
new file mode 100644
index 0000000..c96eedd
--- /dev/null
+++ b/src/main/java/samza/examples/avro/AvroSerDeFactory.java
@@ -0,0 +1,73 @@
+package samza.examples.avro;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.SerdeFactory;
+
+
+/**
+ * {@link SerdeFactory} producing a serde that converts Avro {@link GenericRecord}s to and from
+ * Avro binary encoding.
+ *
+ * <p>The schema used for both directions is read, as schema JSON, from the config key
+ * {@link #CFG_AVRO_SCHEMA}.
+ */
+public class AvroSerDeFactory implements SerdeFactory {
+
+  /** Config key whose value is the JSON text of the Avro schema used by the serde. */
+  public static final String CFG_AVRO_SCHEMA = "serializers.avro.schema";
+
+  /**
+   * Creates the Avro serde.
+   *
+   * @param name serde name from the job config; not used by this factory
+   * @param config job config; must contain {@link #CFG_AVRO_SCHEMA}
+   * @return a serde bound to the configured schema
+   * @throws SamzaException if the schema config is missing or blank
+   */
+  @Override
+  public Serde getSerde(String name, Config config) {
+    return new AvroSerDe(config);
+  }
+
+  /** Serde bound to one schema; static so it does not pin the factory instance. */
+  private static class AvroSerDe implements Serde {
+    private final Schema schema;
+
+    AvroSerDe(Config config) {
+      String schemaJson = config.get(CFG_AVRO_SCHEMA);
+      // Fail with the offending key instead of an anonymous NullPointerException from the parser.
+      if (schemaJson == null || schemaJson.trim().isEmpty()) {
+        throw new SamzaException("Missing Avro schema; set the config " + CFG_AVRO_SCHEMA);
+      }
+      schema = new Schema.Parser().parse(schemaJson);
+    }
+
+    /**
+     * Decodes one binary-encoded record.
+     *
+     * @return the decoded {@link GenericRecord}, or {@code null} when {@code bytes} is {@code null}
+     * @throws SamzaException if the bytes cannot be decoded with the configured schema
+     */
+    @Override
+    public Object fromBytes(byte[] bytes) {
+      if (bytes == null) {
+        return null;
+      }
+      try {
+        return genericRecordFromBytes(bytes, schema);
+      } catch (IOException e) {
+        throw new SamzaException("Unable to deserialize the record", e);
+      }
+    }
+
+    /**
+     * Encodes one record.
+     *
+     * @param o a {@link GenericRecord}, or {@code null}
+     * @return the binary encoding, or {@code null} when {@code o} is {@code null}
+     * @throws SamzaException if the record cannot be written with the configured schema
+     */
+    @Override
+    public byte[] toBytes(Object o) {
+      if (o == null) {
+        return null;
+      }
+      try {
+        return toAvroBytes(schema, (GenericRecord) o);
+      } catch (IOException e) {
+        throw new SamzaException("Unable to serialize the record", e);
+      }
+    }
+  }
+
+  /**
+   * Encodes {@code record} with Avro binary encoding using {@code schema} as the writer schema.
+   *
+   * @throws IOException if writing the record fails
+   */
+  public byte[] encodeAvroGenericRecord(Schema schema, GenericRecord record) throws IOException {
+    return toAvroBytes(schema, record);
+  }
+
+  /** Shared encoder used by both the public helper and the nested serde. */
+  private static byte[] toAvroBytes(Schema schema, GenericRecord record) throws IOException {
+    DatumWriter<IndexedRecord> writer = new GenericDatumWriter<>(schema);
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
+    writer.write(record, encoder);
+    // The binary encoder buffers; flush before reading the bytes back.
+    encoder.flush();
+    return out.toByteArray();
+  }
+
+  /** Decodes a binary-encoded record using {@code schema} as both writer and reader schema. */
+  private static GenericRecord genericRecordFromBytes(byte[] bytes, Schema schema) throws IOException {
+    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
+    GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
+    return reader.read(null, decoder);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/428f6139/src/main/java/samza/examples/cookbook/PageViewFilterSqlApp.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/PageViewFilterSqlApp.java b/src/main/java/samza/examples/cookbook/PageViewFilterSqlApp.java
new file mode 100644
index 0000000..de01969
--- /dev/null
+++ b/src/main/java/samza/examples/cookbook/PageViewFilterSqlApp.java
@@ -0,0 +1,72 @@
+package samza.examples.cookbook;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import org.apache.avro.Schema;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.sql.runner.SamzaSqlApplication;
+import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
+
+
+/**
+ * In this example, we demonstrate how to use SQL to create a samza job.
+ *
+ * <p>Concepts covered: Using sql to perform Stream processing.
+ *
+ * <p>The SQL statement is taken from the job config
+ * ({@link SamzaSqlApplicationConfig#CFG_SQL_STMT}); a built-in default is used only when the
+ * config does not provide one. The Avro schema files listed under {@link #CFG_SCHEMA_FILES} are
+ * parsed and their JSON is added to the config handed to {@link SamzaSqlApplication}.
+ *
+ * <p>To run the below example:
+ *
+ * <ol>
+ *   <li>
+ *     Ensure that the topic "PageViewStream" is created  <br/>
+ *     ./kafka-topics.sh  --zookeeper localhost:2181 --create --topic PageViewStream --partitions 1 --replication-factor 1
+ *   </li>
+ *   <li>
+ *     Run the application using the ./bin/run-app.sh script <br/>
+ *     ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory <br/>
+ *     --config-path=file://$PWD/deploy/samza/config/pageview-filter-sql.properties
+ *   </li>
+ *   <li>
+ *     Produce some messages to the "PageViewStream" topic <br/>
+ *     Please follow instructions at https://github.com/srinipunuru/samzasqltools on how to produce events into PageViewStream<br/>
+ *   </li>
+ *   <li>
+ *     Consume messages from the "outputTopic" topic (e.g. bin/kafka-console-consumer.sh) <br/>
+ *     ./deploy/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic outputTopic <br/>
+ *     --property print.key=true
+ *   </li>
+ * </ol>
+ */
+public class PageViewFilterSqlApp extends SamzaSqlApplication {
+
+  /** Config key holding a comma separated list of Avro schema files (paths or file:// URIs). */
+  public static final String CFG_SCHEMA_FILES = "schema.files";
+
+  // NOTE(review): with an empty format every schema is stored under the key "" and later files
+  // overwrite earlier ones. The intended key pattern is not visible here -- confirm and fix.
+  private static final String CFG_SCHEMA_VALUE_FMT = "";
+
+  /** Statement used only when the job config does not supply one. */
+  private static final String DEFAULT_SQL_STMT =
+      "insert into kafka.outputTopic select id, Name from PageViewStream";
+
+  private static final String FILE_SCHEME = "file://";
+
+  /**
+   * Augments the job config with the parsed schemas (and a default SQL statement when none is
+   * configured) and delegates to {@link SamzaSqlApplication#init}.
+   *
+   * @throws SamzaException if {@link #CFG_SCHEMA_FILES} is missing or a schema file cannot be read
+   */
+  @Override
+  public void init(StreamGraph streamGraph, Config config) {
+    HashMap<String, String> newConfig = new HashMap<>(config);
+    populateSchemaConfigs(config.get(CFG_SCHEMA_FILES), newConfig);
+
+    // Respect the statement from the job config; do not clobber it with the default.
+    String configuredStmt = config.get(SamzaSqlApplicationConfig.CFG_SQL_STMT);
+    if (configuredStmt == null || configuredStmt.trim().isEmpty()) {
+      newConfig.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, DEFAULT_SQL_STMT);
+    }
+    super.init(streamGraph, new MapConfig(newConfig));
+  }
+
+  /** Parses every listed schema file and stores its JSON text into {@code config}. */
+  private void populateSchemaConfigs(String schemaFilesValue, HashMap<String, String> config) {
+    if (schemaFilesValue == null || schemaFilesValue.trim().isEmpty()) {
+      throw new SamzaException("Missing required config " + CFG_SCHEMA_FILES);
+    }
+    for (String schemaFileValue : schemaFilesValue.split(",")) {
+      String location = schemaFileValue.trim();
+      if (location.isEmpty()) {
+        continue; // tolerate stray commas in the list
+      }
+      File schemaFile = new File(stripFileScheme(location));
+      try {
+        // A fresh parser per file: one parser refuses to define the same record name twice.
+        String schemaValue = new Schema.Parser().parse(schemaFile).toString();
+        config.put(String.format(CFG_SCHEMA_VALUE_FMT, schemaFile.getName()), schemaValue);
+      } catch (IOException e) {
+        throw new SamzaException("Unable to parse the schemaFile " + schemaFileValue, e);
+      }
+    }
+  }
+
+  /** Turns a {@code file://} URI into a plain path; other values are returned unchanged. */
+  private static String stripFileScheme(String location) {
+    return location.startsWith(FILE_SCHEME) ? location.substring(FILE_SCHEME.length()) : location;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/428f6139/src/main/schemas/OutputTopic.avsc
----------------------------------------------------------------------
diff --git a/src/main/schemas/OutputTopic.avsc b/src/main/schemas/OutputTopic.avsc
new file mode 100644
index 0000000..7670b1b
--- /dev/null
+++ b/src/main/schemas/OutputTopic.avsc
@@ -0,0 +1,39 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+{
+  "name": "SimpleRecord",
+  "version" : 1,
+  "namespace": "org.apache.samza.sql.system.avro",
+  "type": "record",
+  "fields": [
+    {
+      "name": "id",
+      "doc": "Record id.",
+      "type": ["null", "int"],
+      "default":null
+    },
+    {
+      "name": "Name",
+      "doc" : "Some name.",
+      "type": ["null", "string"],
+      "default":null
+    }
+  ]
+}

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/428f6139/src/main/schemas/PageViewStream.avsc
----------------------------------------------------------------------
diff --git a/src/main/schemas/PageViewStream.avsc b/src/main/schemas/PageViewStream.avsc
new file mode 100644
index 0000000..54936f7
--- /dev/null
+++ b/src/main/schemas/PageViewStream.avsc
@@ -0,0 +1,32 @@
+{
+  "name": "PageViewEvent",
+  "version" : 1,
+  "namespace": "com.linkedin.samza.tools.avro",
+  "type": "record",
+  "fields": [
+    {
+      "name": "id",
+      "doc": "Record id.",
+      "type": ["null", "int"],
+      "default":null
+    },
+    {
+      "name": "Name",
+      "doc": "Name of the profile.",
+      "type": ["null", "string"],
+      "default":null
+    },
+    {
+      "name": "ViewerName",
+      "doc": "Name of the person who viewed the profile.",
+      "type": ["null", "string"],
+      "default":null
+    },
+    {
+      "name": "ProfileViewTimestamp",
+      "doc": "Time at which the profile was viewed.",
+      "type": ["null", "long"],
+      "default":null
+    }
+  ]
+}