You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/04/05 23:19:19 UTC
[16/23] storm git commit: STORM-2453 Move non-connectors into the top
directory
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java
deleted file mode 100644
index 97995c7..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java
+++ /dev/null
@@ -1,484 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.compiler.backends.standalone;
-
-import com.google.common.base.Joiner;
-import org.apache.calcite.adapter.java.JavaTypeFactory;
-import org.apache.calcite.plan.RelOptUtil;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Aggregate;
-import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.core.Filter;
-import org.apache.calcite.rel.core.Join;
-import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rel.core.TableScan;
-import org.apache.calcite.rel.logical.LogicalJoin;
-import org.apache.calcite.rel.stream.Delta;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rex.RexBuilder;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.schema.AggregateFunction;
-import org.apache.calcite.schema.impl.AggregateFunctionImpl;
-import org.apache.calcite.sql.SqlAggFunction;
-import org.apache.calcite.sql.validate.SqlUserDefinedAggFunction;
-import org.apache.storm.sql.compiler.RexNodeToJavaCodeCompiler;
-
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.lang.reflect.Method;
-import java.lang.reflect.Modifier;
-import java.lang.reflect.Type;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Compile RelNodes into individual functions.
- */
-class RelNodeCompiler extends PostOrderRelNodeVisitor<Void> {
- /** Joins the lines of the generated-source templates with a newline character. */
- public static final Joiner NEW_LINE_JOINER = Joiner.on('\n');
-
- private final PrintWriter pw;
- private final JavaTypeFactory typeFactory;
- private final RexNodeToJavaCodeCompiler rexCompiler;
-
- // Format template that opens a generic stage: %1$s is the stage (handler constant) name.
- // It leaves the dataReceived(...) body open; endStage() prints the matching closing braces.
- private static final String STAGE_PROLOGUE = NEW_LINE_JOINER.join(
- " private static final ChannelHandler %1$s = ",
- " new AbstractChannelHandler() {",
- " @Override",
- " public void dataReceived(ChannelContext ctx, Values _data) {",
- ""
- );
-
- // Format template that opens an aggregate stage. Placeholders, as filled in by
- // beginAggregateStage(): %1$s stage name, %2$s comma-separated group-by column indices,
- // %3$s the statements run once per group inside emitAggregateResults(...).
- // The generated handler keeps per-group accumulators in `state`, emits them on flush and then
- // clears the state. The dataReceived(...) body is left open for the caller to fill in.
- private static final String AGGREGATE_STAGE_PROLOGUE = NEW_LINE_JOINER.join(
- " private static final ChannelHandler %1$s = ",
- " new AbstractChannelHandler() {",
- " private final Values EMPTY_VALUES = new Values();",
- " private final Map<List<Object>, Map<String, Object>> state = new LinkedHashMap<>();",
- " private final int[] groupIndices = new int[] {%2$s};",
- " private List<Object> getGroupValues(Values _data) {",
- " List<Object> res = new ArrayList<>();",
- " for (int i: groupIndices) {",
- " res.add(_data.get(i));",
- " }",
- " return res;",
- " }",
- "",
- " @Override",
- " public void flush(ChannelContext ctx) {",
- " emitAggregateResults(ctx);",
- " super.flush(ctx);",
- " state.clear();",
- " }",
- "",
- " private void emitAggregateResults(ChannelContext ctx) {",
- " for (Map.Entry<List<Object>, Map<String, Object>> entry: state.entrySet()) {",
- " List<Object> groupValues = entry.getKey();",
- " Map<String, Object> accumulators = entry.getValue();",
- " %3$s",
- " }",
- " }",
- "",
- " @Override",
- " public void dataReceived(ChannelContext ctx, Values _data) {",
- ""
- );
-
- // Format template that opens a join stage. Placeholders, as filled in by beginJoinStage():
- // %1$s stage name, %2$s / %3$s the stage names of the left / right inputs,
- // %4$s / %5$s the join-key ordinals of the left / right rows.
- // The generated handler buffers rows per side and, once both sides have flushed, builds a
- // multimap over the smaller side and emits the joined rows (left columns first in both cases).
- // The dataReceived(...) body is left open for the caller to fill in.
- private static final String JOIN_STAGE_PROLOGUE = NEW_LINE_JOINER.join(
- " private static final ChannelHandler %1$s = ",
- " new AbstractChannelHandler() {",
- " Object left = %2$s;",
- " Object right = %3$s;",
- " Object source = null;",
- " List<Values> leftRows = new ArrayList<>();",
- " List<Values> rightRows = new ArrayList<>();",
- " boolean leftDone = false;",
- " boolean rightDone = false;",
- " int[] ordinals = new int[] {%4$s, %5$s};",
- "",
- " Multimap<Object, Values> getJoinTable(List<Values> rows, int joinIndex) {",
- " Multimap<Object, Values> m = ArrayListMultimap.create();",
- " for(Values v: rows) {",
- " m.put(v.get(joinIndex), v);",
- " }",
- " return m;",
- " }",
- "",
- " List<Values> join(Multimap<Object, Values> tab, List<Values> rows, int rowIdx, boolean rev) {",
- " List<Values> res = new ArrayList<>();",
- " for (Values row: rows) {",
- " for (Values mapValue: tab.get(row.get(rowIdx))) {",
- " if (mapValue != null) {",
- " Values joinedRow = new Values();",
- " if(rev) {",
- " joinedRow.addAll(row);",
- " joinedRow.addAll(mapValue);",
- " } else {",
- " joinedRow.addAll(mapValue);",
- " joinedRow.addAll(row);",
- " }",
- " res.add(joinedRow);",
- " }",
- " }",
- " }",
- " return res;",
- " }",
- "",
- " @Override",
- " public void setSource(ChannelContext ctx, Object source) {",
- " this.source = source;",
- " }",
- "",
- " @Override",
- " public void flush(ChannelContext ctx) {",
- " if (source == left) {",
- " leftDone = true;",
- " } else if (source == right) {",
- " rightDone = true;",
- " }",
- " if (leftDone && rightDone) {",
- " if (leftRows.size() <= rightRows.size()) {",
- " for(Values res: join(getJoinTable(leftRows, ordinals[0]), rightRows, ordinals[1], false)) {",
- " ctx.emit(res);",
- " }",
- " } else {",
- " for(Values res: join(getJoinTable(rightRows, ordinals[1]), leftRows, ordinals[0], true)) {",
- " ctx.emit(res);",
- " }",
- " }",
- " leftDone = rightDone = false;",
- " leftRows.clear();",
- " rightRows.clear();",
- " super.flush(ctx);",
- " }",
- " }",
- "",
- " @Override",
- " public void dataReceived(ChannelContext ctx, Values _data) {",
- ""
- );
-
- // Format template for a complete stage that just aliases AbstractChannelHandler.PASS_THROUGH;
- // %1$s is the stage name. No endStage() call is needed after it.
- private static final String STAGE_PASSTHROUGH = NEW_LINE_JOINER.join(
- " private static final ChannelHandler %1$s = AbstractChannelHandler.PASS_THROUGH;",
- "");
-
- // Format template for a complete table-scan stage; %1$s is the stage name.
- // The generated handler registers itself as the source on the context before forwarding each
- // row and each flush.
- private static final String STAGE_ENUMERABLE_TABLE_SCAN = NEW_LINE_JOINER.join(
- " private static final ChannelHandler %1$s = new AbstractChannelHandler() {",
- " @Override",
- " public void flush(ChannelContext ctx) {",
- " ctx.setSource(this);",
- " super.flush(ctx);",
- " }",
- "",
- " @Override",
- " public void dataReceived(ChannelContext ctx, Values _data) {",
- " ctx.setSource(this);",
- " ctx.emit(_data);",
- " }",
- " };",
- "");
-
- private int nameCount;
- private Map<AggregateCall, String> aggregateCallVarNames = new HashMap<>();
-
- /**
-  * Creates a compiler that prints the generated stage source to {@code pw}.
-  *
-  * @param pw sink that receives the generated Java source
-  * @param typeFactory factory used to build the {@link RexBuilder} and to map SQL types to Java classes
-  */
- RelNodeCompiler(PrintWriter pw, JavaTypeFactory typeFactory) {
-   final RexBuilder rexBuilder = new RexBuilder(typeFactory);
-   this.rexCompiler = new RexNodeToJavaCodeCompiler(rexBuilder);
-   this.typeFactory = typeFactory;
-   this.pw = pw;
- }
-
- /**
-  * Prints a pass-through stage for the given {@link Delta} node.
-  *
-  * @return always {@code null}
-  */
- @Override
- public Void visitDelta(Delta delta, List<Void> inputStreams) throws Exception {
-   final String stageName = getStageName(delta);
-   final String stageSource = String.format(STAGE_PASSTHROUGH, stageName);
-   pw.print(stageSource);
-   return null;
- }
-
- /**
-  * Prints a stage that evaluates the filter condition into {@code outputValues[0]} and re-emits
-  * the incoming row only when the condition holds. A nullable condition is additionally guarded
-  * against {@code null} in the generated code.
-  *
-  * @return always {@code null}
-  */
- @Override
- public Void visitFilter(Filter filter, List<Void> inputStreams) throws Exception {
-   beginStage(filter);
-
-   final List<RexNode> conditionExps = filter.getChildExps();
-   final RelDataType rowTypeOfInput = filter.getInput(0).getRowType();
-
-   pw.print("Context context = new StormContext(Processor.dataContext);\n");
-   pw.print("context.values = _data.toArray();\n");
-   pw.print("Object[] outputValues = new Object[1];\n");
-
-   final String compiledBlock = rexCompiler.compileToBlock(conditionExps, rowTypeOfInput).toString();
-   pw.write(compiledBlock);
-
-   final String cond = "((Boolean) outputValues[0])";
-   final String emitStmt = filter.getCondition().getType().isNullable()
-       ? String.format(" if (%s != null && %s) { ctx.emit(_data); }\n", cond, cond)
-       : String.format(" if (%s) { ctx.emit(_data); }\n", cond);
-   pw.print(emitStmt);
-
-   endStage();
-   return null;
- }
-
- /**
-  * Prints a stage that evaluates the projection expressions into an {@code outputValues} array
-  * sized to the project's output row, then emits that array as a new {@code Values}.
-  *
-  * @return always {@code null}
-  */
- @Override
- public Void visitProject(Project project, List<Void> inputStreams) throws Exception {
-   beginStage(project);
-
-   final List<RexNode> projections = project.getChildExps();
-   final RelDataType rowTypeOfInput = project.getInput(0).getRowType();
-   final int fieldCount = project.getRowType().getFieldCount();
-
-   pw.print("Context context = new StormContext(Processor.dataContext);\n");
-   pw.print("context.values = _data.toArray();\n");
-   final String outputDecl = String.format("Object[] outputValues = new Object[%d];\n", fieldCount);
-   pw.print(outputDecl);
-
-   final String compiledBlock = rexCompiler.compileToBlock(projections, rowTypeOfInput).toString();
-   pw.write(compiledBlock);
-
-   pw.print(" ctx.emit(new Values(outputValues));\n");
-   endStage();
-   return null;
- }
-
- /**
-  * Fallback for node types this compiler has no dedicated visit method for.
-  *
-  * @throws UnsupportedOperationException always; the message names the unsupported node class
-  */
- @Override
- public Void defaultValue(RelNode n, List<Void> inputStreams) {
-   // Name the offending node type so an unsupported plan can be diagnosed from the exception alone.
-   final String nodeType = (n == null) ? "null" : n.getClass().getName();
-   throw new UnsupportedOperationException("Unsupported RelNode: " + nodeType);
- }
-
- /**
-  * Prints the table-scan stage for the given scan node.
-  *
-  * @return always {@code null}
-  */
- @Override
- public Void visitTableScan(TableScan scan, List<Void> inputStreams) throws Exception {
-   final String stageName = getStageName(scan);
-   final String stageSource = String.format(STAGE_ENUMERABLE_TABLE_SCAN, stageName);
-   pw.print(stageSource);
-   return null;
- }
-
- /**
-  * Prints an aggregate stage: for every non-null row the generated code looks up (or creates)
-  * the accumulator map of the row's group and then applies each aggregate call to it.
-  *
-  * @return always {@code null}
-  */
- @Override
- public Void visitAggregate(Aggregate aggregate, List<Void> inputStreams) throws Exception {
-   beginAggregateStage(aggregate);
-
-   final String[] accumulatorLookup = {
-     " if (_data != null) {",
-     " List<Object> curGroupValues = getGroupValues(_data);",
-     " if (!state.containsKey(curGroupValues)) {",
-     " state.put(curGroupValues, new HashMap<String, Object>());",
-     " }",
-     " Map<String, Object> accumulators = state.get(curGroupValues);"
-   };
-   for (String line : accumulatorLookup) {
-     pw.println(line);
-   }
-
-   for (AggregateCall aggCall : aggregate.getAggCallList()) {
-     aggregate(aggCall);
-   }
-
-   pw.println(" }");
-   endStage();
-   return null;
- }
-
- /**
-  * Prints a join stage whose {@code dataReceived} body buffers each row on the side
-  * (left or right) that matches the current source.
-  *
-  * @return always {@code null}
-  */
- @Override
- public Void visitJoin(Join join, List<Void> inputStreams) {
-   beginJoinStage(join);
-
-   final String[] bufferRowBySide = {
-     " if (source == left) {",
-     " leftRows.add(_data);",
-     " } else if (source == right) {",
-     " rightRows.add(_data);",
-     " }"
-   };
-   for (String line : bufferRowBySide) {
-     pw.println(line);
-   }
-
-   endStage();
-   return null;
- }
-
- /**
-  * Builds the comma-separated accessor list {@code var.get(0), var.get(1), ...} for the
-  * first {@code n} elements of the variable named {@code var}.
-  *
-  * @return the accessor list, or an empty string when {@code n} is not positive
-  */
- private String groupValueEmitStr(String var, int n) {
-   final List<String> accessors = new ArrayList<>();
-   for (int idx = 0; idx < n; idx++) {
-     accessors.add(var + ".get(" + idx + ")");
-   }
-   return Joiner.on(", ").join(accessors);
- }
-
- /**
-  * Builds the statements that run once per group when the aggregate stage flushes: one result
-  * declaration per aggregate call, followed by a single {@code ctx.emit(new Values(...))} of the
-  * group values and the aggregate results.
-  *
-  * <p>The emit arguments are joined from the non-empty parts only, so an aggregate without
-  * group-by columns or without aggregate calls still yields compilable code instead of
-  * {@code new Values(, x)} or {@code new Values(x, )}.
-  *
-  * @param aggregate the aggregate node being compiled
-  * @return the generated statements
-  */
- private String emitAggregateStmts(Aggregate aggregate) {
-   StringWriter sw = new StringWriter();
-   PrintWriter resultWriter = new PrintWriter(sw);
-   List<String> emitArgs = new ArrayList<>();
-
-   String groupArgs = groupValueEmitStr("groupValues", aggregate.getGroupSet().cardinality());
-   if (!groupArgs.isEmpty()) {
-     emitArgs.add(groupArgs);
-   }
-   for (AggregateCall call : aggregate.getAggCallList()) {
-     // aggregateResult prints the result declaration and returns the result variable's name.
-     emitArgs.add(aggregateResult(call, resultWriter));
-   }
-   resultWriter.flush();
-
-   return NEW_LINE_JOINER.join(sw.toString(),
-       String.format(" ctx.emit(new Values(%s));", Joiner.on(", ").join(emitArgs)));
- }
-
- /**
-  * Prints the statement that computes the final result of one aggregate call and returns the
-  * name of the variable holding it. User-defined aggregates use their own function object;
-  * built-ins are looked up in {@code BuiltinAggregateFunctions.TABLE} by name and result type.
-  *
-  * @throws UnsupportedOperationException if a built-in aggregate has no registered implementation
-  */
- private String aggregateResult(AggregateCall call, PrintWriter pw) {
-   final SqlAggFunction sqlAgg = call.getAggregation();
-   final String aggName = call.getAggregation().getName();
-   final Type ty = typeFactory.getJavaClass(call.getType());
-
-   final AggregateFunctionImpl impl;
-   if (sqlAgg instanceof SqlUserDefinedAggFunction) {
-     final AggregateFunction userDefined = ((SqlUserDefinedAggFunction) sqlAgg).function;
-     impl = (AggregateFunctionImpl) userDefined;
-   } else {
-     final List<BuiltinAggregateFunctions.TypeClass> candidates = BuiltinAggregateFunctions.TABLE.get(aggName);
-     if (candidates == null) {
-       throw new UnsupportedOperationException(aggName + " Not implemented");
-     }
-     impl = AggregateFunctionImpl.create(findMatchingClass(aggName, candidates, ty));
-   }
-   return doAggregateResult(impl, reserveAggVarName(call), ty, pw);
- }
-
- /**
-  * Prints the declaration {@code final <resultType> <varName>_result = <resultMethod call>;}
-  * and returns the result variable's name. For non-static aggregate functions the function
-  * object is first read back from the accumulator map and used as the call receiver.
-  *
-  * @param ty unused by this method
-  */
- private String doAggregateResult(AggregateFunctionImpl aggFn, String varName, Type ty, PrintWriter pw) {
-   final List<String> callArgs = new ArrayList<>();
-   if (!aggFn.isStatic) {
-     final String holderName = String.format("%s_obj", varName);
-     final String holderClass = aggFn.initMethod.getDeclaringClass().getCanonicalName();
-     pw.println(" @SuppressWarnings(\"unchecked\")");
-     pw.println(String.format(" final %1$s %2$s = (%1$s) accumulators.get(\"%2$s\");", holderClass,
-         holderName));
-     callArgs.add(holderName);
-   }
-
-   final String accumulatorClass = aggFn.accumulatorType.getCanonicalName();
-   callArgs.add(String.format("(%s)accumulators.get(\"%s\")", accumulatorClass, varName));
-
-   final String resultName = varName + "_result";
-   final String resultClass = aggFn.resultType.getCanonicalName();
-   final String resultCall = printMethodCall(aggFn.resultMethod, callArgs);
-   pw.println(String.format(" final %s %s = %s;", resultClass, resultName, resultCall));
-   return resultName;
- }
-
- /**
-  * Prints the per-row accumulation statement for one aggregate call. COUNT is only accepted
-  * with zero or one argument; user-defined aggregates use their own function object, built-ins
-  * are resolved through {@code BuiltinAggregateFunctions.TABLE}.
-  *
-  * @throws UnsupportedOperationException for COUNT with more than one argument, or for a
-  *     built-in aggregate without a registered implementation
-  */
- private void aggregate(AggregateCall call) {
-   final SqlAggFunction sqlAgg = call.getAggregation();
-   final String aggName = call.getAggregation().getName();
-   final Type ty = typeFactory.getJavaClass(call.getType());
-
-   // Same condition as "size is neither 1 nor 0": list sizes are never negative.
-   final int argCount = call.getArgList().size();
-   if (argCount > 1 && aggName.equals("COUNT")) {
-     throw new UnsupportedOperationException("Count with nullable fields");
-   }
-
-   final AggregateFunctionImpl impl;
-   if (sqlAgg instanceof SqlUserDefinedAggFunction) {
-     final AggregateFunction userDefined = ((SqlUserDefinedAggFunction) sqlAgg).function;
-     impl = (AggregateFunctionImpl) userDefined;
-   } else {
-     final List<BuiltinAggregateFunctions.TypeClass> candidates = BuiltinAggregateFunctions.TABLE.get(aggName);
-     if (candidates == null) {
-       throw new UnsupportedOperationException(aggName + " Not implemented");
-     }
-     impl = AggregateFunctionImpl.create(findMatchingClass(aggName, candidates, ty));
-   }
-   doAggregate(impl, reserveAggVarName(call), ty, call.getArgList());
- }
-
- /**
-  * Returns the implementation class of the first candidate that is either registered for the
-  * generic type marker or for exactly the type {@code ty}.
-  *
-  * @param aggregationName aggregate name, used only in the error message
-  * @param typeClasses candidate (type, class) pairs, checked in list order
-  * @param ty Java type of the aggregate's result
-  * @throws UnsupportedOperationException if no candidate matches
-  */
- private Class<?> findMatchingClass(String aggregationName, List<BuiltinAggregateFunctions.TypeClass> typeClasses, Type ty) {
-   for (BuiltinAggregateFunctions.TypeClass typeClass : typeClasses) {
-     if (typeClass.ty.equals(BuiltinAggregateFunctions.TypeClass.GenericType.class) || typeClass.ty.equals(ty)) {
-       return typeClass.clazz;
-     }
-   }
-   throw new UnsupportedOperationException(aggregationName + " Not implemented for type '" + ty + "'");
- }
-
- /**
-  * Prints the statement that folds the current row into the accumulator stored under
-  * {@code varName}. The statement has the shape
-  * {@code accumulators.put("<varName>", <addMethod call>)}.
-  *
-  * @param aggFn aggregate function whose init/add methods are called by the generated code
-  * @param varName key of the accumulator in the generated {@code accumulators} map
-  * @param ty unused by this method
-  * @param argList input column indices of the aggregate call; empty means no arguments
-  */
- private void doAggregate(AggregateFunctionImpl aggFn, String varName, Type ty, List<Integer> argList) {
- List<String> args = new ArrayList<>();
- Class<?> accumulatorType = aggFn.accumulatorType;
- if (!aggFn.isStatic) {
- // Non-static functions need an instance: generate code that creates it lazily, stores it
- // under "<varName>_obj" and uses it as the receiver of the init/add calls.
- String aggObjName = String.format("%s_obj", varName);
- String aggObjClassName = aggFn.initMethod.getDeclaringClass().getCanonicalName();
- pw.println(String.format(" if (!accumulators.containsKey(\"%s\")) { ", aggObjName));
- pw.println(String.format(" accumulators.put(\"%s\", new %s());", aggObjName, aggObjClassName));
- pw.println(" }");
- pw.println(" @SuppressWarnings(\"unchecked\")");
- pw.println(String.format(" final %1$s %2$s = (%1$s) accumulators.get(\"%2$s\");", aggObjClassName,
- aggObjName));
- args.add(aggObjName);
- }
- // Accumulator argument: the stored accumulator, or the init call when none is stored yet.
- // The init call is rendered while args holds at most the receiver; its text is then appended.
- args.add(String.format("%1$s == null ? %2$s : (%3$s) %1$s",
- "accumulators.get(\"" + varName + "\")",
- printMethodCall(aggFn.initMethod, args),
- accumulatorType.getCanonicalName()));
- if (argList.isEmpty()) {
- // No input columns: pass the generated handler's EMPTY_VALUES constant instead.
- args.add("EMPTY_VALUES");
- } else {
- // One cast column read per declared value type of the function.
- for (int i = 0; i < aggFn.valueTypes.size(); i++) {
- args.add(String.format("(%s) %s", aggFn.valueTypes.get(i).getCanonicalName(), "_data.get(" + argList.get(i) + ")"));
- }
- }
- pw.print(String.format(" accumulators.put(\"%s\", %s);\n",
- varName,
- printMethodCall(aggFn.addMethod, args)));
- }
-
- /**
-  * Returns the variable name assigned to the given aggregate call, creating it on first use
-  * as the aggregate's name followed by the next value of a running counter.
-  */
- private String reserveAggVarName(AggregateCall call) {
-   final String existing = aggregateCallVarNames.get(call);
-   if (existing != null) {
-     return existing;
-   }
-   final String prefix = call.getAggregation().getName();
-   final String fresh = prefix + (++nameCount);
-   aggregateCallVarNames.put(call, fresh);
-   return fresh;
- }
-
- /** Prints the opening of a generic stage named after the given node. */
- private void beginStage(RelNode n) {
-   final String stageName = getStageName(n);
-   final String prologue = String.format(STAGE_PROLOGUE, stageName);
-   pw.print(prologue);
- }
-
- /**
-  * Prints the opening of an aggregate stage: stage name, group-by indices and the per-group
-  * emit statements are substituted into the aggregate prologue template.
-  */
- private void beginAggregateStage(Aggregate n) {
-   final String stageName = getStageName(n);
-   final String groupIndices = getGroupByIndices(n);
-   final String emitStatements = emitAggregateStmts(n);
-   pw.print(String.format(AGGREGATE_STAGE_PROLOGUE, stageName, groupIndices, emitStatements));
- }
-
- /**
-  * Prints the opening of a join stage. The join is cast to {@link LogicalJoin} and must be a
-  * simple equi join; its two key ordinals are substituted into the join prologue template.
-  *
-  * @throws UnsupportedOperationException if the join is not a simple equi join
-  */
- private void beginJoinStage(Join join) {
-   final int[] keyOrdinals = new int[2];
-   final boolean simpleEquiJoin = RelOptUtil.analyzeSimpleEquiJoin((LogicalJoin) join, keyOrdinals);
-   if (!simpleEquiJoin) {
-     throw new UnsupportedOperationException("Only simple equi joins are supported");
-   }
-
-   final String joinStage = getStageName(join);
-   final String leftStage = getStageName(join.getLeft());
-   final String rightStage = getStageName(join.getRight());
-   pw.print(String.format(JOIN_STAGE_PROLOGUE, joinStage, leftStage, rightStage,
-       keyOrdinals[0], keyOrdinals[1]));
- }
-
- /** Prints the braces that close the method body and the anonymous handler opened by a prologue. */
- private void endStage() {
-   final String closeMethod = " }\n";
-   final String closeHandler = " };\n";
-   pw.print(closeMethod + closeHandler);
- }
-
- /**
-  * Returns the name of the generated handler constant for a node: the node's simple class name
-  * in upper case, an underscore, and the node's id.
-  *
-  * <p>The upper-casing uses {@link java.util.Locale#ROOT} so the generated identifier does not
-  * depend on the JVM's default locale (for example, Turkish maps {@code i} to a dotted capital).
-  */
- static String getStageName(RelNode n) {
-   return n.getClass().getSimpleName().toUpperCase(java.util.Locale.ROOT) + "_" + n.getId();
- }
-
- /** Returns the aggregate's group-by column indices as a comma-separated list. */
- private String getGroupByIndices(Aggregate n) {
-   final List<Integer> indices = new ArrayList<>();
-   for (int columnIndex : n.getGroupSet()) {
-     indices.add(columnIndex);
-   }
-   return Joiner.on(", ").join(indices);
- }
-
- /**
-  * Renders a Java call expression for the given method. Static methods are qualified with the
-  * declaring class; for instance methods the first element of {@code args} is the receiver.
-  *
-  * @param method the method to call
-  * @param args source-code fragments for the receiver (instance methods only) and the arguments
-  * @return the call expression as source text
-  */
- public static String printMethodCall(Method method, List<String> args) {
-   final Class<?> owner = method.getDeclaringClass();
-   final String methodName = method.getName();
-   final boolean staticCall = Modifier.isStatic(method.getModifiers());
-   return printMethodCall(owner, methodName, staticCall, args);
- }
-
- /**
-  * Renders {@code <receiver>.<method>(<args>)}. The receiver is the class's canonical name for
-  * a static call, otherwise the first element of {@code args}, which is then left out of the
-  * argument list.
-  */
- private static String printMethodCall(Class<?> clazz, String method, boolean isStatic, List<String> args) {
-   final String receiver;
-   final List<String> callArgs;
-   if (isStatic) {
-     receiver = clazz.getCanonicalName();
-     callArgs = args;
-   } else {
-     receiver = args.get(0);
-     callArgs = args.subList(1, args.size());
-   }
-   return String.format("%s.%s(%s)", receiver, method, Joiner.on(',').join(callArgs));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/javac/CompilingClassLoader.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/javac/CompilingClassLoader.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/javac/CompilingClassLoader.java
deleted file mode 100644
index 0b7c053..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/javac/CompilingClassLoader.java
+++ /dev/null
@@ -1,225 +0,0 @@
-/*
- * Copyright (C) 2010 Google, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.javac;
-
-
-import javax.tools.DiagnosticListener;
-import javax.tools.FileObject;
-import javax.tools.ForwardingJavaFileManager;
-import javax.tools.JavaCompiler;
-import javax.tools.JavaFileManager;
-import javax.tools.JavaFileObject;
-import javax.tools.SimpleJavaFileObject;
-import javax.tools.ToolProvider;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import static java.util.Collections.singleton;
-
-/**
- * This is a Java ClassLoader that will attempt to load a class from a string of source code.
- *
- * <h3>Example</h3>
- *
- * <pre>
- * String className = "com.foo.MyClass";
- * String classSource =
- * "package com.foo;\n" +
- * "public class MyClass implements Runnable {\n" +
- * " @Override public void run() {\n" +
- * " log(\"Hello world\");\n" +
- * " }\n" +
- * "}";
- *
- * // Load class from source.
- * ClassLoader classLoader = new CompilingClassLoader(
- * parentClassLoader, className, classSource, null);
- * Class myClass = classLoader.loadClass(className);
- *
- * // Use it.
- * Runnable instance = (Runnable)myClass.newInstance();
- * instance.run();
- * </pre>
- *
- * Only one chunk of source can be compiled per instance of CompilingClassLoader. If you need to
- * compile more, create multiple CompilingClassLoader instances.
- *
- * Uses the built-in compiler API introduced in Java 1.6.
- *
- * If the class cannot be compiled, the constructor throws a {@link CompilerException}. Compile
- * errors are reported to the {@link javax.tools.DiagnosticListener} passed to the constructor; if
- * that listener is null, the compiler's default reporting is used and messages go to System.err.
- *
- * @see java.lang.ClassLoader
- * @see javax.tools.JavaCompiler
- */
-public class CompilingClassLoader extends ClassLoader {
-
- /**
-  * Checked exception signalling that the supplied source code could not be compiled.
-  */
- public static class CompilerException extends Exception {
-   private static final long serialVersionUID = -2936958840023603270L;
-
-   /**
-    * @param message description of the compilation failure
-    */
-   public CompilerException(String message) {
-     super(message);
-   }
- }
-
- private final Map<String, ByteArrayOutputStream> byteCodeForClasses = new HashMap<>();
-
- // Placeholder URI; SimpleJavaFileObject's constructor requires one even for in-memory output.
- private static final URI EMPTY_URI = URI.create("");
-
- /**
-  * Compiles {@code sourceCode} immediately and keeps the resulting byte code in memory.
-  *
-  * @param parent Parent classloader to resolve dependencies from.
-  * @param className Name of class to compile. eg. "com.foo.MyClass".
-  * @param sourceCode Java source for class. e.g. "package com.foo; class MyClass { ... }".
-  * @param diagnosticListener Notified of compiler errors (may be null).
-  * @throws CompilerException if the compilation does not succeed
-  */
- public CompilingClassLoader(
-     ClassLoader parent,
-     String className,
-     String sourceCode,
-     DiagnosticListener<JavaFileObject> diagnosticListener)
-     throws CompilerException {
-   super(parent);
-   final boolean compiled = compileSourceCodeToByteCode(className, sourceCode, diagnosticListener);
-   if (compiled) {
-     return;
-   }
-   throw new CompilerException("Could not compile " + className);
- }
-
- /**
-  * Returns the byte code produced by the compilation, keyed by class name. This is the
-  * loader's own map, not a copy.
-  */
- public Map<String, ByteArrayOutputStream> getClasses() {
-   return this.byteCodeForClasses;
- }
-
- /**
-  * Defines the named class from the byte code compiled by this loader. Do not call this
-  * directly; use {@link ClassLoader#loadClass(String)} instead.
-  *
-  * @throws ClassNotFoundException if this loader holds no byte code for {@code name}
-  */
- @Override
- public Class<?> findClass(String name) throws ClassNotFoundException {
-   final ByteArrayOutputStream compiled = byteCodeForClasses.get(name);
-   if (compiled != null) {
-     final byte[] classBytes = compiled.toByteArray();
-     return defineClass(name, classBytes, 0, classBytes.length);
-   }
-   throw new ClassNotFoundException(name);
- }
-
- /**
-  * Compiles the given source with the system Java compiler, writing the class files into
-  * {@code byteCodeForClasses} instead of onto disk.
-  *
-  * @param className name of the class being compiled
-  * @param sourceCode Java source of the class
-  * @param diagnosticListener receives compiler diagnostics (may be null)
-  * @return Whether compilation was successful.
-  * @throws IllegalStateException if no system Java compiler is available
-  */
- private boolean compileSourceCodeToByteCode(
-     String className, String sourceCode, DiagnosticListener<JavaFileObject> diagnosticListener) {
-   JavaCompiler javaCompiler = ToolProvider.getSystemJavaCompiler();
-   if (javaCompiler == null) {
-     // ToolProvider returns null when no compiler is provided; fail with a clear
-     // message instead of a NullPointerException further down.
-     throw new IllegalStateException(
-         "No system Java compiler available (is this a JRE rather than a JDK?); cannot compile " + className);
-   }
-
-   // Set up the in-memory filesystem.
-   InMemoryFileManager fileManager =
-       new InMemoryFileManager(javaCompiler.getStandardFileManager(null, null, null));
-   try {
-     JavaFileObject javaFile = new InMemoryJavaFile(className, sourceCode);
-
-     // Javac option: remove these when the javac zip impl is fixed
-     // (http://b/issue?id=1822932)
-     // NOTE(review): this sets a JVM-wide system property as a side effect.
-     System.setProperty("useJavaUtilZip", "true"); // setting value to any non-null string
-     List<String> options = new LinkedList<>();
-     // this is ignored by javac currently but useJavaUtilZip should be
-     // a valid javac XD option, which is another bug
-     options.add("-XDuseJavaUtilZip");
-
-     // Now compile!
-     JavaCompiler.CompilationTask compilationTask =
-         javaCompiler.getTask(
-             null, // Null: log any unhandled errors to stderr.
-             fileManager,
-             diagnosticListener,
-             options,
-             null,
-             singleton(javaFile));
-     return compilationTask.call();
-   } finally {
-     // Release the resources held by the wrapped standard file manager.
-     try {
-       fileManager.close();
-     } catch (IOException ignored) {
-       // The byte code is already in memory; a failure to close must not change the result.
-     }
-   }
- }
-
- /**
-  * File manager that keeps compiler output in memory, so nothing has to be written to disk.
-  *
-  * Each output file the compiler opens is backed by a fresh buffer that is registered in
-  * byteCodeForClasses under the class name; opening the same class a second time is rejected.
-  *
-  * @see javax.tools.JavaFileManager
-  */
- private class InMemoryFileManager extends ForwardingJavaFileManager<JavaFileManager> {
-   public InMemoryFileManager(JavaFileManager fileManager) {
-     super(fileManager);
-   }
-
-   @Override
-   public JavaFileObject getJavaFileForOutput(
-       Location location, final String className, JavaFileObject.Kind kind, FileObject sibling)
-       throws IOException {
-     return new SimpleJavaFileObject(EMPTY_URI, kind) {
-       @Override
-       public OutputStream openOutputStream() throws IOException {
-         if (byteCodeForClasses.get(className) != null) {
-           throw new IllegalStateException("Cannot write more than once");
-         }
-         // Reasonable size for a simple .class.
-         final ByteArrayOutputStream buffer = new ByteArrayOutputStream(256);
-         byteCodeForClasses.put(className, buffer);
-         return buffer;
-       }
-     };
-   }
- }
-
- /**
-  * Source file object backed by a string. Its URI is the class name with dots turned into
-  * slashes, followed by the source-file extension.
-  */
- private static class InMemoryJavaFile extends SimpleJavaFileObject {
-   private final String sourceCode;
-
-   public InMemoryJavaFile(String className, String sourceCode) {
-     super(makeUri(className), Kind.SOURCE);
-     this.sourceCode = sourceCode;
-   }
-
-   private static URI makeUri(String className) {
-     final String path = className.replace('.', '/') + Kind.SOURCE.extension;
-     try {
-       return new URI(path);
-     } catch (URISyntaxException e) {
-       throw new RuntimeException(e); // Not sure what could cause this.
-     }
-   }
-
-   @Override
-   public CharSequence getCharContent(boolean ignoreEncodingErrors) throws IOException {
-     return this.sourceCode;
-   }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/ColumnConstraint.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/ColumnConstraint.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/ColumnConstraint.java
deleted file mode 100644
index c67d8e7..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/ColumnConstraint.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.sql.parser;
-
-import org.apache.calcite.sql.SqlLiteral;
-import org.apache.calcite.sql.parser.SqlParserPos;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.calcite.sql.validate.SqlMonotonicity;
-
- /**
-  * SQL literal node describing a constraint attached to a column. It can only be instantiated
-  * through its nested subclasses.
-  */
- public class ColumnConstraint extends SqlLiteral {
-   private ColumnConstraint(Object value, SqlTypeName typeName, SqlParserPos pos) {
-     super(value, typeName, pos);
-   }
-
-   /** PRIMARY constraint, carrying the {@link SqlMonotonicity} supplied at construction. */
-   public static class PrimaryKey extends ColumnConstraint {
-     private final SqlMonotonicity monotonicity;
-
-     public PrimaryKey(SqlMonotonicity monotonicity, SqlParserPos pos) {
-       super(SqlDDLKeywords.PRIMARY, SqlTypeName.SYMBOL, pos);
-       this.monotonicity = monotonicity;
-     }
-
-     /** Returns the monotonicity given to the constructor. */
-     public SqlMonotonicity monotonicity() {
-       return this.monotonicity;
-     }
-   }
- }
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/ColumnDefinition.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/ColumnDefinition.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/ColumnDefinition.java
deleted file mode 100644
index 3520b86..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/ColumnDefinition.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.parser;
-
-import org.apache.calcite.sql.SqlDataTypeSpec;
-import org.apache.calcite.sql.SqlIdentifier;
-import org.apache.calcite.sql.SqlNodeList;
-import org.apache.calcite.sql.parser.SqlParserPos;
-
-import java.util.Arrays;
-
- /**
-  * Node list holding the three parts of a column definition in fixed positions:
-  * name, data type and constraint.
-  */
- public class ColumnDefinition extends SqlNodeList {
-   private static final int NAME_INDEX = 0;
-   private static final int TYPE_INDEX = 1;
-   private static final int CONSTRAINT_INDEX = 2;
-
-   public ColumnDefinition(
-       SqlIdentifier name, SqlDataTypeSpec type, ColumnConstraint constraint, SqlParserPos pos) {
-     super(Arrays.asList(name, type, constraint), pos);
-   }
-
-   /** Returns the string form of the column's name node. */
-   public String name() {
-     return get(NAME_INDEX).toString();
-   }
-
-   /** Returns the column's data type specification. */
-   public SqlDataTypeSpec type() {
-     return (SqlDataTypeSpec) get(TYPE_INDEX);
-   }
-
-   /** Returns the column's constraint node. */
-   public ColumnConstraint constraint() {
-     return (ColumnConstraint) get(CONSTRAINT_INDEX);
-   }
- }
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateFunction.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateFunction.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateFunction.java
deleted file mode 100644
index a53802c..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateFunction.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.parser;
-
-import org.apache.calcite.sql.SqlCall;
-import org.apache.calcite.sql.SqlIdentifier;
-import org.apache.calcite.sql.SqlKind;
-import org.apache.calcite.sql.SqlLiteral;
-import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.sql.SqlOperator;
-import org.apache.calcite.sql.SqlSpecialOperator;
-import org.apache.calcite.sql.SqlWriter;
-import org.apache.calcite.sql.parser.SqlParserPos;
-import org.apache.calcite.util.ImmutableNullableList;
-import org.apache.calcite.util.NlsString;
-
-import java.util.List;
-
-/**
- * Parse-tree node for {@code CREATE FUNCTION <name> AS <class> [USING JAR <jar>]}.
- *
- * <p>The operands are kept in one fixed order: function name, class name, jar name.
- * The jar name may be {@code null}. {@link #getOperandList()} and
- * {@code OPERATOR.createCall} must agree on that order and count.
- */
-public class SqlCreateFunction extends SqlCall {
-  public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator(
-      "CREATE_FUNCTION", SqlKind.OTHER) {
-    /**
-     * Rebuilds a node from three operands: function name, class name, jar name.
-     */
-    @Override
-    public SqlCall createCall(
-        SqlLiteral functionQualifier, SqlParserPos pos, SqlNode... o) {
-      assert functionQualifier == null;
-      return new SqlCreateFunction(pos, (SqlIdentifier) o[0], o[1], o[2]);
-    }
-
-    /**
-     * Writes the statement back as SQL; the USING JAR clause is emitted only when a jar
-     * name is present.
-     */
-    @Override
-    public void unparse(
-        SqlWriter writer, SqlCall call, int leftPrec, int rightPrec) {
-      SqlCreateFunction t = (SqlCreateFunction) call;
-      UnparseUtil u = new UnparseUtil(writer, leftPrec, rightPrec);
-      u.keyword("CREATE", "FUNCTION").node(t.functionName).keyword("AS").node(t.className);
-      if (t.jarName != null) {
-        u.keyword("USING", "JAR").node(t.jarName);
-      }
-    }
-  };
-
-  private final SqlIdentifier functionName;
-  private final SqlNode className;
-  private final SqlNode jarName;
-
-  /**
-   * @param pos parser position of the statement
-   * @param functionName identifier the function is registered under
-   * @param className literal node holding the implementing class name
-   * @param jarName literal node holding the jar name, or {@code null} when absent
-   */
-  public SqlCreateFunction(SqlParserPos pos, SqlIdentifier functionName, SqlNode className, SqlNode jarName) {
-    super(pos);
-    this.functionName = functionName;
-    this.className = className;
-    this.jarName = jarName;
-  }
-
-  @Override
-  public SqlOperator getOperator() {
-    return OPERATOR;
-  }
-
-  /**
-   * Returns the operands in the order {@code OPERATOR.createCall} consumes them.
-   */
-  @Override
-  public List<SqlNode> getOperandList() {
-    // jarName is listed (possibly null) so the list has the three entries createCall reads;
-    // with only two, rebuilding this call from its operands would index past the array.
-    return ImmutableNullableList.of(functionName, className, jarName);
-  }
-
-
-  @Override
-  public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
-    getOperator().unparse(writer, this, leftPrec, rightPrec);
-  }
-
-  /** Returns the function name as a string. */
-  public String functionName() {
-    return functionName.toString();
-  }
-
-  /** Returns the class name held by the class-name literal. */
-  public String className() {
-    return ((NlsString)SqlLiteral.value(className)).getValue();
-  }
-
-  /** Returns the jar name held by the jar-name literal, or {@code null} when none was given. */
-  public String jarName() {
-    return jarName == null ? null : ((NlsString)SqlLiteral.value(jarName)).getValue();
-  }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateTable.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateTable.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateTable.java
deleted file mode 100644
index 670eedb..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateTable.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.parser;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.calcite.sql.SqlCall;
-import org.apache.calcite.sql.SqlIdentifier;
-import org.apache.calcite.sql.SqlKind;
-import org.apache.calcite.sql.SqlLiteral;
-import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.sql.SqlNodeList;
-import org.apache.calcite.sql.SqlOperator;
-import org.apache.calcite.sql.SqlSpecialOperator;
-import org.apache.calcite.sql.SqlWriter;
-import org.apache.calcite.sql.parser.SqlParserPos;
-import org.apache.calcite.util.ImmutableNullableList;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Properties;
-
-/**
- * Parse-tree node for a {@code CREATE EXTERNAL TABLE} statement.
- *
- * <p>The operands are kept in one fixed order: table name, field list, input format class,
- * output format class, location, parallelism, properties, query.
- * {@link #getOperandList()} and {@code OPERATOR.createCall} must agree on that order and
- * count. Several operands may be {@code null}; see the null checks in {@code unparse} and
- * in the accessors.
- */
-public class SqlCreateTable extends SqlCall {
-  /** Parallelism reported by {@link #parallelism()} when the statement does not set one. */
-  private static final int DEFAULT_PARALLELISM = 1;
-
-  public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator(
-      "CREATE_TABLE", SqlKind.OTHER) {
-    /**
-     * Rebuilds a node from eight operands in the order documented on the enclosing class.
-     */
-    @Override
-    public SqlCall createCall(
-        SqlLiteral functionQualifier, SqlParserPos pos, SqlNode... o) {
-      assert functionQualifier == null;
-      return new SqlCreateTable(pos, (SqlIdentifier) o[0], (SqlNodeList) o[1],
-                                o[2], o[3], o[4], o[5], o[6], o[7]);
-    }
-
-    /**
-     * Writes the statement back as SQL. The STORED AS clause is emitted only when both
-     * format classes are present; PARALLELISM, TBLPROPERTIES and AS only when set.
-     */
-    @Override
-    public void unparse(
-        SqlWriter writer, SqlCall call, int leftPrec, int rightPrec) {
-      SqlCreateTable t = (SqlCreateTable) call;
-      UnparseUtil u = new UnparseUtil(writer, leftPrec, rightPrec);
-      u.keyword("CREATE", "EXTERNAL", "TABLE").node(t.tblName).nodeList(
-          t.fieldList);
-      if (t.inputFormatClass != null && t.outputFormatClass != null) {
-        u.keyword("STORED", "AS", "INPUTFORMAT").node(
-            t.inputFormatClass).keyword("OUTPUTFORMAT").node(
-            t.outputFormatClass);
-      }
-      u.keyword("LOCATION").node(t.location);
-      if (t.parallelism != null) {
-        u.keyword("PARALLELISM").node(t.parallelism);
-      }
-      if (t.properties != null) {
-        u.keyword("TBLPROPERTIES").node(t.properties);
-      }
-      if (t.query != null) {
-        u.keyword("AS").node(t.query);
-      }
-    }
-  };
-
-  private final SqlIdentifier tblName;
-  private final SqlNodeList fieldList;
-  private final SqlNode inputFormatClass;
-  private final SqlNode outputFormatClass;
-  private final SqlNode location;
-  private final SqlNode parallelism;
-  private final SqlNode properties;
-  private final SqlNode query;
-
-  /**
-   * Stores the operands as given; no validation is performed here.
-   */
-  public SqlCreateTable(
-      SqlParserPos pos, SqlIdentifier tblName, SqlNodeList fieldList,
-      SqlNode inputFormatClass, SqlNode outputFormatClass, SqlNode location,
-      SqlNode parallelism, SqlNode properties, SqlNode query) {
-    super(pos);
-    this.tblName = tblName;
-    this.fieldList = fieldList;
-    this.inputFormatClass = inputFormatClass;
-    this.outputFormatClass = outputFormatClass;
-    this.location = location;
-    this.parallelism = parallelism;
-    this.properties = properties;
-    this.query = query;
-  }
-
-  @Override
-  public SqlOperator getOperator() {
-    return OPERATOR;
-  }
-
-  @Override
-  public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
-    getOperator().unparse(writer, this, leftPrec, rightPrec);
-  }
-
-  /**
-   * Returns the operands in the order {@code OPERATOR.createCall} consumes them.
-   */
-  @Override
-  public List<SqlNode> getOperandList() {
-    // parallelism sits at index 5, exactly where createCall reads it. Leaving it out would
-    // shift properties and query by one and leave createCall reading past the end at o[7].
-    return ImmutableNullableList.of(tblName, fieldList, inputFormatClass,
-                                    outputFormatClass, location, parallelism,
-                                    properties, query);
-  }
-
-  /** Returns the table name as a string. */
-  public String tableName() {
-    return tblName.toString();
-  }
-
-  /**
-   * Returns the LOCATION value parsed as a URI.
-   * NOTE(review): assumes a location is always present; a null location reaches
-   * {@code URI.create(null)}.
-   */
-  public URI location() {
-    return URI.create(getString(location));
-  }
-
-  /**
-   * Returns the PARALLELISM value parsed as an integer, or the default of 1 when the
-   * statement does not specify one.
-   */
-  public Integer parallelism() {
-    String parallelismStr = getString(parallelism);
-    if (parallelismStr != null) {
-      return Integer.parseInt(parallelismStr);
-    } else {
-      return DEFAULT_PARALLELISM;
-    }
-  }
-
-  /** Returns the input format class name, or {@code null} when not specified. */
-  public String inputFormatClass() {
-    return getString(inputFormatClass);
-  }
-
-  /** Returns the output format class name, or {@code null} when not specified. */
-  public String outputFormatClass() {
-    return getString(outputFormatClass);
-  }
-
-  /**
-   * Parses the TBLPROPERTIES string as JSON and copies its entries into a new
-   * {@link Properties}; returns an empty instance when no properties were given.
-   *
-   * @throws RuntimeException wrapping the {@link IOException} if the JSON cannot be read
-   */
-  public Properties properties() {
-    Properties props = new Properties();
-    if (properties != null) {
-      try {
-        ObjectMapper mapper = new ObjectMapper();
-        HashMap<String, Object> map = mapper.readValue(getString(properties), HashMap.class);
-        props.putAll(map);
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-    }
-    return props;
-  }
-
-  /** Returns the string value of a literal node, passing {@code null} through. */
-  private String getString(SqlNode n) {
-    return n == null ? null : SqlLiteral.stringValue(n);
-  }
-
-  /**
-   * Returns the field list viewed as column definitions. The cast is unchecked, so the
-   * elements are only known to be {@link ColumnDefinition}s if the list was built that way.
-   */
-  @SuppressWarnings("unchecked")
-  public List<ColumnDefinition> fieldList() {
-    return (List<ColumnDefinition>)((List<? extends SqlNode>)fieldList.getList());
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlDDLKeywords.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlDDLKeywords.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlDDLKeywords.java
deleted file mode 100644
index 3112e53..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlDDLKeywords.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.parser;
-
-import org.apache.calcite.sql.SqlLiteral;
-
-/**
- * Keywords that can occur in a CREATE TABLE statement, exposed as
- * {@link SqlLiteral.SqlSymbol} values.
- */
-public enum SqlDDLKeywords implements SqlLiteral.SqlSymbol {
- // NOTE(review): presumably the PRIMARY keyword of a column constraint; confirm against the grammar.
- PRIMARY
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/StormParser.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/StormParser.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/StormParser.java
deleted file mode 100644
index 8444e1e..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/StormParser.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.parser;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.calcite.config.Lex;
-import org.apache.storm.sql.parser.impl.StormParserImpl;
-
-import java.io.StringReader;
-
-/**
- * Creates and configures a {@link StormParserImpl} for a single SQL string.
- */
-public class StormParser {
-  public static final int DEFAULT_IDENTIFIER_MAX_LENGTH = 128;
-  private final StormParserImpl impl;
-
-  /**
-   * Sets up the generated parser over {@code s}: tab size 1, Oracle-style quoted and
-   * unquoted casing, and the default identifier length limit.
-   *
-   * @param s the SQL text to parse
-   */
-  public StormParser(String s) {
-    final StormParserImpl parser = new StormParserImpl(new StringReader(s));
-    parser.setTabSize(1);
-    parser.setQuotedCasing(Lex.ORACLE.quotedCasing);
-    parser.setUnquotedCasing(Lex.ORACLE.unquotedCasing);
-    parser.setIdentifierMaxLength(DEFAULT_IDENTIFIER_MAX_LENGTH);
-    // The parser quotes identifiers with [ ] by default. Array and map access such as
-    // m['x'] = 1 or arr[2] = 10 only works after switching to DQID (double quoted
-    // identifiers).
-    parser.switchTo("DQID");
-    this.impl = parser;
-  }
-
-  /** Exposes the configured parser implementation. */
-  @VisibleForTesting
-  public StormParserImpl impl() {
-    return impl;
-  }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/UnparseUtil.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/UnparseUtil.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/UnparseUtil.java
deleted file mode 100644
index 834fe7c..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/UnparseUtil.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.parser;
-
-import org.apache.calcite.sql.SqlIdentifier;
-import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.sql.SqlNodeList;
-import org.apache.calcite.sql.SqlWriter;
-
-/**
- * Fluent helper for writing keywords and nodes to a {@link SqlWriter} with a fixed pair
- * of precedences.
- */
-class UnparseUtil {
-  private final SqlWriter writer;
-  private final int leftPrec;
-  private final int rightPrec;
-
-  UnparseUtil(SqlWriter writer, int leftPrec, int rightPrec) {
-    this.writer = writer;
-    this.leftPrec = leftPrec;
-    this.rightPrec = rightPrec;
-  }
-
-  /** Writes each keyword in order and returns this helper for chaining. */
-  UnparseUtil keyword(String... keywords) {
-    for (int idx = 0; idx < keywords.length; idx++) {
-      writer.keyword(keywords[idx]);
-    }
-    return this;
-  }
-
-  /** Unparses a single node with the stored precedences. */
-  UnparseUtil node(SqlNode n) {
-    n.unparse(writer, leftPrec, rightPrec);
-    return this;
-  }
-
-  /** Writes the list as "(" item "," item ... ")"; an empty list yields "(" ")". */
-  UnparseUtil nodeList(SqlNodeList l) {
-    writer.keyword("(");
-    for (int idx = 0; idx < l.size(); idx++) {
-      // A separator goes before every element except the first.
-      if (idx != 0) {
-        writer.keyword(",");
-      }
-      l.get(idx).unparse(writer, leftPrec, rightPrec);
-    }
-    writer.keyword(")");
-    return this;
-  }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/StormRelDataTypeSystem.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/StormRelDataTypeSystem.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/StormRelDataTypeSystem.java
deleted file mode 100644
index 437877c..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/StormRelDataTypeSystem.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.sql.planner;
-
-import org.apache.calcite.rel.type.RelDataTypeSystem;
-import org.apache.calcite.rel.type.RelDataTypeSystemImpl;
-
-/**
- * Type system that overrides the maximum numeric scale and precision of
- * {@link RelDataTypeSystemImpl}, setting both to 38.
- */
-public class StormRelDataTypeSystem extends RelDataTypeSystemImpl {
-  public static final RelDataTypeSystem STORM_REL_DATATYPE_SYSTEM = new StormRelDataTypeSystem();
-
-  /** Shared upper bound used for both numeric scale and numeric precision. */
-  private static final int MAX_NUMERIC_DIGITS = 38;
-
-  /** Returns the maximum numeric scale (38). */
-  @Override
-  public int getMaxNumericScale() {
-    return MAX_NUMERIC_DIGITS;
-  }
-
-  /** Returns the maximum numeric precision (38). */
-  @Override
-  public int getMaxNumericPrecision() {
-    return MAX_NUMERIC_DIGITS;
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/StormRelUtils.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/StormRelUtils.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/StormRelUtils.java
deleted file mode 100644
index 40bbacd..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/StormRelUtils.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.planner;
-
-import org.apache.calcite.plan.RelOptUtil;
-import org.apache.calcite.plan.volcano.RelSubset;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.sql.SqlExplainLevel;
-import org.apache.storm.sql.planner.trident.rel.TridentRel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Static helpers for naming Trident stages and generated classes, unwrapping planner
- * inputs, and rendering a plan as text.
- */
-public class StormRelUtils {
-  private static final Logger LOG = LoggerFactory.getLogger(StormRelUtils.class);
-
-  // Shared counters that keep generated stage and class names distinct.
-  private static final AtomicInteger sequence = new AtomicInteger(0);
-  private static final AtomicInteger classSequence = new AtomicInteger(0);
-
-  /**
-   * Builds a stage name of the form {@code <SIMPLECLASSNAME>_<id>_<n>}, where {@code n} is
-   * the next value of the stage counter.
-   */
-  public static String getStageName(TridentRel relNode) {
-    return relNode.getClass().getSimpleName().toUpperCase() + "_" + relNode.getId() + "_" + sequence.getAndIncrement();
-  }
-
-  /**
-   * Builds a class name of the form {@code Generated_<SIMPLECLASSNAME>_<id>_<n>}, where
-   * {@code n} is the next value of the class counter.
-   */
-  public static String getClassName(TridentRel relNode) {
-    return "Generated_" + relNode.getClass().getSimpleName().toUpperCase() + "_" + relNode.getId() + "_" +
-        classSequence.getAndIncrement();
-  }
-
-  /**
-   * Casts the input to {@link TridentRel}; a {@link RelSubset} is first replaced by its
-   * best known member.
-   */
-  public static TridentRel getStormRelInput(RelNode input) {
-    if (input instanceof RelSubset) {
-      // go with known best input
-      input = ((RelSubset) input).getBest();
-    }
-    return (TridentRel) input;
-  }
-
-  /** Renders the plan at the {@code EXPPLAN_ATTRIBUTES} detail level. */
-  public static String explain(final RelNode rel) {
-    return explain(rel, SqlExplainLevel.EXPPLAN_ATTRIBUTES);
-  }
-
-  /**
-   * Renders the plan rooted at {@code rel} at the requested detail level.
-   *
-   * @param rel root of the plan to render
-   * @param detailLevel how much detail the rendering should include
-   * @return the plan text, or an empty string if rendering overflowed the stack
-   */
-  public static String explain(final RelNode rel, SqlExplainLevel detailLevel) {
-    String explain = "";
-    try {
-      // Pass the caller's level through; the single-argument toString ignores it.
-      explain = RelOptUtil.toString(rel, detailLevel);
-    } catch (StackOverflowError e) {
-      // Best effort: log the failure and return an empty plan instead of propagating it.
-      LOG.error("StackOverflowError occurred while extracting plan. Please report it to the dev@ mailing list.");
-      LOG.error("RelNode " + rel + " ExplainLevel " + detailLevel, e);
-      LOG.error("Forcing plan to empty string and continue... SQL Runner may not working properly after.");
-    }
-    return explain;
-  }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/UnsupportedOperatorsVisitor.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/UnsupportedOperatorsVisitor.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/UnsupportedOperatorsVisitor.java
deleted file mode 100644
index 258fe72..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/UnsupportedOperatorsVisitor.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.planner;
-
-import org.apache.calcite.sql.util.SqlShuttle;
-
-/**
- * A {@link SqlShuttle} subclass that overrides nothing.
- * NOTE(review): the name suggests it was meant to detect unsupported operators, but no
- * such logic exists in this class.
- */
-public class UnsupportedOperatorsVisitor extends SqlShuttle {
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormCalcRelBase.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormCalcRelBase.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormCalcRelBase.java
deleted file mode 100644
index 1ea7912..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormCalcRelBase.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.sql.planner.rel;
-
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Calc;
-import org.apache.calcite.rex.RexProgram;
-import org.apache.storm.sql.planner.rel.StormRelNode;
-
-/**
- * Abstract base that combines Calcite's {@link Calc} with the {@link StormRelNode}
- * interface; it adds no behavior of its own.
- */
-public abstract class StormCalcRelBase extends Calc implements StormRelNode {
- /** Passes all arguments unchanged to the {@link Calc} constructor. */
- protected StormCalcRelBase(RelOptCluster cluster, RelTraitSet traits, RelNode child, RexProgram program) {
- super(cluster, traits, child, program);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormFilterRelBase.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormFilterRelBase.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormFilterRelBase.java
deleted file mode 100644
index 4e28460..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormFilterRelBase.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.sql.planner.rel;
-
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Filter;
-import org.apache.calcite.rex.RexNode;
-import org.apache.storm.sql.planner.rel.StormRelNode;
-
-/**
- * Abstract base that combines Calcite's {@link Filter} with the {@link StormRelNode}
- * interface; it adds no behavior of its own.
- */
-public abstract class StormFilterRelBase extends Filter implements StormRelNode {
- /** Passes all arguments unchanged to the {@link Filter} constructor. */
- protected StormFilterRelBase(RelOptCluster cluster, RelTraitSet traits, RelNode child, RexNode condition) {
- super(cluster, traits, child, condition);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormJoinRelBase.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormJoinRelBase.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormJoinRelBase.java
deleted file mode 100644
index 1458ce7..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormJoinRelBase.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.sql.planner.rel;
-
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.CorrelationId;
-import org.apache.calcite.rel.core.Join;
-import org.apache.calcite.rel.core.JoinRelType;
-import org.apache.calcite.rex.RexNode;
-import org.apache.storm.sql.planner.rel.StormRelNode;
-
-import java.util.Set;
-
-/**
- * Abstract base that combines Calcite's {@link Join} with the {@link StormRelNode}
- * interface; it adds no behavior of its own.
- */
-public abstract class StormJoinRelBase extends Join implements StormRelNode {
- /** Passes all arguments unchanged to the {@link Join} constructor. */
- protected StormJoinRelBase(RelOptCluster cluster, RelTraitSet traitSet, RelNode left, RelNode right, RexNode condition, Set<CorrelationId> variablesSet, JoinRelType joinType) {
- super(cluster, traitSet, left, right, condition, variablesSet, joinType);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormProjectRelBase.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormProjectRelBase.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormProjectRelBase.java
deleted file mode 100644
index f4f23dc..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormProjectRelBase.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.sql.planner.rel;
-
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rex.RexNode;
-import org.apache.storm.sql.planner.rel.StormRelNode;
-
-import java.util.List;
-
-/**
- * Abstract base that combines Calcite's {@link Project} with the {@link StormRelNode}
- * interface; it adds no behavior of its own.
- */
-public abstract class StormProjectRelBase extends Project implements StormRelNode {
- /** Passes all arguments unchanged to the {@link Project} constructor. */
- protected StormProjectRelBase(RelOptCluster cluster, RelTraitSet traits, RelNode input, List<? extends RexNode> projects, RelDataType rowType) {
- super(cluster, traits, input, projects, rowType);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormRelNode.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormRelNode.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormRelNode.java
deleted file mode 100644
index 9327868..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormRelNode.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.sql.planner.rel;
-
-import org.apache.calcite.rel.RelNode;
-
-/**
- * Interface for Storm SQL relational nodes; it extends {@link RelNode} and declares no
- * members of its own.
- */
-public interface StormRelNode extends RelNode {
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormStreamInsertRelBase.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormStreamInsertRelBase.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormStreamInsertRelBase.java
deleted file mode 100644
index df47f3f..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormStreamInsertRelBase.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.sql.planner.rel;
-
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptTable;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.prepare.Prepare;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.TableModify;
-import org.apache.calcite.rex.RexNode;
-import org.apache.storm.sql.planner.rel.StormRelNode;
-
-import java.util.List;
-
-public abstract class StormStreamInsertRelBase extends TableModify implements StormRelNode {
- protected StormStreamInsertRelBase(RelOptCluster cluster, RelTraitSet traits, RelOptTable table, Prepare.CatalogReader catalogReader, RelNode child, Operation operation, List<String> updateColumnList, List<RexNode> sourceExpressionList, boolean flattened) {
- super(cluster, traits, table, catalogReader, child, operation, updateColumnList, sourceExpressionList, flattened);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormStreamScanRelBase.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormStreamScanRelBase.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormStreamScanRelBase.java
deleted file mode 100644
index 36c62b2..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormStreamScanRelBase.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.sql.planner.rel;
-
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptTable;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.core.TableScan;
-import org.apache.storm.sql.planner.rel.StormRelNode;
-
-public abstract class StormStreamScanRelBase extends TableScan implements StormRelNode {
-
- // FIXME: define Table class and table.unwrap() to get it
-
- protected StormStreamScanRelBase(RelOptCluster cluster, RelTraitSet traitSet, RelOptTable table) {
- super(cluster, traitSet, table);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/QueryPlanner.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/QueryPlanner.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/QueryPlanner.java
deleted file mode 100644
index f98fb02..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/QueryPlanner.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- *
- * * Licensed to the Apache Software Foundation (ASF) under one
- * * or more contributor license agreements. See the NOTICE file
- * * distributed with this work for additional information
- * * regarding copyright ownership. The ASF licenses this file
- * * to you under the Apache License, Version 2.0 (the
- * * "License"); you may not use this file except in compliance
- * * with the License. You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License.
- *
- */
-package org.apache.storm.sql.planner.trident;
-
-import org.apache.calcite.DataContext;
-import org.apache.calcite.adapter.java.JavaTypeFactory;
-import org.apache.calcite.jdbc.CalciteSchema;
-import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
-import org.apache.calcite.plan.Contexts;
-import org.apache.calcite.plan.ConventionTraitDef;
-import org.apache.calcite.plan.RelTraitDef;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.prepare.CalciteCatalogReader;
-import org.apache.calcite.rel.RelCollationTraitDef;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.type.RelDataTypeSystem;
-import org.apache.calcite.rex.RexBuilder;
-import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.sql.SqlOperatorTable;
-import org.apache.calcite.sql.fun.SqlStdOperatorTable;
-import org.apache.calcite.sql.parser.SqlParseException;
-import org.apache.calcite.sql.util.ChainedSqlOperatorTable;
-import org.apache.calcite.tools.FrameworkConfig;
-import org.apache.calcite.tools.Frameworks;
-import org.apache.calcite.tools.Planner;
-import org.apache.calcite.tools.RelConversionException;
-import org.apache.calcite.tools.ValidationException;
-import org.apache.storm.sql.javac.CompilingClassLoader;
-import org.apache.storm.sql.planner.StormRelDataTypeSystem;
-import org.apache.storm.sql.planner.UnsupportedOperatorsVisitor;
-import org.apache.storm.sql.planner.trident.rel.TridentLogicalConvention;
-import org.apache.storm.sql.planner.trident.rel.TridentRel;
-import org.apache.storm.sql.runtime.ISqlTridentDataSource;
-import org.apache.storm.sql.AbstractTridentProcessor;
-import org.apache.storm.trident.Stream;
-import org.apache.storm.trident.TridentTopology;
-import org.apache.storm.trident.fluent.IAggregatableStream;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-public class QueryPlanner {
-
- public static final int STORM_REL_CONVERSION_RULES = 1;
-
- private final Planner planner;
-
- private final JavaTypeFactory typeFactory = new JavaTypeFactoryImpl(
- RelDataTypeSystem.DEFAULT);
-
- public QueryPlanner(SchemaPlus schema) {
- final List<RelTraitDef> traitDefs = new ArrayList<RelTraitDef>();
-
- traitDefs.add(ConventionTraitDef.INSTANCE);
- traitDefs.add(RelCollationTraitDef.INSTANCE);
-
- List<SqlOperatorTable> sqlOperatorTables = new ArrayList<>();
- sqlOperatorTables.add(SqlStdOperatorTable.instance());
- sqlOperatorTables.add(new CalciteCatalogReader(CalciteSchema.from(schema),
- false,
- Collections.<String>emptyList(), typeFactory));
-
- FrameworkConfig config = Frameworks.newConfigBuilder()
- .defaultSchema(schema)
- .operatorTable(new ChainedSqlOperatorTable(sqlOperatorTables))
- .traitDefs(traitDefs)
- .context(Contexts.EMPTY_CONTEXT)
- .ruleSets(TridentStormRuleSets.getRuleSets())
- .costFactory(null)
- .typeSystem(StormRelDataTypeSystem.STORM_REL_DATATYPE_SYSTEM)
- .build();
- this.planner = Frameworks.getPlanner(config);
- }
-
- public AbstractTridentProcessor compile(Map<String, ISqlTridentDataSource> sources, String query) throws Exception {
- TridentRel relNode = getPlan(query);
-
- TridentPlanCreator tridentPlanCreator = new TridentPlanCreator(sources, new RexBuilder(typeFactory));
- relNode.tridentPlan(tridentPlanCreator);
-
- final TridentTopology topology = tridentPlanCreator.getTopology();
- final IAggregatableStream lastStream = tridentPlanCreator.pop();
- final DataContext dc = tridentPlanCreator.getDataContext();
- final List<CompilingClassLoader> cls = tridentPlanCreator.getClassLoaders();
-
- return new AbstractTridentProcessor() {
- @Override
- public TridentTopology build() {
- return topology;
- }
-
- @Override
- public Stream outputStream() {
- return lastStream.toStream();
- }
-
- @Override
- public DataContext getDataContext() {
- return dc;
- }
-
- @Override
- public List<CompilingClassLoader> getClassLoaders() {
- return cls;
- }
- };
- }
-
- public TridentRel getPlan(String query) throws ValidationException, RelConversionException, SqlParseException {
- return (TridentRel) validateAndConvert(planner.parse(query));
- }
-
- private RelNode validateAndConvert(SqlNode sqlNode) throws ValidationException, RelConversionException {
- SqlNode validated = validateNode(sqlNode);
- RelNode relNode = convertToRelNode(validated);
- return convertToStormRel(relNode);
- }
-
- private RelNode convertToStormRel(RelNode relNode) throws RelConversionException {
- RelTraitSet traitSet = relNode.getTraitSet();
- traitSet = traitSet.simplify();
-
- // PlannerImpl.transform() optimizes RelNode with ruleset
- return planner.transform(STORM_REL_CONVERSION_RULES, traitSet.plus(TridentLogicalConvention.INSTANCE), relNode);
- }
-
- private RelNode convertToRelNode(SqlNode sqlNode) throws RelConversionException {
- return planner.rel(sqlNode).rel;
- }
-
- private SqlNode validateNode(SqlNode sqlNode) throws ValidationException {
- SqlNode validatedSqlNode = planner.validate(sqlNode);
- validatedSqlNode.accept(new UnsupportedOperatorsVisitor());
- return validatedSqlNode;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/TridentPlanCreator.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/TridentPlanCreator.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/TridentPlanCreator.java
deleted file mode 100644
index aa30552..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/TridentPlanCreator.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.planner.trident;
-
-import org.apache.calcite.DataContext;
-import org.apache.calcite.adapter.java.JavaTypeFactory;
-import org.apache.calcite.interpreter.Scalar;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rex.RexBuilder;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.rex.RexProgram;
-import org.apache.storm.sql.compiler.RexNodeToJavaCodeCompiler;
-import org.apache.storm.sql.javac.CompilingClassLoader;
-import org.apache.storm.sql.runtime.ISqlTridentDataSource;
-import org.apache.storm.sql.runtime.calcite.DebuggableExecutableExpression;
-import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
-import org.apache.storm.sql.runtime.calcite.StormDataContext;
-import org.apache.storm.trident.TridentTopology;
-import org.apache.storm.trident.fluent.IAggregatableStream;
-
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Deque;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-public class TridentPlanCreator {
- private final Map<String, ISqlTridentDataSource> sources;
- private final JavaTypeFactory typeFactory;
- private final RexNodeToJavaCodeCompiler rexCompiler;
- private final DataContext dataContext;
- private final TridentTopology topology;
-
- private final Deque<IAggregatableStream> streamStack = new ArrayDeque<>();
- private final List<CompilingClassLoader> classLoaders = new ArrayList<>();
-
- public TridentPlanCreator(Map<String, ISqlTridentDataSource> sources, RexBuilder rexBuilder) {
- this.sources = sources;
- this.rexCompiler = new RexNodeToJavaCodeCompiler(rexBuilder);
- this.typeFactory = (JavaTypeFactory) rexBuilder.getTypeFactory();
-
- this.topology = new TridentTopology();
- this.dataContext = new StormDataContext();
- }
-
- public void addStream(IAggregatableStream stream) throws Exception {
- push(stream);
- }
-
- public IAggregatableStream pop() {
- return streamStack.pop();
- }
-
- public Map<String, ISqlTridentDataSource> getSources() {
- return sources;
- }
-
- public DataContext getDataContext() {
- return dataContext;
- }
-
- public JavaTypeFactory getTypeFactory() {
- return typeFactory;
- }
-
- public TridentTopology getTopology() {
- return topology;
- }
-
- public ExecutableExpression createScalarInstance(List<RexNode> nodes, RelDataType inputRowType, String className)
- throws CompilingClassLoader.CompilerException, ClassNotFoundException, IllegalAccessException, InstantiationException {
- String expr = rexCompiler.compile(nodes, inputRowType, className);
- CompilingClassLoader classLoader = new CompilingClassLoader(
- getLastClassLoader(), className, expr, null);
- ExecutableExpression instance = (ExecutableExpression) classLoader.loadClass(className).newInstance();
- addClassLoader(classLoader);
- return new DebuggableExecutableExpression(instance, expr);
- }
-
- public ExecutableExpression createScalarInstance(RexProgram program, String className)
- throws CompilingClassLoader.CompilerException, ClassNotFoundException, IllegalAccessException, InstantiationException {
- String expr = rexCompiler.compile(program, className);
- CompilingClassLoader classLoader = new CompilingClassLoader(
- getLastClassLoader(), className, expr, null);
- ExecutableExpression instance = (ExecutableExpression) classLoader.loadClass(className).newInstance();
- addClassLoader(classLoader);
- return new DebuggableExecutableExpression(instance, expr);
- }
-
- private void push(IAggregatableStream stream) {
- streamStack.push(stream);
- }
-
- public void addClassLoader(CompilingClassLoader compilingClassLoader) {
- this.classLoaders.add(compilingClassLoader);
- }
-
- public ClassLoader getLastClassLoader() {
- if (this.classLoaders.size() > 0) {
- return this.classLoaders.get(this.classLoaders.size() - 1);
- } else {
- return this.getClass().getClassLoader();
- }
- }
-
- public List<CompilingClassLoader> getClassLoaders() {
- return classLoaders;
- }
-}