Posted to commits@storm.apache.org by ka...@apache.org on 2016/08/24 22:02:53 UTC

[1/4] storm git commit: STORM-1434 Support the GROUP BY clause in StormSQL

Repository: storm
Updated Branches:
  refs/heads/master 0b080fcf6 -> 989b00811


http://git-wip-us.apache.org/repos/asf/storm/blob/28eefbef/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/operations/MinBy.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/operations/MinBy.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/operations/MinBy.java
new file mode 100644
index 0000000..ced4201
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/operations/MinBy.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.sql.runtime.trident.operations;
+
+import clojure.lang.Numbers;
+import org.apache.storm.sql.runtime.trident.NumberConverter;
+import org.apache.storm.sql.runtime.trident.TridentUtils;
+import org.apache.storm.trident.operation.CombinerAggregator;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+public class MinBy implements CombinerAggregator<Object> {
+
+    private final String inputFieldName;
+    private final Class<?> targetClazz;
+
+    public MinBy(String inputFieldName, Class<?> targetClazz) {
+        this.inputFieldName = inputFieldName;
+        this.targetClazz = targetClazz;
+    }
+
+    @Override
+    public Object init(TridentTuple tuple) {
+        return TridentUtils.valueFromTuple(tuple, inputFieldName);
+    }
+
+    @Override
+    public Object combine(Object val1, Object val2) {
+        if (val1 == null) {
+            return val2;
+        }
+
+        if (Number.class.isAssignableFrom(targetClazz)) {
+            return NumberConverter.convert((Number) Numbers.min(val1, val2), (Class<? extends Number>) targetClazz);
+        }
+
+        if (!val1.getClass().equals(val2.getClass())) {
+            throw new IllegalArgumentException("The type of values are different! - class of val1: " +
+                    val1.getClass().getCanonicalName() + " and class of val2: " +
+                    val2.getClass().getCanonicalName());
+        } else if (!(val1 instanceof Comparable)) {
+            throw new IllegalArgumentException("Value is not Comparable - class of value: " +
+                    val1.getClass().getCanonicalName());
+        }
+
+        return ((Comparable)val1).compareTo(val2) > 0 ? val2 : val1;
+    }
+
+    @Override
+    public Object zero() {
+        return null;
+    }
+}

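Side note: MinBy is a Trident CombinerAggregator, so it slots directly into a grouped, chained aggregation. A wiring sketch with hypothetical field names (the real wiring is done by TridentLogicalPlanCompiler in part [2/4] below):

    import org.apache.storm.sql.runtime.trident.operations.MinBy;
    import org.apache.storm.trident.Stream;
    import org.apache.storm.tuple.Fields;

    public class MinByWiringSketch {
        // Group by GRPID and take the minimum AGE per group.
        public static Stream minAgePerGroup(Stream stream) {
            return stream.groupBy(new Fields("GRPID"))
                    .chainedAgg()
                    .aggregate(new Fields("AGE"), new MinBy("AGE", Integer.class), new Fields("MIN_AGE"))
                    .chainEnd();
        }
    }
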
http://git-wip-us.apache.org/repos/asf/storm/blob/28eefbef/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/operations/SumBy.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/operations/SumBy.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/operations/SumBy.java
new file mode 100644
index 0000000..77300c3
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/operations/SumBy.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.runtime.trident.operations;
+
+import clojure.lang.Numbers;
+import org.apache.storm.sql.runtime.trident.NumberConverter;
+import org.apache.storm.sql.runtime.trident.TridentUtils;
+import org.apache.storm.trident.operation.CombinerAggregator;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+
+public class SumBy implements CombinerAggregator<Number> {
+
+    private final String inputFieldName;
+    private final Class<? extends Number> targetClazz;
+
+    public SumBy(String inputFieldName, Class<? extends Number> targetClazz) {
+        this.inputFieldName = inputFieldName;
+        this.targetClazz = targetClazz;
+    }
+
+    @Override
+    public Number init(TridentTuple tuple) {
+        // handles null field
+        Object value = TridentUtils.valueFromTuple(tuple, inputFieldName);
+        if (value == null) {
+            return zero();
+        }
+
+        return (Number) value;
+    }
+
+    @Override
+    public Number combine(Number val1, Number val2) {
+        // convert the result so that the target type of the value is preserved
+        // FIXME: find an alternative if we don't want to pay the conversion overhead
+        return NumberConverter.convert(Numbers.add(val1, val2), targetClazz);
+    }
+
+    @Override
+    public Number zero() {
+        return NumberConverter.convert(0, targetClazz);
+    }
+
+}
\ No newline at end of file

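Side note: both aggregators delegate numeric conversion to NumberConverter, whose source is not included in this message (it appears in the [2/4] file list as a 47-line file). A hypothetical sketch of its convert method, inferred from the call sites above; the real implementation may differ:

    public final class NumberConverterSketch {
        // Narrow or widen a Number to the wrapper type the SQL column expects.
        public static Number convert(Number num, Class<? extends Number> toType) {
            if (toType.equals(Integer.class)) { return num.intValue(); }
            if (toType.equals(Long.class))    { return num.longValue(); }
            if (toType.equals(Double.class))  { return num.doubleValue(); }
            if (toType.equals(Float.class))   { return num.floatValue(); }
            if (toType.equals(Short.class))   { return num.shortValue(); }
            if (toType.equals(Byte.class))    { return num.byteValue(); }
            throw new IllegalArgumentException("Unsupported target type: " + toType);
        }
    }

This matters because Numbers.add and Numbers.min come from Clojure and may return a wider type (e.g. Long) than the SQL column expects.
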
http://git-wip-us.apache.org/repos/asf/storm/blob/28eefbef/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java b/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
index 5e9d78c..5789bc5 100644
--- a/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
+++ b/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
@@ -234,6 +234,80 @@ public class TestUtils {
     }
   }
 
+  public static class MockSqlTridentGroupedDataSource implements ISqlTridentDataSource {
+    @Override
+    public IBatchSpout getProducer() {
+      return new MockGroupedSpout();
+    }
+
+    @Override
+    public Function getConsumer() {
+      return new CollectDataFunction();
+    }
+
+    public static class CollectDataFunction extends BaseFunction {
+      /**
+       * Collect all values in a static variable as the instance will go through serialization and deserialization.
+       */
+      private transient static final List<List<Object> > VALUES = new ArrayList<>();
+      public static List<List<Object>> getCollectedValues() {
+        return VALUES;
+      }
+
+      @Override
+      public void execute(TridentTuple tuple, TridentCollector collector) {
+        VALUES.add(tuple.getValues());
+      }
+    }
+
+    private static class MockGroupedSpout implements IBatchSpout {
+      private final ArrayList<Values> RECORDS = new ArrayList<>();
+      private final Fields OUTPUT_FIELDS = new Fields("ID", "GRPID", "NAME", "ADDR", "AGE");
+
+      public MockGroupedSpout() {
+        for (int i = 0; i < 5; ++i) {
+          RECORDS.add(new Values(i, 0, "x", "y", 5 - i));
+        }
+      }
+
+      private boolean emitted = false;
+
+      @Override
+      public void open(Map conf, TopologyContext context) {
+      }
+
+      @Override
+      public void emitBatch(long batchId, TridentCollector collector) {
+        if (emitted) {
+          return;
+        }
+
+        for (Values r : RECORDS) {
+          collector.emit(r);
+        }
+        emitted = true;
+      }
+
+      @Override
+      public void ack(long batchId) {
+      }
+
+      @Override
+      public void close() {
+      }
+
+      @Override
+      public Map<String, Object> getComponentConfiguration() {
+        return null;
+      }
+
+      @Override
+      public Fields getOutputFields() {
+        return OUTPUT_FIELDS;
+      }
+    }
+  }
+
   public static class CollectDataChannelHandler implements ChannelHandler {
     private final List<Values> values;
 

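Side note: CollectDataFunction keeps its results in a static list because Trident serializes function instances into the topology; in a local-cluster test the deserialized copy still writes to the same static field in the same JVM, so the test can read the values back. A hypothetical assertion flow (the mock data above has a single group, GRPID = 0):

    import static org.junit.Assert.assertEquals;

    import java.util.List;

    import org.apache.storm.sql.TestUtils.MockSqlTridentGroupedDataSource.CollectDataFunction;

    public class GroupedResultAssertionSketch {
        // Run after a GROUP BY topology has consumed the mock batch.
        public void assertOneGroupCollected() {
            List<List<Object>> collected = CollectDataFunction.getCollectedValues();
            assertEquals(1, collected.size()); // one aggregated row per group
        }
    }
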
http://git-wip-us.apache.org/repos/asf/storm/blob/28eefbef/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 14f45f3..15b2488 100644
--- a/pom.xml
+++ b/pom.xml
@@ -265,6 +265,7 @@
         <metrics-clojure.version>2.5.1</metrics-clojure.version>
         <hdrhistogram.version>2.1.7</hdrhistogram.version>
         <calcite.version>1.4.0-incubating</calcite.version>
+        <janino.version>2.7.8</janino.version>
         <jackson.version>2.6.3</jackson.version>
         <maven-surefire.version>2.18.1</maven-surefire.version>
         <!-- Kafka version used by old storm-kafka spout code -->
@@ -882,6 +883,12 @@
                 <artifactId>calcite-core</artifactId>
                 <version>${calcite.version}</version>
             </dependency>
+            <!-- used by storm-sql-core and storm-sql-runtime -->
+            <dependency>
+                <groupId>org.codehaus.janino</groupId>
+                <artifactId>janino</artifactId>
+                <version>${janino.version}</version>
+            </dependency>
             <dependency>
                 <groupId>com.fasterxml.jackson.core</groupId>
                 <artifactId>jackson-databind</artifactId>

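Note the dependencyManagement pattern here: the parent pom pins the Janino version once, so the storm-sql-core pom (in part [2/4] below) and the storm-sql-runtime pom can declare the janino dependency without repeating a version.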

[4/4] storm git commit: add STORM-1434 to CHANGELOG

Posted by ka...@apache.org.
add STORM-1434 to CHANGELOG


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/989b0081
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/989b0081
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/989b0081

Branch: refs/heads/master
Commit: 989b00811f631a7bfedb0f8ee6888efd4637e562
Parents: 64fb1b1
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Thu Aug 25 07:01:21 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Thu Aug 25 07:01:21 2016 +0900

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/989b0081/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index f6cbcd2..11968f0 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -144,6 +144,7 @@
  * STORM-1769: Added a test to check local nimbus with notifier plugin
 
 ## 1.1.0 
+ * STORM-1434: Support the GROUP BY clause in StormSQL
  * STORM-2016: Topology submission improvement: support adding local jars and maven artifacts on submission
  * STORM-1994: Add table with per-topology & worker resource usage and components in (new) supervisor and topology pages
  * STORM-2023: Add calcite-core to dependency of storm-sql-runtime


[2/4] storm git commit: STORM-1434 Support the GROUP BY clause in StormSQL

Posted by ka...@apache.org.
STORM-1434 Support the GROUP BY clause in StormSQL

* Support GROUP BY for Trident
* Implement basic functions for aggregation
* Change the way the Calcite logical plan is converted to a Trident logical plan
** before: generate code and compile it
** after: use Trident features directly, generating a code block only when expression evaluation is needed
*** Janino helps evaluate the code block at runtime (see the sketch after this list)
* Expand PostOrderRelNodeVisitor to be a generalized version of TridentPostOrderRelNodeVisitor
* Change the way results of aggregation functions are joined: chain the aggregators
** Trident handles a chained aggregator by applying each tuple to all aggregators, getting rid of multiple reads of the input tuples
* Remove the empty check for tuples
** Fail fast if the input fields for an aggregation / function are not set, rather than giving a wrong result
* Add a comment on why we package an empty topology jar and how a SQL topology works
* Add a test for scalar UDF with Trident
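
For reference on the Janino piece: it compiles a Java source string to bytecode in-process, without shelling out to javac. A minimal, generic sketch of the runtime evaluation pattern (plain Janino usage, not the actual EvaluationFunction code from this commit):

    import org.codehaus.janino.ScriptEvaluator;

    public class JaninoSketch {
        public static void main(String[] args) throws Exception {
            // Compile a small script at runtime; parameters and return
            // type must be declared before cook() compiles the source.
            ScriptEvaluator se = new ScriptEvaluator();
            se.setParameters(new String[] {"a", "b"}, new Class[] {int.class, int.class});
            se.setReturnType(int.class);
            se.cook("int sum = a + b; return sum;");

            // Evaluate the compiled script with concrete arguments.
            Object result = se.evaluate(new Object[] {40, 2});
            System.out.println(result); // prints 42
        }
    }

EvaluationFunction and EvaluationFilter (added under storm-sql-runtime in this commit) apply this idea to the expression blocks that ExprCompiler generates.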


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/28eefbef
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/28eefbef
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/28eefbef

Branch: refs/heads/master
Commit: 28eefbef90900f715e19a55da9d48fcec1f32947
Parents: da5c3ac
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Thu Aug 18 16:32:46 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Thu Aug 25 06:58:38 2016 +0900

----------------------------------------------------------------------
 external/sql/README.md                          |   6 +-
 external/sql/storm-sql-core/pom.xml             |   5 +
 .../jvm/org/apache/storm/sql/StormSqlImpl.java  |  19 +-
 .../apache/storm/sql/compiler/ExprCompiler.java |  16 +-
 .../sql/compiler/PostOrderRelNodeVisitor.java   | 100 ++---
 .../backends/standalone/RelNodeCompiler.java    |  14 +-
 .../compiler/backends/trident/PlanCompiler.java | 222 +++---------
 .../backends/trident/RelNodeCompiler.java       | 116 ------
 .../trident/TridentLogicalPlanCompiler.java     | 363 +++++++++++++++++++
 .../storm/sql/compiler/TestCompilerUtils.java   |  32 +-
 .../storm/sql/compiler/TestExprCompiler.java    |   2 +-
 .../standalone/TestRelNodeCompiler.java         |   7 +-
 .../backends/trident/TestPlanCompiler.java      |  62 +++-
 external/sql/storm-sql-runtime/pom.xml          |   5 +
 .../sql/runtime/trident/NumberConverter.java    |  47 +++
 .../storm/sql/runtime/trident/TridentUtils.java |  35 ++
 .../trident/functions/EvaluationFilter.java     |  62 ++++
 .../trident/functions/EvaluationFunction.java   |  64 ++++
 .../trident/functions/ForwardFunction.java      |  30 ++
 .../sql/runtime/trident/operations/CountBy.java |  53 +++
 .../trident/operations/DivideForAverage.java    |  45 +++
 .../sql/runtime/trident/operations/MaxBy.java   |  68 ++++
 .../sql/runtime/trident/operations/MinBy.java   |  68 ++++
 .../sql/runtime/trident/operations/SumBy.java   |  60 +++
 .../test/org/apache/storm/sql/TestUtils.java    |  74 ++++
 pom.xml                                         |   7 +
 26 files changed, 1211 insertions(+), 371 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/28eefbef/external/sql/README.md
----------------------------------------------------------------------
diff --git a/external/sql/README.md b/external/sql/README.md
index 6f68951..66b2185 100644
--- a/external/sql/README.md
+++ b/external/sql/README.md
@@ -19,6 +19,7 @@ The following features are supported in the current repository:
 * Streaming from and to external data sources
 * Filtering tuples
 * Projections
+* Aggregations (Grouping)
 
 ## Specifying External Data Sources
 
@@ -87,8 +88,7 @@ By now you should be able to see the `order_filtering` topology in the Storm UI.
 
 ## Current Limitations
 
-Aggregation, windowing and joining tables are yet to be implemented. Specifying parallelism hints in the topology is not
-yet supported.
+Windowing and joining tables are yet to be implemented. Specifying parallelism hints in the topology is not yet supported. The supported aggregation functions are 'SUM', 'AVG', 'COUNT', 'MIN' and 'MAX'; support for user-defined functions (UDF) is planned.
 
 Users also need to provide the dependency of the external data sources in the `extlib` directory. Otherwise the topology
 will fail to run because of `ClassNotFoundException`.
@@ -114,4 +114,4 @@ under the License.
 
 ## Committer Sponsors
  * Sriharsha Chintalapani ([sriharsha@apache.org](mailto:sriharsha@apache.org))
- * P. Taylor Goetz ([ptgoetz@apache.org](mailto:ptgoetz@apache.org))
+ * P. Taylor Goetz ([ptgoetz@apache.org](mailto:ptgoetz@apache.org))
\ No newline at end of file

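In practice this README change means a statement such as INSERT INTO TOTALS SELECT UNIT_PRICE, SUM(QUANTITY) FROM ORDERS GROUP BY UNIT_PRICE (hypothetical table names, in the style of the README's existing example) now compiles to a Trident topology with a grouped, chained aggregation instead of being rejected.
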
http://git-wip-us.apache.org/repos/asf/storm/blob/28eefbef/external/sql/storm-sql-core/pom.xml
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/pom.xml b/external/sql/storm-sql-core/pom.xml
index 8a9e313..d9a6098 100644
--- a/external/sql/storm-sql-core/pom.xml
+++ b/external/sql/storm-sql-core/pom.xml
@@ -86,6 +86,11 @@
                 </exclusion>
             </exclusions>
         </dependency>
+        <!-- janino -->
+        <dependency>
+            <groupId>org.codehaus.janino</groupId>
+            <artifactId>janino</artifactId>
+        </dependency>
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-annotations</artifactId>

http://git-wip-us.apache.org/repos/asf/storm/blob/28eefbef/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
index dbe0ddd..e404999 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
@@ -119,14 +119,17 @@ class StormSqlImpl extends StormSql {
         SqlNode validate = planner.validate(parse);
         RelNode tree = planner.convert(validate);
         org.apache.storm.sql.compiler.backends.trident.PlanCompiler compiler =
-            new org.apache.storm.sql.compiler.backends.trident.PlanCompiler(typeFactory);
-        AbstractTridentProcessor proc = compiler.compile(tree);
-        TridentTopology topo = proc.build(dataSources);
+                new org.apache.storm.sql.compiler.backends.trident.PlanCompiler(dataSources, typeFactory);
+        TridentTopology topo = compiler.compile(tree);
         Path jarPath = null;
         try {
+          // PlanCompiler builds the topology without generating any new classes, so we don't need to add anything to the topology jar.
+          // We still package an empty jar because a topology jar is required for submission.
+          // The topology will be serialized and sent to Nimbus, then deserialized and executed in the workers.
+
           jarPath = Files.createTempFile("storm-sql", ".jar");
           System.setProperty("storm.jar", jarPath.toString());
-          packageTopology(jarPath, compiler.getCompilingClassLoader(), proc);
+          packageEmptyTopology(jarPath);
           StormSubmitter.submitTopologyAs(name, stormConf, topo.build(), opts, progressListener, asUser);
         } finally {
           if (jarPath != null) {
@@ -137,18 +140,12 @@ class StormSqlImpl extends StormSql {
     }
   }
 
-  private void packageTopology(Path jar, CompilingClassLoader cl, AbstractTridentProcessor processor) throws IOException {
+  private void packageEmptyTopology(Path jar) throws IOException {
     Manifest manifest = new Manifest();
     Attributes attr = manifest.getMainAttributes();
     attr.put(Attributes.Name.MANIFEST_VERSION, "1.0");
-    attr.put(Attributes.Name.MAIN_CLASS, processor.getClass().getCanonicalName());
     try (JarOutputStream out = new JarOutputStream(
         new BufferedOutputStream(new FileOutputStream(jar.toFile())), manifest)) {
-      for (Map.Entry<String, ByteArrayOutputStream> e : cl.getClasses().entrySet()) {
-        out.putNextEntry(new ZipEntry(e.getKey().replace(".", "/") + ".class"));
-        out.write(e.getValue().toByteArray());
-        out.closeEntry();
-      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/28eefbef/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java
index 0977acc..4e1c127 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java
@@ -20,6 +20,7 @@ package org.apache.storm.sql.compiler;
 import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
+import com.google.common.primitives.Primitives;
 import org.apache.calcite.adapter.enumerable.NullPolicy;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.linq4j.tree.Primitive;
@@ -67,8 +68,9 @@ public class ExprCompiler implements RexVisitor<String> {
   public String visitInputRef(RexInputRef rexInputRef) {
     String name = reserveName();
     String typeName = javaTypeName(rexInputRef);
+    String boxedTypeName = boxedJavaTypeName(rexInputRef);
     pw.print(String.format("%s %s = (%s)(_data.get(%d));\n", typeName, name,
-                           typeName, rexInputRef.getIndex()));
+                           boxedTypeName, rexInputRef.getIndex()));
     return name;
   }
 
@@ -149,6 +151,16 @@ public class ExprCompiler implements RexVisitor<String> {
     return ((Class<?>)ty).getCanonicalName();
   }
 
+  private String boxedJavaTypeName(RexNode node) {
+    Type ty = typeFactory.getJavaClass(node.getType());
+    Class clazz = (Class<?>)ty;
+    if (clazz.isPrimitive()) {
+      clazz = Primitives.wrap(clazz);
+    }
+
+    return clazz.getCanonicalName();
+  }
+
   private String reserveName() {
     return "t" + ++nameCount;
   }
@@ -479,7 +491,7 @@ public class ExprCompiler implements RexVisitor<String> {
         RexNode op = call.getOperands().get(0);
         String lhs = op.accept(compiler);
         pw.print(String.format("final %1$s %2$s = (%1$s) %3$s;\n",
-                               compiler.javaTypeName(call), val, lhs));
+                               compiler.boxedJavaTypeName(call), val, lhs));
         return val;
       }
     };

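The effect of boxedJavaTypeName on the generated code: a cast like `int t0 = (int)(_data.get(0));` becomes `int t0 = (Integer)(_data.get(0));`. Casting an Object straight to a primitive is a Java 7 language feature, which the Janino 2.7 compiler now used at runtime presumably does not accept; casting to the wrapper and letting auto-unboxing fill the primitive works everywhere. A plain-Java illustration (my reading of the change, not code from the commit):

    import java.util.Arrays;
    import java.util.List;

    public class BoxedCastSketch {
        public static void main(String[] args) {
            List<Object> _data = Arrays.<Object>asList(42, "x");
            // Cast to the wrapper type, then auto-unbox into the
            // primitive variable; the shape the generated code now uses.
            int t0 = (Integer) (_data.get(0));
            System.out.println(t0); // 42
        }
    }
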
http://git-wip-us.apache.org/repos/asf/storm/blob/28eefbef/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/PostOrderRelNodeVisitor.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/PostOrderRelNodeVisitor.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/PostOrderRelNodeVisitor.java
index 9dcfd29..2077ea9 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/PostOrderRelNodeVisitor.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/PostOrderRelNodeVisitor.java
@@ -21,108 +21,112 @@ import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.*;
 import org.apache.calcite.rel.stream.Delta;
 
+import java.util.ArrayList;
+import java.util.List;
+
 public abstract class PostOrderRelNodeVisitor<T> {
   public final T traverse(RelNode n) throws Exception {
+    List<T> inputStreams = new ArrayList<>();
     for (RelNode input : n.getInputs()) {
-      traverse(input);
+      inputStreams.add(traverse(input));
     }
 
     if (n instanceof Aggregate) {
-      return visitAggregate((Aggregate) n);
+      return visitAggregate((Aggregate) n, inputStreams);
     } else if (n instanceof Calc) {
-      return visitCalc((Calc) n);
+      return visitCalc((Calc) n, inputStreams);
     } else if (n instanceof Collect) {
-      return visitCollect((Collect) n);
+      return visitCollect((Collect) n, inputStreams);
     } else if (n instanceof Correlate) {
-      return visitCorrelate((Correlate) n);
+      return visitCorrelate((Correlate) n, inputStreams);
     } else if (n instanceof Delta) {
-      return visitDelta((Delta) n);
+      return visitDelta((Delta) n, inputStreams);
     } else if (n instanceof Exchange) {
-      return visitExchange((Exchange) n);
+      return visitExchange((Exchange) n, inputStreams);
     } else if (n instanceof Project) {
-      return visitProject((Project) n);
+      return visitProject((Project) n, inputStreams);
     } else if (n instanceof Filter) {
-      return visitFilter((Filter) n);
+      return visitFilter((Filter) n, inputStreams);
     } else if (n instanceof Sample) {
-      return visitSample((Sample) n);
+      return visitSample((Sample) n, inputStreams);
     } else if (n instanceof Sort) {
-      return visitSort((Sort) n);
+      return visitSort((Sort) n, inputStreams);
     } else if (n instanceof TableModify) {
-      return visitTableModify((TableModify) n);
+      return visitTableModify((TableModify) n, inputStreams);
     } else if (n instanceof TableScan) {
-      return visitTableScan((TableScan) n);
+      return visitTableScan((TableScan) n, inputStreams);
     } else if (n instanceof Uncollect) {
-      return visitUncollect((Uncollect) n);
+      return visitUncollect((Uncollect) n, inputStreams);
     } else if (n instanceof Window) {
-      return visitWindow((Window) n);
+      return visitWindow((Window) n, inputStreams);
     } else if (n instanceof Join) {
-      return visitJoin((Join) n);
+      return visitJoin((Join) n, inputStreams);
     } else {
-      return defaultValue(n);
+      return defaultValue(n, inputStreams);
     }
   }
 
-  public T visitAggregate(Aggregate aggregate) throws Exception {
-    return defaultValue(aggregate);
+  public T visitAggregate(Aggregate aggregate, List<T> inputStreams) throws Exception {
+    return defaultValue(aggregate, inputStreams);
   }
 
-  public T visitCalc(Calc calc) throws Exception {
-    return defaultValue(calc);
+  public T visitCalc(Calc calc, List<T> inputStreams) throws Exception {
+    return defaultValue(calc, inputStreams);
   }
 
-  public T visitCollect(Collect collect) throws Exception {
-    return defaultValue(collect);
+  public T visitCollect(Collect collect, List<T> inputStreams) throws Exception {
+    return defaultValue(collect, inputStreams);
   }
 
-  public T visitCorrelate(Correlate correlate) throws Exception {
-    return defaultValue(correlate);
+  public T visitCorrelate(Correlate correlate, List<T> inputStreams) throws Exception {
+    return defaultValue(correlate, inputStreams);
   }
 
-  public T visitDelta(Delta delta) throws Exception {
-    return defaultValue(delta);
+  public T visitDelta(Delta delta, List<T> inputStreams) throws Exception {
+    return defaultValue(delta, inputStreams);
   }
 
-  public T visitExchange(Exchange exchange) throws Exception {
-    return defaultValue(exchange);
+  public T visitExchange(Exchange exchange, List<T> inputStreams) throws Exception {
+    return defaultValue(exchange, inputStreams);
   }
 
-  public T visitProject(Project project) throws Exception {
-    return defaultValue(project);
+  public T visitProject(Project project, List<T> inputStreams) throws Exception {
+    return defaultValue(project, inputStreams);
   }
 
-  public T visitFilter(Filter filter) throws Exception {
-    return defaultValue(filter);
+  public T visitFilter(Filter filter, List<T> inputStreams) throws Exception {
+    return defaultValue(filter, inputStreams);
   }
 
-  public T visitSample(Sample sample) throws Exception {
-    return defaultValue(sample);
+  public T visitSample(Sample sample, List<T> inputStreams) throws Exception {
+    return defaultValue(sample, inputStreams);
   }
 
-  public T visitSort(Sort sort) throws Exception {
-    return defaultValue(sort);
+  public T visitSort(Sort sort, List<T> inputStreams) throws Exception {
+    return defaultValue(sort, inputStreams);
   }
 
-  public T visitTableModify(TableModify modify) throws Exception {
-    return defaultValue(modify);
+  public T visitTableModify(TableModify modify, List<T> inputStreams) throws Exception {
+    return defaultValue(modify, inputStreams);
   }
 
-  public T visitTableScan(TableScan scan) throws Exception {
-    return defaultValue(scan);
+  public T visitTableScan(TableScan scan, List<T> inputStreams) throws Exception {
+    return defaultValue(scan, inputStreams);
   }
 
-  public T visitUncollect(Uncollect uncollect) throws Exception {
-    return defaultValue(uncollect);
+  public T visitUncollect(Uncollect uncollect, List<T> inputStreams) throws Exception {
+    return defaultValue(uncollect, inputStreams);
   }
 
-  public T visitWindow(Window window) throws Exception {
-    return defaultValue(window);
+  public T visitWindow(Window window, List<T> inputStreams) throws Exception {
+    return defaultValue(window, inputStreams);
   }
 
-  public T visitJoin(Join join) throws Exception {
-    return defaultValue(join);
+  public T visitJoin(Join join, List<T> inputStreams) throws Exception {
+    return defaultValue(join, inputStreams);
   }
 
-  public T defaultValue(RelNode n) {
+  public T defaultValue(RelNode n, List<T> inputStreams) {
     return null;
   }
 }

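The reworked traversal returns each child's result to its parent, which is what lets TridentLogicalPlanCompiler (below) thread IAggregatableStream objects up the plan tree instead of keeping state on the side. A toy subclass showing the post-order flow (hypothetical, for illustration only):

    import java.util.List;

    import org.apache.calcite.rel.RelNode;
    import org.apache.storm.sql.compiler.PostOrderRelNodeVisitor;

    // Counts the nodes in a relational plan; each visit receives the
    // counts already computed for its children and folds them in.
    public class NodeCountVisitor extends PostOrderRelNodeVisitor<Integer> {
        @Override
        public Integer defaultValue(RelNode n, List<Integer> childCounts) {
            int count = 1; // this node itself
            for (Integer childCount : childCounts) {
                count += childCount;
            }
            return count;
        }
    }

Calling traverse(root) then yields the total node count with no traversal bookkeeping inside the visitor itself.
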
http://git-wip-us.apache.org/repos/asf/storm/blob/28eefbef/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java
index 3ede7e9..9df0496 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java
@@ -196,13 +196,13 @@ class RelNodeCompiler extends PostOrderRelNodeVisitor<Void> {
   }
 
   @Override
-  public Void visitDelta(Delta delta) throws Exception {
+  public Void visitDelta(Delta delta, List<Void> inputStreams) throws Exception {
     pw.print(String.format(STAGE_PASSTHROUGH, getStageName(delta)));
     return null;
   }
 
   @Override
-  public Void visitFilter(Filter filter) throws Exception {
+  public Void visitFilter(Filter filter, List<Void> inputStreams) throws Exception {
     beginStage(filter);
     ExprCompiler compiler = new ExprCompiler(pw, typeFactory);
     String r = filter.getCondition().accept(compiler);
@@ -216,7 +216,7 @@ class RelNodeCompiler extends PostOrderRelNodeVisitor<Void> {
   }
 
   @Override
-  public Void visitProject(Project project) throws Exception {
+  public Void visitProject(Project project, List<Void> inputStreams) throws Exception {
     beginStage(project);
     ExprCompiler compiler = new ExprCompiler(pw, typeFactory);
     int size = project.getChildExps().size();
@@ -232,18 +232,18 @@ class RelNodeCompiler extends PostOrderRelNodeVisitor<Void> {
   }
 
   @Override
-  public Void defaultValue(RelNode n) {
+  public Void defaultValue(RelNode n, List<Void> inputStreams) {
     throw new UnsupportedOperationException();
   }
 
   @Override
-  public Void visitTableScan(TableScan scan) throws Exception {
+  public Void visitTableScan(TableScan scan, List<Void> inputStreams) throws Exception {
     pw.print(String.format(STAGE_ENUMERABLE_TABLE_SCAN, getStageName(scan)));
     return null;
   }
 
   @Override
-  public Void visitAggregate(Aggregate aggregate) throws Exception {
+  public Void visitAggregate(Aggregate aggregate, List<Void> inputStreams) throws Exception {
     beginAggregateStage(aggregate);
     pw.println("        List<Object> curGroupValues = _data == null ? null : getGroupValues(_data);");
     pw.println("        if (prevGroupValues != null && !prevGroupValues.equals(curGroupValues)) {");
@@ -262,7 +262,7 @@ class RelNodeCompiler extends PostOrderRelNodeVisitor<Void> {
   }
 
   @Override
-  public Void visitJoin(Join join) {
+  public Void visitJoin(Join join, List<Void> inputStreams) {
     beginJoinStage(join);
     pw.println("        if (source == left) {");
     pw.println("            leftRows.add(_data);");

http://git-wip-us.apache.org/repos/asf/storm/blob/28eefbef/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/PlanCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/PlanCompiler.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/PlanCompiler.java
index 7a5516d..41777e5 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/PlanCompiler.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/PlanCompiler.java
@@ -1,201 +1,71 @@
-/*
- * *
- *  * Licensed to the Apache Software Foundation (ASF) under one
- *  * or more contributor license agreements.  See the NOTICE file
- *  * distributed with this work for additional information
- *  * regarding copyright ownership.  The ASF licenses this file
- *  * to you under the Apache License, Version 2.0 (the
- *  * "License"); you may not use this file except in compliance
- *  * with the License.  You may obtain a copy of the License at
- *  * <p>
- *  * http://www.apache.org/licenses/LICENSE-2.0
- *  * <p>
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 package org.apache.storm.sql.compiler.backends.trident;
 
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.SingleRel;
-import org.apache.calcite.rel.core.Filter;
-import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rel.core.TableModify;
-import org.apache.calcite.rel.core.TableScan;
-import org.apache.storm.sql.compiler.CompilerUtil;
-import org.apache.storm.sql.compiler.PostOrderRelNodeVisitor;
-import org.apache.storm.sql.javac.CompilingClassLoader;
+import org.apache.storm.sql.runtime.ISqlTridentDataSource;
 import org.apache.storm.sql.runtime.trident.AbstractTridentProcessor;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.fluent.IAggregatableStream;
 
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.net.URL;
-import java.net.URLClassLoader;
+import java.util.Map;
 
 public class PlanCompiler {
-  private static final Joiner NEW_LINE_JOINER = Joiner.on("\n");
-  private static final String PACKAGE_NAME = "org.apache.storm.sql.generated";
-  private static final String PROLOGUE = NEW_LINE_JOINER.join(
-      "// GENERATED CODE", "package " + PACKAGE_NAME + ";", "",
-      "import java.util.List;",
-      "import java.util.Map;",
-      "import org.apache.storm.tuple.Fields;",
-      "import org.apache.storm.tuple.Values;",
-      "import org.apache.storm.sql.runtime.ISqlTridentDataSource;",
-      "import org.apache.storm.sql.runtime.trident.AbstractTridentProcessor;",
-      "import org.apache.storm.trident.Stream;",
-      "import org.apache.storm.trident.TridentTopology;",
-      "import org.apache.storm.trident.fluent.IAggregatableStream;",
-      "import org.apache.storm.trident.operation.TridentCollector;",
-      "import org.apache.storm.trident.operation.BaseFunction;",
-      "import org.apache.storm.trident.spout.IBatchSpout;",
-      "import org.apache.storm.trident.tuple.TridentTuple;",
-      "",
-      "public final class TridentProcessor extends AbstractTridentProcessor {",
-      "");
-  private static final String INITIALIZER_PROLOGUE = NEW_LINE_JOINER.join(
-      "  @Override",
-      "  public TridentTopology build(Map<String, ISqlTridentDataSource> _sources) {",
-      "    TridentTopology topo = new TridentTopology();",
-      ""
-  );
+    private Map<String, ISqlTridentDataSource> sources;
+    private final JavaTypeFactory typeFactory;
 
-  private final JavaTypeFactory typeFactory;
-  private CompilingClassLoader compilingClassLoader;
-  public PlanCompiler(JavaTypeFactory typeFactory) {
-    this.typeFactory = typeFactory;
-  }
-
-  private String generateJavaSource(RelNode root) throws Exception {
-    StringWriter sw = new StringWriter();
-    try (PrintWriter pw = new PrintWriter(sw)) {
-      RelNodeCompiler compiler = new RelNodeCompiler(pw, typeFactory);
-      printPrologue(pw);
-      compiler.traverse(root);
-      printMain(pw, root);
-      printEpilogue(pw);
+    public PlanCompiler(Map<String, ISqlTridentDataSource> sources, JavaTypeFactory typeFactory) {
+        this.sources = sources;
+        this.typeFactory = typeFactory;
     }
-    return sw.toString();
-  }
-
-  private static class MainFuncCompiler extends PostOrderRelNodeVisitor<Void> {
-    private final PrintWriter pw;
-    private static final String TABLESCAN_TMPL = NEW_LINE_JOINER.join(
-        "if (!_sources.containsKey(%2$s))",
-        "    throw new RuntimeException(\"Cannot find table \" + %2$s);",
-        "Stream _%1$s = topo.newStream(%2$s, _sources.get(%2$s).getProducer());",
-        ""
-    );
 
-    private static final String TABLEMODIFY_TMPL = NEW_LINE_JOINER.join(
-        "Stream _%1$s = _%3$s.each(new Fields(%4$s), _sources.get(%2$s).getConsumer(), new Fields(%5$s));",
-        ""
-    );
-    private static final String TRANSFORMATION_TMPL = NEW_LINE_JOINER.join(
-        "Stream _%1$s = _%2$s.each(new Fields(%3$s), %1$s, new Fields(%4$s)).toStream().project(new Fields(%4$s));",
-        ""
-    );
+    public AbstractTridentProcessor compileForTest(RelNode plan) throws Exception {
+        final TridentTopology topology = new TridentTopology();
 
-    private MainFuncCompiler(PrintWriter pw) {
-      this.pw = pw;
-    }
+        TridentLogicalPlanCompiler compiler = new TridentLogicalPlanCompiler(sources, typeFactory, topology);
+        final IAggregatableStream stream = compiler.traverse(plan);
 
-    @Override
-    public Void defaultValue(RelNode n) {
-      throw new UnsupportedOperationException();
-    }
+        return new AbstractTridentProcessor() {
+            @Override
+            public Stream outputStream() {
+                return stream.toStream();
+            }
 
-    @Override
-    public Void visitFilter(Filter filter) throws Exception {
-      visitTransformation(filter);
-      return null;
+            @Override
+            public TridentTopology build(Map<String, ISqlTridentDataSource> sources) {
+                return topology;
+            }
+        };
     }
 
-    @Override
-    public Void visitTableModify(TableModify modify) throws Exception {
-      Preconditions.checkArgument(modify.isInsert(), "Only INSERT statement is supported.");
-      String name = RelNodeCompiler.getStageName(modify);
-      RelNode input = modify.getInput();
-      String inputName = RelNodeCompiler.getStageName(input);
-      pw.print(String.format(TABLEMODIFY_TMPL, name, CompilerUtil.escapeJavaString(
-          Joiner.on('.').join(modify.getTable().getQualifiedName()), true),
-          inputName, getFieldString(input), getFieldString(modify)));
-      return null;
-    }
+    public TridentTopology compile(RelNode plan) throws Exception {
+        TridentTopology topology = new TridentTopology();
 
-    @Override
-    public Void visitTableScan(TableScan scan) throws Exception {
-      String name = RelNodeCompiler.getStageName(scan);
-      pw.print(String.format(TABLESCAN_TMPL, name, CompilerUtil.escapeJavaString(
-          Joiner.on('.').join(scan.getTable().getQualifiedName()), true)));
-      return null;
-    }
+        TridentLogicalPlanCompiler compiler = new TridentLogicalPlanCompiler(sources, typeFactory, topology);
+        compiler.traverse(plan);
 
-    @Override
-    public Void visitProject(Project project) throws Exception {
-      visitTransformation(project);
-      return null;
+        return topology;
     }
 
-    private static String getFieldString(RelNode n) {
-      int id = n.getId();
-      StringBuilder sb = new StringBuilder();
-      boolean first = true;
-      for (String f: n.getRowType().getFieldNames()) {
-        if (!first) {
-          sb.append(", ");
-        }
-        if (n instanceof TableScan) {
-          sb.append(CompilerUtil.escapeJavaString(f, true));
-        } else {
-          sb.append(CompilerUtil.escapeJavaString(String.format("%d_%s", id, f), true));
-        }
-        first = false;
-      }
-      return sb.toString();
-    }
-
-    private void visitTransformation(SingleRel node) {
-      String name = RelNodeCompiler.getStageName(node);
-      RelNode input = node.getInput();
-      String inputName = RelNodeCompiler.getStageName(input);
-      pw.print(String.format(TRANSFORMATION_TMPL, name, inputName,
-          getFieldString(input), getFieldString(node)));
-    }
-  }
-
-  private void printMain(PrintWriter pw, RelNode root) throws Exception {
-    pw.print(INITIALIZER_PROLOGUE);
-    MainFuncCompiler compiler = new MainFuncCompiler(pw);
-    compiler.traverse(root);
-    pw.print(String.format("  this.outputStream = _%s;\n", RelNodeCompiler.getStageName(root)));
-    pw.print("  return topo; \n}\n");
-  }
-
-  public AbstractTridentProcessor compile(RelNode plan) throws Exception {
-    String javaCode = generateJavaSource(plan);
-    compilingClassLoader = new CompilingClassLoader(getClass().getClassLoader(),
-        PACKAGE_NAME + ".TridentProcessor",
-        javaCode, null);
-    return (AbstractTridentProcessor) compilingClassLoader.loadClass(PACKAGE_NAME + ".TridentProcessor").newInstance();
-  }
 
-  public CompilingClassLoader getCompilingClassLoader() {
-    return compilingClassLoader;
-  }
 
-  private static void printEpilogue(
-      PrintWriter pw) throws Exception {
-    pw.print("}\n");
-  }
 
-  private static void printPrologue(PrintWriter pw) {
-    pw.append(PROLOGUE);
-  }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/28eefbef/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/RelNodeCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/RelNodeCompiler.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/RelNodeCompiler.java
deleted file mode 100644
index 340b9a2..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/RelNodeCompiler.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * *
- *  * Licensed to the Apache Software Foundation (ASF) under one
- *  * or more contributor license agreements.  See the NOTICE file
- *  * distributed with this work for additional information
- *  * regarding copyright ownership.  The ASF licenses this file
- *  * to you under the Apache License, Version 2.0 (the
- *  * "License"); you may not use this file except in compliance
- *  * with the License.  You may obtain a copy of the License at
- *  * <p>
- *  * http://www.apache.org/licenses/LICENSE-2.0
- *  * <p>
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-package org.apache.storm.sql.compiler.backends.trident;
-
-import org.apache.storm.tuple.Fields;
-import com.google.common.base.Joiner;
-import org.apache.calcite.adapter.java.JavaTypeFactory;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Filter;
-import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rel.core.TableModify;
-import org.apache.calcite.rel.core.TableScan;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeField;
-import org.apache.storm.sql.compiler.ExprCompiler;
-import org.apache.storm.sql.compiler.PostOrderRelNodeVisitor;
-
-import java.io.PrintWriter;
-import java.util.IdentityHashMap;
-import java.util.Map;
-
-/**
- * Compile RelNodes into individual functions.
- */
-class RelNodeCompiler extends PostOrderRelNodeVisitor<Void> {
-  public static Joiner NEW_LINE_JOINER = Joiner.on('\n');
-
-  private final PrintWriter pw;
-  private final JavaTypeFactory typeFactory;
-  private static final String STAGE_PROLOGUE = NEW_LINE_JOINER.join(
-    "  private static final BaseFunction %1$s = ",
-    "    new BaseFunction() {",
-    "    @Override",
-    "    public void execute(TridentTuple tuple, TridentCollector collector) {",
-    "      List<Object> _data = tuple.getValues();",
-    ""
-  );
-
-  private final IdentityHashMap<RelNode, Fields> outputFields = new IdentityHashMap<>();
-
-  RelNodeCompiler(PrintWriter pw, JavaTypeFactory typeFactory) {
-    this.pw = pw;
-    this.typeFactory = typeFactory;
-  }
-
-  @Override
-  public Void visitFilter(Filter filter) throws Exception {
-    beginStage(filter);
-    ExprCompiler compiler = new ExprCompiler(pw, typeFactory);
-    String r = filter.getCondition().accept(compiler);
-    pw.print(String.format("    if (%s) { collector.emit(_data); }\n", r));
-    endStage();
-    return null;
-  }
-
-  @Override
-  public Void visitProject(Project project) throws Exception {
-    beginStage(project);
-    ExprCompiler compiler = new ExprCompiler(pw, typeFactory);
-
-    int size = project.getChildExps().size();
-    String[] res = new String[size];
-    for (int i = 0; i < size; ++i) {
-      res[i] = project.getChildExps().get(i).accept(compiler);
-    }
-
-    pw.print(String.format("    collector.emit(new Values(%s));\n",
-                           Joiner.on(',').join(res)));
-    endStage();
-    return null;
-  }
-
-  @Override
-  public Void defaultValue(RelNode n) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public Void visitTableScan(TableScan scan) throws Exception {
-    return null;
-  }
-
-  @Override
-  public Void visitTableModify(TableModify modify) throws Exception {
-    return null;
-  }
-
-  private void beginStage(RelNode n) {
-    pw.print(String.format(STAGE_PROLOGUE, getStageName(n)));
-  }
-
-  private void endStage() {
-    pw.print("  }\n  };\n");
-  }
-
-  static String getStageName(RelNode n) {
-    return n.getClass().getSimpleName().toUpperCase() + "_" + n.getId();
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/28eefbef/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/TridentLogicalPlanCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/TridentLogicalPlanCompiler.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/TridentLogicalPlanCompiler.java
new file mode 100644
index 0000000..4886ffc
--- /dev/null
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/TridentLogicalPlanCompiler.java
@@ -0,0 +1,363 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.sql.compiler.backends.trident;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Primitives;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.TableModify;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.storm.sql.compiler.ExprCompiler;
+import org.apache.storm.sql.compiler.PostOrderRelNodeVisitor;
+import org.apache.storm.sql.runtime.ISqlTridentDataSource;
+import org.apache.storm.sql.runtime.trident.functions.EvaluationFilter;
+import org.apache.storm.sql.runtime.trident.functions.EvaluationFunction;
+import org.apache.storm.sql.runtime.trident.functions.ForwardFunction;
+import org.apache.storm.sql.runtime.trident.operations.CountBy;
+import org.apache.storm.sql.runtime.trident.operations.DivideForAverage;
+import org.apache.storm.sql.runtime.trident.operations.MaxBy;
+import org.apache.storm.sql.runtime.trident.operations.MinBy;
+import org.apache.storm.sql.runtime.trident.operations.SumBy;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.fluent.ChainedAggregatorDeclarer;
+import org.apache.storm.trident.fluent.GroupedStream;
+import org.apache.storm.trident.fluent.IAggregatableStream;
+import org.apache.storm.trident.operation.Function;
+import org.apache.storm.trident.operation.builtin.Count;
+import org.apache.storm.tuple.Fields;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+class TransformInformation {
+    private Fields inputFields;
+    private Function function;
+    private Fields functionFields;
+
+    public TransformInformation(Fields inputFields, Function function, Fields functionFields) {
+        this.inputFields = inputFields;
+        this.function = function;
+        this.functionFields = functionFields;
+    }
+
+    public Fields getInputFields() {
+        return inputFields;
+    }
+
+    public Function getFunction() {
+        return function;
+    }
+
+    public Fields getFunctionFields() {
+        return functionFields;
+    }
+}
+
+public class TridentLogicalPlanCompiler extends PostOrderRelNodeVisitor<IAggregatableStream> {
+    protected final Map<String, ISqlTridentDataSource> sources;
+    protected final JavaTypeFactory typeFactory;
+    protected TridentTopology topology;
+
+    public TridentLogicalPlanCompiler(Map<String, ISqlTridentDataSource> sources, JavaTypeFactory typeFactory, TridentTopology topology) {
+        this.sources = sources;
+        this.typeFactory = typeFactory;
+        this.topology = topology;
+    }
+
+    @Override
+    public IAggregatableStream defaultValue(RelNode n, List<IAggregatableStream> inputStreams) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public IAggregatableStream visitTableScan(TableScan scan, List<IAggregatableStream> inputStreams) throws Exception {
+        String sourceName = Joiner.on('.').join(scan.getTable().getQualifiedName());
+        if (!sources.containsKey(sourceName)) {
+            throw new RuntimeException("Cannot find table " + sourceName);
+        }
+
+        String stageName = getStageName(scan);
+        return topology.newStream(stageName, sources.get(sourceName).getProducer());
+    }
+
+    @Override
+    public IAggregatableStream visitTableModify(TableModify modify, List<IAggregatableStream> inputStreams) throws Exception {
+        if (inputStreams.size() > 1) {
+            throw new RuntimeException("TableModify is a SingleRel");
+        }
+
+        Preconditions.checkArgument(modify.isInsert(), "Only INSERT statement is supported.");
+        RelNode input = modify.getInput();
+        String tableName = Joiner.on('.').join(modify.getTable().getQualifiedName());
+        Stream inputStream = inputStreams.get(0).toStream();
+        String stageName = getStageName(modify);
+
+        List<String> inputFields = input.getRowType().getFieldNames();
+        List<String> outputFields = modify.getRowType().getFieldNames();
+
+        return inputStream.each(new Fields(inputFields), sources.get(tableName).getConsumer(), new Fields(outputFields))
+                .name(stageName);
+    }
+
+    @Override
+    public IAggregatableStream visitProject(Project project, List<IAggregatableStream> inputStreams) throws Exception {
+        if (inputStreams.size() > 1) {
+            throw new RuntimeException("Project is a SingleRel");
+        }
+
+        Stream inputStream = inputStreams.get(0).toStream();
+        Fields inputFields = inputStream.getOutputFields();
+        String stageName = getStageName(project);
+
+        // Trident doesn't allow duplicate field names, so rename the output fields temporarily and then forward them back
+        List<String> outputFieldNames = project.getRowType().getFieldNames();
+        List<String> temporaryOutputFieldNames = new ArrayList<>();
+        for (String outputFieldName : outputFieldNames) {
+            temporaryOutputFieldNames.add("__" + outputFieldName + "__");
+        }
+
+        try (StringWriter sw = new StringWriter(); PrintWriter pw = new PrintWriter(sw)) {
+            pw.write("import org.apache.storm.tuple.Values;\n");
+
+            ExprCompiler compiler = new ExprCompiler(pw, typeFactory);
+
+            int size = project.getChildExps().size();
+            String[] res = new String[size];
+            for (int i = 0; i < size; ++i) {
+                res[i] = project.getChildExps().get(i).accept(compiler);
+            }
+
+            pw.write(String.format("\nreturn new Values(%s);", Joiner.on(',').join(res)));
+            final String expression = sw.toString();
+
+            return inputStream.each(inputFields, new EvaluationFunction(expression), new Fields(temporaryOutputFieldNames))
+                    .project(new Fields(temporaryOutputFieldNames))
+                    .each(new Fields(temporaryOutputFieldNames), new ForwardFunction(), new Fields(outputFieldNames))
+                    .project(new Fields(outputFieldNames))
+                    .name(stageName);
+        }
+    }
+
+    @Override
+    public IAggregatableStream visitFilter(Filter filter, List<IAggregatableStream> inputStreams) throws Exception {
+        if (inputStreams.size() > 1) {
+            throw new RuntimeException("Filter is a SingleRel");
+        }
+
+        Stream inputStream = inputStreams.get(0).toStream();
+        String stageName = getStageName(filter);
+
+        try (StringWriter sw = new StringWriter(); PrintWriter pw = new PrintWriter(sw)) {
+            ExprCompiler compiler = new ExprCompiler(pw, typeFactory);
+            String ret = filter.getCondition().accept(compiler);
+            pw.write(String.format("\nreturn %s;", ret));
+            final String expression = sw.toString();
+
+            return inputStream.filter(new EvaluationFilter(expression)).name(stageName);
+        }
+    }
+
+    @Override
+    public IAggregatableStream visitAggregate(Aggregate aggregate, List<IAggregatableStream> inputStreams) throws Exception {
+        if (inputStreams.size() > 1) {
+            throw new RuntimeException("Aggregate is a SingleRel");
+        }
+
+        Stream inputStream = inputStreams.get(0).toStream();
+        String stageName = getStageName(aggregate);
+
+        List<String> groupByFieldNames = new ArrayList<>();
+        for (Integer idx : aggregate.getGroupSet()) {
+            String fieldName = inputStream.getOutputFields().get(idx);
+            groupByFieldNames.add(fieldName);
+        }
+
+        Fields groupByFields = new Fields(groupByFieldNames);
+        GroupedStream groupedStream = inputStream.groupBy(groupByFields);
+
+        ChainedAggregatorDeclarer chainedAggregatorDeclarer = groupedStream.chainedAgg();
+        List<TransformInformation> transformsAfterChained = new ArrayList<>();
+        for (AggregateCall call : aggregate.getAggCallList()) {
+            appendAggregationInChain(chainedAggregatorDeclarer, groupByFieldNames, inputStream, call, transformsAfterChained);
+        }
+
+        Stream stream = chainedAggregatorDeclarer.chainEnd();
+        for (TransformInformation transformInformation : transformsAfterChained) {
+            stream = stream.each(transformInformation.getInputFields(), transformInformation.getFunction(), transformInformation.getFunctionFields());
+        }
+
+        // It is safe to project based on Calcite's row type, since each aggregation function creates its output fields from that information
+        List<String> outputFields = aggregate.getRowType().getFieldNames();
+        return stream.project(new Fields(outputFields)).name(stageName);
+    }
+
+    private void appendAggregationInChain(ChainedAggregatorDeclarer chainedDeclarer, List<String> groupByFieldNames,
+                                          Stream inputStream, AggregateCall call, List<TransformInformation> transformsAfterChained) {
+        String outputField = call.getName();
+        String aggregationName = call.getAggregation().getName();
+
+        // only COUNT may be called without an argument (i.e. COUNT(*))
+        if (call.getArgList().size() != 1) {
+            if ("COUNT".equalsIgnoreCase(aggregationName)) {
+                if (call.getArgList().size() > 0) {
+                    throw new IllegalArgumentException("COUNT should have zero (i.e. '*') or one argument");
+                }
+            } else {
+                throw new IllegalArgumentException("Aggregate call should have exactly one argument");
+            }
+        }
+
+        switch (aggregationName.toUpperCase()) {
+            case "COUNT":
+                appendCountFunctionToChain(chainedDeclarer, groupByFieldNames, inputStream, call, outputField);
+                break;
+
+            case "MAX":
+                appendMaxFunctionToChain(chainedDeclarer, groupByFieldNames, inputStream, call, outputField);
+                break;
+
+            case "MIN":
+                appendMinFunctionToChain(chainedDeclarer, groupByFieldNames, inputStream, call, outputField);
+                break;
+
+            case "SUM":
+                appendSumFunctionToChain(chainedDeclarer, groupByFieldNames, inputStream, call, outputField);
+                break;
+
+            case "AVG":
+                appendAvgFunctionToChain(chainedDeclarer, groupByFieldNames, inputStream, call, outputField, transformsAfterChained);
+                break;
+
+            default:
+                throw new UnsupportedOperationException("Unsupported aggregate function: " + aggregationName.toUpperCase());
+        }
+    }
+
+    private void appendAvgFunctionToChain(ChainedAggregatorDeclarer chainedDeclarer, List<String> groupByFieldNames,
+                                          Stream inputStream, AggregateCall call, String outputField,
+                                          List<TransformInformation> transformsAfterChained) {
+        List<String> inputFields = new ArrayList<>(groupByFieldNames);
+        Integer fieldIdx = call.getArgList().get(0);
+        String inputFieldName = inputStream.getOutputFields().get(fieldIdx);
+        inputFields.add(inputFieldName);
+
+        Class<?> capturedTargetClazz = getAggrCallReturnJavaType(call);
+        if (!Number.class.isAssignableFrom(capturedTargetClazz)) {
+            throw new IllegalStateException("Return type of aggregation call should be a Number");
+        }
+
+        Class<? extends Number> targetClazz = (Class<? extends Number>) capturedTargetClazz;
+
+        String tempCountFieldName = "__cnt__" + outputField;
+        String tempSumFieldName = "__sum__" + outputField;
+
+        chainedDeclarer
+                .aggregate(new Fields(inputFields), new CountBy(inputFieldName), new Fields(tempCountFieldName))
+                .aggregate(new Fields(inputFields), new SumBy(inputFieldName, targetClazz), new Fields(tempSumFieldName));
+
+        TransformInformation divForAverage = new TransformInformation(
+                new Fields(tempSumFieldName, tempCountFieldName),
+                new DivideForAverage(targetClazz),
+                new Fields(outputField));
+        transformsAfterChained.add(divForAverage);
+    }
+
+    private void appendCountFunctionToChain(ChainedAggregatorDeclarer chainedDeclarer, List<String> groupByFieldNames,
+                                            Stream inputStream, AggregateCall call, String outputField) {
+        List<String> inputFields = new ArrayList<>(groupByFieldNames);
+        if (call.getArgList().size() == 0) {
+            chainedDeclarer.aggregate(new Fields(inputFields), new Count(), new Fields(outputField));
+        } else {
+            // call.getArgList().size() == 1
+            Integer fieldIdx = call.getArgList().get(0);
+            String inputFieldName = inputStream.getOutputFields().get(fieldIdx);
+            inputFields.add(inputFieldName);
+            chainedDeclarer.aggregate(new Fields(inputFields), new CountBy(inputFieldName), new Fields(outputField));
+        }
+    }
+
+    private void appendMaxFunctionToChain(ChainedAggregatorDeclarer chainedDeclarer, List<String> groupByFieldNames,
+                                          Stream inputStream, AggregateCall call, String outputField) {
+        List<String> inputFields = new ArrayList<>(groupByFieldNames);
+        Integer fieldIdx = call.getArgList().get(0);
+        String inputFieldName = inputStream.getOutputFields().get(fieldIdx);
+        inputFields.add(inputFieldName);
+
+        Class<?> capturedTargetClazz = getAggrCallReturnJavaType(call);
+
+        chainedDeclarer.aggregate(new Fields(inputFields), new MaxBy(inputFieldName, capturedTargetClazz), new Fields(outputField));
+    }
+
+    private void appendMinFunctionToChain(ChainedAggregatorDeclarer chainedDeclarer, List<String> groupByFieldNames, Stream inputStream, AggregateCall call, String outputField) {
+        List<String> inputFields = new ArrayList<>(groupByFieldNames);
+        Integer fieldIdx = call.getArgList().get(0);
+        String inputFieldName = inputStream.getOutputFields().get(fieldIdx);
+        inputFields.add(inputFieldName);
+
+        Class<?> capturedTargetClazz = getAggrCallReturnJavaType(call);
+
+        chainedDeclarer.aggregate(new Fields(inputFields), new MinBy(inputFieldName, capturedTargetClazz), new Fields(outputField));
+    }
+
+    private void appendSumFunctionToChain(ChainedAggregatorDeclarer chainedDeclarer, List<String> groupByFieldNames, Stream inputStream, AggregateCall call, String outputField) {
+        List<String> inputFields = new ArrayList<>(groupByFieldNames);
+        Integer fieldIdx = call.getArgList().get(0);
+        String inputFieldName = inputStream.getOutputFields().get(fieldIdx);
+        inputFields.add(inputFieldName);
+
+        Class<?> capturedTargetClazz = getAggrCallReturnJavaType(call);
+        if (!Number.class.isAssignableFrom(capturedTargetClazz)) {
+            throw new IllegalStateException("Return type of aggregation call should be a Number");
+        }
+
+        Class<? extends Number> targetClazz = (Class<? extends Number>) capturedTargetClazz;
+
+        chainedDeclarer.aggregate(new Fields(inputFields), new SumBy(inputFieldName, targetClazz), new Fields(outputField));
+    }
+
+    private String getStageName(RelNode n) {
+        return n.getClass().getSimpleName().toUpperCase() + "_" + n.getId();
+    }
+
+    private Class<?> getAggrCallReturnJavaType(AggregateCall call) {
+        return toClassWithBoxing(typeFactory.getJavaClass(call.getType()));
+    }
+
+    private Class<?> toClassWithBoxing(Type type) {
+        Class<?> clazz = (Class<?>) type;
+        if (clazz.isPrimitive()) {
+            clazz = Primitives.wrap(clazz);
+        }
+
+        return clazz;
+    }
+}
\ No newline at end of file
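
For orientation, visitAggregate() above produces the same wiring one would write
by hand in Trident. A minimal sketch for a query like

    SELECT GRPID, COUNT(*) AS CNT, SUM(AGE) AS S FROM FOO GROUP BY GRPID

assuming a test-style FixedBatchSpout (class, spout data, and field names here
are illustrative, not part of the patch):

    import org.apache.storm.sql.runtime.trident.operations.SumBy;
    import org.apache.storm.trident.Stream;
    import org.apache.storm.trident.TridentTopology;
    import org.apache.storm.trident.operation.builtin.Count;
    import org.apache.storm.trident.testing.FixedBatchSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;

    public class GroupBySketch {
        public static void main(String[] args) {
            TridentTopology topology = new TridentTopology();
            FixedBatchSpout spout = new FixedBatchSpout(new Fields("GRPID", "AGE"), 5,
                    new Values(0, 1), new Values(0, 2), new Values(0, 3),
                    new Values(0, 4), new Values(0, 5));
            // groupBy + chainedAgg + chainEnd is exactly the shape visitAggregate() emits
            Stream aggregated = topology.newStream("foo", spout)
                    .groupBy(new Fields("GRPID"))
                    .chainedAgg()
                    .aggregate(new Fields("GRPID"), new Count(), new Fields("CNT"))
                    .aggregate(new Fields("GRPID", "AGE"), new SumBy("AGE", Integer.class), new Fields("S"))
                    .chainEnd()
                    .project(new Fields("GRPID", "CNT", "S"));
        }
    }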

http://git-wip-us.apache.org/repos/asf/storm/blob/28eefbef/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java
index 8a14eee..4b95560 100644
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java
@@ -17,7 +17,6 @@
  */
 package org.apache.storm.sql.compiler;
 
-import com.google.common.collect.ImmutableList;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.jdbc.CalciteSchema;
 import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
@@ -65,6 +64,37 @@ public class TestCompilerUtils {
         Table table = streamableTable.stream();
         schema.add("FOO", table);
         schema.add("BAR", table);
+        schema.add("MYPLUS", ScalarFunctionImpl.create(MyPlus.class, "eval"));
+        List<SqlOperatorTable> sqlOperatorTables = new ArrayList<>();
+        sqlOperatorTables.add(SqlStdOperatorTable.instance());
+        sqlOperatorTables.add(new CalciteCatalogReader(CalciteSchema.from(schema),
+                false,
+                Collections.<String>emptyList(), typeFactory));
+        SqlOperatorTable chainedSqlOperatorTable = new ChainedSqlOperatorTable(sqlOperatorTables);
+        FrameworkConfig config = Frameworks.newConfigBuilder().defaultSchema(
+                schema).operatorTable(chainedSqlOperatorTable).build();
+        Planner planner = Frameworks.getPlanner(config);
+        SqlNode parse = planner.parse(sql);
+        SqlNode validate = planner.validate(parse);
+        RelNode tree = planner.convert(validate);
+        return new CalciteState(schema, tree);
+    }
+
+    public static CalciteState sqlOverDummyGroupByTable(String sql)
+            throws RelConversionException, ValidationException, SqlParseException {
+        SchemaPlus schema = Frameworks.createRootSchema(true);
+        JavaTypeFactory typeFactory = new JavaTypeFactoryImpl
+                (RelDataTypeSystem.DEFAULT);
+        StreamableTable streamableTable = new CompilerUtil.TableBuilderInfo(typeFactory)
+                .field("ID", SqlTypeName.INTEGER)
+                .field("GRPID", SqlTypeName.INTEGER)
+                .field("NAME", typeFactory.createType(String.class))
+                .field("ADDR", typeFactory.createType(String.class))
+                .field("AGE", SqlTypeName.INTEGER)
+                .build();
+        Table table = streamableTable.stream();
+        schema.add("FOO", table);
+        schema.add("BAR", table);
         FrameworkConfig config = Frameworks.newConfigBuilder().defaultSchema(
                 schema).build();
         Planner planner = Frameworks.getPlanner(config);
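
MyPlus itself is not shown in this hunk. Given that the tests below expect
MYPLUS(ID, 3) with ID = 2 to yield 5, a minimal implementation compatible with
ScalarFunctionImpl.create(MyPlus.class, "eval") would be (a sketch, not the
committed source):

    public static class MyPlus {
        public static Integer eval(Integer x, Integer y) {
            return x + y;
        }
    }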

http://git-wip-us.apache.org/repos/asf/storm/blob/28eefbef/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprCompiler.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprCompiler.java
index 017aa25..ae95fcc 100644
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprCompiler.java
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprCompiler.java
@@ -64,7 +64,7 @@ public class TestExprCompiler {
       project.getChildExps().get(0).accept(compiler);
     }
 
-    assertThat(sw.toString(), containsString("(int)(_data.get(0));"));
+    assertThat(sw.toString(), containsString("(java.lang.Integer)(_data.get(0));"));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/storm/blob/28eefbef/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestRelNodeCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestRelNodeCompiler.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestRelNodeCompiler.java
index 76eba1d..f40e138 100644
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestRelNodeCompiler.java
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestRelNodeCompiler.java
@@ -28,6 +28,7 @@ import org.junit.Test;
 
 import java.io.PrintWriter;
 import java.io.StringWriter;
+import java.util.Collections;
 
 import static org.hamcrest.CoreMatchers.containsString;
 
@@ -45,7 +46,8 @@ public class TestRelNodeCompiler {
          PrintWriter pw = new PrintWriter(sw)
     ) {
       RelNodeCompiler compiler = new RelNodeCompiler(pw, typeFactory);
-      compiler.visitFilter(filter);
+      // standalone mode doesn't use the inputStreams argument
+      compiler.visitFilter(filter, Collections.EMPTY_LIST);
       pw.flush();
       Assert.assertThat(sw.toString(), containsString("> 3"));
     }
@@ -54,7 +56,8 @@ public class TestRelNodeCompiler {
          PrintWriter pw = new PrintWriter(sw)
     ) {
       RelNodeCompiler compiler = new RelNodeCompiler(pw, typeFactory);
-      compiler.visitProject(project);
+      // standalone mode doesn't use the inputStreams argument
+      compiler.visitProject(project, Collections.EMPTY_LIST);
       pw.flush();
       Assert.assertThat(sw.toString(), containsString("plus("));
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/28eefbef/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java
index 0f8daa9..b1481ac 100644
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java
@@ -23,6 +23,7 @@ import org.apache.storm.Config;
 import org.apache.storm.ILocalCluster;
 import org.apache.storm.LocalCluster;
 import org.apache.storm.generated.StormTopology;
+import org.apache.storm.sql.runtime.AbstractValuesProcessor;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Values;
 import org.apache.storm.utils.Utils;
@@ -39,7 +40,9 @@ import org.junit.Before;
 import org.junit.Test;
 import org.apache.storm.trident.TridentTopology;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
 
@@ -59,10 +62,10 @@ public class TestPlanCompiler {
     final int EXPECTED_VALUE_SIZE = 2;
     String sql = "SELECT ID FROM FOO WHERE ID > 2";
     TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql);
-    PlanCompiler compiler = new PlanCompiler(typeFactory);
-    final AbstractTridentProcessor proc = compiler.compile(state.tree());
     final Map<String, ISqlTridentDataSource> data = new HashMap<>();
     data.put("FOO", new TestUtils.MockSqlTridentDataSource());
+    PlanCompiler compiler = new PlanCompiler(data, typeFactory);
+    final AbstractTridentProcessor proc = compiler.compileForTest(state.tree());
     final TridentTopology topo = proc.build(data);
     Fields f = proc.outputStream().getOutputFields();
     proc.outputStream().each(f, new CollectDataFunction(), new Fields()).toStream();
@@ -71,20 +74,71 @@ public class TestPlanCompiler {
   }
 
   @Test
+  public void testCompileGroupByExp() throws Exception {
+    final int EXPECTED_VALUE_SIZE = 1;
+    final Map<String, ISqlTridentDataSource> data = new HashMap<>();
+    data.put("FOO", new TestUtils.MockSqlTridentGroupedDataSource());
+    String sql = "SELECT GRPID, COUNT(*) AS CNT, MAX(AGE) AS MAX_AGE, MIN(AGE) AS MIN_AGE, AVG(AGE) AS AVG_AGE, MAX(AGE) - MIN(AGE) AS DIFF FROM FOO GROUP BY GRPID";
+    TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyGroupByTable(sql);
+    PlanCompiler compiler = new PlanCompiler(data, typeFactory);
+    final AbstractTridentProcessor proc = compiler.compileForTest(state.tree());
+    final TridentTopology topo = proc.build(data);
+    Fields f = proc.outputStream().getOutputFields();
+    proc.outputStream().each(f, new CollectDataFunction(), new Fields()).toStream();
+    runTridentTopology(EXPECTED_VALUE_SIZE, proc, topo);
+
+    Assert.assertArrayEquals(new Values[] { new Values(0, 5L, 5, 1, 3, 4)}, getCollectedValues().toArray());
+  }
+
+  @Test
   public void testInsert() throws Exception {
     final int EXPECTED_VALUE_SIZE = 1;
     String sql = "INSERT INTO BAR SELECT ID, NAME, ADDR FROM FOO WHERE ID > 3";
     TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql);
-    PlanCompiler compiler = new PlanCompiler(typeFactory);
-    final AbstractTridentProcessor proc = compiler.compile(state.tree());
     final Map<String, ISqlTridentDataSource> data = new HashMap<>();
     data.put("FOO", new TestUtils.MockSqlTridentDataSource());
     data.put("BAR", new TestUtils.MockSqlTridentDataSource());
+    PlanCompiler compiler = new PlanCompiler(data, typeFactory);
+    final AbstractTridentProcessor proc = compiler.compileForTest(state.tree());
     final TridentTopology topo = proc.build(data);
     runTridentTopology(EXPECTED_VALUE_SIZE, proc, topo);
     Assert.assertArrayEquals(new Values[] { new Values(4, "x", "y")}, getCollectedValues().toArray());
   }
 
+  @Test
+  public void testLogicalExpr() throws Exception {
+    final int EXPECTED_VALUE_SIZE = 1;
+    String sql = "SELECT ID > 0 OR ID < 1, ID > 0 AND ID < 1, NOT (ID > 0 AND ID < 1) FROM FOO WHERE ID > 0 AND ID < 2";
+    TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql);
+    Map<String, ISqlTridentDataSource> data = new HashMap<>();
+    data.put("FOO", new TestUtils.MockSqlTridentDataSource());
+    PlanCompiler compiler = new PlanCompiler(data, typeFactory);
+    AbstractTridentProcessor proc = compiler.compileForTest(state.tree());
+    final TridentTopology topo = proc.build(data);
+    Fields f = proc.outputStream().getOutputFields();
+    proc.outputStream().each(f, new CollectDataFunction(), new Fields()).toStream();
+    runTridentTopology(EXPECTED_VALUE_SIZE, proc, topo);
+    Assert.assertArrayEquals(new Values[] { new Values(true, false, true) }, getCollectedValues().toArray());
+  }
+
+  @Test
+  public void testUdf() throws Exception {
+    int EXPECTED_VALUE_SIZE = 1;
+    String sql = "SELECT MYPLUS(ID, 3)" +
+            "FROM FOO " +
+            "WHERE ID = 2";
+    TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql);
+    Map<String, ISqlTridentDataSource> data = new HashMap<>();
+    data.put("FOO", new TestUtils.MockSqlTridentDataSource());
+    PlanCompiler compiler = new PlanCompiler(data, typeFactory);
+    AbstractTridentProcessor proc = compiler.compileForTest(state.tree());
+    final TridentTopology topo = proc.build(data);
+    Fields f = proc.outputStream().getOutputFields();
+    proc.outputStream().each(f, new CollectDataFunction(), new Fields()).toStream();
+    runTridentTopology(EXPECTED_VALUE_SIZE, proc, topo);
+    Assert.assertArrayEquals(new Values[] { new Values(5) }, getCollectedValues().toArray());
+  }
+
   private void runTridentTopology(final int expectedValueSize, AbstractTridentProcessor proc,
                                   TridentTopology topo) throws Exception {
     final Config conf = new Config();
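
The expected row in testCompileGroupByExp follows directly from the grouped
mock source, assuming it emits five tuples with GRPID 0 and AGE 1 through 5:
COUNT(*) = 5, MAX(AGE) = 5, MIN(AGE) = 1, AVG(AGE) = (1+2+3+4+5)/5 = 3 under
integer division, and DIFF = 5 - 1 = 4, i.e. Values(0, 5L, 5, 1, 3, 4).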

http://git-wip-us.apache.org/repos/asf/storm/blob/28eefbef/external/sql/storm-sql-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/pom.xml b/external/sql/storm-sql-runtime/pom.xml
index 473283d..0f02be2 100644
--- a/external/sql/storm-sql-runtime/pom.xml
+++ b/external/sql/storm-sql-runtime/pom.xml
@@ -73,6 +73,11 @@
                 </exclusion>
             </exclusions>
         </dependency>
+        <!-- janino -->
+        <dependency>
+            <groupId>org.codehaus.janino</groupId>
+            <artifactId>janino</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.mockito</groupId>
             <artifactId>mockito-all</artifactId>

http://git-wip-us.apache.org/repos/asf/storm/blob/28eefbef/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/NumberConverter.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/NumberConverter.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/NumberConverter.java
new file mode 100644
index 0000000..0b64fea
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/NumberConverter.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.sql.runtime.trident;
+
+public class NumberConverter {
+    public static Number convert(Number num, Class<? extends Number> toType) {
+        // no need to convert
+        if (num.getClass().equals(toType)) {
+            return num;
+        }
+
+        if (toType.equals(Byte.class)) {
+            return num.byteValue();
+        } else if (toType.equals(Short.class)) {
+            return num.shortValue();
+        } else if (toType.equals(Integer.class)) {
+            return num.intValue();
+        } else if (toType.equals(Long.class)) {
+            return num.longValue();
+        } else if (toType.equals(Float.class)) {
+            return num.floatValue();
+        } else if (toType.equals(Double.class)) {
+            return num.doubleValue();
+        } else if (toType.isAssignableFrom(num.getClass())) {
+            // isAssignable is true so safe to return
+            return num;
+        } else {
+            throw new IllegalArgumentException("Can't convert Number " + num + " (class " + num.getClass() + " to " + toType);
+        }
+    }
+}
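
A usage sketch (the demo class name is illustrative): conversions go through
the primitive accessors on java.lang.Number, so narrowing truncates rather
than rounds.

    import org.apache.storm.sql.runtime.trident.NumberConverter;

    public class NumberConverterDemo {
        public static void main(String[] args) {
            // Double -> Integer goes through Number.intValue(): prints 3
            System.out.println(NumberConverter.convert(3.7d, Integer.class));
            // Integer -> Long widens via Number.longValue(): prints 42
            System.out.println(NumberConverter.convert(42, Long.class));
        }
    }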

http://git-wip-us.apache.org/repos/asf/storm/blob/28eefbef/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/TridentUtils.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/TridentUtils.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/TridentUtils.java
new file mode 100644
index 0000000..28c2ae1
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/TridentUtils.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.sql.runtime.trident;
+
+import org.apache.storm.trident.tuple.TridentTuple;
+
+public class TridentUtils {
+    private TridentUtils() {
+    }
+
+    public static <T> T valueFromTuple(TridentTuple tuple, String inputFieldName) {
+        if (tuple == null) {
+            throw new IllegalArgumentException("Tuple is null");
+        }
+        return (T) tuple.getValueByField(inputFieldName);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/28eefbef/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFilter.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFilter.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFilter.java
new file mode 100644
index 0000000..7afa096
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFilter.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.sql.runtime.trident.functions;
+
+import org.apache.storm.trident.operation.BaseFilter;
+import org.apache.storm.trident.operation.TridentOperationContext;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.codehaus.commons.compiler.CompileException;
+import org.codehaus.janino.ScriptEvaluator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.List;
+import java.util.Map;
+
+public class EvaluationFilter extends BaseFilter {
+    private static final Logger LOG = LoggerFactory.getLogger(EvaluationFilter.class);
+
+    private transient ScriptEvaluator evaluator;
+    private final String expression;
+
+    public EvaluationFilter(String expression) {
+        this.expression = expression;
+    }
+
+    @Override
+    public void prepare(Map conf, TridentOperationContext context) {
+        LOG.info("Expression: {}", expression);
+        try {
+            evaluator = new ScriptEvaluator(expression, Boolean.class,
+                    new String[] {"_data"}, new Class[] { List.class });
+        } catch (CompileException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public boolean isKeep(TridentTuple tuple) {
+        try {
+            return (Boolean) evaluator.evaluate(new Object[] {tuple.getValues()});
+        } catch (InvocationTargetException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
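
The expression string passed to this filter is Java source generated by
ExprCompiler in visitFilter(), with the tuple's values bound to the "_data"
parameter. A self-contained sketch of the same Janino mechanics, using a
hand-written script in place of the compiler output:

    import org.codehaus.janino.ScriptEvaluator;

    import java.util.Arrays;
    import java.util.List;

    public class FilterScriptDemo {
        public static void main(String[] args) throws Exception {
            // stand-in for ExprCompiler output for a condition like ID > 2
            String expression =
                    "java.lang.Integer id = (java.lang.Integer) _data.get(0);\n" +
                    "return id != null && id > 2;";
            ScriptEvaluator evaluator = new ScriptEvaluator(expression, Boolean.class,
                    new String[] {"_data"}, new Class[] { List.class });
            System.out.println(evaluator.evaluate(new Object[] { Arrays.asList(5) })); // true
            System.out.println(evaluator.evaluate(new Object[] { Arrays.asList(1) })); // false
        }
    }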

http://git-wip-us.apache.org/repos/asf/storm/blob/28eefbef/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFunction.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFunction.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFunction.java
new file mode 100644
index 0000000..b0bbce3
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/EvaluationFunction.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.sql.runtime.trident.functions;
+
+import org.apache.storm.trident.operation.BaseFunction;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.operation.TridentOperationContext;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.tuple.Values;
+import org.codehaus.commons.compiler.CompileException;
+import org.codehaus.janino.ScriptEvaluator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.List;
+import java.util.Map;
+
+public class EvaluationFunction extends BaseFunction {
+    private static final Logger LOG = LoggerFactory.getLogger(EvaluationFunction.class);
+
+    private transient ScriptEvaluator evaluator;
+    private final String expression;
+
+    public EvaluationFunction(String expression) {
+        this.expression = expression;
+    }
+
+    @Override
+    public void prepare(Map conf, TridentOperationContext context) {
+        LOG.info("Expression: {}", expression);
+        try {
+            evaluator = new ScriptEvaluator(expression, Values.class,
+                    new String[] {"_data"}, new Class[] { List.class });
+        } catch (CompileException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void execute(TridentTuple tuple, TridentCollector collector) {
+        try {
+            collector.emit((Values) evaluator.evaluate(new Object[] {tuple.getValues()}));
+        } catch (InvocationTargetException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
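
The scripts this function runs are assembled in visitProject() above: an import
of Values, per-column code from ExprCompiler, and a final "return new
Values(...)". An illustrative (hand-written, not compiler-emitted) script body
for a single projected column ID + 1, where t0 is a placeholder name:

    import org.apache.storm.tuple.Values;
    final java.lang.Integer t0 = (java.lang.Integer)(_data.get(0));
    return new Values(t0 + 1);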

http://git-wip-us.apache.org/repos/asf/storm/blob/28eefbef/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/ForwardFunction.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/ForwardFunction.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/ForwardFunction.java
new file mode 100644
index 0000000..4c3a266
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/functions/ForwardFunction.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.sql.runtime.trident.functions;
+
+import org.apache.storm.trident.operation.BaseFunction;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+public class ForwardFunction extends BaseFunction {
+    @Override
+    public void execute(TridentTuple tuple, TridentCollector collector) {
+        collector.emit(tuple.getValues());
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/28eefbef/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/operations/CountBy.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/operations/CountBy.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/operations/CountBy.java
new file mode 100644
index 0000000..df5d8c8
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/operations/CountBy.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.runtime.trident.operations;
+
+import org.apache.storm.sql.runtime.trident.TridentUtils;
+import org.apache.storm.trident.operation.CombinerAggregator;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+
+public class CountBy implements CombinerAggregator<Long> {
+
+    private final String inputFieldName;
+
+    public CountBy(String inputFieldName) {
+        this.inputFieldName = inputFieldName;
+    }
+
+    @Override
+    public Long init(TridentTuple tuple) {
+        // handles null field
+        if (TridentUtils.valueFromTuple(tuple, inputFieldName) == null) {
+            return 0L;
+        }
+
+        return 1L;
+    }
+
+    @Override
+    public Long combine(Long val1, Long val2) {
+        return val1 + val2;
+    }
+
+    @Override
+    public Long zero() {
+        return 0L;
+    }
+}
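
Like SQL's COUNT(column), CountBy skips nulls while the plain Count() used for
COUNT(*) does not: for values {1, null, 3} in the counted field, init() yields
1, 0 and 1, and combine() folds them to 2, whereas COUNT(*) over the same three
rows is 3.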

http://git-wip-us.apache.org/repos/asf/storm/blob/28eefbef/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/operations/DivideForAverage.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/operations/DivideForAverage.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/operations/DivideForAverage.java
new file mode 100644
index 0000000..6dcf324
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/operations/DivideForAverage.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.sql.runtime.trident.operations;
+
+import clojure.lang.Numbers;
+import org.apache.storm.sql.runtime.trident.NumberConverter;
+import org.apache.storm.trident.operation.BaseFunction;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.tuple.Values;
+
+public class DivideForAverage extends BaseFunction {
+
+    private final Class<? extends Number> targetClazz;
+
+    public DivideForAverage(Class<? extends Number> targetClazz) {
+        this.targetClazz = targetClazz;
+    }
+
+    @Override
+    public void execute(TridentTuple tuple, TridentCollector collector) {
+        // This expects two input fields: the first is the result of SumBy and the second is the result of CountBy.
+        Number sum = (Number) tuple.get(0);
+        // CountBy always produces a Long
+        Long count = (Long) tuple.get(1);
+
+        collector.emit(new Values(NumberConverter.convert(Numbers.divide(sum, count), targetClazz)));
+    }
+}
\ No newline at end of file
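
Note the AVG semantics this implies: clojure.lang.Numbers.divide returns an
exact result (a ratio when the division is inexact), which NumberConverter then
narrows through the java.lang.Number accessors. With an Integer target an
inexact average therefore truncates, e.g. sum 15 over count 5 gives exactly 3,
while sum 7 over count 2 gives 7/2 and narrows to 3.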

http://git-wip-us.apache.org/repos/asf/storm/blob/28eefbef/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/operations/MaxBy.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/operations/MaxBy.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/operations/MaxBy.java
new file mode 100644
index 0000000..b3ab477
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/operations/MaxBy.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.sql.runtime.trident.operations;
+
+import clojure.lang.Numbers;
+import org.apache.storm.sql.runtime.trident.NumberConverter;
+import org.apache.storm.sql.runtime.trident.TridentUtils;
+import org.apache.storm.trident.operation.CombinerAggregator;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+public class MaxBy implements CombinerAggregator<Object> {
+
+    private final String inputFieldName;
+    private final Class<?> targetClazz;
+
+    public MaxBy(String inputFieldName, Class<?> targetClazz) {
+        this.inputFieldName = inputFieldName;
+        this.targetClazz = targetClazz;
+    }
+
+    @Override
+    public Object init(TridentTuple tuple) {
+        return TridentUtils.valueFromTuple(tuple, inputFieldName);
+    }
+
+    @Override
+    public Object combine(Object val1, Object val2) {
+        if (val1 == null) {
+            return val2;
+        }
+
+        if (Number.class.isAssignableFrom(targetClazz)) {
+            return NumberConverter.convert((Number) Numbers.max(val1, val2), (Class<? extends Number>) targetClazz);
+        }
+
+        if (!val1.getClass().equals(val2.getClass())) {
+            throw new IllegalArgumentException("The type of values are different! - class of val1: " +
+                    val1.getClass().getCanonicalName() + " and class of val2: " +
+                    val2.getClass().getCanonicalName());
+        } else if (!(val1 instanceof Comparable)) {
+            throw new IllegalArgumentException("Value is not Comparable - class of value: " +
+                    val1.getClass().getCanonicalName());
+        }
+
+        return ((Comparable)val1).compareTo(val2) < 0 ? val2 : val1;
+    }
+
+    @Override
+    public Object zero() {
+        return null;
+    }
+}
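
MaxBy (like the matching MinBy) returns null from zero() because Trident uses
zero() as the identity value for partitions that contribute no tuples;
combine() treats a null side as "no value yet" so that empty partial results
never win the comparison.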


[3/4] storm git commit: Merge branch 'STORM-1434'

Posted by ka...@apache.org.
Merge branch 'STORM-1434'


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/64fb1b1a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/64fb1b1a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/64fb1b1a

Branch: refs/heads/master
Commit: 64fb1b1a1c09f79dbb1ed919b786e20c0d4a358d
Parents: 0b080fc 28eefbe
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Thu Aug 25 07:00:56 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Thu Aug 25 07:00:56 2016 +0900

----------------------------------------------------------------------
 external/sql/README.md                          |   6 +-
 external/sql/storm-sql-core/pom.xml             |   5 +
 .../jvm/org/apache/storm/sql/StormSqlImpl.java  |  19 +-
 .../apache/storm/sql/compiler/ExprCompiler.java |  16 +-
 .../sql/compiler/PostOrderRelNodeVisitor.java   | 100 ++---
 .../backends/standalone/RelNodeCompiler.java    |  14 +-
 .../compiler/backends/trident/PlanCompiler.java | 222 +++---------
 .../backends/trident/RelNodeCompiler.java       | 116 ------
 .../trident/TridentLogicalPlanCompiler.java     | 363 +++++++++++++++++++
 .../storm/sql/compiler/TestCompilerUtils.java   |  32 +-
 .../storm/sql/compiler/TestExprCompiler.java    |   2 +-
 .../standalone/TestRelNodeCompiler.java         |   7 +-
 .../backends/trident/TestPlanCompiler.java      |  62 +++-
 external/sql/storm-sql-runtime/pom.xml          |   5 +
 .../sql/runtime/trident/NumberConverter.java    |  47 +++
 .../storm/sql/runtime/trident/TridentUtils.java |  35 ++
 .../trident/functions/EvaluationFilter.java     |  62 ++++
 .../trident/functions/EvaluationFunction.java   |  64 ++++
 .../trident/functions/ForwardFunction.java      |  30 ++
 .../sql/runtime/trident/operations/CountBy.java |  53 +++
 .../trident/operations/DivideForAverage.java    |  45 +++
 .../sql/runtime/trident/operations/MaxBy.java   |  68 ++++
 .../sql/runtime/trident/operations/MinBy.java   |  68 ++++
 .../sql/runtime/trident/operations/SumBy.java   |  60 +++
 .../test/org/apache/storm/sql/TestUtils.java    |  74 ++++
 pom.xml                                         |   7 +
 26 files changed, 1211 insertions(+), 371 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/64fb1b1a/external/sql/README.md
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/64fb1b1a/pom.xml
----------------------------------------------------------------------