Posted to commits@flink.apache.org by ja...@apache.org on 2019/07/23 13:18:55 UTC
[flink] branch release-1.9 updated: [FLINK-13074][table-planner]
Fix PartitionableTableSink doesn't work for flink and blink planner
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push:
new 2ee6373 [FLINK-13074][table-planner] Fix PartitionableTableSink doesn't work for flink and blink planner
2ee6373 is described below
commit 2ee637352e7f6f91ee1b7cf0e2ca8c1491027702
Author: yuzhao.cyz <yu...@alibaba-inc.com>
AuthorDate: Wed Jul 3 17:56:14 2019 +0800
[FLINK-13074][table-planner] Fix PartitionableTableSink doesn't work for flink and blink planner
This closes #8966
---
.../operations/CatalogSinkModifyOperation.java | 14 +
.../flink/table/catalog/DatabaseCalciteSchema.java | 47 ++-
.../table/sqlexec/SqlToOperationConverter.java | 17 +-
.../flink/table/calcite/FlinkPlannerImpl.scala | 5 +-
.../flink/table/calcite/PreValidateReWriter.scala | 26 +-
.../rules/physical/batch/BatchExecSinkRule.scala | 38 ++-
.../rules/physical/stream/StreamExecSinkRule.scala | 35 +-
.../apache/flink/table/planner/PlannerBase.scala | 35 +-
.../apache/flink/table/sinks/TableSinkUtils.scala | 51 ++-
.../batch/sql/PartitionableSinkITCase.scala | 328 +++++++++++++++++++
.../flink/table/runtime/utils/BatchTestBase.scala | 11 +-
.../apache/flink/table/util/TableTestBase.scala | 7 +-
.../flink/table/catalog/DatabaseCalciteSchema.java | 20 +-
.../table/sqlexec/SqlToOperationConverter.java | 15 +
.../table/api/internal/BatchTableEnvImpl.scala | 26 +-
.../flink/table/api/internal/TableEnvImpl.scala | 50 ++-
.../flink/table/calcite/FlinkPlannerImpl.scala | 9 +-
.../flink/table/calcite/PreValidateReWriter.scala | 2 +-
.../flink/table/plan/schema/TableSinkTable.scala | 47 +++
.../apache/flink/table/planner/StreamPlanner.scala | 68 +++-
.../apache/flink/table/sinks/TableSinkUtils.scala | 53 ++-
.../batch/sql/PartitionableSinkITCase.scala | 363 +++++++++++++++++++++
22 files changed, 1156 insertions(+), 111 deletions(-)
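In short: a TableSink that implements PartitionableTableSink can now be the target of a partitioned INSERT in both the legacy and Blink planners, with static partition values pushed into the sink and rows shuffled (and optionally grouped) by the partition columns. A hedged usage sketch in Scala, with names taken from the PartitionableSinkITCase added below; note that the test switches the parser to FlinkSqlConformance.HIVE so the PARTITION clause is accepted:

    // testSink declares partition column "a" via PartitionableTableSink (illustrative)
    tEnv.registerTableSink("sinkTable", testSink)
    // static partition: every row lands in partition a=1, the query supplies only b and c
    tEnv.sqlUpdate("insert into sinkTable partition(a=1) select b, c from sortTable")
    // dynamic partitioning: the value of 'a' is taken from each row
    tEnv.sqlUpdate("insert into sinkTable select a, b, c from sortTable")
    tEnv.execute("testJob")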
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/CatalogSinkModifyOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/CatalogSinkModifyOperation.java
index 4fee1c6..b3e73d7 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/CatalogSinkModifyOperation.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/CatalogSinkModifyOperation.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.operations;
import org.apache.flink.annotation.Internal;
import java.util.Collections;
+import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -32,18 +33,30 @@ import java.util.Map;
@Internal
public class CatalogSinkModifyOperation implements ModifyOperation {
+ private final Map<String, String> staticPartitions;
private final List<String> tablePath;
private final QueryOperation child;
public CatalogSinkModifyOperation(List<String> tablePath, QueryOperation child) {
+ this(tablePath, child, new HashMap<>());
+ }
+
+ public CatalogSinkModifyOperation(List<String> tablePath,
+ QueryOperation child,
+ Map<String, String> staticPartitions) {
this.tablePath = tablePath;
this.child = child;
+ this.staticPartitions = staticPartitions;
}
public List<String> getTablePath() {
return tablePath;
}
+ public Map<String, String> getStaticPartitions() {
+ return staticPartitions;
+ }
+
@Override
public QueryOperation getChild() {
return child;
@@ -58,6 +71,7 @@ public class CatalogSinkModifyOperation implements ModifyOperation {
public String asSummaryString() {
Map<String, Object> params = new LinkedHashMap<>();
params.put("tablePath", tablePath);
+ params.put("staticPartitions", staticPartitions);
return OperationUtils.formatWithChildren(
"CatalogSink",
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
index cc90916..60c773d 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.factories.TableSourceFactory;
import org.apache.flink.table.operations.DataStreamQueryOperation;
import org.apache.flink.table.operations.QueryOperation;
import org.apache.flink.table.operations.RichTableSourceQueryOperation;
+import org.apache.flink.table.plan.schema.TableSinkTable;
import org.apache.flink.table.plan.schema.TableSourceTable;
import org.apache.flink.table.plan.stats.FlinkStatistic;
import org.apache.flink.table.sources.LookupableTableSource;
@@ -108,22 +109,36 @@ class DatabaseCalciteSchema extends FlinkSchema {
}
private Table convertConnectorTable(ConnectorCatalogTable<?, ?> table) {
- return table.getTableSource()
- .map(tableSource -> {
- if (!(tableSource instanceof StreamTableSource ||
- tableSource instanceof LookupableTableSource)) {
- throw new TableException(
- "Only StreamTableSource and LookupableTableSource can be used in Blink planner.");
- }
- if (!isStreamingMode && tableSource instanceof StreamTableSource &&
- !((StreamTableSource<?>) tableSource).isBounded()) {
- throw new TableException("Only bounded StreamTableSource can be used in batch mode.");
- }
- return new TableSourceTable<>(
- tableSource,
- isStreamingMode,
- FlinkStatistic.UNKNOWN());
- }).orElseThrow(() -> new TableException("Cannot query a sink only table."));
+ Optional<TableSourceTable> tableSourceTable = table.getTableSource()
+ .map(tableSource -> {
+ if (!(tableSource instanceof StreamTableSource ||
+ tableSource instanceof LookupableTableSource)) {
+ throw new TableException(
+ "Only StreamTableSource and LookupableTableSource can be used in Blink planner.");
+ }
+ if (!isStreamingMode && tableSource instanceof StreamTableSource &&
+ !((StreamTableSource<?>) tableSource).isBounded()) {
+ throw new TableException("Only bounded StreamTableSource can be used in batch mode.");
+ }
+ return new TableSourceTable<>(
+ tableSource,
+ isStreamingMode,
+ FlinkStatistic.UNKNOWN());
+ });
+ if (tableSourceTable.isPresent()) {
+ return tableSourceTable.get();
+ } else {
+ Optional<TableSinkTable> tableSinkTable = table.getTableSink()
+ .map(tableSink -> new TableSinkTable<>(
+ tableSink,
+ FlinkStatistic.UNKNOWN()));
+ if (tableSinkTable.isPresent()) {
+ return tableSinkTable.get();
+ } else {
+ throw new TableException("Cannot convert a connector table " +
+ "without either a source or a sink.");
+ }
+ }
}
private Table convertCatalogTable(ObjectPath tablePath, CatalogTable table) {
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
index ec36796..25b01a4 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
@@ -22,6 +22,7 @@ import org.apache.flink.sql.parser.SqlProperty;
import org.apache.flink.sql.parser.ddl.SqlCreateTable;
import org.apache.flink.sql.parser.ddl.SqlDropTable;
import org.apache.flink.sql.parser.ddl.SqlTableColumn;
+import org.apache.flink.sql.parser.dml.RichSqlInsert;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.calcite.FlinkPlannerImpl;
@@ -29,6 +30,7 @@ import org.apache.flink.table.calcite.FlinkTypeFactory;
import org.apache.flink.table.calcite.FlinkTypeSystem;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.operations.CatalogSinkModifyOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.PlannerQueryOperation;
import org.apache.flink.table.operations.ddl.CreateTableOperation;
@@ -84,6 +86,8 @@ public class SqlToOperationConverter {
return converter.convertCreateTable((SqlCreateTable) validated);
} else if (validated instanceof SqlDropTable) {
return converter.convertDropTable((SqlDropTable) validated);
+ } else if (validated instanceof RichSqlInsert) {
+ return converter.convertSqlInsert((RichSqlInsert) validated);
} else if (validated.getKind().belongsTo(SqlKind.QUERY)) {
return converter.convertSqlQuery(validated);
} else {
@@ -142,6 +146,17 @@ public class SqlToOperationConverter {
return new DropTableOperation(sqlDropTable.fullTableName(), sqlDropTable.getIfExists());
}
+ /** Converts an INSERT INTO statement. */
+ private Operation convertSqlInsert(RichSqlInsert insert) {
+ // get name of sink table
+ List<String> targetTablePath = ((SqlIdentifier) insert.getTargetTable()).names;
+ return new CatalogSinkModifyOperation(
+ targetTablePath,
+ (PlannerQueryOperation) SqlToOperationConverter.convert(flinkPlanner,
+ insert.getSource()),
+ insert.getStaticPartitionKVs());
+ }
+
/** Fallback method for sql query. */
private Operation convertSqlQuery(SqlNode node) {
return toQueryOperation(flinkPlanner, node);
@@ -196,6 +211,6 @@ public class SqlToOperationConverter {
private PlannerQueryOperation toQueryOperation(FlinkPlannerImpl planner, SqlNode validated) {
// transform to a relational tree
RelRoot relational = planner.rel(validated);
- return new PlannerQueryOperation(relational.rel);
+ return new PlannerQueryOperation(relational.project());
}
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
index c0b0a90..b9de2da 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
@@ -107,8 +107,9 @@ class FlinkPlannerImpl(
node.validate()
case _ =>
}
- // no need to validate row type for DDL nodes.
- if (sqlNode.getKind.belongsTo(SqlKind.DDL)) {
+ // no need to validate row type for DDL and insert nodes.
+ if (sqlNode.getKind.belongsTo(SqlKind.DDL)
+ || sqlNode.getKind == SqlKind.INSERT) {
return sqlNode
}
validator = new FlinkCalciteSqlValidator(
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/PreValidateReWriter.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/PreValidateReWriter.scala
index 011c85d..3d66edb 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/PreValidateReWriter.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/PreValidateReWriter.scala
@@ -41,8 +41,8 @@ import scala.collection.JavaConversions._
/** Implements [[org.apache.calcite.sql.util.SqlVisitor]]
* interface to do some rewrite work before sql node validation. */
class PreValidateReWriter(
- val catalogReader: CalciteCatalogReader,
- val typeFactory: RelDataTypeFactory) extends SqlBasicVisitor[Unit] {
+ val catalogReader: CalciteCatalogReader,
+ val typeFactory: RelDataTypeFactory) extends SqlBasicVisitor[Unit] {
override def visit(call: SqlCall): Unit = {
call match {
case r: RichSqlInsert if r.getStaticPartitions.nonEmpty
@@ -80,10 +80,10 @@ object PreValidateReWriter {
* @param partitions Static partition statements
*/
def appendPartitionProjects(sqlInsert: RichSqlInsert,
- calciteCatalogReader: CalciteCatalogReader,
- typeFactory: RelDataTypeFactory,
- select: SqlSelect,
- partitions: SqlNodeList): Unit = {
+ calciteCatalogReader: CalciteCatalogReader,
+ typeFactory: RelDataTypeFactory,
+ select: SqlSelect,
+ partitions: SqlNodeList): Unit = {
val names = sqlInsert.getTargetTable.asInstanceOf[SqlIdentifier].names
val table = calciteCatalogReader.getTable(names)
if (table == null) {
@@ -104,7 +104,7 @@ object PreValidateReWriter {
val id = sqlProperty.getKey
val targetField = SqlValidatorUtil.getTargetField(targetRowType,
typeFactory, id, calciteCatalogReader, relOptTable)
- validateField(assignedFields.containsValue, id, targetField)
+ validateField(idx => !assignedFields.contains(idx), id, targetField)
val value = sqlProperty.getValue.asInstanceOf[SqlLiteral]
assignedFields.put(targetField.getIndex,
maybeCast(value, value.createSqlType(typeFactory), targetField.getType, typeFactory))
@@ -142,10 +142,10 @@ object PreValidateReWriter {
* @return Rowtype
*/
private def createTargetRowType(
- typeFactory: RelDataTypeFactory,
- catalogReader: CalciteCatalogReader,
- table: SqlValidatorTable,
- targetColumnList: SqlNodeList): RelDataType = {
+ typeFactory: RelDataTypeFactory,
+ catalogReader: CalciteCatalogReader,
+ table: SqlValidatorTable,
+ targetColumnList: SqlNodeList): RelDataType = {
val baseRowType = table.getRowType
if (targetColumnList == null) return baseRowType
val fields = new util.ArrayList[util.Map.Entry[String, RelDataType]]
@@ -166,8 +166,8 @@ object PreValidateReWriter {
/** Check whether the field is valid. **/
private def validateField(tester: Function[Integer, Boolean],
- id: SqlIdentifier,
- targetField: RelDataTypeField): Unit = {
+ id: SqlIdentifier,
+ targetField: RelDataTypeField): Unit = {
if (targetField == null) {
throw newValidationError(id, RESOURCE.unknownTargetColumn(id.toString))
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/batch/BatchExecSinkRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/batch/BatchExecSinkRule.scala
index 93aaeb6..06bcdda 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/batch/BatchExecSinkRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/batch/BatchExecSinkRule.scala
@@ -18,13 +18,19 @@
package org.apache.flink.table.plan.rules.physical.batch
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.plan.`trait`.FlinkRelDistribution
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.nodes.logical.FlinkLogicalSink
import org.apache.flink.table.plan.nodes.physical.batch.BatchExecSink
+import org.apache.flink.table.plan.util.FlinkRelOptUtil
+import org.apache.flink.table.sinks.PartitionableTableSink
import org.apache.calcite.plan.RelOptRule
import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.{RelCollations, RelNode}
+
+import collection.JavaConversions._
class BatchExecSinkRule extends ConverterRule(
classOf[FlinkLogicalSink],
@@ -35,8 +41,34 @@ class BatchExecSinkRule extends ConverterRule(
def convert(rel: RelNode): RelNode = {
val sinkNode = rel.asInstanceOf[FlinkLogicalSink]
val newTrait = rel.getTraitSet.replace(FlinkConventions.BATCH_PHYSICAL)
- // TODO Take PartitionableSink into consideration after FLINK-11993 is done
- val newInput = RelOptRule.convert(sinkNode.getInput, FlinkConventions.BATCH_PHYSICAL)
+ var requiredTraitSet = sinkNode.getInput.getTraitSet.replace(FlinkConventions.BATCH_PHYSICAL)
+ sinkNode.sink match {
+ case partitionSink: PartitionableTableSink
+ if partitionSink.getPartitionFieldNames != null &&
+ partitionSink.getPartitionFieldNames.nonEmpty =>
+ val partitionFields = partitionSink.getPartitionFieldNames
+ val partitionIndices = partitionFields
+ .map(partitionSink.getTableSchema.getFieldNames.indexOf(_))
+ // validate: every partition field must exist in the sink schema
+ partitionIndices.zipWithIndex.foreach { case (idx, i) =>
+ if (idx < 0) {
+ throw new TableException(s"Partitionable sink ${sinkNode.sinkName} field " +
+ s"${partitionFields.get(i)} must be in the schema.")
+ }
+ }
+
+ requiredTraitSet = requiredTraitSet.plus(
+ FlinkRelDistribution.hash(partitionIndices
+ .map(Integer.valueOf), requireStrict = false))
+
+ if (partitionSink.configurePartitionGrouping(true)) {
+ // default to asc.
+ // sort within each partition group; the order defaults to ascending.
+ requiredTraitSet = requiredTraitSet.plus(RelCollations.of(fieldCollations: _*))
+ }
+ case _ =>
+ }
+ val newInput = RelOptRule.convert(sinkNode.getInput, requiredTraitSet)
new BatchExecSink(
rel.getCluster,
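Instead of converting the input with the default physical traits, the rule now derives a required trait set from the sink: a non-strict hash distribution on the partition columns and, if the sink opts into grouping, an ascending sort on them so that each partition's rows arrive consecutively. A hedged sketch of the sink-side contract, modeled on the TestSink added in this commit (the argument says whether grouping is possible in the current mode, the return value says whether the sink wants it):

    // illustrative batch sink members; JList and java.util imports assumed
    override def getPartitionFieldNames: JList[String] = java.util.Arrays.asList("a")
    // returning true here makes the planner also sort the input by column "a"
    override def configurePartitionGrouping(supportsGrouping: Boolean): Boolean =
      supportsGrouping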
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/stream/StreamExecSinkRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/stream/StreamExecSinkRule.scala
index 5ae6ead..3adf6ed 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/stream/StreamExecSinkRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/stream/StreamExecSinkRule.scala
@@ -18,14 +18,19 @@
package org.apache.flink.table.plan.rules.physical.stream
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.plan.`trait`.FlinkRelDistribution
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.nodes.logical.FlinkLogicalSink
import org.apache.flink.table.plan.nodes.physical.stream.StreamExecSink
+import org.apache.flink.table.sinks.{DataStreamTableSink, PartitionableTableSink}
import org.apache.calcite.plan.RelOptRule
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.calcite.rel.RelNode
+import collection.JavaConversions._
+
class StreamExecSinkRule extends ConverterRule(
classOf[FlinkLogicalSink],
FlinkConventions.LOGICAL,
@@ -35,8 +40,34 @@ class StreamExecSinkRule extends ConverterRule(
def convert(rel: RelNode): RelNode = {
val sinkNode = rel.asInstanceOf[FlinkLogicalSink]
val newTrait = rel.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
- // TODO Take PartitionableSink into consideration after FLINK-11993 is done
- val newInput = RelOptRule.convert(sinkNode.getInput, FlinkConventions.STREAM_PHYSICAL)
+ var requiredTraitSet = sinkNode.getInput.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
+ sinkNode.sink match {
+ case partitionSink: PartitionableTableSink
+ if partitionSink.getPartitionFieldNames != null &&
+ partitionSink.getPartitionFieldNames.nonEmpty =>
+ val partitionFields = partitionSink.getPartitionFieldNames
+ val partitionIndices = partitionFields
+ .map(partitionSink.getTableSchema.getFieldNames.indexOf(_))
+ // validate: every partition field must exist in the sink schema
+ partitionIndices.zipWithIndex.foreach { case (idx, i) =>
+ if (idx < 0) {
+ throw new TableException(s"Partitionable sink ${sinkNode.sinkName} field " +
+ s"${partitionFields.get(i)} must be in the schema.")
+ }
+ }
+
+ if (partitionSink.configurePartitionGrouping(false)) {
+ throw new TableException("Partition grouping in stream mode is not supported yet!")
+ }
+
+ if (!partitionSink.isInstanceOf[DataStreamTableSink[_]]) {
+ requiredTraitSet = requiredTraitSet.plus(
+ FlinkRelDistribution.hash(partitionIndices
+ .map(Integer.valueOf), requireStrict = false))
+ }
+ case _ =>
+ }
+ val newInput = RelOptRule.convert(sinkNode.getInput, requiredTraitSet)
new StreamExecSink(
rel.getCluster,
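The stream rule mirrors the batch rule but passes false to configurePartitionGrouping and rejects sinks that insist on grouped input, since an unbounded stream cannot be sorted per partition; only the hash distribution is applied, and even that is skipped for DataStreamTableSink. A hedged sketch of a sink that works in both modes (preferGroupedInput is an assumed field of the sink):

    // honor the planner's flag instead of returning a constant true; a constant true
    // fails in streaming mode with "Partition grouping in stream mode is not supported yet!"
    override def configurePartitionGrouping(supportsGrouping: Boolean): Boolean =
      supportsGrouping && preferGroupedInput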
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/PlannerBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/PlannerBase.scala
index ddb759d..d24c3b2 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/PlannerBase.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/PlannerBase.scala
@@ -21,6 +21,7 @@ package org.apache.flink.table.planner
import org.apache.flink.annotation.VisibleForTesting
import org.apache.flink.api.dag.Transformation
import org.apache.flink.configuration.Configuration
+import org.apache.flink.sql.parser.dml.RichSqlInsert
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.{TableConfig, TableEnvironment, TableException}
import org.apache.flink.table.calcite.{FlinkPlannerImpl, FlinkRelBuilder, FlinkTypeFactory}
@@ -30,22 +31,24 @@ import org.apache.flink.table.executor.ExecutorBase
import org.apache.flink.table.expressions.PlannerTypeInferenceUtilImpl
import org.apache.flink.table.factories.{TableFactoryService, TableFactoryUtil, TableSinkFactory}
import org.apache.flink.table.operations.OutputConversionModifyOperation.UpdateMode
-import org.apache.flink.table.operations.{CatalogSinkModifyOperation, ModifyOperation, Operation, OutputConversionModifyOperation, PlannerQueryOperation, UnregisteredSinkModifyOperation}
+import org.apache.flink.table.operations.{CatalogSinkModifyOperation, ModifyOperation, Operation, OutputConversionModifyOperation, UnregisteredSinkModifyOperation}
import org.apache.flink.table.plan.nodes.calcite.LogicalSink
import org.apache.flink.table.plan.nodes.exec.ExecNode
import org.apache.flink.table.plan.nodes.physical.FlinkPhysicalRel
import org.apache.flink.table.plan.optimize.Optimizer
import org.apache.flink.table.plan.reuse.SubplanReuser
import org.apache.flink.table.plan.util.SameRelObjectShuttle
-import org.apache.flink.table.sinks.{DataStreamTableSink, TableSink, TableSinkUtils}
+import org.apache.flink.table.sinks.{DataStreamTableSink, PartitionableTableSink, TableSink, TableSinkUtils}
import org.apache.flink.table.sqlexec.SqlToOperationConverter
import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter
import org.apache.flink.table.util.JavaScalaConversionUtil
+
import org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema
import org.apache.calcite.plan.{RelTrait, RelTraitDef}
import org.apache.calcite.rel.RelNode
-import org.apache.calcite.sql.{SqlIdentifier, SqlInsert, SqlKind}
+import org.apache.calcite.sql.SqlKind
import org.apache.calcite.tools.FrameworkConfig
+
import _root_.java.util.{List => JList}
import java.util
@@ -121,21 +124,10 @@ abstract class PlannerBase(
// parse the sql query
val parsed = planner.parse(stmt)
parsed match {
- case insert: SqlInsert =>
- // get name of sink table
- val targetTablePath = insert.getTargetTable.asInstanceOf[SqlIdentifier].names
-
- List(new CatalogSinkModifyOperation(
- targetTablePath,
- SqlToOperationConverter.convert(planner,
- insert.getSource).asInstanceOf[PlannerQueryOperation]).asInstanceOf[Operation])
+ case insert: RichSqlInsert =>
+ List(SqlToOperationConverter.convert(planner, insert))
case query if query.getKind.belongsTo(SqlKind.QUERY) =>
- // validate the sql query
- val validated = planner.validate(query)
- // transform to a relational tree
- val relational = planner.rel(validated)
- // can not use SqlToOperationConverter because of the project()
- List(new PlannerQueryOperation(relational.project()))
+ List(SqlToOperationConverter.convert(planner, query))
case ddl if ddl.getKind.belongsTo(SqlKind.DDL) =>
List(SqlToOperationConverter.convert(planner, ddl))
case _ =>
@@ -173,7 +165,14 @@ abstract class PlannerBase(
case catalogSink: CatalogSinkModifyOperation =>
val input = getRelBuilder.queryOperation(modifyOperation.getChild).build()
getTableSink(catalogSink.getTablePath).map(sink => {
- TableSinkUtils.validateSink(catalogSink.getChild, catalogSink.getTablePath, sink)
+ TableSinkUtils.validateSink(catalogSink, catalogSink.getTablePath, sink)
+ sink match {
+ case partitionableSink: PartitionableTableSink
+ if partitionableSink.getPartitionFieldNames != null
+ && partitionableSink.getPartitionFieldNames.nonEmpty =>
+ partitionableSink.setStaticPartition(catalogSink.getStaticPartitions)
+ case _ =>
+ }
LogicalSink.create(input, sink, catalogSink.getTablePath.mkString("."))
}) match {
case Some(sinkRel) => sinkRel
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/TableSinkUtils.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/TableSinkUtils.scala
index 7ebaf36..019b5d0 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/TableSinkUtils.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/TableSinkUtils.scala
@@ -18,28 +18,32 @@
package org.apache.flink.table.sinks
-import org.apache.flink.table.api.ValidationException
-import org.apache.flink.table.operations.QueryOperation
+import org.apache.flink.table.api.{TableException, ValidationException}
+import org.apache.flink.table.operations.CatalogSinkModifyOperation
import org.apache.flink.table.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType
import org.apache.flink.table.types.PlannerTypeUtils
import java.util.{List => JList}
+import collection.JavaConversions._
+
object TableSinkUtils {
/**
- * Checks if the given [[QueryOperation]] can be written to the given [[TableSink]].
- * It checks if the names & the field types match.
+ * Checks if the given [[CatalogSinkModifyOperation]]'s query can be written to
+ * the given [[TableSink]]. It checks whether the names and the field types match. If the
+ * table sink is a [[PartitionableTableSink]], it also checks that the static partitions
+ * are valid.
*
- * @param query The query that is supposed to be written.
- * @param sinkPath Tha path of the sink. It is needed just for logging. It does not
- * participate in the validation.
+ * @param sinkOperation The sink operation with the query that is supposed to be written.
+ * @param sinkPath The path of the sink. It is needed just for logging. It does not
+ * participate in the validation.
* @param sink The sink that we want to write to.
*/
def validateSink(
- query: QueryOperation,
+ sinkOperation: CatalogSinkModifyOperation,
sinkPath: JList[String],
sink: TableSink[_]): Unit = {
+ val query = sinkOperation.getChild
// validate schema of source table and table sink
val srcFieldTypes = query.getTableSchema.getFieldDataTypes
val sinkFieldTypes = sink.getTableSchema.getFieldDataTypes
@@ -67,5 +71,36 @@ object TableSinkUtils {
s"Query result schema: $srcSchema\n" +
s"TableSink schema: $sinkSchema")
}
+
+ // check partitions are valid
+ val staticPartitions = sinkOperation.getStaticPartitions
+ if (staticPartitions != null && !staticPartitions.isEmpty) {
+ val invalidMsg = "Can't insert static partitions into a non-partitioned table sink. " +
+ "A partitioned sink should implement 'PartitionableTableSink' and return its " +
+ "partition field names via the 'getPartitionFieldNames()' method."
+ sink match {
+ case pts: PartitionableTableSink =>
+ val partitionFields = pts.getPartitionFieldNames
+ if (partitionFields == null || partitionFields.isEmpty) {
+ throw new ValidationException(invalidMsg)
+ }
+ staticPartitions.map(_._1) foreach { p =>
+ if (!partitionFields.contains(p)) {
+ throw new ValidationException(s"Static partition column $p " +
+ s"should be in the partition fields list $partitionFields.")
+ }
+ }
+ staticPartitions.map(_._1).zip(partitionFields).foreach {
+ case (p1, p2) =>
+ if (p1 != p2) {
+ throw new ValidationException(s"Static partition column $p1 " +
+ s"should appear before dynamic partition $p2.")
+ }
+ }
+ case _ =>
+ throw new ValidationException(invalidMsg)
+
+ }
+ }
}
}
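The new checks encode Hive-style partition semantics: static partition columns must be declared partition columns and must form a prefix of the partition field list, i.e. all static partitions come before any dynamic one. A sketch of statements that pass and fail these checks, assuming a sink registered as "sinkTable" with partition columns (a, b) as in testInsertWithStaticAndDynamicPartitions below ("src" is an illustrative source table):

    tEnv.sqlUpdate("insert into sinkTable partition(a=1) select b, c from src")   // OK
    tEnv.sqlUpdate("insert into sinkTable partition(a=1, b=2) select c from src") // OK
    // ValidationException: "Static partition column b should appear before dynamic partition a."
    tEnv.sqlUpdate("insert into sinkTable partition(b=1) select a, c from src")
    // ValidationException: "Static partition column c should be in the partition fields list [a, b]."
    tEnv.sqlUpdate("insert into sinkTable partition(c=1) select a, b from src")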
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/PartitionableSinkITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/PartitionableSinkITCase.scala
new file mode 100644
index 0000000..fe3918e
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/PartitionableSinkITCase.scala
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.batch.sql
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo.INT_TYPE_INFO
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl
+import org.apache.flink.sql.parser.validate.FlinkSqlConformance
+import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink}
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
+import org.apache.flink.table.api.{ExecutionConfigOptions, TableConfig, TableException, TableSchema, ValidationException}
+import org.apache.flink.table.calcite.CalciteConfig
+import org.apache.flink.table.runtime.batch.sql.PartitionableSinkITCase._
+import org.apache.flink.table.runtime.utils.BatchTestBase
+import org.apache.flink.table.runtime.utils.BatchTestBase.row
+import org.apache.flink.table.runtime.utils.TestData._
+import org.apache.flink.table.sinks.{PartitionableTableSink, StreamTableSink, TableSink}
+import org.apache.flink.table.types.logical.{BigIntType, IntType, VarCharType}
+import org.apache.flink.types.Row
+import org.apache.calcite.config.Lex
+import org.apache.calcite.sql.parser.SqlParser
+import org.junit.Assert._
+import org.junit.rules.ExpectedException
+import org.junit.{Before, Rule, Test}
+
+import java.util.{ArrayList => JArrayList, LinkedList => JLinkedList, List => JList, Map => JMap}
+
+import scala.collection.JavaConversions._
+import scala.collection.Seq
+
+/**
+ * Test cases for [[org.apache.flink.table.sinks.PartitionableTableSink]].
+ */
+class PartitionableSinkITCase extends BatchTestBase {
+
+ private val _expectedException = ExpectedException.none
+ private val type4 = new RowTypeInfo(INT_TYPE_INFO, INT_TYPE_INFO, INT_TYPE_INFO)
+
+ @Rule
+ def expectedEx: ExpectedException = _expectedException
+
+ @Before
+ override def before(): Unit = {
+ super.before()
+ env.setParallelism(3)
+ tEnv.getConfig
+ .getConfiguration
+ .setInteger(ExecutionConfigOptions.SQL_RESOURCE_DEFAULT_PARALLELISM, 3)
+ registerCollection("nonSortTable", testData, type3, "a, b, c", dataNullables)
+ registerCollection("sortTable", testData1, type3, "a, b, c", dataNullables)
+ PartitionableSinkITCase.init()
+ }
+
+ override def getTableConfig: TableConfig = {
+ val parserConfig = SqlParser.configBuilder
+ .setParserFactory(FlinkSqlParserImpl.FACTORY)
+ .setConformance(FlinkSqlConformance.HIVE) // set up hive dialect
+ .setLex(Lex.JAVA)
+ .setIdentifierMaxLength(256).build
+ val plannerConfig = CalciteConfig.createBuilder(CalciteConfig.DEFAULT)
+ .replaceSqlParserConfig(parserConfig)
+ val tableConfig = new TableConfig
+ tableConfig.setPlannerConfig(plannerConfig.build())
+ tableConfig
+ }
+
+ @Test
+ def testInsertWithOutPartitionGrouping(): Unit = {
+ registerTableSink()
+ tEnv.sqlUpdate("insert into sinkTable select a, max(b), c"
+ + " from nonSortTable group by a, c")
+ tEnv.execute("testJob")
+ assertEquals(List("1,5,Hi",
+ "1,5,Hi01",
+ "1,5,Hi02"),
+ RESULT1.sorted)
+ assert(RESULT2.isEmpty)
+ assertEquals(List("2,1,Hello world01",
+ "2,1,Hello world02",
+ "2,1,Hello world03",
+ "2,1,Hello world04",
+ "2,2,Hello world, how are you?",
+ "3,1,Hello world",
+ "3,2,Hello",
+ "3,2,Hello01",
+ "3,2,Hello02",
+ "3,2,Hello03",
+ "3,2,Hello04"),
+ RESULT3.sorted)
+ }
+
+ @Test
+ def testInsertWithPartitionGrouping(): Unit = {
+ registerTableSink()
+ tEnv.sqlUpdate("insert into sinkTable select a, b, c from sortTable")
+ tEnv.execute("testJob")
+ assertEquals(List("1,1,Hello world",
+ "1,1,Hello world, how are you?"),
+ RESULT1.toList)
+ assertEquals(List("4,4,你好,陌生人",
+ "4,4,你好,陌生人,我是",
+ "4,4,你好,陌生人,我是中国人",
+ "4,4,你好,陌生人,我是中国人,你来自哪里?"),
+ RESULT2.toList)
+ assertEquals(List("2,2,Hi",
+ "2,2,Hello",
+ "3,3,I'm fine, thank",
+ "3,3,I'm fine, thank you",
+ "3,3,I'm fine, thank you, and you?"),
+ RESULT3.toList)
+ }
+
+ @Test
+ def testInsertWithStaticPartitions(): Unit = {
+ val testSink = registerTableSink()
+ tEnv.sqlUpdate("insert into sinkTable partition(a=1) select b, c from sortTable")
+ tEnv.execute("testJob")
+ // this sink should have been set up with static partitions
+ assertEquals(testSink.getStaticPartitions.toMap, Map("a" -> "1"))
+ assertEquals(List("1,2,Hi",
+ "1,1,Hello world",
+ "1,2,Hello",
+ "1,1,Hello world, how are you?",
+ "1,3,I'm fine, thank",
+ "1,3,I'm fine, thank you",
+ "1,3,I'm fine, thank you, and you?",
+ "1,4,你好,陌生人",
+ "1,4,你好,陌生人,我是",
+ "1,4,你好,陌生人,我是中国人",
+ "1,4,你好,陌生人,我是中国人,你来自哪里?"),
+ RESULT1.toList)
+ assert(RESULT2.isEmpty)
+ assert(RESULT3.isEmpty)
+ }
+
+ @Test
+ def testInsertWithStaticAndDynamicPartitions(): Unit = {
+ val testSink = registerTableSink(partitionColumns = Array("a", "b"))
+ tEnv.sqlUpdate("insert into sinkTable partition(a=1) select b, c from sortTable")
+ tEnv.execute("testJob")
+ // this sink should have been set up with static partitions
+ assertEquals(testSink.getStaticPartitions.toMap, Map("a" -> "1"))
+ assertEquals(List("1,3,I'm fine, thank",
+ "1,3,I'm fine, thank you",
+ "1,3,I'm fine, thank you, and you?"),
+ RESULT1.toList)
+ assertEquals(List("1,2,Hi",
+ "1,2,Hello"),
+ RESULT2.toList)
+ assertEquals(List("1,1,Hello world",
+ "1,1,Hello world, how are you?",
+ "1,4,你好,陌生人",
+ "1,4,你好,陌生人,我是",
+ "1,4,你好,陌生人,我是中国人",
+ "1,4,你好,陌生人,我是中国人,你来自哪里?"),
+ RESULT3.toList)
+ }
+
+ @Test
+ def testDynamicPartitionInFrontOfStaticPartition(): Unit = {
+ expectedEx.expect(classOf[ValidationException])
+ expectedEx.expectMessage("Static partition column b "
+ + "should appear before dynamic partition a")
+ registerTableSink(grouping = true, partitionColumns = Array("a", "b"))
+ tEnv.sqlUpdate("insert into sinkTable partition(b=1) select a, c from sortTable")
+ tEnv.execute("testJob")
+ }
+
+ @Test
+ def testStaticPartitionNotInPartitionFields(): Unit = {
+ expectedEx.expect(classOf[ValidationException])
+ expectedEx.expectMessage("Static partition column c " +
+ "should be in the partition fields list [a, b].")
+ registerTableSink(tableName = "sinkTable2", rowType = type4,
+ partitionColumns = Array("a", "b"))
+ tEnv.sqlUpdate("insert into sinkTable2 partition(c=1) select a, b from sinkTable2")
+ tEnv.execute("testJob")
+ }
+
+ @Test
+ def testInsertStaticPartitionOnNonPartitionedSink(): Unit = {
+ expectedEx.expect(classOf[ValidationException])
+ expectedEx.expectMessage(
+ "Can't insert static partitions into a non-partitioned table sink.")
+ registerTableSink(tableName = "sinkTable2", rowType = type4, partitionColumns = Array())
+ tEnv.sqlUpdate("insert into sinkTable2 partition(c=1) select a, b from sinkTable2")
+ tEnv.execute("testJob")
+ }
+
+ private def registerTableSink(
+ tableName: String = "sinkTable",
+ rowType: RowTypeInfo = type3,
+ grouping: Boolean = true,
+ partitionColumns: Array[String] = Array[String]("a")): TestSink = {
+ val testSink = new TestSink(rowType, grouping, partitionColumns)
+ tEnv.registerTableSink(tableName, testSink)
+ testSink
+ }
+
+ private class TestSink(
+ rowType: RowTypeInfo,
+ supportsGrouping: Boolean,
+ partitionColumns: Array[String])
+ extends StreamTableSink[Row]
+ with PartitionableTableSink {
+ private var staticPartitions: JMap[String, String] = _
+
+ override def getPartitionFieldNames: JList[String] = partitionColumns.toList
+
+ override def setStaticPartition(partitions: JMap[String, String]): Unit =
+ this.staticPartitions = partitions
+
+ override def configure(fieldNames: Array[String],
+ fieldTypes: Array[TypeInformation[_]]): TableSink[Row] = this
+
+ override def configurePartitionGrouping(s: Boolean): Boolean = {
+ supportsGrouping
+ }
+
+ override def getTableSchema: TableSchema = {
+ new TableSchema(Array("a", "b", "c"), type3.getFieldTypes)
+ }
+
+ override def getOutputType: RowTypeInfo = type3
+
+ override def emitDataStream(dataStream: DataStream[Row]): Unit = {
+ dataStream.addSink(new UnsafeMemorySinkFunction(type3))
+ .setParallelism(dataStream.getParallelism)
+ }
+
+ override def consumeDataStream(dataStream: DataStream[Row]): DataStreamSink[_] = {
+ dataStream.addSink(new UnsafeMemorySinkFunction(type3))
+ .setParallelism(dataStream.getParallelism)
+ }
+
+ def getStaticPartitions: JMap[String, String] = {
+ staticPartitions
+ }
+ }
+}
+
+object PartitionableSinkITCase {
+ val RESULT1 = new JLinkedList[String]()
+ val RESULT2 = new JLinkedList[String]()
+ val RESULT3 = new JLinkedList[String]()
+ val RESULT_QUEUE: JList[JLinkedList[String]] = new JArrayList[JLinkedList[String]]()
+
+ def init(): Unit = {
+ RESULT1.clear()
+ RESULT2.clear()
+ RESULT3.clear()
+ RESULT_QUEUE.clear()
+ RESULT_QUEUE.add(RESULT1)
+ RESULT_QUEUE.add(RESULT2)
+ RESULT_QUEUE.add(RESULT3)
+ }
+
+ /**
+ * Sink function that collects rows into shared in-memory result lists, one per subtask.
+ */
+ class UnsafeMemorySinkFunction(outputType: TypeInformation[Row])
+ extends RichSinkFunction[Row] {
+ private var resultSet: JLinkedList[String] = _
+
+ override def open(param: Configuration): Unit = {
+ val taskId = getRuntimeContext.getIndexOfThisSubtask
+ resultSet = RESULT_QUEUE.get(taskId)
+ }
+
+ @throws[Exception]
+ override def invoke(row: Row): Unit = {
+ resultSet.add(row.toString)
+ }
+ }
+
+ val fieldNames = Array("a", "b", "c")
+ val dataType = Array(new IntType(), new BigIntType(), new VarCharType(VarCharType.MAX_LENGTH))
+ val dataNullables = Array(false, false, false)
+
+ val testData = Seq(
+ row(3, 2L, "Hello03"),
+ row(1, 5L, "Hi"),
+ row(1, 5L, "Hi01"),
+ row(1, 5L, "Hi02"),
+ row(3, 2L, "Hello"),
+ row(3, 2L, "Hello01"),
+ row(2, 1L, "Hello world03"),
+ row(3, 2L, "Hello02"),
+ row(3, 2L, "Hello04"),
+ row(3, 1L, "Hello world"),
+ row(2, 1L, "Hello world01"),
+ row(2, 1L, "Hello world02"),
+ row(2, 1L, "Hello world04"),
+ row(2, 2L, "Hello world, how are you?")
+ )
+
+ val testData1 = Seq(
+ row(2, 2L, "Hi"),
+ row(1, 1L, "Hello world"),
+ row(2, 2L, "Hello"),
+ row(1, 1L, "Hello world, how are you?"),
+ row(3, 3L, "I'm fine, thank"),
+ row(3, 3L, "I'm fine, thank you"),
+ row(3, 3L, "I'm fine, thank you, and you?"),
+ row(4, 4L, "你好,陌生人"),
+ row(4, 4L, "你好,陌生人,我是"),
+ row(4, 4L, "你好,陌生人,我是中国人"),
+ row(4, 4L, "你好,陌生人,我是中国人,你来自哪里?")
+ )
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/BatchTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/BatchTestBase.scala
index 3e8d304..670bc94 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/BatchTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/BatchTestBase.scala
@@ -54,7 +54,8 @@ import scala.util.Sorting
class BatchTestBase extends BatchAbstractTestBase {
private val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
- private val testingTableEnv: TestingTableEnvironment = TestingTableEnvironment.create(settings)
+ private val testingTableEnv: TestingTableEnvironment = TestingTableEnvironment
+ .create(settings, catalogManager = None, getTableConfig)
val tEnv: TableEnvironment = testingTableEnv
private val planner = tEnv.asInstanceOf[TableEnvironmentImpl].getPlanner.asInstanceOf[PlannerBase]
val env: StreamExecutionEnvironment = planner.getExecEnv
@@ -65,6 +66,14 @@ class BatchTestBase extends BatchAbstractTestBase {
val LINE_COL_TWICE_PATTERN: Pattern = Pattern.compile("(?s)From line ([0-9]+),"
+ " column ([0-9]+) to line ([0-9]+), column ([0-9]+): (.*)")
+ // TODO: [FLINK-13338] will expose a dialect option on TableConfig so that
+ // users do not have to override the CalciteConfig
+ /**
+ * Subclasses should override this method to customize the configuration used
+ * during the SQL parsing and SQL-to-rel conversion phases.
+ */
+ protected def getTableConfig: TableConfig = new TableConfig
+
@Before
def before(): Unit = {
conf.getConfiguration.setInteger(
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
index 28dacb7..1e88629 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
@@ -484,8 +484,9 @@ abstract class TableTestUtil(
isStreamingMode: Boolean,
catalogManager: Option[CatalogManager] = None)
extends TableTestUtilBase(test, isStreamingMode) {
+ protected val tableConfig: TableConfig = new TableConfig
protected val testingTableEnv: TestingTableEnvironment =
- TestingTableEnvironment.create(setting, catalogManager)
+ TestingTableEnvironment.create(setting, catalogManager, tableConfig)
val tableEnv: TableEnvironment = testingTableEnv
tableEnv.getConfig.getConfiguration.setString(
ExecutionConfigOptions.SQL_EXEC_SHUFFLE_MODE, ShuffleMode.PIPELINED.toString)
@@ -983,7 +984,8 @@ object TestingTableEnvironment {
def create(
settings: EnvironmentSettings,
- catalogManager: Option[CatalogManager] = None): TestingTableEnvironment = {
+ catalogManager: Option[CatalogManager] = None,
+ tableConfig: TableConfig): TestingTableEnvironment = {
val catalogMgr = catalogManager match {
case Some(c) => c
case _ =>
@@ -996,7 +998,6 @@ object TestingTableEnvironment {
val executorProperties = settings.toExecutorProperties
val executor = ComponentFactoryService.find(classOf[ExecutorFactory],
executorProperties).create(executorProperties)
- val tableConfig = new TableConfig
val planner = ComponentFactoryService.find(classOf[PlannerFactory], plannerProperties)
.create(plannerProperties, executor, tableConfig, functionCatalog, catalogMgr)
.asInstanceOf[PlannerBase]
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
index 46af332..76df4bb 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
@@ -25,6 +25,7 @@ import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.factories.TableFactory;
import org.apache.flink.table.factories.TableFactoryUtil;
import org.apache.flink.table.factories.TableSourceFactory;
+import org.apache.flink.table.plan.schema.TableSinkTable;
import org.apache.flink.table.plan.schema.TableSourceTable;
import org.apache.flink.table.plan.stats.FlinkStatistic;
import org.apache.flink.table.sources.StreamTableSource;
@@ -100,12 +101,25 @@ class DatabaseCalciteSchema implements Schema {
}
private Table convertConnectorTable(ConnectorCatalogTable<?, ?> table) {
- return table.getTableSource()
+ Optional<TableSourceTable> tableSourceTable = table.getTableSource()
.map(tableSource -> new TableSourceTable<>(
tableSource,
!table.isBatch(),
- FlinkStatistic.UNKNOWN()))
- .orElseThrow(() -> new TableException("Cannot query a sink only table."));
+ FlinkStatistic.UNKNOWN()));
+ if (tableSourceTable.isPresent()) {
+ return tableSourceTable.get();
+ } else {
+ Optional<TableSinkTable> tableSinkTable = table.getTableSink()
+ .map(tableSink -> new TableSinkTable<>(
+ tableSink,
+ FlinkStatistic.UNKNOWN()));
+ if (tableSinkTable.isPresent()) {
+ return tableSinkTable.get();
+ } else {
+ throw new TableException("Cannot convert a connector table " +
+ "without either a source or a sink.");
+ }
+ }
}
private Table convertCatalogTable(ObjectPath tablePath, CatalogTable table) {
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
index 27718ae..e0636ef 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
@@ -22,6 +22,7 @@ import org.apache.flink.sql.parser.SqlProperty;
import org.apache.flink.sql.parser.ddl.SqlCreateTable;
import org.apache.flink.sql.parser.ddl.SqlDropTable;
import org.apache.flink.sql.parser.ddl.SqlTableColumn;
+import org.apache.flink.sql.parser.dml.RichSqlInsert;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.calcite.FlinkPlannerImpl;
@@ -29,6 +30,7 @@ import org.apache.flink.table.calcite.FlinkTypeFactory;
import org.apache.flink.table.calcite.FlinkTypeSystem;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.operations.CatalogSinkModifyOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.PlannerQueryOperation;
import org.apache.flink.table.operations.ddl.CreateTableOperation;
@@ -84,6 +86,8 @@ public class SqlToOperationConverter {
return converter.convertCreateTable((SqlCreateTable) validated);
} else if (validated instanceof SqlDropTable) {
return converter.convertDropTable((SqlDropTable) validated);
+ } else if (validated instanceof RichSqlInsert) {
+ return converter.convertSqlInsert((RichSqlInsert) validated);
} else if (validated.getKind().belongsTo(SqlKind.QUERY)) {
return converter.convertSqlQuery(validated);
} else {
@@ -147,6 +151,17 @@ public class SqlToOperationConverter {
return toQueryOperation(flinkPlanner, node);
}
+ /** Converts an INSERT INTO statement. */
+ private Operation convertSqlInsert(RichSqlInsert insert) {
+ // get name of sink table
+ List<String> targetTablePath = ((SqlIdentifier) insert.getTargetTable()).names;
+ return new CatalogSinkModifyOperation(
+ targetTablePath,
+ (PlannerQueryOperation) SqlToOperationConverter.convert(flinkPlanner,
+ insert.getSource()),
+ insert.getStaticPartitionKVs());
+ }
+
//~ Tools ------------------------------------------------------------------
/**
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
index 675f4ed..a4ed523 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
@@ -48,6 +48,7 @@ import org.apache.flink.table.utils.TableConnectorUtils
import org.apache.flink.types.Row
import _root_.scala.collection.JavaConverters._
+import _root_.scala.collection.JavaConversions._
/**
* The abstract base class for the implementation of batch TableEnvironments.
@@ -115,12 +116,13 @@ abstract class BatchTableEnvImpl(
// translate the Table into a DataSet and provide the type that the TableSink expects.
val result: DataSet[T] = translate(table)(outputType)
// Give the DataSet to the TableSink to emit it.
- batchSink.emitDataSet(result)
+ batchSink.emitDataSet(shuffleByPartitionFieldsIfNeeded(batchSink, result))
case boundedSink: OutputFormatTableSink[T] =>
val outputType = fromDataTypeToLegacyInfo(sink.getConsumedDataType)
.asInstanceOf[TypeInformation[T]]
// translate the Table into a DataSet and provide the type that the TableSink expects.
- val result: DataSet[T] = translate(table)(outputType)
+ val translated: DataSet[T] = translate(table)(outputType)
+ val result = shuffleByPartitionFieldsIfNeeded(boundedSink, translated)
// use the OutputFormat to consume the DataSet.
val dataSink = result.output(boundedSink.getOutputFormat)
dataSink.name(
@@ -134,6 +136,26 @@ abstract class BatchTableEnvImpl(
}
/**
+ * Hash-partitions the data set by the partition fields if the sink is a
+ * [[PartitionableTableSink]].
+ * @param sink the table sink
+ * @param dataSet the data set
+ * @tparam R the data set record type
+ * @return a data set that may have been hash-partitioned by the partition fields
+ */
+ private def shuffleByPartitionFieldsIfNeeded[R](
+ sink: TableSink[_],
+ dataSet: DataSet[R]): DataSet[R] = {
+ sink match {
+ case partitionableSink: PartitionableTableSink
+ if partitionableSink.getPartitionFieldNames != null &&
+ partitionableSink.getPartitionFieldNames.nonEmpty =>
+ val fieldNames = sink.getTableSchema.getFieldNames
+ val indices = partitionableSink.getPartitionFieldNames.map(fieldNames.indexOf(_))
+ dataSet.partitionByHash(indices:_*)
+ case _ => dataSet
+ }
+ }
+
+ /**
* Creates a final converter that maps the internal row type to external type.
*
* @param physicalTypeInfo the input of the sink
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index 06a04b7..021a732 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -21,6 +21,7 @@ package org.apache.flink.table.api.internal
import org.apache.flink.annotation.VisibleForTesting
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.sql.parser.ddl.{SqlCreateTable, SqlDropTable}
+import org.apache.flink.sql.parser.dml.RichSqlInsert
import org.apache.flink.table.api._
import org.apache.flink.table.calcite.{FlinkPlannerImpl, FlinkRelBuilder}
import org.apache.flink.table.catalog._
@@ -33,7 +34,7 @@ import org.apache.flink.table.operations.ddl.CreateTableOperation
import org.apache.flink.table.operations.utils.OperationTreeBuilder
import org.apache.flink.table.operations.{CatalogQueryOperation, PlannerQueryOperation, TableSourceQueryOperation, _}
import org.apache.flink.table.planner.PlanningConfigurationBuilder
-import org.apache.flink.table.sinks.{TableSink, TableSinkUtils}
+import org.apache.flink.table.sinks.{PartitionableTableSink, TableSink, TableSinkUtils}
import org.apache.flink.table.sources.TableSource
import org.apache.flink.table.sqlexec.SqlToOperationConverter
import org.apache.flink.table.util.JavaScalaConversionUtil
@@ -44,7 +45,7 @@ import org.apache.calcite.sql._
import org.apache.calcite.sql.parser.SqlParser
import org.apache.calcite.tools.FrameworkConfig
-import _root_.java.util.Optional
+import _root_.java.util.{Optional, Map => JMap, HashMap => JHashMap}
import _root_.scala.collection.JavaConverters._
import _root_.scala.collection.JavaConversions._
@@ -440,10 +441,12 @@ abstract class TableEnvImpl(
// parse the sql query
val parsed = planner.parse(stmt)
parsed match {
- case insert: SqlInsert =>
- // validate the SQL query
- val query = insert.getSource
- val validatedQuery = planner.validate(query)
+ case insert: RichSqlInsert =>
+ // validate the insert
+ val validatedInsert = planner.validate(insert).asInstanceOf[RichSqlInsert]
+ // the row type of a SQL insert is not validated yet (see FlinkPlannerImpl),
+ // so validate the source query separately.
+ val validatedQuery = planner.validate(validatedInsert.getSource)
val tableOperation = new PlannerQueryOperation(planner.rel(validatedQuery).rel)
// get query result as Table
@@ -453,7 +456,8 @@ abstract class TableEnvImpl(
val targetTablePath = insert.getTargetTable.asInstanceOf[SqlIdentifier].names
// insert query result into sink table
- insertInto(queryResult, targetTablePath.asScala:_*)
+ insertInto(queryResult, InsertOptions(insert.getStaticPartitionKVs),
+ targetTablePath.asScala:_*)
case createTable: SqlCreateTable =>
val operation = SqlToOperationConverter
.convert(planner, createTable)
@@ -503,19 +507,27 @@ abstract class TableEnvImpl(
private[flink] def writeToSink[T](table: Table, sink: TableSink[T]): Unit
override def insertInto(
- table: Table,
- path: String,
- pathContinued: String*): Unit = {
- insertInto(table, path +: pathContinued: _*)
+ table: Table,
+ path: String,
+ pathContinued: String*): Unit = {
+ insertInto(
+ table,
+ InsertOptions(new JHashMap[String, String]()),
+ path +: pathContinued: _*)
}
+ /** Options for executing a SQL INSERT statement. */
+ case class InsertOptions(staticPartitions: JMap[String, String])
+
/**
* Writes the [[Table]] to a [[TableSink]] that was registered under the specified name.
*
* @param table The table to write to the TableSink.
* @param sinkTablePath The name of the registered TableSink.
*/
- private def insertInto(table: Table, sinkTablePath: String*): Unit = {
+ private def insertInto(table: Table,
+ insertOptions: InsertOptions,
+ sinkTablePath: String*): Unit = {
// check that sink table exists
if (null == sinkTablePath) {
@@ -532,7 +544,19 @@ abstract class TableEnvImpl(
case Some(tableSink) =>
// validate schema of source table and table sink
- TableSinkUtils.validateSink(table.getQueryOperation, sinkTablePath.asJava, tableSink)
+ TableSinkUtils.validateSink(
+ insertOptions.staticPartitions,
+ table.getQueryOperation,
+ sinkTablePath.asJava,
+ tableSink)
+ // set static partitions if it is a partitioned table sink
+ tableSink match {
+ case partitionableSink: PartitionableTableSink
+ if partitionableSink.getPartitionFieldNames != null
+ && partitionableSink.getPartitionFieldNames.nonEmpty =>
+ partitionableSink.setStaticPartition(insertOptions.staticPartitions)
+ case _ =>
+ }
// emit the table to the configured table sink
writeToSink(table, tableSink)
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
index e92bef4..e3b7e14 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
@@ -52,9 +52,9 @@ import scala.collection.JavaConversions._
*/
class FlinkPlannerImpl(
config: FrameworkConfig,
- catalogReaderSupplier: JFunction[JBoolean, CatalogReader],
+ val catalogReaderSupplier: JFunction[JBoolean, CatalogReader],
planner: RelOptPlanner,
- typeFactory: FlinkTypeFactory) {
+ val typeFactory: FlinkTypeFactory) {
val operatorTable: SqlOperatorTable = config.getOperatorTable
/** Holds the trait definitions to be registered with planner. May be null. */
@@ -110,8 +110,9 @@ class FlinkPlannerImpl(
node.validate()
case _ =>
}
- // no need to validate row type for DDL nodes.
- if (sqlNode.getKind.belongsTo(SqlKind.DDL)) {
+ // no need to validate row type for DDL and insert nodes.
+ if (sqlNode.getKind.belongsTo(SqlKind.DDL)
+ || sqlNode.getKind == SqlKind.INSERT) {
return sqlNode
}
validator = new FlinkCalciteSqlValidator(
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/PreValidateReWriter.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/PreValidateReWriter.scala
index 67a15e4..b371d52 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/PreValidateReWriter.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/PreValidateReWriter.scala
@@ -105,7 +105,7 @@ object PreValidateReWriter {
val id = sqlProperty.getKey
val targetField = SqlValidatorUtil.getTargetField(targetRowType,
typeFactory, id, calciteCatalogReader, relOptTable)
- validateField(assignedFields.containsValue, id, targetField)
+ validateField(idx => !assignedFields.contains(idx), id, targetField)
val value = sqlProperty.getValue.asInstanceOf[SqlLiteral]
assignedFields.put(targetField.getIndex,
maybeCast(value, value.createSqlType(typeFactory), targetField.getType, typeFactory))
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/schema/TableSinkTable.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/schema/TableSinkTable.scala
new file mode 100644
index 0000000..37d172e
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/schema/TableSinkTable.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.schema
+
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.stats.FlinkStatistic
+import org.apache.flink.table.sinks.TableSink
+
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
+import org.apache.calcite.schema.impl.AbstractTable
+
+/**
+ * Class which implements the logic to convert a [[TableSink]] to a Calcite table.
+ */
+class TableSinkTable[T](
+ val tableSink: TableSink[T],
+ val statistic: FlinkStatistic = FlinkStatistic.UNKNOWN)
+ extends AbstractTable {
+
+ override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = {
+ val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory]
+ flinkTypeFactory.buildLogicalRowType(tableSink.getTableSchema)
+ }
+
+ /**
+   * Returns the statistics of the current table.
+   *
+   * @return statistics of the current table
+ */
+ override def getStatistic: FlinkStatistic = statistic
+}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
index 057e70a..c7a52f4 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
@@ -21,10 +21,11 @@ import org.apache.flink.annotation.VisibleForTesting
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.dag.Transformation
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.sql.parser.dml.RichSqlInsert
import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api._
-import org.apache.flink.table.calcite.{CalciteConfig, FlinkPlannerImpl, FlinkRelBuilder, FlinkTypeFactory}
+import org.apache.flink.table.calcite.{CalciteConfig, FlinkPlannerImpl, FlinkRelBuilder, FlinkTypeFactory, PreValidateReWriter}
import org.apache.flink.table.catalog.{CatalogManager, CatalogManagerCalciteSchema, CatalogTable, ConnectorCatalogTable, _}
import org.apache.flink.table.delegation.{Executor, Planner}
import org.apache.flink.table.executor.StreamExecutor
@@ -41,16 +42,19 @@ import org.apache.flink.table.sinks._
import org.apache.flink.table.sqlexec.SqlToOperationConverter
import org.apache.flink.table.types.utils.TypeConversions
import org.apache.flink.table.util.JavaScalaConversionUtil
+
import org.apache.calcite.jdbc.CalciteSchema
import org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema
import org.apache.calcite.plan.RelOptUtil
import org.apache.calcite.rel.RelNode
-import org.apache.calcite.sql.{SqlIdentifier, SqlInsert, SqlKind}
+import org.apache.calcite.sql.SqlKind
+
import _root_.java.lang.{Boolean => JBool}
import _root_.java.util
import _root_.java.util.{Objects, List => JList}
import _root_.scala.collection.JavaConverters._
+import _root_.scala.collection.JavaConversions._
/**
* Implementation of [[Planner]] for legacy Flink planner. It supports only streaming use cases.
@@ -101,18 +105,12 @@ class StreamPlanner(
val parsed = planner.parse(stmt)
parsed match {
- case insert: SqlInsert =>
+ case insert: RichSqlInsert =>
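+        // RichSqlInsert is Flink's extended INSERT node; unlike plain SqlInsert
+        // it carries the static partition specification of the PARTITION clause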
val targetColumnList = insert.getTargetColumnList
if (targetColumnList != null && insert.getTargetColumnList.size() != 0) {
throw new ValidationException("Partial inserts are not supported")
}
- // get name of sink table
- val targetTablePath = insert.getTargetTable.asInstanceOf[SqlIdentifier].names
-
- List(new CatalogSinkModifyOperation(targetTablePath,
- SqlToOperationConverter.convert(planner,
- insert.getSource).asInstanceOf[PlannerQueryOperation])
- .asInstanceOf[Operation]).asJava
+ List(SqlToOperationConverter.convert(planner, insert))
case node if node.getKind.belongsTo(SqlKind.QUERY) || node.getKind.belongsTo(SqlKind.DDL) =>
List(SqlToOperationConverter.convert(planner, parsed)).asJava
case _ =>
@@ -155,7 +153,19 @@ class StreamPlanner(
case catalogSink: CatalogSinkModifyOperation =>
getTableSink(catalogSink.getTablePath)
.map(sink => {
- TableSinkUtils.validateSink(catalogSink.getChild, catalogSink.getTablePath, sink)
+ TableSinkUtils.validateSink(
+ catalogSink.getStaticPartitions,
+ catalogSink.getChild,
+ catalogSink.getTablePath,
+ sink)
+ // set static partitions if it is a partitioned sink
+ sink match {
+ case partitionableSink: PartitionableTableSink
+ if partitionableSink.getPartitionFieldNames != null
+ && partitionableSink.getPartitionFieldNames.nonEmpty =>
+ partitionableSink.setStaticPartition(catalogSink.getStaticPartitions)
+ case _ =>
+ }
writeToSink(catalogSink.getChild, sink, unwrapQueryConfig)
}) match {
case Some(t) => t
@@ -252,9 +262,23 @@ class StreamPlanner(
val resultSink = sink match {
case retractSink: RetractStreamTableSink[T] =>
+ retractSink match {
+ case partitionableSink: PartitionableTableSink
+ if partitionableSink.getPartitionFieldNames.nonEmpty =>
+ throw new TableException("Partitionable sink in retract stream mode " +
+ "is not supported yet!")
+ case _ =>
+ }
writeToRetractSink(retractSink, tableOperation, queryConfig)
case upsertSink: UpsertStreamTableSink[T] =>
+ upsertSink match {
+ case partitionableSink: PartitionableTableSink
+ if partitionableSink.getPartitionFieldNames.nonEmpty =>
+ throw new TableException("Partitionable sink in upsert stream mode " +
+ "is not supported yet!")
+ case _ =>
+ }
writeToUpsertSink(upsertSink, tableOperation, queryConfig)
case appendSink: AppendStreamTableSink[T] =>
@@ -317,7 +341,7 @@ class StreamPlanner(
streamQueryConfig,
withChangeFlag = false)
// Give the DataStream to the TableSink to emit it.
- sink.consumeDataStream(result)
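+    // for a partitioned sink, key the stream by the partition fields first so
+    // that records of the same partition end up in the same sink task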
+ sink.consumeDataStream(shuffleByPartitionFieldsIfNeeded(sink, result))
}
private def writeToUpsertSink[T](
@@ -355,6 +379,26 @@ class StreamPlanner(
sink.consumeDataStream(result)
}
+ /**
+   * Keys the data stream by the partition fields if the sink is a
+   * [[PartitionableTableSink]].
+   *
+   * @param sink the table sink
+   * @param dataStream the data stream
+   * @tparam R the data stream record type
+   * @return a data stream that may be keyed by the partition fields
+ */
+ private def shuffleByPartitionFieldsIfNeeded[R](
+ sink: TableSink[_],
+ dataStream: DataStream[R]): DataStream[R] = {
+ sink match {
+ case partitionableSink: PartitionableTableSink
+ if partitionableSink.getPartitionFieldNames.nonEmpty =>
+ val fieldNames = sink.getTableSchema.getFieldNames
+ val indices = partitionableSink.getPartitionFieldNames.map(fieldNames.indexOf(_))
+ dataStream.keyBy(indices:_*)
+ case _ => dataStream
+ }
+ }
+
private def translateToType[A](
table: QueryOperation,
queryConfig: StreamQueryConfig,
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sinks/TableSinkUtils.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sinks/TableSinkUtils.scala
index 875ae27..d68afec 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sinks/TableSinkUtils.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sinks/TableSinkUtils.scala
@@ -18,22 +18,28 @@
package org.apache.flink.table.sinks
-import org.apache.flink.table.api.ValidationException
-import org.apache.flink.table.operations.{CatalogSinkModifyOperation, QueryOperation}
-import java.util.{List => JList}
+import org.apache.flink.table.api.{TableException, ValidationException}
+import org.apache.flink.table.operations.QueryOperation
+
+import java.util.{List => JList, Map => JMap}
+
+import collection.JavaConversions._
object TableSinkUtils {
/**
* Checks if the given [[QueryOperation]] can be written to the given [[TableSink]].
- * It checks if the names & the field types match.
+ * It checks if the names & the field types match. If this sink is a [[PartitionableTableSink]],
+ * it will also validate the partitions.
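+   * For example, with a sink partitioned on (a, b), the statement
+   * "INSERT INTO sink PARTITION (a=1) SELECT b, c FROM src" fixes a=1 as a
+   * static partition and leaves b dynamic; static partition columns must form
+   * a prefix of the partition field list.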
*
- * @param query The query that is supposed to be written.
- * @param sinkPath Tha path of the sink. It is needed just for logging. It does not
- * participate in the validation.
- * @param sink The sink that we want to write to.
+ * @param staticPartitions Static partitions of the sink if there exists any.
+ * @param query The query that is supposed to be written.
+   * @param sinkPath    The path of the sink. It is needed just for logging. It does not
+ * participate in the validation.
+ * @param sink The sink that we want to write to.
*/
def validateSink(
+ staticPartitions: JMap[String, String],
query: QueryOperation,
sinkPath: JList[String],
sink: TableSink[_])
@@ -58,9 +64,38 @@ object TableSinkUtils {
throw new ValidationException(
s"Field types of query result and registered TableSink " +
- s"${sinkPath} do not match.\n" +
+ s"$sinkPath do not match.\n" +
s"Query result schema: $srcSchema\n" +
s"TableSink schema: $sinkSchema")
}
+ // check partitions are valid
+ if (staticPartitions != null && !staticPartitions.isEmpty) {
+ val invalidMsg = "Can't insert static partitions into a non-partitioned table sink. " +
+ "A partitioned sink should implement 'PartitionableTableSink' and return partition " +
+ "field names via 'getPartitionFieldNames()' method."
+ sink match {
+ case pts: PartitionableTableSink =>
+ val partitionFields = pts.getPartitionFieldNames
+ if (partitionFields == null || partitionFields.isEmpty) {
+ throw new ValidationException(invalidMsg)
+ }
+        staticPartitions.map(_._1).foreach { p =>
+ if (!partitionFields.contains(p)) {
+ throw new ValidationException(s"Static partition column $p " +
+ s"should be in the partition fields list $partitionFields.")
+ }
+ }
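+        // static partition columns must appear, in order, as a prefix of the
+        // declared partition field list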
+ staticPartitions.map(_._1).zip(partitionFields).foreach {
+ case (p1, p2) =>
+ if (p1 != p2) {
+ throw new ValidationException(s"Static partition column $p1 " +
+ s"should appear before dynamic partition $p2.")
+ }
+ }
+ case _ =>
+ throw new ValidationException(invalidMsg)
+
+ }
+ }
}
}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/PartitionableSinkITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/PartitionableSinkITCase.scala
new file mode 100644
index 0000000..d021d3a
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/PartitionableSinkITCase.scala
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.batch.sql
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.common.io.RichOutputFormat
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo.{INT_TYPE_INFO, LONG_TYPE_INFO, STRING_TYPE_INFO}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl
+import org.apache.flink.sql.parser.validate.FlinkSqlConformance
+import org.apache.flink.table.api.scala.BatchTableEnvironment
+import org.apache.flink.table.api.{DataTypes, PlannerConfig, TableSchema, ValidationException}
+import org.apache.flink.table.calcite.CalciteConfig
+import org.apache.flink.table.factories.utils.TestCollectionTableFactory.TestCollectionInputFormat
+import org.apache.flink.table.runtime.batch.sql.PartitionableSinkITCase._
+import org.apache.flink.table.sinks.{BatchTableSink, PartitionableTableSink, TableSink}
+import org.apache.flink.table.sources.BatchTableSource
+import org.apache.flink.table.types.logical.{BigIntType, IntType, VarCharType}
+import org.apache.flink.types.Row
+import org.apache.calcite.config.Lex
+import org.apache.calcite.sql.parser.SqlParser
+import org.junit.Assert.assertEquals
+import org.junit.rules.ExpectedException
+import org.junit.{Before, Rule, Test}
+
+import java.util.{ArrayList => JArrayList, LinkedList => JLinkedList, List => JList, Map => JMap}
+import org.apache.flink.api.java
+
+import scala.collection.JavaConversions._
+import scala.collection.Seq
+
+class PartitionableSinkITCase {
+ private val batchExec: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ private var tEnv: BatchTableEnvironment = _
+ private val type3 = new RowTypeInfo(INT_TYPE_INFO, LONG_TYPE_INFO, STRING_TYPE_INFO)
+ private val type4 = new RowTypeInfo(INT_TYPE_INFO, INT_TYPE_INFO, INT_TYPE_INFO)
+
+ private val _expectedException = ExpectedException.none
+
+ @Rule
+ def expectedEx: ExpectedException = _expectedException
+
+ @Before
+ def before(): Unit = {
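+    // parallelism 3 matches the three static result lists that
+    // CollectionOutputFormat writes into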
+ batchExec.setParallelism(3)
+ tEnv = BatchTableEnvironment.create(batchExec)
+ tEnv.getConfig.setPlannerConfig(getPlannerConfig)
+ registerTableSource("nonSortTable", testData.toList)
+ registerTableSource("sortTable", testData1.toList)
+ PartitionableSinkITCase.init()
+ }
+
+ def registerTableSource(name: String, data: List[Row]): Unit = {
+ val tableSchema = TableSchema.builder()
+ .field("a", DataTypes.INT())
+ .field("b", DataTypes.BIGINT())
+ .field("c", DataTypes.STRING())
+ .build()
+ tEnv.registerTableSource(name, new CollectionTableSource(data, 100, tableSchema))
+ }
+
+ private def getPlannerConfig: PlannerConfig = {
+ val parserConfig = SqlParser.configBuilder
+ .setParserFactory(FlinkSqlParserImpl.FACTORY)
+ .setConformance(FlinkSqlConformance.HIVE) // set up hive dialect
+ .setLex(Lex.JAVA)
+ .setIdentifierMaxLength(256).build
+ CalciteConfig.createBuilder()
+ .replaceSqlParserConfig(parserConfig)
+ .build()
+ }
+
+ @Test
+ def testInsertWithOutPartitionGrouping(): Unit = {
+ registerTableSink(grouping = false)
+ tEnv.sqlUpdate("insert into sinkTable select a, max(b), c"
+ + " from nonSortTable group by a, c")
+ tEnv.execute("testJob")
+ assertEquals(List("1,5,Hi",
+ "1,5,Hi01",
+ "1,5,Hi02"),
+ RESULT1.sorted)
+ assert(RESULT2.isEmpty)
+ assertEquals(List("2,1,Hello world01",
+ "2,1,Hello world02",
+ "2,1,Hello world03",
+ "2,1,Hello world04",
+ "2,2,Hello world, how are you?",
+ "3,1,Hello world",
+ "3,2,Hello",
+ "3,2,Hello01",
+ "3,2,Hello02",
+ "3,2,Hello03",
+ "3,2,Hello04"),
+ RESULT3.sorted)
+ }
+
+ @Test
+ def testInsertWithPartitionGrouping(): Unit = {
+ registerTableSink()
+ tEnv.sqlUpdate("insert into sinkTable select a, b, c from sortTable")
+ tEnv.execute("testJob")
+ assertEquals(List("1,1,Hello world",
+ "1,1,Hello world, how are you?"),
+ RESULT1.toList)
+ assertEquals(List("4,4,你好,陌生人",
+ "4,4,你好,陌生人,我是",
+ "4,4,你好,陌生人,我是中国人",
+ "4,4,你好,陌生人,我是中国人,你来自哪里?"),
+ RESULT2.toList)
+ assertEquals(List("2,2,Hi",
+ "2,2,Hello",
+ "3,3,I'm fine, thank",
+ "3,3,I'm fine, thank you",
+ "3,3,I'm fine, thank you, and you?"),
+ RESULT3.toList)
+ }
+
+ @Test
+ def testInsertWithStaticPartitions(): Unit = {
+ val testSink = registerTableSink()
+ tEnv.sqlUpdate("insert into sinkTable partition(a=1) select b, c from sortTable")
+ tEnv.execute("testJob")
+ // this sink should have been set up with static partitions
+    assertEquals(Map("a" -> "1"), testSink.getStaticPartitions.toMap)
+ assertEquals(List("1,2,Hi",
+ "1,1,Hello world",
+ "1,2,Hello",
+ "1,1,Hello world, how are you?",
+ "1,3,I'm fine, thank",
+ "1,3,I'm fine, thank you",
+ "1,3,I'm fine, thank you, and you?",
+ "1,4,你好,陌生人",
+ "1,4,你好,陌生人,我是",
+ "1,4,你好,陌生人,我是中国人",
+ "1,4,你好,陌生人,我是中国人,你来自哪里?"),
+ RESULT1.toList)
+ assert(RESULT2.isEmpty)
+ assert(RESULT3.isEmpty)
+ }
+
+ @Test
+ def testInsertWithStaticAndDynamicPartitions(): Unit = {
+ val testSink = registerTableSink(partitionColumns = Array("a", "b"))
+ tEnv.sqlUpdate("insert into sinkTable partition(a=1) select b, c from sortTable")
+ tEnv.execute("testJob")
+ // this sink should have been set up with static partitions
+    assertEquals(Map("a" -> "1"), testSink.getStaticPartitions.toMap)
+ assertEquals(List("1,3,I'm fine, thank",
+ "1,3,I'm fine, thank you",
+ "1,3,I'm fine, thank you, and you?"),
+ RESULT1.toList)
+ assertEquals(List("1,2,Hi",
+ "1,2,Hello"),
+ RESULT2.toList)
+ assertEquals(List("1,1,Hello world",
+ "1,1,Hello world, how are you?",
+ "1,4,你好,陌生人",
+ "1,4,你好,陌生人,我是",
+ "1,4,你好,陌生人,我是中国人",
+ "1,4,你好,陌生人,我是中国人,你来自哪里?"),
+ RESULT3.toList)
+ }
+
+ @Test
+ def testDynamicPartitionInFrontOfStaticPartition(): Unit = {
+ expectedEx.expect(classOf[ValidationException])
+ expectedEx.expectMessage("Static partition column b "
+ + "should appear before dynamic partition a")
+ registerTableSink(partitionColumns = Array("a", "b"))
+ tEnv.sqlUpdate("insert into sinkTable partition(b=1) select a, c from sortTable")
+ tEnv.execute("testJob")
+ }
+
+ @Test
+ def testStaticPartitionNotInPartitionFields(): Unit = {
+ expectedEx.expect(classOf[ValidationException])
+ expectedEx.expectMessage("Static partition column c " +
+ "should be in the partition fields list [a, b].")
+ registerTableSink(tableName = "sinkTable2", rowType = type4,
+ partitionColumns = Array("a", "b"))
+ tEnv.sqlUpdate("insert into sinkTable2 partition(c=1) select a, b from sinkTable2")
+ tEnv.execute("testJob")
+ }
+
+ @Test
+ def testInsertStaticPartitionOnNonPartitionedSink(): Unit = {
+ expectedEx.expect(classOf[ValidationException])
+ expectedEx.expectMessage(
+ "Can't insert static partitions into a non-partitioned table sink.")
+ registerTableSink(tableName = "sinkTable2", rowType = type4, partitionColumns = Array())
+ tEnv.sqlUpdate("insert into sinkTable2 partition(c=1) select a, b from sinkTable2")
+ tEnv.execute("testJob")
+ }
+
+ private def registerTableSink(
+ tableName: String = "sinkTable",
+ rowType: RowTypeInfo = type3,
+ grouping: Boolean = true,
+ partitionColumns: Array[String] = Array[String]("a")): TestSink = {
+ val testSink = new TestSink(rowType, grouping, partitionColumns)
+ tEnv.registerTableSink(tableName, testSink)
+ testSink
+ }
+
+ private class TestSink(rowType: RowTypeInfo,
+ supportsGrouping: Boolean,
+ partitionColumns: Array[String])
+ extends BatchTableSink[Row]
+ with PartitionableTableSink {
+ private var staticPartitions: JMap[String, String] = _
+
+ override def getPartitionFieldNames: JList[String] = partitionColumns.toList
+
+ override def setStaticPartition(partitions: JMap[String, String]): Unit =
+ this.staticPartitions = partitions
+
+ override def configure(fieldNames: Array[String],
+ fieldTypes: Array[TypeInformation[_]]): TableSink[Row] = this
+
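+    // whether input records are grouped by the partition fields before
+    // reaching the sink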
+ override def configurePartitionGrouping(s: Boolean): Boolean = {
+ supportsGrouping
+ }
+
+ override def getTableSchema: TableSchema = {
+ new TableSchema(Array("a", "b", "c"), rowType.getFieldTypes)
+ }
+
+ override def getOutputType: RowTypeInfo = rowType
+
+ def getStaticPartitions: JMap[String, String] = {
+ staticPartitions
+ }
+
+ override def emitDataSet(dataSet: DataSet[Row]): Unit = {
+ dataSet.map(new MapFunction[Row, String] {
+ override def map(value: Row): String = value.toString
+ }).output(new CollectionOutputFormat)
+ .setParallelism(dataSet.getExecutionEnvironment.getParallelism)
+ }
+ }
+
+ /**
+   * A batch table source backed by an in-memory collection of rows.
+ */
+ class CollectionTableSource(
+ val data: List[Row],
+ val emitIntervalMs: Long,
+ val schema: TableSchema)
+ extends BatchTableSource[Row] {
+
+ private val rowType: TypeInformation[Row] = schema.toRowType
+
+ override def getReturnType: TypeInformation[Row] = rowType
+
+ override def getTableSchema: TableSchema = {
+ schema
+ }
+
+ override def getDataSet(execEnv: java.ExecutionEnvironment): DataSet[Row] = {
+ execEnv.createInput(new TestCollectionInputFormat[Row](emitIntervalMs,
+ data, rowType.createSerializer(new ExecutionConfig)), rowType)
+ }
+ }
+}
+
+object PartitionableSinkITCase {
+ val RESULT1 = new JLinkedList[String]()
+ val RESULT2 = new JLinkedList[String]()
+ val RESULT3 = new JLinkedList[String]()
+ val RESULT_QUEUE: JList[JLinkedList[String]] = new JArrayList[JLinkedList[String]]()
+
+ def init(): Unit = {
+ RESULT1.clear()
+ RESULT2.clear()
+ RESULT3.clear()
+ RESULT_QUEUE.clear()
+ RESULT_QUEUE.add(RESULT1)
+ RESULT_QUEUE.add(RESULT2)
+ RESULT_QUEUE.add(RESULT3)
+ }
+
+  /** OutputFormat that writes data to a collection. */
+ class CollectionOutputFormat extends RichOutputFormat[String] {
+ private var resultSet: JLinkedList[String] = _
+
+ override def configure(parameters: Configuration): Unit = {}
+
+ override def open(taskNumber: Int, numTasks: Int): Unit = {
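+      // each parallel subtask writes into its own result list (parallelism is 3)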
+ resultSet = RESULT_QUEUE.get(taskNumber)
+ }
+
+ override def writeRecord(record: String): Unit = {
+ resultSet.add(record)
+ }
+
+ override def close(): Unit = {}
+ }
+
+ val fieldNames = Array("a", "b", "c")
+ val dataType = Array(new IntType(), new BigIntType(), new VarCharType(VarCharType.MAX_LENGTH))
+ val dataNullables = Array(false, false, false)
+
+ val testData = Seq(
+ row(3, 2L, "Hello03"),
+ row(1, 5L, "Hi"),
+ row(1, 5L, "Hi01"),
+ row(1, 5L, "Hi02"),
+ row(3, 2L, "Hello"),
+ row(3, 2L, "Hello01"),
+ row(2, 1L, "Hello world03"),
+ row(3, 2L, "Hello02"),
+ row(3, 2L, "Hello04"),
+ row(3, 1L, "Hello world"),
+ row(2, 1L, "Hello world01"),
+ row(2, 1L, "Hello world02"),
+ row(2, 1L, "Hello world04"),
+ row(2, 2L, "Hello world, how are you?")
+ )
+
+ val testData1 = Seq(
+ row(2, 2L, "Hi"),
+ row(1, 1L, "Hello world"),
+ row(2, 2L, "Hello"),
+ row(1, 1L, "Hello world, how are you?"),
+ row(3, 3L, "I'm fine, thank"),
+ row(3, 3L, "I'm fine, thank you"),
+ row(3, 3L, "I'm fine, thank you, and you?"),
+ row(4, 4L, "你好,陌生人"),
+ row(4, 4L, "你好,陌生人,我是"),
+ row(4, 4L, "你好,陌生人,我是中国人"),
+ row(4, 4L, "你好,陌生人,我是中国人,你来自哪里?")
+ )
+
+  def row(args: Any*): Row = {
+    val row = new Row(args.length)
+    args.indices.foreach(i => row.setField(i, args(i)))
+    row
+  }
+}