You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2018/07/23 04:02:29 UTC
flink git commit: [hotfix] [table] Deprecate SchemaValidator#deriveTableSinkSchema
Repository: flink
Updated Branches:
refs/heads/master 690ab2c31 -> 48791c1ea
[hotfix] [table] Deprecate SchemaValidator#deriveTableSinkSchema
The method combines two separate concepts of table schema and field
mapping. This should be split into two methods once we have support
for the corresponding interfaces (see FLINK-9870).
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/48791c1e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/48791c1e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/48791c1e
Branch: refs/heads/master
Commit: 48791c1ea538727a83ac39613a07c0e6214a8b1d
Parents: 690ab2c
Author: Timo Walther <tw...@apache.org>
Authored: Fri Jul 20 12:59:41 2018 +0200
Committer: Timo Walther <tw...@apache.org>
Committed: Mon Jul 23 05:46:49 2018 +0200
----------------------------------------------------------------------
.../client/gateway/utils/TestTableSourceFactory.java | 2 +-
.../flink/table/descriptors/SchemaValidator.scala | 13 +++++--------
.../flink/table/sinks/CsvTableSinkFactoryBase.scala | 2 +-
.../flink/table/utils/InMemoryTableFactory.scala | 2 +-
4 files changed, 8 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/48791c1e/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactory.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactory.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactory.java
index 81f00e5..b0b8848 100644
--- a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactory.java
+++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactory.java
@@ -83,7 +83,7 @@ public class TestTableSourceFactory implements StreamTableSourceFactory<Row> {
final Optional<String> proctime = SchemaValidator.deriveProctimeAttribute(params);
final List<RowtimeAttributeDescriptor> rowtime = SchemaValidator.deriveRowtimeAttributes(params);
return new TestTableSource(
- SchemaValidator.deriveTableSourceSchema(params),
+ params.getTableSchema(SCHEMA()),
properties.get(CONNECTOR_TEST_PROPERTY),
proctime.orElse(null),
rowtime);
http://git-wip-us.apache.org/repos/asf/flink/blob/48791c1e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala
index ec83b3c..af2baba 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala
@@ -175,17 +175,14 @@ object SchemaValidator {
}
/**
- * Derives the table schema for a table source. A table source can directly use "name" and
- * "type" and needs no special handling for time attributes or aliasing.
- */
- def deriveTableSourceSchema(properties: DescriptorProperties): TableSchema = {
- properties.getTableSchema(SCHEMA)
- }
-
- /**
* Derives the table schema for a table sink. A sink ignores a proctime attribute and
* needs to track the origin of a rowtime field.
+ *
+ * @deprecated This method combines two separate concepts of table schema and field mapping.
+ * This should be split into two methods once we have support for
+ * the corresponding interfaces (see FLINK-9870).
*/
+ @deprecated
def deriveTableSinkSchema(properties: DescriptorProperties): TableSchema = {
val builder = TableSchema.builder()
http://git-wip-us.apache.org/repos/asf/flink/blob/48791c1e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactoryBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactoryBase.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactoryBase.scala
index 6ceba4c..849d16c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactoryBase.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactoryBase.scala
@@ -77,7 +77,7 @@ abstract class CsvTableSinkFactoryBase extends TableFactory {
// build
val formatSchema = params.getTableSchema(FORMAT_FIELDS)
- val tableSchema = SchemaValidator.deriveTableSinkSchema(params)
+ val tableSchema = params.getTableSchema(SCHEMA)
if (!formatSchema.equals(tableSchema)) {
throw new TableException(
http://git-wip-us.apache.org/repos/asf/flink/blob/48791c1e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/InMemoryTableFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/InMemoryTableFactory.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/InMemoryTableFactory.scala
index 21dfb19..6b86c08 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/InMemoryTableFactory.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/InMemoryTableFactory.scala
@@ -76,7 +76,7 @@ class InMemoryTableFactory(terminationCount: Int)
supportsSourceTimestamps = true,
supportsSourceWatermarks = true).validate(params)
- val tableSchema = SchemaValidator.deriveTableSourceSchema(params)
+ val tableSchema = params.getTableSchema(SCHEMA)
// proctime
val proctimeAttributeOpt = SchemaValidator.deriveProctimeAttribute(params)