You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2016/03/01 04:58:53 UTC
[1/3] storm git commit: [STORM-1570] Storm SQL support for nested map and array lookup
Repository: storm
Updated Branches:
refs/heads/master 9aa8bf083 -> 87e3c2467
[STORM-1570] Storm SQL support for nested map and array lookup
Added support to handle array and nested map lookups in Storm SQL.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0f644360
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0f644360
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0f644360
Branch: refs/heads/master
Commit: 0f644360042a70d7cbbea3272fdf93903cc4e3ed
Parents: 4ca7522
Author: Arun Mahadevan <ai...@hortonworks.com>
Authored: Fri Feb 26 13:03:13 2016 +0530
Committer: Arun Mahadevan <ai...@hortonworks.com>
Committed: Tue Mar 1 00:09:40 2016 +0530
----------------------------------------------------------------------
.../apache/storm/sql/compiler/CompilerUtil.java | 7 ++-
.../apache/storm/sql/compiler/ExprCompiler.java | 32 ++++++++--
.../backends/standalone/RelNodeCompiler.java | 6 +-
.../apache/storm/sql/parser/StormParser.java | 5 ++
.../test/org/apache/storm/sql/TestStormSql.java | 64 +++++++++++++++++---
.../storm/sql/compiler/TestCompilerUtils.java | 62 ++++++++++++++++---
.../storm/sql/compiler/TestExprSemantic.java | 18 ++++++
.../backends/standalone/TestPlanCompiler.java | 20 ++++++
.../backends/trident/TestPlanCompiler.java | 4 +-
.../test/org/apache/storm/sql/TestUtils.java | 32 +++++++++-
10 files changed, 223 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/0f644360/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java
index 30ea0e3..7f9258c 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java
@@ -79,8 +79,11 @@ public class CompilerUtil {
private Statistic stats;
public TableBuilderInfo field(String name, SqlTypeName type) {
- RelDataType dataType = typeFactory.createSqlType(type);
- fields.add(new FieldType(name, dataType));
+ return field(name, typeFactory.createSqlType(type));
+ }
+
+ public TableBuilderInfo field(String name, RelDataType type) {
+ fields.add(new FieldType(name, type));
return this;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/0f644360/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java
index 01024f0..c43c32f 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java
@@ -26,8 +26,12 @@ import org.apache.calcite.linq4j.tree.Primitive;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.*;
import org.apache.calcite.runtime.SqlFunctions;
+import org.apache.calcite.schema.Function;
+import org.apache.calcite.schema.impl.ReflectiveFunctionBase;
+import org.apache.calcite.schema.impl.ScalarFunctionImpl;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.validate.SqlUserDefinedFunction;
import org.apache.calcite.util.BuiltInMethod;
import org.apache.calcite.util.NlsString;
import org.apache.calcite.util.Util;
@@ -179,6 +183,7 @@ public class ExprCompiler implements RexVisitor<String> {
.put(builtInMethod(CHARACTER_LENGTH, BuiltInMethod.CHAR_LENGTH, NullPolicy.STRICT))
.put(builtInMethod(CHAR_LENGTH, BuiltInMethod.CHAR_LENGTH, NullPolicy.STRICT))
.put(builtInMethod(CONCAT, BuiltInMethod.STRING_CONCAT, NullPolicy.STRICT))
+ .put(builtInMethod(ITEM, BuiltInMethod.ANY_ITEM, NullPolicy.STRICT))
.put(infixBinary(LESS_THAN, "<", "lt"))
.put(infixBinary(LESS_THAN_OR_EQUAL, "<=", "le"))
.put(infixBinary(GREATER_THAN, ">", "gt"))
@@ -198,7 +203,8 @@ public class ExprCompiler implements RexVisitor<String> {
.put(expectNot(IS_NOT_FALSE, false))
.put(AND, AND_EXPR)
.put(OR, OR_EXPR)
- .put(NOT, NOT_EXPR);
+ .put(NOT, NOT_EXPR)
+ .put(CAST, CAST_EXPR);
this.translators = builder.build();
}
@@ -213,7 +219,7 @@ public class ExprCompiler implements RexVisitor<String> {
}
private Map.Entry<SqlOperator, CallExprPrinter> builtInMethod(
- final SqlOperator op, final BuiltInMethod method, NullPolicy nullPolicy) {
+ final SqlOperator op, final BuiltInMethod method, NullPolicy nullPolicy) {
if (nullPolicy != NullPolicy.STRICT) {
throw new UnsupportedOperationException();
}
@@ -369,8 +375,8 @@ public class ExprCompiler implements RexVisitor<String> {
String s;
if (rhsNullable) {
s = foldNullExpr(
- String.format("(%2$s != null && !(%2$s)) ? false : %1$s", lhs,
- rhs), "null", op1);
+ String.format("(%2$s != null && !(%2$s)) ? Boolean.FALSE : ((%1$s == null || %2$s == null) ? null : Boolean.TRUE)",
+ lhs, rhs), "null", op1);
} else {
s = String.format("!(%2$s) ? Boolean.FALSE : %1$s", lhs, rhs);
}
@@ -410,7 +416,8 @@ public class ExprCompiler implements RexVisitor<String> {
String s;
if (rhsNullable) {
s = foldNullExpr(
- String.format("(%2$s != null && %2$s) ? true : %1$s", lhs, rhs),
+ String.format("(%2$s != null && %2$s) ? Boolean.TRUE : ((%1$s == null || %2$s == null) ? null : Boolean.FALSE)",
+ lhs, rhs),
"null", op1);
} else {
s = String.format("%2$s ? Boolean.valueOf(%2$s) : %1$s", lhs, rhs);
@@ -443,6 +450,21 @@ public class ExprCompiler implements RexVisitor<String> {
return val;
}
};
+
+
+ private static final CallExprPrinter CAST_EXPR = new CallExprPrinter() {
+ @Override
+ public String translate(
+ ExprCompiler compiler, RexCall call) {
+ String val = compiler.reserveName();
+ PrintWriter pw = compiler.pw;
+ RexNode op = call.getOperands().get(0);
+ String lhs = op.accept(compiler);
+ pw.print(String.format("final %1$s %2$s = (%1$s) %3$s;\n",
+ compiler.javaTypeName(call), val, lhs));
+ return val;
+ }
+ };
}
private static String foldNullExpr(String notNullExpr, String
http://git-wip-us.apache.org/repos/asf/storm/blob/0f644360/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java
index 6d51a11..845bb3a 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java
@@ -64,7 +64,11 @@ class RelNodeCompiler extends PostOrderRelNodeVisitor<Void> {
beginStage(filter);
ExprCompiler compiler = new ExprCompiler(pw, typeFactory);
String r = filter.getCondition().accept(compiler);
- pw.print(String.format(" if (%s) { ctx.emit(_data); }\n", r));
+ if (filter.getCondition().getType().isNullable()) {
+ pw.print(String.format(" if (%s != null && %s) { ctx.emit(_data); }\n", r, r));
+ } else {
+ pw.print(String.format(" if (%s) { ctx.emit(_data); }\n", r, r));
+ }
endStage();
return null;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/0f644360/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/StormParser.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/StormParser.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/StormParser.java
index 670901e..8444e1e 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/StormParser.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/StormParser.java
@@ -33,6 +33,11 @@ public class StormParser {
this.impl.setQuotedCasing(Lex.ORACLE.quotedCasing);
this.impl.setUnquotedCasing(Lex.ORACLE.unquotedCasing);
this.impl.setIdentifierMaxLength(DEFAULT_IDENTIFIER_MAX_LENGTH);
+ /*
+ * By default parser uses [ ] for quoting identifiers. Switching to DQID (double quoted identifiers)
+ * is needed for array and map access (m['x'] = 1 or arr[2] = 10 etc) to work.
+ */
+ this.impl.switchTo("DQID");
}
@VisibleForTesting
http://git-wip-us.apache.org/repos/asf/storm/blob/0f644360/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
index 511e5ab..a85a907 100644
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
@@ -17,11 +17,7 @@
*/
package org.apache.storm.sql;
-import org.apache.storm.Config;
-import org.apache.storm.ILocalCluster;
-import org.apache.storm.StormSubmitter;
-import org.apache.storm.generated.SubmitOptions;
-import org.apache.storm.generated.TopologyInitialStatus;
+import com.google.common.collect.ImmutableMap;
import org.apache.storm.tuple.Values;
import org.apache.storm.sql.runtime.*;
import org.junit.AfterClass;
@@ -31,9 +27,9 @@ import org.junit.Test;
import java.net.URI;
import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
+import java.util.Arrays;
import java.util.List;
+import java.util.Map;
public class TestStormSql {
private static class MockDataSourceProvider implements DataSourcesProvider {
@@ -56,14 +52,37 @@ public class TestStormSql {
}
}
+ private static class MockNestedDataSourceProvider implements DataSourcesProvider {
+ @Override
+ public String scheme() {
+ return "mocknested";
+ }
+
+ @Override
+ public DataSource construct(
+ URI uri, String inputFormatClass, String outputFormatClass,
+ List<FieldInfo> fields) {
+ return new TestUtils.MockNestedDataSource();
+ }
+
+ @Override
+ public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String outputFormatClass,
+ String properties, List<FieldInfo> fields) {
+ throw new UnsupportedOperationException("Not supported");
+ }
+ }
+
+
@BeforeClass
public static void setUp() {
DataSourcesRegistry.providerMap().put("mock", new MockDataSourceProvider());
+ DataSourcesRegistry.providerMap().put("mocknested", new MockNestedDataSourceProvider());
}
@AfterClass
public static void tearDown() {
DataSourcesRegistry.providerMap().remove("mock");
+ DataSourcesRegistry.providerMap().remove("mocknested");
}
@Test
@@ -79,4 +98,35 @@ public class TestStormSql {
Assert.assertEquals(4, values.get(0).get(0));
Assert.assertEquals(5, values.get(1).get(0));
}
+
+ @Test
+ public void testExternalDataSourceNested() throws Exception {
+ List<String> stmt = new ArrayList<>();
+ stmt.add("CREATE EXTERNAL TABLE FOO (ID INT, MAPFIELD ANY, NESTEDMAPFIELD ANY, ARRAYFIELD ANY) LOCATION 'mocknested:///foo'");
+ stmt.add("SELECT STREAM ID, MAPFIELD, NESTEDMAPFIELD, ARRAYFIELD " +
+ "FROM FOO " +
+ "WHERE NESTEDMAPFIELD['a']['b'] = 2 AND ARRAYFIELD[1] = 200");
+ StormSql sql = StormSql.construct();
+ List<Values> values = new ArrayList<>();
+ ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
+ sql.execute(stmt, h);
+ System.out.println(values);
+ Map<String, Integer> map = ImmutableMap.of("b", 2, "c", 4);
+ Map<String, Map<String, Integer>> nestedMap = ImmutableMap.of("a", map);
+ Assert.assertEquals(new Values(2, map, nestedMap, Arrays.asList(100, 200, 300)), values.get(0));
+ }
+
+ @Test
+ public void testExternalNestedInvalidAccess() throws Exception {
+ List<String> stmt = new ArrayList<>();
+ stmt.add("CREATE EXTERNAL TABLE FOO (ID INT, MAPFIELD ANY, NESTEDMAPFIELD ANY, ARRAYFIELD ANY) LOCATION 'mocknested:///foo'");
+ stmt.add("SELECT STREAM ID, MAPFIELD, NESTEDMAPFIELD, ARRAYFIELD " +
+ "FROM FOO " +
+ "WHERE NESTEDMAPFIELD['a']['b'] = 2 AND ARRAYFIELD['a'] = 200");
+ StormSql sql = StormSql.construct();
+ List<Values> values = new ArrayList<>();
+ ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
+ sql.execute(stmt, h);
+ Assert.assertEquals(0, values.size());
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/0f644360/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java
index 994e419..43b54f7 100644
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java
@@ -17,31 +17,74 @@
*/
package org.apache.storm.sql.compiler;
+import com.google.common.collect.ImmutableList;
import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.prepare.CalciteCatalogReader;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.StreamableTable;
import org.apache.calcite.schema.Table;
+import org.apache.calcite.schema.impl.ScalarFunctionImpl;
import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperatorTable;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.calcite.tools.*;
+import org.apache.calcite.sql.util.ChainedSqlOperatorTable;
+import org.apache.calcite.tools.FrameworkConfig;
+import org.apache.calcite.tools.Frameworks;
+import org.apache.calcite.tools.Planner;
+import org.apache.calcite.tools.RelConversionException;
+import org.apache.calcite.tools.ValidationException;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
public class TestCompilerUtils {
+
public static CalciteState sqlOverDummyTable(String sql)
- throws RelConversionException, ValidationException, SqlParseException {
+ throws RelConversionException, ValidationException, SqlParseException {
+ SchemaPlus schema = Frameworks.createRootSchema(true);
+ JavaTypeFactory typeFactory = new JavaTypeFactoryImpl
+ (RelDataTypeSystem.DEFAULT);
+ StreamableTable streamableTable = new CompilerUtil.TableBuilderInfo(typeFactory)
+ .field("ID", SqlTypeName.INTEGER)
+ .field("NAME", typeFactory.createType(String.class))
+ .field("ADDR", typeFactory.createType(String.class))
+ .build();
+ Table table = streamableTable.stream();
+ schema.add("FOO", table);
+ schema.add("BAR", table);
+ FrameworkConfig config = Frameworks.newConfigBuilder().defaultSchema(
+ schema).build();
+ Planner planner = Frameworks.getPlanner(config);
+ SqlNode parse = planner.parse(sql);
+ SqlNode validate = planner.validate(parse);
+ RelNode tree = planner.convert(validate);
+ return new CalciteState(schema, tree);
+ }
+
+ public static CalciteState sqlOverNestedTable(String sql)
+ throws RelConversionException, ValidationException, SqlParseException {
SchemaPlus schema = Frameworks.createRootSchema(true);
JavaTypeFactory typeFactory = new JavaTypeFactoryImpl
- (RelDataTypeSystem.DEFAULT);
+ (RelDataTypeSystem.DEFAULT);
+
StreamableTable streamableTable = new CompilerUtil.TableBuilderInfo(typeFactory)
- .field("ID", SqlTypeName.INTEGER).build();
+ .field("ID", SqlTypeName.INTEGER)
+ .field("MAPFIELD", SqlTypeName.ANY)
+ .field("NESTEDMAPFIELD", SqlTypeName.ANY)
+ .field("ARRAYFIELD", SqlTypeName.ANY)
+ .build();
Table table = streamableTable.stream();
schema.add("FOO", table);
schema.add("BAR", table);
FrameworkConfig config = Frameworks.newConfigBuilder().defaultSchema(
- schema).build();
+ schema).build();
Planner planner = Frameworks.getPlanner(config);
SqlNode parse = planner.parse(sql);
SqlNode validate = planner.validate(parse);
@@ -58,7 +101,12 @@ public class TestCompilerUtils {
this.tree = tree;
}
- public SchemaPlus schema() { return schema; }
- public RelNode tree() { return tree; }
+ public SchemaPlus schema() {
+ return schema;
+ }
+
+ public RelNode tree() {
+ return tree;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/0f644360/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprSemantic.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprSemantic.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprSemantic.java
index 8304a33..f2ac081 100644
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprSemantic.java
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprSemantic.java
@@ -90,6 +90,24 @@ public class TestExprSemantic {
}
@Test
+ public void testAndWithNullable() throws Exception {
+ Values v = testExpr(
+ Lists.newArrayList(
+ "ADDR = 'a' AND NAME = 'a'", "NAME = 'a' AND ADDR = 'a'", "NAME = 'x' AND ADDR = 'a'", "ADDR = 'a' AND NAME = 'x'"
+ ));
+ assertEquals(new Values(false, false, null, null), v);
+ }
+
+ @Test
+ public void testOrWithNullable() throws Exception {
+ Values v = testExpr(
+ Lists.newArrayList(
+ "ADDR = 'a' OR NAME = 'a'", "NAME = 'a' OR ADDR = 'a' ", "NAME = 'x' OR ADDR = 'a' ", "ADDR = 'a' OR NAME = 'x'"
+ ));
+ assertEquals(new Values(null, null, true, true), v);
+ }
+
+ @Test
public void testOrWithNull() throws Exception {
Values v = testExpr(
Lists.newArrayList(
http://git-wip-us.apache.org/repos/asf/storm/blob/0f644360/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestPlanCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestPlanCompiler.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestPlanCompiler.java
index ff28231..414aeee 100644
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestPlanCompiler.java
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestPlanCompiler.java
@@ -17,6 +17,7 @@
*/
package org.apache.storm.sql.compiler.backends.standalone;
+import com.google.common.collect.ImmutableMap;
import org.apache.storm.tuple.Values;
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
@@ -30,6 +31,7 @@ import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -66,4 +68,22 @@ public class TestPlanCompiler {
proc.initialize(data, h);
Assert.assertEquals(new Values(true, false, true), values.get(0));
}
+
+ @Test
+ public void testNested() throws Exception {
+ String sql = "SELECT ID, MAPFIELD, NESTEDMAPFIELD, ARRAYFIELD " +
+ "FROM FOO " +
+ "WHERE NESTEDMAPFIELD['a']['b'] = 2 AND ARRAYFIELD[1] = 200";
+ TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverNestedTable(sql);
+ PlanCompiler compiler = new PlanCompiler(typeFactory);
+ AbstractValuesProcessor proc = compiler.compile(state.tree());
+ Map<String, DataSource> data = new HashMap<>();
+ data.put("FOO", new TestUtils.MockNestedDataSource());
+ List<Values> values = new ArrayList<>();
+ ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
+ proc.initialize(data, h);
+ Map<String, Integer> map = ImmutableMap.of("b", 2, "c", 4);
+ Map<String, Map<String, Integer>> nestedMap = ImmutableMap.of("a", map);
+ Assert.assertEquals(new Values(2, map, nestedMap, Arrays.asList(100, 200, 300)), values.get(0));
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/0f644360/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java
index ddc671a..0f8daa9 100644
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java
@@ -73,7 +73,7 @@ public class TestPlanCompiler {
@Test
public void testInsert() throws Exception {
final int EXPECTED_VALUE_SIZE = 1;
- String sql = "INSERT INTO BAR SELECT ID FROM FOO WHERE ID > 3";
+ String sql = "INSERT INTO BAR SELECT ID, NAME, ADDR FROM FOO WHERE ID > 3";
TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql);
PlanCompiler compiler = new PlanCompiler(typeFactory);
final AbstractTridentProcessor proc = compiler.compile(state.tree());
@@ -82,7 +82,7 @@ public class TestPlanCompiler {
data.put("BAR", new TestUtils.MockSqlTridentDataSource());
final TridentTopology topo = proc.build(data);
runTridentTopology(EXPECTED_VALUE_SIZE, proc, topo);
- Assert.assertArrayEquals(new Values[] { new Values(4)}, getCollectedValues().toArray());
+ Assert.assertArrayEquals(new Values[] { new Values(4, "x", "y")}, getCollectedValues().toArray());
}
private void runTridentTopology(final int expectedValueSize, AbstractTridentProcessor proc,
http://git-wip-us.apache.org/repos/asf/storm/blob/0f644360/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java b/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
index c5a4043..da763a7 100644
--- a/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
+++ b/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
@@ -35,6 +35,8 @@ import org.apache.storm.trident.spout.IBatchSpout;
import org.apache.storm.trident.tuple.TridentTuple;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -44,7 +46,31 @@ public class TestUtils {
public MockDataSource() {
for (int i = 0; i < 5; ++i) {
- RECORDS.add(new Values(i));
+ RECORDS.add(new Values(i, "x", null));
+ }
+ }
+
+ @Override
+ public void open(ChannelContext ctx) {
+ for (Values v : RECORDS) {
+ ctx.emit(v);
+ }
+ ctx.fireChannelInactive();
+ }
+ }
+
+ public static class MockNestedDataSource implements DataSource {
+ private final ArrayList<Values> RECORDS = new ArrayList<>();
+
+ public MockNestedDataSource() {
+ List<Integer> ints = Arrays.asList(100, 200, 300);
+ for (int i = 0; i < 5; ++i) {
+ Map<String, Integer> map = new HashMap<>();
+ map.put("b", i);
+ map.put("c", i*i);
+ Map<String, Map<String, Integer>> mm = new HashMap<>();
+ mm.put("a", map);
+ RECORDS.add(new Values(i, map, mm, ints));
}
}
@@ -85,11 +111,11 @@ public class TestUtils {
private static class MockSpout implements IBatchSpout {
private final ArrayList<Values> RECORDS = new ArrayList<>();
- private final Fields OUTPUT_FIELDS = new Fields("ID");
+ private final Fields OUTPUT_FIELDS = new Fields("ID", "NAME", "ADDR");
public MockSpout() {
for (int i = 0; i < 5; ++i) {
- RECORDS.add(new Values(i));
+ RECORDS.add(new Values(i, "x", "y"));
}
}
[2/3] storm git commit: Merge branch 'STORM-1570' of
http://github.com/arunmahadevan/storm into STORM-1570
Posted by sr...@apache.org.
Merge branch 'STORM-1570' of http://github.com/arunmahadevan/storm into STORM-1570
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/dbe920a2
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/dbe920a2
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/dbe920a2
Branch: refs/heads/master
Commit: dbe920a2f8b4e6a06f657b24e2df90fb5601333f
Parents: 9aa8bf0 0f64436
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Mon Feb 29 19:57:38 2016 -0800
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Mon Feb 29 19:57:38 2016 -0800
----------------------------------------------------------------------
.../apache/storm/sql/compiler/CompilerUtil.java | 7 ++-
.../apache/storm/sql/compiler/ExprCompiler.java | 32 ++++++++--
.../backends/standalone/RelNodeCompiler.java | 6 +-
.../apache/storm/sql/parser/StormParser.java | 5 ++
.../test/org/apache/storm/sql/TestStormSql.java | 64 +++++++++++++++++---
.../storm/sql/compiler/TestCompilerUtils.java | 62 ++++++++++++++++---
.../storm/sql/compiler/TestExprSemantic.java | 18 ++++++
.../backends/standalone/TestPlanCompiler.java | 20 ++++++
.../backends/trident/TestPlanCompiler.java | 4 +-
.../test/org/apache/storm/sql/TestUtils.java | 32 +++++++++-
10 files changed, 223 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
[3/3] storm git commit: Added STORM-1570 to CHANGELOG.
Posted by sr...@apache.org.
Added STORM-1570 to CHANGELOG.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/87e3c246
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/87e3c246
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/87e3c246
Branch: refs/heads/master
Commit: 87e3c246762b56ee8fd638662a9776fdf36c8ef5
Parents: dbe920a
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Mon Feb 29 19:58:14 2016 -0800
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Mon Feb 29 19:58:14 2016 -0800
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/87e3c246/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6514379..689c138 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -40,6 +40,7 @@
* STORM-1521: When using Kerberos login from keytab with multiple bolts/executors ticket is not renewed in hbase bolt.
## 1.0.0
+ * STORM-1570: Storm SQL support for nested fields and array
* STORM-1576: fix ConcurrentModificationException in addCheckpointInputs
* STORM-1488: UI Topology Page component last error timestamp is from 1970
* STORM-1552: Fix topology event sampling log dir