Posted to commits@storm.apache.org by ka...@apache.org on 2017/04/05 23:19:06 UTC

[03/23] storm git commit: STORM-2453 Move non-connectors into the top directory

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-external/storm-sql-hdfs/src/jvm/org/apache/storm/sql/hdfs/HdfsDataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-external/storm-sql-hdfs/src/jvm/org/apache/storm/sql/hdfs/HdfsDataSourcesProvider.java b/sql/storm-sql-external/storm-sql-hdfs/src/jvm/org/apache/storm/sql/hdfs/HdfsDataSourcesProvider.java
new file mode 100644
index 0000000..38c3fcb
--- /dev/null
+++ b/sql/storm-sql-external/storm-sql-hdfs/src/jvm/org/apache/storm/sql/hdfs/HdfsDataSourcesProvider.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.hdfs;
+
+import com.google.common.base.Preconditions;
+import org.apache.storm.hdfs.trident.HdfsState;
+import org.apache.storm.hdfs.trident.HdfsStateFactory;
+import org.apache.storm.hdfs.trident.HdfsUpdater;
+import org.apache.storm.hdfs.trident.format.FileNameFormat;
+import org.apache.storm.hdfs.trident.format.RecordFormat;
+import org.apache.storm.hdfs.trident.format.SimpleFileNameFormat;
+import org.apache.storm.hdfs.trident.rotation.FileRotationPolicy;
+import org.apache.storm.hdfs.trident.rotation.FileSizeRotationPolicy;
+import org.apache.storm.hdfs.trident.rotation.TimedRotationPolicy;
+import org.apache.storm.sql.runtime.DataSource;
+import org.apache.storm.sql.runtime.DataSourcesProvider;
+import org.apache.storm.sql.runtime.FieldInfo;
+import org.apache.storm.sql.runtime.IOutputSerializer;
+import org.apache.storm.sql.runtime.ISqlTridentDataSource;
+import org.apache.storm.sql.runtime.SimpleSqlTridentConsumer;
+import org.apache.storm.sql.runtime.utils.FieldInfoUtils;
+import org.apache.storm.sql.runtime.utils.SerdeUtils;
+import org.apache.storm.trident.spout.ITridentDataSource;
+import org.apache.storm.trident.state.StateFactory;
+import org.apache.storm.trident.state.StateUpdater;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+import java.net.URI;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Create an HDFS sink based on the URI and properties. The URI has the format hdfs://host:port/path-to-file.
+ * The properties are in JSON format and specify options such as the name and path of the HDFS file.
+ */
+public class HdfsDataSourcesProvider implements DataSourcesProvider {
+
+  private static class HdfsTridentDataSource implements ISqlTridentDataSource {
+    private final String url;
+    private final Properties props;
+    private final IOutputSerializer serializer;
+
+    private HdfsTridentDataSource(String url, Properties props, IOutputSerializer serializer) {
+      this.url = url;
+      this.props = props;
+      this.serializer = serializer;
+    }
+
+    @Override
+    public ITridentDataSource getProducer() {
+      throw new UnsupportedOperationException(this.getClass().getName() + " doesn't provide a producer");
+    }
+
+    @Override
+    public SqlTridentConsumer getConsumer() {
+      FileNameFormat fileNameFormat = new SimpleFileNameFormat()
+          .withPath(props.getProperty("hdfs.file.path", "/storm"))
+          .withName(props.getProperty("hdfs.file.name", "$TIME.$NUM.txt"));
+
+      RecordFormat recordFormat = new TridentRecordFormat(serializer);
+
+      FileRotationPolicy rotationPolicy;
+      String size = props.getProperty("hdfs.rotation.size.kb");
+      String interval = props.getProperty("hdfs.rotation.time.seconds");
+      Preconditions.checkArgument(size != null || interval != null, "Hdfs data source must contain file rotation config");
+
+      if (size != null) {
+        rotationPolicy = new FileSizeRotationPolicy(Float.parseFloat(size), FileSizeRotationPolicy.Units.KB);
+      } else {
+        rotationPolicy = new TimedRotationPolicy(Float.parseFloat(interval), TimedRotationPolicy.TimeUnit.SECONDS);
+      }
+
+      HdfsState.Options options = new HdfsState.HdfsFileOptions()
+          .withFileNameFormat(fileNameFormat)
+          .withRecordFormat(recordFormat)
+          .withRotationPolicy(rotationPolicy)
+          .withFsUrl(url);
+
+      StateFactory stateFactory = new HdfsStateFactory().withOptions(options);
+      StateUpdater stateUpdater = new HdfsUpdater();
+
+      return new SimpleSqlTridentConsumer(stateFactory, stateUpdater);
+    }
+  }
+
+  private static class TridentRecordFormat implements RecordFormat {
+    private final IOutputSerializer serializer;
+
+    private TridentRecordFormat(IOutputSerializer serializer) {
+      this.serializer = serializer;
+    }
+
+    @Override
+    public byte[] format(TridentTuple tuple) {
+      // TODO: handle '\n' in record values (see DelimitedRecordFormat).
+      return serializer.write(tuple.getValues(), null).array();
+    }
+
+  }
+
+  @Override
+  public String scheme() {
+    return "hdfs";
+  }
+
+  @Override
+  public DataSource construct(URI uri, String inputFormatClass, String outputFormatClass,
+                              List<FieldInfo> fields) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String outputFormatClass,
+                                                Properties properties, List<FieldInfo> fields) {
+    List<String> fieldNames = FieldInfoUtils.getFieldNames(fields);
+    IOutputSerializer serializer = SerdeUtils.getSerializer(outputFormatClass, properties, fieldNames);
+    return new HdfsTridentDataSource(uri.toString(), properties, serializer);
+  }
+
+}
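
A minimal usage sketch for the provider above (illustrative, not part of this
commit): the namenode address and property values are assumptions, and the null
format arguments fall back to the default serde, as in the accompanying test.

    // Property keys mirror those read in getConsumer() above.
    Properties props = new Properties();
    props.setProperty("hdfs.file.path", "/storm/sql");          // target directory
    props.setProperty("hdfs.file.name", "$TIME.$NUM.txt");      // file name pattern
    props.setProperty("hdfs.rotation.time.seconds", "60");      // or hdfs.rotation.size.kb
    List<FieldInfo> fields = ImmutableList.of(
        new FieldInfo("ID", int.class, true),
        new FieldInfo("val", String.class, false));
    ISqlTridentDataSource ds = DataSourcesRegistry.constructTridentDataSource(
        URI.create("hdfs://localhost:8020/"), null, null, props, fields);
    StateFactory stateFactory = ds.getConsumer().getStateFactory();  // HdfsStateFactory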

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-external/storm-sql-hdfs/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
----------------------------------------------------------------------
diff --git a/sql/storm-sql-external/storm-sql-hdfs/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider b/sql/storm-sql-external/storm-sql-hdfs/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
new file mode 100644
index 0000000..5fac84f
--- /dev/null
+++ b/sql/storm-sql-external/storm-sql-hdfs/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.storm.sql.hdfs.HdfsDataSourcesProvider
\ No newline at end of file
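
The service file above is a standard java.util.ServiceLoader registration. A
sketch of how such a registration is typically consumed (illustrative; the
actual lookup lives in DataSourcesRegistry, which matches providers against the
table URI by scheme()):

    ServiceLoader<DataSourcesProvider> loader = ServiceLoader.load(DataSourcesProvider.class);
    for (DataSourcesProvider provider : loader) {
      if ("hdfs".equals(provider.scheme())) {
        // found HdfsDataSourcesProvider; constructTrident(...) builds the data source
      }
    }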

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-external/storm-sql-hdfs/src/test/org/apache/storm/sql/hdfs/TestHdfsDataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-external/storm-sql-hdfs/src/test/org/apache/storm/sql/hdfs/TestHdfsDataSourcesProvider.java b/sql/storm-sql-external/storm-sql-hdfs/src/test/org/apache/storm/sql/hdfs/TestHdfsDataSourcesProvider.java
new file mode 100644
index 0000000..1473438
--- /dev/null
+++ b/sql/storm-sql-external/storm-sql-hdfs/src/test/org/apache/storm/sql/hdfs/TestHdfsDataSourcesProvider.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.hdfs;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.storm.hdfs.trident.HdfsState;
+import org.apache.storm.hdfs.trident.HdfsStateFactory;
+import org.apache.storm.hdfs.trident.HdfsUpdater;
+import org.apache.storm.sql.runtime.DataSourcesRegistry;
+import org.apache.storm.sql.runtime.FieldInfo;
+import org.apache.storm.sql.runtime.ISqlTridentDataSource;
+import org.apache.storm.trident.state.StateUpdater;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.storm.hdfs.trident.HdfsState.HdfsFileOptions;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+public class TestHdfsDataSourcesProvider {
+  private static final List<FieldInfo> FIELDS = ImmutableList.of(
+      new FieldInfo("ID", int.class, true),
+      new FieldInfo("val", String.class, false));
+  private static final Properties TBL_PROPERTIES = new Properties();
+
+  private static String hdfsURI;
+  private static MiniDFSCluster hdfsCluster;
+
+  static {
+    TBL_PROPERTIES.put("hdfs.file.path", "/unittest");
+    TBL_PROPERTIES.put("hdfs.file.name", "test1.txt");
+    TBL_PROPERTIES.put("hdfs.rotation.time.seconds", "120");
+  }
+
+  @Before
+  public void setup() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set("fs.trash.interval", "10");
+    conf.setBoolean("dfs.permissions", true);
+    File baseDir = new File("./target/hdfs/").getAbsoluteFile();
+    FileUtil.fullyDelete(baseDir);
+    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
+
+    MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
+    hdfsCluster = builder.build();
+    hdfsURI = "hdfs://localhost:" + hdfsCluster.getNameNodePort() + "/";
+  }
+
+  @After
+  public void shutDown() throws IOException {
+    hdfsCluster.shutdown();
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testHdfsSink() {
+    ISqlTridentDataSource ds = DataSourcesRegistry.constructTridentDataSource(
+            URI.create(hdfsURI), null, null, TBL_PROPERTIES, FIELDS);
+    Assert.assertNotNull(ds);
+
+    ISqlTridentDataSource.SqlTridentConsumer consumer = ds.getConsumer();
+
+    Assert.assertEquals(HdfsStateFactory.class, consumer.getStateFactory().getClass());
+    Assert.assertEquals(HdfsUpdater.class, consumer.getStateUpdater().getClass());
+
+    HdfsState state = (HdfsState) consumer.getStateFactory().makeState(Collections.emptyMap(), null, 0, 1);
+    StateUpdater stateUpdater = consumer.getStateUpdater();
+
+    HdfsFileOptions options = mock(HdfsFileOptions.class);
+    Whitebox.setInternalState(state, "options", options);
+
+    List<TridentTuple> tupleList = mockTupleList();
+
+    for (TridentTuple t : tupleList) {
+      stateUpdater.updateState(state, Collections.singletonList(t), null);
+      try {
+        verify(options).execute(Collections.singletonList(t));
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  private static List<TridentTuple> mockTupleList() {
+    List<TridentTuple> tupleList = new ArrayList<>();
+    TridentTuple t0 = mock(TridentTuple.class);
+    TridentTuple t1 = mock(TridentTuple.class);
+    doReturn(1).when(t0).get(0);
+    doReturn(2).when(t1).get(0);
+    doReturn(Lists.<Object>newArrayList(1, "2")).when(t0).getValues();
+    doReturn(Lists.<Object>newArrayList(2, "3")).when(t1).getValues();
+    tupleList.add(t0);
+    tupleList.add(t1);
+    return tupleList;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-external/storm-sql-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/sql/storm-sql-external/storm-sql-kafka/pom.xml b/sql/storm-sql-external/storm-sql-kafka/pom.xml
new file mode 100644
index 0000000..881343d
--- /dev/null
+++ b/sql/storm-sql-external/storm-sql-kafka/pom.xml
@@ -0,0 +1,93 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>storm</artifactId>
+        <groupId>org.apache.storm</groupId>
+        <version>2.0.0-SNAPSHOT</version>
+        <relativePath>../../../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>storm-sql-kafka</artifactId>
+
+    <developers>
+        <developer>
+            <id>haohui</id>
+            <name>Haohui Mai</name>
+            <email>ricetons@gmail.com</email>
+        </developer>
+    </developers>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-sql-runtime</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-sql-runtime</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-kafka</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>${storm.kafka.artifact.id}</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>${storm.kafka.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+    <build>
+        <sourceDirectory>src/jvm</sourceDirectory>
+        <testSourceDirectory>src/test</testSourceDirectory>
+        <resources>
+            <resource>
+                <directory>${basedir}/src/resources</directory>
+            </resource>
+        </resources>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-external/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-external/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java b/sql/storm-sql-external/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java
new file mode 100644
index 0000000..2aa98ba
--- /dev/null
+++ b/sql/storm-sql-external/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.kafka;
+
+import com.google.common.base.Preconditions;
+import org.apache.storm.kafka.ZkHosts;
+import org.apache.storm.kafka.trident.OpaqueTridentKafkaSpout;
+import org.apache.storm.kafka.trident.TridentKafkaConfig;
+import org.apache.storm.kafka.trident.TridentKafkaStateFactory;
+import org.apache.storm.kafka.trident.TridentKafkaUpdater;
+import org.apache.storm.kafka.trident.mapper.TridentTupleToKafkaMapper;
+import org.apache.storm.kafka.trident.selector.DefaultTopicSelector;
+import org.apache.storm.spout.Scheme;
+import org.apache.storm.spout.SchemeAsMultiScheme;
+import org.apache.storm.sql.runtime.DataSource;
+import org.apache.storm.sql.runtime.DataSourcesProvider;
+import org.apache.storm.sql.runtime.FieldInfo;
+import org.apache.storm.sql.runtime.IOutputSerializer;
+import org.apache.storm.sql.runtime.ISqlTridentDataSource;
+import org.apache.storm.sql.runtime.SimpleSqlTridentConsumer;
+import org.apache.storm.sql.runtime.utils.SerdeUtils;
+import org.apache.storm.trident.spout.ITridentDataSource;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Create a Kafka spout/sink based on the URI and properties. The URI has the format
+ * kafka://zkhost:port/broker_path?topic=topic. The properties are in JSON format and specify the
+ * producer config for the Kafka broker.
+ */
+public class KafkaDataSourcesProvider implements DataSourcesProvider {
+  private static final int DEFAULT_ZK_PORT = 2181;
+
+  private static class SqlKafkaMapper implements TridentTupleToKafkaMapper<Object, ByteBuffer> {
+    private final int primaryKeyIndex;
+    private final IOutputSerializer serializer;
+
+    private SqlKafkaMapper(int primaryKeyIndex, IOutputSerializer serializer) {
+      this.primaryKeyIndex = primaryKeyIndex;
+      this.serializer = serializer;
+    }
+
+    @Override
+    public Object getKeyFromTuple(TridentTuple tuple) {
+      return tuple.get(primaryKeyIndex);
+    }
+
+    @Override
+    public ByteBuffer getMessageFromTuple(TridentTuple tuple) {
+      return serializer.write(tuple.getValues(), null);
+    }
+  }
+
+  private static class KafkaTridentDataSource implements ISqlTridentDataSource {
+    private final TridentKafkaConfig conf;
+    private final String topic;
+    private final int primaryKeyIndex;
+    private final Properties props;
+    private final IOutputSerializer serializer;
+    private KafkaTridentDataSource(TridentKafkaConfig conf, String topic, int primaryKeyIndex,
+                                   Properties props, IOutputSerializer serializer) {
+      this.conf = conf;
+      this.topic = topic;
+      this.primaryKeyIndex = primaryKeyIndex;
+      this.props = props;
+      this.serializer = serializer;
+    }
+
+    @Override
+    public ITridentDataSource getProducer() {
+      return new OpaqueTridentKafkaSpout(conf);
+    }
+
+    @Override
+    public SqlTridentConsumer getConsumer() {
+      Preconditions.checkArgument(!props.isEmpty(),
+              "Writable Kafka Table " + topic + " must contain producer config");
+      HashMap<String, Object> producerConfig = (HashMap<String, Object>) props.get("producer");
+      props.putAll(producerConfig);
+      Preconditions.checkState(props.containsKey("bootstrap.servers"),
+              "Writable Kafka Table " + topic + " must contain \"bootstrap.servers\" config");
+
+      SqlKafkaMapper mapper = new SqlKafkaMapper(primaryKeyIndex, serializer);
+
+      TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
+              .withKafkaTopicSelector(new DefaultTopicSelector(topic))
+              .withProducerProperties(props)
+              .withTridentTupleToKafkaMapper(mapper);
+
+      TridentKafkaUpdater stateUpdater = new TridentKafkaUpdater();
+
+      return new SimpleSqlTridentConsumer(stateFactory, stateUpdater);
+    }
+  }
+
+  @Override
+  public String scheme() {
+    return "kafka";
+  }
+
+  @Override
+  public DataSource construct(URI uri, String inputFormatClass, String outputFormatClass,
+                              List<FieldInfo> fields) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String outputFormatClass,
+                                                Properties properties, List<FieldInfo> fields) {
+    int port = uri.getPort() != -1 ? uri.getPort() : DEFAULT_ZK_PORT;
+    ZkHosts zk = new ZkHosts(uri.getHost() + ":" + port, uri.getPath());
+    Map<String, String> values = parseURIParams(uri.getQuery());
+    String topic = values.get("topic");
+    Preconditions.checkNotNull(topic, "No topic is specified for the spout");
+    TridentKafkaConfig conf = new TridentKafkaConfig(zk, topic);
+    List<String> fieldNames = new ArrayList<>();
+    int primaryIndex = -1;
+    for (int i = 0; i < fields.size(); ++i) {
+      FieldInfo f = fields.get(i);
+      fieldNames.add(f.name());
+      if (f.isPrimary()) {
+        primaryIndex = i;
+      }
+    }
+    Preconditions.checkState(primaryIndex != -1, "Kafka stream table must have a primary key");
+    Scheme scheme = SerdeUtils.getScheme(inputFormatClass, properties, fieldNames);
+    conf.scheme = new SchemeAsMultiScheme(scheme);
+    IOutputSerializer serializer = SerdeUtils.getSerializer(outputFormatClass, properties, fieldNames);
+
+    return new KafkaTridentDataSource(conf, topic, primaryIndex, properties, serializer);
+  }
+
+  private static Map<String, String> parseURIParams(String query) {
+    HashMap<String, String> res = new HashMap<>();
+    if (query == null) {
+      return res;
+    }
+
+    String[] params = query.split("&");
+    for (String p : params) {
+      String[] v = p.split("=", 2);
+      if (v.length > 1) {
+        res.put(v[0], v[1]);
+      }
+    }
+    return res;
+  }
+}
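
A usage sketch covering both sides of the Kafka data source above (illustrative,
not part of this commit): the ZooKeeper address, broker path, topic, and
producer settings are assumptions; the nested "producer" map mirrors the cast
performed in getConsumer().

    Properties props = new Properties();
    HashMap<String, Object> producer = new HashMap<>();
    producer.put("bootstrap.servers", "localhost:9092");   // checked in getConsumer()
    producer.put("acks", "1");
    props.put("producer", producer);
    List<FieldInfo> fields = ImmutableList.of(
        new FieldInfo("ID", int.class, true),               // exactly one primary key required
        new FieldInfo("val", String.class, false));
    ISqlTridentDataSource ds = DataSourcesRegistry.constructTridentDataSource(
        URI.create("kafka://localhost:2181/brokers?topic=orders"), null, null, props, fields);
    ITridentDataSource spout = ds.getProducer();            // OpaqueTridentKafkaSpout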

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-external/storm-sql-kafka/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
----------------------------------------------------------------------
diff --git a/sql/storm-sql-external/storm-sql-kafka/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider b/sql/storm-sql-external/storm-sql-kafka/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
new file mode 100644
index 0000000..7f687cc
--- /dev/null
+++ b/sql/storm-sql-external/storm-sql-kafka/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.storm.sql.kafka.KafkaDataSourcesProvider
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-external/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestKafkaDataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-external/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestKafkaDataSourcesProvider.java b/sql/storm-sql-external/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestKafkaDataSourcesProvider.java
new file mode 100644
index 0000000..0cde492
--- /dev/null
+++ b/sql/storm-sql-external/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestKafkaDataSourcesProvider.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.kafka;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.storm.kafka.trident.TridentKafkaState;
+import org.apache.storm.kafka.trident.TridentKafkaStateFactory;
+import org.apache.storm.kafka.trident.TridentKafkaUpdater;
+import org.apache.storm.sql.runtime.DataSourcesRegistry;
+import org.apache.storm.sql.runtime.FieldInfo;
+import org.apache.storm.sql.runtime.ISqlTridentDataSource;
+import org.apache.storm.sql.runtime.serde.json.JsonSerializer;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.ArgumentMatcher;
+import org.mockito.internal.util.reflection.Whitebox;
+
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Future;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.argThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+public class TestKafkaDataSourcesProvider {
+  private static final List<FieldInfo> FIELDS = ImmutableList.of(
+          new FieldInfo("ID", int.class, true),
+          new FieldInfo("val", String.class, false));
+  private static final List<String> FIELD_NAMES = ImmutableList.of("ID", "val");
+  private static final JsonSerializer SERIALIZER = new JsonSerializer(FIELD_NAMES);
+  private static final Properties TBL_PROPERTIES = new Properties();
+
+  static {
+    Map<String,Object> map = new HashMap<>();
+    map.put("bootstrap.servers", "localhost:9092");
+    map.put("acks", "1");
+    map.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+    map.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+    TBL_PROPERTIES.put("producer", map);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testKafkaSink() {
+    ISqlTridentDataSource ds = DataSourcesRegistry.constructTridentDataSource(
+            URI.create("kafka://mock?topic=foo"), null, null, TBL_PROPERTIES, FIELDS);
+    Assert.assertNotNull(ds);
+
+    ISqlTridentDataSource.SqlTridentConsumer consumer = ds.getConsumer();
+
+    Assert.assertEquals(TridentKafkaStateFactory.class, consumer.getStateFactory().getClass());
+    Assert.assertEquals(TridentKafkaUpdater.class, consumer.getStateUpdater().getClass());
+
+    TridentKafkaState state = (TridentKafkaState) consumer.getStateFactory().makeState(Collections.emptyMap(), null, 0, 1);
+    KafkaProducer producer = mock(KafkaProducer.class);
+    doReturn(mock(Future.class)).when(producer).send(any(ProducerRecord.class));
+    Whitebox.setInternalState(state, "producer", producer);
+
+    List<TridentTuple> tupleList = mockTupleList();
+    for (TridentTuple t : tupleList) {
+      state.updateState(Collections.singletonList(t), null);
+      verify(producer).send(argThat(new KafkaMessageMatcher(t)));
+    }
+    verifyNoMoreInteractions(producer);
+  }
+
+  private static List<TridentTuple> mockTupleList() {
+    List<TridentTuple> tupleList = new ArrayList<>();
+    TridentTuple t0 = mock(TridentTuple.class);
+    TridentTuple t1 = mock(TridentTuple.class);
+    doReturn(1).when(t0).get(0);
+    doReturn(2).when(t1).get(0);
+    doReturn(Lists.<Object>newArrayList(1, "2")).when(t0).getValues();
+    doReturn(Lists.<Object>newArrayList(2, "3")).when(t1).getValues();
+    tupleList.add(t0);
+    tupleList.add(t1);
+    return tupleList;
+  }
+
+  private static class KafkaMessageMatcher extends ArgumentMatcher<ProducerRecord> {
+    private static final int PRIMARY_INDEX = 0;
+    private final TridentTuple tuple;
+
+    private KafkaMessageMatcher(TridentTuple tuple) {
+      this.tuple = tuple;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public boolean matches(Object o) {
+      ProducerRecord<Object, ByteBuffer> m = (ProducerRecord<Object,ByteBuffer>)o;
+      if (m.key() != tuple.get(PRIMARY_INDEX)) {
+        return false;
+      }
+      ByteBuffer buf = m.value();
+      ByteBuffer b = SERIALIZER.write(tuple.getValues(), null);
+      return b.equals(buf);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-external/storm-sql-mongodb/pom.xml
----------------------------------------------------------------------
diff --git a/sql/storm-sql-external/storm-sql-mongodb/pom.xml b/sql/storm-sql-external/storm-sql-mongodb/pom.xml
new file mode 100644
index 0000000..218d89b
--- /dev/null
+++ b/sql/storm-sql-external/storm-sql-mongodb/pom.xml
@@ -0,0 +1,84 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>storm</artifactId>
+        <groupId>org.apache.storm</groupId>
+        <version>2.0.0-SNAPSHOT</version>
+        <relativePath>../../../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>storm-sql-mongodb</artifactId>
+
+    <developers>
+        <developer>
+            <id>vesense</id>
+            <name>Xin Wang</name>
+            <email>data.xinwang@gmail.com</email>
+        </developer>
+    </developers>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-sql-runtime</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-sql-runtime</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-mongodb</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+    <build>
+        <sourceDirectory>src/jvm</sourceDirectory>
+        <testSourceDirectory>src/test</testSourceDirectory>
+        <resources>
+            <resource>
+                <directory>${basedir}/src/resources</directory>
+            </resource>
+        </resources>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-external/storm-sql-mongodb/src/jvm/org/apache/storm/sql/mongodb/MongoDataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-external/storm-sql-mongodb/src/jvm/org/apache/storm/sql/mongodb/MongoDataSourcesProvider.java b/sql/storm-sql-external/storm-sql-mongodb/src/jvm/org/apache/storm/sql/mongodb/MongoDataSourcesProvider.java
new file mode 100644
index 0000000..60d52d1
--- /dev/null
+++ b/sql/storm-sql-external/storm-sql-mongodb/src/jvm/org/apache/storm/sql/mongodb/MongoDataSourcesProvider.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.mongodb;
+
+import com.google.common.base.Preconditions;
+import org.apache.storm.mongodb.common.mapper.MongoMapper;
+import org.apache.storm.mongodb.trident.state.MongoState;
+import org.apache.storm.mongodb.trident.state.MongoStateFactory;
+import org.apache.storm.mongodb.trident.state.MongoStateUpdater;
+import org.apache.storm.sql.runtime.DataSource;
+import org.apache.storm.sql.runtime.DataSourcesProvider;
+import org.apache.storm.sql.runtime.FieldInfo;
+import org.apache.storm.sql.runtime.IOutputSerializer;
+import org.apache.storm.sql.runtime.ISqlTridentDataSource;
+import org.apache.storm.sql.runtime.SimpleSqlTridentConsumer;
+import org.apache.storm.sql.runtime.utils.FieldInfoUtils;
+import org.apache.storm.sql.runtime.utils.SerdeUtils;
+import org.apache.storm.trident.spout.ITridentDataSource;
+import org.apache.storm.trident.state.StateFactory;
+import org.apache.storm.trident.state.StateUpdater;
+import org.apache.storm.tuple.ITuple;
+import org.bson.Document;
+
+import java.net.URI;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Create a MongoDB sink based on the URI and properties. The URI has the format
+ * mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]].
+ * The properties are in JSON format and specify options such as the name of the MongoDB collection.
+ */
+public class MongoDataSourcesProvider implements DataSourcesProvider {
+
+  private static class MongoTridentDataSource implements ISqlTridentDataSource {
+    private final String url;
+    private final Properties props;
+    private final IOutputSerializer serializer;
+
+    private MongoTridentDataSource(String url, Properties props, IOutputSerializer serializer) {
+      this.url = url;
+      this.props = props;
+      this.serializer = serializer;
+    }
+
+    @Override
+    public ITridentDataSource getProducer() {
+      throw new UnsupportedOperationException(this.getClass().getName() + " doesn't provide a producer");
+    }
+
+    @Override
+    public SqlTridentConsumer getConsumer() {
+      Preconditions.checkArgument(!props.isEmpty(), "Writable MongoDB table must contain collection config");
+      String serField = props.getProperty("trident.ser.field", "tridentSerField");
+      MongoMapper mapper = new TridentMongoMapper(serField, serializer);
+
+      MongoState.Options options = new MongoState.Options()
+          .withUrl(url)
+          .withCollectionName(props.getProperty("collection.name"))
+          .withMapper(mapper);
+
+      StateFactory stateFactory = new MongoStateFactory(options);
+      StateUpdater stateUpdater = new MongoStateUpdater();
+
+      return new SimpleSqlTridentConsumer(stateFactory, stateUpdater);
+    }
+  }
+
+  private static class TridentMongoMapper implements MongoMapper {
+    private final String serField;
+    private final IOutputSerializer serializer;
+
+    private TridentMongoMapper(String serField, IOutputSerializer serializer) {
+      this.serField = serField;
+      this.serializer = serializer;
+    }
+
+    @Override
+    public Document toDocument(ITuple tuple) {
+      Document document = new Document();
+      byte[] array = serializer.write(tuple.getValues(), null).array();
+      document.append(serField, array);
+      return document;
+    }
+
+    @Override
+    public Document toDocumentByKeys(List<Object> keys) {
+      return null;
+    }
+  }
+
+  @Override
+  public String scheme() {
+    return "mongodb";
+  }
+
+  @Override
+  public DataSource construct(URI uri, String inputFormatClass, String outputFormatClass,
+                              List<FieldInfo> fields) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String outputFormatClass,
+                                                Properties properties, List<FieldInfo> fields) {
+    List<String> fieldNames = FieldInfoUtils.getFieldNames(fields);
+    IOutputSerializer serializer = SerdeUtils.getSerializer(outputFormatClass, properties, fieldNames);
+    return new MongoTridentDataSource(uri.toString(), properties, serializer);
+  }
+
+}
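
A configuration sketch for the MongoDB sink above (illustrative, not part of
this commit; the URI and collection name are assumptions): "collection.name" is
required by getConsumer(), while "trident.ser.field" is optional and shown here
with its default. Each row is serialized into a single byte[] stored under that
field, per TridentMongoMapper.toDocument().

    Properties props = new Properties();
    props.setProperty("collection.name", "orders");
    props.setProperty("trident.ser.field", "tridentSerField");
    ISqlTridentDataSource ds = DataSourcesRegistry.constructTridentDataSource(
        URI.create("mongodb://127.0.0.1:27017/test"), null, null, props,
        ImmutableList.of(new FieldInfo("ID", int.class, true),
                         new FieldInfo("val", String.class, false)));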

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-external/storm-sql-mongodb/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
----------------------------------------------------------------------
diff --git a/sql/storm-sql-external/storm-sql-mongodb/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider b/sql/storm-sql-external/storm-sql-mongodb/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
new file mode 100644
index 0000000..e46d794
--- /dev/null
+++ b/sql/storm-sql-external/storm-sql-mongodb/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.storm.sql.mongodb.MongoDataSourcesProvider
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-external/storm-sql-mongodb/src/test/org/apache/storm/sql/mongodb/TestMongoDataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-external/storm-sql-mongodb/src/test/org/apache/storm/sql/mongodb/TestMongoDataSourcesProvider.java b/sql/storm-sql-external/storm-sql-mongodb/src/test/org/apache/storm/sql/mongodb/TestMongoDataSourcesProvider.java
new file mode 100644
index 0000000..3b15345
--- /dev/null
+++ b/sql/storm-sql-external/storm-sql-mongodb/src/test/org/apache/storm/sql/mongodb/TestMongoDataSourcesProvider.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.mongodb;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import org.apache.storm.mongodb.common.MongoDBClient;
+import org.apache.storm.mongodb.trident.state.MongoState;
+import org.apache.storm.mongodb.trident.state.MongoStateFactory;
+import org.apache.storm.mongodb.trident.state.MongoStateUpdater;
+import org.apache.storm.sql.runtime.DataSourcesRegistry;
+import org.apache.storm.sql.runtime.FieldInfo;
+import org.apache.storm.sql.runtime.ISqlTridentDataSource;
+import org.apache.storm.sql.runtime.serde.json.JsonSerializer;
+import org.apache.storm.trident.state.StateUpdater;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.bson.Document;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.ArgumentMatcher;
+import org.mockito.internal.util.reflection.Whitebox;
+
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+public class TestMongoDataSourcesProvider {
+  private static final List<FieldInfo> FIELDS = ImmutableList.of(
+      new FieldInfo("ID", int.class, true),
+      new FieldInfo("val", String.class, false));
+  private static final List<String> FIELD_NAMES = ImmutableList.of("ID", "val");
+  private static final JsonSerializer SERIALIZER = new JsonSerializer(FIELD_NAMES);
+  private static final Properties TBL_PROPERTIES = new Properties();
+
+  static {
+    TBL_PROPERTIES.put("collection.name", "collection1");
+    TBL_PROPERTIES.put("trident.ser.field", "tridentSerField");
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testMongoSink() {
+    ISqlTridentDataSource ds = DataSourcesRegistry.constructTridentDataSource(
+            URI.create("mongodb://127.0.0.1:27017/test"), null, null, TBL_PROPERTIES, FIELDS);
+    Assert.assertNotNull(ds);
+
+    ISqlTridentDataSource.SqlTridentConsumer consumer = ds.getConsumer();
+
+    Assert.assertEquals(MongoStateFactory.class, consumer.getStateFactory().getClass());
+    Assert.assertEquals(MongoStateUpdater.class, consumer.getStateUpdater().getClass());
+
+    MongoState state = (MongoState) consumer.getStateFactory().makeState(Collections.emptyMap(), null, 0, 1);
+    StateUpdater stateUpdater = consumer.getStateUpdater();
+
+    MongoDBClient mongoClient = mock(MongoDBClient.class);
+    Whitebox.setInternalState(state, "mongoClient", mongoClient);
+
+    List<TridentTuple> tupleList = mockTupleList();
+
+    for (TridentTuple t : tupleList) {
+      stateUpdater.updateState(state, Collections.singletonList(t), null);
+      verify(mongoClient).insert(argThat(new MongoArgMatcher(t)), eq(true));
+    }
+
+    verifyNoMoreInteractions(mongoClient);
+  }
+
+  private static List<TridentTuple> mockTupleList() {
+    List<TridentTuple> tupleList = new ArrayList<>();
+    TridentTuple t0 = mock(TridentTuple.class);
+    TridentTuple t1 = mock(TridentTuple.class);
+    doReturn(1).when(t0).get(0);
+    doReturn(2).when(t1).get(0);
+    doReturn(Lists.<Object>newArrayList(1, "2")).when(t0).getValues();
+    doReturn(Lists.<Object>newArrayList(2, "3")).when(t1).getValues();
+    tupleList.add(t0);
+    tupleList.add(t1);
+    return tupleList;
+  }
+
+  private static class MongoArgMatcher extends ArgumentMatcher<List<Document>> {
+    private final TridentTuple tuple;
+
+    private MongoArgMatcher(TridentTuple tuple) {
+      this.tuple = tuple;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public boolean matches(Object o) {
+      Document doc = ((List<Document>)o).get(0);
+      ByteBuffer buf = ByteBuffer.wrap((byte[])doc.get(TBL_PROPERTIES.getProperty("trident.ser.field")));
+      ByteBuffer b = SERIALIZER.write(tuple.getValues(), null);
+      return b.equals(buf);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-external/storm-sql-redis/pom.xml
----------------------------------------------------------------------
diff --git a/sql/storm-sql-external/storm-sql-redis/pom.xml b/sql/storm-sql-external/storm-sql-redis/pom.xml
new file mode 100644
index 0000000..32ce432
--- /dev/null
+++ b/sql/storm-sql-external/storm-sql-redis/pom.xml
@@ -0,0 +1,76 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>storm</artifactId>
+        <groupId>org.apache.storm</groupId>
+        <version>2.0.0-SNAPSHOT</version>
+        <relativePath>../../../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>storm-sql-redis</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-sql-runtime</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-sql-runtime</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-redis</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+    <build>
+        <sourceDirectory>src/jvm</sourceDirectory>
+        <testSourceDirectory>src/test</testSourceDirectory>
+        <resources>
+            <resource>
+                <directory>${basedir}/src/resources</directory>
+            </resource>
+        </resources>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-external/storm-sql-redis/src/jvm/org/apache/storm/sql/redis/RedisDataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-external/storm-sql-redis/src/jvm/org/apache/storm/sql/redis/RedisDataSourcesProvider.java b/sql/storm-sql-external/storm-sql-redis/src/jvm/org/apache/storm/sql/redis/RedisDataSourcesProvider.java
new file mode 100644
index 0000000..68933b2
--- /dev/null
+++ b/sql/storm-sql-external/storm-sql-redis/src/jvm/org/apache/storm/sql/redis/RedisDataSourcesProvider.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.redis;
+
+import com.google.common.base.Preconditions;
+import org.apache.storm.redis.common.config.JedisClusterConfig;
+import org.apache.storm.redis.common.config.JedisPoolConfig;
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.storm.redis.common.mapper.RedisStoreMapper;
+import org.apache.storm.redis.trident.state.RedisClusterState;
+import org.apache.storm.redis.trident.state.RedisClusterStateUpdater;
+import org.apache.storm.redis.trident.state.RedisState;
+import org.apache.storm.redis.trident.state.RedisStateUpdater;
+import org.apache.storm.sql.runtime.DataSource;
+import org.apache.storm.sql.runtime.DataSourcesProvider;
+import org.apache.storm.sql.runtime.FieldInfo;
+import org.apache.storm.sql.runtime.IOutputSerializer;
+import org.apache.storm.sql.runtime.ISqlTridentDataSource;
+import org.apache.storm.sql.runtime.SimpleSqlTridentConsumer;
+import org.apache.storm.sql.runtime.utils.SerdeUtils;
+import org.apache.storm.sql.runtime.utils.FieldInfoUtils;
+import org.apache.storm.trident.spout.ITridentDataSource;
+import org.apache.storm.trident.state.StateFactory;
+import org.apache.storm.trident.state.StateUpdater;
+import org.apache.storm.tuple.ITuple;
+import redis.clients.util.JedisURIHelper;
+
+import java.io.Serializable;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Create a Redis sink based on the URI and properties. The URI has the format
+ * redis://:[password]@[host]:[port]/[dbIdx]. Only the host is mandatory; the other components fall back to defaults.
+ *
+ * The properties are in JSON format and specify options such as the config of the Redis data type.
+ * Note that when "use.redis.cluster" is "true", cluster discovery is done only from the given URI.
+ */
+public class RedisDataSourcesProvider implements DataSourcesProvider {
+  private static final int DEFAULT_REDIS_PORT = 6379;
+  private static final int DEFAULT_TIMEOUT = 2000;
+
+  private abstract static class AbstractRedisTridentDataSource implements ISqlTridentDataSource, Serializable {
+    protected abstract StateFactory newStateFactory();
+    protected abstract StateUpdater newStateUpdater(RedisStoreMapper storeMapper);
+
+    private final Properties props;
+    private final List<FieldInfo> fields;
+    private final IOutputSerializer serializer;
+
+    AbstractRedisTridentDataSource(Properties props, List<FieldInfo> fields, IOutputSerializer serializer) {
+      this.props = props;
+      this.fields = fields;
+      this.serializer = serializer;
+    }
+
+    @Override
+    public ITridentDataSource getProducer() {
+      throw new UnsupportedOperationException(this.getClass().getName() + " doesn't provide a producer");
+    }
+
+    @Override
+    public SqlTridentConsumer getConsumer() {
+      RedisDataTypeDescription dataTypeDescription = getDataTypeDesc(props);
+
+      RedisStoreMapper storeMapper = new TridentRedisStoreMapper(dataTypeDescription, fields, serializer);
+
+      StateFactory stateFactory = newStateFactory();
+      StateUpdater stateUpdater = newStateUpdater(storeMapper);
+
+      return new SimpleSqlTridentConsumer(stateFactory, stateUpdater);
+    }
+
+    private RedisDataTypeDescription getDataTypeDesc(Properties props) {
+      Preconditions.checkArgument(props.containsKey("data.type"),
+              "Redis data source must contain \"data.type\" config");
+
+      RedisDataTypeDescription.RedisDataType dataType = RedisDataTypeDescription.RedisDataType.valueOf(props.getProperty("data.type").toUpperCase());
+      String additionalKey = props.getProperty("data.additional.key");
+
+      return new RedisDataTypeDescription(dataType, additionalKey);
+    }
+  }
+
+  private static class RedisClusterTridentDataSource extends AbstractRedisTridentDataSource {
+    private final JedisClusterConfig config;
+
+    RedisClusterTridentDataSource(JedisClusterConfig config, Properties props, List<FieldInfo> fields, IOutputSerializer serializer) {
+      super(props, fields, serializer);
+      this.config = config;
+    }
+
+    @Override
+    protected StateFactory newStateFactory() {
+      return new RedisClusterState.Factory(config);
+    }
+
+    @Override
+    protected StateUpdater newStateUpdater(RedisStoreMapper storeMapper) {
+      return new RedisClusterStateUpdater(storeMapper);
+    }
+  }
+
+  private static class RedisTridentDataSource extends AbstractRedisTridentDataSource {
+    private final JedisPoolConfig config;
+
+    RedisTridentDataSource(JedisPoolConfig config, Properties props, List<FieldInfo> fields, IOutputSerializer serializer) {
+      super(props, fields, serializer);
+      this.config = config;
+    }
+
+    @Override
+    protected StateFactory newStateFactory() {
+      return new RedisState.Factory(config);
+    }
+
+    @Override
+    protected StateUpdater newStateUpdater(RedisStoreMapper storeMapper) {
+      return new RedisStateUpdater(storeMapper);
+    }
+  }
+
+  @Override
+  public String scheme() {
+    return "redis";
+  }
+
+  @Override
+  public DataSource construct(URI uri, String inputFormatClass, String outputFormatClass, List<FieldInfo> fields) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String outputFormatClass, Properties props, List<FieldInfo> fields) {
+    Preconditions.checkArgument(JedisURIHelper.isValid(uri), "URI is not valid for Redis: " + uri);
+
+    String host = uri.getHost();
+    int port = uri.getPort() != -1 ? uri.getPort() : DEFAULT_REDIS_PORT;
+    int dbIdx = JedisURIHelper.getDBIndex(uri);
+    String password = JedisURIHelper.getPassword(uri);
+
+    int timeout = Integer.parseInt(props.getProperty("redis.timeout", String.valueOf(DEFAULT_TIMEOUT)));
+
+    boolean clusterMode = Boolean.parseBoolean(props.getProperty("use.redis.cluster", "false"));
+
+    List<String> fieldNames = FieldInfoUtils.getFieldNames(fields);
+    IOutputSerializer serializer = SerdeUtils.getSerializer(outputFormatClass, props, fieldNames);
+    if (clusterMode) {
+      JedisClusterConfig config = new JedisClusterConfig.Builder()
+              .setNodes(Collections.singleton(new InetSocketAddress(host, port)))
+              .setTimeout(timeout)
+              .build();
+      return new RedisClusterTridentDataSource(config, props, fields, serializer);
+    } else {
+      JedisPoolConfig config = new JedisPoolConfig(host, port, timeout, password, dbIdx);
+      return new RedisTridentDataSource(config, props, fields, serializer);
+    }
+  }
+
+  private static class TridentRedisStoreMapper implements RedisStoreMapper {
+    private final RedisDataTypeDescription dataTypeDescription;
+    private final FieldInfo primaryKeyField;
+    private final IOutputSerializer outputSerializer;
+
+    private TridentRedisStoreMapper(RedisDataTypeDescription dataTypeDescription, List<FieldInfo> fields, IOutputSerializer outputSerializer) {
+      this.dataTypeDescription = dataTypeDescription;
+      this.outputSerializer = outputSerializer;
+
+      // find the primary key among the fields passed to the constructor
+      FieldInfo pkField = findPrimaryKeyField(fields);
+      Preconditions.checkArgument(pkField != null, "Primary key must be present in the field list");
+
+      this.primaryKeyField = pkField;
+    }
+
+    private FieldInfo findPrimaryKeyField(List<FieldInfo> fields) {
+      FieldInfo pkField = null;
+      for (FieldInfo field : fields) {
+        if (field.isPrimary()) {
+          // TODO: this assumes the key comes from a single field;
+          // if not, we would need an ordering of the fields in the PK
+          pkField = field;
+          break;
+        }
+      }
+      return pkField;
+    }
+
+    @Override
+    public RedisDataTypeDescription getDataTypeDescription() {
+      return dataTypeDescription;
+    }
+
+    @Override
+    public String getKeyFromTuple(ITuple tuple) {
+      String keyFieldName = primaryKeyField.name();
+      Object key = tuple.getValueByField(keyFieldName);
+      if (key == null) {
+        throw new NullPointerException("key field " + keyFieldName + " is null");
+      }
+      return String.valueOf(key);
+    }
+
+    @Override
+    public String getValueFromTuple(ITuple tuple) {
+      byte[] array = outputSerializer.write(tuple.getValues(), null).array();
+      return new String(array);
+    }
+  }
+}
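
For reference, a minimal sketch of how this provider can be exercised directly. The
property keys and fields mirror the test further below; passing null for the output
format class is assumed to fall back to the JSON serializer, as the test relies on:

    RedisDataSourcesProvider provider = new RedisDataSourcesProvider();

    Properties props = new Properties();
    props.put("data.type", "HASH");            // required: the Redis data type
    props.put("data.additional.key", "hello"); // the hash that holds all rows

    List<FieldInfo> fields = ImmutableList.of(
        new FieldInfo("ID", int.class, true),    // primary key -> hash field
        new FieldInfo("val", String.class, false));

    // only the host is mandatory in the URI; password, port and dbIdx are optional
    ISqlTridentDataSource sink = provider.constructTrident(
        URI.create("redis://:foobared@localhost:6380/2"), null, null, props, fields);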

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-external/storm-sql-redis/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
----------------------------------------------------------------------
diff --git a/sql/storm-sql-external/storm-sql-redis/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider b/sql/storm-sql-external/storm-sql-redis/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
new file mode 100644
index 0000000..23b0444
--- /dev/null
+++ b/sql/storm-sql-external/storm-sql-redis/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.storm.sql.redis.RedisDataSourcesProvider
\ No newline at end of file
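
This entry is a standard java.util.ServiceLoader registration. A sketch of the
discovery side, assuming DataSourcesRegistry loads providers this way and routes
table URIs by scheme():

    ServiceLoader<DataSourcesProvider> providers =
        ServiceLoader.load(DataSourcesProvider.class);
    for (DataSourcesProvider provider : providers) {
      // a "redis://..." table URI is dispatched to the provider
      // whose scheme() returns "redis"
      System.out.println(provider.scheme());
    }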

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-external/storm-sql-redis/src/test/org/apache/storm/sql/redis/TestRedisDataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-external/storm-sql-redis/src/test/org/apache/storm/sql/redis/TestRedisDataSourcesProvider.java b/sql/storm-sql-external/storm-sql-redis/src/test/org/apache/storm/sql/redis/TestRedisDataSourcesProvider.java
new file mode 100644
index 0000000..94d4949
--- /dev/null
+++ b/sql/storm-sql-external/storm-sql-redis/src/test/org/apache/storm/sql/redis/TestRedisDataSourcesProvider.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.redis;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import org.apache.storm.redis.trident.state.RedisClusterState;
+import org.apache.storm.redis.trident.state.RedisClusterStateUpdater;
+import org.apache.storm.redis.trident.state.RedisState;
+import org.apache.storm.redis.trident.state.RedisStateUpdater;
+import org.apache.storm.sql.runtime.DataSourcesRegistry;
+import org.apache.storm.sql.runtime.FieldInfo;
+import org.apache.storm.sql.runtime.ISqlTridentDataSource;
+import org.apache.storm.sql.runtime.serde.json.JsonSerializer;
+import org.apache.storm.trident.state.StateUpdater;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.Pipeline;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TestRedisDataSourcesProvider {
+  private static final List<FieldInfo> FIELDS = ImmutableList.of(
+      new FieldInfo("ID", int.class, true),
+      new FieldInfo("val", String.class, false));
+  private static final List<String> FIELD_NAMES = ImmutableList.of("ID", "val");
+  private static final String ADDITIONAL_KEY = "hello";
+  private static final JsonSerializer SERIALIZER = new JsonSerializer(FIELD_NAMES);
+  private static final Properties TBL_PROPERTIES = new Properties();
+  private static final Properties CLUSTER_TBL_PROPERTIES = new Properties();
+
+  static {
+    TBL_PROPERTIES.put("data.type", "HASH");
+    TBL_PROPERTIES.put("data.additional.key", ADDITIONAL_KEY);
+    CLUSTER_TBL_PROPERTIES.put("data.type", "HASH");
+    CLUSTER_TBL_PROPERTIES.put("data.additional.key", ADDITIONAL_KEY);
+    CLUSTER_TBL_PROPERTIES.put("use.redis.cluster", "true");
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testRedisSink() {
+    ISqlTridentDataSource ds = DataSourcesRegistry.constructTridentDataSource(
+        URI.create("redis://:foobared@localhost:6380/2"), null, null, TBL_PROPERTIES, FIELDS);
+    Assert.assertNotNull(ds);
+
+    ISqlTridentDataSource.SqlTridentConsumer consumer = ds.getConsumer();
+
+    Assert.assertEquals(RedisState.Factory.class, consumer.getStateFactory().getClass());
+    Assert.assertEquals(RedisStateUpdater.class, consumer.getStateUpdater().getClass());
+
+    RedisState state = (RedisState) consumer.getStateFactory().makeState(Collections.emptyMap(), null, 0, 1);
+    StateUpdater stateUpdater = consumer.getStateUpdater();
+
+    JedisPool mockJedisPool = mock(JedisPool.class);
+    Jedis mockJedis = mock(Jedis.class);
+    Pipeline mockPipeline = mock(Pipeline.class);
+
+    Whitebox.setInternalState(state, "jedisPool", mockJedisPool);
+    when(mockJedisPool.getResource()).thenReturn(mockJedis);
+    when(mockJedis.pipelined()).thenReturn(mockPipeline);
+
+    List<TridentTuple> tupleList = mockTupleList();
+
+    stateUpdater.updateState(state, tupleList, null);
+    for (TridentTuple t : tupleList) {
+      // PK goes to the key
+      String id = String.valueOf(t.getValueByField("ID"));
+      String serializedValue = new String(SERIALIZER.write(t.getValues(), null).array());
+      verify(mockPipeline).hset(eq(ADDITIONAL_KEY), eq(id), eq(serializedValue));
+    }
+
+    verify(mockPipeline).sync();
+    verify(mockJedis).close();
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testRedisClusterSink() throws IOException {
+    ISqlTridentDataSource ds = DataSourcesRegistry.constructTridentDataSource(
+        URI.create("redis://localhost:6380"), null, null, CLUSTER_TBL_PROPERTIES, FIELDS);
+    Assert.assertNotNull(ds);
+
+    ISqlTridentDataSource.SqlTridentConsumer consumer = ds.getConsumer();
+
+    Assert.assertEquals(RedisClusterState.Factory.class, consumer.getStateFactory().getClass());
+    Assert.assertEquals(RedisClusterStateUpdater.class, consumer.getStateUpdater().getClass());
+
+    RedisClusterState state = (RedisClusterState) consumer.getStateFactory().makeState(Collections.emptyMap(), null, 0, 1);
+    StateUpdater stateUpdater = consumer.getStateUpdater();
+
+    JedisCluster mockJedisCluster = mock(JedisCluster.class);
+
+    Whitebox.setInternalState(state, "jedisCluster", mockJedisCluster);
+
+    List<TridentTuple> tupleList = mockTupleList();
+
+    stateUpdater.updateState(state, tupleList, null);
+    for (TridentTuple t : tupleList) {
+      // PK goes to the key
+      String id = String.valueOf(t.getValueByField("ID"));
+      String serializedValue = new String(SERIALIZER.write(t.getValues(), null).array());
+      verify(mockJedisCluster).hset(eq(ADDITIONAL_KEY), eq(id), eq(serializedValue));
+    }
+
+    verify(mockJedisCluster, never()).close();
+  }
+
+  private static List<TridentTuple> mockTupleList() {
+    List<TridentTuple> tupleList = new ArrayList<>();
+    TridentTuple t0 = mock(TridentTuple.class);
+    TridentTuple t1 = mock(TridentTuple.class);
+    when(t0.getValueByField("ID")).thenReturn(1);
+    when(t0.getValueByField("val")).thenReturn("2");
+    doReturn(Lists.<Object>newArrayList(1, "2")).when(t0).getValues();
+
+    when(t1.getValueByField("ID")).thenReturn(2);
+    when(t1.getValueByField("val")).thenReturn("3");
+    doReturn(Lists.<Object>newArrayList(2, "3")).when(t1).getValues();
+
+    tupleList.add(t0);
+    tupleList.add(t1);
+    return tupleList;
+  }
+
+}
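
For the mocked tuples above, and assuming JsonSerializer renders a row as a JSON
object keyed by field name, the verified calls amount to the following hash layout
(illustrative only):

    hset("hello", "1", "{\"ID\":1,\"val\":\"2\"}")
    hset("hello", "2", "{\"ID\":2,\"val\":\"3\"}")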

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/pom.xml b/sql/storm-sql-runtime/pom.xml
new file mode 100644
index 0000000..ce57cb2
--- /dev/null
+++ b/sql/storm-sql-runtime/pom.xml
@@ -0,0 +1,136 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>storm</artifactId>
+        <groupId>org.apache.storm</groupId>
+        <version>2.0.0-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>storm-sql-runtime</artifactId>
+
+    <developers>
+        <developer>
+            <id>haohui</id>
+            <name>Haohui Mai</name>
+            <email>ricetons@gmail.com</email>
+        </developer>
+    </developers>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.calcite</groupId>
+            <artifactId>calcite-core</artifactId>
+            <version>${calcite.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>commons-dbcp</groupId>
+                    <artifactId>commons-dbcp</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.google.code.findbugs</groupId>
+                    <artifactId>jsr305</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.pentaho</groupId>
+                    <artifactId>pentaho-aggdesigner-algorithm</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-annotations</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro</artifactId>
+            <version>1.7.7</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-lang</groupId>
+            <artifactId>commons-lang</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-csv</artifactId>
+            <version>1.4</version>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+    <build>
+        <sourceDirectory>src/jvm</sourceDirectory>
+        <testSourceDirectory>src/test</testSourceDirectory>
+        <resources>
+            <resource>
+                <directory>${basedir}/src/resources</directory>
+            </resource>
+        </resources>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>appassembler-maven-plugin</artifactId>
+                <version>1.9</version>
+                <executions>
+                    <execution>
+                        <id>create-repo</id>
+                        <goals>
+                            <goal>create-repository</goal>
+                        </goals>
+                        <configuration>
+                            <assembleDirectory>${project.build.directory}/app-assembler</assembleDirectory>
+                            <repositoryLayout>flat</repositoryLayout>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-runtime/src/jvm/org/apache/calcite/interpreter/StormContext.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/calcite/interpreter/StormContext.java b/sql/storm-sql-runtime/src/jvm/org/apache/calcite/interpreter/StormContext.java
new file mode 100644
index 0000000..aa7e435
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/calcite/interpreter/StormContext.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.interpreter;
+
+import org.apache.calcite.DataContext;
+
+import java.io.Serializable;
+
+/**
+ * This is a hack to use Calcite's Context: the class lives in the org.apache.calcite.interpreter
+ * package to reach the package-private Context constructor, and it adds Serializable for Storm.
+ */
+public class StormContext extends Context implements Serializable {
+    public StormContext(DataContext root) {
+        super(root);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractChannelHandler.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractChannelHandler.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractChannelHandler.java
new file mode 100644
index 0000000..64be39d
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractChannelHandler.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.runtime;
+
+import org.apache.storm.tuple.Values;
+
+public abstract class AbstractChannelHandler implements ChannelHandler {
+  @Override
+  public abstract void dataReceived(ChannelContext ctx, Values data);
+
+  @Override
+  public void channelInactive(ChannelContext ctx) {
+    // no-op by default; override if interested in channel-inactive events
+  }
+
+  @Override
+  public void exceptionCaught(Throwable cause) {
+    // no-op by default; override to handle errors
+  }
+
+  @Override
+  public void flush(ChannelContext ctx) {
+    ctx.flush();
+  }
+
+  @Override
+  public void setSource(ChannelContext ctx, Object source) {
+    // no-op by default; override to track the source of the data
+  }
+
+  public static final AbstractChannelHandler PASS_THROUGH = new AbstractChannelHandler() {
+    @Override
+    public void dataReceived(ChannelContext ctx, Values data) {
+      ctx.emit(data);
+    }
+  };
+}
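
PASS_THROUGH above shows the minimal contract. As a further illustration (a made-up
handler, using the same imports as this file), a filter likewise needs to override
only dataReceived:

    AbstractChannelHandler DROP_EMPTY = new AbstractChannelHandler() {
      @Override
      public void dataReceived(ChannelContext ctx, Values data) {
        // forward only non-empty tuples; Values extends ArrayList
        if (data != null && !data.isEmpty()) {
          ctx.emit(data);
        }
      }
    };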

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractValuesProcessor.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractValuesProcessor.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractValuesProcessor.java
new file mode 100644
index 0000000..6a853be
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractValuesProcessor.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.runtime;
+
+import org.apache.storm.tuple.Values;
+
+import java.util.Map;
+
+/**
+ * A subclass of AbstractValuesProcessor provides a series of tuples. It
+ * takes a series of iterators of {@link Values} and produces a stream of
+ * tuples.
+ *
+ * The subclass implements the {@code next()} method to provide
+ * the output of the stream. It can choose to return null in {@code next()} to
+ * indicate that this particular iteration is a no-op. SQL processors depend
+ * on these semantics to implement filtering and nullable records.
+ */
+public abstract class AbstractValuesProcessor {
+
+  /**
+   * Initialize the data sources.
+   *
+   * @param data a map from the table name to the iterators of the values.
+   * @param result the channel handler that receives the produced output.
+   */
+  public abstract void initialize(Map<String, DataSource> data, ChannelHandler result);
+}
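
A purely illustrative subclass under the same imports (real implementations are
generated by the SQL compiler; NoOpValuesProcessor is a made-up name):

    public class NoOpValuesProcessor extends AbstractValuesProcessor {
      @Override
      public void initialize(Map<String, DataSource> data, ChannelHandler result) {
        // a generated processor would subscribe to each DataSource here and
        // push the produced Values through a channel chain ending at `result`
      }
    }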

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelContext.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelContext.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelContext.java
new file mode 100644
index 0000000..65ad01c
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelContext.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.runtime;
+
+import org.apache.storm.tuple.Values;
+
+public interface ChannelContext {
+  /**
+   * Emit data to the next stage of the data pipeline.
+   */
+  void emit(Values data);
+  void fireChannelInactive();     // signal that the producer has finished with this channel
+  void flush();                   // flush any buffered data to the next stage
+  void setSource(Object source);  // attach the originating source to this context
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelHandler.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelHandler.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelHandler.java
new file mode 100644
index 0000000..af02b7e
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelHandler.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.runtime;
+
+import org.apache.storm.tuple.Values;
+
+/**
+ * A ChannelHandler provides an event-driven interface for the user to process
+ * a series of events.
+ */
+public interface ChannelHandler {
+  void dataReceived(ChannelContext ctx, Values data);
+
+  /**
+   * The producer of the data has indicated that the channel is no longer
+   * active.
+   * @param ctx the context of the channel that became inactive.
+   */
+  void channelInactive(ChannelContext ctx);
+
+  void exceptionCaught(Throwable cause);
+
+  void flush(ChannelContext ctx);
+
+  void setSource(ChannelContext ctx, Object source);
+}