You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/07/23 13:18:55 UTC

[flink] branch release-1.9 updated: [FLINK-13074][table-planner] Fix PartitionableTableSink doesn't work for flink and blink planner

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new 2ee6373  [FLINK-13074][table-planner] Fix PartitionableTableSink doesn't work for flink and blink planner
2ee6373 is described below

commit 2ee637352e7f6f91ee1b7cf0e2ca8c1491027702
Author: yuzhao.cyz <yu...@alibaba-inc.com>
AuthorDate: Wed Jul 3 17:56:14 2019 +0800

    [FLINK-13074][table-planner] Fix PartitionableTableSink doesn't work for flink and blink planner
    
    This closes #8966
---
 .../operations/CatalogSinkModifyOperation.java     |  14 +
 .../flink/table/catalog/DatabaseCalciteSchema.java |  47 ++-
 .../table/sqlexec/SqlToOperationConverter.java     |  17 +-
 .../flink/table/calcite/FlinkPlannerImpl.scala     |   5 +-
 .../flink/table/calcite/PreValidateReWriter.scala  |  26 +-
 .../rules/physical/batch/BatchExecSinkRule.scala   |  38 ++-
 .../rules/physical/stream/StreamExecSinkRule.scala |  35 +-
 .../apache/flink/table/planner/PlannerBase.scala   |  35 +-
 .../apache/flink/table/sinks/TableSinkUtils.scala  |  51 ++-
 .../batch/sql/PartitionableSinkITCase.scala        | 328 +++++++++++++++++++
 .../flink/table/runtime/utils/BatchTestBase.scala  |  11 +-
 .../apache/flink/table/util/TableTestBase.scala    |   7 +-
 .../flink/table/catalog/DatabaseCalciteSchema.java |  20 +-
 .../table/sqlexec/SqlToOperationConverter.java     |  15 +
 .../table/api/internal/BatchTableEnvImpl.scala     |  26 +-
 .../flink/table/api/internal/TableEnvImpl.scala    |  50 ++-
 .../flink/table/calcite/FlinkPlannerImpl.scala     |   9 +-
 .../flink/table/calcite/PreValidateReWriter.scala  |   2 +-
 .../flink/table/plan/schema/TableSinkTable.scala   |  47 +++
 .../apache/flink/table/planner/StreamPlanner.scala |  68 +++-
 .../apache/flink/table/sinks/TableSinkUtils.scala  |  53 ++-
 .../batch/sql/PartitionableSinkITCase.scala        | 363 +++++++++++++++++++++
 22 files changed, 1156 insertions(+), 111 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/CatalogSinkModifyOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/CatalogSinkModifyOperation.java
index 4fee1c6..b3e73d7 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/CatalogSinkModifyOperation.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/CatalogSinkModifyOperation.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.operations;
 import org.apache.flink.annotation.Internal;
 
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -32,18 +33,30 @@ import java.util.Map;
 @Internal
 public class CatalogSinkModifyOperation implements ModifyOperation {
 
+	private final Map<String, String> staticPartitions;
 	private final List<String> tablePath;
 	private final QueryOperation child;
 
 	public CatalogSinkModifyOperation(List<String> tablePath, QueryOperation child) {
+		this(tablePath, child, new HashMap<>());
+	}
+
+	public CatalogSinkModifyOperation(List<String> tablePath,
+			QueryOperation child,
+			Map<String, String> staticPartitions) {
 		this.tablePath = tablePath;
 		this.child = child;
+		this.staticPartitions = staticPartitions;
 	}
 
 	public List<String> getTablePath() {
 		return tablePath;
 	}
 
+	public Map<String, String> getStaticPartitions() {
+		return staticPartitions;
+	}
+
 	@Override
 	public QueryOperation getChild() {
 		return child;
@@ -58,6 +71,7 @@ public class CatalogSinkModifyOperation implements ModifyOperation {
 	public String asSummaryString() {
 		Map<String, Object> params = new LinkedHashMap<>();
 		params.put("tablePath", tablePath);
+		params.put("staticPartitions", staticPartitions);
 
 		return OperationUtils.formatWithChildren(
 			"CatalogSink",
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
index cc90916..60c773d 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.factories.TableSourceFactory;
 import org.apache.flink.table.operations.DataStreamQueryOperation;
 import org.apache.flink.table.operations.QueryOperation;
 import org.apache.flink.table.operations.RichTableSourceQueryOperation;
+import org.apache.flink.table.plan.schema.TableSinkTable;
 import org.apache.flink.table.plan.schema.TableSourceTable;
 import org.apache.flink.table.plan.stats.FlinkStatistic;
 import org.apache.flink.table.sources.LookupableTableSource;
@@ -108,22 +109,36 @@ class DatabaseCalciteSchema extends FlinkSchema {
 	}
 
 	private Table convertConnectorTable(ConnectorCatalogTable<?, ?> table) {
-		return table.getTableSource()
-				.map(tableSource -> {
-					if (!(tableSource instanceof StreamTableSource ||
-							tableSource instanceof LookupableTableSource)) {
-						throw new TableException(
-								"Only StreamTableSource and LookupableTableSource can be used in Blink planner.");
-					}
-					if (!isStreamingMode && tableSource instanceof StreamTableSource &&
-							!((StreamTableSource<?>) tableSource).isBounded()) {
-						throw new TableException("Only bounded StreamTableSource can be used in batch mode.");
-					}
-					return new TableSourceTable<>(
-							tableSource,
-							isStreamingMode,
-							FlinkStatistic.UNKNOWN());
-				}).orElseThrow(() -> new TableException("Cannot query a sink only table."));
+		Optional<TableSourceTable> tableSourceTable = table.getTableSource()
+			.map(tableSource -> {
+				if (!(tableSource instanceof StreamTableSource ||
+					tableSource instanceof LookupableTableSource)) {
+					throw new TableException(
+						"Only StreamTableSource and LookupableTableSource can be used in Blink planner.");
+				}
+				if (!isStreamingMode && tableSource instanceof StreamTableSource &&
+					!((StreamTableSource<?>) tableSource).isBounded()) {
+					throw new TableException("Only bounded StreamTableSource can be used in batch mode.");
+				}
+				return new TableSourceTable<>(
+					tableSource,
+					isStreamingMode,
+					FlinkStatistic.UNKNOWN());
+			});
+		if (tableSourceTable.isPresent()) {
+			return tableSourceTable.get();
+		} else {
+			Optional<TableSinkTable> tableSinkTable = table.getTableSink()
+				.map(tableSink -> new TableSinkTable<>(
+					tableSink,
+					FlinkStatistic.UNKNOWN()));
+			if (tableSinkTable.isPresent()) {
+				return tableSinkTable.get();
+			} else {
+				throw new TableException("Cannot convert a connector table " +
+					"without either source or sink.");
+			}
+		}
 	}
 
 	private Table convertCatalogTable(ObjectPath tablePath, CatalogTable table) {
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
index ec36796..25b01a4 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
@@ -22,6 +22,7 @@ import org.apache.flink.sql.parser.SqlProperty;
 import org.apache.flink.sql.parser.ddl.SqlCreateTable;
 import org.apache.flink.sql.parser.ddl.SqlDropTable;
 import org.apache.flink.sql.parser.ddl.SqlTableColumn;
+import org.apache.flink.sql.parser.dml.RichSqlInsert;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.calcite.FlinkPlannerImpl;
@@ -29,6 +30,7 @@ import org.apache.flink.table.calcite.FlinkTypeFactory;
 import org.apache.flink.table.calcite.FlinkTypeSystem;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.operations.CatalogSinkModifyOperation;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.PlannerQueryOperation;
 import org.apache.flink.table.operations.ddl.CreateTableOperation;
@@ -84,6 +86,8 @@ public class SqlToOperationConverter {
 			return converter.convertCreateTable((SqlCreateTable) validated);
 		} if (validated instanceof SqlDropTable) {
 			return converter.convertDropTable((SqlDropTable) validated);
+		} else if (validated instanceof RichSqlInsert) {
+			return converter.convertSqlInsert((RichSqlInsert) validated);
 		} else if (validated.getKind().belongsTo(SqlKind.QUERY)) {
 			return converter.convertSqlQuery(validated);
 		} else {
@@ -142,6 +146,17 @@ public class SqlToOperationConverter {
 		return new DropTableOperation(sqlDropTable.fullTableName(), sqlDropTable.getIfExists());
 	}
 
+	/** Convert insert into statement. */
+	private Operation convertSqlInsert(RichSqlInsert insert) {
+		// get name of sink table
+		List<String> targetTablePath = ((SqlIdentifier) insert.getTargetTable()).names;
+		return new CatalogSinkModifyOperation(
+			targetTablePath,
+			(PlannerQueryOperation) SqlToOperationConverter.convert(flinkPlanner,
+				insert.getSource()),
+			insert.getStaticPartitionKVs());
+	}
+
 	/** Fallback method for sql query. */
 	private Operation convertSqlQuery(SqlNode node) {
 		return toQueryOperation(flinkPlanner, node);
@@ -196,6 +211,6 @@ public class SqlToOperationConverter {
 	private PlannerQueryOperation toQueryOperation(FlinkPlannerImpl planner, SqlNode validated) {
 		// transform to a relational tree
 		RelRoot relational = planner.rel(validated);
-		return new PlannerQueryOperation(relational.rel);
+		return new PlannerQueryOperation(relational.project());
 	}
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
index c0b0a90..b9de2da 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
@@ -107,8 +107,9 @@ class FlinkPlannerImpl(
         node.validate()
       case _ =>
     }
-    // no need to validate row type for DDL nodes.
-    if (sqlNode.getKind.belongsTo(SqlKind.DDL)) {
+    // no need to validate row type for DDL and insert nodes.
+    if (sqlNode.getKind.belongsTo(SqlKind.DDL)
+      || sqlNode.getKind == SqlKind.INSERT) {
       return sqlNode
     }
     validator = new FlinkCalciteSqlValidator(
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/PreValidateReWriter.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/PreValidateReWriter.scala
index 011c85d..3d66edb 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/PreValidateReWriter.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/PreValidateReWriter.scala
@@ -41,8 +41,8 @@ import scala.collection.JavaConversions._
 /** Implements [[org.apache.calcite.sql.util.SqlVisitor]]
   * interface to do some rewrite work before sql node validation. */
 class PreValidateReWriter(
-  val catalogReader: CalciteCatalogReader,
-  val typeFactory: RelDataTypeFactory) extends SqlBasicVisitor[Unit] {
+    val catalogReader: CalciteCatalogReader,
+    val typeFactory: RelDataTypeFactory) extends SqlBasicVisitor[Unit] {
   override def visit(call: SqlCall): Unit = {
     call match {
       case r: RichSqlInsert if r.getStaticPartitions.nonEmpty
@@ -80,10 +80,10 @@ object PreValidateReWriter {
     * @param partitions           Static partition statements
     */
   def appendPartitionProjects(sqlInsert: RichSqlInsert,
-    calciteCatalogReader: CalciteCatalogReader,
-    typeFactory: RelDataTypeFactory,
-    select: SqlSelect,
-    partitions: SqlNodeList): Unit = {
+      calciteCatalogReader: CalciteCatalogReader,
+      typeFactory: RelDataTypeFactory,
+      select: SqlSelect,
+      partitions: SqlNodeList): Unit = {
     val names = sqlInsert.getTargetTable.asInstanceOf[SqlIdentifier].names
     val table = calciteCatalogReader.getTable(names)
     if (table == null) {
@@ -104,7 +104,7 @@ object PreValidateReWriter {
       val id = sqlProperty.getKey
       val targetField = SqlValidatorUtil.getTargetField(targetRowType,
         typeFactory, id, calciteCatalogReader, relOptTable)
-      validateField(assignedFields.containsValue, id, targetField)
+      validateField(idx => !assignedFields.contains(idx), id, targetField)
       val value = sqlProperty.getValue.asInstanceOf[SqlLiteral]
       assignedFields.put(targetField.getIndex,
         maybeCast(value, value.createSqlType(typeFactory), targetField.getType, typeFactory))
@@ -142,10 +142,10 @@ object PreValidateReWriter {
     * @return Rowtype
     */
   private def createTargetRowType(
-    typeFactory: RelDataTypeFactory,
-    catalogReader: CalciteCatalogReader,
-    table: SqlValidatorTable,
-    targetColumnList: SqlNodeList): RelDataType = {
+      typeFactory: RelDataTypeFactory,
+      catalogReader: CalciteCatalogReader,
+      table: SqlValidatorTable,
+      targetColumnList: SqlNodeList): RelDataType = {
     val baseRowType = table.getRowType
     if (targetColumnList == null) return baseRowType
     val fields = new util.ArrayList[util.Map.Entry[String, RelDataType]]
@@ -166,8 +166,8 @@ object PreValidateReWriter {
 
   /** Check whether the field is valid. **/
   private def validateField(tester: Function[Integer, Boolean],
-    id: SqlIdentifier,
-    targetField: RelDataTypeField): Unit = {
+      id: SqlIdentifier,
+      targetField: RelDataTypeField): Unit = {
     if (targetField == null) {
       throw newValidationError(id, RESOURCE.unknownTargetColumn(id.toString))
     }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/batch/BatchExecSinkRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/batch/BatchExecSinkRule.scala
index 93aaeb6..06bcdda 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/batch/BatchExecSinkRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/batch/BatchExecSinkRule.scala
@@ -18,13 +18,19 @@
 
 package org.apache.flink.table.plan.rules.physical.batch
 
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.plan.`trait`.FlinkRelDistribution
 import org.apache.flink.table.plan.nodes.FlinkConventions
 import org.apache.flink.table.plan.nodes.logical.FlinkLogicalSink
 import org.apache.flink.table.plan.nodes.physical.batch.BatchExecSink
+import org.apache.flink.table.plan.util.FlinkRelOptUtil
+import org.apache.flink.table.sinks.PartitionableTableSink
 
 import org.apache.calcite.plan.RelOptRule
 import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.{RelCollations, RelNode}
+
+import collection.JavaConversions._
 
 class BatchExecSinkRule extends ConverterRule(
     classOf[FlinkLogicalSink],
@@ -35,8 +41,34 @@ class BatchExecSinkRule extends ConverterRule(
   def convert(rel: RelNode): RelNode = {
     val sinkNode = rel.asInstanceOf[FlinkLogicalSink]
     val newTrait = rel.getTraitSet.replace(FlinkConventions.BATCH_PHYSICAL)
-    // TODO Take PartitionableSink into consideration after FLINK-11993 is done
-    val newInput = RelOptRule.convert(sinkNode.getInput, FlinkConventions.BATCH_PHYSICAL)
+    var requiredTraitSet = sinkNode.getInput.getTraitSet.replace(FlinkConventions.BATCH_PHYSICAL)
+    sinkNode.sink match {
+      case partitionSink: PartitionableTableSink
+        if partitionSink.getPartitionFieldNames != null &&
+          partitionSink.getPartitionFieldNames.nonEmpty =>
+        val partitionFields = partitionSink.getPartitionFieldNames
+        val partitionIndices = partitionFields
+          .map(partitionSink.getTableSchema.getFieldNames.indexOf(_))
+        // validate
+        partitionIndices.foreach { idx =>
+          if (idx < 0) {
+            throw new TableException(s"Partitionable sink ${sinkNode.sinkName} field " +
+              s"${partitionFields.get(idx)} must be in the schema.")
+          }
+        }
+
+        requiredTraitSet = requiredTraitSet.plus(
+          FlinkRelDistribution.hash(partitionIndices
+            .map(Integer.valueOf), requireStrict = false))
+
+        if (partitionSink.configurePartitionGrouping(true)) {
+          // default to asc.
+          val fieldCollations = partitionIndices.map(FlinkRelOptUtil.ofRelFieldCollation)
+          requiredTraitSet = requiredTraitSet.plus(RelCollations.of(fieldCollations: _*))
+        }
+      case _ =>
+    }
+    val newInput = RelOptRule.convert(sinkNode.getInput, requiredTraitSet)
 
     new BatchExecSink(
       rel.getCluster,
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/stream/StreamExecSinkRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/stream/StreamExecSinkRule.scala
index 5ae6ead..3adf6ed 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/stream/StreamExecSinkRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/stream/StreamExecSinkRule.scala
@@ -18,14 +18,19 @@
 
 package org.apache.flink.table.plan.rules.physical.stream
 
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.plan.`trait`.FlinkRelDistribution
 import org.apache.flink.table.plan.nodes.FlinkConventions
 import org.apache.flink.table.plan.nodes.logical.FlinkLogicalSink
 import org.apache.flink.table.plan.nodes.physical.stream.StreamExecSink
+import org.apache.flink.table.sinks.{DataStreamTableSink, PartitionableTableSink}
 
 import org.apache.calcite.plan.RelOptRule
 import org.apache.calcite.rel.convert.ConverterRule
 import org.apache.calcite.rel.RelNode
 
+import collection.JavaConversions._
+
 class StreamExecSinkRule extends ConverterRule(
     classOf[FlinkLogicalSink],
     FlinkConventions.LOGICAL,
@@ -35,8 +40,34 @@ class StreamExecSinkRule extends ConverterRule(
   def convert(rel: RelNode): RelNode = {
     val sinkNode = rel.asInstanceOf[FlinkLogicalSink]
     val newTrait = rel.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
-    // TODO Take PartitionableSink into consideration after FLINK-11993 is done
-    val newInput = RelOptRule.convert(sinkNode.getInput, FlinkConventions.STREAM_PHYSICAL)
+    var requiredTraitSet = sinkNode.getInput.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
+    sinkNode.sink match {
+      case partitionSink: PartitionableTableSink
+        if partitionSink.getPartitionFieldNames != null &&
+          partitionSink.getPartitionFieldNames.nonEmpty =>
+        val partitionFields = partitionSink.getPartitionFieldNames
+        val partitionIndices = partitionFields
+          .map(partitionSink.getTableSchema.getFieldNames.indexOf(_))
+        // validate
+        partitionIndices.foreach { idx =>
+          if (idx < 0) {
+            throw new TableException(s"Partitionable sink ${sinkNode.sinkName} field " +
+              s"${partitionFields.get(idx)} must be in the schema.")
+          }
+        }
+
+        if (partitionSink.configurePartitionGrouping(false)) {
+          throw new TableException("Partition grouping in stream mode is not supported yet!")
+        }
+
+        if (!partitionSink.isInstanceOf[DataStreamTableSink[_]]) {
+          requiredTraitSet = requiredTraitSet.plus(
+            FlinkRelDistribution.hash(partitionIndices
+              .map(Integer.valueOf), requireStrict = false))
+        }
+      case _ =>
+    }
+    val newInput = RelOptRule.convert(sinkNode.getInput, requiredTraitSet)
 
     new StreamExecSink(
       rel.getCluster,
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/PlannerBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/PlannerBase.scala
index ddb759d..d24c3b2 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/PlannerBase.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/PlannerBase.scala
@@ -21,6 +21,7 @@ package org.apache.flink.table.planner
 import org.apache.flink.annotation.VisibleForTesting
 import org.apache.flink.api.dag.Transformation
 import org.apache.flink.configuration.Configuration
+import org.apache.flink.sql.parser.dml.RichSqlInsert
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.table.api.{TableConfig, TableEnvironment, TableException}
 import org.apache.flink.table.calcite.{FlinkPlannerImpl, FlinkRelBuilder, FlinkTypeFactory}
@@ -30,22 +31,24 @@ import org.apache.flink.table.executor.ExecutorBase
 import org.apache.flink.table.expressions.PlannerTypeInferenceUtilImpl
 import org.apache.flink.table.factories.{TableFactoryService, TableFactoryUtil, TableSinkFactory}
 import org.apache.flink.table.operations.OutputConversionModifyOperation.UpdateMode
-import org.apache.flink.table.operations.{CatalogSinkModifyOperation, ModifyOperation, Operation, OutputConversionModifyOperation, PlannerQueryOperation, UnregisteredSinkModifyOperation}
+import org.apache.flink.table.operations.{CatalogSinkModifyOperation, ModifyOperation, Operation, OutputConversionModifyOperation, UnregisteredSinkModifyOperation}
 import org.apache.flink.table.plan.nodes.calcite.LogicalSink
 import org.apache.flink.table.plan.nodes.exec.ExecNode
 import org.apache.flink.table.plan.nodes.physical.FlinkPhysicalRel
 import org.apache.flink.table.plan.optimize.Optimizer
 import org.apache.flink.table.plan.reuse.SubplanReuser
 import org.apache.flink.table.plan.util.SameRelObjectShuttle
-import org.apache.flink.table.sinks.{DataStreamTableSink, TableSink, TableSinkUtils}
+import org.apache.flink.table.sinks.{DataStreamTableSink, PartitionableTableSink, TableSink, TableSinkUtils}
 import org.apache.flink.table.sqlexec.SqlToOperationConverter
 import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter
 import org.apache.flink.table.util.JavaScalaConversionUtil
+
 import org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema
 import org.apache.calcite.plan.{RelTrait, RelTraitDef}
 import org.apache.calcite.rel.RelNode
-import org.apache.calcite.sql.{SqlIdentifier, SqlInsert, SqlKind}
+import org.apache.calcite.sql.SqlKind
 import org.apache.calcite.tools.FrameworkConfig
+
 import _root_.java.util.{List => JList}
 import java.util
 
@@ -121,21 +124,10 @@ abstract class PlannerBase(
     // parse the sql query
     val parsed = planner.parse(stmt)
     parsed match {
-      case insert: SqlInsert =>
-        // get name of sink table
-        val targetTablePath = insert.getTargetTable.asInstanceOf[SqlIdentifier].names
-
-        List(new CatalogSinkModifyOperation(
-          targetTablePath,
-          SqlToOperationConverter.convert(planner,
-            insert.getSource).asInstanceOf[PlannerQueryOperation]).asInstanceOf[Operation])
+      case insert: RichSqlInsert =>
+        List(SqlToOperationConverter.convert(planner, insert))
       case query if query.getKind.belongsTo(SqlKind.QUERY) =>
-        // validate the sql query
-        val validated = planner.validate(query)
-        // transform to a relational tree
-        val relational = planner.rel(validated)
-        // can not use SqlToOperationConverter because of the project()
-        List(new PlannerQueryOperation(relational.project()))
+        List(SqlToOperationConverter.convert(planner, query))
       case ddl if ddl.getKind.belongsTo(SqlKind.DDL) =>
         List(SqlToOperationConverter.convert(planner, ddl))
       case _ =>
@@ -173,7 +165,14 @@ abstract class PlannerBase(
       case catalogSink: CatalogSinkModifyOperation =>
         val input = getRelBuilder.queryOperation(modifyOperation.getChild).build()
         getTableSink(catalogSink.getTablePath).map(sink => {
-          TableSinkUtils.validateSink(catalogSink.getChild, catalogSink.getTablePath, sink)
+          TableSinkUtils.validateSink(catalogSink, catalogSink.getTablePath, sink)
+          sink match {
+            case partitionableSink: PartitionableTableSink
+              if partitionableSink.getPartitionFieldNames != null
+                && partitionableSink.getPartitionFieldNames.nonEmpty =>
+              partitionableSink.setStaticPartition(catalogSink.getStaticPartitions)
+            case _ =>
+          }
           LogicalSink.create(input, sink, catalogSink.getTablePath.mkString("."))
         }) match {
           case Some(sinkRel) => sinkRel
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/TableSinkUtils.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/TableSinkUtils.scala
index 7ebaf36..019b5d0 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/TableSinkUtils.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/TableSinkUtils.scala
@@ -18,28 +18,32 @@
 
 package org.apache.flink.table.sinks
 
-import org.apache.flink.table.api.ValidationException
-import org.apache.flink.table.operations.QueryOperation
+import org.apache.flink.table.api.{TableException, ValidationException}
+import org.apache.flink.table.operations.CatalogSinkModifyOperation
 import org.apache.flink.table.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType
 import org.apache.flink.table.types.PlannerTypeUtils
 
 import java.util.{List => JList}
 
+import collection.JavaConversions._
+
 object TableSinkUtils {
 
   /**
-    * Checks if the given [[QueryOperation]] can be written to the given [[TableSink]].
-    * It checks if the names & the field types match.
+    * Checks if the given [[CatalogSinkModifyOperation]]'s query can be written to
+    * the given [[TableSink]]. It checks if the names & the field types match. If the table
+    * sink is a [[PartitionableTableSink]], also check that the partitions are valid.
     *
-    * @param query    The query that is supposed to be written.
-    * @param sinkPath Tha path of the sink. It is needed just for logging. It does not
-    *                 participate in the validation.
+    * @param sinkOperation The sink operation with the query that is supposed to be written.
+    * @param sinkPath      Tha path of the sink. It is needed just for logging. It does not
+    *                      participate in the validation.
     * @param sink     The sink that we want to write to.
     */
   def validateSink(
-      query: QueryOperation,
+      sinkOperation: CatalogSinkModifyOperation,
       sinkPath: JList[String],
       sink: TableSink[_]): Unit = {
+    val query = sinkOperation.getChild
     // validate schema of source table and table sink
     val srcFieldTypes = query.getTableSchema.getFieldDataTypes
     val sinkFieldTypes = sink.getTableSchema.getFieldDataTypes
@@ -67,5 +71,36 @@ object TableSinkUtils {
           s"Query result schema: $srcSchema\n" +
           s"TableSink schema:    $sinkSchema")
     }
+
+    // check partitions are valid
+    val staticPartitions = sinkOperation.getStaticPartitions
+    if (staticPartitions != null && !staticPartitions.isEmpty) {
+      val invalidMsg = "Can't insert static partitions into a non-partitioned table sink. " +
+        "A partitioned sink should implement 'PartitionableTableSink' and return partition " +
+        "field names via 'getPartitionFieldNames()' method."
+      sink match {
+        case pts: PartitionableTableSink =>
+          val partitionFields = pts.getPartitionFieldNames
+          if (partitionFields == null || partitionFields.isEmpty) {
+            throw new ValidationException(invalidMsg)
+          }
+          staticPartitions.map(_._1) foreach { p =>
+            if (!partitionFields.contains(p)) {
+              throw new ValidationException(s"Static partition column $p " +
+                s"should be in the partition fields list $partitionFields.")
+            }
+          }
+          staticPartitions.map(_._1).zip(partitionFields).foreach {
+            case (p1, p2) =>
+              if (p1 != p2) {
+                throw new ValidationException(s"Static partition column $p1 " +
+                  s"should appear before dynamic partition $p2.")
+              }
+          }
+        case _ =>
+          throw new ValidationException(invalidMsg)
+
+      }
+    }
   }
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/PartitionableSinkITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/PartitionableSinkITCase.scala
new file mode 100644
index 0000000..fe3918e
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/PartitionableSinkITCase.scala
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.batch.sql
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo.INT_TYPE_INFO
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl
+import org.apache.flink.sql.parser.validate.FlinkSqlConformance
+import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink}
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
+import org.apache.flink.table.api.{ExecutionConfigOptions, TableConfig, TableException, TableSchema, ValidationException}
+import org.apache.flink.table.calcite.CalciteConfig
+import org.apache.flink.table.runtime.batch.sql.PartitionableSinkITCase._
+import org.apache.flink.table.runtime.utils.BatchTestBase
+import org.apache.flink.table.runtime.utils.BatchTestBase.row
+import org.apache.flink.table.runtime.utils.TestData._
+import org.apache.flink.table.sinks.{PartitionableTableSink, StreamTableSink, TableSink}
+import org.apache.flink.table.types.logical.{BigIntType, IntType, VarCharType}
+import org.apache.flink.types.Row
+import org.apache.calcite.config.Lex
+import org.apache.calcite.sql.parser.SqlParser
+import org.junit.Assert._
+import org.junit.rules.ExpectedException
+import org.junit.{Before, Rule, Test}
+
+import java.util.{ArrayList => JArrayList, LinkedList => JLinkedList, List => JList, Map => JMap}
+
+import scala.collection.JavaConversions._
+import scala.collection.Seq
+
+/**
+  * Test cases for [[org.apache.flink.table.sinks.PartitionableTableSink]].
+  */
+class PartitionableSinkITCase extends BatchTestBase {
+
+  private val _expectedException = ExpectedException.none
+  private val type4 = new RowTypeInfo(INT_TYPE_INFO, INT_TYPE_INFO, INT_TYPE_INFO)
+
+  @Rule
+  def expectedEx: ExpectedException = _expectedException
+
+  @Before
+  override def before(): Unit = {
+    super.before()
+    env.setParallelism(3)
+    tEnv.getConfig
+      .getConfiguration
+      .setInteger(ExecutionConfigOptions.SQL_RESOURCE_DEFAULT_PARALLELISM, 3)
+    registerCollection("nonSortTable", testData, type3, "a, b, c", dataNullables)
+    registerCollection("sortTable", testData1, type3, "a, b, c", dataNullables)
+    PartitionableSinkITCase.init()
+  }
+
+  override def getTableConfig: TableConfig = {
+    val parserConfig = SqlParser.configBuilder
+      .setParserFactory(FlinkSqlParserImpl.FACTORY)
+      .setConformance(FlinkSqlConformance.HIVE) // set up hive dialect
+      .setLex(Lex.JAVA)
+      .setIdentifierMaxLength(256).build
+    val plannerConfig = CalciteConfig.createBuilder(CalciteConfig.DEFAULT)
+      .replaceSqlParserConfig(parserConfig)
+    val tableConfig = new TableConfig
+    tableConfig.setPlannerConfig(plannerConfig.build())
+    tableConfig
+  }
+
+  @Test
+  def testInsertWithOutPartitionGrouping(): Unit = {
+    registerTableSink()
+    tEnv.sqlUpdate("insert into sinkTable select a, max(b), c"
+      + " from nonSortTable group by a, c")
+    tEnv.execute("testJob")
+    assertEquals(List("1,5,Hi",
+      "1,5,Hi01",
+      "1,5,Hi02"),
+      RESULT1.sorted)
+    assert(RESULT2.isEmpty)
+    assertEquals(List("2,1,Hello world01",
+      "2,1,Hello world02",
+      "2,1,Hello world03",
+      "2,1,Hello world04",
+      "2,2,Hello world, how are you?",
+      "3,1,Hello world",
+      "3,2,Hello",
+      "3,2,Hello01",
+      "3,2,Hello02",
+      "3,2,Hello03",
+      "3,2,Hello04"),
+      RESULT3.sorted)
+  }
+
+  @Test
+  def testInsertWithPartitionGrouping(): Unit = {
+    registerTableSink()
+    tEnv.sqlUpdate("insert into sinkTable select a, b, c from sortTable")
+    tEnv.execute("testJob")
+    assertEquals(List("1,1,Hello world",
+      "1,1,Hello world, how are you?"),
+      RESULT1.toList)
+    assertEquals(List("4,4,你好,陌生人",
+      "4,4,你好,陌生人,我是",
+      "4,4,你好,陌生人,我是中国人",
+      "4,4,你好,陌生人,我是中国人,你来自哪里?"),
+      RESULT2.toList)
+    assertEquals(List("2,2,Hi",
+      "2,2,Hello",
+      "3,3,I'm fine, thank",
+      "3,3,I'm fine, thank you",
+      "3,3,I'm fine, thank you, and you?"),
+      RESULT3.toList)
+  }
+
+  @Test
+  def testInsertWithStaticPartitions(): Unit = {
+    val testSink = registerTableSink()
+    tEnv.sqlUpdate("insert into sinkTable partition(a=1) select b, c from sortTable")
+    tEnv.execute("testJob")
+    // this sink should have been set up with static partitions
+    assertEquals(testSink.getStaticPartitions.toMap, Map("a" -> "1"))
+    assertEquals(List("1,2,Hi",
+      "1,1,Hello world",
+      "1,2,Hello",
+      "1,1,Hello world, how are you?",
+      "1,3,I'm fine, thank",
+      "1,3,I'm fine, thank you",
+      "1,3,I'm fine, thank you, and you?",
+      "1,4,你好,陌生人",
+      "1,4,你好,陌生人,我是",
+      "1,4,你好,陌生人,我是中国人",
+      "1,4,你好,陌生人,我是中国人,你来自哪里?"),
+      RESULT1.toList)
+    assert(RESULT2.isEmpty)
+    assert(RESULT3.isEmpty)
+  }
+
+  @Test
+  def testInsertWithStaticAndDynamicPartitions(): Unit = {
+    val testSink = registerTableSink(partitionColumns = Array("a", "b"))
+    tEnv.sqlUpdate("insert into sinkTable partition(a=1) select b, c from sortTable")
+    tEnv.execute("testJob")
+    // this sink should have been set up with static partitions
+    assertEquals(testSink.getStaticPartitions.toMap, Map("a" -> "1"))
+    assertEquals(List("1,3,I'm fine, thank",
+      "1,3,I'm fine, thank you",
+      "1,3,I'm fine, thank you, and you?"),
+      RESULT1.toList)
+    assertEquals(List("1,2,Hi",
+      "1,2,Hello"),
+      RESULT2.toList)
+    assertEquals(List("1,1,Hello world",
+      "1,1,Hello world, how are you?",
+      "1,4,你好,陌生人",
+      "1,4,你好,陌生人,我是",
+      "1,4,你好,陌生人,我是中国人",
+      "1,4,你好,陌生人,我是中国人,你来自哪里?"),
+      RESULT3.toList)
+  }
+
+  @Test
+  def testDynamicPartitionInFrontOfStaticPartition(): Unit = {
+    expectedEx.expect(classOf[ValidationException])
+    expectedEx.expectMessage("Static partition column b "
+      + "should appear before dynamic partition a")
+    registerTableSink(grouping = true, partitionColumns = Array("a", "b"))
+    tEnv.sqlUpdate("insert into sinkTable partition(b=1) select a, c from sortTable")
+    tEnv.execute("testJob")
+  }
+
+  @Test
+  def testStaticPartitionNotInPartitionFields(): Unit = {
+    expectedEx.expect(classOf[ValidationException])
+    expectedEx.expectMessage("Static partition column c " +
+      "should be in the partition fields list [a, b].")
+    registerTableSink(tableName = "sinkTable2", rowType = type4,
+      partitionColumns = Array("a", "b"))
+    tEnv.sqlUpdate("insert into sinkTable2 partition(c=1) select a, b from sinkTable2")
+    tEnv.execute("testJob")
+  }
+
+  @Test
+  def testInsertStaticPartitionOnNonPartitionedSink(): Unit = {
+    expectedEx.expect(classOf[ValidationException])
+    expectedEx.expectMessage(
+      "Can't insert static partitions into a non-partitioned table sink.")
+    registerTableSink(tableName = "sinkTable2", rowType = type4, partitionColumns = Array())
+    tEnv.sqlUpdate("insert into sinkTable2 partition(c=1) select a, b from sinkTable2")
+    tEnv.execute("testJob")
+  }
+
+  private def registerTableSink(
+      tableName: String = "sinkTable",
+      rowType: RowTypeInfo = type3,
+      grouping: Boolean = true,
+      partitionColumns: Array[String] = Array[String]("a")): TestSink = {
+    val testSink = new TestSink(rowType, grouping, partitionColumns)
+    tEnv.registerTableSink(tableName, testSink)
+    testSink
+  }
+
+  private class TestSink(
+      rowType: RowTypeInfo,
+      supportsGrouping: Boolean,
+      partitionColumns: Array[String])
+    extends StreamTableSink[Row]
+    with PartitionableTableSink {
+    private var staticPartitions: JMap[String, String] = _
+
+    override def getPartitionFieldNames: JList[String] = partitionColumns.toList
+
+    override def setStaticPartition(partitions: JMap[String, String]): Unit =
+      this.staticPartitions = partitions
+
+    override def configure(fieldNames: Array[String],
+      fieldTypes: Array[TypeInformation[_]]): TableSink[Row] = this
+
+    override def configurePartitionGrouping(s: Boolean): Boolean = {
+      supportsGrouping
+    }
+
+    override def getTableSchema: TableSchema = {
+      new TableSchema(Array("a", "b", "c"), type3.getFieldTypes)
+    }
+
+    override def getOutputType: RowTypeInfo = type3
+
+    override def emitDataStream(dataStream: DataStream[Row]): Unit = {
+      dataStream.addSink(new UnsafeMemorySinkFunction(type3))
+        .setParallelism(dataStream.getParallelism)
+    }
+
+    override def consumeDataStream(dataStream: DataStream[Row]): DataStreamSink[_] = {
+      dataStream.addSink(new UnsafeMemorySinkFunction(type3))
+        .setParallelism(dataStream.getParallelism)
+    }
+
+    def getStaticPartitions: JMap[String, String] = {
+      staticPartitions
+    }
+  }
+}
+
+object PartitionableSinkITCase {
+  val RESULT1 = new JLinkedList[String]()
+  val RESULT2 = new JLinkedList[String]()
+  val RESULT3 = new JLinkedList[String]()
+  val RESULT_QUEUE: JList[JLinkedList[String]] = new JArrayList[JLinkedList[String]]()
+
+  def init(): Unit = {
+    RESULT1.clear()
+    RESULT2.clear()
+    RESULT3.clear()
+    RESULT_QUEUE.clear()
+    RESULT_QUEUE.add(RESULT1)
+    RESULT_QUEUE.add(RESULT2)
+    RESULT_QUEUE.add(RESULT3)
+  }
+
+  /**
+    * Sink function of unsafe memory.
+    */
+  class UnsafeMemorySinkFunction(outputType: TypeInformation[Row])
+    extends RichSinkFunction[Row] {
+    private var resultSet: JLinkedList[String] = _
+
+    override def open(param: Configuration): Unit = {
+      val taskId = getRuntimeContext.getIndexOfThisSubtask
+      resultSet = RESULT_QUEUE.get(taskId)
+    }
+
+    @throws[Exception]
+    override def invoke(row: Row): Unit = {
+      resultSet.add(row.toString)
+    }
+  }
+
+  val fieldNames = Array("a", "b", "c")
+  val dataType = Array(new IntType(), new BigIntType(), new VarCharType(VarCharType.MAX_LENGTH))
+  val dataNullables = Array(false, false, false)
+
+  val testData = Seq(
+    row(3, 2L, "Hello03"),
+    row(1, 5L, "Hi"),
+    row(1, 5L, "Hi01"),
+    row(1, 5L, "Hi02"),
+    row(3, 2L, "Hello"),
+    row(3, 2L, "Hello01"),
+    row(2, 1L, "Hello world03"),
+    row(3, 2L, "Hello02"),
+    row(3, 2L, "Hello04"),
+    row(3, 1L, "Hello world"),
+    row(2, 1L, "Hello world01"),
+    row(2, 1L, "Hello world02"),
+    row(2, 1L, "Hello world04"),
+    row(2, 2L, "Hello world, how are you?")
+  )
+
+  val testData1 = Seq(
+    row(2, 2L, "Hi"),
+    row(1, 1L, "Hello world"),
+    row(2, 2L, "Hello"),
+    row(1, 1L, "Hello world, how are you?"),
+    row(3, 3L, "I'm fine, thank"),
+    row(3, 3L, "I'm fine, thank you"),
+    row(3, 3L, "I'm fine, thank you, and you?"),
+    row(4, 4L, "你好,陌生人"),
+    row(4, 4L, "你好,陌生人,我是"),
+    row(4, 4L, "你好,陌生人,我是中国人"),
+    row(4, 4L, "你好,陌生人,我是中国人,你来自哪里?")
+  )
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/BatchTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/BatchTestBase.scala
index 3e8d304..670bc94 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/BatchTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/BatchTestBase.scala
@@ -54,7 +54,8 @@ import scala.util.Sorting
 class BatchTestBase extends BatchAbstractTestBase {
 
   private val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
-  private val testingTableEnv: TestingTableEnvironment = TestingTableEnvironment.create(settings)
+  private val testingTableEnv: TestingTableEnvironment = TestingTableEnvironment
+    .create(settings, catalogManager = None, getTableConfig)
   val tEnv: TableEnvironment = testingTableEnv
   private val planner = tEnv.asInstanceOf[TableEnvironmentImpl].getPlanner.asInstanceOf[PlannerBase]
   val env: StreamExecutionEnvironment = planner.getExecEnv
@@ -65,6 +66,14 @@ class BatchTestBase extends BatchAbstractTestBase {
   val LINE_COL_TWICE_PATTERN: Pattern = Pattern.compile("(?s)From line ([0-9]+),"
     + " column ([0-9]+) to line ([0-9]+), column ([0-9]+): (.*)")
 
+  // TODO: [FLINK-13338] will expose dialect option to TableConfig to
+  //  avoid override CalciteConfig by users
+  /**
+    * Subclass should overwrite this method if we want to overwrite configuration during
+    * sql parse to sql to rel conversion phrase.
+    */
+  protected def getTableConfig: TableConfig = new TableConfig
+
   @Before
   def before(): Unit = {
     conf.getConfiguration.setInteger(
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
index 28dacb7..1e88629 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
@@ -484,8 +484,9 @@ abstract class TableTestUtil(
     isStreamingMode: Boolean,
     catalogManager: Option[CatalogManager] = None)
   extends TableTestUtilBase(test, isStreamingMode) {
+  protected val tableConfig: TableConfig = new TableConfig
   protected val testingTableEnv: TestingTableEnvironment =
-    TestingTableEnvironment.create(setting, catalogManager)
+    TestingTableEnvironment.create(setting, catalogManager, tableConfig)
   val tableEnv: TableEnvironment = testingTableEnv
   tableEnv.getConfig.getConfiguration.setString(
     ExecutionConfigOptions.SQL_EXEC_SHUFFLE_MODE, ShuffleMode.PIPELINED.toString)
@@ -983,7 +984,8 @@ object TestingTableEnvironment {
 
   def create(
       settings: EnvironmentSettings,
-      catalogManager: Option[CatalogManager] = None): TestingTableEnvironment = {
+      catalogManager: Option[CatalogManager] = None,
+      tableConfig: TableConfig): TestingTableEnvironment = {
     val catalogMgr = catalogManager match {
       case Some(c) => c
       case _ =>
@@ -996,7 +998,6 @@ object TestingTableEnvironment {
     val executorProperties = settings.toExecutorProperties
     val executor = ComponentFactoryService.find(classOf[ExecutorFactory],
       executorProperties).create(executorProperties)
-    val tableConfig = new TableConfig
     val planner = ComponentFactoryService.find(classOf[PlannerFactory], plannerProperties)
       .create(plannerProperties, executor, tableConfig, functionCatalog, catalogMgr)
       .asInstanceOf[PlannerBase]
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
index 46af332..76df4bb 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
@@ -25,6 +25,7 @@ import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.table.factories.TableFactory;
 import org.apache.flink.table.factories.TableFactoryUtil;
 import org.apache.flink.table.factories.TableSourceFactory;
+import org.apache.flink.table.plan.schema.TableSinkTable;
 import org.apache.flink.table.plan.schema.TableSourceTable;
 import org.apache.flink.table.plan.stats.FlinkStatistic;
 import org.apache.flink.table.sources.StreamTableSource;
@@ -100,12 +101,25 @@ class DatabaseCalciteSchema implements Schema {
 	}
 
 	private Table convertConnectorTable(ConnectorCatalogTable<?, ?> table) {
-		return table.getTableSource()
+		Optional<TableSourceTable> tableSourceTable = table.getTableSource()
 			.map(tableSource -> new TableSourceTable<>(
 				tableSource,
 				!table.isBatch(),
-				FlinkStatistic.UNKNOWN()))
-			.orElseThrow(() -> new TableException("Cannot query a sink only table."));
+				FlinkStatistic.UNKNOWN()));
+		if (tableSourceTable.isPresent()) {
+			return tableSourceTable.get();
+		} else {
+			Optional<TableSinkTable> tableSinkTable = table.getTableSink()
+				.map(tableSink -> new TableSinkTable<>(
+					tableSink,
+					FlinkStatistic.UNKNOWN()));
+			if (tableSinkTable.isPresent()) {
+				return tableSinkTable.get();
+			} else {
+				throw new TableException("Cannot convert a connector table " +
+					"without either source or sink.");
+			}
+		}
 	}
 
 	private Table convertCatalogTable(ObjectPath tablePath, CatalogTable table) {
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
index 27718ae..e0636ef 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
@@ -22,6 +22,7 @@ import org.apache.flink.sql.parser.SqlProperty;
 import org.apache.flink.sql.parser.ddl.SqlCreateTable;
 import org.apache.flink.sql.parser.ddl.SqlDropTable;
 import org.apache.flink.sql.parser.ddl.SqlTableColumn;
+import org.apache.flink.sql.parser.dml.RichSqlInsert;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.calcite.FlinkPlannerImpl;
@@ -29,6 +30,7 @@ import org.apache.flink.table.calcite.FlinkTypeFactory;
 import org.apache.flink.table.calcite.FlinkTypeSystem;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.operations.CatalogSinkModifyOperation;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.PlannerQueryOperation;
 import org.apache.flink.table.operations.ddl.CreateTableOperation;
@@ -84,6 +86,8 @@ public class SqlToOperationConverter {
 			return converter.convertCreateTable((SqlCreateTable) validated);
 		} if (validated instanceof SqlDropTable) {
 			return converter.convertDropTable((SqlDropTable) validated);
+		} else if (validated instanceof RichSqlInsert) {
+			return converter.convertSqlInsert((RichSqlInsert) validated);
 		} else if (validated.getKind().belongsTo(SqlKind.QUERY)) {
 			return converter.convertSqlQuery(validated);
 		} else {
@@ -147,6 +151,17 @@ public class SqlToOperationConverter {
 		return toQueryOperation(flinkPlanner, node);
 	}
 
+	/** Convert insert into statement. */
+	private Operation convertSqlInsert(RichSqlInsert insert) {
+		// get name of sink table
+		List<String> targetTablePath = ((SqlIdentifier) insert.getTargetTable()).names;
+		return new CatalogSinkModifyOperation(
+			targetTablePath,
+			(PlannerQueryOperation) SqlToOperationConverter.convert(flinkPlanner,
+				insert.getSource()),
+			insert.getStaticPartitionKVs());
+	}
+
 	//~ Tools ------------------------------------------------------------------
 
 	/**
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
index 675f4ed..a4ed523 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
@@ -48,6 +48,7 @@ import org.apache.flink.table.utils.TableConnectorUtils
 import org.apache.flink.types.Row
 
 import _root_.scala.collection.JavaConverters._
+import _root_.scala.collection.JavaConversions._
 
 /**
   * The abstract base class for the implementation of batch TableEnvironments.
@@ -115,12 +116,13 @@ abstract class BatchTableEnvImpl(
         // translate the Table into a DataSet and provide the type that the TableSink expects.
         val result: DataSet[T] = translate(table)(outputType)
         // Give the DataSet to the TableSink to emit it.
-        batchSink.emitDataSet(result)
+        batchSink.emitDataSet(shuffleByPartitionFieldsIfNeeded(batchSink, result))
       case boundedSink: OutputFormatTableSink[T] =>
         val outputType = fromDataTypeToLegacyInfo(sink.getConsumedDataType)
           .asInstanceOf[TypeInformation[T]]
         // translate the Table into a DataSet and provide the type that the TableSink expects.
-        val result: DataSet[T] = translate(table)(outputType)
+        val translated: DataSet[T] = translate(table)(outputType)
+        val result = shuffleByPartitionFieldsIfNeeded(boundedSink, translated)
         // use the OutputFormat to consume the DataSet.
         val dataSink = result.output(boundedSink.getOutputFormat)
         dataSink.name(
@@ -134,6 +136,26 @@ abstract class BatchTableEnvImpl(
   }
 
   /**
+    * Key by the partition fields if the sink is a [[PartitionableTableSink]].
+    * @param sink    the table sink
+    * @param dataSet the data set
+    * @tparam R      the data set record type
+    * @return a data set that maybe keyed by.
+    */
+  private def shuffleByPartitionFieldsIfNeeded[R](
+      sink: TableSink[_],
+      dataSet: DataSet[R]): DataSet[R] = {
+    sink match {
+      case partitionableSink: PartitionableTableSink
+        if partitionableSink.getPartitionFieldNames.nonEmpty =>
+        val fieldNames = sink.getTableSchema.getFieldNames
+        val indices = partitionableSink.getPartitionFieldNames.map(fieldNames.indexOf(_))
+        dataSet.partitionByHash(indices:_*)
+      case _ => dataSet
+    }
+  }
+
+  /**
     * Creates a final converter that maps the internal row type to external type.
     *
     * @param physicalTypeInfo the input of the sink
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index 06a04b7..021a732 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -21,6 +21,7 @@ package org.apache.flink.table.api.internal
 import org.apache.flink.annotation.VisibleForTesting
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.sql.parser.ddl.{SqlCreateTable, SqlDropTable}
+import org.apache.flink.sql.parser.dml.RichSqlInsert
 import org.apache.flink.table.api._
 import org.apache.flink.table.calcite.{FlinkPlannerImpl, FlinkRelBuilder}
 import org.apache.flink.table.catalog._
@@ -33,7 +34,7 @@ import org.apache.flink.table.operations.ddl.CreateTableOperation
 import org.apache.flink.table.operations.utils.OperationTreeBuilder
 import org.apache.flink.table.operations.{CatalogQueryOperation, PlannerQueryOperation, TableSourceQueryOperation, _}
 import org.apache.flink.table.planner.PlanningConfigurationBuilder
-import org.apache.flink.table.sinks.{TableSink, TableSinkUtils}
+import org.apache.flink.table.sinks.{PartitionableTableSink, TableSink, TableSinkUtils}
 import org.apache.flink.table.sources.TableSource
 import org.apache.flink.table.sqlexec.SqlToOperationConverter
 import org.apache.flink.table.util.JavaScalaConversionUtil
@@ -44,7 +45,7 @@ import org.apache.calcite.sql._
 import org.apache.calcite.sql.parser.SqlParser
 import org.apache.calcite.tools.FrameworkConfig
 
-import _root_.java.util.Optional
+import _root_.java.util.{Optional, Map => JMap, HashMap => JHashMap}
 
 import _root_.scala.collection.JavaConverters._
 import _root_.scala.collection.JavaConversions._
@@ -440,10 +441,12 @@ abstract class TableEnvImpl(
     // parse the sql query
     val parsed = planner.parse(stmt)
     parsed match {
-      case insert: SqlInsert =>
-        // validate the SQL query
-        val query = insert.getSource
-        val validatedQuery = planner.validate(query)
+      case insert: RichSqlInsert =>
+        // validate the insert
+        val validatedInsert = planner.validate(insert).asInstanceOf[RichSqlInsert]
+        // we do not validate the row type for sql insert now, so validate the source
+        // separately.
+        val validatedQuery = planner.validate(validatedInsert.getSource)
 
         val tableOperation = new PlannerQueryOperation(planner.rel(validatedQuery).rel)
         // get query result as Table
@@ -453,7 +456,8 @@ abstract class TableEnvImpl(
         val targetTablePath = insert.getTargetTable.asInstanceOf[SqlIdentifier].names
 
         // insert query result into sink table
-        insertInto(queryResult, targetTablePath.asScala:_*)
+        insertInto(queryResult, InsertOptions(insert.getStaticPartitionKVs),
+          targetTablePath.asScala:_*)
       case createTable: SqlCreateTable =>
         val operation = SqlToOperationConverter
           .convert(planner, createTable)
@@ -503,19 +507,27 @@ abstract class TableEnvImpl(
   private[flink] def writeToSink[T](table: Table, sink: TableSink[T]): Unit
 
   override def insertInto(
-    table: Table,
-    path: String,
-    pathContinued: String*): Unit = {
-    insertInto(table, path +: pathContinued: _*)
+      table: Table,
+      path: String,
+      pathContinued: String*): Unit = {
+    insertInto(
+      table,
+      InsertOptions(new JHashMap[String, String]()),
+      path +: pathContinued: _*)
   }
 
+  /** Insert options for executing sql insert. **/
+  case class InsertOptions(staticPartitions: JMap[String, String])
+
   /**
     * Writes the [[Table]] to a [[TableSink]] that was registered under the specified name.
     *
     * @param table The table to write to the TableSink.
     * @param sinkTablePath The name of the registered TableSink.
     */
-  private def insertInto(table: Table, sinkTablePath: String*): Unit = {
+  private def insertInto(table: Table,
+      insertOptions: InsertOptions,
+      sinkTablePath: String*): Unit = {
 
     // check that sink table exists
     if (null == sinkTablePath) {
@@ -532,7 +544,19 @@ abstract class TableEnvImpl(
 
       case Some(tableSink) =>
         // validate schema of source table and table sink
-        TableSinkUtils.validateSink(table.getQueryOperation, sinkTablePath.asJava, tableSink)
+        TableSinkUtils.validateSink(
+          insertOptions.staticPartitions,
+          table.getQueryOperation,
+          sinkTablePath.asJava,
+          tableSink)
+        // set static partitions if it is a partitioned table sink
+        tableSink match {
+          case partitionableSink: PartitionableTableSink
+            if partitionableSink.getPartitionFieldNames != null
+              && partitionableSink.getPartitionFieldNames.nonEmpty =>
+            partitionableSink.setStaticPartition(insertOptions.staticPartitions)
+          case _ =>
+        }
         // emit the table to the configured table sink
         writeToSink(table, tableSink)
     }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
index e92bef4..e3b7e14 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
@@ -52,9 +52,9 @@ import scala.collection.JavaConversions._
   */
 class FlinkPlannerImpl(
     config: FrameworkConfig,
-    catalogReaderSupplier: JFunction[JBoolean, CatalogReader],
+    val catalogReaderSupplier: JFunction[JBoolean, CatalogReader],
     planner: RelOptPlanner,
-    typeFactory: FlinkTypeFactory) {
+    val typeFactory: FlinkTypeFactory) {
 
   val operatorTable: SqlOperatorTable = config.getOperatorTable
   /** Holds the trait definitions to be registered with planner. May be null. */
@@ -110,8 +110,9 @@ class FlinkPlannerImpl(
         node.validate()
       case _ =>
     }
-    // no need to validate row type for DDL nodes.
-    if (sqlNode.getKind.belongsTo(SqlKind.DDL)) {
+    // no need to validate row type for DDL and insert nodes.
+    if (sqlNode.getKind.belongsTo(SqlKind.DDL)
+      || sqlNode.getKind == SqlKind.INSERT) {
       return sqlNode
     }
     validator = new FlinkCalciteSqlValidator(
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/PreValidateReWriter.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/PreValidateReWriter.scala
index 67a15e4..b371d52 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/PreValidateReWriter.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/PreValidateReWriter.scala
@@ -105,7 +105,7 @@ object PreValidateReWriter {
       val id = sqlProperty.getKey
       val targetField = SqlValidatorUtil.getTargetField(targetRowType,
           typeFactory, id, calciteCatalogReader, relOptTable)
-      validateField(assignedFields.containsValue, id, targetField)
+      validateField(idx => !assignedFields.contains(idx), id, targetField)
       val value = sqlProperty.getValue.asInstanceOf[SqlLiteral]
       assignedFields.put(targetField.getIndex,
         maybeCast(value, value.createSqlType(typeFactory), targetField.getType, typeFactory))
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/schema/TableSinkTable.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/schema/TableSinkTable.scala
new file mode 100644
index 0000000..37d172e
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/schema/TableSinkTable.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.schema
+
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.stats.FlinkStatistic
+import org.apache.flink.table.sinks.TableSink
+
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
+import org.apache.calcite.schema.impl.AbstractTable
+
+/**
+  * Class which implements the logic to convert a [[TableSink]] to Calcite Table
+  */
+class TableSinkTable[T](
+  val tableSink: TableSink[T],
+  val statistic: FlinkStatistic = FlinkStatistic.UNKNOWN)
+  extends AbstractTable {
+
+  override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = {
+    val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory]
+    flinkTypeFactory.buildLogicalRowType(tableSink.getTableSchema)
+  }
+
+  /**
+    * Returns statistics of current table
+    *
+    * @return statistics of current table
+    */
+  override def getStatistic: FlinkStatistic = statistic
+}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
index 057e70a..c7a52f4 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
@@ -21,10 +21,11 @@ import org.apache.flink.annotation.VisibleForTesting
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.dag.Transformation
 import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.sql.parser.dml.RichSqlInsert
 import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink}
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.table.api._
-import org.apache.flink.table.calcite.{CalciteConfig, FlinkPlannerImpl, FlinkRelBuilder, FlinkTypeFactory}
+import org.apache.flink.table.calcite.{CalciteConfig, FlinkPlannerImpl, FlinkRelBuilder, FlinkTypeFactory, PreValidateReWriter}
 import org.apache.flink.table.catalog.{CatalogManager, CatalogManagerCalciteSchema, CatalogTable, ConnectorCatalogTable, _}
 import org.apache.flink.table.delegation.{Executor, Planner}
 import org.apache.flink.table.executor.StreamExecutor
@@ -41,16 +42,19 @@ import org.apache.flink.table.sinks._
 import org.apache.flink.table.sqlexec.SqlToOperationConverter
 import org.apache.flink.table.types.utils.TypeConversions
 import org.apache.flink.table.util.JavaScalaConversionUtil
+
 import org.apache.calcite.jdbc.CalciteSchema
 import org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema
 import org.apache.calcite.plan.RelOptUtil
 import org.apache.calcite.rel.RelNode
-import org.apache.calcite.sql.{SqlIdentifier, SqlInsert, SqlKind}
+import org.apache.calcite.sql.SqlKind
+
 import _root_.java.lang.{Boolean => JBool}
 import _root_.java.util
 import _root_.java.util.{Objects, List => JList}
 
 import _root_.scala.collection.JavaConverters._
+import _root_.scala.collection.JavaConversions._
 
 /**
   * Implementation of [[Planner]] for legacy Flink planner. It supports only streaming use cases.
@@ -101,18 +105,12 @@ class StreamPlanner(
     val parsed = planner.parse(stmt)
 
     parsed match {
-      case insert: SqlInsert =>
+      case insert: RichSqlInsert =>
         val targetColumnList = insert.getTargetColumnList
         if (targetColumnList != null && insert.getTargetColumnList.size() != 0) {
           throw new ValidationException("Partial inserts are not supported")
         }
-        // get name of sink table
-        val targetTablePath = insert.getTargetTable.asInstanceOf[SqlIdentifier].names
-
-        List(new CatalogSinkModifyOperation(targetTablePath,
-          SqlToOperationConverter.convert(planner,
-            insert.getSource).asInstanceOf[PlannerQueryOperation])
-          .asInstanceOf[Operation]).asJava
+        List(SqlToOperationConverter.convert(planner, insert))
       case node if node.getKind.belongsTo(SqlKind.QUERY) || node.getKind.belongsTo(SqlKind.DDL) =>
         List(SqlToOperationConverter.convert(planner, parsed)).asJava
       case _ =>
@@ -155,7 +153,19 @@ class StreamPlanner(
       case catalogSink: CatalogSinkModifyOperation =>
         getTableSink(catalogSink.getTablePath)
           .map(sink => {
-            TableSinkUtils.validateSink(catalogSink.getChild, catalogSink.getTablePath, sink)
+            TableSinkUtils.validateSink(
+              catalogSink.getStaticPartitions,
+              catalogSink.getChild,
+              catalogSink.getTablePath,
+              sink)
+            // set static partitions if it is a partitioned sink
+            sink match {
+              case partitionableSink: PartitionableTableSink
+                if partitionableSink.getPartitionFieldNames != null
+                  && partitionableSink.getPartitionFieldNames.nonEmpty =>
+                partitionableSink.setStaticPartition(catalogSink.getStaticPartitions)
+              case _ =>
+            }
             writeToSink(catalogSink.getChild, sink, unwrapQueryConfig)
           }) match {
           case Some(t) => t
@@ -252,9 +262,23 @@ class StreamPlanner(
 
     val resultSink = sink match {
       case retractSink: RetractStreamTableSink[T] =>
+        retractSink match {
+          case partitionableSink: PartitionableTableSink
+            if partitionableSink.getPartitionFieldNames.nonEmpty =>
+            throw new TableException("Partitionable sink in retract stream mode " +
+              "is not supported yet!")
+          case _ =>
+        }
         writeToRetractSink(retractSink, tableOperation, queryConfig)
 
       case upsertSink: UpsertStreamTableSink[T] =>
+        upsertSink match {
+          case partitionableSink: PartitionableTableSink
+            if partitionableSink.getPartitionFieldNames.nonEmpty =>
+            throw new TableException("Partitionable sink in upsert stream mode " +
+              "is not supported yet!")
+          case _ =>
+        }
         writeToUpsertSink(upsertSink, tableOperation, queryConfig)
 
       case appendSink: AppendStreamTableSink[T] =>
@@ -317,7 +341,7 @@ class StreamPlanner(
         streamQueryConfig,
         withChangeFlag = false)
     // Give the DataStream to the TableSink to emit it.
-    sink.consumeDataStream(result)
+    sink.consumeDataStream(shuffleByPartitionFieldsIfNeeded(sink, result))
   }
 
   private def writeToUpsertSink[T](
@@ -355,6 +379,26 @@ class StreamPlanner(
     sink.consumeDataStream(result)
   }
 
+  /**
+    * Key by the partition fields if the sink is a [[PartitionableTableSink]].
+    * @param sink       the table sink
+    * @param dataStream the data stream
+    * @tparam R         the data stream record type
+    * @return a data stream that maybe keyed by.
+    */
+  private def shuffleByPartitionFieldsIfNeeded[R](
+      sink: TableSink[_],
+      dataStream: DataStream[R]): DataStream[R] = {
+    sink match {
+      case partitionableSink: PartitionableTableSink
+        if partitionableSink.getPartitionFieldNames.nonEmpty =>
+        val fieldNames = sink.getTableSchema.getFieldNames
+        val indices = partitionableSink.getPartitionFieldNames.map(fieldNames.indexOf(_))
+        dataStream.keyBy(indices:_*)
+      case _ => dataStream
+    }
+  }
+
   private def translateToType[A](
       table: QueryOperation,
       queryConfig: StreamQueryConfig,
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sinks/TableSinkUtils.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sinks/TableSinkUtils.scala
index 875ae27..d68afec 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sinks/TableSinkUtils.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sinks/TableSinkUtils.scala
@@ -18,22 +18,28 @@
 
 package org.apache.flink.table.sinks
 
-import org.apache.flink.table.api.ValidationException
-import org.apache.flink.table.operations.{CatalogSinkModifyOperation, QueryOperation}
-import java.util.{List => JList}
+import org.apache.flink.table.api.{TableException, ValidationException}
+import org.apache.flink.table.operations.QueryOperation
+
+import java.util.{List => JList, Map => JMap}
+
+import collection.JavaConversions._
 
 object TableSinkUtils {
 
   /**
     * Checks if the given [[QueryOperation]] can be written to the given [[TableSink]].
-    * It checks if the names & the field types match.
+    * It checks if the names & the field types match. If this sink is a [[PartitionableTableSink]],
+    * it will also validate the partitions.
     *
-    * @param query    The query that is supposed to be written.
-    * @param sinkPath Tha path of the sink. It is needed just for logging. It does not
-    *                 participate in the validation.
-    * @param sink     The sink that we want to write to.
+    * @param staticPartitions Static partitions of the sink if there exists any.
+    * @param query            The query that is supposed to be written.
+    * @param sinkPath         Tha path of the sink. It is needed just for logging. It does not
+    *                         participate in the validation.
+    * @param sink             The sink that we want to write to.
     */
   def validateSink(
+      staticPartitions: JMap[String, String],
       query: QueryOperation,
       sinkPath: JList[String],
       sink: TableSink[_])
@@ -58,9 +64,38 @@ object TableSinkUtils {
 
       throw new ValidationException(
         s"Field types of query result and registered TableSink " +
-          s"${sinkPath} do not match.\n" +
+          s"$sinkPath do not match.\n" +
           s"Query result schema: $srcSchema\n" +
           s"TableSink schema:    $sinkSchema")
     }
+    // check partitions are valid
+    if (staticPartitions != null && !staticPartitions.isEmpty) {
+      val invalidMsg = "Can't insert static partitions into a non-partitioned table sink. " +
+        "A partitioned sink should implement 'PartitionableTableSink' and return partition " +
+        "field names via 'getPartitionFieldNames()' method."
+      sink match {
+        case pts: PartitionableTableSink =>
+          val partitionFields = pts.getPartitionFieldNames
+          if (partitionFields == null || partitionFields.isEmpty) {
+            throw new ValidationException(invalidMsg)
+          }
+          staticPartitions.map(_._1) foreach { p =>
+            if (!partitionFields.contains(p)) {
+              throw new ValidationException(s"Static partition column $p " +
+                s"should be in the partition fields list $partitionFields.")
+            }
+          }
+          staticPartitions.map(_._1).zip(partitionFields).foreach {
+            case (p1, p2) =>
+              if (p1 != p2) {
+                throw new ValidationException(s"Static partition column $p1 " +
+                  s"should appear before dynamic partition $p2.")
+              }
+          }
+        case _ =>
+          throw new ValidationException(invalidMsg)
+
+      }
+    }
   }
 }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/PartitionableSinkITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/PartitionableSinkITCase.scala
new file mode 100644
index 0000000..d021d3a
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/PartitionableSinkITCase.scala
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.batch.sql
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.common.io.RichOutputFormat
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo.{INT_TYPE_INFO, LONG_TYPE_INFO, STRING_TYPE_INFO}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl
+import org.apache.flink.sql.parser.validate.FlinkSqlConformance
+import org.apache.flink.table.api.scala.BatchTableEnvironment
+import org.apache.flink.table.api.{DataTypes, PlannerConfig, TableSchema, ValidationException}
+import org.apache.flink.table.calcite.CalciteConfig
+import org.apache.flink.table.factories.utils.TestCollectionTableFactory.TestCollectionInputFormat
+import org.apache.flink.table.runtime.batch.sql.PartitionableSinkITCase.{RESULT1, RESULT2, RESULT3, _}
+import org.apache.flink.table.sinks.{BatchTableSink, PartitionableTableSink, TableSink}
+import org.apache.flink.table.sources.BatchTableSource
+import org.apache.flink.table.types.logical.{BigIntType, IntType, VarCharType}
+import org.apache.flink.types.Row
+import org.apache.calcite.config.Lex
+import org.apache.calcite.sql.parser.SqlParser
+import org.junit.Assert.assertEquals
+import org.junit.rules.ExpectedException
+import org.junit.{Before, Rule, Test}
+
+import java.util.{ArrayList => JArrayList, LinkedList => JLinkedList, List => JList, Map => JMap}
+import org.apache.flink.api.java
+
+import scala.collection.JavaConversions._
+import scala.collection.Seq
+
+class PartitionableSinkITCase {
+  private val batchExec: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+  private var tEnv: BatchTableEnvironment = _
+  private val type3 = new RowTypeInfo(INT_TYPE_INFO, LONG_TYPE_INFO, STRING_TYPE_INFO)
+  private val type4 = new RowTypeInfo(INT_TYPE_INFO, INT_TYPE_INFO, INT_TYPE_INFO)
+
+  private val _expectedException = ExpectedException.none
+
+  @Rule
+  def expectedEx: ExpectedException = _expectedException
+
+  @Before
+  def before(): Unit = {
+    batchExec.setParallelism(3)
+    tEnv = BatchTableEnvironment.create(batchExec)
+    tEnv.getConfig.setPlannerConfig(getPlannerConfig)
+    registerTableSource("nonSortTable", testData.toList)
+    registerTableSource("sortTable", testData1.toList)
+    PartitionableSinkITCase.init()
+  }
+
+  def registerTableSource(name: String, data: List[Row]): Unit = {
+    val tableSchema = TableSchema.builder()
+      .field("a", DataTypes.INT())
+      .field("b", DataTypes.BIGINT())
+      .field("c", DataTypes.STRING())
+      .build()
+    tEnv.registerTableSource(name, new CollectionTableSource(data, 100, tableSchema))
+  }
+
+  private def getPlannerConfig: PlannerConfig = {
+    val parserConfig = SqlParser.configBuilder
+      .setParserFactory(FlinkSqlParserImpl.FACTORY)
+      .setConformance(FlinkSqlConformance.HIVE) // set up hive dialect
+      .setLex(Lex.JAVA)
+      .setIdentifierMaxLength(256).build
+    CalciteConfig.createBuilder()
+      .replaceSqlParserConfig(parserConfig)
+      .build()
+  }
+
+  @Test
+  def testInsertWithOutPartitionGrouping(): Unit = {
+    registerTableSink(grouping = false)
+    tEnv.sqlUpdate("insert into sinkTable select a, max(b), c"
+      + " from nonSortTable group by a, c")
+    tEnv.execute("testJob")
+    assertEquals(List("1,5,Hi",
+      "1,5,Hi01",
+      "1,5,Hi02"),
+      RESULT1.sorted)
+    assert(RESULT2.isEmpty)
+    assertEquals(List("2,1,Hello world01",
+      "2,1,Hello world02",
+      "2,1,Hello world03",
+      "2,1,Hello world04",
+      "2,2,Hello world, how are you?",
+      "3,1,Hello world",
+      "3,2,Hello",
+      "3,2,Hello01",
+      "3,2,Hello02",
+      "3,2,Hello03",
+      "3,2,Hello04"),
+      RESULT3.sorted)
+  }
+
+  @Test
+  def testInsertWithPartitionGrouping(): Unit = {
+    registerTableSink()
+    tEnv.sqlUpdate("insert into sinkTable select a, b, c from sortTable")
+    tEnv.execute("testJob")
+    assertEquals(List("1,1,Hello world",
+      "1,1,Hello world, how are you?"),
+      RESULT1.toList)
+    assertEquals(List("4,4,你好,陌生人",
+      "4,4,你好,陌生人,我是",
+      "4,4,你好,陌生人,我是中国人",
+      "4,4,你好,陌生人,我是中国人,你来自哪里?"),
+      RESULT2.toList)
+    assertEquals(List("2,2,Hi",
+      "2,2,Hello",
+      "3,3,I'm fine, thank",
+      "3,3,I'm fine, thank you",
+      "3,3,I'm fine, thank you, and you?"),
+      RESULT3.toList)
+  }
+
+  @Test
+  def testInsertWithStaticPartitions(): Unit = {
+    val testSink = registerTableSink()
+    tEnv.sqlUpdate("insert into sinkTable partition(a=1) select b, c from sortTable")
+    tEnv.execute("testJob")
+    // this sink should have been set up with static partitions
+    assertEquals(testSink.getStaticPartitions.toMap, Map("a" -> "1"))
+    assertEquals(List("1,2,Hi",
+      "1,1,Hello world",
+      "1,2,Hello",
+      "1,1,Hello world, how are you?",
+      "1,3,I'm fine, thank",
+      "1,3,I'm fine, thank you",
+      "1,3,I'm fine, thank you, and you?",
+      "1,4,你好,陌生人",
+      "1,4,你好,陌生人,我是",
+      "1,4,你好,陌生人,我是中国人",
+      "1,4,你好,陌生人,我是中国人,你来自哪里?"),
+      RESULT1.toList)
+    assert(RESULT2.isEmpty)
+    assert(RESULT3.isEmpty)
+  }
+
+  @Test
+  def testInsertWithStaticAndDynamicPartitions(): Unit = {
+    val testSink = registerTableSink(partitionColumns = Array("a", "b"))
+    tEnv.sqlUpdate("insert into sinkTable partition(a=1) select b, c from sortTable")
+    tEnv.execute("testJob")
+    // this sink should have been set up with static partitions
+    assertEquals(testSink.getStaticPartitions.toMap, Map("a" -> "1"))
+    assertEquals(List("1,3,I'm fine, thank",
+      "1,3,I'm fine, thank you",
+      "1,3,I'm fine, thank you, and you?"),
+      RESULT1.toList)
+    assertEquals(List("1,2,Hi",
+      "1,2,Hello"),
+      RESULT2.toList)
+    assertEquals(List("1,1,Hello world",
+      "1,1,Hello world, how are you?",
+      "1,4,你好,陌生人",
+      "1,4,你好,陌生人,我是",
+      "1,4,你好,陌生人,我是中国人",
+      "1,4,你好,陌生人,我是中国人,你来自哪里?"),
+      RESULT3.toList)
+  }
+
+  @Test
+  def testDynamicPartitionInFrontOfStaticPartition(): Unit = {
+    expectedEx.expect(classOf[ValidationException])
+    expectedEx.expectMessage("Static partition column b "
+      + "should appear before dynamic partition a")
+    registerTableSink(partitionColumns = Array("a", "b"))
+    tEnv.sqlUpdate("insert into sinkTable partition(b=1) select a, c from sortTable")
+    tEnv.execute("testJob")
+  }
+
+  @Test
+  def testStaticPartitionNotInPartitionFields(): Unit = {
+    expectedEx.expect(classOf[ValidationException])
+    expectedEx.expectMessage("Static partition column c " +
+      "should be in the partition fields list [a, b].")
+    registerTableSink(tableName = "sinkTable2", rowType = type4,
+      partitionColumns = Array("a", "b"))
+    tEnv.sqlUpdate("insert into sinkTable2 partition(c=1) select a, b from sinkTable2")
+    tEnv.execute("testJob")
+  }
+
+  @Test
+  def testInsertStaticPartitionOnNonPartitionedSink(): Unit = {
+    expectedEx.expect(classOf[ValidationException])
+    expectedEx.expectMessage(
+      "Can't insert static partitions into a non-partitioned table sink.")
+    registerTableSink(tableName = "sinkTable2", rowType = type4, partitionColumns = Array())
+    tEnv.sqlUpdate("insert into sinkTable2 partition(c=1) select a, b from sinkTable2")
+    tEnv.execute("testJob")
+  }
+
+  private def registerTableSink(
+      tableName: String = "sinkTable",
+      rowType: RowTypeInfo = type3,
+      grouping: Boolean = true,
+      partitionColumns: Array[String] = Array[String]("a")): TestSink = {
+    val testSink = new TestSink(rowType, grouping, partitionColumns)
+    tEnv.registerTableSink(tableName, testSink)
+    testSink
+  }
+
+  private class TestSink(rowType: RowTypeInfo,
+      supportsGrouping: Boolean,
+      partitionColumns: Array[String])
+    extends BatchTableSink[Row]
+      with PartitionableTableSink {
+    private var staticPartitions: JMap[String, String] = _
+
+    override def getPartitionFieldNames: JList[String] = partitionColumns.toList
+
+    override def setStaticPartition(partitions: JMap[String, String]): Unit =
+      this.staticPartitions = partitions
+
+    override def configure(fieldNames: Array[String],
+      fieldTypes: Array[TypeInformation[_]]): TableSink[Row] = this
+
+    override def configurePartitionGrouping(s: Boolean): Boolean = {
+      supportsGrouping
+    }
+
+    override def getTableSchema: TableSchema = {
+      new TableSchema(Array("a", "b", "c"), rowType.getFieldTypes)
+    }
+
+    override def getOutputType: RowTypeInfo = rowType
+
+    def getStaticPartitions: JMap[String, String] = {
+      staticPartitions
+    }
+
+    override def emitDataSet(dataSet: DataSet[Row]): Unit = {
+      dataSet.map(new MapFunction[Row, String] {
+        override def map(value: Row): String = value.toString
+      }).output(new CollectionOutputFormat)
+        .setParallelism(dataSet.getExecutionEnvironment.getParallelism)
+    }
+  }
+
+  /**
+    * Table source of collection.
+    */
+  class CollectionTableSource(
+    val data: List[Row],
+    val emitIntervalMs: Long,
+    val schema: TableSchema)
+    extends BatchTableSource[Row] {
+
+    private val rowType: TypeInformation[Row] = schema.toRowType
+
+    override def getReturnType: TypeInformation[Row] = rowType
+
+    override def getTableSchema: TableSchema = {
+      schema
+    }
+
+    override def getDataSet(execEnv: java.ExecutionEnvironment): DataSet[Row] = {
+      execEnv.createInput(new TestCollectionInputFormat[Row](emitIntervalMs,
+        data, rowType.createSerializer(new ExecutionConfig)), rowType)
+    }
+  }
+}
+
+object PartitionableSinkITCase {
+  val RESULT1 = new JLinkedList[String]()
+  val RESULT2 = new JLinkedList[String]()
+  val RESULT3 = new JLinkedList[String]()
+  val RESULT_QUEUE: JList[JLinkedList[String]] = new JArrayList[JLinkedList[String]]()
+
+  def init(): Unit = {
+    RESULT1.clear()
+    RESULT2.clear()
+    RESULT3.clear()
+    RESULT_QUEUE.clear()
+    RESULT_QUEUE.add(RESULT1)
+    RESULT_QUEUE.add(RESULT2)
+    RESULT_QUEUE.add(RESULT3)
+  }
+
+  /** OutputFormat that writes data to a collection. **/
+  class CollectionOutputFormat extends RichOutputFormat[String] {
+    private var resultSet: JLinkedList[String] = _
+
+    override def configure(parameters: Configuration): Unit = {}
+
+    override def open(taskNumber: Int, numTasks: Int): Unit = {
+      resultSet = RESULT_QUEUE.get(taskNumber)
+    }
+
+    override def writeRecord(record: String): Unit = {
+      resultSet.add(record)
+    }
+
+    override def close(): Unit = {}
+  }
+
+  val fieldNames = Array("a", "b", "c")
+  val dataType = Array(new IntType(), new BigIntType(), new VarCharType(VarCharType.MAX_LENGTH))
+  val dataNullables = Array(false, false, false)
+
+  val testData = Seq(
+    row(3, 2L, "Hello03"),
+    row(1, 5L, "Hi"),
+    row(1, 5L, "Hi01"),
+    row(1, 5L, "Hi02"),
+    row(3, 2L, "Hello"),
+    row(3, 2L, "Hello01"),
+    row(2, 1L, "Hello world03"),
+    row(3, 2L, "Hello02"),
+    row(3, 2L, "Hello04"),
+    row(3, 1L, "Hello world"),
+    row(2, 1L, "Hello world01"),
+    row(2, 1L, "Hello world02"),
+    row(2, 1L, "Hello world04"),
+    row(2, 2L, "Hello world, how are you?")
+  )
+
+  val testData1 = Seq(
+    row(2, 2L, "Hi"),
+    row(1, 1L, "Hello world"),
+    row(2, 2L, "Hello"),
+    row(1, 1L, "Hello world, how are you?"),
+    row(3, 3L, "I'm fine, thank"),
+    row(3, 3L, "I'm fine, thank you"),
+    row(3, 3L, "I'm fine, thank you, and you?"),
+    row(4, 4L, "你好,陌生人"),
+    row(4, 4L, "你好,陌生人,我是"),
+    row(4, 4L, "你好,陌生人,我是中国人"),
+    row(4, 4L, "你好,陌生人,我是中国人,你来自哪里?")
+  )
+
+  def row(args: Any*):Row = {
+    val row = new Row(args.length)
+    0 until args.length foreach {
+      i => row.setField(i, args(i))
+    }
+    row
+  }
+}