You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/12/08 19:09:54 UTC
[08/20] storm git commit: [StormSQL] STORM-1173. Support string
operations in StormSQL.
[StormSQL] STORM-1173. Support string operations in StormSQL.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/39400075
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/39400075
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/39400075
Branch: refs/heads/master
Commit: 39400075b0e83416f84879b8785be1b1cc0c2826
Parents: beeaee7
Author: Haohui Mai <wh...@apache.org>
Authored: Wed Nov 4 15:51:11 2015 -0800
Committer: Haohui Mai <wh...@apache.org>
Committed: Fri Dec 4 11:09:02 2015 -0800
----------------------------------------------------------------------
.../apache/storm/sql/DataSourcesProvider.java | 3 +-
.../apache/storm/sql/DataSourcesRegistry.java | 3 +-
.../src/jvm/org/apache/storm/sql/StormSql.java | 6 +-
.../jvm/org/apache/storm/sql/StormSqlImpl.java | 6 +-
.../apache/storm/sql/compiler/ExprCompiler.java | 205 ++++++++++++-------
.../apache/storm/sql/compiler/PlanCompiler.java | 14 +-
.../test/org/apache/storm/sql/TestStormSql.java | 4 +-
.../storm/sql/compiler/TestExprCompiler.java | 6 +-
.../storm/sql/compiler/TestExprSemantic.java | 28 ++-
.../storm/sql/compiler/TestPlanCompiler.java | 6 +-
.../storm/sql/compiler/TestRelNodeCompiler.java | 2 +-
.../apache/storm/sql/compiler/TestUtils.java | 6 +-
.../sql/runtime/AbstractChannelHandler.java | 37 ++++
.../sql/runtime/AbstractValuesProcessor.java | 49 +++++
.../storm/sql/runtime/ChannelContext.java | 30 +++
.../storm/sql/runtime/ChannelHandler.java | 39 ++++
.../org/apache/storm/sql/runtime/Channels.java | 80 ++++++++
.../apache/storm/sql/runtime/DataSource.java | 29 +++
.../storm/sql/runtime/StormSqlFunctions.java | 36 ++++
.../storm/sql/storm/AbstractChannelHandler.java | 35 ----
.../apache/storm/sql/storm/ChannelContext.java | 28 ---
.../apache/storm/sql/storm/ChannelHandler.java | 37 ----
.../org/apache/storm/sql/storm/Channels.java | 78 -------
.../org/apache/storm/sql/storm/DataSource.java | 27 ---
.../storm/runtime/AbstractValuesProcessor.java | 29 ---
25 files changed, 477 insertions(+), 346 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/39400075/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/DataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/DataSourcesProvider.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/DataSourcesProvider.java
index cc4874b..46bfa40 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/DataSourcesProvider.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/DataSourcesProvider.java
@@ -18,8 +18,7 @@
package org.apache.storm.sql;
-import backtype.storm.tuple.Values;
-import org.apache.storm.sql.storm.DataSource;
+import org.apache.storm.sql.runtime.DataSource;
import java.net.URI;
import java.util.List;
http://git-wip-us.apache.org/repos/asf/storm/blob/39400075/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/DataSourcesRegistry.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/DataSourcesRegistry.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/DataSourcesRegistry.java
index a841609..b45d039 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/DataSourcesRegistry.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/DataSourcesRegistry.java
@@ -19,12 +19,11 @@
package org.apache.storm.sql;
import com.google.common.annotations.VisibleForTesting;
-import org.apache.storm.sql.storm.DataSource;
+import org.apache.storm.sql.runtime.DataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URI;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
http://git-wip-us.apache.org/repos/asf/storm/blob/39400075/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSql.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSql.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSql.java
index 477e633..6859f8e 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSql.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSql.java
@@ -17,11 +17,7 @@
*/
package org.apache.storm.sql;
-import backtype.storm.tuple.Values;
-import org.apache.storm.sql.storm.ChannelHandler;
-import org.apache.storm.sql.storm.DataSource;
-
-import java.util.Map;
+import org.apache.storm.sql.runtime.ChannelHandler;
/**
* The StormSql class provides standalone, interactive interfaces to execute
http://git-wip-us.apache.org/repos/asf/storm/blob/39400075/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
index 136bc88..384b4fa 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
@@ -32,9 +32,9 @@ import org.apache.storm.sql.compiler.PlanCompiler;
import org.apache.storm.sql.parser.ColumnDefinition;
import org.apache.storm.sql.parser.SqlCreateTable;
import org.apache.storm.sql.parser.StormParser;
-import org.apache.storm.sql.storm.ChannelHandler;
-import org.apache.storm.sql.storm.DataSource;
-import org.apache.storm.sql.storm.runtime.AbstractValuesProcessor;
+import org.apache.storm.sql.runtime.ChannelHandler;
+import org.apache.storm.sql.runtime.DataSource;
+import org.apache.storm.sql.runtime.AbstractValuesProcessor;
import java.util.AbstractMap;
import java.util.ArrayList;
http://git-wip-us.apache.org/repos/asf/storm/blob/39400075/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java
index 6617ef6..77fdf0c 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java
@@ -17,48 +17,33 @@
*/
package org.apache.storm.sql.compiler;
+import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import org.apache.calcite.adapter.enumerable.NullPolicy;
import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.linq4j.tree.Primitive;
import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rex.RexCall;
-import org.apache.calcite.rex.RexCorrelVariable;
-import org.apache.calcite.rex.RexDynamicParam;
-import org.apache.calcite.rex.RexFieldAccess;
-import org.apache.calcite.rex.RexInputRef;
-import org.apache.calcite.rex.RexLiteral;
-import org.apache.calcite.rex.RexLocalRef;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.rex.RexOver;
-import org.apache.calcite.rex.RexRangeRef;
-import org.apache.calcite.rex.RexVisitor;
+import org.apache.calcite.rex.*;
+import org.apache.calcite.runtime.SqlFunctions;
import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.BuiltInMethod;
import org.apache.calcite.util.NlsString;
import org.apache.calcite.util.Util;
+import org.apache.storm.sql.runtime.StormSqlFunctions;
import java.io.PrintWriter;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
import java.lang.reflect.Type;
import java.math.BigDecimal;
import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
-import static org.apache.calcite.sql.fun.SqlStdOperatorTable.AND;
-import static org.apache.calcite.sql.fun.SqlStdOperatorTable.DIVIDE;
-import static org.apache.calcite.sql.fun.SqlStdOperatorTable.DIVIDE_INTEGER;
-import static org.apache.calcite.sql.fun.SqlStdOperatorTable.GREATER_THAN;
-import static org.apache.calcite.sql.fun.SqlStdOperatorTable.GREATER_THAN_OR_EQUAL;
-import static org.apache.calcite.sql.fun.SqlStdOperatorTable.IS_FALSE;
-import static org.apache.calcite.sql.fun.SqlStdOperatorTable.IS_NOT_FALSE;
-import static org.apache.calcite.sql.fun.SqlStdOperatorTable.IS_NOT_NULL;
-import static org.apache.calcite.sql.fun.SqlStdOperatorTable.IS_NOT_TRUE;
-import static org.apache.calcite.sql.fun.SqlStdOperatorTable.IS_NULL;
-import static org.apache.calcite.sql.fun.SqlStdOperatorTable.IS_TRUE;
-import static org.apache.calcite.sql.fun.SqlStdOperatorTable.LESS_THAN;
-import static org.apache.calcite.sql.fun.SqlStdOperatorTable.LESS_THAN_OR_EQUAL;
-import static org.apache.calcite.sql.fun.SqlStdOperatorTable.MINUS;
-import static org.apache.calcite.sql.fun.SqlStdOperatorTable.MULTIPLY;
-import static org.apache.calcite.sql.fun.SqlStdOperatorTable.NOT;
-import static org.apache.calcite.sql.fun.SqlStdOperatorTable.OR;
-import static org.apache.calcite.sql.fun.SqlStdOperatorTable.PLUS;
+import static org.apache.calcite.sql.fun.SqlStdOperatorTable.*;
/**
* Compile RexNode on top of the Tuple abstraction.
@@ -98,7 +83,7 @@ class ExprCompiler implements RexVisitor<String> {
case CHAR:
return CompilerUtil.escapeJavaString(((NlsString) v).getValue(), true);
case NULL:
- return "null";
+ return "((" + ((Class<?>)typeFactory.getJavaClass(ty)).getCanonicalName() + ")null)";
case DOUBLE:
case BIGINT:
case DECIMAL:
@@ -164,6 +149,13 @@ class ExprCompiler implements RexVisitor<String> {
return "t" + ++nameCount;
}
+ // Only generate inline expressions when comparing primitive types
+ private boolean primitiveCompareExpr(SqlOperator op, RelDataType type) {
+ final Primitive primitive = Primitive.ofBoxOr(typeFactory.getJavaClass(type));
+ return primitive != null &&
+ (op == LESS_THAN || op == LESS_THAN_OR_EQUAL || op == GREATER_THAN || op == GREATER_THAN_OR_EQUAL);
+ }
+
private interface CallExprPrinter {
String translate(ExprCompiler compiler, RexCall call);
}
@@ -179,15 +171,25 @@ class ExprCompiler implements RexVisitor<String> {
private ImpTable() {
ImmutableMap.Builder<SqlOperator, CallExprPrinter> builder =
ImmutableMap.builder();
- builder.put(infixBinary(LESS_THAN, "<"))
- .put(infixBinary(LESS_THAN_OR_EQUAL, "<="))
- .put(infixBinary(GREATER_THAN, ">"))
- .put(infixBinary(GREATER_THAN_OR_EQUAL, ">="))
- .put(infixBinary(PLUS, "+"))
- .put(infixBinary(MINUS, "-"))
- .put(infixBinary(MULTIPLY, "*"))
- .put(infixBinary(DIVIDE, "/"))
- .put(infixBinary(DIVIDE_INTEGER, "/"))
+ builder
+ .put(builtInMethod(UPPER, BuiltInMethod.UPPER, NullPolicy.STRICT))
+ .put(builtInMethod(LOWER, BuiltInMethod.LOWER, NullPolicy.STRICT))
+ .put(builtInMethod(INITCAP, BuiltInMethod.INITCAP, NullPolicy.STRICT))
+ .put(builtInMethod(SUBSTRING, BuiltInMethod.SUBSTRING, NullPolicy.STRICT))
+ .put(builtInMethod(CHARACTER_LENGTH, BuiltInMethod.CHAR_LENGTH, NullPolicy.STRICT))
+ .put(builtInMethod(CHAR_LENGTH, BuiltInMethod.CHAR_LENGTH, NullPolicy.STRICT))
+ .put(builtInMethod(CONCAT, BuiltInMethod.STRING_CONCAT, NullPolicy.STRICT))
+ .put(infixBinary(LESS_THAN, "<", "lt"))
+ .put(infixBinary(LESS_THAN_OR_EQUAL, "<=", "le"))
+ .put(infixBinary(GREATER_THAN, ">", "gt"))
+ .put(infixBinary(GREATER_THAN_OR_EQUAL, ">=", "ge"))
+ .put(infixBinary(EQUALS, "==", StormSqlFunctions.class, "eq"))
+ .put(infixBinary(NOT_EQUALS, "<>", StormSqlFunctions.class, "ne"))
+ .put(infixBinary(PLUS, "+", "plus"))
+ .put(infixBinary(MINUS, "-", "minus"))
+ .put(infixBinary(MULTIPLY, "*", "multiply"))
+ .put(infixBinary(DIVIDE, "/", "divide"))
+ .put(infixBinary(DIVIDE_INTEGER, "/", "divide"))
.put(expect(IS_NULL, null))
.put(expectNot(IS_NOT_NULL, null))
.put(expect(IS_TRUE, true))
@@ -210,8 +212,38 @@ class ExprCompiler implements RexVisitor<String> {
}
}
+ private Map.Entry<SqlOperator, CallExprPrinter> builtInMethod(
+ final SqlOperator op, final BuiltInMethod method, NullPolicy nullPolicy) {
+ if (nullPolicy != NullPolicy.STRICT) {
+ throw new UnsupportedOperationException();
+ }
+ CallExprPrinter printer = new CallExprPrinter() {
+ @Override
+ public String translate(ExprCompiler compiler, RexCall call) {
+ PrintWriter pw = compiler.pw;
+ String val = compiler.reserveName();
+ pw.print(String.format("final %s %s;\n", compiler.javaTypeName(call), val));
+ List<String> args = new ArrayList<>();
+ for (RexNode op : call.getOperands()) {
+ args.add(op.accept(compiler));
+ }
+ pw.print("if (false) {}\n");
+ for (int i = 0; i < args.size(); ++i) {
+ String arg = args.get(i);
+ if (call.getOperands().get(i).getType().isNullable()) {
+ pw.print(String.format("else if (%2$s == null) { %1$s = null; }\n", val, arg));
+ }
+ }
+ String calc = printMethodCall(method.method, args);
+ pw.print(String.format("else { %1$s = %2$s; }\n", val, calc));
+ return val;
+ }
+ };
+ return new AbstractMap.SimpleImmutableEntry<>(op, printer);
+ }
+
private Map.Entry<SqlOperator, CallExprPrinter> infixBinary
- (SqlOperator op, final String javaOperator) {
+ (final SqlOperator op, final String javaOperator, final Class<?> clazz, final String backupMethodName) {
CallExprPrinter trans = new CallExprPrinter() {
@Override
public String translate(
@@ -221,40 +253,44 @@ class ExprCompiler implements RexVisitor<String> {
String val = compiler.reserveName();
RexNode op0 = call.getOperands().get(0);
RexNode op1 = call.getOperands().get(1);
+ PrintWriter pw = compiler.pw;
+ if (backupMethodName != null) {
+ if (!compiler.primitiveCompareExpr(op, op0.getType())) {
+ String lhs = op0.accept(compiler);
+ String rhs = op1.accept(compiler);
+ pw.print(String.format("%s %s = %s;\n", compiler.javaTypeName(call), val,
+ printMethodCall(clazz, backupMethodName, true, Lists.newArrayList(lhs, rhs))));
+ return val;
+ }
+ }
boolean lhsNullable = op0.getType().isNullable();
boolean rhsNullable = op1.getType().isNullable();
- PrintWriter pw = compiler.pw;
- String lhs = op0.accept(compiler);
pw.print(String.format("final %s %s;\n", compiler.javaTypeName(call), val));
- if (!lhsNullable) {
- String rhs = op1.accept(compiler);
- String calc = foldNullExpr(String.format("%s %s %s", lhs, javaOperator, rhs), "null", rhs);
- if (!rhsNullable) {
- pw.print(String.format("%s = %s;\n", val, calc));
- } else {
- pw.print(
- String.format("%s = %s == null ? null : (%s);\n",
- val, rhs, calc));
- }
- } else {
- pw.print(String.format("if (%2$s == null) { %1$s = null; }\n",
- val, lhs));
- pw.print("else {\n");
- String rhs = op1.accept(compiler);
- String calc = foldNullExpr(String.format("%s %s %s", lhs, javaOperator, rhs), "null", lhs);
- if (!rhsNullable) {
- pw.print(String.format("%s = %s;\n}\n", val, calc));
- } else {
- pw.print(String.format("%1$s = %2$s == null ? null : (%3$s);\n}\n", val, rhs, calc));
- }
+ String lhs = op0.accept(compiler);
+ String rhs = op1.accept(compiler);
+ pw.print("if (false) {}\n");
+ if (lhsNullable) {
+ String calc = foldNullExpr(String.format("%s %s %s", lhs, javaOperator, rhs), "null", op1);
+ pw.print(String.format("else if (%2$s == null) { %1$s = %3$s; }\n", val, lhs, calc));
}
+ if (rhsNullable) {
+ String calc = foldNullExpr(String.format("%s %s %s", lhs, javaOperator, rhs), "null", op0);
+ pw.print(String.format("else if (%2$s == null) { %1$s = %3$s; }\n", val, rhs, calc));
+ }
+ String calc = String.format("%s %s %s", lhs, javaOperator, rhs);
+ pw.print(String.format("else { %1$s = %2$s; }\n", val, calc));
return val;
}
};
return new AbstractMap.SimpleImmutableEntry<>(op, trans);
}
+ private Map.Entry<SqlOperator, CallExprPrinter> infixBinary
+ (final SqlOperator op, final String javaOperator, final String backupMethodName) {
+ return infixBinary(op, javaOperator, SqlFunctions.class, backupMethodName);
+ }
+
private Map.Entry<SqlOperator, CallExprPrinter> expect(
SqlOperator op, final Boolean expect) {
return expect0(op, expect, false);
@@ -327,17 +363,16 @@ class ExprCompiler implements RexVisitor<String> {
pw.print(String.format(" %1$s = %2$s;\n}\n", val, rhs));
} else {
String foldedLHS = foldNullExpr(
- String.format("%1$s == null || %1$s", lhs), "true", lhs);
+ String.format("%1$s == null || %1$s", lhs), "true", op0);
pw.print(String.format("if (%s) {\n", foldedLHS));
String rhs = op1.accept(compiler);
String s;
if (rhsNullable) {
s = foldNullExpr(
String.format("(%2$s != null && !(%2$s)) ? false : %1$s", lhs,
- rhs),
- "null", rhs);
+ rhs), "null", op1);
} else {
- s = String.format("!(%2$s) ? false : %1$s", lhs, rhs);
+ s = String.format("!(%2$s) ? Boolean.FALSE : %1$s", lhs, rhs);
}
pw.print(String.format(" %1$s = %2$s;\n", val, s));
pw.print(String.format("} else { %1$s = false; }\n", val));
@@ -369,16 +404,16 @@ class ExprCompiler implements RexVisitor<String> {
pw.print(String.format(" %1$s = %2$s;\n}\n", val, rhs));
} else {
String foldedLHS = foldNullExpr(
- String.format("%1$s == null || !(%1$s)", lhs), "true", lhs);
+ String.format("%1$s == null || !(%1$s)", lhs), "true", op0);
pw.print(String.format("if (%s) {\n", foldedLHS));
String rhs = op1.accept(compiler);
String s;
if (rhsNullable) {
s = foldNullExpr(
String.format("(%2$s != null && %2$s) ? true : %1$s", lhs, rhs),
- "null", rhs);
+ "null", op1);
} else {
- s = String.format("%2$s ? %2$s : %1$s", lhs, rhs);
+ s = String.format("%2$s ? Boolean.valueOf(%2$s) : %1$s", lhs, rhs);
}
pw.print(String.format(" %1$s = %2$s;\n", val, s));
pw.print(String.format("} else { %1$s = true; }\n", val));
@@ -402,20 +437,34 @@ class ExprCompiler implements RexVisitor<String> {
pw.print(String.format("%1$s = !(%2$s);\n", val, lhs));
} else {
String s = foldNullExpr(
- String.format("%1$s == null ? null : !(%1$s)", lhs), "null", lhs);
+ String.format("%1$s == null ? null : !(%1$s)", lhs), "null", op);
pw.print(String.format("%1$s = %2$s;\n", val, s));
}
return val;
}
};
+ }
- private static String foldNullExpr(String notNullExpr, String
- nullExpr, String op) {
- if (op.equals("null")) {
- return nullExpr;
- } else {
- return notNullExpr;
- }
+ private static String foldNullExpr(String notNullExpr, String
+ nullExpr, RexNode op) {
+ if (op instanceof RexLiteral && ((RexLiteral)op).getTypeName() == SqlTypeName.NULL) {
+ return nullExpr;
+ } else {
+ return notNullExpr;
+ }
+ }
+
+ private static String printMethodCall(Method method, List<String> args) {
+ return printMethodCall(method.getDeclaringClass(), method.getName(),
+ Modifier.isStatic(method.getModifiers()), args);
+ }
+
+ private static String printMethodCall(Class<?> clazz, String method, boolean isStatic, List<String> args) {
+ if (isStatic) {
+ return String.format("%s.%s(%s)", clazz.getCanonicalName(), method, Joiner.on(',').join(args));
+ } else {
+ return String.format("%s.%s(%s)", args.get(0), method,
+ Joiner.on(',').join(args.subList(1, args.size())));
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/39400075/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/PlanCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/PlanCompiler.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/PlanCompiler.java
index d006261..1096f5b 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/PlanCompiler.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/PlanCompiler.java
@@ -22,7 +22,7 @@ import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.TableScan;
import org.apache.storm.sql.javac.CompilingClassLoader;
-import org.apache.storm.sql.storm.runtime.AbstractValuesProcessor;
+import org.apache.storm.sql.runtime.AbstractValuesProcessor;
import java.io.PrintWriter;
import java.io.StringWriter;
@@ -38,12 +38,12 @@ public class PlanCompiler {
"// GENERATED CODE", "package " + PACKAGE_NAME + ";", "",
"import java.util.Iterator;", "import java.util.Map;",
"import backtype.storm.tuple.Values;",
- "import org.apache.storm.sql.storm.AbstractChannelHandler;",
- "import org.apache.storm.sql.storm.Channels;",
- "import org.apache.storm.sql.storm.ChannelContext;",
- "import org.apache.storm.sql.storm.ChannelHandler;",
- "import org.apache.storm.sql.storm.DataSource;",
- "import org.apache.storm.sql.storm.runtime.AbstractValuesProcessor;",
+ "import org.apache.storm.sql.runtime.AbstractChannelHandler;",
+ "import org.apache.storm.sql.runtime.Channels;",
+ "import org.apache.storm.sql.runtime.ChannelContext;",
+ "import org.apache.storm.sql.runtime.ChannelHandler;",
+ "import org.apache.storm.sql.runtime.DataSource;",
+ "import org.apache.storm.sql.runtime.AbstractValuesProcessor;",
"public final class Processor extends AbstractValuesProcessor {", "");
private static final String INITIALIZER_PROLOGUE = NEW_LINE_JOINER.join(
" @Override",
http://git-wip-us.apache.org/repos/asf/storm/blob/39400075/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
index 07b367f..e18b9f8 100644
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
@@ -19,8 +19,8 @@ package org.apache.storm.sql;
import backtype.storm.tuple.Values;
import org.apache.storm.sql.compiler.TestUtils;
-import org.apache.storm.sql.storm.ChannelHandler;
-import org.apache.storm.sql.storm.DataSource;
+import org.apache.storm.sql.runtime.ChannelHandler;
+import org.apache.storm.sql.runtime.DataSource;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
http://git-wip-us.apache.org/repos/asf/storm/blob/39400075/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprCompiler.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprCompiler.java
index f03cff8..a5f9d67 100644
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprCompiler.java
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprCompiler.java
@@ -86,8 +86,8 @@ public class TestExprCompiler {
}
}
assertThat(sw.get(0).toString(), containsString("1 > 2"));
- assertThat(sw.get(1).toString(), containsString("3 + 5"));
- assertThat(sw.get(2).toString(), containsString("1 - 1.0E0"));
- assertThat(sw.get(3).toString(), containsString("3 +"));
+ assertThat(sw.get(1).toString(), containsString("plus(3,5)"));
+ assertThat(sw.get(2).toString(), containsString("minus(1,1.0E0)"));
+ assertThat(sw.get(3).toString(), containsString("plus(3,"));
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/39400075/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprSemantic.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprSemantic.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprSemantic.java
index 8454d7e..1d98664 100644
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprSemantic.java
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprSemantic.java
@@ -23,9 +23,9 @@ import com.google.common.collect.Lists;
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
import org.apache.calcite.rel.type.RelDataTypeSystem;
-import org.apache.storm.sql.storm.ChannelHandler;
-import org.apache.storm.sql.storm.DataSource;
-import org.apache.storm.sql.storm.runtime.AbstractValuesProcessor;
+import org.apache.storm.sql.runtime.ChannelHandler;
+import org.apache.storm.sql.runtime.DataSource;
+import org.apache.storm.sql.runtime.AbstractValuesProcessor;
import org.junit.Test;
import java.util.ArrayList;
@@ -99,6 +99,28 @@ public class TestExprSemantic {
false, null), v);
}
+ @Test
+ public void testEquals() throws Exception {
+ Values v = testExpr(
+ Lists.newArrayList(
+ "1 = 2", "UNKNOWN = UNKNOWN", "'a' = 'a'", "'a' = UNKNOWN", "UNKNOWN = 'a'", "'a' = 'b'",
+ "1 <> 2", "UNKNOWN <> UNKNOWN", "'a' <> 'a'", "'a' <> UNKNOWN", "UNKNOWN <> 'a'", "'a' <> 'b'"
+ ));
+ assertEquals(new Values(false, null, true, null, null, false,
+ true, null, false, null, null, true), v);
+ }
+
+ @Test
+ public void testStringMethods() throws Exception {
+ Values v = testExpr(
+ Lists.newArrayList(
+ "UPPER('a')", "LOWER('A')", "INITCAP('foo')",
+ "SUBSTRING('foo', 2)", "CHARACTER_LENGTH('foo')", "CHAR_LENGTH('foo')",
+ "'ab' || 'cd'"
+ ));
+ assertEquals(new Values("A", "a", "Foo", "oo", 3, 3, "abcd"), v);
+ }
+
private Values testExpr(List<String> exprs) throws Exception {
String sql = "SELECT " + Joiner.on(',').join(exprs) + " FROM FOO" +
" WHERE ID > 0 AND ID < 2";
http://git-wip-us.apache.org/repos/asf/storm/blob/39400075/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestPlanCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestPlanCompiler.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestPlanCompiler.java
index 30df0f3..d32fdca 100644
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestPlanCompiler.java
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestPlanCompiler.java
@@ -21,9 +21,9 @@ import backtype.storm.tuple.Values;
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
import org.apache.calcite.rel.type.RelDataTypeSystem;
-import org.apache.storm.sql.storm.ChannelHandler;
-import org.apache.storm.sql.storm.DataSource;
-import org.apache.storm.sql.storm.runtime.AbstractValuesProcessor;
+import org.apache.storm.sql.runtime.ChannelHandler;
+import org.apache.storm.sql.runtime.DataSource;
+import org.apache.storm.sql.runtime.AbstractValuesProcessor;
import org.junit.Assert;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/storm/blob/39400075/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestRelNodeCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestRelNodeCompiler.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestRelNodeCompiler.java
index d820f22..623a2f4 100644
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestRelNodeCompiler.java
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestRelNodeCompiler.java
@@ -57,7 +57,7 @@ public class TestRelNodeCompiler {
RelNodeCompiler compiler = new RelNodeCompiler(pw, typeFactory);
compiler.visitProject(project);
pw.flush();
- Assert.assertThat(sw.toString(), containsString("+ 1"));
+ Assert.assertThat(sw.toString(), containsString("plus("));
}
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/39400075/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestUtils.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestUtils.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestUtils.java
index 6731c90..5aa4cb0 100644
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestUtils.java
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestUtils.java
@@ -15,9 +15,9 @@ import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.tools.Planner;
import org.apache.calcite.tools.RelConversionException;
import org.apache.calcite.tools.ValidationException;
-import org.apache.storm.sql.storm.ChannelContext;
-import org.apache.storm.sql.storm.ChannelHandler;
-import org.apache.storm.sql.storm.DataSource;
+import org.apache.storm.sql.runtime.ChannelContext;
+import org.apache.storm.sql.runtime.ChannelHandler;
+import org.apache.storm.sql.runtime.DataSource;
import java.util.ArrayList;
import java.util.List;
http://git-wip-us.apache.org/repos/asf/storm/blob/39400075/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractChannelHandler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractChannelHandler.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractChannelHandler.java
new file mode 100644
index 0000000..73a078c
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractChannelHandler.java
@@ -0,0 +1,37 @@
+/*
+ * *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * * <p>
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * * <p>
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+package org.apache.storm.sql.runtime;
+
+import backtype.storm.tuple.Values;
+
+public abstract class AbstractChannelHandler implements ChannelHandler {
+ @Override
+ public abstract void dataReceived(ChannelContext ctx, Values data);
+
+ @Override
+ public void channelInactive(ChannelContext ctx) {
+
+ }
+
+ @Override
+ public void exceptionCaught(Throwable cause) {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/39400075/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractValuesProcessor.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractValuesProcessor.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractValuesProcessor.java
new file mode 100644
index 0000000..11aa065
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractValuesProcessor.java
@@ -0,0 +1,49 @@
+/*
+ * *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * * <p>
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * * <p>
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.runtime;
+
+import backtype.storm.tuple.Values;
+import org.apache.storm.sql.runtime.ChannelHandler;
+import org.apache.storm.sql.runtime.DataSource;
+
+import java.util.Map;
+
+/**
+ * Subclasses of AbstractValuesProcessor provide a series of tuples. They
+ * take a series of iterators of {@link Values} and produce a stream of
+ * tuples.
+ *
+ * The subclass implements the {@code next()} method to provide
+ * the output of the stream. It can choose to return null in {@code next()} to
+ * indicate that this particular iteration is a no-op. SQL processors depend
+ * on this semantic to implement filtering and nullable records.
+ */
+public abstract class AbstractValuesProcessor {
+
+ /**
+ * Initialize the data sources.
+ *
+ * @param data a map from each table name to its {@link DataSource}.
+ *
+ */
+ public abstract void initialize(Map<String, DataSource> data, ChannelHandler
+ result);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/39400075/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelContext.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelContext.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelContext.java
new file mode 100644
index 0000000..71aba03
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelContext.java
@@ -0,0 +1,30 @@
+/*
+ * *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * * <p>
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * * <p>
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+package org.apache.storm.sql.runtime;
+
+import backtype.storm.tuple.Values;
+
+public interface ChannelContext {
+ /**
+ * Emit data to the next stage of the data pipeline.
+ */
+ void emit(Values data);
+ void fireChannelInactive();
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/39400075/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelHandler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelHandler.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelHandler.java
new file mode 100644
index 0000000..117f312
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelHandler.java
@@ -0,0 +1,39 @@
+/*
+ * *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * * <p>
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * * <p>
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+package org.apache.storm.sql.runtime;
+
+import backtype.storm.tuple.Values;
+
+/**
+ * ChannelHandler provides an event-driven interface for the user to process
+ * a series of events.
+ */
+public interface ChannelHandler {
+ void dataReceived(ChannelContext ctx, Values data);
+
+ /**
+ * The producer of the data has indicated that the channel is no longer
+ * active.
+ * @param ctx
+ */
+ void channelInactive(ChannelContext ctx);
+
+ void exceptionCaught(Throwable cause);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/39400075/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/Channels.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/Channels.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/Channels.java
new file mode 100644
index 0000000..7214f9a
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/Channels.java
@@ -0,0 +1,80 @@
+/*
+ * *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * * <p>
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * * <p>
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+package org.apache.storm.sql.runtime;
+
+import backtype.storm.tuple.Values;
+
+public class Channels {
+ private static final ChannelContext VOID_CTX = new ChannelContext() {
+ @Override
+ public void emit(Values data) {}
+
+ @Override
+ public void fireChannelInactive() {}
+ };
+
+ private static class ChannelContextAdapter implements ChannelContext {
+ private final ChannelHandler handler;
+ private final ChannelContext next;
+
+ public ChannelContextAdapter(
+ ChannelContext next, ChannelHandler handler) {
+ this.handler = handler;
+ this.next = next;
+ }
+
+ @Override
+ public void emit(Values data) {
+ handler.dataReceived(next, data);
+ }
+
+ @Override
+ public void fireChannelInactive() {
+ handler.channelInactive(next);
+ }
+ }
+
+ private static class ForwardingChannelContext implements ChannelContext {
+ private final ChannelContext next;
+
+ public ForwardingChannelContext(ChannelContext next) {
+ this.next = next;
+ }
+
+ @Override
+ public void emit(Values data) {
+ next.emit(data);
+ }
+
+ @Override
+ public void fireChannelInactive() {
+ next.fireChannelInactive();
+ }
+ }
+
+ public static ChannelContext chain(
+ ChannelContext next, ChannelHandler handler) {
+ return new ChannelContextAdapter(next, handler);
+ }
+
+ public static ChannelContext voidContext() {
+ return VOID_CTX;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/39400075/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSource.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSource.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSource.java
new file mode 100644
index 0000000..3e80cb2
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSource.java
@@ -0,0 +1,29 @@
+/*
+ * *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * * <p>
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * * <p>
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+package org.apache.storm.sql.runtime;
+
+/**
+ * A DataSource ingests data in StormSQL. It provides a series of tuples to
+ * the downstream {@link ChannelHandler}.
+ *
+ */
+public interface DataSource {
+ void open(ChannelContext ctx);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/39400075/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/StormSqlFunctions.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/StormSqlFunctions.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/StormSqlFunctions.java
new file mode 100644
index 0000000..62b1019
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/StormSqlFunctions.java
@@ -0,0 +1,36 @@
+/*
+ * *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * * <p>
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * * <p>
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+package org.apache.storm.sql.runtime;
+
+public class StormSqlFunctions {
+ public static Boolean eq(Object b0, Object b1) {
+ if (b0 == null || b1 == null) {
+ return null;
+ }
+ return b0.equals(b1);
+ }
+
+ public static Boolean ne(Object b0, Object b1) {
+ if (b0 == null || b1 == null) {
+ return null;
+ }
+ return !b0.equals(b1);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/39400075/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/AbstractChannelHandler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/AbstractChannelHandler.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/AbstractChannelHandler.java
deleted file mode 100644
index cf110e3..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/AbstractChannelHandler.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.storm;
-
-import backtype.storm.tuple.Values;
-
-public abstract class AbstractChannelHandler implements ChannelHandler {
- @Override
- public abstract void dataReceived(ChannelContext ctx, Values data);
-
- @Override
- public void channelInactive(ChannelContext ctx) {
-
- }
-
- @Override
- public void exceptionCaught(Throwable cause) {
-
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/39400075/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/ChannelContext.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/ChannelContext.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/ChannelContext.java
deleted file mode 100644
index a2806b2..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/ChannelContext.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.storm;
-
-import backtype.storm.tuple.Values;
-
-public interface ChannelContext {
- /**
- * Emit data to the next stage of the data pipeline.
- */
- void emit(Values data);
- void fireChannelInactive();
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/39400075/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/ChannelHandler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/ChannelHandler.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/ChannelHandler.java
deleted file mode 100644
index 8cd3a28..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/ChannelHandler.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.storm;
-
-import backtype.storm.tuple.Values;
-
-/**
- * DataListener provides an event-driven interface for the user to process
- * series of events.
- */
-public interface ChannelHandler {
- void dataReceived(ChannelContext ctx, Values data);
-
- /**
- * The producer of the data has indicated that the channel is no longer
- * active.
- * @param ctx
- */
- void channelInactive(ChannelContext ctx);
-
- void exceptionCaught(Throwable cause);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/39400075/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/Channels.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/Channels.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/Channels.java
deleted file mode 100644
index b5bb619..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/Channels.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.storm;
-
-import backtype.storm.tuple.Values;
-
-public class Channels {
- private static final ChannelContext VOID_CTX = new ChannelContext() {
- @Override
- public void emit(Values data) {}
-
- @Override
- public void fireChannelInactive() {}
- };
-
- private static class ChannelContextAdapter implements ChannelContext {
- private final ChannelHandler handler;
- private final ChannelContext next;
-
- public ChannelContextAdapter(
- ChannelContext next, ChannelHandler handler) {
- this.handler = handler;
- this.next = next;
- }
-
- @Override
- public void emit(Values data) {
- handler.dataReceived(next, data);
- }
-
- @Override
- public void fireChannelInactive() {
- handler.channelInactive(next);
- }
- }
-
- private static class ForwardingChannelContext implements ChannelContext {
- private final ChannelContext next;
-
- public ForwardingChannelContext(ChannelContext next) {
- this.next = next;
- }
-
- @Override
- public void emit(Values data) {
- next.emit(data);
- }
-
- @Override
- public void fireChannelInactive() {
- next.fireChannelInactive();
- }
- }
-
- public static ChannelContext chain(
- ChannelContext next, ChannelHandler handler) {
- return new ChannelContextAdapter(next, handler);
- }
-
- public static ChannelContext voidContext() {
- return VOID_CTX;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/39400075/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/DataSource.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/DataSource.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/DataSource.java
deleted file mode 100644
index 84fa6e0..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/DataSource.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.storm;
-
-/**
- * A DataSource ingests data in StormSQL. It provides a series of tuple to
- * the downstream {@link ChannelHandler}.
- *
- */
-public interface DataSource {
- void open(ChannelContext ctx);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/39400075/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/runtime/AbstractValuesProcessor.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/runtime/AbstractValuesProcessor.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/runtime/AbstractValuesProcessor.java
deleted file mode 100644
index bd068be..0000000
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/runtime/AbstractValuesProcessor.java
+++ /dev/null
@@ -1,29 +0,0 @@
-package org.apache.storm.sql.storm.runtime;
-
-import backtype.storm.tuple.Values;
-import org.apache.storm.sql.storm.ChannelHandler;
-import org.apache.storm.sql.storm.DataSource;
-
-import java.util.Map;
-
-/**
- * Subclass of AbstractTupleProcessor provides a series of tuple. It
- * takes a series of iterators of {@link Values} and produces a stream of
- * tuple.
- *
- * The subclass implements the {@see next()} method to provide
- * the output of the stream. It can choose to return null in {@see next()} to
- * indicate that this particular iteration is a no-op. SQL processors depend
- * on this semantic to implement filtering and nullable records.
- */
-public abstract class AbstractValuesProcessor {
-
- /**
- * Initialize the data sources.
- *
- * @param data a map from the table name to the iterators of the values.
- *
- */
- public abstract void initialize(Map<String, DataSource> data, ChannelHandler
- result);
-}