You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sr...@apache.org on 2018/10/23 17:28:52 UTC
samza-hello-samza git commit: Adding samza sql application
Repository: samza-hello-samza
Updated Branches:
refs/heads/master f48892747 -> 428f61393
Adding samza sql application
Samza SQL application.
Author: Srinivasulu Punuru <sp...@linkedin.com>
Reviewers: Aditya Toomula <at...@linkedin.com>
Closes #41 from srinipunuru/sql.1
Project: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/commit/428f6139
Tree: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/tree/428f6139
Diff: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/diff/428f6139
Branch: refs/heads/master
Commit: 428f6139338a5967e90803b4bd2ac58883efef4e
Parents: f488927
Author: Srinivasulu Punuru <sp...@linkedin.com>
Authored: Tue Oct 23 10:28:48 2018 -0700
Committer: Srinivasulu Punuru <sp...@linkedin.com>
Committed: Tue Oct 23 10:28:48 2018 -0700
----------------------------------------------------------------------
build.gradle | 2 +
src/main/config/pageview-filter-sql.properties | 51 ++++++++++++++
.../samza/examples/avro/AvroSerDeFactory.java | 73 ++++++++++++++++++++
.../examples/cookbook/PageViewFilterSqlApp.java | 72 +++++++++++++++++++
src/main/schemas/OutputTopic.avsc | 39 +++++++++++
src/main/schemas/PageViewStream.avsc | 32 +++++++++
6 files changed, 269 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/428f6139/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 21793c4..cce7699 100644
--- a/build.gradle
+++ b/build.gradle
@@ -49,11 +49,13 @@ dependencies {
compile(group: 'org.apache.samza', name: 'samza-api', version: "$SAMZA_VERSION")
compile(group: 'org.apache.samza', name: 'samza-kv_2.11', version: "$SAMZA_VERSION")
compile(group: 'org.apache.samza', name: 'samza-aws', version: "$SAMZA_VERSION")
+ compile(group: 'org.apache.samza', name: 'samza-sql', version: "$SAMZA_VERSION")
explode (group: 'org.apache.samza', name: 'samza-shell', ext: 'tgz', classifier: 'dist', version: "$SAMZA_VERSION")
runtime(group: 'org.apache.samza', name: 'samza-core_2.11', version: "$SAMZA_VERSION")
runtime(group: 'org.apache.samza', name: 'samza-log4j', version: "$SAMZA_VERSION")
+
runtime(group: 'org.apache.samza', name: 'samza-shell', version: "$SAMZA_VERSION")
runtime(group: 'org.apache.samza', name: 'samza-yarn_2.11', version: "$SAMZA_VERSION")
runtime(group: 'org.apache.samza', name: 'samza-kv-rocksdb_2.11', version: "$SAMZA_VERSION")
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/428f6139/src/main/config/pageview-filter-sql.properties
----------------------------------------------------------------------
diff --git a/src/main/config/pageview-filter-sql.properties b/src/main/config/pageview-filter-sql.properties
new file mode 100644
index 0000000..49a4271
--- /dev/null
+++ b/src/main/config/pageview-filter-sql.properties
@@ -0,0 +1,51 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Job
+job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
+job.name=pageview-filter
+
+# YARN
+yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
+
+# Task
+app.class=samza.examples.cookbook.PageViewFilterSqlApp
+app.runner.class=org.apache.samza.sql.runner.SamzaSqlApplicationRunner
+
+# Avro schema files used in the sql command.
+schema.files=file://${basedir}/src/main/schemas/OutputTopic.avsc,file://${basedir}/src/main/schemas/PageViewStream.avsc
+
+# Samza sql configs
+samza.sql.stmt=insert into kafka.outputTopic select id, Name from PageViewStream
+
+# Serializers
+serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
+serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
+
+# Kafka System
+systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+systems.kafka.samza.msg.serde=avro
+systems.kafka.samza.key.serde=string
+systems.kafka.consumer.zookeeper.connect=localhost:2181
+systems.kafka.producer.bootstrap.servers=localhost:9092
+
+# Job Coordinator
+job.coordinator.system=kafka
+job.coordinator.replication.factor=1
+
+job.default.system=kafka
+job.container.count=1
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/428f6139/src/main/java/samza/examples/avro/AvroSerDeFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/avro/AvroSerDeFactory.java b/src/main/java/samza/examples/avro/AvroSerDeFactory.java
new file mode 100644
index 0000000..c96eedd
--- /dev/null
+++ b/src/main/java/samza/examples/avro/AvroSerDeFactory.java
@@ -0,0 +1,73 @@
+package samza.examples.avro;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.SerdeFactory;
+
+
+public class AvroSerDeFactory implements SerdeFactory {
+
+ public static String CFG_AVRO_SCHEMA = "serializers.avro.schema";
+
+ @Override
+ public Serde getSerde(String name, Config config) {
+ return new AvroSerDe(config);
+ }
+
+ private class AvroSerDe implements Serde {
+ private final Schema schema;
+
+ public AvroSerDe(Config config) {
+ schema = Schema.parse(config.get(CFG_AVRO_SCHEMA));
+ }
+
+ @Override
+ public Object fromBytes(byte[] bytes) {
+ GenericRecord record;
+ try {
+ record = genericRecordFromBytes(bytes, schema);
+ } catch (IOException e) {
+ throw new SamzaException("Unable to deserialize the record", e);
+ }
+ return record;
+ }
+
+ @Override
+ public byte[] toBytes(Object o) {
+ GenericRecord record = (GenericRecord) o;
+ try {
+ return encodeAvroGenericRecord(schema, record);
+ } catch (IOException e) {
+ throw new SamzaException("Unable to serialize the record", e);
+ }
+ }
+ }
+
+ public byte[] encodeAvroGenericRecord(Schema schema, GenericRecord record) throws IOException {
+ DatumWriter<IndexedRecord> msgDatumWriter = new GenericDatumWriter<>(schema);
+ ByteArrayOutputStream os = new ByteArrayOutputStream();
+ Encoder encoder = EncoderFactory.get().binaryEncoder(os, null);
+ msgDatumWriter.write(record, encoder);
+ encoder.flush();
+ return os.toByteArray();
+ }
+
+ private static <T> T genericRecordFromBytes(byte[] bytes, Schema schema) throws IOException {
+ BinaryDecoder binDecoder = DecoderFactory.defaultFactory().createBinaryDecoder(bytes, null);
+ GenericDatumReader<T> reader = new GenericDatumReader<>(schema);
+ return reader.read(null, binDecoder);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/428f6139/src/main/java/samza/examples/cookbook/PageViewFilterSqlApp.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/PageViewFilterSqlApp.java b/src/main/java/samza/examples/cookbook/PageViewFilterSqlApp.java
new file mode 100644
index 0000000..de01969
--- /dev/null
+++ b/src/main/java/samza/examples/cookbook/PageViewFilterSqlApp.java
@@ -0,0 +1,72 @@
+package samza.examples.cookbook;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import org.apache.avro.Schema;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.sql.runner.SamzaSqlApplication;
+import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
+
+
+/**
+ * In this example, we demonstrate how to use SQL to create a samza job.
+ *
+ * <p>Concepts covered: Using sql to perform Stream processing.
+ *
+ * To run the below example:
+ *
+ * <ol>
+ * <li>
+ * Ensure that the topic "PageViewStream" is created <br/>
+ * ./kafka-topics.sh --zookeeper localhost:2181 --create --topic PageViewStream --partitions 1 --replication-factor 1
+ * </li>
+ * <li>
+ * Run the application using the ./bin/run-app.sh script <br/>
+ * ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory <br/>
+ * --config-path=file://$PWD/deploy/samza/config/pageview-filter-sql.properties
+ * </li>
+ * <li>
+ * Produce some messages to the "PageViewStream" topic <br/>
+ * Please follow instructions at https://github.com/srinipunuru/samzasqltools on how to produce events into PageViewStream<br/>
+ * </li>
+ * <li>
+ * Consume messages from the "outputTopic" topic (e.g. bin/kafka-console-consumer.sh) <br/>
+ * ./deploy/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic outputTopic <br/>
+ * --property print.key=true </li>
+ * </ol>
+ *
+ */
+
+public class PageViewFilterSqlApp extends SamzaSqlApplication {
+
+  /** Config key holding a comma-separated list of Avro schema file locations. */
+  public static final String CFG_SCHEMA_FILES = "schema.files";
+
+  // NOTE(review): this format string is empty, so String.format() produces "" for every schema
+  // file and each schema overwrites the previous one under the empty config key. The intended
+  // key pattern is not visible here -- confirm what the SQL schema provider expects and fill it in.
+  private static final String CFG_SCHEMA_VALUE_FMT = "";
+
+  // Used only when the job config does not already carry a SQL statement.
+  // NOTE(review): these stream names do not match the PageViewStream example described in the
+  // class documentation -- confirm whether this fallback should be updated or dropped.
+  private static final String DEFAULT_SQL_STMT =
+      "insert into kafka.NewLinkedInEmployees select id, Name from ProfileChangeStream";
+
+  /**
+   * Copies the job config, adds the contents of the Avro schema files listed under
+   * {@link #CFG_SCHEMA_FILES}, and hands the result to {@link SamzaSqlApplication}.
+   *
+   * <p>A SQL statement already present in the job config is left untouched; the built-in
+   * statement is applied only when none is configured.
+   *
+   * @param streamGraph stream graph passed through to the parent application
+   * @param config job config
+   * @throws SamzaException if the schema file list is missing or a schema file cannot be read
+   */
+  @Override
+  public void init(StreamGraph streamGraph, Config config) {
+    HashMap<String, String> newConfig = new HashMap<>(config);
+    populateSchemaConfigs(config.get(CFG_SCHEMA_FILES), newConfig);
+    // Do not clobber the statement supplied by the job's properties file.
+    if (!newConfig.containsKey(SamzaSqlApplicationConfig.CFG_SQL_STMT)) {
+      newConfig.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, DEFAULT_SQL_STMT);
+    }
+    super.init(streamGraph, new MapConfig(newConfig));
+  }
+
+  /**
+   * Parses every schema file in the comma-separated {@code schemaFilesValue} and stores its JSON
+   * text in {@code config}, keyed by {@link #CFG_SCHEMA_VALUE_FMT} applied to the file name.
+   *
+   * @throws SamzaException if no files are configured or a file cannot be located or read
+   */
+  private void populateSchemaConfigs(String schemaFilesValue, HashMap<String, String> config) {
+    if (schemaFilesValue == null || schemaFilesValue.trim().isEmpty()) {
+      throw new SamzaException(
+          "Config '" + CFG_SCHEMA_FILES + "' must list at least one Avro schema file");
+    }
+    for (String schemaFileValue : schemaFilesValue.split(",")) {
+      String location = schemaFileValue.trim();
+      if (location.isEmpty()) {
+        // Tolerate stray or trailing commas in the list.
+        continue;
+      }
+      try {
+        File schemaFile = toFile(location);
+        // A fresh parser per file keeps named types from one schema out of the next,
+        // and replaces the deprecated static Schema.parse(File).
+        String schemaValue = new Schema.Parser().parse(schemaFile).toString();
+        config.put(String.format(CFG_SCHEMA_VALUE_FMT, schemaFile.getName()), schemaValue);
+      } catch (IOException e) {
+        throw new SamzaException("Unable to parse the schemaFile " + schemaFileValue, e);
+      }
+    }
+  }
+
+  /**
+   * Resolves a schema location to a local file. Values starting with {@code file:} are treated as
+   * URIs and reduced to their path component; passing such a value directly to
+   * {@code new File(String)} would produce a relative path that never exists. Any other value is
+   * used as a plain file system path.
+   *
+   * @throws SamzaException if a {@code file:} value is not a valid URI or has no path
+   */
+  private static File toFile(String location) {
+    if (!location.startsWith("file:")) {
+      return new File(location);
+    }
+    String path;
+    try {
+      path = java.net.URI.create(location).getPath();
+    } catch (IllegalArgumentException e) {
+      throw new SamzaException("Invalid schema file URI " + location, e);
+    }
+    if (path == null || path.isEmpty()) {
+      throw new SamzaException("Schema file URI has no path: " + location);
+    }
+    return new File(path);
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/428f6139/src/main/schemas/OutputTopic.avsc
----------------------------------------------------------------------
diff --git a/src/main/schemas/OutputTopic.avsc b/src/main/schemas/OutputTopic.avsc
new file mode 100644
index 0000000..7670b1b
--- /dev/null
+++ b/src/main/schemas/OutputTopic.avsc
@@ -0,0 +1,39 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+{
+ "name": "SimpleRecord",
+ "version" : 1,
+ "namespace": "org.apache.samza.sql.system.avro",
+ "type": "record",
+ "fields": [
+ {
+ "name": "id",
+ "doc": "Record id.",
+ "type": ["null", "int"],
+ "default":null
+ },
+ {
+ "name": "Name",
+ "doc" : "Some name.",
+ "type": ["null", "string"],
+ "default":null
+ }
+ ]
+}
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/428f6139/src/main/schemas/PageViewStream.avsc
----------------------------------------------------------------------
diff --git a/src/main/schemas/PageViewStream.avsc b/src/main/schemas/PageViewStream.avsc
new file mode 100644
index 0000000..54936f7
--- /dev/null
+++ b/src/main/schemas/PageViewStream.avsc
@@ -0,0 +1,32 @@
+{
+ "name": "PageViewEvent",
+ "version" : 1,
+ "namespace": "com.linkedin.samza.tools.avro",
+ "type": "record",
+ "fields": [
+ {
+ "name": "id",
+ "doc": "Record id.",
+ "type": ["null", "int"],
+ "default":null
+ },
+ {
+ "name": "Name",
+ "doc": "Name of the profile.",
+ "type": ["null", "string"],
+ "default":null
+ },
+ {
+ "name": "ViewerName",
+ "doc": "Name of the person who viewed the profile.",
+ "type": ["null", "string"],
+ "default":null
+ },
+ {
+ "name": "ProfileViewTimestamp",
+ "doc": "Time at which the profile was viewed.",
+ "type": ["null", "long"],
+ "default":null
+ }
+ ]
+}