You are viewing a plain text version of this content; the hyperlink to the canonical version ("here") is not preserved in this copy.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/12/08 19:10:04 UTC

[18/20] storm git commit: [StormSQL] STORM-1357. Support writing to Kafka streams in Storm SQL.

[StormSQL] STORM-1357. Support writing to Kafka streams in Storm SQL.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b5efe2c4
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b5efe2c4
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b5efe2c4

Branch: refs/heads/master
Commit: b5efe2c4433c0245c2fe0c1119e64baf6ecca776
Parents: b60712f
Author: Haohui Mai <wh...@apache.org>
Authored: Mon Nov 30 18:58:29 2015 -0800
Committer: Haohui Mai <wh...@apache.org>
Committed: Fri Dec 4 11:09:28 2015 -0800

----------------------------------------------------------------------
 .../apache/storm/sql/parser/SqlCreateTable.java | 14 +++++--
 .../test/org/apache/storm/sql/TestStormSql.java |  2 +-
 .../apache/storm/sql/kafka/JsonSerializer.java  |  6 +--
 .../sql/kafka/KafkaDataSourcesProvider.java     | 35 ++++++++++++----
 .../sql/kafka/TestKafkaDataSourcesProvider.java | 17 ++++++--
 .../storm/sql/runtime/DataSourcesProvider.java  |  3 +-
 .../storm/sql/runtime/DataSourcesRegistry.java  |  4 +-
 .../jvm/storm/kafka/ByteBufferSerializer.java   | 41 +++++++++++++++++++
 .../src/jvm/storm/kafka/IntSerializer.java      | 42 ++++++++++++++++++++
 9 files changed, 140 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/b5efe2c4/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateTable.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateTable.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateTable.java
index 8fe4160..8ac52ed 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateTable.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateTable.java
@@ -113,13 +113,19 @@ public class SqlCreateTable extends SqlCall {
   }
 
   public String inputFormatClass() {
-    return inputFormatClass == null ? null : SqlLiteral.stringValue(
-        inputFormatClass);
+    return getString(inputFormatClass);
   }
 
   public String outputFormatClass() {
-    return outputFormatClass == null ? null : SqlLiteral.stringValue
-        (outputFormatClass);
+    return getString(outputFormatClass);
+  }
+
+  public String properties() {
+    return getString(properties);
+  }
+
+  private String getString(SqlNode n) {
+    return n == null ? null : SqlLiteral.stringValue(n);
   }
 
   @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/storm/blob/b5efe2c4/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
index b238e18..add8da5 100644
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
@@ -44,7 +44,7 @@ public class TestStormSql {
 
     @Override
     public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String outputFormatClass,
-         List<FieldInfo> fields) {
+                                                  String properties, List<FieldInfo> fields) {
       throw new UnsupportedOperationException();
     }
   }

http://git-wip-us.apache.org/repos/asf/storm/blob/b5efe2c4/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/JsonSerializer.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/JsonSerializer.java b/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/JsonSerializer.java
index 7c5aa57..e3d5d01 100644
--- a/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/JsonSerializer.java
+++ b/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/JsonSerializer.java
@@ -20,14 +20,12 @@ package org.apache.storm.sql.kafka;
 import com.fasterxml.jackson.core.JsonFactory;
 import com.fasterxml.jackson.core.JsonGenerator;
 import com.google.common.base.Preconditions;
-import org.apache.commons.io.Charsets;
-import org.apache.commons.lang.CharSet;
 import org.apache.storm.sql.runtime.IOutputSerializer;
 
 import java.io.IOException;
 import java.io.StringWriter;
 import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
 import java.util.List;
 
 class JsonSerializer implements IOutputSerializer {
@@ -53,6 +51,6 @@ class JsonSerializer implements IOutputSerializer {
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
-    return ByteBuffer.wrap(sw.toString().getBytes(Charsets.UTF_8));
+    return ByteBuffer.wrap(sw.toString().getBytes(StandardCharsets.UTF_8));
   }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/b5efe2c4/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java b/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java
index e57e4d3..7da57ba 100644
--- a/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java
+++ b/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java
@@ -18,6 +18,7 @@
 package org.apache.storm.sql.kafka;
 
 import backtype.storm.spout.SchemeAsMultiScheme;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import org.apache.storm.sql.runtime.*;
 import storm.kafka.ZkHosts;
@@ -33,13 +34,15 @@ import storm.trident.operation.TridentOperationContext;
 import storm.trident.spout.ITridentDataSource;
 import storm.trident.tuple.TridentTuple;
 
+import java.io.IOException;
 import java.net.URI;
 import java.nio.ByteBuffer;
 import java.util.*;
 
 /**
- * Create a Kafka spout based on the URI. The URI has the format of
- * kafka://zkhost:port/broker_path?topic=topic.
+ * Create a Kafka spout/sink based on the URI and properties. The URI has the format of
+ * kafka://zkhost:port/broker_path?topic=topic. The properties are a JSON object whose "producer" entry holds the
+ * configuration of the Kafka producer used by the sink.
  */
 public class KafkaDataSourcesProvider implements DataSourcesProvider {
   private static final int DEFAULT_ZK_PORT = 2181;
@@ -80,11 +83,14 @@ public class KafkaDataSourcesProvider implements DataSourcesProvider {
     private transient TridentKafkaState state;
     private final String topic;
     private final int primaryKeyIndex;
+    private final Properties producerProperties;
     private final List<String> fieldNames;
 
-    private KafkaTridentSink(String topic, int primaryKeyIndex, List<String> fieldNames) {
+    private KafkaTridentSink(String topic, int primaryKeyIndex, Properties producerProperties,
+                             List<String> fieldNames) {
       this.topic = topic;
       this.primaryKeyIndex = primaryKeyIndex;
+      this.producerProperties = producerProperties;
       this.fieldNames = fieldNames;
     }
 
@@ -100,6 +106,7 @@ public class KafkaDataSourcesProvider implements DataSourcesProvider {
       state = new TridentKafkaState()
           .withKafkaTopicSelector(new StaticTopicSelector(topic))
           .withTridentTupleToKafkaMapper(m);
+      state.prepare(producerProperties);
     }
 
     @Override
@@ -113,11 +120,13 @@ public class KafkaDataSourcesProvider implements DataSourcesProvider {
     private final String topic;
     private final int primaryKeyIndex;
     private final List<String> fields;
+    private final Properties producerProperties;
     private KafkaTridentDataSource(TridentKafkaConfig conf, String topic, int primaryKeyIndex,
-                                   List<String> fields) {
+                                   Properties producerProperties, List<String> fields) {
       this.conf = conf;
       this.topic = topic;
       this.primaryKeyIndex = primaryKeyIndex;
+      this.producerProperties = producerProperties;
       this.fields = fields;
     }
 
@@ -128,7 +137,7 @@ public class KafkaDataSourcesProvider implements DataSourcesProvider {
 
     @Override
     public Function getConsumer() {
-      return new KafkaTridentSink(topic, primaryKeyIndex, fields);
+      return new KafkaTridentSink(topic, primaryKeyIndex, producerProperties, fields);
     }
   }
 
@@ -145,7 +154,7 @@ public class KafkaDataSourcesProvider implements DataSourcesProvider {
 
   @Override
   public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String outputFormatClass,
-                                                List<FieldInfo> fields) {
+                                                String properties, List<FieldInfo> fields) {
     int port = uri.getPort() != -1 ? uri.getPort() : DEFAULT_ZK_PORT;
     ZkHosts zk = new ZkHosts(uri.getHost() + ":" + port, uri.getPath());
     Map<String, String> values = parseURIParams(uri.getQuery());
@@ -163,7 +172,22 @@ public class KafkaDataSourcesProvider implements DataSourcesProvider {
     }
     Preconditions.checkState(primaryIndex != -1, "Kafka stream table must have a primary key");
     conf.scheme = new SchemeAsMultiScheme(new JsonScheme(fieldNames));
-    return new KafkaTridentDataSource(conf, topic, primaryIndex, fieldNames);
+    // The table properties are a JSON object; its "producer" entry is the Kafka producer config
+    // that the sink passes to TridentKafkaState.prepare().
+    Preconditions.checkNotNull(properties, "Kafka Table must contain properties");
+    Map<?, ?> map;
+    try {
+      map = new ObjectMapper().readValue(properties, Map.class);
+    } catch (IOException e) {
+      throw new IllegalArgumentException("Kafka Table properties are not valid JSON", e);
+    }
+    Object producerConfig = map == null ? null : map.get("producer");
+    Preconditions.checkNotNull(producerConfig, "Kafka Table must contain producer config");
+    Preconditions.checkArgument(producerConfig instanceof Map,
+        "The producer config of a Kafka Table must be a JSON object");
+    Properties producerProp = new Properties();
+    producerProp.putAll((Map<?, ?>) producerConfig);
+    return new KafkaTridentDataSource(conf, topic, primaryIndex, producerProp, fieldNames);
   }

   private static Map<String, String> parseURIParams(String query) {

http://git-wip-us.apache.org/repos/asf/storm/blob/b5efe2c4/external/sql/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestKafkaDataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestKafkaDataSourcesProvider.java b/external/sql/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestKafkaDataSourcesProvider.java
index 531f764..9380b66 100644
--- a/external/sql/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestKafkaDataSourcesProvider.java
+++ b/external/sql/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestKafkaDataSourcesProvider.java
@@ -17,10 +17,11 @@
  */
 package org.apache.storm.sql.kafka;
 
+import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.storm.sql.kafka.KafkaDataSourcesProvider.KafkaTridentSink;
 import org.apache.storm.sql.runtime.DataSourcesRegistry;
 import org.apache.storm.sql.runtime.FieldInfo;
@@ -46,12 +47,20 @@ public class TestKafkaDataSourcesProvider {
       new FieldInfo("val", String.class, false));
   private static final List<String> FIELD_NAMES = ImmutableList.of("ID", "val");
   private static final JsonSerializer SERIALIZER = new JsonSerializer(FIELD_NAMES);
-
+  private static final String TBL_PROPERTIES = Joiner.on('\n').join(
+      "{\"producer\": {",
+      "\"bootstrap.servers\": \"localhost:9092\",",
+      "\"acks\": \"1\",",
+      "\"key.serializer\": \"org.apache.kafka.common.serialization.StringSerializer\",",
+      "\"value.serializer\": \"org.apache.kafka.common.serialization.StringSerializer\"",
+      "}",
+      "}"
+  );
   @SuppressWarnings("unchecked")
   @Test
   public void testKafkaSink() {
     ISqlTridentDataSource ds = DataSourcesRegistry.constructTridentDataSource(
-        URI.create("kafka://mock?topic=foo"), null, null, FIELDS);
+        URI.create("kafka://mock?topic=foo"), null, null, TBL_PROPERTIES, FIELDS);
     Assert.assertNotNull(ds);
     KafkaTridentSink sink = (KafkaTridentSink) ds.getConsumer();
     sink.prepare(null, null);

http://git-wip-us.apache.org/repos/asf/storm/blob/b5efe2c4/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesProvider.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesProvider.java
index eaabc8d..0ce2d45 100644
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesProvider.java
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesProvider.java
@@ -22,7 +22,6 @@ package org.apache.storm.sql.runtime;
 
 import java.net.URI;
 import java.util.List;
-import java.util.Map;
 
 public interface DataSourcesProvider {
   /**
@@ -46,5 +45,5 @@ public interface DataSourcesProvider {
 
   ISqlTridentDataSource constructTrident(
       URI uri, String inputFormatClass, String outputFormatClass,
-      List<FieldInfo> fields);
+      String properties, List<FieldInfo> fields);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/b5efe2c4/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesRegistry.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesRegistry.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesRegistry.java
index 0285c97..75cd391 100644
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesRegistry.java
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesRegistry.java
@@ -61,13 +61,13 @@ public class DataSourcesRegistry {
 
   public static ISqlTridentDataSource constructTridentDataSource(
       URI uri, String inputFormatClass, String outputFormatClass,
-      List<FieldInfo> fields) {
+      String properties, List<FieldInfo> fields) {
     DataSourcesProvider provider = providers.get(uri.getScheme());
     if (provider == null) {
       return null;
     }
 
-    return provider.constructTrident(uri, inputFormatClass, outputFormatClass, fields);
+    return provider.constructTrident(uri, inputFormatClass, outputFormatClass, properties, fields);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/storm/blob/b5efe2c4/external/storm-kafka/src/jvm/storm/kafka/ByteBufferSerializer.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/ByteBufferSerializer.java b/external/storm-kafka/src/jvm/storm/kafka/ByteBufferSerializer.java
new file mode 100644
index 0000000..1a7238e
--- /dev/null
+++ b/external/storm-kafka/src/jvm/storm/kafka/ByteBufferSerializer.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package storm.kafka;
+
+import backtype.storm.utils.Utils;
+import org.apache.kafka.common.serialization.Serializer;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+/**
+ * Kafka {@link Serializer} that converts a {@link ByteBuffer} to a byte array via {@link Utils#toByteArray}.
+ * A {@code null} value is passed through as {@code null}.
+ */
+public class ByteBufferSerializer implements Serializer<ByteBuffer> {
+  @Override
+  public void configure(Map<String, ?> map, boolean b) {
+    // No configuration needed.
+  }
+
+  @Override
+  public void close() {
+    // No resources to release.
+  }
+
+  @Override
+  public byte[] serialize(String s, ByteBuffer b) {
+    if (b == null) {
+      return null;
+    }
+    // Read from a duplicate so the caller's buffer position is left untouched.
+    return Utils.toByteArray(b.duplicate());
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/b5efe2c4/external/storm-kafka/src/jvm/storm/kafka/IntSerializer.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/IntSerializer.java b/external/storm-kafka/src/jvm/storm/kafka/IntSerializer.java
new file mode 100644
index 0000000..07cbd26
--- /dev/null
+++ b/external/storm-kafka/src/jvm/storm/kafka/IntSerializer.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package storm.kafka;
+
+import org.apache.kafka.common.serialization.Serializer;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+/**
+ * Kafka {@link Serializer} that encodes an {@link Integer} as 4 big-endian bytes.
+ * A {@code null} value is passed through as {@code null}.
+ */
+public class IntSerializer implements Serializer<Integer> {
+  @Override
+  public void configure(Map<String, ?> map, boolean b) {
+    // No configuration needed.
+  }
+
+  @Override
+  public byte[] serialize(String topic, Integer val) {
+    if (val == null) {
+      return null;
+    }
+    // A freshly allocated ByteBuffer uses big-endian byte order.
+    return ByteBuffer.allocate(4).putInt(val).array();
+  }
+
+  @Override
+  public void close() {
+    // No resources to release.
+  }
+}