You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2016/09/20 14:08:10 UTC
flink git commit: [FLINK-4247] [table] CsvTableSource.getDataSet()
expects Java ExecutionEnvironment
Repository: flink
Updated Branches:
refs/heads/master 5901bf33b -> 0975d9f11
[FLINK-4247] [table] CsvTableSource.getDataSet() expects Java ExecutionEnvironment
This closes #2298.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0975d9f1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0975d9f1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0975d9f1
Branch: refs/heads/master
Commit: 0975d9f11dc09f8b1ea420d660175874d423cac3
Parents: 5901bf3
Author: twalthr <tw...@apache.org>
Authored: Tue Jul 26 17:27:02 2016 +0200
Committer: twalthr <tw...@apache.org>
Committed: Tue Sep 20 16:02:37 2016 +0200
----------------------------------------------------------------------
.../api/table/sources/BatchTableSource.scala | 7 ++++-
.../api/table/sources/CsvTableSource.scala | 28 +++++++++++++-------
.../api/table/sources/StreamTableSource.scala | 7 ++++-
.../connectors/kafka/KafkaTableSource.java | 4 +++
4 files changed, 35 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0975d9f1/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/BatchTableSource.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/BatchTableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/BatchTableSource.scala
index 7abb03c..74e4cd6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/BatchTableSource.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/BatchTableSource.scala
@@ -26,6 +26,11 @@ import org.apache.flink.api.java.{ExecutionEnvironment, DataSet}
*/
trait BatchTableSource[T] extends TableSource[T] {
- /** Returns the data of the table as a [[DataSet]]. */
+ /**
+ * Returns the data of the table as a [[DataSet]].
+ *
+ * NOTE: This method is for internal use only for defining a [[TableSource]].
+ * Do not use it in Table API programs.
+ */
def getDataSet(execEnv: ExecutionEnvironment): DataSet[T]
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0975d9f1/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/CsvTableSource.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/CsvTableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/CsvTableSource.scala
index 40fdf82..9cf4397 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/CsvTableSource.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/CsvTableSource.scala
@@ -56,13 +56,13 @@ class CsvTableSource(
with StreamTableSource[Row] {
/**
- * A [[BatchTableSource]] and [[StreamTableSource]] for simple CSV files with a
- * (logically) unlimited number of fields.
- *
- * @param path The path to the CSV file.
- * @param fieldNames The names of the table fields.
- * @param fieldTypes The types of the table fields.
- */
+ * A [[BatchTableSource]] and [[StreamTableSource]] for simple CSV files with a
+ * (logically) unlimited number of fields.
+ *
+ * @param path The path to the CSV file.
+ * @param fieldNames The names of the table fields.
+ * @param fieldTypes The types of the table fields.
+ */
def this(path: String, fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]) =
this(path, fieldNames, fieldTypes, CsvInputFormat.DEFAULT_FIELD_DELIMITER,
CsvInputFormat.DEFAULT_LINE_DELIMITER, null, false, null, false)
@@ -73,7 +73,12 @@ class CsvTableSource(
private val returnType = new RowTypeInfo(fieldTypes)
- /** Returns the data of the table as a [[DataSet]] of [[Row]]. */
+ /**
+ * Returns the data of the table as a [[DataSet]] of [[Row]].
+ *
+ * NOTE: This method is for internal use only for defining a [[TableSource]].
+ * Do not use it in Table API programs.
+ */
override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = {
execEnv.createInput(createCsvInput(), returnType)
}
@@ -90,7 +95,12 @@ class CsvTableSource(
/** Returns the [[RowTypeInfo]] for the return type of the [[CsvTableSource]]. */
override def getReturnType: RowTypeInfo = returnType
- /** Returns the data of the table as a [[DataStream]] of [[Row]]. */
+ /**
+ * Returns the data of the table as a [[DataStream]] of [[Row]].
+ *
+ * NOTE: This method is for internal use only for defining a [[TableSource]].
+ * Do not use it in Table API programs.
+ */
override def getDataStream(streamExecEnv: StreamExecutionEnvironment): DataStream[Row] = {
streamExecEnv.createInput(createCsvInput(), returnType)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0975d9f1/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/StreamTableSource.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/StreamTableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/StreamTableSource.scala
index c30d9aa..cdae0b3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/StreamTableSource.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/StreamTableSource.scala
@@ -27,6 +27,11 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
*/
trait StreamTableSource[T] extends TableSource[T] {
- /** Returns the data of the table as a [[DataStream]]. */
+ /**
+ * Returns the data of the table as a [[DataStream]].
+ *
+ * NOTE: This method is for internal use only for defining a [[TableSource]].
+ * Do not use it in Table API programs.
+ */
def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[T]
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0975d9f1/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
index fc6bf44..446f203 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
@@ -99,6 +99,10 @@ abstract class KafkaTableSource implements StreamTableSource<Row> {
"Number of provided field names and types does not match.");
}
+ /**
+ * NOTE: This method is for internal use only for defining a TableSource.
+ * Do not use it in Table API programs.
+ */
@Override
public DataStream<Row> getDataStream(StreamExecutionEnvironment env) {
// Version-specific Kafka consumer