You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/04/05 23:19:10 UTC

[07/23] storm git commit: STORM-2453 Move non-connectors into the top directory

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-wrappers/src/main/java/org/apache/storm/flux/wrappers/bolts/FluxShellBolt.java
----------------------------------------------------------------------
diff --git a/flux/flux-wrappers/src/main/java/org/apache/storm/flux/wrappers/bolts/FluxShellBolt.java b/flux/flux-wrappers/src/main/java/org/apache/storm/flux/wrappers/bolts/FluxShellBolt.java
new file mode 100644
index 0000000..05b8e7a
--- /dev/null
+++ b/flux/flux-wrappers/src/main/java/org/apache/storm/flux/wrappers/bolts/FluxShellBolt.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.flux.wrappers.bolts;
+
+import org.apache.storm.task.ShellBolt;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A generic `ShellBolt` implementation that allows you to specify output fields
+ * and even streams without having to subclass `ShellBolt` to do so.
+ *
+ */
+public class FluxShellBolt extends ShellBolt implements IRichBolt {
+    private Map<String, String[]> outputFields;
+    private Map<String, Object> componentConfig;
+
+    /**
+     * Create a ShellBolt with command line arguments
+     * @param command Command line arguments for the bolt
+     */
+    public FluxShellBolt(String[] command){
+        super(command);
+        this.outputFields = new HashMap<String, String[]>();
+    }
+
+    /**
+     * Create a ShellBolt with command line arguments and output fields
+     *
+     * Keep this constructor for backward compatibility.
+     *
+     * @param command Command line arguments for the bolt
+     * @param outputFields Names of fields the bolt will emit (if any).
+     */
+    public FluxShellBolt(String[] command, String[] outputFields){
+        this(command);
+        this.setDefaultStream(outputFields);
+    }
+
+    /**
+     * Add configuration for this bolt. This method is called from YAML file:
+     *
+     * ```
+     * className: "org.apache.storm.flux.wrappers.bolts.FluxShellBolt"
+     * constructorArgs:
+     * # command line
+     * - ["python", "splitsentence.py"]
+     * # output fields
+     * - ["word"]
+     * configMethods:
+     * - name: "addComponentConfig"
+     *   args: ["publisher.data_paths", "actions"]
+     * ```
+     *
+     * @param key Name of the component configuration entry to set.
+     * @param value Value stored under {@code key}; replaces any previous value for that key.
+     */
+    public void addComponentConfig(String key, Object value) {
+        if (this.componentConfig == null) {
+            this.componentConfig = new HashMap<String, Object>();
+        }
+        this.componentConfig.put(key, value);
+    }
+
+    /**
+     * Add configuration for this bolt. This method is called from YAML file:
+     *
+     * ```
+     * className: "org.apache.storm.flux.wrappers.bolts.FluxShellBolt"
+     * constructorArgs:
+     * # command line
+     * - ["python", "splitsentence.py"]
+     * # output fields
+     * - ["word"]
+     * configMethods:
+     * - name: "addComponentConfig"
+     *   args:
+     *   - "publisher.data_paths"
+     *   - ["actions"]
+     * ```
+     *
+     * @param key Name of the component configuration entry to set.
+     * @param values List stored as-is under {@code key}; replaces any previous value for that key.
+     */
+    public void addComponentConfig(String key, List<Object> values) {
+        // Delegate to the Object overload so the lazy creation of the config
+        // map lives in one place. The explicit cast selects that overload and
+        // prevents this method from calling itself.
+        this.addComponentConfig(key, (Object) values);
+    }
+
+    /**
+     * Set default stream outputFields, this method is called from YAML file:
+     *
+     * ```
+     * bolts:
+     * - className: org.apache.storm.flux.wrappers.bolts.FluxShellBolt
+     *   id: my_bolt
+     *   constructorArgs:
+     *   - [python, my_bolt.py]
+     *   configMethods:
+     *   - name: setDefaultStream
+     *     args:
+     *     - [word, count]
+     * ```
+     *
+     * @param outputFields Names of fields the bolt will emit (if any) in default stream.
+     */
+    public void setDefaultStream(String[] outputFields) {
+        this.setNamedStream("default", outputFields);
+    }
+
+    /**
+     * Set custom *named* stream outputFields, this method is called from YAML file:
+     *
+     * ```
+     * bolts:
+     * - className: org.apache.storm.flux.wrappers.bolts.FluxShellBolt
+     *   id: my_bolt
+     *   constructorArgs:
+     *   - [python, my_bolt.py]
+     *   configMethods:
+     *   - name: setNamedStream
+     *     args:
+     *     - first
+     *     - [word, count]
+     * ```
+     * @param name Name of stream the bolt will emit into.
+     * @param outputFields Names of fields the bolt will emit in custom *named* stream.
+     */
+    public void setNamedStream(String name, String[] outputFields) {
+        this.outputFields.put(name, outputFields);
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        // Typed iteration over the registered streams: no raw Iterator or casts.
+        for (Map.Entry<String, String[]> stream : this.outputFields.entrySet()) {
+            String streamId = stream.getKey();
+            Fields fields = new Fields(stream.getValue());
+            // "default" is the key setDefaultStream registers; anything else is a named stream.
+            if ("default".equals(streamId)) {
+                declarer.declare(fields);
+            } else {
+                declarer.declareStream(streamId, fields);
+            }
+        }
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        return this.componentConfig; // stays null until addComponentConfig has been called
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-wrappers/src/main/java/org/apache/storm/flux/wrappers/bolts/LogInfoBolt.java
----------------------------------------------------------------------
diff --git a/flux/flux-wrappers/src/main/java/org/apache/storm/flux/wrappers/bolts/LogInfoBolt.java b/flux/flux-wrappers/src/main/java/org/apache/storm/flux/wrappers/bolts/LogInfoBolt.java
new file mode 100644
index 0000000..5f0e84b
--- /dev/null
+++ b/flux/flux-wrappers/src/main/java/org/apache/storm/flux/wrappers/bolts/LogInfoBolt.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.flux.wrappers.bolts;
+
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Simple bolt that does nothing other than LOG.info() every tuple received.
+ *
+ */
+public class LogInfoBolt extends BaseBasicBolt {
+    private static final Logger LOG = LoggerFactory.getLogger(LogInfoBolt.class);
+
+    @Override
+    public void execute(Tuple input, BasicOutputCollector collector) {
+        LOG.info("{}", input); // log only; nothing is emitted to the collector
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        // No output fields are declared: execute() never emits.
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-wrappers/src/main/java/org/apache/storm/flux/wrappers/spouts/FluxShellSpout.java
----------------------------------------------------------------------
diff --git a/flux/flux-wrappers/src/main/java/org/apache/storm/flux/wrappers/spouts/FluxShellSpout.java b/flux/flux-wrappers/src/main/java/org/apache/storm/flux/wrappers/spouts/FluxShellSpout.java
new file mode 100644
index 0000000..5fd378d
--- /dev/null
+++ b/flux/flux-wrappers/src/main/java/org/apache/storm/flux/wrappers/spouts/FluxShellSpout.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.flux.wrappers.spouts;
+
+import org.apache.storm.spout.ShellSpout;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Iterator;
+
+/**
+ * A generic `ShellSpout` implementation that allows you to specify output fields
+ * and even streams without having to subclass `ShellSpout` to do so.
+ *
+ */
+public class FluxShellSpout extends ShellSpout implements IRichSpout {
+    private Map<String, String[]> outputFields;
+    private Map<String, Object> componentConfig;
+
+    /**
+     * Create a ShellSpout with command line arguments
+     * @param command Command line arguments for the spout
+     */
+    public FluxShellSpout(String[] command){
+        super(command);
+        this.outputFields = new HashMap<String, String[]>();
+    }
+
+    /**
+     * Create a ShellSpout with command line arguments and output fields
+     *
+     * Keep this constructor for backward compatibility.
+     *
+     * @param args Command line arguments for the spout
+     * @param outputFields Names of fields the spout will emit.
+     */
+    public FluxShellSpout(String[] args, String[] outputFields){
+        this(args);
+        this.setDefaultStream(outputFields);
+    }
+
+    /**
+     * Add configuration for this spout. This method is called from YAML file:
+     *
+     * ```
+     * className: "org.apache.storm.flux.wrappers.spouts.FluxShellSpout"
+     * constructorArgs:
+     * # command line
+     * - ["python", "splitsentence.py"]
+     * # output fields
+     * - ["word"]
+     * configMethods:
+     * - name: "addComponentConfig"
+     *   args: ["publisher.data_paths", "actions"]
+     * ```
+     *
+     * @param key Name of the component configuration entry to set.
+     * @param value Value stored under {@code key}; replaces any previous value for that key.
+     */
+    public void addComponentConfig(String key, Object value) {
+        if (this.componentConfig == null) {
+            this.componentConfig = new HashMap<String, Object>();
+        }
+        this.componentConfig.put(key, value);
+    }
+
+    /**
+     * Add configuration for this spout. This method is called from YAML file:
+     *
+     * ```
+     * className: "org.apache.storm.flux.wrappers.spouts.FluxShellSpout"
+     * constructorArgs:
+     * # command line
+     * - ["python", "splitsentence.py"]
+     * # output fields
+     * - ["word"]
+     * configMethods:
+     * - name: "addComponentConfig"
+     *   args:
+     *   - "publisher.data_paths"
+     *   - ["actions"]
+     * ```
+     *
+     * @param key Name of the component configuration entry to set.
+     * @param values List stored as-is under {@code key}; replaces any previous value for that key.
+     */
+    public void addComponentConfig(String key, List<Object> values) {
+        // Delegate to the Object overload so the lazy creation of the config
+        // map lives in one place. The explicit cast selects that overload and
+        // prevents this method from calling itself.
+        this.addComponentConfig(key, (Object) values);
+    }
+
+    /**
+     * Set default stream outputFields, this method is called from YAML file:
+     *
+     * ```
+     * spouts:
+     * - className: org.apache.storm.flux.wrappers.spouts.FluxShellSpout
+     *   id: my_spout
+     *   constructorArgs:
+     *   - [python, my_spout.py]
+     *   configMethods:
+     *   - name: setDefaultStream
+     *     args:
+     *     - [word, count]
+     * ```
+     *
+     * @param outputFields Names of fields the spout will emit (if any) in default stream.
+     */
+    public void setDefaultStream(String[] outputFields) {
+        this.setNamedStream("default", outputFields);
+    }
+
+    /**
+     * Set custom *named* stream outputFields, this method is called from YAML file:
+     *
+     * ```
+     * spouts:
+     * - className: org.apache.storm.flux.wrappers.spouts.FluxShellSpout
+     *   id: my_spout
+     *   constructorArgs:
+     *   - [python, my_spout.py]
+     *   configMethods:
+     *   - name: setNamedStream
+     *     args:
+     *     - first
+     *     - [word, count]
+     * ```
+     * @param name Name of stream the spout will emit into.
+     * @param outputFields Names of fields the spout will emit in custom *named* stream.
+     */
+    public void setNamedStream(String name, String[] outputFields) {
+        this.outputFields.put(name, outputFields);
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        // Typed iteration over the registered streams: no raw Iterator or casts.
+        for (Map.Entry<String, String[]> stream : this.outputFields.entrySet()) {
+            String streamId = stream.getKey();
+            Fields fields = new Fields(stream.getValue());
+            // "default" is the key setDefaultStream registers; anything else is a named stream.
+            if ("default".equals(streamId)) {
+                declarer.declare(fields);
+            } else {
+                declarer.declareStream(streamId, fields);
+            }
+        }
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        return this.componentConfig; // stays null until addComponentConfig has been called
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-wrappers/src/main/resources/resources/randomsentence.js
----------------------------------------------------------------------
diff --git a/flux/flux-wrappers/src/main/resources/resources/randomsentence.js b/flux/flux-wrappers/src/main/resources/resources/randomsentence.js
new file mode 100644
index 0000000..b121915
--- /dev/null
+++ b/flux/flux-wrappers/src/main/resources/resources/randomsentence.js
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Example for storm spout. Emits random sentences.
+ * The original class in java - org.apache.storm.starter.spout.RandomSentenceSpout.
+ *
+ */
+
+var storm = require('./storm');
+var Spout = storm.Spout;
+
+
+var SENTENCES = [
+    "the cow jumped over the moon",
+    "an apple a day keeps the doctor away",
+    "four score and seven years ago",
+    "snow white and the seven dwarfs",
+    "i am at two with nature"]
+
+function RandomSentenceSpout(sentences) {
+    Spout.call(this);
+    this.runningTupleId = 0;
+    this.sentences = sentences;
+    this.pending = {};
+};
+
+RandomSentenceSpout.prototype = Object.create(Spout.prototype);
+RandomSentenceSpout.prototype.constructor = RandomSentenceSpout;
+
+RandomSentenceSpout.prototype.getRandomSentence = function() {
+    return this.sentences[getRandomInt(0, this.sentences.length - 1)];
+}
+
+RandomSentenceSpout.prototype.nextTuple = function(done) {
+    var self = this;
+    var sentence = this.getRandomSentence();
+    var tup = [sentence];
+    var id = this.createNextTupleId();
+    this.pending[id] = tup;
+    //This timeout can be removed if TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS is configured to 100
+    setTimeout(function() {
+        self.emit({tuple: tup, id: id}, function(taskIds) {
+            self.log(tup + ' sent to task ids - ' + taskIds);
+        });
+        done();
+    },100);
+}
+
+RandomSentenceSpout.prototype.createNextTupleId = function() {
+    var id = this.runningTupleId;
+    this.runningTupleId++;
+    return id;
+}
+
+RandomSentenceSpout.prototype.ack = function(id, done) {
+    this.log('Received ack for - ' + id);
+    delete this.pending[id];
+    done();
+}
+
+RandomSentenceSpout.prototype.fail = function(id, done) {
+    var self = this;
+    this.log('Received fail for - ' + id + '. Retrying.');
+    this.emit({tuple: this.pending[id], id:id}, function(taskIds) {
+        self.log(self.pending[id] + ' sent to task ids - ' + taskIds);
+    });
+    done();
+}
+
+/**
+ * Returns a random integer between min (inclusive) and max (inclusive)
+ */
+function getRandomInt(min, max) {
+    return Math.floor(Math.random() * (max - min + 1)) + min;
+}
+
+new RandomSentenceSpout(SENTENCES).run();

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-wrappers/src/main/resources/resources/splitsentence.py
----------------------------------------------------------------------
diff --git a/flux/flux-wrappers/src/main/resources/resources/splitsentence.py b/flux/flux-wrappers/src/main/resources/resources/splitsentence.py
new file mode 100644
index 0000000..300105f
--- /dev/null
+++ b/flux/flux-wrappers/src/main/resources/resources/splitsentence.py
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import storm
+
+class SplitSentenceBolt(storm.BasicBolt):
+    def process(self, tup):
+        words = tup.values[0].split(" ")
+        for word in words:
+          storm.emit([word])
+
+SplitSentenceBolt().run()
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/pom.xml
----------------------------------------------------------------------
diff --git a/flux/pom.xml b/flux/pom.xml
new file mode 100644
index 0000000..287b04a
--- /dev/null
+++ b/flux/pom.xml
@@ -0,0 +1,85 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>flux</artifactId>
+    <packaging>pom</packaging>
+    <name>flux</name>
+
+
+    <parent>
+        <artifactId>storm</artifactId>
+        <groupId>org.apache.storm</groupId>
+        <version>2.0.0-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <developers>
+        <developer>
+            <id>ptgoetz</id>
+            <name>P. Taylor Goetz</name>
+            <email>ptgoetz@apache.org</email>
+        </developer>
+    </developers>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <!-- see comment below... This fixes an annoyance with intellij -->
+        <provided.scope>provided</provided.scope>
+    </properties>
+
+    <profiles>
+        <!--
+            Hack to make intellij behave.
+            If you use intellij, enable this profile in your IDE.
+            It should make life easier.
+        -->
+        <profile>
+            <id>intellij</id>
+            <properties>
+                <provided.scope>compile</provided.scope>
+            </properties>
+        </profile>
+    </profiles>
+
+    <modules>
+        <module>flux-wrappers</module>
+        <module>flux-core</module>
+        <module>flux-examples</module>
+    </modules>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${project.version}</version>
+            <scope>${provided.scope}</scope>
+        </dependency>
+        <dependency>
+            <groupId>commons-cli</groupId>
+            <artifactId>commons-cli</artifactId>
+            <version>1.2</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>${storm.kafka.artifact.id}</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 896d735..d3f94e2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -320,6 +320,9 @@
         <module>storm-drpc-server</module>
         <module>storm-rename-hack</module>
         <module>storm-clojure</module>
+        <module>storm-submit-tools</module>
+        <module>flux</module>
+        <module>sql</module>
 
         <!-- externals -->
         <module>external/storm-kafka</module>
@@ -329,8 +332,6 @@
         <module>external/storm-jdbc</module>
         <module>external/storm-redis</module>
         <module>external/storm-eventhubs</module>
-        <module>external/flux</module>
-        <module>external/sql</module>
         <module>external/storm-elasticsearch</module>
         <module>external/storm-solr</module>
         <module>external/storm-metrics</module>
@@ -342,7 +343,6 @@
         <module>external/storm-kafka-monitor</module>
         <module>external/storm-kinesis</module>
         <module>external/storm-druid</module>
-        <module>external/storm-submit-tools</module>
         <module>external/storm-jms</module>
         <module>external/storm-pmml</module>
 

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/README.md
----------------------------------------------------------------------
diff --git a/sql/README.md b/sql/README.md
new file mode 100644
index 0000000..a4b44fb
--- /dev/null
+++ b/sql/README.md
@@ -0,0 +1,207 @@
+# Storm SQL
+
+Compile SQL queries to Storm topologies.
+
+## Usage
+
+Run the ``storm sql`` command to compile SQL statements into Trident topology, and submit it to the Storm cluster
+
+```
+$ bin/storm sql <sql-file> <topo-name>
+```
+
+In which `sql-file` contains a list of SQL statements to be executed, and `topo-name` is the name of the topology.
+
+StormSQL activates `explain mode` and shows query plan instead of submitting topology when user specifies `topo-name` as `--explain`.
+Detailed explanation is available from `Showing Query Plan (explain mode)` section.
+
+## Supported Features
+
+The following features are supported in the current repository:
+
+* Streaming from and to external data sources
+* Filtering tuples
+* Projections
+* Aggregations (Grouping)
+* User defined function (scalar and aggregate)
+* Join (Inner, Left outer, Right outer, Full outer)
+
+## Specifying External Data Sources
+
+In StormSQL data is represented by external tables. Users can specify data sources using the `CREATE EXTERNAL TABLE`
+statement. For example, the following statement specifies a Kafka spout and sink:
+
+```
+CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY) LOCATION 'kafka://localhost:2181/brokers?topic=test' TBLPROPERTIES '{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.storm.kafka.ByteBufferSerializer"}}'
+```
+
+The syntax of `CREATE EXTERNAL TABLE` closely follows the one defined in
+[Hive Data Definition Language](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL).
+
+`PARALLELISM` is StormSQL's own keyword which describes parallelism hint for input data source. This is same as providing parallelism hint to Trident Spout.
+Downstream operators are executed with same parallelism before repartition (Aggregation triggers repartition).
+
+The default value is 1, and this option has no effect on output data sources. (We might change this if needed. Normally repartition is the thing to avoid.)
+
+## Plugging in External Data Sources
+
+Users plug in external data sources by implementing the `ISqlTridentDataSource` interface and registering them using
+the mechanisms of Java's service loader. The external data source will be chosen based on the scheme of the URI of the
+tables. Please refer to the implementation of `storm-sql-kafka` for more details.
+
+## Specifying User Defined Function (UDF)
+
+Users can define user defined function (scalar or aggregate) using `CREATE FUNCTION` statement.
+For example, the following statement defines `MYPLUS` function which uses `org.apache.storm.sql.TestUtils$MyPlus` class.
+
+```
+CREATE FUNCTION MYPLUS AS 'org.apache.storm.sql.TestUtils$MyPlus'
+```
+
+Storm SQL determines whether the function is scalar or aggregate by checking which methods are defined.
+If the class defines `evaluate` method, Storm SQL treats the function as `scalar`,
+and if the class defines `add` method, Storm SQL treats the function as `aggregate`.
+
+Example of class for scalar function is here:
+
+```
+  public class MyPlus {
+    public static Integer evaluate(Integer x, Integer y) {
+      return x + y;
+    }
+  }
+
+```
+
+and class for aggregate function is here:
+
+```
+  public class MyConcat {
+    public static String init() {
+      return "";
+    }
+    public static String add(String accumulator, String val) {
+      return accumulator + val;
+    }
+    public static String result(String accumulator) {
+      return accumulator;
+    }
+  }
+```
+
+If users don't define `result` method, result is the last return value of `add` method.
+Users need to define `result` method only when we need to transform accumulated value.
+
+## Example: Filtering Kafka Stream
+
+Let's say there is a Kafka stream that represents the transactions of orders. Each message in the stream contains the id
+of the order, the unit price of the product and the quantity of the orders. The goal is to filter orders where the
+transactions are significant and to insert these orders into another Kafka stream for further analysis.
+
+The user can specify the following SQL statements in the SQL file:
+
+```
+CREATE EXTERNAL TABLE ORDERS (ID INT PRIMARY KEY, UNIT_PRICE INT, QUANTITY INT) LOCATION 'kafka://localhost:2181/brokers?topic=orders' TBLPROPERTIES '{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.storm.kafka.ByteBufferSerializer"}}'
+
+CREATE EXTERNAL TABLE LARGE_ORDERS (ID INT PRIMARY KEY, TOTAL INT) LOCATION 'kafka://localhost:2181/brokers?topic=large_orders' TBLPROPERTIES '{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.storm.kafka.ByteBufferSerializer"}}'
+
+INSERT INTO LARGE_ORDERS SELECT ID, UNIT_PRICE * QUANTITY AS TOTAL FROM ORDERS WHERE UNIT_PRICE * QUANTITY > 50
+```
+
+The first statement defines the table `ORDERS` which represents the input stream. The `LOCATION` clause specifies the
+ZkHost (`localhost:2181`), the path of the brokers in ZooKeeper (`/brokers`) and the topic (`orders`).
+The `TBLPROPERTIES` clause specifies the configuration of
+[KafkaProducer](http://kafka.apache.org/documentation.html#newproducerconfigs).
+Current implementation of `storm-sql-kafka` requires specifying both `LOCATION` and `TBLPROPERTIES` clauses even though
+the table is read-only or write-only.
+
+Similarly, the second statement specifies the table `LARGE_ORDERS` which represents the output stream. The third
+statement is a `SELECT` statement which defines the topology: it instructs StormSQL to filter all orders in the external
+table `ORDERS`, calculates the total price and inserts matching records into the Kafka stream specified by
+`LARGE_ORDERS`.
+
+To run this example, users need to include the data sources (`storm-sql-kafka` in this case) and its dependency in the
+class path. Dependencies for Storm SQL are automatically handled when users run `storm sql`. Users can include data sources at the submission step like below:
+
+```
+$ bin/storm sql order_filtering.sql order_filtering --artifacts "org.apache.storm:storm-sql-kafka:2.0.0-SNAPSHOT,org.apache.storm:storm-kafka:2.0.0-SNAPSHOT,org.apache.kafka:kafka_2.10:0.8.2.2\!org.slf4j:slf4j-log4j12,org.apache.kafka:kafka-clients:0.8.2.2"
+```
+
+The above command submits the SQL statements to StormSQL. Users need to adjust each artifact's version if they are using a different version of Storm or Kafka.
+
+By now you should be able to see the `order_filtering` topology in the Storm UI.
+
+## Showing Query Plan (explain mode)
+
+Like `explain` on a SQL statement, StormSQL provides an `explain mode` when running Storm SQL Runner. In explain mode, StormSQL analyzes each query statement (only DML) and shows the plan instead of submitting a topology.
+
+In order to run `explain mode`, you need to provide `--explain` as the topology name and run `storm sql` the same way as when submitting.
+
+For example, when you run the example seen above with explain mode:
+ 
+```
+$ bin/storm sql order_filtering.sql --explain --artifacts "org.apache.storm:storm-sql-kafka:2.0.0-SNAPSHOT,org.apache.storm:storm-kafka:2.0.0-SNAPSHOT,org.apache.kafka:kafka_2.10:0.8.2.2\!org.slf4j:slf4j-log4j12,org.apache.kafka:kafka-clients:0.8.2.2"
+```
+
+StormSQL prints out like below:
+ 
+```
+
+===========================================================
+query>
+CREATE EXTERNAL TABLE ORDERS (ID INT PRIMARY KEY, UNIT_PRICE INT, QUANTITY INT) LOCATION 'kafka://localhost:2181/brokers?topic=orders' TBLPROPERTIES '{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.storm.kafka.ByteBufferSerializer"}}'
+-----------------------------------------------------------
+16:53:43.951 [main] INFO  o.a.s.s.r.DataSourcesRegistry - Registering scheme kafka with org.apache.storm.sql.kafka.KafkaDataSourcesProvider@4d1bf319
+No plan presented on DDL
+===========================================================
+===========================================================
+query>
+CREATE EXTERNAL TABLE LARGE_ORDERS (ID INT PRIMARY KEY, TOTAL INT) LOCATION 'kafka://localhost:2181/brokers?topic=large_orders' TBLPROPERTIES '{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.storm.kafka.ByteBufferSerializer"}}'
+-----------------------------------------------------------
+No plan presented on DDL
+===========================================================
+===========================================================
+query>
+INSERT INTO LARGE_ORDERS SELECT ID, UNIT_PRICE * QUANTITY AS TOTAL FROM ORDERS WHERE UNIT_PRICE * QUANTITY > 50
+-----------------------------------------------------------
+plan>
+LogicalTableModify(table=[[LARGE_ORDERS]], operation=[INSERT], updateColumnList=[[]], flattened=[true]), id = 8
+  LogicalProject(ID=[$0], TOTAL=[*($1, $2)]), id = 7
+    LogicalFilter(condition=[>(*($1, $2), 50)]), id = 6
+      EnumerableTableScan(table=[[ORDERS]]), id = 5
+
+===========================================================
+
+```
+
+## Current Limitations
+
+- Windowing is yet to be implemented.
+- Only equi-join (single field equality) is supported for joining table.
+- Joining table only applies within each small batch that comes off of the spout.
+  - Not across batches.
+  - Limitation came from `join` feature of Trident.
+  - Please refer to this doc: `Trident API Overview` for details.
+
+## License
+
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+## Committer Sponsors
+ * Sriharsha Chintalapani ([sriharsha@apache.org](mailto:sriharsha@apache.org))
+ * P. Taylor Goetz ([ptgoetz@apache.org](mailto:ptgoetz@apache.org))
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/pom.xml
----------------------------------------------------------------------
diff --git a/sql/pom.xml b/sql/pom.xml
new file mode 100644
index 0000000..f085dd8
--- /dev/null
+++ b/sql/pom.xml
@@ -0,0 +1,47 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>storm</artifactId>
+        <groupId>org.apache.storm</groupId>
+        <version>2.0.0-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>sql</artifactId>
+    <packaging>pom</packaging>
+
+    <developers>
+        <developer>
+            <id>haohui</id>
+            <name>Haohui Mai</name>
+            <email>ricetons@gmail.com</email>
+        </developer>
+    </developers>
+
+    <modules>
+        <module>storm-sql-core</module>
+        <module>storm-sql-runtime</module>
+        <module>storm-sql-external/storm-sql-kafka</module>
+        <module>storm-sql-external/storm-sql-redis</module>
+        <module>storm-sql-external/storm-sql-mongodb</module>
+        <module>storm-sql-external/storm-sql-hdfs</module>
+    </modules>
+</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/pom.xml
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/pom.xml b/sql/storm-sql-core/pom.xml
new file mode 100644
index 0000000..ca84970
--- /dev/null
+++ b/sql/storm-sql-core/pom.xml
@@ -0,0 +1,279 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>storm</artifactId>
+        <groupId>org.apache.storm</groupId>
+        <version>2.0.0-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>storm-sql-core</artifactId>
+
+    <developers>
+        <developer>
+            <id>haohui</id>
+            <name>Haohui Mai</name>
+            <email>ricetons@gmail.com</email>
+        </developer>
+    </developers>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-sql-runtime</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-sql-runtime</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.calcite</groupId>
+            <artifactId>calcite-core</artifactId>
+            <version>${calcite.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-annotations</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-annotations</artifactId>
+            <version>${jackson.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-cli</groupId>
+            <artifactId>commons-cli</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>commons-lang</groupId>
+            <artifactId>commons-lang</artifactId>
+            <scope>runtime</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-collections4</artifactId>
+            <version>4.1</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+    <build>
+        <sourceDirectory>src/jvm</sourceDirectory>
+        <testSourceDirectory>src/test</testSourceDirectory>
+        <plugins>
+            <plugin>
+                <artifactId>maven-resources-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>copy-fmpp-resources</id>
+                        <phase>initialize</phase>
+                        <goals>
+                            <goal>copy-resources</goal>
+                        </goals>
+                        <configuration>
+                            <outputDirectory>${project.build.directory}/codegen</outputDirectory>
+                            <resources>
+                                <resource>
+                                    <directory>src/codegen</directory>
+                                    <filtering>false</filtering>
+                                </resource>
+                            </resources>
+                        </configuration>
+                    </execution>
+                    <execution>
+                        <id>copy-java-sources</id>
+                        <phase>process-sources</phase>
+                        <goals>
+                            <goal>copy-resources</goal>
+                        </goals>
+                        <configuration>
+                            <outputDirectory>${basedir}/target/classes/</outputDirectory>
+                            <resources>
+                                <resource>
+                                    <directory>src/jvm</directory>
+                                    <filtering>true</filtering>
+                                </resource>
+                                <resource>
+                                    <directory>src/test</directory>
+                                    <filtering>true</filtering>
+                                </resource>
+                                <resource>
+                                    <directory>target/generated-sources</directory>
+                                    <!-- <include>*/org</include> -->
+                                    <filtering>true</filtering>
+                                </resource>
+                            </resources>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <version>2.2</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <!-- Extract parser grammar template from calcite-core.jar and put
+                     it under ${project.build.directory} where all freemarker templates are. -->
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <version>2.8</version>
+                <executions>
+                    <execution>
+                        <id>unpack-parser-template</id>
+                        <phase>initialize</phase>
+                        <goals>
+                            <goal>unpack</goal>
+                        </goals>
+                        <configuration>
+                            <artifactItems>
+                                <artifactItem>
+                                    <groupId>org.apache.calcite</groupId>
+                                    <artifactId>calcite-core</artifactId>
+                                    <type>jar</type>
+                                    <overWrite>true</overWrite>
+                                    <outputDirectory>${project.build.directory}/</outputDirectory>
+                                    <includes>**/Parser.jj</includes>
+                                </artifactItem>
+                            </artifactItems>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <!-- using appassembler-maven-plugin instead of maven-dependency-plugin to copy dependencies
+            as copy and unpack goal are not working together -->
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>appassembler-maven-plugin</artifactId>
+                <version>1.9</version>
+                <executions>
+                    <execution>
+                        <id>create-repo</id>
+                        <goals>
+                            <goal>create-repository</goal>
+                        </goals>
+                        <configuration>
+                            <assembleDirectory>${project.build.directory}/app-assembler</assembleDirectory>
+                            <repositoryLayout>flat</repositoryLayout>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>com.googlecode.fmpp-maven-plugin</groupId>
+                <artifactId>fmpp-maven-plugin</artifactId>
+                <version>1.0</version>
+                <dependencies>
+                    <dependency>
+                        <groupId>org.freemarker</groupId>
+                        <artifactId>freemarker</artifactId>
+                        <version>2.3.25-incubating</version>
+                    </dependency>
+                </dependencies>
+                <executions>
+                    <execution>
+                        <id>generate-fmpp-sources</id>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>generate</goal>
+                        </goals>
+                        <configuration>
+                            <cfgFile>${project.build.directory}/codegen/config.fmpp</cfgFile>
+                            <outputDirectory>target/generated-sources</outputDirectory>
+                            <templateDirectory>${project.build.directory}/codegen/templates</templateDirectory>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <version>1.5</version>
+                <executions>
+                    <execution>
+                        <id>add-generated-sources</id>
+                        <phase>process-sources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                        </goals>
+                        <configuration>
+                            <sources>
+                                <source>${project.build.directory}/generated-sources</source>
+                            </sources>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>javacc-maven-plugin</artifactId>
+                <version>2.4</version>
+                <executions>
+                    <execution>
+                        <phase>generate-sources</phase>
+                        <id>javacc</id>
+                        <goals>
+                            <goal>javacc</goal>
+                        </goals>
+                        <configuration>
+                            <sourceDirectory>${project.build.directory}/generated-sources/</sourceDirectory>
+                            <includes>
+                                <include>**/Parser.jj</include>
+                            </includes>
+                            <lookAhead>2</lookAhead>
+                            <isStatic>false</isStatic>
+                            <outputDirectory>${project.build.directory}/generated-sources/</outputDirectory>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/codegen/config.fmpp
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/codegen/config.fmpp b/sql/storm-sql-core/src/codegen/config.fmpp
new file mode 100644
index 0000000..be5a792
--- /dev/null
+++ b/sql/storm-sql-core/src/codegen/config.fmpp
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+data: {
+  parser:                   tdd(../data/Parser.tdd)
+}
+
+freemarkerLinks: {
+  includes: includes/
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/codegen/data/Parser.tdd
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/codegen/data/Parser.tdd b/sql/storm-sql-core/src/codegen/data/Parser.tdd
new file mode 100644
index 0000000..b0dccb6
--- /dev/null
+++ b/sql/storm-sql-core/src/codegen/data/Parser.tdd
@@ -0,0 +1,80 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+{
+  # Generated parser implementation class package and name
+  package: "org.apache.storm.sql.parser.impl",
+  class: "StormParserImpl",
+
+  # List of import statements.
+  imports: [
+    "org.apache.calcite.sql.validate.*",
+    "org.apache.calcite.util.*",
+    "org.apache.storm.sql.parser.*",
+    "java.util.*"
+  ]
+
+  # List of keywords.
+  keywords: [
+    "LOCATION",
+    "INPUTFORMAT",
+    "OUTPUTFORMAT",
+    "PARALLELISM",
+    "STORED",
+    "TBLPROPERTIES",
+    "JAR"
+  ]
+
+  # List of methods for parsing custom SQL statements.
+  statementParserMethods: [
+    "SqlCreateTable()",
+    "SqlCreateFunction()"
+  ]
+
+  # List of methods for parsing custom literals.
+  # Example: ParseJsonLiteral().
+  literalParserMethods: [
+  ]
+
+  # List of methods for parsing custom data types.
+  dataTypeParserMethods: [
+  ]
+
+  nonReservedKeywords: [
+  ]
+
+  createStatementParserMethods: [
+  ]
+
+  alterStatementParserMethods: [
+  ]
+
+  dropStatementParserMethods: [
+  ]
+
+  # List of files in @includes directory that have parser method
+  # implementations for custom SQL statements, literals or types
+  # given as part of "statementParserMethods", "literalParserMethods" or
+  # "dataTypeParserMethods".
+  implementationFiles: [
+    "parserImpls.ftl"
+  ]
+
+  includeCompoundIdentifier: true,
+  includeBraces: true,
+  includeAdditionalDeclarations: false,
+  allowBangEqual: false
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/codegen/includes/license.ftl
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/codegen/includes/license.ftl b/sql/storm-sql-core/src/codegen/includes/license.ftl
new file mode 100644
index 0000000..7e66353
--- /dev/null
+++ b/sql/storm-sql-core/src/codegen/includes/license.ftl
@@ -0,0 +1,17 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/codegen/includes/parserImpls.ftl
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/codegen/includes/parserImpls.ftl b/sql/storm-sql-core/src/codegen/includes/parserImpls.ftl
new file mode 100644
index 0000000..4143840
--- /dev/null
+++ b/sql/storm-sql-core/src/codegen/includes/parserImpls.ftl
@@ -0,0 +1,113 @@
+<#-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+  license agreements. See the NOTICE file distributed with this work for additional
+  information regarding copyright ownership. The ASF licenses this file to
+  You under the Apache License, Version 2.0 (the "License"); you may not use
+  this file except in compliance with the License. You may obtain a copy of
+  the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+  by applicable law or agreed to in writing, software distributed under the
+  License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+  OF ANY KIND, either express or implied. See the License for the specific
+  language governing permissions and limitations under the License. -->
+
+
+private void ColumnDef(List<ColumnDefinition> list) :
+{
+    SqlParserPos pos;
+    SqlIdentifier name;
+    SqlDataTypeSpec type;
+    ColumnConstraint constraint = null;
+    SqlMonotonicity monotonicity = SqlMonotonicity.NOT_MONOTONIC;
+}
+{
+    name = SimpleIdentifier() { pos = getPos(); }
+    type = DataType()
+    [
+      <PRIMARY> <KEY>
+      [ <ASC>   { monotonicity = SqlMonotonicity.INCREASING; }
+      | <DESC>  { monotonicity = SqlMonotonicity.DECREASING; }
+      ]
+      { constraint = new ColumnConstraint.PrimaryKey(monotonicity, getPos()); }
+    ]
+    {
+        list.add(new ColumnDefinition(name, type, constraint, pos));
+    }
+}
+
+SqlNodeList ColumnDefinitionList() :
+{
+    SqlParserPos pos;
+    List<ColumnDefinition> list = Lists.newArrayList();
+}
+{
+    <LPAREN> { pos = getPos(); }
+    ColumnDef(list)
+    ( <COMMA> ColumnDef(list) )*
+    <RPAREN> {
+        return new SqlNodeList(list, pos.plus(getPos()));
+    }
+}
+
+/**
+ * CREATE EXTERNAL TABLE
+ *   ( database_name '.' )? table_name '(' column_def ( ',' column_def )* ')'
+ *   ( STORED AS INPUTFORMAT input_format_classname OUTPUTFORMAT output_format_classname )?
+ *   LOCATION location_uri
+ *   ( PARALLELISM parallelism )?
+ *   ( TBLPROPERTIES tbl_properties )?
+ *   ( AS select_stmt )?
+ */
+SqlNode SqlCreateTable() :
+{
+    SqlParserPos pos;
+    SqlIdentifier tblName;
+    SqlNodeList fieldList;
+    SqlNode location;
+    SqlNode parallelism = null;
+    SqlNode input_format_class_name = null, output_format_class_name = null;
+    SqlNode tbl_properties = null;
+    SqlNode select = null;
+}
+{
+    <CREATE> { pos = getPos(); }
+    <EXTERNAL> <TABLE>
+    tblName = CompoundIdentifier()
+    fieldList = ColumnDefinitionList()
+    [
+      <STORED> <AS>
+      <INPUTFORMAT> input_format_class_name = StringLiteral()
+      <OUTPUTFORMAT> output_format_class_name = StringLiteral()
+    ]
+    <LOCATION>
+    location = StringLiteral()
+    [ <PARALLELISM> parallelism = UnsignedNumericLiteral() ]
+    [ <TBLPROPERTIES> tbl_properties = StringLiteral() ]
+    [ <AS> select = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY) ] {
+        return new SqlCreateTable(pos, tblName, fieldList,
+        input_format_class_name, output_format_class_name, location,
+        parallelism, tbl_properties, select);
+    }
+}
+
+/**
+ * CREATE FUNCTION functionname AS 'classname' ( USING JAR 'jarname' )?
+ */
+SqlNode SqlCreateFunction() :
+{
+    SqlParserPos pos;
+    SqlIdentifier functionName;
+    SqlNode className;
+    SqlNode jarName = null;
+}
+{
+    <CREATE> { pos = getPos(); }
+    <FUNCTION>
+        functionName = CompoundIdentifier()
+    <AS>
+        className = StringLiteral()
+    [
+      <USING> <JAR>
+      jarName = StringLiteral()
+    ]
+    {
+      return new SqlCreateFunction(pos, functionName, className, jarName);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/AbstractTridentProcessor.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/AbstractTridentProcessor.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/AbstractTridentProcessor.java
new file mode 100644
index 0000000..843339f
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/AbstractTridentProcessor.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql;
+
+import org.apache.calcite.DataContext;
+import org.apache.storm.sql.javac.CompilingClassLoader;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentTopology;
+
+import java.util.List;
+
+public abstract class AbstractTridentProcessor {
+  protected Stream outputStream;
+  protected DataContext dataContext;
+  protected List<CompilingClassLoader> classLoaders;
+  /**
+   * Returns the {@code outputStream} field. Nothing in this class assigns the field;
+   * presumably subclasses populate it (e.g. while building the topology) -- confirm against implementations.
+   *
+   * @return the output stream of the SQL
+   */
+  public Stream outputStream() {
+    return outputStream;
+  }
+
+  /**
+   * Construct the trident topology based on the SQL.
+   *
+   * @return the constructed {@link TridentTopology}
+   */
+  public abstract TridentTopology build();
+
+  /**
+   * Returns the {@code dataContext} field, which this class never assigns itself.
+   *
+   * @return the {@link DataContext} instance which is used during execution of the query
+   */
+  public DataContext getDataContext() {
+    return dataContext;
+  }
+
+  /**
+   * Returns the {@code classLoaders} field as-is (no defensive copy is made).
+   *
+   * @return the class loaders used for compilation. They are chained, so the last class loader
+   *         can access all classes.
+   */
+  public List<CompilingClassLoader> getClassLoaders() {
+    return classLoaders;
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSql.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSql.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSql.java
new file mode 100644
index 0000000..5dec4af
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSql.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql;
+
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.SubmitOptions;
+import org.apache.storm.sql.runtime.ChannelHandler;
+
+import java.util.Map;
+
+/**
+ * The StormSql class provides standalone, interactive interfaces to execute
+ * SQL statements over streaming data.
+ * <p>
+ * The StormSql class is stateless. The user needs to submit the data
+ * definition language (DDL) statements and the query statements in the same
+ * batch.
+ */
+public abstract class StormSql {
+  /**
+   * Execute the SQL statements in stand-alone mode. The user can retrieve the result by passing in an instance
+   * of {@link ChannelHandler}.
+   *
+   * @param statements the SQL statements to execute
+   * @param handler the {@link ChannelHandler} through which the caller retrieves the result
+   * @throws Exception declared by this abstract method; the concrete failure cases depend on the implementation
+   */
+  public abstract void execute(Iterable<String> statements,
+                               ChannelHandler handler) throws Exception;
+
+  /**
+   * Submit the SQL statements to Nimbus and run it as a topology.
+   *
+   * @param name presumably the name of the submitted topology -- confirm against the implementation
+   * @param statements the SQL statements to submit
+   * @param stormConf presumably the Storm configuration used for the submission -- confirm against the implementation
+   * @param opts the {@link SubmitOptions} passed along with the submission
+   * @param progressListener the {@link StormSubmitter.ProgressListener} passed along with the submission
+   * @param asUser presumably the user to submit the topology as -- confirm against the implementation
+   * @throws Exception declared by this abstract method; the concrete failure cases depend on the implementation
+   */
+  public abstract void submit(
+      String name, Iterable<String> statements, Map<String, ?> stormConf, SubmitOptions opts,
+      StormSubmitter.ProgressListener progressListener, String asUser)
+      throws Exception;
+
+  /**
+   * Print out query plan for each query.
+   *
+   * @param statements the SQL statements whose query plans should be printed
+   * @throws Exception declared by this abstract method; the concrete failure cases depend on the implementation
+   */
+  public abstract void explain(Iterable<String> statements) throws Exception;
+
+  public static StormSql construct() { // factory method: every call creates a new StormSqlImpl
+    return new StormSqlImpl();
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
new file mode 100644
index 0000000..007daa7
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
@@ -0,0 +1,290 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql;
+
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.calcite.prepare.CalciteCatalogReader;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.schema.Function;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.schema.impl.AggregateFunctionImpl;
+import org.apache.calcite.schema.impl.ScalarFunctionImpl;
+import org.apache.calcite.sql.SqlExplainLevel;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperatorTable;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.util.ChainedSqlOperatorTable;
+import org.apache.calcite.tools.FrameworkConfig;
+import org.apache.calcite.tools.Frameworks;
+import org.apache.calcite.tools.Planner;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.SubmitOptions;
+import org.apache.storm.sql.compiler.StormSqlTypeFactoryImpl;
+import org.apache.storm.sql.compiler.backends.standalone.PlanCompiler;
+import org.apache.storm.sql.javac.CompilingClassLoader;
+import org.apache.storm.sql.parser.ColumnConstraint;
+import org.apache.storm.sql.parser.ColumnDefinition;
+import org.apache.storm.sql.parser.SqlCreateFunction;
+import org.apache.storm.sql.parser.SqlCreateTable;
+import org.apache.storm.sql.parser.StormParser;
+import org.apache.storm.sql.planner.StormRelUtils;
+import org.apache.storm.sql.planner.trident.QueryPlanner;
+import org.apache.storm.sql.runtime.AbstractValuesProcessor;
+import org.apache.storm.sql.runtime.ChannelHandler;
+import org.apache.storm.sql.runtime.DataSource;
+import org.apache.storm.sql.runtime.DataSourcesRegistry;
+import org.apache.storm.sql.runtime.FieldInfo;
+import org.apache.storm.sql.runtime.ISqlTridentDataSource;
+import org.apache.storm.trident.TridentTopology;
+
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.jar.Attributes;
+import java.util.jar.JarOutputStream;
+import java.util.jar.Manifest;
+import java.util.zip.ZipEntry;
+
+import static org.apache.storm.sql.compiler.CompilerUtil.TableBuilderInfo;
+
+class StormSqlImpl extends StormSql { // implementation returned by StormSql.construct()
+  private final JavaTypeFactory typeFactory = new StormSqlTypeFactoryImpl(
+      RelDataTypeSystem.DEFAULT);
+  private final SchemaPlus schema = Frameworks.createRootSchema(true); // root schema; tables and UDFs from DDL are added here
+  private boolean hasUdf = false; // set to true once a CREATE FUNCTION statement has been handled
+
+  @Override
+  public void execute(
+      Iterable<String> statements, ChannelHandler result)
+      throws Exception {
+    Map<String, DataSource> dataSources = new HashMap<>(); // tables declared so far in this batch, keyed by table name
+    for (String sql : statements) {
+      StormParser parser = new StormParser(sql);
+      SqlNode node = parser.impl().parseSqlStmtEof();
+      if (node instanceof SqlCreateTable) {
+        handleCreateTable((SqlCreateTable) node, dataSources);
+      } else if (node instanceof SqlCreateFunction) {
+        handleCreateFunction((SqlCreateFunction) node);
+      } else { // anything that is not DDL is treated as a query: plan it, compile it, initialize the processor
+        FrameworkConfig config = buildFrameWorkConfig();
+        Planner planner = Frameworks.getPlanner(config);
+        SqlNode parse = planner.parse(sql);
+        SqlNode validate = planner.validate(parse);
+        RelNode tree = planner.convert(validate);
+        PlanCompiler compiler = new PlanCompiler(typeFactory);
+        AbstractValuesProcessor proc = compiler.compile(tree);
+        proc.initialize(dataSources, result); // hands the processor the declared tables and the caller's handler
+      }
+    }
+  }
+
+  @Override
+  public void submit(
+      String name, Iterable<String> statements, Map<String, ?> stormConf, SubmitOptions opts,
+      StormSubmitter.ProgressListener progressListener, String asUser)
+      throws Exception {
+    Map<String, ISqlTridentDataSource> dataSources = new HashMap<>(); // Trident data sources declared so far, keyed by table name
+    for (String sql : statements) {
+      StormParser parser = new StormParser(sql);
+      SqlNode node = parser.impl().parseSqlStmtEof();
+      if (node instanceof SqlCreateTable) {
+        handleCreateTableForTrident((SqlCreateTable) node, dataSources);
+      } else if (node instanceof SqlCreateFunction) {
+        handleCreateFunction((SqlCreateFunction) node);
+      }  else { // NOTE(review): runs once per query statement, so several queries submit several times under the same name
+        QueryPlanner planner = new QueryPlanner(schema);
+        AbstractTridentProcessor processor = planner.compile(dataSources, sql);
+        TridentTopology topo = processor.build();
+
+        Path jarPath = null;
+        try {
+          // QueryPlanner in Trident mode configures the topology with compiled classes,
+          // so we need to add the new classes to the topology jar.
+          // The topology will be serialized and sent to Nimbus, then deserialized and executed in workers.
+
+          jarPath = Files.createTempFile("storm-sql", ".jar");
+          System.setProperty("storm.jar", jarPath.toString()); // NOTE(review): presumably read by StormSubmitter to find the jar - confirm; never restored here
+          packageTopology(jarPath, processor);
+          StormSubmitter.submitTopologyAs(name, stormConf, topo.build(), opts, progressListener, asUser);
+        } finally {
+          if (jarPath != null) {
+            Files.delete(jarPath); // NOTE(review): a failure here would mask an exception thrown in the try block
+          }
+        }
+      }
+    }
+  }
+
+  @Override
+  public void explain(Iterable<String> statements) throws Exception {
+    Map<String, ISqlTridentDataSource> dataSources = new HashMap<>(); // filled by DDL handling; not otherwise read in this method
+    for (String sql : statements) {
+      StormParser parser = new StormParser(sql);
+      SqlNode node = parser.impl().parseSqlStmtEof();
+
+      System.out.println("===========================================================");
+      System.out.println("query>");
+      System.out.println(sql);
+      System.out.println("-----------------------------------------------------------");
+
+      if (node instanceof SqlCreateTable) {
+        handleCreateTableForTrident((SqlCreateTable) node, dataSources); // DDL is still applied so later queries can be planned
+        System.out.println("No plan presented on DDL");
+      } else if (node instanceof SqlCreateFunction) {
+        handleCreateFunction((SqlCreateFunction) node);
+        System.out.println("No plan presented on DDL");
+      } else {
+        FrameworkConfig config = buildFrameWorkConfig();
+        Planner planner = Frameworks.getPlanner(config);
+        SqlNode parse = planner.parse(sql);
+        SqlNode validate = planner.validate(parse);
+        RelNode tree = planner.convert(validate);
+
+        String plan = StormRelUtils.explain(tree, SqlExplainLevel.ALL_ATTRIBUTES);
+        System.out.println("plan>");
+        System.out.println(plan);
+      }
+
+      System.out.println("===========================================================");
+    }
+  }
+
+  private void packageTopology(Path jar, AbstractTridentProcessor processor) throws IOException { // writes the classes held by the processor's class loaders into a jar
+    Manifest manifest = new Manifest();
+    Attributes attr = manifest.getMainAttributes();
+    attr.put(Attributes.Name.MANIFEST_VERSION, "1.0");
+    attr.put(Attributes.Name.MAIN_CLASS, processor.getClass().getCanonicalName());
+    try (JarOutputStream out = new JarOutputStream(
+            new BufferedOutputStream(new FileOutputStream(jar.toFile())), manifest)) {
+      List<CompilingClassLoader> classLoaders = processor.getClassLoaders();
+      if (classLoaders != null && !classLoaders.isEmpty()) { // with no class loaders the jar holds only the manifest
+        for (CompilingClassLoader classLoader : classLoaders) {
+          for (Map.Entry<String, ByteArrayOutputStream> e : classLoader.getClasses().entrySet()) {
+            out.putNextEntry(new ZipEntry(e.getKey().replace(".", "/") + ".class")); // key (presumably a fully-qualified class name) -> jar entry path
+            out.write(e.getValue().toByteArray());
+            out.closeEntry();
+          }
+        }
+      }
+    }
+  }
+
+  private void handleCreateTable(
+      SqlCreateTable n, Map<String, DataSource> dataSources) {
+    List<FieldInfo> fields = updateSchema(n); // also registers the table in the schema
+    DataSource ds = DataSourcesRegistry.construct(n.location(), n
+        .inputFormatClass(), n.outputFormatClass(), fields);
+    if (ds == null) {
+      throw new RuntimeException("Cannot construct data source for " + n
+          .tableName());
+    } else if (dataSources.containsKey(n.tableName())) {
+      throw new RuntimeException("Duplicated definition for table " + n
+          .tableName());
+    }
+    dataSources.put(n.tableName(), ds);
+  }
+
+  private void handleCreateFunction(SqlCreateFunction sqlCreateFunction) throws ClassNotFoundException { // registers a UDF in the schema
+    if(sqlCreateFunction.jarName() != null) {
+      throw new UnsupportedOperationException("UDF 'USING JAR' not implemented");
+    }
+    Method method;
+    Function function;
+    if ((method=findMethod(sqlCreateFunction.className(), "evaluate")) != null) { // a public "evaluate" method marks a scalar function
+      function = ScalarFunctionImpl.create(method);
+    } else if (findMethod(sqlCreateFunction.className(), "add") != null) { // otherwise a public "add" method marks an aggregate function
+      function = AggregateFunctionImpl.create(Class.forName(sqlCreateFunction.className()));
+    } else {
+      throw new RuntimeException("Invalid scalar or aggregate function");
+    }
+    schema.add(sqlCreateFunction.functionName().toUpperCase(), function); // NOTE(review): toUpperCase() uses the default locale; Locale.ROOT would be locale-independent
+    hasUdf = true;
+  }
+
+  private Method findMethod(String clazzName, String methodName) throws ClassNotFoundException { // first public method with that name, or null
+    Class<?> clazz = Class.forName(clazzName);
+    for (Method method : clazz.getMethods()) { // getMethods() covers public methods only, including inherited ones
+      if (method.getName().equals(methodName)) {
+        return method; // match is by name only; overloads are not distinguished
+      }
+    }
+    return null;
+  }
+
+  private void handleCreateTableForTrident(
+      SqlCreateTable n, Map<String, ISqlTridentDataSource> dataSources) { // Trident counterpart of handleCreateTable
+    List<FieldInfo> fields = updateSchema(n); // also registers the table in the schema
+    ISqlTridentDataSource ds = DataSourcesRegistry.constructTridentDataSource(n.location(), n
+        .inputFormatClass(), n.outputFormatClass(), n.properties(), fields);
+    if (ds == null) {
+      throw new RuntimeException("Failed to find data source for " + n
+          .tableName() + " URI: " + n.location());
+    } else if (dataSources.containsKey(n.tableName())) {
+      throw new RuntimeException("Duplicated definition for table " + n
+          .tableName());
+    }
+    dataSources.put(n.tableName(), ds);
+  }
+
+  private List<FieldInfo> updateSchema(SqlCreateTable n) { // adds the table to the schema and returns its field descriptions
+    TableBuilderInfo builder = new TableBuilderInfo(typeFactory);
+    List<FieldInfo> fields = new ArrayList<>();
+    for (ColumnDefinition col : n.fieldList()) {
+      builder.field(col.name(), col.type(), col.constraint());
+      RelDataType dataType = col.type().deriveType(typeFactory);
+      Class<?> javaType = (Class<?>)typeFactory.getJavaClass(dataType);
+      ColumnConstraint constraint = col.constraint();
+      boolean isPrimary = constraint != null && constraint instanceof ColumnConstraint.PrimaryKey; // instanceof is already false for null; the null check is redundant
+      fields.add(new FieldInfo(col.name(), javaType, isPrimary));
+    }
+
+    if (n.parallelism() != null) {
+      builder.parallelismHint(n.parallelism());
+    }
+    Table table = builder.build();
+    schema.add(n.tableName(), table); // side effect on the instance-wide schema
+    return fields;
+  }
+
+  private FrameworkConfig buildFrameWorkConfig() { // planner config over the current schema
+    if (hasUdf) { // chain a catalog reader after the standard operators, presumably so UDFs in the schema resolve
+      List<SqlOperatorTable> sqlOperatorTables = new ArrayList<>();
+      sqlOperatorTables.add(SqlStdOperatorTable.instance());
+      sqlOperatorTables.add(new CalciteCatalogReader(CalciteSchema.from(schema),
+                                                     false,
+                                                     Collections.<String>emptyList(), typeFactory));
+      return Frameworks.newConfigBuilder().defaultSchema(schema)
+              .operatorTable(new ChainedSqlOperatorTable(sqlOperatorTables)).build();
+    } else {
+      return Frameworks.newConfigBuilder().defaultSchema(schema).build();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlRunner.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlRunner.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlRunner.java
new file mode 100644
index 0000000..5618647
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlRunner.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.storm.generated.SubmitOptions;
+import org.apache.storm.generated.TopologyInitialStatus;
+import org.apache.storm.utils.Utils;
+
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.Map;
+
+public class StormSqlRunner { // command-line entry point: explains or submits the statements of a SQL file
+    private static final String OPTION_SQL_FILE_SHORT = "f";
+    private static final String OPTION_SQL_FILE_LONG = "file";
+    private static final String OPTION_SQL_TOPOLOGY_NAME_SHORT = "t";
+    private static final String OPTION_SQL_TOPOLOGY_NAME_LONG = "topology";
+    private static final String OPTION_SQL_EXPLAIN_SHORT = "e";
+    private static final String OPTION_SQL_EXPLAIN_LONG = "explain";
+
+    public static void main(String[] args) throws Exception {
+        Options options = buildOptions();
+        CommandLineParser parser = new DefaultParser();
+        CommandLine commandLine = parser.parse(options, args);
+
+        if (!commandLine.hasOption(OPTION_SQL_FILE_LONG)) {
+            printUsageAndExit(options, OPTION_SQL_FILE_LONG + " is required"); // calls System.exit(1)
+        }
+
+        String filePath = commandLine.getOptionValue(OPTION_SQL_FILE_LONG);
+        List<String> stmts = Files.readAllLines(Paths.get(filePath), StandardCharsets.UTF_8); // each line of the file is passed on as one statement
+        StormSql sql = StormSql.construct();
+        @SuppressWarnings("unchecked")
+        Map<String, ?> conf = Utils.readStormConfig();
+
+        if (commandLine.hasOption(OPTION_SQL_EXPLAIN_LONG)) { // explain wins when both explain and topology are given
+            sql.explain(stmts);
+        } else if (commandLine.hasOption(OPTION_SQL_TOPOLOGY_NAME_LONG)) {
+            String topoName = commandLine.getOptionValue(OPTION_SQL_TOPOLOGY_NAME_LONG);
+            SubmitOptions submitOptions = new SubmitOptions(TopologyInitialStatus.ACTIVE);
+            sql.submit(topoName, stmts, conf, submitOptions, null, null); // no progress listener, no asUser
+        } else {
+            printUsageAndExit(options, "Either " + OPTION_SQL_TOPOLOGY_NAME_LONG + " or " + OPTION_SQL_EXPLAIN_LONG +
+                    " must be presented");
+        }
+    }
+
+    private static void printUsageAndExit(Options options, String message) { // prints the message and usage, then terminates the JVM
+        System.out.println(message);
+        HelpFormatter formatter = new HelpFormatter();
+        formatter.printHelp("storm-sql-runner ", options);
+        System.exit(1); // non-zero exit status
+    }
+
+    private static Options buildOptions() { // declares the -f/--file, -t/--topology and -e/--explain options
+        Options options = new Options();
+        options.addOption(OPTION_SQL_FILE_SHORT, OPTION_SQL_FILE_LONG, true, "REQUIRED SQL file which has sql statements");
+        options.addOption(OPTION_SQL_TOPOLOGY_NAME_SHORT, OPTION_SQL_TOPOLOGY_NAME_LONG, true, "Topology name to submit");
+        options.addOption(OPTION_SQL_EXPLAIN_SHORT, OPTION_SQL_EXPLAIN_LONG, false, "Activate explain mode (topology name will be ignored)");
+        return options;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/calcite/ParallelStreamableTable.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/calcite/ParallelStreamableTable.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/calcite/ParallelStreamableTable.java
new file mode 100644
index 0000000..c6b584d
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/calcite/ParallelStreamableTable.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.sql.calcite;
+
+import org.apache.calcite.rel.stream.Delta;
+import org.apache.calcite.schema.StreamableTable;
+
+/**
+ * Table that can be converted to a stream. This table also carries its parallelism information.
+ *
+ * @see Delta
+ */
+public interface ParallelStreamableTable extends StreamableTable {
+
+    /**
+     * Returns the parallelism hint of this table, or {@code null} if it is not known.
+     */
+    Integer parallelismHint();
+}