Posted to commits@storm.apache.org by ka...@apache.org on 2017/04/05 23:19:09 UTC
[06/23] storm git commit: STORM-2453 Move non-connectors into the top directory
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java
new file mode 100644
index 0000000..2e237c0
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.compiler;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.schema.*;
+import org.apache.calcite.sql.SqlDataTypeSpec;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.validate.SqlMonotonicity;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.Util;
+import org.apache.storm.sql.calcite.ParallelStreamableTable;
+import org.apache.storm.sql.parser.ColumnConstraint;
+
+import java.util.ArrayList;
+
+import static org.apache.calcite.rel.RelFieldCollation.Direction;
+import static org.apache.calcite.rel.RelFieldCollation.Direction.ASCENDING;
+import static org.apache.calcite.rel.RelFieldCollation.Direction.DESCENDING;
+import static org.apache.calcite.rel.RelFieldCollation.NullDirection;
+import static org.apache.calcite.sql.validate.SqlMonotonicity.INCREASING;
+
+public class CompilerUtil {
+ public static String escapeJavaString(String s, boolean nullMeansNull) {
+ if(s == null) {
+ return nullMeansNull ? "null" : "\"\"";
+ } else {
+ String s1 = Util.replace(s, "\\", "\\\\");
+ String s2 = Util.replace(s1, "\"", "\\\"");
+ String s3 = Util.replace(s2, "\n\r", "\\n");
+ String s4 = Util.replace(s3, "\n", "\\n");
+ String s5 = Util.replace(s4, "\r", "\\r");
+ return "\"" + s5 + "\"";
+ }
+ }
+
+ public static class TableBuilderInfo {
+ private final RelDataTypeFactory typeFactory;
+
+ public TableBuilderInfo(RelDataTypeFactory typeFactory) {
+ this.typeFactory = typeFactory;
+ }
+
+ private static class FieldType {
+ private final String name;
+ private final RelDataType relDataType;
+
+ private FieldType(String name, RelDataType relDataType) {
+ this.name = name;
+ this.relDataType = relDataType;
+ }
+
+ }
+
+ private final ArrayList<FieldType> fields = new ArrayList<>();
+ private final ArrayList<Object[]> rows = new ArrayList<>();
+ private int primaryKey = -1;
+ private Integer parallelismHint;
+ private SqlMonotonicity primaryKeyMonotonicity;
+ private Statistic stats;
+
+ public TableBuilderInfo field(String name, SqlTypeName type) {
+ return field(name, typeFactory.createSqlType(type));
+ }
+
+ public TableBuilderInfo field(String name, RelDataType type) {
+ fields.add(new FieldType(name, type));
+ return this;
+ }
+
+ public TableBuilderInfo field(String name, SqlDataTypeSpec type, ColumnConstraint constraint) {
+ RelDataType dataType = type.deriveType(typeFactory);
+ if (constraint instanceof ColumnConstraint.PrimaryKey) {
+ ColumnConstraint.PrimaryKey pk = (ColumnConstraint.PrimaryKey) constraint;
+ Preconditions.checkState(primaryKey == -1, "There is more than one primary key in the table");
+ primaryKey = fields.size();
+ primaryKeyMonotonicity = pk.monotonicity();
+ }
+ fields.add(new FieldType(name, dataType));
+ return this;
+ }
+
+ public TableBuilderInfo statistics(Statistic stats) {
+ this.stats = stats;
+ return this;
+ }
+
+ @VisibleForTesting
+ public TableBuilderInfo rows(Object[] data) {
+ rows.add(data);
+ return this;
+ }
+
+ public TableBuilderInfo parallelismHint(int parallelismHint) {
+ this.parallelismHint = parallelismHint;
+ return this;
+ }
+
+ public StreamableTable build() {
+ final Statistic stat = buildStatistic();
+ final Table tbl = new Table() {
+ @Override
+ public RelDataType getRowType(
+ RelDataTypeFactory relDataTypeFactory) {
+ RelDataTypeFactory.FieldInfoBuilder b = relDataTypeFactory.builder();
+ for (FieldType f : fields) {
+ b.add(f.name, f.relDataType);
+ }
+ return b.build();
+ }
+
+ @Override
+ public Statistic getStatistic() {
+ return stat != null ? stat : Statistics.of(rows.size(),
+ ImmutableList.<ImmutableBitSet>of());
+ }
+
+ @Override
+ public Schema.TableType getJdbcTableType() {
+ return Schema.TableType.STREAM;
+ }
+ };
+
+ return new ParallelStreamableTable() {
+ @Override
+ public Integer parallelismHint() {
+ return parallelismHint;
+ }
+
+ @Override
+ public Table stream() {
+ return tbl;
+ }
+
+ @Override
+ public RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) {
+ return tbl.getRowType(relDataTypeFactory);
+ }
+
+ @Override
+ public Statistic getStatistic() {
+ return tbl.getStatistic();
+ }
+
+ @Override
+ public Schema.TableType getJdbcTableType() {
+ return Schema.TableType.STREAM;
+ }
+ };
+ }
+
+ private Statistic buildStatistic() {
+ if (stats != null || primaryKey == -1) {
+ return stats;
+ }
+ Direction dir = primaryKeyMonotonicity == INCREASING ? ASCENDING : DESCENDING;
+ RelFieldCollation collation = new RelFieldCollation(primaryKey, dir, NullDirection.UNSPECIFIED);
+ return Statistics.of(fields.size(), ImmutableList.of(ImmutableBitSet.of(primaryKey)),
+ ImmutableList.of(RelCollations.of(collation)));
+ }
+ }
+}
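
The TableBuilderInfo class above is a small fluent builder that assembles a ParallelStreamableTable from field definitions, optional statistics and a parallelism hint. A minimal usage sketch follows; the type factory, field names and parallelism value are illustrative assumptions, not part of the file above:

    RelDataTypeFactory typeFactory = new org.apache.calcite.jdbc.JavaTypeFactoryImpl();
    StreamableTable table = new CompilerUtil.TableBuilderInfo(typeFactory)
        .field("ID", SqlTypeName.INTEGER)      // plain field
        .field("WORD", SqlTypeName.VARCHAR)    // plain field
        .parallelismHint(2)                    // exposed via ParallelStreamableTable.parallelismHint()
        .build();
    // table.getRowType(typeFactory) now reports the two fields declared above,
    // and table.getJdbcTableType() is Schema.TableType.STREAM.
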
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RexNodeToJavaCodeCompiler.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RexNodeToJavaCodeCompiler.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RexNodeToJavaCodeCompiler.java
new file mode 100644
index 0000000..5ac95e0
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RexNodeToJavaCodeCompiler.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.compiler;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import org.apache.calcite.adapter.enumerable.JavaRowFormat;
+import org.apache.calcite.adapter.enumerable.PhysType;
+import org.apache.calcite.adapter.enumerable.PhysTypeImpl;
+import org.apache.calcite.adapter.enumerable.RexToLixTranslator;
+import org.apache.calcite.interpreter.Context;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.linq4j.function.Function1;
+import org.apache.calcite.linq4j.tree.BlockBuilder;
+import org.apache.calcite.linq4j.tree.BlockStatement;
+import org.apache.calcite.linq4j.tree.ClassDeclaration;
+import org.apache.calcite.linq4j.tree.Expression;
+import org.apache.calcite.linq4j.tree.Expressions;
+import org.apache.calcite.linq4j.tree.MemberDeclaration;
+import org.apache.calcite.linq4j.tree.ParameterExpression;
+import org.apache.calcite.linq4j.tree.Types;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexProgram;
+import org.apache.calcite.rex.RexProgramBuilder;
+import org.apache.calcite.util.BuiltInMethod;
+import org.apache.calcite.util.Pair;
+import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.lang.reflect.Type;
+import java.util.List;
+
+/**
+ * Compiles a scalar expression ({@link org.apache.calcite.rex.RexNode}) to a Java source code String.
+ *
+ * This code is inspired by JaninoRexCompiler in Calcite, but while that compiler returns an executable {@link org.apache.calcite.interpreter.Scalar},
+ * we need the generated source code itself so that it can be compiled and the resulting instance serialized, allowing it to be executed efficiently on the workers.
+ */
+public class RexNodeToJavaCodeCompiler {
+ private final RexBuilder rexBuilder;
+
+ public RexNodeToJavaCodeCompiler(RexBuilder rexBuilder) {
+ this.rexBuilder = rexBuilder;
+ }
+
+ public BlockStatement compileToBlock(List<RexNode> nodes, RelDataType inputRowType) {
+ final RexProgramBuilder programBuilder =
+ new RexProgramBuilder(inputRowType, rexBuilder);
+ for (RexNode node : nodes) {
+ programBuilder.addProject(node, null);
+ }
+
+ return compileToBlock(programBuilder.getProgram());
+ }
+
+ public BlockStatement compileToBlock(final RexProgram program) {
+ final ParameterExpression context_ =
+ Expressions.parameter(Context.class, "context");
+ final ParameterExpression outputValues_ =
+ Expressions.parameter(Object[].class, "outputValues");
+
+ return compileToBlock(program, context_, outputValues_).toBlock();
+ }
+
+ public String compile(List<RexNode> nodes, RelDataType inputRowType, String className) {
+ final RexProgramBuilder programBuilder =
+ new RexProgramBuilder(inputRowType, rexBuilder);
+ for (RexNode node : nodes) {
+ programBuilder.addProject(node, null);
+ }
+
+ return compile(programBuilder.getProgram(), className);
+ }
+
+ public String compile(final RexProgram program, String className) {
+ final ParameterExpression context_ =
+ Expressions.parameter(Context.class, "context");
+ final ParameterExpression outputValues_ =
+ Expressions.parameter(Object[].class, "outputValues");
+
+ BlockBuilder builder = compileToBlock(program, context_, outputValues_);
+ return baz(context_, outputValues_, builder.toBlock(), className);
+ }
+
+ private BlockBuilder compileToBlock(final RexProgram program, ParameterExpression context_,
+ ParameterExpression outputValues_) {
+ RelDataType inputRowType = program.getInputRowType();
+ final BlockBuilder builder = new BlockBuilder();
+ final JavaTypeFactoryImpl javaTypeFactory =
+ new JavaTypeFactoryImpl(rexBuilder.getTypeFactory().getTypeSystem());
+
+ final RexToLixTranslator.InputGetter inputGetter =
+ new RexToLixTranslator.InputGetterImpl(
+ ImmutableList.of(
+ Pair.<Expression, PhysType>of(
+ Expressions.field(context_,
+ BuiltInMethod.CONTEXT_VALUES.field),
+ PhysTypeImpl.of(javaTypeFactory, inputRowType,
+ JavaRowFormat.ARRAY, false))));
+ final Function1<String, RexToLixTranslator.InputGetter> correlates =
+ new Function1<String, RexToLixTranslator.InputGetter>() {
+ public RexToLixTranslator.InputGetter apply(String a0) {
+ throw new UnsupportedOperationException();
+ }
+ };
+ final Expression root =
+ Expressions.field(context_, BuiltInMethod.CONTEXT_ROOT.field);
+ final List<Expression> list =
+ RexToLixTranslator.translateProjects(program, javaTypeFactory, builder,
+ null, root, inputGetter, correlates);
+ for (int i = 0; i < list.size(); i++) {
+ builder.add(
+ Expressions.statement(
+ Expressions.assign(
+ Expressions.arrayIndex(outputValues_,
+ Expressions.constant(i)),
+ list.get(i))));
+ }
+
+ return builder;
+ }
+
+ /** Given a method that implements {@link ExecutableExpression#execute(Context, Object[])},
+ * adds a bridge method that implements {@link ExecutableExpression#execute(Context)}, and
+ * compiles. */
+ static String baz(ParameterExpression context_,
+ ParameterExpression outputValues_, BlockStatement block, String className) {
+ final List<MemberDeclaration> declarations = Lists.newArrayList();
+
+ // public void execute(Context, Object[] outputValues)
+ declarations.add(
+ Expressions.methodDecl(Modifier.PUBLIC, void.class,
+ StormBuiltInMethod.EXPR_EXECUTE2.method.getName(),
+ ImmutableList.of(context_, outputValues_), block));
+
+ // public Object execute(Context)
+ final BlockBuilder builder = new BlockBuilder();
+ final Expression values_ = builder.append("values",
+ Expressions.newArrayBounds(Object.class, 1,
+ Expressions.constant(1)));
+ builder.add(
+ Expressions.statement(
+ Expressions.call(
+ Expressions.parameter(ExecutableExpression.class, "this"),
+ StormBuiltInMethod.EXPR_EXECUTE2.method, context_, values_)));
+ builder.add(
+ Expressions.return_(null,
+ Expressions.arrayIndex(values_, Expressions.constant(0))));
+ declarations.add(
+ Expressions.methodDecl(Modifier.PUBLIC, Object.class,
+ StormBuiltInMethod.EXPR_EXECUTE1.method.getName(),
+ ImmutableList.of(context_), builder.toBlock()));
+
+ final ClassDeclaration classDeclaration =
+ Expressions.classDecl(Modifier.PUBLIC, className, null,
+ ImmutableList.<Type>of(ExecutableExpression.class), declarations);
+
+ return Expressions.toString(Lists.newArrayList(classDeclaration), "\n", false);
+ }
+
+ enum StormBuiltInMethod {
+ EXPR_EXECUTE1(ExecutableExpression.class, "execute", Context.class),
+ EXPR_EXECUTE2(ExecutableExpression.class, "execute", Context.class, Object[].class);
+
+ public final Method method;
+ public final Constructor constructor;
+ public final Field field;
+
+ public static final ImmutableMap<Method, BuiltInMethod> MAP;
+
+ static {
+ final ImmutableMap.Builder<Method, BuiltInMethod> builder =
+ ImmutableMap.builder();
+ for (BuiltInMethod value : BuiltInMethod.values()) {
+ if (value.method != null) {
+ builder.put(value.method, value);
+ }
+ }
+ MAP = builder.build();
+ }
+
+ private StormBuiltInMethod(Method method, Constructor constructor, Field field) {
+ this.method = method;
+ this.constructor = constructor;
+ this.field = field;
+ }
+
+ /**
+ * Defines a method.
+ */
+ StormBuiltInMethod(Class clazz, String methodName, Class... argumentTypes) {
+ this(Types.lookupMethod(clazz, methodName, argumentTypes), null, null);
+ }
+
+ /**
+ * Defines a constructor.
+ */
+ StormBuiltInMethod(Class clazz, Class... argumentTypes) {
+ this(null, Types.lookupConstructor(clazz, argumentTypes), null);
+ }
+
+ /**
+ * Defines a field.
+ */
+ StormBuiltInMethod(Class clazz, String fieldName, boolean dummy) {
+ this(null, null, Types.lookupField(clazz, fieldName));
+ assert dummy : "dummy value for method overloading must be true";
+ }
+ }
+
+}
\ No newline at end of file
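
RexNodeToJavaCodeCompiler.compile(...) above turns a list of Calcite row expressions into the source of a class implementing ExecutableExpression. A minimal sketch of a call site, assuming the imports from the file above plus SqlStdOperatorTable and SqlTypeName; the row type, expression and class name are illustrative:

    JavaTypeFactoryImpl typeFactory = new JavaTypeFactoryImpl();
    RexBuilder rexBuilder = new RexBuilder(typeFactory);
    RelDataType rowType = typeFactory.builder()
        .add("ID", SqlTypeName.INTEGER)
        .build();
    // Projection expression: $0 + 1
    RexNode expr = rexBuilder.makeCall(SqlStdOperatorTable.PLUS,
        rexBuilder.makeInputRef(rowType.getFieldList().get(0).getType(), 0),
        rexBuilder.makeExactLiteral(java.math.BigDecimal.ONE));
    String source = new RexNodeToJavaCodeCompiler(rexBuilder)
        .compile(ImmutableList.of(expr), rowType, "MyProjection");
    // 'source' now holds a "public class MyProjection implements ExecutableExpression { ... }"
    // definition that can be handed to CompilingClassLoader for in-memory compilation.
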
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/StormSqlTypeFactoryImpl.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/StormSqlTypeFactoryImpl.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/StormSqlTypeFactoryImpl.java
new file mode 100644
index 0000000..21ca063
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/StormSqlTypeFactoryImpl.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.compiler;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.rel.type.RelRecordType;
+import org.apache.calcite.sql.type.JavaToSqlTypeConversionRules;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+public class StormSqlTypeFactoryImpl extends JavaTypeFactoryImpl {
+
+ public StormSqlTypeFactoryImpl() {
+ }
+
+ public StormSqlTypeFactoryImpl(RelDataTypeSystem typeSystem) {
+ super(typeSystem);
+ }
+
+ @Override
+ public RelDataType toSql(RelDataType type) {
+ if (type instanceof JavaType) {
+ JavaType javaType = (JavaType) type;
+ SqlTypeName sqlTypeName = JavaToSqlTypeConversionRules.instance().lookup(javaType.getJavaClass());
+ if (sqlTypeName == null) {
+ sqlTypeName = SqlTypeName.ANY;
+ }
+ return createTypeWithNullability(createSqlType(sqlTypeName), type.isNullable());
+ }
+ return super.toSql(type);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/BuiltinAggregateFunctions.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/BuiltinAggregateFunctions.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/BuiltinAggregateFunctions.java
new file mode 100644
index 0000000..9dc4ba8
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/BuiltinAggregateFunctions.java
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.compiler.backends.standalone;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.storm.tuple.Values;
+
+import java.lang.reflect.Type;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Built-in implementations for some of the standard aggregation operations.
+ * An aggregation is implemented as a class with three methods: init, add and result.
+ * The class may contain only static methods, only non-static methods, or be generic.
+ */
+public class BuiltinAggregateFunctions {
+ // binds the type information and the class implementing the aggregation
+ public static class TypeClass {
+ public static class GenericType {
+ }
+
+ public final Type ty;
+ public final Class<?> clazz;
+
+ private TypeClass(Type ty, Class<?> clazz) {
+ this.ty = ty;
+ this.clazz = clazz;
+ }
+
+ static TypeClass of(Type ty, Class<?> clazz) {
+ return new TypeClass(ty, clazz);
+ }
+ }
+
+ static final Map<String, List<TypeClass>> TABLE = new HashMap<>();
+
+ public static class ByteSum {
+ public static Byte init() {
+ return 0;
+ }
+
+ public static Byte add(Byte accumulator, Byte val) {
+ return (byte) (accumulator + val);
+ }
+
+ public static Byte result(Byte accumulator) {
+ return accumulator;
+ }
+ }
+
+ public static class ShortSum {
+ public static Short init() {
+ return 0;
+ }
+
+ public static Short add(Short accumulator, Short val) {
+ return (short) (accumulator + val);
+ }
+
+ public static Short result(Short accumulator) {
+ return accumulator;
+ }
+ }
+
+ public static class IntSum {
+ public static Integer init() {
+ return 0;
+ }
+
+ public static Integer add(Integer accumulator, Integer val) {
+ return accumulator + val;
+ }
+
+ public static Integer result(Integer accumulator) {
+ return accumulator;
+ }
+ }
+
+ public static class LongSum {
+ public static Long init() {
+ return 0L;
+ }
+
+ public static Long add(Long accumulator, Long val) {
+ return accumulator + val;
+ }
+
+ public static Long result(Long accumulator) {
+ return accumulator;
+ }
+ }
+
+ public static class FloatSum {
+ public static Float init() {
+ return 0.0f;
+ }
+
+ public static Float add(Float accumulator, Float val) {
+ return accumulator + val;
+ }
+
+ public static Float result(Float accumulator) {
+ return accumulator;
+ }
+ }
+
+ public static class DoubleSum {
+ public static Double init() {
+ return 0.0;
+ }
+
+ public static Double add(Double accumulator, Double val) {
+ return accumulator + val;
+ }
+
+ public static Double result(Double accumulator) {
+ return accumulator;
+ }
+ }
+
+ public static class Max<T extends Comparable<T>> {
+ public T init() {
+ return null;
+ }
+
+ public T add(T accumulator, T val) {
+ return (accumulator == null || accumulator.compareTo(val) < 0) ? val : accumulator;
+ }
+
+ public T result(T accumulator) {
+ return accumulator;
+ }
+ }
+
+ public static class Min<T extends Comparable<T>> {
+ public T init() {
+ return null;
+ }
+
+ public T add(T accumulator, T val) {
+ return (accumulator == null || accumulator.compareTo(val) > 0) ? val : accumulator;
+ }
+
+ public T result(T accumulator) {
+ return accumulator;
+ }
+ }
+
+ public static class IntAvg {
+ private int count;
+
+ public Integer init() {
+ return 0;
+ }
+
+ public Integer add(Integer accumulator, Integer val) {
+ ++count;
+ return accumulator + val;
+ }
+
+ public Integer result(Integer accumulator) {
+ Integer result = accumulator / count;
+ count = 0;
+ return result;
+ }
+ }
+
+ public static class DoubleAvg {
+ private int count;
+
+ public Double init() {
+ return 0.0;
+ }
+
+ public Double add(Double accumulator, Double val) {
+ ++count;
+ return accumulator + val;
+ }
+
+ public Double result(Double accumulator) {
+ Double result = accumulator / count;
+ count = 0;
+ return result;
+ }
+ }
+
+ public static class Count {
+ public static Long init() {
+ return 0L;
+ }
+
+ public static Long add(Long accumulator, Values vals) {
+ for (Object val : vals) {
+ if (val == null) {
+ return accumulator;
+ }
+ }
+ return accumulator + 1;
+ }
+
+ public static Long result(Long accumulator) {
+ return accumulator;
+ }
+ }
+
+ static {
+ TABLE.put("SUM", ImmutableList.of(
+ TypeClass.of(float.class, FloatSum.class),
+ TypeClass.of(double.class, DoubleSum.class),
+ TypeClass.of(byte.class, ByteSum.class),
+ TypeClass.of(short.class, ShortSum.class),
+ TypeClass.of(long.class, LongSum.class),
+ TypeClass.of(int.class, IntSum.class)));
+ TABLE.put("AVG", ImmutableList.of(
+ TypeClass.of(double.class, DoubleAvg.class),
+ TypeClass.of(int.class, IntAvg.class)));
+ TABLE.put("COUNT", ImmutableList.of(TypeClass.of(long.class, Count.class)));
+ TABLE.put("MAX", ImmutableList.of(TypeClass.of(TypeClass.GenericType.class, Max.class)));
+ TABLE.put("MIN", ImmutableList.of(TypeClass.of(TypeClass.GenericType.class, Min.class)));
+ }
+}
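
The init/add/result convention described in the class comment above also governs how additional aggregations would be added. A hypothetical sketch follows; neither the class nor the TABLE entry exists in the file above:

    public static class LongProduct {
        public static Long init() {
            return 1L;                          // multiplicative identity
        }

        public static Long add(Long accumulator, Long val) {
            return accumulator * val;           // fold the next value into the accumulator
        }

        public static Long result(Long accumulator) {
            return accumulator;                 // nothing to finalize
        }
    }

    // Registration would mirror the static block above:
    // TABLE.put("PRODUCT", ImmutableList.of(TypeClass.of(long.class, LongProduct.class)));
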
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PlanCompiler.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PlanCompiler.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PlanCompiler.java
new file mode 100644
index 0000000..01546ed
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PlanCompiler.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.compiler.backends.standalone;
+
+import com.google.common.base.Joiner;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.storm.sql.compiler.CompilerUtil;
+import org.apache.storm.sql.javac.CompilingClassLoader;
+import org.apache.storm.sql.runtime.AbstractValuesProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.HashSet;
+import java.util.Set;
+
+public class PlanCompiler {
+ private static final Logger LOG = LoggerFactory.getLogger(PlanCompiler.class);
+
+ private static final Joiner NEW_LINE_JOINER = Joiner.on("\n");
+ private static final String PACKAGE_NAME = "org.apache.storm.sql.generated";
+ private static final String PROLOGUE = NEW_LINE_JOINER.join(
+ "// GENERATED CODE", "package " + PACKAGE_NAME + ";", "",
+ "import java.util.Iterator;", "import java.util.Map;", "import java.util.HashMap;",
+ "import java.util.List;", "import java.util.ArrayList;",
+ "import java.util.LinkedHashMap;",
+ "import org.apache.storm.tuple.Values;",
+ "import org.apache.storm.sql.runtime.AbstractChannelHandler;",
+ "import org.apache.storm.sql.runtime.Channels;",
+ "import org.apache.storm.sql.runtime.ChannelContext;",
+ "import org.apache.storm.sql.runtime.ChannelHandler;",
+ "import org.apache.storm.sql.runtime.DataSource;",
+ "import org.apache.storm.sql.runtime.AbstractValuesProcessor;",
+ "import com.google.common.collect.ArrayListMultimap;",
+ "import com.google.common.collect.Multimap;",
+ "import org.apache.calcite.interpreter.Context;",
+ "import org.apache.calcite.interpreter.StormContext;",
+ "import org.apache.calcite.DataContext;",
+ "import org.apache.storm.sql.runtime.calcite.StormDataContext;",
+ "public final class Processor extends AbstractValuesProcessor {",
+ " public final static DataContext dataContext = new StormDataContext();",
+ "");
+ private static final String INITIALIZER_PROLOGUE = NEW_LINE_JOINER.join(
+ " @Override",
+ " public void initialize(Map<String, DataSource> data,",
+ " ChannelHandler result) {",
+ " ChannelContext r = Channels.chain(Channels.voidContext(), result);",
+ ""
+ );
+
+ private final JavaTypeFactory typeFactory;
+
+ public PlanCompiler(JavaTypeFactory typeFactory) {
+ this.typeFactory = typeFactory;
+ }
+
+ private String generateJavaSource(RelNode root) throws Exception {
+ StringWriter sw = new StringWriter();
+ try (PrintWriter pw = new PrintWriter(sw)) {
+ RelNodeCompiler compiler = new RelNodeCompiler(pw, typeFactory);
+ printPrologue(pw);
+ compiler.traverse(root);
+ printMain(pw, root);
+ printEpilogue(pw);
+ }
+ return sw.toString();
+ }
+
+ private void printMain(PrintWriter pw, RelNode root) {
+ Set<TableScan> tables = new HashSet<>();
+ pw.print(INITIALIZER_PROLOGUE);
+ chainOperators(pw, root, tables);
+ for (TableScan n : tables) {
+ String escaped = CompilerUtil.escapeJavaString(
+ Joiner.on('.').join(n.getTable().getQualifiedName()), true);
+ String r = NEW_LINE_JOINER.join(
+ " if (!data.containsKey(%1$s))",
+ " throw new RuntimeException(\"Cannot find table \" + %1$s);",
+ " data.get(%1$s).open(CTX_%2$d);",
+ "");
+ pw.print(String.format(r, escaped, n.getId()));
+ }
+ pw.print(" }\n");
+ }
+
+ private void chainOperators(PrintWriter pw, RelNode root, Set<TableScan> tables) {
+ doChainOperators(pw, root, tables, "r");
+ }
+
+ private void doChainOperators(PrintWriter pw, RelNode node, Set<TableScan> tables, String parentCtx) {
+ pw.print(
+ String.format(" ChannelContext CTX_%d = Channels.chain(%2$s, %3$s);\n",
+ node.getId(), parentCtx, RelNodeCompiler.getStageName(node)));
+ String currentCtx = String.format("CTX_%d", node.getId());
+ if (node instanceof TableScan) {
+ tables.add((TableScan) node);
+ }
+ for (RelNode i : node.getInputs()) {
+ doChainOperators(pw, i, tables, currentCtx);
+ }
+ }
+
+ public AbstractValuesProcessor compile(RelNode plan) throws Exception {
+ String javaCode = generateJavaSource(plan);
+ LOG.debug("Compiling... source code {}", javaCode);
+ ClassLoader cl = new CompilingClassLoader(getClass().getClassLoader(),
+ PACKAGE_NAME + ".Processor",
+ javaCode, null);
+ return (AbstractValuesProcessor) cl.loadClass(
+ PACKAGE_NAME + ".Processor").newInstance();
+ }
+
+ private static void printEpilogue(
+ PrintWriter pw) throws Exception {
+ pw.print("}\n");
+ }
+
+ private static void printPrologue(PrintWriter pw) {
+ pw.append(PROLOGUE);
+ }
+}
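
PlanCompiler.compile(...) above prints the generated Processor source (the prologue, one ChannelHandler per RelNode, then the initialize method) and loads it through CompilingClassLoader. A sketch of a call site, assuming the physical plan root 'plan' is produced elsewhere by the Calcite planner:

    JavaTypeFactory typeFactory = new JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
    PlanCompiler planCompiler = new PlanCompiler(typeFactory);
    AbstractValuesProcessor processor = planCompiler.compile(plan);
    // 'processor' is an instance of the generated org.apache.storm.sql.generated.Processor;
    // initialize(dataSources, resultHandler) wires the named tables to the generated stages.
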
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PostOrderRelNodeVisitor.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PostOrderRelNodeVisitor.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PostOrderRelNodeVisitor.java
new file mode 100644
index 0000000..afed8a9
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PostOrderRelNodeVisitor.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.compiler.backends.standalone;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.*;
+import org.apache.calcite.rel.stream.Delta;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public abstract class PostOrderRelNodeVisitor<T> {
+ public final T traverse(RelNode n) throws Exception {
+ List<T> inputStreams = new ArrayList<>();
+ for (RelNode input : n.getInputs()) {
+ inputStreams.add(traverse(input));
+ }
+
+ if (n instanceof Aggregate) {
+ return visitAggregate((Aggregate) n, inputStreams);
+ } else if (n instanceof Calc) {
+ return visitCalc((Calc) n, inputStreams);
+ } else if (n instanceof Collect) {
+ return visitCollect((Collect) n, inputStreams);
+ } else if (n instanceof Correlate) {
+ return visitCorrelate((Correlate) n, inputStreams);
+ } else if (n instanceof Delta) {
+ return visitDelta((Delta) n, inputStreams);
+ } else if (n instanceof Exchange) {
+ return visitExchange((Exchange) n, inputStreams);
+ } else if (n instanceof Project) {
+ return visitProject((Project) n, inputStreams);
+ } else if (n instanceof Filter) {
+ return visitFilter((Filter) n, inputStreams);
+ } else if (n instanceof Sample) {
+ return visitSample((Sample) n, inputStreams);
+ } else if (n instanceof Sort) {
+ return visitSort((Sort) n, inputStreams);
+ } else if (n instanceof TableModify) {
+ return visitTableModify((TableModify) n, inputStreams);
+ } else if (n instanceof TableScan) {
+ return visitTableScan((TableScan) n, inputStreams);
+ } else if (n instanceof Uncollect) {
+ return visitUncollect((Uncollect) n, inputStreams);
+ } else if (n instanceof Window) {
+ return visitWindow((Window) n, inputStreams);
+ } else if (n instanceof Join) {
+ return visitJoin((Join) n, inputStreams);
+ } else {
+ return defaultValue(n, inputStreams);
+ }
+ }
+
+ public T visitAggregate(Aggregate aggregate, List<T> inputStreams) throws Exception {
+ return defaultValue(aggregate, inputStreams);
+ }
+
+ public T visitCalc(Calc calc, List<T> inputStreams) throws Exception {
+ return defaultValue(calc, inputStreams);
+ }
+
+ public T visitCollect(Collect collect, List<T> inputStreams) throws Exception {
+ return defaultValue(collect, inputStreams);
+ }
+
+ public T visitCorrelate(Correlate correlate, List<T> inputStreams) throws Exception {
+ return defaultValue(correlate, inputStreams);
+ }
+
+ public T visitDelta(Delta delta, List<T> inputStreams) throws Exception {
+ return defaultValue(delta, inputStreams);
+ }
+
+ public T visitExchange(Exchange exchange, List<T> inputStreams) throws Exception {
+ return defaultValue(exchange, inputStreams);
+ }
+
+ public T visitProject(Project project, List<T> inputStreams) throws Exception {
+ return defaultValue(project, inputStreams);
+ }
+
+ public T visitFilter(Filter filter, List<T> inputStreams) throws Exception {
+ return defaultValue(filter, inputStreams);
+ }
+
+ public T visitSample(Sample sample, List<T> inputStreams) throws Exception {
+ return defaultValue(sample, inputStreams);
+ }
+
+ public T visitSort(Sort sort, List<T> inputStreams) throws Exception {
+ return defaultValue(sort, inputStreams);
+ }
+
+ public T visitTableModify(TableModify modify, List<T> inputStreams) throws Exception {
+ return defaultValue(modify, inputStreams);
+ }
+
+ public T visitTableScan(TableScan scan, List<T> inputStreams) throws Exception {
+ return defaultValue(scan, inputStreams);
+ }
+
+ public T visitUncollect(Uncollect uncollect, List<T> inputStreams) throws Exception {
+ return defaultValue(uncollect, inputStreams);
+ }
+
+ public T visitWindow(Window window, List<T> inputStreams) throws Exception {
+ return defaultValue(window, inputStreams);
+ }
+
+ public T visitJoin(Join join, List<T> inputStreams) throws Exception {
+ return defaultValue(join, inputStreams);
+ }
+
+ public T defaultValue(RelNode n, List<T> inputStreams) {
+ return null;
+ }
+}
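
PostOrderRelNodeVisitor above visits a node's inputs before the node itself and then dispatches on the concrete RelNode type, with defaultValue(...) as the fallback. A hypothetical subclass, sketched here only to illustrate the traversal contract (it is not part of the file above):

    // Collects the qualified names of every scanned table in the plan.
    public class TableNameCollector extends PostOrderRelNodeVisitor<Void> {
        private final List<String> tableNames = new ArrayList<>();

        @Override
        public Void visitTableScan(TableScan scan, List<Void> inputStreams) {
            tableNames.add(String.join(".", scan.getTable().getQualifiedName()));
            return null;
        }

        public List<String> getTableNames() {
            return tableNames;
        }
    }

    // Usage: new TableNameCollector().traverse(root) walks the plan bottom-up;
    // getTableNames() then returns one entry per TableScan encountered.
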
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java
new file mode 100644
index 0000000..97995c7
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java
@@ -0,0 +1,484 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.compiler.backends.standalone;
+
+import com.google.common.base.Joiner;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.calcite.rel.stream.Delta;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.schema.AggregateFunction;
+import org.apache.calcite.schema.impl.AggregateFunctionImpl;
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.sql.validate.SqlUserDefinedAggFunction;
+import org.apache.storm.sql.compiler.RexNodeToJavaCodeCompiler;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Compile RelNodes into individual functions.
+ */
+class RelNodeCompiler extends PostOrderRelNodeVisitor<Void> {
+ public static Joiner NEW_LINE_JOINER = Joiner.on('\n');
+
+ private final PrintWriter pw;
+ private final JavaTypeFactory typeFactory;
+ private final RexNodeToJavaCodeCompiler rexCompiler;
+
+ private static final String STAGE_PROLOGUE = NEW_LINE_JOINER.join(
+ " private static final ChannelHandler %1$s = ",
+ " new AbstractChannelHandler() {",
+ " @Override",
+ " public void dataReceived(ChannelContext ctx, Values _data) {",
+ ""
+ );
+
+ private static final String AGGREGATE_STAGE_PROLOGUE = NEW_LINE_JOINER.join(
+ " private static final ChannelHandler %1$s = ",
+ " new AbstractChannelHandler() {",
+ " private final Values EMPTY_VALUES = new Values();",
+ " private final Map<List<Object>, Map<String, Object>> state = new LinkedHashMap<>();",
+ " private final int[] groupIndices = new int[] {%2$s};",
+ " private List<Object> getGroupValues(Values _data) {",
+ " List<Object> res = new ArrayList<>();",
+ " for (int i: groupIndices) {",
+ " res.add(_data.get(i));",
+ " }",
+ " return res;",
+ " }",
+ "",
+ " @Override",
+ " public void flush(ChannelContext ctx) {",
+ " emitAggregateResults(ctx);",
+ " super.flush(ctx);",
+ " state.clear();",
+ " }",
+ "",
+ " private void emitAggregateResults(ChannelContext ctx) {",
+ " for (Map.Entry<List<Object>, Map<String, Object>> entry: state.entrySet()) {",
+ " List<Object> groupValues = entry.getKey();",
+ " Map<String, Object> accumulators = entry.getValue();",
+ " %3$s",
+ " }",
+ " }",
+ "",
+ " @Override",
+ " public void dataReceived(ChannelContext ctx, Values _data) {",
+ ""
+ );
+
+ private static final String JOIN_STAGE_PROLOGUE = NEW_LINE_JOINER.join(
+ " private static final ChannelHandler %1$s = ",
+ " new AbstractChannelHandler() {",
+ " Object left = %2$s;",
+ " Object right = %3$s;",
+ " Object source = null;",
+ " List<Values> leftRows = new ArrayList<>();",
+ " List<Values> rightRows = new ArrayList<>();",
+ " boolean leftDone = false;",
+ " boolean rightDone = false;",
+ " int[] ordinals = new int[] {%4$s, %5$s};",
+ "",
+ " Multimap<Object, Values> getJoinTable(List<Values> rows, int joinIndex) {",
+ " Multimap<Object, Values> m = ArrayListMultimap.create();",
+ " for(Values v: rows) {",
+ " m.put(v.get(joinIndex), v);",
+ " }",
+ " return m;",
+ " }",
+ "",
+ " List<Values> join(Multimap<Object, Values> tab, List<Values> rows, int rowIdx, boolean rev) {",
+ " List<Values> res = new ArrayList<>();",
+ " for (Values row: rows) {",
+ " for (Values mapValue: tab.get(row.get(rowIdx))) {",
+ " if (mapValue != null) {",
+ " Values joinedRow = new Values();",
+ " if(rev) {",
+ " joinedRow.addAll(row);",
+ " joinedRow.addAll(mapValue);",
+ " } else {",
+ " joinedRow.addAll(mapValue);",
+ " joinedRow.addAll(row);",
+ " }",
+ " res.add(joinedRow);",
+ " }",
+ " }",
+ " }",
+ " return res;",
+ " }",
+ "",
+ " @Override",
+ " public void setSource(ChannelContext ctx, Object source) {",
+ " this.source = source;",
+ " }",
+ "",
+ " @Override",
+ " public void flush(ChannelContext ctx) {",
+ " if (source == left) {",
+ " leftDone = true;",
+ " } else if (source == right) {",
+ " rightDone = true;",
+ " }",
+ " if (leftDone && rightDone) {",
+ " if (leftRows.size() <= rightRows.size()) {",
+ " for(Values res: join(getJoinTable(leftRows, ordinals[0]), rightRows, ordinals[1], false)) {",
+ " ctx.emit(res);",
+ " }",
+ " } else {",
+ " for(Values res: join(getJoinTable(rightRows, ordinals[1]), leftRows, ordinals[0], true)) {",
+ " ctx.emit(res);",
+ " }",
+ " }",
+ " leftDone = rightDone = false;",
+ " leftRows.clear();",
+ " rightRows.clear();",
+ " super.flush(ctx);",
+ " }",
+ " }",
+ "",
+ " @Override",
+ " public void dataReceived(ChannelContext ctx, Values _data) {",
+ ""
+ );
+
+ private static final String STAGE_PASSTHROUGH = NEW_LINE_JOINER.join(
+ " private static final ChannelHandler %1$s = AbstractChannelHandler.PASS_THROUGH;",
+ "");
+
+ private static final String STAGE_ENUMERABLE_TABLE_SCAN = NEW_LINE_JOINER.join(
+ " private static final ChannelHandler %1$s = new AbstractChannelHandler() {",
+ " @Override",
+ " public void flush(ChannelContext ctx) {",
+ " ctx.setSource(this);",
+ " super.flush(ctx);",
+ " }",
+ "",
+ " @Override",
+ " public void dataReceived(ChannelContext ctx, Values _data) {",
+ " ctx.setSource(this);",
+ " ctx.emit(_data);",
+ " }",
+ " };",
+ "");
+
+ private int nameCount;
+ private Map<AggregateCall, String> aggregateCallVarNames = new HashMap<>();
+
+ RelNodeCompiler(PrintWriter pw, JavaTypeFactory typeFactory) {
+ this.pw = pw;
+ this.typeFactory = typeFactory;
+ this.rexCompiler = new RexNodeToJavaCodeCompiler(new RexBuilder(typeFactory));
+ }
+
+ @Override
+ public Void visitDelta(Delta delta, List<Void> inputStreams) throws Exception {
+ pw.print(String.format(STAGE_PASSTHROUGH, getStageName(delta)));
+ return null;
+ }
+
+ @Override
+ public Void visitFilter(Filter filter, List<Void> inputStreams) throws Exception {
+ beginStage(filter);
+
+ List<RexNode> childExps = filter.getChildExps();
+ RelDataType inputRowType = filter.getInput(0).getRowType();
+
+ pw.print("Context context = new StormContext(Processor.dataContext);\n");
+ pw.print("context.values = _data.toArray();\n");
+ pw.print("Object[] outputValues = new Object[1];\n");
+
+ pw.write(rexCompiler.compileToBlock(childExps, inputRowType).toString());
+
+ String r = "((Boolean) outputValues[0])";
+ if (filter.getCondition().getType().isNullable()) {
+ pw.print(String.format(" if (%s != null && %s) { ctx.emit(_data); }\n", r, r));
+ } else {
+ pw.print(String.format(" if (%s) { ctx.emit(_data); }\n", r, r));
+ }
+ endStage();
+ return null;
+ }
+
+ @Override
+ public Void visitProject(Project project, List<Void> inputStreams) throws Exception {
+ beginStage(project);
+
+ List<RexNode> childExps = project.getChildExps();
+ RelDataType inputRowType = project.getInput(0).getRowType();
+ int outputCount = project.getRowType().getFieldCount();
+
+ pw.print("Context context = new StormContext(Processor.dataContext);\n");
+ pw.print("context.values = _data.toArray();\n");
+ pw.print(String.format("Object[] outputValues = new Object[%d];\n", outputCount));
+
+ pw.write(rexCompiler.compileToBlock(childExps, inputRowType).toString());
+
+ pw.print(" ctx.emit(new Values(outputValues));\n");
+ endStage();
+ return null;
+ }
+
+ @Override
+ public Void defaultValue(RelNode n, List<Void> inputStreams) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Void visitTableScan(TableScan scan, List<Void> inputStreams) throws Exception {
+ pw.print(String.format(STAGE_ENUMERABLE_TABLE_SCAN, getStageName(scan)));
+ return null;
+ }
+
+ @Override
+ public Void visitAggregate(Aggregate aggregate, List<Void> inputStreams) throws Exception {
+ beginAggregateStage(aggregate);
+ pw.println(" if (_data != null) {");
+ pw.println(" List<Object> curGroupValues = getGroupValues(_data);");
+ pw.println(" if (!state.containsKey(curGroupValues)) {");
+ pw.println(" state.put(curGroupValues, new HashMap<String, Object>());");
+ pw.println(" }");
+ pw.println(" Map<String, Object> accumulators = state.get(curGroupValues);");
+ for (AggregateCall call : aggregate.getAggCallList()) {
+ aggregate(call);
+ }
+ pw.println(" }");
+ endStage();
+ return null;
+ }
+
+ @Override
+ public Void visitJoin(Join join, List<Void> inputStreams) {
+ beginJoinStage(join);
+ pw.println(" if (source == left) {");
+ pw.println(" leftRows.add(_data);");
+ pw.println(" } else if (source == right) {");
+ pw.println(" rightRows.add(_data);");
+ pw.println(" }");
+ endStage();
+ return null;
+ }
+
+ private String groupValueEmitStr(String var, int n) {
+ int count = 0;
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < n; i++) {
+ if (++count > 1) {
+ sb.append(", ");
+ }
+ sb.append(var).append(".").append("get(").append(i).append(")");
+ }
+ return sb.toString();
+ }
+
+ private String emitAggregateStmts(Aggregate aggregate) {
+ List<String> res = new ArrayList<>();
+ StringWriter sw = new StringWriter();
+ for (AggregateCall call : aggregate.getAggCallList()) {
+ res.add(aggregateResult(call, new PrintWriter(sw)));
+ }
+ return NEW_LINE_JOINER.join(sw.toString(),
+ String.format(" ctx.emit(new Values(%s, %s));",
+ groupValueEmitStr("groupValues", aggregate.getGroupSet().cardinality()),
+ Joiner.on(", ").join(res)));
+ }
+
+ private String aggregateResult(AggregateCall call, PrintWriter pw) {
+ SqlAggFunction aggFunction = call.getAggregation();
+ String aggregationName = call.getAggregation().getName();
+ Type ty = typeFactory.getJavaClass(call.getType());
+ String result;
+ if (aggFunction instanceof SqlUserDefinedAggFunction) {
+ AggregateFunction aggregateFunction = ((SqlUserDefinedAggFunction) aggFunction).function;
+ result = doAggregateResult((AggregateFunctionImpl) aggregateFunction, reserveAggVarName(call), ty, pw);
+ } else {
+ List<BuiltinAggregateFunctions.TypeClass> typeClasses = BuiltinAggregateFunctions.TABLE.get(aggregationName);
+ if (typeClasses == null) {
+ throw new UnsupportedOperationException(aggregationName + " Not implemented");
+ }
+ result = doAggregateResult(AggregateFunctionImpl.create(findMatchingClass(aggregationName, typeClasses, ty)),
+ reserveAggVarName(call), ty, pw);
+ }
+ return result;
+ }
+
+ private String doAggregateResult(AggregateFunctionImpl aggFn, String varName, Type ty, PrintWriter pw) {
+ String resultName = varName + "_result";
+ Class<?> accumulatorType = aggFn.accumulatorType;
+ Class<?> resultType = aggFn.resultType;
+ List<String> args = new ArrayList<>();
+ if (!aggFn.isStatic) {
+ String aggObjName = String.format("%s_obj", varName);
+ String aggObjClassName = aggFn.initMethod.getDeclaringClass().getCanonicalName();
+ pw.println(" @SuppressWarnings(\"unchecked\")");
+ pw.println(String.format(" final %1$s %2$s = (%1$s) accumulators.get(\"%2$s\");", aggObjClassName,
+ aggObjName));
+ args.add(aggObjName);
+ }
+ args.add(String.format("(%s)accumulators.get(\"%s\")", accumulatorType.getCanonicalName(), varName));
+ pw.println(String.format(" final %s %s = %s;", resultType.getCanonicalName(),
+ resultName, printMethodCall(aggFn.resultMethod, args)));
+
+ return resultName;
+ }
+
+ private void aggregate(AggregateCall call) {
+ SqlAggFunction aggFunction = call.getAggregation();
+ String aggregationName = call.getAggregation().getName();
+ Type ty = typeFactory.getJavaClass(call.getType());
+ if (call.getArgList().size() != 1) {
+ if (aggregationName.equals("COUNT")) {
+ if (call.getArgList().size() != 0) {
+ throw new UnsupportedOperationException("Count with nullable fields");
+ }
+ }
+ }
+ if (aggFunction instanceof SqlUserDefinedAggFunction) {
+ AggregateFunction aggregateFunction = ((SqlUserDefinedAggFunction) aggFunction).function;
+ doAggregate((AggregateFunctionImpl) aggregateFunction, reserveAggVarName(call), ty, call.getArgList());
+ } else {
+ List<BuiltinAggregateFunctions.TypeClass> typeClasses = BuiltinAggregateFunctions.TABLE.get(aggregationName);
+ if (typeClasses == null) {
+ throw new UnsupportedOperationException(aggregationName + " Not implemented");
+ }
+ doAggregate(AggregateFunctionImpl.create(findMatchingClass(aggregationName, typeClasses, ty)),
+ reserveAggVarName(call), ty, call.getArgList());
+ }
+ }
+
+ private Class<?> findMatchingClass(String aggregationName, List<BuiltinAggregateFunctions.TypeClass> typeClasses, Type ty) {
+ for (BuiltinAggregateFunctions.TypeClass typeClass : typeClasses) {
+ if (typeClass.ty.equals(BuiltinAggregateFunctions.TypeClass.GenericType.class) || typeClass.ty.equals(ty)) {
+ return typeClass.clazz;
+ }
+ }
+ throw new UnsupportedOperationException(aggregationName + " Not implemented for type '" + ty + "'");
+ }
+
+ private void doAggregate(AggregateFunctionImpl aggFn, String varName, Type ty, List<Integer> argList) {
+ List<String> args = new ArrayList<>();
+ Class<?> accumulatorType = aggFn.accumulatorType;
+ if (!aggFn.isStatic) {
+ String aggObjName = String.format("%s_obj", varName);
+ String aggObjClassName = aggFn.initMethod.getDeclaringClass().getCanonicalName();
+ pw.println(String.format(" if (!accumulators.containsKey(\"%s\")) { ", aggObjName));
+ pw.println(String.format(" accumulators.put(\"%s\", new %s());", aggObjName, aggObjClassName));
+ pw.println(" }");
+ pw.println(" @SuppressWarnings(\"unchecked\")");
+ pw.println(String.format(" final %1$s %2$s = (%1$s) accumulators.get(\"%2$s\");", aggObjClassName,
+ aggObjName));
+ args.add(aggObjName);
+ }
+ args.add(String.format("%1$s == null ? %2$s : (%3$s) %1$s",
+ "accumulators.get(\"" + varName + "\")",
+ printMethodCall(aggFn.initMethod, args),
+ accumulatorType.getCanonicalName()));
+ if (argList.isEmpty()) {
+ args.add("EMPTY_VALUES");
+ } else {
+ for (int i = 0; i < aggFn.valueTypes.size(); i++) {
+ args.add(String.format("(%s) %s", aggFn.valueTypes.get(i).getCanonicalName(), "_data.get(" + argList.get(i) + ")"));
+ }
+ }
+ pw.print(String.format(" accumulators.put(\"%s\", %s);\n",
+ varName,
+ printMethodCall(aggFn.addMethod, args)));
+ }
+
+ private String reserveAggVarName(AggregateCall call) {
+ String varName;
+ if ((varName = aggregateCallVarNames.get(call)) == null) {
+ varName = call.getAggregation().getName() + ++nameCount;
+ aggregateCallVarNames.put(call, varName);
+ }
+ return varName;
+ }
+
+ private void beginStage(RelNode n) {
+ pw.print(String.format(STAGE_PROLOGUE, getStageName(n)));
+ }
+
+ private void beginAggregateStage(Aggregate n) {
+ pw.print(String.format(AGGREGATE_STAGE_PROLOGUE, getStageName(n), getGroupByIndices(n), emitAggregateStmts(n)));
+ }
+
+ private void beginJoinStage(Join join) {
+ int[] ordinals = new int[2];
+ if (!RelOptUtil.analyzeSimpleEquiJoin((LogicalJoin) join, ordinals)) {
+ throw new UnsupportedOperationException("Only simple equi joins are supported");
+ }
+
+ pw.print(String.format(JOIN_STAGE_PROLOGUE, getStageName(join),
+ getStageName(join.getLeft()),
+ getStageName(join.getRight()),
+ ordinals[0],
+ ordinals[1]));
+ }
+
+ private void endStage() {
+ pw.print(" }\n };\n");
+ }
+
+ static String getStageName(RelNode n) {
+ return n.getClass().getSimpleName().toUpperCase() + "_" + n.getId();
+ }
+
+ private String getGroupByIndices(Aggregate n) {
+ StringBuilder res = new StringBuilder();
+ int count = 0;
+ for (int i : n.getGroupSet()) {
+ if (++count > 1) {
+ res.append(", ");
+ }
+ res.append(i);
+ }
+ return res.toString();
+ }
+
+ public static String printMethodCall(Method method, List<String> args) {
+ return printMethodCall(method.getDeclaringClass(), method.getName(),
+ Modifier.isStatic(method.getModifiers()), args);
+ }
+
+ private static String printMethodCall(Class<?> clazz, String method, boolean isStatic, List<String> args) {
+ if (isStatic) {
+ return String.format("%s.%s(%s)", clazz.getCanonicalName(), method, Joiner.on(',').join(args));
+ } else {
+ return String.format("%s.%s(%s)", args.get(0), method,
+ Joiner.on(',').join(args.subList(1, args.size())));
+ }
+ }
+
+}
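
For orientation, this is roughly what visitFilter(...) above emits for a single filter node, reconstructed from STAGE_PROLOGUE and the method body; the node id 4 and the elided expression block are illustrative:

    private static final ChannelHandler LOGICALFILTER_4 =
        new AbstractChannelHandler() {
          @Override
          public void dataReceived(ChannelContext ctx, Values _data) {
            Context context = new StormContext(Processor.dataContext);
            context.values = _data.toArray();
            Object[] outputValues = new Object[1];
            // ... block generated by RexNodeToJavaCodeCompiler assigns outputValues[0] ...
            if (((Boolean) outputValues[0]) != null && ((Boolean) outputValues[0])) { ctx.emit(_data); }
          }
        };

When the filter condition is not nullable, the generated guard drops the null check and tests the boolean directly.
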
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/javac/CompilingClassLoader.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/javac/CompilingClassLoader.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/javac/CompilingClassLoader.java
new file mode 100644
index 0000000..0b7c053
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/javac/CompilingClassLoader.java
@@ -0,0 +1,225 @@
+/*
+ * Copyright (C) 2010 Google, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.javac;
+
+
+import javax.tools.DiagnosticListener;
+import javax.tools.FileObject;
+import javax.tools.ForwardingJavaFileManager;
+import javax.tools.JavaCompiler;
+import javax.tools.JavaFileManager;
+import javax.tools.JavaFileObject;
+import javax.tools.SimpleJavaFileObject;
+import javax.tools.ToolProvider;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Collections.singleton;
+
+/**
+ * This is a Java ClassLoader that will attempt to load a class from a string of source code.
+ *
+ * <h3>Example</h3>
+ *
+ * <pre>
+ * String className = "com.foo.MyClass";
+ * String classSource =
+ * "package com.foo;\n" +
+ * "public class MyClass implements Runnable {\n" +
+ * " @Override public void run() {\n" +
+ * " log(\"Hello world\");\n" +
+ * " }\n" +
+ * "}";
+ *
+ * // Load class from source.
+ * ClassLoader classLoader = new CompilingClassLoader(
+ * parentClassLoader, className, classSource);
+ * Class myClass = classLoader.loadClass(className);
+ *
+ * // Use it.
+ * Runnable instance = (Runnable)myClass.newInstance();
+ * instance.run();
+ * </pre>
+ *
+ * Only one chunk of source can be compiled per instance of CompilingClassLoader. If you need to
+ * compile more, create multiple CompilingClassLoader instances.
+ *
+ * Uses the built-in compiler API introduced in Java 1.6.
+ *
+ * If the class cannot be compiled, the constructor will throw a CompilerException and the
+ * compile errors will be logged to System.err. If you don't want the messages logged, or want to
+ * handle them explicitly, you can provide your own {@link javax.tools.DiagnosticListener} through
+ * the constructor.
+ *
+ * @see java.lang.ClassLoader
+ * @see javax.tools.JavaCompiler
+ */
+public class CompilingClassLoader extends ClassLoader {
+
+ /**
+ * Thrown when code cannot be compiled.
+ */
+ public static class CompilerException extends Exception {
+ private static final long serialVersionUID = -2936958840023603270L;
+
+ public CompilerException(String message) {
+ super(message);
+ }
+ }
+
+ private final Map<String, ByteArrayOutputStream> byteCodeForClasses = new HashMap<>();
+
+ private static final URI EMPTY_URI;
+
+ static {
+ try {
+ // Needed to keep the SimpleJavaFileObject constructor happy.
+ EMPTY_URI = new URI("");
+ } catch (URISyntaxException e) {
+ throw new Error(e);
+ }
+ }
+
+ /**
+ * @param parent Parent classloader to resolve dependencies from.
+ * @param className Name of the class to compile, e.g. "com.foo.MyClass".
+ * @param sourceCode Java source for the class, e.g. "package com.foo; class MyClass { ... }".
+ * @param diagnosticListener Notified of compiler errors (may be null).
+ */
+ public CompilingClassLoader(
+ ClassLoader parent,
+ String className,
+ String sourceCode,
+ DiagnosticListener<JavaFileObject> diagnosticListener)
+ throws CompilerException {
+ super(parent);
+ if (!compileSourceCodeToByteCode(className, sourceCode, diagnosticListener)) {
+ throw new CompilerException("Could not compile " + className);
+ }
+ }
+
+ public Map<String, ByteArrayOutputStream> getClasses() {
+ return byteCodeForClasses;
+ }
+
+ /**
+ * Override ClassLoader's class resolving method. Don't call this directly, instead use
+ * {@link ClassLoader#loadClass(String)}.
+ */
+ @Override
+ public Class<?> findClass(String name) throws ClassNotFoundException {
+ ByteArrayOutputStream byteCode = byteCodeForClasses.get(name);
+ if (byteCode == null) {
+ throw new ClassNotFoundException(name);
+ }
+ return defineClass(name, byteCode.toByteArray(), 0, byteCode.size());
+ }
+
+ /**
+ * @return Whether compilation was successful.
+ */
+ private boolean compileSourceCodeToByteCode(
+ String className, String sourceCode, DiagnosticListener<JavaFileObject> diagnosticListener) {
+ JavaCompiler javaCompiler = ToolProvider.getSystemJavaCompiler();
+
+ // Set up the in-memory filesystem.
+ InMemoryFileManager fileManager =
+ new InMemoryFileManager(javaCompiler.getStandardFileManager(null, null, null));
+ JavaFileObject javaFile = new InMemoryJavaFile(className, sourceCode);
+
+ // Javac option: remove these when the javac zip impl is fixed
+ // (http://b/issue?id=1822932)
+ System.setProperty("useJavaUtilZip", "true"); // setting value to any non-null string
+ List<String> options = new LinkedList<>();
+ // this is ignored by javac currently but useJavaUtilZip should be
+ // a valid javac XD option, which is another bug
+ options.add("-XDuseJavaUtilZip");
+
+ // Now compile!
+ JavaCompiler.CompilationTask compilationTask =
+ javaCompiler.getTask(
+ null, // Null: log any unhandled errors to stderr.
+ fileManager,
+ diagnosticListener,
+ options,
+ null,
+ singleton(javaFile));
+ return compilationTask.call();
+ }
+
+ /**
+ * Provides an in-memory implementation of the JavaFileManager abstraction, so we do not need to
+ * write any files to disk.
+ *
+ * When files are written to, rather than putting the bytes on disk, they are appended to buffers
+ * in byteCodeForClasses.
+ *
+ * @see javax.tools.JavaFileManager
+ */
+ private class InMemoryFileManager extends ForwardingJavaFileManager<JavaFileManager> {
+ public InMemoryFileManager(JavaFileManager fileManager) {
+ super(fileManager);
+ }
+
+ @Override
+ public JavaFileObject getJavaFileForOutput(
+ Location location, final String className, JavaFileObject.Kind kind, FileObject sibling)
+ throws IOException {
+ return new SimpleJavaFileObject(EMPTY_URI, kind) {
+ @Override
+ public OutputStream openOutputStream() throws IOException {
+ ByteArrayOutputStream outputStream = byteCodeForClasses.get(className);
+ if (outputStream != null) {
+ throw new IllegalStateException("Cannot write more than once");
+ }
+ // Reasonable size for a simple .class.
+ outputStream = new ByteArrayOutputStream(256);
+ byteCodeForClasses.put(className, outputStream);
+ return outputStream;
+ }
+ };
+ }
+ }
+
+ private static class InMemoryJavaFile extends SimpleJavaFileObject {
+ private final String sourceCode;
+
+ public InMemoryJavaFile(String className, String sourceCode) {
+ super(makeUri(className), Kind.SOURCE);
+ this.sourceCode = sourceCode;
+ }
+
+ private static URI makeUri(String className) {
+ try {
+ return new URI(className.replaceAll("\\.", "/") + Kind.SOURCE.extension);
+ } catch (URISyntaxException e) {
+ throw new RuntimeException(e); // Not sure what could cause this.
+ }
+ }
+
+ @Override
+ public CharSequence getCharContent(boolean ignoreEncodingErrors) throws IOException {
+ return sourceCode;
+ }
+ }
+}
\ No newline at end of file
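
A minimal usage sketch for the class above, showing the DiagnosticListener route the Javadoc mentions. The class name and source string are made up for illustration, and a javax.tools.DiagnosticCollector stands in for a custom listener:

    import javax.tools.Diagnostic;
    import javax.tools.DiagnosticCollector;
    import javax.tools.JavaFileObject;

    import org.apache.storm.sql.javac.CompilingClassLoader;

    public class CompileFromStringSketch {
      public static void main(String[] args) throws Exception {
        String className = "com.example.Echo";          // hypothetical class name
        String source =
            "package com.example;\n" +
            "public class Echo implements Runnable {\n" +
            "  @Override public void run() { System.out.println(\"hi\"); }\n" +
            "}";

        // Collect compiler messages instead of letting them go to System.err.
        DiagnosticCollector<JavaFileObject> diagnostics = new DiagnosticCollector<>();
        try {
          ClassLoader loader = new CompilingClassLoader(
              Thread.currentThread().getContextClassLoader(), className, source, diagnostics);
          Runnable r = (Runnable) loader.loadClass(className).newInstance();
          r.run();
        } catch (CompilingClassLoader.CompilerException e) {
          // Compilation failed; the collected diagnostics explain why.
          for (Diagnostic<? extends JavaFileObject> d : diagnostics.getDiagnostics()) {
            System.err.println(d.getMessage(null));
          }
        }
      }
    }
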
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/ColumnConstraint.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/ColumnConstraint.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/ColumnConstraint.java
new file mode 100644
index 0000000..c67d8e7
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/ColumnConstraint.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.sql.parser;
+
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.validate.SqlMonotonicity;
+
+public class ColumnConstraint extends SqlLiteral {
+ private ColumnConstraint(
+ Object value, SqlTypeName typeName, SqlParserPos pos) {
+ super(value, typeName, pos);
+ }
+
+ public static class PrimaryKey extends ColumnConstraint {
+ private final SqlMonotonicity monotonicity;
+ public PrimaryKey(SqlMonotonicity monotonicity, SqlParserPos pos) {
+ super(SqlDDLKeywords.PRIMARY, SqlTypeName.SYMBOL, pos);
+ this.monotonicity = monotonicity;
+ }
+ public SqlMonotonicity monotonicity() {
+ return monotonicity;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/ColumnDefinition.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/ColumnDefinition.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/ColumnDefinition.java
new file mode 100644
index 0000000..3520b86
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/ColumnDefinition.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.parser;
+
+import org.apache.calcite.sql.SqlDataTypeSpec;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+import java.util.Arrays;
+
+public class ColumnDefinition extends SqlNodeList {
+ public ColumnDefinition(
+ SqlIdentifier name, SqlDataTypeSpec type, ColumnConstraint constraint, SqlParserPos pos) {
+ super(Arrays.asList(name, type, constraint), pos);
+ }
+
+ public String name() {
+ return get(0).toString();
+ }
+
+ public SqlDataTypeSpec type() {
+ return (SqlDataTypeSpec) get(1);
+ }
+
+ public ColumnConstraint constraint() {
+ return (ColumnConstraint) get(2);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateFunction.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateFunction.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateFunction.java
new file mode 100644
index 0000000..a53802c
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateFunction.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.parser;
+
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+import org.apache.calcite.util.NlsString;
+
+import java.util.List;
+
+public class SqlCreateFunction extends SqlCall {
+ public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator(
+ "CREATE_FUNCTION", SqlKind.OTHER) {
+ @Override
+ public SqlCall createCall(
+ SqlLiteral functionQualifier, SqlParserPos pos, SqlNode... o) {
+ assert functionQualifier == null;
+ return new SqlCreateFunction(pos, (SqlIdentifier) o[0], o[1], o[2]);
+ }
+
+ @Override
+ public void unparse(
+ SqlWriter writer, SqlCall call, int leftPrec, int rightPrec) {
+ SqlCreateFunction t = (SqlCreateFunction) call;
+ UnparseUtil u = new UnparseUtil(writer, leftPrec, rightPrec);
+ u.keyword("CREATE", "FUNCTION").node(t.functionName).keyword("AS").node(t.className);
+ if (t.jarName != null) {
+ u.keyword("USING", "JAR").node(t.jarName);
+ }
+ }
+ };
+
+ private final SqlIdentifier functionName;
+ private final SqlNode className;
+ private final SqlNode jarName;
+
+ public SqlCreateFunction(SqlParserPos pos, SqlIdentifier functionName, SqlNode className, SqlNode jarName) {
+ super(pos);
+ this.functionName = functionName;
+ this.className = className;
+ this.jarName = jarName;
+ }
+
+ @Override
+ public SqlOperator getOperator() {
+ return OPERATOR;
+ }
+
+ @Override
+ public List<SqlNode> getOperandList() {
+ return ImmutableNullableList.of(functionName, className);
+ }
+
+
+ @Override
+ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+ getOperator().unparse(writer, this, leftPrec, rightPrec);
+ }
+
+ public String functionName() {
+ return functionName.toString();
+ }
+
+ public String className() {
+ return ((NlsString)SqlLiteral.value(className)).getValue();
+ }
+
+ public String jarName() {
+ return jarName == null ? null : ((NlsString)SqlLiteral.value(jarName)).getValue();
+ }
+}
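
As a reading aid: a node of this type unparses to DDL of the shape CREATE FUNCTION <name> AS '<class>' [USING JAR '<jar>']. A small construction sketch with made-up names, assuming Calcite's SqlParserPos.ZERO and SqlLiteral.createCharString (a fragment, not a full program):

    import org.apache.calcite.sql.SqlIdentifier;
    import org.apache.calcite.sql.SqlLiteral;
    import org.apache.calcite.sql.parser.SqlParserPos;

    SqlParserPos pos = SqlParserPos.ZERO;
    SqlCreateFunction fn = new SqlCreateFunction(
        pos,
        new SqlIdentifier("MY_PLUS", pos),                            // hypothetical function name
        SqlLiteral.createCharString("com.example.udf.MyPlus", pos),   // hypothetical implementation class
        SqlLiteral.createCharString("hdfs:///tmp/udf.jar", pos));     // hypothetical jar location

    fn.functionName();   // "MY_PLUS"
    fn.className();      // "com.example.udf.MyPlus"
    fn.jarName();        // "hdfs:///tmp/udf.jar"
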
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateTable.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateTable.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateTable.java
new file mode 100644
index 0000000..670eedb
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateTable.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.parser;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Properties;
+
+public class SqlCreateTable extends SqlCall {
+ private static final int DEFAULT_PARALLELISM = 1;
+
+ public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator(
+ "CREATE_TABLE", SqlKind.OTHER) {
+ @Override
+ public SqlCall createCall(
+ SqlLiteral functionQualifier, SqlParserPos pos, SqlNode... o) {
+ assert functionQualifier == null;
+ return new SqlCreateTable(pos, (SqlIdentifier) o[0], (SqlNodeList) o[1],
+ o[2], o[3], o[4], o[5], o[6], o[7]);
+ }
+
+ @Override
+ public void unparse(
+ SqlWriter writer, SqlCall call, int leftPrec, int rightPrec) {
+ SqlCreateTable t = (SqlCreateTable) call;
+ UnparseUtil u = new UnparseUtil(writer, leftPrec, rightPrec);
+ u.keyword("CREATE", "EXTERNAL", "TABLE").node(t.tblName).nodeList(
+ t.fieldList);
+ if (t.inputFormatClass != null && t.outputFormatClass != null) {
+ u.keyword("STORED", "AS", "INPUTFORMAT").node(
+ t.inputFormatClass).keyword("OUTPUTFORMAT").node(
+ t.outputFormatClass);
+ }
+ u.keyword("LOCATION").node(t.location);
+ if (t.parallelism != null) {
+ u.keyword("PARALLELISM").node(t.parallelism);
+ }
+ if (t.properties != null) {
+ u.keyword("TBLPROPERTIES").node(t.properties);
+ }
+ if (t.query != null) {
+ u.keyword("AS").node(t.query);
+ }
+ }
+ };
+
+ private final SqlIdentifier tblName;
+ private final SqlNodeList fieldList;
+ private final SqlNode inputFormatClass;
+ private final SqlNode outputFormatClass;
+ private final SqlNode location;
+ private final SqlNode parallelism;
+ private final SqlNode properties;
+ private final SqlNode query;
+
+ public SqlCreateTable(
+ SqlParserPos pos, SqlIdentifier tblName, SqlNodeList fieldList,
+ SqlNode inputFormatClass, SqlNode outputFormatClass, SqlNode location,
+ SqlNode parallelism, SqlNode properties, SqlNode query) {
+ super(pos);
+ this.tblName = tblName;
+ this.fieldList = fieldList;
+ this.inputFormatClass = inputFormatClass;
+ this.outputFormatClass = outputFormatClass;
+ this.location = location;
+ this.parallelism = parallelism;
+ this.properties = properties;
+ this.query = query;
+ }
+
+ @Override
+ public SqlOperator getOperator() {
+ return OPERATOR;
+ }
+
+ @Override
+ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+ getOperator().unparse(writer, this, leftPrec, rightPrec);
+ }
+
+ @Override
+ public List<SqlNode> getOperandList() {
+ return ImmutableNullableList.of(tblName, fieldList, inputFormatClass,
+ outputFormatClass, location, properties,
+ query);
+ }
+
+ public String tableName() {
+ return tblName.toString();
+ }
+
+ public URI location() {
+ return URI.create(getString(location));
+ }
+
+ public Integer parallelism() {
+ String parallelismStr = getString(parallelism);
+ if (parallelismStr != null) {
+ return Integer.parseInt(parallelismStr);
+ } else {
+ return DEFAULT_PARALLELISM;
+ }
+ }
+
+ public String inputFormatClass() {
+ return getString(inputFormatClass);
+ }
+
+ public String outputFormatClass() {
+ return getString(outputFormatClass);
+ }
+
+ public Properties properties() {
+ Properties props = new Properties();
+ if (properties != null) {
+ try {
+ ObjectMapper mapper = new ObjectMapper();
+ HashMap<String, Object> map = mapper.readValue(getString(properties), HashMap.class);
+ props.putAll(map);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ return props;
+ }
+
+ private String getString(SqlNode n) {
+ return n == null ? null : SqlLiteral.stringValue(n);
+ }
+
+ @SuppressWarnings("unchecked")
+ public List<ColumnDefinition> fieldList() {
+ return (List<ColumnDefinition>)((List<? extends SqlNode>)fieldList.getList());
+ }
+
+}
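
For orientation, this node models DDL of roughly the shape CREATE EXTERNAL TABLE <name> (<fields>) [STORED AS INPUTFORMAT <class> OUTPUTFORMAT <class>] LOCATION <uri> [PARALLELISM <n>] [TBLPROPERTIES <json>] [AS <query>], mirroring the unparse method above. The TBLPROPERTIES literal is treated as a JSON document: properties() hands its string value to Jackson and copies the resulting map into a java.util.Properties. A standalone sketch of that conversion, with illustrative JSON content:

    import com.fasterxml.jackson.databind.ObjectMapper;

    import java.util.HashMap;
    import java.util.Properties;

    public class TblPropertiesSketch {
      public static void main(String[] args) throws Exception {
        // Example TBLPROPERTIES payload (illustrative only).
        String json = "{\"producer\": {\"bootstrap.servers\": \"localhost:9092\", \"acks\": \"1\"}}";

        ObjectMapper mapper = new ObjectMapper();
        HashMap<String, Object> map = mapper.readValue(json, HashMap.class);

        Properties props = new Properties();
        props.putAll(map);
        // Nested JSON objects stay nested maps; keys are not flattened.
        System.out.println(props.get("producer"));
      }
    }
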
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlDDLKeywords.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlDDLKeywords.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlDDLKeywords.java
new file mode 100644
index 0000000..3112e53
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlDDLKeywords.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.parser;
+
+import org.apache.calcite.sql.SqlLiteral;
+
+/**
+ * Defines the keywords that can occur in a CREATE TABLE statement.
+ */
+public enum SqlDDLKeywords implements SqlLiteral.SqlSymbol {
+ PRIMARY
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/StormParser.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/StormParser.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/StormParser.java
new file mode 100644
index 0000000..8444e1e
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/StormParser.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.parser;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.calcite.config.Lex;
+import org.apache.storm.sql.parser.impl.StormParserImpl;
+
+import java.io.StringReader;
+
+public class StormParser {
+ public static final int DEFAULT_IDENTIFIER_MAX_LENGTH = 128;
+ private final StormParserImpl impl;
+
+ public StormParser(String s) {
+ this.impl = new StormParserImpl(new StringReader(s));
+ this.impl.setTabSize(1);
+ this.impl.setQuotedCasing(Lex.ORACLE.quotedCasing);
+ this.impl.setUnquotedCasing(Lex.ORACLE.unquotedCasing);
+ this.impl.setIdentifierMaxLength(DEFAULT_IDENTIFIER_MAX_LENGTH);
+ /*
+ * By default the parser uses [ ] for quoting identifiers. Switching to DQID (double-quoted
+ * identifiers) is needed for array and map access (m['x'] = 1, arr[2] = 10, etc.) to work.
+ */
+ this.impl.switchTo("DQID");
+ }
+
+ @VisibleForTesting
+ public StormParserImpl impl() {
+ return impl;
+ }
+}
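
A minimal parse sketch for the class above. The statement text is arbitrary, and parseSqlStmtEof() is assumed to be the statement entry point that the Calcite parser template generates for StormParserImpl:

    import org.apache.calcite.sql.SqlNode;

    import org.apache.storm.sql.parser.StormParser;

    public class ParseSketch {
      public static void main(String[] args) throws Exception {
        // DQID quoting lets item access such as MAPFIELD['k'] parse as the ITEM operator
        // rather than being read as a bracket-quoted identifier.
        StormParser parser = new StormParser(
            "SELECT ID, MAPFIELD['k'] FROM ORDERS WHERE ID > 2");
        SqlNode stmt = parser.impl().parseSqlStmtEof();
        System.out.println(stmt);
      }
    }
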
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/UnparseUtil.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/UnparseUtil.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/UnparseUtil.java
new file mode 100644
index 0000000..8e0a1d9
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/UnparseUtil.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.parser;
+
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlWriter;
+
+class UnparseUtil {
+ private final SqlWriter writer;
+ private final int leftPrec;
+ private final int rightPrec;
+
+ UnparseUtil(SqlWriter writer, int leftPrec, int rightPrec) {
+ this.writer = writer;
+ this.leftPrec = leftPrec;
+ this.rightPrec = rightPrec;
+ }
+
+ UnparseUtil keyword(String... keywords) {
+ for (String k : keywords) {
+ writer.keyword(k);
+ }
+ return this;
+ }
+
+ UnparseUtil node(SqlNode n) {
+ n.unparse(writer, leftPrec, rightPrec);
+ return this;
+ }
+
+ UnparseUtil nodeList(SqlNodeList l) {
+ writer.keyword("(");
+ if (l.size() > 0) {
+ l.get(0).unparse(writer, leftPrec, rightPrec);
+ for (int i = 1; i < l.size(); ++i) {
+ writer.keyword(",");
+ l.get(i).unparse(writer, leftPrec, rightPrec);
+ }
+ }
+ writer.keyword(")");
+ return this;
+ }
+}