Posted to commits@storm.apache.org by ka...@apache.org on 2018/07/16 22:00:44 UTC

[2/7] storm git commit: STORM-2406 [Storm SQL] Change underlying API to Streams API

http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesRegistry.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesRegistry.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesRegistry.java
index 7383235..9fe963e 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesRegistry.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesRegistry.java
@@ -47,15 +47,15 @@ public class DataSourcesRegistry {
     }
 
     /**
-     * Construct a trident data source.
+     * Construct a streams data source.
      * @param uri data source uri
      * @param inputFormatClass input format class
      * @param outputFormatClass output format class
      * @param properties Properties
      * @param fields fields info list
-     * @return TridentDataSource object
+     * @return StreamsDataSource object
      */
-    public static ISqlTridentDataSource constructTridentDataSource(
+    public static ISqlStreamsDataSource constructStreamsDataSource(
             URI uri, String inputFormatClass, String outputFormatClass,
             Properties properties, List<FieldInfo> fields) {
         DataSourcesProvider provider = providers.get(uri.getScheme());
@@ -63,7 +63,7 @@ public class DataSourcesRegistry {
             return null;
         }
 
-        return provider.constructTrident(uri, inputFormatClass, outputFormatClass, properties, fields);
+        return provider.constructStreams(uri, inputFormatClass, outputFormatClass, properties, fields);
     }
 
     /**

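For reference, a minimal caller-side sketch of the renamed entry point. The socket URI, the empty field list, and the null format classes (which, going by the runtime's SerdeUtils defaults, should fall back to JSON serde) are illustrative, not part of this commit:

    import java.net.URI;
    import java.util.Collections;
    import java.util.Properties;

    import org.apache.storm.sql.runtime.DataSourcesRegistry;
    import org.apache.storm.sql.runtime.ISqlStreamsDataSource;

    public class RegistryUsageSketch {
        public static void main(String[] args) {
            URI uri = URI.create("socket://localhost:8888");
            ISqlStreamsDataSource ds = DataSourcesRegistry.constructStreamsDataSource(
                    uri, null, null, new Properties(), Collections.emptyList());
            // constructStreamsDataSource returns null when no DataSourcesProvider
            // is registered for the URI scheme ("socket" here)
            if (ds == null) {
                throw new IllegalStateException("No provider for scheme: " + uri.getScheme());
            }
        }
    }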
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ISqlStreamsDataSource.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ISqlStreamsDataSource.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ISqlStreamsDataSource.java
new file mode 100644
index 0000000..da569ad
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ISqlStreamsDataSource.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.runtime;
+
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.IRichSpout;
+
+/**
+ * An ISqlStreamsDataSource specifies how an external data source produces and consumes data.
+ */
+public interface ISqlStreamsDataSource {
+    /**
+     * Provides an instance of IRichSpout which can be used as a producer in the topology.
+     *
+     * @see org.apache.storm.topology.IRichSpout
+     */
+    IRichSpout getProducer();
+
+    /**
+     * Provides an instance of IRichBolt which can be used as a consumer in the topology.
+     */
+    IRichBolt getConsumer();
+}
\ No newline at end of file

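The new contract is deliberately small: a producer spout and a consumer bolt. A minimal implementing sketch (InMemoryStreamsDataSource and its constructor arguments are hypothetical, for illustration only):

    import org.apache.storm.sql.runtime.ISqlStreamsDataSource;
    import org.apache.storm.topology.IRichBolt;
    import org.apache.storm.topology.IRichSpout;

    public class InMemoryStreamsDataSource implements ISqlStreamsDataSource {
        private final IRichSpout producer;
        private final IRichBolt consumer;

        public InMemoryStreamsDataSource(IRichSpout producer, IRichBolt consumer) {
            this.producer = producer;
            this.consumer = consumer;
        }

        @Override
        public IRichSpout getProducer() {
            return producer;   // reads records from the external system into the topology
        }

        @Override
        public IRichBolt getConsumer() {
            return consumer;   // writes topology output back to the external system
        }
    }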
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ISqlTridentDataSource.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ISqlTridentDataSource.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ISqlTridentDataSource.java
deleted file mode 100644
index 81fb386..0000000
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ISqlTridentDataSource.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.sql.runtime;
-
-import org.apache.storm.trident.spout.ITridentDataSource;
-import org.apache.storm.trident.state.StateFactory;
-import org.apache.storm.trident.state.StateUpdater;
-
-/**
- * A ISqlTridentDataSource specifies how an external data source produces and consumes data.
- */
-public interface ISqlTridentDataSource {
-    /**
-     * SqlTridentConsumer is a data structure containing StateFactory and StateUpdater for consuming tuples with State.
-     * <p/>
-     * Please note that StateFactory and StateUpdater should use same class which implements State.
-     *
-     * @see org.apache.storm.trident.state.StateFactory
-     * @see org.apache.storm.trident.state.StateUpdater
-     */
-    interface SqlTridentConsumer {
-        StateFactory getStateFactory();
-
-        StateUpdater getStateUpdater();
-    }
-
-    /**
-     * Provides instance of ITridentDataSource which can be used as producer in Trident.
-     * <p/>
-     * Since ITridentDataSource is a marker interface for Trident Spout interfaces, this method should effectively
-     * return an instance of one of these interfaces (can be changed if Trident API evolves) or descendants:
-     * - IBatchSpout
-     * - ITridentSpout
-     * - IPartitionedTridentSpout
-     * - IOpaquePartitionedTridentSpout
-     *
-     * @see org.apache.storm.trident.spout.ITridentDataSource
-     * @see org.apache.storm.trident.spout.IBatchSpout
-     * @see org.apache.storm.trident.spout.ITridentSpout
-     * @see org.apache.storm.trident.spout.IPartitionedTridentSpout
-     * @see org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout
-     */
-    ITridentDataSource getProducer();
-
-    /**
-     * Provides instance of SqlTridentConsumer which can be used as consumer (State) in Trident.
-     *
-     * @see SqlTridentConsumer
-     */
-    SqlTridentConsumer getConsumer();
-}

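The deleted interface forced every write path through Trident State plumbing (a StateFactory plus StateUpdater pair); under the replacement a sink is just a bolt. A rough contrast, assuming an already-built streams Stream<Values> (the helper and its parameters are illustrative):

    import org.apache.storm.sql.runtime.ISqlStreamsDataSource;
    import org.apache.storm.streams.Stream;
    import org.apache.storm.tuple.Values;

    public class SinkWiringSketch {
        // After this commit: attach the consumer bolt directly.
        static void attachSink(Stream<Values> stream, ISqlStreamsDataSource dataSource) {
            stream.to(dataSource.getConsumer());
        }

        // Before, the Trident equivalent was roughly:
        //   tridentStream.partitionPersist(
        //           consumer.getStateFactory(), fields, consumer.getStateUpdater());
    }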
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/SimpleSqlTridentConsumer.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/SimpleSqlTridentConsumer.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/SimpleSqlTridentConsumer.java
deleted file mode 100644
index f9a6039..0000000
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/SimpleSqlTridentConsumer.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.sql.runtime;
-
-import org.apache.storm.trident.state.StateFactory;
-import org.apache.storm.trident.state.StateUpdater;
-
-public class SimpleSqlTridentConsumer implements ISqlTridentDataSource.SqlTridentConsumer {
-    private final StateFactory stateFactory;
-    private final StateUpdater stateUpdater;
-
-    public SimpleSqlTridentConsumer(StateFactory stateFactory, StateUpdater stateUpdater) {
-        this.stateFactory = stateFactory;
-        this.stateUpdater = stateUpdater;
-    }
-
-    @Override
-    public StateFactory getStateFactory() {
-        return stateFactory;
-    }
-
-    @Override
-    public StateUpdater getStateUpdater() {
-        return stateUpdater;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/SocketDataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/SocketDataSourcesProvider.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/SocketDataSourcesProvider.java
index 10e470d..48f3273 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/SocketDataSourcesProvider.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/SocketDataSourcesProvider.java
@@ -26,16 +26,13 @@ import org.apache.storm.spout.Scheme;
 import org.apache.storm.sql.runtime.DataSourcesProvider;
 import org.apache.storm.sql.runtime.FieldInfo;
 import org.apache.storm.sql.runtime.IOutputSerializer;
-import org.apache.storm.sql.runtime.ISqlTridentDataSource;
-import org.apache.storm.sql.runtime.SimpleSqlTridentConsumer;
-import org.apache.storm.sql.runtime.datasource.socket.trident.SocketState;
-import org.apache.storm.sql.runtime.datasource.socket.trident.SocketStateUpdater;
-import org.apache.storm.sql.runtime.datasource.socket.trident.TridentSocketSpout;
+import org.apache.storm.sql.runtime.ISqlStreamsDataSource;
+import org.apache.storm.sql.runtime.datasource.socket.bolt.SocketBolt;
+import org.apache.storm.sql.runtime.datasource.socket.spout.SocketSpout;
 import org.apache.storm.sql.runtime.utils.FieldInfoUtils;
 import org.apache.storm.sql.runtime.utils.SerdeUtils;
-import org.apache.storm.trident.spout.ITridentDataSource;
-import org.apache.storm.trident.state.StateFactory;
-import org.apache.storm.trident.state.StateUpdater;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.IRichSpout;
 
 /**
  * Create a Socket data source based on the URI and properties. The URI has the format of
@@ -50,35 +47,33 @@ public class SocketDataSourcesProvider implements DataSourcesProvider {
         return "socket";
     }
 
-    private static class SocketTridentDataSource implements ISqlTridentDataSource {
+    private static class SocketStreamsDataSource implements ISqlStreamsDataSource {
 
         private final String host;
         private final int port;
         private final Scheme scheme;
         private final IOutputSerializer serializer;
 
-        SocketTridentDataSource(Scheme scheme, IOutputSerializer serializer, String host, int port) {
-            this.scheme = scheme;
-            this.serializer = serializer;
+        SocketStreamsDataSource(String host, int port, Scheme scheme, IOutputSerializer serializer) {
             this.host = host;
             this.port = port;
+            this.scheme = scheme;
+            this.serializer = serializer;
         }
 
         @Override
-        public ITridentDataSource getProducer() {
-            return new TridentSocketSpout(scheme, host, port);
+        public IRichSpout getProducer() {
+            return new SocketSpout(scheme, host, port);
         }
 
         @Override
-        public SqlTridentConsumer getConsumer() {
-            StateFactory stateFactory = new SocketState.Factory(host, port);
-            StateUpdater<SocketState> stateUpdater = new SocketStateUpdater(serializer);
-            return new SimpleSqlTridentConsumer(stateFactory, stateUpdater);
+        public IRichBolt getConsumer() {
+            return new SocketBolt(serializer, host, port);
         }
     }
 
     @Override
-    public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String outputFormatClass,
+    public ISqlStreamsDataSource constructStreams(URI uri, String inputFormatClass, String outputFormatClass,
                                                   Properties properties, List<FieldInfo> fields) {
         String host = uri.getHost();
         int port = uri.getPort();
@@ -90,6 +85,6 @@ public class SocketDataSourcesProvider implements DataSourcesProvider {
         Scheme scheme = SerdeUtils.getScheme(inputFormatClass, properties, fieldNames);
         IOutputSerializer serializer = SerdeUtils.getSerializer(outputFormatClass, properties, fieldNames);
 
-        return new SocketTridentDataSource(scheme, serializer, host, port);
+        return new SocketDataSourcesProvider.SocketStreamsDataSource(host, port, scheme, serializer);
     }
 }

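Putting the two halves together, a rough end-to-end wiring sketch with the Streams API (the port and the submission step are illustrative; Storm SQL generates the equivalent of this from a CREATE EXTERNAL TABLE plus an INSERT query):

    import java.net.URI;
    import java.util.Collections;
    import java.util.Properties;

    import org.apache.storm.generated.StormTopology;
    import org.apache.storm.sql.runtime.DataSourcesRegistry;
    import org.apache.storm.sql.runtime.ISqlStreamsDataSource;
    import org.apache.storm.streams.Stream;
    import org.apache.storm.streams.StreamBuilder;
    import org.apache.storm.tuple.Tuple;

    public class SocketWiringSketch {
        public static void main(String[] args) {
            ISqlStreamsDataSource ds = DataSourcesRegistry.constructStreamsDataSource(
                    URI.create("socket://localhost:8888"), null, null,
                    new Properties(), Collections.emptyList());

            StreamBuilder builder = new StreamBuilder();
            Stream<Tuple> stream = builder.newStream(ds.getProducer());  // SocketSpout
            stream.to(ds.getConsumer());                                 // SocketBolt

            StormTopology topology = builder.build();  // ready to submit
        }
    }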
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/bolt/SocketBolt.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/bolt/SocketBolt.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/bolt/SocketBolt.java
new file mode 100644
index 0000000..78adef3
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/bolt/SocketBolt.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.runtime.datasource.socket.bolt;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.net.Socket;
+import java.util.Map;
+
+import org.apache.storm.shade.org.apache.commons.io.IOUtils;
+import org.apache.storm.sql.runtime.IOutputSerializer;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The Bolt implementation for Socket. Only available for Storm SQL.
+ * The class doesn't handle reconnection, so you may not want to use this for production.
+ */
+public class SocketBolt extends BaseRichBolt {
+    private static final Logger LOG = LoggerFactory.getLogger(SocketBolt.class);
+
+    private final IOutputSerializer serializer;
+    private final String host;
+    private final int port;
+
+    private transient OutputCollector collector;
+    private transient BufferedWriter writer;
+    private transient Socket socket;
+
+    /**
+     * Constructor.
+     */
+    public SocketBolt(IOutputSerializer serializer, String host, int port) {
+        this.serializer = serializer;
+        this.host = host;
+        this.port = port;
+    }
+
+    @Override
+    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
+        this.collector = collector;
+
+        try {
+            this.socket = new Socket(host, port);
+            this.writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
+        } catch (IOException e) {
+            throw new RuntimeException("Exception while initializing socket for State. host "
+                    + host + " port " + port, e);
+        }
+    }
+
+    @Override
+    public void execute(Tuple input) {
+        Values values = (Values) input.getValue(0);
+        byte[] array = serializer.write(values, null).array();
+        String data = new String(array);
+
+        try {
+            writer.write(data + "\n");
+            writer.flush();
+            collector.ack(input);
+        } catch (IOException e) {
+            LOG.error("Error while writing data to socket.", e);
+            collector.reportError(e);
+            collector.fail(input);
+        }
+    }
+
+    @Override
+    public void cleanup() {
+        IOUtils.closeQuietly(writer);
+        IOUtils.closeQuietly(socket);
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+
+    }
+
+}
\ No newline at end of file

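SocketBolt dials out to host:port and writes one serialized line per input tuple (note it expects the Values object in field 0 of the incoming tuple). A throwaway sink for observing its output locally, purely illustrative:

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.ServerSocket;
    import java.net.Socket;

    public class LineSinkSketch {
        public static void main(String[] args) throws Exception {
            try (ServerSocket server = new ServerSocket(8888);
                 Socket client = server.accept();              // SocketBolt connects here
                 BufferedReader in = new BufferedReader(
                         new InputStreamReader(client.getInputStream()))) {
                String line;
                while ((line = in.readLine()) != null) {       // one line per tuple
                    System.out.println(line);
                }
            }
        }
    }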
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/spout/SocketSpout.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/spout/SocketSpout.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/spout/SocketSpout.java
new file mode 100644
index 0000000..b8743b9
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/spout/SocketSpout.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.runtime.datasource.socket.spout;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.BufferedReader;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.LinkedBlockingDeque;
+
+import org.apache.storm.Config;
+import org.apache.storm.spout.Scheme;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Spout for Socket data. Only available for Storm SQL.
+ * The class doesn't handle reconnection, so you may not want to use this for production.
+ */
+public class SocketSpout implements IRichSpout {
+    private static final Logger LOG = LoggerFactory.getLogger(SocketSpout.class);
+
+    private final String host;
+    private final int port;
+    private final Scheme scheme;
+
+    private volatile boolean running;
+
+    private BlockingDeque<List<Object>> queue;
+    private Socket socket;
+    private Thread readerThread;
+    private BufferedReader in;
+    private ObjectMapper objectMapper;
+
+    private SpoutOutputCollector collector;
+    private Map<String, List<Object>> emitted;
+
+    /**
+     * SocketSpout Constructor.
+     * @param scheme Scheme
+     * @param host socket host
+     * @param port socket port
+     */
+    public SocketSpout(Scheme scheme, String host, int port) {
+        this.scheme = scheme;
+        this.host = host;
+        this.port = port;
+    }
+
+    @Override
+    public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
+        this.collector = collector;
+        this.queue = new LinkedBlockingDeque<>();
+        this.emitted = new HashMap<>();
+        this.objectMapper = new ObjectMapper();
+
+        try {
+            socket = new Socket(host, port);
+            in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
+        } catch (IOException e) {
+            throw new RuntimeException("Error opening socket: host " + host + " port " + port);
+        }
+
+        readerThread = new Thread(new SocketSpout.SocketReaderRunnable());
+        readerThread.start();
+    }
+
+    @Override
+    public void close() {
+        running = false;
+        readerThread.interrupt();
+        queue.clear();
+
+        closeQuietly(in);
+        closeQuietly(socket);
+    }
+
+    @Override
+    public void activate() {
+        running = true;
+    }
+
+    @Override
+    public void deactivate() {
+        running = false;
+    }
+
+    @Override
+    public void nextTuple() {
+        if (queue.peek() != null) {
+            List<Object> values = queue.poll();
+            if (values != null) {
+                String id = UUID.randomUUID().toString();
+                emitted.put(id, values);
+                collector.emit(values, id);
+            }
+        }
+    }
+
+    private List<Object> convertLineToTuple(String line) {
+        return scheme.deserialize(ByteBuffer.wrap(line.getBytes()));
+    }
+
+    @Override
+    public void ack(Object msgId) {
+        emitted.remove(msgId);
+    }
+
+    @Override
+    public void fail(Object msgId) {
+        List<Object> emittedValues = emitted.remove(msgId);
+        if (emittedValues != null) {
+            queue.addLast(emittedValues);
+        }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(scheme.getOutputFields());
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        Config conf = new Config();
+        conf.setMaxTaskParallelism(1);
+        return conf;
+    }
+
+    private class SocketReaderRunnable implements Runnable {
+        public void run() {
+            while (running) {
+                try {
+                    String line = in.readLine();
+                    if (line == null) {
+                        throw new RuntimeException("EOF reached from the socket. We can't read the data any more.");
+                    }
+
+                    List<Object> values = convertLineToTuple(line.trim());
+                    queue.push(values);
+                } catch (Throwable t) {
+                    // This spout is added for test purposes, so just failing fast doesn't hurt much
+                    die(t);
+                }
+            }
+        }
+    }
+
+    private void die(Throwable t) {
+        LOG.error("Halting process: TridentSocketSpout died.", t);
+        if (running || (t instanceof Error)) { //don't exit if not running, unless it is an Error
+            System.exit(11);
+        }
+    }
+
+    private void closeQuietly(final Closeable closeable) {
+        try {
+            if (closeable != null) {
+                closeable.close();
+            }
+        } catch (final IOException ioe) {
+            // ignore
+        }
+    }
+}
\ No newline at end of file

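SocketSpout likewise connects as a client and consumes newline-delimited records, deserializing each line through the configured Scheme. Note the at-least-once flavor: every emitted tuple is tracked in the emitted map under a random UUID message id, ack() drops it, and fail() re-queues the values at the tail of the deque. A throwaway feeder for local testing (the JSON payload assumes the default JSON scheme; port and record are illustrative):

    import java.io.OutputStreamWriter;
    import java.io.PrintWriter;
    import java.net.ServerSocket;
    import java.net.Socket;

    public class LineFeederSketch {
        public static void main(String[] args) throws Exception {
            try (ServerSocket server = new ServerSocket(8888);
                 Socket client = server.accept();                // SocketSpout connects here
                 PrintWriter out = new PrintWriter(
                         new OutputStreamWriter(client.getOutputStream()), true)) {
                out.println("{\"ID\": 1, \"NAME\": \"storm\"}"); // one record per line
                Thread.sleep(60_000);                            // keep the connection open
            }
        }
    }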
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/SocketState.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/SocketState.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/SocketState.java
deleted file mode 100644
index 60c1a5e..0000000
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/SocketState.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.sql.runtime.datasource.socket.trident;
-
-import java.io.BufferedWriter;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.net.Socket;
-import java.util.Map;
-
-import org.apache.storm.task.IMetricsContext;
-import org.apache.storm.trident.state.State;
-import org.apache.storm.trident.state.StateFactory;
-
-/**
- * Trident State implementation of Socket. It only supports writing.
- */
-public class SocketState implements State {
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void beginCommit(Long txid) {
-        // no-op
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void commit(Long txid) {
-        // no-op
-    }
-
-    public static class Factory implements StateFactory {
-        private final String host;
-        private final int port;
-
-        public Factory(String host, int port) {
-            this.host = host;
-            this.port = port;
-        }
-
-        @Override
-        public State makeState(Map<String, Object> conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
-            BufferedWriter out;
-            try {
-                Socket socket = new Socket(host, port);
-                out = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
-            } catch (IOException e) {
-                throw new RuntimeException("Exception while initializing socket for State. host "
-                        + host + " port " + port, e);
-            }
-
-            // State doesn't have close() and Storm actually doesn't guarantee so we can't release socket resource anyway
-            return new SocketState(out);
-        }
-    }
-
-    private BufferedWriter out;
-
-    private SocketState(BufferedWriter out) {
-        this.out = out;
-    }
-
-    public void write(String str) throws IOException {
-        out.write(str);
-    }
-
-    public void flush() throws IOException {
-        out.flush();
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/SocketStateUpdater.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/SocketStateUpdater.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/SocketStateUpdater.java
deleted file mode 100644
index df2893b..0000000
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/SocketStateUpdater.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.sql.runtime.datasource.socket.trident;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.storm.sql.runtime.IOutputSerializer;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.state.BaseStateUpdater;
-import org.apache.storm.trident.tuple.TridentTuple;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * StateUpdater for SocketState. Serializes tuple one by one and writes to socket, and finally flushes.
- */
-public class SocketStateUpdater extends BaseStateUpdater<SocketState> {
-    private static final Logger LOG = LoggerFactory.getLogger(SocketStateUpdater.class);
-
-    private final IOutputSerializer outputSerializer;
-
-    public SocketStateUpdater(IOutputSerializer outputSerializer) {
-        this.outputSerializer = outputSerializer;
-    }
-
-    @Override
-    public void updateState(SocketState state, List<TridentTuple> tuples, TridentCollector collector) {
-        try {
-            for (TridentTuple tuple : tuples) {
-                byte[] array = outputSerializer.write(tuple.getValues(), null).array();
-                String data = new String(array);
-                state.write(data + "\n");
-            }
-            state.flush();
-        } catch (IOException e) {
-            LOG.error("Error while updating state.", e);
-            collector.reportError(e);
-            throw new RuntimeException("Error while updating state.", e);
-        }
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/TridentSocketSpout.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/TridentSocketSpout.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/TridentSocketSpout.java
deleted file mode 100644
index aad42fb..0000000
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/trident/TridentSocketSpout.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.sql.runtime.datasource.socket.trident;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import java.io.BufferedReader;
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.net.Socket;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingDeque;
-import java.util.concurrent.LinkedBlockingDeque;
-
-import org.apache.storm.Config;
-import org.apache.storm.spout.Scheme;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.spout.IBatchSpout;
-import org.apache.storm.tuple.Fields;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Trident Spout for Socket data. Only available for Storm SQL, and only use for test purposes.
- */
-public class TridentSocketSpout implements IBatchSpout {
-    private static final Logger LOG = LoggerFactory.getLogger(TridentSocketSpout.class);
-
-    private final String host;
-    private final int port;
-    private final Scheme scheme;
-
-    private volatile boolean running = true;
-
-    private BlockingDeque<String> queue;
-    private Socket socket;
-    private Thread readerThread;
-    private BufferedReader in;
-    private ObjectMapper objectMapper;
-
-    private Map<Long, List<List<Object>>> batches;
-
-    /**
-     * TridentSocketSpout Constructor.
-     * @param scheme Scheme
-     * @param host socket host
-     * @param port socket port
-     */
-    public TridentSocketSpout(Scheme scheme, String host, int port) {
-        this.scheme = scheme;
-        this.host = host;
-        this.port = port;
-    }
-
-    @Override
-    public void open(Map<String, Object> conf, TopologyContext context) {
-        this.queue = new LinkedBlockingDeque<>();
-        this.objectMapper = new ObjectMapper();
-        this.batches = new HashMap<>();
-
-        try {
-            socket = new Socket(host, port);
-            in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
-        } catch (IOException e) {
-            throw new RuntimeException("Error opening socket: host " + host + " port " + port);
-        }
-
-        readerThread = new Thread(new SocketReaderRunnable());
-        readerThread.start();
-    }
-
-    @Override
-    public void emitBatch(long batchId, TridentCollector collector) {
-        // read line and parse it to json
-        List<List<Object>> batch = this.batches.get(batchId);
-        if (batch == null) {
-            batch = new ArrayList<>();
-
-            while (queue.peek() != null) {
-                String line = queue.poll();
-                List<Object> values = convertLineToTuple(line);
-                if (values == null) {
-                    continue;
-                }
-
-                batch.add(values);
-            }
-
-            this.batches.put(batchId, batch);
-        }
-
-        for (List<Object> list : batch) {
-            collector.emit(list);
-        }
-    }
-
-    private List<Object> convertLineToTuple(String line) {
-        return scheme.deserialize(ByteBuffer.wrap(line.getBytes()));
-    }
-
-    @Override
-    public void ack(long batchId) {
-        this.batches.remove(batchId);
-    }
-
-    @Override
-    public void close() {
-        running = false;
-        readerThread.interrupt();
-        queue.clear();
-
-        closeQuietly(in);
-        closeQuietly(socket);
-    }
-
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-        Config conf = new Config();
-        conf.setMaxTaskParallelism(1);
-        return conf;
-    }
-
-    @Override
-    public Fields getOutputFields() {
-        return scheme.getOutputFields();
-    }
-
-    private class SocketReaderRunnable implements Runnable {
-        public void run() {
-            while (running) {
-                try {
-                    String line = in.readLine();
-                    if (line == null) {
-                        throw new RuntimeException("EOF reached from the socket. We can't read the data any more.");
-                    }
-
-                    queue.push(line.trim());
-                } catch (Throwable t) {
-                    // This spout is added to test purpose, so just failing fast doesn't hurt much
-                    die(t);
-                }
-            }
-        }
-    }
-
-    private void die(Throwable t) {
-        LOG.error("Halting process: TridentSocketSpout died.", t);
-        if (running || (t instanceof Error)) { //don't exit if not running, unless it is an Error
-            System.exit(11);
-        }
-    }
-
-    private void closeQuietly(final Closeable closeable) {
-        try {
-            if (closeable != null) {
-                closeable.close();
-            }
-        } catch (final IOException ioe) {
-            // ignore
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/streams/functions/EvaluationCalc.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/streams/functions/EvaluationCalc.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/streams/functions/EvaluationCalc.java
new file mode 100644
index 0000000..215e47f
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/streams/functions/EvaluationCalc.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.runtime.streams.functions;
+
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.calcite.DataContext;
+import org.apache.calcite.interpreter.Context;
+import org.apache.calcite.interpreter.StormContext;
+import org.apache.storm.sql.runtime.calcite.DebuggableExecutableExpression;
+import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
+import org.apache.storm.streams.operations.FlatMapFunction;
+import org.apache.storm.tuple.Values;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EvaluationCalc implements FlatMapFunction<Values, Values> {
+    private static final Logger LOG = LoggerFactory.getLogger(EvaluationCalc.class);
+
+    private final ExecutableExpression filterInstance;
+    private final ExecutableExpression projectionInstance;
+    private final Object[] outputValues;
+    private final DataContext dataContext;
+
+    /**
+     * EvaluationCalc Constructor.
+     * @param filterInstance ExecutableExpression
+     * @param projectionInstance ExecutableExpression
+     * @param outputCount output count
+     * @param dataContext DataContext
+     */
+    public EvaluationCalc(ExecutableExpression filterInstance, ExecutableExpression projectionInstance,
+                          int outputCount, DataContext dataContext) {
+        this.filterInstance = filterInstance;
+        this.projectionInstance = projectionInstance;
+        this.outputValues = new Object[outputCount];
+        this.dataContext = dataContext;
+
+        if (projectionInstance != null && projectionInstance instanceof DebuggableExecutableExpression) {
+            LOG.info("Expression code for projection: \n{}", ((DebuggableExecutableExpression) projectionInstance).getDelegateCode());
+        }
+        if (filterInstance != null && filterInstance instanceof DebuggableExecutableExpression) {
+            LOG.info("Expression code for filter: \n{}", ((DebuggableExecutableExpression) filterInstance).getDelegateCode());
+        }
+    }
+
+    @Override
+    public Iterable<Values> apply(Values input) {
+        Context calciteContext = new StormContext(dataContext);
+        calciteContext.values = input.toArray();
+
+        if (filterInstance != null) {
+            filterInstance.execute(calciteContext, outputValues);
+            // filtered out
+            if (outputValues[0] == null || !((Boolean) outputValues[0])) {
+                return Collections.emptyList();
+            }
+        }
+
+        if (projectionInstance != null) {
+            projectionInstance.execute(calciteContext, outputValues);
+            return Collections.singletonList(new Values(outputValues));
+        } else {
+            return Collections.singletonList(new Values(input.toArray()));
+        }
+    }
+}

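EvaluationCalc is the fused filter-plus-projection operator; it returns zero or one Values per input, which maps naturally onto flatMap. A wiring sketch (the helper is illustrative; the compiler backend produces the equivalent):

    import org.apache.storm.sql.runtime.streams.functions.EvaluationCalc;
    import org.apache.storm.streams.Stream;
    import org.apache.storm.tuple.Values;

    public class CalcWiringSketch {
        static Stream<Values> applyCalc(Stream<Values> stream, EvaluationCalc calc) {
            // filter and project in one pass: an empty iterable means the row was filtered out
            return stream.flatMap(calc);
        }
    }

Note that the expression-code logging the Trident version did in prepare() now happens in the constructor, since a plain FlatMapFunction has no prepare hook.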
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/streams/functions/EvaluationFilter.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/streams/functions/EvaluationFilter.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/streams/functions/EvaluationFilter.java
new file mode 100644
index 0000000..c3237fe
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/streams/functions/EvaluationFilter.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.runtime.streams.functions;
+
+import org.apache.calcite.DataContext;
+import org.apache.calcite.interpreter.Context;
+import org.apache.calcite.interpreter.StormContext;
+import org.apache.storm.sql.runtime.calcite.DebuggableExecutableExpression;
+import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
+import org.apache.storm.streams.operations.Predicate;
+import org.apache.storm.tuple.Values;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EvaluationFilter implements Predicate<Values> {
+    private static final Logger LOG = LoggerFactory.getLogger(EvaluationFilter.class);
+
+    private final ExecutableExpression filterInstance;
+    private final DataContext dataContext;
+    private final Object[] outputValues;
+
+    /**
+     * EvaluationFilter Constructor.
+     * @param filterInstance ExecutableExpression
+     * @param dataContext DataContext
+     */
+    public EvaluationFilter(ExecutableExpression filterInstance, DataContext dataContext) {
+        this.filterInstance = filterInstance;
+        this.dataContext = dataContext;
+        this.outputValues = new Object[1];
+
+        if (filterInstance != null && filterInstance instanceof DebuggableExecutableExpression) {
+            LOG.info("Expression code for filter: \n{}", ((DebuggableExecutableExpression) filterInstance).getDelegateCode());
+        }
+    }
+
+    @Override
+    public boolean test(Values input) {
+        Context calciteContext = new StormContext(dataContext);
+        calciteContext.values = input.toArray();
+        filterInstance.execute(calciteContext, outputValues);
+        return (outputValues[0] != null && (boolean) outputValues[0]);
+    }
+}

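EvaluationFilter backs a standalone WHERE: a null or false result drops the row. A wiring sketch (helper illustrative):

    import org.apache.storm.sql.runtime.streams.functions.EvaluationFilter;
    import org.apache.storm.streams.Stream;
    import org.apache.storm.tuple.Values;

    public class FilterWiringSketch {
        static Stream<Values> applyWhere(Stream<Values> stream, EvaluationFilter filter) {
            return stream.filter(filter);  // keeps only rows whose predicate evaluates to true
        }
    }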
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/streams/functions/EvaluationFunction.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/streams/functions/EvaluationFunction.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/streams/functions/EvaluationFunction.java
new file mode 100644
index 0000000..b10b4e8
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/streams/functions/EvaluationFunction.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.runtime.streams.functions;
+
+import java.util.Map;
+
+import org.apache.calcite.DataContext;
+import org.apache.calcite.interpreter.Context;
+import org.apache.calcite.interpreter.StormContext;
+import org.apache.storm.sql.runtime.calcite.DebuggableExecutableExpression;
+import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
+import org.apache.storm.streams.operations.Function;
+import org.apache.storm.tuple.Values;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EvaluationFunction implements Function<Values, Values> {
+    private static final Logger LOG = LoggerFactory.getLogger(EvaluationFunction.class);
+
+    private final ExecutableExpression projectionInstance;
+    private final Object[] outputValues;
+    private final DataContext dataContext;
+
+    /**
+     * EvaluationFunction Constructor.
+     * @param projectionInstance ExecutableExpression
+     * @param outputCount output count
+     * @param dataContext DataContext
+     */
+    public EvaluationFunction(ExecutableExpression projectionInstance, int outputCount, DataContext dataContext) {
+        this.projectionInstance = projectionInstance;
+        this.outputValues = new Object[outputCount];
+        this.dataContext = dataContext;
+
+        if (projectionInstance instanceof DebuggableExecutableExpression) {
+            LOG.info("Expression code: {}", ((DebuggableExecutableExpression) projectionInstance).getDelegateCode());
+        }
+    }
+
+    @Override
+    public Values apply(Values input) {
+        Context calciteContext = new StormContext(dataContext);
+        calciteContext.values = input.toArray();
+        projectionInstance.execute(calciteContext, outputValues);
+        return new Values(outputValues);
+    }
+}

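EvaluationFunction backs a pure projection: exactly one output row per input row, so it plugs into map. A wiring sketch (helper illustrative):

    import org.apache.storm.sql.runtime.streams.functions.EvaluationFunction;
    import org.apache.storm.streams.Stream;
    import org.apache.storm.tuple.Values;

    public class ProjectWiringSketch {
        static Stream<Values> applyProject(Stream<Values> stream, EvaluationFunction projection) {
            return stream.map(projection);  // one projected row out per row in
        }
    }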
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/streams/functions/StreamInsertMapToPairFunction.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/streams/functions/StreamInsertMapToPairFunction.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/streams/functions/StreamInsertMapToPairFunction.java
new file mode 100644
index 0000000..fd5035a
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/streams/functions/StreamInsertMapToPairFunction.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.runtime.streams.functions;
+
+import org.apache.storm.streams.Pair;
+import org.apache.storm.streams.operations.PairFunction;
+import org.apache.storm.tuple.Values;
+
+public class StreamInsertMapToPairFunction implements PairFunction<Values, Object, Values> {
+
+    private final int primaryKeyIndex;
+
+    public StreamInsertMapToPairFunction(int primaryKeyIndex) {
+        this.primaryKeyIndex = primaryKeyIndex;
+    }
+
+    @Override
+    public Pair<Object, Values> apply(Values input) {
+        return Pair.of(input.get(primaryKeyIndex), input);
+    }
+}
\ No newline at end of file

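StreamInsertMapToPairFunction keys each row by its primary-key column so the insert path can operate on a PairStream. A wiring sketch (helper illustrative):

    import org.apache.storm.sql.runtime.streams.functions.StreamInsertMapToPairFunction;
    import org.apache.storm.streams.PairStream;
    import org.apache.storm.streams.Stream;
    import org.apache.storm.tuple.Values;

    public class KeyByPrimaryKeySketch {
        static PairStream<Object, Values> keyByPk(Stream<Values> stream, int pkIndex) {
            return stream.mapToPair(new StreamInsertMapToPairFunction(pkIndex));
        }
    }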
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/streams/functions/StreamsScanTupleValueMapper.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/streams/functions/StreamsScanTupleValueMapper.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/streams/functions/StreamsScanTupleValueMapper.java
new file mode 100644
index 0000000..db5c5da
--- /dev/null
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/streams/functions/StreamsScanTupleValueMapper.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.runtime.streams.functions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.storm.streams.operations.mappers.TupleValueMapper;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+
+public class StreamsScanTupleValueMapper implements TupleValueMapper<Values> {
+    private final List<String> fieldNames;
+
+    /**
+     * Constructor.
+     *
+     * @param fieldNames names of the fields to extract from each input tuple.
+     */
+    public StreamsScanTupleValueMapper(List<String> fieldNames) {
+        // prevent issue when the implementation of fieldNames is not serializable
+        // getRowType().getFieldNames() returns Calcite Pair$ which is NOT serializable
+        this.fieldNames = new ArrayList<>(fieldNames);
+    }
+
+    @Override
+    public Values apply(Tuple input) {
+        Values values = new Values();
+        for (String field : fieldNames) {
+            values.add(input.getValueByField(field));
+        }
+        return values;
+    }
+}
\ No newline at end of file

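StreamsScanTupleValueMapper sits at the table-scan boundary, converting raw Tuples to Values in declared-field order as the stream is created. A wiring sketch (the field names are hypothetical):

    import java.util.Arrays;

    import org.apache.storm.sql.runtime.streams.functions.StreamsScanTupleValueMapper;
    import org.apache.storm.streams.Stream;
    import org.apache.storm.streams.StreamBuilder;
    import org.apache.storm.topology.IRichSpout;
    import org.apache.storm.tuple.Values;

    public class TableScanSketch {
        static Stream<Values> scan(StreamBuilder builder, IRichSpout producer) {
            return builder.newStream(producer,
                    new StreamsScanTupleValueMapper(Arrays.asList("ID", "NAME")));
        }
    }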
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationCalc.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationCalc.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationCalc.java
deleted file mode 100644
index 1b2f250..0000000
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationCalc.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.storm.sql.runtime.trident.functions;
-
-import java.util.Collections;
-import java.util.Map;
-
-import org.apache.calcite.DataContext;
-import org.apache.calcite.interpreter.Context;
-import org.apache.calcite.interpreter.StormContext;
-import org.apache.storm.sql.runtime.calcite.DebuggableExecutableExpression;
-import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
-import org.apache.storm.trident.operation.OperationAwareFlatMapFunction;
-import org.apache.storm.trident.operation.TridentOperationContext;
-import org.apache.storm.trident.tuple.TridentTuple;
-import org.apache.storm.tuple.Values;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class EvaluationCalc implements OperationAwareFlatMapFunction {
-    private static final Logger LOG = LoggerFactory.getLogger(EvaluationCalc.class);
-
-    private final ExecutableExpression filterInstance;
-    private final ExecutableExpression projectionInstance;
-    private final Object[] outputValues;
-    private final DataContext dataContext;
-
-    /**
-     * EvaluationCalc Constructor.
-     * @param filterInstance ExecutableExpression
-     * @param projectionInstance ExecutableExpression
-     * @param outputCount output count
-     * @param dataContext DataContext
-     */
-    public EvaluationCalc(ExecutableExpression filterInstance, ExecutableExpression projectionInstance,
-                          int outputCount, DataContext dataContext) {
-        this.filterInstance = filterInstance;
-        this.projectionInstance = projectionInstance;
-        this.outputValues = new Object[outputCount];
-        this.dataContext = dataContext;
-    }
-
-    @Override
-    public void prepare(Map<String, Object> conf, TridentOperationContext context) {
-        if (projectionInstance != null && projectionInstance instanceof DebuggableExecutableExpression) {
-            LOG.info("Expression code for projection: \n{}", ((DebuggableExecutableExpression) projectionInstance).getDelegateCode());
-        }
-        if (filterInstance != null && filterInstance instanceof DebuggableExecutableExpression) {
-            LOG.info("Expression code for filter: \n{}", ((DebuggableExecutableExpression) filterInstance).getDelegateCode());
-        }
-    }
-
-    @Override
-    public void cleanup() {
-
-    }
-
-    @Override
-    public Iterable<Values> execute(TridentTuple input) {
-        Context calciteContext = new StormContext(dataContext);
-        calciteContext.values = input.getValues().toArray();
-
-        if (filterInstance != null) {
-            filterInstance.execute(calciteContext, outputValues);
-            // filtered out
-            if (outputValues[0] == null || !((Boolean) outputValues[0])) {
-                return Collections.emptyList();
-            }
-        }
-
-        if (projectionInstance != null) {
-            projectionInstance.execute(calciteContext, outputValues);
-            return Collections.singletonList(new Values(outputValues));
-        } else {
-            return Collections.singletonList(new Values(input.getValues()));
-        }
-    }
-}

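With the Trident layer removed, the combined filter-and-project evaluator above goes away. Its per-tuple logic maps naturally onto the Streams API's FlatMapFunction; the replacement this patch adds elsewhere should look roughly like the sketch below (illustrative only: the class and package names are assumptions mirroring the deleted trident ones, and the delegate-code logging is omitted since a plain streams operation has no prepare() hook).

package org.apache.storm.sql.runtime.streams.functions;

import java.util.Collections;

import org.apache.calcite.DataContext;
import org.apache.calcite.interpreter.Context;
import org.apache.calcite.interpreter.StormContext;
import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
import org.apache.storm.streams.operations.FlatMapFunction;
import org.apache.storm.tuple.Values;

// Sketch of a Streams counterpart to the deleted Trident EvaluationCalc.
public class StreamsEvaluationCalc implements FlatMapFunction<Values, Values> {
    private final ExecutableExpression filterInstance;
    private final ExecutableExpression projectionInstance;
    private final Object[] outputValues;
    private final DataContext dataContext;

    public StreamsEvaluationCalc(ExecutableExpression filterInstance, ExecutableExpression projectionInstance,
                                 int outputCount, DataContext dataContext) {
        this.filterInstance = filterInstance;
        this.projectionInstance = projectionInstance;
        this.outputValues = new Object[outputCount];
        this.dataContext = dataContext;
    }

    @Override
    public Iterable<Values> apply(Values input) {
        // Same evaluation order as the Trident version: filter first, then project.
        Context calciteContext = new StormContext(dataContext);
        calciteContext.values = input.toArray();    // Values extends ArrayList<Object>

        if (filterInstance != null) {
            filterInstance.execute(calciteContext, outputValues);
            if (outputValues[0] == null || !((Boolean) outputValues[0])) {
                return Collections.emptyList();     // filtered out
            }
        }

        if (projectionInstance != null) {
            projectionInstance.execute(calciteContext, outputValues);
            return Collections.singletonList(new Values(outputValues));
        }
        return Collections.singletonList(new Values(input.toArray()));
    }
}
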
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFilter.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFilter.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFilter.java
deleted file mode 100644
index 83e187c..0000000
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFilter.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.storm.sql.runtime.trident.functions;
-
-import java.util.Map;
-
-import org.apache.calcite.DataContext;
-import org.apache.calcite.interpreter.Context;
-import org.apache.calcite.interpreter.StormContext;
-import org.apache.storm.sql.runtime.calcite.DebuggableExecutableExpression;
-import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
-import org.apache.storm.trident.operation.BaseFilter;
-import org.apache.storm.trident.operation.TridentOperationContext;
-import org.apache.storm.trident.tuple.TridentTuple;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class EvaluationFilter extends BaseFilter {
-    private static final Logger LOG = LoggerFactory.getLogger(EvaluationFilter.class);
-
-    private final ExecutableExpression filterInstance;
-    private final DataContext dataContext;
-    private final Object[] outputValues;
-
-    /**
-     * EvaluationFilter Constructor.
-     * @param filterInstance ExecutableExpression
-     * @param dataContext DataContext
-     */
-    public EvaluationFilter(ExecutableExpression filterInstance, DataContext dataContext) {
-        this.filterInstance = filterInstance;
-        this.dataContext = dataContext;
-        this.outputValues = new Object[1];
-    }
-
-    @Override
-    public void prepare(Map<String, Object> conf, TridentOperationContext context) {
-        if (filterInstance != null && filterInstance instanceof DebuggableExecutableExpression) {
-            LOG.info("Expression code for filter: \n{}", ((DebuggableExecutableExpression) filterInstance).getDelegateCode());
-        }
-    }
-
-    @Override
-    public boolean isKeep(TridentTuple tuple) {
-        Context calciteContext = new StormContext(dataContext);
-        calciteContext.values = tuple.getValues().toArray();
-        filterInstance.execute(calciteContext, outputValues);
-        return (outputValues[0] != null && (boolean) outputValues[0]);
-    }
-}

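The standalone filter translates the same way, onto the Streams Predicate interface (boolean test(T)). A minimal sketch under the same assumptions, with a hypothetical class name:

import org.apache.calcite.DataContext;
import org.apache.calcite.interpreter.Context;
import org.apache.calcite.interpreter.StormContext;
import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
import org.apache.storm.streams.operations.Predicate;
import org.apache.storm.tuple.Values;

// Sketch of a Streams counterpart to the deleted Trident EvaluationFilter.
public class StreamsEvaluationFilter implements Predicate<Values> {
    private final ExecutableExpression filterInstance;
    private final DataContext dataContext;

    public StreamsEvaluationFilter(ExecutableExpression filterInstance, DataContext dataContext) {
        this.filterInstance = filterInstance;
        this.dataContext = dataContext;
    }

    @Override
    public boolean test(Values input) {
        Context calciteContext = new StormContext(dataContext);
        calciteContext.values = input.toArray();
        Object[] result = new Object[1];
        filterInstance.execute(calciteContext, result);
        // Keep the tuple only when the compiled filter expression evaluates to TRUE.
        return result[0] != null && (boolean) result[0];
    }
}

Allocating the one-slot result array per call, instead of reusing a shared outputValues field as the Trident version did, keeps the operation free of mutable shared state at negligible cost.
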
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFunction.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFunction.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFunction.java
deleted file mode 100644
index 340492d..0000000
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFunction.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.storm.sql.runtime.trident.functions;
-
-import java.util.Map;
-
-import org.apache.calcite.DataContext;
-import org.apache.calcite.interpreter.Context;
-import org.apache.calcite.interpreter.StormContext;
-import org.apache.storm.sql.runtime.calcite.DebuggableExecutableExpression;
-import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
-import org.apache.storm.trident.operation.OperationAwareMapFunction;
-import org.apache.storm.trident.operation.TridentOperationContext;
-import org.apache.storm.trident.tuple.TridentTuple;
-import org.apache.storm.tuple.Values;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class EvaluationFunction implements OperationAwareMapFunction {
-    private static final Logger LOG = LoggerFactory.getLogger(EvaluationFunction.class);
-
-    private final ExecutableExpression projectionInstance;
-    private final Object[] outputValues;
-    private final DataContext dataContext;
-
-    /**
-     * EvaluationFunction Constructor.
-     * @param projectionInstance ExecutableExpression
-     * @param outputCount output count
-     * @param dataContext DataContext
-     */
-    public EvaluationFunction(ExecutableExpression projectionInstance, int outputCount, DataContext dataContext) {
-        this.projectionInstance = projectionInstance;
-        this.outputValues = new Object[outputCount];
-        this.dataContext = dataContext;
-    }
-
-    @Override
-    public void prepare(Map<String, Object> conf, TridentOperationContext context) {
-        if (projectionInstance instanceof DebuggableExecutableExpression) {
-            LOG.info("Expression code: {}", ((DebuggableExecutableExpression) projectionInstance).getDelegateCode());
-        }
-    }
-
-    @Override
-    public void cleanup() {
-
-    }
-
-    @Override
-    public Values execute(TridentTuple input) {
-        Context calciteContext = new StormContext(dataContext);
-        calciteContext.values = input.getValues().toArray();
-        projectionInstance.execute(calciteContext, outputValues);
-        return new Values(outputValues);
-    }
-}

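The projection-only evaluator, in turn, is just a Streams Function<Values, Values>. Again a sketch, same assumptions:

import org.apache.calcite.DataContext;
import org.apache.calcite.interpreter.Context;
import org.apache.calcite.interpreter.StormContext;
import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
import org.apache.storm.streams.operations.Function;
import org.apache.storm.tuple.Values;

// Sketch of a Streams counterpart to the deleted Trident EvaluationFunction.
public class StreamsEvaluationFunction implements Function<Values, Values> {
    private final ExecutableExpression projectionInstance;
    private final Object[] outputValues;
    private final DataContext dataContext;

    public StreamsEvaluationFunction(ExecutableExpression projectionInstance, int outputCount, DataContext dataContext) {
        this.projectionInstance = projectionInstance;
        this.outputValues = new Object[outputCount];
        this.dataContext = dataContext;
    }

    @Override
    public Values apply(Values input) {
        Context calciteContext = new StormContext(dataContext);
        calciteContext.values = input.toArray();
        projectionInstance.execute(calciteContext, outputValues);
        return new Values(outputValues);
    }
}
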
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/ForwardFunction.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/ForwardFunction.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/ForwardFunction.java
deleted file mode 100644
index cb12ab3..0000000
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/ForwardFunction.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.storm.sql.runtime.trident.functions;
-
-import org.apache.storm.trident.operation.BaseFunction;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.tuple.TridentTuple;
-
-public class ForwardFunction extends BaseFunction {
-    @Override
-    public void execute(TridentTuple tuple, TridentCollector collector) {
-        collector.emit(tuple.getValues());
-    }
-}

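ForwardFunction needs no replacement at all: it existed only to re-emit tuple values unchanged between Trident stages, and a Stream already passes its values through untouched unless an operation is applied. If an explicit pass-through stage were ever wanted, an identity map would do, given some Stream<Values> named stream (illustrative):

// org.apache.storm.streams.operations.Function has a single abstract method,
// so the identity pass-through is just a lambda.
Stream<Values> forwarded = stream.map(input -> input);
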
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java b/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
index 4c945fa..f728bf6 100644
--- a/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
+++ b/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
@@ -1,38 +1,23 @@
 /*
- * *
- *  * Licensed to the Apache Software Foundation (ASF) under one
- *  * or more contributor license agreements.  See the NOTICE file
- *  * distributed with this work for additional information
- *  * regarding copyright ownership.  The ASF licenses this file
- *  * to you under the Apache License, Version 2.0 (the
- *  * "License"); you may not use this file except in compliance
- *  * with the License.  You may obtain a copy of the License at
- *  * <p>
- *  * http://www.apache.org/licenses/LICENSE-2.0
- *  * <p>
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  *
  */
 package org.apache.storm.sql;
 
-import org.apache.storm.sql.runtime.ISqlTridentDataSource;
-import org.apache.storm.sql.runtime.SimpleSqlTridentConsumer;
-import org.apache.storm.task.IMetricsContext;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.operation.TridentOperationContext;
-import org.apache.storm.trident.spout.IBatchSpout;
-import org.apache.storm.trident.state.State;
-import org.apache.storm.trident.state.StateFactory;
-import org.apache.storm.trident.state.StateUpdater;
-import org.apache.storm.trident.tuple.TridentTuple;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -41,7 +26,36 @@ import java.util.List;
 import java.util.Map;
 import java.util.PriorityQueue;
 
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.sql.runtime.ISqlStreamsDataSource;
+import org.apache.storm.streams.Pair;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.junit.rules.ExternalResource;
+
 public class TestUtils {
+  public static final ExternalResource mockInsertBoltValueResource = new ExternalResource() {
+    @Override
+    protected void before() throws Throwable {
+      MockInsertBolt.getCollectedValues().clear();
+    }
+  };
+
+  public static final ExternalResource mockBoltValueResource = new ExternalResource() {
+    @Override
+    protected void before() throws Throwable {
+      MockBolt.getCollectedValues().clear();
+    }
+  };
+
   public static class MyPlus {
     public static Integer evaluate(Integer x, Integer y) {
       return x + y;
@@ -85,428 +99,264 @@ public class TestUtils {
     }
   }
 
-  public static class MockState implements State {
-    /**
-     * Collect all values in a static variable as the instance will go through serialization and deserialization.
-     * NOTE: This should be cleared before or after running each test.
-     */
-    private transient static final List<List<Object> > VALUES = new ArrayList<>();
+  public static class MockSpout extends BaseRichSpout {
 
-    public static List<List<Object>> getCollectedValues() {
-      return VALUES;
+    private final List<Values> records;
+    private final Fields outputFields;
+    private boolean emitted = false;
+    private SpoutOutputCollector collector;
+
+    public MockSpout(List<Values> records, Fields outputFields) {
+      this.records = records;
+      this.outputFields = outputFields;
     }
 
     @Override
-    public void beginCommit(Long txid) {
-      // NOOP
+    public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
+      this.collector = collector;
     }
 
     @Override
-    public void commit(Long txid) {
-      // NOOP
-    }
+    public void nextTuple() {
+      if (emitted) {
+        return;
+      }
 
-    public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
-      for (TridentTuple tuple : tuples) {
-        VALUES.add(tuple.getValues());
+      for (Values r : records) {
+        collector.emit(r);
       }
-    }
-  }
 
-  public static class MockStateFactory implements StateFactory {
+      emitted = true;
+    }
 
     @Override
-    public State makeState(Map<String, Object> conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
-      return new MockState();
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+      declarer.declare(outputFields);
     }
   }
 
-  public static class MockStateUpdater implements StateUpdater<MockState> {
+  public static class MockBolt extends BaseRichBolt {
+    /**
+     * Collect all values in a static variable as the instance will go through serialization and deserialization.
+     * NOTE: This should be cleared before or after running each test.
+     */
+    private transient static final List<Values> VALUES = new ArrayList<>();
 
-    @Override
-    public void updateState(MockState state, List<TridentTuple> tuples, TridentCollector collector) {
-      state.updateState(tuples, collector);
+    public static List<Values> getCollectedValues() {
+      return VALUES;
     }
 
     @Override
-    public void prepare(Map<String, Object> conf, TridentOperationContext context) {
-      // NOOP
-    }
+    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
 
-    @Override
-    public void cleanup() {
-      // NOOP
     }
-  }
 
-  public static class MockSqlExprDataSource implements ISqlTridentDataSource {
     @Override
-    public IBatchSpout getProducer() {
-      return new MockSqlExprDataSource.MockSpout();
+    public void execute(Tuple input) {
+      VALUES.add((Values) input.getValue(0));
     }
 
     @Override
-    public SqlTridentConsumer getConsumer() {
-      return new SimpleSqlTridentConsumer(new MockStateFactory(), new MockStateUpdater());
-    }
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
 
-    private static class MockSpout implements IBatchSpout {
-      private final ArrayList<Values> RECORDS = new ArrayList<>();
-      private final Fields OUTPUT_FIELDS = new Fields("ID", "NAME", "ADDR");
-
-      public MockSpout() {
-        for (int i = 0; i < 5; ++i) {
-          RECORDS.add(new Values(i, "x", null));
-        }
-      }
-
-      private boolean emitted = false;
+    }
+  }
 
-      @Override
-      public void open(Map<String, Object> conf, TopologyContext context) {
-      }
+  public static class MockInsertBolt extends BaseRichBolt {
+    /**
+     * Collect all values in a static variable as the instance will go through serialization and deserialization.
+     * NOTE: This should be cleared before or after running each test.
+     */
+    private transient static final List<Pair<Object, Values>> VALUES = new ArrayList<>();
 
-      @Override
-      public void emitBatch(long batchId, TridentCollector collector) {
-        if (emitted) {
-          return;
-        }
+    public static List<Pair<Object, Values>> getCollectedValues() {
+      return VALUES;
+    }
 
-        for (Values r : RECORDS) {
-          collector.emit(r);
-        }
-        emitted = true;
-      }
+    @Override
+    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
 
-      @Override
-      public void ack(long batchId) {
-      }
+    }
 
-      @Override
-      public void close() {
-      }
+    @Override
+    public void execute(Tuple input) {
+      VALUES.add(Pair.of(input.getValue(0), (Values) input.getValue(1)));
+    }
 
-      @Override
-      public Map<String, Object> getComponentConfiguration() {
-        return null;
-      }
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
 
-      @Override
-      public Fields getOutputFields() {
-        return OUTPUT_FIELDS;
-      }
     }
   }
 
-  public static class MockSqlTridentDataSource implements ISqlTridentDataSource {
+  public static class MockSqlExprDataSource implements ISqlStreamsDataSource {
     @Override
-    public IBatchSpout getProducer() {
-      return new MockSpout();
+    public IRichSpout getProducer() {
+      throw new UnsupportedOperationException("Not supported.");
     }
 
     @Override
-    public SqlTridentConsumer getConsumer() {
-      return new SimpleSqlTridentConsumer(new MockStateFactory(), new MockStateUpdater());
-    }
-
-    private static class MockSpout implements IBatchSpout {
-      private final ArrayList<Values> RECORDS = new ArrayList<>();
-      private final Fields OUTPUT_FIELDS = new Fields("ID", "NAME", "ADDR");
-
-      public MockSpout() {
-        RECORDS.add(new Values(0, "a", "y"));
-        RECORDS.add(new Values(1, "ab", "y"));
-        RECORDS.add(new Values(2, "abc", "y"));
-        RECORDS.add(new Values(3, "abcd", "y"));
-        RECORDS.add(new Values(4, "abcde", "y"));
-      }
-
-      private boolean emitted = false;
-
-      @Override
-      public void open(Map<String, Object> conf, TopologyContext context) {
-      }
-
-      @Override
-      public void emitBatch(long batchId, TridentCollector collector) {
-        if (emitted) {
-          return;
-        }
-
-        for (Values r : RECORDS) {
-          collector.emit(r);
-        }
-        emitted = true;
-      }
-
-      @Override
-      public void ack(long batchId) {
-      }
-
-      @Override
-      public void close() {
-      }
-
-      @Override
-      public Map<String, Object> getComponentConfiguration() {
-        return null;
-      }
-
-      @Override
-      public Fields getOutputFields() {
-        return OUTPUT_FIELDS;
-      }
+    public IRichBolt getConsumer() {
+      return new MockBolt();
     }
   }
 
-  public static class MockSqlTridentGroupedDataSource implements ISqlTridentDataSource {
+  public static class MockSqlStreamsOutputDataSource implements ISqlStreamsDataSource {
+
     @Override
-    public IBatchSpout getProducer() {
-      return new MockGroupedSpout();
+    public IRichSpout getProducer() {
+      throw new UnsupportedOperationException("Not supported.");
     }
 
     @Override
-    public SqlTridentConsumer getConsumer() {
-      return new SimpleSqlTridentConsumer(new MockStateFactory(), new MockStateUpdater());
+    public IRichBolt getConsumer() {
+      return new MockInsertBolt();
     }
 
-    private static class MockGroupedSpout implements IBatchSpout {
-      private final ArrayList<Values> RECORDS = new ArrayList<>();
-      private final Fields OUTPUT_FIELDS = new Fields("ID", "GRPID", "NAME", "ADDR", "AGE", "SCORE");
-
-      public MockGroupedSpout() {
-        for (int i = 0; i < 5; ++i) {
-          RECORDS.add(new Values(i, 0, "x", "y", 5 - i, i * 10));
-        }
-      }
-
-      private boolean emitted = false;
+  }
 
-      @Override
-      public void open(Map<String, Object> conf, TopologyContext context) {
-      }
+  public static class MockSqlStreamsDataSource implements ISqlStreamsDataSource {
+    @Override
+    public IRichSpout getProducer() {
+      List<Values> records = new ArrayList<>();
+      records.add(new Values(0, "a", "y"));
+      records.add(new Values(1, "ab", "y"));
+      records.add(new Values(2, "abc", "y"));
+      records.add(new Values(3, "abcd", "y"));
+      records.add(new Values(4, "abcde", "y"));
 
-      @Override
-      public void emitBatch(long batchId, TridentCollector collector) {
-        if (emitted) {
-          return;
-        }
+      Fields outputFields = new Fields("ID", "NAME", "ADDR");
+      return new MockSpout(records, outputFields);
+    }
 
-        for (Values r : RECORDS) {
-          collector.emit(r);
-        }
-        emitted = true;
-      }
+    @Override
+    public IRichBolt getConsumer() {
+      return new MockBolt();
+    }
 
-      @Override
-      public void ack(long batchId) {
-      }
+  }
 
-      @Override
-      public void close() {
-      }
+  public static class MockSqlStreamsInsertDataSource extends MockSqlStreamsNestedDataSource {
+    @Override
+    public IRichBolt getConsumer() {
+      return new MockInsertBolt();
+    }
+  }
 
-      @Override
-      public Map<String, Object> getComponentConfiguration() {
-        return null;
+  public static class MockSqlStreamsGroupedDataSource implements ISqlStreamsDataSource {
+    @Override
+    public IRichSpout getProducer() {
+      List<Values> records = new ArrayList<>();
+      for (int i = 0; i < 5; ++i) {
+        records.add(new Values(i, 0, "x", "y", 5 - i, i * 10));
       }
 
-      @Override
-      public Fields getOutputFields() {
-        return OUTPUT_FIELDS;
-      }
+      Fields outputFields = new Fields("ID", "GRPID", "NAME", "ADDR", "AGE", "SCORE");
+      return new MockSpout(records, outputFields);
     }
-  }
 
-  public static class MockSqlTridentJoinDataSourceEmp implements ISqlTridentDataSource {
     @Override
-    public IBatchSpout getProducer() {
-      return new MockSpout();
+    public IRichBolt getConsumer() {
+      return new MockBolt();
     }
+  }
 
+  public static class MockSqlStreamsInsertGroupedDataSource extends MockSqlStreamsGroupedDataSource {
     @Override
-    public SqlTridentConsumer getConsumer() {
-      return new SimpleSqlTridentConsumer(new MockStateFactory(), new MockStateUpdater());
+    public IRichBolt getConsumer() {
+      return new MockInsertBolt();
     }
+  }
 
-    private static class MockSpout implements IBatchSpout {
-      private final ArrayList<Values> RECORDS = new ArrayList<>();
-      private final Fields OUTPUT_FIELDS = new Fields("EMPID", "EMPNAME", "DEPTID");
-
-      public MockSpout() {
-        for (int i = 0; i < 5; ++i) {
-          RECORDS.add(new Values(i, "emp-" + i, i % 2));
-        }
-        for (int i = 10; i < 15; ++i) {
-          RECORDS.add(new Values(i, "emp-" + i, i));
-        }
-      }
-
-      private boolean emitted = false;
-
-      @Override
-      public void open(Map<String, Object> conf, TopologyContext context) {
-      }
-
-      @Override
-      public void emitBatch(long batchId, TridentCollector collector) {
-        if (emitted) {
-          return;
-        }
-
-        for (Values r : RECORDS) {
-          collector.emit(r);
-        }
-        emitted = true;
-      }
-
-      @Override
-      public void ack(long batchId) {
-      }
+  public static class MockSqlStreamsJoinDataSourceEmp implements ISqlStreamsDataSource {
+    @Override
+    public IRichSpout getProducer() {
+      List<Values> records = new ArrayList<>();
+      Fields outputFields = new Fields("EMPID", "EMPNAME", "DEPTID");
 
-      @Override
-      public void close() {
+      for (int i = 0; i < 5; ++i) {
+        records.add(new Values(i, "emp-" + i, i % 2));
       }
-
-      @Override
-      public Map<String, Object> getComponentConfiguration() {
-        return null;
+      for (int i = 10; i < 15; ++i) {
+        records.add(new Values(i, "emp-" + i, i));
       }
 
-      @Override
-      public Fields getOutputFields() {
-        return OUTPUT_FIELDS;
-      }
+      return new MockSpout(records, outputFields);
     }
-  }
 
-  public static class MockSqlTridentJoinDataSourceDept implements ISqlTridentDataSource {
     @Override
-    public IBatchSpout getProducer() {
-      return new MockSpout();
+    public IRichBolt getConsumer() {
+      return new MockBolt();
     }
+  }
 
+  public static class MockSqlStreamsInsertJoinDataSourceEmp extends MockSqlStreamsJoinDataSourceEmp {
     @Override
-    public SqlTridentConsumer getConsumer() {
-      return new SimpleSqlTridentConsumer(new MockStateFactory(), new MockStateUpdater());
+    public IRichBolt getConsumer() {
+      return new MockInsertBolt();
     }
+  }
 
-    private static class MockSpout implements IBatchSpout {
-      private final ArrayList<Values> RECORDS = new ArrayList<>();
-      private final Fields OUTPUT_FIELDS = new Fields("DEPTID", "DEPTNAME");
-
-      public MockSpout() {
-        for (int i = 0; i < 5; ++i) {
-          RECORDS.add(new Values(i, "dept-" + i));
-        }
-      }
-
-      private boolean emitted = false;
-
-      @Override
-      public void open(Map<String, Object> conf, TopologyContext context) {
-      }
-
-      @Override
-      public void emitBatch(long batchId, TridentCollector collector) {
-        if (emitted) {
-          return;
-        }
-
-        for (Values r : RECORDS) {
-          collector.emit(r);
-        }
-        emitted = true;
-      }
-
-      @Override
-      public void ack(long batchId) {
-      }
+  public static class MockSqlStreamsJoinDataSourceDept implements ISqlStreamsDataSource {
+    @Override
+    public IRichSpout getProducer() {
+      List<Values> records = new ArrayList<>();
+      Fields outputFields = new Fields("DEPTID", "DEPTNAME");
 
-      @Override
-      public void close() {
+      for (int i = 0; i < 5; ++i) {
+        records.add(new Values(i, "dept-" + i));
       }
 
-      @Override
-      public Map<String, Object> getComponentConfiguration() {
-        return null;
-      }
+      return new MockSpout(records, outputFields);
+    }
 
-      @Override
-      public Fields getOutputFields() {
-        return OUTPUT_FIELDS;
-      }
+    @Override
+    public IRichBolt getConsumer() {
+      return new MockBolt();
     }
   }
 
-  public static class MockSqlTridentNestedDataSource implements ISqlTridentDataSource {
+  public static class MockSqlStreamsInsertJoinDataSourceDept extends MockSqlStreamsJoinDataSourceDept {
     @Override
-    public IBatchSpout getProducer() {
-      return new MockSpout();
+    public IRichBolt getConsumer() {
+      return new MockInsertBolt();
     }
+  }
 
+  public static class MockSqlStreamsNestedDataSource implements ISqlStreamsDataSource {
     @Override
-    public SqlTridentConsumer getConsumer() {
-      return new SimpleSqlTridentConsumer(new MockStateFactory(), new MockStateUpdater());
-    }
-
-    private static class MockSpout implements IBatchSpout {
-      private final ArrayList<Values> RECORDS = new ArrayList<>();
-      private final Fields OUTPUT_FIELDS = new Fields("ID", "MAPFIELD", "NESTEDMAPFIELD", "ARRAYFIELD");
-
-      public MockSpout() {
-        List<Integer> ints = Arrays.asList(100, 200, 300);
-        for (int i = 0; i < 5; ++i) {
-          Map<String, Integer> map = new HashMap<>();
-          map.put("b", i);
-          map.put("c", i*i);
-          Map<String, Map<String, Integer>> mm = new HashMap<>();
-          mm.put("a", map);
-          RECORDS.add(new Values(i, map, mm, ints));
-        }
-      }
-
-      private boolean emitted = false;
-
-      @Override
-      public void open(Map<String, Object> conf, TopologyContext context) {
-      }
-
-      @Override
-      public void emitBatch(long batchId, TridentCollector collector) {
-        if (emitted) {
-          return;
-        }
-
-        for (Values r : RECORDS) {
-          collector.emit(r);
-        }
-        emitted = true;
-      }
-
-      @Override
-      public void ack(long batchId) {
-      }
+    public IRichSpout getProducer() {
+      List<Values> records = new ArrayList<>();
+      Fields outputFields = new Fields("ID", "MAPFIELD", "NESTEDMAPFIELD", "ARRAYFIELD");
 
-      @Override
-      public void close() {
+      List<Integer> ints = Arrays.asList(100, 200, 300);
+      for (int i = 0; i < 5; ++i) {
+        Map<String, Integer> map = new HashMap<>();
+        map.put("b", i);
+        map.put("c", i*i);
+        Map<String, Map<String, Integer>> mm = new HashMap<>();
+        mm.put("a", map);
+        records.add(new Values(i, map, mm, ints));
       }
 
-      @Override
-      public Map<String, Object> getComponentConfiguration() {
-        return null;
-      }
+      return new MockSpout(records, outputFields);
+    }
 
-      @Override
-      public Fields getOutputFields() {
-        return OUTPUT_FIELDS;
-      }
+    @Override
+    public IRichBolt getConsumer() {
+      return new MockBolt();
     }
   }
 
+  public static class MockSqlStreamsInsertNestedDataSource extends MockSqlStreamsNestedDataSource {
+    @Override
+    public IRichBolt getConsumer() {
+      return new MockInsertBolt();
+    }
+  }
 
   public static long monotonicNow() {
     final long NANOSECONDS_PER_MILLISECOND = 1000000;
     return System.nanoTime() / NANOSECONDS_PER_MILLISECOND;
   }
-}
+}
\ No newline at end of file
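
A note on how the reworked mocks above are meant to be used: MockBolt and MockInsertBolt record into static lists (instances go through serialization and deserialization inside a topology, so instance fields would be lost), which is why each test must clear them via the ExternalResource rules declared at the top of the class. A hypothetical JUnit 4 skeleton, where runStormSql() stands in for whatever helper compiles and runs the statement against a registered MockSqlStreamsDataSource:

package org.apache.storm.sql;

import static org.junit.Assert.assertEquals;

import java.util.List;

import org.apache.storm.tuple.Values;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExternalResource;

public class StormSqlStreamsExampleTest {
    @Rule
    public final ExternalResource clearValues = TestUtils.mockBoltValueResource;  // clears MockBolt before each test

    @Test
    public void filterKeepsMatchingRows() throws Exception {
        runStormSql("SELECT ID FROM FOO WHERE ID > 2");

        List<Values> values = TestUtils.MockBolt.getCollectedValues();
        assertEquals(2, values.size());    // IDs 3 and 4 survive the WHERE clause
    }

    // Assumed helper, not part of this patch: compile the statement with
    // StormSql and run it on a local cluster, with table FOO backed by
    // TestUtils.MockSqlStreamsDataSource.
    private void runStormSql(String sql) throws Exception {
        throw new UnsupportedOperationException("illustrative placeholder");
    }
}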