You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/12/08 19:09:54 UTC

[08/20] storm git commit: [StormSQL] STORM-1173. Support string operations in StormSQL.

[StormSQL] STORM-1173. Support string operations in StormSQL.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/39400075
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/39400075
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/39400075

Branch: refs/heads/master
Commit: 39400075b0e83416f84879b8785be1b1cc0c2826
Parents: beeaee7
Author: Haohui Mai <wh...@apache.org>
Authored: Wed Nov 4 15:51:11 2015 -0800
Committer: Haohui Mai <wh...@apache.org>
Committed: Fri Dec 4 11:09:02 2015 -0800

----------------------------------------------------------------------
 .../apache/storm/sql/DataSourcesProvider.java   |   3 +-
 .../apache/storm/sql/DataSourcesRegistry.java   |   3 +-
 .../src/jvm/org/apache/storm/sql/StormSql.java  |   6 +-
 .../jvm/org/apache/storm/sql/StormSqlImpl.java  |   6 +-
 .../apache/storm/sql/compiler/ExprCompiler.java | 205 ++++++++++++-------
 .../apache/storm/sql/compiler/PlanCompiler.java |  14 +-
 .../test/org/apache/storm/sql/TestStormSql.java |   4 +-
 .../storm/sql/compiler/TestExprCompiler.java    |   6 +-
 .../storm/sql/compiler/TestExprSemantic.java    |  28 ++-
 .../storm/sql/compiler/TestPlanCompiler.java    |   6 +-
 .../storm/sql/compiler/TestRelNodeCompiler.java |   2 +-
 .../apache/storm/sql/compiler/TestUtils.java    |   6 +-
 .../sql/runtime/AbstractChannelHandler.java     |  37 ++++
 .../sql/runtime/AbstractValuesProcessor.java    |  49 +++++
 .../storm/sql/runtime/ChannelContext.java       |  30 +++
 .../storm/sql/runtime/ChannelHandler.java       |  39 ++++
 .../org/apache/storm/sql/runtime/Channels.java  |  80 ++++++++
 .../apache/storm/sql/runtime/DataSource.java    |  29 +++
 .../storm/sql/runtime/StormSqlFunctions.java    |  36 ++++
 .../storm/sql/storm/AbstractChannelHandler.java |  35 ----
 .../apache/storm/sql/storm/ChannelContext.java  |  28 ---
 .../apache/storm/sql/storm/ChannelHandler.java  |  37 ----
 .../org/apache/storm/sql/storm/Channels.java    |  78 -------
 .../org/apache/storm/sql/storm/DataSource.java  |  27 ---
 .../storm/runtime/AbstractValuesProcessor.java  |  29 ---
 25 files changed, 477 insertions(+), 346 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/39400075/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/DataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/DataSourcesProvider.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/DataSourcesProvider.java
index cc4874b..46bfa40 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/DataSourcesProvider.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/DataSourcesProvider.java
@@ -18,8 +18,7 @@
 
 package org.apache.storm.sql;
 
-import backtype.storm.tuple.Values;
-import org.apache.storm.sql.storm.DataSource;
+import org.apache.storm.sql.runtime.DataSource;
 
 import java.net.URI;
 import java.util.List;

http://git-wip-us.apache.org/repos/asf/storm/blob/39400075/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/DataSourcesRegistry.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/DataSourcesRegistry.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/DataSourcesRegistry.java
index a841609..b45d039 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/DataSourcesRegistry.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/DataSourcesRegistry.java
@@ -19,12 +19,11 @@
 package org.apache.storm.sql;
 
 import com.google.common.annotations.VisibleForTesting;
-import org.apache.storm.sql.storm.DataSource;
+import org.apache.storm.sql.runtime.DataSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.net.URI;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;

http://git-wip-us.apache.org/repos/asf/storm/blob/39400075/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSql.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSql.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSql.java
index 477e633..6859f8e 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSql.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSql.java
@@ -17,11 +17,7 @@
  */
 package org.apache.storm.sql;
 
-import backtype.storm.tuple.Values;
-import org.apache.storm.sql.storm.ChannelHandler;
-import org.apache.storm.sql.storm.DataSource;
-
-import java.util.Map;
+import org.apache.storm.sql.runtime.ChannelHandler;
 
 /**
  * The StormSql class provides standalone, interactive interfaces to execute

http://git-wip-us.apache.org/repos/asf/storm/blob/39400075/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
index 136bc88..384b4fa 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
@@ -32,9 +32,9 @@ import org.apache.storm.sql.compiler.PlanCompiler;
 import org.apache.storm.sql.parser.ColumnDefinition;
 import org.apache.storm.sql.parser.SqlCreateTable;
 import org.apache.storm.sql.parser.StormParser;
-import org.apache.storm.sql.storm.ChannelHandler;
-import org.apache.storm.sql.storm.DataSource;
-import org.apache.storm.sql.storm.runtime.AbstractValuesProcessor;
+import org.apache.storm.sql.runtime.ChannelHandler;
+import org.apache.storm.sql.runtime.DataSource;
+import org.apache.storm.sql.runtime.AbstractValuesProcessor;
 
 import java.util.AbstractMap;
 import java.util.ArrayList;

http://git-wip-us.apache.org/repos/asf/storm/blob/39400075/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java
index 6617ef6..77fdf0c 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java
@@ -17,48 +17,33 @@
  */
 package org.apache.storm.sql.compiler;
 
+import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import org.apache.calcite.adapter.enumerable.NullPolicy;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.linq4j.tree.Primitive;
 import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rex.RexCall;
-import org.apache.calcite.rex.RexCorrelVariable;
-import org.apache.calcite.rex.RexDynamicParam;
-import org.apache.calcite.rex.RexFieldAccess;
-import org.apache.calcite.rex.RexInputRef;
-import org.apache.calcite.rex.RexLiteral;
-import org.apache.calcite.rex.RexLocalRef;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.rex.RexOver;
-import org.apache.calcite.rex.RexRangeRef;
-import org.apache.calcite.rex.RexVisitor;
+import org.apache.calcite.rex.*;
+import org.apache.calcite.runtime.SqlFunctions;
 import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.BuiltInMethod;
 import org.apache.calcite.util.NlsString;
 import org.apache.calcite.util.Util;
+import org.apache.storm.sql.runtime.StormSqlFunctions;
 
 import java.io.PrintWriter;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
 import java.lang.reflect.Type;
 import java.math.BigDecimal;
 import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 
-import static org.apache.calcite.sql.fun.SqlStdOperatorTable.AND;
-import static org.apache.calcite.sql.fun.SqlStdOperatorTable.DIVIDE;
-import static org.apache.calcite.sql.fun.SqlStdOperatorTable.DIVIDE_INTEGER;
-import static org.apache.calcite.sql.fun.SqlStdOperatorTable.GREATER_THAN;
-import static org.apache.calcite.sql.fun.SqlStdOperatorTable.GREATER_THAN_OR_EQUAL;
-import static org.apache.calcite.sql.fun.SqlStdOperatorTable.IS_FALSE;
-import static org.apache.calcite.sql.fun.SqlStdOperatorTable.IS_NOT_FALSE;
-import static org.apache.calcite.sql.fun.SqlStdOperatorTable.IS_NOT_NULL;
-import static org.apache.calcite.sql.fun.SqlStdOperatorTable.IS_NOT_TRUE;
-import static org.apache.calcite.sql.fun.SqlStdOperatorTable.IS_NULL;
-import static org.apache.calcite.sql.fun.SqlStdOperatorTable.IS_TRUE;
-import static org.apache.calcite.sql.fun.SqlStdOperatorTable.LESS_THAN;
-import static org.apache.calcite.sql.fun.SqlStdOperatorTable.LESS_THAN_OR_EQUAL;
-import static org.apache.calcite.sql.fun.SqlStdOperatorTable.MINUS;
-import static org.apache.calcite.sql.fun.SqlStdOperatorTable.MULTIPLY;
-import static org.apache.calcite.sql.fun.SqlStdOperatorTable.NOT;
-import static org.apache.calcite.sql.fun.SqlStdOperatorTable.OR;
-import static org.apache.calcite.sql.fun.SqlStdOperatorTable.PLUS;
+import static org.apache.calcite.sql.fun.SqlStdOperatorTable.*;
 
 /**
  * Compile RexNode on top of the Tuple abstraction.
@@ -98,7 +83,7 @@ class ExprCompiler implements RexVisitor<String> {
       case CHAR:
         return CompilerUtil.escapeJavaString(((NlsString) v).getValue(), true);
       case NULL:
-        return "null";
+        return "((" + ((Class<?>)typeFactory.getJavaClass(ty)).getCanonicalName() + ")null)";
       case DOUBLE:
       case BIGINT:
       case DECIMAL:
@@ -164,6 +149,13 @@ class ExprCompiler implements RexVisitor<String> {
     return "t" + ++nameCount;
   }
 
+  // Only generate inline expressions when comparing primitive types
+  private boolean primitiveCompareExpr(SqlOperator op, RelDataType type) {
+    final Primitive primitive = Primitive.ofBoxOr(typeFactory.getJavaClass(type));
+    return primitive != null &&
+        (op == LESS_THAN || op == LESS_THAN_OR_EQUAL || op == GREATER_THAN || op == GREATER_THAN_OR_EQUAL);
+  }
+
   private interface CallExprPrinter {
     String translate(ExprCompiler compiler, RexCall call);
   }
@@ -179,15 +171,25 @@ class ExprCompiler implements RexVisitor<String> {
     private ImpTable() {
       ImmutableMap.Builder<SqlOperator, CallExprPrinter> builder =
           ImmutableMap.builder();
-      builder.put(infixBinary(LESS_THAN, "<"))
-          .put(infixBinary(LESS_THAN_OR_EQUAL, "<="))
-          .put(infixBinary(GREATER_THAN, ">"))
-          .put(infixBinary(GREATER_THAN_OR_EQUAL, ">="))
-          .put(infixBinary(PLUS, "+"))
-          .put(infixBinary(MINUS, "-"))
-          .put(infixBinary(MULTIPLY, "*"))
-          .put(infixBinary(DIVIDE, "/"))
-          .put(infixBinary(DIVIDE_INTEGER, "/"))
+      builder
+          .put(builtInMethod(UPPER, BuiltInMethod.UPPER, NullPolicy.STRICT))
+          .put(builtInMethod(LOWER, BuiltInMethod.LOWER, NullPolicy.STRICT))
+          .put(builtInMethod(INITCAP, BuiltInMethod.INITCAP, NullPolicy.STRICT))
+          .put(builtInMethod(SUBSTRING, BuiltInMethod.SUBSTRING, NullPolicy.STRICT))
+          .put(builtInMethod(CHARACTER_LENGTH, BuiltInMethod.CHAR_LENGTH, NullPolicy.STRICT))
+          .put(builtInMethod(CHAR_LENGTH, BuiltInMethod.CHAR_LENGTH, NullPolicy.STRICT))
+          .put(builtInMethod(CONCAT, BuiltInMethod.STRING_CONCAT, NullPolicy.STRICT))
+          .put(infixBinary(LESS_THAN, "<", "lt"))
+          .put(infixBinary(LESS_THAN_OR_EQUAL, "<=", "le"))
+          .put(infixBinary(GREATER_THAN, ">", "gt"))
+          .put(infixBinary(GREATER_THAN_OR_EQUAL, ">=", "ge"))
+          .put(infixBinary(EQUALS, "==", StormSqlFunctions.class, "eq"))
+          .put(infixBinary(NOT_EQUALS, "<>", StormSqlFunctions.class, "ne"))
+          .put(infixBinary(PLUS, "+", "plus"))
+          .put(infixBinary(MINUS, "-", "minus"))
+          .put(infixBinary(MULTIPLY, "*", "multiply"))
+          .put(infixBinary(DIVIDE, "/", "divide"))
+          .put(infixBinary(DIVIDE_INTEGER, "/", "divide"))
           .put(expect(IS_NULL, null))
           .put(expectNot(IS_NOT_NULL, null))
           .put(expect(IS_TRUE, true))
@@ -210,8 +212,38 @@ class ExprCompiler implements RexVisitor<String> {
       }
     }
 
+    private Map.Entry<SqlOperator, CallExprPrinter> builtInMethod(
+        final SqlOperator op, final BuiltInMethod method, NullPolicy nullPolicy) {
+      if (nullPolicy != NullPolicy.STRICT) {
+        throw new UnsupportedOperationException();
+      }
+      CallExprPrinter printer = new CallExprPrinter() {
+        @Override
+        public String translate(ExprCompiler compiler, RexCall call) {
+          PrintWriter pw = compiler.pw;
+          String val = compiler.reserveName();
+          pw.print(String.format("final %s %s;\n", compiler.javaTypeName(call), val));
+          List<String> args = new ArrayList<>();
+          for (RexNode op : call.getOperands()) {
+            args.add(op.accept(compiler));
+          }
+          pw.print("if (false) {}\n");
+          for (int i = 0; i < args.size(); ++i) {
+            String arg = args.get(i);
+            if (call.getOperands().get(i).getType().isNullable()) {
+              pw.print(String.format("else if (%2$s == null) { %1$s = null; }\n", val, arg));
+            }
+          }
+          String calc = printMethodCall(method.method, args);
+          pw.print(String.format("else { %1$s = %2$s; }\n", val, calc));
+          return val;
+        }
+      };
+      return new AbstractMap.SimpleImmutableEntry<>(op, printer);
+    }
+
     private Map.Entry<SqlOperator, CallExprPrinter> infixBinary
-        (SqlOperator op, final String javaOperator) {
+        (final SqlOperator op, final String javaOperator, final Class<?> clazz, final String backupMethodName) {
       CallExprPrinter trans = new CallExprPrinter() {
         @Override
         public String translate(
@@ -221,40 +253,44 @@ class ExprCompiler implements RexVisitor<String> {
           String val = compiler.reserveName();
           RexNode op0 = call.getOperands().get(0);
           RexNode op1 = call.getOperands().get(1);
+          PrintWriter pw = compiler.pw;
+          if (backupMethodName != null) {
+            if (!compiler.primitiveCompareExpr(op, op0.getType())) {
+              String lhs = op0.accept(compiler);
+              String rhs = op1.accept(compiler);
+              pw.print(String.format("%s %s = %s;\n", compiler.javaTypeName(call), val,
+                  printMethodCall(clazz, backupMethodName, true, Lists.newArrayList(lhs, rhs))));
+              return val;
+            }
+          }
           boolean lhsNullable = op0.getType().isNullable();
           boolean rhsNullable = op1.getType().isNullable();
 
-          PrintWriter pw = compiler.pw;
-          String lhs = op0.accept(compiler);
           pw.print(String.format("final %s %s;\n", compiler.javaTypeName(call), val));
-          if (!lhsNullable) {
-            String rhs = op1.accept(compiler);
-            String calc = foldNullExpr(String.format("%s %s %s", lhs, javaOperator, rhs), "null", rhs);
-            if (!rhsNullable) {
-              pw.print(String.format("%s = %s;\n", val, calc));
-            } else {
-              pw.print(
-                  String.format("%s = %s == null ? null : (%s);\n",
-                      val, rhs, calc));
-            }
-          } else {
-            pw.print(String.format("if (%2$s == null) { %1$s = null; }\n",
-                val, lhs));
-            pw.print("else {\n");
-            String rhs = op1.accept(compiler);
-            String calc = foldNullExpr(String.format("%s %s %s", lhs, javaOperator, rhs), "null", lhs);
-            if (!rhsNullable) {
-              pw.print(String.format("%s = %s;\n}\n", val, calc));
-            } else {
-              pw.print(String.format("%1$s = %2$s == null ? null : (%3$s);\n}\n", val, rhs, calc));
-            }
+          String lhs = op0.accept(compiler);
+          String rhs = op1.accept(compiler);
+          pw.print("if (false) {}\n");
+          if (lhsNullable) {
+            String calc = foldNullExpr(String.format("%s %s %s", lhs, javaOperator, rhs), "null", op1);
+            pw.print(String.format("else if (%2$s == null) { %1$s = %3$s; }\n", val, lhs, calc));
           }
+          if (rhsNullable) {
+            String calc = foldNullExpr(String.format("%s %s %s", lhs, javaOperator, rhs), "null", op0);
+            pw.print(String.format("else if (%2$s == null) { %1$s = %3$s; }\n", val, rhs, calc));
+          }
+          String calc = String.format("%s %s %s", lhs, javaOperator, rhs);
+          pw.print(String.format("else { %1$s = %2$s; }\n", val, calc));
           return val;
         }
       };
       return new AbstractMap.SimpleImmutableEntry<>(op, trans);
     }
 
+    private Map.Entry<SqlOperator, CallExprPrinter> infixBinary
+        (final SqlOperator op, final String javaOperator, final String backupMethodName) {
+      return infixBinary(op, javaOperator, SqlFunctions.class, backupMethodName);
+    }
+
     private Map.Entry<SqlOperator, CallExprPrinter> expect(
         SqlOperator op, final Boolean expect) {
       return expect0(op, expect, false);
@@ -327,17 +363,16 @@ class ExprCompiler implements RexVisitor<String> {
           pw.print(String.format("  %1$s = %2$s;\n}\n", val, rhs));
         } else {
           String foldedLHS = foldNullExpr(
-              String.format("%1$s == null || %1$s", lhs), "true", lhs);
+              String.format("%1$s == null || %1$s", lhs), "true", op0);
           pw.print(String.format("if (%s) {\n", foldedLHS));
           String rhs = op1.accept(compiler);
           String s;
           if (rhsNullable) {
             s = foldNullExpr(
                 String.format("(%2$s != null && !(%2$s)) ? false : %1$s", lhs,
-                              rhs),
-                "null", rhs);
+                    rhs), "null", op1);
           } else {
-            s = String.format("!(%2$s) ? false : %1$s", lhs, rhs);
+            s = String.format("!(%2$s) ? Boolean.FALSE : %1$s", lhs, rhs);
           }
           pw.print(String.format("  %1$s = %2$s;\n", val, s));
           pw.print(String.format("} else { %1$s = false; }\n", val));
@@ -369,16 +404,16 @@ class ExprCompiler implements RexVisitor<String> {
           pw.print(String.format("  %1$s = %2$s;\n}\n", val, rhs));
         } else {
           String foldedLHS = foldNullExpr(
-              String.format("%1$s == null || !(%1$s)", lhs), "true", lhs);
+              String.format("%1$s == null || !(%1$s)", lhs), "true", op0);
           pw.print(String.format("if (%s) {\n", foldedLHS));
           String rhs = op1.accept(compiler);
           String s;
           if (rhsNullable) {
             s = foldNullExpr(
                 String.format("(%2$s != null && %2$s) ? true : %1$s", lhs, rhs),
-                "null", rhs);
+                "null", op1);
           } else {
-            s = String.format("%2$s ? %2$s : %1$s", lhs, rhs);
+            s = String.format("%2$s ? Boolean.valueOf(%2$s) : %1$s", lhs, rhs);
           }
           pw.print(String.format("  %1$s = %2$s;\n", val, s));
           pw.print(String.format("} else { %1$s = true; }\n", val));
@@ -402,20 +437,34 @@ class ExprCompiler implements RexVisitor<String> {
           pw.print(String.format("%1$s = !(%2$s);\n", val, lhs));
         } else {
           String s = foldNullExpr(
-              String.format("%1$s == null ? null : !(%1$s)", lhs), "null", lhs);
+              String.format("%1$s == null ? null : !(%1$s)", lhs), "null", op);
           pw.print(String.format("%1$s = %2$s;\n", val, s));
         }
         return val;
       }
     };
+  }
 
-    private static String foldNullExpr(String notNullExpr, String
-        nullExpr, String op) {
-      if (op.equals("null")) {
-        return nullExpr;
-      } else {
-        return notNullExpr;
-      }
+  private static String foldNullExpr(String notNullExpr, String
+      nullExpr, RexNode op) {
+    if (op instanceof RexLiteral && ((RexLiteral)op).getTypeName() == SqlTypeName.NULL) {
+      return nullExpr;
+    } else {
+      return notNullExpr;
+    }
+  }
+
+  private static String printMethodCall(Method method, List<String> args) {
+    return printMethodCall(method.getDeclaringClass(), method.getName(),
+        Modifier.isStatic(method.getModifiers()), args);
+  }
+
+  private static String printMethodCall(Class<?> clazz, String method, boolean isStatic, List<String> args) {
+    if (isStatic) {
+      return String.format("%s.%s(%s)", clazz.getCanonicalName(), method, Joiner.on(',').join(args));
+    } else {
+      return String.format("%s.%s(%s)", args.get(0), method,
+          Joiner.on(',').join(args.subList(1, args.size())));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/39400075/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/PlanCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/PlanCompiler.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/PlanCompiler.java
index d006261..1096f5b 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/PlanCompiler.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/PlanCompiler.java
@@ -22,7 +22,7 @@ import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.TableScan;
 import org.apache.storm.sql.javac.CompilingClassLoader;
-import org.apache.storm.sql.storm.runtime.AbstractValuesProcessor;
+import org.apache.storm.sql.runtime.AbstractValuesProcessor;
 
 import java.io.PrintWriter;
 import java.io.StringWriter;
@@ -38,12 +38,12 @@ public class PlanCompiler {
       "// GENERATED CODE", "package " + PACKAGE_NAME + ";", "",
       "import java.util.Iterator;", "import java.util.Map;",
       "import backtype.storm.tuple.Values;",
-      "import org.apache.storm.sql.storm.AbstractChannelHandler;",
-      "import org.apache.storm.sql.storm.Channels;",
-      "import org.apache.storm.sql.storm.ChannelContext;",
-      "import org.apache.storm.sql.storm.ChannelHandler;",
-      "import org.apache.storm.sql.storm.DataSource;",
-      "import org.apache.storm.sql.storm.runtime.AbstractValuesProcessor;",
+      "import org.apache.storm.sql.runtime.AbstractChannelHandler;",
+      "import org.apache.storm.sql.runtime.Channels;",
+      "import org.apache.storm.sql.runtime.ChannelContext;",
+      "import org.apache.storm.sql.runtime.ChannelHandler;",
+      "import org.apache.storm.sql.runtime.DataSource;",
+      "import org.apache.storm.sql.runtime.AbstractValuesProcessor;",
       "public final class Processor extends AbstractValuesProcessor {", "");
   private static final String INITIALIZER_PROLOGUE = NEW_LINE_JOINER.join(
       "  @Override",

http://git-wip-us.apache.org/repos/asf/storm/blob/39400075/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
index 07b367f..e18b9f8 100644
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
@@ -19,8 +19,8 @@ package org.apache.storm.sql;
 
 import backtype.storm.tuple.Values;
 import org.apache.storm.sql.compiler.TestUtils;
-import org.apache.storm.sql.storm.ChannelHandler;
-import org.apache.storm.sql.storm.DataSource;
+import org.apache.storm.sql.runtime.ChannelHandler;
+import org.apache.storm.sql.runtime.DataSource;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;

http://git-wip-us.apache.org/repos/asf/storm/blob/39400075/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprCompiler.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprCompiler.java
index f03cff8..a5f9d67 100644
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprCompiler.java
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprCompiler.java
@@ -86,8 +86,8 @@ public class TestExprCompiler {
       }
     }
     assertThat(sw.get(0).toString(), containsString("1 > 2"));
-    assertThat(sw.get(1).toString(), containsString("3 + 5"));
-    assertThat(sw.get(2).toString(), containsString("1 - 1.0E0"));
-    assertThat(sw.get(3).toString(), containsString("3 +"));
+    assertThat(sw.get(1).toString(), containsString("plus(3,5)"));
+    assertThat(sw.get(2).toString(), containsString("minus(1,1.0E0)"));
+    assertThat(sw.get(3).toString(), containsString("plus(3,"));
   }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/39400075/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprSemantic.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprSemantic.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprSemantic.java
index 8454d7e..1d98664 100644
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprSemantic.java
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprSemantic.java
@@ -23,9 +23,9 @@ import com.google.common.collect.Lists;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
 import org.apache.calcite.rel.type.RelDataTypeSystem;
-import org.apache.storm.sql.storm.ChannelHandler;
-import org.apache.storm.sql.storm.DataSource;
-import org.apache.storm.sql.storm.runtime.AbstractValuesProcessor;
+import org.apache.storm.sql.runtime.ChannelHandler;
+import org.apache.storm.sql.runtime.DataSource;
+import org.apache.storm.sql.runtime.AbstractValuesProcessor;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -99,6 +99,28 @@ public class TestExprSemantic {
                             false, null), v);
   }
 
+  @Test
+  public void testEquals() throws Exception {
+    Values v = testExpr(
+        Lists.newArrayList(
+            "1 = 2", "UNKNOWN = UNKNOWN", "'a' = 'a'", "'a' = UNKNOWN", "UNKNOWN = 'a'", "'a' = 'b'",
+            "1 <> 2", "UNKNOWN <> UNKNOWN", "'a' <> 'a'", "'a' <> UNKNOWN", "UNKNOWN <> 'a'", "'a' <> 'b'"
+        ));
+    assertEquals(new Values(false, null, true, null, null, false,
+        true, null, false, null, null, true), v);
+  }
+
+  @Test
+  public void testStringMethods() throws Exception {
+    Values v = testExpr(
+        Lists.newArrayList(
+            "UPPER('a')", "LOWER('A')", "INITCAP('foo')",
+            "SUBSTRING('foo', 2)", "CHARACTER_LENGTH('foo')", "CHAR_LENGTH('foo')",
+            "'ab' || 'cd'"
+        ));
+    assertEquals(new Values("A", "a", "Foo", "oo", 3, 3, "abcd"), v);
+  }
+
   private Values testExpr(List<String> exprs) throws Exception {
     String sql = "SELECT " + Joiner.on(',').join(exprs) + " FROM FOO" +
         " WHERE ID > 0 AND ID < 2";

http://git-wip-us.apache.org/repos/asf/storm/blob/39400075/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestPlanCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestPlanCompiler.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestPlanCompiler.java
index 30df0f3..d32fdca 100644
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestPlanCompiler.java
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestPlanCompiler.java
@@ -21,9 +21,9 @@ import backtype.storm.tuple.Values;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
 import org.apache.calcite.rel.type.RelDataTypeSystem;
-import org.apache.storm.sql.storm.ChannelHandler;
-import org.apache.storm.sql.storm.DataSource;
-import org.apache.storm.sql.storm.runtime.AbstractValuesProcessor;
+import org.apache.storm.sql.runtime.ChannelHandler;
+import org.apache.storm.sql.runtime.DataSource;
+import org.apache.storm.sql.runtime.AbstractValuesProcessor;
 import org.junit.Assert;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/39400075/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestRelNodeCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestRelNodeCompiler.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestRelNodeCompiler.java
index d820f22..623a2f4 100644
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestRelNodeCompiler.java
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestRelNodeCompiler.java
@@ -57,7 +57,7 @@ public class TestRelNodeCompiler {
       RelNodeCompiler compiler = new RelNodeCompiler(pw, typeFactory);
       compiler.visitProject(project);
       pw.flush();
-      Assert.assertThat(sw.toString(), containsString("+ 1"));
+      Assert.assertThat(sw.toString(), containsString("plus("));
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/39400075/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestUtils.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestUtils.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestUtils.java
index 6731c90..5aa4cb0 100644
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestUtils.java
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestUtils.java
@@ -15,9 +15,9 @@ import org.apache.calcite.tools.Frameworks;
 import org.apache.calcite.tools.Planner;
 import org.apache.calcite.tools.RelConversionException;
 import org.apache.calcite.tools.ValidationException;
-import org.apache.storm.sql.storm.ChannelContext;
-import org.apache.storm.sql.storm.ChannelHandler;
-import org.apache.storm.sql.storm.DataSource;
+import org.apache.storm.sql.runtime.ChannelContext;
+import org.apache.storm.sql.runtime.ChannelHandler;
+import org.apache.storm.sql.runtime.DataSource;
 
 import java.util.ArrayList;
 import java.util.List;

http://git-wip-us.apache.org/repos/asf/storm/blob/39400075/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractChannelHandler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractChannelHandler.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractChannelHandler.java
new file mode 100644
index 0000000..73a078c
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractChannelHandler.java
@@ -0,0 +1,37 @@
+/*
+ * *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  * <p>
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  * <p>
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+package org.apache.storm.sql.runtime;
+
+import backtype.storm.tuple.Values;
+
+public abstract class AbstractChannelHandler implements ChannelHandler {
+  @Override
+  public abstract void dataReceived(ChannelContext ctx, Values data);
+
+  @Override
+  public void channelInactive(ChannelContext ctx) {
+
+  }
+
+  @Override
+  public void exceptionCaught(Throwable cause) {
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/39400075/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractValuesProcessor.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractValuesProcessor.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractValuesProcessor.java
new file mode 100644
index 0000000..11aa065
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractValuesProcessor.java
@@ -0,0 +1,49 @@
+/*
+ * *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  * <p>
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  * <p>
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.runtime;
+
+import backtype.storm.tuple.Values;
+import org.apache.storm.sql.runtime.ChannelHandler;
+import org.apache.storm.sql.runtime.DataSource;
+
+import java.util.Map;
+
+/**
+ * Subclass of AbstractTupleProcessor provides a series of tuple. It
+ * takes a series of iterators of {@link Values} and produces a stream of
+ * tuple.
+ *
+ * The subclass implements the {@see next()} method to provide
+ * the output of the stream. It can choose to return null in {@see next()} to
+ * indicate that this particular iteration is a no-op. SQL processors depend
+ * on this semantic to implement filtering and nullable records.
+ */
+public abstract class AbstractValuesProcessor {
+
+  /**
+   * Initialize the data sources.
+   *
+   * @param data a map from the table name to the iterators of the values.
+   *
+   */
+  public abstract void initialize(Map<String, DataSource> data, ChannelHandler
+      result);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/39400075/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelContext.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelContext.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelContext.java
new file mode 100644
index 0000000..71aba03
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelContext.java
@@ -0,0 +1,30 @@
+/*
+ * *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  * <p>
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  * <p>
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+package org.apache.storm.sql.runtime;
+
+import backtype.storm.tuple.Values;
+
+public interface ChannelContext {
+  /**
+   * Emit data to the next stage of the data pipeline.
+   */
+  void emit(Values data);
+  void fireChannelInactive();
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/39400075/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelHandler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelHandler.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelHandler.java
new file mode 100644
index 0000000..117f312
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelHandler.java
@@ -0,0 +1,39 @@
+/*
+ * *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  * <p>
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  * <p>
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+package org.apache.storm.sql.runtime;
+
+import backtype.storm.tuple.Values;
+
+/**
+ * DataListener provides an event-driven interface for the user to process
+ * series of events.
+ */
+public interface ChannelHandler {
+  void dataReceived(ChannelContext ctx, Values data);
+
+  /**
+   * The producer of the data has indicated that the channel is no longer
+   * active.
+   * @param ctx
+   */
+  void channelInactive(ChannelContext ctx);
+
+  void exceptionCaught(Throwable cause);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/39400075/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/Channels.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/Channels.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/Channels.java
new file mode 100644
index 0000000..7214f9a
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/Channels.java
@@ -0,0 +1,80 @@
+/*
+ * *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  * <p>
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  * <p>
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+package org.apache.storm.sql.runtime;
+
+import backtype.storm.tuple.Values;
+
+public class Channels {
+  private static final ChannelContext VOID_CTX = new ChannelContext() {
+    @Override
+    public void emit(Values data) {}
+
+    @Override
+    public void fireChannelInactive() {}
+  };
+
+  private static class ChannelContextAdapter implements ChannelContext {
+    private final ChannelHandler handler;
+    private final ChannelContext next;
+
+    public ChannelContextAdapter(
+        ChannelContext next, ChannelHandler handler) {
+      this.handler = handler;
+      this.next = next;
+    }
+
+    @Override
+    public void emit(Values data) {
+      handler.dataReceived(next, data);
+    }
+
+    @Override
+    public void fireChannelInactive() {
+      handler.channelInactive(next);
+    }
+  }
+
+  private static class ForwardingChannelContext implements ChannelContext {
+    private final ChannelContext next;
+
+    public ForwardingChannelContext(ChannelContext next) {
+      this.next = next;
+    }
+
+    @Override
+    public void emit(Values data) {
+      next.emit(data);
+    }
+
+    @Override
+    public void fireChannelInactive() {
+      next.fireChannelInactive();
+    }
+  }
+
+  public static ChannelContext chain(
+      ChannelContext next, ChannelHandler handler) {
+    return new ChannelContextAdapter(next, handler);
+  }
+
+  public static ChannelContext voidContext() {
+    return VOID_CTX;
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/39400075/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSource.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSource.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSource.java
new file mode 100644
index 0000000..3e80cb2
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSource.java
@@ -0,0 +1,29 @@
+/*
+ * *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  * <p>
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  * <p>
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+package org.apache.storm.sql.runtime;
+
+/**
+ * A DataSource ingests data in StormSQL. It provides a series of tuple to
+ * the downstream {@link ChannelHandler}.
+ *
+ */
+public interface DataSource {
+  void open(ChannelContext ctx);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/39400075/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/StormSqlFunctions.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/StormSqlFunctions.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/StormSqlFunctions.java
new file mode 100644
index 0000000..62b1019
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/StormSqlFunctions.java
@@ -0,0 +1,36 @@
+/*
+ * *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  * <p>
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  * <p>
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+package org.apache.storm.sql.runtime;
+
+public class StormSqlFunctions {
+  public static Boolean eq(Object b0, Object b1) {
+    if (b0 == null || b1 == null) {
+      return null;
+    }
+    return b0.equals(b1);
+  }
+
+  public static Boolean ne(Object b0, Object b1) {
+    if (b0 == null || b1 == null) {
+      return null;
+    }
+    return !b0.equals(b1);
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/39400075/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/AbstractChannelHandler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/AbstractChannelHandler.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/AbstractChannelHandler.java
deleted file mode 100644
index cf110e3..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/AbstractChannelHandler.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.storm;
-
-import backtype.storm.tuple.Values;
-
-public abstract class AbstractChannelHandler implements ChannelHandler {
-  @Override
-  public abstract void dataReceived(ChannelContext ctx, Values data);
-
-  @Override
-  public void channelInactive(ChannelContext ctx) {
-
-  }
-
-  @Override
-  public void exceptionCaught(Throwable cause) {
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/39400075/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/ChannelContext.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/ChannelContext.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/ChannelContext.java
deleted file mode 100644
index a2806b2..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/ChannelContext.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.storm;
-
-import backtype.storm.tuple.Values;
-
-public interface ChannelContext {
-  /**
-   * Emit data to the next stage of the data pipeline.
-   */
-  void emit(Values data);
-  void fireChannelInactive();
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/39400075/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/ChannelHandler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/ChannelHandler.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/ChannelHandler.java
deleted file mode 100644
index 8cd3a28..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/ChannelHandler.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.storm;
-
-import backtype.storm.tuple.Values;
-
-/**
- * DataListener provides an event-driven interface for the user to process
- * series of events.
- */
-public interface ChannelHandler {
-  void dataReceived(ChannelContext ctx, Values data);
-
-  /**
-   * The producer of the data has indicated that the channel is no longer
-   * active.
-   * @param ctx
-   */
-  void channelInactive(ChannelContext ctx);
-
-  void exceptionCaught(Throwable cause);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/39400075/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/Channels.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/Channels.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/Channels.java
deleted file mode 100644
index b5bb619..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/Channels.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.storm;
-
-import backtype.storm.tuple.Values;
-
-public class Channels {
-  private static final ChannelContext VOID_CTX = new ChannelContext() {
-    @Override
-    public void emit(Values data) {}
-
-    @Override
-    public void fireChannelInactive() {}
-  };
-
-  private static class ChannelContextAdapter implements ChannelContext {
-    private final ChannelHandler handler;
-    private final ChannelContext next;
-
-    public ChannelContextAdapter(
-        ChannelContext next, ChannelHandler handler) {
-      this.handler = handler;
-      this.next = next;
-    }
-
-    @Override
-    public void emit(Values data) {
-      handler.dataReceived(next, data);
-    }
-
-    @Override
-    public void fireChannelInactive() {
-      handler.channelInactive(next);
-    }
-  }
-
-  private static class ForwardingChannelContext implements ChannelContext {
-    private final ChannelContext next;
-
-    public ForwardingChannelContext(ChannelContext next) {
-      this.next = next;
-    }
-
-    @Override
-    public void emit(Values data) {
-      next.emit(data);
-    }
-
-    @Override
-    public void fireChannelInactive() {
-      next.fireChannelInactive();
-    }
-  }
-
-  public static ChannelContext chain(
-      ChannelContext next, ChannelHandler handler) {
-    return new ChannelContextAdapter(next, handler);
-  }
-
-  public static ChannelContext voidContext() {
-    return VOID_CTX;
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/39400075/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/DataSource.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/DataSource.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/DataSource.java
deleted file mode 100644
index 84fa6e0..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/DataSource.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.storm;
-
-/**
- * A DataSource ingests data in StormSQL. It provides a series of tuple to
- * the downstream {@link ChannelHandler}.
- *
- */
-public interface DataSource {
-  void open(ChannelContext ctx);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/39400075/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/runtime/AbstractValuesProcessor.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/runtime/AbstractValuesProcessor.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/runtime/AbstractValuesProcessor.java
deleted file mode 100644
index bd068be..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/runtime/AbstractValuesProcessor.java
+++ /dev/null
@@ -1,29 +0,0 @@
-package org.apache.storm.sql.storm.runtime;
-
-import backtype.storm.tuple.Values;
-import org.apache.storm.sql.storm.ChannelHandler;
-import org.apache.storm.sql.storm.DataSource;
-
-import java.util.Map;
-
-/**
- * Subclass of AbstractTupleProcessor provides a series of tuple. It
- * takes a series of iterators of {@link Values} and produces a stream of
- * tuple.
- *
- * The subclass implements the {@see next()} method to provide
- * the output of the stream. It can choose to return null in {@see next()} to
- * indicate that this particular iteration is a no-op. SQL processors depend
- * on this semantic to implement filtering and nullable records.
- */
-public abstract class AbstractValuesProcessor {
-
-  /**
-   * Initialize the data sources.
-   *
-   * @param data a map from the table name to the iterators of the values.
-   *
-   */
-  public abstract void initialize(Map<String, DataSource> data, ChannelHandler
-      result);
-}