Posted to commits@storm.apache.org by sr...@apache.org on 2015/12/08 19:10:02 UTC

[16/20] storm git commit: [StormSQL] STORM-1222. Support Kafka as external tables in StormSQL.

[StormSQL] STORM-1222. Support Kafka as external tables in StormSQL.
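For readers new to StormSQL, the sketch below shows how the data source added by this commit can be obtained from application code. It mirrors the included TestKafkaDataSourcesProvider; the ZooKeeper host, broker path, topic and column names are illustrative assumptions, not values taken from the commit.

import com.google.common.collect.ImmutableList;
import org.apache.storm.sql.runtime.DataSourcesRegistry;
import org.apache.storm.sql.runtime.FieldInfo;
import org.apache.storm.sql.runtime.ISqlTridentDataSource;

import java.net.URI;
import java.util.List;

public class KafkaTableUsageSketch {
  public static void main(String[] args) {
    // Column layout of the external table; the Kafka table requires a primary-key column.
    List<FieldInfo> fields = ImmutableList.of(
        new FieldInfo("ID", int.class, true),
        new FieldInfo("val", String.class, false));

    // The "kafka" URI scheme is resolved to KafkaDataSourcesProvider through the
    // META-INF/services registration added in this commit. Host, port, broker path
    // and topic below are placeholders.
    ISqlTridentDataSource ds = DataSourcesRegistry.constructTridentDataSource(
        URI.create("kafka://localhost:2181/brokers?topic=orders"), null, null, fields);

    // getProducer() returns an OpaqueTridentKafkaSpout that deserializes JSON records
    // from the topic; getConsumer() returns a Trident function that writes tuples back
    // to Kafka as JSON, keyed by the primary-key column.
    System.out.println(ds.getProducer().getClass().getName());
    System.out.println(ds.getConsumer().getClass().getName());
  }
}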


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b60712f6
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b60712f6
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b60712f6

Branch: refs/heads/master
Commit: b60712f616be4579049d2ffd7e84f9047d20a8e3
Parents: a8894e6
Author: Haohui Mai <wh...@apache.org>
Authored: Thu Nov 19 14:09:26 2015 -0800
Committer: Haohui Mai <wh...@apache.org>
Committed: Fri Dec 4 11:09:28 2015 -0800

----------------------------------------------------------------------
 external/sql/pom.xml                            |   1 +
 external/sql/storm-sql-kafka/pom.xml            | 111 +++++++++++
 .../org/apache/storm/sql/kafka/JsonScheme.java  |  56 ++++++
 .../apache/storm/sql/kafka/JsonSerializer.java  |  58 ++++++
 .../sql/kafka/KafkaDataSourcesProvider.java     | 184 +++++++++++++++++++
 ...apache.storm.sql.runtime.DataSourcesProvider |  16 ++
 .../storm/sql/kafka/TestJsonRepresentation.java |  50 +++++
 .../sql/kafka/TestKafkaDataSourcesProvider.java | 103 +++++++++++
 .../storm/sql/runtime/IOutputSerializer.java    |  31 ++++
 pom.xml                                         |   6 +
 10 files changed, 616 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/b60712f6/external/sql/pom.xml
----------------------------------------------------------------------
diff --git a/external/sql/pom.xml b/external/sql/pom.xml
index e4dae94..7884d39 100644
--- a/external/sql/pom.xml
+++ b/external/sql/pom.xml
@@ -39,5 +39,6 @@
     <modules>
         <module>storm-sql-core</module>
         <module>storm-sql-runtime</module>
+        <module>storm-sql-kafka</module>
     </modules>
 </project>

http://git-wip-us.apache.org/repos/asf/storm/blob/b60712f6/external/sql/storm-sql-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-kafka/pom.xml b/external/sql/storm-sql-kafka/pom.xml
new file mode 100644
index 0000000..0f6bd19
--- /dev/null
+++ b/external/sql/storm-sql-kafka/pom.xml
@@ -0,0 +1,111 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>storm</artifactId>
+        <groupId>org.apache.storm</groupId>
+        <version>0.11.0-SNAPSHOT</version>
+        <relativePath>../../../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>storm-sql-kafka</artifactId>
+
+    <developers>
+        <developer>
+            <id>haohui</id>
+            <name>Haohui Mai</name>
+            <email>ricetons@gmail.com</email>
+        </developer>
+    </developers>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-sql-runtime</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-sql-runtime</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-kafka</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_2.10</artifactId>
+            <version>0.8.2.1</version>
+            <!-- use provided scope, so users can pull in whichever scala version they choose -->
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.zookeeper</groupId>
+                    <artifactId>zookeeper</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>0.8.2.1</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+    <build>
+        <sourceDirectory>src/jvm</sourceDirectory>
+        <testSourceDirectory>src/test</testSourceDirectory>
+        <resources>
+            <resource>
+                <directory>${basedir}/src/resources</directory>
+            </resource>
+        </resources>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/b60712f6/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/JsonScheme.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/JsonScheme.java b/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/JsonScheme.java
new file mode 100644
index 0000000..80037c6
--- /dev/null
+++ b/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/JsonScheme.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.kafka;
+
+import backtype.storm.spout.Scheme;
+import backtype.storm.tuple.Fields;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+public class JsonScheme implements Scheme {
+  private final List<String> fields;
+
+  JsonScheme(List<String> fields) {
+    this.fields = fields;
+  }
+
+  @Override
+  public List<Object> deserialize(byte[] ser) {
+    ObjectMapper mapper = new ObjectMapper();
+    try {
+      @SuppressWarnings("unchecked")
+      HashMap<String, Object> map = mapper.readValue(ser, HashMap.class);
+      ArrayList<Object> list = new ArrayList<>();
+      for (String f : fields) {
+        list.add(map.get(f));
+      }
+      return list;
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public Fields getOutputFields() {
+    return new Fields(fields);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/b60712f6/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/JsonSerializer.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/JsonSerializer.java b/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/JsonSerializer.java
new file mode 100644
index 0000000..7c5aa57
--- /dev/null
+++ b/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/JsonSerializer.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.kafka;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.google.common.base.Preconditions;
+import org.apache.commons.io.Charsets;
+import org.apache.commons.lang.CharSet;
+import org.apache.storm.sql.runtime.IOutputSerializer;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.List;
+
+class JsonSerializer implements IOutputSerializer {
+  private final List<String> fieldNames;
+  private transient final JsonFactory jsonFactory;
+
+  JsonSerializer(List<String> fieldNames) {
+    this.fieldNames = fieldNames;
+    jsonFactory = new JsonFactory();
+  }
+
+  @Override
+  public ByteBuffer write(List<Object> data, ByteBuffer buffer) {
+    Preconditions.checkArgument(data != null && data.size() == fieldNames.size(), "Invalid schema");
+    StringWriter sw = new StringWriter();
+    try (JsonGenerator jg = jsonFactory.createGenerator(sw)) {
+      jg.writeStartObject();
+      for (int i = 0; i < fieldNames.size(); ++i) {
+        jg.writeFieldName(fieldNames.get(i));
+        jg.writeObject(data.get(i));
+      }
+      jg.writeEndObject();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return ByteBuffer.wrap(sw.toString().getBytes(Charsets.UTF_8));
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/b60712f6/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java b/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java
new file mode 100644
index 0000000..e57e4d3
--- /dev/null
+++ b/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.kafka;
+
+import backtype.storm.spout.SchemeAsMultiScheme;
+import com.google.common.base.Preconditions;
+import org.apache.storm.sql.runtime.*;
+import storm.kafka.ZkHosts;
+import storm.kafka.trident.OpaqueTridentKafkaSpout;
+import storm.kafka.trident.TridentKafkaConfig;
+import storm.kafka.trident.TridentKafkaState;
+import storm.kafka.trident.mapper.TridentTupleToKafkaMapper;
+import storm.kafka.trident.selector.KafkaTopicSelector;
+import storm.trident.operation.BaseFunction;
+import storm.trident.operation.Function;
+import storm.trident.operation.TridentCollector;
+import storm.trident.operation.TridentOperationContext;
+import storm.trident.spout.ITridentDataSource;
+import storm.trident.tuple.TridentTuple;
+
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+/**
+ * Creates a Kafka spout from the given URI. The URI has the form
+ * kafka://zkhost:port/broker_path?topic=topic.
+ */
+public class KafkaDataSourcesProvider implements DataSourcesProvider {
+  private static final int DEFAULT_ZK_PORT = 2181;
+  private static class StaticTopicSelector implements KafkaTopicSelector {
+    private final String topic;
+
+    private StaticTopicSelector(String topic) {
+      this.topic = topic;
+    }
+
+    @Override
+    public String getTopic(TridentTuple tuple) {
+      return topic;
+    }
+  }
+
+  private static class SqlKafkaMapper implements TridentTupleToKafkaMapper<Object, ByteBuffer> {
+    private final int primaryKeyIndex;
+    private final IOutputSerializer serializer;
+
+    private SqlKafkaMapper(int primaryKeyIndex, IOutputSerializer serializer) {
+      this.primaryKeyIndex = primaryKeyIndex;
+      this.serializer = serializer;
+    }
+
+    @Override
+    public Object getKeyFromTuple(TridentTuple tuple) {
+      return tuple.get(primaryKeyIndex);
+    }
+
+    @Override
+    public ByteBuffer getMessageFromTuple(TridentTuple tuple) {
+      return serializer.write(tuple.getValues(), null);
+    }
+  }
+
+  static class KafkaTridentSink extends BaseFunction {
+    private transient TridentKafkaState state;
+    private final String topic;
+    private final int primaryKeyIndex;
+    private final List<String> fieldNames;
+
+    private KafkaTridentSink(String topic, int primaryKeyIndex, List<String> fieldNames) {
+      this.topic = topic;
+      this.primaryKeyIndex = primaryKeyIndex;
+      this.fieldNames = fieldNames;
+    }
+
+    @Override
+    public void cleanup() {
+      super.cleanup();
+    }
+
+    @Override
+    public void prepare(Map conf, TridentOperationContext context) {
+      JsonSerializer serializer = new JsonSerializer(fieldNames);
+      SqlKafkaMapper m = new SqlKafkaMapper(primaryKeyIndex, serializer);
+      state = new TridentKafkaState()
+          .withKafkaTopicSelector(new StaticTopicSelector(topic))
+          .withTridentTupleToKafkaMapper(m);
+    }
+
+    @Override
+    public void execute(TridentTuple tuple, TridentCollector collector) {
+      state.updateState(Collections.singletonList(tuple), collector);
+    }
+  }
+
+  private static class KafkaTridentDataSource implements ISqlTridentDataSource {
+    private final TridentKafkaConfig conf;
+    private final String topic;
+    private final int primaryKeyIndex;
+    private final List<String> fields;
+    private KafkaTridentDataSource(TridentKafkaConfig conf, String topic, int primaryKeyIndex,
+                                   List<String> fields) {
+      this.conf = conf;
+      this.topic = topic;
+      this.primaryKeyIndex = primaryKeyIndex;
+      this.fields = fields;
+    }
+
+    @Override
+    public ITridentDataSource getProducer() {
+      return new OpaqueTridentKafkaSpout(conf);
+    }
+
+    @Override
+    public Function getConsumer() {
+      return new KafkaTridentSink(topic, primaryKeyIndex, fields);
+    }
+  }
+
+  @Override
+  public String scheme() {
+    return "kafka";
+  }
+
+  @Override
+  public DataSource construct(URI uri, String inputFormatClass, String outputFormatClass,
+                              List<FieldInfo> fields) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String outputFormatClass,
+                                                List<FieldInfo> fields) {
+    int port = uri.getPort() != -1 ? uri.getPort() : DEFAULT_ZK_PORT;
+    ZkHosts zk = new ZkHosts(uri.getHost() + ":" + port, uri.getPath());
+    Map<String, String> values = parseURIParams(uri.getQuery());
+    String topic = values.get("topic");
+    Preconditions.checkNotNull(topic, "No topic of the spout is specified");
+    TridentKafkaConfig conf = new TridentKafkaConfig(zk, topic);
+    List<String> fieldNames = new ArrayList<>();
+    int primaryIndex = -1;
+    for (int i = 0; i < fields.size(); ++i) {
+      FieldInfo f = fields.get(i);
+      fieldNames.add(f.name());
+      if (f.isPrimary()) {
+        primaryIndex = i;
+      }
+    }
+    Preconditions.checkState(primaryIndex != -1, "Kafka stream table must have a primary key");
+    conf.scheme = new SchemeAsMultiScheme(new JsonScheme(fieldNames));
+    return new KafkaTridentDataSource(conf, topic, primaryIndex, fieldNames);
+  }
+
+  private static Map<String, String> parseURIParams(String query) {
+    HashMap<String, String> res = new HashMap<>();
+    if (query == null) {
+      return res;
+    }
+
+    String[] params = query.split("&");
+    for (String p : params) {
+      String[] v = p.split("=", 2);
+      if (v.length > 1) {
+        res.put(v[0], v[1]);
+      }
+    }
+    return res;
+  }
+}
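As a concrete illustration of the URI handling in constructTrident above, the snippet below shows how a table URI in the documented kafka://zkhost:port/broker_path?topic=topic form decomposes; the host, broker path and topic are made-up example values.

import java.net.URI;

public class KafkaUriSketch {
  public static void main(String[] args) {
    URI uri = URI.create("kafka://zk.example.com:2181/brokers?topic=orders");

    System.out.println(uri.getHost());   // "zk.example.com" -> ZooKeeper host handed to ZkHosts
    System.out.println(uri.getPort());   // 2181; a missing port (-1) falls back to DEFAULT_ZK_PORT
    System.out.println(uri.getPath());   // "/brokers"       -> broker path handed to ZkHosts
    System.out.println(uri.getQuery());  // "topic=orders"   -> split by parseURIParams; "topic" is required
  }
}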

http://git-wip-us.apache.org/repos/asf/storm/blob/b60712f6/external/sql/storm-sql-kafka/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-kafka/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider b/external/sql/storm-sql-kafka/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
new file mode 100644
index 0000000..7f687cc
--- /dev/null
+++ b/external/sql/storm-sql-kafka/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.storm.sql.kafka.KafkaDataSourcesProvider
\ No newline at end of file
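The service file above is what lets the runtime discover this module on the classpath. Below is a minimal sketch of the standard java.util.ServiceLoader lookup that such a file enables; that DataSourcesRegistry uses exactly this mechanism is an assumption, not something spelled out in this diff.

import org.apache.storm.sql.runtime.DataSourcesProvider;

import java.util.ServiceLoader;

public class ProviderDiscoverySketch {
  public static void main(String[] args) {
    // Every provider named in a META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
    // file on the classpath is instantiated; each can then be indexed by its URI scheme,
    // which is "kafka" for the provider added here.
    for (DataSourcesProvider provider : ServiceLoader.load(DataSourcesProvider.class)) {
      System.out.println(provider.scheme() + " -> " + provider.getClass().getName());
    }
  }
}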

http://git-wip-us.apache.org/repos/asf/storm/blob/b60712f6/external/sql/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestJsonRepresentation.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestJsonRepresentation.java b/external/sql/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestJsonRepresentation.java
new file mode 100644
index 0000000..d2898e8
--- /dev/null
+++ b/external/sql/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestJsonRepresentation.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.kafka;
+
+import backtype.storm.utils.Utils;
+import com.google.common.collect.Lists;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+public class TestJsonRepresentation {
+  @Test
+  public void testJsonScheme() {
+    final List<String> fields = Lists.newArrayList("ID", "val");
+    final String s = "{\"ID\": 1, \"val\": \"2\"}";
+    JsonScheme scheme = new JsonScheme(fields);
+    List<Object> o = scheme.deserialize(s.getBytes(Charset.defaultCharset()));
+    assertArrayEquals(new Object[] {1, "2"}, o.toArray());
+  }
+
+  @Test
+  public void testJsonSerializer() {
+    final List<String> fields = Lists.newArrayList("ID", "val");
+    List<Object> o = Lists.<Object> newArrayList(1, "2");
+    JsonSerializer s = new JsonSerializer(fields);
+    ByteBuffer buf = s.write(o, null);
+    byte[] b = Utils.toByteArray(buf);
+    assertEquals("{\"ID\":1,\"val\":\"2\"}", new String(b));
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/b60712f6/external/sql/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestKafkaDataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestKafkaDataSourcesProvider.java b/external/sql/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestKafkaDataSourcesProvider.java
new file mode 100644
index 0000000..531f764
--- /dev/null
+++ b/external/sql/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestKafkaDataSourcesProvider.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.kafka;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import org.apache.storm.sql.kafka.KafkaDataSourcesProvider.KafkaTridentSink;
+import org.apache.storm.sql.runtime.DataSourcesRegistry;
+import org.apache.storm.sql.runtime.FieldInfo;
+import org.apache.storm.sql.runtime.ISqlTridentDataSource;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.ArgumentMatcher;
+import org.mockito.internal.util.reflection.Whitebox;
+import storm.kafka.trident.TridentKafkaState;
+import storm.trident.tuple.TridentTuple;
+
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.mockito.Mockito.*;
+
+public class TestKafkaDataSourcesProvider {
+  private static final List<FieldInfo> FIELDS = ImmutableList.of(
+      new FieldInfo("ID", int.class, true),
+      new FieldInfo("val", String.class, false));
+  private static final List<String> FIELD_NAMES = ImmutableList.of("ID", "val");
+  private static final JsonSerializer SERIALIZER = new JsonSerializer(FIELD_NAMES);
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testKafkaSink() {
+    ISqlTridentDataSource ds = DataSourcesRegistry.constructTridentDataSource(
+        URI.create("kafka://mock?topic=foo"), null, null, FIELDS);
+    Assert.assertNotNull(ds);
+    KafkaTridentSink sink = (KafkaTridentSink) ds.getConsumer();
+    sink.prepare(null, null);
+    TridentKafkaState state = (TridentKafkaState) Whitebox.getInternalState(sink, "state");
+    Producer producer = mock(Producer.class);
+    Whitebox.setInternalState(state, "producer", producer);
+    List<TridentTuple> tupleList = mockTupleList();
+    for (TridentTuple t : tupleList) {
+      state.updateState(Collections.singletonList(t), null);
+      verify(producer).send(argThat(new KafkaMessageMatcher(t)));
+    }
+    verifyNoMoreInteractions(producer);
+  }
+
+  private static List<TridentTuple> mockTupleList() {
+    List<TridentTuple> tupleList = new ArrayList<>();
+    TridentTuple t0 = mock(TridentTuple.class);
+    TridentTuple t1 = mock(TridentTuple.class);
+    doReturn(1).when(t0).get(0);
+    doReturn(2).when(t1).get(0);
+    doReturn(Lists.<Object>newArrayList(1, "2")).when(t0).getValues();
+    doReturn(Lists.<Object>newArrayList(2, "3")).when(t1).getValues();
+    tupleList.add(t0);
+    tupleList.add(t1);
+    return tupleList;
+  }
+
+  private static class KafkaMessageMatcher extends ArgumentMatcher<KeyedMessage> {
+    private static final int PRIMARY_INDEX = 0;
+    private final TridentTuple tuple;
+
+    private KafkaMessageMatcher(TridentTuple tuple) {
+      this.tuple = tuple;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public boolean matches(Object o) {
+      KeyedMessage<Object, ByteBuffer> m = (KeyedMessage<Object,ByteBuffer>)o;
+      if (m.key() != tuple.get(PRIMARY_INDEX)) {
+        return false;
+      }
+      ByteBuffer buf = m.message();
+      ByteBuffer b = SERIALIZER.write(tuple.getValues(), null);
+      return b.equals(buf);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/b60712f6/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/IOutputSerializer.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/IOutputSerializer.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/IOutputSerializer.java
new file mode 100644
index 0000000..b6670d9
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/IOutputSerializer.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.runtime;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+public interface IOutputSerializer {
+  /**
+   * Serializes the data to a ByteBuffer. The caller can pass in a ByteBuffer so that the serializer can reuse its
+   * memory.
+   *
+   * @return A ByteBuffer containing the serialized result.
+   */
+  ByteBuffer write(List<Object> data, ByteBuffer buffer);
+}
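To make the interface contract concrete, here is a toy implementation that is not part of this commit: it renders a row as a comma-separated line and, for simplicity, ignores the buffer argument and always allocates, which the write contract allows.

import org.apache.storm.sql.runtime.IOutputSerializer;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;

class CsvOutputSerializer implements IOutputSerializer {
  @Override
  public ByteBuffer write(List<Object> data, ByteBuffer buffer) {
    // Join the column values with commas; a real implementation could reuse the passed-in buffer.
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < data.size(); ++i) {
      if (i > 0) {
        sb.append(',');
      }
      sb.append(data.get(i));
    }
    return ByteBuffer.wrap(sb.toString().getBytes(StandardCharsets.UTF_8));
  }
}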

http://git-wip-us.apache.org/repos/asf/storm/blob/b60712f6/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 3387583..4c7388d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -228,6 +228,7 @@
         <metrics-clojure.version>2.5.1</metrics-clojure.version>
         <hdrhistogram.version>2.1.7</hdrhistogram.version>
         <calcite.version>1.4.0-incubating</calcite.version>
+        <jackson.version>2.6.3</jackson.version>
     </properties>
 
     <modules>
@@ -694,6 +695,11 @@
               <artifactId>calcite-core</artifactId>
               <version>${calcite.version}</version>
             </dependency>
+            <dependency>
+              <groupId>com.fasterxml.jackson.core</groupId>
+              <artifactId>jackson-databind</artifactId>
+              <version>${jackson.version}</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>