You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2018/07/15 09:24:55 UTC

[10/13] flink git commit: [FLINK-8866] [table] Move table type out of descriptors

[FLINK-8866] [table] Move table type out of descriptors

The declaration of a table type is SQL Client/context specific and should not be part of a descriptor.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/09fbfdfa
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/09fbfdfa
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/09fbfdfa

Branch: refs/heads/master
Commit: 09fbfdfa76b068fcc8de249fe7cdcd01fd1f350e
Parents: abbb890
Author: Timo Walther <tw...@apache.org>
Authored: Thu Jul 12 09:05:50 2018 +0200
Committer: Timo Walther <tw...@apache.org>
Committed: Sun Jul 15 09:51:28 2018 +0200

----------------------------------------------------------------------
 .../flink/table/client/config/Environment.java  | 30 ++++++++++++--------
 .../flink/table/client/config/SourceSink.java   | 10 ++-----
 .../descriptors/TableDescriptorValidator.scala  | 15 ----------
 .../table/descriptors/TableSinkDescriptor.scala |  6 ++--
 .../descriptors/TableSourceDescriptor.scala     |  2 --
 5 files changed, 24 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/09fbfdfa/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
index 2b59e06..0853afb 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
@@ -21,7 +21,6 @@ package org.apache.flink.table.client.config;
 import org.apache.flink.table.client.SqlClientException;
 import org.apache.flink.table.descriptors.DescriptorProperties;
 import org.apache.flink.table.descriptors.TableDescriptor;
-import org.apache.flink.table.descriptors.TableDescriptorValidator;
 
 import java.io.IOException;
 import java.net.URL;
@@ -50,6 +49,9 @@ public class Environment {
 
 	private static final String TABLE_NAME = "name";
 	private static final String TABLE_TYPE = "type";
+	private static final String TABLE_TYPE_VALUE_SOURCE = "source";
+	private static final String TABLE_TYPE_VALUE_SINK = "sink";
+	private static final String TABLE_TYPE_VALUE_BOTH = "both";
 
 	public Environment() {
 		this.tables = Collections.emptyMap();
@@ -214,17 +216,21 @@ public class Environment {
 		if (typeObject == null || !(typeObject instanceof String)) {
 			throw new SqlClientException("Invalid 'type' attribute for table '" + name + "'.");
 		}
-		final String type = (String) config.get(TABLE_TYPE);
-		config.remove(TABLE_TYPE);
-		final Map<String, String> normalizedConfig = ConfigUtil.normalizeYaml(config);
-		if (type.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE())) {
-			return new Source(name, normalizedConfig);
-		} else if (type.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SINK())) {
-			return new Sink(name, normalizedConfig);
-		} else if (type.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE_SINK())) {
-			return new SourceSink(name, normalizedConfig);
+		final String type = (String) typeObject;
+		final Map<String, Object> configCopy = new HashMap<>(config);
+		configCopy.remove(TABLE_TYPE);
+
+		final Map<String, String> normalizedConfig = ConfigUtil.normalizeYaml(configCopy);
+		switch (type) {
+			case TABLE_TYPE_VALUE_SOURCE:
+				return new Source(name, normalizedConfig);
+			case TABLE_TYPE_VALUE_SINK:
+				return new Sink(name, normalizedConfig);
+			case TABLE_TYPE_VALUE_BOTH:
+				return new SourceSink(name, normalizedConfig);
+			default:
+				throw new SqlClientException(String.format("Invalid 'type' attribute for table '%s'. " +
+					"Only 'source', 'sink', and 'both' are supported. But was '%s'.", name, type));
 		}
-		throw new SqlClientException("Invalid 'type' attribute for table '" + name + "'. " +
-			"Only 'source', 'sink', and 'both' are supported.");
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/09fbfdfa/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/SourceSink.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/SourceSink.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/SourceSink.java
index 33fb0f1..bfa3c44 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/SourceSink.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/SourceSink.java
@@ -50,16 +50,10 @@ public class SourceSink extends TableDescriptor {
 	}
 
 	public Source toSource() {
-		final Map<String, String> newProperties = new HashMap<>(properties);
-		newProperties.replace(TableDescriptorValidator.TABLE_TYPE(),
-				TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE());
-		return new Source(name, newProperties);
+		return new Source(name, properties);
 	}
 
 	public Sink toSink() {
-		final Map<String, String> newProperties = new HashMap<>(properties);
-		newProperties.replace(TableDescriptorValidator.TABLE_TYPE(),
-				TableDescriptorValidator.TABLE_TYPE_VALUE_SINK());
-		return new Sink(name, newProperties);
+		return new Sink(name, properties);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/09fbfdfa/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptorValidator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptorValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptorValidator.scala
index c712e72..e0fa602 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptorValidator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptorValidator.scala
@@ -27,18 +27,3 @@ class TableDescriptorValidator extends DescriptorValidator {
     // nothing to do
   }
 }
-
-object TableDescriptorValidator {
-
-  /**
-    * Key for describing the type of this table, valid values are ('source', 'sink', 'both').
-    */
-  val TABLE_TYPE = "type"
-
-  /**
-    * Valid TABLE_TYPE value.
-    */
-  val TABLE_TYPE_VALUE_SOURCE = "source"
-  val TABLE_TYPE_VALUE_SINK = "sink"
-  val TABLE_TYPE_VALUE_SOURCE_SINK = "both"
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/09fbfdfa/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSinkDescriptor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSinkDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSinkDescriptor.scala
index 97ed47d..0a4d504 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSinkDescriptor.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSinkDescriptor.scala
@@ -22,9 +22,11 @@ package org.apache.flink.table.descriptors
   * Common class for all descriptors describing a table sink.
   */
 abstract class TableSinkDescriptor extends TableDescriptor {
+
+  /**
+    * Internal method for properties conversion.
+    */
   override private[flink] def addProperties(properties: DescriptorProperties): Unit = {
     super.addProperties(properties)
-    properties.putString(TableDescriptorValidator.TABLE_TYPE,
-      TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/09fbfdfa/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSourceDescriptor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSourceDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSourceDescriptor.scala
index 2a0b67c..3ca39c2 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSourceDescriptor.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSourceDescriptor.scala
@@ -36,8 +36,6 @@ abstract class TableSourceDescriptor extends TableDescriptor {
     */
   override private[flink] def addProperties(properties: DescriptorProperties): Unit = {
     super.addProperties(properties)
-    properties.putString(TableDescriptorValidator.TABLE_TYPE,
-      TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE)
     statisticsDescriptor.foreach(_.addProperties(properties))
   }