You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/04/05 23:19:09 UTC

[06/23] storm git commit: STORM-2453 Move non-connectors into the top directory

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java
new file mode 100644
index 0000000..2e237c0
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.compiler;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.schema.*;
+import org.apache.calcite.sql.SqlDataTypeSpec;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.validate.SqlMonotonicity;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.Util;
+import org.apache.storm.sql.calcite.ParallelStreamableTable;
+import org.apache.storm.sql.parser.ColumnConstraint;
+
+import java.util.ArrayList;
+
+import static org.apache.calcite.rel.RelFieldCollation.Direction;
+import static org.apache.calcite.rel.RelFieldCollation.Direction.ASCENDING;
+import static org.apache.calcite.rel.RelFieldCollation.Direction.DESCENDING;
+import static org.apache.calcite.rel.RelFieldCollation.NullDirection;
+import static org.apache.calcite.sql.validate.SqlMonotonicity.INCREASING;
+
+public class CompilerUtil {
+  /**
+   * Renders {@code s} as the text of a Java string literal, surrounding double quotes included.
+   *
+   * <p>Backslashes and double quotes are backslash-escaped. A line feed immediately followed by a
+   * carriage return collapses into a single line-feed escape; every remaining line feed or
+   * carriage return is replaced by its own escape sequence.
+   *
+   * @param s the raw string, may be {@code null}
+   * @param nullMeansNull when {@code s} is {@code null}, selects between the text {@code null}
+   *     ({@code true}) and an empty string literal ({@code false})
+   * @return Java source text for the value
+   */
+  public static String escapeJavaString(String s, boolean nullMeansNull) {
+    if (s == null) {
+      if (nullMeansNull) {
+        return "null";
+      }
+      return "\"\"";
+    }
+    // Order matters: backslashes go first so the escapes added afterwards are not escaped again,
+    // and the two-character line-feed/carriage-return pair is handled before the single characters.
+    String escaped = Util.replace(s, "\\", "\\\\");
+    escaped = Util.replace(escaped, "\"", "\\\"");
+    escaped = Util.replace(escaped, "\n\r", "\\n");
+    escaped = Util.replace(escaped, "\n", "\\n");
+    escaped = Util.replace(escaped, "\r", "\\r");
+    StringBuilder literal = new StringBuilder(escaped.length() + 2);
+    literal.append('"').append(escaped).append('"');
+    return literal.toString();
+  }
+
+  /**
+   * Fluent builder that collects column definitions, an optional primary key, optional
+   * statistics, test rows and a parallelism hint, and turns them into a {@link StreamableTable}.
+   */
+  public static class TableBuilderInfo {
+    private final RelDataTypeFactory typeFactory;
+
+    /**
+     * @param typeFactory factory used to turn SQL type names and type specs into {@link RelDataType}s
+     */
+    public TableBuilderInfo(RelDataTypeFactory typeFactory) {
+      this.typeFactory = typeFactory;
+    }
+
+    // Name/type pair describing one column.
+    private static class FieldType {
+      private final String name;
+      private final RelDataType relDataType;
+
+      private FieldType(String name, RelDataType relDataType) {
+        this.name = name;
+        this.relDataType = relDataType;
+      }
+
+    }
+
+    // Columns in declaration order.
+    private final ArrayList<FieldType> fields = new ArrayList<>();
+    // Rows added through rows(Object[]); only their count is read here (see build()).
+    private final ArrayList<Object[]> rows = new ArrayList<>();
+    // Index into fields of the primary key column, or -1 while none has been declared.
+    private int primaryKey = -1;
+    // Stays null unless parallelismHint(int) is called.
+    private Integer parallelismHint;
+    private SqlMonotonicity primaryKeyMonotonicity;
+    // Explicitly supplied statistics; when set they take precedence in buildStatistic().
+    private Statistic stats;
+
+    /** Appends a column whose type is created from {@code type} by the type factory. */
+    public TableBuilderInfo field(String name, SqlTypeName type) {
+      return field(name, typeFactory.createSqlType(type));
+    }
+
+    /** Appends a column with an already-built type. */
+    public TableBuilderInfo field(String name, RelDataType type) {
+      fields.add(new FieldType(name, type));
+      return this;
+    }
+
+    /**
+     * Appends a column whose type is derived from {@code type}; when {@code constraint} is a
+     * {@link ColumnConstraint.PrimaryKey} the column is also recorded as the table's primary key.
+     *
+     * @throws IllegalStateException (via {@code Preconditions.checkState}) if a primary key was
+     *     already recorded
+     */
+    public TableBuilderInfo field(String name, SqlDataTypeSpec type, ColumnConstraint constraint) {
+      RelDataType dataType = type.deriveType(typeFactory);
+      if (constraint instanceof ColumnConstraint.PrimaryKey) {
+        ColumnConstraint.PrimaryKey pk = (ColumnConstraint.PrimaryKey) constraint;
+        Preconditions.checkState(primaryKey == -1, "There are more than one primary key in the table");
+        // The column is appended below, so its index is the current size of the list.
+        primaryKey = fields.size();
+        primaryKeyMonotonicity = pk.monotonicity();
+      }
+      fields.add(new FieldType(name, dataType));
+      return this;
+    }
+
+    /** Sets explicit statistics, which replace the ones otherwise derived from the primary key. */
+    public TableBuilderInfo statistics(Statistic stats) {
+      this.stats = stats;
+      return this;
+    }
+
+    /** Adds one row; the array is stored as given, without copying. */
+    @VisibleForTesting
+    public TableBuilderInfo rows(Object[] data) {
+      rows.add(data);
+      return this;
+    }
+
+    /** Sets the value later returned by the built table's {@code parallelismHint()}. */
+    public TableBuilderInfo parallelismHint(int parallelismHint) {
+      this.parallelismHint = parallelismHint;
+      return this;
+    }
+
+    /**
+     * Builds the table. The statistic is computed once, here; the row type, the fallback
+     * statistic and the parallelism hint are read from this builder each time the returned
+     * table is asked for them, so later changes to the builder remain visible through it.
+     */
+    public StreamableTable build() {
+      final Statistic stat = buildStatistic();
+      final Table tbl = new Table() {
+        @Override
+        public RelDataType getRowType(
+            RelDataTypeFactory relDataTypeFactory) {
+          RelDataTypeFactory.FieldInfoBuilder b = relDataTypeFactory.builder();
+          for (FieldType f : fields) {
+            b.add(f.name, f.relDataType);
+          }
+          return b.build();
+        }
+
+        @Override
+        public Statistic getStatistic() {
+          // Fallback when neither explicit statistics nor a primary key exist: the number of
+          // rows added so far and no keys.
+          return stat != null ? stat : Statistics.of(rows.size(),
+                                                     ImmutableList.<ImmutableBitSet>of());
+        }
+
+        @Override
+        public Schema.TableType getJdbcTableType() {
+          return Schema.TableType.STREAM;
+        }
+      };
+
+      // The outer table delegates row type and statistic to tbl and also hands tbl out as its stream.
+      return new ParallelStreamableTable() {
+        @Override
+        public Integer parallelismHint() {
+          return parallelismHint;
+        }
+
+        @Override
+        public Table stream() {
+          return tbl;
+        }
+
+        @Override
+        public RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) {
+          return tbl.getRowType(relDataTypeFactory);
+        }
+
+        @Override
+        public Statistic getStatistic() {
+          return tbl.getStatistic();
+        }
+
+        @Override
+        public Schema.TableType getJdbcTableType() {
+          return Schema.TableType.STREAM;
+        }
+      };
+    }
+
+    /**
+     * Returns the explicit statistics when set, {@code null} when there are none and no primary
+     * key was declared, and otherwise a statistic that declares the primary key column as a key
+     * with a collation on it (ascending for {@code INCREASING} monotonicity, descending for
+     * anything else).
+     */
+    private Statistic buildStatistic() {
+      if (stats != null || primaryKey == -1) {
+        return stats;
+      }
+      Direction dir = primaryKeyMonotonicity == INCREASING ? ASCENDING : DESCENDING;
+      RelFieldCollation collation = new RelFieldCollation(primaryKey, dir, NullDirection.UNSPECIFIED);
+      // NOTE(review): the first argument here is the number of columns, whereas build() passes
+      // rows.size() in the same position - presumably a row count; confirm which is intended.
+      return Statistics.of(fields.size(), ImmutableList.of(ImmutableBitSet.of(primaryKey)),
+          ImmutableList.of(RelCollations.of(collation)));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RexNodeToJavaCodeCompiler.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RexNodeToJavaCodeCompiler.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RexNodeToJavaCodeCompiler.java
new file mode 100644
index 0000000..5ac95e0
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RexNodeToJavaCodeCompiler.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.compiler;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import org.apache.calcite.adapter.enumerable.JavaRowFormat;
+import org.apache.calcite.adapter.enumerable.PhysType;
+import org.apache.calcite.adapter.enumerable.PhysTypeImpl;
+import org.apache.calcite.adapter.enumerable.RexToLixTranslator;
+import org.apache.calcite.interpreter.Context;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.linq4j.function.Function1;
+import org.apache.calcite.linq4j.tree.BlockBuilder;
+import org.apache.calcite.linq4j.tree.BlockStatement;
+import org.apache.calcite.linq4j.tree.ClassDeclaration;
+import org.apache.calcite.linq4j.tree.Expression;
+import org.apache.calcite.linq4j.tree.Expressions;
+import org.apache.calcite.linq4j.tree.MemberDeclaration;
+import org.apache.calcite.linq4j.tree.ParameterExpression;
+import org.apache.calcite.linq4j.tree.Types;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexProgram;
+import org.apache.calcite.rex.RexProgramBuilder;
+import org.apache.calcite.util.BuiltInMethod;
+import org.apache.calcite.util.Pair;
+import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.lang.reflect.Type;
+import java.util.List;
+
+/**
+ * Compiles a scalar expression ({@link org.apache.calcite.rex.RexNode}) to a Java source code String.
+ *
+ * This code is inspired by JaninoRexCompiler in Calcite. JaninoRexCompiler returns an executable
+ * {@link org.apache.calcite.interpreter.Scalar}, whereas here we need to pass on the source code itself,
+ * so that it can be compiled and the instance serialized, and then executed efficiently on the worker.
+ */
+public class RexNodeToJavaCodeCompiler {
+  private final RexBuilder rexBuilder;
+
+  /**
+   * @param rexBuilder used to build the {@link RexProgram}s and to obtain the type system for
+   *     the Java type factory created during translation
+   */
+  public RexNodeToJavaCodeCompiler(RexBuilder rexBuilder) {
+    this.rexBuilder = rexBuilder;
+  }
+
+  /**
+   * Builds a program that projects each of {@code nodes} (with no explicit output name) over
+   * {@code inputRowType} and translates it to a block of statements.
+   *
+   * @param nodes expressions to evaluate, one output slot per node
+   * @param inputRowType row type the expressions read from
+   * @return the generated statements
+   */
+  public BlockStatement compileToBlock(List<RexNode> nodes, RelDataType inputRowType) {
+    final RexProgramBuilder programBuilder =
+        new RexProgramBuilder(inputRowType, rexBuilder);
+    for (RexNode node : nodes) {
+      programBuilder.addProject(node, null);
+    }
+
+    return compileToBlock(programBuilder.getProgram());
+  }
+
+  /**
+   * Translates {@code program} to a block of statements that refer to two parameters named
+   * {@code context} and {@code outputValues}.
+   */
+  public BlockStatement compileToBlock(final RexProgram program) {
+    final ParameterExpression context_ =
+            Expressions.parameter(Context.class, "context");
+    final ParameterExpression outputValues_ =
+            Expressions.parameter(Object[].class, "outputValues");
+
+    return compileToBlock(program, context_, outputValues_).toBlock();
+  }
+
+  /**
+   * Builds a program that projects each of {@code nodes} over {@code inputRowType} and returns
+   * the Java source of a class named {@code className} that evaluates it.
+   *
+   * @param nodes expressions to evaluate, one output slot per node
+   * @param inputRowType row type the expressions read from
+   * @param className simple name of the generated class
+   * @return Java source text of the generated class
+   */
+  public String compile(List<RexNode> nodes, RelDataType inputRowType, String className) {
+    final RexProgramBuilder programBuilder =
+            new RexProgramBuilder(inputRowType, rexBuilder);
+    for (RexNode node : nodes) {
+      programBuilder.addProject(node, null);
+    }
+
+    return compile(programBuilder.getProgram(), className);
+  }
+
+  /**
+   * Returns the Java source of a class named {@code className} that evaluates {@code program}.
+   */
+  public String compile(final RexProgram program, String className) {
+    final ParameterExpression context_ =
+            Expressions.parameter(Context.class, "context");
+    final ParameterExpression outputValues_ =
+            Expressions.parameter(Object[].class, "outputValues");
+
+    BlockBuilder builder = compileToBlock(program, context_, outputValues_);
+    return baz(context_, outputValues_, builder.toBlock(), className);
+  }
+
+  /**
+   * Translates the projections of {@code program} and appends one assignment per projection,
+   * storing the i-th translated expression into {@code outputValues_[i]}.
+   *
+   * @param program program whose projections are translated
+   * @param context_ parameter through which the generated code reads its input and root
+   * @param outputValues_ parameter naming the array the generated code writes results into
+   * @return the builder holding the generated statements (not yet converted to a block)
+   */
+  private BlockBuilder compileToBlock(final RexProgram program, ParameterExpression context_,
+                                        ParameterExpression outputValues_) {
+    RelDataType inputRowType = program.getInputRowType();
+    final BlockBuilder builder = new BlockBuilder();
+    final JavaTypeFactoryImpl javaTypeFactory =
+            new JavaTypeFactoryImpl(rexBuilder.getTypeFactory().getTypeSystem());
+
+    // Input fields are read from the context's values field, treated as an array-format row.
+    final RexToLixTranslator.InputGetter inputGetter =
+            new RexToLixTranslator.InputGetterImpl(
+                    ImmutableList.of(
+                            Pair.<Expression, PhysType>of(
+                                    Expressions.field(context_,
+                                            BuiltInMethod.CONTEXT_VALUES.field),
+                                    PhysTypeImpl.of(javaTypeFactory, inputRowType,
+                                            JavaRowFormat.ARRAY, false))));
+    // Correlation variables are not supported: any lookup fails immediately.
+    final Function1<String, RexToLixTranslator.InputGetter> correlates =
+            new Function1<String, RexToLixTranslator.InputGetter>() {
+              public RexToLixTranslator.InputGetter apply(String a0) {
+                throw new UnsupportedOperationException();
+              }
+            };
+    final Expression root =
+            Expressions.field(context_, BuiltInMethod.CONTEXT_ROOT.field);
+    final List<Expression> list =
+            RexToLixTranslator.translateProjects(program, javaTypeFactory, builder,
+                    null, root, inputGetter, correlates);
+    for (int i = 0; i < list.size(); i++) {
+      builder.add(
+              Expressions.statement(
+                      Expressions.assign(
+                              Expressions.arrayIndex(outputValues_,
+                                      Expressions.constant(i)),
+                              list.get(i))));
+    }
+
+    return builder;
+  }
+
+  /** Given a block that implements {@link ExecutableExpression#execute(Context, Object[])},
+   * adds a bridge method that implements {@link ExecutableExpression#execute(Context)}, and
+   * returns the Java source of a public class named {@code className} that declares both
+   * methods and implements {@link ExecutableExpression}. Nothing is compiled here. */
+  static String baz(ParameterExpression context_,
+                    ParameterExpression outputValues_, BlockStatement block, String className) {
+    final List<MemberDeclaration> declarations = Lists.newArrayList();
+
+    // public void execute(Context, Object[] outputValues)
+    declarations.add(
+            Expressions.methodDecl(Modifier.PUBLIC, void.class,
+                    StormBuiltInMethod.EXPR_EXECUTE2.method.getName(),
+                    ImmutableList.of(context_, outputValues_), block));
+
+    // public Object execute(Context)
+    // The bridge allocates a one-element Object array, lets the two-argument method fill it,
+    // and returns element 0.
+    final BlockBuilder builder = new BlockBuilder();
+    final Expression values_ = builder.append("values",
+            Expressions.newArrayBounds(Object.class, 1,
+                    Expressions.constant(1)));
+    builder.add(
+            Expressions.statement(
+                    Expressions.call(
+                            Expressions.parameter(ExecutableExpression.class, "this"),
+                            StormBuiltInMethod.EXPR_EXECUTE2.method, context_, values_)));
+    builder.add(
+            Expressions.return_(null,
+                    Expressions.arrayIndex(values_, Expressions.constant(0))));
+    declarations.add(
+            Expressions.methodDecl(Modifier.PUBLIC, Object.class,
+                    StormBuiltInMethod.EXPR_EXECUTE1.method.getName(),
+                    ImmutableList.of(context_), builder.toBlock()));
+
+    final ClassDeclaration classDeclaration =
+            Expressions.classDecl(Modifier.PUBLIC, className, null,
+                    ImmutableList.<Type>of(ExecutableExpression.class), declarations);
+
+    return Expressions.toString(Lists.newArrayList(classDeclaration), "\n", false);
+  }
+
+  /**
+   * Reflective handles to the two {@code execute} overloads of {@link ExecutableExpression}.
+   * Each constant carries a method, a constructor or a field; both constants declared here are
+   * methods, so their {@code constructor} and {@code field} are {@code null}.
+   */
+  enum StormBuiltInMethod {
+    EXPR_EXECUTE1(ExecutableExpression.class, "execute", Context.class),
+    EXPR_EXECUTE2(ExecutableExpression.class, "execute", Context.class, Object[].class);
+
+    public final Method method;
+    public final Constructor constructor;
+    public final Field field;
+
+    // NOTE(review): this map is populated from Calcite's BuiltInMethod constants, not from the
+    // constants of this enum - confirm whether that is intended.
+    public static final ImmutableMap<Method, BuiltInMethod> MAP;
+
+    static {
+      final ImmutableMap.Builder<Method, BuiltInMethod> builder =
+              ImmutableMap.builder();
+      for (BuiltInMethod value : BuiltInMethod.values()) {
+        if (value.method != null) {
+          builder.put(value.method, value);
+        }
+      }
+      MAP = builder.build();
+    }
+
+    private StormBuiltInMethod(Method method, Constructor constructor, Field field) {
+      this.method = method;
+      this.constructor = constructor;
+      this.field = field;
+    }
+
+    /**
+     * Defines a method.
+     */
+    StormBuiltInMethod(Class clazz, String methodName, Class... argumentTypes) {
+      this(Types.lookupMethod(clazz, methodName, argumentTypes), null, null);
+    }
+
+    /**
+     * Defines a constructor.
+     */
+    StormBuiltInMethod(Class clazz, Class... argumentTypes) {
+      this(null, Types.lookupConstructor(clazz, argumentTypes), null);
+    }
+
+    /**
+     * Defines a field.
+     */
+    StormBuiltInMethod(Class clazz, String fieldName, boolean dummy) {
+      this(null, null, Types.lookupField(clazz, fieldName));
+      // The boolean exists only to tell this overload apart from the method-defining one.
+      assert dummy : "dummy value for method overloading must be true";
+    }
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/StormSqlTypeFactoryImpl.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/StormSqlTypeFactoryImpl.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/StormSqlTypeFactoryImpl.java
new file mode 100644
index 0000000..21ca063
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/StormSqlTypeFactoryImpl.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.compiler;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.rel.type.RelRecordType;
+import org.apache.calcite.sql.type.JavaToSqlTypeConversionRules;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Java type factory whose {@link #toSql(RelDataType)} maps Java classes without a known SQL
+ * counterpart to {@link SqlTypeName#ANY}.
+ */
+public class StormSqlTypeFactoryImpl extends JavaTypeFactoryImpl {
+
+    public StormSqlTypeFactoryImpl() {
+    }
+
+    public StormSqlTypeFactoryImpl(RelDataTypeSystem typeSystem) {
+        super(typeSystem);
+    }
+
+    /**
+     * Converts a Java-backed type to a SQL type using the Java-to-SQL conversion rules, falling
+     * back to {@code ANY} when the rules have no entry for the Java class. The nullability of
+     * {@code type} is carried over. Types that are not Java-backed are handled by the superclass.
+     */
+    @Override
+    public RelDataType toSql(RelDataType type) {
+        if (!(type instanceof JavaType)) {
+            return super.toSql(type);
+        }
+        final JavaType javaBacked = (JavaType) type;
+        final SqlTypeName mapped = JavaToSqlTypeConversionRules.instance().lookup(javaBacked.getJavaClass());
+        final SqlTypeName effective = (mapped != null) ? mapped : SqlTypeName.ANY;
+        final RelDataType sqlType = createSqlType(effective);
+        return createTypeWithNullability(sqlType, type.isNullable());
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/BuiltinAggregateFunctions.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/BuiltinAggregateFunctions.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/BuiltinAggregateFunctions.java
new file mode 100644
index 0000000..9dc4ba8
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/BuiltinAggregateFunctions.java
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.compiler.backends.standalone;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.storm.tuple.Values;
+
+import java.lang.reflect.Type;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Built-in implementations for some of the standard aggregation operations.
+ * Aggregations can be implemented as a class with the following methods viz. init, add and result.
+ * The class could contain only static methods, only non-static methods or be generic.
+ */
+public class BuiltinAggregateFunctions {
+    /**
+     * Binds a type to the class implementing the aggregation for that type.
+     */
+    public static class TypeClass {
+        /** Type the binding applies to. */
+        public final Type ty;
+        /** Class that implements the aggregation. */
+        public final Class<?> clazz;
+
+        /** Marker type with no members of its own. */
+        public static class GenericType {
+        }
+
+        /** Creates a binding of {@code ty} to {@code clazz}. */
+        static TypeClass of(Type ty, Class<?> clazz) {
+            final TypeClass binding = new TypeClass(ty, clazz);
+            return binding;
+        }
+
+        private TypeClass(Type ty, Class<?> clazz) {
+            this.clazz = clazz;
+            this.ty = ty;
+        }
+    }
+
+    // Maps an aggregate function name to its type/implementation bindings; filled in by the
+    // static initializer of this class.
+    static final Map<String, List<TypeClass>> TABLE = new HashMap<>();
+
+    /**
+     * SUM over {@code Byte} values. The total is narrowed back to {@code byte} after every
+     * addition, so it wraps on overflow.
+     */
+    public static class ByteSum {
+        /** @return the initial accumulator, zero */
+        public static Byte init() {
+            return 0;
+        }
+
+        /**
+         * Adds {@code val} to the running total.
+         *
+         * @param accumulator the running total
+         * @param val the next value; {@code null} is skipped and leaves the total unchanged
+         * @return the updated total
+         */
+        public static Byte add(Byte accumulator, Byte val) {
+            if (val == null) {
+                // Skip NULL inputs rather than failing on unboxing.
+                return accumulator;
+            }
+            return (byte) (accumulator + val);
+        }
+
+        /** @return the accumulator unchanged */
+        public static Byte result(Byte accumulator) {
+            return accumulator;
+        }
+    }
+
+    /**
+     * SUM over {@code Short} values. The total is narrowed back to {@code short} after every
+     * addition, so it wraps on overflow.
+     */
+    public static class ShortSum {
+        /** @return the initial accumulator, zero */
+        public static Short init() {
+            return 0;
+        }
+
+        /**
+         * Adds {@code val} to the running total.
+         *
+         * @param accumulator the running total
+         * @param val the next value; {@code null} is skipped and leaves the total unchanged
+         * @return the updated total
+         */
+        public static Short add(Short accumulator, Short val) {
+            if (val == null) {
+                // Skip NULL inputs rather than failing on unboxing.
+                return accumulator;
+            }
+            return (short) (accumulator + val);
+        }
+
+        /** @return the accumulator unchanged */
+        public static Short result(Short accumulator) {
+            return accumulator;
+        }
+    }
+
+    /** SUM over {@code Integer} values. */
+    public static class IntSum {
+        /** @return the initial accumulator, zero */
+        public static Integer init() {
+            return 0;
+        }
+
+        /**
+         * Adds {@code val} to the running total.
+         *
+         * @param accumulator the running total
+         * @param val the next value; {@code null} is skipped and leaves the total unchanged
+         * @return the updated total
+         */
+        public static Integer add(Integer accumulator, Integer val) {
+            if (val == null) {
+                // Skip NULL inputs rather than failing on unboxing.
+                return accumulator;
+            }
+            return accumulator + val;
+        }
+
+        /** @return the accumulator unchanged */
+        public static Integer result(Integer accumulator) {
+            return accumulator;
+        }
+    }
+
+    /** SUM over {@code Long} values. */
+    public static class LongSum {
+        /** @return the initial accumulator, zero */
+        public static Long init() {
+            return 0L;
+        }
+
+        /**
+         * Adds {@code val} to the running total.
+         *
+         * @param accumulator the running total
+         * @param val the next value; {@code null} is skipped and leaves the total unchanged
+         * @return the updated total
+         */
+        public static Long add(Long accumulator, Long val) {
+            if (val == null) {
+                // Skip NULL inputs rather than failing on unboxing.
+                return accumulator;
+            }
+            return accumulator + val;
+        }
+
+        /** @return the accumulator unchanged */
+        public static Long result(Long accumulator) {
+            return accumulator;
+        }
+    }
+
+    /** SUM over {@code Float} values. */
+    public static class FloatSum {
+        /** @return the initial accumulator, zero */
+        public static Float init() {
+            return 0.0f;
+        }
+
+        /**
+         * Adds {@code val} to the running total.
+         *
+         * @param accumulator the running total
+         * @param val the next value; {@code null} is skipped and leaves the total unchanged
+         * @return the updated total
+         */
+        public static Float add(Float accumulator, Float val) {
+            if (val == null) {
+                // Skip NULL inputs rather than failing on unboxing.
+                return accumulator;
+            }
+            return accumulator + val;
+        }
+
+        /** @return the accumulator unchanged */
+        public static Float result(Float accumulator) {
+            return accumulator;
+        }
+    }
+
+    /** SUM over {@code Double} values. */
+    public static class DoubleSum {
+        /** @return the initial accumulator, zero */
+        public static Double init() {
+            return 0.0;
+        }
+
+        /**
+         * Adds {@code val} to the running total.
+         *
+         * @param accumulator the running total
+         * @param val the next value; {@code null} is skipped and leaves the total unchanged
+         * @return the updated total
+         */
+        public static Double add(Double accumulator, Double val) {
+            if (val == null) {
+                // Skip NULL inputs rather than failing on unboxing.
+                return accumulator;
+            }
+            return accumulator + val;
+        }
+
+        /** @return the accumulator unchanged */
+        public static Double result(Double accumulator) {
+            return accumulator;
+        }
+    }
+
+    /**
+     * MAX over any mutually comparable values. The accumulator starts as {@code null}, meaning
+     * "no value seen yet".
+     */
+    public static class Max<T extends Comparable<T>> {
+        /** @return the initial accumulator, {@code null} */
+        public T init() {
+            return null;
+        }
+
+        /**
+         * Keeps the larger of the current maximum and {@code val}.
+         *
+         * @param accumulator the current maximum, or {@code null} if no value was seen yet
+         * @param val the next value; {@code null} is ignored
+         * @return the new maximum
+         */
+        public T add(T accumulator, T val) {
+            if (val == null) {
+                // Ignore NULL inputs; comparing against null would otherwise throw.
+                return accumulator;
+            }
+            return (accumulator == null || accumulator.compareTo(val) < 0) ? val : accumulator;
+        }
+
+        /** @return the accumulator unchanged; {@code null} if no non-null value was added */
+        public T result(T accumulator) {
+            return accumulator;
+        }
+    }
+
+    /**
+     * MIN over any mutually comparable values. The accumulator starts as {@code null}, meaning
+     * "no value seen yet".
+     */
+    public static class Min<T extends Comparable<T>> {
+        /** @return the initial accumulator, {@code null} */
+        public T init() {
+            return null;
+        }
+
+        /**
+         * Keeps the smaller of the current minimum and {@code val}.
+         *
+         * @param accumulator the current minimum, or {@code null} if no value was seen yet
+         * @param val the next value; {@code null} is ignored
+         * @return the new minimum
+         */
+        public T add(T accumulator, T val) {
+            if (val == null) {
+                // Ignore NULL inputs; comparing against null would otherwise throw.
+                return accumulator;
+            }
+            return (accumulator == null || accumulator.compareTo(val) > 0) ? val : accumulator;
+        }
+
+        /** @return the accumulator unchanged; {@code null} if no non-null value was added */
+        public T result(T accumulator) {
+            return accumulator;
+        }
+    }
+
+    /**
+     * AVG over {@code Integer} values using integer division.
+     *
+     * <p>The number of values is kept in this instance rather than in the accumulator, so one
+     * instance can only track one aggregation at a time; {@link #result(Integer)} resets it.
+     */
+    public static class IntAvg {
+        // Number of non-null values added since the last call to result().
+        private int count;
+
+        /** @return the initial running sum, zero */
+        public Integer init() {
+            return 0;
+        }
+
+        /**
+         * Adds {@code val} to the running sum and counts it.
+         *
+         * @param accumulator the running sum
+         * @param val the next value; {@code null} is skipped and not counted
+         * @return the updated sum
+         */
+        public Integer add(Integer accumulator, Integer val) {
+            if (val == null) {
+                // Skip NULL inputs rather than failing on unboxing.
+                return accumulator;
+            }
+            ++count;
+            return accumulator + val;
+        }
+
+        /**
+         * Computes the average and resets the count.
+         *
+         * @param accumulator the running sum
+         * @return sum divided by count, or {@code null} when no value was counted (instead of
+         *     failing with a division by zero)
+         */
+        public Integer result(Integer accumulator) {
+            if (count == 0) {
+                return null;
+            }
+            Integer result = accumulator / count;
+            count = 0;
+            return result;
+        }
+    }
+
+    /**
+     * AVG over {@code Double} values.
+     *
+     * <p>The number of values is kept in this instance rather than in the accumulator, so one
+     * instance can only track one aggregation at a time; {@link #result(Double)} resets it.
+     */
+    public static class DoubleAvg {
+        // Number of non-null values added since the last call to result().
+        private int count;
+
+        /** @return the initial running sum, zero */
+        public Double init() {
+            return 0.0;
+        }
+
+        /**
+         * Adds {@code val} to the running sum and counts it.
+         *
+         * @param accumulator the running sum
+         * @param val the next value; {@code null} is skipped and not counted
+         * @return the updated sum
+         */
+        public Double add(Double accumulator, Double val) {
+            if (val == null) {
+                // Skip NULL inputs rather than failing on unboxing.
+                return accumulator;
+            }
+            ++count;
+            return accumulator + val;
+        }
+
+        /**
+         * Computes the average and resets the count.
+         *
+         * @param accumulator the running sum
+         * @return sum divided by count, or {@code null} when no value was counted (instead of
+         *     the NaN that dividing by a zero count would produce)
+         */
+        public Double result(Double accumulator) {
+            if (count == 0) {
+                return null;
+            }
+            Double result = accumulator / count;
+            count = 0;
+            return result;
+        }
+    }
+
+    public static class Count {
+        public static Long init() {
+            return 0L;
+        }
+
+        public static Long add(Long accumulator, Values vals) {
+            for (Object val : vals) {
+                if (val == null) {
+                    return accumulator;
+                }
+            }
+            return accumulator + 1;
+        }
+
+        public static Long result(Long accumulator) {
+            return accumulator;
+        }
+    }
+
+    static {
+        TABLE.put("SUM", ImmutableList.of(
+                TypeClass.of(float.class, FloatSum.class),
+                TypeClass.of(double.class, DoubleSum.class),
+                TypeClass.of(byte.class, ByteSum.class),
+                TypeClass.of(short.class, ShortSum.class),
+                TypeClass.of(long.class, LongSum.class),
+                TypeClass.of(int.class, IntSum.class)));
+        TABLE.put("AVG", ImmutableList.of(
+                TypeClass.of(double.class, DoubleAvg.class),
+                TypeClass.of(int.class, IntAvg.class)));
+        TABLE.put("COUNT", ImmutableList.of(TypeClass.of(long.class, Count.class)));
+        TABLE.put("MAX", ImmutableList.of(TypeClass.of(TypeClass.GenericType.class, Max.class)));
+        TABLE.put("MIN", ImmutableList.of(TypeClass.of(TypeClass.GenericType.class, Min.class)));
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PlanCompiler.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PlanCompiler.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PlanCompiler.java
new file mode 100644
index 0000000..01546ed
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PlanCompiler.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.compiler.backends.standalone;
+
+import com.google.common.base.Joiner;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.storm.sql.compiler.CompilerUtil;
+import org.apache.storm.sql.javac.CompilingClassLoader;
+import org.apache.storm.sql.runtime.AbstractValuesProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.HashSet;
+import java.util.Set;
+
+public class PlanCompiler {
+  private static final Logger LOG = LoggerFactory.getLogger(PlanCompiler.class);
+
+  private static final Joiner NEW_LINE_JOINER = Joiner.on("\n");
+  private static final String PACKAGE_NAME = "org.apache.storm.sql.generated";
+  private static final String PROLOGUE = NEW_LINE_JOINER.join(
+      "// GENERATED CODE", "package " + PACKAGE_NAME + ";", "",
+      "import java.util.Iterator;", "import java.util.Map;", "import java.util.HashMap;",
+      "import java.util.List;", "import java.util.ArrayList;",
+      "import java.util.LinkedHashMap;",
+      "import org.apache.storm.tuple.Values;",
+      "import org.apache.storm.sql.runtime.AbstractChannelHandler;",
+      "import org.apache.storm.sql.runtime.Channels;",
+      "import org.apache.storm.sql.runtime.ChannelContext;",
+      "import org.apache.storm.sql.runtime.ChannelHandler;",
+      "import org.apache.storm.sql.runtime.DataSource;",
+      "import org.apache.storm.sql.runtime.AbstractValuesProcessor;",
+      "import com.google.common.collect.ArrayListMultimap;",
+      "import com.google.common.collect.Multimap;",
+      "import org.apache.calcite.interpreter.Context;",
+      "import org.apache.calcite.interpreter.StormContext;",
+      "import org.apache.calcite.DataContext;",
+      "import org.apache.storm.sql.runtime.calcite.StormDataContext;",
+      "public final class Processor extends AbstractValuesProcessor {",
+      "  public final static DataContext dataContext = new StormDataContext();",
+      "");
+  private static final String INITIALIZER_PROLOGUE = NEW_LINE_JOINER.join(
+      "  @Override",
+      "  public void initialize(Map<String, DataSource> data,",
+      "                         ChannelHandler result) {",
+      "    ChannelContext r = Channels.chain(Channels.voidContext(), result);",
+      ""
+  );
+
+  private final JavaTypeFactory typeFactory;
+
+  public PlanCompiler(JavaTypeFactory typeFactory) {
+    this.typeFactory = typeFactory;
+  }
+
+  private String generateJavaSource(RelNode root) throws Exception {
+    StringWriter sw = new StringWriter();
+    try (PrintWriter pw = new PrintWriter(sw)) {
+      RelNodeCompiler compiler = new RelNodeCompiler(pw, typeFactory);
+      printPrologue(pw);
+      compiler.traverse(root);
+      printMain(pw, root);
+      printEpilogue(pw);
+    }
+    return sw.toString();
+  }
+
+  private void printMain(PrintWriter pw, RelNode root) {
+    Set<TableScan> tables = new HashSet<>();
+    pw.print(INITIALIZER_PROLOGUE);
+    chainOperators(pw, root, tables);
+    for (TableScan n : tables) {
+      String escaped = CompilerUtil.escapeJavaString(
+          Joiner.on('.').join(n.getTable().getQualifiedName()), true);
+      String r = NEW_LINE_JOINER.join(
+          "    if (!data.containsKey(%1$s))",
+          "      throw new RuntimeException(\"Cannot find table \" + %1$s);",
+          "  data.get(%1$s).open(CTX_%2$d);",
+          "");
+      pw.print(String.format(r, escaped, n.getId()));
+    }
+    pw.print("  }\n");
+  }
+
+  private void chainOperators(PrintWriter pw, RelNode root, Set<TableScan> tables) {
+    doChainOperators(pw, root, tables, "r");
+  }
+
+  private void doChainOperators(PrintWriter pw, RelNode node, Set<TableScan> tables, String parentCtx) {
+    pw.print(
+            String.format("    ChannelContext CTX_%d = Channels.chain(%2$s, %3$s);\n",
+                          node.getId(), parentCtx, RelNodeCompiler.getStageName(node)));
+    String currentCtx = String.format("CTX_%d", node.getId());
+    if (node instanceof TableScan) {
+      tables.add((TableScan) node);
+    }
+    for (RelNode i : node.getInputs()) {
+      doChainOperators(pw, i, tables, currentCtx);
+    }
+  }
+
+  public AbstractValuesProcessor compile(RelNode plan) throws Exception {
+    String javaCode = generateJavaSource(plan);
+    LOG.debug("Compiling... source code {}", javaCode);
+    ClassLoader cl = new CompilingClassLoader(getClass().getClassLoader(),
+                                              PACKAGE_NAME + ".Processor",
+                                              javaCode, null);
+    return (AbstractValuesProcessor) cl.loadClass(
+        PACKAGE_NAME + ".Processor").newInstance();
+  }
+
+  private static void printEpilogue(
+      PrintWriter pw) throws Exception {
+    pw.print("}\n");
+  }
+
+  private static void printPrologue(PrintWriter pw) {
+    pw.append(PROLOGUE);
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PostOrderRelNodeVisitor.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PostOrderRelNodeVisitor.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PostOrderRelNodeVisitor.java
new file mode 100644
index 0000000..afed8a9
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PostOrderRelNodeVisitor.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.compiler.backends.standalone;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.*;
+import org.apache.calcite.rel.stream.Delta;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public abstract class PostOrderRelNodeVisitor<T> {
+  public final T traverse(RelNode n) throws Exception {
+    List<T> inputStreams = new ArrayList<>();
+    for (RelNode input : n.getInputs()) {
+      inputStreams.add(traverse(input));
+    }
+
+    if (n instanceof Aggregate) {
+      return visitAggregate((Aggregate) n, inputStreams);
+    } else if (n instanceof Calc) {
+      return visitCalc((Calc) n, inputStreams);
+    } else if (n instanceof Collect) {
+      return visitCollect((Collect) n, inputStreams);
+    } else if (n instanceof Correlate) {
+      return visitCorrelate((Correlate) n, inputStreams);
+    } else if (n instanceof Delta) {
+      return visitDelta((Delta) n, inputStreams);
+    } else if (n instanceof Exchange) {
+      return visitExchange((Exchange) n, inputStreams);
+    } else if (n instanceof Project) {
+      return visitProject((Project) n, inputStreams);
+    } else if (n instanceof Filter) {
+      return visitFilter((Filter) n, inputStreams);
+    } else if (n instanceof Sample) {
+      return visitSample((Sample) n, inputStreams);
+    } else if (n instanceof Sort) {
+      return visitSort((Sort) n, inputStreams);
+    } else if (n instanceof TableModify) {
+      return visitTableModify((TableModify) n, inputStreams);
+    } else if (n instanceof TableScan) {
+      return visitTableScan((TableScan) n, inputStreams);
+    } else if (n instanceof Uncollect) {
+      return visitUncollect((Uncollect) n, inputStreams);
+    } else if (n instanceof Window) {
+      return visitWindow((Window) n, inputStreams);
+    } else if (n instanceof Join) {
+      return visitJoin((Join) n, inputStreams);
+    } else {
+      return defaultValue(n, inputStreams);
+    }
+  }
+
+  public T visitAggregate(Aggregate aggregate, List<T> inputStreams) throws Exception {
+    return defaultValue(aggregate, inputStreams);
+  }
+
+  public T visitCalc(Calc calc, List<T> inputStreams) throws Exception {
+    return defaultValue(calc, inputStreams);
+  }
+
+  public T visitCollect(Collect collect, List<T> inputStreams) throws Exception {
+    return defaultValue(collect, inputStreams);
+  }
+
+  public T visitCorrelate(Correlate correlate, List<T> inputStreams) throws Exception {
+    return defaultValue(correlate, inputStreams);
+  }
+
+  public T visitDelta(Delta delta, List<T> inputStreams) throws Exception {
+    return defaultValue(delta, inputStreams);
+  }
+
+  public T visitExchange(Exchange exchange, List<T> inputStreams) throws Exception {
+    return defaultValue(exchange, inputStreams);
+  }
+
+  public T visitProject(Project project, List<T> inputStreams) throws Exception {
+    return defaultValue(project, inputStreams);
+  }
+
+  public T visitFilter(Filter filter, List<T> inputStreams) throws Exception {
+    return defaultValue(filter, inputStreams);
+  }
+
+  public T visitSample(Sample sample, List<T> inputStreams) throws Exception {
+    return defaultValue(sample, inputStreams);
+  }
+
+  public T visitSort(Sort sort, List<T> inputStreams) throws Exception {
+    return defaultValue(sort, inputStreams);
+  }
+
+  public T visitTableModify(TableModify modify, List<T> inputStreams) throws Exception {
+    return defaultValue(modify, inputStreams);
+  }
+
+  public T visitTableScan(TableScan scan, List<T> inputStreams) throws Exception {
+    return defaultValue(scan, inputStreams);
+  }
+
+  public T visitUncollect(Uncollect uncollect, List<T> inputStreams) throws Exception {
+    return defaultValue(uncollect, inputStreams);
+  }
+
+  public T visitWindow(Window window, List<T> inputStreams) throws Exception {
+    return defaultValue(window, inputStreams);
+  }
+
+  public T visitJoin(Join join, List<T> inputStreams) throws Exception {
+    return defaultValue(join, inputStreams);
+  }
+
+  public T defaultValue(RelNode n, List<T> inputStreams) {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java
new file mode 100644
index 0000000..97995c7
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java
@@ -0,0 +1,484 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.compiler.backends.standalone;
+
+import com.google.common.base.Joiner;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.calcite.rel.stream.Delta;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.schema.AggregateFunction;
+import org.apache.calcite.schema.impl.AggregateFunctionImpl;
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.sql.validate.SqlUserDefinedAggFunction;
+import org.apache.storm.sql.compiler.RexNodeToJavaCodeCompiler;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Compile RelNodes into individual functions.
+ *
+ * <p>Each supported plan node is written to the supplied {@link PrintWriter} as a
+ * {@code private static final ChannelHandler} field named by {@link #getStageName(RelNode)}.
+ * Node types without an override end up in {@link #defaultValue} and are rejected with an
+ * {@link UnsupportedOperationException}.
+ */
+class RelNodeCompiler extends PostOrderRelNodeVisitor<Void> {
+  public static final Joiner NEW_LINE_JOINER = Joiner.on('\n');
+
+  private final PrintWriter pw;
+  private final JavaTypeFactory typeFactory;
+  private final RexNodeToJavaCodeCompiler rexCompiler;
+
+  /** Header of a plain stage; %1$s is the stage name. Closed by {@link #endStage()}. */
+  private static final String STAGE_PROLOGUE = NEW_LINE_JOINER.join(
+    "  private static final ChannelHandler %1$s = ",
+    "    new AbstractChannelHandler() {",
+    "    @Override",
+    "    public void dataReceived(ChannelContext ctx, Values _data) {",
+    ""
+  );
+
+  /**
+   * Header of an aggregate stage. %1$s is the stage name, %2$s the comma-separated group-by
+   * column indices and %3$s the statements that compute and emit one result row.
+   * The generated handler keeps one accumulator map per distinct group key, emits all groups
+   * on flush and then clears its state.
+   */
+  private static final String AGGREGATE_STAGE_PROLOGUE = NEW_LINE_JOINER.join(
+          "  private static final ChannelHandler %1$s = ",
+          "    new AbstractChannelHandler() {",
+          "    private final Values EMPTY_VALUES = new Values();",
+          "    private final Map<List<Object>, Map<String, Object>> state = new LinkedHashMap<>();",
+          "    private final int[] groupIndices = new int[] {%2$s};",
+          "    private List<Object> getGroupValues(Values _data) {",
+          "      List<Object> res = new ArrayList<>();",
+          "      for (int i: groupIndices) {",
+          "        res.add(_data.get(i));",
+          "      }",
+          "      return res;",
+          "    }",
+          "",
+          "    @Override",
+          "    public void flush(ChannelContext ctx) {",
+          "      emitAggregateResults(ctx);",
+          "      super.flush(ctx);",
+          "      state.clear();",
+          "    }",
+          "",
+          "    private void emitAggregateResults(ChannelContext ctx) {",
+          "        for (Map.Entry<List<Object>, Map<String, Object>> entry: state.entrySet()) {",
+          "          List<Object> groupValues = entry.getKey();",
+          "          Map<String, Object> accumulators = entry.getValue();",
+          "          %3$s",
+          "        }",
+          "    }",
+          "",
+          "    @Override",
+          "    public void dataReceived(ChannelContext ctx, Values _data) {",
+          ""
+  );
+
+  /**
+   * Header of a join stage. %1$s is the stage name, %2$s and %3$s the stage names of the left
+   * and right inputs, %4$s and %5$s the join column ordinals on the left and right side.
+   * The generated handler buffers the rows of both sides. Once both sides have flushed it
+   * builds a lookup table over the smaller side and emits each match with the left row's
+   * values first.
+   */
+  private static final String JOIN_STAGE_PROLOGUE = NEW_LINE_JOINER.join(
+          "  private static final ChannelHandler %1$s = ",
+          "    new AbstractChannelHandler() {",
+          "      Object left = %2$s;",
+          "      Object right = %3$s;",
+          "      Object source = null;",
+          "      List<Values> leftRows = new ArrayList<>();",
+          "      List<Values> rightRows = new ArrayList<>();",
+          "      boolean leftDone = false;",
+          "      boolean rightDone = false;",
+          "      int[] ordinals = new int[] {%4$s, %5$s};",
+          "",
+          "      Multimap<Object, Values> getJoinTable(List<Values> rows, int joinIndex) {",
+          "         Multimap<Object, Values> m = ArrayListMultimap.create();",
+          "         for(Values v: rows) {",
+          "           m.put(v.get(joinIndex), v);",
+          "         }",
+          "         return m;",
+          "      }",
+          "",
+          "      List<Values> join(Multimap<Object, Values> tab, List<Values> rows, int rowIdx, boolean rev) {",
+          "         List<Values> res = new ArrayList<>();",
+          "         for (Values row: rows) {",
+          "           for (Values mapValue: tab.get(row.get(rowIdx))) {",
+          "             if (mapValue != null) {",
+          "               Values joinedRow = new Values();",
+          "               if(rev) {",
+          "                 joinedRow.addAll(row);",
+          "                 joinedRow.addAll(mapValue);",
+          "               } else {",
+          "                 joinedRow.addAll(mapValue);",
+          "                 joinedRow.addAll(row);",
+          "               }",
+          "               res.add(joinedRow);",
+          "             }",
+          "           }",
+          "         }",
+          "         return res;",
+          "      }",
+          "",
+          "    @Override",
+          "    public void setSource(ChannelContext ctx, Object source) {",
+          "      this.source = source;",
+          "    }",
+          "",
+          "    @Override",
+          "    public void flush(ChannelContext ctx) {",
+          "        if (source == left) {",
+          "            leftDone = true;",
+          "        } else if (source == right) {",
+          "            rightDone = true;",
+          "        }",
+          "        if (leftDone && rightDone) {",
+          "          if (leftRows.size() <= rightRows.size()) {",
+          "            for(Values res: join(getJoinTable(leftRows, ordinals[0]), rightRows, ordinals[1], false)) {",
+          "              ctx.emit(res);",
+          "            }",
+          "          } else {",
+          "            for(Values res: join(getJoinTable(rightRows, ordinals[1]), leftRows, ordinals[0], true)) {",
+          "              ctx.emit(res);",
+          "            }",
+          "          }",
+          "          leftDone = rightDone = false;",
+          "          leftRows.clear();",
+          "          rightRows.clear();",
+          "          super.flush(ctx);",
+          "        }",
+          "    }",
+          "",
+          "    @Override",
+          "    public void dataReceived(ChannelContext ctx, Values _data) {",
+          ""
+  );
+
+  /** Complete stage that reuses the shared pass-through handler; %1$s is the stage name. */
+  private static final String STAGE_PASSTHROUGH = NEW_LINE_JOINER.join(
+      "  private static final ChannelHandler %1$s = AbstractChannelHandler.PASS_THROUGH;",
+      "");
+
+  /**
+   * Complete table-scan stage; %1$s is the stage name. The generated handler registers itself
+   * as the source on the context before forwarding each row or flush, which is what the join
+   * stage compares against its {@code left}/{@code right} fields.
+   */
+  private static final String STAGE_ENUMERABLE_TABLE_SCAN = NEW_LINE_JOINER.join(
+          "  private static final ChannelHandler %1$s = new AbstractChannelHandler() {",
+          "    @Override",
+          "    public void flush(ChannelContext ctx) {",
+          "      ctx.setSource(this);",
+          "      super.flush(ctx);",
+          "    }",
+          "",
+          "    @Override",
+          "    public void dataReceived(ChannelContext ctx, Values _data) {",
+          "      ctx.setSource(this);",
+          "      ctx.emit(_data);",
+          "    }",
+          "  };",
+          "");
+
+  /** Counter used to make generated aggregate variable names unique. */
+  private int nameCount;
+  /** Variable name assigned to each aggregate call, so update and result code agree on it. */
+  private final Map<AggregateCall, String> aggregateCallVarNames = new HashMap<>();
+
+  /**
+   * @param pw          destination of the generated source
+   * @param typeFactory used to map SQL types to Java classes and to build the expression compiler
+   */
+  RelNodeCompiler(PrintWriter pw, JavaTypeFactory typeFactory) {
+    this.pw = pw;
+    this.typeFactory = typeFactory;
+    this.rexCompiler = new RexNodeToJavaCodeCompiler(new RexBuilder(typeFactory));
+  }
+
+  /** Emits a pass-through stage for a {@link Delta} node. */
+  @Override
+  public Void visitDelta(Delta delta, List<Void> inputStreams) throws Exception {
+    pw.print(String.format(STAGE_PASSTHROUGH, getStageName(delta)));
+    return null;
+  }
+
+  /**
+   * Emits a stage that evaluates the filter condition into {@code outputValues[0]} and
+   * forwards the row only when the condition is true.
+   */
+  @Override
+  public Void visitFilter(Filter filter, List<Void> inputStreams) throws Exception {
+    beginStage(filter);
+
+    List<RexNode> childExps = filter.getChildExps();
+    RelDataType inputRowType = filter.getInput(0).getRowType();
+
+    pw.print("Context context = new StormContext(Processor.dataContext);\n");
+    pw.print("context.values = _data.toArray();\n");
+    pw.print("Object[] outputValues = new Object[1];\n");
+
+    pw.write(rexCompiler.compileToBlock(childExps, inputRowType).toString());
+
+    String r = "((Boolean) outputValues[0])";
+    if (filter.getCondition().getType().isNullable()) {
+      // A nullable condition may evaluate to null; guard before unboxing it.
+      pw.print(String.format("    if (%s != null && %s) { ctx.emit(_data); }\n", r, r));
+    } else {
+      pw.print(String.format("    if (%s) { ctx.emit(_data); }\n", r));
+    }
+    endStage();
+    return null;
+  }
+
+  /**
+   * Emits a stage that evaluates the projection expressions into {@code outputValues} and
+   * forwards them as a new row.
+   */
+  @Override
+  public Void visitProject(Project project, List<Void> inputStreams) throws Exception {
+    beginStage(project);
+
+    List<RexNode> childExps = project.getChildExps();
+    RelDataType inputRowType = project.getInput(0).getRowType();
+    int outputCount = project.getRowType().getFieldCount();
+
+    pw.print("Context context = new StormContext(Processor.dataContext);\n");
+    pw.print("context.values = _data.toArray();\n");
+    pw.print(String.format("Object[] outputValues = new Object[%d];\n", outputCount));
+
+    pw.write(rexCompiler.compileToBlock(childExps, inputRowType).toString());
+
+    pw.print("    ctx.emit(new Values(outputValues));\n");
+    endStage();
+    return null;
+  }
+
+  /**
+   * Rejects every node type that has no dedicated visit method in this class.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public Void defaultValue(RelNode n, List<Void> inputStreams) {
+    throw new UnsupportedOperationException();
+  }
+
+  /** Emits the table-scan stage for {@code scan}. */
+  @Override
+  public Void visitTableScan(TableScan scan, List<Void> inputStreams) throws Exception {
+    pw.print(String.format(STAGE_ENUMERABLE_TABLE_SCAN, getStageName(scan)));
+    return null;
+  }
+
+  /**
+   * Emits an aggregate stage whose {@code dataReceived} looks up (or creates) the accumulator
+   * map of the row's group and updates one accumulator per aggregate call.
+   */
+  @Override
+  public Void visitAggregate(Aggregate aggregate, List<Void> inputStreams) throws Exception {
+    beginAggregateStage(aggregate);
+    pw.println("        if (_data != null) {");
+    pw.println("        List<Object> curGroupValues = getGroupValues(_data);");
+    pw.println("        if (!state.containsKey(curGroupValues)) {");
+    pw.println("          state.put(curGroupValues, new HashMap<String, Object>());");
+    pw.println("        }");
+    pw.println("        Map<String, Object> accumulators = state.get(curGroupValues);");
+    for (AggregateCall call : aggregate.getAggCallList()) {
+      aggregate(call);
+    }
+    pw.println("        }");
+    endStage();
+    return null;
+  }
+
+  /**
+   * Emits a join stage whose {@code dataReceived} buffers each row on the side it came from.
+   *
+   * @throws UnsupportedOperationException if the join is not a simple equi join
+   */
+  @Override
+  public Void visitJoin(Join join, List<Void> inputStreams) {
+    beginJoinStage(join);
+    pw.println("        if (source == left) {");
+    pw.println("            leftRows.add(_data);");
+    pw.println("        } else if (source == right) {");
+    pw.println("            rightRows.add(_data);");
+    pw.println("        }");
+    endStage();
+    return null;
+  }
+
+  /** Returns {@code var.get(0), var.get(1), ...} for {@code n} elements; empty when n is 0. */
+  private String groupValueEmitStr(String var, int n) {
+    List<String> accessors = new ArrayList<>();
+    for (int i = 0; i < n; i++) {
+      accessors.add(var + ".get(" + i + ")");
+    }
+    return Joiner.on(", ").join(accessors);
+  }
+
+  /**
+   * Builds the statements substituted into the aggregate prologue: one result computation per
+   * aggregate call, followed by the emit of the group key values and the results.
+   */
+  private String emitAggregateStmts(Aggregate aggregate) {
+    List<String> emitArgs = new ArrayList<>();
+    // Only add the group part when there are group keys. An aggregate without GROUP BY would
+    // otherwise produce "new Values(, x)", which does not compile.
+    String groupArgs = groupValueEmitStr("groupValues", aggregate.getGroupSet().cardinality());
+    if (!groupArgs.isEmpty()) {
+      emitArgs.add(groupArgs);
+    }
+
+    StringWriter sw = new StringWriter();
+    PrintWriter resultWriter = new PrintWriter(sw);
+    for (AggregateCall call : aggregate.getAggCallList()) {
+      emitArgs.add(aggregateResult(call, resultWriter));
+    }
+    resultWriter.flush();
+
+    return NEW_LINE_JOINER.join(sw.toString(),
+                                String.format("          ctx.emit(new Values(%s));",
+                                              Joiner.on(", ").join(emitArgs)));
+  }
+
+  /**
+   * Writes the statement that computes the final value of {@code call} to {@code pw}.
+   *
+   * @return the name of the generated local variable holding the result
+   */
+  private String aggregateResult(AggregateCall call, PrintWriter pw) {
+    Type ty = typeFactory.getJavaClass(call.getType());
+    AggregateFunctionImpl aggFn = resolveAggregateFunction(call, ty);
+    return doAggregateResult(aggFn, reserveAggVarName(call), pw);
+  }
+
+  /**
+   * Finds the implementation of an aggregate call: the user-defined function itself, or the
+   * built-in class registered for the call's name and Java result type.
+   *
+   * @throws UnsupportedOperationException if no built-in is registered for the name or type
+   */
+  private AggregateFunctionImpl resolveAggregateFunction(AggregateCall call, Type ty) {
+    SqlAggFunction aggFunction = call.getAggregation();
+    if (aggFunction instanceof SqlUserDefinedAggFunction) {
+      AggregateFunction aggregateFunction = ((SqlUserDefinedAggFunction) aggFunction).function;
+      return (AggregateFunctionImpl) aggregateFunction;
+    }
+    String aggregationName = aggFunction.getName();
+    List<BuiltinAggregateFunctions.TypeClass> typeClasses = BuiltinAggregateFunctions.TABLE.get(aggregationName);
+    if (typeClasses == null) {
+      throw new UnsupportedOperationException(aggregationName + " Not implemented");
+    }
+    return AggregateFunctionImpl.create(findMatchingClass(aggregationName, typeClasses, ty));
+  }
+
+  /**
+   * Writes {@code final <resultType> <varName>_result = <result call>;} to {@code pw}. For a
+   * non-static aggregate the aggregate object is first fetched from the accumulator map.
+   *
+   * @return the name of the result variable
+   */
+  private String doAggregateResult(AggregateFunctionImpl aggFn, String varName, PrintWriter pw) {
+    String resultName = varName + "_result";
+    Class<?> accumulatorType = aggFn.accumulatorType;
+    Class<?> resultType = aggFn.resultType;
+    List<String> args = new ArrayList<>();
+    if (!aggFn.isStatic) {
+      String aggObjName = String.format("%s_obj", varName);
+      String aggObjClassName = aggFn.initMethod.getDeclaringClass().getCanonicalName();
+      pw.println("          @SuppressWarnings(\"unchecked\")");
+      pw.println(String.format("          final %1$s %2$s = (%1$s) accumulators.get(\"%2$s\");", aggObjClassName,
+              aggObjName));
+      args.add(aggObjName);
+    }
+    args.add(String.format("(%s)accumulators.get(\"%s\")", accumulatorType.getCanonicalName(), varName));
+    pw.println(String.format("          final %s %s = %s;", resultType.getCanonicalName(),
+                             resultName, printMethodCall(aggFn.resultMethod, args)));
+
+    return resultName;
+  }
+
+  /**
+   * Writes the accumulator update for one aggregate call to the stage being generated.
+   *
+   * @throws UnsupportedOperationException for COUNT over more than one argument, or when no
+   *         implementation exists for the call
+   */
+  private void aggregate(AggregateCall call) {
+    Type ty = typeFactory.getJavaClass(call.getType());
+    // COUNT is accepted with zero arguments or exactly one argument.
+    if (call.getAggregation().getName().equals("COUNT") && call.getArgList().size() > 1) {
+      throw new UnsupportedOperationException("Count with nullable fields");
+    }
+    AggregateFunctionImpl aggFn = resolveAggregateFunction(call, ty);
+    doAggregate(aggFn, reserveAggVarName(call), call.getArgList());
+  }
+
+  /**
+   * Returns the implementing class of the first entry whose type is the generic marker or
+   * equals {@code ty}.
+   *
+   * @throws UnsupportedOperationException if no entry matches
+   */
+  private Class<?> findMatchingClass(String aggregationName, List<BuiltinAggregateFunctions.TypeClass> typeClasses, Type ty) {
+    for (BuiltinAggregateFunctions.TypeClass typeClass : typeClasses) {
+      if (typeClass.ty.equals(BuiltinAggregateFunctions.TypeClass.GenericType.class) || typeClass.ty.equals(ty)) {
+        return typeClass.clazz;
+      }
+    }
+    throw new UnsupportedOperationException(aggregationName + " Not implemented for type '" + ty + "'");
+  }
+
+  /**
+   * Writes {@code accumulators.put("<varName>", <add call>);}. The first argument of the add
+   * call is the stored accumulator, or the result of the init method when none is stored yet.
+   * A non-static aggregate gets its aggregate object created on first use and used as receiver.
+   *
+   * @param argList input column indices of the call; when empty, {@code EMPTY_VALUES} is passed
+   */
+  private void doAggregate(AggregateFunctionImpl aggFn, String varName, List<Integer> argList) {
+    List<String> args = new ArrayList<>();
+    Class<?> accumulatorType = aggFn.accumulatorType;
+    if (!aggFn.isStatic) {
+      String aggObjName = String.format("%s_obj", varName);
+      String aggObjClassName = aggFn.initMethod.getDeclaringClass().getCanonicalName();
+      pw.println(String.format("          if (!accumulators.containsKey(\"%s\")) { ", aggObjName));
+      pw.println(String.format("            accumulators.put(\"%s\", new %s());", aggObjName, aggObjClassName));
+      pw.println("          }");
+      pw.println("          @SuppressWarnings(\"unchecked\")");
+      pw.println(String.format("          final %1$s %2$s = (%1$s) accumulators.get(\"%2$s\");", aggObjClassName,
+              aggObjName));
+      args.add(aggObjName);
+    }
+    // The init call is rendered before the accumulator expression joins args, so it only
+    // sees the optional aggregate object.
+    args.add(String.format("%1$s == null ? %2$s : (%3$s) %1$s",
+                           "accumulators.get(\"" + varName + "\")",
+                           printMethodCall(aggFn.initMethod, args),
+                           accumulatorType.getCanonicalName()));
+    if (argList.isEmpty()) {
+      args.add("EMPTY_VALUES");
+    } else {
+      for (int i = 0; i < aggFn.valueTypes.size(); i++) {
+        args.add(String.format("(%s) %s", aggFn.valueTypes.get(i).getCanonicalName(), "_data.get(" + argList.get(i) + ")"));
+      }
+    }
+    pw.print(String.format("          accumulators.put(\"%s\", %s);\n",
+                           varName,
+                           printMethodCall(aggFn.addMethod, args)));
+  }
+
+  /** Returns the variable name for {@code call}, assigning a fresh numbered one on first use. */
+  private String reserveAggVarName(AggregateCall call) {
+    String varName = aggregateCallVarNames.get(call);
+    if (varName == null) {
+      varName = call.getAggregation().getName() + ++nameCount;
+      aggregateCallVarNames.put(call, varName);
+    }
+    return varName;
+  }
+
+  /** Opens a plain stage for {@code n}. */
+  private void beginStage(RelNode n) {
+    pw.print(String.format(STAGE_PROLOGUE, getStageName(n)));
+  }
+
+  /** Opens an aggregate stage for {@code n}, including its result-emitting code. */
+  private void beginAggregateStage(Aggregate n) {
+    pw.print(String.format(AGGREGATE_STAGE_PROLOGUE, getStageName(n), getGroupByIndices(n), emitAggregateStmts(n)));
+  }
+
+  /**
+   * Opens a join stage for {@code join}.
+   *
+   * @throws UnsupportedOperationException if the join is not a {@link LogicalJoin} or is not
+   *         a simple equi join
+   */
+  private void beginJoinStage(Join join) {
+    int[] ordinals = new int[2];
+    // The type check turns what used to be a ClassCastException for other Join
+    // implementations into the same "unsupported" error as a non-equi join.
+    if (!(join instanceof LogicalJoin)
+        || !RelOptUtil.analyzeSimpleEquiJoin((LogicalJoin) join, ordinals)) {
+      throw new UnsupportedOperationException("Only simple equi joins are supported");
+    }
+
+    pw.print(String.format(JOIN_STAGE_PROLOGUE, getStageName(join),
+                           getStageName(join.getLeft()),
+                           getStageName(join.getRight()),
+                           ordinals[0],
+                           ordinals[1]));
+  }
+
+  /** Closes the {@code dataReceived} method and the anonymous handler opened by a prologue. */
+  private void endStage() {
+    pw.print("  }\n  };\n");
+  }
+
+  /** Returns the generated field name for {@code n}: upper-cased simple class name, "_", node id. */
+  static String getStageName(RelNode n) {
+    return n.getClass().getSimpleName().toUpperCase() + "_" + n.getId();
+  }
+
+  /** Returns the group-by column indices of {@code n} as a comma-separated list. */
+  private String getGroupByIndices(Aggregate n) {
+    return Joiner.on(", ").join(n.getGroupSet());
+  }
+
+  /**
+   * Renders a Java call expression for {@code method}.
+   *
+   * @param method the method to call
+   * @param args   argument expressions; for an instance method the first element is the receiver
+   * @return {@code Class.method(args)} for a static method, {@code receiver.method(rest)} otherwise
+   */
+  public static String printMethodCall(Method method, List<String> args) {
+    return printMethodCall(method.getDeclaringClass(), method.getName(),
+            Modifier.isStatic(method.getModifiers()), args);
+  }
+
+  private static String printMethodCall(Class<?> clazz, String method, boolean isStatic, List<String> args) {
+    if (isStatic) {
+      return String.format("%s.%s(%s)", clazz.getCanonicalName(), method, Joiner.on(',').join(args));
+    } else {
+      return String.format("%s.%s(%s)", args.get(0), method,
+              Joiner.on(',').join(args.subList(1, args.size())));
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/javac/CompilingClassLoader.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/javac/CompilingClassLoader.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/javac/CompilingClassLoader.java
new file mode 100644
index 0000000..0b7c053
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/javac/CompilingClassLoader.java
@@ -0,0 +1,225 @@
+/*
+ * Copyright (C) 2010 Google, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.javac;
+
+
+import javax.tools.DiagnosticListener;
+import javax.tools.FileObject;
+import javax.tools.ForwardingJavaFileManager;
+import javax.tools.JavaCompiler;
+import javax.tools.JavaFileManager;
+import javax.tools.JavaFileObject;
+import javax.tools.SimpleJavaFileObject;
+import javax.tools.ToolProvider;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Collections.singleton;
+
+/**
+ * This is a Java ClassLoader that will attempt to load a class from a string of source code.
+ *
+ * <h3>Example</h3>
+ *
+ * <pre>
+ * String className = "com.foo.MyClass";
+ * String classSource =
+ *   "package com.foo;\n" +
+ *   "public class MyClass implements Runnable {\n" +
+ *   "  @Override public void run() {\n" +
+ *   "   log(\"Hello world\");\n" +
+ *   "  }\n" +
+ *   "}";
+ *
+ * // Load class from source.
+ * ClassLoader classLoader = new CompilingClassLoader(
+ *     parentClassLoader, className, classSource);
+ * Class myClass = classLoader.loadClass(className);
+ *
+ * // Use it.
+ * Runnable instance = (Runnable)myClass.newInstance();
+ * instance.run();
+ * </pre>
+ *
+ * Only one chunk of source can be compiled per instance of CompilingClassLoader. If you need to
+ * compile more, create multiple CompilingClassLoader instances.
+ *
+ * Uses Java 1.6's in built compiler API.
+ *
+ * If the class cannot be compiled, loadClass() will throw a ClassNotFoundException and log the
+ * compile errors to System.err. If you don't want the messages logged, or want to explicitly handle
+ * the messages you can provide your own {@link javax.tools.DiagnosticListener} through
+ * {#setDiagnosticListener()}.
+ *
+ * @see java.lang.ClassLoader
+ * @see javax.tools.JavaCompiler
+ */
+public class CompilingClassLoader extends ClassLoader {
+
+  /**
+   * Thrown when code cannot be compiled.
+   */
+  public static class CompilerException extends Exception {
+    private static final long serialVersionUID = -2936958840023603270L;
+
+    public CompilerException(String message) {
+      super(message);
+    }
+  }
+
+  private final Map<String, ByteArrayOutputStream> byteCodeForClasses = new HashMap<>();
+
+  private static final URI EMPTY_URI;
+
+  static {
+    try {
+      // Needed to keep SimpleFileObject constructor happy.
+      EMPTY_URI = new URI("");
+    } catch (URISyntaxException e) {
+      throw new Error(e);
+    }
+  }
+
+  /**
+   * @param parent Parent classloader to resolve dependencies from.
+   * @param className Name of class to compile. eg. "com.foo.MyClass".
+   * @param sourceCode Java source for class. e.g. "package com.foo; class MyClass { ... }".
+   * @param diagnosticListener Notified of compiler errors (may be null).
+   */
+  public CompilingClassLoader(
+      ClassLoader parent,
+      String className,
+      String sourceCode,
+      DiagnosticListener<JavaFileObject> diagnosticListener)
+      throws CompilerException {
+    super(parent);
+    if (!compileSourceCodeToByteCode(className, sourceCode, diagnosticListener)) {
+      throw new CompilerException("Could not compile " + className);
+    }
+  }
+
+  public Map<String, ByteArrayOutputStream> getClasses() {
+    return byteCodeForClasses;
+  }
+
+  /**
+   * Override ClassLoader's class resolving method. Don't call this directly, instead use
+   * {@link ClassLoader#loadClass(String)}.
+   */
+  @Override
+  public Class<?> findClass(String name) throws ClassNotFoundException {
+    ByteArrayOutputStream byteCode = byteCodeForClasses.get(name);
+    if (byteCode == null) {
+      throw new ClassNotFoundException(name);
+    }
+    return defineClass(name, byteCode.toByteArray(), 0, byteCode.size());
+  }
+
+  /**
+   * @return Whether compilation was successful.
+   */
+  private boolean compileSourceCodeToByteCode(
+      String className, String sourceCode, DiagnosticListener<JavaFileObject> diagnosticListener) {
+    JavaCompiler javaCompiler = ToolProvider.getSystemJavaCompiler();
+
+    // Set up the in-memory filesystem.
+    InMemoryFileManager fileManager =
+        new InMemoryFileManager(javaCompiler.getStandardFileManager(null, null, null));
+    JavaFileObject javaFile = new InMemoryJavaFile(className, sourceCode);
+
+    // Javac option: remove these when the javac zip impl is fixed
+    // (http://b/issue?id=1822932)
+    System.setProperty("useJavaUtilZip", "true"); // setting value to any non-null string
+    List<String> options = new LinkedList<>();
+    // this is ignored by javac currently but useJavaUtilZip should be
+    // a valid javac XD option, which is another bug
+    options.add("-XDuseJavaUtilZip");
+
+    // Now compile!
+    JavaCompiler.CompilationTask compilationTask =
+        javaCompiler.getTask(
+            null, // Null: log any unhandled errors to stderr.
+            fileManager,
+            diagnosticListener,
+            options,
+            null,
+            singleton(javaFile));
+    return compilationTask.call();
+  }
+
+  /**
+   * Provides an in-memory representation of JavaFileManager abstraction, so we do not need to write
+   * any files to disk.
+   *
+   * When files are written to, rather than putting the bytes on disk, they are appended to buffers
+   * in byteCodeForClasses.
+   *
+   * @see javax.tools.JavaFileManager
+   */
+  private class InMemoryFileManager extends ForwardingJavaFileManager<JavaFileManager> {
+    public InMemoryFileManager(JavaFileManager fileManager) {
+      super(fileManager);
+    }
+
+    @Override
+    public JavaFileObject getJavaFileForOutput(
+        Location location, final String className, JavaFileObject.Kind kind, FileObject sibling)
+        throws IOException {
+      return new SimpleJavaFileObject(EMPTY_URI, kind) {
+        @Override
+        public OutputStream openOutputStream() throws IOException {
+          ByteArrayOutputStream outputStream = byteCodeForClasses.get(className);
+          if (outputStream != null) {
+            throw new IllegalStateException("Cannot write more than once");
+          }
+          // Reasonable size for a simple .class.
+          outputStream = new ByteArrayOutputStream(256);
+          byteCodeForClasses.put(className, outputStream);
+          return outputStream;
+        }
+      };
+    }
+  }
+
+  private static class InMemoryJavaFile extends SimpleJavaFileObject {
+    private final String sourceCode;
+
+    public InMemoryJavaFile(String className, String sourceCode) {
+      super(makeUri(className), Kind.SOURCE);
+      this.sourceCode = sourceCode;
+    }
+
+    private static URI makeUri(String className) {
+      try {
+        return new URI(className.replaceAll("\\.", "/") + Kind.SOURCE.extension);
+      } catch (URISyntaxException e) {
+        throw new RuntimeException(e); // Not sure what could cause this.
+      }
+    }
+
+    @Override
+    public CharSequence getCharContent(boolean ignoreEncodingErrors) throws IOException {
+      return sourceCode;
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/ColumnConstraint.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/ColumnConstraint.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/ColumnConstraint.java
new file mode 100644
index 0000000..c67d8e7
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/ColumnConstraint.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.sql.parser;
+
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.validate.SqlMonotonicity;
+
+/**
+ * Literal node describing a constraint attached to a column; instances are created through
+ * the nested subclasses.
+ */
+public class ColumnConstraint extends SqlLiteral {
+
+  private ColumnConstraint(Object value, SqlTypeName typeName, SqlParserPos pos) {
+    super(value, typeName, pos);
+  }
+
+  /**
+   * Primary-key constraint: a {@link SqlDDLKeywords#PRIMARY} symbol literal that also carries
+   * the monotonicity given at construction.
+   */
+  public static class PrimaryKey extends ColumnConstraint {
+    private final SqlMonotonicity monotonicity;
+
+    public PrimaryKey(SqlMonotonicity monotonicity, SqlParserPos pos) {
+      super(SqlDDLKeywords.PRIMARY, SqlTypeName.SYMBOL, pos);
+      this.monotonicity = monotonicity;
+    }
+
+    /** Returns the monotonicity this constraint was created with. */
+    public SqlMonotonicity monotonicity() {
+      return this.monotonicity;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/ColumnDefinition.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/ColumnDefinition.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/ColumnDefinition.java
new file mode 100644
index 0000000..3520b86
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/ColumnDefinition.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.parser;
+
+import org.apache.calcite.sql.SqlDataTypeSpec;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+import java.util.Arrays;
+
+/**
+ * A column definition stored as a three-element node list: name, type, and constraint
+ * (in that order).
+ */
+public class ColumnDefinition extends SqlNodeList {
+  private static final int NAME_INDEX = 0;
+  private static final int TYPE_INDEX = 1;
+  private static final int CONSTRAINT_INDEX = 2;
+
+  public ColumnDefinition(
+      SqlIdentifier name, SqlDataTypeSpec type, ColumnConstraint constraint, SqlParserPos pos) {
+    super(Arrays.asList(name, type, constraint), pos);
+  }
+
+  /** Returns the string form of the column-name node. */
+  public String name() {
+    final SqlNode nameNode = get(NAME_INDEX);
+    return nameNode.toString();
+  }
+
+  /** Returns the column's data type specification. */
+  public SqlDataTypeSpec type() {
+    final SqlNode typeNode = get(TYPE_INDEX);
+    return (SqlDataTypeSpec) typeNode;
+  }
+
+  /** Returns the column's constraint node as passed to the constructor. */
+  public ColumnConstraint constraint() {
+    final SqlNode constraintNode = get(CONSTRAINT_INDEX);
+    return (ColumnConstraint) constraintNode;
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateFunction.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateFunction.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateFunction.java
new file mode 100644
index 0000000..a53802c
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateFunction.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.parser;
+
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+import org.apache.calcite.util.NlsString;
+
+import java.util.List;
+
+/**
+ * Parse-tree node for {@code CREATE FUNCTION <name> AS <class> [USING JAR <jar>]}.
+ */
+public class SqlCreateFunction extends SqlCall {
+    public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator(
+            "CREATE_FUNCTION", SqlKind.OTHER) {
+        /**
+         * Rebuilds the call from its operands, which must be in the order returned by
+         * {@link SqlCreateFunction#getOperandList()}: function name, class name, jar name.
+         */
+        @Override
+        public SqlCall createCall(
+                SqlLiteral functionQualifier, SqlParserPos pos, SqlNode... o) {
+            assert functionQualifier == null;
+            return new SqlCreateFunction(pos, (SqlIdentifier) o[0], o[1], o[2]);
+        }
+
+        @Override
+        public void unparse(
+                SqlWriter writer, SqlCall call, int leftPrec, int rightPrec) {
+            SqlCreateFunction t = (SqlCreateFunction) call;
+            UnparseUtil u = new UnparseUtil(writer, leftPrec, rightPrec);
+            u.keyword("CREATE", "FUNCTION").node(t.functionName).keyword("AS").node(t.className);
+            // The USING JAR clause is optional.
+            if (t.jarName != null) {
+                u.keyword("USING", "JAR").node(t.jarName);
+            }
+        }
+    };
+
+    private final SqlIdentifier functionName;
+    private final SqlNode className;
+    /** Jar name literal; null when no USING JAR clause was given. */
+    private final SqlNode jarName;
+
+    public SqlCreateFunction(SqlParserPos pos, SqlIdentifier functionName, SqlNode className, SqlNode jarName) {
+        super(pos);
+        this.functionName = functionName;
+        this.className = className;
+        this.jarName = jarName;
+    }
+
+    @Override
+    public SqlOperator getOperator() {
+        return OPERATOR;
+    }
+
+    /**
+     * Returns all three operands (function name, class name, jar name) in the order
+     * {@code OPERATOR.createCall} consumes them. The jar name entry is null when absent;
+     * it must still be listed, otherwise rebuilding the call from this list would read
+     * past the end of the operand array.
+     */
+    @Override
+    public List<SqlNode> getOperandList() {
+        return ImmutableNullableList.of(functionName, className, jarName);
+    }
+
+
+    @Override
+    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+        getOperator().unparse(writer, this, leftPrec, rightPrec);
+    }
+
+    /** Returns the function name as a string. */
+    public String functionName() {
+        return functionName.toString();
+    }
+
+    /** Returns the string value of the class-name literal. */
+    public String className() {
+        return ((NlsString)SqlLiteral.value(className)).getValue();
+    }
+
+    /** Returns the string value of the jar-name literal, or null when none was given. */
+    public String jarName() {
+        return jarName == null ? null : ((NlsString)SqlLiteral.value(jarName)).getValue();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateTable.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateTable.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateTable.java
new file mode 100644
index 0000000..670eedb
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateTable.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.parser;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Parse-tree node for the {@code CREATE EXTERNAL TABLE} statement.
+ */
+public class SqlCreateTable extends SqlCall {
+  /** Parallelism reported by {@link #parallelism()} when no PARALLELISM clause was given. */
+  private static final int DEFAULT_PARALLELISM = 1;
+
+  public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator(
+      "CREATE_TABLE", SqlKind.OTHER) {
+    /**
+     * Rebuilds the call from its eight operands, which must be in the order returned by
+     * {@link SqlCreateTable#getOperandList()}: table name, field list, input format class,
+     * output format class, location, parallelism, properties, query.
+     */
+    @Override
+    public SqlCall createCall(
+        SqlLiteral functionQualifier, SqlParserPos pos, SqlNode... o) {
+      assert functionQualifier == null;
+      return new SqlCreateTable(pos, (SqlIdentifier) o[0], (SqlNodeList) o[1],
+                                o[2], o[3], o[4], o[5], o[6], o[7]);
+    }
+
+    @Override
+    public void unparse(
+        SqlWriter writer, SqlCall call, int leftPrec, int rightPrec) {
+      SqlCreateTable t = (SqlCreateTable) call;
+      UnparseUtil u = new UnparseUtil(writer, leftPrec, rightPrec);
+      u.keyword("CREATE", "EXTERNAL", "TABLE").node(t.tblName).nodeList(
+          t.fieldList);
+      // STORED AS is only written when both format classes are present.
+      if (t.inputFormatClass != null && t.outputFormatClass != null) {
+        u.keyword("STORED", "AS", "INPUTFORMAT").node(
+            t.inputFormatClass).keyword("OUTPUTFORMAT").node(
+            t.outputFormatClass);
+      }
+      u.keyword("LOCATION").node(t.location);
+      if (t.parallelism != null) {
+        u.keyword("PARALLELISM").node(t.parallelism);
+      }
+      if (t.properties != null) {
+        u.keyword("TBLPROPERTIES").node(t.properties);
+      }
+      if (t.query != null) {
+        u.keyword("AS").node(t.query);
+      }
+    }
+  };
+
+  private final SqlIdentifier tblName;
+  private final SqlNodeList fieldList;
+  private final SqlNode inputFormatClass;
+  private final SqlNode outputFormatClass;
+  private final SqlNode location;
+  private final SqlNode parallelism;
+  private final SqlNode properties;
+  private final SqlNode query;
+
+  public SqlCreateTable(
+          SqlParserPos pos, SqlIdentifier tblName, SqlNodeList fieldList,
+          SqlNode inputFormatClass, SqlNode outputFormatClass, SqlNode location,
+          SqlNode parallelism, SqlNode properties, SqlNode query) {
+    super(pos);
+    this.tblName = tblName;
+    this.fieldList = fieldList;
+    this.inputFormatClass = inputFormatClass;
+    this.outputFormatClass = outputFormatClass;
+    this.location = location;
+    this.parallelism = parallelism;
+    this.properties = properties;
+    this.query = query;
+  }
+
+  @Override
+  public SqlOperator getOperator() {
+    return OPERATOR;
+  }
+
+  @Override
+  public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+    getOperator().unparse(writer, this, leftPrec, rightPrec);
+  }
+
+  /**
+   * Returns all eight operands in the order {@code OPERATOR.createCall} consumes them.
+   * Absent optional clauses appear as null entries. Every operand, including
+   * {@code parallelism}, must be listed: dropping one would shift the following operands
+   * into the wrong constructor slots when the call is rebuilt from this list.
+   */
+  @Override
+  public List<SqlNode> getOperandList() {
+    return ImmutableNullableList.of(tblName, fieldList, inputFormatClass,
+                                    outputFormatClass, location, parallelism,
+                                    properties, query);
+  }
+
+  /** Returns the table name as a string. */
+  public String tableName() {
+    return tblName.toString();
+  }
+
+  /** Returns the LOCATION literal parsed as a URI. */
+  public URI location() {
+    return URI.create(getString(location));
+  }
+
+  /**
+   * Returns the PARALLELISM value parsed as an integer, or {@link #DEFAULT_PARALLELISM}
+   * when the clause was not given.
+   */
+  public Integer parallelism() {
+    String parallelismStr = getString(parallelism);
+    if (parallelismStr != null) {
+      return Integer.parseInt(parallelismStr);
+    } else {
+      return DEFAULT_PARALLELISM;
+    }
+  }
+
+  /** Returns the input format class name, or null when not given. */
+  public String inputFormatClass() {
+    return getString(inputFormatClass);
+  }
+
+  /** Returns the output format class name, or null when not given. */
+  public String outputFormatClass() {
+    return getString(outputFormatClass);
+  }
+
+  /**
+   * Returns the TBLPROPERTIES value, read as a JSON object, as {@link Properties}.
+   * The result is empty when the clause was not given.
+   *
+   * @throws RuntimeException wrapping the {@link IOException} if the value cannot be parsed
+   */
+  public Properties properties() {
+    Properties props = new Properties();
+    if (properties != null) {
+      try {
+        ObjectMapper mapper = new ObjectMapper();
+        HashMap<String, Object> map = mapper.readValue(getString(properties), HashMap.class);
+        props.putAll(map);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+    return props;
+  }
+
+  /** Returns the string value of a literal node, or null for a null node. */
+  private String getString(SqlNode n) {
+    return n == null ? null : SqlLiteral.stringValue(n);
+  }
+
+  /** Returns the field list viewed as column definitions (unchecked cast of the node list). */
+  @SuppressWarnings("unchecked")
+  public List<ColumnDefinition> fieldList() {
+    return (List<ColumnDefinition>)((List<? extends SqlNode>)fieldList.getList());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlDDLKeywords.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlDDLKeywords.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlDDLKeywords.java
new file mode 100644
index 0000000..3112e53
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlDDLKeywords.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.parser;
+
+import org.apache.calcite.sql.SqlLiteral;
+
+/**
+ * Keywords that can occur in a CREATE TABLE statement, usable as SQL symbol literals.
+ */
+public enum SqlDDLKeywords implements SqlLiteral.SqlSymbol {
+  /** Symbol carried by the primary-key column constraint. */
+  PRIMARY;
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/StormParser.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/StormParser.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/StormParser.java
new file mode 100644
index 0000000..8444e1e
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/StormParser.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.parser;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.calcite.config.Lex;
+import org.apache.storm.sql.parser.impl.StormParserImpl;
+
+import java.io.StringReader;
+
+/**
+ * Wraps a {@link StormParserImpl} configured with Oracle-style identifier casing and
+ * double-quoted identifiers.
+ */
+public class StormParser {
+  public static final int DEFAULT_IDENTIFIER_MAX_LENGTH = 128;
+  private final StormParserImpl impl;
+
+  /**
+   * Creates a parser over the given SQL text.
+   *
+   * @param s SQL text to parse
+   */
+  public StormParser(String s) {
+    final Lex lex = Lex.ORACLE;
+    final StormParserImpl parser = new StormParserImpl(new StringReader(s));
+    parser.setTabSize(1);
+    parser.setQuotedCasing(lex.quotedCasing);
+    parser.setUnquotedCasing(lex.unquotedCasing);
+    parser.setIdentifierMaxLength(DEFAULT_IDENTIFIER_MAX_LENGTH);
+    // The parser quotes identifiers with [ ] by default. Double-quoted identifiers (DQID)
+    // are required so that array and map access (m['x'] = 1, arr[2] = 10, ...) works.
+    parser.switchTo("DQID");
+    this.impl = parser;
+  }
+
+  /** Exposes the underlying generated parser. */
+  @VisibleForTesting
+  public StormParserImpl impl() {
+    return impl;
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/UnparseUtil.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/UnparseUtil.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/UnparseUtil.java
new file mode 100644
index 0000000..8e0a1d9
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/UnparseUtil.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.parser;
+
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlWriter;
+
+/**
+ * Small fluent helper for writing keywords and nodes to a {@link SqlWriter} with a fixed
+ * pair of precedences.
+ */
+class UnparseUtil {
+  private final SqlWriter writer;
+  private final int leftPrec;
+  private final int rightPrec;
+
+  UnparseUtil(SqlWriter writer, int leftPrec, int rightPrec) {
+    this.writer = writer;
+    this.leftPrec = leftPrec;
+    this.rightPrec = rightPrec;
+  }
+
+  /** Writes each of the given keywords, in order. */
+  UnparseUtil keyword(String... keywords) {
+    for (int i = 0; i < keywords.length; i++) {
+      writer.keyword(keywords[i]);
+    }
+    return this;
+  }
+
+  /** Unparses a single node with this helper's precedences. */
+  UnparseUtil node(SqlNode n) {
+    n.unparse(writer, leftPrec, rightPrec);
+    return this;
+  }
+
+  /** Unparses the list's nodes inside parentheses, separated by commas. */
+  UnparseUtil nodeList(SqlNodeList l) {
+    writer.keyword("(");
+    for (int idx = 0; idx < l.size(); idx++) {
+      if (idx > 0) {
+        // A separator goes before every element except the first.
+        writer.keyword(",");
+      }
+      l.get(idx).unparse(writer, leftPrec, rightPrec);
+    }
+    writer.keyword(")");
+    return this;
+  }
+}