You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/15 10:49:53 UTC
[3/7] flink git commit: [FLINK-5188] [table] [connectors] [core]
Adjust imports and method calls to new Row type.
[FLINK-5188] [table] [connectors] [core] Adjust imports and method calls to new Row type.
- Port RowCsvInputFormat to Java and move it to flink-core.
This closes #3003.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4d27f8f2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4d27f8f2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4d27f8f2
Branch: refs/heads/master
Commit: 4d27f8f2deef9fad845ebc91cef121cf9b35f825
Parents: a9e6ec8
Author: tonycox <to...@gmail.com>
Authored: Fri Dec 9 21:41:36 2016 +0400
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Dec 15 11:36:40 2016 +0100
----------------------------------------------------------------------
.../kafka/Kafka010JsonTableSource.java | 2 +-
.../connectors/kafka/Kafka010TableSource.java | 2 +-
.../connectors/kafka/Kafka08JsonTableSink.java | 2 +-
.../kafka/Kafka08JsonTableSource.java | 2 +-
.../connectors/kafka/Kafka08TableSource.java | 2 +-
.../kafka/Kafka08JsonTableSinkTest.java | 2 +-
.../kafka/Kafka08JsonTableSourceTest.java | 2 +-
.../connectors/kafka/Kafka09JsonTableSink.java | 2 +-
.../kafka/Kafka09JsonTableSource.java | 2 +-
.../connectors/kafka/Kafka09TableSource.java | 2 +-
.../kafka/Kafka09JsonTableSinkTest.java | 2 +-
.../kafka/Kafka09JsonTableSourceTest.java | 2 +-
.../connectors/kafka/KafkaJsonTableSink.java | 2 +-
.../connectors/kafka/KafkaTableSink.java | 4 +-
.../connectors/kafka/KafkaTableSource.java | 4 +-
.../JsonRowDeserializationSchema.java | 4 +-
.../JsonRowSerializationSchema.java | 8 +-
.../kafka/JsonRowDeserializationSchemaTest.java | 14 +-
.../kafka/JsonRowSerializationSchemaTest.java | 8 +-
.../kafka/KafkaTableSinkTestBase.java | 4 +-
.../kafka/KafkaTableSourceTestBase.java | 2 +-
.../flink/api/java/io/jdbc/JDBCInputFormat.java | 6 +-
.../api/java/io/jdbc/JDBCOutputFormat.java | 44 +-
.../flink/api/java/io/jdbc/JDBCFullTest.java | 2 +-
.../api/java/io/jdbc/JDBCInputFormatTest.java | 38 +-
.../api/java/io/jdbc/JDBCOutputFormatTest.java | 24 +-
.../flink/api/java/io/jdbc/JDBCTestBase.java | 2 +-
.../flink/api/java/io/RowCsvInputFormat.java | 174 ++++
.../api/java/io/RowCsvInputFormatTest.java | 879 +++++++++++++++++++
.../api/java/table/BatchTableEnvironment.scala | 4 +-
.../api/java/table/StreamTableEnvironment.scala | 4 +-
.../api/scala/table/BatchTableEnvironment.scala | 2 +-
.../scala/table/StreamTableEnvironment.scala | 2 +-
.../apache/flink/api/scala/table/package.scala | 3 +-
.../flink/api/table/BatchTableEnvironment.scala | 3 +-
.../api/table/StreamTableEnvironment.scala | 1 +
.../flink/api/table/TableEnvironment.scala | 7 +-
.../flink/api/table/codegen/CodeGenUtils.scala | 3 +-
.../flink/api/table/codegen/CodeGenerator.scala | 5 +-
.../api/table/codegen/ExpressionReducer.scala | 10 +-
.../plan/nodes/dataset/DataSetAggregate.scala | 8 +-
.../nodes/datastream/DataStreamAggregate.scala | 8 +-
.../DataSetAggregateWithNullValuesRule.scala | 1 +
.../table/plan/schema/TableSourceTable.scala | 6 +-
.../api/table/runtime/aggregate/Aggregate.scala | 2 +-
.../AggregateAllTimeWindowFunction.scala | 2 +-
.../aggregate/AggregateAllWindowFunction.scala | 2 +-
.../aggregate/AggregateMapFunction.scala | 6 +-
.../AggregateReduceCombineFunction.scala | 4 +-
.../AggregateReduceGroupFunction.scala | 4 +-
.../aggregate/AggregateTimeWindowFunction.scala | 2 +-
.../table/runtime/aggregate/AggregateUtil.scala | 7 +-
.../aggregate/AggregateWindowFunction.scala | 2 +-
.../table/runtime/aggregate/AvgAggregate.scala | 62 +-
.../runtime/aggregate/CountAggregate.scala | 8 +-
...rementalAggregateAllTimeWindowFunction.scala | 2 +-
.../IncrementalAggregateAllWindowFunction.scala | 4 +-
.../IncrementalAggregateReduceFunction.scala | 4 +-
...IncrementalAggregateTimeWindowFunction.scala | 2 +-
.../IncrementalAggregateWindowFunction.scala | 4 +-
.../table/runtime/aggregate/MaxAggregate.scala | 14 +-
.../table/runtime/aggregate/MinAggregate.scala | 14 +-
.../table/runtime/aggregate/SumAggregate.scala | 14 +-
.../aggregate/TimeWindowPropertyCollector.scala | 4 +-
.../flink/api/table/sinks/CsvTableSink.scala | 12 +-
.../api/table/sources/CsvTableSource.scala | 9 +-
.../api/table/typeutils/TypeConverter.scala | 11 +-
.../api/java/batch/TableEnvironmentITCase.java | 2 +-
.../flink/api/java/batch/TableSourceITCase.java | 4 +-
.../flink/api/java/batch/sql/SqlITCase.java | 2 +-
.../java/batch/table/AggregationsITCase.java | 2 +-
.../flink/api/java/batch/table/CalcITCase.java | 2 +-
.../api/java/batch/table/CastingITCase.java | 2 +-
.../flink/api/java/batch/table/JoinITCase.java | 2 +-
.../flink/api/java/stream/sql/SqlITCase.java | 2 +-
.../batch/ProjectableTableSourceITCase.scala | 7 +-
.../scala/batch/TableEnvironmentITCase.scala | 3 +-
.../api/scala/batch/TableSourceITCase.scala | 9 +-
.../scala/batch/sql/AggregationsITCase.scala | 3 +-
.../flink/api/scala/batch/sql/CalcITCase.scala | 3 +-
.../flink/api/scala/batch/sql/JoinITCase.scala | 3 +-
.../scala/batch/sql/SetOperatorsITCase.scala | 3 +-
.../flink/api/scala/batch/sql/SortITCase.scala | 32 +-
.../scala/batch/sql/TableWithSQLITCase.scala | 3 +-
.../scala/batch/table/AggregationsITCase.scala | 3 +-
.../api/scala/batch/table/CalcITCase.scala | 3 +-
.../api/scala/batch/table/JoinITCase.scala | 3 +-
.../scala/batch/table/SetOperatorsITCase.scala | 3 +-
.../api/scala/batch/table/SortITCase.scala | 49 +-
.../table/UserDefinedTableFunctionTest.scala | 7 +-
.../api/scala/stream/TableSourceITCase.scala | 7 +-
.../flink/api/scala/stream/sql/SqlITCase.scala | 3 +-
.../scala/stream/table/AggregationsITCase.scala | 3 +-
.../api/scala/stream/table/CalcITCase.scala | 3 +-
.../api/scala/stream/table/UnionITCase.scala | 3 +-
.../table/UserDefinedTableFunctionTest.scala | 8 +-
.../api/scala/stream/utils/StreamITCase.scala | 2 +-
.../api/table/expressions/ArrayTypeTest.scala | 9 +-
.../table/expressions/CompositeAccessTest.scala | 9 +-
.../api/table/expressions/DecimalTypeTest.scala | 9 +-
.../expressions/NonDeterministicTests.scala | 6 +-
.../table/expressions/ScalarFunctionsTest.scala | 9 +-
.../table/expressions/ScalarOperatorsTest.scala | 9 +-
.../table/expressions/SqlExpressionTest.scala | 6 +-
.../table/expressions/TemporalTypesTest.scala | 9 +-
.../UserDefinedScalarFunctionTest.scala | 9 +-
.../expressions/utils/ExpressionTestBase.scala | 7 +-
.../runtime/aggregate/AggregateTestBase.scala | 2 +-
.../dataset/DataSetCorrelateITCase.scala | 3 +-
.../datastream/DataStreamCorrelateITCase.scala | 3 +-
.../table/utils/UserDefinedTableFunctions.scala | 8 +-
111 files changed, 1457 insertions(+), 327 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
index ddf1ad3..920d718 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
@@ -19,7 +19,7 @@
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
import org.apache.flink.api.table.sources.StreamTableSource;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
index 732440b..127dafc 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
@@ -19,7 +19,7 @@
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
import org.apache.flink.api.table.sources.StreamTableSource;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
index b155576..839388f 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
@@ -17,7 +17,7 @@
*/
package org.apache.flink.streaming.connectors.kafka;
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java
index 63bb57e..6c7d727 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java
@@ -19,7 +19,7 @@
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
import org.apache.flink.api.table.sources.StreamTableSource;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
index 8f51237..0e3791c 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
@@ -19,7 +19,7 @@
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
import org.apache.flink.api.table.sources.StreamTableSource;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
index 6d0b140..0ac452e 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
@@ -17,7 +17,7 @@
*/
package org.apache.flink.streaming.connectors.kafka;
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java
index a2d66ac..f9ef2ce 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.connectors.kafka;
import java.util.Properties;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
index 38ea47c..edbebd0 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
@@ -17,7 +17,7 @@
*/
package org.apache.flink.streaming.connectors.kafka;
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java
index 975ef58..dfcba5f 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java
@@ -19,7 +19,7 @@
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
import org.apache.flink.api.table.sources.StreamTableSource;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
index 03b5040..f423003 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
@@ -19,7 +19,7 @@
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
import org.apache.flink.api.table.sources.StreamTableSource;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
index 45f70ac..df84a0f 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
@@ -17,7 +17,7 @@
*/
package org.apache.flink.streaming.connectors.kafka;
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java
index 4a75f50..10b9acc 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.connectors.kafka;
import java.util.Properties;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
index ee98783..27c4de7 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
@@ -17,7 +17,7 @@
*/
package org.apache.flink.streaming.connectors.kafka;
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
index 714d9cd..6c42943 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
@@ -18,9 +18,9 @@
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
import org.apache.flink.api.table.sinks.StreamTableSink;
-import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
index fd423d7..498e918 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
@@ -19,9 +19,9 @@
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
import org.apache.flink.api.table.sources.StreamTableSource;
-import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java
index 4344810..b4b3341 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java
@@ -22,8 +22,8 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.util.Preconditions;
import java.io.IOException;
http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java
index 077ff13..1998aa6 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.util.serialization;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
@@ -49,15 +49,15 @@ public class JsonRowSerializationSchema implements SerializationSchema<Row> {
@Override
public byte[] serialize(Row row) {
- if (row.productArity() != fieldNames.length) {
+ if (row.getArity() != fieldNames.length) {
throw new IllegalStateException(String.format(
"Number of elements in the row %s is different from number of field names: %d", row, fieldNames.length));
}
ObjectNode objectNode = mapper.createObjectNode();
- for (int i = 0; i < row.productArity(); i++) {
- JsonNode node = mapper.valueToTree(row.productElement(i));
+ for (int i = 0; i < row.getArity(); i++) {
+ JsonNode node = mapper.valueToTree(row.getField(i));
objectNode.set(fieldNames[i], node);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java
index 68225e2..88f62f0 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.connectors.kafka;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
import org.junit.Test;
@@ -61,10 +61,10 @@ public class JsonRowDeserializationSchemaTest {
Row deserialized = deserializationSchema.deserialize(serializedJson);
- assertEquals(3, deserialized.productArity());
- assertEquals(id, deserialized.productElement(0));
- assertEquals(name, deserialized.productElement(1));
- assertArrayEquals(bytes, (byte[]) deserialized.productElement(2));
+ assertEquals(3, deserialized.getArity());
+ assertEquals(id, deserialized.getField(0));
+ assertEquals(name, deserialized.getField(1));
+ assertArrayEquals(bytes, (byte[]) deserialized.getField(2));
}
/**
@@ -85,8 +85,8 @@ public class JsonRowDeserializationSchemaTest {
Row row = deserializationSchema.deserialize(serializedJson);
- assertEquals(1, row.productArity());
- assertNull("Missing field not null", row.productElement(0));
+ assertEquals(1, row.getArity());
+ assertNull("Missing field not null", row.getField(0));
deserializationSchema.setFailOnMissingField(true);
http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java
index 92af15d..78dedf4 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java
@@ -16,7 +16,7 @@
*/
package org.apache.flink.streaming.connectors.kafka;
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
import org.junit.Test;
@@ -88,10 +88,10 @@ public class JsonRowSerializationSchemaTest {
private void assertEqualRows(Row expectedRow, Row resultRow) {
assertEquals("Deserialized row should have expected number of fields",
- expectedRow.productArity(), resultRow.productArity());
- for (int i = 0; i < expectedRow.productArity(); i++) {
+ expectedRow.getArity(), resultRow.getArity());
+ for (int i = 0; i < expectedRow.getArity(); i++) {
assertEquals(String.format("Field number %d should be as in the original row", i),
- expectedRow.productElement(i), resultRow.productElement(i));
+ expectedRow.getField(i), resultRow.getField(i));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
index ae0af52..cc1c166 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
@@ -18,8 +18,8 @@
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.internals.TypeUtil;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
index 2a281e8..ad51993 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
@@ -21,7 +21,7 @@ package org.apache.flink.streaming.connectors.kafka;
import java.util.Properties;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
index b4246f5..3153f96 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
@@ -37,8 +37,8 @@ import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.api.java.io.jdbc.split.ParameterValuesProvider;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.core.io.InputSplit;
@@ -276,7 +276,7 @@ public class JDBCInputFormat extends RichInputFormat<Row, InputSplit> implements
if (!hasNext) {
return null;
}
- for (int pos = 0; pos < row.productArity(); pos++) {
+ for (int pos = 0; pos < row.getArity(); pos++) {
row.setField(pos, resultSet.getObject(pos + 1));
}
//update hasNext after we've read the record
http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
index da4b1ad..c5585e2 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
@@ -26,7 +26,7 @@ import java.sql.SQLException;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
import org.apache.flink.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -108,22 +108,22 @@ public class JDBCOutputFormat extends RichOutputFormat<Row> {
@Override
public void writeRecord(Row row) throws IOException {
- if (typesArray != null && typesArray.length > 0 && typesArray.length != row.productArity()) {
+ if (typesArray != null && typesArray.length > 0 && typesArray.length != row.getArity()) {
LOG.warn("Column SQL types array doesn't match arity of passed Row! Check the passed array...");
}
try {
if (typesArray == null ) {
// no types provided
- for (int index = 0; index < row.productArity(); index++) {
- LOG.warn("Unknown column type for column %s. Best effort approach to set its value: %s.", index + 1, row.productElement(index));
- upload.setObject(index + 1, row.productElement(index));
+ for (int index = 0; index < row.getArity(); index++) {
+ LOG.warn("Unknown column type for column %s. Best effort approach to set its value: %s.", index + 1, row.getField(index));
+ upload.setObject(index + 1, row.getField(index));
}
} else {
// types provided
- for (int index = 0; index < row.productArity(); index++) {
+ for (int index = 0; index < row.getArity(); index++) {
- if (row.productElement(index) == null) {
+ if (row.getField(index) == null) {
upload.setNull(index + 1, typesArray[index]);
} else {
// casting values as suggested by http://docs.oracle.com/javase/1.5.0/docs/guide/jdbc/getstart/mapping.html
@@ -133,56 +133,56 @@ public class JDBCOutputFormat extends RichOutputFormat<Row> {
break;
case java.sql.Types.BOOLEAN:
case java.sql.Types.BIT:
- upload.setBoolean(index + 1, (boolean) row.productElement(index));
+ upload.setBoolean(index + 1, (boolean) row.getField(index));
break;
case java.sql.Types.CHAR:
case java.sql.Types.NCHAR:
case java.sql.Types.VARCHAR:
case java.sql.Types.LONGVARCHAR:
case java.sql.Types.LONGNVARCHAR:
- upload.setString(index + 1, (String) row.productElement(index));
+ upload.setString(index + 1, (String) row.getField(index));
break;
case java.sql.Types.TINYINT:
- upload.setByte(index + 1, (byte) row.productElement(index));
+ upload.setByte(index + 1, (byte) row.getField(index));
break;
case java.sql.Types.SMALLINT:
- upload.setShort(index + 1, (short) row.productElement(index));
+ upload.setShort(index + 1, (short) row.getField(index));
break;
case java.sql.Types.INTEGER:
- upload.setInt(index + 1, (int) row.productElement(index));
+ upload.setInt(index + 1, (int) row.getField(index));
break;
case java.sql.Types.BIGINT:
- upload.setLong(index + 1, (long) row.productElement(index));
+ upload.setLong(index + 1, (long) row.getField(index));
break;
case java.sql.Types.REAL:
- upload.setFloat(index + 1, (float) row.productElement(index));
+ upload.setFloat(index + 1, (float) row.getField(index));
break;
case java.sql.Types.FLOAT:
case java.sql.Types.DOUBLE:
- upload.setDouble(index + 1, (double) row.productElement(index));
+ upload.setDouble(index + 1, (double) row.getField(index));
break;
case java.sql.Types.DECIMAL:
case java.sql.Types.NUMERIC:
- upload.setBigDecimal(index + 1, (java.math.BigDecimal) row.productElement(index));
+ upload.setBigDecimal(index + 1, (java.math.BigDecimal) row.getField(index));
break;
case java.sql.Types.DATE:
- upload.setDate(index + 1, (java.sql.Date) row.productElement(index));
+ upload.setDate(index + 1, (java.sql.Date) row.getField(index));
break;
case java.sql.Types.TIME:
- upload.setTime(index + 1, (java.sql.Time) row.productElement(index));
+ upload.setTime(index + 1, (java.sql.Time) row.getField(index));
break;
case java.sql.Types.TIMESTAMP:
- upload.setTimestamp(index + 1, (java.sql.Timestamp) row.productElement(index));
+ upload.setTimestamp(index + 1, (java.sql.Timestamp) row.getField(index));
break;
case java.sql.Types.BINARY:
case java.sql.Types.VARBINARY:
case java.sql.Types.LONGVARBINARY:
- upload.setBytes(index + 1, (byte[]) row.productElement(index));
+ upload.setBytes(index + 1, (byte[]) row.getField(index));
break;
default:
- upload.setObject(index + 1, row.productElement(index));
+ upload.setObject(index + 1, row.getField(index));
LOG.warn("Unmanaged sql type (%s) for column %s. Best effort approach to set its value: %s.",
- typesArray[index], index + 1, row.productElement(index));
+ typesArray[index], index + 1, row.getField(index));
// case java.sql.Types.SQLXML
// case java.sql.Types.ARRAY:
// case java.sql.Types.JAVA_OBJECT:
http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java
index da9469b..88aa4fa 100644
--- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java
@@ -29,7 +29,7 @@ import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat.JDBCInputFormatBuilder;
import org.apache.flink.api.java.io.jdbc.split.NumericBetweenParametersProvider;
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
import org.junit.Assert;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
index efae076..b08aa3a 100644
--- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
@@ -25,7 +25,7 @@ import java.sql.ResultSet;
import org.apache.flink.api.java.io.jdbc.split.GenericParameterValuesProvider;
import org.apache.flink.api.java.io.jdbc.split.NumericBetweenParametersProvider;
import org.apache.flink.api.java.io.jdbc.split.ParameterValuesProvider;
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
import org.apache.flink.core.io.InputSplit;
import org.junit.After;
import org.junit.Assert;
@@ -116,15 +116,15 @@ public class JDBCInputFormatTest extends JDBCTestBase {
break;
}
- if(next.productElement(0)!=null) { Assert.assertEquals("Field 0 should be int", Integer.class, next.productElement(0).getClass());}
- if(next.productElement(1)!=null) { Assert.assertEquals("Field 1 should be String", String.class, next.productElement(1).getClass());}
- if(next.productElement(2)!=null) { Assert.assertEquals("Field 2 should be String", String.class, next.productElement(2).getClass());}
- if(next.productElement(3)!=null) { Assert.assertEquals("Field 3 should be float", Double.class, next.productElement(3).getClass());}
- if(next.productElement(4)!=null) { Assert.assertEquals("Field 4 should be int", Integer.class, next.productElement(4).getClass());}
+ if(next.getField(0)!=null) { Assert.assertEquals("Field 0 should be int", Integer.class, next.getField(0).getClass());}
+ if(next.getField(1)!=null) { Assert.assertEquals("Field 1 should be String", String.class, next.getField(1).getClass());}
+ if(next.getField(2)!=null) { Assert.assertEquals("Field 2 should be String", String.class, next.getField(2).getClass());}
+ if(next.getField(3)!=null) { Assert.assertEquals("Field 3 should be float", Double.class, next.getField(3).getClass());}
+ if(next.getField(4)!=null) { Assert.assertEquals("Field 4 should be int", Integer.class, next.getField(4).getClass());}
for (int x = 0; x < 5; x++) {
if(testData[recordCount][x]!=null) {
- Assert.assertEquals(testData[recordCount][x], next.productElement(x));
+ Assert.assertEquals(testData[recordCount][x], next.getField(x));
}
}
recordCount++;
@@ -162,15 +162,15 @@ public class JDBCInputFormatTest extends JDBCTestBase {
if (next == null) {
break;
}
- if(next.productElement(0)!=null) { Assert.assertEquals("Field 0 should be int", Integer.class, next.productElement(0).getClass());}
- if(next.productElement(1)!=null) { Assert.assertEquals("Field 1 should be String", String.class, next.productElement(1).getClass());}
- if(next.productElement(2)!=null) { Assert.assertEquals("Field 2 should be String", String.class, next.productElement(2).getClass());}
- if(next.productElement(3)!=null) { Assert.assertEquals("Field 3 should be float", Double.class, next.productElement(3).getClass());}
- if(next.productElement(4)!=null) { Assert.assertEquals("Field 4 should be int", Integer.class, next.productElement(4).getClass());}
+ if(next.getField(0)!=null) { Assert.assertEquals("Field 0 should be int", Integer.class, next.getField(0).getClass());}
+ if(next.getField(1)!=null) { Assert.assertEquals("Field 1 should be String", String.class, next.getField(1).getClass());}
+ if(next.getField(2)!=null) { Assert.assertEquals("Field 2 should be String", String.class, next.getField(2).getClass());}
+ if(next.getField(3)!=null) { Assert.assertEquals("Field 3 should be float", Double.class, next.getField(3).getClass());}
+ if(next.getField(4)!=null) { Assert.assertEquals("Field 4 should be int", Integer.class, next.getField(4).getClass());}
for (int x = 0; x < 5; x++) {
if(testData[recordCount][x]!=null) {
- Assert.assertEquals(testData[recordCount][x], next.productElement(x));
+ Assert.assertEquals(testData[recordCount][x], next.getField(x));
}
}
recordCount++;
@@ -208,11 +208,11 @@ public class JDBCInputFormatTest extends JDBCTestBase {
if (next == null) {
break;
}
- if(next.productElement(0)!=null) { Assert.assertEquals("Field 0 should be int", Integer.class, next.productElement(0).getClass());}
- if(next.productElement(1)!=null) { Assert.assertEquals("Field 1 should be String", String.class, next.productElement(1).getClass());}
- if(next.productElement(2)!=null) { Assert.assertEquals("Field 2 should be String", String.class, next.productElement(2).getClass());}
- if(next.productElement(3)!=null) { Assert.assertEquals("Field 3 should be float", Double.class, next.productElement(3).getClass());}
- if(next.productElement(4)!=null) { Assert.assertEquals("Field 4 should be int", Integer.class, next.productElement(4).getClass());}
+ if(next.getField(0)!=null) { Assert.assertEquals("Field 0 should be int", Integer.class, next.getField(0).getClass());}
+ if(next.getField(1)!=null) { Assert.assertEquals("Field 1 should be String", String.class, next.getField(1).getClass());}
+ if(next.getField(2)!=null) { Assert.assertEquals("Field 2 should be String", String.class, next.getField(2).getClass());}
+ if(next.getField(3)!=null) { Assert.assertEquals("Field 3 should be float", Double.class, next.getField(3).getClass());}
+ if(next.getField(4)!=null) { Assert.assertEquals("Field 4 should be int", Integer.class, next.getField(4).getClass());}
recordCount++;
}
@@ -244,4 +244,4 @@ public class JDBCInputFormatTest extends JDBCTestBase {
Assert.assertEquals(0, recordsCnt);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
index 086a84c..8de0c34 100644
--- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
@@ -26,7 +26,7 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
@@ -137,25 +137,25 @@ public class JDBCOutputFormatTest extends JDBCTestBase {
for (int i = 0; i < tuple5.getArity(); i++) {
row.setField(i, resultSet.getObject(i + 1));
}
- if (row.productElement(0) != null) {
- Assert.assertEquals("Field 0 should be int", Integer.class, row.productElement(0).getClass());
+ if (row.getField(0) != null) {
+ Assert.assertEquals("Field 0 should be int", Integer.class, row.getField(0).getClass());
}
- if (row.productElement(1) != null) {
- Assert.assertEquals("Field 1 should be String", String.class, row.productElement(1).getClass());
+ if (row.getField(1) != null) {
+ Assert.assertEquals("Field 1 should be String", String.class, row.getField(1).getClass());
}
- if (row.productElement(2) != null) {
- Assert.assertEquals("Field 2 should be String", String.class, row.productElement(2).getClass());
+ if (row.getField(2) != null) {
+ Assert.assertEquals("Field 2 should be String", String.class, row.getField(2).getClass());
}
- if (row.productElement(3) != null) {
- Assert.assertEquals("Field 3 should be float", Double.class, row.productElement(3).getClass());
+ if (row.getField(3) != null) {
+ Assert.assertEquals("Field 3 should be float", Double.class, row.getField(3).getClass());
}
- if (row.productElement(4) != null) {
- Assert.assertEquals("Field 4 should be int", Integer.class, row.productElement(4).getClass());
+ if (row.getField(4) != null) {
+ Assert.assertEquals("Field 4 should be int", Integer.class, row.getField(4).getClass());
}
for (int x = 0; x < tuple5.getArity(); x++) {
if (JDBCTestBase.testData[recordCount][x] != null) {
- Assert.assertEquals(JDBCTestBase.testData[recordCount][x], row.productElement(x));
+ Assert.assertEquals(JDBCTestBase.testData[recordCount][x], row.getField(x));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
index 69ad693..ffcb26f 100644
--- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
@@ -25,7 +25,7 @@ import java.sql.Statement;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
new file mode 100644
index 0000000..34233f5
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.parser.FieldParser;
+
+@PublicEvolving
+public class RowCsvInputFormat extends CsvInputFormat<Row> {
+
+ private static final long serialVersionUID = 1L;
+
+ private int arity;
+ private boolean emptyColumnAsNull;
+
+ public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, String lineDelimiter, String fieldDelimiter, boolean[] includedFieldsMask, boolean emptyColumnAsNull) {
+ super(filePath);
+ if (rowTypeInfo.getArity() == 0) {
+ throw new IllegalArgumentException("Row arity must be greater than 0.");
+ }
+ this.arity = rowTypeInfo.getArity();
+
+ boolean[] fieldsMask;
+ if (includedFieldsMask != null) {
+ fieldsMask = includedFieldsMask;
+ } else {
+ fieldsMask = createDefaultMask(arity);
+ }
+ this.emptyColumnAsNull = emptyColumnAsNull;
+ setDelimiter(lineDelimiter);
+ setFieldDelimiter(fieldDelimiter);
+ setFieldsGeneric(fieldsMask, extractTypeClasses(rowTypeInfo));
+ }
+
+ public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, String lineDelimiter, String fieldDelimiter, int[] includedFieldsMask) {
+ this(filePath, rowTypeInfo, lineDelimiter, fieldDelimiter, toBoolMask(includedFieldsMask), false);
+ }
+
+ public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, String lineDelimiter, String fieldDelimiter) {
+ this(filePath, rowTypeInfo, lineDelimiter, fieldDelimiter, null, false);
+ }
+
+ public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, boolean[] includedFieldsMask) {
+ this(filePath, rowTypeInfo, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, includedFieldsMask, false);
+ }
+
+ public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, int[] includedFieldsMask) {
+ this(filePath, rowTypeInfo, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, includedFieldsMask);
+ }
+
+ public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, boolean emptyColumnAsNull) {
+ this(filePath, rowTypeInfo, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, null, emptyColumnAsNull);
+ }
+
+ public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo) {
+ this(filePath, rowTypeInfo, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, null);
+ }
+
+ private static Class<?>[] extractTypeClasses(RowTypeInfo rowTypeInfo) {
+ Class<?>[] classes = new Class<?>[rowTypeInfo.getArity()];
+ for (int i = 0; i < rowTypeInfo.getArity(); i++) {
+ classes[i] = rowTypeInfo.getTypeAt(i).getTypeClass();
+ }
+ return classes;
+ }
+
+ private static boolean[] toBoolMask(int[] includedFieldsMask) {
+ if (includedFieldsMask == null) {
+ return null;
+ } else {
+ return toBooleanMask(includedFieldsMask);
+ }
+ }
+
+ @Override
+ protected Row fillRecord(Row reuse, Object[] parsedValues) {
+ Row reuseRow;
+ if (reuse == null) {
+ reuseRow = new Row(arity);
+ } else {
+ reuseRow = reuse;
+ }
+ for (int i = 0; i < parsedValues.length; i++) {
+ reuseRow.setField(i, parsedValues[i]);
+ }
+ return reuseRow;
+ }
+
+ @Override
+ protected boolean parseRecord(Object[] holders, byte[] bytes, int offset, int numBytes) throws ParseException {
+ byte[] fieldDelimiter = this.getFieldDelimiter();
+ boolean[] fieldIncluded = this.fieldIncluded;
+
+ int startPos = offset;
+ int limit = offset + numBytes;
+
+ int field = 0;
+ int output = 0;
+ while (field < fieldIncluded.length) {
+
+ // check valid start position
+ if (startPos >= limit) {
+ if (isLenient()) {
+ return false;
+ } else {
+ throw new ParseException("Row too short: " + new String(bytes, offset, numBytes));
+ }
+ }
+
+ if (fieldIncluded[field]) {
+ // parse field
+ FieldParser<Object> parser = (FieldParser<Object>) this.getFieldParsers()[output];
+ int latestValidPos = startPos;
+ startPos = parser.resetErrorStateAndParse(
+ bytes,
+ startPos,
+ limit,
+ fieldDelimiter,
+ holders[output]);
+
+ if (!isLenient() && (parser.getErrorState() != FieldParser.ParseErrorState.NONE)) {
+ // the error state EMPTY_COLUMN is ignored
+ if (parser.getErrorState() != FieldParser.ParseErrorState.EMPTY_COLUMN) {
+ throw new ParseException(String.format("Parsing error for column %1$s of row '%2$s' originated by %3$s: %4$s.",
+ field, new String(bytes, offset, numBytes), parser.getClass().getSimpleName(), parser.getErrorState()));
+ }
+ }
+ holders[output] = parser.getLastResult();
+
+ // check parse result:
+ // the result is null if it is invalid
+ // or empty with emptyColumnAsNull enabled
+ if (startPos < 0 ||
+ (emptyColumnAsNull && (parser.getErrorState().equals(FieldParser.ParseErrorState.EMPTY_COLUMN)))) {
+ holders[output] = null;
+ startPos = skipFields(bytes, latestValidPos, limit, fieldDelimiter);
+ }
+ output++;
+ } else {
+ // skip field
+ startPos = skipFields(bytes, startPos, limit, fieldDelimiter);
+ }
+
+ // check if something went wrong
+ if (startPos < 0) {
+ throw new ParseException(String.format("Unexpected parser position for column %1$s of row '%2$s'",
+ field, new String(bytes, offset, numBytes)));
+ }
+
+ field++;
+ }
+ return true;
+ }
+}