You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/04/05 23:19:20 UTC

[17/23] storm git commit: STORM-2453 Move non-connectors into the top directory

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/pom.xml
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/pom.xml b/external/sql/storm-sql-core/pom.xml
deleted file mode 100644
index 776a54c..0000000
--- a/external/sql/storm-sql-core/pom.xml
+++ /dev/null
@@ -1,279 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements.  See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License.  You may obtain a copy of the License at
-
-     http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-
-    <parent>
-        <artifactId>storm</artifactId>
-        <groupId>org.apache.storm</groupId>
-        <version>2.0.0-SNAPSHOT</version>
-        <relativePath>../../../pom.xml</relativePath>
-    </parent>
-
-    <artifactId>storm-sql-core</artifactId>
-
-    <developers>
-        <developer>
-            <id>haohui</id>
-            <name>Haohui Mai</name>
-            <email>ricetons@gmail.com</email>
-        </developer>
-    </developers>
-
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.storm</groupId>
-            <artifactId>storm-core</artifactId>
-            <version>${project.version}</version>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.storm</groupId>
-            <artifactId>storm-sql-runtime</artifactId>
-            <version>${project.version}</version>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.storm</groupId>
-            <artifactId>storm-sql-runtime</artifactId>
-            <version>${project.version}</version>
-            <scope>test</scope>
-            <type>test-jar</type>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.calcite</groupId>
-            <artifactId>calcite-core</artifactId>
-            <version>${calcite.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>com.fasterxml.jackson.core</groupId>
-                    <artifactId>jackson-annotations</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-        <dependency>
-            <groupId>com.fasterxml.jackson.core</groupId>
-            <artifactId>jackson-annotations</artifactId>
-            <version>${jackson.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>commons-cli</groupId>
-            <artifactId>commons-cli</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>commons-lang</groupId>
-            <artifactId>commons-lang</artifactId>
-            <scope>runtime</scope>
-        </dependency>
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.mockito</groupId>
-            <artifactId>mockito-all</artifactId>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.commons</groupId>
-            <artifactId>commons-collections4</artifactId>
-            <version>4.1</version>
-            <scope>test</scope>
-        </dependency>
-    </dependencies>
-    <build>
-        <sourceDirectory>src/jvm</sourceDirectory>
-        <testSourceDirectory>src/test</testSourceDirectory>
-        <plugins>
-            <plugin>
-                <artifactId>maven-resources-plugin</artifactId>
-                <executions>
-                    <execution>
-                        <id>copy-fmpp-resources</id>
-                        <phase>initialize</phase>
-                        <goals>
-                            <goal>copy-resources</goal>
-                        </goals>
-                        <configuration>
-                            <outputDirectory>${project.build.directory}/codegen</outputDirectory>
-                            <resources>
-                                <resource>
-                                    <directory>src/codegen</directory>
-                                    <filtering>false</filtering>
-                                </resource>
-                            </resources>
-                        </configuration>
-                    </execution>
-                    <execution>
-                        <id>copy-java-sources</id>
-                        <phase>process-sources</phase>
-                        <goals>
-                            <goal>copy-resources</goal>
-                        </goals>
-                        <configuration>
-                            <outputDirectory>${basedir}/target/classes/</outputDirectory>
-                            <resources>
-                                <resource>
-                                    <directory>src/jvm</directory>
-                                    <filtering>true</filtering>
-                                </resource>
-                                <resource>
-                                    <directory>src/test</directory>
-                                    <filtering>true</filtering>
-                                </resource>
-                                <resource>
-                                    <directory>target/generated-sources</directory>
-                                    <!-- <include>*/org</include> -->
-                                    <filtering>true</filtering>
-                                </resource>
-                            </resources>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-jar-plugin</artifactId>
-                <version>2.2</version>
-                <executions>
-                    <execution>
-                        <goals>
-                            <goal>test-jar</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
-            <plugin>
-                <!-- Extract parser grammar template from calcite-core.jar and put
-                     it under ${project.build.directory} where all freemarker templates are. -->
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-dependency-plugin</artifactId>
-                <version>2.8</version>
-                <executions>
-                    <execution>
-                        <id>unpack-parser-template</id>
-                        <phase>initialize</phase>
-                        <goals>
-                            <goal>unpack</goal>
-                        </goals>
-                        <configuration>
-                            <artifactItems>
-                                <artifactItem>
-                                    <groupId>org.apache.calcite</groupId>
-                                    <artifactId>calcite-core</artifactId>
-                                    <type>jar</type>
-                                    <overWrite>true</overWrite>
-                                    <outputDirectory>${project.build.directory}/</outputDirectory>
-                                    <includes>**/Parser.jj</includes>
-                                </artifactItem>
-                            </artifactItems>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin>
-            <!-- using appassembler-maven-plugin instead of maven-dependency-plugin to copy dependencies
-            as copy and unpack goal are not working together -->
-            <plugin>
-                <groupId>org.codehaus.mojo</groupId>
-                <artifactId>appassembler-maven-plugin</artifactId>
-                <version>1.9</version>
-                <executions>
-                    <execution>
-                        <id>create-repo</id>
-                        <goals>
-                            <goal>create-repository</goal>
-                        </goals>
-                        <configuration>
-                            <assembleDirectory>${project.build.directory}/app-assembler</assembleDirectory>
-                            <repositoryLayout>flat</repositoryLayout>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin>
-            <plugin>
-                <groupId>com.googlecode.fmpp-maven-plugin</groupId>
-                <artifactId>fmpp-maven-plugin</artifactId>
-                <version>1.0</version>
-                <dependencies>
-                    <dependency>
-                        <groupId>org.freemarker</groupId>
-                        <artifactId>freemarker</artifactId>
-                        <version>2.3.25-incubating</version>
-                    </dependency>
-                </dependencies>
-                <executions>
-                    <execution>
-                        <id>generate-fmpp-sources</id>
-                        <phase>generate-sources</phase>
-                        <goals>
-                            <goal>generate</goal>
-                        </goals>
-                        <configuration>
-                            <cfgFile>${project.build.directory}/codegen/config.fmpp</cfgFile>
-                            <outputDirectory>target/generated-sources</outputDirectory>
-                            <templateDirectory>${project.build.directory}/codegen/templates</templateDirectory>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin>
-            <plugin>
-                <groupId>org.codehaus.mojo</groupId>
-                <artifactId>build-helper-maven-plugin</artifactId>
-                <version>1.5</version>
-                <executions>
-                    <execution>
-                        <id>add-generated-sources</id>
-                        <phase>process-sources</phase>
-                        <goals>
-                            <goal>add-source</goal>
-                        </goals>
-                        <configuration>
-                            <sources>
-                                <source>${project.build.directory}/generated-sources</source>
-                            </sources>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin>
-            <plugin>
-                <groupId>org.codehaus.mojo</groupId>
-                <artifactId>javacc-maven-plugin</artifactId>
-                <version>2.4</version>
-                <executions>
-                    <execution>
-                        <phase>generate-sources</phase>
-                        <id>javacc</id>
-                        <goals>
-                            <goal>javacc</goal>
-                        </goals>
-                        <configuration>
-                            <sourceDirectory>${project.build.directory}/generated-sources/</sourceDirectory>
-                            <includes>
-                                <include>**/Parser.jj</include>
-                            </includes>
-                            <lookAhead>2</lookAhead>
-                            <isStatic>false</isStatic>
-                            <outputDirectory>${project.build.directory}/generated-sources/</outputDirectory>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin>
-        </plugins>
-    </build>
-</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/codegen/config.fmpp
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/codegen/config.fmpp b/external/sql/storm-sql-core/src/codegen/config.fmpp
deleted file mode 100644
index be5a792..0000000
--- a/external/sql/storm-sql-core/src/codegen/config.fmpp
+++ /dev/null
@@ -1,23 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-# http:# www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-data: {
-  parser:                   tdd(../data/Parser.tdd)
-}
-
-freemarkerLinks: {
-  includes: includes/
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/codegen/data/Parser.tdd
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/codegen/data/Parser.tdd b/external/sql/storm-sql-core/src/codegen/data/Parser.tdd
deleted file mode 100644
index b0dccb6..0000000
--- a/external/sql/storm-sql-core/src/codegen/data/Parser.tdd
+++ /dev/null
@@ -1,80 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-# http:# www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-{
-  # Generated parser implementation class package and name
-  package: "org.apache.storm.sql.parser.impl",
-  class: "StormParserImpl",
-
-  # List of import statements.
-  imports: [
-    "org.apache.calcite.sql.validate.*",
-    "org.apache.calcite.util.*",
-    "org.apache.storm.sql.parser.*",
-    "java.util.*"
-  ]
-
-  # List of keywords.
-  keywords: [
-    "LOCATION",
-    "INPUTFORMAT",
-    "OUTPUTFORMAT",
-    "PARALLELISM",
-    "STORED",
-    "TBLPROPERTIES",
-    "JAR"
-  ]
-
-  # List of methods for parsing custom SQL statements.
-  statementParserMethods: [
-    "SqlCreateTable()",
-    "SqlCreateFunction()"
-  ]
-
-  # List of methods for parsing custom literals.
-  # Example: ParseJsonLiteral().
-  literalParserMethods: [
-  ]
-
-  # List of methods for parsing custom data types.
-  dataTypeParserMethods: [
-  ]
-
-  nonReservedKeywords: [
-  ]
-
-  createStatementParserMethods: [
-  ]
-
-  alterStatementParserMethods: [
-  ]
-
-  dropStatementParserMethods: [
-  ]
-
-  # List of files in @includes directory that have parser method
-  # implementations for custom SQL statements, literals or types
-  # given as part of "statementParserMethods", "literalParserMethods" or
-  # "dataTypeParserMethods".
-  implementationFiles: [
-    "parserImpls.ftl"
-  ]
-
-  includeCompoundIdentifier: true,
-  includeBraces: true,
-  includeAdditionalDeclarations: false,
-  allowBangEqual: false
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/codegen/includes/license.ftl
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/codegen/includes/license.ftl b/external/sql/storm-sql-core/src/codegen/includes/license.ftl
deleted file mode 100644
index 7e66353..0000000
--- a/external/sql/storm-sql-core/src/codegen/includes/license.ftl
+++ /dev/null
@@ -1,17 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/codegen/includes/parserImpls.ftl
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/codegen/includes/parserImpls.ftl b/external/sql/storm-sql-core/src/codegen/includes/parserImpls.ftl
deleted file mode 100644
index 4143840..0000000
--- a/external/sql/storm-sql-core/src/codegen/includes/parserImpls.ftl
+++ /dev/null
@@ -1,113 +0,0 @@
-<#-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
-  license agreements. See the NOTICE file distributed with this work for additional
-  information regarding copyright ownership. The ASF licenses this file to
-  You under the Apache License, Version 2.0 (the "License"); you may not use
-  this file except in compliance with the License. You may obtain a copy of
-  the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
-  by applicable law or agreed to in writing, software distributed under the
-  License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
-  OF ANY KIND, either express or implied. See the License for the specific
-  language governing permissions and limitations under the License. -->
-
-
-private void ColumnDef(List<ColumnDefinition> list) :
-{
-    SqlParserPos pos;
-    SqlIdentifier name;
-    SqlDataTypeSpec type;
-    ColumnConstraint constraint = null;
-    SqlMonotonicity monotonicity = SqlMonotonicity.NOT_MONOTONIC;
-}
-{
-    name = SimpleIdentifier() { pos = getPos(); }
-    type = DataType()
-    [
-      <PRIMARY> <KEY>
-      [ <ASC>   { monotonicity = SqlMonotonicity.INCREASING; }
-      | <DESC>  { monotonicity = SqlMonotonicity.DECREASING; }
-      ]
-      { constraint = new ColumnConstraint.PrimaryKey(monotonicity, getPos()); }
-    ]
-    {
-        list.add(new ColumnDefinition(name, type, constraint, pos));
-    }
-}
-
-SqlNodeList ColumnDefinitionList() :
-{
-    SqlParserPos pos;
-    List<ColumnDefinition> list = Lists.newArrayList();
-}
-{
-    <LPAREN> { pos = getPos(); }
-    ColumnDef(list)
-    ( <COMMA> ColumnDef(list) )*
-    <RPAREN> {
-        return new SqlNodeList(list, pos.plus(getPos()));
-    }
-}
-
-/**
- * CREATE EXTERNAL TABLE ( IF NOT EXISTS )?
- *   ( database_name '.' )? table_name ( '(' column_def ( ',' column_def )* ')'
- *   ( STORED AS INPUTFORMAT input_format_classname OUTPUTFORMAT output_format_classname )?
- *   LOCATION location_uri
- *   ( TBLPROPERTIES tbl_properties )?
- *   ( AS select_stmt )
- */
-SqlNode SqlCreateTable() :
-{
-    SqlParserPos pos;
-    SqlIdentifier tblName;
-    SqlNodeList fieldList;
-    SqlNode location;
-    SqlNode parallelism = null;
-    SqlNode input_format_class_name = null, output_format_class_name = null;
-    SqlNode tbl_properties = null;
-    SqlNode select = null;
-}
-{
-    <CREATE> { pos = getPos(); }
-    <EXTERNAL> <TABLE>
-    tblName = CompoundIdentifier()
-    fieldList = ColumnDefinitionList()
-    [
-      <STORED> <AS>
-      <INPUTFORMAT> input_format_class_name = StringLiteral()
-      <OUTPUTFORMAT> output_format_class_name = StringLiteral()
-    ]
-    <LOCATION>
-    location = StringLiteral()
-    [ <PARALLELISM> parallelism = UnsignedNumericLiteral() ]
-    [ <TBLPROPERTIES> tbl_properties = StringLiteral() ]
-    [ <AS> select = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY) ] {
-        return new SqlCreateTable(pos, tblName, fieldList,
-        input_format_class_name, output_format_class_name, location,
-        parallelism, tbl_properties, select);
-    }
-}
-
-/**
- * CREATE FUNCTION functionname AS 'classname'
- */
-SqlNode SqlCreateFunction() :
-{
-    SqlParserPos pos;
-    SqlIdentifier functionName;
-    SqlNode className;
-    SqlNode jarName = null;
-}
-{
-    <CREATE> { pos = getPos(); }
-    <FUNCTION>
-        functionName = CompoundIdentifier()
-    <AS>
-        className = StringLiteral()
-    [
-      <USING> <JAR>
-      jarName = StringLiteral()
-    ]
-    {
-      return new SqlCreateFunction(pos, functionName, className, jarName);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/AbstractTridentProcessor.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/AbstractTridentProcessor.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/AbstractTridentProcessor.java
deleted file mode 100644
index 6af71d4..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/AbstractTridentProcessor.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql;
-
-import org.apache.calcite.DataContext;
-import org.apache.storm.sql.javac.CompilingClassLoader;
-import org.apache.storm.sql.runtime.ISqlTridentDataSource;
-import org.apache.storm.trident.Stream;
-import org.apache.storm.trident.TridentTopology;
-
-import java.util.List;
-import java.util.Map;
-
-public abstract class AbstractTridentProcessor {
-  protected Stream outputStream;
-  protected DataContext dataContext;
-  protected List<CompilingClassLoader> classLoaders;
-  /**
-   * @return the output stream of the SQL
-   */
-  public Stream outputStream() {
-    return outputStream;
-  }
-
-  /**
-   * Construct the trident topology based on the SQL.
-   */
-  public abstract TridentTopology build();
-
-  /**
-   * @return DataContext instance which is used with execution of query
-   */
-  public DataContext getDataContext() {
-    return dataContext;
-  }
-
-  /**
-   * @return Classloaders to compile. They're all chaining so the last classloader can access all classes.
-   */
-  public List<CompilingClassLoader> getClassLoaders() {
-    return classLoaders;
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSql.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSql.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSql.java
deleted file mode 100644
index 5dec4af..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSql.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql;
-
-import org.apache.storm.StormSubmitter;
-import org.apache.storm.generated.SubmitOptions;
-import org.apache.storm.sql.runtime.ChannelHandler;
-
-import java.util.Map;
-
-/**
- * The StormSql class provides standalone, interactive interfaces to execute
- * SQL statements over streaming data.
- * <p>
- * The StormSql class is stateless. The user needs to submit the data
- * definition language (DDL) statements and the query statements in the same
- * batch.
- */
-public abstract class StormSql {
-  /**
-   * Execute the SQL statements in stand-alone mode. The user can retrieve the result by passing in an instance
-   * of {@see ChannelHandler}.
-   */
-  public abstract void execute(Iterable<String> statements,
-                               ChannelHandler handler) throws Exception;
-
-  /**
-   * Submit the SQL statements to Nimbus and run it as a topology.
-   */
-  public abstract void submit(
-      String name, Iterable<String> statements, Map<String, ?> stormConf, SubmitOptions opts,
-      StormSubmitter.ProgressListener progressListener, String asUser)
-      throws Exception;
-
-  /**
-   * Print out query plan for each query.
-   */
-  public abstract void explain(Iterable<String> statements) throws Exception;
-
-  public static StormSql construct() {
-    return new StormSqlImpl();
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
deleted file mode 100644
index 007daa7..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
+++ /dev/null
@@ -1,290 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql;
-
-import org.apache.calcite.adapter.java.JavaTypeFactory;
-import org.apache.calcite.jdbc.CalciteSchema;
-import org.apache.calcite.prepare.CalciteCatalogReader;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeSystem;
-import org.apache.calcite.schema.Function;
-import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.schema.Table;
-import org.apache.calcite.schema.impl.AggregateFunctionImpl;
-import org.apache.calcite.schema.impl.ScalarFunctionImpl;
-import org.apache.calcite.sql.SqlExplainLevel;
-import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.sql.SqlOperatorTable;
-import org.apache.calcite.sql.fun.SqlStdOperatorTable;
-import org.apache.calcite.sql.util.ChainedSqlOperatorTable;
-import org.apache.calcite.tools.FrameworkConfig;
-import org.apache.calcite.tools.Frameworks;
-import org.apache.calcite.tools.Planner;
-import org.apache.storm.StormSubmitter;
-import org.apache.storm.generated.SubmitOptions;
-import org.apache.storm.sql.compiler.StormSqlTypeFactoryImpl;
-import org.apache.storm.sql.compiler.backends.standalone.PlanCompiler;
-import org.apache.storm.sql.javac.CompilingClassLoader;
-import org.apache.storm.sql.parser.ColumnConstraint;
-import org.apache.storm.sql.parser.ColumnDefinition;
-import org.apache.storm.sql.parser.SqlCreateFunction;
-import org.apache.storm.sql.parser.SqlCreateTable;
-import org.apache.storm.sql.parser.StormParser;
-import org.apache.storm.sql.planner.StormRelUtils;
-import org.apache.storm.sql.planner.trident.QueryPlanner;
-import org.apache.storm.sql.runtime.AbstractValuesProcessor;
-import org.apache.storm.sql.runtime.ChannelHandler;
-import org.apache.storm.sql.runtime.DataSource;
-import org.apache.storm.sql.runtime.DataSourcesRegistry;
-import org.apache.storm.sql.runtime.FieldInfo;
-import org.apache.storm.sql.runtime.ISqlTridentDataSource;
-import org.apache.storm.trident.TridentTopology;
-
-import java.io.BufferedOutputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.lang.reflect.Method;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.jar.Attributes;
-import java.util.jar.JarOutputStream;
-import java.util.jar.Manifest;
-import java.util.zip.ZipEntry;
-
-import static org.apache.storm.sql.compiler.CompilerUtil.TableBuilderInfo;
-
-class StormSqlImpl extends StormSql {
-  private final JavaTypeFactory typeFactory = new StormSqlTypeFactoryImpl(
-      RelDataTypeSystem.DEFAULT);
-  private final SchemaPlus schema = Frameworks.createRootSchema(true);
-  private boolean hasUdf = false;
-
-  @Override
-  public void execute(
-      Iterable<String> statements, ChannelHandler result)
-      throws Exception {
-    Map<String, DataSource> dataSources = new HashMap<>();
-    for (String sql : statements) {
-      StormParser parser = new StormParser(sql);
-      SqlNode node = parser.impl().parseSqlStmtEof();
-      if (node instanceof SqlCreateTable) {
-        handleCreateTable((SqlCreateTable) node, dataSources);
-      } else if (node instanceof SqlCreateFunction) {
-        handleCreateFunction((SqlCreateFunction) node);
-      } else {
-        FrameworkConfig config = buildFrameWorkConfig();
-        Planner planner = Frameworks.getPlanner(config);
-        SqlNode parse = planner.parse(sql);
-        SqlNode validate = planner.validate(parse);
-        RelNode tree = planner.convert(validate);
-        PlanCompiler compiler = new PlanCompiler(typeFactory);
-        AbstractValuesProcessor proc = compiler.compile(tree);
-        proc.initialize(dataSources, result);
-      }
-    }
-  }
-
-  @Override
-  public void submit(
-      String name, Iterable<String> statements, Map<String, ?> stormConf, SubmitOptions opts,
-      StormSubmitter.ProgressListener progressListener, String asUser)
-      throws Exception {
-    Map<String, ISqlTridentDataSource> dataSources = new HashMap<>();
-    for (String sql : statements) {
-      StormParser parser = new StormParser(sql);
-      SqlNode node = parser.impl().parseSqlStmtEof();
-      if (node instanceof SqlCreateTable) {
-        handleCreateTableForTrident((SqlCreateTable) node, dataSources);
-      } else if (node instanceof SqlCreateFunction) {
-        handleCreateFunction((SqlCreateFunction) node);
-      }  else {
-        QueryPlanner planner = new QueryPlanner(schema);
-        AbstractTridentProcessor processor = planner.compile(dataSources, sql);
-        TridentTopology topo = processor.build();
-
-        Path jarPath = null;
-        try {
-          // QueryPlanner on Trident mode configures the topology with compiled classes,
-          // so we need to add new classes into topology jar
-          // Topology will be serialized and sent to Nimbus, and deserialized and executed in workers.
-
-          jarPath = Files.createTempFile("storm-sql", ".jar");
-          System.setProperty("storm.jar", jarPath.toString());
-          packageTopology(jarPath, processor);
-          StormSubmitter.submitTopologyAs(name, stormConf, topo.build(), opts, progressListener, asUser);
-        } finally {
-          if (jarPath != null) {
-            Files.delete(jarPath);
-          }
-        }
-      }
-    }
-  }
-
-  @Override
-  public void explain(Iterable<String> statements) throws Exception {
-    Map<String, ISqlTridentDataSource> dataSources = new HashMap<>();
-    for (String sql : statements) {
-      StormParser parser = new StormParser(sql);
-      SqlNode node = parser.impl().parseSqlStmtEof();
-
-      System.out.println("===========================================================");
-      System.out.println("query>");
-      System.out.println(sql);
-      System.out.println("-----------------------------------------------------------");
-
-      if (node instanceof SqlCreateTable) {
-        handleCreateTableForTrident((SqlCreateTable) node, dataSources);
-        System.out.println("No plan presented on DDL");
-      } else if (node instanceof SqlCreateFunction) {
-        handleCreateFunction((SqlCreateFunction) node);
-        System.out.println("No plan presented on DDL");
-      } else {
-        FrameworkConfig config = buildFrameWorkConfig();
-        Planner planner = Frameworks.getPlanner(config);
-        SqlNode parse = planner.parse(sql);
-        SqlNode validate = planner.validate(parse);
-        RelNode tree = planner.convert(validate);
-
-        String plan = StormRelUtils.explain(tree, SqlExplainLevel.ALL_ATTRIBUTES);
-        System.out.println("plan>");
-        System.out.println(plan);
-      }
-
-      System.out.println("===========================================================");
-    }
-  }
-
-  private void packageTopology(Path jar, AbstractTridentProcessor processor) throws IOException {
-    Manifest manifest = new Manifest();
-    Attributes attr = manifest.getMainAttributes();
-    attr.put(Attributes.Name.MANIFEST_VERSION, "1.0");
-    attr.put(Attributes.Name.MAIN_CLASS, processor.getClass().getCanonicalName());
-    try (JarOutputStream out = new JarOutputStream(
-            new BufferedOutputStream(new FileOutputStream(jar.toFile())), manifest)) {
-      List<CompilingClassLoader> classLoaders = processor.getClassLoaders();
-      if (classLoaders != null && !classLoaders.isEmpty()) {
-        for (CompilingClassLoader classLoader : classLoaders) {
-          for (Map.Entry<String, ByteArrayOutputStream> e : classLoader.getClasses().entrySet()) {
-            out.putNextEntry(new ZipEntry(e.getKey().replace(".", "/") + ".class"));
-            out.write(e.getValue().toByteArray());
-            out.closeEntry();
-          }
-        }
-      }
-    }
-  }
-
-  private void handleCreateTable(
-      SqlCreateTable n, Map<String, DataSource> dataSources) {
-    List<FieldInfo> fields = updateSchema(n);
-    DataSource ds = DataSourcesRegistry.construct(n.location(), n
-        .inputFormatClass(), n.outputFormatClass(), fields);
-    if (ds == null) {
-      throw new RuntimeException("Cannot construct data source for " + n
-          .tableName());
-    } else if (dataSources.containsKey(n.tableName())) {
-      throw new RuntimeException("Duplicated definition for table " + n
-          .tableName());
-    }
-    dataSources.put(n.tableName(), ds);
-  }
-
-  private void handleCreateFunction(SqlCreateFunction sqlCreateFunction) throws ClassNotFoundException {
-    if(sqlCreateFunction.jarName() != null) {
-      throw new UnsupportedOperationException("UDF 'USING JAR' not implemented");
-    }
-    Method method;
-    Function function;
-    if ((method=findMethod(sqlCreateFunction.className(), "evaluate")) != null) {
-      function = ScalarFunctionImpl.create(method);
-    } else if (findMethod(sqlCreateFunction.className(), "add") != null) {
-      function = AggregateFunctionImpl.create(Class.forName(sqlCreateFunction.className()));
-    } else {
-      throw new RuntimeException("Invalid scalar or aggregate function");
-    }
-    schema.add(sqlCreateFunction.functionName().toUpperCase(), function);
-    hasUdf = true;
-  }
-
-  private Method findMethod(String clazzName, String methodName) throws ClassNotFoundException {
-    Class<?> clazz = Class.forName(clazzName);
-    for (Method method : clazz.getMethods()) {
-      if (method.getName().equals(methodName)) {
-        return method;
-      }
-    }
-    return null;
-  }
-
-  private void handleCreateTableForTrident(
-      SqlCreateTable n, Map<String, ISqlTridentDataSource> dataSources) {
-    List<FieldInfo> fields = updateSchema(n);
-    ISqlTridentDataSource ds = DataSourcesRegistry.constructTridentDataSource(n.location(), n
-        .inputFormatClass(), n.outputFormatClass(), n.properties(), fields);
-    if (ds == null) {
-      throw new RuntimeException("Failed to find data source for " + n
-          .tableName() + " URI: " + n.location());
-    } else if (dataSources.containsKey(n.tableName())) {
-      throw new RuntimeException("Duplicated definition for table " + n
-          .tableName());
-    }
-    dataSources.put(n.tableName(), ds);
-  }
-
-  private List<FieldInfo> updateSchema(SqlCreateTable n) {
-    TableBuilderInfo builder = new TableBuilderInfo(typeFactory);
-    List<FieldInfo> fields = new ArrayList<>();
-    for (ColumnDefinition col : n.fieldList()) {
-      builder.field(col.name(), col.type(), col.constraint());
-      RelDataType dataType = col.type().deriveType(typeFactory);
-      Class<?> javaType = (Class<?>)typeFactory.getJavaClass(dataType);
-      ColumnConstraint constraint = col.constraint();
-      boolean isPrimary = constraint != null && constraint instanceof ColumnConstraint.PrimaryKey;
-      fields.add(new FieldInfo(col.name(), javaType, isPrimary));
-    }
-
-    if (n.parallelism() != null) {
-      builder.parallelismHint(n.parallelism());
-    }
-    Table table = builder.build();
-    schema.add(n.tableName(), table);
-    return fields;
-  }
-
-  private FrameworkConfig buildFrameWorkConfig() {
-    if (hasUdf) {
-      List<SqlOperatorTable> sqlOperatorTables = new ArrayList<>();
-      sqlOperatorTables.add(SqlStdOperatorTable.instance());
-      sqlOperatorTables.add(new CalciteCatalogReader(CalciteSchema.from(schema),
-                                                     false,
-                                                     Collections.<String>emptyList(), typeFactory));
-      return Frameworks.newConfigBuilder().defaultSchema(schema)
-              .operatorTable(new ChainedSqlOperatorTable(sqlOperatorTables)).build();
-    } else {
-      return Frameworks.newConfigBuilder().defaultSchema(schema).build();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlRunner.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlRunner.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlRunner.java
deleted file mode 100644
index 5618647..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlRunner.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.DefaultParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Options;
-import org.apache.storm.generated.SubmitOptions;
-import org.apache.storm.generated.TopologyInitialStatus;
-import org.apache.storm.utils.Utils;
-
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.List;
-import java.util.Map;
-
-public class StormSqlRunner {
-    private static final String OPTION_SQL_FILE_SHORT = "f";
-    private static final String OPTION_SQL_FILE_LONG = "file";
-    private static final String OPTION_SQL_TOPOLOGY_NAME_SHORT = "t";
-    private static final String OPTION_SQL_TOPOLOGY_NAME_LONG = "topology";
-    private static final String OPTION_SQL_EXPLAIN_SHORT = "e";
-    private static final String OPTION_SQL_EXPLAIN_LONG = "explain";
-
-    public static void main(String[] args) throws Exception {
-        Options options = buildOptions();
-        CommandLineParser parser = new DefaultParser();
-        CommandLine commandLine = parser.parse(options, args);
-
-        if (!commandLine.hasOption(OPTION_SQL_FILE_LONG)) {
-            printUsageAndExit(options, OPTION_SQL_FILE_LONG + " is required");
-        }
-
-        String filePath = commandLine.getOptionValue(OPTION_SQL_FILE_LONG);
-        List<String> stmts = Files.readAllLines(Paths.get(filePath), StandardCharsets.UTF_8);
-        StormSql sql = StormSql.construct();
-        @SuppressWarnings("unchecked")
-        Map<String, ?> conf = Utils.readStormConfig();
-
-        if (commandLine.hasOption(OPTION_SQL_EXPLAIN_LONG)) {
-            sql.explain(stmts);
-        } else if (commandLine.hasOption(OPTION_SQL_TOPOLOGY_NAME_LONG)) {
-            String topoName = commandLine.getOptionValue(OPTION_SQL_TOPOLOGY_NAME_LONG);
-            SubmitOptions submitOptions = new SubmitOptions(TopologyInitialStatus.ACTIVE);
-            sql.submit(topoName, stmts, conf, submitOptions, null, null);
-        } else {
-            printUsageAndExit(options, "Either " + OPTION_SQL_TOPOLOGY_NAME_LONG + " or " + OPTION_SQL_EXPLAIN_LONG +
-                    " must be presented");
-        }
-    }
-
-    private static void printUsageAndExit(Options options, String message) {
-        System.out.println(message);
-        HelpFormatter formatter = new HelpFormatter();
-        formatter.printHelp("storm-sql-runner ", options);
-        System.exit(1);
-    }
-
-    private static Options buildOptions() {
-        Options options = new Options();
-        options.addOption(OPTION_SQL_FILE_SHORT, OPTION_SQL_FILE_LONG, true, "REQUIRED SQL file which has sql statements");
-        options.addOption(OPTION_SQL_TOPOLOGY_NAME_SHORT, OPTION_SQL_TOPOLOGY_NAME_LONG, true, "Topology name to submit");
-        options.addOption(OPTION_SQL_EXPLAIN_SHORT, OPTION_SQL_EXPLAIN_LONG, false, "Activate explain mode (topology name will be ignored)");
-        return options;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/calcite/ParallelStreamableTable.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/calcite/ParallelStreamableTable.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/calcite/ParallelStreamableTable.java
deleted file mode 100644
index c6b584d..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/calcite/ParallelStreamableTable.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.sql.calcite;
-
-import org.apache.calcite.rel.stream.Delta;
-import org.apache.calcite.schema.StreamableTable;
-
-/**
- * Table that can be converted to a stream. This table also has its parallelism information.
- *
- * @see Delta
- */
-public interface ParallelStreamableTable extends StreamableTable {
-
-    /**
-     * Returns parallelism hint of this table. Returns null if don't know.
-     */
-    Integer parallelismHint();
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java
deleted file mode 100644
index 2e237c0..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.compiler;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import org.apache.calcite.rel.RelCollations;
-import org.apache.calcite.rel.RelFieldCollation;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.schema.*;
-import org.apache.calcite.sql.SqlDataTypeSpec;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.calcite.sql.validate.SqlMonotonicity;
-import org.apache.calcite.util.ImmutableBitSet;
-import org.apache.calcite.util.Util;
-import org.apache.storm.sql.calcite.ParallelStreamableTable;
-import org.apache.storm.sql.parser.ColumnConstraint;
-
-import java.util.ArrayList;
-
-import static org.apache.calcite.rel.RelFieldCollation.Direction;
-import static org.apache.calcite.rel.RelFieldCollation.Direction.ASCENDING;
-import static org.apache.calcite.rel.RelFieldCollation.Direction.DESCENDING;
-import static org.apache.calcite.rel.RelFieldCollation.NullDirection;
-import static org.apache.calcite.sql.validate.SqlMonotonicity.INCREASING;
-
-public class CompilerUtil {
-  public static String escapeJavaString(String s, boolean nullMeansNull) {
-      if(s == null) {
-        return nullMeansNull ? "null" : "\"\"";
-      } else {
-        String s1 = Util.replace(s, "\\", "\\\\");
-        String s2 = Util.replace(s1, "\"", "\\\"");
-        String s3 = Util.replace(s2, "\n\r", "\\n");
-        String s4 = Util.replace(s3, "\n", "\\n");
-        String s5 = Util.replace(s4, "\r", "\\r");
-        return "\"" + s5 + "\"";
-      }
-  }
-
-  public static class TableBuilderInfo {
-    private final RelDataTypeFactory typeFactory;
-
-    public TableBuilderInfo(RelDataTypeFactory typeFactory) {
-      this.typeFactory = typeFactory;
-    }
-
-    private static class FieldType {
-      private final String name;
-      private final RelDataType relDataType;
-
-      private FieldType(String name, RelDataType relDataType) {
-        this.name = name;
-        this.relDataType = relDataType;
-      }
-
-    }
-
-    private final ArrayList<FieldType> fields = new ArrayList<>();
-    private final ArrayList<Object[]> rows = new ArrayList<>();
-    private int primaryKey = -1;
-    private Integer parallelismHint;
-    private SqlMonotonicity primaryKeyMonotonicity;
-    private Statistic stats;
-
-    public TableBuilderInfo field(String name, SqlTypeName type) {
-      return field(name, typeFactory.createSqlType(type));
-    }
-
-    public TableBuilderInfo field(String name, RelDataType type) {
-      fields.add(new FieldType(name, type));
-      return this;
-    }
-
-    public TableBuilderInfo field(String name, SqlDataTypeSpec type, ColumnConstraint constraint) {
-      RelDataType dataType = type.deriveType(typeFactory);
-      if (constraint instanceof ColumnConstraint.PrimaryKey) {
-        ColumnConstraint.PrimaryKey pk = (ColumnConstraint.PrimaryKey) constraint;
-        Preconditions.checkState(primaryKey == -1, "There are more than one primary key in the table");
-        primaryKey = fields.size();
-        primaryKeyMonotonicity = pk.monotonicity();
-      }
-      fields.add(new FieldType(name, dataType));
-      return this;
-    }
-
-    public TableBuilderInfo statistics(Statistic stats) {
-      this.stats = stats;
-      return this;
-    }
-
-    @VisibleForTesting
-    public TableBuilderInfo rows(Object[] data) {
-      rows.add(data);
-      return this;
-    }
-
-    public TableBuilderInfo parallelismHint(int parallelismHint) {
-      this.parallelismHint = parallelismHint;
-      return this;
-    }
-
-    public StreamableTable build() {
-      final Statistic stat = buildStatistic();
-      final Table tbl = new Table() {
-        @Override
-        public RelDataType getRowType(
-            RelDataTypeFactory relDataTypeFactory) {
-          RelDataTypeFactory.FieldInfoBuilder b = relDataTypeFactory.builder();
-          for (FieldType f : fields) {
-            b.add(f.name, f.relDataType);
-          }
-          return b.build();
-        }
-
-        @Override
-        public Statistic getStatistic() {
-          return stat != null ? stat : Statistics.of(rows.size(),
-                                                     ImmutableList.<ImmutableBitSet>of());
-        }
-
-        @Override
-        public Schema.TableType getJdbcTableType() {
-          return Schema.TableType.STREAM;
-        }
-      };
-
-      return new ParallelStreamableTable() {
-        @Override
-        public Integer parallelismHint() {
-          return parallelismHint;
-        }
-
-        @Override
-        public Table stream() {
-          return tbl;
-        }
-
-        @Override
-        public RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) {
-          return tbl.getRowType(relDataTypeFactory);
-        }
-
-        @Override
-        public Statistic getStatistic() {
-          return tbl.getStatistic();
-        }
-
-        @Override
-        public Schema.TableType getJdbcTableType() {
-          return Schema.TableType.STREAM;
-        }
-      };
-    }
-
-    private Statistic buildStatistic() {
-      if (stats != null || primaryKey == -1) {
-        return stats;
-      }
-      Direction dir = primaryKeyMonotonicity == INCREASING ? ASCENDING : DESCENDING;
-      RelFieldCollation collation = new RelFieldCollation(primaryKey, dir, NullDirection.UNSPECIFIED);
-      return Statistics.of(fields.size(), ImmutableList.of(ImmutableBitSet.of(primaryKey)),
-          ImmutableList.of(RelCollations.of(collation)));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RexNodeToJavaCodeCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RexNodeToJavaCodeCompiler.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RexNodeToJavaCodeCompiler.java
deleted file mode 100644
index 5ac95e0..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RexNodeToJavaCodeCompiler.java
+++ /dev/null
@@ -1,231 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.compiler;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import org.apache.calcite.adapter.enumerable.JavaRowFormat;
-import org.apache.calcite.adapter.enumerable.PhysType;
-import org.apache.calcite.adapter.enumerable.PhysTypeImpl;
-import org.apache.calcite.adapter.enumerable.RexToLixTranslator;
-import org.apache.calcite.interpreter.Context;
-import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
-import org.apache.calcite.linq4j.function.Function1;
-import org.apache.calcite.linq4j.tree.BlockBuilder;
-import org.apache.calcite.linq4j.tree.BlockStatement;
-import org.apache.calcite.linq4j.tree.ClassDeclaration;
-import org.apache.calcite.linq4j.tree.Expression;
-import org.apache.calcite.linq4j.tree.Expressions;
-import org.apache.calcite.linq4j.tree.MemberDeclaration;
-import org.apache.calcite.linq4j.tree.ParameterExpression;
-import org.apache.calcite.linq4j.tree.Types;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rex.RexBuilder;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.rex.RexProgram;
-import org.apache.calcite.rex.RexProgramBuilder;
-import org.apache.calcite.util.BuiltInMethod;
-import org.apache.calcite.util.Pair;
-import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
-
-import java.lang.reflect.Constructor;
-import java.lang.reflect.Field;
-import java.lang.reflect.Method;
-import java.lang.reflect.Modifier;
-import java.lang.reflect.Type;
-import java.util.List;
-
-/**
- * Compiles a scalar expression ({@link org.apache.calcite.rex.RexNode}) to Java source code String.
- *
- * This code is inspired by JaninoRexCompiler in Calcite, but while it is returning {@link org.apache.calcite.interpreter.Scalar} which is executable,
- * we need to pass the source code to compile and serialize instance so that it can be executed on worker efficiently.
- */
-public class RexNodeToJavaCodeCompiler {
-  private final RexBuilder rexBuilder;
-
-  public RexNodeToJavaCodeCompiler(RexBuilder rexBuilder) {
-    this.rexBuilder = rexBuilder;
-  }
-
-  public BlockStatement compileToBlock(List<RexNode> nodes, RelDataType inputRowType) {
-    final RexProgramBuilder programBuilder =
-        new RexProgramBuilder(inputRowType, rexBuilder);
-    for (RexNode node : nodes) {
-      programBuilder.addProject(node, null);
-    }
-
-    return compileToBlock(programBuilder.getProgram());
-  }
-
-  public BlockStatement compileToBlock(final RexProgram program) {
-    final ParameterExpression context_ =
-            Expressions.parameter(Context.class, "context");
-    final ParameterExpression outputValues_ =
-            Expressions.parameter(Object[].class, "outputValues");
-
-    return compileToBlock(program, context_, outputValues_).toBlock();
-  }
-
-  public String compile(List<RexNode> nodes, RelDataType inputRowType, String className) {
-    final RexProgramBuilder programBuilder =
-            new RexProgramBuilder(inputRowType, rexBuilder);
-    for (RexNode node : nodes) {
-      programBuilder.addProject(node, null);
-    }
-
-    return compile(programBuilder.getProgram(), className);
-  }
-
-  public String compile(final RexProgram program, String className) {
-    final ParameterExpression context_ =
-            Expressions.parameter(Context.class, "context");
-    final ParameterExpression outputValues_ =
-            Expressions.parameter(Object[].class, "outputValues");
-
-    BlockBuilder builder = compileToBlock(program, context_, outputValues_);
-    return baz(context_, outputValues_, builder.toBlock(), className);
-  }
-
-  private BlockBuilder compileToBlock(final RexProgram program, ParameterExpression context_,
-                                        ParameterExpression outputValues_) {
-    RelDataType inputRowType = program.getInputRowType();
-    final BlockBuilder builder = new BlockBuilder();
-    final JavaTypeFactoryImpl javaTypeFactory =
-            new JavaTypeFactoryImpl(rexBuilder.getTypeFactory().getTypeSystem());
-
-    final RexToLixTranslator.InputGetter inputGetter =
-            new RexToLixTranslator.InputGetterImpl(
-                    ImmutableList.of(
-                            Pair.<Expression, PhysType>of(
-                                    Expressions.field(context_,
-                                            BuiltInMethod.CONTEXT_VALUES.field),
-                                    PhysTypeImpl.of(javaTypeFactory, inputRowType,
-                                            JavaRowFormat.ARRAY, false))));
-    final Function1<String, RexToLixTranslator.InputGetter> correlates =
-            new Function1<String, RexToLixTranslator.InputGetter>() {
-              public RexToLixTranslator.InputGetter apply(String a0) {
-                throw new UnsupportedOperationException();
-              }
-            };
-    final Expression root =
-            Expressions.field(context_, BuiltInMethod.CONTEXT_ROOT.field);
-    final List<Expression> list =
-            RexToLixTranslator.translateProjects(program, javaTypeFactory, builder,
-                    null, root, inputGetter, correlates);
-    for (int i = 0; i < list.size(); i++) {
-      builder.add(
-              Expressions.statement(
-                      Expressions.assign(
-                              Expressions.arrayIndex(outputValues_,
-                                      Expressions.constant(i)),
-                              list.get(i))));
-    }
-
-    return builder;
-  }
-
-  /** Given a method that implements {@link ExecutableExpression#execute(Context, Object[])},
-   * adds a bridge method that implements {@link ExecutableExpression#execute(Context)}, and
-   * compiles. */
-  static String baz(ParameterExpression context_,
-                    ParameterExpression outputValues_, BlockStatement block, String className) {
-    final List<MemberDeclaration> declarations = Lists.newArrayList();
-
-    // public void execute(Context, Object[] outputValues)
-    declarations.add(
-            Expressions.methodDecl(Modifier.PUBLIC, void.class,
-                    StormBuiltInMethod.EXPR_EXECUTE2.method.getName(),
-                    ImmutableList.of(context_, outputValues_), block));
-
-    // public Object execute(Context)
-    final BlockBuilder builder = new BlockBuilder();
-    final Expression values_ = builder.append("values",
-            Expressions.newArrayBounds(Object.class, 1,
-                    Expressions.constant(1)));
-    builder.add(
-            Expressions.statement(
-                    Expressions.call(
-                            Expressions.parameter(ExecutableExpression.class, "this"),
-                            StormBuiltInMethod.EXPR_EXECUTE2.method, context_, values_)));
-    builder.add(
-            Expressions.return_(null,
-                    Expressions.arrayIndex(values_, Expressions.constant(0))));
-    declarations.add(
-            Expressions.methodDecl(Modifier.PUBLIC, Object.class,
-                    StormBuiltInMethod.EXPR_EXECUTE1.method.getName(),
-                    ImmutableList.of(context_), builder.toBlock()));
-
-    final ClassDeclaration classDeclaration =
-            Expressions.classDecl(Modifier.PUBLIC, className, null,
-                    ImmutableList.<Type>of(ExecutableExpression.class), declarations);
-
-    return Expressions.toString(Lists.newArrayList(classDeclaration), "\n", false);
-  }
-
-  enum StormBuiltInMethod {
-    EXPR_EXECUTE1(ExecutableExpression.class, "execute", Context.class),
-    EXPR_EXECUTE2(ExecutableExpression.class, "execute", Context.class, Object[].class);
-
-    public final Method method;
-    public final Constructor constructor;
-    public final Field field;
-
-    public static final ImmutableMap<Method, BuiltInMethod> MAP;
-
-    static {
-      final ImmutableMap.Builder<Method, BuiltInMethod> builder =
-              ImmutableMap.builder();
-      for (BuiltInMethod value : BuiltInMethod.values()) {
-        if (value.method != null) {
-          builder.put(value.method, value);
-        }
-      }
-      MAP = builder.build();
-    }
-
-    private StormBuiltInMethod(Method method, Constructor constructor, Field field) {
-      this.method = method;
-      this.constructor = constructor;
-      this.field = field;
-    }
-
-    /**
-     * Defines a method.
-     */
-    StormBuiltInMethod(Class clazz, String methodName, Class... argumentTypes) {
-      this(Types.lookupMethod(clazz, methodName, argumentTypes), null, null);
-    }
-
-    /**
-     * Defines a constructor.
-     */
-    StormBuiltInMethod(Class clazz, Class... argumentTypes) {
-      this(null, Types.lookupConstructor(clazz, argumentTypes), null);
-    }
-
-    /**
-     * Defines a field.
-     */
-    StormBuiltInMethod(Class clazz, String fieldName, boolean dummy) {
-      this(null, null, Types.lookupField(clazz, fieldName));
-      assert dummy : "dummy value for method overloading must be true";
-    }
-  }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/StormSqlTypeFactoryImpl.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/StormSqlTypeFactoryImpl.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/StormSqlTypeFactoryImpl.java
deleted file mode 100644
index 21ca063..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/StormSqlTypeFactoryImpl.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.compiler;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Lists;
-import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeField;
-import org.apache.calcite.rel.type.RelDataTypeSystem;
-import org.apache.calcite.rel.type.RelRecordType;
-import org.apache.calcite.sql.type.JavaToSqlTypeConversionRules;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-public class StormSqlTypeFactoryImpl extends JavaTypeFactoryImpl {
-
-    public StormSqlTypeFactoryImpl() {
-    }
-
-    public StormSqlTypeFactoryImpl(RelDataTypeSystem typeSystem) {
-        super(typeSystem);
-    }
-
-    @Override
-    public RelDataType toSql(RelDataType type) {
-        if (type instanceof JavaType) {
-            JavaType javaType = (JavaType) type;
-            SqlTypeName sqlTypeName = JavaToSqlTypeConversionRules.instance().lookup(javaType.getJavaClass());
-            if (sqlTypeName == null) {
-                sqlTypeName = SqlTypeName.ANY;
-            }
-            return createTypeWithNullability(createSqlType(sqlTypeName), type.isNullable());
-        }
-        return super.toSql(type);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/BuiltinAggregateFunctions.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/BuiltinAggregateFunctions.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/BuiltinAggregateFunctions.java
deleted file mode 100644
index 9dc4ba8..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/BuiltinAggregateFunctions.java
+++ /dev/null
@@ -1,238 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.compiler.backends.standalone;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.storm.tuple.Values;
-
-import java.lang.reflect.Type;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Built-in implementations for some of the standard aggregation operations.
- * Aggregations can be implemented as a class with the following methods viz. init, add and result.
- * The class could contain only static methods, only non-static methods or be generic.
- */
-public class BuiltinAggregateFunctions {
-    // binds the type information and the class implementing the aggregation
-    public static class TypeClass {
-        public static class GenericType {
-        }
-
-        public final Type ty;
-        public final Class<?> clazz;
-
-        private TypeClass(Type ty, Class<?> clazz) {
-            this.ty = ty;
-            this.clazz = clazz;
-        }
-
-        static TypeClass of(Type ty, Class<?> clazz) {
-            return new TypeClass(ty, clazz);
-        }
-    }
-
-    static final Map<String, List<TypeClass>> TABLE = new HashMap<>();
-
-    public static class ByteSum {
-        public static Byte init() {
-            return 0;
-        }
-
-        public static Byte add(Byte accumulator, Byte val) {
-            return (byte) (accumulator + val);
-        }
-
-        public static Byte result(Byte accumulator) {
-            return accumulator;
-        }
-    }
-
-    public static class ShortSum {
-        public static Short init() {
-            return 0;
-        }
-
-        public static Short add(Short accumulator, Short val) {
-            return (short) (accumulator + val);
-        }
-
-        public static Short result(Short accumulator) {
-            return accumulator;
-        }
-    }
-
-    public static class IntSum {
-        public static Integer init() {
-            return 0;
-        }
-
-        public static Integer add(Integer accumulator, Integer val) {
-            return accumulator + val;
-        }
-
-        public static Integer result(Integer accumulator) {
-            return accumulator;
-        }
-    }
-
-    public static class LongSum {
-        public static Long init() {
-            return 0L;
-        }
-
-        public static Long add(Long accumulator, Long val) {
-            return accumulator + val;
-        }
-
-        public static Long result(Long accumulator) {
-            return accumulator;
-        }
-    }
-
-    public static class FloatSum {
-        public static Float init() {
-            return 0.0f;
-        }
-
-        public static Float add(Float accumulator, Float val) {
-            return accumulator + val;
-        }
-
-        public static Float result(Float accumulator) {
-            return accumulator;
-        }
-    }
-
-    public static class DoubleSum {
-        public static Double init() {
-            return 0.0;
-        }
-
-        public static Double add(Double accumulator, Double val) {
-            return accumulator + val;
-        }
-
-        public static Double result(Double accumulator) {
-            return accumulator;
-        }
-    }
-
-    public static class Max<T extends Comparable<T>> {
-        public T init() {
-            return null;
-        }
-
-        public T add(T accumulator, T val) {
-            return (accumulator == null || accumulator.compareTo(val) < 0) ? val : accumulator;
-        }
-
-        public T result(T accumulator) {
-            return accumulator;
-        }
-    }
-
-    public static class Min<T extends Comparable<T>> {
-        public T init() {
-            return null;
-        }
-
-        public T add(T accumulator, T val) {
-            return (accumulator == null || accumulator.compareTo(val) > 0) ? val : accumulator;
-        }
-
-        public T result(T accumulator) {
-            return accumulator;
-        }
-    }
-
-    public static class IntAvg {
-        private int count;
-
-        public Integer init() {
-            return 0;
-        }
-
-        public Integer add(Integer accumulator, Integer val) {
-            ++count;
-            return accumulator + val;
-        }
-
-        public Integer result(Integer accumulator) {
-            Integer result = accumulator / count;
-            count = 0;
-            return result;
-        }
-    }
-
-    public static class DoubleAvg {
-        private int count;
-
-        public Double init() {
-            return 0.0;
-        }
-
-        public Double add(Double accumulator, Double val) {
-            ++count;
-            return accumulator + val;
-        }
-
-        public Double result(Double accumulator) {
-            Double result = accumulator / count;
-            count = 0;
-            return result;
-        }
-    }
-
-    public static class Count {
-        public static Long init() {
-            return 0L;
-        }
-
-        public static Long add(Long accumulator, Values vals) {
-            for (Object val : vals) {
-                if (val == null) {
-                    return accumulator;
-                }
-            }
-            return accumulator + 1;
-        }
-
-        public static Long result(Long accumulator) {
-            return accumulator;
-        }
-    }
-
-    static {
-        TABLE.put("SUM", ImmutableList.of(
-                TypeClass.of(float.class, FloatSum.class),
-                TypeClass.of(double.class, DoubleSum.class),
-                TypeClass.of(byte.class, ByteSum.class),
-                TypeClass.of(short.class, ShortSum.class),
-                TypeClass.of(long.class, LongSum.class),
-                TypeClass.of(int.class, IntSum.class)));
-        TABLE.put("AVG", ImmutableList.of(
-                TypeClass.of(double.class, DoubleAvg.class),
-                TypeClass.of(int.class, IntAvg.class)));
-        TABLE.put("COUNT", ImmutableList.of(TypeClass.of(long.class, Count.class)));
-        TABLE.put("MAX", ImmutableList.of(TypeClass.of(TypeClass.GenericType.class, Max.class)));
-        TABLE.put("MIN", ImmutableList.of(TypeClass.of(TypeClass.GenericType.class, Min.class)));
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PlanCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PlanCompiler.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PlanCompiler.java
deleted file mode 100644
index 01546ed..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PlanCompiler.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.compiler.backends.standalone;
-
-import com.google.common.base.Joiner;
-import org.apache.calcite.adapter.java.JavaTypeFactory;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.TableScan;
-import org.apache.storm.sql.compiler.CompilerUtil;
-import org.apache.storm.sql.javac.CompilingClassLoader;
-import org.apache.storm.sql.runtime.AbstractValuesProcessor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.util.HashSet;
-import java.util.Set;
-
-public class PlanCompiler {
-  private static final Logger LOG = LoggerFactory.getLogger(PlanCompiler.class);
-
-  private static final Joiner NEW_LINE_JOINER = Joiner.on("\n");
-  private static final String PACKAGE_NAME = "org.apache.storm.sql.generated";
-  private static final String PROLOGUE = NEW_LINE_JOINER.join(
-      "// GENERATED CODE", "package " + PACKAGE_NAME + ";", "",
-      "import java.util.Iterator;", "import java.util.Map;", "import java.util.HashMap;",
-      "import java.util.List;", "import java.util.ArrayList;",
-      "import java.util.LinkedHashMap;",
-      "import org.apache.storm.tuple.Values;",
-      "import org.apache.storm.sql.runtime.AbstractChannelHandler;",
-      "import org.apache.storm.sql.runtime.Channels;",
-      "import org.apache.storm.sql.runtime.ChannelContext;",
-      "import org.apache.storm.sql.runtime.ChannelHandler;",
-      "import org.apache.storm.sql.runtime.DataSource;",
-      "import org.apache.storm.sql.runtime.AbstractValuesProcessor;",
-      "import com.google.common.collect.ArrayListMultimap;",
-      "import com.google.common.collect.Multimap;",
-      "import org.apache.calcite.interpreter.Context;",
-      "import org.apache.calcite.interpreter.StormContext;",
-      "import org.apache.calcite.DataContext;",
-      "import org.apache.storm.sql.runtime.calcite.StormDataContext;",
-      "public final class Processor extends AbstractValuesProcessor {",
-      "  public final static DataContext dataContext = new StormDataContext();",
-      "");
-  private static final String INITIALIZER_PROLOGUE = NEW_LINE_JOINER.join(
-      "  @Override",
-      "  public void initialize(Map<String, DataSource> data,",
-      "                         ChannelHandler result) {",
-      "    ChannelContext r = Channels.chain(Channels.voidContext(), result);",
-      ""
-  );
-
-  private final JavaTypeFactory typeFactory;
-
-  public PlanCompiler(JavaTypeFactory typeFactory) {
-    this.typeFactory = typeFactory;
-  }
-
-  private String generateJavaSource(RelNode root) throws Exception {
-    StringWriter sw = new StringWriter();
-    try (PrintWriter pw = new PrintWriter(sw)) {
-      RelNodeCompiler compiler = new RelNodeCompiler(pw, typeFactory);
-      printPrologue(pw);
-      compiler.traverse(root);
-      printMain(pw, root);
-      printEpilogue(pw);
-    }
-    return sw.toString();
-  }
-
-  private void printMain(PrintWriter pw, RelNode root) {
-    Set<TableScan> tables = new HashSet<>();
-    pw.print(INITIALIZER_PROLOGUE);
-    chainOperators(pw, root, tables);
-    for (TableScan n : tables) {
-      String escaped = CompilerUtil.escapeJavaString(
-          Joiner.on('.').join(n.getTable().getQualifiedName()), true);
-      String r = NEW_LINE_JOINER.join(
-          "    if (!data.containsKey(%1$s))",
-          "      throw new RuntimeException(\"Cannot find table \" + %1$s);",
-          "  data.get(%1$s).open(CTX_%2$d);",
-          "");
-      pw.print(String.format(r, escaped, n.getId()));
-    }
-    pw.print("  }\n");
-  }
-
-  private void chainOperators(PrintWriter pw, RelNode root, Set<TableScan> tables) {
-    doChainOperators(pw, root, tables, "r");
-  }
-
-  private void doChainOperators(PrintWriter pw, RelNode node, Set<TableScan> tables, String parentCtx) {
-    pw.print(
-            String.format("    ChannelContext CTX_%d = Channels.chain(%2$s, %3$s);\n",
-                          node.getId(), parentCtx, RelNodeCompiler.getStageName(node)));
-    String currentCtx = String.format("CTX_%d", node.getId());
-    if (node instanceof TableScan) {
-      tables.add((TableScan) node);
-    }
-    for (RelNode i : node.getInputs()) {
-      doChainOperators(pw, i, tables, currentCtx);
-    }
-  }
-
-  public AbstractValuesProcessor compile(RelNode plan) throws Exception {
-    String javaCode = generateJavaSource(plan);
-    LOG.debug("Compiling... source code {}", javaCode);
-    ClassLoader cl = new CompilingClassLoader(getClass().getClassLoader(),
-                                              PACKAGE_NAME + ".Processor",
-                                              javaCode, null);
-    return (AbstractValuesProcessor) cl.loadClass(
-        PACKAGE_NAME + ".Processor").newInstance();
-  }
-
-  private static void printEpilogue(
-      PrintWriter pw) throws Exception {
-    pw.print("}\n");
-  }
-
-  private static void printPrologue(PrintWriter pw) {
-    pw.append(PROLOGUE);
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PostOrderRelNodeVisitor.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PostOrderRelNodeVisitor.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PostOrderRelNodeVisitor.java
deleted file mode 100644
index afed8a9..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PostOrderRelNodeVisitor.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.compiler.backends.standalone;
-
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.*;
-import org.apache.calcite.rel.stream.Delta;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public abstract class PostOrderRelNodeVisitor<T> {
-  public final T traverse(RelNode n) throws Exception {
-    List<T> inputStreams = new ArrayList<>();
-    for (RelNode input : n.getInputs()) {
-      inputStreams.add(traverse(input));
-    }
-
-    if (n instanceof Aggregate) {
-      return visitAggregate((Aggregate) n, inputStreams);
-    } else if (n instanceof Calc) {
-      return visitCalc((Calc) n, inputStreams);
-    } else if (n instanceof Collect) {
-      return visitCollect((Collect) n, inputStreams);
-    } else if (n instanceof Correlate) {
-      return visitCorrelate((Correlate) n, inputStreams);
-    } else if (n instanceof Delta) {
-      return visitDelta((Delta) n, inputStreams);
-    } else if (n instanceof Exchange) {
-      return visitExchange((Exchange) n, inputStreams);
-    } else if (n instanceof Project) {
-      return visitProject((Project) n, inputStreams);
-    } else if (n instanceof Filter) {
-      return visitFilter((Filter) n, inputStreams);
-    } else if (n instanceof Sample) {
-      return visitSample((Sample) n, inputStreams);
-    } else if (n instanceof Sort) {
-      return visitSort((Sort) n, inputStreams);
-    } else if (n instanceof TableModify) {
-      return visitTableModify((TableModify) n, inputStreams);
-    } else if (n instanceof TableScan) {
-      return visitTableScan((TableScan) n, inputStreams);
-    } else if (n instanceof Uncollect) {
-      return visitUncollect((Uncollect) n, inputStreams);
-    } else if (n instanceof Window) {
-      return visitWindow((Window) n, inputStreams);
-    } else if (n instanceof Join) {
-      return visitJoin((Join) n, inputStreams);
-    } else {
-      return defaultValue(n, inputStreams);
-    }
-  }
-
-  public T visitAggregate(Aggregate aggregate, List<T> inputStreams) throws Exception {
-    return defaultValue(aggregate, inputStreams);
-  }
-
-  public T visitCalc(Calc calc, List<T> inputStreams) throws Exception {
-    return defaultValue(calc, inputStreams);
-  }
-
-  public T visitCollect(Collect collect, List<T> inputStreams) throws Exception {
-    return defaultValue(collect, inputStreams);
-  }
-
-  public T visitCorrelate(Correlate correlate, List<T> inputStreams) throws Exception {
-    return defaultValue(correlate, inputStreams);
-  }
-
-  public T visitDelta(Delta delta, List<T> inputStreams) throws Exception {
-    return defaultValue(delta, inputStreams);
-  }
-
-  public T visitExchange(Exchange exchange, List<T> inputStreams) throws Exception {
-    return defaultValue(exchange, inputStreams);
-  }
-
-  public T visitProject(Project project, List<T> inputStreams) throws Exception {
-    return defaultValue(project, inputStreams);
-  }
-
-  public T visitFilter(Filter filter, List<T> inputStreams) throws Exception {
-    return defaultValue(filter, inputStreams);
-  }
-
-  public T visitSample(Sample sample, List<T> inputStreams) throws Exception {
-    return defaultValue(sample, inputStreams);
-  }
-
-  public T visitSort(Sort sort, List<T> inputStreams) throws Exception {
-    return defaultValue(sort, inputStreams);
-  }
-
-  public T visitTableModify(TableModify modify, List<T> inputStreams) throws Exception {
-    return defaultValue(modify, inputStreams);
-  }
-
-  public T visitTableScan(TableScan scan, List<T> inputStreams) throws Exception {
-    return defaultValue(scan, inputStreams);
-  }
-
-  public T visitUncollect(Uncollect uncollect, List<T> inputStreams) throws Exception {
-    return defaultValue(uncollect, inputStreams);
-  }
-
-  public T visitWindow(Window window, List<T> inputStreams) throws Exception {
-    return defaultValue(window, inputStreams);
-  }
-
-  public T visitJoin(Join join, List<T> inputStreams) throws Exception {
-    return defaultValue(join, inputStreams);
-  }
-
-  public T defaultValue(RelNode n, List<T> inputStreams) {
-    return null;
-  }
-}