Posted to commits@storm.apache.org by ka...@apache.org on 2017/02/03 01:26:52 UTC
[1/3] storm git commit: STORM-1443 Support customizing parallelism in StormSQL
Repository: storm
Updated Branches:
refs/heads/1.x-branch 02ab70c92 -> a79784010
STORM-1443 Support customizing parallelism in StormSQL
* Add 'PARALLELISM' to the table definition
  * the default value is 1
* Set the parallelism on the new stream when creating a stream from a scan
  * downstream operators keep the same parallelism unless repartitioned
* Do not apply parallelism to the output table, since doing so can trigger a repartition
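To illustrate the table-definition change, here is a minimal sketch that is not part of the commit. It assembles a `CREATE EXTERNAL TABLE` statement with the optional `PARALLELISM` clause placed directly after `LOCATION`, which is where the grammar change in this commit accepts it. The class `ParallelismDdlSketch`, the helper `createTable`, and the value `4` are hypothetical and exist only for this example.

```java
final class ParallelismDdlSketch {

    /**
     * Hypothetical helper: builds a CREATE EXTERNAL TABLE statement.
     * A null parallelism omits the clause, in which case StormSQL
     * falls back to its default of 1 according to the commit message.
     */
    static String createTable(String name, String fields, String location, Integer parallelism) {
        StringBuilder sql = new StringBuilder("CREATE EXTERNAL TABLE ")
                .append(name)
                .append(" (").append(fields).append(")")
                .append(" LOCATION '").append(location).append("'");
        if (parallelism != null) {
            // The clause follows LOCATION and precedes TBLPROPERTIES.
            sql.append(" PARALLELISM ").append(parallelism);
        }
        return sql.toString();
    }

    public static void main(String[] args) {
        // Illustrative values only; the location mirrors the example in docs/storm-sql.md.
        System.out.println(createTable(
                "FOO", "ID INT PRIMARY KEY",
                "kafka://localhost:2181/brokers?topic=test", 4));
    }
}
```

The hint applies only to the input table. Per the commit message, it is deliberately not applied to the output table.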
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d34c2ebf
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d34c2ebf
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d34c2ebf
Branch: refs/heads/1.x-branch
Commit: d34c2ebf11b92a54fb1b1692ae4a8dae8e0cfb2b
Parents: 02ab70c
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Wed Oct 19 18:25:53 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Fri Feb 3 10:23:18 2017 +0900
----------------------------------------------------------------------
docs/storm-sql.md | 12 +++++--
external/sql/README.md | 7 ++--
.../storm-sql-core/src/codegen/data/Parser.tdd | 1 +
.../src/codegen/includes/parserImpls.ftl | 4 ++-
.../jvm/org/apache/storm/sql/StormSqlImpl.java | 3 ++
.../sql/calcite/ParallelStreamableTable.java | 35 ++++++++++++++++++++
.../apache/storm/sql/compiler/CompilerUtil.java | 14 +++++++-
.../apache/storm/sql/parser/SqlCreateTable.java | 24 +++++++++++---
.../trident/rel/TridentStreamScanRel.java | 8 +++--
.../planner/trident/rules/TridentScanRule.java | 12 +++++--
10 files changed, 105 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/d34c2ebf/docs/storm-sql.md
----------------------------------------------------------------------
diff --git a/docs/storm-sql.md b/docs/storm-sql.md
index 3b7d897..b06f068 100644
--- a/docs/storm-sql.md
+++ b/docs/storm-sql.md
@@ -46,11 +46,19 @@ CREATE EXTERNAL TABLE table_name field_list
OUTPUTFORMAT output_format_classname
]
LOCATION location
+ [ PARALLELISM parallelism ]
[ TBLPROPERTIES tbl_properties ]
[ AS select_stmt ]
```
-You can find detailed explanations of the properties in [Hive Data Definition Language](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL). For example, the following statement specifies a Kafka spout and sink:
+You can find detailed explanations of the properties in [Hive Data Definition Language](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL).
+
+`PARALLELISM` is StormSQL's own keyword which describes parallelism hint for input data source. This is same as providing parallelism hint to Trident Spout.
+As same as Trident, downstream operators are executed with same parallelism before repartition (Aggregation triggers repartition).
+
+Default value is 1, and this option is no effect on output data source. (We might change if needed. Normally repartition is the thing to avoid.)
+
+For example, the following statement specifies a Kafka spout and sink:
```
CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY) LOCATION 'kafka://localhost:2181/brokers?topic=test' TBLPROPERTIES '{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.org.apache.storm.kafka.ByteBufferSerializer"}}'
@@ -159,5 +167,3 @@ LogicalTableModify(table=[[LARGE_ORDERS]], operation=[INSERT], updateColumnList=
- Windowing is yet to be implemented.
- Aggregation and join are not supported (waiting for `Streaming SQL` to be matured)
-- Specifying parallelism hints in the topology is not yet supported.
- - All processors have a parallelism hint of 1.
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/d34c2ebf/external/sql/README.md
----------------------------------------------------------------------
diff --git a/external/sql/README.md b/external/sql/README.md
index a17f1ff..a4b44fb 100644
--- a/external/sql/README.md
+++ b/external/sql/README.md
@@ -38,6 +38,11 @@ CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY) LOCATION 'kafka://localhost:2181/
The syntax of `CREATE EXTERNAL TABLE` closely follows the one defined in
[Hive Data Definition Language](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL).
+`PARALLELISM` is StormSQL's own keyword which describes parallelism hint for input data source. This is same as providing parallelism hint to Trident Spout.
+Downstream operators are executed with same parallelism before repartition (Aggregation triggers repartition).
+
+Default value is 1, and this option is no effect on output data source. (We might change if needed. Normally repartition is the thing to avoid.)
+
## Plugging in External Data Sources
Users plug in external data sources through implementing the `ISqlTridentDataSource` interface and registers them using
@@ -177,8 +182,6 @@ LogicalTableModify(table=[[LARGE_ORDERS]], operation=[INSERT], updateColumnList=
- Not across batches.
- Limitation came from `join` feature of Trident.
- Please refer this doc: `Trident API Overview` for details.
-- Specifying parallelism hints in the topology is not yet supported.
- - All processors have a parallelism hint of 1.
## License
http://git-wip-us.apache.org/repos/asf/storm/blob/d34c2ebf/external/sql/storm-sql-core/src/codegen/data/Parser.tdd
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/codegen/data/Parser.tdd b/external/sql/storm-sql-core/src/codegen/data/Parser.tdd
index 79a793a..b0dccb6 100644
--- a/external/sql/storm-sql-core/src/codegen/data/Parser.tdd
+++ b/external/sql/storm-sql-core/src/codegen/data/Parser.tdd
@@ -32,6 +32,7 @@
"LOCATION",
"INPUTFORMAT",
"OUTPUTFORMAT",
+ "PARALLELISM",
"STORED",
"TBLPROPERTIES",
"JAR"
http://git-wip-us.apache.org/repos/asf/storm/blob/d34c2ebf/external/sql/storm-sql-core/src/codegen/includes/parserImpls.ftl
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/codegen/includes/parserImpls.ftl b/external/sql/storm-sql-core/src/codegen/includes/parserImpls.ftl
index 0013231..4143840 100644
--- a/external/sql/storm-sql-core/src/codegen/includes/parserImpls.ftl
+++ b/external/sql/storm-sql-core/src/codegen/includes/parserImpls.ftl
@@ -61,6 +61,7 @@ SqlNode SqlCreateTable() :
SqlIdentifier tblName;
SqlNodeList fieldList;
SqlNode location;
+ SqlNode parallelism = null;
SqlNode input_format_class_name = null, output_format_class_name = null;
SqlNode tbl_properties = null;
SqlNode select = null;
@@ -77,11 +78,12 @@ SqlNode SqlCreateTable() :
]
<LOCATION>
location = StringLiteral()
+ [ <PARALLELISM> parallelism = UnsignedNumericLiteral() ]
[ <TBLPROPERTIES> tbl_properties = StringLiteral() ]
[ <AS> select = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY) ] {
return new SqlCreateTable(pos, tblName, fieldList,
input_format_class_name, output_format_class_name, location,
- tbl_properties, select);
+ parallelism, tbl_properties, select);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/d34c2ebf/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
index b780239..007daa7 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
@@ -266,6 +266,9 @@ class StormSqlImpl extends StormSql {
fields.add(new FieldInfo(col.name(), javaType, isPrimary));
}
+ if (n.parallelism() != null) {
+ builder.parallelismHint(n.parallelism());
+ }
Table table = builder.build();
schema.add(n.tableName(), table);
return fields;
http://git-wip-us.apache.org/repos/asf/storm/blob/d34c2ebf/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/calcite/ParallelStreamableTable.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/calcite/ParallelStreamableTable.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/calcite/ParallelStreamableTable.java
new file mode 100644
index 0000000..c6b584d
--- /dev/null
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/calcite/ParallelStreamableTable.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.sql.calcite;
+
+import org.apache.calcite.rel.stream.Delta;
+import org.apache.calcite.schema.StreamableTable;
+
+/**
+ * Table that can be converted to a stream. This table also has its parallelism information.
+ *
+ * @see Delta
+ */
+public interface ParallelStreamableTable extends StreamableTable {
+
+ /**
+ * Returns parallelism hint of this table. Returns null if don't know.
+ */
+ Integer parallelismHint();
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/d34c2ebf/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java
index 1b20aac..2e237c0 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java
@@ -30,6 +30,7 @@ import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.sql.validate.SqlMonotonicity;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.calcite.util.Util;
+import org.apache.storm.sql.calcite.ParallelStreamableTable;
import org.apache.storm.sql.parser.ColumnConstraint;
import java.util.ArrayList;
@@ -75,6 +76,7 @@ public class CompilerUtil {
private final ArrayList<FieldType> fields = new ArrayList<>();
private final ArrayList<Object[]> rows = new ArrayList<>();
private int primaryKey = -1;
+ private Integer parallelismHint;
private SqlMonotonicity primaryKeyMonotonicity;
private Statistic stats;
@@ -110,6 +112,11 @@ public class CompilerUtil {
return this;
}
+ public TableBuilderInfo parallelismHint(int parallelismHint) {
+ this.parallelismHint = parallelismHint;
+ return this;
+ }
+
public StreamableTable build() {
final Statistic stat = buildStatistic();
final Table tbl = new Table() {
@@ -135,7 +142,12 @@ public class CompilerUtil {
}
};
- return new StreamableTable() {
+ return new ParallelStreamableTable() {
+ @Override
+ public Integer parallelismHint() {
+ return parallelismHint;
+ }
+
@Override
public Table stream() {
return tbl;
http://git-wip-us.apache.org/repos/asf/storm/blob/d34c2ebf/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateTable.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateTable.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateTable.java
index d810d3a..670eedb 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateTable.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateTable.java
@@ -37,6 +37,8 @@ import java.util.List;
import java.util.Properties;
public class SqlCreateTable extends SqlCall {
+ private static final int DEFAULT_PARALLELISM = 1;
+
public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator(
"CREATE_TABLE", SqlKind.OTHER) {
@Override
@@ -44,7 +46,7 @@ public class SqlCreateTable extends SqlCall {
SqlLiteral functionQualifier, SqlParserPos pos, SqlNode... o) {
assert functionQualifier == null;
return new SqlCreateTable(pos, (SqlIdentifier) o[0], (SqlNodeList) o[1],
- o[2], o[3], o[4], o[5], o[6]);
+ o[2], o[3], o[4], o[5], o[6], o[7]);
}
@Override
@@ -60,6 +62,9 @@ public class SqlCreateTable extends SqlCall {
t.outputFormatClass);
}
u.keyword("LOCATION").node(t.location);
+ if (t.parallelism != null) {
+ u.keyword("PARALLELISM").node(t.parallelism);
+ }
if (t.properties != null) {
u.keyword("TBLPROPERTIES").node(t.properties);
}
@@ -74,19 +79,21 @@ public class SqlCreateTable extends SqlCall {
private final SqlNode inputFormatClass;
private final SqlNode outputFormatClass;
private final SqlNode location;
+ private final SqlNode parallelism;
private final SqlNode properties;
private final SqlNode query;
public SqlCreateTable(
- SqlParserPos pos, SqlIdentifier tblName, SqlNodeList fieldList,
- SqlNode inputFormatClass, SqlNode outputFormatClass, SqlNode location,
- SqlNode properties, SqlNode query) {
+ SqlParserPos pos, SqlIdentifier tblName, SqlNodeList fieldList,
+ SqlNode inputFormatClass, SqlNode outputFormatClass, SqlNode location,
+ SqlNode parallelism, SqlNode properties, SqlNode query) {
super(pos);
this.tblName = tblName;
this.fieldList = fieldList;
this.inputFormatClass = inputFormatClass;
this.outputFormatClass = outputFormatClass;
this.location = location;
+ this.parallelism = parallelism;
this.properties = properties;
this.query = query;
}
@@ -116,6 +123,15 @@ public class SqlCreateTable extends SqlCall {
return URI.create(getString(location));
}
+ public Integer parallelism() {
+ String parallelismStr = getString(parallelism);
+ if (parallelismStr != null) {
+ return Integer.parseInt(parallelismStr);
+ } else {
+ return DEFAULT_PARALLELISM;
+ }
+ }
+
public String inputFormatClass() {
return getString(inputFormatClass);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/d34c2ebf/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentStreamScanRel.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentStreamScanRel.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentStreamScanRel.java
index bc143ec..c563d73 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentStreamScanRel.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentStreamScanRel.java
@@ -30,8 +30,11 @@ import org.apache.storm.trident.fluent.IAggregatableStream;
import java.util.Map;
public class TridentStreamScanRel extends StormStreamScanRelBase implements TridentRel {
- public TridentStreamScanRel(RelOptCluster cluster, RelTraitSet traitSet, RelOptTable table) {
+ private final int parallelismHint;
+
+ public TridentStreamScanRel(RelOptCluster cluster, RelTraitSet traitSet, RelOptTable table, int parallelismHint) {
super(cluster, traitSet, table);
+ this.parallelismHint = parallelismHint;
}
@Override
@@ -45,7 +48,8 @@ public class TridentStreamScanRel extends StormStreamScanRelBase implements Trid
}
String stageName = StormRelUtils.getStageName(this);
- IAggregatableStream finalStream = planCreator.getTopology().newStream(stageName, sources.get(sourceName).getProducer());
+ IAggregatableStream finalStream = planCreator.getTopology().newStream(stageName, sources.get(sourceName).getProducer())
+ .parallelismHint(parallelismHint);
planCreator.addStream(finalStream);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/d34c2ebf/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentScanRule.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentScanRule.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentScanRule.java
index d863a66..abbd680 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentScanRule.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentScanRule.java
@@ -23,11 +23,13 @@ import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.convert.ConverterRule;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.schema.Table;
+import org.apache.storm.sql.calcite.ParallelStreamableTable;
import org.apache.storm.sql.planner.trident.rel.TridentLogicalConvention;
import org.apache.storm.sql.planner.trident.rel.TridentStreamScanRel;
public class TridentScanRule extends ConverterRule {
public static final TridentScanRule INSTANCE = new TridentScanRule();
+ public static final int DEFAULT_PARALLELISM_HINT = 1;
private TridentScanRule() {
super(EnumerableTableScan.class, EnumerableConvention.INSTANCE, TridentLogicalConvention.INSTANCE, "TridentScanRule");
@@ -36,13 +38,19 @@ public class TridentScanRule extends ConverterRule {
@Override
public RelNode convert(RelNode rel) {
final TableScan scan = (TableScan) rel;
- final Table table = scan.getTable().unwrap(Table.class);
+ int parallelismHint = DEFAULT_PARALLELISM_HINT;
+
+ final ParallelStreamableTable parallelTable = scan.getTable().unwrap(ParallelStreamableTable.class);
+ if (parallelTable != null && parallelTable.parallelismHint() != null) {
+ parallelismHint = parallelTable.parallelismHint();
+ }
+ final Table table = scan.getTable().unwrap(Table.class);
switch (table.getJdbcTableType()) {
case STREAM:
return new TridentStreamScanRel(scan.getCluster(),
scan.getTraitSet().replace(TridentLogicalConvention.INSTANCE),
- scan.getTable());
+ scan.getTable(), parallelismHint);
default:
throw new IllegalArgumentException(String.format("Unsupported table type: %s", table.getJdbcTableType()));
}
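The diff above applies the default in two places: once when the parsed literal is read, and again when the scan rule finds no hint on the table. The sketch below restates that fallback logic in isolation, under the assumption that the literal arrives as a nullable string and the table hint as a nullable `Integer`. `ParallelismHintSketch` and its method names are hypothetical; this is not the code from the commit.

```java
final class ParallelismHintSketch {
    static final int DEFAULT_PARALLELISM = 1;

    /** Same idea as SqlCreateTable.parallelism(): parse the literal text, else use the default. */
    static int fromLiteral(String literal) {
        return literal != null ? Integer.parseInt(literal) : DEFAULT_PARALLELISM;
    }

    /** Same idea as TridentScanRule.convert(): a missing table hint falls back to the default. */
    static int forScan(Integer tableHint) {
        return tableHint != null ? tableHint : DEFAULT_PARALLELISM;
    }

    public static void main(String[] args) {
        System.out.println(fromLiteral(null)); // no PARALLELISM clause in the DDL
        System.out.println(fromLiteral("4"));  // PARALLELISM 4
        System.out.println(forScan(null));     // table carries no hint
    }
}
```

Note that `Integer.parseInt` rejects non-integer text such as `1.5` with a `NumberFormatException`, so a caller following this pattern may want to validate the literal first.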
[2/3] storm git commit: Merge branch 'STORM-1443-1.x-merge' into 1.x-branch
Merge branch 'STORM-1443-1.x-merge' into 1.x-branch
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ad50b8d0
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ad50b8d0
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ad50b8d0
Branch: refs/heads/1.x-branch
Commit: ad50b8d09d05ca0b34ebd1d4367a1807512aa19b
Parents: 02ab70c d34c2eb
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Fri Feb 3 10:26:16 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Fri Feb 3 10:26:16 2017 +0900
----------------------------------------------------------------------
docs/storm-sql.md | 12 +++++--
external/sql/README.md | 7 ++--
.../storm-sql-core/src/codegen/data/Parser.tdd | 1 +
.../src/codegen/includes/parserImpls.ftl | 4 ++-
.../jvm/org/apache/storm/sql/StormSqlImpl.java | 3 ++
.../sql/calcite/ParallelStreamableTable.java | 35 ++++++++++++++++++++
.../apache/storm/sql/compiler/CompilerUtil.java | 14 +++++++-
.../apache/storm/sql/parser/SqlCreateTable.java | 24 +++++++++++---
.../trident/rel/TridentStreamScanRel.java | 8 +++--
.../planner/trident/rules/TridentScanRule.java | 12 +++++--
10 files changed, 105 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
[3/3] storm git commit: STORM-1443: CHANGELOG
STORM-1443: CHANGELOG
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a7978401
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a7978401
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a7978401
Branch: refs/heads/1.x-branch
Commit: a79784010a65110593ab47ea0b447b3d1c42b84a
Parents: ad50b8d
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Fri Feb 3 10:26:39 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Fri Feb 3 10:26:39 2017 +0900
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/a7978401/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index dbb892d..0c60472 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 1.1.0
+ * STORM-1443: [Storm SQL] Support customizing parallelism in StormSQL
* STORM-2148: [Storm SQL] Trident mode: back to code generate and compile Trident topology
* STORM-2321: Handle blobstore zk key deletion in KeySequenceNumber
* STORM-2336: Close Localizer and AsyncLocalizer when supervisor is shutting down