Posted to commits@storm.apache.org by ka...@apache.org on 2016/09/13 04:46:43 UTC

[1/3] storm git commit: STORM-2057 Support JOIN statement in Storm SQL

Repository: storm
Updated Branches:
  refs/heads/1.x-branch 09bff55e0 -> f1e3120cd


STORM-2057 Support JOIN statement in Storm SQL

* Support INNER, LEFT / RIGHT / FULL OUTER JOIN based on Trident
* The feature has the same limitation as Trident join
  * the join occurs within each batch (not across batches)
* Introduce new parameter on join(): JoinOutFieldsMode
  * the default mode is COMPACT, which matches the current behavior
* Create PreservingFieldsOrderJoinerMultiReducer
  * this preserves the field order of all input streams
* Modify code to reflect the changes
* Reflect changes to doc
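
The difference between the two JoinOutFieldsMode values is easiest to see on
the EMP/DEPT tables used by the tests in this commit. The sketch below is only
an illustration of the field layout described in the JoinOutFieldsMode Javadoc;
it does not call Storm. The class JoinOutFieldsSketch, its Input holder, the
layout() helper and the "key(...)" label are all hypothetical names invented
for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustration only: models the output field layout of a two-stream join.
// None of these names exist in Storm; they are assumptions for this sketch.
public class JoinOutFieldsSketch {
    enum Mode { COMPACT, PRESERVE }

    /** One input stream: its name, its fields in order, and its single join field. */
    static final class Input {
        final String name;
        final List<String> fields;
        final String joinField;

        Input(String name, List<String> fields, String joinField) {
            this.name = name;
            this.fields = fields;
            this.joinField = joinField;
        }
    }

    /** Returns labels for the emitted tuple's positions under the given mode. */
    static List<String> layout(List<Input> inputs, Mode mode) {
        List<String> out = new ArrayList<>();
        if (mode == Mode.COMPACT) {
            // COMPACT: the join field is exposed once, ahead of everything else.
            out.add("key(" + inputs.get(0).joinField + ")");
        }
        for (Input in : inputs) {
            for (String field : in.fields) {
                boolean isJoinField = field.equals(in.joinField);
                // PRESERVE keeps every field; COMPACT skips the per-stream join field.
                if (mode == Mode.PRESERVE || !isJoinField) {
                    out.add(in.name + "." + field);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Input> inputs = Arrays.asList(
                new Input("EMP", Arrays.asList("EMPID", "EMPNAME", "DEPTID"), "DEPTID"),
                new Input("DEPT", Arrays.asList("DEPTID", "DEPTNAME"), "DEPTID"));

        // prints [key(DEPTID), EMP.EMPID, EMP.EMPNAME, DEPT.DEPTNAME]
        System.out.println(layout(inputs, Mode.COMPACT));
        // prints [EMP.EMPID, EMP.EMPNAME, EMP.DEPTID, DEPT.DEPTID, DEPT.DEPTNAME]
        System.out.println(layout(inputs, Mode.PRESERVE));
    }
}
```

The PRESERVE layout lines up position by position with the row type of a SQL
join (all left fields, then all right fields), which is presumably why the
Storm SQL compiler in this commit passes JoinOutFieldsMode.PRESERVE.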


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/bb7c8c75
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/bb7c8c75
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/bb7c8c75

Branch: refs/heads/1.x-branch
Commit: bb7c8c75484253a5d79ae04f1380eda183c2eeb9
Parents: 09bff55
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Tue Aug 30 09:07:38 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Tue Sep 13 13:35:06 2016 +0900

----------------------------------------------------------------------
 docs/storm-sql.md                               |  16 +-
 external/sql/README.md                          |  12 +-
 .../trident/TridentLogicalPlanCompiler.java     |  61 ++++++-
 .../storm/sql/compiler/TestCompilerUtils.java   |  44 +++++
 .../backends/trident/TestPlanCompiler.java      |  74 ++++++++
 .../test/org/apache/storm/sql/TestUtils.java    | 151 ++++++++++++++++
 .../apache/storm/trident/JoinOutFieldsMode.java |  33 ++++
 .../apache/storm/trident/TridentTopology.java   |  52 +++++-
 .../storm/trident/operation/impl/JoinState.java |  22 +++
 .../operation/impl/JoinerMultiReducer.java      |  43 ++---
 ...PreservingFieldsOrderJoinerMultiReducer.java | 175 +++++++++++++++++++
 11 files changed, 637 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/bb7c8c75/docs/storm-sql.md
----------------------------------------------------------------------
diff --git a/docs/storm-sql.md b/docs/storm-sql.md
index 104b852..3eaf678 100644
--- a/docs/storm-sql.md
+++ b/docs/storm-sql.md
@@ -28,6 +28,7 @@ The following features are supported in the current repository:
 * Projections
 * Aggregations (Grouping)
 * User defined function (scalar and aggregate)
+* Join (Inner, Left outer, Right outer, Full outer)
 
 ## Specifying External Data Sources
 
@@ -94,7 +95,7 @@ and class for aggregate function is here:
   }
 ```
 
-If users doesn't define `result` method, result is the last return value of `add` method.
+If users don't define `result` method, result is the last return value of `add` method.
 Users need to define `result` method only when we need to transform accumulated value.
 
 ## Example: Filtering Kafka Stream
@@ -128,6 +129,13 @@ By now you should be able to see the `order_filtering` topology in the Storm UI.
 
 ## Current Limitations
 
-Aggregation, windowing and joining tables are yet to be implemented. Specifying parallelism hints in the topology is not yet supported. All processors have a parallelism hint of 1.
-
-The current implementation of the Kafka connector in StormSQL assumes both the input and the output are in JSON formats. The connector has not yet recognized the `INPUTFORMAT` and `OUTPUTFORMAT` clauses yet.
+- Windowing is yet to be implemented.
+- Only equi-join (single field equality) is supported for joining table.
+- Joining table only applies within each small batch that comes off of the spout.
+  - Not across batches.
+  - Limitation came from `join` feature of Trident.
+  - Please refer this doc: `Trident API Overview` for details.
+- Specifying parallelism hints in the topology is not yet supported. 
+  - All processors have a parallelism hint of 1.
+- The current implementation of the Kafka connector in StormSQL assumes both the input and the output are in JSON formats. 
+  - The connector has not yet recognized the `INPUTFORMAT` and `OUTPUTFORMAT` clauses yet.

http://git-wip-us.apache.org/repos/asf/storm/blob/bb7c8c75/external/sql/README.md
----------------------------------------------------------------------
diff --git a/external/sql/README.md b/external/sql/README.md
index ae322f2..d0c0d06 100644
--- a/external/sql/README.md
+++ b/external/sql/README.md
@@ -21,6 +21,7 @@ The following features are supported in the current repository:
 * Projections
 * Aggregations (Grouping)
 * User defined function (scalar and aggregate)
+* Join (Inner, Left outer, Right outer, Full outer)
 
 ## Specifying External Data Sources
 
@@ -80,7 +81,7 @@ and class for aggregate function is here:
   }
 ```
 
-If users doesn't define `result` method, result is the last return value of `add` method.
+If users don't define `result` method, result is the last return value of `add` method.
 Users need to define `result` method only when we need to transform accumulated value.
 
 ## Example: Filtering Kafka Stream
@@ -124,7 +125,14 @@ By now you should be able to see the `order_filtering` topology in the Storm UI.
 
 ## Current Limitations
 
-Windowing and joining tables are yet to be implemented. Specifying parallelism hints in the topology is not yet supported. All processors have a parallelism hint of 1.
+- Windowing is yet to be implemented.
+- Only equi-join (single field equality) is supported for joining table.
+- Joining table only applies within each small batch that comes off of the spout.
+  - Not across batches.
+  - Limitation came from `join` feature of Trident.
+  - Please refer this doc: `Trident API Overview` for details.
+- Specifying parallelism hints in the topology is not yet supported. 
+  - All processors have a parallelism hint of 1.
 
 ## License
 

http://git-wip-us.apache.org/repos/asf/storm/blob/bb7c8c75/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/TridentLogicalPlanCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/TridentLogicalPlanCompiler.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/TridentLogicalPlanCompiler.java
index b10e64e..688b164 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/TridentLogicalPlanCompiler.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/TridentLogicalPlanCompiler.java
@@ -22,13 +22,16 @@ import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.primitives.Primitives;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Aggregate;
 import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.core.Project;
 import org.apache.calcite.rel.core.TableModify;
 import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.logical.LogicalJoin;
 import org.apache.calcite.schema.impl.AggregateFunctionImpl;
 import org.apache.calcite.sql.SqlAggFunction;
 import org.apache.calcite.sql.validate.SqlUserDefinedAggFunction;
@@ -44,6 +47,8 @@ import org.apache.storm.sql.runtime.trident.operations.DivideForAverage;
 import org.apache.storm.sql.runtime.trident.operations.MaxBy;
 import org.apache.storm.sql.runtime.trident.operations.MinBy;
 import org.apache.storm.sql.runtime.trident.operations.SumBy;
+import org.apache.storm.trident.JoinOutFieldsMode;
+import org.apache.storm.trident.JoinType;
 import org.apache.storm.trident.Stream;
 import org.apache.storm.trident.TridentTopology;
 import org.apache.storm.trident.fluent.ChainedAggregatorDeclarer;
@@ -131,8 +136,58 @@ public class TridentLogicalPlanCompiler extends PostOrderRelNodeVisitor<IAggrega
     }
 
     @Override
+    public IAggregatableStream visitJoin(Join join, List<IAggregatableStream> inputStreams) throws Exception {
+        if (inputStreams.size() != 2) {
+            throw new RuntimeException("Join is a BiRel");
+        }
+
+        int[] ordinals = new int[2];
+        if (!RelOptUtil.analyzeSimpleEquiJoin((LogicalJoin) join, ordinals)) {
+            throw new UnsupportedOperationException("Only simple equi joins are supported");
+        }
+
+        List<JoinType> joinTypes = new ArrayList<>();
+        switch (join.getJoinType()) {
+            case INNER:
+                joinTypes.add(JoinType.INNER);
+                joinTypes.add(JoinType.INNER);
+                break;
+
+            case LEFT:
+                joinTypes.add(JoinType.INNER);
+                joinTypes.add(JoinType.OUTER);
+                break;
+
+            case RIGHT:
+                joinTypes.add(JoinType.OUTER);
+                joinTypes.add(JoinType.INNER);
+                break;
+
+            case FULL:
+                joinTypes.add(JoinType.OUTER);
+                joinTypes.add(JoinType.OUTER);
+                break;
+
+            default:
+                throw new UnsupportedOperationException("Unsupported join type: " + join.getJoinType());
+        }
+
+        String leftJoinFieldName = join.getLeft().getRowType().getFieldNames().get(ordinals[0]);
+        String rightJoinFieldName = join.getRight().getRowType().getFieldNames().get(ordinals[1]);
+
+        Stream leftInputStream = inputStreams.get(0).toStream();
+        Stream rightInputStream = inputStreams.get(1).toStream();
+        String stageName = getStageName(join);
+
+        return topology
+                .join(leftInputStream, new Fields(leftJoinFieldName), rightInputStream, new Fields(rightJoinFieldName),
+                        new Fields(join.getRowType().getFieldNames()), joinTypes, JoinOutFieldsMode.PRESERVE)
+                .name(stageName);
+    }
+
+    @Override
     public IAggregatableStream visitProject(Project project, List<IAggregatableStream> inputStreams) throws Exception {
-        if (inputStreams.size() > 1) {
+        if (inputStreams.size() != 1) {
             throw new RuntimeException("Project is a SingleRel");
         }
 
@@ -171,7 +226,7 @@ public class TridentLogicalPlanCompiler extends PostOrderRelNodeVisitor<IAggrega
 
     @Override
     public IAggregatableStream visitFilter(Filter filter, List<IAggregatableStream> inputStreams) throws Exception {
-        if (inputStreams.size() > 1) {
+        if (inputStreams.size() != 1) {
             throw new RuntimeException("Filter is a SingleRel");
         }
 
@@ -190,7 +245,7 @@ public class TridentLogicalPlanCompiler extends PostOrderRelNodeVisitor<IAggrega
 
     @Override
     public IAggregatableStream visitAggregate(Aggregate aggregate, List<IAggregatableStream> inputStreams) throws Exception {
-        if (inputStreams.size() > 1) {
+        if (inputStreams.size() != 1) {
             throw new RuntimeException("Aggregate is a SingleRel");
         }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/bb7c8c75/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java
index 2243119..c3b9ad3 100644
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java
@@ -20,6 +20,7 @@ package org.apache.storm.sql.compiler;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.jdbc.CalciteSchema;
 import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.prepare.CalciteCatalogReader;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.type.RelDataTypeSystem;
@@ -28,6 +29,7 @@ import org.apache.calcite.schema.StreamableTable;
 import org.apache.calcite.schema.Table;
 import org.apache.calcite.schema.impl.AggregateFunctionImpl;
 import org.apache.calcite.schema.impl.ScalarFunctionImpl;
+import org.apache.calcite.sql.SqlExplainLevel;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlOperatorTable;
 import org.apache.calcite.sql.fun.SqlStdOperatorTable;
@@ -39,6 +41,7 @@ import org.apache.calcite.tools.Frameworks;
 import org.apache.calcite.tools.Planner;
 import org.apache.calcite.tools.RelConversionException;
 import org.apache.calcite.tools.ValidationException;
+import org.apache.storm.sql.parser.ColumnConstraint;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -101,6 +104,7 @@ public class TestCompilerUtils {
         SqlNode parse = planner.parse(sql);
         SqlNode validate = planner.validate(parse);
         RelNode tree = planner.convert(validate);
+        System.out.println(RelOptUtil.toString(tree, SqlExplainLevel.ALL_ATTRIBUTES));
         return new CalciteState(schema, tree);
     }
 
@@ -134,6 +138,7 @@ public class TestCompilerUtils {
         SqlNode parse = planner.parse(sql);
         SqlNode validate = planner.validate(parse);
         RelNode tree = planner.convert(validate);
+        System.out.println(RelOptUtil.toString(tree, SqlExplainLevel.ALL_ATTRIBUTES));
         return new CalciteState(schema, tree);
     }
 
@@ -165,6 +170,45 @@ public class TestCompilerUtils {
         SqlNode parse = planner.parse(sql);
         SqlNode validate = planner.validate(parse);
         RelNode tree = planner.convert(validate);
+        System.out.println(RelOptUtil.toString(tree, SqlExplainLevel.ALL_ATTRIBUTES));
+        return new CalciteState(schema, tree);
+    }
+
+    public static CalciteState sqlOverSimpleEquiJoinTables(String sql)
+            throws RelConversionException, ValidationException, SqlParseException {
+        SchemaPlus schema = Frameworks.createRootSchema(true);
+        JavaTypeFactory typeFactory = new JavaTypeFactoryImpl
+                (RelDataTypeSystem.DEFAULT);
+
+        StreamableTable streamableTable = new CompilerUtil.TableBuilderInfo(typeFactory)
+                .field("EMPID", SqlTypeName.INTEGER)
+                .field("EMPNAME", SqlTypeName.VARCHAR)
+                .field("DEPTID", SqlTypeName.INTEGER)
+                .build();
+        Table table = streamableTable.stream();
+
+        StreamableTable streamableTable2 = new CompilerUtil.TableBuilderInfo(typeFactory)
+                .field("DEPTID", SqlTypeName.INTEGER)
+                .field("DEPTNAME", SqlTypeName.VARCHAR)
+                .build();
+        Table table2 = streamableTable2.stream();
+
+        schema.add("EMP", table);
+        schema.add("DEPT", table2);
+
+        List<SqlOperatorTable> sqlOperatorTables = new ArrayList<>();
+        sqlOperatorTables.add(SqlStdOperatorTable.instance());
+        sqlOperatorTables.add(new CalciteCatalogReader(CalciteSchema.from(schema),
+                false,
+                Collections.<String>emptyList(), typeFactory));
+        SqlOperatorTable chainedSqlOperatorTable = new ChainedSqlOperatorTable(sqlOperatorTables);
+        FrameworkConfig config = Frameworks.newConfigBuilder().defaultSchema(
+                schema).operatorTable(chainedSqlOperatorTable).build();
+        Planner planner = Frameworks.getPlanner(config);
+        SqlNode parse = planner.parse(sql);
+        SqlNode validate = planner.validate(parse);
+        RelNode tree = planner.convert(validate);
+        System.out.println(RelOptUtil.toString(tree, SqlExplainLevel.ALL_ATTRIBUTES));
         return new CalciteState(schema, tree);
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/bb7c8c75/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java
index 68053da..0ad0a46 100644
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java
@@ -108,6 +108,80 @@ public class TestPlanCompiler {
   }
 
   @Test
+  public void testCompileEquiJoinAndGroupBy() throws Exception {
+    final int EXPECTED_VALUE_SIZE = 2;
+    final Map<String, ISqlTridentDataSource> data = new HashMap<>();
+    data.put("EMP", new TestUtils.MockSqlTridentJoinDataSourceEmp());
+    data.put("DEPT", new TestUtils.MockSqlTridentJoinDataSourceDept());
+    String sql = "SELECT d.DEPTID, count(EMPID) FROM EMP AS e JOIN DEPT AS d ON e.DEPTID = d.DEPTID WHERE e.EMPID > 0 GROUP BY d.DEPTID";
+    TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverSimpleEquiJoinTables(sql);
+    PlanCompiler compiler = new PlanCompiler(data, typeFactory);
+    final AbstractTridentProcessor proc = compiler.compileForTest(state.tree());
+    final TridentTopology topo = proc.build(data);
+    Fields f = proc.outputStream().getOutputFields();
+    proc.outputStream().each(f, new CollectDataFunction(), new Fields()).toStream();
+    runTridentTopology(EXPECTED_VALUE_SIZE, proc, topo);
+
+    Assert.assertArrayEquals(new Values[] { new Values(1, 2L), new Values(0, 2L)}, getCollectedValues().toArray());
+  }
+
+  @Test
+  public void testCompileEquiJoinWithLeftOuterJoin() throws Exception {
+    final int EXPECTED_VALUE_SIZE = 3;
+    final Map<String, ISqlTridentDataSource> data = new HashMap<>();
+    data.put("EMP", new TestUtils.MockSqlTridentJoinDataSourceEmp());
+    data.put("DEPT", new TestUtils.MockSqlTridentJoinDataSourceDept());
+    String sql = "SELECT d.DEPTID, e.DEPTID FROM DEPT AS d LEFT OUTER JOIN EMP AS e ON d.DEPTID = e.DEPTID WHERE e.EMPID is null";
+    TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverSimpleEquiJoinTables(sql);
+    PlanCompiler compiler = new PlanCompiler(data, typeFactory);
+    final AbstractTridentProcessor proc = compiler.compileForTest(state.tree());
+    final TridentTopology topo = proc.build(data);
+    Fields f = proc.outputStream().getOutputFields();
+    proc.outputStream().each(f, new CollectDataFunction(), new Fields()).toStream();
+    runTridentTopology(EXPECTED_VALUE_SIZE, proc, topo);
+
+    Assert.assertArrayEquals(new Values[] { new Values(2, null), new Values(3, null), new Values(4, null)}, getCollectedValues().toArray());
+  }
+
+  @Test
+  public void testCompileEquiJoinWithRightOuterJoin() throws Exception {
+    final int EXPECTED_VALUE_SIZE = 3;
+    final Map<String, ISqlTridentDataSource> data = new HashMap<>();
+    data.put("EMP", new TestUtils.MockSqlTridentJoinDataSourceEmp());
+    data.put("DEPT", new TestUtils.MockSqlTridentJoinDataSourceDept());
+    String sql = "SELECT d.DEPTID, e.DEPTID FROM EMP AS e RIGHT OUTER JOIN DEPT AS d ON e.DEPTID = d.DEPTID WHERE e.EMPID is null";
+    TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverSimpleEquiJoinTables(sql);
+    PlanCompiler compiler = new PlanCompiler(data, typeFactory);
+    final AbstractTridentProcessor proc = compiler.compileForTest(state.tree());
+    final TridentTopology topo = proc.build(data);
+    Fields f = proc.outputStream().getOutputFields();
+    proc.outputStream().each(f, new CollectDataFunction(), new Fields()).toStream();
+    runTridentTopology(EXPECTED_VALUE_SIZE, proc, topo);
+
+    Assert.assertArrayEquals(new Values[] { new Values(2, null), new Values(3, null), new Values(4, null)}, getCollectedValues().toArray());
+  }
+
+  @Test
+  public void testCompileEquiJoinWithFullOuterJoin() throws Exception {
+    final int EXPECTED_VALUE_SIZE = 8;
+    final Map<String, ISqlTridentDataSource> data = new HashMap<>();
+    data.put("EMP", new TestUtils.MockSqlTridentJoinDataSourceEmp());
+    data.put("DEPT", new TestUtils.MockSqlTridentJoinDataSourceDept());
+    String sql = "SELECT e.DEPTID, d.DEPTNAME FROM EMP AS e FULL OUTER JOIN DEPT AS d ON e.DEPTID = d.DEPTID WHERE (d.DEPTNAME is null OR e.EMPNAME is null)";
+    TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverSimpleEquiJoinTables(sql);
+    PlanCompiler compiler = new PlanCompiler(data, typeFactory);
+    final AbstractTridentProcessor proc = compiler.compileForTest(state.tree());
+    final TridentTopology topo = proc.build(data);
+    Fields f = proc.outputStream().getOutputFields();
+    proc.outputStream().each(f, new CollectDataFunction(), new Fields()).toStream();
+    runTridentTopology(EXPECTED_VALUE_SIZE, proc, topo);
+
+    Assert.assertArrayEquals(new Values[] { new Values(null, "dept-2"), new Values(null, "dept-3"), new Values(null, "dept-4"),
+      new Values(10, null), new Values(11, null), new Values(12, null), new Values(13, null), new Values(14, null)},
+            getCollectedValues().toArray());
+  }
+
+  @Test
   public void testInsert() throws Exception {
     final int EXPECTED_VALUE_SIZE = 1;
     String sql = "INSERT INTO BAR SELECT ID, NAME, ADDR FROM FOO WHERE ID > 3";

http://git-wip-us.apache.org/repos/asf/storm/blob/bb7c8c75/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java b/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
index 85ddc91..5d0384a 100644
--- a/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
+++ b/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
@@ -308,6 +308,157 @@ public class TestUtils {
     }
   }
 
+  public static class MockSqlTridentJoinDataSourceEmp implements ISqlTridentDataSource {
+    @Override
+    public IBatchSpout getProducer() {
+      return new MockSpout();
+    }
+
+    @Override
+    public Function getConsumer() {
+      return new CollectDataFunction();
+    }
+
+    public static class CollectDataFunction extends BaseFunction {
+      /**
+       * Collect all values in a static variable as the instance will go through serialization and deserialization.
+       */
+      private transient static final List<List<Object> > VALUES = new ArrayList<>();
+      public static List<List<Object>> getCollectedValues() {
+        return VALUES;
+      }
+
+      @Override
+      public void execute(TridentTuple tuple, TridentCollector collector) {
+        VALUES.add(tuple.getValues());
+      }
+    }
+
+    private static class MockSpout implements IBatchSpout {
+      private final ArrayList<Values> RECORDS = new ArrayList<>();
+      private final Fields OUTPUT_FIELDS = new Fields("EMPID", "EMPNAME", "DEPTID");
+
+      public MockSpout() {
+        for (int i = 0; i < 5; ++i) {
+          RECORDS.add(new Values(i, "emp-" + i, i % 2));
+        }
+        for (int i = 10; i < 15; ++i) {
+          RECORDS.add(new Values(i, "emp-" + i, i));
+        }
+      }
+
+      private boolean emitted = false;
+
+      @Override
+      public void open(Map conf, TopologyContext context) {
+      }
+
+      @Override
+      public void emitBatch(long batchId, TridentCollector collector) {
+        if (emitted) {
+          return;
+        }
+
+        for (Values r : RECORDS) {
+          collector.emit(r);
+        }
+        emitted = true;
+      }
+
+      @Override
+      public void ack(long batchId) {
+      }
+
+      @Override
+      public void close() {
+      }
+
+      @Override
+      public Map<String, Object> getComponentConfiguration() {
+        return null;
+      }
+
+      @Override
+      public Fields getOutputFields() {
+        return OUTPUT_FIELDS;
+      }
+    }
+  }
+
+  public static class MockSqlTridentJoinDataSourceDept implements ISqlTridentDataSource {
+    @Override
+    public IBatchSpout getProducer() {
+      return new MockSpout();
+    }
+
+    @Override
+    public Function getConsumer() {
+      return new CollectDataFunction();
+    }
+
+    public static class CollectDataFunction extends BaseFunction {
+      /**
+       * Collect all values in a static variable as the instance will go through serialization and deserialization.
+       */
+      private transient static final List<List<Object> > VALUES = new ArrayList<>();
+      public static List<List<Object>> getCollectedValues() {
+        return VALUES;
+      }
+
+      @Override
+      public void execute(TridentTuple tuple, TridentCollector collector) {
+        VALUES.add(tuple.getValues());
+      }
+    }
+
+    private static class MockSpout implements IBatchSpout {
+      private final ArrayList<Values> RECORDS = new ArrayList<>();
+      private final Fields OUTPUT_FIELDS = new Fields("DEPTID", "DEPTNAME");
+
+      public MockSpout() {
+        for (int i = 0; i < 5; ++i) {
+          RECORDS.add(new Values(i, "dept-" + i));
+        }
+      }
+
+      private boolean emitted = false;
+
+      @Override
+      public void open(Map conf, TopologyContext context) {
+      }
+
+      @Override
+      public void emitBatch(long batchId, TridentCollector collector) {
+        if (emitted) {
+          return;
+        }
+
+        for (Values r : RECORDS) {
+          collector.emit(r);
+        }
+        emitted = true;
+      }
+
+      @Override
+      public void ack(long batchId) {
+      }
+
+      @Override
+      public void close() {
+      }
+
+      @Override
+      public Map<String, Object> getComponentConfiguration() {
+        return null;
+      }
+
+      @Override
+      public Fields getOutputFields() {
+        return OUTPUT_FIELDS;
+      }
+    }
+  }
+
   public static class CollectDataChannelHandler implements ChannelHandler {
     private final List<Values> values;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/bb7c8c75/storm-core/src/jvm/org/apache/storm/trident/JoinOutFieldsMode.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/JoinOutFieldsMode.java b/storm-core/src/jvm/org/apache/storm/trident/JoinOutFieldsMode.java
new file mode 100644
index 0000000..f563390
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/JoinOutFieldsMode.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.trident;
+
+/**
+ * This enum defines how the output fields of JOIN is constructed.
+ *
+ * If user specifies COMPACT while calling JOIN, the tuples emitted from the join will contain:
+ * First, the list of join fields. Please note that joining fields are exposed only once from emitted tuples.
+ * Next, a list of all non-join fields from all streams, in order of how the streams were passed to the join method.
+ *
+ * If user specifies PRESERVE while calling JOIN, the tuples emitted from the join will contain:
+ * a list of all fields from all streams, in order of how the streams were passed to the join method.
+ */
+public enum JoinOutFieldsMode {
+    COMPACT,
+    PRESERVE
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/bb7c8c75/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java b/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java
index e87b1d1..befdc85 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java
@@ -27,6 +27,7 @@ import org.apache.storm.grouping.CustomStreamGrouping;
 import org.apache.storm.topology.BoltDeclarer;
 import org.apache.storm.topology.IRichSpout;
 import org.apache.storm.topology.SpoutDeclarer;
+import org.apache.storm.trident.operation.impl.PreservingFieldsOrderJoinerMultiReducer;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.utils.Utils;
 
@@ -46,7 +47,6 @@ import org.apache.storm.trident.graph.GraphGrouper;
 import org.apache.storm.trident.graph.Group;
 import org.apache.storm.trident.operation.DefaultResourceDeclarer;
 import org.apache.storm.trident.operation.GroupedMultiReducer;
-import org.apache.storm.trident.operation.ITridentResource;
 import org.apache.storm.trident.operation.MultiReducer;
 import org.apache.storm.trident.operation.impl.FilterExecutor;
 import org.apache.storm.trident.operation.impl.GroupedMultiReducerExecutor;
@@ -262,19 +262,57 @@ public class TridentTopology {
     }
     
     public Stream join(List<Stream> streams, List<Fields> joinFields, Fields outFields, JoinType type) {
-        return join(streams, joinFields, outFields, repeat(streams.size(), type));        
+        return join(streams, joinFields, outFields, repeat(streams.size(), type));
     }
 
     public Stream join(Stream s1, Fields joinFields1, Stream s2, Fields joinFields2, Fields outFields, List<JoinType> mixed) {
-        return join(Arrays.asList(s1, s2), Arrays.asList(joinFields1, joinFields2), outFields, mixed);        
+        return join(Arrays.asList(s1, s2), Arrays.asList(joinFields1, joinFields2), outFields, mixed);
         
     }
     
     public Stream join(List<Stream> streams, List<Fields> joinFields, Fields outFields, List<JoinType> mixed) {
-        return multiReduce(strippedInputFields(streams, joinFields),
-              groupedStreams(streams, joinFields),
-              new JoinerMultiReducer(mixed, joinFields.get(0).size(), strippedInputFields(streams, joinFields)),
-              outFields);
+        return join(streams, joinFields, outFields, mixed, JoinOutFieldsMode.COMPACT);
+    }
+
+    public Stream join(Stream s1, Fields joinFields1, Stream s2, Fields joinFields2, Fields outFields, JoinOutFieldsMode mode) {
+        return join(Arrays.asList(s1, s2), Arrays.asList(joinFields1, joinFields2), outFields, mode);
+    }
+
+    public Stream join(List<Stream> streams, List<Fields> joinFields, Fields outFields, JoinOutFieldsMode mode) {
+        return join(streams, joinFields, outFields, JoinType.INNER, mode);
+    }
+
+    public Stream join(Stream s1, Fields joinFields1, Stream s2, Fields joinFields2, Fields outFields, JoinType type, JoinOutFieldsMode mode) {
+        return join(Arrays.asList(s1, s2), Arrays.asList(joinFields1, joinFields2), outFields, type, mode);
+    }
+
+    public Stream join(List<Stream> streams, List<Fields> joinFields, Fields outFields, JoinType type, JoinOutFieldsMode mode) {
+        return join(streams, joinFields, outFields, repeat(streams.size(), type), mode);
+    }
+
+    public Stream join(Stream s1, Fields joinFields1, Stream s2, Fields joinFields2, Fields outFields, List<JoinType> mixed, JoinOutFieldsMode mode) {
+        return join(Arrays.asList(s1, s2), Arrays.asList(joinFields1, joinFields2), outFields, mixed, mode);
+
+    }
+
+    public Stream join(List<Stream> streams, List<Fields> joinFields, Fields outFields, List<JoinType> mixed, JoinOutFieldsMode mode) {
+        switch (mode) {
+            case COMPACT:
+                return multiReduce(strippedInputFields(streams, joinFields),
+                        groupedStreams(streams, joinFields),
+                        new JoinerMultiReducer(mixed, joinFields.get(0).size(), strippedInputFields(streams, joinFields)),
+                        outFields);
+
+            case PRESERVE:
+                return multiReduce(strippedInputFields(streams, joinFields),
+                        groupedStreams(streams, joinFields),
+                        new PreservingFieldsOrderJoinerMultiReducer(mixed, joinFields.get(0).size(),
+                                getAllOutputFields(streams), joinFields, strippedInputFields(streams, joinFields)),
+                        outFields);
+
+            default:
+                throw new IllegalArgumentException("Unsupported out-fields mode: " + mode);
+        }
     }
 
     public TridentTopology setResourceDefaults(DefaultResourceDeclarer defaults) {

http://git-wip-us.apache.org/repos/asf/storm/blob/bb7c8c75/storm-core/src/jvm/org/apache/storm/trident/operation/impl/JoinState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/impl/JoinState.java b/storm-core/src/jvm/org/apache/storm/trident/operation/impl/JoinState.java
new file mode 100644
index 0000000..f1a7153
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/impl/JoinState.java
@@ -0,0 +1,22 @@
+package org.apache.storm.trident.operation.impl;
+
+import org.apache.storm.trident.tuple.TridentTuple;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class JoinState {
+    List<List>[] sides;
+    int numSidesReceived = 0;
+    int[] indices;
+    TridentTuple group;
+
+    public JoinState(int numSides, TridentTuple group) {
+        sides = new List[numSides];
+        indices = new int[numSides];
+        this.group = group;
+        for(int i=0; i<numSides; i++) {
+            sides[i] = new ArrayList<List>();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/bb7c8c75/storm-core/src/jvm/org/apache/storm/trident/operation/impl/JoinerMultiReducer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/impl/JoinerMultiReducer.java b/storm-core/src/jvm/org/apache/storm/trident/operation/impl/JoinerMultiReducer.java
index 9c6da6e..3ed2e12 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/operation/impl/JoinerMultiReducer.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/impl/JoinerMultiReducer.java
@@ -25,7 +25,6 @@ import org.apache.storm.trident.JoinType;
 import org.apache.storm.trident.operation.GroupedMultiReducer;
 import org.apache.storm.trident.operation.TridentCollector;
 import org.apache.storm.trident.operation.TridentMultiReducerContext;
-import org.apache.storm.trident.operation.impl.JoinerMultiReducer.JoinState;
 import org.apache.storm.trident.tuple.ComboList;
 import org.apache.storm.trident.tuple.TridentTuple;
 
@@ -47,7 +46,7 @@ public class JoinerMultiReducer implements GroupedMultiReducer<JoinState> {
     public void prepare(Map conf, TridentMultiReducerContext context) {
         int[] sizes = new int[_sideFields.size() + 1];
         sizes[0] = _numGroupFields;
-        for(int i=0; i<_sideFields.size(); i++) {
+        for (int i = 0; i < _sideFields.size(); i++) {
             sizes[i+1] = _sideFields.get(i).size();
         }
         _factory = new ComboList.Factory(sizes);
@@ -77,13 +76,13 @@ public class JoinerMultiReducer implements GroupedMultiReducer<JoinState> {
     public void complete(JoinState state, TridentTuple group, TridentCollector collector) {
         List<List>[] sides = state.sides;
         boolean wasEmpty = state.numSidesReceived < sides.length;
-        for(int i=0; i<sides.length; i++) {
-            if(sides[i].isEmpty() && _types.get(i) == JoinType.OUTER) {
+        for (int i = 0; i < sides.length; i++) {
+            if (sides[i].isEmpty() && _types.get(i) == JoinType.OUTER) {
                 state.numSidesReceived++;
                 sides[i].add(makeNullList(_sideFields.get(i).size()));
             }
         }
-        if(wasEmpty && state.numSidesReceived == sides.length) {
+        if (wasEmpty && state.numSidesReceived == sides.length) {
             emitCrossJoin(state, collector, -1, null);
         }
     }
@@ -94,7 +93,7 @@ public class JoinerMultiReducer implements GroupedMultiReducer<JoinState> {
     
     private List<Object> makeNullList(int size) {
         List<Object> ret = new ArrayList(size);
-        for(int i=0; i<size; i++) {
+        for (int i = 0; i < size; i++) {
             ret.add(null);
         }
         return ret;
@@ -103,17 +102,17 @@ public class JoinerMultiReducer implements GroupedMultiReducer<JoinState> {
     private void emitCrossJoin(JoinState state, TridentCollector collector, int overrideIndex, TridentTuple overrideTuple) {
         List<List>[] sides = state.sides;
         int[] indices = state.indices;
-        for(int i=0; i<indices.length; i++) {
+        for (int i = 0; i < indices.length; i++) {
             indices[i] = 0;
         }
         
         boolean keepGoing = true;
         //emit cross-join of all emitted tuples
-        while(keepGoing) {
+        while (keepGoing) {
             List[] combined = new List[sides.length+1];
             combined[0] = state.group;
-            for(int i=0; i<sides.length; i++) {
-                if(i==overrideIndex) {
+            for (int i = 0; i < sides.length; i++) {
+                if (i == overrideIndex) {
                     combined[i+1] = overrideTuple;
                 } else {
                     combined[i+1] = sides[i].get(indices[i]);                
@@ -128,32 +127,16 @@ public class JoinerMultiReducer implements GroupedMultiReducer<JoinState> {
     //return false if can't increment anymore
     //TODO: DRY this code up with what's in ChainedAggregatorImpl
     private boolean increment(List[] lengths, int[] indices, int j, int overrideIndex) {
-        if(j==-1) return false;
-        if(j==overrideIndex) {
+        if (j == -1) return false;
+        if (j == overrideIndex) {
             return increment(lengths, indices, j-1, overrideIndex);
         }
         indices[j]++;
-        if(indices[j] >= lengths[j].size()) {
+        if (indices[j] >= lengths[j].size()) {
             indices[j] = 0;
             return increment(lengths, indices, j-1, overrideIndex);
         }
         return true;
     }
-    
-    public static class JoinState {
-        List<List>[] sides;
-        int numSidesReceived = 0;
-        int[] indices;
-        TridentTuple group;
-        
-        public JoinState(int numSides, TridentTuple group) {
-            sides = new List[numSides];
-            indices = new int[numSides];
-            this.group = group;
-            for(int i=0; i<numSides; i++) {
-                sides[i] = new ArrayList<List>();
-            }            
-        }
-    }
-    
+
 }
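
The emitCrossJoin/increment pair in the diff above walks every combination of buffered tuples with an odometer-style index array: the rightmost side advances first, and the side at overrideIndex is skipped because its tuple is supplied directly. The standalone sketch below reproduces only that index walk. It assumes plain int sizes in place of the per-side tuple lists, and the names CrossJoinIndexSketch, enumerate and pinned are illustrative and not part of Storm.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative model only; not a Storm class.
final class CrossJoinIndexSketch {

    // Advance the index array like an odometer, starting at position j.
    // The pinned position (overrideIndex in the diff) is never advanced.
    // Returns false once every combination has been visited.
    static boolean increment(int[] sizes, int[] indices, int j, int pinned) {
        if (j == -1) {
            return false;
        }
        if (j == pinned) {
            return increment(sizes, indices, j - 1, pinned);
        }
        indices[j]++;
        if (indices[j] >= sizes[j]) {
            indices[j] = 0;
            return increment(sizes, indices, j - 1, pinned);
        }
        return true;
    }

    // Collect every index combination in visiting order.
    // Pass pinned = -1 when no side is pinned.
    static List<String> enumerate(int[] sizes, int pinned) {
        int[] indices = new int[sizes.length];
        List<String> seen = new ArrayList<>();
        boolean keepGoing = true;
        while (keepGoing) {
            seen.add(Arrays.toString(indices));
            keepGoing = increment(sizes, indices, indices.length - 1, pinned);
        }
        return seen;
    }

    public static void main(String[] args) {
        // Two sides holding 2 and 3 tuples: 6 combinations.
        System.out.println(enumerate(new int[] {2, 3}, -1));
        // Side 0 pinned to a single incoming tuple: 3 combinations.
        System.out.println(enumerate(new int[] {2, 3}, 0));
    }
}
```

With side 0 pinned, only side 1 is iterated. This matches how execute() in the diff emits the cross join of one newly arrived tuple against everything already buffered on the other sides.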

http://git-wip-us.apache.org/repos/asf/storm/blob/bb7c8c75/storm-core/src/jvm/org/apache/storm/trident/operation/impl/PreservingFieldsOrderJoinerMultiReducer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/impl/PreservingFieldsOrderJoinerMultiReducer.java b/storm-core/src/jvm/org/apache/storm/trident/operation/impl/PreservingFieldsOrderJoinerMultiReducer.java
new file mode 100644
index 0000000..07ce0d8
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/impl/PreservingFieldsOrderJoinerMultiReducer.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.trident.operation.impl;
+
+import org.apache.storm.trident.JoinType;
+import org.apache.storm.trident.operation.GroupedMultiReducer;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.operation.TridentMultiReducerContext;
+import org.apache.storm.trident.tuple.ComboList;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.tuple.Fields;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class PreservingFieldsOrderJoinerMultiReducer implements GroupedMultiReducer<JoinState> {
+    List<JoinType> _types;
+    List<Fields> _sideFields;
+    List<Fields> _joiningFields;
+    List<Fields> _originFields;
+    int _numGroupFields;
+    ComboList.Factory _factory;
+
+
+    public PreservingFieldsOrderJoinerMultiReducer(List<JoinType> types, int numGroupFields, List<Fields> origins,
+                                                   List<Fields> joins, List<Fields> sides) {
+        _types = types;
+        _originFields = origins;
+        _joiningFields = joins;
+        _sideFields = sides;
+
+        // we already checked this
+        _numGroupFields = numGroupFields;
+    }
+
+    @Override
+    public void prepare(Map conf, TridentMultiReducerContext context) {
+        int[] sizes = new int[_originFields.size()];
+        for (int i = 0; i < _originFields.size(); i++) {
+            sizes[i] = _originFields.get(i).size();
+        }
+        _factory = new ComboList.Factory(sizes);
+    }
+
+    @Override
+    public JoinState init(TridentCollector collector, TridentTuple group) {
+        return new JoinState(_types.size(), group);
+    }
+
+    @Override
+    public void execute(JoinState state, int streamIndex, TridentTuple group, TridentTuple input, TridentCollector collector) {
+        //TODO: do the inner join incrementally, emitting the cross join with this tuple, against all other sides
+        //TODO: only do cross join if at least one tuple in each side
+        List<List> side = state.sides[streamIndex];
+        if (side.isEmpty()) {
+            state.numSidesReceived++;
+        }
+
+        side.add(input);
+        if (state.numSidesReceived == state.sides.length) {
+            emitCrossJoin(state, collector, streamIndex, input);
+        }
+    }
+
+    @Override
+    public void complete(JoinState state, TridentTuple group, TridentCollector collector) {
+        List<List>[] sides = state.sides;
+        boolean wasEmpty = state.numSidesReceived < sides.length;
+        for (int i = 0; i < sides.length; i++) {
+            if(sides[i].isEmpty() && _types.get(i) == JoinType.OUTER) {
+                state.numSidesReceived++;
+                sides[i].add(null);
+            }
+        }
+        if (wasEmpty && state.numSidesReceived == sides.length) {
+            emitCrossJoin(state, collector, -1, null);
+        }
+    }
+
+    @Override
+    public void cleanup() {
+    }
+
+    private List<Object> makeNullList(int size) {
+        List<Object> ret = new ArrayList(size);
+        for (int i = 0; i < size; i++) {
+            ret.add(null);
+        }
+        return ret;
+    }
+
+    private void emitCrossJoin(JoinState state, TridentCollector collector, int overrideIndex, TridentTuple overrideTuple) {
+        List<List>[] sides = state.sides;
+        int[] indices = state.indices;
+        for (int i = 0; i < indices.length; i++) {
+            indices[i] = 0;
+        }
+        
+        boolean keepGoing = true;
+        //emit cross-join of all emitted tuples
+        while (keepGoing) {
+            List[] combined = new List[sides.length];
+
+            for (int i = 0; i < sides.length; i++) {
+                List<Object> values = buildValuesForStream(state, overrideIndex, overrideTuple, sides, indices, combined, i);
+                combined[i] = values;
+            }
+            collector.emit(_factory.create(combined));
+            keepGoing = increment(sides, indices, indices.length - 1, overrideIndex);
+        }
+    }
+
+    private List<Object> buildValuesForStream(JoinState state, int overrideIndex, TridentTuple overrideTuple, List<List>[] sides,
+                                              int[] indices, List[] combined, int streamIdx) {
+        List sideValues;
+        if (streamIdx == overrideIndex) {
+            sideValues = overrideTuple;
+        } else {
+            sideValues = sides[streamIdx].get(indices[streamIdx]);
+        }
+
+        Fields originFields = _originFields.get(streamIdx);
+        if (sideValues == null) {
+            return makeNullList(originFields.size());
+        } else {
+            List<Object> ret = new ArrayList<>(originFields.size());
+            Fields sideFields = _sideFields.get(streamIdx);
+            Fields joinFields = _joiningFields.get(streamIdx);
+            int sideIdx = 0;
+            for (String field : originFields) {
+                // assuming _sideFields are preserving its order
+                if (sideFields.contains(field)) {
+                    ret.add(sideValues.get(sideIdx++));
+                } else {
+                    // group field
+                    ret.add(state.group.get(joinFields.fieldIndex(field)));
+                }
+            }
+            return ret;
+        }
+    }
+
+
+    //return false if can't increment anymore
+    //TODO: DRY this code up with what's in ChainedAggregatorImpl
+    private boolean increment(List[] lengths, int[] indices, int j, int overrideIndex) {
+        if (j == -1) return false;
+        if (j == overrideIndex) {
+            return increment(lengths, indices, j-1, overrideIndex);
+        }
+        indices[j]++;
+        if (indices[j] >= lengths[j].size()) {
+            indices[j] = 0;
+            return increment(lengths, indices, j-1, overrideIndex);
+        }
+        return true;
+    }
+
+}
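
To make the difference between the two JoinOutFieldsMode values concrete, here is a minimal sketch of the value layout each reducer produces, as read from the diffs above. COMPACT emits the group (join key) values once, followed by each stream's non-key values. PRESERVE emits every stream's full field list in its original order, filling key positions from the group tuple. The sketch uses plain lists in place of TridentTuple, Fields and ComboList; the class name JoinLayoutSketch and the field names id, name, dept and empId are made up for illustration, and it looks up key fields directly, where the real reducer checks the side fields instead.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative model only; not a Storm class.
final class JoinLayoutSketch {

    // COMPACT layout: group values, then each side's non-key values.
    static List<Object> compact(List<Object> group, List<List<Object>> sides) {
        List<Object> out = new ArrayList<>(group);
        for (List<Object> side : sides) {
            out.addAll(side);
        }
        return out;
    }

    // PRESERVE layout: each stream's original fields in order.
    // A null side models an OUTER stream that received no tuple; all of its
    // columns become null, including its join-key columns.
    static List<Object> preserve(List<Object> group,
                                 List<List<String>> originFields,
                                 List<List<String>> joinFields,
                                 List<List<Object>> sides) {
        List<Object> out = new ArrayList<>();
        for (int s = 0; s < originFields.size(); s++) {
            List<Object> side = sides.get(s);
            if (side == null) {
                for (int i = 0; i < originFields.get(s).size(); i++) {
                    out.add(null);
                }
                continue;
            }
            List<String> keys = joinFields.get(s);
            int sideIdx = 0;
            for (String field : originFields.get(s)) {
                int keyIdx = keys.indexOf(field);
                if (keyIdx >= 0) {
                    out.add(group.get(keyIdx));   // join-key value from the group
                } else {
                    out.add(side.get(sideIdx++)); // next non-key value of this side
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Object> group = Arrays.<Object>asList(7);
        List<List<String>> origin = Arrays.asList(
                Arrays.asList("id", "name"), Arrays.asList("dept", "empId"));
        List<List<String>> joins = Arrays.asList(
                Arrays.asList("id"), Arrays.asList("empId"));
        List<List<Object>> sides = Arrays.asList(
                Arrays.<Object>asList("alice"), Arrays.<Object>asList("eng"));

        System.out.println(compact(group, sides));                 // [7, alice, eng]
        System.out.println(preserve(group, origin, joins, sides)); // [7, alice, eng, 7]
    }
}
```

One consequence visible in the reducer code: when an OUTER side is empty, PRESERVE fills all of that stream's columns with null, so the key column of the missing side is null rather than a copy of the group value.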


[3/3] storm git commit: add STORM-2057 to CHANGELOG

Posted by ka...@apache.org.
add STORM-2057 to CHANGELOG


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f1e3120c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f1e3120c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f1e3120c

Branch: refs/heads/1.x-branch
Commit: f1e3120cd5ceae9598b6599c23b63333c497f483
Parents: 682eb64
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Tue Sep 13 13:46:28 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Tue Sep 13 13:46:28 2016 +0900

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/f1e3120c/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 368a7fe..e47962d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 1.1.0
+ * STORM-2057: Support JOIN statement in Storm SQL
  * STORM-1970: external project examples refator
  * STORM-2074: fix storm-kafka-monitor NPE bug
  * STORM-1459: Allow not specifying producer properties in read-only Kafka table in StormSQL


[2/3] storm git commit: Merge branch 'STORM-2057-1.x' into 1.x-branch

Posted by ka...@apache.org.
Merge branch 'STORM-2057-1.x' into 1.x-branch


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/682eb644
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/682eb644
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/682eb644

Branch: refs/heads/1.x-branch
Commit: 682eb64490e07b571c904f65896b124cee86f8cc
Parents: 09bff55 bb7c8c7
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Tue Sep 13 13:45:58 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Tue Sep 13 13:45:58 2016 +0900

----------------------------------------------------------------------
 docs/storm-sql.md                               |  16 +-
 external/sql/README.md                          |  12 +-
 .../trident/TridentLogicalPlanCompiler.java     |  61 ++++++-
 .../storm/sql/compiler/TestCompilerUtils.java   |  44 +++++
 .../backends/trident/TestPlanCompiler.java      |  74 ++++++++
 .../test/org/apache/storm/sql/TestUtils.java    | 151 ++++++++++++++++
 .../apache/storm/trident/JoinOutFieldsMode.java |  33 ++++
 .../apache/storm/trident/TridentTopology.java   |  52 +++++-
 .../storm/trident/operation/impl/JoinState.java |  22 +++
 .../operation/impl/JoinerMultiReducer.java      |  43 ++---
 ...PreservingFieldsOrderJoinerMultiReducer.java | 175 +++++++++++++++++++
 11 files changed, 637 insertions(+), 46 deletions(-)
----------------------------------------------------------------------