Posted to commits@storm.apache.org by ka...@apache.org on 2016/09/13 04:46:43 UTC
[1/3] storm git commit: STORM-2057 Support JOIN statement in Storm SQL
Repository: storm
Updated Branches:
refs/heads/1.x-branch 09bff55e0 -> f1e3120cd
STORM-2057 Support JOIN statement in Storm SQL
* Support INNER, LEFT / RIGHT / FULL OUTER JOIN, built on Trident's join (see the example after this list)
* The feature has the same limitations as Trident's join
  * joins are performed within each batch, not across batches
* Introduce a new parameter on join(): JoinOutFieldsMode
  * the default mode is COMPACT, which matches the current behavior
* Add PreservingFieldsOrderJoinerMultiReducer
  * it preserves the field order of all input streams
* Update code to reflect the changes
* Update the docs accordingly
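For example, the following join statement (taken from the new TestPlanCompiler tests) now compiles and runs on the Trident backend:

    // From testCompileEquiJoinAndGroupBy(): an equi-join combined with GROUP BY.
    String sql = "SELECT d.DEPTID, count(EMPID) FROM EMP AS e "
            + "JOIN DEPT AS d ON e.DEPTID = d.DEPTID "
            + "WHERE e.EMPID > 0 GROUP BY d.DEPTID";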
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/bb7c8c75
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/bb7c8c75
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/bb7c8c75
Branch: refs/heads/1.x-branch
Commit: bb7c8c75484253a5d79ae04f1380eda183c2eeb9
Parents: 09bff55
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Tue Aug 30 09:07:38 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Tue Sep 13 13:35:06 2016 +0900
----------------------------------------------------------------------
docs/storm-sql.md | 16 +-
external/sql/README.md | 12 +-
.../trident/TridentLogicalPlanCompiler.java | 61 ++++++-
.../storm/sql/compiler/TestCompilerUtils.java | 44 +++++
.../backends/trident/TestPlanCompiler.java | 74 ++++++++
.../test/org/apache/storm/sql/TestUtils.java | 151 ++++++++++++++++
.../apache/storm/trident/JoinOutFieldsMode.java | 33 ++++
.../apache/storm/trident/TridentTopology.java | 52 +++++-
.../storm/trident/operation/impl/JoinState.java | 22 +++
.../operation/impl/JoinerMultiReducer.java | 43 ++---
...PreservingFieldsOrderJoinerMultiReducer.java | 175 +++++++++++++++++++
11 files changed, 637 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/bb7c8c75/docs/storm-sql.md
----------------------------------------------------------------------
diff --git a/docs/storm-sql.md b/docs/storm-sql.md
index 104b852..3eaf678 100644
--- a/docs/storm-sql.md
+++ b/docs/storm-sql.md
@@ -28,6 +28,7 @@ The following features are supported in the current repository:
* Projections
* Aggregations (Grouping)
* User defined function (scalar and aggregate)
+* Join (Inner, Left outer, Right outer, Full outer)
## Specifying External Data Sources
@@ -94,7 +95,7 @@ and class for aggregate function is here:
}
```
-If users doesn't define `result` method, result is the last return value of `add` method.
+If users don't define `result` method, result is the last return value of `add` method.
Users need to define `result` method only when we need to transform accumulated value.
## Example: Filtering Kafka Stream
@@ -128,6 +129,13 @@ By now you should be able to see the `order_filtering` topology in the Storm UI.
## Current Limitations
-Aggregation, windowing and joining tables are yet to be implemented. Specifying parallelism hints in the topology is not yet supported. All processors have a parallelism hint of 1.
-
-The current implementation of the Kafka connector in StormSQL assumes both the input and the output are in JSON formats. The connector has not yet recognized the `INPUTFORMAT` and `OUTPUTFORMAT` clauses yet.
+- Windowing is yet to be implemented.
+- Only equi-joins (single-field equality) are supported for joining tables.
+- Joins apply only within each small batch that comes off the spout.
+ - Not across batches.
+ - This limitation comes from the `join` feature of Trident.
+ - Please refer to the `Trident API Overview` doc for details.
+- Specifying parallelism hints in the topology is not yet supported.
+ - All processors have a parallelism hint of 1.
+- The current implementation of the Kafka connector in StormSQL assumes both the input and the output are in JSON formats.
+ - The connector does not recognize the `INPUTFORMAT` and `OUTPUTFORMAT` clauses yet.
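To make the equi-join limitation concrete, here is a sketch using the table and field names from the new tests (the exact planner behavior is an assumption): only a single-field equality condition passes RelOptUtil.analyzeSimpleEquiJoin; anything else fails with "Only simple equi joins are supported".

    // Accepted: one equality predicate between a single field on each side.
    String supported = "SELECT e.EMPID, d.DEPTNAME FROM EMP AS e JOIN DEPT AS d ON e.DEPTID = d.DEPTID";
    // Assumed rejected: a non-equality condition is not a simple equi-join.
    String unsupported = "SELECT e.EMPID, d.DEPTNAME FROM EMP AS e JOIN DEPT AS d ON e.DEPTID > d.DEPTID";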
http://git-wip-us.apache.org/repos/asf/storm/blob/bb7c8c75/external/sql/README.md
----------------------------------------------------------------------
diff --git a/external/sql/README.md b/external/sql/README.md
index ae322f2..d0c0d06 100644
--- a/external/sql/README.md
+++ b/external/sql/README.md
@@ -21,6 +21,7 @@ The following features are supported in the current repository:
* Projections
* Aggregations (Grouping)
* User defined function (scalar and aggregate)
+* Join (Inner, Left outer, Right outer, Full outer)
## Specifying External Data Sources
@@ -80,7 +81,7 @@ and class for aggregate function is here:
}
```
-If users doesn't define `result` method, result is the last return value of `add` method.
+If users don't define `result` method, result is the last return value of `add` method.
Users need to define `result` method only when we need to transform accumulated value.
## Example: Filtering Kafka Stream
@@ -124,7 +125,14 @@ By now you should be able to see the `order_filtering` topology in the Storm UI.
## Current Limitations
-Windowing and joining tables are yet to be implemented. Specifying parallelism hints in the topology is not yet supported. All processors have a parallelism hint of 1.
+- Windowing is yet to be implemented.
+- Only equi-joins (single-field equality) are supported for joining tables.
+- Joins apply only within each small batch that comes off the spout.
+ - Not across batches.
+ - This limitation comes from the `join` feature of Trident.
+ - Please refer to the `Trident API Overview` doc for details.
+- Specifying parallelism hints in the topology is not yet supported.
+ - All processors have a parallelism hint of 1.
## License
http://git-wip-us.apache.org/repos/asf/storm/blob/bb7c8c75/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/TridentLogicalPlanCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/TridentLogicalPlanCompiler.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/TridentLogicalPlanCompiler.java
index b10e64e..688b164 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/TridentLogicalPlanCompiler.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/TridentLogicalPlanCompiler.java
@@ -22,13 +22,16 @@ import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Primitives;
import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Aggregate;
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Join;
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.core.TableModify;
import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.logical.LogicalJoin;
import org.apache.calcite.schema.impl.AggregateFunctionImpl;
import org.apache.calcite.sql.SqlAggFunction;
import org.apache.calcite.sql.validate.SqlUserDefinedAggFunction;
@@ -44,6 +47,8 @@ import org.apache.storm.sql.runtime.trident.operations.DivideForAverage;
import org.apache.storm.sql.runtime.trident.operations.MaxBy;
import org.apache.storm.sql.runtime.trident.operations.MinBy;
import org.apache.storm.sql.runtime.trident.operations.SumBy;
+import org.apache.storm.trident.JoinOutFieldsMode;
+import org.apache.storm.trident.JoinType;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.fluent.ChainedAggregatorDeclarer;
@@ -131,8 +136,58 @@ public class TridentLogicalPlanCompiler extends PostOrderRelNodeVisitor<IAggrega
}
@Override
+ public IAggregatableStream visitJoin(Join join, List<IAggregatableStream> inputStreams) throws Exception {
+ if (inputStreams.size() != 2) {
+ throw new RuntimeException("Join is a BiRel");
+ }
+
+ int[] ordinals = new int[2];
+ if (!RelOptUtil.analyzeSimpleEquiJoin((LogicalJoin) join, ordinals)) {
+ throw new UnsupportedOperationException("Only simple equi joins are supported");
+ }
+
+ List<JoinType> joinTypes = new ArrayList<>();
+ switch (join.getJoinType()) {
+ case INNER:
+ joinTypes.add(JoinType.INNER);
+ joinTypes.add(JoinType.INNER);
+ break;
+
+ case LEFT:
+ joinTypes.add(JoinType.INNER);
+ joinTypes.add(JoinType.OUTER);
+ break;
+
+ case RIGHT:
+ joinTypes.add(JoinType.OUTER);
+ joinTypes.add(JoinType.INNER);
+ break;
+
+ case FULL:
+ joinTypes.add(JoinType.OUTER);
+ joinTypes.add(JoinType.OUTER);
+ break;
+
+ default:
+ throw new UnsupportedOperationException("Unsupported join type: " + join.getJoinType());
+ }
+
+ String leftJoinFieldName = join.getLeft().getRowType().getFieldNames().get(ordinals[0]);
+ String rightJoinFieldName = join.getRight().getRowType().getFieldNames().get(ordinals[1]);
+
+ Stream leftInputStream = inputStreams.get(0).toStream();
+ Stream rightInputStream = inputStreams.get(1).toStream();
+ String stageName = getStageName(join);
+
+ return topology
+ .join(leftInputStream, new Fields(leftJoinFieldName), rightInputStream, new Fields(rightJoinFieldName),
+ new Fields(join.getRowType().getFieldNames()), joinTypes, JoinOutFieldsMode.PRESERVE)
+ .name(stageName);
+ }
+
+ @Override
public IAggregatableStream visitProject(Project project, List<IAggregatableStream> inputStreams) throws Exception {
- if (inputStreams.size() > 1) {
+ if (inputStreams.size() != 1) {
throw new RuntimeException("Project is a SingleRel");
}
@@ -171,7 +226,7 @@ public class TridentLogicalPlanCompiler extends PostOrderRelNodeVisitor<IAggrega
@Override
public IAggregatableStream visitFilter(Filter filter, List<IAggregatableStream> inputStreams) throws Exception {
- if (inputStreams.size() > 1) {
+ if (inputStreams.size() != 1) {
throw new RuntimeException("Filter is a SingleRel");
}
@@ -190,7 +245,7 @@ public class TridentLogicalPlanCompiler extends PostOrderRelNodeVisitor<IAggrega
@Override
public IAggregatableStream visitAggregate(Aggregate aggregate, List<IAggregatableStream> inputStreams) throws Exception {
- if (inputStreams.size() > 1) {
+ if (inputStreams.size() != 1) {
throw new RuntimeException("Aggregate is a SingleRel");
}
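A note on the JoinType mapping in visitJoin above: the list is positional, one entry per input stream, and OUTER marks the stream that is null-padded when it has no matching tuples. A minimal sketch of the LEFT case:

    // SQL "EMP LEFT OUTER JOIN DEPT": every EMP row survives, so the right
    // (DEPT) stream is the one that may be null-padded -> [INNER, OUTER].
    List<JoinType> forLeftOuterJoin = Arrays.asList(JoinType.INNER, JoinType.OUTER);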
http://git-wip-us.apache.org/repos/asf/storm/blob/bb7c8c75/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java
index 2243119..c3b9ad3 100644
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java
@@ -20,6 +20,7 @@ package org.apache.storm.sql.compiler;
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.prepare.CalciteCatalogReader;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataTypeSystem;
@@ -28,6 +29,7 @@ import org.apache.calcite.schema.StreamableTable;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.impl.AggregateFunctionImpl;
import org.apache.calcite.schema.impl.ScalarFunctionImpl;
+import org.apache.calcite.sql.SqlExplainLevel;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlOperatorTable;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
@@ -39,6 +41,7 @@ import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.tools.Planner;
import org.apache.calcite.tools.RelConversionException;
import org.apache.calcite.tools.ValidationException;
+import org.apache.storm.sql.parser.ColumnConstraint;
import java.util.ArrayList;
import java.util.Collections;
@@ -101,6 +104,7 @@ public class TestCompilerUtils {
SqlNode parse = planner.parse(sql);
SqlNode validate = planner.validate(parse);
RelNode tree = planner.convert(validate);
+ System.out.println(RelOptUtil.toString(tree, SqlExplainLevel.ALL_ATTRIBUTES));
return new CalciteState(schema, tree);
}
@@ -134,6 +138,7 @@ public class TestCompilerUtils {
SqlNode parse = planner.parse(sql);
SqlNode validate = planner.validate(parse);
RelNode tree = planner.convert(validate);
+ System.out.println(RelOptUtil.toString(tree, SqlExplainLevel.ALL_ATTRIBUTES));
return new CalciteState(schema, tree);
}
@@ -165,6 +170,45 @@ public class TestCompilerUtils {
SqlNode parse = planner.parse(sql);
SqlNode validate = planner.validate(parse);
RelNode tree = planner.convert(validate);
+ System.out.println(RelOptUtil.toString(tree, SqlExplainLevel.ALL_ATTRIBUTES));
+ return new CalciteState(schema, tree);
+ }
+
+ public static CalciteState sqlOverSimpleEquiJoinTables(String sql)
+ throws RelConversionException, ValidationException, SqlParseException {
+ SchemaPlus schema = Frameworks.createRootSchema(true);
+ JavaTypeFactory typeFactory = new JavaTypeFactoryImpl
+ (RelDataTypeSystem.DEFAULT);
+
+ StreamableTable streamableTable = new CompilerUtil.TableBuilderInfo(typeFactory)
+ .field("EMPID", SqlTypeName.INTEGER)
+ .field("EMPNAME", SqlTypeName.VARCHAR)
+ .field("DEPTID", SqlTypeName.INTEGER)
+ .build();
+ Table table = streamableTable.stream();
+
+ StreamableTable streamableTable2 = new CompilerUtil.TableBuilderInfo(typeFactory)
+ .field("DEPTID", SqlTypeName.INTEGER)
+ .field("DEPTNAME", SqlTypeName.VARCHAR)
+ .build();
+ Table table2 = streamableTable2.stream();
+
+ schema.add("EMP", table);
+ schema.add("DEPT", table2);
+
+ List<SqlOperatorTable> sqlOperatorTables = new ArrayList<>();
+ sqlOperatorTables.add(SqlStdOperatorTable.instance());
+ sqlOperatorTables.add(new CalciteCatalogReader(CalciteSchema.from(schema),
+ false,
+ Collections.<String>emptyList(), typeFactory));
+ SqlOperatorTable chainedSqlOperatorTable = new ChainedSqlOperatorTable(sqlOperatorTables);
+ FrameworkConfig config = Frameworks.newConfigBuilder().defaultSchema(
+ schema).operatorTable(chainedSqlOperatorTable).build();
+ Planner planner = Frameworks.getPlanner(config);
+ SqlNode parse = planner.parse(sql);
+ SqlNode validate = planner.validate(parse);
+ RelNode tree = planner.convert(validate);
+ System.out.println(RelOptUtil.toString(tree, SqlExplainLevel.ALL_ATTRIBUTES));
return new CalciteState(schema, tree);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/bb7c8c75/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java
index 68053da..0ad0a46 100644
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java
@@ -108,6 +108,80 @@ public class TestPlanCompiler {
}
@Test
+ public void testCompileEquiJoinAndGroupBy() throws Exception {
+ final int EXPECTED_VALUE_SIZE = 2;
+ final Map<String, ISqlTridentDataSource> data = new HashMap<>();
+ data.put("EMP", new TestUtils.MockSqlTridentJoinDataSourceEmp());
+ data.put("DEPT", new TestUtils.MockSqlTridentJoinDataSourceDept());
+ String sql = "SELECT d.DEPTID, count(EMPID) FROM EMP AS e JOIN DEPT AS d ON e.DEPTID = d.DEPTID WHERE e.EMPID > 0 GROUP BY d.DEPTID";
+ TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverSimpleEquiJoinTables(sql);
+ PlanCompiler compiler = new PlanCompiler(data, typeFactory);
+ final AbstractTridentProcessor proc = compiler.compileForTest(state.tree());
+ final TridentTopology topo = proc.build(data);
+ Fields f = proc.outputStream().getOutputFields();
+ proc.outputStream().each(f, new CollectDataFunction(), new Fields()).toStream();
+ runTridentTopology(EXPECTED_VALUE_SIZE, proc, topo);
+
+ Assert.assertArrayEquals(new Values[] { new Values(1, 2L), new Values(0, 2L)}, getCollectedValues().toArray());
+ }
+
+ @Test
+ public void testCompileEquiJoinWithLeftOuterJoin() throws Exception {
+ final int EXPECTED_VALUE_SIZE = 3;
+ final Map<String, ISqlTridentDataSource> data = new HashMap<>();
+ data.put("EMP", new TestUtils.MockSqlTridentJoinDataSourceEmp());
+ data.put("DEPT", new TestUtils.MockSqlTridentJoinDataSourceDept());
+ String sql = "SELECT d.DEPTID, e.DEPTID FROM DEPT AS d LEFT OUTER JOIN EMP AS e ON d.DEPTID = e.DEPTID WHERE e.EMPID is null";
+ TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverSimpleEquiJoinTables(sql);
+ PlanCompiler compiler = new PlanCompiler(data, typeFactory);
+ final AbstractTridentProcessor proc = compiler.compileForTest(state.tree());
+ final TridentTopology topo = proc.build(data);
+ Fields f = proc.outputStream().getOutputFields();
+ proc.outputStream().each(f, new CollectDataFunction(), new Fields()).toStream();
+ runTridentTopology(EXPECTED_VALUE_SIZE, proc, topo);
+
+ Assert.assertArrayEquals(new Values[] { new Values(2, null), new Values(3, null), new Values(4, null)}, getCollectedValues().toArray());
+ }
+
+ @Test
+ public void testCompileEquiJoinWithRightOuterJoin() throws Exception {
+ final int EXPECTED_VALUE_SIZE = 3;
+ final Map<String, ISqlTridentDataSource> data = new HashMap<>();
+ data.put("EMP", new TestUtils.MockSqlTridentJoinDataSourceEmp());
+ data.put("DEPT", new TestUtils.MockSqlTridentJoinDataSourceDept());
+ String sql = "SELECT d.DEPTID, e.DEPTID FROM EMP AS e RIGHT OUTER JOIN DEPT AS d ON e.DEPTID = d.DEPTID WHERE e.EMPID is null";
+ TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverSimpleEquiJoinTables(sql);
+ PlanCompiler compiler = new PlanCompiler(data, typeFactory);
+ final AbstractTridentProcessor proc = compiler.compileForTest(state.tree());
+ final TridentTopology topo = proc.build(data);
+ Fields f = proc.outputStream().getOutputFields();
+ proc.outputStream().each(f, new CollectDataFunction(), new Fields()).toStream();
+ runTridentTopology(EXPECTED_VALUE_SIZE, proc, topo);
+
+ Assert.assertArrayEquals(new Values[] { new Values(2, null), new Values(3, null), new Values(4, null)}, getCollectedValues().toArray());
+ }
+
+ @Test
+ public void testCompileEquiJoinWithFullOuterJoin() throws Exception {
+ final int EXPECTED_VALUE_SIZE = 8;
+ final Map<String, ISqlTridentDataSource> data = new HashMap<>();
+ data.put("EMP", new TestUtils.MockSqlTridentJoinDataSourceEmp());
+ data.put("DEPT", new TestUtils.MockSqlTridentJoinDataSourceDept());
+ String sql = "SELECT e.DEPTID, d.DEPTNAME FROM EMP AS e FULL OUTER JOIN DEPT AS d ON e.DEPTID = d.DEPTID WHERE (d.DEPTNAME is null OR e.EMPNAME is null)";
+ TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverSimpleEquiJoinTables(sql);
+ PlanCompiler compiler = new PlanCompiler(data, typeFactory);
+ final AbstractTridentProcessor proc = compiler.compileForTest(state.tree());
+ final TridentTopology topo = proc.build(data);
+ Fields f = proc.outputStream().getOutputFields();
+ proc.outputStream().each(f, new CollectDataFunction(), new Fields()).toStream();
+ runTridentTopology(EXPECTED_VALUE_SIZE, proc, topo);
+
+ Assert.assertArrayEquals(new Values[] { new Values(null, "dept-2"), new Values(null, "dept-3"), new Values(null, "dept-4"),
+ new Values(10, null), new Values(11, null), new Values(12, null), new Values(13, null), new Values(14, null)},
+ getCollectedValues().toArray());
+ }
+
+ @Test
public void testInsert() throws Exception {
final int EXPECTED_VALUE_SIZE = 1;
String sql = "INSERT INTO BAR SELECT ID, NAME, ADDR FROM FOO WHERE ID > 3";
http://git-wip-us.apache.org/repos/asf/storm/blob/bb7c8c75/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java b/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
index 85ddc91..5d0384a 100644
--- a/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
+++ b/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
@@ -308,6 +308,157 @@ public class TestUtils {
}
}
+ public static class MockSqlTridentJoinDataSourceEmp implements ISqlTridentDataSource {
+ @Override
+ public IBatchSpout getProducer() {
+ return new MockSpout();
+ }
+
+ @Override
+ public Function getConsumer() {
+ return new CollectDataFunction();
+ }
+
+ public static class CollectDataFunction extends BaseFunction {
+ /**
+ * Collect all values in a static variable as the instance will go through serialization and deserialization.
+ */
+ private transient static final List<List<Object> > VALUES = new ArrayList<>();
+ public static List<List<Object>> getCollectedValues() {
+ return VALUES;
+ }
+
+ @Override
+ public void execute(TridentTuple tuple, TridentCollector collector) {
+ VALUES.add(tuple.getValues());
+ }
+ }
+
+ private static class MockSpout implements IBatchSpout {
+ private final ArrayList<Values> RECORDS = new ArrayList<>();
+ private final Fields OUTPUT_FIELDS = new Fields("EMPID", "EMPNAME", "DEPTID");
+
+ public MockSpout() {
+ for (int i = 0; i < 5; ++i) {
+ RECORDS.add(new Values(i, "emp-" + i, i % 2));
+ }
+ for (int i = 10; i < 15; ++i) {
+ RECORDS.add(new Values(i, "emp-" + i, i));
+ }
+ }
+
+ private boolean emitted = false;
+
+ @Override
+ public void open(Map conf, TopologyContext context) {
+ }
+
+ @Override
+ public void emitBatch(long batchId, TridentCollector collector) {
+ if (emitted) {
+ return;
+ }
+
+ for (Values r : RECORDS) {
+ collector.emit(r);
+ }
+ emitted = true;
+ }
+
+ @Override
+ public void ack(long batchId) {
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ return null;
+ }
+
+ @Override
+ public Fields getOutputFields() {
+ return OUTPUT_FIELDS;
+ }
+ }
+ }
+
+ public static class MockSqlTridentJoinDataSourceDept implements ISqlTridentDataSource {
+ @Override
+ public IBatchSpout getProducer() {
+ return new MockSpout();
+ }
+
+ @Override
+ public Function getConsumer() {
+ return new CollectDataFunction();
+ }
+
+ public static class CollectDataFunction extends BaseFunction {
+ /**
+ * Collect all values in a static variable as the instance will go through serialization and deserialization.
+ */
+ private transient static final List<List<Object> > VALUES = new ArrayList<>();
+ public static List<List<Object>> getCollectedValues() {
+ return VALUES;
+ }
+
+ @Override
+ public void execute(TridentTuple tuple, TridentCollector collector) {
+ VALUES.add(tuple.getValues());
+ }
+ }
+
+ private static class MockSpout implements IBatchSpout {
+ private final ArrayList<Values> RECORDS = new ArrayList<>();
+ private final Fields OUTPUT_FIELDS = new Fields("DEPTID", "DEPTNAME");
+
+ public MockSpout() {
+ for (int i = 0; i < 5; ++i) {
+ RECORDS.add(new Values(i, "dept-" + i));
+ }
+ }
+
+ private boolean emitted = false;
+
+ @Override
+ public void open(Map conf, TopologyContext context) {
+ }
+
+ @Override
+ public void emitBatch(long batchId, TridentCollector collector) {
+ if (emitted) {
+ return;
+ }
+
+ for (Values r : RECORDS) {
+ collector.emit(r);
+ }
+ emitted = true;
+ }
+
+ @Override
+ public void ack(long batchId) {
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ return null;
+ }
+
+ @Override
+ public Fields getOutputFields() {
+ return OUTPUT_FIELDS;
+ }
+ }
+ }
+
public static class CollectDataChannelHandler implements ChannelHandler {
private final List<Values> values;
http://git-wip-us.apache.org/repos/asf/storm/blob/bb7c8c75/storm-core/src/jvm/org/apache/storm/trident/JoinOutFieldsMode.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/JoinOutFieldsMode.java b/storm-core/src/jvm/org/apache/storm/trident/JoinOutFieldsMode.java
new file mode 100644
index 0000000..f563390
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/JoinOutFieldsMode.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.trident;
+
+/**
+ * This enum defines how the output fields of a JOIN are constructed.
+ *
+ * If the user specifies COMPACT when calling join, the tuples emitted from the join contain:
+ * first, the list of join fields. Note that join fields appear only once in the emitted tuples.
+ * Next, the non-join fields of all streams, in the order the streams were passed to the join method.
+ *
+ * If the user specifies PRESERVE when calling join, the tuples emitted from the join contain
+ * a list of all fields of all streams, in the order the streams were passed to the join method.
+ */
+public enum JoinOutFieldsMode {
+ COMPACT,
+ PRESERVE
+}
\ No newline at end of file
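A minimal sketch of the two modes, assuming stream s1 declares fields ("key", "val1"), s2 declares ("k2", "val2"), and they are joined on key = k2 (all names hypothetical):

    // COMPACT (the default): the join field once, then non-join fields in stream order.
    // Output fields: ("key", "val1", "val2")
    Stream compact = topology.join(s1, new Fields("key"), s2, new Fields("k2"),
            new Fields("key", "val1", "val2"));

    // PRESERVE: every field of every stream, in original stream and field order.
    // Output fields: ("key", "val1", "k2", "val2")
    Stream preserved = topology.join(s1, new Fields("key"), s2, new Fields("k2"),
            new Fields("key", "val1", "k2", "val2"), JoinOutFieldsMode.PRESERVE);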
http://git-wip-us.apache.org/repos/asf/storm/blob/bb7c8c75/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java b/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java
index e87b1d1..befdc85 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java
@@ -27,6 +27,7 @@ import org.apache.storm.grouping.CustomStreamGrouping;
import org.apache.storm.topology.BoltDeclarer;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.SpoutDeclarer;
+import org.apache.storm.trident.operation.impl.PreservingFieldsOrderJoinerMultiReducer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.utils.Utils;
@@ -46,7 +47,6 @@ import org.apache.storm.trident.graph.GraphGrouper;
import org.apache.storm.trident.graph.Group;
import org.apache.storm.trident.operation.DefaultResourceDeclarer;
import org.apache.storm.trident.operation.GroupedMultiReducer;
-import org.apache.storm.trident.operation.ITridentResource;
import org.apache.storm.trident.operation.MultiReducer;
import org.apache.storm.trident.operation.impl.FilterExecutor;
import org.apache.storm.trident.operation.impl.GroupedMultiReducerExecutor;
@@ -262,19 +262,57 @@ public class TridentTopology {
}
public Stream join(List<Stream> streams, List<Fields> joinFields, Fields outFields, JoinType type) {
- return join(streams, joinFields, outFields, repeat(streams.size(), type));
+ return join(streams, joinFields, outFields, repeat(streams.size(), type));
}
public Stream join(Stream s1, Fields joinFields1, Stream s2, Fields joinFields2, Fields outFields, List<JoinType> mixed) {
- return join(Arrays.asList(s1, s2), Arrays.asList(joinFields1, joinFields2), outFields, mixed);
+ return join(Arrays.asList(s1, s2), Arrays.asList(joinFields1, joinFields2), outFields, mixed);
}
public Stream join(List<Stream> streams, List<Fields> joinFields, Fields outFields, List<JoinType> mixed) {
- return multiReduce(strippedInputFields(streams, joinFields),
- groupedStreams(streams, joinFields),
- new JoinerMultiReducer(mixed, joinFields.get(0).size(), strippedInputFields(streams, joinFields)),
- outFields);
+ return join(streams, joinFields, outFields, mixed, JoinOutFieldsMode.COMPACT);
+ }
+
+ public Stream join(Stream s1, Fields joinFields1, Stream s2, Fields joinFields2, Fields outFields, JoinOutFieldsMode mode) {
+ return join(Arrays.asList(s1, s2), Arrays.asList(joinFields1, joinFields2), outFields, mode);
+ }
+
+ public Stream join(List<Stream> streams, List<Fields> joinFields, Fields outFields, JoinOutFieldsMode mode) {
+ return join(streams, joinFields, outFields, JoinType.INNER, mode);
+ }
+
+ public Stream join(Stream s1, Fields joinFields1, Stream s2, Fields joinFields2, Fields outFields, JoinType type, JoinOutFieldsMode mode) {
+ return join(Arrays.asList(s1, s2), Arrays.asList(joinFields1, joinFields2), outFields, type, mode);
+ }
+
+ public Stream join(List<Stream> streams, List<Fields> joinFields, Fields outFields, JoinType type, JoinOutFieldsMode mode) {
+ return join(streams, joinFields, outFields, repeat(streams.size(), type), mode);
+ }
+
+ public Stream join(Stream s1, Fields joinFields1, Stream s2, Fields joinFields2, Fields outFields, List<JoinType> mixed, JoinOutFieldsMode mode) {
+ return join(Arrays.asList(s1, s2), Arrays.asList(joinFields1, joinFields2), outFields, mixed, mode);
+
+ }
+
+ public Stream join(List<Stream> streams, List<Fields> joinFields, Fields outFields, List<JoinType> mixed, JoinOutFieldsMode mode) {
+ switch (mode) {
+ case COMPACT:
+ return multiReduce(strippedInputFields(streams, joinFields),
+ groupedStreams(streams, joinFields),
+ new JoinerMultiReducer(mixed, joinFields.get(0).size(), strippedInputFields(streams, joinFields)),
+ outFields);
+
+ case PRESERVE:
+ return multiReduce(strippedInputFields(streams, joinFields),
+ groupedStreams(streams, joinFields),
+ new PreservingFieldsOrderJoinerMultiReducer(mixed, joinFields.get(0).size(),
+ getAllOutputFields(streams), joinFields, strippedInputFields(streams, joinFields)),
+ outFields);
+
+ default:
+ throw new IllegalArgumentException("Unsupported out-fields mode: " + mode);
+ }
}
public TridentTopology setResourceDefaults(DefaultResourceDeclarer defaults) {
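As a usage sketch for the widest new overload (emps and depts stand for previously declared Streams; the field names are made up), a SQL-style LEFT OUTER JOIN that keeps the original field order would look like:

    // The right (depts) side is marked OUTER, so it is null-padded when no
    // department matches; PRESERVE keeps each stream's fields in declared order.
    Stream joined = topology.join(
            emps, new Fields("DEPTID"),
            depts, new Fields("DEPTID0"),
            new Fields("EMPID", "EMPNAME", "DEPTID", "DEPTID0", "DEPTNAME"),
            Arrays.asList(JoinType.INNER, JoinType.OUTER),
            JoinOutFieldsMode.PRESERVE);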
http://git-wip-us.apache.org/repos/asf/storm/blob/bb7c8c75/storm-core/src/jvm/org/apache/storm/trident/operation/impl/JoinState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/impl/JoinState.java b/storm-core/src/jvm/org/apache/storm/trident/operation/impl/JoinState.java
new file mode 100644
index 0000000..f1a7153
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/impl/JoinState.java
@@ -0,0 +1,22 @@
+package org.apache.storm.trident.operation.impl;
+
+import org.apache.storm.trident.tuple.TridentTuple;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class JoinState {
+ List<List>[] sides;
+ int numSidesReceived = 0;
+ int[] indices;
+ TridentTuple group;
+
+ public JoinState(int numSides, TridentTuple group) {
+ sides = new List[numSides];
+ indices = new int[numSides];
+ this.group = group;
+ for(int i=0; i<numSides; i++) {
+ sides[i] = new ArrayList<List>();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/bb7c8c75/storm-core/src/jvm/org/apache/storm/trident/operation/impl/JoinerMultiReducer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/impl/JoinerMultiReducer.java b/storm-core/src/jvm/org/apache/storm/trident/operation/impl/JoinerMultiReducer.java
index 9c6da6e..3ed2e12 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/operation/impl/JoinerMultiReducer.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/impl/JoinerMultiReducer.java
@@ -25,7 +25,6 @@ import org.apache.storm.trident.JoinType;
import org.apache.storm.trident.operation.GroupedMultiReducer;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.operation.TridentMultiReducerContext;
-import org.apache.storm.trident.operation.impl.JoinerMultiReducer.JoinState;
import org.apache.storm.trident.tuple.ComboList;
import org.apache.storm.trident.tuple.TridentTuple;
@@ -47,7 +46,7 @@ public class JoinerMultiReducer implements GroupedMultiReducer<JoinState> {
public void prepare(Map conf, TridentMultiReducerContext context) {
int[] sizes = new int[_sideFields.size() + 1];
sizes[0] = _numGroupFields;
- for(int i=0; i<_sideFields.size(); i++) {
+ for (int i = 0; i < _sideFields.size(); i++) {
sizes[i+1] = _sideFields.get(i).size();
}
_factory = new ComboList.Factory(sizes);
@@ -77,13 +76,13 @@ public class JoinerMultiReducer implements GroupedMultiReducer<JoinState> {
public void complete(JoinState state, TridentTuple group, TridentCollector collector) {
List<List>[] sides = state.sides;
boolean wasEmpty = state.numSidesReceived < sides.length;
- for(int i=0; i<sides.length; i++) {
- if(sides[i].isEmpty() && _types.get(i) == JoinType.OUTER) {
+ for (int i = 0; i < sides.length; i++) {
+ if (sides[i].isEmpty() && _types.get(i) == JoinType.OUTER) {
state.numSidesReceived++;
sides[i].add(makeNullList(_sideFields.get(i).size()));
}
}
- if(wasEmpty && state.numSidesReceived == sides.length) {
+ if (wasEmpty && state.numSidesReceived == sides.length) {
emitCrossJoin(state, collector, -1, null);
}
}
@@ -94,7 +93,7 @@ public class JoinerMultiReducer implements GroupedMultiReducer<JoinState> {
private List<Object> makeNullList(int size) {
List<Object> ret = new ArrayList(size);
- for(int i=0; i<size; i++) {
+ for (int i = 0; i < size; i++) {
ret.add(null);
}
return ret;
@@ -103,17 +102,17 @@ public class JoinerMultiReducer implements GroupedMultiReducer<JoinState> {
private void emitCrossJoin(JoinState state, TridentCollector collector, int overrideIndex, TridentTuple overrideTuple) {
List<List>[] sides = state.sides;
int[] indices = state.indices;
- for(int i=0; i<indices.length; i++) {
+ for (int i = 0; i < indices.length; i++) {
indices[i] = 0;
}
boolean keepGoing = true;
//emit cross-join of all emitted tuples
- while(keepGoing) {
+ while (keepGoing) {
List[] combined = new List[sides.length+1];
combined[0] = state.group;
- for(int i=0; i<sides.length; i++) {
- if(i==overrideIndex) {
+ for (int i = 0; i < sides.length; i++) {
+ if (i == overrideIndex) {
combined[i+1] = overrideTuple;
} else {
combined[i+1] = sides[i].get(indices[i]);
@@ -128,32 +127,16 @@ public class JoinerMultiReducer implements GroupedMultiReducer<JoinState> {
//return false if can't increment anymore
//TODO: DRY this code up with what's in ChainedAggregatorImpl
private boolean increment(List[] lengths, int[] indices, int j, int overrideIndex) {
- if(j==-1) return false;
- if(j==overrideIndex) {
+ if (j == -1) return false;
+ if (j == overrideIndex) {
return increment(lengths, indices, j-1, overrideIndex);
}
indices[j]++;
- if(indices[j] >= lengths[j].size()) {
+ if (indices[j] >= lengths[j].size()) {
indices[j] = 0;
return increment(lengths, indices, j-1, overrideIndex);
}
return true;
}
-
- public static class JoinState {
- List<List>[] sides;
- int numSidesReceived = 0;
- int[] indices;
- TridentTuple group;
-
- public JoinState(int numSides, TridentTuple group) {
- sides = new List[numSides];
- indices = new int[numSides];
- this.group = group;
- for(int i=0; i<numSides; i++) {
- sides[i] = new ArrayList<List>();
- }
- }
- }
-
+
}
http://git-wip-us.apache.org/repos/asf/storm/blob/bb7c8c75/storm-core/src/jvm/org/apache/storm/trident/operation/impl/PreservingFieldsOrderJoinerMultiReducer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/impl/PreservingFieldsOrderJoinerMultiReducer.java b/storm-core/src/jvm/org/apache/storm/trident/operation/impl/PreservingFieldsOrderJoinerMultiReducer.java
new file mode 100644
index 0000000..07ce0d8
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/impl/PreservingFieldsOrderJoinerMultiReducer.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.trident.operation.impl;
+
+import org.apache.storm.trident.JoinType;
+import org.apache.storm.trident.operation.GroupedMultiReducer;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.operation.TridentMultiReducerContext;
+import org.apache.storm.trident.tuple.ComboList;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.tuple.Fields;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class PreservingFieldsOrderJoinerMultiReducer implements GroupedMultiReducer<JoinState> {
+ List<JoinType> _types;
+ List<Fields> _sideFields;
+ List<Fields> _joiningFields;
+ List<Fields> _originFields;
+ int _numGroupFields;
+ ComboList.Factory _factory;
+
+
+ public PreservingFieldsOrderJoinerMultiReducer(List<JoinType> types, int numGroupFields, List<Fields> origins,
+ List<Fields> joins, List<Fields> sides) {
+ _types = types;
+ _originFields = origins;
+ _joiningFields = joins;
+ _sideFields = sides;
+
+ // we already checked this
+ _numGroupFields = numGroupFields;
+ }
+
+ @Override
+ public void prepare(Map conf, TridentMultiReducerContext context) {
+ int[] sizes = new int[_originFields.size()];
+ for (int i = 0; i < _originFields.size(); i++) {
+ sizes[i] = _originFields.get(i).size();
+ }
+ _factory = new ComboList.Factory(sizes);
+ }
+
+ @Override
+ public JoinState init(TridentCollector collector, TridentTuple group) {
+ return new JoinState(_types.size(), group);
+ }
+
+ @Override
+ public void execute(JoinState state, int streamIndex, TridentTuple group, TridentTuple input, TridentCollector collector) {
+ //TODO: do the inner join incrementally, emitting the cross join with this tuple, against all other sides
+ //TODO: only do cross join if at least one tuple in each side
+ List<List> side = state.sides[streamIndex];
+ if (side.isEmpty()) {
+ state.numSidesReceived++;
+ }
+
+ side.add(input);
+ if (state.numSidesReceived == state.sides.length) {
+ emitCrossJoin(state, collector, streamIndex, input);
+ }
+ }
+
+ @Override
+ public void complete(JoinState state, TridentTuple group, TridentCollector collector) {
+ List<List>[] sides = state.sides;
+ boolean wasEmpty = state.numSidesReceived < sides.length;
+ for (int i = 0; i < sides.length; i++) {
+ if(sides[i].isEmpty() && _types.get(i) == JoinType.OUTER) {
+ state.numSidesReceived++;
+ sides[i].add(null);
+ }
+ }
+ if (wasEmpty && state.numSidesReceived == sides.length) {
+ emitCrossJoin(state, collector, -1, null);
+ }
+ }
+
+ @Override
+ public void cleanup() {
+ }
+
+ private List<Object> makeNullList(int size) {
+ List<Object> ret = new ArrayList(size);
+ for (int i = 0; i < size; i++) {
+ ret.add(null);
+ }
+ return ret;
+ }
+
+ private void emitCrossJoin(JoinState state, TridentCollector collector, int overrideIndex, TridentTuple overrideTuple) {
+ List<List>[] sides = state.sides;
+ int[] indices = state.indices;
+ for (int i = 0; i < indices.length; i++) {
+ indices[i] = 0;
+ }
+
+ boolean keepGoing = true;
+ //emit cross-join of all emitted tuples
+ while (keepGoing) {
+ List[] combined = new List[sides.length];
+
+ for (int i = 0; i < sides.length; i++) {
+ List<Object> values = buildValuesForStream(state, overrideIndex, overrideTuple, sides, indices, combined, i);
+ combined[i] = values;
+ }
+ collector.emit(_factory.create(combined));
+ keepGoing = increment(sides, indices, indices.length - 1, overrideIndex);
+ }
+ }
+
+ private List<Object> buildValuesForStream(JoinState state, int overrideIndex, TridentTuple overrideTuple, List<List>[] sides,
+ int[] indices, List[] combined, int streamIdx) {
+ List sideValues;
+ if (streamIdx == overrideIndex) {
+ sideValues = overrideTuple;
+ } else {
+ sideValues = sides[streamIdx].get(indices[streamIdx]);
+ }
+
+ Fields originFields = _originFields.get(streamIdx);
+ if (sideValues == null) {
+ return makeNullList(originFields.size());
+ } else {
+ List<Object> ret = new ArrayList<>(originFields.size());
+ Fields sideFields = _sideFields.get(streamIdx);
+ Fields joinFields = _joiningFields.get(streamIdx);
+ int sideIdx = 0;
+ for (String field : originFields) {
+ // assuming _sideFields are preserving its order
+ if (sideFields.contains(field)) {
+ ret.add(sideValues.get(sideIdx++));
+ } else {
+ // group field
+ ret.add(state.group.get(joinFields.fieldIndex(field)));
+ }
+ }
+ return ret;
+ }
+ }
+
+
+ //return false if can't increment anymore
+ //TODO: DRY this code up with what's in ChainedAggregatorImpl
+ private boolean increment(List[] lengths, int[] indices, int j, int overrideIndex) {
+ if (j == -1) return false;
+ if (j == overrideIndex) {
+ return increment(lengths, indices, j-1, overrideIndex);
+ }
+ indices[j]++;
+ if (indices[j] >= lengths[j].size()) {
+ indices[j] = 0;
+ return increment(lengths, indices, j-1, overrideIndex);
+ }
+ return true;
+ }
+
+}
[3/3] storm git commit: add STORM-2057 to CHANGELOG
Posted by ka...@apache.org.
add STORM-2057 to CHANGELOG
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f1e3120c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f1e3120c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f1e3120c
Branch: refs/heads/1.x-branch
Commit: f1e3120cd5ceae9598b6599c23b63333c497f483
Parents: 682eb64
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Tue Sep 13 13:46:28 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Tue Sep 13 13:46:28 2016 +0900
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/f1e3120c/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 368a7fe..e47962d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 1.1.0
+ * STORM-2057: Support JOIN statement in Storm SQL
* STORM-1970: external project examples refator
* STORM-2074: fix storm-kafka-monitor NPE bug
* STORM-1459: Allow not specifying producer properties in read-only Kafka table in StormSQL
[2/3] storm git commit: Merge branch 'STORM-2057-1.x' into 1.x-branch
Posted by ka...@apache.org.
Merge branch 'STORM-2057-1.x' into 1.x-branch
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/682eb644
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/682eb644
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/682eb644
Branch: refs/heads/1.x-branch
Commit: 682eb64490e07b571c904f65896b124cee86f8cc
Parents: 09bff55 bb7c8c7
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Tue Sep 13 13:45:58 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Tue Sep 13 13:45:58 2016 +0900
----------------------------------------------------------------------
docs/storm-sql.md | 16 +-
external/sql/README.md | 12 +-
.../trident/TridentLogicalPlanCompiler.java | 61 ++++++-
.../storm/sql/compiler/TestCompilerUtils.java | 44 +++++
.../backends/trident/TestPlanCompiler.java | 74 ++++++++
.../test/org/apache/storm/sql/TestUtils.java | 151 ++++++++++++++++
.../apache/storm/trident/JoinOutFieldsMode.java | 33 ++++
.../apache/storm/trident/TridentTopology.java | 52 +++++-
.../storm/trident/operation/impl/JoinState.java | 22 +++
.../operation/impl/JoinerMultiReducer.java | 43 ++---
...PreservingFieldsOrderJoinerMultiReducer.java | 175 +++++++++++++++++++
11 files changed, 637 insertions(+), 46 deletions(-)
----------------------------------------------------------------------