You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/04/05 23:19:16 UTC

[13/23] storm git commit: STORM-2453 Move non-connectors into the top directory

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-external/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-external/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java b/external/sql/storm-sql-external/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java
deleted file mode 100644
index 2aa98ba..0000000
--- a/external/sql/storm-sql-external/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.kafka;
-
-import com.google.common.base.Preconditions;
-import org.apache.storm.kafka.ZkHosts;
-import org.apache.storm.kafka.trident.OpaqueTridentKafkaSpout;
-import org.apache.storm.kafka.trident.TridentKafkaConfig;
-import org.apache.storm.kafka.trident.TridentKafkaStateFactory;
-import org.apache.storm.kafka.trident.TridentKafkaUpdater;
-import org.apache.storm.kafka.trident.mapper.TridentTupleToKafkaMapper;
-import org.apache.storm.kafka.trident.selector.DefaultTopicSelector;
-import org.apache.storm.spout.Scheme;
-import org.apache.storm.spout.SchemeAsMultiScheme;
-import org.apache.storm.sql.runtime.DataSource;
-import org.apache.storm.sql.runtime.DataSourcesProvider;
-import org.apache.storm.sql.runtime.FieldInfo;
-import org.apache.storm.sql.runtime.IOutputSerializer;
-import org.apache.storm.sql.runtime.ISqlTridentDataSource;
-import org.apache.storm.sql.runtime.SimpleSqlTridentConsumer;
-import org.apache.storm.sql.runtime.utils.SerdeUtils;
-import org.apache.storm.trident.spout.ITridentDataSource;
-import org.apache.storm.trident.tuple.TridentTuple;
-
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-/**
- * Create a Kafka spout/sink based on the URI and properties. The URI has the format of
- * kafka://zkhost:port/broker_path?topic=topic. The properties are in JSON format which specifies the producer config
- * of the Kafka broker.
- */
-public class KafkaDataSourcesProvider implements DataSourcesProvider {
-  private static final int DEFAULT_ZK_PORT = 2181;
-
-  private static class SqlKafkaMapper implements TridentTupleToKafkaMapper<Object, ByteBuffer> {
-    private final int primaryKeyIndex;
-    private final IOutputSerializer serializer;
-
-    private SqlKafkaMapper(int primaryKeyIndex, IOutputSerializer serializer) {
-      this.primaryKeyIndex = primaryKeyIndex;
-      this.serializer = serializer;
-    }
-
-    @Override
-    public Object getKeyFromTuple(TridentTuple tuple) {
-      return tuple.get(primaryKeyIndex);
-    }
-
-    @Override
-    public ByteBuffer getMessageFromTuple(TridentTuple tuple) {
-      return serializer.write(tuple.getValues(), null);
-    }
-  }
-
-  private static class KafkaTridentDataSource implements ISqlTridentDataSource {
-    private final TridentKafkaConfig conf;
-    private final String topic;
-    private final int primaryKeyIndex;
-    private final Properties props;
-    private final IOutputSerializer serializer;
-    private KafkaTridentDataSource(TridentKafkaConfig conf, String topic, int primaryKeyIndex,
-                                   Properties props, IOutputSerializer serializer) {
-      this.conf = conf;
-      this.topic = topic;
-      this.primaryKeyIndex = primaryKeyIndex;
-      this.props = props;
-      this.serializer = serializer;
-    }
-
-    @Override
-    public ITridentDataSource getProducer() {
-      return new OpaqueTridentKafkaSpout(conf);
-    }
-
-    @Override
-    public SqlTridentConsumer getConsumer() {
-      Preconditions.checkArgument(!props.isEmpty(),
-              "Writable Kafka Table " + topic + " must contain producer config");
-      HashMap<String, Object> producerConfig = (HashMap<String, Object>) props.get("producer");
-      props.putAll(producerConfig);
-      Preconditions.checkState(props.containsKey("bootstrap.servers"),
-              "Writable Kafka Table " + topic + " must contain \"bootstrap.servers\" config");
-
-      SqlKafkaMapper mapper = new SqlKafkaMapper(primaryKeyIndex, serializer);
-
-      TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
-              .withKafkaTopicSelector(new DefaultTopicSelector(topic))
-              .withProducerProperties(props)
-              .withTridentTupleToKafkaMapper(mapper);
-
-      TridentKafkaUpdater stateUpdater = new TridentKafkaUpdater();
-
-      return new SimpleSqlTridentConsumer(stateFactory, stateUpdater);
-    }
-  }
-
-  @Override
-  public String scheme() {
-    return "kafka";
-  }
-
-  @Override
-  public DataSource construct(URI uri, String inputFormatClass, String outputFormatClass,
-                              List<FieldInfo> fields) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String outputFormatClass,
-                                                Properties properties, List<FieldInfo> fields) {
-    int port = uri.getPort() != -1 ? uri.getPort() : DEFAULT_ZK_PORT;
-    ZkHosts zk = new ZkHosts(uri.getHost() + ":" + port, uri.getPath());
-    Map<String, String> values = parseURIParams(uri.getQuery());
-    String topic = values.get("topic");
-    Preconditions.checkNotNull(topic, "No topic of the spout is specified");
-    TridentKafkaConfig conf = new TridentKafkaConfig(zk, topic);
-    List<String> fieldNames = new ArrayList<>();
-    int primaryIndex = -1;
-    for (int i = 0; i < fields.size(); ++i) {
-      FieldInfo f = fields.get(i);
-      fieldNames.add(f.name());
-      if (f.isPrimary()) {
-        primaryIndex = i;
-      }
-    }
-    Preconditions.checkState(primaryIndex != -1, "Kafka stream table must have a primary key");
-    Scheme scheme = SerdeUtils.getScheme(inputFormatClass, properties, fieldNames);
-    conf.scheme = new SchemeAsMultiScheme(scheme);
-    IOutputSerializer serializer = SerdeUtils.getSerializer(outputFormatClass, properties, fieldNames);
-
-    return new KafkaTridentDataSource(conf, topic, primaryIndex, properties, serializer);
-  }
-
-  private static Map<String, String> parseURIParams(String query) {
-    HashMap<String, String> res = new HashMap<>();
-    if (query == null) {
-      return res;
-    }
-
-    String[] params = query.split("&");
-    for (String p : params) {
-      String[] v = p.split("=", 2);
-      if (v.length > 1) {
-        res.put(v[0], v[1]);
-      }
-    }
-    return res;
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-external/storm-sql-kafka/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-external/storm-sql-kafka/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider b/external/sql/storm-sql-external/storm-sql-kafka/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
deleted file mode 100644
index 7f687cc..0000000
--- a/external/sql/storm-sql-external/storm-sql-kafka/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-org.apache.storm.sql.kafka.KafkaDataSourcesProvider
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-external/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestKafkaDataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-external/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestKafkaDataSourcesProvider.java b/external/sql/storm-sql-external/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestKafkaDataSourcesProvider.java
deleted file mode 100644
index 0cde492..0000000
--- a/external/sql/storm-sql-external/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestKafkaDataSourcesProvider.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.kafka;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.storm.kafka.trident.TridentKafkaState;
-import org.apache.storm.kafka.trident.TridentKafkaStateFactory;
-import org.apache.storm.kafka.trident.TridentKafkaUpdater;
-import org.apache.storm.sql.runtime.DataSourcesRegistry;
-import org.apache.storm.sql.runtime.FieldInfo;
-import org.apache.storm.sql.runtime.ISqlTridentDataSource;
-import org.apache.storm.sql.runtime.serde.json.JsonSerializer;
-import org.apache.storm.trident.tuple.TridentTuple;
-import org.junit.Assert;
-import org.junit.Test;
-import org.mockito.ArgumentMatcher;
-import org.mockito.internal.util.reflection.Whitebox;
-
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.Future;
-
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.argThat;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-
-public class TestKafkaDataSourcesProvider {
-  private static final List<FieldInfo> FIELDS = ImmutableList.of(
-          new FieldInfo("ID", int.class, true),
-          new FieldInfo("val", String.class, false));
-  private static final List<String> FIELD_NAMES = ImmutableList.of("ID", "val");
-  private static final JsonSerializer SERIALIZER = new JsonSerializer(FIELD_NAMES);
-  private static final Properties TBL_PROPERTIES = new Properties();
-
-  static {
-    Map<String,Object> map = new HashMap<>();
-    map.put("bootstrap.servers", "localhost:9092");
-    map.put("acks", "1");
-    map.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-    map.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-    TBL_PROPERTIES.put("producer", map);
-  }
-
-  @SuppressWarnings("unchecked")
-  @Test
-  public void testKafkaSink() {
-    ISqlTridentDataSource ds = DataSourcesRegistry.constructTridentDataSource(
-            URI.create("kafka://mock?topic=foo"), null, null, TBL_PROPERTIES, FIELDS);
-    Assert.assertNotNull(ds);
-
-    ISqlTridentDataSource.SqlTridentConsumer consumer = ds.getConsumer();
-
-    Assert.assertEquals(TridentKafkaStateFactory.class, consumer.getStateFactory().getClass());
-    Assert.assertEquals(TridentKafkaUpdater.class, consumer.getStateUpdater().getClass());
-
-    TridentKafkaState state = (TridentKafkaState) consumer.getStateFactory().makeState(Collections.emptyMap(), null, 0, 1);
-    KafkaProducer producer = mock(KafkaProducer.class);
-    doReturn(mock(Future.class)).when(producer).send(any(ProducerRecord.class));
-    Whitebox.setInternalState(state, "producer", producer);
-
-    List<TridentTuple> tupleList = mockTupleList();
-    for (TridentTuple t : tupleList) {
-      state.updateState(Collections.singletonList(t), null);
-      verify(producer).send(argThat(new KafkaMessageMatcher(t)));
-    }
-    verifyNoMoreInteractions(producer);
-  }
-
-  private static List<TridentTuple> mockTupleList() {
-    List<TridentTuple> tupleList = new ArrayList<>();
-    TridentTuple t0 = mock(TridentTuple.class);
-    TridentTuple t1 = mock(TridentTuple.class);
-    doReturn(1).when(t0).get(0);
-    doReturn(2).when(t1).get(0);
-    doReturn(Lists.<Object>newArrayList(1, "2")).when(t0).getValues();
-    doReturn(Lists.<Object>newArrayList(2, "3")).when(t1).getValues();
-    tupleList.add(t0);
-    tupleList.add(t1);
-    return tupleList;
-  }
-
-  private static class KafkaMessageMatcher extends ArgumentMatcher<ProducerRecord> {
-    private static final int PRIMARY_INDEX = 0;
-    private final TridentTuple tuple;
-
-    private KafkaMessageMatcher(TridentTuple tuple) {
-      this.tuple = tuple;
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public boolean matches(Object o) {
-      ProducerRecord<Object, ByteBuffer> m = (ProducerRecord<Object,ByteBuffer>)o;
-      if (m.key() != tuple.get(PRIMARY_INDEX)) {
-        return false;
-      }
-      ByteBuffer buf = m.value();
-      ByteBuffer b = SERIALIZER.write(tuple.getValues(), null);
-      return b.equals(buf);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-external/storm-sql-mongodb/pom.xml
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-external/storm-sql-mongodb/pom.xml b/external/sql/storm-sql-external/storm-sql-mongodb/pom.xml
deleted file mode 100644
index 40fde8a..0000000
--- a/external/sql/storm-sql-external/storm-sql-mongodb/pom.xml
+++ /dev/null
@@ -1,84 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements.  See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License.  You may obtain a copy of the License at
-
-     http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-
-    <parent>
-        <artifactId>storm</artifactId>
-        <groupId>org.apache.storm</groupId>
-        <version>2.0.0-SNAPSHOT</version>
-        <relativePath>../../../../pom.xml</relativePath>
-    </parent>
-
-    <artifactId>storm-sql-mongodb</artifactId>
-
-    <developers>
-        <developer>
-            <id>vesense</id>
-            <name>Xin Wang</name>
-            <email>data.xinwang@gmail.com</email>
-        </developer>
-    </developers>
-
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.storm</groupId>
-            <artifactId>storm-core</artifactId>
-            <version>${project.version}</version>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.storm</groupId>
-            <artifactId>storm-sql-runtime</artifactId>
-            <version>${project.version}</version>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.storm</groupId>
-            <artifactId>storm-sql-runtime</artifactId>
-            <version>${project.version}</version>
-            <scope>test</scope>
-            <type>test-jar</type>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.storm</groupId>
-            <artifactId>storm-mongodb</artifactId>
-            <version>${project.version}</version>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.mockito</groupId>
-            <artifactId>mockito-all</artifactId>
-            <scope>test</scope>
-        </dependency>
-    </dependencies>
-    <build>
-        <sourceDirectory>src/jvm</sourceDirectory>
-        <testSourceDirectory>src/test</testSourceDirectory>
-        <resources>
-            <resource>
-                <directory>${basedir}/src/resources</directory>
-            </resource>
-        </resources>
-    </build>
-</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-external/storm-sql-mongodb/src/jvm/org/apache/storm/sql/mongodb/MongoDataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-external/storm-sql-mongodb/src/jvm/org/apache/storm/sql/mongodb/MongoDataSourcesProvider.java b/external/sql/storm-sql-external/storm-sql-mongodb/src/jvm/org/apache/storm/sql/mongodb/MongoDataSourcesProvider.java
deleted file mode 100644
index 60d52d1..0000000
--- a/external/sql/storm-sql-external/storm-sql-mongodb/src/jvm/org/apache/storm/sql/mongodb/MongoDataSourcesProvider.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.mongodb;
-
-import com.google.common.base.Preconditions;
-import org.apache.storm.mongodb.common.mapper.MongoMapper;
-import org.apache.storm.mongodb.trident.state.MongoState;
-import org.apache.storm.mongodb.trident.state.MongoStateFactory;
-import org.apache.storm.mongodb.trident.state.MongoStateUpdater;
-import org.apache.storm.sql.runtime.DataSource;
-import org.apache.storm.sql.runtime.DataSourcesProvider;
-import org.apache.storm.sql.runtime.FieldInfo;
-import org.apache.storm.sql.runtime.IOutputSerializer;
-import org.apache.storm.sql.runtime.ISqlTridentDataSource;
-import org.apache.storm.sql.runtime.SimpleSqlTridentConsumer;
-import org.apache.storm.sql.runtime.utils.FieldInfoUtils;
-import org.apache.storm.sql.runtime.utils.SerdeUtils;
-import org.apache.storm.trident.spout.ITridentDataSource;
-import org.apache.storm.trident.state.StateFactory;
-import org.apache.storm.trident.state.StateUpdater;
-import org.apache.storm.tuple.ITuple;
-import org.bson.Document;
-
-import java.net.URI;
-import java.util.List;
-import java.util.Properties;
-
-/**
- * Create a MongoDB sink based on the URI and properties. The URI has the format of
- * mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]].
- * The properties are in JSON format which specifies the name of the MongoDB collection and etc.
- */
-public class MongoDataSourcesProvider implements DataSourcesProvider {
-
-  private static class MongoTridentDataSource implements ISqlTridentDataSource {
-    private final String url;
-    private final Properties props;
-    private final IOutputSerializer serializer;
-
-    private MongoTridentDataSource(String url, Properties props, IOutputSerializer serializer) {
-      this.url = url;
-      this.props = props;
-      this.serializer = serializer;
-    }
-
-    @Override
-    public ITridentDataSource getProducer() {
-      throw new UnsupportedOperationException(this.getClass().getName() + " doesn't provide Producer");
-    }
-
-    @Override
-    public SqlTridentConsumer getConsumer() {
-      Preconditions.checkArgument(!props.isEmpty(), "Writable MongoDB must contain collection config");
-      String serField = props.getProperty("trident.ser.field", "tridentSerField");
-      MongoMapper mapper = new TridentMongoMapper(serField, serializer);
-
-      MongoState.Options options = new MongoState.Options()
-          .withUrl(url)
-          .withCollectionName(props.getProperty("collection.name"))
-          .withMapper(mapper);
-
-      StateFactory stateFactory = new MongoStateFactory(options);
-      StateUpdater stateUpdater = new MongoStateUpdater();
-
-      return new SimpleSqlTridentConsumer(stateFactory, stateUpdater);
-    }
-  }
-
-  private static class TridentMongoMapper implements MongoMapper {
-    private final String serField;
-    private final IOutputSerializer serializer;
-
-    private TridentMongoMapper(String serField, IOutputSerializer serializer) {
-      this.serField = serField;
-      this.serializer = serializer;
-    }
-
-    @Override
-    public Document toDocument(ITuple tuple) {
-      Document document = new Document();
-      byte[] array = serializer.write(tuple.getValues(), null).array();
-      document.append(serField, array);
-      return document;
-    }
-
-    @Override
-    public Document toDocumentByKeys(List<Object> keys) {
-      return null;
-    }
-  }
-
-  @Override
-  public String scheme() {
-    return "mongodb";
-  }
-
-  @Override
-  public DataSource construct(URI uri, String inputFormatClass, String outputFormatClass,
-                              List<FieldInfo> fields) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String outputFormatClass,
-                                                Properties properties, List<FieldInfo> fields) {
-    List<String> fieldNames = FieldInfoUtils.getFieldNames(fields);
-    IOutputSerializer serializer = SerdeUtils.getSerializer(outputFormatClass, properties, fieldNames);
-    return new MongoTridentDataSource(uri.toString(), properties, serializer);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-external/storm-sql-mongodb/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-external/storm-sql-mongodb/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider b/external/sql/storm-sql-external/storm-sql-mongodb/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
deleted file mode 100644
index e46d794..0000000
--- a/external/sql/storm-sql-external/storm-sql-mongodb/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-org.apache.storm.sql.mongodb.MongoDataSourcesProvider
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-external/storm-sql-mongodb/src/test/org/apache/storm/sql/mongodb/TestMongoDataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-external/storm-sql-mongodb/src/test/org/apache/storm/sql/mongodb/TestMongoDataSourcesProvider.java b/external/sql/storm-sql-external/storm-sql-mongodb/src/test/org/apache/storm/sql/mongodb/TestMongoDataSourcesProvider.java
deleted file mode 100644
index 3b15345..0000000
--- a/external/sql/storm-sql-external/storm-sql-mongodb/src/test/org/apache/storm/sql/mongodb/TestMongoDataSourcesProvider.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.mongodb;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import org.apache.storm.mongodb.common.MongoDBClient;
-import org.apache.storm.mongodb.trident.state.MongoState;
-import org.apache.storm.mongodb.trident.state.MongoStateFactory;
-import org.apache.storm.mongodb.trident.state.MongoStateUpdater;
-import org.apache.storm.sql.runtime.DataSourcesRegistry;
-import org.apache.storm.sql.runtime.FieldInfo;
-import org.apache.storm.sql.runtime.ISqlTridentDataSource;
-import org.apache.storm.sql.runtime.serde.json.JsonSerializer;
-import org.apache.storm.trident.state.StateUpdater;
-import org.apache.storm.trident.tuple.TridentTuple;
-import org.bson.Document;
-import org.junit.Assert;
-import org.junit.Test;
-import org.mockito.ArgumentMatcher;
-import org.mockito.internal.util.reflection.Whitebox;
-
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Properties;
-
-import static org.mockito.Matchers.argThat;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-
-public class TestMongoDataSourcesProvider {
-  private static final List<FieldInfo> FIELDS = ImmutableList.of(
-      new FieldInfo("ID", int.class, true),
-      new FieldInfo("val", String.class, false));
-  private static final List<String> FIELD_NAMES = ImmutableList.of("ID", "val");
-  private static final JsonSerializer SERIALIZER = new JsonSerializer(FIELD_NAMES);
-  private static final Properties TBL_PROPERTIES = new Properties();
-
-  static {
-    TBL_PROPERTIES.put("collection.name", "collection1");
-    TBL_PROPERTIES.put("trident.ser.field", "tridentSerField");
-  }
-
-  @SuppressWarnings("unchecked")
-  @Test
-  public void testMongoSink() {
-    ISqlTridentDataSource ds = DataSourcesRegistry.constructTridentDataSource(
-            URI.create("mongodb://127.0.0.1:27017/test"), null, null, TBL_PROPERTIES, FIELDS);
-    Assert.assertNotNull(ds);
-
-    ISqlTridentDataSource.SqlTridentConsumer consumer = ds.getConsumer();
-
-    Assert.assertEquals(MongoStateFactory.class, consumer.getStateFactory().getClass());
-    Assert.assertEquals(MongoStateUpdater.class, consumer.getStateUpdater().getClass());
-
-    MongoState state = (MongoState) consumer.getStateFactory().makeState(Collections.emptyMap(), null, 0, 1);
-    StateUpdater stateUpdater = consumer.getStateUpdater();
-
-    MongoDBClient mongoClient = mock(MongoDBClient.class);
-    Whitebox.setInternalState(state, "mongoClient", mongoClient);
-
-    List<TridentTuple> tupleList = mockTupleList();
-
-    for (TridentTuple t : tupleList) {
-      stateUpdater.updateState(state, Collections.singletonList(t), null);
-      verify(mongoClient).insert(argThat(new MongoArgMatcher(t)) , eq(true));
-    }
-
-    verifyNoMoreInteractions(mongoClient);
-  }
-
-  private static List<TridentTuple> mockTupleList() {
-    List<TridentTuple> tupleList = new ArrayList<>();
-    TridentTuple t0 = mock(TridentTuple.class);
-    TridentTuple t1 = mock(TridentTuple.class);
-    doReturn(1).when(t0).get(0);
-    doReturn(2).when(t1).get(0);
-    doReturn(Lists.<Object>newArrayList(1, "2")).when(t0).getValues();
-    doReturn(Lists.<Object>newArrayList(2, "3")).when(t1).getValues();
-    tupleList.add(t0);
-    tupleList.add(t1);
-    return tupleList;
-  }
-
-  private static class MongoArgMatcher extends ArgumentMatcher<List<Document>> {
-    private final TridentTuple tuple;
-
-    private MongoArgMatcher(TridentTuple tuple) {
-      this.tuple = tuple;
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public boolean matches(Object o) {
-      Document doc = ((List<Document>)o).get(0);
-      ByteBuffer buf = ByteBuffer.wrap((byte[])doc.get(TBL_PROPERTIES.getProperty("trident.ser.field")));
-      ByteBuffer b = SERIALIZER.write(tuple.getValues(), null);
-      return b.equals(buf);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-external/storm-sql-redis/pom.xml
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-external/storm-sql-redis/pom.xml b/external/sql/storm-sql-external/storm-sql-redis/pom.xml
deleted file mode 100644
index 19cc699..0000000
--- a/external/sql/storm-sql-external/storm-sql-redis/pom.xml
+++ /dev/null
@@ -1,76 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements.  See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License.  You may obtain a copy of the License at
-
-     http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-
-    <parent>
-        <artifactId>storm</artifactId>
-        <groupId>org.apache.storm</groupId>
-        <version>2.0.0-SNAPSHOT</version>
-        <relativePath>../../../../pom.xml</relativePath>
-    </parent>
-
-    <artifactId>storm-sql-redis</artifactId>
-
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.storm</groupId>
-            <artifactId>storm-core</artifactId>
-            <version>${project.version}</version>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.storm</groupId>
-            <artifactId>storm-sql-runtime</artifactId>
-            <version>${project.version}</version>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.storm</groupId>
-            <artifactId>storm-sql-runtime</artifactId>
-            <version>${project.version}</version>
-            <scope>test</scope>
-            <type>test-jar</type>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.storm</groupId>
-            <artifactId>storm-redis</artifactId>
-            <version>${project.version}</version>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.mockito</groupId>
-            <artifactId>mockito-all</artifactId>
-            <scope>test</scope>
-        </dependency>
-    </dependencies>
-    <build>
-        <sourceDirectory>src/jvm</sourceDirectory>
-        <testSourceDirectory>src/test</testSourceDirectory>
-        <resources>
-            <resource>
-                <directory>${basedir}/src/resources</directory>
-            </resource>
-        </resources>
-    </build>
-</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-external/storm-sql-redis/src/jvm/org/apache/storm/sql/redis/RedisDataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-external/storm-sql-redis/src/jvm/org/apache/storm/sql/redis/RedisDataSourcesProvider.java b/external/sql/storm-sql-external/storm-sql-redis/src/jvm/org/apache/storm/sql/redis/RedisDataSourcesProvider.java
deleted file mode 100644
index 68933b2..0000000
--- a/external/sql/storm-sql-external/storm-sql-redis/src/jvm/org/apache/storm/sql/redis/RedisDataSourcesProvider.java
+++ /dev/null
@@ -1,228 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.redis;
-
-import com.google.common.base.Preconditions;
-import org.apache.storm.redis.common.config.JedisClusterConfig;
-import org.apache.storm.redis.common.config.JedisPoolConfig;
-import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
-import org.apache.storm.redis.common.mapper.RedisStoreMapper;
-import org.apache.storm.redis.trident.state.RedisClusterState;
-import org.apache.storm.redis.trident.state.RedisClusterStateUpdater;
-import org.apache.storm.redis.trident.state.RedisState;
-import org.apache.storm.redis.trident.state.RedisStateUpdater;
-import org.apache.storm.sql.runtime.DataSource;
-import org.apache.storm.sql.runtime.DataSourcesProvider;
-import org.apache.storm.sql.runtime.FieldInfo;
-import org.apache.storm.sql.runtime.IOutputSerializer;
-import org.apache.storm.sql.runtime.ISqlTridentDataSource;
-import org.apache.storm.sql.runtime.SimpleSqlTridentConsumer;
-import org.apache.storm.sql.runtime.utils.SerdeUtils;
-import org.apache.storm.sql.runtime.utils.FieldInfoUtils;
-import org.apache.storm.trident.spout.ITridentDataSource;
-import org.apache.storm.trident.state.StateFactory;
-import org.apache.storm.trident.state.StateUpdater;
-import org.apache.storm.tuple.ITuple;
-import redis.clients.util.JedisURIHelper;
-
-import java.io.Serializable;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.util.Collections;
-import java.util.List;
-import java.util.Properties;
-
-/**
- * Create a Redis sink based on the URI and properties. The URI has the format of
- * redis://:[password]@[host]:[port]/[dbIdx]. Only host is mandatory and others can be set to default.
- *
- * The properties are in JSON format which specifies the config of the Redis data type and etc.
- * Please note that when "use.redis.cluster" is "true", cluster discovery is only done from given URI.
- */
-public class RedisDataSourcesProvider implements DataSourcesProvider {
-  private static final int DEFAULT_REDIS_PORT = 6379;
-  private static final int DEFAULT_TIMEOUT = 2000;
-
-  private abstract static class AbstractRedisTridentDataSource implements ISqlTridentDataSource, Serializable {
-    protected abstract StateFactory newStateFactory();
-    protected abstract StateUpdater newStateUpdater(RedisStoreMapper storeMapper);
-
-    private final Properties props;
-    private final List<FieldInfo> fields;
-    private final IOutputSerializer serializer;
-
-    AbstractRedisTridentDataSource(Properties props, List<FieldInfo> fields, IOutputSerializer serializer) {
-      this.props = props;
-      this.fields = fields;
-      this.serializer = serializer;
-    }
-
-    @Override
-    public ITridentDataSource getProducer() {
-      throw new UnsupportedOperationException(this.getClass().getName() + " doesn't provide Producer");
-    }
-
-    @Override
-    public SqlTridentConsumer getConsumer() {
-      RedisDataTypeDescription dataTypeDescription = getDataTypeDesc(props);
-
-      RedisStoreMapper storeMapper = new TridentRedisStoreMapper(dataTypeDescription, fields, serializer);
-
-      StateFactory stateFactory = newStateFactory();
-      StateUpdater stateUpdater = newStateUpdater(storeMapper);
-
-      return new SimpleSqlTridentConsumer(stateFactory, stateUpdater);
-    }
-
-    private RedisDataTypeDescription getDataTypeDesc(Properties props) {
-      Preconditions.checkArgument(props.containsKey("data.type"),
-              "Redis data source must contain \"data.type\" config");
-
-      RedisDataTypeDescription.RedisDataType dataType = RedisDataTypeDescription.RedisDataType.valueOf(props.getProperty("data.type").toUpperCase());
-      String additionalKey = props.getProperty("data.additional.key");
-
-      return new RedisDataTypeDescription(dataType, additionalKey);
-    }
-  }
-
-  private static class RedisClusterTridentDataSource extends AbstractRedisTridentDataSource {
-    private final JedisClusterConfig config;
-
-    RedisClusterTridentDataSource(JedisClusterConfig config, Properties props, List<FieldInfo> fields, IOutputSerializer serializer) {
-      super(props, fields, serializer);
-      this.config = config;
-    }
-
-    @Override
-    protected StateFactory newStateFactory() {
-      return new RedisClusterState.Factory(config);
-    }
-
-    @Override
-    protected StateUpdater newStateUpdater(RedisStoreMapper storeMapper) {
-      return new RedisClusterStateUpdater(storeMapper);
-    }
-  }
-
-  private static class RedisTridentDataSource extends AbstractRedisTridentDataSource {
-    private final JedisPoolConfig config;
-
-    RedisTridentDataSource(JedisPoolConfig config, Properties props, List<FieldInfo> fields, IOutputSerializer serializer) {
-      super(props, fields, serializer);
-      this.config = config;
-    }
-
-    @Override
-    protected StateFactory newStateFactory() {
-      return new RedisState.Factory(config);
-    }
-
-    @Override
-    protected StateUpdater newStateUpdater(RedisStoreMapper storeMapper) {
-      return new RedisStateUpdater(storeMapper);
-    }
-  }
-
-  @Override
-  public String scheme() {
-    return "redis";
-  }
-
-  @Override
-  public DataSource construct(URI uri, String inputFormatClass, String outputFormatClass, List<FieldInfo> fields) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String outputFormatClass, Properties props, List<FieldInfo> fields) {
-    Preconditions.checkArgument(JedisURIHelper.isValid(uri), "URI is not valid for Redis: " + uri);
-
-    String host = uri.getHost();
-    int port = uri.getPort() != -1 ? uri.getPort() : DEFAULT_REDIS_PORT;
-    int dbIdx = JedisURIHelper.getDBIndex(uri);
-    String password = JedisURIHelper.getPassword(uri);
-
-    int timeout = Integer.parseInt(props.getProperty("redis.timeout", String.valueOf(DEFAULT_TIMEOUT)));
-
-    boolean clusterMode = Boolean.valueOf(props.getProperty("use.redis.cluster", "false"));
-
-    List<String> fieldNames = FieldInfoUtils.getFieldNames(fields);
-    IOutputSerializer serializer = SerdeUtils.getSerializer(outputFormatClass, props, fieldNames);
-    if (clusterMode) {
-      JedisClusterConfig config = new JedisClusterConfig.Builder()
-              .setNodes(Collections.singleton(new InetSocketAddress(host, port)))
-              .setTimeout(timeout)
-              .build();
-      return new RedisClusterTridentDataSource(config, props, fields, serializer);
-    } else {
-      JedisPoolConfig config = new JedisPoolConfig(host, port, timeout, password, dbIdx);
-      return new RedisTridentDataSource(config, props, fields, serializer);
-    }
-  }
-
-  private static class TridentRedisStoreMapper implements RedisStoreMapper {
-    private final RedisDataTypeDescription dataTypeDescription;
-    private final FieldInfo primaryKeyField;
-    private final IOutputSerializer outputSerializer;
-
-    private TridentRedisStoreMapper(RedisDataTypeDescription dataTypeDescription, List<FieldInfo> fields, IOutputSerializer outputSerializer) {
-      this.dataTypeDescription = dataTypeDescription;
-      this.outputSerializer = outputSerializer;
-
-      // find primary key from constructor
-      FieldInfo pkField = findPrimaryKeyField(fields);
-      Preconditions.checkArgument(pkField != null, "Primary key must be presented to field list");
-
-      this.primaryKeyField = pkField;
-    }
-
-    private FieldInfo findPrimaryKeyField(List<FieldInfo> fields) {
-      FieldInfo pkField = null;
-      for (FieldInfo field : fields) {
-        if (field.isPrimary()) {
-          // TODO: this assumes key is only from the one field
-          // if not we need to have order of fields in PK
-          pkField = field;
-          break;
-        }
-      }
-      return pkField;
-    }
-
-    @Override
-    public RedisDataTypeDescription getDataTypeDescription() {
-      return dataTypeDescription;
-    }
-
-    @Override
-    public String getKeyFromTuple(ITuple tuple) {
-      String keyFieldName = primaryKeyField.name();
-      Object key = tuple.getValueByField(keyFieldName);
-      if (key == null) {
-        throw new NullPointerException("key field " + keyFieldName + " is null");
-      }
-      return String.valueOf(key);
-    }
-
-    @Override
-    public String getValueFromTuple(ITuple tuple) {
-      byte[] array = outputSerializer.write(tuple.getValues(), null).array();
-      return new String(array);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-external/storm-sql-redis/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-external/storm-sql-redis/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider b/external/sql/storm-sql-external/storm-sql-redis/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
deleted file mode 100644
index 23b0444..0000000
--- a/external/sql/storm-sql-external/storm-sql-redis/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-org.apache.storm.sql.redis.RedisDataSourcesProvider
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-external/storm-sql-redis/src/test/org/apache/storm/sql/redis/TestRedisDataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-external/storm-sql-redis/src/test/org/apache/storm/sql/redis/TestRedisDataSourcesProvider.java b/external/sql/storm-sql-external/storm-sql-redis/src/test/org/apache/storm/sql/redis/TestRedisDataSourcesProvider.java
deleted file mode 100644
index 94d4949..0000000
--- a/external/sql/storm-sql-external/storm-sql-redis/src/test/org/apache/storm/sql/redis/TestRedisDataSourcesProvider.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.redis;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import org.apache.storm.redis.trident.state.RedisClusterState;
-import org.apache.storm.redis.trident.state.RedisClusterStateUpdater;
-import org.apache.storm.redis.trident.state.RedisState;
-import org.apache.storm.redis.trident.state.RedisStateUpdater;
-import org.apache.storm.sql.runtime.DataSourcesRegistry;
-import org.apache.storm.sql.runtime.FieldInfo;
-import org.apache.storm.sql.runtime.ISqlTridentDataSource;
-import org.apache.storm.sql.runtime.serde.json.JsonSerializer;
-import org.apache.storm.trident.state.StateUpdater;
-import org.apache.storm.trident.tuple.TridentTuple;
-import org.junit.Assert;
-import org.junit.Test;
-import org.mockito.internal.util.reflection.Whitebox;
-import redis.clients.jedis.Jedis;
-import redis.clients.jedis.JedisCluster;
-import redis.clients.jedis.JedisPool;
-import redis.clients.jedis.Pipeline;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Properties;
-
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-public class TestRedisDataSourcesProvider {
-  private static final List<FieldInfo> FIELDS = ImmutableList.of(
-      new FieldInfo("ID", int.class, true),
-      new FieldInfo("val", String.class, false));
-  private static final List<String> FIELD_NAMES = ImmutableList.of("ID", "val");
-  private static final String ADDITIONAL_KEY = "hello";
-  private static final JsonSerializer SERIALIZER = new JsonSerializer(FIELD_NAMES);
-  private static final Properties TBL_PROPERTIES = new Properties();
-  private static final Properties CLUSTER_TBL_PROPERTIES = new Properties();
-
-  static {
-    TBL_PROPERTIES.put("data.type", "HASH");
-    TBL_PROPERTIES.put("data.additional.key", ADDITIONAL_KEY);
-    CLUSTER_TBL_PROPERTIES.put("data.type", "HASH");
-    CLUSTER_TBL_PROPERTIES.put("data.additional.key", ADDITIONAL_KEY);
-    CLUSTER_TBL_PROPERTIES.put("use.redis.cluster", "true");
-  }
-
-  @SuppressWarnings("unchecked")
-  @Test
-  public void testRedisSink() {
-    ISqlTridentDataSource ds = DataSourcesRegistry.constructTridentDataSource(
-        URI.create("redis://:foobared@localhost:6380/2"), null, null, TBL_PROPERTIES, FIELDS);
-    Assert.assertNotNull(ds);
-
-    ISqlTridentDataSource.SqlTridentConsumer consumer = ds.getConsumer();
-
-    Assert.assertEquals(RedisState.Factory.class, consumer.getStateFactory().getClass());
-    Assert.assertEquals(RedisStateUpdater.class, consumer.getStateUpdater().getClass());
-
-    RedisState state = (RedisState) consumer.getStateFactory().makeState(Collections.emptyMap(), null, 0, 1);
-    StateUpdater stateUpdater = consumer.getStateUpdater();
-
-    JedisPool mockJedisPool = mock(JedisPool.class);
-    Jedis mockJedis = mock(Jedis.class);
-    Pipeline mockPipeline = mock(Pipeline.class);
-
-    Whitebox.setInternalState(state, "jedisPool", mockJedisPool);
-    when(mockJedisPool.getResource()).thenReturn(mockJedis);
-    when(mockJedis.pipelined()).thenReturn(mockPipeline);
-
-    List<TridentTuple> tupleList = mockTupleList();
-
-    stateUpdater.updateState(state, tupleList, null);
-    for (TridentTuple t : tupleList) {
-      // PK goes to the key
-      String id = String.valueOf(t.getValueByField("ID"));
-      String serializedValue = new String(SERIALIZER.write(t.getValues(), null).array());
-      verify(mockPipeline).hset(eq(ADDITIONAL_KEY), eq(id), eq(serializedValue));
-    }
-
-    verify(mockPipeline).sync();
-    verify(mockJedis).close();
-  }
-
-  @SuppressWarnings("unchecked")
-  @Test
-  public void testRedisClusterSink() throws IOException {
-    ISqlTridentDataSource ds = DataSourcesRegistry.constructTridentDataSource(
-        URI.create("redis://localhost:6380"), null, null, CLUSTER_TBL_PROPERTIES, FIELDS);
-    Assert.assertNotNull(ds);
-
-    ISqlTridentDataSource.SqlTridentConsumer consumer = ds.getConsumer();
-
-    Assert.assertEquals(RedisClusterState.Factory.class, consumer.getStateFactory().getClass());
-    Assert.assertEquals(RedisClusterStateUpdater.class, consumer.getStateUpdater().getClass());
-
-    RedisClusterState state = (RedisClusterState) consumer.getStateFactory().makeState(Collections.emptyMap(), null, 0, 1);
-    StateUpdater stateUpdater = consumer.getStateUpdater();
-
-    JedisCluster mockJedisCluster = mock(JedisCluster.class);
-
-    Whitebox.setInternalState(state, "jedisCluster", mockJedisCluster);
-
-    List<TridentTuple> tupleList = mockTupleList();
-
-    stateUpdater.updateState(state, tupleList, null);
-    for (TridentTuple t : tupleList) {
-      // PK goes to the key
-      String id = String.valueOf(t.getValueByField("ID"));
-      String serializedValue = new String(SERIALIZER.write(t.getValues(), null).array());
-      verify(mockJedisCluster).hset(eq(ADDITIONAL_KEY), eq(id), eq(serializedValue));
-    }
-
-    verify(mockJedisCluster, never()).close();
-  }
-
-  private static List<TridentTuple> mockTupleList() {
-    List<TridentTuple> tupleList = new ArrayList<>();
-    TridentTuple t0 = mock(TridentTuple.class);
-    TridentTuple t1 = mock(TridentTuple.class);
-    when(t0.getValueByField("ID")).thenReturn(1);
-    when(t0.getValueByField("val")).thenReturn("2");
-    doReturn(Lists.<Object>newArrayList(1, "2")).when(t0).getValues();
-
-    when(t1.getValueByField("ID")).thenReturn(2);
-    when(t1.getValueByField("val")).thenReturn("3");
-    doReturn(Lists.<Object>newArrayList(2, "3")).when(t1).getValues();
-
-    tupleList.add(t0);
-    tupleList.add(t1);
-    return tupleList;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/pom.xml b/external/sql/storm-sql-runtime/pom.xml
deleted file mode 100644
index 4666df4..0000000
--- a/external/sql/storm-sql-runtime/pom.xml
+++ /dev/null
@@ -1,136 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements.  See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License.  You may obtain a copy of the License at
-
-     http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-
-    <parent>
-        <artifactId>storm</artifactId>
-        <groupId>org.apache.storm</groupId>
-        <version>2.0.0-SNAPSHOT</version>
-        <relativePath>../../../pom.xml</relativePath>
-    </parent>
-
-    <artifactId>storm-sql-runtime</artifactId>
-
-    <developers>
-        <developer>
-            <id>haohui</id>
-            <name>Haohui Mai</name>
-            <email>ricetons@gmail.com</email>
-        </developer>
-    </developers>
-
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.storm</groupId>
-            <artifactId>storm-core</artifactId>
-            <version>${project.version}</version>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.calcite</groupId>
-            <artifactId>calcite-core</artifactId>
-            <version>${calcite.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>commons-dbcp</groupId>
-                    <artifactId>commons-dbcp</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.google.code.findbugs</groupId>
-                    <artifactId>jsr305</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.pentaho</groupId>
-                    <artifactId>pentaho-aggdesigner-algorithm</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.fasterxml.jackson.core</groupId>
-                    <artifactId>jackson-annotations</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-        <dependency>
-            <groupId>com.fasterxml.jackson.core</groupId>
-            <artifactId>jackson-databind</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.avro</groupId>
-            <artifactId>avro</artifactId>
-            <version>1.7.7</version>
-        </dependency>
-        <dependency>
-            <groupId>commons-lang</groupId>
-            <artifactId>commons-lang</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.commons</groupId>
-            <artifactId>commons-csv</artifactId>
-            <version>1.4</version>
-        </dependency>
-        <dependency>
-            <groupId>org.mockito</groupId>
-            <artifactId>mockito-all</artifactId>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <scope>test</scope>
-        </dependency>
-    </dependencies>
-    <build>
-        <sourceDirectory>src/jvm</sourceDirectory>
-        <testSourceDirectory>src/test</testSourceDirectory>
-        <resources>
-            <resource>
-                <directory>${basedir}/src/resources</directory>
-            </resource>
-        </resources>
-        <plugins>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-jar-plugin</artifactId>
-                <executions>
-                    <execution>
-                        <goals>
-                            <goal>test-jar</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
-            <plugin>
-                <groupId>org.codehaus.mojo</groupId>
-                <artifactId>appassembler-maven-plugin</artifactId>
-                <version>1.9</version>
-                <executions>
-                    <execution>
-                        <id>create-repo</id>
-                        <goals>
-                            <goal>create-repository</goal>
-                        </goals>
-                        <configuration>
-                            <assembleDirectory>${project.build.directory}/app-assembler</assembleDirectory>
-                            <repositoryLayout>flat</repositoryLayout>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin>
-        </plugins>
-    </build>
-</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/calcite/interpreter/StormContext.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/calcite/interpreter/StormContext.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/calcite/interpreter/StormContext.java
deleted file mode 100644
index aa7e435..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/calcite/interpreter/StormContext.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.calcite.interpreter;
-
-import org.apache.calcite.DataContext;
-
-import java.io.Serializable;
-
-/**
- * This is a hack to use Calcite Context.
- */
-public class StormContext extends Context implements Serializable {
-    public StormContext(DataContext root) {
-        super(root);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractChannelHandler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractChannelHandler.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractChannelHandler.java
deleted file mode 100644
index 64be39d..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractChannelHandler.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.runtime;
-
-import org.apache.storm.tuple.Values;
-
-public abstract class AbstractChannelHandler implements ChannelHandler {
-  @Override
-  public abstract void dataReceived(ChannelContext ctx, Values data);
-
-  @Override
-  public void channelInactive(ChannelContext ctx) {
-
-  }
-
-  @Override
-  public void exceptionCaught(Throwable cause) {
-
-  }
-
-  @Override
-  public void flush(ChannelContext ctx) {
-    ctx.flush();
-  }
-
-  @Override
-  public void setSource(ChannelContext ctx, Object source) {
-
-  }
-
-  public static final AbstractChannelHandler PASS_THROUGH = new AbstractChannelHandler() {
-    @Override
-    public void dataReceived(ChannelContext ctx, Values data) {
-      ctx.emit(data);
-    }
-  };
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractValuesProcessor.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractValuesProcessor.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractValuesProcessor.java
deleted file mode 100644
index 5b18b5b..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractValuesProcessor.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.runtime;
-
-import org.apache.storm.tuple.Values;
-import org.apache.storm.sql.runtime.ChannelHandler;
-import org.apache.storm.sql.runtime.DataSource;
-
-import java.util.Map;
-
-/**
- * Subclass of AbstractTupleProcessor provides a series of tuple. It
- * takes a series of iterators of {@link Values} and produces a stream of
- * tuple.
- *
- * The subclass implements the {@see next()} method to provide
- * the output of the stream. It can choose to return null in {@see next()} to
- * indicate that this particular iteration is a no-op. SQL processors depend
- * on this semantic to implement filtering and nullable records.
- */
-public abstract class AbstractValuesProcessor {
-
-  /**
-   * Initialize the data sources.
-   *
-   * @param data a map from the table name to the iterators of the values.
-   *
-   */
-  public abstract void initialize(Map<String, DataSource> data, ChannelHandler
-      result);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelContext.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelContext.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelContext.java
deleted file mode 100644
index 65ad01c..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelContext.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.runtime;
-
-import org.apache.storm.tuple.Values;
-
-public interface ChannelContext {
-  /**
-   * Emit data to the next stage of the data pipeline.
-   */
-  void emit(Values data);
-  void fireChannelInactive();
-  void flush();
-  void setSource(Object source);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelHandler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelHandler.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelHandler.java
deleted file mode 100644
index af02b7e..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelHandler.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.runtime;
-
-import org.apache.storm.tuple.Values;
-
-/**
- * DataListener provides an event-driven interface for the user to process
- * series of events.
- */
-public interface ChannelHandler {
-  void dataReceived(ChannelContext ctx, Values data);
-
-  /**
-   * The producer of the data has indicated that the channel is no longer
-   * active.
-   * @param ctx
-   */
-  void channelInactive(ChannelContext ctx);
-
-  void exceptionCaught(Throwable cause);
-
-  void flush(ChannelContext ctx);
-
-  void setSource(ChannelContext ctx, Object source);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/Channels.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/Channels.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/Channels.java
deleted file mode 100644
index 3b5eedd..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/Channels.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.runtime;
-
-import org.apache.storm.tuple.Values;
-
-public class Channels {
-  private static final ChannelContext VOID_CTX = new ChannelContext() {
-    @Override
-    public void emit(Values data) {}
-
-    @Override
-    public void fireChannelInactive() {}
-
-    @Override
-    public void flush() {
-
-    }
-
-    @Override
-    public void setSource(java.lang.Object source) {
-
-    }
-  };
-
-  private static class ChannelContextAdapter implements ChannelContext {
-    private final ChannelHandler handler;
-    private final ChannelContext next;
-
-    public ChannelContextAdapter(
-        ChannelContext next, ChannelHandler handler) {
-      this.handler = handler;
-      this.next = next;
-    }
-
-    @Override
-    public void emit(Values data) {
-      handler.dataReceived(next, data);
-    }
-
-    @Override
-    public void fireChannelInactive() {
-      handler.channelInactive(next);
-    }
-
-    @Override
-    public void flush() {
-      handler.flush(next);
-    }
-
-    @Override
-    public void setSource(java.lang.Object source) {
-      handler.setSource(next, source);
-      next.setSource(source); // propagate through the chain
-    }
-  }
-
-  private static class ForwardingChannelContext implements ChannelContext {
-    private final ChannelContext next;
-
-    public ForwardingChannelContext(ChannelContext next) {
-      this.next = next;
-    }
-
-    @Override
-    public void emit(Values data) {
-      next.emit(data);
-    }
-
-    @Override
-    public void fireChannelInactive() {
-      next.fireChannelInactive();
-    }
-
-    @Override
-    public void flush() {
-      next.flush();
-    }
-
-    @Override
-    public void setSource(Object source) {
-      next.setSource(source);
-    }
-  }
-
-  public static ChannelContext chain(
-      ChannelContext next, ChannelHandler handler) {
-    return new ChannelContextAdapter(next, handler);
-  }
-
-  public static ChannelContext voidContext() {
-    return VOID_CTX;
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSource.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSource.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSource.java
deleted file mode 100644
index 352af73..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSource.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.runtime;
-
-/**
- * A DataSource ingests data in StormSQL. It provides a series of tuple to
- * the downstream {@link ChannelHandler}.
- *
- */
-public interface DataSource {
-  void open(ChannelContext ctx);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesProvider.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesProvider.java
deleted file mode 100644
index dbece9c..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesProvider.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.runtime;
-
-import java.net.URI;
-import java.util.List;
-import java.util.Properties;
-
-public interface DataSourcesProvider {
-  /**
-   * @return the scheme of the data source
-   */
-  String scheme();
-
-  /**
-   * Construct a new data source.
-   * @param uri The URI that specifies the data source. The format of the URI
-   *            is fully customizable.
-   * @param inputFormatClass the name of the class that deserializes data.
-   *                         It is null when unspecified.
-   * @param outputFormatClass the name of the class that serializes data. It
-   *                          is null when unspecified.
-   * @param fields The name of the fields and the schema of the table.
-   */
-  DataSource construct(
-      URI uri, String inputFormatClass, String outputFormatClass,
-      List<FieldInfo> fields);
-
-  ISqlTridentDataSource constructTrident(
-          URI uri, String inputFormatClass, String outputFormatClass,
-          Properties properties, List<FieldInfo> fields);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesRegistry.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesRegistry.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesRegistry.java
deleted file mode 100644
index dfefb01..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesRegistry.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.runtime;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.URI;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.ServiceLoader;
-
-public class DataSourcesRegistry {
-  private static final Logger LOG = LoggerFactory.getLogger(
-      DataSourcesRegistry.class);
-  private static final Map<String, DataSourcesProvider> providers;
-
-  static {
-    providers = new HashMap<>();
-    ServiceLoader<DataSourcesProvider> loader = ServiceLoader.load(
-        DataSourcesProvider.class);
-    for (DataSourcesProvider p : loader) {
-      LOG.info("Registering scheme {} with {}", p.scheme(), p);
-      providers.put(p.scheme(), p);
-    }
-  }
-
-  private DataSourcesRegistry() {
-  }
-
-  public static DataSource construct(
-      URI uri, String inputFormatClass, String outputFormatClass,
-      List<FieldInfo> fields) {
-    DataSourcesProvider provider = providers.get(uri.getScheme());
-    if (provider == null) {
-      return null;
-    }
-
-    return provider.construct(uri, inputFormatClass, outputFormatClass, fields);
-  }
-
-  public static ISqlTridentDataSource constructTridentDataSource(
-          URI uri, String inputFormatClass, String outputFormatClass,
-          Properties properties, List<FieldInfo> fields) {
-    DataSourcesProvider provider = providers.get(uri.getScheme());
-    if (provider == null) {
-      return null;
-    }
-
-    return provider.constructTrident(uri, inputFormatClass, outputFormatClass, properties, fields);
-  }
-
-  /**
-   * Allow unit tests to inject data sources.
-   */
-  public static Map<String, DataSourcesProvider> providerMap() {
-    return providers;
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/FieldInfo.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/FieldInfo.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/FieldInfo.java
deleted file mode 100644
index 03b030b..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/FieldInfo.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.runtime;
-
-import java.io.Serializable;
-
-/**
- * Describe each column of the field
- */
-public class FieldInfo implements Serializable {
-  private final String name;
-  private final Class<?> type;
-  private final boolean isPrimary;
-
-  public FieldInfo(String name, Class<?> type, boolean isPrimary) {
-    this.name = name;
-    this.type = type;
-    this.isPrimary = isPrimary;
-  }
-
-  public String name() {
-    return name;
-  }
-
-  public Class<?> type() {
-    return type;
-  }
-
-  public boolean isPrimary() {
-    return isPrimary;
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/IOutputSerializer.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/IOutputSerializer.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/IOutputSerializer.java
deleted file mode 100644
index b6670d9..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/IOutputSerializer.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.runtime;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-
-public interface IOutputSerializer {
-  /**
-   * Serialize the data to a ByteBuffer. The caller can pass in a ByteBuffer so that the serializer can reuse the
-   * memory.
-   *
-   * @return A ByteBuffer contains the serialized result.
-   */
-  ByteBuffer write(List<Object> data, ByteBuffer buffer);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ISqlTridentDataSource.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ISqlTridentDataSource.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ISqlTridentDataSource.java
deleted file mode 100644
index 9eae5ae..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ISqlTridentDataSource.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.runtime;
-
-import org.apache.storm.trident.spout.ITridentDataSource;
-import org.apache.storm.trident.state.StateFactory;
-import org.apache.storm.trident.state.StateUpdater;
-
-/**
- * A ISqlTridentDataSource specifies how an external data source produces and consumes data.
- */
-public interface ISqlTridentDataSource {
-  /**
-   * SqlTridentConsumer is a data structure containing StateFactory and StateUpdater for consuming tuples with State.
-   *
-   * Please note that StateFactory and StateUpdater should use same class which implements State.
-   *
-   * @see org.apache.storm.trident.state.StateFactory
-   * @see org.apache.storm.trident.state.StateUpdater
-   */
-  interface SqlTridentConsumer {
-    StateFactory getStateFactory();
-    StateUpdater getStateUpdater();
-  }
-
-  /**
-   * Provides instance of ITridentDataSource which can be used as producer in Trident.
-   *
-   * Since ITridentDataSource is a marker interface for Trident Spout interfaces, this method should effectively
-   * return an instance of one of these interfaces (can be changed if Trident API evolves) or descendants:
-   * - IBatchSpout
-   * - ITridentSpout
-   * - IPartitionedTridentSpout
-   * - IOpaquePartitionedTridentSpout
-   *
-   * @see org.apache.storm.trident.spout.ITridentDataSource
-   * @see org.apache.storm.trident.spout.IBatchSpout
-   * @see org.apache.storm.trident.spout.ITridentSpout
-   * @see org.apache.storm.trident.spout.IPartitionedTridentSpout
-   * @see org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout
-   */
-  ITridentDataSource getProducer();
-
-  /**
-   * Provides instance of SqlTridentConsumer which can be used as consumer (State) in Trident.
-   *
-   * @see SqlTridentConsumer
-   */
-  SqlTridentConsumer getConsumer();
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/SimpleSqlTridentConsumer.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/SimpleSqlTridentConsumer.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/SimpleSqlTridentConsumer.java
deleted file mode 100644
index c9abd16..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/SimpleSqlTridentConsumer.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.runtime;
-
-import org.apache.storm.trident.state.StateFactory;
-import org.apache.storm.trident.state.StateUpdater;
-
-public class SimpleSqlTridentConsumer implements ISqlTridentDataSource.SqlTridentConsumer {
-    private final StateFactory stateFactory;
-    private final StateUpdater stateUpdater;
-
-    public SimpleSqlTridentConsumer(StateFactory stateFactory, StateUpdater stateUpdater) {
-        this.stateFactory = stateFactory;
-        this.stateUpdater = stateUpdater;
-    }
-
-    @Override
-    public StateFactory getStateFactory() {
-        return stateFactory;
-    }
-
-    @Override
-    public StateUpdater getStateUpdater() {
-        return stateUpdater;
-    }
-}