Posted to commits@storm.apache.org by ka...@apache.org on 2018/07/16 22:00:48 UTC

[6/7] storm git commit: STORM-2406 [Storm SQL] Change underlying API to Streams API

STORM-2406 [Storm SQL] Change underlying API to Streams API

* This will enable us to provide windowed aggregation, join, etc.
  * Tuple-by-tuple processing makes more sense than micro-batch in these cases (see the sketch below)
* Tested with several SQL cases
* Also bump Calcite version to 1.14.0
  * Fix some checkstyle issues (not exhaustively)
* Update diagrams in storm-sql internal doc
  * Also add the exported draw.io XML for each diagram so the diagrams can be restored in draw.io
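
As a rough illustration of the new target style, below is a minimal hand-written sketch of the
kind of tuple-at-a-time pipeline the SQL layer now emits for the filter/project example used
throughout the docs. The spout is a placeholder, and the generated topology actually plugs in
the compiled EvaluationFilter/EvaluationFunction wrappers added in this commit, so treat this
as a sketch rather than the planner's output:

    import org.apache.storm.generated.StormTopology;
    import org.apache.storm.streams.Stream;
    import org.apache.storm.streams.StreamBuilder;
    import org.apache.storm.topology.IRichSpout;
    import org.apache.storm.tuple.Values;

    public class StreamsApiSketch {
        // 'orderSpout' stands in for any spout emitting (id, unitPrice, quantity) tuples.
        public static StormTopology build(IRichSpout orderSpout) {
            StreamBuilder builder = new StreamBuilder();
            Stream<Values> largeOrders = builder
                    .newStream(orderSpout)                            // tuple-at-a-time source
                    .map(t -> new Values(t.getInteger(0),             // project ID, TOTAL
                            t.getInteger(1) * t.getInteger(2)))
                    .filter(v -> (Integer) v.get(1) > 50);            // WHERE UNIT_PRICE * QUANTITY > 50
            largeOrders.print();                                      // stand-in for the real sink bolt
            return builder.build();
        }
    }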


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c69a23cf
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c69a23cf
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c69a23cf

Branch: refs/heads/master
Commit: c69a23cfe661afd7862201d94b45282e4450b9ee
Parents: c9e9a7c
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Fri Jul 13 17:06:01 2018 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Tue Jul 17 06:51:39 2018 +0900

----------------------------------------------------------------------
 ...ql-internal-example-exported-from-drawio.xml |   2 +
 docs/images/storm-sql-internal-example.png      | Bin 28377 -> 63265 bytes
 ...l-internal-workflow-exported-from-drawio.xml |   2 +
 docs/images/storm-sql-internal-workflow.png     | Bin 20020 -> 42408 bytes
 docs/storm-sql-internal.md                      |  16 +-
 docs/storm-sql-reference.md                     |  10 +-
 docs/storm-sql.md                               |  16 +-
 pom.xml                                         |   2 +-
 sql/README.md                                   | 183 +-----
 sql/storm-sql-core/pom.xml                      |  24 +
 .../storm/sql/AbstractStreamsProcessor.java     |  67 +++
 .../storm/sql/AbstractTridentProcessor.java     |  51 --
 .../org/apache/storm/sql/StormSqlContext.java   |  22 +-
 .../jvm/org/apache/storm/sql/StormSqlImpl.java  |  12 +-
 .../sql/calcite/ParallelStreamableTable.java    |   2 +-
 .../apache/storm/sql/calcite/ParallelTable.java |  28 +
 .../storm/sql/calcite/StormStreamableTable.java |  26 +
 .../apache/storm/sql/calcite/StormTable.java    |  26 +
 .../apache/storm/sql/compiler/CompilerUtil.java |  85 ++-
 .../sql/compiler/RexNodeToJavaCodeCompiler.java |  22 +-
 .../storm/sql/javac/CompilingClassLoader.java   |  26 +-
 .../storm/sql/parser/ColumnConstraint.java      |   2 +-
 .../apache/storm/sql/parser/SqlCreateTable.java |  14 +-
 .../apache/storm/sql/parser/SqlDDLKeywords.java |  22 -
 .../apache/storm/sql/parser/SqlDdlKeywords.java |  22 +
 .../apache/storm/sql/parser/StormParser.java    |   3 +
 .../apache/storm/sql/planner/StormRelUtils.java |  12 +-
 .../sql/planner/rel/StormStreamScanRelBase.java |   2 -
 .../storm/sql/planner/streams/QueryPlanner.java | 158 ++++++
 .../sql/planner/streams/StreamsPlanCreator.java | 126 +++++
 .../planner/streams/StreamsStormRuleSets.java   | 110 ++++
 .../sql/planner/streams/rel/StreamsCalcRel.java |  96 ++++
 .../planner/streams/rel/StreamsFilterRel.java   |  66 +++
 .../streams/rel/StreamsLogicalConvention.java   |  69 +++
 .../planner/streams/rel/StreamsProjectRel.java  |  69 +++
 .../sql/planner/streams/rel/StreamsRel.java     |  27 +
 .../streams/rel/StreamsStreamInsertRel.java     |  81 +++
 .../streams/rel/StreamsStreamScanRel.java       |  58 ++
 .../streams/rules/StreamsAggregateRule.java     |  40 ++
 .../planner/streams/rules/StreamsCalcRule.java  |  46 ++
 .../streams/rules/StreamsFilterRule.java        |  47 ++
 .../planner/streams/rules/StreamsJoinRule.java  |  39 ++
 .../streams/rules/StreamsModifyRule.java        |  89 +++
 .../streams/rules/StreamsProjectRule.java       |  48 ++
 .../planner/streams/rules/StreamsScanRule.java  |  61 ++
 .../storm/sql/planner/trident/QueryPlanner.java | 156 ------
 .../sql/planner/trident/TridentPlanCreator.java | 117 ----
 .../planner/trident/TridentStormRuleSets.java   | 110 ----
 .../sql/planner/trident/rel/TridentCalcRel.java |  92 ---
 .../planner/trident/rel/TridentFilterRel.java   |  59 --
 .../trident/rel/TridentLogicalConvention.java   |  62 ---
 .../planner/trident/rel/TridentProjectRel.java  |  64 ---
 .../sql/planner/trident/rel/TridentRel.java     |  20 -
 .../trident/rel/TridentStreamInsertRel.java     |  72 ---
 .../trident/rel/TridentStreamScanRel.java       |  49 --
 .../trident/rules/TridentAggregateRule.java     |  33 --
 .../planner/trident/rules/TridentCalcRule.java  |  39 --
 .../trident/rules/TridentFilterRule.java        |  40 --
 .../planner/trident/rules/TridentJoinRule.java  |  32 --
 .../trident/rules/TridentModifyRule.java        |  65 ---
 .../trident/rules/TridentProjectRule.java       |  41 --
 .../planner/trident/rules/TridentScanRule.java  |  53 --
 .../test/org/apache/storm/sql/SqlTestUtil.java  |  19 +-
 .../storm/sql/StormSqlLocalClusterImpl.java     |   8 +-
 .../test/org/apache/storm/sql/TestStormSql.java |  84 +--
 .../backends/streams/TestCompilerUtils.java     | 217 ++++++++
 .../backends/streams/TestPlanCompiler.java      | 235 ++++++++
 .../backends/trident/TestCompilerUtils.java     | 207 -------
 .../backends/trident/TestExpressions.java       | 373 -------------
 .../backends/trident/TestPlanCompiler.java      | 231 --------
 .../storm/sql/hdfs/HdfsDataSourcesProvider.java |  75 +--
 .../sql/hdfs/TestHdfsDataSourcesProvider.java   |  59 +-
 .../sql/kafka/KafkaDataSourcesProvider.java     |  74 ++-
 .../sql/kafka/TestKafkaDataSourcesProvider.java |  64 +--
 .../sql/mongodb/MongoDataSourcesProvider.java   |  64 ++-
 .../mongodb/TestMongoDataSourcesProvider.java   |  94 +---
 .../sql/redis/RedisDataSourcesProvider.java     |  98 ++--
 .../sql/redis/TestRedisDataSourcesProvider.java | 108 +---
 sql/storm-sql-runtime/pom.xml                   |  32 +-
 .../storm/sql/runtime/DataSourcesProvider.java  |  13 +-
 .../storm/sql/runtime/DataSourcesRegistry.java  |   8 +-
 .../sql/runtime/ISqlStreamsDataSource.java      |  40 ++
 .../sql/runtime/ISqlTridentDataSource.java      |  67 ---
 .../sql/runtime/SimpleSqlTridentConsumer.java   |  42 --
 .../socket/SocketDataSourcesProvider.java       |  35 +-
 .../datasource/socket/bolt/SocketBolt.java      | 105 ++++
 .../datasource/socket/spout/SocketSpout.java    | 194 +++++++
 .../datasource/socket/trident/SocketState.java  |  89 ---
 .../socket/trident/SocketStateUpdater.java      |  59 --
 .../socket/trident/TridentSocketSpout.java      | 184 ------
 .../streams/functions/EvaluationCalc.java       |  85 +++
 .../streams/functions/EvaluationFilter.java     |  61 ++
 .../streams/functions/EvaluationFunction.java   |  64 +++
 .../StreamInsertMapToPairFunction.java          |  38 ++
 .../functions/StreamsScanTupleValueMapper.java  |  51 ++
 .../trident/functions/EvaluationCalc.java       |  95 ----
 .../trident/functions/EvaluationFilter.java     |  67 ---
 .../trident/functions/EvaluationFunction.java   |  74 ---
 .../trident/functions/ForwardFunction.java      |  31 --
 .../test/org/apache/storm/sql/TestUtils.java    | 558 +++++++------------
 .../socket/TestSocketDataSourceProvider.java    |  94 ----
 101 files changed, 3170 insertions(+), 3959 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/docs/images/storm-sql-internal-example-exported-from-drawio.xml
----------------------------------------------------------------------
diff --git a/docs/images/storm-sql-internal-example-exported-from-drawio.xml b/docs/images/storm-sql-internal-example-exported-from-drawio.xml
new file mode 100644
index 0000000..cd0247c
--- /dev/null
+++ b/docs/images/storm-sql-internal-example-exported-from-drawio.xml
@@ -0,0 +1,2 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<mxfile userAgent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36" version="8.9.5" editor="www.draw.io" type="google"><diagram id="3e6f08aa-e2c0-d298-968c-8149d2042bca" name="Page-1">7Vtbb6M4FP41kXZXagThkvCYpulMpfQyIaPZPo0ccIinBrPgTJL59WuDnXLLlM02EKkgteBjc3z5Pp9zOHJ62sTffYpAuL4nLsS9geLuetpNbzAYmjr7zwX7VGAM1VTgRchNRRmBjX5BIVSEdINcGOcaUkIwRWFe6JAggA7NyUAUkW2+2YrgfK8h8GBJYDsAl6XfkEvXQqqa1mvFZ4i8teh6NBimFT6QjcVM4jVwyTYj0qY9bRIRQtMnfzeBmK+dXJf0vdsjtYeBRTCgdV7QNRMYrrEcuStXBZZ1JVX8BHgjZitGSvdy+ts1otAOgcPLW4ZwT7teUx+zksoeMVhCfA2cFy8im8CdEEyi5EVtlVysyYoE9Bb4CHMCTIiPHNaDDYKY3e5t0UDgrhqinFGkJBeTA4y8gMkcNl8Y8YYIY9kwIAFTcO2CeA1dMTheeAKUNQ4SyUDhUjFnGFG4O7qS6gEfxmtIfEijPWsiSa0ISAWnNVHcZghiCtk6ww1dNgSClN5B9Stu7EFAVxNGdfARYWT7nQIUcA03yvvAqusXBWsFqiamYmVz8Jr/bIisuIoTDMZ8mZRw91rJnjx+7/f7UtEyksKy5O7Bns4XTMvdw+KR3Wbj+afp98f5zXRuc+Sns+kkqWb9s8kpXx/uFt+f5neTKSv8xf6+fB0/LO4Wz+xxzN9YPC7GM3a/nT/es9tB07fP0/n0DQVsUMDnnBWDVQxFjpita7oexyZy
 yXuhBs/ldsFwRd+J5nmWHyidobmmVtBcVc5B87Lxsr/MEsAAhT6bUlwCkE2d5jGLaUReYMGIVNiVku3hC4mYsx+LCh+5Lu/muookCR8SuGpi1ySV3oEYh0hFMIP5ubIB1CqYMTgHMbS3vRqLqEL+uMJwN+axHlsHGLji8cbBII7ZguaokmXFGYGAbimsfBOGzDIbFassZRHEgKKfefVVSy96eCIo8RcCZc0soKxYfSOvJCabyIHivWxEWVClFwmjFohAQeRBWlKUcOEw8dPooR91jy76Wekdudm4EhaAu0dhBEoOUuqJQxBI2QIsMf8qCDJuJ1ufESfdH3E9eQtymiOS1u1/8vU2ud6Kx/gOGTnQcUpGltUsR4Zu8Il4EXARzKmHwFya5js5rAJjjQqzNKjYMOY5zJLRKO9uEU7bdqRrmXRqq6wzG2XdU0R+QIci0pm7C2Beu/Zu2LyfvScuWu076rVPPc3QZGzYCvmsEq48rrZFUeCThzoDK9wh+jcX9w1RepY1ARtZpooXn4WCH5DSvYATbChhIhLRNfFIAPCMkFB+SDT9wcDWIYnLfx8QpyH374KXMvq1vyhOT2YpHZK1kTRqIGm2hqTaIVkbSbMGksPWkOwyLOfJsJjFtMjpGRZz1F6GRdW7nV57p6t1jLba3l5vNm1hUxL5dkg29ONF8S6Ao1VlFG86I7hcVUfxQxbGw3fK4w8HF5QvU83OitS3IsM6VsRqzYo0mwz4qMnPy7Mg7SY/1VFnQuqbEKuGCcmcP2vahFiNmpCPnMm+PDPSbiQiO2o0BkbBS8e89pnXdiJbplYy3JsRj5884tPCnBUF3LtDTuc55KRbumTCXoJVtkpWU6ecBuUk3NN6H3fMaJ4Zw8LByCpeHM5Anp8YVQecuuzsfwlSq1Oqo7dSqnVzs5ZeUlRM854xOzs4fiQkjTJOPR++3CDswqgfwK1NIwj8P/r9/p+VR8bTM9nBMg6TsnJOUX+VZAMuZ
 DA+CC9kJJRcE0yPD6Y69CwYko9+JN4a5rdyZb6j6uTzmc7El/NsydcE/6wnIVs0b99FBQ1FBVbRXZhqve+IE6ICVnz9OWDqJF5/U6lN/wU=</diagram></mxfile>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/docs/images/storm-sql-internal-example.png
----------------------------------------------------------------------
diff --git a/docs/images/storm-sql-internal-example.png b/docs/images/storm-sql-internal-example.png
index 74828d5..c02d47e 100644
Binary files a/docs/images/storm-sql-internal-example.png and b/docs/images/storm-sql-internal-example.png differ

http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/docs/images/storm-sql-internal-workflow-exported-from-drawio.xml
----------------------------------------------------------------------
diff --git a/docs/images/storm-sql-internal-workflow-exported-from-drawio.xml b/docs/images/storm-sql-internal-workflow-exported-from-drawio.xml
new file mode 100644
index 0000000..a583c93
--- /dev/null
+++ b/docs/images/storm-sql-internal-workflow-exported-from-drawio.xml
@@ -0,0 +1,2 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<mxfile userAgent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36" version="8.9.5" editor="www.draw.io" type="google"><diagram id="641182a6-837e-6da3-d78c-7fb7ce4fa756" name="Page-1">7Vtbc6M2FP41fkyGO/gxcZJ2dnZn2roz7T7KIGO6MvKCHNv99ZVAwggExhds0tgPjjm6IOl837lIysicLLe/JGC1+IYDiEaGFmxH5svIMGzTpd9MsOOCsZcLwiQKcpG+F0yjfyEXaly6jgKYShUJxohEK1no4ziGPpFkIEnwRq42x0h+6wqEsCaY+gDVpX9FAVlwqe6M9wW/wihc8Fd7Bp/wEojKfCbpAgR4UxKZryNzkmBM8l/L7QQitnZiXfJ2bw2lxcASGJMuDQzPNCAMDH9Gv3VgPfAe3gFa88mODAfRvp7nmHZJR0x2fBmcn2ssCh7STElPtIJur7b7QvorZH+nv38VHc0SIUwJIHBJh5qKMjrS/D15hXoTvmzFIIwEr+MAsulotHiziAicroDPSjcUfFS2IEtEn3Q21AihCUY4ydqaAYDe3M8GkuAfsFTi+B6czWlJmIAgoiMslbkQOFDjEy/J37IPl0/5CEW9N7CMEIP7BC8jn05hCmI6a+3btJjUO0wI3DYqUi/gQWkF8RKSZEer8AYmB9RO6JU/b0r4FKBblKDpcBnglAiLnveooT84cDqCyOgJRE/TPxuBckFYzOd+YHgqWATueKZpaljM58DWhLq5zdLtLupvwtEFYGFYMi708S1xYfVlXAhOlmxdFrs0oma6bDY69pl1Vul2hUDcwTCd8YY7bNWwtcwhwdbpCbZfnv64hjkLbOgFlgoXnjEzHUeNi7Eb
 aK47MFw4gzJnXs/mzEfrlMDkjpGjMOJ5lVDIuSVGdFcBkoriYBw8sayEPvkIpNSFybqiE092f3O9Zg/f2cOjXSwYDGoJS2W5aIYEkhCStoitvqSlJbMVKyZkCUSARO/yEFTLyN/wG44yfggV27LGTIohqYsUrxMf8lblbKbSkaFVVV/pKF+DWkeZVotpn6Zo81DmlLNW78pmSgAiwyCB1HCAWZFWrNg0sonZzyP7hUoAisKYoYjqntoN85kRiQVET7xgGQUBa/+MwAyiZ+D/CDOrIcUD7HMBTmvZp43TPD3nUxoVSXEZvA2EamT/g/ZIGxkSDERSeypMRRU8n6fw4sDRVcCpgIFpaXXWOh5IE/SKX1XYTNtTWABT78Nojo+ymTGOGaQDkC4y76fLtOlgIRt0cnV7eKy9K1RyBfsmFP3plFLdUemspGsoRbXT8hmUUiWAsFv2sUoSHbnjxyvGCgdChVMThwlAPo36aTnC4X0j5ENshHRmlHqjV7/pRq/dE5B5BnwRaBG8oroJd3cADwPARXg6CASr0vNLIPgFEMBaZ/6ny/HWULD3oXaCjsVebV/5ptjTVXHAJ4reLM27UPR2KAzsMwRXHWp9JiVWtg6Oz5MaOrqqElWBzGdQ4vlKs9q132cWVd8myuLG7KpJRXn13VTZvXGNln0hF3XfU1X5WdkTd9x1Pegvqw72qF3Xc/ynXeWoUfeflgKmRh/us7hDVeIo5Zy4eYMTssAhjgF63UsrCikfsmwjkp2xZMcq7On7qHr8khcVBzD04R9IyI6rA6wJpqL9e79ivOpuErgoZ17TlPM6rUc5plqDnS3EyfowVZtRUiztFzjdh7ACsbWo1mgMmg9y+4OdlByOeU86KSldKGw6KbEs2XwLgA3zpKSgSFOmo6SzW+azNmo/Tr0yn80OfLZuxud6emL+/8jX0zHlgQM3Sj7NFS/YSa879zBdbtATE61W18tDpxN87ZX5Z3Xgn30z/tUzy
 /qy3/nXhtBW/nneWHZ+g74mYNjnUW44Ts/uQDrnZqSr7wTUF/5OujaMtpLOFFHFQFnmXsSxid97jjXxz7gBAZ0OBPRuRsD6bWvnTsCuBGy/HMecnuUMmn/elfnn3oB/7qD5V7/VfudfZ/41qK3gn+6O5aTv4Ux3KFyrIfdqVW8znUJX+rj/T9S8+v7fec3X/wA=</diagram></mxfile>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/docs/images/storm-sql-internal-workflow.png
----------------------------------------------------------------------
diff --git a/docs/images/storm-sql-internal-workflow.png b/docs/images/storm-sql-internal-workflow.png
index 655c1c4..abcb518 100644
Binary files a/docs/images/storm-sql-internal-workflow.png and b/docs/images/storm-sql-internal-workflow.png differ

http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/docs/storm-sql-internal.md
----------------------------------------------------------------------
diff --git a/docs/storm-sql-internal.md b/docs/storm-sql-internal.md
index c223d6b..14d1380 100644
--- a/docs/storm-sql-internal.md
+++ b/docs/storm-sql-internal.md
@@ -8,7 +8,7 @@ This page describes the design and the implementation of the Storm SQL integrati
 
 ## Overview
 
-SQL is a well-adopted yet complicated standard. Several projects including Drill, Hive, Phoenix and Spark have invested significantly in their SQL layers. One of the main design goal of StormSQL is to leverage the existing investments for these projects. StormSQL leverages [Apache Calcite](///calcite.apache.org) to implement the SQL standard. StormSQL focuses on compiling the SQL statements to Storm / Trident topologies so that they can be executed in Storm clusters.
+SQL is a well-adopted yet complicated standard. Several projects including Drill, Hive, Phoenix and Spark have invested significantly in their SQL layers. One of the main design goals of StormSQL is to leverage the existing investments of these projects. StormSQL leverages [Apache Calcite](///calcite.apache.org) to implement the SQL standard. StormSQL focuses on compiling the SQL statements to Storm topologies so that they can be executed in Storm clusters.
 
 Figure 1 describes the workflow of executing a SQL query in StormSQL. First, users provide a sequence of SQL statements. StormSQL parses the SQL statements and translates them to a Calcite logical plan. A logical plan consists of a sequence of SQL logical operators that describe how the query should be executed, irrespective of the underlying execution engines. Some examples of logical operators include `TableScan`, `Filter`, `Projection` and `GroupBy`.
 
@@ -18,9 +18,11 @@ Figure 1 describes the workflow of executing a SQL query in StormSQL. First, use
 <p>Figure 1: Workflow of StormSQL.</p>
 </div>
 
-The next step is to compile the logical execution plan down to a physical execution plan. A physical plan consists of physical operators that describes how to execute the SQL query in *StormSQL*. Physical operators such as `Filter`, `Projection`, and `GroupBy` are directly mapped to operations in Trident topologies. StormSQL also compiles expressions in the SQL statements into Java code blocks and plugs them into the Trident functions which will be compiled once and executed in runtime.
+Note: the Trident topology has been replaced with a regular Storm topology built on the Streams API.
 
-Finally, StormSQL submits created Trident topology with empty packaged JAR to the Storm cluster. Storm schedules and executes the Trident topology in the same way of it executes other Storm topologies.
+The next step is to compile the logical execution plan down to a physical execution plan. A physical plan consists of physical operators that describe how to execute the SQL query in *StormSQL*. Physical operators such as `Filter`, `Projection`, and `GroupBy` are directly mapped to operations in Storm topologies. StormSQL also compiles expressions in the SQL statements into Java code blocks and plugs them into the Storm Streams API functions, which are compiled once and executed at runtime.
+
+Finally, StormSQL submits the generated Storm topology, with an otherwise empty packaged JAR, to the Storm cluster. Storm schedules and executes it in the same way it executes other Storm topologies.
 
 The following code blocks show an example query that filters and projects results from a Kafka stream.
 
@@ -32,12 +34,12 @@ CREATE EXTERNAL TABLE LARGE_ORDERS (ID INT PRIMARY KEY, TOTAL INT) LOCATION 'kaf
 INSERT INTO LARGE_ORDERS SELECT ID, UNIT_PRICE * QUANTITY AS TOTAL FROM ORDERS WHERE UNIT_PRICE * QUANTITY > 50
 ```
 
-The first two SQL statements define the inputs and outputs of external data. Figure 2 describes the processes of how StormSQL takes the last `SELECT` query and compiles it down to Trident topology.
+The first two SQL statements define the inputs and outputs of external data. Figure 2 describes the process by which StormSQL takes the last `SELECT` query and compiles it down to a Storm topology.
 
 <div align="center">
-<img title="Compiling the example query to Trident topology" src="images/storm-sql-internal-example.png" style="max-width: 80rem"/>
+<img title="Compiling the example query to Storm topology" src="images/storm-sql-internal-example.png" style="max-width: 80rem"/>
 
-<p>Figure 2: Compiling the example query to Trident topology.</p>
+<p>Figure 2: Compiling the example query to Storm topology.</p>
 </div>
 
 
@@ -53,7 +55,7 @@ For more information please refer to http://calcite.apache.org/docs/stream.html.
 ## Dependency
 
 Storm takes care of the necessary dependencies of Storm SQL, except the data source JAR used by `EXTERNAL TABLE`.
-You can use `--jars` or `--artifacts` option to `storm sql` so that data source JAR can be included to Storm SQL Runner and also Trident Topology runtime classpath.
+You can use the `--jars` or `--artifacts` option of `storm sql` so that the data source JAR is included in the classpath of both the Storm SQL Runner and the Storm topology runtime.
 (Use `--artifacts` if your data source JARs are available in Maven repository since it handles transitive dependencies.)
 
 Please refer to the [Storm SQL integration](storm-sql.html) page for how to do it.
\ No newline at end of file
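
To make the compiled-expression step above concrete: the planner turns each SQL expression into
a Java code block and wraps it in a Streams API operation. A minimal sketch of that wrapper
pattern follows; the real class is EvaluationFilter in storm-sql-runtime (see the diffstat), and
the compiledExpr argument here is an illustrative stand-in for the generated expression class,
not the actual generated code:

    import java.util.function.Function;
    import org.apache.storm.streams.operations.Predicate;
    import org.apache.storm.tuple.Values;

    // Sketch of the filter wrapper: the planner-generated expression decides
    // whether each incoming row passes the WHERE clause.
    public class EvaluationFilterSketch implements Predicate<Values> {
        private final Function<Values, Boolean> compiledExpr; // hypothetical stand-in

        public EvaluationFilterSketch(Function<Values, Boolean> compiledExpr) {
            this.compiledExpr = compiledExpr;
        }

        @Override
        public boolean test(Values input) {
            // e.g. the generated block for UNIT_PRICE * QUANTITY > 50 runs here
            return compiledExpr.apply(input);
        }
    }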

http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/docs/storm-sql-reference.md
----------------------------------------------------------------------
diff --git a/docs/storm-sql-reference.md b/docs/storm-sql-reference.md
index 2b20a93..9b3ce34 100644
--- a/docs/storm-sql-reference.md
+++ b/docs/storm-sql-reference.md
@@ -13,7 +13,7 @@ Please read [Storm SQL integration](storm-sql.html) page first to see what featu
 
 ## Grammar
 
-Calcite provides broader SQL Grammar. But Storm SQL is not database system and handles streaming data, so only subset of grammar is supported.
+Calcite provides a broader SQL grammar, but Storm SQL is not a database system and handles streaming data, so only a subset of the grammar is supported.
 Storm SQL doesn't redefine the SQL grammar and just utilizes the parser Calcite provides, so SQL statements are still parsed based on Calcite's SQL grammar.
 
 SQL grammar in [BNF](http://en.wikipedia.org/wiki/Backus%E2%80%93Naur_Form)-like form.
@@ -1247,7 +1247,7 @@ Please note that it supports only one letter for delimiter.
 
 Socket data source is a built-in feature, so users don't need to add any artifacts to the `--artifacts` option.
 
-Please note that Socket data source is only for testing: it doesn't guarantee exactly-once and at-least-once.
+Please note that the Socket data source is only for testing: it doesn't provide any delivery guarantees.
 
 TIP: `netcat` is a convenient tool for the Socket data source: users can use it to connect for input, output, or both.
 
@@ -1281,9 +1281,9 @@ You can use below as working reference for `--artifacts` option, and change depe
 
 MongoDB data source requires below properties to be set:
 
-`{"collection.name": "storm_sql_mongo", "trident.ser.field": "serfield"}`
+`{"collection.name": "storm_sql_mongo", "ser.field": "serfield"}`
 
-* `trident.ser.field`: field to store - record will be serialized and stored as BSON in this field
+* `ser.field`: field to store - record will be serialized and stored as BSON in this field
 * `collection.name`: Collection name
 
 Please note that `storm-sql-mongodb` requires users to provide `storm-mongodb`.
@@ -1298,7 +1298,7 @@ Storing record with preserving fields are not supported for now.
 HDFS data source requires below properties to be set:
 
 * `hdfs.file.path`: HDFS file path
-* `hdfs.file.name`: HDFS file name - please refer to [SimpleFileNameFormat]({{page.git-blob-base}}/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/SimpleFileNameFormat.java)
+* `hdfs.file.name`: HDFS file name - please refer to [SimpleFileNameFormat]({{page.git-blob-base}}/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/SimpleFileNameFormat.java)
 * `hdfs.rotation.size.kb`: HDFS FileSizeRotationPolicy in KB
 * `hdfs.rotation.time.seconds`: HDFS TimedRotationPolicy in seconds
 

http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/docs/storm-sql.md
----------------------------------------------------------------------
diff --git a/docs/storm-sql.md b/docs/storm-sql.md
index b06f068..7467e2d 100644
--- a/docs/storm-sql.md
+++ b/docs/storm-sql.md
@@ -6,14 +6,14 @@ documentation: true
 
 The Storm SQL integration allows users to run SQL queries over streaming data in Storm. Not only does the SQL interface allow faster development cycles for streaming analytics, it also opens up opportunities to unify batch data processing, like [Apache Hive](///hive.apache.org), with real-time streaming data analytics.
 
-At a very high level StormSQL compiles the SQL queries to [Trident](Trident-API-Overview.html) topologies and executes them in Storm clusters. This document provides information of how to use StormSQL as end users. For people that are interested in more details in the design and the implementation of StormSQL please refer to the [this](storm-sql-internal.html) page.
+At a very high level, StormSQL compiles the SQL queries to Storm topologies via the Streams API and executes them in Storm clusters. This document provides information on how to use StormSQL as an end user. For more details on the design and implementation of StormSQL, please refer to [this](storm-sql-internal.html) page.
 
 Storm SQL integration is an `experimental` feature, so the internals of Storm SQL and the supported features are subject to change.
 Small changes will not affect the user experience; we will notify users when a breaking UX change is introduced.
 
 ## Usage
 
+Run the ``storm sql`` command to compile SQL statements into a Storm topology, and submit it to the Storm cluster
+Run the ``storm sql`` command to compile SQL statements into Storm topology, and submit it to the Storm cluster
 
 ```
 $ bin/storm sql <sql-file> <topo-name>
@@ -33,7 +33,7 @@ The following features are supported in the current repository:
 * Projections
 * User defined function (scalar)
 
-Aggregations and Join are not supported by intention. When Storm SQL will support native `Streaming SQL`, these features will be introduced.    
+Aggregations and joins are not supported for now; these features will be introduced when Storm SQL supports native `Streaming SQL`.
 
 ## Specifying External Data Sources
 
@@ -53,8 +53,8 @@ CREATE EXTERNAL TABLE table_name field_list
 
 You can find detailed explanations of the properties in [Hive Data Definition Language](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL). 
 
-`PARALLELISM` is StormSQL's own keyword which describes parallelism hint for input data source. This is same as providing parallelism hint to Trident Spout.
-As same as Trident, downstream operators are executed with same parallelism before repartition (Aggregation triggers repartition).
+`PARALLELISM` is StormSQL's own keyword which describes the parallelism hint for the input data source. This is the same as providing a parallelism hint to a spout.
+Downstream operators are executed with the same parallelism until a repartition occurs (aggregation triggers repartition).
 
 The default value is 1, and this option has no effect on the output data source. (We might change this if needed; normally repartition is the thing to avoid.)
 
@@ -66,11 +66,11 @@ CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY) LOCATION 'kafka://localhost:2181/
 
 ## Plugging in External Data Sources
 
-Users plug in external data sources through implementing the `ISqlTridentDataSource` interface and registers them using the mechanisms of Java's service loader. The external data source will be chosen based on the scheme of the URI of the tables. Please refer to the implementation of `storm-sql-kafka` for more details.
+Users plug in external data sources by implementing the `ISqlStreamsDataSource` interface and registering them using Java's service loader mechanism. The external data source is chosen based on the scheme of the table's URI. Please refer to the implementation of `storm-sql-kafka` for more details.
 
 ## Specifying User Defined Function (UDF)
 
-Users can define user defined function (scalar or aggregate) using `CREATE FUNCTION` statement.
+Users can define a user-defined function (scalar) using the `CREATE FUNCTION` statement.
 For example, the following statement defines the `MYPLUS` function, which uses the `org.apache.storm.sql.TestUtils$MyPlus` class. (A scalar UDF class sketch follows this diff.)
 
 ```
@@ -166,4 +166,4 @@ LogicalTableModify(table=[[LARGE_ORDERS]], operation=[INSERT], updateColumnList=
 ## Current Limitations
 
 - Windowing is yet to be implemented.
-- Aggregation and join are not supported (waiting for `Streaming SQL` to be matured)
+- Aggregation and join will be introduced after windowing is supported.
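
As referenced in the UDF section above, a scalar UDF is just a class exposing a static evaluate
method; Storm SQL treats a class that defines evaluate as a scalar function. This mirrors the
MyPlus example from the SQL README:

    // Scalar UDF backing the MYPLUS function: Storm SQL resolves the static
    // `evaluate` method and applies it to each row.
    public class MyPlus {
        public static Integer evaluate(Integer x, Integer y) {
            return x + y;
        }
    }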

http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 5c107c8..503924c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -306,7 +306,7 @@
         <cassandra.version>2.1.7</cassandra.version>
         <druid.version>0.8.2</druid.version>
         <elasticsearch.version>5.2.2</elasticsearch.version>
-        <calcite.version>1.11.0</calcite.version>
+        <calcite.version>1.14.0</calcite.version>
         <mongodb.version>3.2.0</mongodb.version>
         <solr.version>5.2.1</solr.version>
         <jpmml.version>1.0.22</jpmml.version>

http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/README.md
----------------------------------------------------------------------
diff --git a/sql/README.md b/sql/README.md
index 32e0fcb..297c9de 100644
--- a/sql/README.md
+++ b/sql/README.md
@@ -1,187 +1,8 @@
 # Storm SQL
 
-Compile SQL queries to Storm topologies.
+Compile SQL queries to Storm topologies and run them.
 
-## Usage
-
-Run the ``storm sql`` command to compile SQL statements into Trident topology, and submit it to the Storm cluster
-
-```
-$ bin/storm sql <sql-file> <topo-name>
-```
-
-In which `sql-file` contains a list of SQL statements to be executed, and `topo-name` is the name of the topology.
-
-StormSQL activates `explain mode` and shows query plan instead of submitting topology when user specifies `topo-name` as `--explain`.
-Detailed explanation is available from `Showing Query Plan (explain mode)` section.
-
-## Supported Features
-
-The following features are supported in the current repository:
-
-* Streaming from and to external data sources
-* Filtering tuples
-* Projections
-* Aggregations (Grouping)
-* User defined function (scalar and aggregate)
-* Join (Inner, Left outer, Right outer, Full outer)
-
-## Specifying External Data Sources
-
-In StormSQL data is represented by external tables. Users can specify data sources using the `CREATE EXTERNAL TABLE`
-statement. For example, the following statement specifies a Kafka spouts and sink:
-
-```
-CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY) LOCATION 'kafka://localhost:2181/brokers?topic=test' TBLPROPERTIES '{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.storm.kafka.ByteBufferSerializer"}}'
-```
-
-The syntax of `CREATE EXTERNAL TABLE` closely follows the one defined in
-[Hive Data Definition Language](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL).
-
-`PARALLELISM` is StormSQL's own keyword which describes parallelism hint for input data source. This is same as providing parallelism hint to Trident Spout.
-Downstream operators are executed with same parallelism before repartition (Aggregation triggers repartition).
-
-Default value is 1, and this option has no effect on output data source. (We might change if needed. Normally repartition is the thing to avoid.)
-
-## Plugging in External Data Sources
-
-Users plug in external data sources through implementing the `ISqlTridentDataSource` interface and registers them using
-the mechanisms of Java's service loader. The external data source will be chosen based on the scheme of the URI of the
-tables. Please refer to the implementation of `storm-sql-kafka` for more details.
-
-## Specifying User Defined Function (UDF)
-
-Users can define user defined function (scalar or aggregate) using `CREATE FUNCTION` statement.
-For example, the following statement defines `MYPLUS` function which uses `org.apache.storm.sql.TestUtils$MyPlus` class.
-
-```
-CREATE FUNCTION MYPLUS AS 'org.apache.storm.sql.TestUtils$MyPlus'
-```
-
-Storm SQL determines whether the function as scalar or aggregate by checking which methods are defined.
-If the class defines `evaluate` method, Storm SQL treats the function as `scalar`,
-and if the class defines `add` method, Storm SQL treats the function as `aggregate`.
-
-Example of class for scalar function is here:
-
-```
-  public class MyPlus {
-    public static Integer evaluate(Integer x, Integer y) {
-      return x + y;
-    }
-  }
-
-```
-
-and class for aggregate function is here:
-
-```
-  public class MyConcat {
-    public static String init() {
-      return "";
-    }
-    public static String add(String accumulator, String val) {
-      return accumulator + val;
-    }
-    public static String result(String accumulator) {
-      return accumulator;
-    }
-  }
-```
-
-If users don't define `result` method, result is the last return value of `add` method.
-Users need to define `result` method only when we need to transform accumulated value.
-
-## Example: Filtering Kafka Stream
-
-Let's say there is a Kafka stream that represents the transactions of orders. Each message in the stream contains the id
-of the order, the unit price of the product and the quantity of the orders. The goal is to filter orders where the
-transactions are significant and to insert these orders into another Kafka stream for further analysis.
-
-The user can specify the following SQL statements in the SQL file:
-
-```
-CREATE EXTERNAL TABLE ORDERS (ID INT PRIMARY KEY, UNIT_PRICE INT, QUANTITY INT) LOCATION 'kafka://localhost:2181/brokers?topic=orders' TBLPROPERTIES '{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.storm.kafka.ByteBufferSerializer"}}'
-
-CREATE EXTERNAL TABLE LARGE_ORDERS (ID INT PRIMARY KEY, TOTAL INT) LOCATION 'kafka://localhost:2181/brokers?topic=large_orders' TBLPROPERTIES '{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.storm.kafka.ByteBufferSerializer"}}'
-
-INSERT INTO LARGE_ORDERS SELECT ID, UNIT_PRICE * QUANTITY AS TOTAL FROM ORDERS WHERE UNIT_PRICE * QUANTITY > 50
-```
-
-The first statement defines the table `ORDER` which represents the input stream. The `LOCATION` clause specifies the
-ZkHost (`localhost:2181`), the path of the brokers in ZooKeeper (`/brokers`) and the topic (`orders`).
-The `TBLPROPERTIES` clause specifies the configuration of
-[KafkaProducer](http://kafka.apache.org/documentation.html#newproducerconfigs).
-Current implementation of `storm-sql-kafka` requires specifying both `LOCATION` and `TBLPROPERTIES` clauses even though
-the table is read-only or write-only.
-
-Similarly, the second statement specifies the table `LARGE_ORDERS` which represents the output stream. The third
-statement is a `SELECT` statement which defines the topology: it instructs StormSQL to filter all orders in the external
-table `ORDERS`, calculates the total price and inserts matching records into the Kafka stream specified by
-`LARGE_ORDER`.
-
-To run this example, users need to include the data sources (`storm-sql-kafka` in this case) and its dependency in the
-class path. Dependencies for Storm SQL are automatically handled when users run `storm sql`. Users can include data sources at the submission step like below:
-
-```
-$ bin/storm sql order_filtering.sql order_filtering --artifacts "org.apache.storm:storm-sql-kafka:2.0.0-SNAPSHOT,org.apache.storm:storm-kafka:2.0.0-SNAPSHOT,org.apache.kafka:kafka_2.10:0.8.2.2\!org.slf4j:slf4j-log4j12,org.apache.kafka:kafka-clients:0.8.2.2"
-```
-
-Above command submits the SQL statements to StormSQL. Users need to modify each artifacts' version if users are using different version of Storm or Kafka. 
-
-By now you should be able to see the `order_filtering` topology in the Storm UI.
-
-## Showing Query Plan (explain mode)
-
-Like `explain` on SQL statement, StormSQL provides `explain mode` when running Storm SQL Runner. In explain mode, StormSQL analyzes each query statement (only DML) and show plan instead of submitting topology.
-
-In order to run `explain mode`, you need to provide topology name as `--explain` and run `storm sql` as same as submitting.
-
-For example, when you run the example seen above with explain mode:
- 
-```
-$ bin/storm sql order_filtering.sql --explain --artifacts "org.apache.storm:storm-sql-kafka:2.0.0-SNAPSHOT,org.apache.storm:storm-kafka:2.0.0-SNAPSHOT,org.apache.kafka:kafka_2.10:0.8.2.2\!org.slf4j:slf4j-log4j12,org.apache.kafka:kafka-clients:0.8.2.2"
-```
-
-StormSQL prints out like below:
- 
-```
-
-===========================================================
-query>
-CREATE EXTERNAL TABLE ORDERS (ID INT PRIMARY KEY, UNIT_PRICE INT, QUANTITY INT) LOCATION 'kafka://localhost:2181/brokers?topic=orders' TBLPROPERTIES '{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.storm.kafka.ByteBufferSerializer"}}'
------------------------------------------------------------
-16:53:43.951 [main] INFO  o.a.s.s.r.DataSourcesRegistry - Registering scheme kafka with org.apache.storm.sql.kafka.KafkaDataSourcesProvider@4d1bf319
-No plan presented on DDL
-===========================================================
-===========================================================
-query>
-CREATE EXTERNAL TABLE LARGE_ORDERS (ID INT PRIMARY KEY, TOTAL INT) LOCATION 'kafka://localhost:2181/brokers?topic=large_orders' TBLPROPERTIES '{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.storm.kafka.ByteBufferSerializer"}}'
------------------------------------------------------------
-No plan presented on DDL
-===========================================================
-===========================================================
-query>
-INSERT INTO LARGE_ORDERS SELECT ID, UNIT_PRICE * QUANTITY AS TOTAL FROM ORDERS WHERE UNIT_PRICE * QUANTITY > 50
------------------------------------------------------------
-plan>
-LogicalTableModify(table=[[LARGE_ORDERS]], operation=[INSERT], updateColumnList=[[]], flattened=[true]), id = 8
-  LogicalProject(ID=[$0], TOTAL=[*($1, $2)]), id = 7
-    LogicalFilter(condition=[>(*($1, $2), 50)]), id = 6
-      EnumerableTableScan(table=[[ORDERS]]), id = 5
-
-===========================================================
-
-```
-
-## Current Limitations
-
-- Windowing is yet to be implemented.
-- Only equi-join (single field equality) is supported for joining table.
-- Joining table only applies within each small batch that comes off of the spout.
-  - Not across batches.
-  - Limitation came from `join` feature of Trident.
-  - Please refer this doc: `Trident API Overview` for details.
+Please see [here](../docs/storm-sql.md) for details on how to use it.
 
 ## License
 

http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/pom.xml
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/pom.xml b/sql/storm-sql-core/pom.xml
index 712f734..dfeb030 100644
--- a/sql/storm-sql-core/pom.xml
+++ b/sql/storm-sql-core/pom.xml
@@ -67,9 +67,33 @@
             <version>${calcite.version}</version>
             <exclusions>
                 <exclusion>
+                    <groupId>org.apache.calcite.avatica</groupId>
+                    <artifactId>avatica-metrics</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-core</artifactId>
+                </exclusion>
+                <exclusion>
                     <groupId>com.fasterxml.jackson.core</groupId>
                     <artifactId>jackson-annotations</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-databind</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.httpcomponents</groupId>
+                    <artifactId>httpclient</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.httpcomponents</groupId>
+                    <artifactId>httpcore</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-dbcp</groupId>
+                    <artifactId>commons-dbcp</artifactId>
+                </exclusion>
             </exclusions>
         </dependency>
         <dependency>

http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/AbstractStreamsProcessor.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/AbstractStreamsProcessor.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/AbstractStreamsProcessor.java
new file mode 100644
index 0000000..e061045
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/AbstractStreamsProcessor.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql;
+
+import java.util.List;
+
+import org.apache.calcite.DataContext;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.sql.javac.CompilingClassLoader;
+import org.apache.storm.streams.Stream;
+import org.apache.storm.tuple.Values;
+
+public abstract class AbstractStreamsProcessor {
+    protected Stream<Values> outputStream;
+    protected DataContext dataContext;
+    protected List<CompilingClassLoader> classLoaders;
+
+    /**
+     * Return final output stream of SQL topology structure.
+     *
+     * @return the output stream of the SQL
+     */
+    public Stream<Values> outputStream() {
+        return outputStream;
+    }
+
+    /**
+     * Construct the Storm topology based on the SQL.
+     */
+    public abstract StormTopology build();
+
+    /**
+     * Return the DataContext instance used when executing the query.
+     *
+     * @return the DataContext instance used when executing the query
+     */
+    public DataContext getDataContext() {
+        return dataContext;
+    }
+
+    /**
+     * Return the list of Classloaders which need to be compiled and included in the jar.
+     * They're chained, so the last classloader can access all classes.
+     *
+     * @return Classloaders to compile.
+     */
+    public List<CompilingClassLoader> getClassLoaders() {
+        return classLoaders;
+    }
+}
\ No newline at end of file
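
For orientation, a toy subclass showing how the contract above is meant to be filled in. The
real subclasses are generated by the streams QueryPlanner, with the generated expression
classes carried in classLoaders; the names below are illustrative only:

    import java.util.Collections;
    import org.apache.calcite.DataContext;
    import org.apache.storm.generated.StormTopology;
    import org.apache.storm.sql.AbstractStreamsProcessor;
    import org.apache.storm.streams.Stream;
    import org.apache.storm.streams.StreamBuilder;
    import org.apache.storm.tuple.Values;

    public class SingleStreamProcessor extends AbstractStreamsProcessor {
        private final StreamBuilder builder;

        public SingleStreamProcessor(StreamBuilder builder, Stream<Values> output,
                                     DataContext dataContext) {
            this.builder = builder;
            this.outputStream = output;                   // protected fields of the base class
            this.dataContext = dataContext;
            this.classLoaders = Collections.emptyList();  // nothing extra to package here
        }

        @Override
        public StormTopology build() {
            return builder.build();                       // Streams DAG -> StormTopology
        }
    }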

http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/AbstractTridentProcessor.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/AbstractTridentProcessor.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/AbstractTridentProcessor.java
deleted file mode 100644
index f648697..0000000
--- a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/AbstractTridentProcessor.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.sql;
-
-import java.util.List;
-import org.apache.calcite.DataContext;
-import org.apache.storm.sql.javac.CompilingClassLoader;
-import org.apache.storm.trident.Stream;
-import org.apache.storm.trident.TridentTopology;
-
-public abstract class AbstractTridentProcessor {
-    protected Stream outputStream;
-    protected DataContext dataContext;
-    protected List<CompilingClassLoader> classLoaders;
-
-    /**
-     * @return the output stream of the SQL
-     */
-    public Stream outputStream() {
-        return outputStream;
-    }
-
-    /**
-     * Construct the trident topology based on the SQL.
-     */
-    public abstract TridentTopology build();
-
-    /**
-     * @return DataContext instance which is used with execution of query
-     */
-    public DataContext getDataContext() {
-        return dataContext;
-    }
-
-    /**
-     * @return Classloaders to compile. They're all chaining so the last classloader can access all classes.
-     */
-    public List<CompilingClassLoader> getClassLoaders() {
-        return classLoaders;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlContext.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlContext.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlContext.java
index 4938af1..e5413c3 100644
--- a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlContext.java
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlContext.java
@@ -25,7 +25,10 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
+
 import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.config.CalciteConnectionConfigImpl;
 import org.apache.calcite.jdbc.CalciteSchema;
 import org.apache.calcite.prepare.CalciteCatalogReader;
 import org.apache.calcite.rel.RelNode;
@@ -54,17 +57,17 @@ import org.apache.storm.sql.parser.ColumnDefinition;
 import org.apache.storm.sql.parser.SqlCreateFunction;
 import org.apache.storm.sql.parser.SqlCreateTable;
 import org.apache.storm.sql.planner.StormRelUtils;
-import org.apache.storm.sql.planner.trident.QueryPlanner;
+import org.apache.storm.sql.planner.streams.QueryPlanner;
 import org.apache.storm.sql.runtime.DataSourcesRegistry;
 import org.apache.storm.sql.runtime.FieldInfo;
-import org.apache.storm.sql.runtime.ISqlTridentDataSource;
+import org.apache.storm.sql.runtime.ISqlStreamsDataSource;
 
 public class StormSqlContext {
     private final JavaTypeFactory typeFactory = new StormSqlTypeFactoryImpl(
         RelDataTypeSystem.DEFAULT);
     private final SchemaPlus schema = Frameworks.createRootSchema(true);
     private boolean hasUdf = false;
-    private Map<String, ISqlTridentDataSource> dataSources = new HashMap<>();
+    private Map<String, ISqlStreamsDataSource> dataSources = new HashMap<>();
 
     public void interpretCreateTable(SqlCreateTable n) {
         CompilerUtil.TableBuilderInfo builder = new CompilerUtil.TableBuilderInfo(typeFactory);
@@ -84,7 +87,7 @@ public class StormSqlContext {
         Table table = builder.build();
         schema.add(n.tableName(), table);
 
-        ISqlTridentDataSource ds = DataSourcesRegistry.constructTridentDataSource(n.location(), n
+        ISqlStreamsDataSource ds = DataSourcesRegistry.constructStreamsDataSource(n.location(), n
             .inputFormatClass(), n.outputFormatClass(), n.properties(), fields);
         if (ds == null) {
             throw new RuntimeException("Failed to find data source for " + n
@@ -113,7 +116,7 @@ public class StormSqlContext {
         hasUdf = true;
     }
 
-    public AbstractTridentProcessor compileSql(String query) throws Exception {
+    public AbstractStreamsProcessor compileSql(String query) throws Exception {
         QueryPlanner planner = new QueryPlanner(schema);
         return planner.compile(dataSources, query);
     }
@@ -133,8 +136,7 @@ public class StormSqlContext {
             List<SqlOperatorTable> sqlOperatorTables = new ArrayList<>();
             sqlOperatorTables.add(SqlStdOperatorTable.instance());
             sqlOperatorTables.add(new CalciteCatalogReader(CalciteSchema.from(schema),
-                                                           false,
-                                                           Collections.<String>emptyList(), typeFactory));
+                    Collections.emptyList(), typeFactory, new CalciteConnectionConfigImpl(new Properties())));
             return Frameworks.newConfigBuilder().defaultSchema(schema)
                              .operatorTable(new ChainedSqlOperatorTable(sqlOperatorTables)).build();
         } else {
@@ -150,11 +152,7 @@ public class StormSqlContext {
         return schema;
     }
 
-    public boolean isHasUdf() {
-        return hasUdf;
-    }
-
-    public Map<String, ISqlTridentDataSource> getDataSources() {
+    public Map<String, ISqlStreamsDataSource> getDataSources() {
         return dataSources;
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
index b76d6b3..fff801d 100644
--- a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
@@ -26,12 +26,12 @@ import java.util.jar.Manifest;
 import java.util.zip.ZipEntry;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
 import org.apache.storm.generated.SubmitOptions;
 import org.apache.storm.sql.javac.CompilingClassLoader;
 import org.apache.storm.sql.parser.SqlCreateFunction;
 import org.apache.storm.sql.parser.SqlCreateTable;
 import org.apache.storm.sql.parser.StormParser;
-import org.apache.storm.trident.TridentTopology;
 
 class StormSqlImpl extends StormSql {
     private final StormSqlContext sqlContext;
@@ -53,19 +53,19 @@ class StormSqlImpl extends StormSql {
             } else if (node instanceof SqlCreateFunction) {
                 sqlContext.interpretCreateFunction((SqlCreateFunction) node);
             } else {
-                AbstractTridentProcessor processor = sqlContext.compileSql(sql);
-                TridentTopology topo = processor.build();
+                AbstractStreamsProcessor processor = sqlContext.compileSql(sql);
+                StormTopology topo = processor.build();
 
                 Path jarPath = null;
                 try {
-                    // QueryPlanner on Trident mode configures the topology with compiled classes,
+                    // QueryPlanner on Streams mode configures the topology with compiled classes,
                     // so we need to add new classes into topology jar
                     // Topology will be serialized and sent to Nimbus, and deserialized and executed in workers.
 
                     jarPath = Files.createTempFile("storm-sql", ".jar");
                     System.setProperty("storm.jar", jarPath.toString());
                     packageTopology(jarPath, processor);
-                    StormSubmitter.submitTopologyAs(name, topoConf, topo.build(), opts, progressListener, asUser);
+                    StormSubmitter.submitTopologyAs(name, topoConf, topo, opts, progressListener, asUser);
                 } finally {
                     if (jarPath != null) {
                         Files.delete(jarPath);
@@ -102,7 +102,7 @@ class StormSqlImpl extends StormSql {
         }
     }
 
-    private void packageTopology(Path jar, AbstractTridentProcessor processor) throws IOException {
+    private void packageTopology(Path jar, AbstractStreamsProcessor processor) throws IOException {
         Manifest manifest = new Manifest();
         Attributes attr = manifest.getMainAttributes();
         attr.put(Attributes.Name.MANIFEST_VERSION, "1.0");

http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/calcite/ParallelStreamableTable.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/calcite/ParallelStreamableTable.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/calcite/ParallelStreamableTable.java
index f80c992..3a6a916 100644
--- a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/calcite/ParallelStreamableTable.java
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/calcite/ParallelStreamableTable.java
@@ -20,7 +20,7 @@ import org.apache.calcite.schema.StreamableTable;
  *
  * @see Delta
  */
-public interface ParallelStreamableTable extends StreamableTable {
+public interface ParallelStreamableTable extends StormStreamableTable {
 
     /**
      * Returns parallelism hint of this table. Returns null if not known.

http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/calcite/ParallelTable.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/calcite/ParallelTable.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/calcite/ParallelTable.java
new file mode 100644
index 0000000..025077b
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/calcite/ParallelTable.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.calcite;
+
+public interface ParallelTable extends StormTable {
+
+    /**
+     * Returns parallelism hint of this table. Returns null if not known.
+     */
+    Integer parallelismHint();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/calcite/StormStreamableTable.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/calcite/StormStreamableTable.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/calcite/StormStreamableTable.java
new file mode 100644
index 0000000..5833b1c
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/calcite/StormStreamableTable.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.calcite;
+
+import org.apache.calcite.schema.StreamableTable;
+
+public interface StormStreamableTable extends StreamableTable {
+    int primaryKey();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/calcite/StormTable.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/calcite/StormTable.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/calcite/StormTable.java
new file mode 100644
index 0000000..26bb05f
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/calcite/StormTable.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.calcite;
+
+import org.apache.calcite.schema.Table;
+
+public interface StormTable extends Table {
+    int primaryKey();
+}
\ No newline at end of file

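Both new interfaces surface the zero-based index of the PRIMARY KEY column: StormTable for plain tables, StormStreamableTable for streams. A hypothetical illustration, assuming a table declared as CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY, WORD VARCHAR):

    // Sketch only; "foo" stands for whatever table object the DDL above compiles to.
    StormTable foo = ...;
    int pk = foo.primaryKey();  // 0, i.e. the ID column (zero-based field index)
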
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java
index 1f7569a..3cb0f96 100644
--- a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java
@@ -16,6 +16,8 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import java.util.ArrayList;
+
+import org.apache.calcite.config.CalciteConnectionConfig;
 import org.apache.calcite.rel.RelCollations;
 import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.calcite.rel.type.RelDataType;
@@ -25,33 +27,24 @@ import org.apache.calcite.schema.Statistic;
 import org.apache.calcite.schema.Statistics;
 import org.apache.calcite.schema.StreamableTable;
 import org.apache.calcite.schema.Table;
+import org.apache.calcite.sql.SqlCall;
 import org.apache.calcite.sql.SqlDataTypeSpec;
+import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.sql.validate.SqlMonotonicity;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.Util;
 import org.apache.storm.sql.calcite.ParallelStreamableTable;
+import org.apache.storm.sql.calcite.ParallelTable;
 import org.apache.storm.sql.parser.ColumnConstraint;
 
+import static org.apache.calcite.sql.validate.SqlMonotonicity.INCREASING;
 import static org.apache.calcite.rel.RelFieldCollation.Direction;
 import static org.apache.calcite.rel.RelFieldCollation.Direction.ASCENDING;
 import static org.apache.calcite.rel.RelFieldCollation.Direction.DESCENDING;
 import static org.apache.calcite.rel.RelFieldCollation.NullDirection;
-import static org.apache.calcite.sql.validate.SqlMonotonicity.INCREASING;
 
 public class CompilerUtil {
-    public static String escapeJavaString(String s, boolean nullMeansNull) {
-        if (s == null) {
-            return nullMeansNull ? "null" : "\"\"";
-        } else {
-            String s1 = Util.replace(s, "\\", "\\\\");
-            String s2 = Util.replace(s1, "\"", "\\\"");
-            String s3 = Util.replace(s2, "\n\r", "\\n");
-            String s4 = Util.replace(s3, "\n", "\\n");
-            String s5 = Util.replace(s4, "\r", "\\r");
-            return "\"" + s5 + "\"";
-        }
-    }
 
     public static class TableBuilderInfo {
         private final RelDataTypeFactory typeFactory;
@@ -61,6 +54,7 @@ public class CompilerUtil {
         private Integer parallelismHint;
         private SqlMonotonicity primaryKeyMonotonicity;
         private Statistic stats;
+
         public TableBuilderInfo(RelDataTypeFactory typeFactory) {
             this.typeFactory = typeFactory;
         }
@@ -74,16 +68,31 @@ public class CompilerUtil {
             return this;
         }
 
+        public TableBuilderInfo field(String name, SqlTypeName type, ColumnConstraint constraint) {
+            interpretConstraint(constraint, fields.size());
+            return field(name, typeFactory.createSqlType(type));
+        }
+
+        public TableBuilderInfo field(String name, RelDataType type, ColumnConstraint constraint) {
+            interpretConstraint(constraint, fields.size());
+            fields.add(new FieldType(name, type));
+            return this;
+        }
+
         public TableBuilderInfo field(String name, SqlDataTypeSpec type, ColumnConstraint constraint) {
             RelDataType dataType = type.deriveType(typeFactory);
+            interpretConstraint(constraint, fields.size());
+            fields.add(new FieldType(name, dataType));
+            return this;
+        }
+
+        private void interpretConstraint(ColumnConstraint constraint, int fieldIdx) {
             if (constraint instanceof ColumnConstraint.PrimaryKey) {
                 ColumnConstraint.PrimaryKey pk = (ColumnConstraint.PrimaryKey) constraint;
             Preconditions.checkState(primaryKey == -1, "There is more than one primary key in the table");
-                primaryKey = fields.size();
+                primaryKey = fieldIdx;
                 primaryKeyMonotonicity = pk.monotonicity();
             }
-            fields.add(new FieldType(name, dataType));
-            return this;
         }
 
         public TableBuilderInfo statistics(Statistic stats) {
@@ -104,10 +113,21 @@ public class CompilerUtil {
 
         public StreamableTable build() {
             final Statistic stat = buildStatistic();
-            final Table tbl = new Table() {
+
+            final Table tbl = new ParallelTable() {
+                @Override
+                public Integer parallelismHint() {
+                    return parallelismHint;
+                }
+
+                @Override
+                public int primaryKey() {
+                    return primaryKey;
+                }
+
                 @Override
                 public RelDataType getRowType(
-                    RelDataTypeFactory relDataTypeFactory) {
+                        RelDataTypeFactory relDataTypeFactory) {
                     RelDataTypeFactory.FieldInfoBuilder b = relDataTypeFactory.builder();
                     for (FieldType f : fields) {
                         b.add(f.name, f.relDataType);
@@ -118,17 +138,33 @@ public class CompilerUtil {
                 @Override
                 public Statistic getStatistic() {
                     return stat != null ? stat : Statistics.of(rows.size(),
-                                                               ImmutableList.<ImmutableBitSet>of());
+                            ImmutableList.<ImmutableBitSet>of());
                 }
 
                 @Override
                 public Schema.TableType getJdbcTableType() {
                     return Schema.TableType.STREAM;
                 }
+
+                @Override
+                public boolean isRolledUp(String column) {
+                    return false;
+                }
+
+                @Override
+                public boolean rolledUpColumnValidInsideAgg(String column, SqlCall call, SqlNode parent,
+                                                            CalciteConnectionConfig config) {
+                    return false;
+                }
             };
 
             return new ParallelStreamableTable() {
                 @Override
+                public int primaryKey() {
+                    return primaryKey;
+                }
+
+                @Override
                 public Integer parallelismHint() {
                     return parallelismHint;
                 }
@@ -152,6 +188,17 @@ public class CompilerUtil {
                 public Schema.TableType getJdbcTableType() {
                     return Schema.TableType.STREAM;
                 }
+
+                @Override
+                public boolean isRolledUp(String column) {
+                    return false;
+                }
+
+                @Override
+                public boolean rolledUpColumnValidInsideAgg(String column, SqlCall call, SqlNode parent,
+                                                            CalciteConnectionConfig config) {
+                    return false;
+                }
             };
         }
 

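Putting the reworked builder together, a hedged usage sketch (column names and monotonicity are illustrative; the constructor, field(...) overloads, and build() are as shown in this diff):

    import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
    import org.apache.calcite.schema.StreamableTable;
    import org.apache.calcite.sql.parser.SqlParserPos;
    import org.apache.calcite.sql.type.SqlTypeName;
    import org.apache.calcite.sql.validate.SqlMonotonicity;
    import org.apache.storm.sql.compiler.CompilerUtil;
    import org.apache.storm.sql.parser.ColumnConstraint;

    // Build a two-column stream table whose first column is an increasing primary key.
    StreamableTable table = new CompilerUtil.TableBuilderInfo(new JavaTypeFactoryImpl())
            .field("ID", SqlTypeName.INTEGER,
                   new ColumnConstraint.PrimaryKey(SqlMonotonicity.INCREASING, SqlParserPos.ZERO))
            .field("WORD", SqlTypeName.VARCHAR, null)  // no constraint on this column
            .build();

The isRolledUp/rolledUpColumnValidInsideAgg overrides above simply opt out of rolled-up column handling; they are methods the Calcite Table contract requires after the version bump.
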
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RexNodeToJavaCodeCompiler.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RexNodeToJavaCodeCompiler.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RexNodeToJavaCodeCompiler.java
index 6196daf..5c4cf64 100644
--- a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RexNodeToJavaCodeCompiler.java
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RexNodeToJavaCodeCompiler.java
@@ -52,7 +52,7 @@ import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
 
 /**
 * Compiles a scalar expression ({@link org.apache.calcite.rex.RexNode}) to a Java source code String.
- *
+ * <p>
 * This code is inspired by JaninoRexCompiler in Calcite, but while that compiler returns an executable
 * {@link org.apache.calcite.interpreter.Scalar}, we need the source code itself so that the compiled instance can be
 * serialized and executed efficiently on workers.
  */
@@ -67,15 +67,15 @@ public class RexNodeToJavaCodeCompiler {
      * Given a method that implements {@link ExecutableExpression#execute(Context, Object[])}, adds a bridge method that implements {@link
      * ExecutableExpression#execute(Context)}, and compiles.
      */
-    static String baz(ParameterExpression context_,
-                      ParameterExpression outputValues_, BlockStatement block, String className) {
+    static String baz(ParameterExpression context,
+                      ParameterExpression outputValues, BlockStatement block, String className) {
         final List<MemberDeclaration> declarations = Lists.newArrayList();
 
         // public void execute(Context, Object[] outputValues)
         declarations.add(
             Expressions.methodDecl(Modifier.PUBLIC, void.class,
                                    StormBuiltInMethod.EXPR_EXECUTE2.method.getName(),
-                                   ImmutableList.of(context_, outputValues_), block));
+                                   ImmutableList.of(context, outputValues), block));
 
         // public Object execute(Context)
         final BlockBuilder builder = new BlockBuilder();
@@ -86,14 +86,14 @@ public class RexNodeToJavaCodeCompiler {
             Expressions.statement(
                 Expressions.call(
                     Expressions.parameter(ExecutableExpression.class, "this"),
-                    StormBuiltInMethod.EXPR_EXECUTE2.method, context_, values_)));
+                    StormBuiltInMethod.EXPR_EXECUTE2.method, context, values_)));
         builder.add(
             Expressions.return_(null,
                                 Expressions.arrayIndex(values_, Expressions.constant(0))));
         declarations.add(
             Expressions.methodDecl(Modifier.PUBLIC, Object.class,
                                    StormBuiltInMethod.EXPR_EXECUTE1.method.getName(),
-                                   ImmutableList.of(context_), builder.toBlock()));
+                                   ImmutableList.of(context), builder.toBlock()));
 
         final ClassDeclaration classDeclaration =
             Expressions.classDecl(Modifier.PUBLIC, className, null,
@@ -141,8 +141,8 @@ public class RexNodeToJavaCodeCompiler {
         return baz(context_, outputValues_, builder.toBlock(), className);
     }
 
-    private BlockBuilder compileToBlock(final RexProgram program, ParameterExpression context_,
-                                        ParameterExpression outputValues_) {
+    private BlockBuilder compileToBlock(final RexProgram program, ParameterExpression context,
+                                        ParameterExpression outputValues) {
         RelDataType inputRowType = program.getInputRowType();
         final BlockBuilder builder = new BlockBuilder();
         final JavaTypeFactoryImpl javaTypeFactory =
@@ -152,7 +152,7 @@ public class RexNodeToJavaCodeCompiler {
             new RexToLixTranslator.InputGetterImpl(
                 ImmutableList.of(
                     Pair.<Expression, PhysType>of(
-                        Expressions.field(context_,
+                        Expressions.field(context,
                                           BuiltInMethod.CONTEXT_VALUES.field),
                         PhysTypeImpl.of(javaTypeFactory, inputRowType,
                                         JavaRowFormat.ARRAY, false))));
@@ -163,7 +163,7 @@ public class RexNodeToJavaCodeCompiler {
                 }
             };
         final Expression root =
-            Expressions.field(context_, BuiltInMethod.CONTEXT_ROOT.field);
+            Expressions.field(context, BuiltInMethod.CONTEXT_ROOT.field);
         final List<Expression> list =
             RexToLixTranslator.translateProjects(program, javaTypeFactory, builder,
                                                  null, root, inputGetter, correlates);
@@ -171,7 +171,7 @@ public class RexNodeToJavaCodeCompiler {
             builder.add(
                 Expressions.statement(
                     Expressions.assign(
-                        Expressions.arrayIndex(outputValues_,
+                        Expressions.arrayIndex(outputValues,
                                                Expressions.constant(i)),
                         list.get(i))));
         }

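To make the bridge-method trick concrete, the emitted source has roughly the following shape (class name and body are illustrative; the real assignments come from RexToLixTranslator over the RexProgram, and Context/ExecutableExpression are the Calcite and Storm types referenced above):

    public class Generated_PROJECT_1_0 implements ExecutableExpression {
        @Override
        public void execute(Context context, Object[] outputValues) {
            // translated projections, e.g. outputValues[0] = ...;
        }

        @Override
        public Object execute(Context context) {
            final Object[] values = new Object[1];  // sized to the program's project list
            this.execute(context, values);
            return values[0];
        }
    }
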
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/javac/CompilingClassLoader.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/javac/CompilingClassLoader.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/javac/CompilingClassLoader.java
index 66ad7c4..8e377d7 100644
--- a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/javac/CompilingClassLoader.java
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/javac/CompilingClassLoader.java
@@ -16,6 +16,7 @@
 
 package org.apache.storm.sql.javac;
 
+import static java.util.Collections.singleton;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -35,13 +36,11 @@ import javax.tools.JavaFileObject;
 import javax.tools.SimpleJavaFileObject;
 import javax.tools.ToolProvider;
 
-import static java.util.Collections.singleton;
-
 /**
  * This is a Java ClassLoader that will attempt to load a class from a string of source code.
- *
+ * <p>
 * <h3>Example</h3>
- *
+ * <p>
  * <pre>
  * String className = "com.foo.MyClass";
  * String classSource =
@@ -61,16 +60,16 @@ import static java.util.Collections.singleton;
  * Runnable instance = (Runnable)myClass.newInstance();
  * instance.run();
  * </pre>
- *
+ * <p>
  * Only one chunk of source can be compiled per instance of CompilingClassLoader. If you need to compile more, create multiple
  * CompilingClassLoader instances.
- *
+ * <p>
 * Uses Java 1.6's built-in compiler API.
- *
+ * <p>
  * If the class cannot be compiled, loadClass() will throw a ClassNotFoundException and log the compile errors to System.err. If you don't
 * want the messages logged, or want to explicitly handle the messages, you can provide your own {@link javax.tools.DiagnosticListener}
 * through {@code setDiagnosticListener()}.
- *
+ * <p>
  * @see java.lang.ClassLoader
  * @see javax.tools.JavaCompiler
  */
@@ -90,6 +89,8 @@ public class CompilingClassLoader extends ClassLoader {
     private final Map<String, ByteArrayOutputStream> byteCodeForClasses = new HashMap<>();
 
     /**
+     * Constructor.
+     *
      * @param parent             Parent classloader to resolve dependencies from.
     * @param className          Name of class to compile. e.g. "com.foo.MyClass".
      * @param sourceCode         Java source for class. e.g. "package com.foo; class MyClass { ... }".
@@ -161,7 +162,7 @@ public class CompilingClassLoader extends ClassLoader {
     public static class CompilerException extends Exception {
         private static final long serialVersionUID = -2936958840023603270L;
 
-        public CompilerException(String message) {
+        CompilerException(String message) {
             super(message);
         }
     }
@@ -169,7 +170,10 @@ public class CompilingClassLoader extends ClassLoader {
     private static class InMemoryJavaFile extends SimpleJavaFileObject {
         private final String sourceCode;
 
-        public InMemoryJavaFile(String className, String sourceCode) {
+        /**
+         * Constructor.
+         */
+        InMemoryJavaFile(String className, String sourceCode) {
             super(makeUri(className), Kind.SOURCE);
             this.sourceCode = sourceCode;
         }
@@ -196,7 +200,7 @@ public class CompilingClassLoader extends ClassLoader {
      * @see javax.tools.JavaFileManager
      */
     private class InMemoryFileManager extends ForwardingJavaFileManager<JavaFileManager> {
-        public InMemoryFileManager(JavaFileManager fileManager) {
+        InMemoryFileManager(JavaFileManager fileManager) {
             super(fileManager);
         }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/ColumnConstraint.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/ColumnConstraint.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/ColumnConstraint.java
index 5811301..3be6bc9 100644
--- a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/ColumnConstraint.java
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/ColumnConstraint.java
@@ -27,7 +27,7 @@ public class ColumnConstraint extends SqlLiteral {
         private final SqlMonotonicity monotonicity;
 
         public PrimaryKey(SqlMonotonicity monotonicity, SqlParserPos pos) {
-            super(SqlDDLKeywords.PRIMARY, SqlTypeName.SYMBOL, pos);
+            super(SqlDdlKeywords.PRIMARY, SqlTypeName.SYMBOL, pos);
             this.monotonicity = monotonicity;
         }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateTable.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateTable.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateTable.java
index d01284c..bd58dd6 100644
--- a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateTable.java
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateTable.java
@@ -14,6 +14,7 @@ package org.apache.storm.sql.parser;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import java.io.IOException;
+import java.math.BigDecimal;
 import java.net.URI;
 import java.util.HashMap;
 import java.util.List;
@@ -29,6 +30,7 @@ import org.apache.calcite.sql.SqlSpecialOperator;
 import org.apache.calcite.sql.SqlWriter;
 import org.apache.calcite.sql.parser.SqlParserPos;
 import org.apache.calcite.util.ImmutableNullableList;
+import org.apache.calcite.util.NlsString;
 
 public class SqlCreateTable extends SqlCall {
     public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator(
@@ -116,9 +118,9 @@ public class SqlCreateTable extends SqlCall {
     }
 
     public Integer parallelism() {
-        String parallelismStr = getString(parallelism);
-        if (parallelismStr != null) {
-            return Integer.parseInt(parallelismStr);
+        Integer para = getInteger(parallelism);
+        if (para != null) {
+            return para;
         } else {
             return DEFAULT_PARALLELISM;
         }
@@ -146,8 +148,12 @@ public class SqlCreateTable extends SqlCall {
         return props;
     }
 
+    private Integer getInteger(SqlNode n) {
+        return n == null ? null : ((BigDecimal) SqlLiteral.value(n)).intValueExact();
+    }
+
     private String getString(SqlNode n) {
-        return n == null ? null : SqlLiteral.stringValue(n);
+        return n == null ? null : ((NlsString) SqlLiteral.value(n)).getValue();
     }
 
     @SuppressWarnings("unchecked")

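The parallelism change deserves a note: SqlLiteral.value(node) returns Calcite's internal representation, a BigDecimal for exact numerics and an NlsString for character literals, so the casts above are safe by construction. A small sketch of both extractions (literal values are illustrative):

    SqlLiteral five = SqlLiteral.createExactNumeric("5", SqlParserPos.ZERO);
    int parallelism = ((BigDecimal) SqlLiteral.value(five)).intValueExact();  // 5

    SqlLiteral loc = SqlLiteral.createCharString("some-location", SqlParserPos.ZERO);
    String location = ((NlsString) SqlLiteral.value(loc)).getValue();         // "some-location"
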
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlDDLKeywords.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlDDLKeywords.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlDDLKeywords.java
deleted file mode 100644
index 174d5fa..0000000
--- a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlDDLKeywords.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.sql.parser;
-
-import org.apache.calcite.sql.SqlLiteral;
-
-/**
- * Define the keywords that can occur in a CREATE TABLE statement
- */
-public enum SqlDDLKeywords implements SqlLiteral.SqlSymbol {
-    PRIMARY
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlDdlKeywords.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlDdlKeywords.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlDdlKeywords.java
new file mode 100644
index 0000000..327c55c
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlDdlKeywords.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.sql.parser;
+
+import org.apache.calcite.sql.SqlLiteral;
+
+/**
+ * Defines the keywords that can occur in a CREATE TABLE statement.
+ */
+public enum SqlDdlKeywords implements SqlLiteral.SqlSymbol {
+    PRIMARY
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/StormParser.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/StormParser.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/StormParser.java
index 29ab7c5..e1d3a60 100644
--- a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/StormParser.java
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/StormParser.java
@@ -21,6 +21,9 @@ public class StormParser {
     public static final int DEFAULT_IDENTIFIER_MAX_LENGTH = 128;
     private final StormParserImpl impl;
 
+    /**
+     * Constructor.
+     */
     public StormParser(String s) {
         this.impl = new StormParserImpl(new StringReader(s));
         this.impl.setTabSize(1);

http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/StormRelUtils.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/StormRelUtils.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/StormRelUtils.java
index 03dc56c..b283a17 100644
--- a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/StormRelUtils.java
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/StormRelUtils.java
@@ -17,7 +17,7 @@ import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.plan.volcano.RelSubset;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.sql.SqlExplainLevel;
-import org.apache.storm.sql.planner.trident.rel.TridentRel;
+import org.apache.storm.sql.planner.streams.rel.StreamsRel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -27,21 +27,17 @@ public class StormRelUtils {
     private static final AtomicInteger sequence = new AtomicInteger(0);
     private static final AtomicInteger classSequence = new AtomicInteger(0);
 
-    public static String getStageName(TridentRel relNode) {
-        return relNode.getClass().getSimpleName().toUpperCase() + "_" + relNode.getId() + "_" + sequence.getAndIncrement();
-    }
-
-    public static String getClassName(TridentRel relNode) {
+    public static String getClassName(StreamsRel relNode) {
         return "Generated_" + relNode.getClass().getSimpleName().toUpperCase() + "_" + relNode.getId() + "_" +
                classSequence.getAndIncrement();
     }
 
-    public static TridentRel getStormRelInput(RelNode input) {
+    public static StreamsRel getStormRelInput(RelNode input) {
         if (input instanceof RelSubset) {
             // go with known best input
             input = ((RelSubset) input).getBest();
         }
-        return (TridentRel) input;
+        return (StreamsRel) input;
     }
 
     public static String explain(final RelNode rel) {

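For reference, the naming scheme above yields generated class names like the following (rel type, id, and sequence number are illustrative, and assume no earlier calls incremented the sequence):

    StreamsRel rel = ...;  // say, a StreamsProjectRel with rel id 7
    String name = StormRelUtils.getClassName(rel);  // "Generated_STREAMSPROJECTREL_7_0"
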
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormStreamScanRelBase.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormStreamScanRelBase.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormStreamScanRelBase.java
index 3b952a0..756ed07 100644
--- a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormStreamScanRelBase.java
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormStreamScanRelBase.java
@@ -19,8 +19,6 @@ import org.apache.calcite.rel.core.TableScan;
 
 public abstract class StormStreamScanRelBase extends TableScan implements StormRelNode {
 
-    // FIXME: define Table class and table.unwrap() to get it
-
     protected StormStreamScanRelBase(RelOptCluster cluster, RelTraitSet traitSet, RelOptTable table) {
         super(cluster, traitSet, table);
     }