You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/11/03 17:11:28 UTC

[05/21] flink git commit: [FLINK-7972] [core] Move SerializationSchema to 'flink-core'

[FLINK-7972] [core] Move SerializationSchema to 'flink-core'

Moves the SerializationSchema and its related from
flink-streaming-java to flink-core.

That helps API level projects that depend on those classes
to not pull in a dependency on runtime classes, and to
not be Scala version dependent.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fe931d07
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fe931d07
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fe931d07

Branch: refs/heads/master
Commit: fe931d075f031e9494fd26dbeed4bb1024bd52cf
Parents: 16b0882
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Nov 2 18:07:25 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Nov 3 16:40:34 2017 +0100

----------------------------------------------------------------------
 docs/quickstart/run_example_quickstart.md       |   2 +-
 .../connectors/kafka/FlinkKafkaConsumer010.java |   2 +-
 .../connectors/kafka/FlinkKafkaProducer010.java |   2 +-
 .../kafka/Kafka010AvroTableSource.java          |   2 +-
 .../kafka/Kafka010JsonTableSource.java          |   2 +-
 .../connectors/kafka/Kafka010TableSource.java   |   2 +-
 .../kafka/Kafka010AvroTableSourceTest.java      |   4 +-
 .../connectors/kafka/Kafka010ITCase.java        |   2 +-
 .../kafka/Kafka010JsonTableSourceTest.java      |   2 +-
 .../kafka/internal/Kafka010FetcherTest.java     |   2 +-
 .../connectors/kafka/FlinkKafkaConsumer011.java |   2 +-
 .../connectors/kafka/FlinkKafkaProducer011.java |   3 +-
 .../kafka/Kafka011AvroTableSource.java          |   2 +-
 .../kafka/Kafka011JsonTableSource.java          |   2 +-
 .../connectors/kafka/Kafka011TableSource.java   |   2 +-
 .../kafka/FlinkKafkaProducer011ITCase.java      |   2 +-
 .../kafka/Kafka011AvroTableSourceTest.java      |   4 +-
 .../connectors/kafka/Kafka011ITCase.java        |   2 +-
 .../kafka/Kafka011JsonTableSourceTest.java      |   2 +-
 .../connectors/kafka/FlinkKafkaConsumer08.java  |   2 +-
 .../connectors/kafka/FlinkKafkaConsumer081.java |   2 +-
 .../connectors/kafka/FlinkKafkaConsumer082.java |   2 +-
 .../connectors/kafka/FlinkKafkaProducer.java    |   2 +-
 .../connectors/kafka/FlinkKafkaProducer08.java  |   2 +-
 .../kafka/Kafka08AvroTableSource.java           |   2 +-
 .../connectors/kafka/Kafka08JsonTableSink.java  |   2 +-
 .../kafka/Kafka08JsonTableSource.java           |   2 +-
 .../connectors/kafka/Kafka08TableSource.java    |   2 +-
 .../kafka/Kafka08AvroTableSourceTest.java       |   4 +-
 .../kafka/Kafka08JsonTableSinkTest.java         |   2 +-
 .../kafka/Kafka08JsonTableSourceTest.java       |   2 +-
 .../connectors/kafka/KafkaConsumer08Test.java   |   4 +-
 .../connectors/kafka/KafkaProducerTest.java     |   2 +-
 .../connectors/kafka/FlinkKafkaConsumer09.java  |   2 +-
 .../connectors/kafka/FlinkKafkaProducer09.java  |   2 +-
 .../kafka/Kafka09AvroTableSource.java           |   2 +-
 .../connectors/kafka/Kafka09JsonTableSink.java  |   2 +-
 .../kafka/Kafka09JsonTableSource.java           |   2 +-
 .../connectors/kafka/Kafka09TableSource.java    |   2 +-
 .../kafka/Kafka09AvroTableSourceTest.java       |   4 +-
 .../kafka/Kafka09JsonTableSinkTest.java         |   2 +-
 .../kafka/Kafka09JsonTableSourceTest.java       |   2 +-
 .../connectors/kafka/KafkaProducerTest.java     |   2 +-
 .../kafka/internal/Kafka09FetcherTest.java      |   2 +-
 .../connectors/kafka/KafkaAvroTableSource.java  |   6 +-
 .../connectors/kafka/KafkaJsonTableSink.java    |   2 +-
 .../connectors/kafka/KafkaJsonTableSource.java  |   2 +-
 .../connectors/kafka/KafkaTableSink.java        |   2 +-
 .../connectors/kafka/KafkaTableSource.java      |   2 +-
 .../AvroRowDeserializationSchema.java           |   1 +
 .../AvroRowSerializationSchema.java             |   1 +
 .../JSONDeserializationSchema.java              |   2 +
 .../JsonRowDeserializationSchema.java           |   1 +
 .../JsonRowSerializationSchema.java             |   1 +
 .../KeyedDeserializationSchemaWrapper.java      |   1 +
 .../KeyedSerializationSchemaWrapper.java        |   2 +
 .../kafka/FlinkKafkaProducerBaseTest.java       |   2 +-
 .../connectors/kafka/KafkaConsumerTestBase.java |   6 +-
 .../connectors/kafka/KafkaProducerTestBase.java |   4 +-
 .../kafka/KafkaShortRetentionTestBase.java      |   2 +-
 .../kafka/KafkaTableSinkTestBase.java           |   2 +-
 .../kafka/KafkaTableSourceTestBase.java         |   2 +-
 .../connectors/kafka/KafkaTestEnvironment.java  |   3 +-
 .../kafka/testutils/DataGenerators.java         |   4 +-
 .../streaming/connectors/rabbitmq/RMQSink.java  |   2 +-
 .../connectors/rabbitmq/RMQSource.java          |   2 +-
 .../connectors/rabbitmq/RMQSinkTest.java        |   2 +-
 .../connectors/rabbitmq/RMQSourceTest.java      |   2 +-
 flink-core/pom.xml                              |  15 +++
 .../AbstractDeserializationSchema.java          |  70 ++++++++++
 .../serialization/DeserializationSchema.java    |  56 ++++++++
 .../serialization/SerializationSchema.java      |  42 ++++++
 .../serialization/SimpleStringSchema.java       | 107 +++++++++++++++
 .../TypeInformationSerializationSchema.java     | 131 +++++++++++++++++++
 .../AbstractDeserializationSchemaTest.java      | 119 +++++++++++++++++
 .../serialization/SimpleStringSchemaTest.java   |  53 ++++++++
 .../TypeInformationSerializationSchemaTest.java | 121 +++++++++++++++++
 .../examples/kafka/Kafka010Example.java         |   2 +-
 .../scala/examples/kafka/Kafka010Example.scala  |   2 +-
 flink-streaming-java/pom.xml                    |   6 -
 .../streaming/api/datastream/DataStream.java    |   2 +-
 .../api/functions/sink/SocketClientSink.java    |   2 +-
 .../AbstractDeserializationSchema.java          |  43 ++----
 .../serialization/DeserializationSchema.java    |  10 +-
 .../util/serialization/SerializationSchema.java |   7 +-
 .../util/serialization/SimpleStringSchema.java  |  73 ++---------
 .../TypeInformationSerializationSchema.java     |  96 ++------------
 .../functions/sink/SocketClientSinkTest.java    |   2 +-
 .../util/AbstractDeserializationSchemaTest.java | 120 -----------------
 .../TypeInformationSerializationSchemaTest.java | 122 -----------------
 .../serialization/SimpleStringSchemaTest.java   |  53 --------
 .../flink/streaming/api/scala/DataStream.scala  |   2 +-
 .../api/scala/OutputFormatTestPrograms.scala    |   2 +-
 93 files changed, 845 insertions(+), 568 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/docs/quickstart/run_example_quickstart.md
----------------------------------------------------------------------
diff --git a/docs/quickstart/run_example_quickstart.md b/docs/quickstart/run_example_quickstart.md
index cf73799..d5c48c9 100644
--- a/docs/quickstart/run_example_quickstart.md
+++ b/docs/quickstart/run_example_quickstart.md
@@ -326,7 +326,7 @@ result
 The related classes also need to be imported:
 {% highlight java %}
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.api.common.functions.MapFunction;
 {% endhighlight %}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
index 3a6a13b..f569477 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
@@ -27,7 +28,6 @@ import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
 import org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
 import org.apache.flink.util.PropertiesUtil;

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
index 8575268..184a2e7 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
@@ -28,7 +29,6 @@ import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartiti
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
 
 import org.apache.kafka.clients.producer.ProducerRecord;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java
index 01e6329..fbc58ea 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
 import org.apache.flink.table.sources.StreamTableSource;

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
index e263cf2..bbdb32f 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
 import org.apache.flink.table.sources.StreamTableSource;

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
index 5475c9f..bc675eb 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.types.Row;

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceTest.java
index e977d49..f5f8af1 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceTest.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceTest.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.formats.avro.AvroRowDeserializationSchema;
 import org.apache.flink.types.Row;
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
index 3aa0d1a..c2b3dfa 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.GenericTypeInfo;
@@ -37,7 +38,6 @@ import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartiti
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
-import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
 
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceTest.java
index 9a0ab04..0f6c531 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceTest.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceTest.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
 import org.apache.flink.types.Row;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010FetcherTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010FetcherTest.java
index f895e2f..45ceadc 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010FetcherTest.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010FetcherTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.connectors.kafka.internal;
 
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.core.testutils.MultiShotLatch;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
@@ -29,7 +30,6 @@ import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer011.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer011.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer011.java
index 8d165c3..6f75828 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer011.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer011.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
index c27c620..873ef08 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
@@ -21,6 +21,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.time.Time;
@@ -42,7 +43,6 @@ import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegat
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.NetUtils;
 
@@ -59,6 +59,7 @@ import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.errors.InvalidTxnStateException;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSource.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSource.java
index 81d3496..af3b5af 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSource.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
 import org.apache.flink.table.sources.StreamTableSource;

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSource.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSource.java
index 1aff670..71158f6 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSource.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
 import org.apache.flink.table.sources.StreamTableSource;

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java
index 576a421..dbf980b 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.types.Row;

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
index 295451f..922344d 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.runtime.state.OperatorStateHandle;
@@ -27,7 +28,6 @@ import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
-import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceTest.java
index bde0761..e348aa6 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceTest.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceTest.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.formats.avro.AvroRowDeserializationSchema;
 import org.apache.flink.types.Row;
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java
index 6d259fa..99d5b56 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.GenericTypeInfo;
@@ -37,7 +38,6 @@ import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartiti
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
-import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
 
 import org.junit.BeforeClass;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceTest.java
index 6870ecc..fc2957e 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceTest.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceTest.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
 import org.apache.flink.types.Row;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
index f41a4e3..0a70f61 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
@@ -27,7 +28,6 @@ import org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher;
 import org.apache.flink.streaming.connectors.kafka.internals.Kafka08PartitionDiscoverer;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
 import org.apache.flink.util.PropertiesUtil;

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java
index 4102bf8..c65ccc1 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
 
 import java.util.Properties;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java
index 7ba5103..e5374fb 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
 
 import java.util.Properties;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
index 434286e..1911c8d 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
@@ -17,11 +17,11 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
 
 import java.util.Properties;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
index a14768b..2fce9f9 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
@@ -17,13 +17,13 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
 
 import java.util.Properties;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java
index 998f4d4..8f45881 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
 import org.apache.flink.table.sources.StreamTableSource;

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
index 79406d8..a887048 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
@@ -18,10 +18,10 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.apache.flink.types.Row;
 
 import java.util.Properties;

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java
index aab9ea8..b3b37c6 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
 import org.apache.flink.table.sources.StreamTableSource;

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
index b2b949b..8270b78 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.types.Row;

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSourceTest.java
index 348f0f2..22daafe 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSourceTest.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSourceTest.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.formats.avro.AvroRowDeserializationSchema;
 import org.apache.flink.types.Row;
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
index ac92c8a..890fc3a 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.apache.flink.types.Row;
 
 import java.util.Properties;

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java
index 01a0123..7e3349c 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
 import org.apache.flink.types.Row;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java
index 9e4d3b7..8627ccb 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java
@@ -19,11 +19,11 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.connectors.kafka.internals.Kafka08PartitionDiscoverer;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 import org.apache.flink.util.NetUtils;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
index fc8678f..304351c 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
@@ -18,12 +18,12 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.testutils.FakeStandardProducerConfig;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.kafka.clients.producer.Callback;

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
index 0cf40e6..65be712 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
@@ -27,7 +28,6 @@ import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
 import org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
 import org.apache.flink.util.PropertiesUtil;

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
index 6b9768e..946f7e9 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
@@ -17,13 +17,13 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
 
 import java.util.Properties;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSource.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSource.java
index a90a8d8..808be01 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSource.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
 import org.apache.flink.table.sources.StreamTableSource;

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
index b2227cd..f65a02d 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
@@ -18,10 +18,10 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.apache.flink.types.Row;
 
 import java.util.Properties;

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java
index 2f057d7..a699d65 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
 import org.apache.flink.table.sources.StreamTableSource;

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
index 4d1166c..1d2c028 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.types.Row;

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSourceTest.java
index 27445e1..0ab8a4b 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSourceTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSourceTest.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.formats.avro.AvroRowDeserializationSchema;
 import org.apache.flink.types.Row;
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
index c8fb4cd..c52b4ca 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.apache.flink.types.Row;
 
 import java.util.Properties;

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java
index ed3fafb..aeee175 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
 import org.apache.flink.types.Row;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
index 6b6c43f..37f3fe8 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
@@ -18,12 +18,12 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.testutils.FakeStandardProducerConfig;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.kafka.clients.producer.Callback;

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09FetcherTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09FetcherTest.java
index 33ec17e..e4e276a 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09FetcherTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09FetcherTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.connectors.kafka.internal;
 
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.core.testutils.MultiShotLatch;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
@@ -29,7 +30,6 @@ import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
index 0cc9801..8cea36c 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
@@ -18,13 +18,13 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.AvroTypeInfo;
 import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.formats.avro.AvroRowDeserializationSchema;
+import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.sources.DefinedFieldMapping;
 import org.apache.flink.table.sources.StreamTableSource;

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
index 51fd952..f354dad 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.apache.flink.types.Row;
 
 import java.util.Properties;

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
index a91cc25..9a6525c 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.sources.DefinedFieldMapping;

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
index a94936c..cac71dc 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
@@ -18,11 +18,11 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.apache.flink.table.sinks.AppendStreamTableSink;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
index 0bd04e4..3291f7d 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
@@ -18,10 +18,10 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.Types;
 import org.apache.flink.table.api.ValidationException;

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java
index c7f1d82..0d36f4c 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.util.serialization;
 
+import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java
index 09acc6a..6f03b12 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.util.serialization;
 
+import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java
index 4523da7..f60a0b7 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java
@@ -17,6 +17,8 @@
 
 package org.apache.flink.streaming.util.serialization;
 
+import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
+
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java
index d777c8d..100f960 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.util.serialization;
 
+import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.types.Row;

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java
index 13a3677..5ece193 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.util.serialization;
 
+import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java
index e128aba..93b4f68 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.util.serialization;
 
+import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 
 import java.io.IOException;

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java
index 0a181d1..70ae897 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java
@@ -17,6 +17,8 @@
 
 package org.apache.flink.streaming.util.serialization;
 
+import org.apache.flink.api.common.serialization.SerializationSchema;
+
 /**
  * A simple wrapper for using the SerializationSchema with the KeyedDeserializationSchema
  * interface.

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
index 6b4b6ff..d462953 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.core.testutils.MultiShotLatch;
@@ -31,7 +32,6 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.KafkaProducer;

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index e9a0331..0a5608a 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -23,6 +23,9 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -60,14 +63,11 @@ import org.apache.flink.streaming.connectors.kafka.testutils.PartitionValidating
 import org.apache.flink.streaming.connectors.kafka.testutils.ThrottledMapper;
 import org.apache.flink.streaming.connectors.kafka.testutils.Tuple2FlinkPartitioner;
 import org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOnceSink;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 import org.apache.flink.streaming.util.serialization.TypeInformationKeyValueSerializationSchema;
-import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
 import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.testutils.junit.RetryOnException;
 import org.apache.flink.testutils.junit.RetryRule;

http://git-wip-us.apache.org/repos/asf/flink/blob/fe931d07/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
index f81fcf1..7ba3c95 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
@@ -21,6 +21,8 @@ package org.apache.flink.streaming.connectors.kafka;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -41,8 +43,6 @@ import org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapp
 import org.apache.flink.streaming.connectors.kafka.testutils.IntegerSource;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
-import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
 import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.test.util.TestUtils;
 import org.apache.flink.util.Preconditions;