You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/04/05 23:19:10 UTC
[07/23] storm git commit: STORM-2453 Move non-connectors into the top
directory
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-wrappers/src/main/java/org/apache/storm/flux/wrappers/bolts/FluxShellBolt.java
----------------------------------------------------------------------
diff --git a/flux/flux-wrappers/src/main/java/org/apache/storm/flux/wrappers/bolts/FluxShellBolt.java b/flux/flux-wrappers/src/main/java/org/apache/storm/flux/wrappers/bolts/FluxShellBolt.java
new file mode 100644
index 0000000..05b8e7a
--- /dev/null
+++ b/flux/flux-wrappers/src/main/java/org/apache/storm/flux/wrappers/bolts/FluxShellBolt.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.flux.wrappers.bolts;
+
+import org.apache.storm.task.ShellBolt;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A generic {@code ShellBolt} implementation that allows you to specify output fields
+ * and even streams without having to subclass {@code ShellBolt} to do so.
+ *
+ * <p>Output fields are tracked per stream name. The stream named {@code "default"} is
+ * declared with {@link OutputFieldsDeclarer#declare(Fields)}; every other stream is declared
+ * with {@link OutputFieldsDeclarer#declareStream(String, Fields)}.
+ */
+public class FluxShellBolt extends ShellBolt implements IRichBolt {
+    /** Stream name under which {@link #setDefaultStream(String[])} stores its fields. */
+    private static final String DEFAULT_STREAM_NAME = "default";
+
+    /** Output field names, keyed by stream name. */
+    private Map<String, String[]> outputFields;
+    /** Extra component configuration; stays {@code null} until an entry is added. */
+    private Map<String, Object> componentConfig;
+
+    /**
+     * Create a ShellBolt with command line arguments.
+     *
+     * @param command Command line arguments for the bolt
+     */
+    public FluxShellBolt(String[] command) {
+        super(command);
+        this.outputFields = new HashMap<String, String[]>();
+    }
+
+    /**
+     * Create a ShellBolt with command line arguments and output fields.
+     *
+     * <p>Keep this constructor for backward compatibility.
+     *
+     * @param command Command line arguments for the bolt
+     * @param outputFields Names of fields the bolt will emit (if any) on the default stream.
+     */
+    public FluxShellBolt(String[] command, String[] outputFields) {
+        this(command);
+        this.setDefaultStream(outputFields);
+    }
+
+    /**
+     * Add a configuration entry for this bolt. This method is called from a YAML file:
+     *
+     * <pre>
+     * className: "org.apache.storm.flux.wrappers.bolts.FluxShellBolt"
+     * constructorArgs:
+     *   # command line
+     *   - ["python", "splitsentence.py"]
+     *   # output fields
+     *   - ["word"]
+     * configMethods:
+     *   - name: "addComponentConfig"
+     *     args: ["publisher.data_paths", "actions"]
+     * </pre>
+     *
+     * @param key configuration key
+     * @param value configuration value; replaces any value previously stored under {@code key}
+     */
+    public void addComponentConfig(String key, Object value) {
+        ensureComponentConfig().put(key, value);
+    }
+
+    /**
+     * Add a list-valued configuration entry for this bolt. This method is called from a YAML file:
+     *
+     * <pre>
+     * className: "org.apache.storm.flux.wrappers.bolts.FluxShellBolt"
+     * constructorArgs:
+     *   # command line
+     *   - ["python", "splitsentence.py"]
+     *   # output fields
+     *   - ["word"]
+     * configMethods:
+     *   - name: "addComponentConfig"
+     *     args:
+     *       - "publisher.data_paths"
+     *       - ["actions"]
+     * </pre>
+     *
+     * @param key configuration key
+     * @param values configuration values; the list is stored as-is (not copied)
+     */
+    public void addComponentConfig(String key, List<Object> values) {
+        ensureComponentConfig().put(key, values);
+    }
+
+    /**
+     * Set the default stream's output fields. This method is called from a YAML file:
+     *
+     * <pre>
+     * bolts:
+     *   - className: org.apache.storm.flux.wrappers.bolts.FluxShellBolt
+     *     id: my_bolt
+     *     constructorArgs:
+     *       - [python, my_bolt.py]
+     *     configMethods:
+     *       - name: setDefaultStream
+     *         args:
+     *           - [word, count]
+     * </pre>
+     *
+     * @param outputFields Names of fields the bolt will emit (if any) in the default stream.
+     */
+    public void setDefaultStream(String[] outputFields) {
+        this.setNamedStream(DEFAULT_STREAM_NAME, outputFields);
+    }
+
+    /**
+     * Set the output fields of a custom <em>named</em> stream. This method is called from a YAML file:
+     *
+     * <pre>
+     * bolts:
+     *   - className: org.apache.storm.flux.wrappers.bolts.FluxShellBolt
+     *     id: my_bolt
+     *     constructorArgs:
+     *       - [python, my_bolt.py]
+     *     configMethods:
+     *       - name: setNamedStream
+     *         args:
+     *           - first
+     *           - [word, count]
+     * </pre>
+     *
+     * @param name Name of the stream the bolt will emit into; replaces any earlier fields for it.
+     * @param outputFields Names of fields the bolt will emit in the custom <em>named</em> stream.
+     */
+    public void setNamedStream(String name, String[] outputFields) {
+        this.outputFields.put(name, outputFields);
+    }
+
+    /**
+     * Declare every registered stream. The {@code "default"} stream goes through
+     * {@code declare}; all others go through {@code declareStream}.
+     *
+     * @param declarer the declarer supplied by Storm
+     */
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        for (Map.Entry<String, String[]> entry : this.outputFields.entrySet()) {
+            String streamName = entry.getKey();
+            Fields fields = new Fields(entry.getValue());
+            if (DEFAULT_STREAM_NAME.equals(streamName)) {
+                declarer.declare(fields);
+            } else {
+                declarer.declareStream(streamName, fields);
+            }
+        }
+    }
+
+    /**
+     * @return the configuration added through {@code addComponentConfig}, or {@code null}
+     *     if no entry was ever added
+     */
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        return this.componentConfig;
+    }
+
+    /** Lazily create the component configuration map and return it. */
+    private Map<String, Object> ensureComponentConfig() {
+        if (this.componentConfig == null) {
+            this.componentConfig = new HashMap<String, Object>();
+        }
+        return this.componentConfig;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-wrappers/src/main/java/org/apache/storm/flux/wrappers/bolts/LogInfoBolt.java
----------------------------------------------------------------------
diff --git a/flux/flux-wrappers/src/main/java/org/apache/storm/flux/wrappers/bolts/LogInfoBolt.java b/flux/flux-wrappers/src/main/java/org/apache/storm/flux/wrappers/bolts/LogInfoBolt.java
new file mode 100644
index 0000000..5f0e84b
--- /dev/null
+++ b/flux/flux-wrappers/src/main/java/org/apache/storm/flux/wrappers/bolts/LogInfoBolt.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.flux.wrappers.bolts;
+
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Minimal sink bolt: it writes every tuple it receives to the log at INFO level
+ * and emits nothing downstream.
+ */
+public class LogInfoBolt extends BaseBasicBolt {
+    private static final Logger LOG = LoggerFactory.getLogger(LogInfoBolt.class);
+
+    /**
+     * Log the incoming tuple; the collector is deliberately unused.
+     */
+    @Override
+    public void execute(Tuple input, BasicOutputCollector collector) {
+        // SLF4J placeholder form: the tuple is rendered by the logger itself.
+        LOG.info("{}", input);
+    }
+
+    /**
+     * Declares no streams, because this bolt never emits.
+     */
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        // Intentionally empty.
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-wrappers/src/main/java/org/apache/storm/flux/wrappers/spouts/FluxShellSpout.java
----------------------------------------------------------------------
diff --git a/flux/flux-wrappers/src/main/java/org/apache/storm/flux/wrappers/spouts/FluxShellSpout.java b/flux/flux-wrappers/src/main/java/org/apache/storm/flux/wrappers/spouts/FluxShellSpout.java
new file mode 100644
index 0000000..5fd378d
--- /dev/null
+++ b/flux/flux-wrappers/src/main/java/org/apache/storm/flux/wrappers/spouts/FluxShellSpout.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.flux.wrappers.spouts;
+
+import org.apache.storm.spout.ShellSpout;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Iterator;
+
+/**
+ * A generic {@code ShellSpout} implementation that allows you to specify output fields
+ * and even streams without having to subclass {@code ShellSpout} to do so.
+ *
+ * <p>Output fields are tracked per stream name. The stream named {@code "default"} is
+ * declared with {@link OutputFieldsDeclarer#declare(Fields)}; every other stream is declared
+ * with {@link OutputFieldsDeclarer#declareStream(String, Fields)}.
+ */
+public class FluxShellSpout extends ShellSpout implements IRichSpout {
+    /** Stream name under which {@link #setDefaultStream(String[])} stores its fields. */
+    private static final String DEFAULT_STREAM_NAME = "default";
+
+    /** Output field names, keyed by stream name. */
+    private Map<String, String[]> outputFields;
+    /** Extra component configuration; stays {@code null} until an entry is added. */
+    private Map<String, Object> componentConfig;
+
+    /**
+     * Create a ShellSpout with command line arguments.
+     *
+     * @param command Command line arguments for the spout
+     */
+    public FluxShellSpout(String[] command) {
+        super(command);
+        this.outputFields = new HashMap<String, String[]>();
+    }
+
+    /**
+     * Create a ShellSpout with command line arguments and output fields.
+     *
+     * <p>Keep this constructor for backward compatibility.
+     *
+     * @param args Command line arguments for the spout
+     * @param outputFields Names of fields the spout will emit on the default stream.
+     */
+    public FluxShellSpout(String[] args, String[] outputFields) {
+        this(args);
+        this.setDefaultStream(outputFields);
+    }
+
+    /**
+     * Add a configuration entry for this spout. This method is called from a YAML file:
+     *
+     * <pre>
+     * className: "org.apache.storm.flux.wrappers.spouts.FluxShellSpout"
+     * constructorArgs:
+     *   # command line
+     *   - ["python", "splitsentence.py"]
+     *   # output fields
+     *   - ["word"]
+     * configMethods:
+     *   - name: "addComponentConfig"
+     *     args: ["publisher.data_paths", "actions"]
+     * </pre>
+     *
+     * @param key configuration key
+     * @param value configuration value; replaces any value previously stored under {@code key}
+     */
+    public void addComponentConfig(String key, Object value) {
+        ensureComponentConfig().put(key, value);
+    }
+
+    /**
+     * Add a list-valued configuration entry for this spout. This method is called from a YAML file:
+     *
+     * <pre>
+     * className: "org.apache.storm.flux.wrappers.spouts.FluxShellSpout"
+     * constructorArgs:
+     *   # command line
+     *   - ["python", "splitsentence.py"]
+     *   # output fields
+     *   - ["word"]
+     * configMethods:
+     *   - name: "addComponentConfig"
+     *     args:
+     *       - "publisher.data_paths"
+     *       - ["actions"]
+     * </pre>
+     *
+     * @param key configuration key
+     * @param values configuration values; the list is stored as-is (not copied)
+     */
+    public void addComponentConfig(String key, List<Object> values) {
+        ensureComponentConfig().put(key, values);
+    }
+
+    /**
+     * Set the default stream's output fields. This method is called from a YAML file:
+     *
+     * <pre>
+     * spouts:
+     *   - className: org.apache.storm.flux.wrappers.spouts.FluxShellSpout
+     *     id: my_spout
+     *     constructorArgs:
+     *       - [python, my_spout.py]
+     *     configMethods:
+     *       - name: setDefaultStream
+     *         args:
+     *           - [word, count]
+     * </pre>
+     *
+     * @param outputFields Names of fields the spout will emit (if any) in the default stream.
+     */
+    public void setDefaultStream(String[] outputFields) {
+        this.setNamedStream(DEFAULT_STREAM_NAME, outputFields);
+    }
+
+    /**
+     * Set the output fields of a custom <em>named</em> stream. This method is called from a YAML file:
+     *
+     * <pre>
+     * spouts:
+     *   - className: org.apache.storm.flux.wrappers.spouts.FluxShellSpout
+     *     id: my_spout
+     *     constructorArgs:
+     *       - [python, my_spout.py]
+     *     configMethods:
+     *       - name: setNamedStream
+     *         args:
+     *           - first
+     *           - [word, count]
+     * </pre>
+     *
+     * @param name Name of the stream the spout will emit into; replaces any earlier fields for it.
+     * @param outputFields Names of fields the spout will emit in the custom <em>named</em> stream.
+     */
+    public void setNamedStream(String name, String[] outputFields) {
+        this.outputFields.put(name, outputFields);
+    }
+
+    /**
+     * Declare every registered stream. The {@code "default"} stream goes through
+     * {@code declare}; all others go through {@code declareStream}.
+     *
+     * @param declarer the declarer supplied by Storm
+     */
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        for (Map.Entry<String, String[]> entry : this.outputFields.entrySet()) {
+            String streamName = entry.getKey();
+            Fields fields = new Fields(entry.getValue());
+            if (DEFAULT_STREAM_NAME.equals(streamName)) {
+                declarer.declare(fields);
+            } else {
+                declarer.declareStream(streamName, fields);
+            }
+        }
+    }
+
+    /**
+     * @return the configuration added through {@code addComponentConfig}, or {@code null}
+     *     if no entry was ever added
+     */
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        return this.componentConfig;
+    }
+
+    /** Lazily create the component configuration map and return it. */
+    private Map<String, Object> ensureComponentConfig() {
+        if (this.componentConfig == null) {
+            this.componentConfig = new HashMap<String, Object>();
+        }
+        return this.componentConfig;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-wrappers/src/main/resources/resources/randomsentence.js
----------------------------------------------------------------------
diff --git a/flux/flux-wrappers/src/main/resources/resources/randomsentence.js b/flux/flux-wrappers/src/main/resources/resources/randomsentence.js
new file mode 100644
index 0000000..b121915
--- /dev/null
+++ b/flux/flux-wrappers/src/main/resources/resources/randomsentence.js
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Example for storm spout. Emits random sentences.
+ * The original class in java - org.apache.storm.starter.spout.RandomSentenceSpout.
+ *
+ */
+
+var storm = require('./storm');
+var Spout = storm.Spout;
+
+
+var SENTENCES = [
+ "the cow jumped over the moon",
+ "an apple a day keeps the doctor away",
+ "four score and seven years ago",
+ "snow white and the seven dwarfs",
+ "i am at two with nature"]
+
+function RandomSentenceSpout(sentences) {
+ Spout.call(this);
+ this.runningTupleId = 0;
+ this.sentences = sentences;
+ this.pending = {};
+};
+
+RandomSentenceSpout.prototype = Object.create(Spout.prototype);
+RandomSentenceSpout.prototype.constructor = RandomSentenceSpout;
+
+RandomSentenceSpout.prototype.getRandomSentence = function() {
+ return this.sentences[getRandomInt(0, this.sentences.length - 1)];
+}
+
+RandomSentenceSpout.prototype.nextTuple = function(done) {
+ var self = this;
+ var sentence = this.getRandomSentence();
+ var tup = [sentence];
+ var id = this.createNextTupleId();
+ this.pending[id] = tup;
+ //This timeout can be removed if TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS is configured to 100
+ setTimeout(function() {
+ self.emit({tuple: tup, id: id}, function(taskIds) {
+ self.log(tup + ' sent to task ids - ' + taskIds);
+ });
+ done();
+ },100);
+}
+
+RandomSentenceSpout.prototype.createNextTupleId = function() {
+ var id = this.runningTupleId;
+ this.runningTupleId++;
+ return id;
+}
+
+RandomSentenceSpout.prototype.ack = function(id, done) {
+ this.log('Received ack for - ' + id);
+ delete this.pending[id];
+ done();
+}
+
+RandomSentenceSpout.prototype.fail = function(id, done) {
+ var self = this;
+ this.log('Received fail for - ' + id + '. Retrying.');
+ this.emit({tuple: this.pending[id], id:id}, function(taskIds) {
+ self.log(self.pending[id] + ' sent to task ids - ' + taskIds);
+ });
+ done();
+}
+
+/**
+ * Returns a random integer between min (inclusive) and max (inclusive)
+ */
+function getRandomInt(min, max) {
+ return Math.floor(Math.random() * (max - min + 1)) + min;
+}
+
+new RandomSentenceSpout(SENTENCES).run();
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-wrappers/src/main/resources/resources/splitsentence.py
----------------------------------------------------------------------
diff --git a/flux/flux-wrappers/src/main/resources/resources/splitsentence.py b/flux/flux-wrappers/src/main/resources/resources/splitsentence.py
new file mode 100644
index 0000000..300105f
--- /dev/null
+++ b/flux/flux-wrappers/src/main/resources/resources/splitsentence.py
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import storm
+
+class SplitSentenceBolt(storm.BasicBolt):
+ def process(self, tup):
+ words = tup.values[0].split(" ")
+ for word in words:
+ storm.emit([word])
+
+SplitSentenceBolt().run()
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/pom.xml
----------------------------------------------------------------------
diff --git a/flux/pom.xml b/flux/pom.xml
new file mode 100644
index 0000000..287b04a
--- /dev/null
+++ b/flux/pom.xml
@@ -0,0 +1,85 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <!-- Aggregator POM (packaging "pom"): it only groups the Flux sub-modules listed under <modules>. -->
+ <artifactId>flux</artifactId>
+ <packaging>pom</packaging>
+ <name>flux</name>
+
+
+ <parent>
+ <artifactId>storm</artifactId>
+ <groupId>org.apache.storm</groupId>
+ <version>2.0.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <developers>
+ <developer>
+ <id>ptgoetz</id>
+ <name>P. Taylor Goetz</name>
+ <email>ptgoetz@apache.org</email>
+ </developer>
+ </developers>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <!-- see comment below... This fixes an annoyance with intellij -->
+ <provided.scope>provided</provided.scope>
+ </properties>
+
+ <profiles>
+ <!--
+ Hack to make intellij behave.
+ If you use intellij, enable this profile in your IDE.
+ It should make life easier.
+ -->
+ <profile>
+ <id>intellij</id>
+ <properties>
+ <provided.scope>compile</provided.scope>
+ </properties>
+ </profile>
+ </profiles>
+
+ <modules>
+ <module>flux-wrappers</module>
+ <module>flux-core</module>
+ <module>flux-examples</module>
+ </modules>
+
+ <!--
+ Dependencies declared here are inherited by every child POM that uses this one as its
+ <parent> (presumably all modules listed above - TODO confirm against the child POMs).
+ -->
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <version>${project.version}</version>
+ <scope>${provided.scope}</scope>
+ </dependency>
+ <dependency>
+ <groupId>commons-cli</groupId>
+ <artifactId>commons-cli</artifactId>
+ <version>1.2</version>
+ </dependency>
+ <!--
+ No <version> here, and storm.kafka.artifact.id is not defined in this file: both are
+ assumed to come from the parent POM (dependencyManagement / properties) - TODO confirm.
+ -->
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>${storm.kafka.artifact.id}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 896d735..d3f94e2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -320,6 +320,9 @@
<module>storm-drpc-server</module>
<module>storm-rename-hack</module>
<module>storm-clojure</module>
+ <module>storm-submit-tools</module>
+ <module>flux</module>
+ <module>sql</module>
<!-- externals -->
<module>external/storm-kafka</module>
@@ -329,8 +332,6 @@
<module>external/storm-jdbc</module>
<module>external/storm-redis</module>
<module>external/storm-eventhubs</module>
- <module>external/flux</module>
- <module>external/sql</module>
<module>external/storm-elasticsearch</module>
<module>external/storm-solr</module>
<module>external/storm-metrics</module>
@@ -342,7 +343,6 @@
<module>external/storm-kafka-monitor</module>
<module>external/storm-kinesis</module>
<module>external/storm-druid</module>
- <module>external/storm-submit-tools</module>
<module>external/storm-jms</module>
<module>external/storm-pmml</module>
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/README.md
----------------------------------------------------------------------
diff --git a/sql/README.md b/sql/README.md
new file mode 100644
index 0000000..a4b44fb
--- /dev/null
+++ b/sql/README.md
@@ -0,0 +1,207 @@
+# Storm SQL
+
+Compile SQL queries to Storm topologies.
+
+## Usage
+
+Run the ``storm sql`` command to compile SQL statements into a Trident topology and submit it to the Storm cluster:
+
+```
+$ bin/storm sql <sql-file> <topo-name>
+```
+
+Here, `sql-file` contains a list of SQL statements to be executed, and `topo-name` is the name of the topology.
+
+StormSQL activates `explain mode` and shows the query plan instead of submitting the topology when the user specifies `topo-name` as `--explain`.
+A detailed explanation is available in the `Showing Query Plan (explain mode)` section.
+
+## Supported Features
+
+The following features are supported in the current repository:
+
+* Streaming from and to external data sources
+* Filtering tuples
+* Projections
+* Aggregations (Grouping)
+* User defined function (scalar and aggregate)
+* Join (Inner, Left outer, Right outer, Full outer)
+
+## Specifying External Data Sources
+
+In StormSQL data is represented by external tables. Users can specify data sources using the `CREATE EXTERNAL TABLE`
+statement. For example, the following statement specifies a Kafka spout and sink:
+
+```
+CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY) LOCATION 'kafka://localhost:2181/brokers?topic=test' TBLPROPERTIES '{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.storm.kafka.ByteBufferSerializer"}}'
+```
+
+The syntax of `CREATE EXTERNAL TABLE` closely follows the one defined in
+[Hive Data Definition Language](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL).
+
+`PARALLELISM` is StormSQL's own keyword that describes the parallelism hint for an input data source. This is the same as providing a parallelism hint to a Trident spout.
+Downstream operators are executed with the same parallelism until a repartition occurs (aggregation triggers a repartition).
+
+The default value is 1, and this option has no effect on output data sources. (We might change this if needed; normally, repartitioning is something to avoid.)
+
+## Plugging in External Data Sources
+
+Users plug in external data sources by implementing the `ISqlTridentDataSource` interface and registering them using
+Java's service loader mechanism. The external data source is chosen based on the scheme of the URI of the
+tables. Please refer to the implementation of `storm-sql-kafka` for more details.
+
+## Specifying User Defined Function (UDF)
+
+Users can define user-defined functions (scalar or aggregate) using the `CREATE FUNCTION` statement.
+For example, the following statement defines the `MYPLUS` function, which uses the `org.apache.storm.sql.TestUtils$MyPlus` class.
+
+```
+CREATE FUNCTION MYPLUS AS 'org.apache.storm.sql.TestUtils$MyPlus'
+```
+
+Storm SQL determines whether the function is scalar or aggregate by checking which methods are defined.
+If the class defines an `evaluate` method, Storm SQL treats the function as `scalar`,
+and if the class defines an `add` method, Storm SQL treats the function as `aggregate`.
+
+Here is an example class for a scalar function:
+
+```
+ public class MyPlus {
+ public static Integer evaluate(Integer x, Integer y) {
+ return x + y;
+ }
+ }
+
+```
+
+and here is an example class for an aggregate function:
+
+```
+ public class MyConcat {
+ public static String init() {
+ return "";
+ }
+ public static String add(String accumulator, String val) {
+ return accumulator + val;
+ }
+ public static String result(String accumulator) {
+ return accumulator;
+ }
+ }
+```
+
+If users don't define a `result` method, the result is the last return value of the `add` method.
+Users need to define a `result` method only when they need to transform the accumulated value.
+
+## Example: Filtering Kafka Stream
+
+Let's say there is a Kafka stream that represents the transactions of orders. Each message in the stream contains the id
+of the order, the unit price of the product and the quantity of the orders. The goal is to filter orders where the
+transactions are significant and to insert these orders into another Kafka stream for further analysis.
+
+The user can specify the following SQL statements in the SQL file:
+
+```
+CREATE EXTERNAL TABLE ORDERS (ID INT PRIMARY KEY, UNIT_PRICE INT, QUANTITY INT) LOCATION 'kafka://localhost:2181/brokers?topic=orders' TBLPROPERTIES '{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.storm.kafka.ByteBufferSerializer"}}'
+
+CREATE EXTERNAL TABLE LARGE_ORDERS (ID INT PRIMARY KEY, TOTAL INT) LOCATION 'kafka://localhost:2181/brokers?topic=large_orders' TBLPROPERTIES '{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.storm.kafka.ByteBufferSerializer"}}'
+
+INSERT INTO LARGE_ORDERS SELECT ID, UNIT_PRICE * QUANTITY AS TOTAL FROM ORDERS WHERE UNIT_PRICE * QUANTITY > 50
+```
+
+The first statement defines the table `ORDERS`, which represents the input stream. The `LOCATION` clause specifies the
+ZkHost (`localhost:2181`), the path of the brokers in ZooKeeper (`/brokers`) and the topic (`orders`).
+The `TBLPROPERTIES` clause specifies the configuration of
+[KafkaProducer](http://kafka.apache.org/documentation.html#newproducerconfigs).
+The current implementation of `storm-sql-kafka` requires specifying both the `LOCATION` and `TBLPROPERTIES` clauses even if
+the table is read-only or write-only.
+
+Similarly, the second statement specifies the table `LARGE_ORDERS`, which represents the output stream. The third
+statement is an `INSERT INTO ... SELECT` statement that defines the topology: it instructs StormSQL to filter all orders in the external
+table `ORDERS`, calculate the total price and insert matching records into the Kafka stream specified by
+`LARGE_ORDERS`.
+
+To run this example, users need to include the data sources (`storm-sql-kafka` in this case) and their dependencies in the
+class path. Dependencies for Storm SQL itself are handled automatically when users run `storm sql`. Users can include data sources at the submission step as shown below:
+
+```
+$ bin/storm sql order_filtering.sql order_filtering --artifacts "org.apache.storm:storm-sql-kafka:2.0.0-SNAPSHOT,org.apache.storm:storm-kafka:2.0.0-SNAPSHOT,org.apache.kafka:kafka_2.10:0.8.2.2\!org.slf4j:slf4j-log4j12,org.apache.kafka:kafka-clients:0.8.2.2"
+```
+
+The above command submits the SQL statements to StormSQL. Users need to modify each artifact's version if they are using a different version of Storm or Kafka.
+
+By now you should be able to see the `order_filtering` topology in the Storm UI.
+
+## Showing Query Plan (explain mode)
+
+Like `explain` on a SQL statement, StormSQL provides `explain mode` when running the Storm SQL runner. In explain mode, StormSQL analyzes each query statement (DML only) and shows the plan instead of submitting the topology.
+
+To run in `explain mode`, provide `--explain` as the topology name and run `storm sql` the same way as when submitting.
+
+For example, when you run the example seen above with explain mode:
+
+```
+$ bin/storm sql order_filtering.sql --explain --artifacts "org.apache.storm:storm-sql-kafka:2.0.0-SNAPSHOT,org.apache.storm:storm-kafka:2.0.0-SNAPSHOT,org.apache.kafka:kafka_2.10:0.8.2.2\!org.slf4j:slf4j-log4j12,org.apache.kafka:kafka-clients:0.8.2.2"
+```
+
+StormSQL prints output like the following:
+
+```
+
+===========================================================
+query>
+CREATE EXTERNAL TABLE ORDERS (ID INT PRIMARY KEY, UNIT_PRICE INT, QUANTITY INT) LOCATION 'kafka://localhost:2181/brokers?topic=orders' TBLPROPERTIES '{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.storm.kafka.ByteBufferSerializer"}}'
+-----------------------------------------------------------
+16:53:43.951 [main] INFO o.a.s.s.r.DataSourcesRegistry - Registering scheme kafka with org.apache.storm.sql.kafka.KafkaDataSourcesProvider@4d1bf319
+No plan presented on DDL
+===========================================================
+===========================================================
+query>
+CREATE EXTERNAL TABLE LARGE_ORDERS (ID INT PRIMARY KEY, TOTAL INT) LOCATION 'kafka://localhost:2181/brokers?topic=large_orders' TBLPROPERTIES '{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.storm.kafka.ByteBufferSerializer"}}'
+-----------------------------------------------------------
+No plan presented on DDL
+===========================================================
+===========================================================
+query>
+INSERT INTO LARGE_ORDERS SELECT ID, UNIT_PRICE * QUANTITY AS TOTAL FROM ORDERS WHERE UNIT_PRICE * QUANTITY > 50
+-----------------------------------------------------------
+plan>
+LogicalTableModify(table=[[LARGE_ORDERS]], operation=[INSERT], updateColumnList=[[]], flattened=[true]), id = 8
+ LogicalProject(ID=[$0], TOTAL=[*($1, $2)]), id = 7
+ LogicalFilter(condition=[>(*($1, $2), 50)]), id = 6
+ EnumerableTableScan(table=[[ORDERS]]), id = 5
+
+===========================================================
+
+```
+
+## Current Limitations
+
+- Windowing is yet to be implemented.
+- Only equi-join (single field equality) is supported for joining tables.
+- Joining table only applies within each small batch that comes off of the spout.
+ - Not across batches.
+ - Limitation came from `join` feature of Trident.
+ - Please refer to the `Trident API Overview` doc for details.
+
+## License
+
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+
+## Committer Sponsors
+ * Sriharsha Chintalapani ([sriharsha@apache.org](mailto:sriharsha@apache.org))
+ * P. Taylor Goetz ([ptgoetz@apache.org](mailto:ptgoetz@apache.org))
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/pom.xml
----------------------------------------------------------------------
diff --git a/sql/pom.xml b/sql/pom.xml
new file mode 100644
index 0000000..f085dd8
--- /dev/null
+++ b/sql/pom.xml
@@ -0,0 +1,47 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>storm</artifactId>
+ <groupId>org.apache.storm</groupId>
+ <version>2.0.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>sql</artifactId>
+ <packaging>pom</packaging>
+
+ <developers>
+ <developer>
+ <id>haohui</id>
+ <name>Haohui Mai</name>
+ <email>ricetons@gmail.com</email>
+ </developer>
+ </developers>
+
+ <modules>
+ <module>storm-sql-core</module>
+ <module>storm-sql-runtime</module>
+ <module>storm-sql-external/storm-sql-kafka</module>
+ <module>storm-sql-external/storm-sql-redis</module>
+ <module>storm-sql-external/storm-sql-mongodb</module>
+ <module>storm-sql-external/storm-sql-hdfs</module>
+ </modules>
+</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/pom.xml
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/pom.xml b/sql/storm-sql-core/pom.xml
new file mode 100644
index 0000000..ca84970
--- /dev/null
+++ b/sql/storm-sql-core/pom.xml
@@ -0,0 +1,279 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>storm</artifactId>
+ <groupId>org.apache.storm</groupId>
+ <version>2.0.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>storm-sql-core</artifactId>
+
+ <developers>
+ <developer>
+ <id>haohui</id>
+ <name>Haohui Mai</name>
+ <email>ricetons@gmail.com</email>
+ </developer>
+ </developers>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-sql-runtime</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-sql-runtime</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.calcite</groupId>
+ <artifactId>calcite-core</artifactId>
+ <version>${calcite.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-cli</groupId>
+ <artifactId>commons-cli</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>commons-lang</groupId>
+ <artifactId>commons-lang</artifactId>
+ <scope>runtime</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-collections4</artifactId>
+ <version>4.1</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <sourceDirectory>src/jvm</sourceDirectory>
+ <testSourceDirectory>src/test</testSourceDirectory>
+ <plugins>
+ <plugin>
+ <artifactId>maven-resources-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>copy-fmpp-resources</id>
+ <phase>initialize</phase>
+ <goals>
+ <goal>copy-resources</goal>
+ </goals>
+ <configuration>
+ <outputDirectory>${project.build.directory}/codegen</outputDirectory>
+ <resources>
+ <resource>
+ <directory>src/codegen</directory>
+ <filtering>false</filtering>
+ </resource>
+ </resources>
+ </configuration>
+ </execution>
+ <execution>
+ <id>copy-java-sources</id>
+ <phase>process-sources</phase>
+ <goals>
+ <goal>copy-resources</goal>
+ </goals>
+ <configuration>
+ <outputDirectory>${basedir}/target/classes/</outputDirectory>
+ <resources>
+ <resource>
+ <directory>src/jvm</directory>
+ <filtering>true</filtering>
+ </resource>
+ <resource>
+ <directory>src/test</directory>
+ <filtering>true</filtering>
+ </resource>
+ <resource>
+ <directory>target/generated-sources</directory>
+ <!-- <include>*/org</include> -->
+ <filtering>true</filtering>
+ </resource>
+ </resources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <version>2.2</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <!-- Extract parser grammar template from calcite-core.jar and put
+ it under ${project.build.directory} where all freemarker templates are. -->
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <version>2.8</version>
+ <executions>
+ <execution>
+ <id>unpack-parser-template</id>
+ <phase>initialize</phase>
+ <goals>
+ <goal>unpack</goal>
+ </goals>
+ <configuration>
+ <artifactItems>
+ <artifactItem>
+ <groupId>org.apache.calcite</groupId>
+ <artifactId>calcite-core</artifactId>
+ <type>jar</type>
+ <overWrite>true</overWrite>
+ <outputDirectory>${project.build.directory}/</outputDirectory>
+ <includes>**/Parser.jj</includes>
+ </artifactItem>
+ </artifactItems>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <!-- Use appassembler-maven-plugin instead of maven-dependency-plugin to copy dependencies,
+ because the copy and unpack goals of maven-dependency-plugin do not work together. -->
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>appassembler-maven-plugin</artifactId>
+ <version>1.9</version>
+ <executions>
+ <execution>
+ <id>create-repo</id>
+ <goals>
+ <goal>create-repository</goal>
+ </goals>
+ <configuration>
+ <assembleDirectory>${project.build.directory}/app-assembler</assembleDirectory>
+ <repositoryLayout>flat</repositoryLayout>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>com.googlecode.fmpp-maven-plugin</groupId>
+ <artifactId>fmpp-maven-plugin</artifactId>
+ <version>1.0</version>
+ <dependencies>
+ <dependency>
+ <groupId>org.freemarker</groupId>
+ <artifactId>freemarker</artifactId>
+ <version>2.3.25-incubating</version>
+ </dependency>
+ </dependencies>
+ <executions>
+ <execution>
+ <id>generate-fmpp-sources</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>generate</goal>
+ </goals>
+ <configuration>
+ <cfgFile>${project.build.directory}/codegen/config.fmpp</cfgFile>
+ <outputDirectory>target/generated-sources</outputDirectory>
+ <templateDirectory>${project.build.directory}/codegen/templates</templateDirectory>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>1.5</version>
+ <executions>
+ <execution>
+ <id>add-generated-sources</id>
+ <phase>process-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>${project.build.directory}/generated-sources</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>javacc-maven-plugin</artifactId>
+ <version>2.4</version>
+ <executions>
+ <execution>
+ <phase>generate-sources</phase>
+ <id>javacc</id>
+ <goals>
+ <goal>javacc</goal>
+ </goals>
+ <configuration>
+ <sourceDirectory>${project.build.directory}/generated-sources/</sourceDirectory>
+ <includes>
+ <include>**/Parser.jj</include>
+ </includes>
+ <lookAhead>2</lookAhead>
+ <isStatic>false</isStatic>
+ <outputDirectory>${project.build.directory}/generated-sources/</outputDirectory>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/codegen/config.fmpp
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/codegen/config.fmpp b/sql/storm-sql-core/src/codegen/config.fmpp
new file mode 100644
index 0000000..be5a792
--- /dev/null
+++ b/sql/storm-sql-core/src/codegen/config.fmpp
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+data: {
+ parser: tdd(../data/Parser.tdd)
+}
+
+freemarkerLinks: {
+ includes: includes/
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/codegen/data/Parser.tdd
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/codegen/data/Parser.tdd b/sql/storm-sql-core/src/codegen/data/Parser.tdd
new file mode 100644
index 0000000..b0dccb6
--- /dev/null
+++ b/sql/storm-sql-core/src/codegen/data/Parser.tdd
@@ -0,0 +1,80 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+{
+ # Generated parser implementation class package and name
+ package: "org.apache.storm.sql.parser.impl",
+ class: "StormParserImpl",
+
+ # List of import statements.
+ imports: [
+ "org.apache.calcite.sql.validate.*",
+ "org.apache.calcite.util.*",
+ "org.apache.storm.sql.parser.*",
+ "java.util.*"
+ ]
+
+ # List of keywords.
+ keywords: [
+ "LOCATION",
+ "INPUTFORMAT",
+ "OUTPUTFORMAT",
+ "PARALLELISM",
+ "STORED",
+ "TBLPROPERTIES",
+ "JAR"
+ ]
+
+ # List of methods for parsing custom SQL statements.
+ statementParserMethods: [
+ "SqlCreateTable()",
+ "SqlCreateFunction()"
+ ]
+
+ # List of methods for parsing custom literals.
+ # Example: ParseJsonLiteral().
+ literalParserMethods: [
+ ]
+
+ # List of methods for parsing custom data types.
+ dataTypeParserMethods: [
+ ]
+
+ nonReservedKeywords: [
+ ]
+
+ createStatementParserMethods: [
+ ]
+
+ alterStatementParserMethods: [
+ ]
+
+ dropStatementParserMethods: [
+ ]
+
+ # List of files in @includes directory that have parser method
+ # implementations for custom SQL statements, literals or types
+ # given as part of "statementParserMethods", "literalParserMethods" or
+ # "dataTypeParserMethods".
+ implementationFiles: [
+ "parserImpls.ftl"
+ ]
+
+ includeCompoundIdentifier: true,
+ includeBraces: true,
+ includeAdditionalDeclarations: false,
+ allowBangEqual: false
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/codegen/includes/license.ftl
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/codegen/includes/license.ftl b/sql/storm-sql-core/src/codegen/includes/license.ftl
new file mode 100644
index 0000000..7e66353
--- /dev/null
+++ b/sql/storm-sql-core/src/codegen/includes/license.ftl
@@ -0,0 +1,17 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/codegen/includes/parserImpls.ftl
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/codegen/includes/parserImpls.ftl b/sql/storm-sql-core/src/codegen/includes/parserImpls.ftl
new file mode 100644
index 0000000..4143840
--- /dev/null
+++ b/sql/storm-sql-core/src/codegen/includes/parserImpls.ftl
@@ -0,0 +1,113 @@
+<#-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+ license agreements. See the NOTICE file distributed with this work for additional
+ information regarding copyright ownership. The ASF licenses this file to
+ You under the Apache License, Version 2.0 (the "License"); you may not use
+ this file except in compliance with the License. You may obtain a copy of
+ the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+ by applicable law or agreed to in writing, software distributed under the
+ License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+ OF ANY KIND, either express or implied. See the License for the specific
+ language governing permissions and limitations under the License. -->
+
+
+/**
+ * Parses a single column definition and appends it to the given list:
+ *
+ *   column_name data_type ( PRIMARY KEY ( ASC | DESC )? )?
+ *
+ * The optional PRIMARY KEY clause produces a ColumnConstraint.PrimaryKey whose
+ * monotonicity is INCREASING for ASC, DECREASING for DESC, and NOT_MONOTONIC
+ * when neither is given. Without PRIMARY KEY the constraint stays null.
+ */
+private void ColumnDef(List<ColumnDefinition> list) :
+{
+ SqlParserPos pos;
+ SqlIdentifier name;
+ SqlDataTypeSpec type;
+ ColumnConstraint constraint = null;
+ // Remains NOT_MONOTONIC unless PRIMARY KEY is followed by ASC or DESC.
+ SqlMonotonicity monotonicity = SqlMonotonicity.NOT_MONOTONIC;
+}
+{
+ name = SimpleIdentifier() { pos = getPos(); }
+ type = DataType()
+ [
+ <PRIMARY> <KEY>
+ [ <ASC> { monotonicity = SqlMonotonicity.INCREASING; }
+ | <DESC> { monotonicity = SqlMonotonicity.DECREASING; }
+ ]
+ { constraint = new ColumnConstraint.PrimaryKey(monotonicity, getPos()); }
+ ]
+ {
+ list.add(new ColumnDefinition(name, type, constraint, pos));
+ }
+}
+
+/**
+ * Parses a parenthesized, comma-separated list of one or more column definitions:
+ *
+ *   '(' column_def ( ',' column_def )* ')'
+ *
+ * Returns them as a SqlNodeList whose position spans from the opening to the
+ * closing parenthesis.
+ */
+SqlNodeList ColumnDefinitionList() :
+{
+ SqlParserPos pos;
+ List<ColumnDefinition> list = Lists.newArrayList();
+}
+{
+ <LPAREN> { pos = getPos(); }
+ ColumnDef(list)
+ ( <COMMA> ColumnDef(list) )*
+ <RPAREN> {
+ return new SqlNodeList(list, pos.plus(getPos()));
+ }
+}
+
+/**
+ * CREATE EXTERNAL TABLE table_name '(' column_def ( ',' column_def )* ')'
+ * ( STORED AS INPUTFORMAT input_format_classname OUTPUTFORMAT output_format_classname )?
+ * LOCATION location_uri
+ * ( PARALLELISM parallelism )?
+ * ( TBLPROPERTIES tbl_properties )?
+ * ( AS select_stmt )?
+ *
+ * table_name is a compound identifier, so it may be qualified, e.g. database_name '.' table_name.
+ * The format class names, location_uri and tbl_properties are string literals; parallelism is an
+ * unsigned numeric literal. IF NOT EXISTS is not accepted by this production.
+ */
+SqlNode SqlCreateTable() :
+{
+ SqlParserPos pos;
+ SqlIdentifier tblName;
+ SqlNodeList fieldList;
+ SqlNode location;
+ // Optional clauses stay null when they are absent from the statement.
+ SqlNode parallelism = null;
+ SqlNode input_format_class_name = null, output_format_class_name = null;
+ SqlNode tbl_properties = null;
+ SqlNode select = null;
+}
+{
+ <CREATE> { pos = getPos(); }
+ <EXTERNAL> <TABLE>
+ tblName = CompoundIdentifier()
+ fieldList = ColumnDefinitionList()
+ [
+ <STORED> <AS>
+ <INPUTFORMAT> input_format_class_name = StringLiteral()
+ <OUTPUTFORMAT> output_format_class_name = StringLiteral()
+ ]
+ <LOCATION>
+ location = StringLiteral()
+ [ <PARALLELISM> parallelism = UnsignedNumericLiteral() ]
+ [ <TBLPROPERTIES> tbl_properties = StringLiteral() ]
+ [ <AS> select = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY) ] {
+ return new SqlCreateTable(pos, tblName, fieldList,
+ input_format_class_name, output_format_class_name, location,
+ parallelism, tbl_properties, select);
+ }
+}
+
+/**
+ * CREATE FUNCTION function_name AS 'class_name' ( USING JAR 'jar_name' )?
+ *
+ * function_name is a compound identifier; class_name and jar_name are string literals.
+ * jar_name stays null when the USING JAR clause is absent.
+ */
+SqlNode SqlCreateFunction() :
+{
+ SqlParserPos pos;
+ SqlIdentifier functionName;
+ SqlNode className;
+ SqlNode jarName = null;
+}
+{
+ <CREATE> { pos = getPos(); }
+ <FUNCTION>
+ functionName = CompoundIdentifier()
+ <AS>
+ className = StringLiteral()
+ [
+ <USING> <JAR>
+ jarName = StringLiteral()
+ ]
+ {
+ return new SqlCreateFunction(pos, functionName, className, jarName);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/AbstractTridentProcessor.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/AbstractTridentProcessor.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/AbstractTridentProcessor.java
new file mode 100644
index 0000000..843339f
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/AbstractTridentProcessor.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql;
+
+import org.apache.calcite.DataContext;
+import org.apache.storm.sql.javac.CompilingClassLoader;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentTopology;
+
+import java.util.List;
+
+public abstract class AbstractTridentProcessor {
+    /** Stream carrying the results of the SQL; not assigned in this class. */
+    protected Stream outputStream;
+
+    /** Context used when the query is executed; not assigned in this class. */
+    protected DataContext dataContext;
+
+    /**
+     * Class loaders holding the classes compiled for the query. They are chained,
+     * so the last class loader in the list can access every compiled class.
+     */
+    protected List<CompilingClassLoader> classLoaders;
+
+    /**
+     * Constructs the Trident topology that evaluates the SQL.
+     *
+     * @return the Trident topology for the query
+     */
+    public abstract TridentTopology build();
+
+    /**
+     * Returns the stream produced by the SQL.
+     *
+     * @return the output stream, or {@code null} if it has not been set
+     */
+    public Stream outputStream() {
+        return this.outputStream;
+    }
+
+    /**
+     * Returns the context used while the query executes.
+     *
+     * @return the DataContext instance, or {@code null} if it has not been set
+     */
+    public DataContext getDataContext() {
+        return this.dataContext;
+    }
+
+    /**
+     * Returns the class loaders that hold the compiled classes, in chaining order.
+     *
+     * @return the class loaders, or {@code null} if they have not been set
+     */
+    public List<CompilingClassLoader> getClassLoaders() {
+        return this.classLoaders;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSql.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSql.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSql.java
new file mode 100644
index 0000000..5dec4af
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSql.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql;
+
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.SubmitOptions;
+import org.apache.storm.sql.runtime.ChannelHandler;
+
+import java.util.Map;
+
+/**
+ * The StormSql class provides standalone, interactive interfaces to execute
+ * SQL statements over streaming data, either locally or by submitting a topology.
+ * <p>
+ * Each call processes its {@code statements} as one batch. Callers should not rely on
+ * definitions being carried over between calls: submit the data definition language (DDL)
+ * statements and the query statements that use them in the same batch.
+ */
+public abstract class StormSql {
+    /**
+     * Execute the SQL statements in stand-alone mode. The user can retrieve the result by
+     * passing in an instance of {@link ChannelHandler}.
+     *
+     * @param statements DDL and query statements, processed in iteration order
+     * @param handler receives the results of the queries
+     * @throws Exception if a statement cannot be parsed, planned or executed
+     */
+    public abstract void execute(Iterable<String> statements,
+                                 ChannelHandler handler) throws Exception;
+
+    /**
+     * Submit the SQL statements to Nimbus and run it as a topology.
+     *
+     * @param name name of the topology to submit
+     * @param statements DDL and query statements, processed in iteration order
+     * @param stormConf topology configuration
+     * @param opts submit options passed through to the submitter
+     * @param progressListener listener passed through to {@link StormSubmitter}
+     * @param asUser user to submit the topology as
+     * @throws Exception if a statement cannot be parsed or planned, or submission fails
+     */
+    public abstract void submit(
+        String name, Iterable<String> statements, Map<String, ?> stormConf, SubmitOptions opts,
+        StormSubmitter.ProgressListener progressListener, String asUser)
+        throws Exception;
+
+    /**
+     * Print out query plan for each query.
+     *
+     * @param statements DDL and query statements, processed in iteration order
+     * @throws Exception if a statement cannot be parsed or planned
+     */
+    public abstract void explain(Iterable<String> statements) throws Exception;
+
+    /**
+     * Creates the default StormSql implementation.
+     *
+     * @return a new StormSql instance
+     */
+    public static StormSql construct() {
+        return new StormSqlImpl();
+    }
+}
+
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
new file mode 100644
index 0000000..007daa7
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
@@ -0,0 +1,290 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql;
+
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.calcite.prepare.CalciteCatalogReader;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.schema.Function;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.schema.impl.AggregateFunctionImpl;
+import org.apache.calcite.schema.impl.ScalarFunctionImpl;
+import org.apache.calcite.sql.SqlExplainLevel;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperatorTable;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.util.ChainedSqlOperatorTable;
+import org.apache.calcite.tools.FrameworkConfig;
+import org.apache.calcite.tools.Frameworks;
+import org.apache.calcite.tools.Planner;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.SubmitOptions;
+import org.apache.storm.sql.compiler.StormSqlTypeFactoryImpl;
+import org.apache.storm.sql.compiler.backends.standalone.PlanCompiler;
+import org.apache.storm.sql.javac.CompilingClassLoader;
+import org.apache.storm.sql.parser.ColumnConstraint;
+import org.apache.storm.sql.parser.ColumnDefinition;
+import org.apache.storm.sql.parser.SqlCreateFunction;
+import org.apache.storm.sql.parser.SqlCreateTable;
+import org.apache.storm.sql.parser.StormParser;
+import org.apache.storm.sql.planner.StormRelUtils;
+import org.apache.storm.sql.planner.trident.QueryPlanner;
+import org.apache.storm.sql.runtime.AbstractValuesProcessor;
+import org.apache.storm.sql.runtime.ChannelHandler;
+import org.apache.storm.sql.runtime.DataSource;
+import org.apache.storm.sql.runtime.DataSourcesRegistry;
+import org.apache.storm.sql.runtime.FieldInfo;
+import org.apache.storm.sql.runtime.ISqlTridentDataSource;
+import org.apache.storm.trident.TridentTopology;
+
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.jar.Attributes;
+import java.util.jar.JarOutputStream;
+import java.util.jar.Manifest;
+import java.util.zip.ZipEntry;
+
+import static org.apache.storm.sql.compiler.CompilerUtil.TableBuilderInfo;
+
+class StormSqlImpl extends StormSql {
+ // Type factory handed to the standalone PlanCompiler in execute().
+ private final JavaTypeFactory typeFactory = new StormSqlTypeFactoryImpl(
+ RelDataTypeSystem.DEFAULT);
+ // Root Calcite schema shared across calls on this instance: CREATE FUNCTION adds UDFs here,
+ // and it is passed to QueryPlanner in submit().
+ private final SchemaPlus schema = Frameworks.createRootSchema(true);
+ // Set to true by handleCreateFunction once a UDF has been registered.
+ // NOTE(review): not read in the visible part of this class; presumably consulted when
+ // building the planner configuration — verify.
+ private boolean hasUdf = false;
+
+    /**
+     * Runs each statement in stand-alone mode, in iteration order.
+     * CREATE TABLE registers a data source for this batch, CREATE FUNCTION registers a UDF,
+     * and any other statement is planned, compiled with {@link PlanCompiler}, and initialized
+     * with the batch's data sources and the result handler.
+     */
+    @Override
+    public void execute(
+        Iterable<String> statements, ChannelHandler result)
+        throws Exception {
+        Map<String, DataSource> sources = new HashMap<>();
+        for (String statement : statements) {
+            SqlNode parsed = new StormParser(statement).impl().parseSqlStmtEof();
+
+            if (parsed instanceof SqlCreateTable) {
+                handleCreateTable((SqlCreateTable) parsed, sources);
+                continue;
+            }
+            if (parsed instanceof SqlCreateFunction) {
+                handleCreateFunction((SqlCreateFunction) parsed);
+                continue;
+            }
+
+            // Any non-DDL statement: parse -> validate -> convert to a logical plan, then compile it.
+            Planner planner = Frameworks.getPlanner(buildFrameWorkConfig());
+            RelNode logicalPlan = planner.convert(planner.validate(planner.parse(statement)));
+            AbstractValuesProcessor processor = new PlanCompiler(typeFactory).compile(logicalPlan);
+            processor.initialize(sources, result);
+        }
+    }
+
+    /**
+     * Processes the statements in order and submits one Trident topology per query statement.
+     * The classes compiled for a query are packaged into a temporary jar, which is used as the
+     * topology jar (via the {@code storm.jar} system property) and deleted after submission.
+     */
+    @Override
+    public void submit(
+        String name, Iterable<String> statements, Map<String, ?> stormConf, SubmitOptions opts,
+        StormSubmitter.ProgressListener progressListener, String asUser)
+        throws Exception {
+        Map<String, ISqlTridentDataSource> dataSources = new HashMap<>();
+        for (String sql : statements) {
+            StormParser parser = new StormParser(sql);
+            SqlNode node = parser.impl().parseSqlStmtEof();
+            if (node instanceof SqlCreateTable) {
+                handleCreateTableForTrident((SqlCreateTable) node, dataSources);
+            } else if (node instanceof SqlCreateFunction) {
+                handleCreateFunction((SqlCreateFunction) node);
+            } else {
+                QueryPlanner planner = new QueryPlanner(schema);
+                AbstractTridentProcessor processor = planner.compile(dataSources, sql);
+                TridentTopology topo = processor.build();
+
+                // "storm.jar" is a JVM-wide system property. Remember its current value so it can be
+                // restored, rather than being left pointing at the temporary jar deleted below.
+                String previousStormJar = System.getProperty("storm.jar");
+                Path jarPath = null;
+                try {
+                    // QueryPlanner on Trident mode configures the topology with compiled classes,
+                    // so we need to add new classes into topology jar.
+                    // Topology will be serialized and sent to Nimbus, and deserialized and executed in workers.
+                    jarPath = Files.createTempFile("storm-sql", ".jar");
+                    System.setProperty("storm.jar", jarPath.toString());
+                    packageTopology(jarPath, processor);
+                    StormSubmitter.submitTopologyAs(name, stormConf, topo.build(), opts, progressListener, asUser);
+                } finally {
+                    // Restore the property before deleting, so it is restored even if deletion fails.
+                    if (previousStormJar == null) {
+                        System.clearProperty("storm.jar");
+                    } else {
+                        System.setProperty("storm.jar", previousStormJar);
+                    }
+                    if (jarPath != null) {
+                        Files.delete(jarPath);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Prints every statement followed by its logical plan.
+     * DDL statements (CREATE TABLE / CREATE FUNCTION) are still handled against this instance,
+     * but only a "no plan" notice is printed for them.
+     */
+    @Override
+    public void explain(Iterable<String> statements) throws Exception {
+        Map<String, ISqlTridentDataSource> tridentSources = new HashMap<>();
+        for (String statement : statements) {
+            SqlNode parsed = new StormParser(statement).impl().parseSqlStmtEof();
+
+            System.out.println("===========================================================");
+            System.out.println("query>");
+            System.out.println(statement);
+            System.out.println("-----------------------------------------------------------");
+
+            final boolean isDdl = parsed instanceof SqlCreateTable || parsed instanceof SqlCreateFunction;
+            if (parsed instanceof SqlCreateTable) {
+                handleCreateTableForTrident((SqlCreateTable) parsed, tridentSources);
+            } else if (parsed instanceof SqlCreateFunction) {
+                handleCreateFunction((SqlCreateFunction) parsed);
+            }
+
+            if (isDdl) {
+                System.out.println("No plan presented on DDL");
+            } else {
+                Planner planner = Frameworks.getPlanner(buildFrameWorkConfig());
+                RelNode logicalPlan = planner.convert(planner.validate(planner.parse(statement)));
+                // Render the plan before printing the header so a failure prints nothing partial.
+                String rendered = StormRelUtils.explain(logicalPlan, SqlExplainLevel.ALL_ATTRIBUTES);
+                System.out.println("plan>");
+                System.out.println(rendered);
+            }
+
+            System.out.println("===========================================================");
+        }
+    }
+
+    /**
+     * Writes a jar at {@code jar} containing every class held by the processor's class loaders,
+     * so the classes compiled for the query travel with the topology.
+     *
+     * @param jar destination file; any existing content is overwritten
+     * @param processor compiled query whose class loaders hold the generated classes
+     * @throws IOException if the jar cannot be written
+     */
+    private void packageTopology(Path jar, AbstractTridentProcessor processor) throws IOException {
+        Manifest manifest = new Manifest();
+        Attributes attr = manifest.getMainAttributes();
+        attr.put(Attributes.Name.MANIFEST_VERSION, "1.0");
+        // Use the binary name: getCanonicalName() is null for anonymous/local classes and uses '.'
+        // instead of '$' for nested classes, neither of which is a loadable Main-Class value.
+        attr.put(Attributes.Name.MAIN_CLASS, processor.getClass().getName());
+        try (JarOutputStream out = new JarOutputStream(
+            new BufferedOutputStream(new FileOutputStream(jar.toFile())), manifest)) {
+            List<CompilingClassLoader> classLoaders = processor.getClassLoaders();
+            if (classLoaders != null) {
+                for (CompilingClassLoader classLoader : classLoaders) {
+                    for (Map.Entry<String, ByteArrayOutputStream> e : classLoader.getClasses().entrySet()) {
+                        // Keys are presumably fully qualified class names; map package dots to jar paths.
+                        out.putNextEntry(new ZipEntry(e.getKey().replace(".", "/") + ".class"));
+                        out.write(e.getValue().toByteArray());
+                        out.closeEntry();
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Registers a table declared by a CREATE TABLE statement together with its data source.
+     *
+     * <p>The table is added to the Calcite schema via {@code updateSchema}, a data source is
+     * constructed from the table's location and input/output formats, and the data source is
+     * stored in {@code dataSources} under the table name.
+     *
+     * @param n the parsed CREATE TABLE statement
+     * @param dataSources data sources registered so far, keyed by table name; updated in place
+     * @throws RuntimeException if the table is already defined or no data source can be built
+     */
+    private void handleCreateTable(
+            SqlCreateTable n, Map<String, DataSource> dataSources) {
+        String tableName = n.tableName();
+        // Reject duplicates before updateSchema(): it calls schema.add(...), so checking
+        // afterwards would register the rejected definition in the Calcite schema under a
+        // name that is already in use.
+        if (dataSources.containsKey(tableName)) {
+            throw new RuntimeException("Duplicated definition for table " + tableName);
+        }
+        List<FieldInfo> fields = updateSchema(n);
+        DataSource ds = DataSourcesRegistry.construct(n.location(), n.inputFormatClass(),
+                n.outputFormatClass(), fields);
+        if (ds == null) {
+            throw new RuntimeException("Cannot construct data source for " + tableName);
+        }
+        dataSources.put(tableName, ds);
+    }
+
+    /**
+     * Registers the user-defined function declared by a CREATE FUNCTION statement.
+     *
+     * <p>A class with a public {@code evaluate} method becomes a scalar function. Otherwise, a
+     * class with a public {@code add} method becomes an aggregate function. The function is
+     * added to the schema under its upper-cased name, and {@code hasUdf} is set, which
+     * {@code buildFrameWorkConfig} checks.
+     *
+     * @param sqlCreateFunction the parsed CREATE FUNCTION statement
+     * @throws ClassNotFoundException if the implementing class cannot be loaded
+     * @throws UnsupportedOperationException if the statement uses {@code USING JAR}
+     * @throws RuntimeException if the class has neither an {@code evaluate} nor an {@code add}
+     *     method
+     */
+    private void handleCreateFunction(SqlCreateFunction sqlCreateFunction) throws ClassNotFoundException {
+        if (sqlCreateFunction.jarName() != null) {
+            throw new UnsupportedOperationException("UDF 'USING JAR' not implemented");
+        }
+        String className = sqlCreateFunction.className();
+        Function function;
+        Method evaluate = findMethod(className, "evaluate");
+        if (evaluate != null) {
+            function = ScalarFunctionImpl.create(evaluate);
+        } else if (findMethod(className, "add") != null) {
+            function = AggregateFunctionImpl.create(Class.forName(className));
+        } else {
+            throw new RuntimeException("Invalid scalar or aggregate function");
+        }
+        // Upper-case with Locale.ROOT: default-locale casing can change letters (e.g. 'i'
+        // becomes dotted 'İ' under a Turkish locale), which would presumably register the
+        // function under a name that the upper-cased identifiers in queries no longer match.
+        schema.add(sqlCreateFunction.functionName().toUpperCase(java.util.Locale.ROOT), function);
+        hasUdf = true;
+    }
+
+    /**
+     * Loads {@code clazzName} and looks up a public method called {@code methodName}.
+     *
+     * <p>The search covers what {@link Class#getMethods()} reports, i.e. public methods
+     * including inherited ones. If the name is overloaded, the first match in that array is
+     * returned; the JDK does not specify the array's order.
+     *
+     * @param clazzName fully qualified name of the class to load
+     * @param methodName name of the method to find
+     * @return the matching method, or {@code null} when the class has no such public method
+     * @throws ClassNotFoundException if the class cannot be loaded
+     */
+    private Method findMethod(String clazzName, String methodName) throws ClassNotFoundException {
+        Method[] publicMethods = Class.forName(clazzName).getMethods();
+        for (int i = 0; i < publicMethods.length; i++) {
+            if (publicMethods[i].getName().equals(methodName)) {
+                return publicMethods[i];
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Registers a table declared by a CREATE TABLE statement for the Trident execution path.
+     *
+     * <p>The table is added to the Calcite schema via {@code updateSchema}, a Trident data
+     * source is looked up for the table's location, input/output formats and properties, and
+     * the data source is stored in {@code dataSources} under the table name.
+     *
+     * @param n the parsed CREATE TABLE statement
+     * @param dataSources data sources registered so far, keyed by table name; updated in place
+     * @throws RuntimeException if the table is already defined or no data source can be found
+     */
+    private void handleCreateTableForTrident(
+            SqlCreateTable n, Map<String, ISqlTridentDataSource> dataSources) {
+        String tableName = n.tableName();
+        // Reject duplicates before updateSchema(): it calls schema.add(...), so checking
+        // afterwards would register the rejected definition in the Calcite schema under a
+        // name that is already in use.
+        if (dataSources.containsKey(tableName)) {
+            throw new RuntimeException("Duplicated definition for table " + tableName);
+        }
+        List<FieldInfo> fields = updateSchema(n);
+        ISqlTridentDataSource ds = DataSourcesRegistry.constructTridentDataSource(n.location(),
+                n.inputFormatClass(), n.outputFormatClass(), n.properties(), fields);
+        if (ds == null) {
+            throw new RuntimeException("Failed to find data source for " + tableName
+                    + " URI: " + n.location());
+        }
+        dataSources.put(tableName, ds);
+    }
+
+    /**
+     * Adds the table declared by {@code n} to the Calcite schema and describes its columns.
+     *
+     * <p>Every column is added to the table definition with its type and constraint. The
+     * table's parallelism hint is applied when the statement declares one.
+     *
+     * @param n the parsed CREATE TABLE statement
+     * @return one {@link FieldInfo} per column, in the order of {@code n.fieldList()}, holding
+     *     the column name, the Java class of its SQL type and whether it is a primary key
+     */
+    private List<FieldInfo> updateSchema(SqlCreateTable n) {
+        TableBuilderInfo tableBuilder = new TableBuilderInfo(typeFactory);
+        List<FieldInfo> fieldInfos = new ArrayList<>();
+        for (ColumnDefinition column : n.fieldList()) {
+            ColumnConstraint constraint = column.constraint();
+            tableBuilder.field(column.name(), column.type(), constraint);
+
+            RelDataType relType = column.type().deriveType(typeFactory);
+            Class<?> javaClass = (Class<?>) typeFactory.getJavaClass(relType);
+            // instanceof is already false for null, so no separate null check is needed.
+            boolean primaryKey = constraint instanceof ColumnConstraint.PrimaryKey;
+            fieldInfos.add(new FieldInfo(column.name(), javaClass, primaryKey));
+        }
+
+        Integer parallelism = n.parallelism();
+        if (parallelism != null) {
+            tableBuilder.parallelismHint(parallelism);
+        }
+        schema.add(n.tableName(), tableBuilder.build());
+        return fieldInfos;
+    }
+
+    /**
+     * Creates the Calcite {@link FrameworkConfig} used to plan queries against {@code schema}.
+     *
+     * <p>Once a function has been registered ({@code hasUdf}), the operator table chains the
+     * standard SQL operator table with a {@link CalciteCatalogReader} over the schema, which is
+     * where {@code handleCreateFunction} adds functions.
+     *
+     * @return a new framework configuration
+     */
+    private FrameworkConfig buildFrameWorkConfig() {
+        Frameworks.ConfigBuilder configBuilder = Frameworks.newConfigBuilder().defaultSchema(schema);
+        if (!hasUdf) {
+            return configBuilder.build();
+        }
+        CalciteCatalogReader catalogReader = new CalciteCatalogReader(
+                CalciteSchema.from(schema), false, Collections.<String>emptyList(), typeFactory);
+        List<SqlOperatorTable> operatorTables = new ArrayList<>();
+        operatorTables.add(SqlStdOperatorTable.instance());
+        operatorTables.add(catalogReader);
+        return configBuilder.operatorTable(new ChainedSqlOperatorTable(operatorTables)).build();
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlRunner.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlRunner.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlRunner.java
new file mode 100644
index 0000000..5618647
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlRunner.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.storm.generated.SubmitOptions;
+import org.apache.storm.generated.TopologyInitialStatus;
+import org.apache.storm.utils.Utils;
+
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Command-line entry point for StormSQL.
+ *
+ * <p>Reads SQL statements from the file given by {@code -f/--file}, one statement per line,
+ * and either prints their plans ({@code -e/--explain}) or submits them as a topology named by
+ * {@code -t/--topology}. Explain mode takes precedence when both options are given.
+ */
+public class StormSqlRunner {
+    private static final String OPTION_SQL_FILE_SHORT = "f";
+    private static final String OPTION_SQL_FILE_LONG = "file";
+    private static final String OPTION_SQL_TOPOLOGY_NAME_SHORT = "t";
+    private static final String OPTION_SQL_TOPOLOGY_NAME_LONG = "topology";
+    private static final String OPTION_SQL_EXPLAIN_SHORT = "e";
+    private static final String OPTION_SQL_EXPLAIN_LONG = "explain";
+
+    /**
+     * Parses the command line and runs StormSQL in explain or submit mode.
+     *
+     * @param args command-line arguments; see {@link #buildOptions()}
+     * @throws Exception if the arguments cannot be parsed, the SQL file cannot be read, or
+     *     explaining/submitting the statements fails
+     */
+    public static void main(String[] args) throws Exception {
+        Options options = buildOptions();
+        CommandLineParser parser = new DefaultParser();
+        CommandLine commandLine = parser.parse(options, args);
+
+        if (!commandLine.hasOption(OPTION_SQL_FILE_LONG)) {
+            printUsageAndExit(options, OPTION_SQL_FILE_LONG + " is required");
+        }
+
+        String filePath = commandLine.getOptionValue(OPTION_SQL_FILE_LONG);
+        List<String> stmts = new ArrayList<>();
+        for (String line : Files.readAllLines(Paths.get(filePath), StandardCharsets.UTF_8)) {
+            // Every line is handed to the SQL parser as a separate statement; a blank line is
+            // not a statement and would abort the whole run, so skip it.
+            if (!line.trim().isEmpty()) {
+                stmts.add(line);
+            }
+        }
+        StormSql sql = StormSql.construct();
+        @SuppressWarnings("unchecked")
+        Map<String, ?> conf = Utils.readStormConfig();
+
+        if (commandLine.hasOption(OPTION_SQL_EXPLAIN_LONG)) {
+            sql.explain(stmts);
+        } else if (commandLine.hasOption(OPTION_SQL_TOPOLOGY_NAME_LONG)) {
+            String topoName = commandLine.getOptionValue(OPTION_SQL_TOPOLOGY_NAME_LONG);
+            SubmitOptions submitOptions = new SubmitOptions(TopologyInitialStatus.ACTIVE);
+            sql.submit(topoName, stmts, conf, submitOptions, null, null);
+        } else {
+            printUsageAndExit(options, "Either " + OPTION_SQL_TOPOLOGY_NAME_LONG + " or " + OPTION_SQL_EXPLAIN_LONG
+                    + " must be presented");
+        }
+    }
+
+    /**
+     * Prints {@code message} and the usage help to standard output, then exits the JVM with
+     * status 1.
+     */
+    private static void printUsageAndExit(Options options, String message) {
+        System.out.println(message);
+        HelpFormatter formatter = new HelpFormatter();
+        formatter.printHelp("storm-sql-runner ", options);
+        System.exit(1);
+    }
+
+    /**
+     * Builds the supported command-line options: the SQL file, the topology name and the
+     * explain flag.
+     */
+    private static Options buildOptions() {
+        Options options = new Options();
+        options.addOption(OPTION_SQL_FILE_SHORT, OPTION_SQL_FILE_LONG, true, "REQUIRED SQL file which has sql statements");
+        options.addOption(OPTION_SQL_TOPOLOGY_NAME_SHORT, OPTION_SQL_TOPOLOGY_NAME_LONG, true, "Topology name to submit");
+        options.addOption(OPTION_SQL_EXPLAIN_SHORT, OPTION_SQL_EXPLAIN_LONG, false, "Activate explain mode (topology name will be ignored)");
+        return options;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/calcite/ParallelStreamableTable.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/calcite/ParallelStreamableTable.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/calcite/ParallelStreamableTable.java
new file mode 100644
index 0000000..c6b584d
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/calcite/ParallelStreamableTable.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.sql.calcite;
+
+import org.apache.calcite.rel.stream.Delta;
+import org.apache.calcite.schema.StreamableTable;
+
+/**
+ * Table that can be converted to a stream and that also carries a parallelism hint.
+ *
+ * @see Delta
+ */
+public interface ParallelStreamableTable extends StreamableTable {
+
+ /**
+ * Returns the parallelism hint of this table.
+ *
+ * @return the parallelism hint, or {@code null} if it is unknown
+ */
+ Integer parallelismHint();
+}