You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2016/09/06 04:58:42 UTC

[1/3] storm git commit: STORM-2081:create external directory for storm-sql various data sources and move storm-sql-kafka to it

Repository: storm
Updated Branches:
  refs/heads/1.x-branch b0b12281c -> 064c63a6b


STORM-2081:create external directory for storm-sql various data sources and move storm-sql-kafka to it


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/237ff9b4
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/237ff9b4
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/237ff9b4

Branch: refs/heads/1.x-branch
Commit: 237ff9b4215b85e27b490c3cd656c7ed716c1570
Parents: b0b1228
Author: vesense <be...@163.com>
Authored: Mon Sep 5 11:01:50 2016 +0800
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Tue Sep 6 13:11:34 2016 +0900

----------------------------------------------------------------------
 external/sql/pom.xml                            |   2 +-
 .../storm-sql-external/storm-sql-kafka/pom.xml  |  97 +++++++++
 .../org/apache/storm/sql/kafka/JsonScheme.java  |  58 +++++
 .../apache/storm/sql/kafka/JsonSerializer.java  |  56 +++++
 .../sql/kafka/KafkaDataSourcesProvider.java     | 209 +++++++++++++++++++
 ...apache.storm.sql.runtime.DataSourcesProvider |  16 ++
 .../storm/sql/kafka/TestJsonRepresentation.java |  50 +++++
 .../sql/kafka/TestKafkaDataSourcesProvider.java | 114 ++++++++++
 external/sql/storm-sql-kafka/pom.xml            |  97 ---------
 .../org/apache/storm/sql/kafka/JsonScheme.java  |  58 -----
 .../apache/storm/sql/kafka/JsonSerializer.java  |  56 -----
 .../sql/kafka/KafkaDataSourcesProvider.java     | 209 -------------------
 ...apache.storm.sql.runtime.DataSourcesProvider |  16 --
 .../storm/sql/kafka/TestJsonRepresentation.java |  50 -----
 .../sql/kafka/TestKafkaDataSourcesProvider.java | 114 ----------
 storm-dist/binary/src/main/assembly/binary.xml  |   4 +-
 16 files changed, 603 insertions(+), 603 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/237ff9b4/external/sql/pom.xml
----------------------------------------------------------------------
diff --git a/external/sql/pom.xml b/external/sql/pom.xml
index 96bb012..5aa23a2 100644
--- a/external/sql/pom.xml
+++ b/external/sql/pom.xml
@@ -39,6 +39,6 @@
     <modules>
         <module>storm-sql-core</module>
         <module>storm-sql-runtime</module>
-        <module>storm-sql-kafka</module>
+        <module>storm-sql-external/storm-sql-kafka</module>
     </modules>
 </project>

http://git-wip-us.apache.org/repos/asf/storm/blob/237ff9b4/external/sql/storm-sql-external/storm-sql-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-external/storm-sql-kafka/pom.xml b/external/sql/storm-sql-external/storm-sql-kafka/pom.xml
new file mode 100644
index 0000000..021409b
--- /dev/null
+++ b/external/sql/storm-sql-external/storm-sql-kafka/pom.xml
@@ -0,0 +1,97 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>storm</artifactId>
+        <groupId>org.apache.storm</groupId>
+        <version>1.1.0-SNAPSHOT</version>
+        <relativePath>../../../../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>storm-sql-kafka</artifactId>
+
+    <developers>
+        <developer>
+            <id>haohui</id>
+            <name>Haohui Mai</name>
+            <email>ricetons@gmail.com</email>
+        </developer>
+    </developers>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-sql-runtime</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-sql-runtime</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-kafka</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>${storm.kafka.artifact.id}</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>${storm.kafka.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+    <build>
+        <sourceDirectory>src/jvm</sourceDirectory>
+        <testSourceDirectory>src/test</testSourceDirectory>
+        <resources>
+            <resource>
+                <directory>${basedir}/src/resources</directory>
+            </resource>
+        </resources>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/237ff9b4/external/sql/storm-sql-external/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/JsonScheme.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-external/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/JsonScheme.java b/external/sql/storm-sql-external/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/JsonScheme.java
new file mode 100644
index 0000000..eed1282
--- /dev/null
+++ b/external/sql/storm-sql-external/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/JsonScheme.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.kafka;
+
+import org.apache.storm.spout.Scheme;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.Utils;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+public class JsonScheme implements Scheme {
+  private final List<String> fields;
+
+  JsonScheme(List<String> fields) {
+    this.fields = fields;
+  }
+
+  @Override
+  public List<Object> deserialize(ByteBuffer ser) {
+    ObjectMapper mapper = new ObjectMapper();
+    try {
+      @SuppressWarnings("unchecked")
+      HashMap<String, Object> map = mapper.readValue(Utils.toByteArray(ser), HashMap.class);
+      ArrayList<Object> list = new ArrayList<>();
+      for (String f : fields) {
+        list.add(map.get(f));
+      }
+      return list;
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public Fields getOutputFields() {
+    return new Fields(fields);
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/237ff9b4/external/sql/storm-sql-external/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/JsonSerializer.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-external/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/JsonSerializer.java b/external/sql/storm-sql-external/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/JsonSerializer.java
new file mode 100644
index 0000000..e3d5d01
--- /dev/null
+++ b/external/sql/storm-sql-external/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/JsonSerializer.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.kafka;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.google.common.base.Preconditions;
+import org.apache.storm.sql.runtime.IOutputSerializer;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+class JsonSerializer implements IOutputSerializer {
+  private final List<String> fieldNames;
+  private transient final JsonFactory jsonFactory;
+
+  JsonSerializer(List<String> fieldNames) {
+    this.fieldNames = fieldNames;
+    jsonFactory = new JsonFactory();
+  }
+
+  @Override
+  public ByteBuffer write(List<Object> data, ByteBuffer buffer) {
+    Preconditions.checkArgument(data != null && data.size() == fieldNames.size(), "Invalid schema");
+    StringWriter sw = new StringWriter();
+    try (JsonGenerator jg = jsonFactory.createGenerator(sw)) {
+      jg.writeStartObject();
+      for (int i = 0; i < fieldNames.size(); ++i) {
+        jg.writeFieldName(fieldNames.get(i));
+        jg.writeObject(data.get(i));
+      }
+      jg.writeEndObject();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return ByteBuffer.wrap(sw.toString().getBytes(StandardCharsets.UTF_8));
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/237ff9b4/external/sql/storm-sql-external/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-external/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java b/external/sql/storm-sql-external/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java
new file mode 100644
index 0000000..54e160f
--- /dev/null
+++ b/external/sql/storm-sql-external/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.kafka;
+
+import org.apache.storm.spout.SchemeAsMultiScheme;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import org.apache.storm.sql.runtime.*;
+import org.apache.storm.kafka.ZkHosts;
+import org.apache.storm.kafka.trident.OpaqueTridentKafkaSpout;
+import org.apache.storm.kafka.trident.TridentKafkaConfig;
+import org.apache.storm.kafka.trident.TridentKafkaState;
+import org.apache.storm.kafka.trident.mapper.TridentTupleToKafkaMapper;
+import org.apache.storm.kafka.trident.selector.KafkaTopicSelector;
+import org.apache.storm.trident.operation.BaseFunction;
+import org.apache.storm.trident.operation.Function;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.operation.TridentOperationContext;
+import org.apache.storm.trident.spout.ITridentDataSource;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+import java.io.IOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+/**
+ * Create a Kafka spout/sink based on the URI and properties. The URI has the format of
+ * kafka://zkhost:port/broker_path?topic=topic. The properties are in JSON format which specifies the producer config
+ * of the Kafka broker.
+ */
+public class KafkaDataSourcesProvider implements DataSourcesProvider {
+  private static final int DEFAULT_ZK_PORT = 2181;
+  private static class StaticTopicSelector implements KafkaTopicSelector {
+    private final String topic;
+
+    private StaticTopicSelector(String topic) {
+      this.topic = topic;
+    }
+
+    @Override
+    public String getTopic(TridentTuple tuple) {
+      return topic;
+    }
+  }
+
+  private static class SqlKafkaMapper implements TridentTupleToKafkaMapper<Object, ByteBuffer> {
+    private final int primaryKeyIndex;
+    private final IOutputSerializer serializer;
+
+    private SqlKafkaMapper(int primaryKeyIndex, IOutputSerializer serializer) {
+      this.primaryKeyIndex = primaryKeyIndex;
+      this.serializer = serializer;
+    }
+
+    @Override
+    public Object getKeyFromTuple(TridentTuple tuple) {
+      return tuple.get(primaryKeyIndex);
+    }
+
+    @Override
+    public ByteBuffer getMessageFromTuple(TridentTuple tuple) {
+      return serializer.write(tuple.getValues(), null);
+    }
+  }
+
+  static class KafkaTridentSink extends BaseFunction {
+    private transient TridentKafkaState state;
+    private final String topic;
+    private final int primaryKeyIndex;
+    private final Properties producerProperties;
+    private final List<String> fieldNames;
+
+    private KafkaTridentSink(String topic, int primaryKeyIndex, Properties producerProperties,
+                             List<String> fieldNames) {
+      this.topic = topic;
+      this.primaryKeyIndex = primaryKeyIndex;
+      this.producerProperties = producerProperties;
+      this.fieldNames = fieldNames;
+    }
+
+    @Override
+    public void cleanup() {
+      super.cleanup();
+    }
+
+    @Override
+    public void prepare(Map conf, TridentOperationContext context) {
+      JsonSerializer serializer = new JsonSerializer(fieldNames);
+      SqlKafkaMapper m = new SqlKafkaMapper(primaryKeyIndex, serializer);
+      state = new TridentKafkaState()
+          .withKafkaTopicSelector(new StaticTopicSelector(topic))
+          .withTridentTupleToKafkaMapper(m);
+      state.prepare(producerProperties);
+    }
+
+    @Override
+    public void execute(TridentTuple tuple, TridentCollector collector) {
+      state.updateState(Collections.singletonList(tuple), collector);
+    }
+  }
+
+  private static class KafkaTridentDataSource implements ISqlTridentDataSource {
+    private final TridentKafkaConfig conf;
+    private final String topic;
+    private final int primaryKeyIndex;
+    private final List<String> fields;
+    private final String producerProperties;
+    private KafkaTridentDataSource(TridentKafkaConfig conf, String topic, int primaryKeyIndex,
+                                   String producerProperties, List<String> fields) {
+      this.conf = conf;
+      this.topic = topic;
+      this.primaryKeyIndex = primaryKeyIndex;
+      this.producerProperties = producerProperties;
+      this.fields = fields;
+    }
+
+    @Override
+    public ITridentDataSource getProducer() {
+      return new OpaqueTridentKafkaSpout(conf);
+    }
+
+    @Override
+    public Function getConsumer() {
+      Preconditions.checkNotNull(producerProperties,
+          "Writable Kafka Table " + topic + " must contain producer config");
+      Properties props = new Properties();
+      try {
+        ObjectMapper mapper = new ObjectMapper();
+        @SuppressWarnings("unchecked")
+        HashMap<String, Object> map = mapper.readValue(producerProperties, HashMap.class);
+        @SuppressWarnings("unchecked")
+        HashMap<String, Object> producerConfig = (HashMap<String, Object>) map.get("producer");
+        props.putAll(producerConfig);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      Preconditions.checkState(props.containsKey("bootstrap.servers"),
+          "Writable Kafka Table " + topic + " must contain \"bootstrap.servers\" config");
+      return new KafkaTridentSink(topic, primaryKeyIndex, props, fields);
+    }
+  }
+
+  @Override
+  public String scheme() {
+    return "kafka";
+  }
+
+  @Override
+  public DataSource construct(URI uri, String inputFormatClass, String outputFormatClass,
+                              List<FieldInfo> fields) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String outputFormatClass,
+                                                String properties, List<FieldInfo> fields) {
+    int port = uri.getPort() != -1 ? uri.getPort() : DEFAULT_ZK_PORT;
+    ZkHosts zk = new ZkHosts(uri.getHost() + ":" + port, uri.getPath());
+    Map<String, String> values = parseURIParams(uri.getQuery());
+    String topic = values.get("topic");
+    Preconditions.checkNotNull(topic, "No topic of the spout is specified");
+    TridentKafkaConfig conf = new TridentKafkaConfig(zk, topic);
+    List<String> fieldNames = new ArrayList<>();
+    int primaryIndex = -1;
+    for (int i = 0; i < fields.size(); ++i) {
+      FieldInfo f = fields.get(i);
+      fieldNames.add(f.name());
+      if (f.isPrimary()) {
+        primaryIndex = i;
+      }
+    }
+    Preconditions.checkState(primaryIndex != -1, "Kafka stream table must have a primary key");
+    conf.scheme = new SchemeAsMultiScheme(new JsonScheme(fieldNames));
+
+    return new KafkaTridentDataSource(conf, topic, primaryIndex, properties, fieldNames);
+  }
+
+  private static Map<String, String> parseURIParams(String query) {
+    HashMap<String, String> res = new HashMap<>();
+    if (query == null) {
+      return res;
+    }
+
+    String[] params = query.split("&");
+    for (String p : params) {
+      String[] v = p.split("=", 2);
+      if (v.length > 1) {
+        res.put(v[0], v[1]);
+      }
+    }
+    return res;
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/237ff9b4/external/sql/storm-sql-external/storm-sql-kafka/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-external/storm-sql-kafka/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider b/external/sql/storm-sql-external/storm-sql-kafka/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
new file mode 100644
index 0000000..7f687cc
--- /dev/null
+++ b/external/sql/storm-sql-external/storm-sql-kafka/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.storm.sql.kafka.KafkaDataSourcesProvider
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/237ff9b4/external/sql/storm-sql-external/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestJsonRepresentation.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-external/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestJsonRepresentation.java b/external/sql/storm-sql-external/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestJsonRepresentation.java
new file mode 100644
index 0000000..7e85410
--- /dev/null
+++ b/external/sql/storm-sql-external/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestJsonRepresentation.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.kafka;
+
+import org.apache.storm.utils.Utils;
+import com.google.common.collect.Lists;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+public class TestJsonRepresentation {
+  @Test
+  public void testJsonScheme() {
+    final List<String> fields = Lists.newArrayList("ID", "val");
+    final String s = "{\"ID\": 1, \"val\": \"2\"}";
+    JsonScheme scheme = new JsonScheme(fields);
+    List<Object> o = scheme.deserialize(ByteBuffer.wrap(s.getBytes(Charset.defaultCharset())));
+    assertArrayEquals(new Object[] {1, "2"}, o.toArray());
+  }
+
+  @Test
+  public void testJsonSerializer() {
+    final List<String> fields = Lists.newArrayList("ID", "val");
+    List<Object> o = Lists.<Object> newArrayList(1, "2");
+    JsonSerializer s = new JsonSerializer(fields);
+    ByteBuffer buf = s.write(o, null);
+    byte[] b = Utils.toByteArray(buf);
+    assertEquals("{\"ID\":1,\"val\":\"2\"}", new String(b));
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/237ff9b4/external/sql/storm-sql-external/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestKafkaDataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-external/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestKafkaDataSourcesProvider.java b/external/sql/storm-sql-external/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestKafkaDataSourcesProvider.java
new file mode 100644
index 0000000..f6e75ab
--- /dev/null
+++ b/external/sql/storm-sql-external/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestKafkaDataSourcesProvider.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.kafka;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.storm.sql.kafka.KafkaDataSourcesProvider.KafkaTridentSink;
+import org.apache.storm.sql.runtime.DataSourcesRegistry;
+import org.apache.storm.sql.runtime.FieldInfo;
+import org.apache.storm.sql.runtime.ISqlTridentDataSource;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.ArgumentMatcher;
+import org.mockito.internal.util.reflection.Whitebox;
+import org.apache.storm.kafka.trident.TridentKafkaState;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Future;
+
+import static org.mockito.Mockito.*;
+
+public class TestKafkaDataSourcesProvider {
+  private static final List<FieldInfo> FIELDS = ImmutableList.of(
+      new FieldInfo("ID", int.class, true),
+      new FieldInfo("val", String.class, false));
+  private static final List<String> FIELD_NAMES = ImmutableList.of("ID", "val");
+  private static final JsonSerializer SERIALIZER = new JsonSerializer(FIELD_NAMES);
+  private static final String TBL_PROPERTIES = Joiner.on('\n').join(
+      "{\"producer\": {",
+      "\"bootstrap.servers\": \"localhost:9092\",",
+      "\"acks\": \"1\",",
+      "\"key.serializer\": \"org.apache.kafka.common.serialization.StringSerializer\",",
+      "\"value.serializer\": \"org.apache.kafka.common.serialization.StringSerializer\"",
+      "}",
+      "}"
+  );
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testKafkaSink() {
+    ISqlTridentDataSource ds = DataSourcesRegistry.constructTridentDataSource(
+        URI.create("kafka://mock?topic=foo"), null, null, TBL_PROPERTIES, FIELDS);
+    Assert.assertNotNull(ds);
+    KafkaTridentSink sink = (KafkaTridentSink) ds.getConsumer();
+    sink.prepare(null, null);
+    TridentKafkaState state = (TridentKafkaState) Whitebox.getInternalState(sink, "state");
+    KafkaProducer producer = mock(KafkaProducer.class);
+    doReturn(mock(Future.class)).when(producer).send(any(ProducerRecord.class));
+    Whitebox.setInternalState(state, "producer", producer);
+    List<TridentTuple> tupleList = mockTupleList();
+    for (TridentTuple t : tupleList) {
+      state.updateState(Collections.singletonList(t), null);
+      verify(producer).send(argThat(new KafkaMessageMatcher(t)));
+    }
+    verifyNoMoreInteractions(producer);
+  }
+
+  private static List<TridentTuple> mockTupleList() {
+    List<TridentTuple> tupleList = new ArrayList<>();
+    TridentTuple t0 = mock(TridentTuple.class);
+    TridentTuple t1 = mock(TridentTuple.class);
+    doReturn(1).when(t0).get(0);
+    doReturn(2).when(t1).get(0);
+    doReturn(Lists.<Object>newArrayList(1, "2")).when(t0).getValues();
+    doReturn(Lists.<Object>newArrayList(2, "3")).when(t1).getValues();
+    tupleList.add(t0);
+    tupleList.add(t1);
+    return tupleList;
+  }
+
+  private static class KafkaMessageMatcher extends ArgumentMatcher<ProducerRecord> {
+    private static final int PRIMARY_INDEX = 0;
+    private final TridentTuple tuple;
+
+    private KafkaMessageMatcher(TridentTuple tuple) {
+      this.tuple = tuple;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public boolean matches(Object o) {
+      ProducerRecord<Object, ByteBuffer> m = (ProducerRecord<Object,ByteBuffer>)o;
+      if (m.key() != tuple.get(PRIMARY_INDEX)) {
+        return false;
+      }
+      ByteBuffer buf = m.value();
+      ByteBuffer b = SERIALIZER.write(tuple.getValues(), null);
+      return b.equals(buf);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/237ff9b4/external/sql/storm-sql-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-kafka/pom.xml b/external/sql/storm-sql-kafka/pom.xml
deleted file mode 100644
index d2796e1..0000000
--- a/external/sql/storm-sql-kafka/pom.xml
+++ /dev/null
@@ -1,97 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements.  See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License.  You may obtain a copy of the License at
-
-     http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-
-    <parent>
-        <artifactId>storm</artifactId>
-        <groupId>org.apache.storm</groupId>
-        <version>1.1.0-SNAPSHOT</version>
-        <relativePath>../../../pom.xml</relativePath>
-    </parent>
-
-    <artifactId>storm-sql-kafka</artifactId>
-
-    <developers>
-        <developer>
-            <id>haohui</id>
-            <name>Haohui Mai</name>
-            <email>ricetons@gmail.com</email>
-        </developer>
-    </developers>
-
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.storm</groupId>
-            <artifactId>storm-core</artifactId>
-            <version>${project.version}</version>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.storm</groupId>
-            <artifactId>storm-sql-runtime</artifactId>
-            <version>${project.version}</version>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.storm</groupId>
-            <artifactId>storm-sql-runtime</artifactId>
-            <version>${project.version}</version>
-            <scope>test</scope>
-            <type>test-jar</type>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.storm</groupId>
-            <artifactId>storm-kafka</artifactId>
-            <version>${project.version}</version>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.kafka</groupId>
-            <artifactId>${storm.kafka.artifact.id}</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka-clients</artifactId>
-            <version>${storm.kafka.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>com.fasterxml.jackson.core</groupId>
-            <artifactId>jackson-databind</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.mockito</groupId>
-            <artifactId>mockito-all</artifactId>
-            <scope>test</scope>
-        </dependency>
-    </dependencies>
-    <build>
-        <sourceDirectory>src/jvm</sourceDirectory>
-        <testSourceDirectory>src/test</testSourceDirectory>
-        <resources>
-            <resource>
-                <directory>${basedir}/src/resources</directory>
-            </resource>
-        </resources>
-    </build>
-</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/237ff9b4/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/JsonScheme.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/JsonScheme.java b/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/JsonScheme.java
deleted file mode 100644
index eed1282..0000000
--- a/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/JsonScheme.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.kafka;
-
-import org.apache.storm.spout.Scheme;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.utils.Utils;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-
-public class JsonScheme implements Scheme {
-  private final List<String> fields;
-
-  JsonScheme(List<String> fields) {
-    this.fields = fields;
-  }
-
-  @Override
-  public List<Object> deserialize(ByteBuffer ser) {
-    ObjectMapper mapper = new ObjectMapper();
-    try {
-      @SuppressWarnings("unchecked")
-      HashMap<String, Object> map = mapper.readValue(Utils.toByteArray(ser), HashMap.class);
-      ArrayList<Object> list = new ArrayList<>();
-      for (String f : fields) {
-        list.add(map.get(f));
-      }
-      return list;
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override
-  public Fields getOutputFields() {
-    return new Fields(fields);
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/237ff9b4/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/JsonSerializer.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/JsonSerializer.java b/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/JsonSerializer.java
deleted file mode 100644
index e3d5d01..0000000
--- a/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/JsonSerializer.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.kafka;
-
-import com.fasterxml.jackson.core.JsonFactory;
-import com.fasterxml.jackson.core.JsonGenerator;
-import com.google.common.base.Preconditions;
-import org.apache.storm.sql.runtime.IOutputSerializer;
-
-import java.io.IOException;
-import java.io.StringWriter;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.util.List;
-
-class JsonSerializer implements IOutputSerializer {
-  private final List<String> fieldNames;
-  private transient final JsonFactory jsonFactory;
-
-  JsonSerializer(List<String> fieldNames) {
-    this.fieldNames = fieldNames;
-    jsonFactory = new JsonFactory();
-  }
-
-  @Override
-  public ByteBuffer write(List<Object> data, ByteBuffer buffer) {
-    Preconditions.checkArgument(data != null && data.size() == fieldNames.size(), "Invalid schema");
-    StringWriter sw = new StringWriter();
-    try (JsonGenerator jg = jsonFactory.createGenerator(sw)) {
-      jg.writeStartObject();
-      for (int i = 0; i < fieldNames.size(); ++i) {
-        jg.writeFieldName(fieldNames.get(i));
-        jg.writeObject(data.get(i));
-      }
-      jg.writeEndObject();
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-    return ByteBuffer.wrap(sw.toString().getBytes(StandardCharsets.UTF_8));
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/237ff9b4/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java b/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java
deleted file mode 100644
index 54e160f..0000000
--- a/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.kafka;
-
-import org.apache.storm.spout.SchemeAsMultiScheme;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Preconditions;
-import org.apache.storm.sql.runtime.*;
-import org.apache.storm.kafka.ZkHosts;
-import org.apache.storm.kafka.trident.OpaqueTridentKafkaSpout;
-import org.apache.storm.kafka.trident.TridentKafkaConfig;
-import org.apache.storm.kafka.trident.TridentKafkaState;
-import org.apache.storm.kafka.trident.mapper.TridentTupleToKafkaMapper;
-import org.apache.storm.kafka.trident.selector.KafkaTopicSelector;
-import org.apache.storm.trident.operation.BaseFunction;
-import org.apache.storm.trident.operation.Function;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.operation.TridentOperationContext;
-import org.apache.storm.trident.spout.ITridentDataSource;
-import org.apache.storm.trident.tuple.TridentTuple;
-
-import java.io.IOException;
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.*;
-
-/**
- * Create a Kafka spout/sink based on the URI and properties. The URI has the format of
- * kafka://zkhost:port/broker_path?topic=topic. The properties are in JSON format which specifies the producer config
- * of the Kafka broker.
- */
-public class KafkaDataSourcesProvider implements DataSourcesProvider {
-  private static final int DEFAULT_ZK_PORT = 2181;
-  private static class StaticTopicSelector implements KafkaTopicSelector {
-    private final String topic;
-
-    private StaticTopicSelector(String topic) {
-      this.topic = topic;
-    }
-
-    @Override
-    public String getTopic(TridentTuple tuple) {
-      return topic;
-    }
-  }
-
-  private static class SqlKafkaMapper implements TridentTupleToKafkaMapper<Object, ByteBuffer> {
-    private final int primaryKeyIndex;
-    private final IOutputSerializer serializer;
-
-    private SqlKafkaMapper(int primaryKeyIndex, IOutputSerializer serializer) {
-      this.primaryKeyIndex = primaryKeyIndex;
-      this.serializer = serializer;
-    }
-
-    @Override
-    public Object getKeyFromTuple(TridentTuple tuple) {
-      return tuple.get(primaryKeyIndex);
-    }
-
-    @Override
-    public ByteBuffer getMessageFromTuple(TridentTuple tuple) {
-      return serializer.write(tuple.getValues(), null);
-    }
-  }
-
-  static class KafkaTridentSink extends BaseFunction {
-    private transient TridentKafkaState state;
-    private final String topic;
-    private final int primaryKeyIndex;
-    private final Properties producerProperties;
-    private final List<String> fieldNames;
-
-    private KafkaTridentSink(String topic, int primaryKeyIndex, Properties producerProperties,
-                             List<String> fieldNames) {
-      this.topic = topic;
-      this.primaryKeyIndex = primaryKeyIndex;
-      this.producerProperties = producerProperties;
-      this.fieldNames = fieldNames;
-    }
-
-    @Override
-    public void cleanup() {
-      super.cleanup();
-    }
-
-    @Override
-    public void prepare(Map conf, TridentOperationContext context) {
-      JsonSerializer serializer = new JsonSerializer(fieldNames);
-      SqlKafkaMapper m = new SqlKafkaMapper(primaryKeyIndex, serializer);
-      state = new TridentKafkaState()
-          .withKafkaTopicSelector(new StaticTopicSelector(topic))
-          .withTridentTupleToKafkaMapper(m);
-      state.prepare(producerProperties);
-    }
-
-    @Override
-    public void execute(TridentTuple tuple, TridentCollector collector) {
-      state.updateState(Collections.singletonList(tuple), collector);
-    }
-  }
-
-  private static class KafkaTridentDataSource implements ISqlTridentDataSource {
-    private final TridentKafkaConfig conf;
-    private final String topic;
-    private final int primaryKeyIndex;
-    private final List<String> fields;
-    private final String producerProperties;
-    private KafkaTridentDataSource(TridentKafkaConfig conf, String topic, int primaryKeyIndex,
-                                   String producerProperties, List<String> fields) {
-      this.conf = conf;
-      this.topic = topic;
-      this.primaryKeyIndex = primaryKeyIndex;
-      this.producerProperties = producerProperties;
-      this.fields = fields;
-    }
-
-    @Override
-    public ITridentDataSource getProducer() {
-      return new OpaqueTridentKafkaSpout(conf);
-    }
-
-    @Override
-    public Function getConsumer() {
-      Preconditions.checkNotNull(producerProperties,
-          "Writable Kafka Table " + topic + " must contain producer config");
-      Properties props = new Properties();
-      try {
-        ObjectMapper mapper = new ObjectMapper();
-        @SuppressWarnings("unchecked")
-        HashMap<String, Object> map = mapper.readValue(producerProperties, HashMap.class);
-        @SuppressWarnings("unchecked")
-        HashMap<String, Object> producerConfig = (HashMap<String, Object>) map.get("producer");
-        props.putAll(producerConfig);
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-      Preconditions.checkState(props.containsKey("bootstrap.servers"),
-          "Writable Kafka Table " + topic + " must contain \"bootstrap.servers\" config");
-      return new KafkaTridentSink(topic, primaryKeyIndex, props, fields);
-    }
-  }
-
-  @Override
-  public String scheme() {
-    return "kafka";
-  }
-
-  @Override
-  public DataSource construct(URI uri, String inputFormatClass, String outputFormatClass,
-                              List<FieldInfo> fields) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String outputFormatClass,
-                                                String properties, List<FieldInfo> fields) {
-    int port = uri.getPort() != -1 ? uri.getPort() : DEFAULT_ZK_PORT;
-    ZkHosts zk = new ZkHosts(uri.getHost() + ":" + port, uri.getPath());
-    Map<String, String> values = parseURIParams(uri.getQuery());
-    String topic = values.get("topic");
-    Preconditions.checkNotNull(topic, "No topic of the spout is specified");
-    TridentKafkaConfig conf = new TridentKafkaConfig(zk, topic);
-    List<String> fieldNames = new ArrayList<>();
-    int primaryIndex = -1;
-    for (int i = 0; i < fields.size(); ++i) {
-      FieldInfo f = fields.get(i);
-      fieldNames.add(f.name());
-      if (f.isPrimary()) {
-        primaryIndex = i;
-      }
-    }
-    Preconditions.checkState(primaryIndex != -1, "Kafka stream table must have a primary key");
-    conf.scheme = new SchemeAsMultiScheme(new JsonScheme(fieldNames));
-
-    return new KafkaTridentDataSource(conf, topic, primaryIndex, properties, fieldNames);
-  }
-
-  private static Map<String, String> parseURIParams(String query) {
-    HashMap<String, String> res = new HashMap<>();
-    if (query == null) {
-      return res;
-    }
-
-    String[] params = query.split("&");
-    for (String p : params) {
-      String[] v = p.split("=", 2);
-      if (v.length > 1) {
-        res.put(v[0], v[1]);
-      }
-    }
-    return res;
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/237ff9b4/external/sql/storm-sql-kafka/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-kafka/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider b/external/sql/storm-sql-kafka/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
deleted file mode 100644
index 7f687cc..0000000
--- a/external/sql/storm-sql-kafka/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-org.apache.storm.sql.kafka.KafkaDataSourcesProvider
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/237ff9b4/external/sql/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestJsonRepresentation.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestJsonRepresentation.java b/external/sql/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestJsonRepresentation.java
deleted file mode 100644
index 7e85410..0000000
--- a/external/sql/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestJsonRepresentation.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.kafka;
-
-import org.apache.storm.utils.Utils;
-import com.google.common.collect.Lists;
-import org.junit.Test;
-
-import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
-import java.util.List;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-
-public class TestJsonRepresentation {
-  @Test
-  public void testJsonScheme() {
-    final List<String> fields = Lists.newArrayList("ID", "val");
-    final String s = "{\"ID\": 1, \"val\": \"2\"}";
-    JsonScheme scheme = new JsonScheme(fields);
-    List<Object> o = scheme.deserialize(ByteBuffer.wrap(s.getBytes(Charset.defaultCharset())));
-    assertArrayEquals(new Object[] {1, "2"}, o.toArray());
-  }
-
-  @Test
-  public void testJsonSerializer() {
-    final List<String> fields = Lists.newArrayList("ID", "val");
-    List<Object> o = Lists.<Object> newArrayList(1, "2");
-    JsonSerializer s = new JsonSerializer(fields);
-    ByteBuffer buf = s.write(o, null);
-    byte[] b = Utils.toByteArray(buf);
-    assertEquals("{\"ID\":1,\"val\":\"2\"}", new String(b));
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/237ff9b4/external/sql/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestKafkaDataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestKafkaDataSourcesProvider.java b/external/sql/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestKafkaDataSourcesProvider.java
deleted file mode 100644
index f6e75ab..0000000
--- a/external/sql/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestKafkaDataSourcesProvider.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.kafka;
-
-import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.storm.sql.kafka.KafkaDataSourcesProvider.KafkaTridentSink;
-import org.apache.storm.sql.runtime.DataSourcesRegistry;
-import org.apache.storm.sql.runtime.FieldInfo;
-import org.apache.storm.sql.runtime.ISqlTridentDataSource;
-import org.junit.Assert;
-import org.junit.Test;
-import org.mockito.ArgumentMatcher;
-import org.mockito.internal.util.reflection.Whitebox;
-import org.apache.storm.kafka.trident.TridentKafkaState;
-import org.apache.storm.trident.tuple.TridentTuple;
-
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.Future;
-
-import static org.mockito.Mockito.*;
-
-public class TestKafkaDataSourcesProvider {
-  private static final List<FieldInfo> FIELDS = ImmutableList.of(
-      new FieldInfo("ID", int.class, true),
-      new FieldInfo("val", String.class, false));
-  private static final List<String> FIELD_NAMES = ImmutableList.of("ID", "val");
-  private static final JsonSerializer SERIALIZER = new JsonSerializer(FIELD_NAMES);
-  private static final String TBL_PROPERTIES = Joiner.on('\n').join(
-      "{\"producer\": {",
-      "\"bootstrap.servers\": \"localhost:9092\",",
-      "\"acks\": \"1\",",
-      "\"key.serializer\": \"org.apache.kafka.common.serialization.StringSerializer\",",
-      "\"value.serializer\": \"org.apache.kafka.common.serialization.StringSerializer\"",
-      "}",
-      "}"
-  );
-  @SuppressWarnings("unchecked")
-  @Test
-  public void testKafkaSink() {
-    ISqlTridentDataSource ds = DataSourcesRegistry.constructTridentDataSource(
-        URI.create("kafka://mock?topic=foo"), null, null, TBL_PROPERTIES, FIELDS);
-    Assert.assertNotNull(ds);
-    KafkaTridentSink sink = (KafkaTridentSink) ds.getConsumer();
-    sink.prepare(null, null);
-    TridentKafkaState state = (TridentKafkaState) Whitebox.getInternalState(sink, "state");
-    KafkaProducer producer = mock(KafkaProducer.class);
-    doReturn(mock(Future.class)).when(producer).send(any(ProducerRecord.class));
-    Whitebox.setInternalState(state, "producer", producer);
-    List<TridentTuple> tupleList = mockTupleList();
-    for (TridentTuple t : tupleList) {
-      state.updateState(Collections.singletonList(t), null);
-      verify(producer).send(argThat(new KafkaMessageMatcher(t)));
-    }
-    verifyNoMoreInteractions(producer);
-  }
-
-  private static List<TridentTuple> mockTupleList() {
-    List<TridentTuple> tupleList = new ArrayList<>();
-    TridentTuple t0 = mock(TridentTuple.class);
-    TridentTuple t1 = mock(TridentTuple.class);
-    doReturn(1).when(t0).get(0);
-    doReturn(2).when(t1).get(0);
-    doReturn(Lists.<Object>newArrayList(1, "2")).when(t0).getValues();
-    doReturn(Lists.<Object>newArrayList(2, "3")).when(t1).getValues();
-    tupleList.add(t0);
-    tupleList.add(t1);
-    return tupleList;
-  }
-
-  private static class KafkaMessageMatcher extends ArgumentMatcher<ProducerRecord> {
-    private static final int PRIMARY_INDEX = 0;
-    private final TridentTuple tuple;
-
-    private KafkaMessageMatcher(TridentTuple tuple) {
-      this.tuple = tuple;
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public boolean matches(Object o) {
-      ProducerRecord<Object, ByteBuffer> m = (ProducerRecord<Object,ByteBuffer>)o;
-      if (m.key() != tuple.get(PRIMARY_INDEX)) {
-        return false;
-      }
-      ByteBuffer buf = m.value();
-      ByteBuffer b = SERIALIZER.write(tuple.getValues(), null);
-      return b.equals(buf);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/237ff9b4/storm-dist/binary/src/main/assembly/binary.xml
----------------------------------------------------------------------
diff --git a/storm-dist/binary/src/main/assembly/binary.xml b/storm-dist/binary/src/main/assembly/binary.xml
index a16766b..5e1f4f1 100644
--- a/storm-dist/binary/src/main/assembly/binary.xml
+++ b/storm-dist/binary/src/main/assembly/binary.xml
@@ -298,8 +298,8 @@
             </includes>
         </fileSet>
         <fileSet>
-            <directory>${project.basedir}/../../external/sql/storm-sql-kafka/target</directory>
-            <outputDirectory>external/sql/storm-sql-kafka</outputDirectory>
+            <directory>${project.basedir}/../../external/sql/storm-sql-external/storm-sql-kafka/target</directory>
+            <outputDirectory>external/sql/storm-sql-external/storm-sql-kafka</outputDirectory>
             <includes>
                 <include>storm*jar</include>
             </includes>


[2/3] storm git commit: Merge branch 'STORM-2081-1.x' into 1.x-branch

Posted by ka...@apache.org.
Merge branch 'STORM-2081-1.x' into 1.x-branch


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3a929504
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3a929504
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3a929504

Branch: refs/heads/1.x-branch
Commit: 3a9295046d232ab734d293f2301487cb97cc058c
Parents: b0b1228 237ff9b
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Tue Sep 6 13:12:27 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Tue Sep 6 13:12:27 2016 +0900

----------------------------------------------------------------------
 external/sql/pom.xml                            |   2 +-
 .../storm-sql-external/storm-sql-kafka/pom.xml  |  97 +++++++++
 .../org/apache/storm/sql/kafka/JsonScheme.java  |  58 +++++
 .../apache/storm/sql/kafka/JsonSerializer.java  |  56 +++++
 .../sql/kafka/KafkaDataSourcesProvider.java     | 209 +++++++++++++++++++
 ...apache.storm.sql.runtime.DataSourcesProvider |  16 ++
 .../storm/sql/kafka/TestJsonRepresentation.java |  50 +++++
 .../sql/kafka/TestKafkaDataSourcesProvider.java | 114 ++++++++++
 external/sql/storm-sql-kafka/pom.xml            |  97 ---------
 .../org/apache/storm/sql/kafka/JsonScheme.java  |  58 -----
 .../apache/storm/sql/kafka/JsonSerializer.java  |  56 -----
 .../sql/kafka/KafkaDataSourcesProvider.java     | 209 -------------------
 ...apache.storm.sql.runtime.DataSourcesProvider |  16 --
 .../storm/sql/kafka/TestJsonRepresentation.java |  50 -----
 .../sql/kafka/TestKafkaDataSourcesProvider.java | 114 ----------
 storm-dist/binary/src/main/assembly/binary.xml  |   4 +-
 16 files changed, 603 insertions(+), 603 deletions(-)
----------------------------------------------------------------------



[3/3] storm git commit: add STORM-2081 to CHANGELOG

Posted by ka...@apache.org.
add STORM-2081 to CHANGELOG


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/064c63a6
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/064c63a6
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/064c63a6

Branch: refs/heads/1.x-branch
Commit: 064c63a6be69b7817e299570249996d6111324b0
Parents: 3a92950
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Tue Sep 6 13:12:47 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Tue Sep 6 13:12:47 2016 +0900

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/064c63a6/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index f7491dc..5c39009 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -79,6 +79,7 @@
  * STORM-1720: Support GEO in storm-redis
 
 ## 1.0.3
+ * STORM-2081: create external directory for storm-sql various data sources and move storm-sql-kafka to it
  * STORM-2054: DependencyResolver should be aware of relative path and absolute path
  * STORM-1344: Remove sql command from storm-jdbc build
  * STORM-2070: Fix sigar native binary download link