You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/04/05 23:19:19 UTC

[16/23] storm git commit: STORM-2453 Move non-connectors into the top directory

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java
deleted file mode 100644
index 97995c7..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java
+++ /dev/null
@@ -1,484 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.compiler.backends.standalone;
-
-import com.google.common.base.Joiner;
-import org.apache.calcite.adapter.java.JavaTypeFactory;
-import org.apache.calcite.plan.RelOptUtil;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Aggregate;
-import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.core.Filter;
-import org.apache.calcite.rel.core.Join;
-import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rel.core.TableScan;
-import org.apache.calcite.rel.logical.LogicalJoin;
-import org.apache.calcite.rel.stream.Delta;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rex.RexBuilder;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.schema.AggregateFunction;
-import org.apache.calcite.schema.impl.AggregateFunctionImpl;
-import org.apache.calcite.sql.SqlAggFunction;
-import org.apache.calcite.sql.validate.SqlUserDefinedAggFunction;
-import org.apache.storm.sql.compiler.RexNodeToJavaCodeCompiler;
-
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.lang.reflect.Method;
-import java.lang.reflect.Modifier;
-import java.lang.reflect.Type;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Compile RelNodes into individual functions.
- */
-class RelNodeCompiler extends PostOrderRelNodeVisitor<Void> {
-  public static Joiner NEW_LINE_JOINER = Joiner.on('\n');
-
-  private final PrintWriter pw;
-  private final JavaTypeFactory typeFactory;
-  private final RexNodeToJavaCodeCompiler rexCompiler;
-
-  private static final String STAGE_PROLOGUE = NEW_LINE_JOINER.join(
-    "  private static final ChannelHandler %1$s = ",
-    "    new AbstractChannelHandler() {",
-    "    @Override",
-    "    public void dataReceived(ChannelContext ctx, Values _data) {",
-    ""
-  );
-
-  private static final String AGGREGATE_STAGE_PROLOGUE = NEW_LINE_JOINER.join(
-          "  private static final ChannelHandler %1$s = ",
-          "    new AbstractChannelHandler() {",
-          "    private final Values EMPTY_VALUES = new Values();",
-          "    private final Map<List<Object>, Map<String, Object>> state = new LinkedHashMap<>();",
-          "    private final int[] groupIndices = new int[] {%2$s};",
-          "    private List<Object> getGroupValues(Values _data) {",
-          "      List<Object> res = new ArrayList<>();",
-          "      for (int i: groupIndices) {",
-          "        res.add(_data.get(i));",
-          "      }",
-          "      return res;",
-          "    }",
-          "",
-          "    @Override",
-          "    public void flush(ChannelContext ctx) {",
-          "      emitAggregateResults(ctx);",
-          "      super.flush(ctx);",
-          "      state.clear();",
-          "    }",
-          "",
-          "    private void emitAggregateResults(ChannelContext ctx) {",
-          "        for (Map.Entry<List<Object>, Map<String, Object>> entry: state.entrySet()) {",
-          "          List<Object> groupValues = entry.getKey();",
-          "          Map<String, Object> accumulators = entry.getValue();",
-          "          %3$s",
-          "        }",
-          "    }",
-          "",
-          "    @Override",
-          "    public void dataReceived(ChannelContext ctx, Values _data) {",
-          ""
-  );
-
-  private static final String JOIN_STAGE_PROLOGUE = NEW_LINE_JOINER.join(
-          "  private static final ChannelHandler %1$s = ",
-          "    new AbstractChannelHandler() {",
-          "      Object left = %2$s;",
-          "      Object right = %3$s;",
-          "      Object source = null;",
-          "      List<Values> leftRows = new ArrayList<>();",
-          "      List<Values> rightRows = new ArrayList<>();",
-          "      boolean leftDone = false;",
-          "      boolean rightDone = false;",
-          "      int[] ordinals = new int[] {%4$s, %5$s};",
-          "",
-          "      Multimap<Object, Values> getJoinTable(List<Values> rows, int joinIndex) {",
-          "         Multimap<Object, Values> m = ArrayListMultimap.create();",
-          "         for(Values v: rows) {",
-          "           m.put(v.get(joinIndex), v);",
-          "         }",
-          "         return m;",
-          "      }",
-          "",
-          "      List<Values> join(Multimap<Object, Values> tab, List<Values> rows, int rowIdx, boolean rev) {",
-          "         List<Values> res = new ArrayList<>();",
-          "         for (Values row: rows) {",
-          "           for (Values mapValue: tab.get(row.get(rowIdx))) {",
-          "             if (mapValue != null) {",
-          "               Values joinedRow = new Values();",
-          "               if(rev) {",
-          "                 joinedRow.addAll(row);",
-          "                 joinedRow.addAll(mapValue);",
-          "               } else {",
-          "                 joinedRow.addAll(mapValue);",
-          "                 joinedRow.addAll(row);",
-          "               }",
-          "               res.add(joinedRow);",
-          "             }",
-          "           }",
-          "         }",
-          "         return res;",
-          "      }",
-          "",
-          "    @Override",
-          "    public void setSource(ChannelContext ctx, Object source) {",
-          "      this.source = source;",
-          "    }",
-          "",
-          "    @Override",
-          "    public void flush(ChannelContext ctx) {",
-          "        if (source == left) {",
-          "            leftDone = true;",
-          "        } else if (source == right) {",
-          "            rightDone = true;",
-          "        }",
-          "        if (leftDone && rightDone) {",
-          "          if (leftRows.size() <= rightRows.size()) {",
-          "            for(Values res: join(getJoinTable(leftRows, ordinals[0]), rightRows, ordinals[1], false)) {",
-          "              ctx.emit(res);",
-          "            }",
-          "          } else {",
-          "            for(Values res: join(getJoinTable(rightRows, ordinals[1]), leftRows, ordinals[0], true)) {",
-          "              ctx.emit(res);",
-          "            }",
-          "          }",
-          "          leftDone = rightDone = false;",
-          "          leftRows.clear();",
-          "          rightRows.clear();",
-          "          super.flush(ctx);",
-          "        }",
-          "    }",
-          "",
-          "    @Override",
-          "    public void dataReceived(ChannelContext ctx, Values _data) {",
-          ""
-  );
-
-  private static final String STAGE_PASSTHROUGH = NEW_LINE_JOINER.join(
-      "  private static final ChannelHandler %1$s = AbstractChannelHandler.PASS_THROUGH;",
-      "");
-
-  private static final String STAGE_ENUMERABLE_TABLE_SCAN = NEW_LINE_JOINER.join(
-          "  private static final ChannelHandler %1$s = new AbstractChannelHandler() {",
-          "    @Override",
-          "    public void flush(ChannelContext ctx) {",
-          "      ctx.setSource(this);",
-          "      super.flush(ctx);",
-          "    }",
-          "",
-          "    @Override",
-          "    public void dataReceived(ChannelContext ctx, Values _data) {",
-          "      ctx.setSource(this);",
-          "      ctx.emit(_data);",
-          "    }",
-          "  };",
-          "");
-
-  private int nameCount;
-  private Map<AggregateCall, String> aggregateCallVarNames = new HashMap<>();
-
-  RelNodeCompiler(PrintWriter pw, JavaTypeFactory typeFactory) {
-    this.pw = pw;
-    this.typeFactory = typeFactory;
-    this.rexCompiler = new RexNodeToJavaCodeCompiler(new RexBuilder(typeFactory));
-  }
-
-  @Override
-  public Void visitDelta(Delta delta, List<Void> inputStreams) throws Exception {
-    pw.print(String.format(STAGE_PASSTHROUGH, getStageName(delta)));
-    return null;
-  }
-
-  @Override
-  public Void visitFilter(Filter filter, List<Void> inputStreams) throws Exception {
-    beginStage(filter);
-
-    List<RexNode> childExps = filter.getChildExps();
-    RelDataType inputRowType = filter.getInput(0).getRowType();
-
-    pw.print("Context context = new StormContext(Processor.dataContext);\n");
-    pw.print("context.values = _data.toArray();\n");
-    pw.print("Object[] outputValues = new Object[1];\n");
-
-    pw.write(rexCompiler.compileToBlock(childExps, inputRowType).toString());
-
-    String r = "((Boolean) outputValues[0])";
-    if (filter.getCondition().getType().isNullable()) {
-      pw.print(String.format("    if (%s != null && %s) { ctx.emit(_data); }\n", r, r));
-    } else {
-      pw.print(String.format("    if (%s) { ctx.emit(_data); }\n", r, r));
-    }
-    endStage();
-    return null;
-  }
-
-  @Override
-  public Void visitProject(Project project, List<Void> inputStreams) throws Exception {
-    beginStage(project);
-
-    List<RexNode> childExps = project.getChildExps();
-    RelDataType inputRowType = project.getInput(0).getRowType();
-    int outputCount = project.getRowType().getFieldCount();
-
-    pw.print("Context context = new StormContext(Processor.dataContext);\n");
-    pw.print("context.values = _data.toArray();\n");
-    pw.print(String.format("Object[] outputValues = new Object[%d];\n", outputCount));
-
-    pw.write(rexCompiler.compileToBlock(childExps, inputRowType).toString());
-
-    pw.print("    ctx.emit(new Values(outputValues));\n");
-    endStage();
-    return null;
-  }
-
-  @Override
-  public Void defaultValue(RelNode n, List<Void> inputStreams) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public Void visitTableScan(TableScan scan, List<Void> inputStreams) throws Exception {
-    pw.print(String.format(STAGE_ENUMERABLE_TABLE_SCAN, getStageName(scan)));
-    return null;
-  }
-
-  @Override
-  public Void visitAggregate(Aggregate aggregate, List<Void> inputStreams) throws Exception {
-    beginAggregateStage(aggregate);
-    pw.println("        if (_data != null) {");
-    pw.println("        List<Object> curGroupValues = getGroupValues(_data);");
-    pw.println("        if (!state.containsKey(curGroupValues)) {");
-    pw.println("          state.put(curGroupValues, new HashMap<String, Object>());");
-    pw.println("        }");
-    pw.println("        Map<String, Object> accumulators = state.get(curGroupValues);");
-    for (AggregateCall call : aggregate.getAggCallList()) {
-      aggregate(call);
-    }
-    pw.println("        }");
-    endStage();
-    return null;
-  }
-
-  @Override
-  public Void visitJoin(Join join, List<Void> inputStreams) {
-    beginJoinStage(join);
-    pw.println("        if (source == left) {");
-    pw.println("            leftRows.add(_data);");
-    pw.println("        } else if (source == right) {");
-    pw.println("            rightRows.add(_data);");
-    pw.println("        }");
-    endStage();
-    return null;
-  }
-
-  private String groupValueEmitStr(String var, int n) {
-    int count = 0;
-    StringBuilder sb = new StringBuilder();
-    for (int i = 0; i < n; i++) {
-      if (++count > 1) {
-        sb.append(", ");
-      }
-      sb.append(var).append(".").append("get(").append(i).append(")");
-    }
-    return sb.toString();
-  }
-
-  private String emitAggregateStmts(Aggregate aggregate) {
-    List<String> res = new ArrayList<>();
-    StringWriter sw = new StringWriter();
-    for (AggregateCall call : aggregate.getAggCallList()) {
-      res.add(aggregateResult(call, new PrintWriter(sw)));
-    }
-    return NEW_LINE_JOINER.join(sw.toString(),
-                                String.format("          ctx.emit(new Values(%s, %s));",
-                                              groupValueEmitStr("groupValues", aggregate.getGroupSet().cardinality()),
-                                              Joiner.on(", ").join(res)));
-  }
-
-  private String aggregateResult(AggregateCall call, PrintWriter pw) {
-    SqlAggFunction aggFunction = call.getAggregation();
-    String aggregationName = call.getAggregation().getName();
-    Type ty = typeFactory.getJavaClass(call.getType());
-    String result;
-    if (aggFunction instanceof SqlUserDefinedAggFunction) {
-      AggregateFunction aggregateFunction = ((SqlUserDefinedAggFunction) aggFunction).function;
-      result = doAggregateResult((AggregateFunctionImpl) aggregateFunction, reserveAggVarName(call), ty, pw);
-    } else {
-      List<BuiltinAggregateFunctions.TypeClass> typeClasses = BuiltinAggregateFunctions.TABLE.get(aggregationName);
-      if (typeClasses == null) {
-        throw new UnsupportedOperationException(aggregationName + " Not implemented");
-      }
-      result = doAggregateResult(AggregateFunctionImpl.create(findMatchingClass(aggregationName, typeClasses, ty)),
-                                 reserveAggVarName(call), ty, pw);
-    }
-    return result;
-  }
-
-  private String doAggregateResult(AggregateFunctionImpl aggFn, String varName, Type ty, PrintWriter pw) {
-    String resultName = varName + "_result";
-    Class<?> accumulatorType = aggFn.accumulatorType;
-    Class<?> resultType = aggFn.resultType;
-    List<String> args = new ArrayList<>();
-    if (!aggFn.isStatic) {
-      String aggObjName = String.format("%s_obj", varName);
-      String aggObjClassName = aggFn.initMethod.getDeclaringClass().getCanonicalName();
-      pw.println("          @SuppressWarnings(\"unchecked\")");
-      pw.println(String.format("          final %1$s %2$s = (%1$s) accumulators.get(\"%2$s\");", aggObjClassName,
-              aggObjName));
-      args.add(aggObjName);
-    }
-    args.add(String.format("(%s)accumulators.get(\"%s\")", accumulatorType.getCanonicalName(), varName));
-    pw.println(String.format("          final %s %s = %s;", resultType.getCanonicalName(),
-                             resultName, printMethodCall(aggFn.resultMethod, args)));
-
-    return resultName;
-  }
-
-  private void aggregate(AggregateCall call) {
-    SqlAggFunction aggFunction = call.getAggregation();
-    String aggregationName = call.getAggregation().getName();
-    Type ty = typeFactory.getJavaClass(call.getType());
-    if (call.getArgList().size() != 1) {
-      if (aggregationName.equals("COUNT")) {
-        if (call.getArgList().size() != 0) {
-          throw new UnsupportedOperationException("Count with nullable fields");
-        }
-      }
-    }
-    if (aggFunction instanceof SqlUserDefinedAggFunction) {
-      AggregateFunction aggregateFunction = ((SqlUserDefinedAggFunction) aggFunction).function;
-      doAggregate((AggregateFunctionImpl) aggregateFunction, reserveAggVarName(call), ty, call.getArgList());
-    } else {
-      List<BuiltinAggregateFunctions.TypeClass> typeClasses = BuiltinAggregateFunctions.TABLE.get(aggregationName);
-      if (typeClasses == null) {
-        throw new UnsupportedOperationException(aggregationName + " Not implemented");
-      }
-      doAggregate(AggregateFunctionImpl.create(findMatchingClass(aggregationName, typeClasses, ty)),
-                  reserveAggVarName(call), ty, call.getArgList());
-    }
-  }
-
-  private Class<?> findMatchingClass(String aggregationName, List<BuiltinAggregateFunctions.TypeClass> typeClasses, Type ty) {
-    for (BuiltinAggregateFunctions.TypeClass typeClass : typeClasses) {
-      if (typeClass.ty.equals(BuiltinAggregateFunctions.TypeClass.GenericType.class) || typeClass.ty.equals(ty)) {
-        return typeClass.clazz;
-      }
-    }
-    throw new UnsupportedOperationException(aggregationName + " Not implemeted for type '" + ty + "'");
-  }
-
-  private void doAggregate(AggregateFunctionImpl aggFn, String varName, Type ty, List<Integer> argList) {
-    List<String> args = new ArrayList<>();
-    Class<?> accumulatorType = aggFn.accumulatorType;
-    if (!aggFn.isStatic) {
-      String aggObjName = String.format("%s_obj", varName);
-      String aggObjClassName = aggFn.initMethod.getDeclaringClass().getCanonicalName();
-      pw.println(String.format("          if (!accumulators.containsKey(\"%s\")) { ", aggObjName));
-      pw.println(String.format("            accumulators.put(\"%s\", new %s());", aggObjName, aggObjClassName));
-      pw.println("          }");
-      pw.println("          @SuppressWarnings(\"unchecked\")");
-      pw.println(String.format("          final %1$s %2$s = (%1$s) accumulators.get(\"%2$s\");", aggObjClassName,
-              aggObjName));
-      args.add(aggObjName);
-    }
-    args.add(String.format("%1$s == null ? %2$s : (%3$s) %1$s",
-                           "accumulators.get(\"" + varName + "\")",
-                           printMethodCall(aggFn.initMethod, args),
-                           accumulatorType.getCanonicalName()));
-    if (argList.isEmpty()) {
-      args.add("EMPTY_VALUES");
-    } else {
-      for (int i = 0; i < aggFn.valueTypes.size(); i++) {
-        args.add(String.format("(%s) %s", aggFn.valueTypes.get(i).getCanonicalName(), "_data.get(" + argList.get(i) + ")"));
-      }
-    }
-    pw.print(String.format("          accumulators.put(\"%s\", %s);\n",
-                           varName,
-                           printMethodCall(aggFn.addMethod, args)));
-  }
-
-  private String reserveAggVarName(AggregateCall call) {
-    String varName;
-    if ((varName = aggregateCallVarNames.get(call)) == null) {
-      varName = call.getAggregation().getName() + ++nameCount;
-      aggregateCallVarNames.put(call, varName);
-    }
-    return varName;
-  }
-
-  private void beginStage(RelNode n) {
-    pw.print(String.format(STAGE_PROLOGUE, getStageName(n)));
-  }
-
-  private void beginAggregateStage(Aggregate n) {
-    pw.print(String.format(AGGREGATE_STAGE_PROLOGUE, getStageName(n), getGroupByIndices(n), emitAggregateStmts(n)));
-  }
-
-  private void beginJoinStage(Join join) {
-    int[] ordinals = new int[2];
-    if (!RelOptUtil.analyzeSimpleEquiJoin((LogicalJoin) join, ordinals)) {
-      throw new UnsupportedOperationException("Only simple equi joins are supported");
-    }
-
-    pw.print(String.format(JOIN_STAGE_PROLOGUE, getStageName(join),
-                           getStageName(join.getLeft()),
-                           getStageName(join.getRight()),
-                           ordinals[0],
-                           ordinals[1]));
-  }
-
-  private void endStage() {
-    pw.print("  }\n  };\n");
-  }
-
-  static String getStageName(RelNode n) {
-    return n.getClass().getSimpleName().toUpperCase() + "_" + n.getId();
-  }
-
-  private String getGroupByIndices(Aggregate n) {
-    StringBuilder res = new StringBuilder();
-    int count = 0;
-    for (int i : n.getGroupSet()) {
-      if (++count > 1) {
-        res.append(", ");
-      }
-      res.append(i);
-    }
-    return res.toString();
-  }
-
-  public static String printMethodCall(Method method, List<String> args) {
-    return printMethodCall(method.getDeclaringClass(), method.getName(),
-            Modifier.isStatic(method.getModifiers()), args);
-  }
-
-  private static String printMethodCall(Class<?> clazz, String method, boolean isStatic, List<String> args) {
-    if (isStatic) {
-      return String.format("%s.%s(%s)", clazz.getCanonicalName(), method, Joiner.on(',').join(args));
-    } else {
-      return String.format("%s.%s(%s)", args.get(0), method,
-              Joiner.on(',').join(args.subList(1, args.size())));
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/javac/CompilingClassLoader.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/javac/CompilingClassLoader.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/javac/CompilingClassLoader.java
deleted file mode 100644
index 0b7c053..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/javac/CompilingClassLoader.java
+++ /dev/null
@@ -1,225 +0,0 @@
-/*
- * Copyright (C) 2010 Google, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.javac;
-
-
-import javax.tools.DiagnosticListener;
-import javax.tools.FileObject;
-import javax.tools.ForwardingJavaFileManager;
-import javax.tools.JavaCompiler;
-import javax.tools.JavaFileManager;
-import javax.tools.JavaFileObject;
-import javax.tools.SimpleJavaFileObject;
-import javax.tools.ToolProvider;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import static java.util.Collections.singleton;
-
-/**
- * This is a Java ClassLoader that will attempt to load a class from a string of source code.
- *
- * <h3>Example</h3>
- *
- * <pre>
- * String className = "com.foo.MyClass";
- * String classSource =
- *   "package com.foo;\n" +
- *   "public class MyClass implements Runnable {\n" +
- *   "  @Override public void run() {\n" +
- *   "   log(\"Hello world\");\n" +
- *   "  }\n" +
- *   "}";
- *
- * // Load class from source.
- * ClassLoader classLoader = new CompilingClassLoader(
- *     parentClassLoader, className, classSource);
- * Class myClass = classLoader.loadClass(className);
- *
- * // Use it.
- * Runnable instance = (Runnable)myClass.newInstance();
- * instance.run();
- * </pre>
- *
- * Only one chunk of source can be compiled per instance of CompilingClassLoader. If you need to
- * compile more, create multiple CompilingClassLoader instances.
- *
- * Uses Java 1.6's in built compiler API.
- *
- * If the class cannot be compiled, loadClass() will throw a ClassNotFoundException and log the
- * compile errors to System.err. If you don't want the messages logged, or want to explicitly handle
- * the messages you can provide your own {@link javax.tools.DiagnosticListener} through
- * {#setDiagnosticListener()}.
- *
- * @see java.lang.ClassLoader
- * @see javax.tools.JavaCompiler
- */
-public class CompilingClassLoader extends ClassLoader {
-
-  /**
-   * Thrown when code cannot be compiled.
-   */
-  public static class CompilerException extends Exception {
-    private static final long serialVersionUID = -2936958840023603270L;
-
-    public CompilerException(String message) {
-      super(message);
-    }
-  }
-
-  private final Map<String, ByteArrayOutputStream> byteCodeForClasses = new HashMap<>();
-
-  private static final URI EMPTY_URI;
-
-  static {
-    try {
-      // Needed to keep SimpleFileObject constructor happy.
-      EMPTY_URI = new URI("");
-    } catch (URISyntaxException e) {
-      throw new Error(e);
-    }
-  }
-
-  /**
-   * @param parent Parent classloader to resolve dependencies from.
-   * @param className Name of class to compile. eg. "com.foo.MyClass".
-   * @param sourceCode Java source for class. e.g. "package com.foo; class MyClass { ... }".
-   * @param diagnosticListener Notified of compiler errors (may be null).
-   */
-  public CompilingClassLoader(
-      ClassLoader parent,
-      String className,
-      String sourceCode,
-      DiagnosticListener<JavaFileObject> diagnosticListener)
-      throws CompilerException {
-    super(parent);
-    if (!compileSourceCodeToByteCode(className, sourceCode, diagnosticListener)) {
-      throw new CompilerException("Could not compile " + className);
-    }
-  }
-
-  public Map<String, ByteArrayOutputStream> getClasses() {
-    return byteCodeForClasses;
-  }
-
-  /**
-   * Override ClassLoader's class resolving method. Don't call this directly, instead use
-   * {@link ClassLoader#loadClass(String)}.
-   */
-  @Override
-  public Class<?> findClass(String name) throws ClassNotFoundException {
-    ByteArrayOutputStream byteCode = byteCodeForClasses.get(name);
-    if (byteCode == null) {
-      throw new ClassNotFoundException(name);
-    }
-    return defineClass(name, byteCode.toByteArray(), 0, byteCode.size());
-  }
-
-  /**
-   * @return Whether compilation was successful.
-   */
-  private boolean compileSourceCodeToByteCode(
-      String className, String sourceCode, DiagnosticListener<JavaFileObject> diagnosticListener) {
-    JavaCompiler javaCompiler = ToolProvider.getSystemJavaCompiler();
-
-    // Set up the in-memory filesystem.
-    InMemoryFileManager fileManager =
-        new InMemoryFileManager(javaCompiler.getStandardFileManager(null, null, null));
-    JavaFileObject javaFile = new InMemoryJavaFile(className, sourceCode);
-
-    // Javac option: remove these when the javac zip impl is fixed
-    // (http://b/issue?id=1822932)
-    System.setProperty("useJavaUtilZip", "true"); // setting value to any non-null string
-    List<String> options = new LinkedList<>();
-    // this is ignored by javac currently but useJavaUtilZip should be
-    // a valid javac XD option, which is another bug
-    options.add("-XDuseJavaUtilZip");
-
-    // Now compile!
-    JavaCompiler.CompilationTask compilationTask =
-        javaCompiler.getTask(
-            null, // Null: log any unhandled errors to stderr.
-            fileManager,
-            diagnosticListener,
-            options,
-            null,
-            singleton(javaFile));
-    return compilationTask.call();
-  }
-
-  /**
-   * Provides an in-memory representation of JavaFileManager abstraction, so we do not need to write
-   * any files to disk.
-   *
-   * When files are written to, rather than putting the bytes on disk, they are appended to buffers
-   * in byteCodeForClasses.
-   *
-   * @see javax.tools.JavaFileManager
-   */
-  private class InMemoryFileManager extends ForwardingJavaFileManager<JavaFileManager> {
-    public InMemoryFileManager(JavaFileManager fileManager) {
-      super(fileManager);
-    }
-
-    @Override
-    public JavaFileObject getJavaFileForOutput(
-        Location location, final String className, JavaFileObject.Kind kind, FileObject sibling)
-        throws IOException {
-      return new SimpleJavaFileObject(EMPTY_URI, kind) {
-        @Override
-        public OutputStream openOutputStream() throws IOException {
-          ByteArrayOutputStream outputStream = byteCodeForClasses.get(className);
-          if (outputStream != null) {
-            throw new IllegalStateException("Cannot write more than once");
-          }
-          // Reasonable size for a simple .class.
-          outputStream = new ByteArrayOutputStream(256);
-          byteCodeForClasses.put(className, outputStream);
-          return outputStream;
-        }
-      };
-    }
-  }
-
-  private static class InMemoryJavaFile extends SimpleJavaFileObject {
-    private final String sourceCode;
-
-    public InMemoryJavaFile(String className, String sourceCode) {
-      super(makeUri(className), Kind.SOURCE);
-      this.sourceCode = sourceCode;
-    }
-
-    private static URI makeUri(String className) {
-      try {
-        return new URI(className.replaceAll("\\.", "/") + Kind.SOURCE.extension);
-      } catch (URISyntaxException e) {
-        throw new RuntimeException(e); // Not sure what could cause this.
-      }
-    }
-
-    @Override
-    public CharSequence getCharContent(boolean ignoreEncodingErrors) throws IOException {
-      return sourceCode;
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/ColumnConstraint.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/ColumnConstraint.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/ColumnConstraint.java
deleted file mode 100644
index c67d8e7..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/ColumnConstraint.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.sql.parser;
-
-import org.apache.calcite.sql.SqlLiteral;
-import org.apache.calcite.sql.parser.SqlParserPos;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.calcite.sql.validate.SqlMonotonicity;
-
-public class ColumnConstraint extends SqlLiteral {
-  private ColumnConstraint(
-      Object value, SqlTypeName typeName, SqlParserPos pos) {
-    super(value, typeName, pos);
-  }
-
-  public static class PrimaryKey extends ColumnConstraint {
-    private final SqlMonotonicity monotonicity;
-    public PrimaryKey(SqlMonotonicity monotonicity, SqlParserPos pos) {
-      super(SqlDDLKeywords.PRIMARY, SqlTypeName.SYMBOL, pos);
-      this.monotonicity = monotonicity;
-    }
-    public SqlMonotonicity monotonicity() {
-      return monotonicity;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/ColumnDefinition.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/ColumnDefinition.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/ColumnDefinition.java
deleted file mode 100644
index 3520b86..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/ColumnDefinition.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.parser;
-
-import org.apache.calcite.sql.SqlDataTypeSpec;
-import org.apache.calcite.sql.SqlIdentifier;
-import org.apache.calcite.sql.SqlNodeList;
-import org.apache.calcite.sql.parser.SqlParserPos;
-
-import java.util.Arrays;
-
-public class ColumnDefinition extends SqlNodeList {
-  public ColumnDefinition(
-      SqlIdentifier name, SqlDataTypeSpec type, ColumnConstraint constraint, SqlParserPos pos) {
-    super(Arrays.asList(name, type, constraint), pos);
-  }
-
-  public String name() {
-    return get(0).toString();
-  }
-
-  public SqlDataTypeSpec type() {
-    return (SqlDataTypeSpec) get(1);
-  }
-
-  public ColumnConstraint constraint() {
-    return (ColumnConstraint) get(2);
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateFunction.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateFunction.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateFunction.java
deleted file mode 100644
index a53802c..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateFunction.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.parser;
-
-import org.apache.calcite.sql.SqlCall;
-import org.apache.calcite.sql.SqlIdentifier;
-import org.apache.calcite.sql.SqlKind;
-import org.apache.calcite.sql.SqlLiteral;
-import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.sql.SqlOperator;
-import org.apache.calcite.sql.SqlSpecialOperator;
-import org.apache.calcite.sql.SqlWriter;
-import org.apache.calcite.sql.parser.SqlParserPos;
-import org.apache.calcite.util.ImmutableNullableList;
-import org.apache.calcite.util.NlsString;
-
-import java.util.List;
-
-public class SqlCreateFunction extends SqlCall {
-    public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator(
-            "CREATE_FUNCTION", SqlKind.OTHER) {
-        @Override
-        public SqlCall createCall(
-                SqlLiteral functionQualifier, SqlParserPos pos, SqlNode... o) {
-            assert functionQualifier == null;
-            return new SqlCreateFunction(pos, (SqlIdentifier) o[0], o[1], o[2]);
-        }
-
-        @Override
-        public void unparse(
-                SqlWriter writer, SqlCall call, int leftPrec, int rightPrec) {
-            SqlCreateFunction t = (SqlCreateFunction) call;
-            UnparseUtil u = new UnparseUtil(writer, leftPrec, rightPrec);
-            u.keyword("CREATE", "FUNCTION").node(t.functionName).keyword("AS").node(t.className);
-            if (t.jarName != null) {
-                u.keyword("USING", "JAR").node(t.jarName);
-            }
-        }
-    };
-
-    private final SqlIdentifier functionName;
-    private final SqlNode className;
-    private final SqlNode jarName;
-
-    public SqlCreateFunction(SqlParserPos pos, SqlIdentifier functionName, SqlNode className, SqlNode jarName) {
-        super(pos);
-        this.functionName = functionName;
-        this.className = className;
-        this.jarName = jarName;
-    }
-
-    @Override
-    public SqlOperator getOperator() {
-        return OPERATOR;
-    }
-
-    @Override
-    public List<SqlNode> getOperandList() {
-        return ImmutableNullableList.of(functionName, className);
-    }
-
-
-    @Override
-    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
-        getOperator().unparse(writer, this, leftPrec, rightPrec);
-    }
-
-    public String functionName() {
-        return functionName.toString();
-    }
-
-    public String className() {
-        return ((NlsString)SqlLiteral.value(className)).getValue();
-    }
-
-    public String jarName() {
-        return jarName == null ? null : ((NlsString)SqlLiteral.value(jarName)).getValue();
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateTable.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateTable.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateTable.java
deleted file mode 100644
index 670eedb..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateTable.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.parser;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.calcite.sql.SqlCall;
-import org.apache.calcite.sql.SqlIdentifier;
-import org.apache.calcite.sql.SqlKind;
-import org.apache.calcite.sql.SqlLiteral;
-import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.sql.SqlNodeList;
-import org.apache.calcite.sql.SqlOperator;
-import org.apache.calcite.sql.SqlSpecialOperator;
-import org.apache.calcite.sql.SqlWriter;
-import org.apache.calcite.sql.parser.SqlParserPos;
-import org.apache.calcite.util.ImmutableNullableList;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Properties;
-
-public class SqlCreateTable extends SqlCall {
-  private static final int DEFAULT_PARALLELISM = 1;
-
-  public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator(
-      "CREATE_TABLE", SqlKind.OTHER) {
-    @Override
-    public SqlCall createCall(
-        SqlLiteral functionQualifier, SqlParserPos pos, SqlNode... o) {
-      assert functionQualifier == null;
-      return new SqlCreateTable(pos, (SqlIdentifier) o[0], (SqlNodeList) o[1],
-                                o[2], o[3], o[4], o[5], o[6], o[7]);
-    }
-
-    @Override
-    public void unparse(
-        SqlWriter writer, SqlCall call, int leftPrec, int rightPrec) {
-      SqlCreateTable t = (SqlCreateTable) call;
-      UnparseUtil u = new UnparseUtil(writer, leftPrec, rightPrec);
-      u.keyword("CREATE", "EXTERNAL", "TABLE").node(t.tblName).nodeList(
-          t.fieldList);
-      if (t.inputFormatClass != null && t.outputFormatClass != null) {
-        u.keyword("STORED", "AS", "INPUTFORMAT").node(
-            t.inputFormatClass).keyword("OUTPUTFORMAT").node(
-            t.outputFormatClass);
-      }
-      u.keyword("LOCATION").node(t.location);
-      if (t.parallelism != null) {
-        u.keyword("PARALLELISM").node(t.parallelism);
-      }
-      if (t.properties != null) {
-        u.keyword("TBLPROPERTIES").node(t.properties);
-      }
-      if (t.query != null) {
-        u.keyword("AS").node(t.query);
-      }
-    }
-  };
-
-  private final SqlIdentifier tblName;
-  private final SqlNodeList fieldList;
-  private final SqlNode inputFormatClass;
-  private final SqlNode outputFormatClass;
-  private final SqlNode location;
-  private final SqlNode parallelism;
-  private final SqlNode properties;
-  private final SqlNode query;
-
-  public SqlCreateTable(
-          SqlParserPos pos, SqlIdentifier tblName, SqlNodeList fieldList,
-          SqlNode inputFormatClass, SqlNode outputFormatClass, SqlNode location,
-          SqlNode parallelism, SqlNode properties, SqlNode query) {
-    super(pos);
-    this.tblName = tblName;
-    this.fieldList = fieldList;
-    this.inputFormatClass = inputFormatClass;
-    this.outputFormatClass = outputFormatClass;
-    this.location = location;
-    this.parallelism = parallelism;
-    this.properties = properties;
-    this.query = query;
-  }
-
-  @Override
-  public SqlOperator getOperator() {
-    return OPERATOR;
-  }
-
-  @Override
-  public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
-    getOperator().unparse(writer, this, leftPrec, rightPrec);
-  }
-
-  @Override
-  public List<SqlNode> getOperandList() {
-    return ImmutableNullableList.of(tblName, fieldList, inputFormatClass,
-                                    outputFormatClass, location, properties,
-                                    query);
-  }
-
-  public String tableName() {
-    return tblName.toString();
-  }
-
-  public URI location() {
-    return URI.create(getString(location));
-  }
-
-  public Integer parallelism() {
-    String parallelismStr = getString(parallelism);
-    if (parallelismStr != null) {
-      return Integer.parseInt(parallelismStr);
-    } else {
-      return DEFAULT_PARALLELISM;
-    }
-  }
-
-  public String inputFormatClass() {
-    return getString(inputFormatClass);
-  }
-
-  public String outputFormatClass() {
-    return getString(outputFormatClass);
-  }
-
-  public Properties properties() {
-    Properties props = new Properties();
-    if (properties != null) {
-      try {
-        ObjectMapper mapper = new ObjectMapper();
-        HashMap<String, Object> map = mapper.readValue(getString(properties), HashMap.class);
-        props.putAll(map);
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-    }
-    return props;
-  }
-
-  private String getString(SqlNode n) {
-    return n == null ? null : SqlLiteral.stringValue(n);
-  }
-
-  @SuppressWarnings("unchecked")
-  public List<ColumnDefinition> fieldList() {
-    return (List<ColumnDefinition>)((List<? extends SqlNode>)fieldList.getList());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlDDLKeywords.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlDDLKeywords.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlDDLKeywords.java
deleted file mode 100644
index 3112e53..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlDDLKeywords.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.parser;
-
-import org.apache.calcite.sql.SqlLiteral;
-
-/**
- * Define the keywords that can occur in a CREATE TABLE statement
- */
-public enum SqlDDLKeywords implements SqlLiteral.SqlSymbol {
-  PRIMARY
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/StormParser.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/StormParser.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/StormParser.java
deleted file mode 100644
index 8444e1e..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/StormParser.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.parser;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.calcite.config.Lex;
-import org.apache.storm.sql.parser.impl.StormParserImpl;
-
-import java.io.StringReader;
-
-public class StormParser {
-  public static final int DEFAULT_IDENTIFIER_MAX_LENGTH = 128;
-  private final StormParserImpl impl;
-
-  public StormParser(String s) {
-    this.impl = new StormParserImpl(new StringReader(s));
-    this.impl.setTabSize(1);
-    this.impl.setQuotedCasing(Lex.ORACLE.quotedCasing);
-    this.impl.setUnquotedCasing(Lex.ORACLE.unquotedCasing);
-    this.impl.setIdentifierMaxLength(DEFAULT_IDENTIFIER_MAX_LENGTH);
-    /*
-     *  By default parser uses [ ] for quoting identifiers. Switching to DQID (double quoted identifiers)
-     *  is needed for array and map access (m['x'] = 1 or arr[2] = 10 etc) to work.
-     */
-    this.impl.switchTo("DQID");
-  }
-
-  @VisibleForTesting
-  public StormParserImpl impl() {
-    return impl;
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/UnparseUtil.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/UnparseUtil.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/UnparseUtil.java
deleted file mode 100644
index 834fe7c..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/UnparseUtil.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.parser;
-
-import org.apache.calcite.sql.SqlIdentifier;
-import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.sql.SqlNodeList;
-import org.apache.calcite.sql.SqlWriter;
-
-class UnparseUtil {
-  private final SqlWriter writer;
-  private final int leftPrec;
-  private final int rightPrec;
-
-  UnparseUtil(SqlWriter writer, int leftPrec, int rightPrec) {
-    this.writer = writer;
-    this.leftPrec = leftPrec;
-    this.rightPrec = rightPrec;
-  }
-
-  UnparseUtil keyword(String... keywords) {
-    for (String k : keywords) {
-      writer.keyword(k);
-    }
-    return this;
-  }
-
-  UnparseUtil node(SqlNode n) {
-    n.unparse(writer, leftPrec, rightPrec);
-    return this;
-  }
-
-  UnparseUtil nodeList(SqlNodeList l) {
-    writer.keyword("(");
-    if (l.size() > 0) {
-      l.get(0).unparse(writer, leftPrec, rightPrec);
-      for (int i = 1; i < l.size(); ++i) {
-        writer.keyword(",");
-        l.get(i).unparse(writer, leftPrec, rightPrec);
-      }
-    }
-    writer.keyword(")");
-    return this;
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/StormRelDataTypeSystem.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/StormRelDataTypeSystem.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/StormRelDataTypeSystem.java
deleted file mode 100644
index 437877c..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/StormRelDataTypeSystem.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.sql.planner;
-
-import org.apache.calcite.rel.type.RelDataTypeSystem;
-import org.apache.calcite.rel.type.RelDataTypeSystemImpl;
-
-public class StormRelDataTypeSystem extends RelDataTypeSystemImpl {
-    public static final RelDataTypeSystem STORM_REL_DATATYPE_SYSTEM = new StormRelDataTypeSystem();
-
-    @Override
-    public int getMaxNumericScale() {
-        return 38;
-    }
-
-    @Override
-    public int getMaxNumericPrecision() {
-        return 38;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/StormRelUtils.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/StormRelUtils.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/StormRelUtils.java
deleted file mode 100644
index 40bbacd..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/StormRelUtils.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.planner;
-
-import org.apache.calcite.plan.RelOptUtil;
-import org.apache.calcite.plan.volcano.RelSubset;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.sql.SqlExplainLevel;
-import org.apache.storm.sql.planner.trident.rel.TridentRel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class StormRelUtils {
-    private static final Logger LOG = LoggerFactory.getLogger(StormRelUtils.class);
-
-    private static final AtomicInteger sequence = new AtomicInteger(0);
-    private static final AtomicInteger classSequence = new AtomicInteger(0);
-
-    public static String getStageName(TridentRel relNode) {
-        return relNode.getClass().getSimpleName().toUpperCase() + "_" + relNode.getId() + "_" + sequence.getAndIncrement();
-    }
-
-    public static String getClassName(TridentRel relNode) {
-        return "Generated_" + relNode.getClass().getSimpleName().toUpperCase() + "_" + relNode.getId() + "_" +
-                classSequence.getAndIncrement();
-    }
-
-    public static TridentRel getStormRelInput(RelNode input) {
-        if (input instanceof RelSubset) {
-            // go with known best input
-            input = ((RelSubset) input).getBest();
-        }
-        return (TridentRel) input;
-    }
-
-    public static String explain(final RelNode rel) {
-        return explain(rel, SqlExplainLevel.EXPPLAN_ATTRIBUTES);
-    }
-
-    public static String explain(final RelNode rel, SqlExplainLevel detailLevel) {
-        String explain = "";
-        try {
-            explain = RelOptUtil.toString(rel);
-        } catch (StackOverflowError e) {
-            LOG.error("StackOverflowError occurred while extracting plan. Please report it to the dev@ mailing list.");
-            LOG.error("RelNode " + rel + " ExplainLevel " + detailLevel, e);
-            LOG.error("Forcing plan to empty string and continue... SQL Runner may not working properly after.");
-        }
-        return explain;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/UnsupportedOperatorsVisitor.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/UnsupportedOperatorsVisitor.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/UnsupportedOperatorsVisitor.java
deleted file mode 100644
index 258fe72..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/UnsupportedOperatorsVisitor.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.planner;
-
-import org.apache.calcite.sql.util.SqlShuttle;
-
-public class UnsupportedOperatorsVisitor extends SqlShuttle {
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormCalcRelBase.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormCalcRelBase.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormCalcRelBase.java
deleted file mode 100644
index 1ea7912..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormCalcRelBase.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.sql.planner.rel;
-
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Calc;
-import org.apache.calcite.rex.RexProgram;
-import org.apache.storm.sql.planner.rel.StormRelNode;
-
-public abstract class StormCalcRelBase extends Calc implements StormRelNode {
-    protected StormCalcRelBase(RelOptCluster cluster, RelTraitSet traits, RelNode child, RexProgram program) {
-        super(cluster, traits, child, program);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormFilterRelBase.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormFilterRelBase.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormFilterRelBase.java
deleted file mode 100644
index 4e28460..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormFilterRelBase.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.sql.planner.rel;
-
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Filter;
-import org.apache.calcite.rex.RexNode;
-import org.apache.storm.sql.planner.rel.StormRelNode;
-
-public abstract class StormFilterRelBase extends Filter implements StormRelNode {
-    protected StormFilterRelBase(RelOptCluster cluster, RelTraitSet traits, RelNode child, RexNode condition) {
-        super(cluster, traits, child, condition);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormJoinRelBase.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormJoinRelBase.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormJoinRelBase.java
deleted file mode 100644
index 1458ce7..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormJoinRelBase.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.sql.planner.rel;
-
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.CorrelationId;
-import org.apache.calcite.rel.core.Join;
-import org.apache.calcite.rel.core.JoinRelType;
-import org.apache.calcite.rex.RexNode;
-import org.apache.storm.sql.planner.rel.StormRelNode;
-
-import java.util.Set;
-
-public abstract class StormJoinRelBase extends Join implements StormRelNode {
-    protected StormJoinRelBase(RelOptCluster cluster, RelTraitSet traitSet, RelNode left, RelNode right, RexNode condition, Set<CorrelationId> variablesSet, JoinRelType joinType) {
-        super(cluster, traitSet, left, right, condition, variablesSet, joinType);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormProjectRelBase.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormProjectRelBase.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormProjectRelBase.java
deleted file mode 100644
index f4f23dc..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormProjectRelBase.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.sql.planner.rel;
-
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rex.RexNode;
-import org.apache.storm.sql.planner.rel.StormRelNode;
-
-import java.util.List;
-
-public abstract class StormProjectRelBase extends Project implements StormRelNode {
-    protected StormProjectRelBase(RelOptCluster cluster, RelTraitSet traits, RelNode input, List<? extends RexNode> projects, RelDataType rowType) {
-        super(cluster, traits, input, projects, rowType);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormRelNode.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormRelNode.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormRelNode.java
deleted file mode 100644
index 9327868..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormRelNode.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.sql.planner.rel;
-
-import org.apache.calcite.rel.RelNode;
-
-public interface StormRelNode extends RelNode {
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormStreamInsertRelBase.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormStreamInsertRelBase.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormStreamInsertRelBase.java
deleted file mode 100644
index df47f3f..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormStreamInsertRelBase.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.sql.planner.rel;
-
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptTable;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.prepare.Prepare;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.TableModify;
-import org.apache.calcite.rex.RexNode;
-import org.apache.storm.sql.planner.rel.StormRelNode;
-
-import java.util.List;
-
-public abstract class StormStreamInsertRelBase extends TableModify implements StormRelNode {
-    protected StormStreamInsertRelBase(RelOptCluster cluster, RelTraitSet traits, RelOptTable table, Prepare.CatalogReader catalogReader, RelNode child, Operation operation, List<String> updateColumnList, List<RexNode> sourceExpressionList, boolean flattened) {
-        super(cluster, traits, table, catalogReader, child, operation, updateColumnList, sourceExpressionList, flattened);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormStreamScanRelBase.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormStreamScanRelBase.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormStreamScanRelBase.java
deleted file mode 100644
index 36c62b2..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormStreamScanRelBase.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.sql.planner.rel;
-
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptTable;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.core.TableScan;
-import org.apache.storm.sql.planner.rel.StormRelNode;
-
-public abstract class StormStreamScanRelBase extends TableScan implements StormRelNode {
-
-    // FIXME: define Table class and table.unwrap() to get it
-
-    protected StormStreamScanRelBase(RelOptCluster cluster, RelTraitSet traitSet, RelOptTable table) {
-        super(cluster, traitSet, table);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/QueryPlanner.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/QueryPlanner.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/QueryPlanner.java
deleted file mode 100644
index f98fb02..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/QueryPlanner.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one
- *  * or more contributor license agreements.  See the NOTICE file
- *  * distributed with this work for additional information
- *  * regarding copyright ownership.  The ASF licenses this file
- *  * to you under the Apache License, Version 2.0 (the
- *  * "License"); you may not use this file except in compliance
- *  * with the License.  You may obtain a copy of the License at
- *  *
- *  * http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-package org.apache.storm.sql.planner.trident;
-
-import org.apache.calcite.DataContext;
-import org.apache.calcite.adapter.java.JavaTypeFactory;
-import org.apache.calcite.jdbc.CalciteSchema;
-import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
-import org.apache.calcite.plan.Contexts;
-import org.apache.calcite.plan.ConventionTraitDef;
-import org.apache.calcite.plan.RelTraitDef;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.prepare.CalciteCatalogReader;
-import org.apache.calcite.rel.RelCollationTraitDef;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.type.RelDataTypeSystem;
-import org.apache.calcite.rex.RexBuilder;
-import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.sql.SqlOperatorTable;
-import org.apache.calcite.sql.fun.SqlStdOperatorTable;
-import org.apache.calcite.sql.parser.SqlParseException;
-import org.apache.calcite.sql.util.ChainedSqlOperatorTable;
-import org.apache.calcite.tools.FrameworkConfig;
-import org.apache.calcite.tools.Frameworks;
-import org.apache.calcite.tools.Planner;
-import org.apache.calcite.tools.RelConversionException;
-import org.apache.calcite.tools.ValidationException;
-import org.apache.storm.sql.javac.CompilingClassLoader;
-import org.apache.storm.sql.planner.StormRelDataTypeSystem;
-import org.apache.storm.sql.planner.UnsupportedOperatorsVisitor;
-import org.apache.storm.sql.planner.trident.rel.TridentLogicalConvention;
-import org.apache.storm.sql.planner.trident.rel.TridentRel;
-import org.apache.storm.sql.runtime.ISqlTridentDataSource;
-import org.apache.storm.sql.AbstractTridentProcessor;
-import org.apache.storm.trident.Stream;
-import org.apache.storm.trident.TridentTopology;
-import org.apache.storm.trident.fluent.IAggregatableStream;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-public class QueryPlanner {
-
-    public static final int STORM_REL_CONVERSION_RULES = 1;
-
-    private final Planner planner;
-
-    private final JavaTypeFactory typeFactory = new JavaTypeFactoryImpl(
-            RelDataTypeSystem.DEFAULT);
-
-    public QueryPlanner(SchemaPlus schema) {
-        final List<RelTraitDef> traitDefs = new ArrayList<RelTraitDef>();
-
-        traitDefs.add(ConventionTraitDef.INSTANCE);
-        traitDefs.add(RelCollationTraitDef.INSTANCE);
-
-        List<SqlOperatorTable> sqlOperatorTables = new ArrayList<>();
-        sqlOperatorTables.add(SqlStdOperatorTable.instance());
-        sqlOperatorTables.add(new CalciteCatalogReader(CalciteSchema.from(schema),
-                false,
-                Collections.<String>emptyList(), typeFactory));
-
-        FrameworkConfig config = Frameworks.newConfigBuilder()
-                .defaultSchema(schema)
-                .operatorTable(new ChainedSqlOperatorTable(sqlOperatorTables))
-                .traitDefs(traitDefs)
-                .context(Contexts.EMPTY_CONTEXT)
-                .ruleSets(TridentStormRuleSets.getRuleSets())
-                .costFactory(null)
-                .typeSystem(StormRelDataTypeSystem.STORM_REL_DATATYPE_SYSTEM)
-                .build();
-        this.planner = Frameworks.getPlanner(config);
-    }
-
-    public AbstractTridentProcessor compile(Map<String, ISqlTridentDataSource> sources, String query) throws Exception {
-        TridentRel relNode = getPlan(query);
-
-        TridentPlanCreator tridentPlanCreator = new TridentPlanCreator(sources, new RexBuilder(typeFactory));
-        relNode.tridentPlan(tridentPlanCreator);
-
-        final TridentTopology topology = tridentPlanCreator.getTopology();
-        final IAggregatableStream lastStream = tridentPlanCreator.pop();
-        final DataContext dc = tridentPlanCreator.getDataContext();
-        final List<CompilingClassLoader> cls = tridentPlanCreator.getClassLoaders();
-
-        return new AbstractTridentProcessor() {
-            @Override
-            public TridentTopology build() {
-                return topology;
-            }
-
-            @Override
-            public Stream outputStream() {
-                return lastStream.toStream();
-            }
-
-            @Override
-            public DataContext getDataContext() {
-                return dc;
-            }
-
-            @Override
-            public List<CompilingClassLoader> getClassLoaders() {
-                return cls;
-            }
-        };
-    }
-
-    public TridentRel getPlan(String query) throws ValidationException, RelConversionException, SqlParseException {
-        return (TridentRel) validateAndConvert(planner.parse(query));
-    }
-
-    private RelNode validateAndConvert(SqlNode sqlNode) throws ValidationException, RelConversionException {
-        SqlNode validated = validateNode(sqlNode);
-        RelNode relNode = convertToRelNode(validated);
-        return convertToStormRel(relNode);
-    }
-
-    private RelNode convertToStormRel(RelNode relNode) throws RelConversionException {
-        RelTraitSet traitSet = relNode.getTraitSet();
-        traitSet = traitSet.simplify();
-
-        // PlannerImpl.transform() optimizes RelNode with ruleset
-        return planner.transform(STORM_REL_CONVERSION_RULES, traitSet.plus(TridentLogicalConvention.INSTANCE), relNode);
-    }
-
-    private RelNode convertToRelNode(SqlNode sqlNode) throws RelConversionException {
-        return planner.rel(sqlNode).rel;
-    }
-
-    private SqlNode validateNode(SqlNode sqlNode) throws ValidationException {
-        SqlNode validatedSqlNode = planner.validate(sqlNode);
-        validatedSqlNode.accept(new UnsupportedOperatorsVisitor());
-        return validatedSqlNode;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/TridentPlanCreator.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/TridentPlanCreator.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/TridentPlanCreator.java
deleted file mode 100644
index aa30552..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/TridentPlanCreator.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.planner.trident;
-
-import org.apache.calcite.DataContext;
-import org.apache.calcite.adapter.java.JavaTypeFactory;
-import org.apache.calcite.interpreter.Scalar;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rex.RexBuilder;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.rex.RexProgram;
-import org.apache.storm.sql.compiler.RexNodeToJavaCodeCompiler;
-import org.apache.storm.sql.javac.CompilingClassLoader;
-import org.apache.storm.sql.runtime.ISqlTridentDataSource;
-import org.apache.storm.sql.runtime.calcite.DebuggableExecutableExpression;
-import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
-import org.apache.storm.sql.runtime.calcite.StormDataContext;
-import org.apache.storm.trident.TridentTopology;
-import org.apache.storm.trident.fluent.IAggregatableStream;
-
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Deque;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-public class TridentPlanCreator {
-    private final Map<String, ISqlTridentDataSource> sources;
-    private final JavaTypeFactory typeFactory;
-    private final RexNodeToJavaCodeCompiler rexCompiler;
-    private final DataContext dataContext;
-    private final TridentTopology topology;
-
-    private final Deque<IAggregatableStream> streamStack = new ArrayDeque<>();
-    private final List<CompilingClassLoader> classLoaders = new ArrayList<>();
-
-    public TridentPlanCreator(Map<String, ISqlTridentDataSource> sources, RexBuilder rexBuilder) {
-        this.sources = sources;
-        this.rexCompiler = new RexNodeToJavaCodeCompiler(rexBuilder);
-        this.typeFactory = (JavaTypeFactory) rexBuilder.getTypeFactory();
-
-        this.topology = new TridentTopology();
-        this.dataContext = new StormDataContext();
-    }
-
-    public void addStream(IAggregatableStream stream) throws Exception {
-        push(stream);
-    }
-
-    public IAggregatableStream pop() {
-        return streamStack.pop();
-    }
-
-    public Map<String, ISqlTridentDataSource> getSources() {
-        return sources;
-    }
-
-    public DataContext getDataContext() {
-        return dataContext;
-    }
-
-    public JavaTypeFactory getTypeFactory() {
-        return typeFactory;
-    }
-
-    public TridentTopology getTopology() {
-        return topology;
-    }
-
-    public ExecutableExpression createScalarInstance(List<RexNode> nodes, RelDataType inputRowType, String className)
-            throws CompilingClassLoader.CompilerException, ClassNotFoundException, IllegalAccessException, InstantiationException {
-        String expr = rexCompiler.compile(nodes, inputRowType, className);
-        CompilingClassLoader classLoader = new CompilingClassLoader(
-                getLastClassLoader(), className, expr, null);
-        ExecutableExpression instance = (ExecutableExpression) classLoader.loadClass(className).newInstance();
-        addClassLoader(classLoader);
-        return new DebuggableExecutableExpression(instance, expr);
-    }
-
-    public ExecutableExpression createScalarInstance(RexProgram program, String className)
-            throws CompilingClassLoader.CompilerException, ClassNotFoundException, IllegalAccessException, InstantiationException {
-        String expr = rexCompiler.compile(program, className);
-        CompilingClassLoader classLoader = new CompilingClassLoader(
-                getLastClassLoader(), className, expr, null);
-        ExecutableExpression instance = (ExecutableExpression) classLoader.loadClass(className).newInstance();
-        addClassLoader(classLoader);
-        return new DebuggableExecutableExpression(instance, expr);
-    }
-
-    private void push(IAggregatableStream stream) {
-        streamStack.push(stream);
-    }
-
-    public void addClassLoader(CompilingClassLoader compilingClassLoader) {
-        this.classLoaders.add(compilingClassLoader);
-    }
-
-    public ClassLoader getLastClassLoader() {
-        if (this.classLoaders.size() > 0) {
-            return this.classLoaders.get(this.classLoaders.size() - 1);
-        } else {
-            return this.getClass().getClassLoader();
-        }
-    }
-
-    public List<CompilingClassLoader> getClassLoaders() {
-        return classLoaders;
-    }
-}