You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2018/07/15 09:24:55 UTC
[10/13] flink git commit: [FLINK-8866] [table] Move table type out of
descriptors
[FLINK-8866] [table] Move table type out of descriptors
The declaration of a table type is SQL Client/context specific and should not be part of a descriptor.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/09fbfdfa
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/09fbfdfa
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/09fbfdfa
Branch: refs/heads/master
Commit: 09fbfdfa76b068fcc8de249fe7cdcd01fd1f350e
Parents: abbb890
Author: Timo Walther <tw...@apache.org>
Authored: Thu Jul 12 09:05:50 2018 +0200
Committer: Timo Walther <tw...@apache.org>
Committed: Sun Jul 15 09:51:28 2018 +0200
----------------------------------------------------------------------
.../flink/table/client/config/Environment.java | 30 ++++++++++++--------
.../flink/table/client/config/SourceSink.java | 10 ++-----
.../descriptors/TableDescriptorValidator.scala | 15 ----------
.../table/descriptors/TableSinkDescriptor.scala | 6 ++--
.../descriptors/TableSourceDescriptor.scala | 2 --
5 files changed, 24 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/09fbfdfa/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
index 2b59e06..0853afb 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
@@ -21,7 +21,6 @@ package org.apache.flink.table.client.config;
import org.apache.flink.table.client.SqlClientException;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.descriptors.TableDescriptor;
-import org.apache.flink.table.descriptors.TableDescriptorValidator;
import java.io.IOException;
import java.net.URL;
@@ -50,6 +49,9 @@ public class Environment {
private static final String TABLE_NAME = "name";
private static final String TABLE_TYPE = "type";
+ private static final String TABLE_TYPE_VALUE_SOURCE = "source";
+ private static final String TABLE_TYPE_VALUE_SINK = "sink";
+ private static final String TABLE_TYPE_VALUE_BOTH = "both";
public Environment() {
this.tables = Collections.emptyMap();
@@ -214,17 +216,21 @@ public class Environment {
if (typeObject == null || !(typeObject instanceof String)) {
throw new SqlClientException("Invalid 'type' attribute for table '" + name + "'.");
}
- final String type = (String) config.get(TABLE_TYPE);
- config.remove(TABLE_TYPE);
- final Map<String, String> normalizedConfig = ConfigUtil.normalizeYaml(config);
- if (type.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE())) {
- return new Source(name, normalizedConfig);
- } else if (type.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SINK())) {
- return new Sink(name, normalizedConfig);
- } else if (type.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE_SINK())) {
- return new SourceSink(name, normalizedConfig);
+ final String type = (String) typeObject;
+ final Map<String, Object> configCopy = new HashMap<>(config);
+ configCopy.remove(TABLE_TYPE);
+
+ final Map<String, String> normalizedConfig = ConfigUtil.normalizeYaml(configCopy);
+ switch (type) {
+ case TABLE_TYPE_VALUE_SOURCE:
+ return new Source(name, normalizedConfig);
+ case TABLE_TYPE_VALUE_SINK:
+ return new Sink(name, normalizedConfig);
+ case TABLE_TYPE_VALUE_BOTH:
+ return new SourceSink(name, normalizedConfig);
+ default:
+ throw new SqlClientException(String.format("Invalid 'type' attribute for table '%s'. " +
+ "Only 'source', 'sink', and 'both' are supported. But was '%s'.", name, type));
}
- throw new SqlClientException("Invalid 'type' attribute for table '" + name + "'. " +
- "Only 'source', 'sink', and 'both' are supported.");
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/09fbfdfa/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/SourceSink.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/SourceSink.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/SourceSink.java
index 33fb0f1..bfa3c44 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/SourceSink.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/SourceSink.java
@@ -50,16 +50,10 @@ public class SourceSink extends TableDescriptor {
}
public Source toSource() {
- final Map<String, String> newProperties = new HashMap<>(properties);
- newProperties.replace(TableDescriptorValidator.TABLE_TYPE(),
- TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE());
- return new Source(name, newProperties);
+ return new Source(name, properties);
}
public Sink toSink() {
- final Map<String, String> newProperties = new HashMap<>(properties);
- newProperties.replace(TableDescriptorValidator.TABLE_TYPE(),
- TableDescriptorValidator.TABLE_TYPE_VALUE_SINK());
- return new Sink(name, newProperties);
+ return new Sink(name, properties);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/09fbfdfa/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptorValidator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptorValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptorValidator.scala
index c712e72..e0fa602 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptorValidator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptorValidator.scala
@@ -27,18 +27,3 @@ class TableDescriptorValidator extends DescriptorValidator {
// nothing to do
}
}
-
-object TableDescriptorValidator {
-
- /**
- * Key for describing the type of this table, valid values are ('source', 'sink', 'both').
- */
- val TABLE_TYPE = "type"
-
- /**
- * Valid TABLE_TYPE value.
- */
- val TABLE_TYPE_VALUE_SOURCE = "source"
- val TABLE_TYPE_VALUE_SINK = "sink"
- val TABLE_TYPE_VALUE_SOURCE_SINK = "both"
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/09fbfdfa/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSinkDescriptor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSinkDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSinkDescriptor.scala
index 97ed47d..0a4d504 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSinkDescriptor.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSinkDescriptor.scala
@@ -22,9 +22,11 @@ package org.apache.flink.table.descriptors
* Common class for all descriptors describing a table sink.
*/
abstract class TableSinkDescriptor extends TableDescriptor {
+
+ /**
+ * Internal method for properties conversion.
+ */
override private[flink] def addProperties(properties: DescriptorProperties): Unit = {
super.addProperties(properties)
- properties.putString(TableDescriptorValidator.TABLE_TYPE,
- TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/09fbfdfa/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSourceDescriptor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSourceDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSourceDescriptor.scala
index 2a0b67c..3ca39c2 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSourceDescriptor.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSourceDescriptor.scala
@@ -36,8 +36,6 @@ abstract class TableSourceDescriptor extends TableDescriptor {
*/
override private[flink] def addProperties(properties: DescriptorProperties): Unit = {
super.addProperties(properties)
- properties.putString(TableDescriptorValidator.TABLE_TYPE,
- TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE)
statisticsDescriptor.foreach(_.addProperties(properties))
}