You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2018/07/23 04:02:29 UTC

flink git commit: [hotfix] [table] Deprecate SchemaValidator#deriveTableSinkSchema

Repository: flink
Updated Branches:
  refs/heads/master 690ab2c31 -> 48791c1ea


[hotfix] [table] Deprecate SchemaValidator#deriveTableSinkSchema

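The method combines two separate concepts: table schema and field
mapping. It should be split into two methods once we have support
for the corresponding interfaces (see FLINK-9870).

This commit also removes SchemaValidator#deriveTableSourceSchema and
changes the affected factories to read the schema directly via
DescriptorProperties#getTableSchema(SCHEMA).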


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/48791c1e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/48791c1e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/48791c1e

Branch: refs/heads/master
Commit: 48791c1ea538727a83ac39613a07c0e6214a8b1d
Parents: 690ab2c
Author: Timo Walther <tw...@apache.org>
Authored: Fri Jul 20 12:59:41 2018 +0200
Committer: Timo Walther <tw...@apache.org>
Committed: Mon Jul 23 05:46:49 2018 +0200

----------------------------------------------------------------------
 .../client/gateway/utils/TestTableSourceFactory.java   |  2 +-
 .../flink/table/descriptors/SchemaValidator.scala      | 13 +++++--------
 .../flink/table/sinks/CsvTableSinkFactoryBase.scala    |  2 +-
 .../flink/table/utils/InMemoryTableFactory.scala       |  2 +-
 4 files changed, 8 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/48791c1e/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactory.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactory.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactory.java
index 81f00e5..b0b8848 100644
--- a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactory.java
+++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactory.java
@@ -83,7 +83,7 @@ public class TestTableSourceFactory implements StreamTableSourceFactory<Row> {
 		final Optional<String> proctime = SchemaValidator.deriveProctimeAttribute(params);
 		final List<RowtimeAttributeDescriptor> rowtime = SchemaValidator.deriveRowtimeAttributes(params);
 		return new TestTableSource(
-			SchemaValidator.deriveTableSourceSchema(params),
+			params.getTableSchema(SCHEMA()),
 			properties.get(CONNECTOR_TEST_PROPERTY),
 			proctime.orElse(null),
 			rowtime);

http://git-wip-us.apache.org/repos/asf/flink/blob/48791c1e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala
index ec83b3c..af2baba 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala
@@ -175,17 +175,14 @@ object SchemaValidator {
   }
 
   /**
-    * Derives the table schema for a table source. A table source can directly use "name" and
-    * "type" and needs no special handling for time attributes or aliasing.
-    */
-  def deriveTableSourceSchema(properties: DescriptorProperties): TableSchema = {
-    properties.getTableSchema(SCHEMA)
-  }
-
-  /**
     * Derives the table schema for a table sink. A sink ignores a proctime attribute and
     * needs to track the origin of a rowtime field.
+    *
+    * @deprecated This method combines two separate concepts of table schema and field mapping.
+    *             This should be split into two methods once we have support for
+    *             the corresponding interfaces (see FLINK-9870).
     */
+  @deprecated
   def deriveTableSinkSchema(properties: DescriptorProperties): TableSchema = {
     val builder = TableSchema.builder()
 

http://git-wip-us.apache.org/repos/asf/flink/blob/48791c1e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactoryBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactoryBase.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactoryBase.scala
index 6ceba4c..849d16c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactoryBase.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactoryBase.scala
@@ -77,7 +77,7 @@ abstract class CsvTableSinkFactoryBase extends TableFactory {
 
     // build
     val formatSchema = params.getTableSchema(FORMAT_FIELDS)
-    val tableSchema = SchemaValidator.deriveTableSinkSchema(params)
+    val tableSchema = params.getTableSchema(SCHEMA)
 
     if (!formatSchema.equals(tableSchema)) {
       throw new TableException(

http://git-wip-us.apache.org/repos/asf/flink/blob/48791c1e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/InMemoryTableFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/InMemoryTableFactory.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/InMemoryTableFactory.scala
index 21dfb19..6b86c08 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/InMemoryTableFactory.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/InMemoryTableFactory.scala
@@ -76,7 +76,7 @@ class InMemoryTableFactory(terminationCount: Int)
       supportsSourceTimestamps = true,
       supportsSourceWatermarks = true).validate(params)
 
-    val tableSchema = SchemaValidator.deriveTableSourceSchema(params)
+    val tableSchema = params.getTableSchema(SCHEMA)
 
     // proctime
     val proctimeAttributeOpt = SchemaValidator.deriveProctimeAttribute(params)