You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2016/08/24 22:02:47 UTC

[2/3] storm git commit: STORM-1434 Support the GROUP BY clause in StormSQL

STORM-1434 Support the GROUP BY clause in StormSQL

* Support GROUP BY for Trident
* Implement basic functions for aggregation
* Change the way of converting Calcite logical plan to Trident logical plan
** before: generating code and compiling it
** after: use Trident features, only creating code block if evaluation is needed
*** Janino is used to evaluate the code block at runtime
* Expand PostOrderRelNodeVisitor to be a generalized version of TridentPostOrderRelNodeVisitor
* Change the way to join results of aggregation functions via chaining
** Trident handles chained aggregators by applying each tuple to all aggregators, which avoids reading the input tuples multiple times
* Remove the empty check for tuple
** Fail fast if the input fields for an aggregation / function are not set, rather than giving a wrong result
* Add a comment explaining why we package an empty topology and how the SQL topology works
* Add test for scalar UDF with Trident


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0226dd4f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0226dd4f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0226dd4f

Branch: refs/heads/1.x-branch
Commit: 0226dd4f985d6bb5266965a04fec0787763e2ac4
Parents: a3090d3
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Thu Aug 18 16:32:46 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Thu Aug 25 07:02:09 2016 +0900

----------------------------------------------------------------------
 external/sql/README.md                          |   6 +-
 external/sql/storm-sql-core/pom.xml             |   5 +
 .../jvm/org/apache/storm/sql/StormSqlImpl.java  |  19 +-
 .../apache/storm/sql/compiler/ExprCompiler.java |  16 +-
 .../sql/compiler/PostOrderRelNodeVisitor.java   | 100 ++---
 .../backends/standalone/RelNodeCompiler.java    |  14 +-
 .../compiler/backends/trident/PlanCompiler.java | 222 +++---------
 .../backends/trident/RelNodeCompiler.java       | 116 ------
 .../trident/TridentLogicalPlanCompiler.java     | 363 +++++++++++++++++++
 .../storm/sql/compiler/TestCompilerUtils.java   |  32 +-
 .../storm/sql/compiler/TestExprCompiler.java    |   2 +-
 .../standalone/TestRelNodeCompiler.java         |   7 +-
 .../backends/trident/TestPlanCompiler.java      |  62 +++-
 external/sql/storm-sql-runtime/pom.xml          |   5 +
 .../sql/runtime/trident/NumberConverter.java    |  47 +++
 .../storm/sql/runtime/trident/TridentUtils.java |  35 ++
 .../trident/functions/EvaluationFilter.java     |  62 ++++
 .../trident/functions/EvaluationFunction.java   |  64 ++++
 .../trident/functions/ForwardFunction.java      |  30 ++
 .../sql/runtime/trident/operations/CountBy.java |  53 +++
 .../trident/operations/DivideForAverage.java    |  45 +++
 .../sql/runtime/trident/operations/MaxBy.java   |  68 ++++
 .../sql/runtime/trident/operations/MinBy.java   |  68 ++++
 .../sql/runtime/trident/operations/SumBy.java   |  60 +++
 .../test/org/apache/storm/sql/TestUtils.java    |  74 ++++
 pom.xml                                         |   7 +
 26 files changed, 1211 insertions(+), 371 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/0226dd4f/external/sql/README.md
----------------------------------------------------------------------
diff --git a/external/sql/README.md b/external/sql/README.md
index 2ac44a5..b22a79c 100644
--- a/external/sql/README.md
+++ b/external/sql/README.md
@@ -19,6 +19,7 @@ The following features are supported in the current repository:
 * Streaming from and to external data sources
 * Filtering tuples
 * Projections
+* Aggregations (Grouping)
 
 ## Specifying External Data Sources
 
@@ -79,8 +80,7 @@ By now you should be able to see the `order_filtering` topology in the Storm UI.
 
 ## Current Limitations
 
-Aggregation, windowing and joining tables are yet to be implemented. Specifying parallelism hints in the topology is not
-yet supported.
+Windowing and joining tables are yet to be implemented. Specifying parallelism hints in the topology is not yet supported. Supported aggregation functions are 'SUM', 'AVG', 'COUNT', 'MIN', 'MAX'. (Planned to address UDF - user defined functions.)
 
 Users also need to provide the dependency of the external data sources in the `extlib` directory. Otherwise the topology
 will fail to run because of `ClassNotFoundException`.
@@ -106,4 +106,4 @@ under the License.
 
 ## Committer Sponsors
  * Sriharsha Chintalapani ([sriharsha@apache.org](mailto:sriharsha@apache.org))
- * P. Taylor Goetz ([ptgoetz@apache.org](mailto:ptgoetz@apache.org))
+ * P. Taylor Goetz ([ptgoetz@apache.org](mailto:ptgoetz@apache.org))
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/0226dd4f/external/sql/storm-sql-core/pom.xml
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/pom.xml b/external/sql/storm-sql-core/pom.xml
index d76a253..2e52be2 100644
--- a/external/sql/storm-sql-core/pom.xml
+++ b/external/sql/storm-sql-core/pom.xml
@@ -86,6 +86,11 @@
                 </exclusion>
             </exclusions>
         </dependency>
+        <!-- janino -->
+        <dependency>
+            <groupId>org.codehaus.janino</groupId>
+            <artifactId>janino</artifactId>
+        </dependency>
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-annotations</artifactId>

http://git-wip-us.apache.org/repos/asf/storm/blob/0226dd4f/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
index dbe0ddd..e404999 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
@@ -119,14 +119,17 @@ class StormSqlImpl extends StormSql {
         SqlNode validate = planner.validate(parse);
         RelNode tree = planner.convert(validate);
         org.apache.storm.sql.compiler.backends.trident.PlanCompiler compiler =
-            new org.apache.storm.sql.compiler.backends.trident.PlanCompiler(typeFactory);
-        AbstractTridentProcessor proc = compiler.compile(tree);
-        TridentTopology topo = proc.build(dataSources);
+                new org.apache.storm.sql.compiler.backends.trident.PlanCompiler(dataSources, typeFactory);
+        TridentTopology topo = compiler.compile(tree);
         Path jarPath = null;
         try {
+          // PlanCompiler configures the topology without any new classes, so we don't need to add anything into topology jar
+          // packaging empty jar since topology jar is needed for topology submission
+          // Topology will be serialized and sent to Nimbus, and deserialized and executed in workers.
+
           jarPath = Files.createTempFile("storm-sql", ".jar");
           System.setProperty("storm.jar", jarPath.toString());
-          packageTopology(jarPath, compiler.getCompilingClassLoader(), proc);
+          packageEmptyTopology(jarPath);
           StormSubmitter.submitTopologyAs(name, stormConf, topo.build(), opts, progressListener, asUser);
         } finally {
           if (jarPath != null) {
@@ -137,18 +140,12 @@ class StormSqlImpl extends StormSql {
     }
   }
 
-  private void packageTopology(Path jar, CompilingClassLoader cl, AbstractTridentProcessor processor) throws IOException {
+  private void packageEmptyTopology(Path jar) throws IOException {
     Manifest manifest = new Manifest();
     Attributes attr = manifest.getMainAttributes();
     attr.put(Attributes.Name.MANIFEST_VERSION, "1.0");
-    attr.put(Attributes.Name.MAIN_CLASS, processor.getClass().getCanonicalName());
     try (JarOutputStream out = new JarOutputStream(
         new BufferedOutputStream(new FileOutputStream(jar.toFile())), manifest)) {
-      for (Map.Entry<String, ByteArrayOutputStream> e : cl.getClasses().entrySet()) {
-        out.putNextEntry(new ZipEntry(e.getKey().replace(".", "/") + ".class"));
-        out.write(e.getValue().toByteArray());
-        out.closeEntry();
-      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/0226dd4f/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java
index 0977acc..4e1c127 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java
@@ -20,6 +20,7 @@ package org.apache.storm.sql.compiler;
 import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
+import com.google.common.primitives.Primitives;
 import org.apache.calcite.adapter.enumerable.NullPolicy;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.linq4j.tree.Primitive;
@@ -67,8 +68,9 @@ public class ExprCompiler implements RexVisitor<String> {
   public String visitInputRef(RexInputRef rexInputRef) {
     String name = reserveName();
     String typeName = javaTypeName(rexInputRef);
+    String boxedTypeName = boxedJavaTypeName(rexInputRef);
     pw.print(String.format("%s %s = (%s)(_data.get(%d));\n", typeName, name,
-                           typeName, rexInputRef.getIndex()));
+                           boxedTypeName, rexInputRef.getIndex()));
     return name;
   }
 
@@ -149,6 +151,16 @@ public class ExprCompiler implements RexVisitor<String> {
     return ((Class<?>)ty).getCanonicalName();
   }
 
+  private String boxedJavaTypeName(RexNode node) {
+    Type ty = typeFactory.getJavaClass(node.getType());
+    Class clazz = (Class<?>)ty;
+    if (clazz.isPrimitive()) {
+      clazz = Primitives.wrap(clazz);
+    }
+
+    return clazz.getCanonicalName();
+  }
+
   private String reserveName() {
     return "t" + ++nameCount;
   }
@@ -479,7 +491,7 @@ public class ExprCompiler implements RexVisitor<String> {
         RexNode op = call.getOperands().get(0);
         String lhs = op.accept(compiler);
         pw.print(String.format("final %1$s %2$s = (%1$s) %3$s;\n",
-                               compiler.javaTypeName(call), val, lhs));
+                               compiler.boxedJavaTypeName(call), val, lhs));
         return val;
       }
     };

http://git-wip-us.apache.org/repos/asf/storm/blob/0226dd4f/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/PostOrderRelNodeVisitor.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/PostOrderRelNodeVisitor.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/PostOrderRelNodeVisitor.java
index 9dcfd29..2077ea9 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/PostOrderRelNodeVisitor.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/PostOrderRelNodeVisitor.java
@@ -21,108 +21,112 @@ import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.*;
 import org.apache.calcite.rel.stream.Delta;
 
+import java.util.ArrayList;
+import java.util.List;
+
 public abstract class PostOrderRelNodeVisitor<T> {
   public final T traverse(RelNode n) throws Exception {
+    List<T> inputStreams = new ArrayList<>();
     for (RelNode input : n.getInputs()) {
-      traverse(input);
+      inputStreams.add(traverse(input));
     }
 
     if (n instanceof Aggregate) {
-      return visitAggregate((Aggregate) n);
+      return visitAggregate((Aggregate) n, inputStreams);
     } else if (n instanceof Calc) {
-      return visitCalc((Calc) n);
+      return visitCalc((Calc) n, inputStreams);
     } else if (n instanceof Collect) {
-      return visitCollect((Collect) n);
+      return visitCollect((Collect) n, inputStreams);
     } else if (n instanceof Correlate) {
-      return visitCorrelate((Correlate) n);
+      return visitCorrelate((Correlate) n, inputStreams);
     } else if (n instanceof Delta) {
-      return visitDelta((Delta) n);
+      return visitDelta((Delta) n, inputStreams);
     } else if (n instanceof Exchange) {
-      return visitExchange((Exchange) n);
+      return visitExchange((Exchange) n, inputStreams);
     } else if (n instanceof Project) {
-      return visitProject((Project) n);
+      return visitProject((Project) n, inputStreams);
     } else if (n instanceof Filter) {
-      return visitFilter((Filter) n);
+      return visitFilter((Filter) n, inputStreams);
     } else if (n instanceof Sample) {
-      return visitSample((Sample) n);
+      return visitSample((Sample) n, inputStreams);
     } else if (n instanceof Sort) {
-      return visitSort((Sort) n);
+      return visitSort((Sort) n, inputStreams);
     } else if (n instanceof TableModify) {
-      return visitTableModify((TableModify) n);
+      return visitTableModify((TableModify) n, inputStreams);
     } else if (n instanceof TableScan) {
-      return visitTableScan((TableScan) n);
+      return visitTableScan((TableScan) n, inputStreams);
     } else if (n instanceof Uncollect) {
-      return visitUncollect((Uncollect) n);
+      return visitUncollect((Uncollect) n, inputStreams);
     } else if (n instanceof Window) {
-      return visitWindow((Window) n);
+      return visitWindow((Window) n, inputStreams);
     } else if (n instanceof Join) {
-      return visitJoin((Join) n);
+      return visitJoin((Join) n, inputStreams);
     } else {
-      return defaultValue(n);
+      return defaultValue(n, inputStreams);
     }
   }
 
-  public T visitAggregate(Aggregate aggregate) throws Exception {
-    return defaultValue(aggregate);
+  public T visitAggregate(Aggregate aggregate, List<T> inputStreams) throws Exception {
+    return defaultValue(aggregate, inputStreams);
   }
 
-  public T visitCalc(Calc calc) throws Exception {
-    return defaultValue(calc);
+  public T visitCalc(Calc calc, List<T> inputStreams) throws Exception {
+    return defaultValue(calc, inputStreams);
   }
 
-  public T visitCollect(Collect collect) throws Exception {
-    return defaultValue(collect);
+  public T visitCollect(Collect collect, List<T> inputStreams) throws Exception {
+    return defaultValue(collect, inputStreams);
   }
 
-  public T visitCorrelate(Correlate correlate) throws Exception {
-    return defaultValue(correlate);
+  public T visitCorrelate(Correlate correlate, List<T> inputStreams) throws Exception {
+    return defaultValue(correlate, inputStreams);
   }
 
-  public T visitDelta(Delta delta) throws Exception {
-    return defaultValue(delta);
+  public T visitDelta(Delta delta, List<T> inputStreams) throws Exception {
+    return defaultValue(delta, inputStreams);
   }
 
-  public T visitExchange(Exchange exchange) throws Exception {
-    return defaultValue(exchange);
+  public T visitExchange(Exchange exchange, List<T> inputStreams) throws Exception {
+    return defaultValue(exchange, inputStreams);
   }
 
-  public T visitProject(Project project) throws Exception {
-    return defaultValue(project);
+  public T visitProject(Project project, List<T> inputStreams) throws Exception {
+    return defaultValue(project, inputStreams);
   }
 
-  public T visitFilter(Filter filter) throws Exception {
-    return defaultValue(filter);
+  public T visitFilter(Filter filter, List<T> inputStreams) throws Exception {
+    return defaultValue(filter, inputStreams);
   }
 
-  public T visitSample(Sample sample) throws Exception {
-    return defaultValue(sample);
+  public T visitSample(Sample sample, List<T> inputStreams) throws Exception {
+    return defaultValue(sample, inputStreams);
   }
 
-  public T visitSort(Sort sort) throws Exception {
-    return defaultValue(sort);
+  public T visitSort(Sort sort, List<T> inputStreams) throws Exception {
+    return defaultValue(sort, inputStreams);
   }
 
-  public T visitTableModify(TableModify modify) throws Exception {
-    return defaultValue(modify);
+  public T visitTableModify(TableModify modify, List<T> inputStreams) throws Exception {
+    return defaultValue(modify, inputStreams);
   }
 
-  public T visitTableScan(TableScan scan) throws Exception {
-    return defaultValue(scan);
+  public T visitTableScan(TableScan scan, List<T> inputStreams) throws Exception {
+    return defaultValue(scan, inputStreams);
   }
 
-  public T visitUncollect(Uncollect uncollect) throws Exception {
-    return defaultValue(uncollect);
+  public T visitUncollect(Uncollect uncollect, List<T> inputStreams) throws Exception {
+    return defaultValue(uncollect, inputStreams);
   }
 
-  public T visitWindow(Window window) throws Exception {
-    return defaultValue(window);
+  public T visitWindow(Window window, List<T> inputStreams) throws Exception {
+    return defaultValue(window, inputStreams);
   }
 
-  public T visitJoin(Join join) throws Exception {
-    return defaultValue(join);
+  public T visitJoin(Join join, List<T> inputStreams) throws Exception {
+    return defaultValue(join, inputStreams);
   }
 
-  public T defaultValue(RelNode n) {
+  public T defaultValue(RelNode n, List<T> inputStreams) {
     return null;
   }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/0226dd4f/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java
index 3ede7e9..9df0496 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java
@@ -196,13 +196,13 @@ class RelNodeCompiler extends PostOrderRelNodeVisitor<Void> {
   }
 
   @Override
-  public Void visitDelta(Delta delta) throws Exception {
+  public Void visitDelta(Delta delta, List<Void> inputStreams) throws Exception {
     pw.print(String.format(STAGE_PASSTHROUGH, getStageName(delta)));
     return null;
   }
 
   @Override
-  public Void visitFilter(Filter filter) throws Exception {
+  public Void visitFilter(Filter filter, List<Void> inputStreams) throws Exception {
     beginStage(filter);
     ExprCompiler compiler = new ExprCompiler(pw, typeFactory);
     String r = filter.getCondition().accept(compiler);
@@ -216,7 +216,7 @@ class RelNodeCompiler extends PostOrderRelNodeVisitor<Void> {
   }
 
   @Override
-  public Void visitProject(Project project) throws Exception {
+  public Void visitProject(Project project, List<Void> inputStreams) throws Exception {
     beginStage(project);
     ExprCompiler compiler = new ExprCompiler(pw, typeFactory);
     int size = project.getChildExps().size();
@@ -232,18 +232,18 @@ class RelNodeCompiler extends PostOrderRelNodeVisitor<Void> {
   }
 
   @Override
-  public Void defaultValue(RelNode n) {
+  public Void defaultValue(RelNode n, List<Void> inputStreams) {
     throw new UnsupportedOperationException();
   }
 
   @Override
-  public Void visitTableScan(TableScan scan) throws Exception {
+  public Void visitTableScan(TableScan scan, List<Void> inputStreams) throws Exception {
     pw.print(String.format(STAGE_ENUMERABLE_TABLE_SCAN, getStageName(scan)));
     return null;
   }
 
   @Override
-  public Void visitAggregate(Aggregate aggregate) throws Exception {
+  public Void visitAggregate(Aggregate aggregate, List<Void> inputStreams) throws Exception {
     beginAggregateStage(aggregate);
     pw.println("        List<Object> curGroupValues = _data == null ? null : getGroupValues(_data);");
     pw.println("        if (prevGroupValues != null && !prevGroupValues.equals(curGroupValues)) {");
@@ -262,7 +262,7 @@ class RelNodeCompiler extends PostOrderRelNodeVisitor<Void> {
   }
 
   @Override
-  public Void visitJoin(Join join) {
+  public Void visitJoin(Join join, List<Void> inputStreams) {
     beginJoinStage(join);
     pw.println("        if (source == left) {");
     pw.println("            leftRows.add(_data);");

http://git-wip-us.apache.org/repos/asf/storm/blob/0226dd4f/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/PlanCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/PlanCompiler.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/PlanCompiler.java
index 7a5516d..41777e5 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/PlanCompiler.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/PlanCompiler.java
@@ -1,201 +1,71 @@
-/*
- * *
- *  * Licensed to the Apache Software Foundation (ASF) under one
- *  * or more contributor license agreements.  See the NOTICE file
- *  * distributed with this work for additional information
- *  * regarding copyright ownership.  The ASF licenses this file
- *  * to you under the Apache License, Version 2.0 (the
- *  * "License"); you may not use this file except in compliance
- *  * with the License.  You may obtain a copy of the License at
- *  * <p>
- *  * http://www.apache.org/licenses/LICENSE-2.0
- *  * <p>
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 package org.apache.storm.sql.compiler.backends.trident;
 
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.SingleRel;
-import org.apache.calcite.rel.core.Filter;
-import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rel.core.TableModify;
-import org.apache.calcite.rel.core.TableScan;
-import org.apache.storm.sql.compiler.CompilerUtil;
-import org.apache.storm.sql.compiler.PostOrderRelNodeVisitor;
-import org.apache.storm.sql.javac.CompilingClassLoader;
+import org.apache.storm.sql.runtime.ISqlTridentDataSource;
 import org.apache.storm.sql.runtime.trident.AbstractTridentProcessor;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.fluent.IAggregatableStream;
 
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.net.URL;
-import java.net.URLClassLoader;
+import java.util.Map;
 
 public class PlanCompiler {
-  private static final Joiner NEW_LINE_JOINER = Joiner.on("\n");
-  private static final String PACKAGE_NAME = "org.apache.storm.sql.generated";
-  private static final String PROLOGUE = NEW_LINE_JOINER.join(
-      "// GENERATED CODE", "package " + PACKAGE_NAME + ";", "",
-      "import java.util.List;",
-      "import java.util.Map;",
-      "import org.apache.storm.tuple.Fields;",
-      "import org.apache.storm.tuple.Values;",
-      "import org.apache.storm.sql.runtime.ISqlTridentDataSource;",
-      "import org.apache.storm.sql.runtime.trident.AbstractTridentProcessor;",
-      "import org.apache.storm.trident.Stream;",
-      "import org.apache.storm.trident.TridentTopology;",
-      "import org.apache.storm.trident.fluent.IAggregatableStream;",
-      "import org.apache.storm.trident.operation.TridentCollector;",
-      "import org.apache.storm.trident.operation.BaseFunction;",
-      "import org.apache.storm.trident.spout.IBatchSpout;",
-      "import org.apache.storm.trident.tuple.TridentTuple;",
-      "",
-      "public final class TridentProcessor extends AbstractTridentProcessor {",
-      "");
-  private static final String INITIALIZER_PROLOGUE = NEW_LINE_JOINER.join(
-      "  @Override",
-      "  public TridentTopology build(Map<String, ISqlTridentDataSource> _sources) {",
-      "    TridentTopology topo = new TridentTopology();",
-      ""
-  );
+    private Map<String, ISqlTridentDataSource> sources;
+    private final JavaTypeFactory typeFactory;
 
-  private final JavaTypeFactory typeFactory;
-  private CompilingClassLoader compilingClassLoader;
-  public PlanCompiler(JavaTypeFactory typeFactory) {
-    this.typeFactory = typeFactory;
-  }
-
-  private String generateJavaSource(RelNode root) throws Exception {
-    StringWriter sw = new StringWriter();
-    try (PrintWriter pw = new PrintWriter(sw)) {
-      RelNodeCompiler compiler = new RelNodeCompiler(pw, typeFactory);
-      printPrologue(pw);
-      compiler.traverse(root);
-      printMain(pw, root);
-      printEpilogue(pw);
+    public PlanCompiler(Map<String, ISqlTridentDataSource> sources, JavaTypeFactory typeFactory) {
+        this.sources = sources;
+        this.typeFactory = typeFactory;
     }
-    return sw.toString();
-  }
-
-  private static class MainFuncCompiler extends PostOrderRelNodeVisitor<Void> {
-    private final PrintWriter pw;
-    private static final String TABLESCAN_TMPL = NEW_LINE_JOINER.join(
-        "if (!_sources.containsKey(%2$s))",
-        "    throw new RuntimeException(\"Cannot find table \" + %2$s);",
-        "Stream _%1$s = topo.newStream(%2$s, _sources.get(%2$s).getProducer());",
-        ""
-    );
 
-    private static final String TABLEMODIFY_TMPL = NEW_LINE_JOINER.join(
-        "Stream _%1$s = _%3$s.each(new Fields(%4$s), _sources.get(%2$s).getConsumer(), new Fields(%5$s));",
-        ""
-    );
-    private static final String TRANSFORMATION_TMPL = NEW_LINE_JOINER.join(
-        "Stream _%1$s = _%2$s.each(new Fields(%3$s), %1$s, new Fields(%4$s)).toStream().project(new Fields(%4$s));",
-        ""
-    );
+    public AbstractTridentProcessor compileForTest(RelNode plan) throws Exception {
+        final TridentTopology topology = new TridentTopology();
 
-    private MainFuncCompiler(PrintWriter pw) {
-      this.pw = pw;
-    }
+        TridentLogicalPlanCompiler compiler = new TridentLogicalPlanCompiler(sources, typeFactory, topology);
+        final IAggregatableStream stream = compiler.traverse(plan);
 
-    @Override
-    public Void defaultValue(RelNode n) {
-      throw new UnsupportedOperationException();
-    }
+        return new AbstractTridentProcessor() {
+            @Override
+            public Stream outputStream() {
+                return stream.toStream();
+            }
 
-    @Override
-    public Void visitFilter(Filter filter) throws Exception {
-      visitTransformation(filter);
-      return null;
+            @Override
+            public TridentTopology build(Map<String, ISqlTridentDataSource> sources) {
+                return topology;
+            }
+        };
     }
 
-    @Override
-    public Void visitTableModify(TableModify modify) throws Exception {
-      Preconditions.checkArgument(modify.isInsert(), "Only INSERT statement is supported.");
-      String name = RelNodeCompiler.getStageName(modify);
-      RelNode input = modify.getInput();
-      String inputName = RelNodeCompiler.getStageName(input);
-      pw.print(String.format(TABLEMODIFY_TMPL, name, CompilerUtil.escapeJavaString(
-          Joiner.on('.').join(modify.getTable().getQualifiedName()), true),
-          inputName, getFieldString(input), getFieldString(modify)));
-      return null;
-    }
+    public TridentTopology compile(RelNode plan) throws Exception {
+        TridentTopology topology = new TridentTopology();
 
-    @Override
-    public Void visitTableScan(TableScan scan) throws Exception {
-      String name = RelNodeCompiler.getStageName(scan);
-      pw.print(String.format(TABLESCAN_TMPL, name, CompilerUtil.escapeJavaString(
-          Joiner.on('.').join(scan.getTable().getQualifiedName()), true)));
-      return null;
-    }
+        TridentLogicalPlanCompiler compiler = new TridentLogicalPlanCompiler(sources, typeFactory, topology);
+        compiler.traverse(plan);
 
-    @Override
-    public Void visitProject(Project project) throws Exception {
-      visitTransformation(project);
-      return null;
+        return topology;
     }
 
-    private static String getFieldString(RelNode n) {
-      int id = n.getId();
-      StringBuilder sb = new StringBuilder();
-      boolean first = true;
-      for (String f: n.getRowType().getFieldNames()) {
-        if (!first) {
-          sb.append(", ");
-        }
-        if (n instanceof TableScan) {
-          sb.append(CompilerUtil.escapeJavaString(f, true));
-        } else {
-          sb.append(CompilerUtil.escapeJavaString(String.format("%d_%s", id, f), true));
-        }
-        first = false;
-      }
-      return sb.toString();
-    }
-
-    private void visitTransformation(SingleRel node) {
-      String name = RelNodeCompiler.getStageName(node);
-      RelNode input = node.getInput();
-      String inputName = RelNodeCompiler.getStageName(input);
-      pw.print(String.format(TRANSFORMATION_TMPL, name, inputName,
-          getFieldString(input), getFieldString(node)));
-    }
-  }
-
-  private void printMain(PrintWriter pw, RelNode root) throws Exception {
-    pw.print(INITIALIZER_PROLOGUE);
-    MainFuncCompiler compiler = new MainFuncCompiler(pw);
-    compiler.traverse(root);
-    pw.print(String.format("  this.outputStream = _%s;\n", RelNodeCompiler.getStageName(root)));
-    pw.print("  return topo; \n}\n");
-  }
-
-  public AbstractTridentProcessor compile(RelNode plan) throws Exception {
-    String javaCode = generateJavaSource(plan);
-    compilingClassLoader = new CompilingClassLoader(getClass().getClassLoader(),
-        PACKAGE_NAME + ".TridentProcessor",
-        javaCode, null);
-    return (AbstractTridentProcessor) compilingClassLoader.loadClass(PACKAGE_NAME + ".TridentProcessor").newInstance();
-  }
 
-  public CompilingClassLoader getCompilingClassLoader() {
-    return compilingClassLoader;
-  }
 
-  private static void printEpilogue(
-      PrintWriter pw) throws Exception {
-    pw.print("}\n");
-  }
 
-  private static void printPrologue(PrintWriter pw) {
-    pw.append(PROLOGUE);
-  }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/0226dd4f/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/RelNodeCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/RelNodeCompiler.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/RelNodeCompiler.java
deleted file mode 100644
index 340b9a2..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/RelNodeCompiler.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * *
- *  * Licensed to the Apache Software Foundation (ASF) under one
- *  * or more contributor license agreements.  See the NOTICE file
- *  * distributed with this work for additional information
- *  * regarding copyright ownership.  The ASF licenses this file
- *  * to you under the Apache License, Version 2.0 (the
- *  * "License"); you may not use this file except in compliance
- *  * with the License.  You may obtain a copy of the License at
- *  * <p>
- *  * http://www.apache.org/licenses/LICENSE-2.0
- *  * <p>
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-package org.apache.storm.sql.compiler.backends.trident;
-
-import org.apache.storm.tuple.Fields;
-import com.google.common.base.Joiner;
-import org.apache.calcite.adapter.java.JavaTypeFactory;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Filter;
-import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rel.core.TableModify;
-import org.apache.calcite.rel.core.TableScan;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeField;
-import org.apache.storm.sql.compiler.ExprCompiler;
-import org.apache.storm.sql.compiler.PostOrderRelNodeVisitor;
-
-import java.io.PrintWriter;
-import java.util.IdentityHashMap;
-import java.util.Map;
-
-/**
- * Compile RelNodes into individual functions.
- */
-class RelNodeCompiler extends PostOrderRelNodeVisitor<Void> {
-  public static Joiner NEW_LINE_JOINER = Joiner.on('\n');
-
-  private final PrintWriter pw;
-  private final JavaTypeFactory typeFactory;
-  private static final String STAGE_PROLOGUE = NEW_LINE_JOINER.join(
-    "  private static final BaseFunction %1$s = ",
-    "    new BaseFunction() {",
-    "    @Override",
-    "    public void execute(TridentTuple tuple, TridentCollector collector) {",
-    "      List<Object> _data = tuple.getValues();",
-    ""
-  );
-
-  private final IdentityHashMap<RelNode, Fields> outputFields = new IdentityHashMap<>();
-
-  RelNodeCompiler(PrintWriter pw, JavaTypeFactory typeFactory) {
-    this.pw = pw;
-    this.typeFactory = typeFactory;
-  }
-
-  @Override
-  public Void visitFilter(Filter filter) throws Exception {
-    beginStage(filter);
-    ExprCompiler compiler = new ExprCompiler(pw, typeFactory);
-    String r = filter.getCondition().accept(compiler);
-    pw.print(String.format("    if (%s) { collector.emit(_data); }\n", r));
-    endStage();
-    return null;
-  }
-
-  @Override
-  public Void visitProject(Project project) throws Exception {
-    beginStage(project);
-    ExprCompiler compiler = new ExprCompiler(pw, typeFactory);
-
-    int size = project.getChildExps().size();
-    String[] res = new String[size];
-    for (int i = 0; i < size; ++i) {
-      res[i] = project.getChildExps().get(i).accept(compiler);
-    }
-
-    pw.print(String.format("    collector.emit(new Values(%s));\n",
-                           Joiner.on(',').join(res)));
-    endStage();
-    return null;
-  }
-
-  @Override
-  public Void defaultValue(RelNode n) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public Void visitTableScan(TableScan scan) throws Exception {
-    return null;
-  }
-
-  @Override
-  public Void visitTableModify(TableModify modify) throws Exception {
-    return null;
-  }
-
-  private void beginStage(RelNode n) {
-    pw.print(String.format(STAGE_PROLOGUE, getStageName(n)));
-  }
-
-  private void endStage() {
-    pw.print("  }\n  };\n");
-  }
-
-  static String getStageName(RelNode n) {
-    return n.getClass().getSimpleName().toUpperCase() + "_" + n.getId();
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/0226dd4f/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/TridentLogicalPlanCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/TridentLogicalPlanCompiler.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/TridentLogicalPlanCompiler.java
new file mode 100644
index 0000000..4886ffc
--- /dev/null
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/TridentLogicalPlanCompiler.java
@@ -0,0 +1,363 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.sql.compiler.backends.trident;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Primitives;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.TableModify;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.storm.sql.compiler.ExprCompiler;
+import org.apache.storm.sql.compiler.PostOrderRelNodeVisitor;
+import org.apache.storm.sql.runtime.ISqlTridentDataSource;
+import org.apache.storm.sql.runtime.trident.functions.EvaluationFilter;
+import org.apache.storm.sql.runtime.trident.functions.EvaluationFunction;
+import org.apache.storm.sql.runtime.trident.functions.ForwardFunction;
+import org.apache.storm.sql.runtime.trident.operations.CountBy;
+import org.apache.storm.sql.runtime.trident.operations.DivideForAverage;
+import org.apache.storm.sql.runtime.trident.operations.MaxBy;
+import org.apache.storm.sql.runtime.trident.operations.MinBy;
+import org.apache.storm.sql.runtime.trident.operations.SumBy;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.fluent.ChainedAggregatorDeclarer;
+import org.apache.storm.trident.fluent.GroupedStream;
+import org.apache.storm.trident.fluent.IAggregatableStream;
+import org.apache.storm.trident.operation.Function;
+import org.apache.storm.trident.operation.builtin.Count;
+import org.apache.storm.tuple.Fields;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+class TransformInformation {
+    private final Fields inputFields;    // input fields of the function
+    private final Function function;     // the function to apply
+    private final Fields functionFields; // output fields declared for the function
+
+    public TransformInformation(Fields inputFields, Function function, Fields functionFields) {
+        this.inputFields = inputFields;
+        this.function = function;
+        this.functionFields = functionFields;
+    }
+
+    public Fields getInputFields() {
+        return inputFields;
+    }
+
+    public Function getFunction() {
+        return function;
+    }
+
+    public Fields getFunctionFields() {
+        return functionFields;
+    }
+}
+
+public class TridentLogicalPlanCompiler extends PostOrderRelNodeVisitor<IAggregatableStream> {
+    protected final Map<String, ISqlTridentDataSource> sources;
+    protected final JavaTypeFactory typeFactory;
+    protected TridentTopology topology;
+
+    public TridentLogicalPlanCompiler(Map<String, ISqlTridentDataSource> sources, JavaTypeFactory typeFactory, TridentTopology topology) {
+        this.sources = sources;
+        this.typeFactory = typeFactory;
+        this.topology = topology;
+    }
+
+    @Override
+    public IAggregatableStream defaultValue(RelNode n, List<IAggregatableStream> inputStreams) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public IAggregatableStream visitTableScan(TableScan scan, List<IAggregatableStream> inputStreams) throws Exception {
+        // The source map is looked up by the dot-joined qualified table name.
+        final String qualifiedName = Joiner.on('.').join(scan.getTable().getQualifiedName());
+        if (sources.containsKey(qualifiedName)) {
+            return topology.newStream(getStageName(scan), sources.get(qualifiedName).getProducer());
+        }
+
+        throw new RuntimeException("Cannot find table " + qualifiedName);
+    }
+
+    @Override
+    public IAggregatableStream visitTableModify(TableModify modify, List<IAggregatableStream> inputStreams) throws Exception {
+        if (inputStreams.size() > 1) {
+            throw new RuntimeException("TableModify is a SingleRel");
+        }
+
+        Preconditions.checkArgument(modify.isInsert(), "Only INSERT statement is supported.");
+        String tableName = Joiner.on('.').join(modify.getTable().getQualifiedName());
+        // Report an unregistered sink table the same way visitTableScan does, rather than failing with an NPE below.
+        if (!sources.containsKey(tableName)) {
+            throw new RuntimeException("Cannot find table " + tableName);
+        }
+
+        Fields inputFields = new Fields(modify.getInput().getRowType().getFieldNames());
+        Fields outputFields = new Fields(modify.getRowType().getFieldNames());
+        return inputStreams.get(0).toStream().each(inputFields, sources.get(tableName).getConsumer(), outputFields)
+                .name(getStageName(modify));
+    }
+
+    @Override
+    public IAggregatableStream visitProject(Project project, List<IAggregatableStream> inputStreams) throws Exception {
+        if (inputStreams.size() > 1) {
+            throw new RuntimeException("Project is a SingleRel");
+        }
+
+        Stream inputStream = inputStreams.get(0).toStream();
+        Fields inputFields = inputStream.getOutputFields();
+        String stageName = getStageName(project);
+
+        // Trident doesn't allow duplicated field names, so the projection is first evaluated into temporary "__name__" fields.
+        List<String> outputFieldNames = project.getRowType().getFieldNames();
+        List<String> temporaryOutputFieldNames = new ArrayList<>();
+        for (String outputFieldName : outputFieldNames) {
+            temporaryOutputFieldNames.add("__" + outputFieldName + "__");
+        }
+
+        try (StringWriter sw = new StringWriter(); PrintWriter pw = new PrintWriter(sw)) {
+            pw.write("import org.apache.storm.tuple.Values;\n");
+            // The compiler shares pw with this method; each accept() call returns the Java expression for one output column.
+            ExprCompiler compiler = new ExprCompiler(pw, typeFactory);
+
+            int size = project.getChildExps().size();
+            String[] res = new String[size];
+            for (int i = 0; i < size; ++i) {
+                res[i] = project.getChildExps().get(i).accept(compiler);
+            }
+
+            pw.write(String.format("\nreturn new Values(%s);", Joiner.on(',').join(res)));
+            final String expression = sw.toString();
+            // Pipeline: evaluate into the temporary fields, keep only those, copy them to the real names, keep only those.
+            return inputStream.each(inputFields, new EvaluationFunction(expression), new Fields(temporaryOutputFieldNames))
+                    .project(new Fields(temporaryOutputFieldNames))
+                    .each(new Fields(temporaryOutputFieldNames), new ForwardFunction(), new Fields(outputFieldNames))
+                    .project(new Fields(outputFieldNames))
+                    .name(stageName);
+        }
+    }
+
+    @Override
+    public IAggregatableStream visitFilter(Filter filter, List<IAggregatableStream> inputStreams) throws Exception {
+        if (inputStreams.size() > 1) {
+            throw new RuntimeException("Filter is a SingleRel");
+        }
+
+        Stream inputStream = inputStreams.get(0).toStream();
+        String stageName = getStageName(filter);
+        // Build a code block that ends by returning the compiled condition; EvaluationFilter is handed its source text.
+        try (StringWriter sw = new StringWriter(); PrintWriter pw = new PrintWriter(sw)) {
+            ExprCompiler compiler = new ExprCompiler(pw, typeFactory);
+            String ret = filter.getCondition().accept(compiler);
+            pw.write(String.format("\nreturn %s;", ret));
+            final String expression = sw.toString();
+
+            return inputStream.filter(new EvaluationFilter(expression)).name(stageName);
+        }
+    }
+
+    @Override
+    public IAggregatableStream visitAggregate(Aggregate aggregate, List<IAggregatableStream> inputStreams) throws Exception {
+        if (inputStreams.size() > 1) {
+            throw new RuntimeException("Aggregate is a SingleRel");
+        }
+
+        Stream inputStream = inputStreams.get(0).toStream();
+        String stageName = getStageName(aggregate);
+        // The group set holds indexes; translate each one into the matching field name of the input stream.
+        List<String> groupByFieldNames = new ArrayList<>();
+        for (Integer idx : aggregate.getGroupSet()) {
+            String fieldName = inputStream.getOutputFields().get(idx);
+            groupByFieldNames.add(fieldName);
+        }
+
+        Fields groupByFields = new Fields(groupByFieldNames);
+        GroupedStream groupedStream = inputStream.groupBy(groupByFields);
+        // Every aggregate call is appended to one chained aggregator; follow-up functions are collected for later.
+        ChainedAggregatorDeclarer chainedAggregatorDeclarer = groupedStream.chainedAgg();
+        List<TransformInformation> transformsAfterChained = new ArrayList<>();
+        for (AggregateCall call : aggregate.getAggCallList()) {
+            appendAggregationInChain(chainedAggregatorDeclarer, groupByFieldNames, inputStream, call, transformsAfterChained);
+        }
+
+        Stream stream = chainedAggregatorDeclarer.chainEnd();
+        for (TransformInformation transformInformation : transformsAfterChained) {
+            stream = stream.each(transformInformation.getInputFields(), transformInformation.getFunction(), transformInformation.getFunctionFields());
+        }
+
+        // Projecting by Calcite's row type is safe: each aggregation names its output field after the call name.
+        List<String> outputFields = aggregate.getRowType().getFieldNames();
+        return stream.project(new Fields(outputFields)).name(stageName);
+    }
+
+    private void appendAggregationInChain(ChainedAggregatorDeclarer chainedDeclarer, List<String> groupByFieldNames,
+                                          Stream inputStream, AggregateCall call, List<TransformInformation> transformsAfterChained) {
+        String outputField = call.getName();
+        // Normalize once with Locale.ROOT so the dispatch below does not depend on the JVM default locale.
+        String aggregationName = call.getAggregation().getName().toUpperCase(java.util.Locale.ROOT);
+
+        // only count can have no argument
+        if (call.getArgList().size() != 1) {
+            if (!"COUNT".equals(aggregationName)) {
+                throw new IllegalArgumentException("Aggregate call should have one argument");
+            }
+            if (call.getArgList().size() > 0) {
+                throw new IllegalArgumentException("COUNT should have one or no(a.k.a '*') argument");
+            }
+        }
+
+        switch (aggregationName) {
+            case "COUNT":
+                appendCountFunctionToChain(chainedDeclarer, groupByFieldNames, inputStream, call, outputField);
+                break;
+
+            case "MAX":
+                appendMaxFunctionToChain(chainedDeclarer, groupByFieldNames, inputStream, call, outputField);
+                break;
+
+            case "MIN":
+                appendMinFunctionToChain(chainedDeclarer, groupByFieldNames, inputStream, call, outputField);
+                break;
+
+            case "SUM":
+                appendSumFunctionToChain(chainedDeclarer, groupByFieldNames, inputStream, call, outputField);
+                break;
+
+            case "AVG":
+                appendAvgFunctionToChain(chainedDeclarer, groupByFieldNames, inputStream, call, outputField, transformsAfterChained);
+                break;
+
+            default:
+                throw new UnsupportedOperationException("Not supported function: " + aggregationName);
+        }
+    }
+
+    private void appendAvgFunctionToChain(ChainedAggregatorDeclarer chainedDeclarer, List<String> groupByFieldNames,
+                                          Stream inputStream, AggregateCall call, String outputField,
+                                          List<TransformInformation> transformsAfterChained) {
+        List<String> inputFields = new ArrayList<>(groupByFieldNames);
+        Integer fieldIdx = call.getArgList().get(0);
+        String inputFieldName = inputStream.getOutputFields().get(fieldIdx);
+        inputFields.add(inputFieldName);
+
+        Class<?> capturedTargetClazz = getAggrCallReturnJavaType(call);
+        if (!Number.class.isAssignableFrom(capturedTargetClazz)) {
+            throw new IllegalStateException("Return type of aggregation call should be a Number");
+        }
+        // asSubclass narrows without an unchecked cast; the guard above keeps the IllegalStateException contract.
+        Class<? extends Number> targetClazz = capturedTargetClazz.asSubclass(Number.class);
+        // A count and a sum are aggregated under temporary field names; DivideForAverage consumes both after the chain.
+        String tempCountFieldName = "__cnt__" + outputField;
+        String tempSumFieldName = "__sum__" + outputField;
+
+        chainedDeclarer
+                .aggregate(new Fields(inputFields), new CountBy(inputFieldName), new Fields(tempCountFieldName))
+                .aggregate(new Fields(inputFields), new SumBy(inputFieldName, targetClazz), new Fields(tempSumFieldName));
+
+        TransformInformation divForAverage = new TransformInformation(
+                new Fields(tempSumFieldName, tempCountFieldName),
+                new DivideForAverage(targetClazz),
+                new Fields(outputField));
+        transformsAfterChained.add(divForAverage);
+    }
+
+    private void appendCountFunctionToChain(ChainedAggregatorDeclarer chainedDeclarer, List<String> groupByFieldNames,
+                                            Stream inputStream, AggregateCall call, String outputField) {
+        List<String> aggregateInputs = new ArrayList<>(groupByFieldNames);
+        if (call.getArgList().isEmpty()) {
+            // COUNT(*): there is no argument field, so only the group-by fields are passed in
+            chainedDeclarer.aggregate(new Fields(aggregateInputs), new Count(), new Fields(outputField));
+            return;
+        }
+        // COUNT(field): the argument list holds exactly one index, checked by appendAggregationInChain
+        String countedField = inputStream.getOutputFields().get(call.getArgList().get(0));
+        aggregateInputs.add(countedField);
+        chainedDeclarer.aggregate(new Fields(aggregateInputs), new CountBy(countedField), new Fields(outputField));
+    }
+
+    private void appendMaxFunctionToChain(ChainedAggregatorDeclarer chainedDeclarer, List<String> groupByFieldNames,
+                                          Stream inputStream, AggregateCall call, String outputField) {
+        // The aggregator receives the group-by fields followed by the field named by the call's single argument.
+        String targetField = inputStream.getOutputFields().get(call.getArgList().get(0));
+        List<String> aggregateInputs = new ArrayList<>(groupByFieldNames);
+        aggregateInputs.add(targetField);
+
+        Class<?> resultClass = getAggrCallReturnJavaType(call);
+
+        chainedDeclarer.aggregate(new Fields(aggregateInputs), new MaxBy(targetField, resultClass), new Fields(outputField));
+    }
+
+    private void appendMinFunctionToChain(ChainedAggregatorDeclarer chainedDeclarer, List<String> groupByFieldNames,
+                                          Stream inputStream, AggregateCall call, String outputField) {
+        // The aggregator receives the group-by fields followed by the field named by the call's single argument.
+        String targetField = inputStream.getOutputFields().get(call.getArgList().get(0));
+        List<String> aggregateInputs = new ArrayList<>(groupByFieldNames);
+        aggregateInputs.add(targetField);
+
+        Class<?> resultClass = getAggrCallReturnJavaType(call);
+        chainedDeclarer.aggregate(new Fields(aggregateInputs), new MinBy(targetField, resultClass), new Fields(outputField));
+    }
+
+    private void appendSumFunctionToChain(ChainedAggregatorDeclarer chainedDeclarer, List<String> groupByFieldNames,
+                                          Stream inputStream, AggregateCall call, String outputField) {
+        List<String> inputFields = new ArrayList<>(groupByFieldNames);
+        Integer fieldIdx = call.getArgList().get(0);
+        String inputFieldName = inputStream.getOutputFields().get(fieldIdx);
+        inputFields.add(inputFieldName);
+
+        Class<?> capturedTargetClazz = getAggrCallReturnJavaType(call);
+        if (!Number.class.isAssignableFrom(capturedTargetClazz)) {
+            throw new IllegalStateException("Return type of aggregation call should be a Number");
+        }
+
+        // asSubclass narrows without an unchecked cast; the guard above keeps the IllegalStateException contract.
+        Class<? extends Number> targetClazz = capturedTargetClazz.asSubclass(Number.class);
+
+        chainedDeclarer.aggregate(new Fields(inputFields), new SumBy(inputFieldName, targetClazz), new Fields(outputField));
+    }
+
+    // Builds the stage name from the upper-cased simple class name of the node and its id.
+    // Locale.ROOT keeps the result independent of the JVM default locale.
+    private String getStageName(RelNode n) {
+        return n.getClass().getSimpleName().toUpperCase(java.util.Locale.ROOT) + "_" + n.getId();
+    }
+
+    // Java class of the aggregate call's result type, boxed when it is a primitive.
+    private Class<?> getAggrCallReturnJavaType(AggregateCall call) {
+        return toClassWithBoxing(typeFactory.getJavaClass(call.getType()));
+    }
+
+    // Casts the type to a Class and swaps a primitive class for its wrapper class.
+    private Class<?> toClassWithBoxing(Type type) {
+        Class<?> clazz = (Class<?>) type;
+        return clazz.isPrimitive() ? Primitives.wrap(clazz) : clazz;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/0226dd4f/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java
index 8a14eee..4b95560 100644
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java
@@ -17,7 +17,6 @@
  */
 package org.apache.storm.sql.compiler;
 
-import com.google.common.collect.ImmutableList;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.jdbc.CalciteSchema;
 import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
@@ -65,6 +64,37 @@ public class TestCompilerUtils {
         Table table = streamableTable.stream();
         schema.add("FOO", table);
         schema.add("BAR", table);
+        schema.add("MYPLUS", ScalarFunctionImpl.create(MyPlus.class, "eval"));
+        List<SqlOperatorTable> sqlOperatorTables = new ArrayList<>();
+        sqlOperatorTables.add(SqlStdOperatorTable.instance());
+        sqlOperatorTables.add(new CalciteCatalogReader(CalciteSchema.from(schema),
+                false,
+                Collections.<String>emptyList(), typeFactory));
+        SqlOperatorTable chainedSqlOperatorTable = new ChainedSqlOperatorTable(sqlOperatorTables);
+        FrameworkConfig config = Frameworks.newConfigBuilder().defaultSchema(
+                schema).operatorTable(chainedSqlOperatorTable).build();
+        Planner planner = Frameworks.getPlanner(config);
+        SqlNode parse = planner.parse(sql);
+        SqlNode validate = planner.validate(parse);
+        RelNode tree = planner.convert(validate);
+        return new CalciteState(schema, tree);
+    }
+
+    public static CalciteState sqlOverDummyGroupByTable(String sql)
+            throws RelConversionException, ValidationException, SqlParseException {
+        SchemaPlus schema = Frameworks.createRootSchema(true);
+        JavaTypeFactory typeFactory = new JavaTypeFactoryImpl
+                (RelDataTypeSystem.DEFAULT);
+        StreamableTable streamableTable = new CompilerUtil.TableBuilderInfo(typeFactory)
+                .field("ID", SqlTypeName.INTEGER)
+                .field("GRPID", SqlTypeName.INTEGER)
+                .field("NAME", typeFactory.createType(String.class))
+                .field("ADDR", typeFactory.createType(String.class))
+                .field("AGE", SqlTypeName.INTEGER)
+                .build();
+        Table table = streamableTable.stream();
+        schema.add("FOO", table);
+        schema.add("BAR", table);
         FrameworkConfig config = Frameworks.newConfigBuilder().defaultSchema(
                 schema).build();
         Planner planner = Frameworks.getPlanner(config);

http://git-wip-us.apache.org/repos/asf/storm/blob/0226dd4f/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprCompiler.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprCompiler.java
index 017aa25..ae95fcc 100644
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprCompiler.java
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprCompiler.java
@@ -64,7 +64,7 @@ public class TestExprCompiler {
       project.getChildExps().get(0).accept(compiler);
     }
 
-    assertThat(sw.toString(), containsString("(int)(_data.get(0));"));
+    assertThat(sw.toString(), containsString("(java.lang.Integer)(_data.get(0));"));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/storm/blob/0226dd4f/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestRelNodeCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestRelNodeCompiler.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestRelNodeCompiler.java
index 76eba1d..f40e138 100644
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestRelNodeCompiler.java
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestRelNodeCompiler.java
@@ -28,6 +28,7 @@ import org.junit.Test;
 
 import java.io.PrintWriter;
 import java.io.StringWriter;
+import java.util.Collections;
 
 import static org.hamcrest.CoreMatchers.containsString;
 
@@ -45,7 +46,8 @@ public class TestRelNodeCompiler {
          PrintWriter pw = new PrintWriter(sw)
     ) {
       RelNodeCompiler compiler = new RelNodeCompiler(pw, typeFactory);
-      compiler.visitFilter(filter);
+      // standalone mode doesn't use inputstreams argument
+      compiler.visitFilter(filter, Collections.EMPTY_LIST);
       pw.flush();
       Assert.assertThat(sw.toString(), containsString("> 3"));
     }
@@ -54,7 +56,8 @@ public class TestRelNodeCompiler {
          PrintWriter pw = new PrintWriter(sw)
     ) {
       RelNodeCompiler compiler = new RelNodeCompiler(pw, typeFactory);
-      compiler.visitProject(project);
+      // standalone mode doesn't use inputstreams argument
+      compiler.visitProject(project, Collections.EMPTY_LIST);
       pw.flush();
       Assert.assertThat(sw.toString(), containsString("plus("));
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/0226dd4f/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java
index 0f8daa9..b1481ac 100644
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java
@@ -23,6 +23,7 @@ import org.apache.storm.Config;
 import org.apache.storm.ILocalCluster;
 import org.apache.storm.LocalCluster;
 import org.apache.storm.generated.StormTopology;
+import org.apache.storm.sql.runtime.AbstractValuesProcessor;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Values;
 import org.apache.storm.utils.Utils;
@@ -39,7 +40,9 @@ import org.junit.Before;
 import org.junit.Test;
 import org.apache.storm.trident.TridentTopology;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
 
@@ -59,10 +62,10 @@ public class TestPlanCompiler {
     final int EXPECTED_VALUE_SIZE = 2;
     String sql = "SELECT ID FROM FOO WHERE ID > 2";
     TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql);
-    PlanCompiler compiler = new PlanCompiler(typeFactory);
-    final AbstractTridentProcessor proc = compiler.compile(state.tree());
     final Map<String, ISqlTridentDataSource> data = new HashMap<>();
     data.put("FOO", new TestUtils.MockSqlTridentDataSource());
+    PlanCompiler compiler = new PlanCompiler(data, typeFactory);
+    final AbstractTridentProcessor proc = compiler.compileForTest(state.tree());
     final TridentTopology topo = proc.build(data);
     Fields f = proc.outputStream().getOutputFields();
     proc.outputStream().each(f, new CollectDataFunction(), new Fields()).toStream();
@@ -71,20 +74,71 @@ public class TestPlanCompiler {
   }
 
   @Test
+  public void testCompileGroupByExp() throws Exception {
+    final int EXPECTED_VALUE_SIZE = 1;
+    final Map<String, ISqlTridentDataSource> data = new HashMap<>();
+    data.put("FOO", new TestUtils.MockSqlTridentGroupedDataSource());
+    String sql = "SELECT GRPID, COUNT(*) AS CNT, MAX(AGE) AS MAX_AGE, MIN(AGE) AS MIN_AGE, AVG(AGE) AS AVG_AGE, MAX(AGE) - MIN(AGE) AS DIFF FROM FOO GROUP BY GRPID";
+    TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyGroupByTable(sql);
+    PlanCompiler compiler = new PlanCompiler(data, typeFactory);
+    final AbstractTridentProcessor proc = compiler.compileForTest(state.tree());
+    final TridentTopology topo = proc.build(data);
+    Fields f = proc.outputStream().getOutputFields();
+    proc.outputStream().each(f, new CollectDataFunction(), new Fields()).toStream();
+    runTridentTopology(EXPECTED_VALUE_SIZE, proc, topo);
+
+    Assert.assertArrayEquals(new Values[] { new Values(0, 5L, 5, 1, 3, 4)}, getCollectedValues().toArray());
+  }
+
+  @Test
   public void testInsert() throws Exception {
     final int EXPECTED_VALUE_SIZE = 1;
     String sql = "INSERT INTO BAR SELECT ID, NAME, ADDR FROM FOO WHERE ID > 3";
     TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql);
-    PlanCompiler compiler = new PlanCompiler(typeFactory);
-    final AbstractTridentProcessor proc = compiler.compile(state.tree());
     final Map<String, ISqlTridentDataSource> data = new HashMap<>();
     data.put("FOO", new TestUtils.MockSqlTridentDataSource());
     data.put("BAR", new TestUtils.MockSqlTridentDataSource());
+    PlanCompiler compiler = new PlanCompiler(data, typeFactory);
+    final AbstractTridentProcessor proc = compiler.compileForTest(state.tree());
     final TridentTopology topo = proc.build(data);
     runTridentTopology(EXPECTED_VALUE_SIZE, proc, topo);
     Assert.assertArrayEquals(new Values[] { new Values(4, "x", "y")}, getCollectedValues().toArray());
   }
 
+  @Test
+  public void testLogicalExpr() throws Exception {
+    final int EXPECTED_VALUE_SIZE = 1;
+    String sql = "SELECT ID > 0 OR ID < 1, ID > 0 AND ID < 1, NOT (ID > 0 AND ID < 1) FROM FOO WHERE ID > 0 AND ID < 2";
+    TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql);
+    Map<String, ISqlTridentDataSource> data = new HashMap<>();
+    data.put("FOO", new TestUtils.MockSqlTridentDataSource());
+    PlanCompiler compiler = new PlanCompiler(data, typeFactory);
+    AbstractTridentProcessor proc = compiler.compileForTest(state.tree());
+    final TridentTopology topo = proc.build(data);
+    Fields f = proc.outputStream().getOutputFields();
+    proc.outputStream().each(f, new CollectDataFunction(), new Fields()).toStream();
+    runTridentTopology(EXPECTED_VALUE_SIZE, proc, topo);
+    Assert.assertArrayEquals(new Values[] { new Values(true, false, true) }, getCollectedValues().toArray());
+  }
+
+  @Test
+  public void testUdf() throws Exception {
+    int EXPECTED_VALUE_SIZE = 1;
+    String sql = "SELECT MYPLUS(ID, 3)" +
+            "FROM FOO " +
+            "WHERE ID = 2";
+    TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql);
+    Map<String, ISqlTridentDataSource> data = new HashMap<>();
+    data.put("FOO", new TestUtils.MockSqlTridentDataSource());
+    PlanCompiler compiler = new PlanCompiler(data, typeFactory);
+    AbstractTridentProcessor proc = compiler.compileForTest(state.tree());
+    final TridentTopology topo = proc.build(data);
+    Fields f = proc.outputStream().getOutputFields();
+    proc.outputStream().each(f, new CollectDataFunction(), new Fields()).toStream();
+    runTridentTopology(EXPECTED_VALUE_SIZE, proc, topo);
+    Assert.assertArrayEquals(new Values[] { new Values(5) }, getCollectedValues().toArray());
+  }
+
   private void runTridentTopology(final int expectedValueSize, AbstractTridentProcessor proc,
                                   TridentTopology topo) throws Exception {
     final Config conf = new Config();

http://git-wip-us.apache.org/repos/asf/storm/blob/0226dd4f/external/sql/storm-sql-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/pom.xml b/external/sql/storm-sql-runtime/pom.xml
index c433473..6663039 100644
--- a/external/sql/storm-sql-runtime/pom.xml
+++ b/external/sql/storm-sql-runtime/pom.xml
@@ -73,6 +73,11 @@
                 </exclusion>
             </exclusions>
         </dependency>
+        <!-- janino -->
+        <dependency>
+            <groupId>org.codehaus.janino</groupId>
+            <artifactId>janino</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.mockito</groupId>
             <artifactId>mockito-all</artifactId>

http://git-wip-us.apache.org/repos/asf/storm/blob/0226dd4f/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/NumberConverter.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/NumberConverter.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/NumberConverter.java
new file mode 100644
index 0000000..0b64fea
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/NumberConverter.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.sql.runtime.trident;
+
+/**
+ * Converts {@link Number} instances between the boxed numeric types.
+ */
+public class NumberConverter {
+    /**
+     * Returns {@code num} represented as an instance of {@code toType}.
+     *
+     * <p>If {@code num} is already exactly of class {@code toType} it is returned unchanged. For the
+     * six boxed primitive targets ({@code Byte}, {@code Short}, {@code Integer}, {@code Long},
+     * {@code Float}, {@code Double}) the matching {@code xxxValue()} accessor of {@link Number} is
+     * used, so the usual narrowing rules of those accessors apply (a value may be truncated or lose
+     * precision). For any other target, {@code num} is returned as is when it is already an
+     * instance of {@code toType}.
+     *
+     * @param num    the value to convert
+     * @param toType the class the result must be an instance of
+     * @return {@code num} itself or a boxed value of type {@code toType}
+     * @throws IllegalArgumentException if {@code toType} is not one of the supported boxed types and
+     *                                  {@code num} is not an instance of it
+     */
+    public static Number convert(Number num, Class<? extends Number> toType) {
+        // no need to convert
+        if (num.getClass().equals(toType)) {
+            return num;
+        }
+
+        if (toType.equals(Byte.class)) {
+            return num.byteValue();
+        } else if (toType.equals(Short.class)) {
+            return num.shortValue();
+        } else if (toType.equals(Integer.class)) {
+            return num.intValue();
+        } else if (toType.equals(Long.class)) {
+            return num.longValue();
+        } else if (toType.equals(Float.class)) {
+            return num.floatValue();
+        } else if (toType.equals(Double.class)) {
+            return num.doubleValue();
+        } else if (toType.isAssignableFrom(num.getClass())) {
+            // toType is a supertype of num's class (e.g. Number itself), so num already qualifies
+            return num;
+        } else {
+            // getName() avoids the "class class X" duplication that Class.toString() would produce
+            throw new IllegalArgumentException("Can't convert Number " + num + " (class "
+                    + num.getClass().getName() + ") to " + toType.getName());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/0226dd4f/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/TridentUtils.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/TridentUtils.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/TridentUtils.java
new file mode 100644
index 0000000..28c2ae1
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/TridentUtils.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.sql.runtime.trident;
+
+import org.apache.storm.trident.tuple.TridentTuple;
+
+/**
+ * Static helpers for reading values out of Trident tuples.
+ */
+public class TridentUtils {
+    private TridentUtils() {
+        // static helpers only; not meant to be instantiated
+    }
+
+    /**
+     * Reads the value stored under {@code inputFieldName} in {@code tuple}.
+     *
+     * <p>The result is cast to the type expected by the caller without any runtime check, so a
+     * mismatch only surfaces as a {@link ClassCastException} at the call site.
+     *
+     * @param tuple          the tuple to read from; must not be {@code null}
+     * @param inputFieldName the name of the field to look up
+     * @param <T>            the type the caller expects the field value to have
+     * @return the field value as returned by {@code tuple.getValueByField(inputFieldName)}
+     * @throws IllegalArgumentException if {@code tuple} is {@code null}
+     */
+    public static <T> T valueFromTuple(TridentTuple tuple, String inputFieldName) {
+        if (tuple == null) {
+            throw new IllegalArgumentException("Tuple is null");
+        }
+        // Erasure makes this cast unverifiable here; the caller chooses T.
+        @SuppressWarnings("unchecked")
+        T value = (T) tuple.getValueByField(inputFieldName);
+        return value;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/0226dd4f/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFilter.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFilter.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFilter.java
new file mode 100644
index 0000000..7afa096
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFilter.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.sql.runtime.trident.functions;
+
+import org.apache.storm.trident.operation.BaseFilter;
+import org.apache.storm.trident.operation.TridentOperationContext;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.codehaus.commons.compiler.CompileException;
+import org.codehaus.janino.ScriptEvaluator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Trident filter whose keep/drop decision is delegated to a script compiled by Janino.
+ *
+ * <p>The script source is supplied at construction time and compiled in
+ * {@link #prepare(Map, TridentOperationContext)}. The script receives the values of the tuple as a
+ * {@link List} parameter named {@code _data} and is declared to return a {@link Boolean}.
+ */
+public class EvaluationFilter extends BaseFilter {
+    private static final Logger LOG = LoggerFactory.getLogger(EvaluationFilter.class);
+
+    // transient: only the script source is kept in the serialized form, the evaluator is rebuilt in prepare()
+    private transient ScriptEvaluator evaluator;
+    private final String expression;
+
+    /**
+     * @param expression source of the script to evaluate against each tuple
+     */
+    public EvaluationFilter(String expression) {
+        this.expression = expression;
+    }
+
+    /**
+     * Compiles the script.
+     *
+     * @throws RuntimeException wrapping the {@link CompileException} if the script does not compile
+     */
+    @Override
+    public void prepare(Map conf, TridentOperationContext context) {
+        LOG.info("Expression: {}", expression);
+
+        final String[] parameterNames = {"_data"};
+        final Class[] parameterTypes = {List.class};
+        try {
+            evaluator = new ScriptEvaluator(expression, Boolean.class, parameterNames, parameterTypes);
+        } catch (CompileException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Runs the script against the values of {@code tuple}.
+     *
+     * @return the boolean produced by the script
+     * @throws RuntimeException wrapping the {@link InvocationTargetException} if the script throws
+     */
+    @Override
+    public boolean isKeep(TridentTuple tuple) {
+        final Boolean keep;
+        try {
+            final Object[] arguments = {tuple.getValues()};
+            keep = (Boolean) evaluator.evaluate(arguments);
+        } catch (InvocationTargetException e) {
+            throw new RuntimeException(e);
+        }
+        return keep;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/0226dd4f/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFunction.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFunction.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFunction.java
new file mode 100644
index 0000000..b0bbce3
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFunction.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.sql.runtime.trident.functions;
+
+import org.apache.storm.trident.operation.BaseFunction;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.operation.TridentOperationContext;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.tuple.Values;
+import org.codehaus.commons.compiler.CompileException;
+import org.codehaus.janino.ScriptEvaluator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Trident function that emits whatever a Janino-compiled script computes for each tuple.
+ *
+ * <p>The script source is supplied at construction time and compiled in
+ * {@link #prepare(Map, TridentOperationContext)}. The script receives the values of the tuple as a
+ * {@link List} parameter named {@code _data} and is declared to return a {@link Values} instance,
+ * which is emitted as is.
+ */
+public class EvaluationFunction extends BaseFunction {
+    private static final Logger LOG = LoggerFactory.getLogger(EvaluationFunction.class);
+
+    // transient: only the script source is kept in the serialized form, the evaluator is rebuilt in prepare()
+    private transient ScriptEvaluator evaluator;
+    private final String expression;
+
+    /**
+     * @param expression source of the script to evaluate against each tuple
+     */
+    public EvaluationFunction(String expression) {
+        this.expression = expression;
+    }
+
+    /**
+     * Compiles the script.
+     *
+     * @throws RuntimeException wrapping the {@link CompileException} if the script does not compile
+     */
+    @Override
+    public void prepare(Map conf, TridentOperationContext context) {
+        LOG.info("Expression: {}", expression);
+
+        final String[] parameterNames = {"_data"};
+        final Class[] parameterTypes = {List.class};
+        try {
+            evaluator = new ScriptEvaluator(expression, Values.class, parameterNames, parameterTypes);
+        } catch (CompileException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Runs the script against the values of {@code tuple} and emits its result.
+     *
+     * @throws RuntimeException wrapping the {@link InvocationTargetException} if the script throws
+     */
+    @Override
+    public void execute(TridentTuple tuple, TridentCollector collector) {
+        try {
+            final Object[] arguments = {tuple.getValues()};
+            final Values computed = (Values) evaluator.evaluate(arguments);
+            collector.emit(computed);
+        } catch (InvocationTargetException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/0226dd4f/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/ForwardFunction.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/ForwardFunction.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/ForwardFunction.java
new file mode 100644
index 0000000..4c3a266
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/ForwardFunction.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.sql.runtime.trident.functions;
+
+import org.apache.storm.trident.operation.BaseFunction;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+/**
+ * Trident function that re-emits the values of every tuple it receives, unchanged.
+ */
+public class ForwardFunction extends BaseFunction {
+    /**
+     * Emits the values of {@code tuple} through {@code collector} exactly as they were received.
+     */
+    @Override
+    public void execute(final TridentTuple tuple, final TridentCollector collector) {
+        collector.emit(tuple.getValues());
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/0226dd4f/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/operations/CountBy.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/operations/CountBy.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/operations/CountBy.java
new file mode 100644
index 0000000..df5d8c8
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/operations/CountBy.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.runtime.trident.operations;
+
+import org.apache.storm.sql.runtime.trident.TridentUtils;
+import org.apache.storm.trident.operation.CombinerAggregator;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+
+/**
+ * Combiner aggregator that counts the tuples whose configured input field is not {@code null}.
+ */
+public class CountBy implements CombinerAggregator<Long> {
+
+    private final String inputFieldName;
+
+    /**
+     * @param inputFieldName name of the tuple field whose non-null occurrences are counted
+     */
+    public CountBy(String inputFieldName) {
+        this.inputFieldName = inputFieldName;
+    }
+
+    /**
+     * Maps a single tuple to its contribution to the count: 1 when the field holds a value,
+     * 0 when the field is {@code null}.
+     */
+    @Override
+    public Long init(TridentTuple tuple) {
+        final Object fieldValue = TridentUtils.valueFromTuple(tuple, inputFieldName);
+        return fieldValue != null ? 1L : 0L;
+    }
+
+    /**
+     * Adds two partial counts.
+     */
+    @Override
+    public Long combine(Long val1, Long val2) {
+        final long total = val1.longValue() + val2.longValue();
+        return Long.valueOf(total);
+    }
+
+    /**
+     * The count of an empty input.
+     */
+    @Override
+    public Long zero() {
+        return Long.valueOf(0L);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/0226dd4f/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/operations/DivideForAverage.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/operations/DivideForAverage.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/operations/DivideForAverage.java
new file mode 100644
index 0000000..6dcf324
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/operations/DivideForAverage.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.sql.runtime.trident.operations;
+
+import clojure.lang.Numbers;
+import org.apache.storm.sql.runtime.trident.NumberConverter;
+import org.apache.storm.trident.operation.BaseFunction;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.tuple.Values;
+
+/**
+ * Trident function that turns a (sum, count) pair into an average.
+ *
+ * <p>It reads the dividend from input position 0 and the divisor from input position 1, divides
+ * them with {@code clojure.lang.Numbers.divide} and converts the quotient to the configured
+ * numeric class before emitting it as a single value.
+ */
+public class DivideForAverage extends BaseFunction {
+
+    private final Class<? extends Number> targetClazz;
+
+    /**
+     * @param targetClazz numeric class the emitted average is converted to
+     */
+    public DivideForAverage(Class<? extends Number> targetClazz) {
+        this.targetClazz = targetClazz;
+    }
+
+    /**
+     * Emits {@code tuple.get(0) / tuple.get(1)} converted to the target class.
+     */
+    @Override
+    public void execute(TridentTuple tuple, TridentCollector collector) {
+        // Input contract: position 0 carries the result of SumBy, position 1 the result of CountBy.
+        final Number sum = (Number) tuple.get(0);
+        // CountBy always produces a Long
+        final Long count = (Long) tuple.get(1);
+
+        final Number quotient = Numbers.divide(sum, count);
+        final Number average = NumberConverter.convert(quotient, targetClazz);
+        collector.emit(new Values(average));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/0226dd4f/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/operations/MaxBy.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/operations/MaxBy.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/operations/MaxBy.java
new file mode 100644
index 0000000..b3ab477
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/operations/MaxBy.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.sql.runtime.trident.operations;
+
+import clojure.lang.Numbers;
+import org.apache.storm.sql.runtime.trident.NumberConverter;
+import org.apache.storm.sql.runtime.trident.TridentUtils;
+import org.apache.storm.trident.operation.CombinerAggregator;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+/**
+ * Combiner aggregator that keeps the largest value seen in the configured input field.
+ *
+ * <p>When the target class is a {@link Number} subtype the comparison is done with
+ * {@code clojure.lang.Numbers.max} and the winner is converted to the target class. Any other
+ * value must implement {@link Comparable} and both operands must be of the same class.
+ * {@code null} stands for "no value yet" and never wins against a real value.
+ */
+public class MaxBy implements CombinerAggregator<Object> {
+
+    private final String inputFieldName;
+    private final Class<?> targetClazz;
+
+    /**
+     * @param inputFieldName name of the tuple field to take the maximum of
+     * @param targetClazz    class of the aggregation result; decides whether numeric or
+     *                       {@link Comparable} comparison is used
+     */
+    public MaxBy(String inputFieldName, Class<?> targetClazz) {
+        this.inputFieldName = inputFieldName;
+        this.targetClazz = targetClazz;
+    }
+
+    /**
+     * Returns the raw field value of {@code tuple}, which may be {@code null}.
+     */
+    @Override
+    public Object init(TridentTuple tuple) {
+        return TridentUtils.valueFromTuple(tuple, inputFieldName);
+    }
+
+    /**
+     * Returns the larger of the two values, treating {@code null} as absent.
+     *
+     * @throws IllegalArgumentException if the values are non-numeric and either differ in class or
+     *                                  do not implement {@link Comparable}
+     */
+    @Override
+    public Object combine(Object val1, Object val2) {
+        // Either side may be null: zero() is null and init() yields null for a null field value.
+        if (val1 == null) {
+            return val2;
+        }
+        if (val2 == null) {
+            return val1;
+        }
+
+        if (Number.class.isAssignableFrom(targetClazz)) {
+            // asSubclass is a checked narrowing; the guard above guarantees it succeeds
+            return NumberConverter.convert((Number) Numbers.max(val1, val2),
+                    targetClazz.asSubclass(Number.class));
+        }
+
+        if (!val1.getClass().equals(val2.getClass())) {
+            throw new IllegalArgumentException("The type of values are different! - class of val1: " +
+                    val1.getClass().getCanonicalName() + " and class of val2: " +
+                    val2.getClass().getCanonicalName());
+        } else if (!(val1 instanceof Comparable)) {
+            throw new IllegalArgumentException("Value is not Comparable - class of value: " +
+                    val1.getClass().getCanonicalName());
+        }
+
+        // Both operands share one class and that class is Comparable, so comparing them is safe.
+        @SuppressWarnings("unchecked")
+        Comparable<Object> left = (Comparable<Object>) val1;
+        return left.compareTo(val2) < 0 ? val2 : val1;
+    }
+
+    /**
+     * The identity value: {@code null}, meaning no value has been seen.
+     */
+    @Override
+    public Object zero() {
+        return null;
+    }
+}