Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/15 10:49:53 UTC

[3/7] flink git commit: [FLINK-5188] [table] [connectors] [core] Adjust imports and method calls to new Row type.

[FLINK-5188] [table] [connectors] [core] Adjust imports and method calls to new Row type.

- Port RowCsvInputFormat to Java and move it to flink-core.

This closes #3003.
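
For illustration, the core of this change is the move from the Scala Product-based
Row in flink-table to the new mutable Row in flink-core. A minimal before/after
sketch of the renamed accessors, as applied throughout the diff below (the class
name and field values are illustrative, not from the commit):

    import org.apache.flink.types.Row;

    public class RowApiExample {
        public static void main(String[] args) {
            // The old org.apache.flink.api.table.Row exposed Scala's Product methods:
            //   row.productArity();  row.productElement(i);
            // The new org.apache.flink.types.Row replaces them as follows:
            Row row = new Row(2);           // a Row with two positional fields
            row.setField(0, 42);
            row.setField(1, "hello");
            int arity = row.getArity();     // replaces productArity()
            Object first = row.getField(0); // replaces productElement(0)
            System.out.println(arity + " fields, first = " + first);
        }
    }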


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4d27f8f2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4d27f8f2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4d27f8f2

Branch: refs/heads/master
Commit: 4d27f8f2deef9fad845ebc91cef121cf9b35f825
Parents: a9e6ec8
Author: tonycox <to...@gmail.com>
Authored: Fri Dec 9 21:41:36 2016 +0400
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Dec 15 11:36:40 2016 +0100

----------------------------------------------------------------------
 .../kafka/Kafka010JsonTableSource.java          |   2 +-
 .../connectors/kafka/Kafka010TableSource.java   |   2 +-
 .../connectors/kafka/Kafka08JsonTableSink.java  |   2 +-
 .../kafka/Kafka08JsonTableSource.java           |   2 +-
 .../connectors/kafka/Kafka08TableSource.java    |   2 +-
 .../kafka/Kafka08JsonTableSinkTest.java         |   2 +-
 .../kafka/Kafka08JsonTableSourceTest.java       |   2 +-
 .../connectors/kafka/Kafka09JsonTableSink.java  |   2 +-
 .../kafka/Kafka09JsonTableSource.java           |   2 +-
 .../connectors/kafka/Kafka09TableSource.java    |   2 +-
 .../kafka/Kafka09JsonTableSinkTest.java         |   2 +-
 .../kafka/Kafka09JsonTableSourceTest.java       |   2 +-
 .../connectors/kafka/KafkaJsonTableSink.java    |   2 +-
 .../connectors/kafka/KafkaTableSink.java        |   4 +-
 .../connectors/kafka/KafkaTableSource.java      |   4 +-
 .../JsonRowDeserializationSchema.java           |   4 +-
 .../JsonRowSerializationSchema.java             |   8 +-
 .../kafka/JsonRowDeserializationSchemaTest.java |  14 +-
 .../kafka/JsonRowSerializationSchemaTest.java   |   8 +-
 .../kafka/KafkaTableSinkTestBase.java           |   4 +-
 .../kafka/KafkaTableSourceTestBase.java         |   2 +-
 .../flink/api/java/io/jdbc/JDBCInputFormat.java |   6 +-
 .../api/java/io/jdbc/JDBCOutputFormat.java      |  44 +-
 .../flink/api/java/io/jdbc/JDBCFullTest.java    |   2 +-
 .../api/java/io/jdbc/JDBCInputFormatTest.java   |  38 +-
 .../api/java/io/jdbc/JDBCOutputFormatTest.java  |  24 +-
 .../flink/api/java/io/jdbc/JDBCTestBase.java    |   2 +-
 .../flink/api/java/io/RowCsvInputFormat.java    | 174 ++++
 .../api/java/io/RowCsvInputFormatTest.java      | 879 +++++++++++++++++++
 .../api/java/table/BatchTableEnvironment.scala  |   4 +-
 .../api/java/table/StreamTableEnvironment.scala |   4 +-
 .../api/scala/table/BatchTableEnvironment.scala |   2 +-
 .../scala/table/StreamTableEnvironment.scala    |   2 +-
 .../apache/flink/api/scala/table/package.scala  |   3 +-
 .../flink/api/table/BatchTableEnvironment.scala |   3 +-
 .../api/table/StreamTableEnvironment.scala      |   1 +
 .../flink/api/table/TableEnvironment.scala      |   7 +-
 .../flink/api/table/codegen/CodeGenUtils.scala  |   3 +-
 .../flink/api/table/codegen/CodeGenerator.scala |   5 +-
 .../api/table/codegen/ExpressionReducer.scala   |  10 +-
 .../plan/nodes/dataset/DataSetAggregate.scala   |   8 +-
 .../nodes/datastream/DataStreamAggregate.scala  |   8 +-
 .../DataSetAggregateWithNullValuesRule.scala    |   1 +
 .../table/plan/schema/TableSourceTable.scala    |   6 +-
 .../api/table/runtime/aggregate/Aggregate.scala |   2 +-
 .../AggregateAllTimeWindowFunction.scala        |   2 +-
 .../aggregate/AggregateAllWindowFunction.scala  |   2 +-
 .../aggregate/AggregateMapFunction.scala        |   6 +-
 .../AggregateReduceCombineFunction.scala        |   4 +-
 .../AggregateReduceGroupFunction.scala          |   4 +-
 .../aggregate/AggregateTimeWindowFunction.scala |   2 +-
 .../table/runtime/aggregate/AggregateUtil.scala |   7 +-
 .../aggregate/AggregateWindowFunction.scala     |   2 +-
 .../table/runtime/aggregate/AvgAggregate.scala  |  62 +-
 .../runtime/aggregate/CountAggregate.scala      |   8 +-
 ...rementalAggregateAllTimeWindowFunction.scala |   2 +-
 .../IncrementalAggregateAllWindowFunction.scala |   4 +-
 .../IncrementalAggregateReduceFunction.scala    |   4 +-
 ...IncrementalAggregateTimeWindowFunction.scala |   2 +-
 .../IncrementalAggregateWindowFunction.scala    |   4 +-
 .../table/runtime/aggregate/MaxAggregate.scala  |  14 +-
 .../table/runtime/aggregate/MinAggregate.scala  |  14 +-
 .../table/runtime/aggregate/SumAggregate.scala  |  14 +-
 .../aggregate/TimeWindowPropertyCollector.scala |   4 +-
 .../flink/api/table/sinks/CsvTableSink.scala    |  12 +-
 .../api/table/sources/CsvTableSource.scala      |   9 +-
 .../api/table/typeutils/TypeConverter.scala     |  11 +-
 .../api/java/batch/TableEnvironmentITCase.java  |   2 +-
 .../flink/api/java/batch/TableSourceITCase.java |   4 +-
 .../flink/api/java/batch/sql/SqlITCase.java     |   2 +-
 .../java/batch/table/AggregationsITCase.java    |   2 +-
 .../flink/api/java/batch/table/CalcITCase.java  |   2 +-
 .../api/java/batch/table/CastingITCase.java     |   2 +-
 .../flink/api/java/batch/table/JoinITCase.java  |   2 +-
 .../flink/api/java/stream/sql/SqlITCase.java    |   2 +-
 .../batch/ProjectableTableSourceITCase.scala    |   7 +-
 .../scala/batch/TableEnvironmentITCase.scala    |   3 +-
 .../api/scala/batch/TableSourceITCase.scala     |   9 +-
 .../scala/batch/sql/AggregationsITCase.scala    |   3 +-
 .../flink/api/scala/batch/sql/CalcITCase.scala  |   3 +-
 .../flink/api/scala/batch/sql/JoinITCase.scala  |   3 +-
 .../scala/batch/sql/SetOperatorsITCase.scala    |   3 +-
 .../flink/api/scala/batch/sql/SortITCase.scala  |  32 +-
 .../scala/batch/sql/TableWithSQLITCase.scala    |   3 +-
 .../scala/batch/table/AggregationsITCase.scala  |   3 +-
 .../api/scala/batch/table/CalcITCase.scala      |   3 +-
 .../api/scala/batch/table/JoinITCase.scala      |   3 +-
 .../scala/batch/table/SetOperatorsITCase.scala  |   3 +-
 .../api/scala/batch/table/SortITCase.scala      |  49 +-
 .../table/UserDefinedTableFunctionTest.scala    |   7 +-
 .../api/scala/stream/TableSourceITCase.scala    |   7 +-
 .../flink/api/scala/stream/sql/SqlITCase.scala  |   3 +-
 .../scala/stream/table/AggregationsITCase.scala |   3 +-
 .../api/scala/stream/table/CalcITCase.scala     |   3 +-
 .../api/scala/stream/table/UnionITCase.scala    |   3 +-
 .../table/UserDefinedTableFunctionTest.scala    |   8 +-
 .../api/scala/stream/utils/StreamITCase.scala   |   2 +-
 .../api/table/expressions/ArrayTypeTest.scala   |   9 +-
 .../table/expressions/CompositeAccessTest.scala |   9 +-
 .../api/table/expressions/DecimalTypeTest.scala |   9 +-
 .../expressions/NonDeterministicTests.scala     |   6 +-
 .../table/expressions/ScalarFunctionsTest.scala |   9 +-
 .../table/expressions/ScalarOperatorsTest.scala |   9 +-
 .../table/expressions/SqlExpressionTest.scala   |   6 +-
 .../table/expressions/TemporalTypesTest.scala   |   9 +-
 .../UserDefinedScalarFunctionTest.scala         |   9 +-
 .../expressions/utils/ExpressionTestBase.scala  |   7 +-
 .../runtime/aggregate/AggregateTestBase.scala   |   2 +-
 .../dataset/DataSetCorrelateITCase.scala        |   3 +-
 .../datastream/DataStreamCorrelateITCase.scala  |   3 +-
 .../table/utils/UserDefinedTableFunctions.scala |   8 +-
 111 files changed, 1457 insertions(+), 327 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
index ddf1ad3..920d718 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
@@ -19,7 +19,7 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
 import org.apache.flink.api.table.sources.StreamTableSource;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
index 732440b..127dafc 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
@@ -19,7 +19,7 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
 import org.apache.flink.api.table.sources.StreamTableSource;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
index b155576..839388f 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
@@ -17,7 +17,7 @@
  */
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java
index 63bb57e..6c7d727 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java
@@ -19,7 +19,7 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
 import org.apache.flink.api.table.sources.StreamTableSource;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
index 8f51237..0e3791c 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
@@ -19,7 +19,7 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
 import org.apache.flink.api.table.sources.StreamTableSource;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
index 6d0b140..0ac452e 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
@@ -17,7 +17,7 @@
  */
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java
index a2d66ac..f9ef2ce 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import java.util.Properties;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
index 38ea47c..edbebd0 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
@@ -17,7 +17,7 @@
  */
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java
index 975ef58..dfcba5f 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java
@@ -19,7 +19,7 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
 import org.apache.flink.api.table.sources.StreamTableSource;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
index 03b5040..f423003 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
@@ -19,7 +19,7 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
 import org.apache.flink.api.table.sources.StreamTableSource;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
index 45f70ac..df84a0f 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
@@ -17,7 +17,7 @@
  */
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java
index 4a75f50..10b9acc 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import java.util.Properties;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
index ee98783..27c4de7 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
@@ -17,7 +17,7 @@
  */
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
index 714d9cd..6c42943 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
@@ -18,9 +18,9 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
 import org.apache.flink.api.table.sinks.StreamTableSink;
-import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
index fd423d7..498e918 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
@@ -19,9 +19,9 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
 import org.apache.flink.api.table.sources.StreamTableSource;
-import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java
index 4344810..b4b3341 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java
@@ -22,8 +22,8 @@ import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java
index 077ff13..1998aa6 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.util.serialization;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 
 
@@ -49,15 +49,15 @@ public class JsonRowSerializationSchema implements SerializationSchema<Row> {
 
 	@Override
 	public byte[] serialize(Row row) {
-		if (row.productArity() != fieldNames.length) {
+		if (row.getArity() != fieldNames.length) {
 			throw new IllegalStateException(String.format(
 				"Number of elements in the row %s is different from number of field names: %d", row, fieldNames.length));
 		}
 
 		ObjectNode objectNode = mapper.createObjectNode();
 
-		for (int i = 0; i < row.productArity(); i++) {
-			JsonNode node = mapper.valueToTree(row.productElement(i));
+		for (int i = 0; i < row.getArity(); i++) {
+			JsonNode node = mapper.valueToTree(row.getField(i));
 			objectNode.set(fieldNames[i], node);
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java
index 68225e2..88f62f0 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
 import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
 import org.junit.Test;
 
@@ -61,10 +61,10 @@ public class JsonRowDeserializationSchemaTest {
 
 		Row deserialized = deserializationSchema.deserialize(serializedJson);
 
-		assertEquals(3, deserialized.productArity());
-		assertEquals(id, deserialized.productElement(0));
-		assertEquals(name, deserialized.productElement(1));
-		assertArrayEquals(bytes, (byte[]) deserialized.productElement(2));
+		assertEquals(3, deserialized.getArity());
+		assertEquals(id, deserialized.getField(0));
+		assertEquals(name, deserialized.getField(1));
+		assertArrayEquals(bytes, (byte[]) deserialized.getField(2));
 	}
 
 	/**
@@ -85,8 +85,8 @@ public class JsonRowDeserializationSchemaTest {
 
 		Row row = deserializationSchema.deserialize(serializedJson);
 
-		assertEquals(1, row.productArity());
-		assertNull("Missing field not null", row.productElement(0));
+		assertEquals(1, row.getArity());
+		assertNull("Missing field not null", row.getField(0));
 
 		deserializationSchema.setFailOnMissingField(true);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java
index 92af15d..78dedf4 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java
@@ -16,7 +16,7 @@
  */
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
 import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
 import org.junit.Test;
@@ -88,10 +88,10 @@ public class JsonRowSerializationSchemaTest {
 
 	private void assertEqualRows(Row expectedRow, Row resultRow) {
 		assertEquals("Deserialized row should have expected number of fields",
-			expectedRow.productArity(), resultRow.productArity());
-		for (int i = 0; i < expectedRow.productArity(); i++) {
+			expectedRow.getArity(), resultRow.getArity());
+		for (int i = 0; i < expectedRow.getArity(); i++) {
 			assertEquals(String.format("Field number %d should be as in the original row", i),
-				expectedRow.productElement(i), resultRow.productElement(i));
+				expectedRow.getField(i), resultRow.getField(i));
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
index ae0af52..cc1c166 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
@@ -18,8 +18,8 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.connectors.kafka.internals.TypeUtil;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
index 2a281e8..ad51993 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
@@ -21,7 +21,7 @@ package org.apache.flink.streaming.connectors.kafka;
 import java.util.Properties;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
index b4246f5..3153f96 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
@@ -37,8 +37,8 @@ import org.apache.flink.api.common.io.RichInputFormat;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.api.java.io.jdbc.split.ParameterValuesProvider;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.GenericInputSplit;
 import org.apache.flink.core.io.InputSplit;
@@ -276,7 +276,7 @@ public class JDBCInputFormat extends RichInputFormat<Row, InputSplit> implements
 			if (!hasNext) {
 				return null;
 			}
-			for (int pos = 0; pos < row.productArity(); pos++) {
+			for (int pos = 0; pos < row.getArity(); pos++) {
 				row.setField(pos, resultSet.getObject(pos + 1));
 			}
 			//update hasNext after we've read the record

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
index da4b1ad..c5585e2 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
@@ -26,7 +26,7 @@ import java.sql.SQLException;
 
 import org.apache.flink.api.common.io.RichOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
 import org.apache.flink.configuration.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -108,22 +108,22 @@ public class JDBCOutputFormat extends RichOutputFormat<Row> {
 	@Override
 	public void writeRecord(Row row) throws IOException {
 
-		if (typesArray != null && typesArray.length > 0 && typesArray.length != row.productArity()) {
+		if (typesArray != null && typesArray.length > 0 && typesArray.length != row.getArity()) {
 			LOG.warn("Column SQL types array doesn't match arity of passed Row! Check the passed array...");
 		} 
 		try {
 
 			if (typesArray == null ) {
 				// no types provided
-				for (int index = 0; index < row.productArity(); index++) {
-					LOG.warn("Unknown column type for column %s. Best effort approach to set its value: %s.", index + 1, row.productElement(index));
-					upload.setObject(index + 1, row.productElement(index));
+				for (int index = 0; index < row.getArity(); index++) {
+					LOG.warn("Unknown column type for column %s. Best effort approach to set its value: %s.", index + 1, row.getField(index));
+					upload.setObject(index + 1, row.getField(index));
 				}
 			} else {
 				// types provided
-				for (int index = 0; index < row.productArity(); index++) {
+				for (int index = 0; index < row.getArity(); index++) {
 
-					if (row.productElement(index) == null) {
+					if (row.getField(index) == null) {
 						upload.setNull(index + 1, typesArray[index]);
 					} else {
 						// casting values as suggested by http://docs.oracle.com/javase/1.5.0/docs/guide/jdbc/getstart/mapping.html
@@ -133,56 +133,56 @@ public class JDBCOutputFormat extends RichOutputFormat<Row> {
 								break;
 							case java.sql.Types.BOOLEAN:
 							case java.sql.Types.BIT:
-								upload.setBoolean(index + 1, (boolean) row.productElement(index));
+								upload.setBoolean(index + 1, (boolean) row.getField(index));
 								break;
 							case java.sql.Types.CHAR:
 							case java.sql.Types.NCHAR:
 							case java.sql.Types.VARCHAR:
 							case java.sql.Types.LONGVARCHAR:
 							case java.sql.Types.LONGNVARCHAR:
-								upload.setString(index + 1, (String) row.productElement(index));
+								upload.setString(index + 1, (String) row.getField(index));
 								break;
 							case java.sql.Types.TINYINT:
-								upload.setByte(index + 1, (byte) row.productElement(index));
+								upload.setByte(index + 1, (byte) row.getField(index));
 								break;
 							case java.sql.Types.SMALLINT:
-								upload.setShort(index + 1, (short) row.productElement(index));
+								upload.setShort(index + 1, (short) row.getField(index));
 								break;
 							case java.sql.Types.INTEGER:
-								upload.setInt(index + 1, (int) row.productElement(index));
+								upload.setInt(index + 1, (int) row.getField(index));
 								break;
 							case java.sql.Types.BIGINT:
-								upload.setLong(index + 1, (long) row.productElement(index));
+								upload.setLong(index + 1, (long) row.getField(index));
 								break;
 							case java.sql.Types.REAL:
-								upload.setFloat(index + 1, (float) row.productElement(index));
+								upload.setFloat(index + 1, (float) row.getField(index));
 								break;
 							case java.sql.Types.FLOAT:
 							case java.sql.Types.DOUBLE:
-								upload.setDouble(index + 1, (double) row.productElement(index));
+								upload.setDouble(index + 1, (double) row.getField(index));
 								break;
 							case java.sql.Types.DECIMAL:
 							case java.sql.Types.NUMERIC:
-								upload.setBigDecimal(index + 1, (java.math.BigDecimal) row.productElement(index));
+								upload.setBigDecimal(index + 1, (java.math.BigDecimal) row.getField(index));
 								break;
 							case java.sql.Types.DATE:
-								upload.setDate(index + 1, (java.sql.Date) row.productElement(index));
+								upload.setDate(index + 1, (java.sql.Date) row.getField(index));
 								break;
 							case java.sql.Types.TIME:
-								upload.setTime(index + 1, (java.sql.Time) row.productElement(index));
+								upload.setTime(index + 1, (java.sql.Time) row.getField(index));
 								break;
 							case java.sql.Types.TIMESTAMP:
-								upload.setTimestamp(index + 1, (java.sql.Timestamp) row.productElement(index));
+								upload.setTimestamp(index + 1, (java.sql.Timestamp) row.getField(index));
 								break;
 							case java.sql.Types.BINARY:
 							case java.sql.Types.VARBINARY:
 							case java.sql.Types.LONGVARBINARY:
-								upload.setBytes(index + 1, (byte[]) row.productElement(index));
+								upload.setBytes(index + 1, (byte[]) row.getField(index));
 								break;
 							default:
-								upload.setObject(index + 1, row.productElement(index));
+								upload.setObject(index + 1, row.getField(index));
 								LOG.warn("Unmanaged sql type (%s) for column %s. Best effort approach to set its value: %s.",
-									typesArray[index], index + 1, row.productElement(index));
+									typesArray[index], index + 1, row.getField(index));
 								// case java.sql.Types.SQLXML
 								// case java.sql.Types.ARRAY:
 								// case java.sql.Types.JAVA_OBJECT:

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java
index da9469b..88aa4fa 100644
--- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java
@@ -29,7 +29,7 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.jdbc.JDBCInputFormat.JDBCInputFormatBuilder;
 import org.apache.flink.api.java.io.jdbc.split.NumericBetweenParametersProvider;
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
 import org.junit.Assert;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
index efae076..b08aa3a 100644
--- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
@@ -25,7 +25,7 @@ import java.sql.ResultSet;
 import org.apache.flink.api.java.io.jdbc.split.GenericParameterValuesProvider;
 import org.apache.flink.api.java.io.jdbc.split.NumericBetweenParametersProvider;
 import org.apache.flink.api.java.io.jdbc.split.ParameterValuesProvider;
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
 import org.apache.flink.core.io.InputSplit;
 import org.junit.After;
 import org.junit.Assert;
@@ -116,15 +116,15 @@ public class JDBCInputFormatTest extends JDBCTestBase {
 				break;
 			}
 			
-			if(next.productElement(0)!=null) { Assert.assertEquals("Field 0 should be int", Integer.class, next.productElement(0).getClass());}
-			if(next.productElement(1)!=null) { Assert.assertEquals("Field 1 should be String", String.class, next.productElement(1).getClass());}
-			if(next.productElement(2)!=null) { Assert.assertEquals("Field 2 should be String", String.class, next.productElement(2).getClass());}
-			if(next.productElement(3)!=null) { Assert.assertEquals("Field 3 should be float", Double.class, next.productElement(3).getClass());}
-			if(next.productElement(4)!=null) { Assert.assertEquals("Field 4 should be int", Integer.class, next.productElement(4).getClass());}
+			if(next.getField(0)!=null) { Assert.assertEquals("Field 0 should be int", Integer.class, next.getField(0).getClass());}
+			if(next.getField(1)!=null) { Assert.assertEquals("Field 1 should be String", String.class, next.getField(1).getClass());}
+			if(next.getField(2)!=null) { Assert.assertEquals("Field 2 should be String", String.class, next.getField(2).getClass());}
+			if(next.getField(3)!=null) { Assert.assertEquals("Field 3 should be float", Double.class, next.getField(3).getClass());}
+			if(next.getField(4)!=null) { Assert.assertEquals("Field 4 should be int", Integer.class, next.getField(4).getClass());}
 
 			for (int x = 0; x < 5; x++) {
 				if(testData[recordCount][x]!=null) {
-					Assert.assertEquals(testData[recordCount][x], next.productElement(x));
+					Assert.assertEquals(testData[recordCount][x], next.getField(x));
 				}
 			}
 			recordCount++;
@@ -162,15 +162,15 @@ public class JDBCInputFormatTest extends JDBCTestBase {
 				if (next == null) {
 					break;
 				}
-				if(next.productElement(0)!=null) { Assert.assertEquals("Field 0 should be int", Integer.class, next.productElement(0).getClass());}
-				if(next.productElement(1)!=null) { Assert.assertEquals("Field 1 should be String", String.class, next.productElement(1).getClass());}
-				if(next.productElement(2)!=null) { Assert.assertEquals("Field 2 should be String", String.class, next.productElement(2).getClass());}
-				if(next.productElement(3)!=null) { Assert.assertEquals("Field 3 should be float", Double.class, next.productElement(3).getClass());}
-				if(next.productElement(4)!=null) { Assert.assertEquals("Field 4 should be int", Integer.class, next.productElement(4).getClass());}
+				if(next.getField(0)!=null) { Assert.assertEquals("Field 0 should be int", Integer.class, next.getField(0).getClass());}
+				if(next.getField(1)!=null) { Assert.assertEquals("Field 1 should be String", String.class, next.getField(1).getClass());}
+				if(next.getField(2)!=null) { Assert.assertEquals("Field 2 should be String", String.class, next.getField(2).getClass());}
+				if(next.getField(3)!=null) { Assert.assertEquals("Field 3 should be float", Double.class, next.getField(3).getClass());}
+				if(next.getField(4)!=null) { Assert.assertEquals("Field 4 should be int", Integer.class, next.getField(4).getClass());}
 
 				for (int x = 0; x < 5; x++) {
 					if(testData[recordCount][x]!=null) {
-						Assert.assertEquals(testData[recordCount][x], next.productElement(x));
+						Assert.assertEquals(testData[recordCount][x], next.getField(x));
 					}
 				}
 				recordCount++;
@@ -208,11 +208,11 @@ public class JDBCInputFormatTest extends JDBCTestBase {
 				if (next == null) {
 					break;
 				}
-				if(next.productElement(0)!=null) { Assert.assertEquals("Field 0 should be int", Integer.class, next.productElement(0).getClass());}
-				if(next.productElement(1)!=null) { Assert.assertEquals("Field 1 should be String", String.class, next.productElement(1).getClass());}
-				if(next.productElement(2)!=null) { Assert.assertEquals("Field 2 should be String", String.class, next.productElement(2).getClass());}
-				if(next.productElement(3)!=null) { Assert.assertEquals("Field 3 should be float", Double.class, next.productElement(3).getClass());}
-				if(next.productElement(4)!=null) { Assert.assertEquals("Field 4 should be int", Integer.class, next.productElement(4).getClass());}
+				if(next.getField(0)!=null) { Assert.assertEquals("Field 0 should be int", Integer.class, next.getField(0).getClass());}
+				if(next.getField(1)!=null) { Assert.assertEquals("Field 1 should be String", String.class, next.getField(1).getClass());}
+				if(next.getField(2)!=null) { Assert.assertEquals("Field 2 should be String", String.class, next.getField(2).getClass());}
+				if(next.getField(3)!=null) { Assert.assertEquals("Field 3 should be float", Double.class, next.getField(3).getClass());}
+				if(next.getField(4)!=null) { Assert.assertEquals("Field 4 should be int", Integer.class, next.getField(4).getClass());}
 
 				recordCount++;
 			}
@@ -244,4 +244,4 @@ public class JDBCInputFormatTest extends JDBCTestBase {
 		Assert.assertEquals(0, recordsCnt);
 	}
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
index 086a84c..8de0c34 100644
--- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
@@ -26,7 +26,7 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 
 import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
@@ -137,25 +137,25 @@ public class JDBCOutputFormatTest extends JDBCTestBase {
 				for (int i = 0; i < tuple5.getArity(); i++) {
 					row.setField(i, resultSet.getObject(i + 1));
 				}
-				if (row.productElement(0) != null) {
-					Assert.assertEquals("Field 0 should be int", Integer.class, row.productElement(0).getClass());
+				if (row.getField(0) != null) {
+					Assert.assertEquals("Field 0 should be int", Integer.class, row.getField(0).getClass());
 				}
-				if (row.productElement(1) != null) {
-					Assert.assertEquals("Field 1 should be String", String.class, row.productElement(1).getClass());
+				if (row.getField(1) != null) {
+					Assert.assertEquals("Field 1 should be String", String.class, row.getField(1).getClass());
 				}
-				if (row.productElement(2) != null) {
-					Assert.assertEquals("Field 2 should be String", String.class, row.productElement(2).getClass());
+				if (row.getField(2) != null) {
+					Assert.assertEquals("Field 2 should be String", String.class, row.getField(2).getClass());
 				}
-				if (row.productElement(3) != null) {
-					Assert.assertEquals("Field 3 should be float", Double.class, row.productElement(3).getClass());
+				if (row.getField(3) != null) {
+					Assert.assertEquals("Field 3 should be float", Double.class, row.getField(3).getClass());
 				}
-				if (row.productElement(4) != null) {
-					Assert.assertEquals("Field 4 should be int", Integer.class, row.productElement(4).getClass());
+				if (row.getField(4) != null) {
+					Assert.assertEquals("Field 4 should be int", Integer.class, row.getField(4).getClass());
 				}
 
 				for (int x = 0; x < tuple5.getArity(); x++) {
 					if (JDBCTestBase.testData[recordCount][x] != null) {
-						Assert.assertEquals(JDBCTestBase.testData[recordCount][x], row.productElement(x));
+						Assert.assertEquals(JDBCTestBase.testData[recordCount][x], row.getField(x));
 					}
 				}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
index 69ad693..ffcb26f 100644
--- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
@@ -25,7 +25,7 @@ import java.sql.Statement;
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
new file mode 100644
index 0000000..34233f5
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.parser.FieldParser;
+
+@PublicEvolving
+public class RowCsvInputFormat extends CsvInputFormat<Row> {
+
+	private static final long serialVersionUID = 1L;
+
+	private int arity;
+	private boolean emptyColumnAsNull;
+
+	public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, String lineDelimiter, String fieldDelimiter, boolean[] includedFieldsMask, boolean emptyColumnAsNull) {
+		super(filePath);
+		if (rowTypeInfo.getArity() == 0) {
+			throw new IllegalArgumentException("Row arity must be greater than 0.");
+		}
+		this.arity = rowTypeInfo.getArity();
+
+		boolean[] fieldsMask;
+		if (includedFieldsMask != null) {
+			fieldsMask = includedFieldsMask;
+		} else {
+			fieldsMask = createDefaultMask(arity);
+		}
+		this.emptyColumnAsNull = emptyColumnAsNull;
+		setDelimiter(lineDelimiter);
+		setFieldDelimiter(fieldDelimiter);
+		setFieldsGeneric(fieldsMask, extractTypeClasses(rowTypeInfo));
+	}
+
+	public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, String lineDelimiter, String fieldDelimiter, int[] includedFieldsMask) {
+		this(filePath, rowTypeInfo, lineDelimiter, fieldDelimiter, toBoolMask(includedFieldsMask), false);
+	}
+
+	public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, String lineDelimiter, String fieldDelimiter) {
+		this(filePath, rowTypeInfo, lineDelimiter, fieldDelimiter, null, false);
+	}
+
+	public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, boolean[] includedFieldsMask) {
+		this(filePath, rowTypeInfo, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, includedFieldsMask, false);
+	}
+
+	public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, int[] includedFieldsMask) {
+		this(filePath, rowTypeInfo, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, includedFieldsMask);
+	}
+
+	public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, boolean emptyColumnAsNull) {
+		this(filePath, rowTypeInfo, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, null, emptyColumnAsNull);
+	}
+
+	public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo) {
+		this(filePath, rowTypeInfo, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, null);
+	}
+
+	private static Class<?>[] extractTypeClasses(RowTypeInfo rowTypeInfo) {
+		Class<?>[] classes = new Class<?>[rowTypeInfo.getArity()];
+		for (int i = 0; i < rowTypeInfo.getArity(); i++) {
+			classes[i] = rowTypeInfo.getTypeAt(i).getTypeClass();
+		}
+		return classes;
+	}
+
+	private static boolean[] toBoolMask(int[] includedFieldsMask) {
+		if (includedFieldsMask == null) {
+			return null;
+		} else {
+			return toBooleanMask(includedFieldsMask);
+		}
+	}
+
+	@Override
+	protected Row fillRecord(Row reuse, Object[] parsedValues) {
+		Row reuseRow;
+		if (reuse == null) {
+			reuseRow = new Row(arity);
+		} else {
+			reuseRow = reuse;
+		}
+		for (int i = 0; i < parsedValues.length; i++) {
+			reuseRow.setField(i, parsedValues[i]);
+		}
+		return reuseRow;
+	}
+
+	@Override
+	protected boolean parseRecord(Object[] holders, byte[] bytes, int offset, int numBytes) throws ParseException {
+		byte[] fieldDelimiter = this.getFieldDelimiter();
+		boolean[] fieldIncluded = this.fieldIncluded;
+
+		int startPos = offset;
+		int limit = offset + numBytes;
+
+		int field = 0;
+		int output = 0;
+		while (field < fieldIncluded.length) {
+
+			// check valid start position
+			if (startPos >= limit) {
+				if (isLenient()) {
+					return false;
+				} else {
+					throw new ParseException("Row too short: " + new String(bytes, offset, numBytes));
+				}
+			}
+
+			if (fieldIncluded[field]) {
+				// parse field
+				FieldParser<Object> parser = (FieldParser<Object>) this.getFieldParsers()[output];
+				int latestValidPos = startPos;
+				startPos = parser.resetErrorStateAndParse(
+					bytes,
+					startPos,
+					limit,
+					fieldDelimiter,
+					holders[output]);
+
+				if (!isLenient() && (parser.getErrorState() != FieldParser.ParseErrorState.NONE)) {
+					// the error state EMPTY_COLUMN is ignored
+					if (parser.getErrorState() != FieldParser.ParseErrorState.EMPTY_COLUMN) {
+						throw new ParseException(String.format("Parsing error for column %1$s of row '%2$s' originated by %3$s: %4$s.",
+							field, new String(bytes, offset, numBytes), parser.getClass().getSimpleName(), parser.getErrorState()));
+					}
+				}
+				holders[output] = parser.getLastResult();
+
+				// check parse result:
+				// the result is null if it is invalid
+				// or empty with emptyColumnAsNull enabled
+				if (startPos < 0 ||
+					(emptyColumnAsNull && (parser.getErrorState().equals(FieldParser.ParseErrorState.EMPTY_COLUMN)))) {
+					holders[output] = null;
+					startPos = skipFields(bytes, latestValidPos, limit, fieldDelimiter);
+				}
+				output++;
+			} else {
+				// skip field
+				startPos = skipFields(bytes, startPos, limit, fieldDelimiter);
+			}
+
+			// check if something went wrong
+			if (startPos < 0) {
+				throw new ParseException(String.format("Unexpected parser position for column %1$s of row '%2$s'",
+					field, new String(bytes, offset, numBytes)));
+			}
+
+			field++;
+		}
+		return true;
+	}
+}
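
For context, a minimal usage sketch of the ported RowCsvInputFormat, built only
from the constructors shown above; the file path and column types are illustrative
assumptions, not taken from the commit:

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.java.io.RowCsvInputFormat;
    import org.apache.flink.api.java.typeutils.RowTypeInfo;
    import org.apache.flink.core.fs.Path;

    public class RowCsvExample {
        public static void main(String[] args) {
            // Column types of the CSV file (illustrative): INT, STRING, DOUBLE.
            RowTypeInfo rowTypeInfo = new RowTypeInfo(
                BasicTypeInfo.INT_TYPE_INFO,
                BasicTypeInfo.STRING_TYPE_INFO,
                BasicTypeInfo.DOUBLE_TYPE_INFO);

            // Illustrative path; the default line ("\n") and field (",")
            // delimiters of CsvInputFormat apply.
            RowCsvInputFormat format =
                new RowCsvInputFormat(new Path("/tmp/input.csv"), rowTypeInfo);
        }
    }

The resulting format can then be consumed like any other InputFormat, e.g. via
ExecutionEnvironment#createInput.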