You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2016/03/01 04:58:53 UTC

[1/3] storm git commit: [STORM-1570] Storm SQL support for nested map and array lookup

Repository: storm
Updated Branches:
  refs/heads/master 9aa8bf083 -> 87e3c2467


[STORM-1570] Storm SQL support for nested map and array lookup

Added support for array and nested map lookups in Storm SQL.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0f644360
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0f644360
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0f644360

Branch: refs/heads/master
Commit: 0f644360042a70d7cbbea3272fdf93903cc4e3ed
Parents: 4ca7522
Author: Arun Mahadevan <ai...@hortonworks.com>
Authored: Fri Feb 26 13:03:13 2016 +0530
Committer: Arun Mahadevan <ai...@hortonworks.com>
Committed: Tue Mar 1 00:09:40 2016 +0530

----------------------------------------------------------------------
 .../apache/storm/sql/compiler/CompilerUtil.java |  7 ++-
 .../apache/storm/sql/compiler/ExprCompiler.java | 32 ++++++++--
 .../backends/standalone/RelNodeCompiler.java    |  6 +-
 .../apache/storm/sql/parser/StormParser.java    |  5 ++
 .../test/org/apache/storm/sql/TestStormSql.java | 64 +++++++++++++++++---
 .../storm/sql/compiler/TestCompilerUtils.java   | 62 ++++++++++++++++---
 .../storm/sql/compiler/TestExprSemantic.java    | 18 ++++++
 .../backends/standalone/TestPlanCompiler.java   | 20 ++++++
 .../backends/trident/TestPlanCompiler.java      |  4 +-
 .../test/org/apache/storm/sql/TestUtils.java    | 32 +++++++++-
 10 files changed, 223 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/0f644360/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java
index 30ea0e3..7f9258c 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java
@@ -79,8 +79,11 @@ public class CompilerUtil {
     private Statistic stats;
 
     public TableBuilderInfo field(String name, SqlTypeName type) {
-      RelDataType dataType = typeFactory.createSqlType(type);
-      fields.add(new FieldType(name, dataType));
+      return field(name, typeFactory.createSqlType(type));
+    }
+
+    public TableBuilderInfo field(String name, RelDataType type) {
+      fields.add(new FieldType(name, type));
       return this;
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/0f644360/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java
index 01024f0..c43c32f 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java
@@ -26,8 +26,12 @@ import org.apache.calcite.linq4j.tree.Primitive;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.*;
 import org.apache.calcite.runtime.SqlFunctions;
+import org.apache.calcite.schema.Function;
+import org.apache.calcite.schema.impl.ReflectiveFunctionBase;
+import org.apache.calcite.schema.impl.ScalarFunctionImpl;
 import org.apache.calcite.sql.SqlOperator;
 import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.validate.SqlUserDefinedFunction;
 import org.apache.calcite.util.BuiltInMethod;
 import org.apache.calcite.util.NlsString;
 import org.apache.calcite.util.Util;
@@ -179,6 +183,7 @@ public class ExprCompiler implements RexVisitor<String> {
           .put(builtInMethod(CHARACTER_LENGTH, BuiltInMethod.CHAR_LENGTH, NullPolicy.STRICT))
           .put(builtInMethod(CHAR_LENGTH, BuiltInMethod.CHAR_LENGTH, NullPolicy.STRICT))
           .put(builtInMethod(CONCAT, BuiltInMethod.STRING_CONCAT, NullPolicy.STRICT))
+          .put(builtInMethod(ITEM, BuiltInMethod.ANY_ITEM, NullPolicy.STRICT))
           .put(infixBinary(LESS_THAN, "<", "lt"))
           .put(infixBinary(LESS_THAN_OR_EQUAL, "<=", "le"))
           .put(infixBinary(GREATER_THAN, ">", "gt"))
@@ -198,7 +203,8 @@ public class ExprCompiler implements RexVisitor<String> {
           .put(expectNot(IS_NOT_FALSE, false))
           .put(AND, AND_EXPR)
           .put(OR, OR_EXPR)
-          .put(NOT, NOT_EXPR);
+          .put(NOT, NOT_EXPR)
+          .put(CAST, CAST_EXPR);
       this.translators = builder.build();
     }
 
@@ -213,7 +219,7 @@ public class ExprCompiler implements RexVisitor<String> {
     }
 
     private Map.Entry<SqlOperator, CallExprPrinter> builtInMethod(
-        final SqlOperator op, final BuiltInMethod method, NullPolicy nullPolicy) {
+            final SqlOperator op, final BuiltInMethod method, NullPolicy nullPolicy) {
       if (nullPolicy != NullPolicy.STRICT) {
         throw new UnsupportedOperationException();
       }
@@ -369,8 +375,8 @@ public class ExprCompiler implements RexVisitor<String> {
           String s;
           if (rhsNullable) {
             s = foldNullExpr(
-                String.format("(%2$s != null && !(%2$s)) ? false : %1$s", lhs,
-                    rhs), "null", op1);
+                String.format("(%2$s != null && !(%2$s)) ? Boolean.FALSE : ((%1$s == null || %2$s == null) ? null : Boolean.TRUE)",
+                              lhs, rhs), "null", op1);
           } else {
             s = String.format("!(%2$s) ? Boolean.FALSE : %1$s", lhs, rhs);
           }
@@ -410,7 +416,8 @@ public class ExprCompiler implements RexVisitor<String> {
           String s;
           if (rhsNullable) {
             s = foldNullExpr(
-                String.format("(%2$s != null && %2$s) ? true : %1$s", lhs, rhs),
+                String.format("(%2$s != null && %2$s) ? Boolean.TRUE : ((%1$s == null || %2$s == null) ? null : Boolean.FALSE)",
+                              lhs, rhs),
                 "null", op1);
           } else {
             s = String.format("%2$s ? Boolean.valueOf(%2$s) : %1$s", lhs, rhs);
@@ -443,6 +450,21 @@ public class ExprCompiler implements RexVisitor<String> {
         return val;
       }
     };
+
+
+    private static final CallExprPrinter CAST_EXPR = new CallExprPrinter() {
+      @Override
+      public String translate(
+              ExprCompiler compiler, RexCall call) {
+        String val = compiler.reserveName();
+        PrintWriter pw = compiler.pw;
+        RexNode op = call.getOperands().get(0);
+        String lhs = op.accept(compiler);
+        pw.print(String.format("final %1$s %2$s = (%1$s) %3$s;\n",
+                               compiler.javaTypeName(call), val, lhs));
+        return val;
+      }
+    };
   }
 
   private static String foldNullExpr(String notNullExpr, String

http://git-wip-us.apache.org/repos/asf/storm/blob/0f644360/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java
index 6d51a11..845bb3a 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java
@@ -64,7 +64,11 @@ class RelNodeCompiler extends PostOrderRelNodeVisitor<Void> {
     beginStage(filter);
     ExprCompiler compiler = new ExprCompiler(pw, typeFactory);
     String r = filter.getCondition().accept(compiler);
-    pw.print(String.format("    if (%s) { ctx.emit(_data); }\n", r));
+    if (filter.getCondition().getType().isNullable()) {
+      pw.print(String.format("    if (%s != null && %s) { ctx.emit(_data); }\n", r, r));
+    } else {
+      pw.print(String.format("    if (%s) { ctx.emit(_data); }\n", r, r));
+    }
     endStage();
     return null;
   }

http://git-wip-us.apache.org/repos/asf/storm/blob/0f644360/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/StormParser.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/StormParser.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/StormParser.java
index 670901e..8444e1e 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/StormParser.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/StormParser.java
@@ -33,6 +33,11 @@ public class StormParser {
     this.impl.setQuotedCasing(Lex.ORACLE.quotedCasing);
     this.impl.setUnquotedCasing(Lex.ORACLE.unquotedCasing);
     this.impl.setIdentifierMaxLength(DEFAULT_IDENTIFIER_MAX_LENGTH);
+    /*
+     *  By default parser uses [ ] for quoting identifiers. Switching to DQID (double quoted identifiers)
+     *  is needed for array and map access (m['x'] = 1 or arr[2] = 10 etc) to work.
+     */
+    this.impl.switchTo("DQID");
   }
 
   @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/storm/blob/0f644360/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
index 511e5ab..a85a907 100644
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
@@ -17,11 +17,7 @@
  */
 package org.apache.storm.sql;
 
-import org.apache.storm.Config;
-import org.apache.storm.ILocalCluster;
-import org.apache.storm.StormSubmitter;
-import org.apache.storm.generated.SubmitOptions;
-import org.apache.storm.generated.TopologyInitialStatus;
+import com.google.common.collect.ImmutableMap;
 import org.apache.storm.tuple.Values;
 import org.apache.storm.sql.runtime.*;
 import org.junit.AfterClass;
@@ -31,9 +27,9 @@ import org.junit.Test;
 
 import java.net.URI;
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
+import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 
 public class TestStormSql {
   private static class MockDataSourceProvider implements DataSourcesProvider {
@@ -56,14 +52,37 @@ public class TestStormSql {
     }
   }
 
+  private static class MockNestedDataSourceProvider implements DataSourcesProvider {
+    @Override
+    public String scheme() {
+      return "mocknested";
+    }
+
+    @Override
+    public DataSource construct(
+            URI uri, String inputFormatClass, String outputFormatClass,
+            List<FieldInfo> fields) {
+      return new TestUtils.MockNestedDataSource();
+    }
+
+    @Override
+    public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String outputFormatClass,
+                                                  String properties, List<FieldInfo> fields) {
+      throw new UnsupportedOperationException("Not supported");
+    }
+  }
+
+
   @BeforeClass
   public static void setUp() {
     DataSourcesRegistry.providerMap().put("mock", new MockDataSourceProvider());
+    DataSourcesRegistry.providerMap().put("mocknested", new MockNestedDataSourceProvider());
   }
 
   @AfterClass
   public static void tearDown() {
     DataSourcesRegistry.providerMap().remove("mock");
+    DataSourcesRegistry.providerMap().remove("mocknested");
   }
 
   @Test
@@ -79,4 +98,35 @@ public class TestStormSql {
     Assert.assertEquals(4, values.get(0).get(0));
     Assert.assertEquals(5, values.get(1).get(0));
   }
+
+  @Test
+  public void testExternalDataSourceNested() throws Exception {
+    List<String> stmt = new ArrayList<>();
+    stmt.add("CREATE EXTERNAL TABLE FOO (ID INT, MAPFIELD ANY, NESTEDMAPFIELD ANY, ARRAYFIELD ANY) LOCATION 'mocknested:///foo'");
+    stmt.add("SELECT STREAM ID, MAPFIELD, NESTEDMAPFIELD, ARRAYFIELD " +
+                     "FROM FOO " +
+                     "WHERE NESTEDMAPFIELD['a']['b'] = 2 AND ARRAYFIELD[1] = 200");
+    StormSql sql = StormSql.construct();
+    List<Values> values = new ArrayList<>();
+    ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
+    sql.execute(stmt, h);
+    System.out.println(values);
+    Map<String, Integer> map = ImmutableMap.of("b", 2, "c", 4);
+    Map<String, Map<String, Integer>> nestedMap = ImmutableMap.of("a", map);
+    Assert.assertEquals(new Values(2, map, nestedMap, Arrays.asList(100, 200, 300)), values.get(0));
+  }
+
+  @Test
+  public void testExternalNestedInvalidAccess() throws Exception {
+    List<String> stmt = new ArrayList<>();
+    stmt.add("CREATE EXTERNAL TABLE FOO (ID INT, MAPFIELD ANY, NESTEDMAPFIELD ANY, ARRAYFIELD ANY) LOCATION 'mocknested:///foo'");
+    stmt.add("SELECT STREAM ID, MAPFIELD, NESTEDMAPFIELD, ARRAYFIELD " +
+                     "FROM FOO " +
+                     "WHERE NESTEDMAPFIELD['a']['b'] = 2 AND ARRAYFIELD['a'] = 200");
+    StormSql sql = StormSql.construct();
+    List<Values> values = new ArrayList<>();
+    ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
+    sql.execute(stmt, h);
+    Assert.assertEquals(0, values.size());
+  }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/0f644360/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java
index 994e419..43b54f7 100644
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java
@@ -17,31 +17,74 @@
  */
 package org.apache.storm.sql.compiler;
 
+import com.google.common.collect.ImmutableList;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.jdbc.CalciteSchema;
 import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.prepare.CalciteCatalogReader;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.type.RelDataTypeSystem;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.schema.StreamableTable;
 import org.apache.calcite.schema.Table;
+import org.apache.calcite.schema.impl.ScalarFunctionImpl;
 import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperatorTable;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.apache.calcite.sql.parser.SqlParseException;
 import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.calcite.tools.*;
+import org.apache.calcite.sql.util.ChainedSqlOperatorTable;
+import org.apache.calcite.tools.FrameworkConfig;
+import org.apache.calcite.tools.Frameworks;
+import org.apache.calcite.tools.Planner;
+import org.apache.calcite.tools.RelConversionException;
+import org.apache.calcite.tools.ValidationException;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
 
 public class TestCompilerUtils {
+
     public static CalciteState sqlOverDummyTable(String sql)
-        throws RelConversionException, ValidationException, SqlParseException {
+            throws RelConversionException, ValidationException, SqlParseException {
+        SchemaPlus schema = Frameworks.createRootSchema(true);
+        JavaTypeFactory typeFactory = new JavaTypeFactoryImpl
+                (RelDataTypeSystem.DEFAULT);
+        StreamableTable streamableTable = new CompilerUtil.TableBuilderInfo(typeFactory)
+                .field("ID", SqlTypeName.INTEGER)
+                .field("NAME", typeFactory.createType(String.class))
+                .field("ADDR", typeFactory.createType(String.class))
+                .build();
+        Table table = streamableTable.stream();
+        schema.add("FOO", table);
+        schema.add("BAR", table);
+        FrameworkConfig config = Frameworks.newConfigBuilder().defaultSchema(
+                schema).build();
+        Planner planner = Frameworks.getPlanner(config);
+        SqlNode parse = planner.parse(sql);
+        SqlNode validate = planner.validate(parse);
+        RelNode tree = planner.convert(validate);
+        return new CalciteState(schema, tree);
+    }
+
+    public static CalciteState sqlOverNestedTable(String sql)
+            throws RelConversionException, ValidationException, SqlParseException {
         SchemaPlus schema = Frameworks.createRootSchema(true);
         JavaTypeFactory typeFactory = new JavaTypeFactoryImpl
-            (RelDataTypeSystem.DEFAULT);
+                (RelDataTypeSystem.DEFAULT);
+
         StreamableTable streamableTable = new CompilerUtil.TableBuilderInfo(typeFactory)
-            .field("ID", SqlTypeName.INTEGER).build();
+                .field("ID", SqlTypeName.INTEGER)
+                .field("MAPFIELD", SqlTypeName.ANY)
+                .field("NESTEDMAPFIELD", SqlTypeName.ANY)
+                .field("ARRAYFIELD", SqlTypeName.ANY)
+                .build();
         Table table = streamableTable.stream();
         schema.add("FOO", table);
         schema.add("BAR", table);
         FrameworkConfig config = Frameworks.newConfigBuilder().defaultSchema(
-            schema).build();
+                schema).build();
         Planner planner = Frameworks.getPlanner(config);
         SqlNode parse = planner.parse(sql);
         SqlNode validate = planner.validate(parse);
@@ -58,7 +101,12 @@ public class TestCompilerUtils {
             this.tree = tree;
         }
 
-        public SchemaPlus schema() { return schema; }
-        public RelNode tree() { return tree; }
+        public SchemaPlus schema() {
+            return schema;
+        }
+
+        public RelNode tree() {
+            return tree;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/0f644360/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprSemantic.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprSemantic.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprSemantic.java
index 8304a33..f2ac081 100644
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprSemantic.java
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprSemantic.java
@@ -90,6 +90,24 @@ public class TestExprSemantic {
   }
 
   @Test
+  public void testAndWithNullable() throws Exception {
+    Values v = testExpr(
+            Lists.newArrayList(
+                    "ADDR = 'a' AND NAME = 'a'", "NAME = 'a' AND ADDR = 'a'", "NAME = 'x' AND ADDR = 'a'", "ADDR = 'a' AND NAME = 'x'"
+            ));
+    assertEquals(new Values(false, false, null, null), v);
+  }
+
+  @Test
+  public void testOrWithNullable() throws Exception {
+    Values v = testExpr(
+            Lists.newArrayList(
+                    "ADDR = 'a'  OR NAME = 'a'", "NAME = 'a' OR ADDR = 'a' ", "NAME = 'x' OR ADDR = 'a' ", "ADDR = 'a'  OR NAME = 'x'"
+            ));
+    assertEquals(new Values(null, null, true, true), v);
+  }
+
+  @Test
   public void testOrWithNull() throws Exception {
     Values v = testExpr(
         Lists.newArrayList(

http://git-wip-us.apache.org/repos/asf/storm/blob/0f644360/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestPlanCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestPlanCompiler.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestPlanCompiler.java
index ff28231..414aeee 100644
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestPlanCompiler.java
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestPlanCompiler.java
@@ -17,6 +17,7 @@
  */
 package org.apache.storm.sql.compiler.backends.standalone;
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.storm.tuple.Values;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
@@ -30,6 +31,7 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -66,4 +68,22 @@ public class TestPlanCompiler {
     proc.initialize(data, h);
     Assert.assertEquals(new Values(true, false, true), values.get(0));
   }
+
+  @Test
+  public void testNested() throws Exception {
+    String sql = "SELECT ID, MAPFIELD, NESTEDMAPFIELD, ARRAYFIELD " +
+            "FROM FOO " +
+            "WHERE NESTEDMAPFIELD['a']['b'] = 2 AND ARRAYFIELD[1] = 200";
+    TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverNestedTable(sql);
+    PlanCompiler compiler = new PlanCompiler(typeFactory);
+    AbstractValuesProcessor proc = compiler.compile(state.tree());
+    Map<String, DataSource> data = new HashMap<>();
+    data.put("FOO", new TestUtils.MockNestedDataSource());
+    List<Values> values = new ArrayList<>();
+    ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
+    proc.initialize(data, h);
+    Map<String, Integer> map = ImmutableMap.of("b", 2, "c", 4);
+    Map<String, Map<String, Integer>> nestedMap = ImmutableMap.of("a", map);
+    Assert.assertEquals(new Values(2, map, nestedMap, Arrays.asList(100, 200, 300)), values.get(0));
+  }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/0f644360/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java
index ddc671a..0f8daa9 100644
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java
@@ -73,7 +73,7 @@ public class TestPlanCompiler {
   @Test
   public void testInsert() throws Exception {
     final int EXPECTED_VALUE_SIZE = 1;
-    String sql = "INSERT INTO BAR SELECT ID FROM FOO WHERE ID > 3";
+    String sql = "INSERT INTO BAR SELECT ID, NAME, ADDR FROM FOO WHERE ID > 3";
     TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql);
     PlanCompiler compiler = new PlanCompiler(typeFactory);
     final AbstractTridentProcessor proc = compiler.compile(state.tree());
@@ -82,7 +82,7 @@ public class TestPlanCompiler {
     data.put("BAR", new TestUtils.MockSqlTridentDataSource());
     final TridentTopology topo = proc.build(data);
     runTridentTopology(EXPECTED_VALUE_SIZE, proc, topo);
-    Assert.assertArrayEquals(new Values[] { new Values(4)}, getCollectedValues().toArray());
+    Assert.assertArrayEquals(new Values[] { new Values(4, "x", "y")}, getCollectedValues().toArray());
   }
 
   private void runTridentTopology(final int expectedValueSize, AbstractTridentProcessor proc,

http://git-wip-us.apache.org/repos/asf/storm/blob/0f644360/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java b/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
index c5a4043..da763a7 100644
--- a/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
+++ b/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
@@ -35,6 +35,8 @@ import org.apache.storm.trident.spout.IBatchSpout;
 import org.apache.storm.trident.tuple.TridentTuple;
 
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -44,7 +46,31 @@ public class TestUtils {
 
     public MockDataSource() {
       for (int i = 0; i < 5; ++i) {
-        RECORDS.add(new Values(i));
+        RECORDS.add(new Values(i, "x", null));
+      }
+    }
+
+    @Override
+    public void open(ChannelContext ctx) {
+      for (Values v : RECORDS) {
+        ctx.emit(v);
+      }
+      ctx.fireChannelInactive();
+    }
+  }
+
+  public static class MockNestedDataSource implements DataSource {
+    private final ArrayList<Values> RECORDS = new ArrayList<>();
+
+    public MockNestedDataSource() {
+      List<Integer> ints = Arrays.asList(100, 200, 300);
+      for (int i = 0; i < 5; ++i) {
+        Map<String, Integer> map = new HashMap<>();
+        map.put("b", i);
+        map.put("c", i*i);
+        Map<String, Map<String, Integer>> mm = new HashMap<>();
+        mm.put("a", map);
+        RECORDS.add(new Values(i, map, mm, ints));
       }
     }
 
@@ -85,11 +111,11 @@ public class TestUtils {
 
     private static class MockSpout implements IBatchSpout {
       private final ArrayList<Values> RECORDS = new ArrayList<>();
-      private final Fields OUTPUT_FIELDS = new Fields("ID");
+      private final Fields OUTPUT_FIELDS = new Fields("ID", "NAME", "ADDR");
 
       public MockSpout() {
         for (int i = 0; i < 5; ++i) {
-          RECORDS.add(new Values(i));
+          RECORDS.add(new Values(i, "x", "y"));
         }
       }
 


[2/3] storm git commit: Merge branch 'STORM-1570' of http://github.com/arunmahadevan/storm into STORM-1570

Posted by sr...@apache.org.
Merge branch 'STORM-1570' of http://github.com/arunmahadevan/storm into STORM-1570


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/dbe920a2
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/dbe920a2
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/dbe920a2

Branch: refs/heads/master
Commit: dbe920a2f8b4e6a06f657b24e2df90fb5601333f
Parents: 9aa8bf0 0f64436
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Mon Feb 29 19:57:38 2016 -0800
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Mon Feb 29 19:57:38 2016 -0800

----------------------------------------------------------------------
 .../apache/storm/sql/compiler/CompilerUtil.java |  7 ++-
 .../apache/storm/sql/compiler/ExprCompiler.java | 32 ++++++++--
 .../backends/standalone/RelNodeCompiler.java    |  6 +-
 .../apache/storm/sql/parser/StormParser.java    |  5 ++
 .../test/org/apache/storm/sql/TestStormSql.java | 64 +++++++++++++++++---
 .../storm/sql/compiler/TestCompilerUtils.java   | 62 ++++++++++++++++---
 .../storm/sql/compiler/TestExprSemantic.java    | 18 ++++++
 .../backends/standalone/TestPlanCompiler.java   | 20 ++++++
 .../backends/trident/TestPlanCompiler.java      |  4 +-
 .../test/org/apache/storm/sql/TestUtils.java    | 32 +++++++++-
 10 files changed, 223 insertions(+), 27 deletions(-)
----------------------------------------------------------------------



[3/3] storm git commit: Added STORM-1570 to CHANGELOG.

Posted by sr...@apache.org.
Added STORM-1570 to CHANGELOG.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/87e3c246
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/87e3c246
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/87e3c246

Branch: refs/heads/master
Commit: 87e3c246762b56ee8fd638662a9776fdf36c8ef5
Parents: dbe920a
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Mon Feb 29 19:58:14 2016 -0800
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Mon Feb 29 19:58:14 2016 -0800

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/87e3c246/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6514379..689c138 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -40,6 +40,7 @@
  * STORM-1521: When using Kerberos login from keytab with multiple bolts/executors ticket is not renewed in hbase bolt.
 
 ## 1.0.0
+ * STORM-1570: Storm SQL support for nested fields and array
  * STORM-1576: fix ConcurrentModificationException in addCheckpointInputs
  * STORM-1488: UI Topology Page component last error timestamp is from 1970
  * STORM-1552: Fix topology event sampling log dir