You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2018/01/12 13:31:09 UTC

[17/19] flink git commit: [FLINK-8276] [kafka] Properly annotate APIs for Kafka connector

http://git-wip-us.apache.org/repos/asf/flink/blob/4ceabed9/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java
index 78ab612..983d99c 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java
@@ -17,6 +17,8 @@
 
 package org.apache.flink.streaming.connectors.kafka.internals;
 
+import org.apache.flink.annotation.Internal;
+
 /**
  * The state that the Flink Kafka Consumer holds for each Kafka partition.
  * Includes the Kafka descriptor for partitions.
@@ -27,6 +29,7 @@ package org.apache.flink.streaming.connectors.kafka.internals;
  *
  * @param <KPH> The type of the Kafka partition descriptor, which varies across Kafka versions.
  */
+@Internal
 public class KafkaTopicPartitionState<KPH> {
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/4ceabed9/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateSentinel.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateSentinel.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateSentinel.java
index 3857991..68f842a 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateSentinel.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateSentinel.java
@@ -17,12 +17,15 @@
 
 package org.apache.flink.streaming.connectors.kafka.internals;
 
+import org.apache.flink.annotation.Internal;
+
 /**
  * Magic values used to represent special offset states before partitions are actually read.
  *
  * <p>The values are all negative. Negative offsets are not used by Kafka (invalid), so we
  * pick a number that is probably (hopefully) not used by Kafka as a magic number for anything else.
  */
+@Internal
 public class KafkaTopicPartitionStateSentinel {
 
 	/** Magic number that defines an unset offset. */

http://git-wip-us.apache.org/repos/asf/flink/blob/4ceabed9/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java
index 5116e9f..015ac71 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.connectors.kafka.internals;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.watermark.Watermark;
 
@@ -27,6 +28,7 @@ import org.apache.flink.streaming.api.watermark.Watermark;
  * @param <T> The type of records handled by the watermark generator
  * @param <KPH> The type of the Kafka partition descriptor, which varies across Kafka versions.
  */
+@Internal
 public final class KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH> extends KafkaTopicPartitionState<KPH> {
 
 	/** The timestamp assigner and watermark generator for the partition. */

http://git-wip-us.apache.org/repos/asf/flink/blob/4ceabed9/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java
index f4a80a4..aedddf3 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.connectors.kafka.internals;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.watermark.Watermark;
 
@@ -32,6 +33,7 @@ import javax.annotation.Nullable;
  * @param <T> The type of records handled by the watermark generator
  * @param <KPH> The type of the Kafka partition descriptor, which varies across Kafka versions
  */
+@Internal
 public final class KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> extends KafkaTopicPartitionState<KPH> {
 
 	/** The timestamp assigner and watermark generator for the partition. */

http://git-wip-us.apache.org/repos/asf/flink/blob/4ceabed9/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicsDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicsDescriptor.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicsDescriptor.java
index 9a81ea8..ddea63b 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicsDescriptor.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicsDescriptor.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.streaming.connectors.kafka.internals;
 
+import org.apache.flink.annotation.Internal;
+
 import javax.annotation.Nullable;
 
 import java.io.Serializable;
@@ -30,6 +32,7 @@ import static org.apache.flink.util.Preconditions.checkArgument;
  * A Kafka Topics Descriptor describes how the consumer subscribes to Kafka topics -
  * either a fixed list of topics, or a topic pattern.
  */
+@Internal
 public class KafkaTopicsDescriptor implements Serializable {
 
 	private static final long serialVersionUID = -3807227764764900975L;

http://git-wip-us.apache.org/repos/asf/flink/blob/4ceabed9/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricWrapper.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricWrapper.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricWrapper.java
index cedb696..4a5fb9d 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricWrapper.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricWrapper.java
@@ -18,11 +18,13 @@
 
 package org.apache.flink.streaming.connectors.kafka.internals.metrics;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.metrics.Gauge;
 
 /**
  * Gauge for getting the current value of a Kafka metric.
  */
+@Internal
 public class KafkaMetricWrapper implements Gauge<Double> {
 	private final org.apache.kafka.common.Metric kafkaMetric;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4ceabed9/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
index 6ed3717..906238d 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.connectors.kafka.partitioner;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.util.Preconditions;
 
 /**
@@ -50,6 +51,7 @@ import org.apache.flink.util.Preconditions;
  * To avoid such an unbalanced partitioning, use a round-robin kafka partitioner (note that this will
  * cause a lot of network connections between all the Flink instances and all the Kafka brokers).
  */
+@PublicEvolving
 public class FlinkFixedPartitioner<T> extends FlinkKafkaPartitioner<T> {
 
 	private int parallelInstanceId;

http://git-wip-us.apache.org/repos/asf/flink/blob/4ceabed9/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaDelegatePartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaDelegatePartitioner.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaDelegatePartitioner.java
index 168e76b..5a42dc6 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaDelegatePartitioner.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaDelegatePartitioner.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.streaming.connectors.kafka.partitioner;
 
+import org.apache.flink.annotation.Internal;
+
 /**
  * Delegate for the deprecated {@link KafkaPartitioner}.
  * This should only be used for bridging deprecated partitioning API methods.
@@ -25,6 +27,7 @@ package org.apache.flink.streaming.connectors.kafka.partitioner;
  * @deprecated Delegate for {@link KafkaPartitioner}, use {@link FlinkKafkaPartitioner} instead
  */
 @Deprecated
+@Internal
 public class FlinkKafkaDelegatePartitioner<T> extends FlinkKafkaPartitioner<T> {
 	private final KafkaPartitioner<T> kafkaPartitioner;
 	private int[] partitions;

http://git-wip-us.apache.org/repos/asf/flink/blob/4ceabed9/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java
index b634af7..d1df6d9 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java
@@ -17,12 +17,15 @@
 
 package org.apache.flink.streaming.connectors.kafka.partitioner;
 
+import org.apache.flink.annotation.PublicEvolving;
+
 import java.io.Serializable;
 
 /**
  * A {@link FlinkKafkaPartitioner} wraps logic on how to partition records
  * across partitions of multiple Kafka topics.
  */
+@PublicEvolving
 public abstract class FlinkKafkaPartitioner<T> implements Serializable {
 
 	private static final long serialVersionUID = -9086719227828020494L;

http://git-wip-us.apache.org/repos/asf/flink/blob/4ceabed9/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
index eebc619..a2cd4a6 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
@@ -17,6 +17,8 @@
 
 package org.apache.flink.streaming.connectors.kafka.partitioner;
 
+import org.apache.flink.annotation.Internal;
+
 import java.io.Serializable;
 
 /**
@@ -27,6 +29,7 @@ import java.io.Serializable;
  *             multiple topics, and has been deprecated. Please use {@link FlinkKafkaPartitioner} instead.
  */
 @Deprecated
+@Internal
 public abstract class KafkaPartitioner<T> implements Serializable {
 
 	private static final long serialVersionUID = -1974260817778593473L;

http://git-wip-us.apache.org/repos/asf/flink/blob/4ceabed9/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java
index f60a0b7..8c572c2 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.util.serialization;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -29,6 +30,7 @@ import java.io.IOException;
  *
  * <p>Fields can be accessed by calling objectNode.get(&lt;name>).as(&lt;type>)
  */
+@PublicEvolving
 public class JSONDeserializationSchema extends AbstractDeserializationSchema<ObjectNode> {
 	private ObjectMapper mapper;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4ceabed9/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java
index 30f0fd5..0168eb7 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.util.serialization;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
@@ -37,6 +38,7 @@ import static org.apache.flink.api.java.typeutils.TypeExtractor.getForClass;
  * <p>Metadata fields can be accessed by calling objectNode.get("metadata").get(&lt;name>).as(&lt;type>) and include
  * the "offset" (long), "topic" (String) and "partition" (int).
  */
+@PublicEvolving
 public class JSONKeyValueDeserializationSchema implements KeyedDeserializationSchema<ObjectNode> {
 	private final boolean includeMetadata;
 	private ObjectMapper mapper;

http://git-wip-us.apache.org/repos/asf/flink/blob/4ceabed9/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java
index 100f960..1f4a60e 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.util.serialization;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
@@ -37,6 +38,7 @@ import java.io.IOException;
  *
  * <p>Failure during deserialization are forwarded as wrapped IOExceptions.
  */
+@PublicEvolving
 public class JsonRowDeserializationSchema implements DeserializationSchema<Row> {
 
 	/** Type information describing the result type. */

http://git-wip-us.apache.org/repos/asf/flink/blob/4ceabed9/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java
index 36d3137..3e72506 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.util.serialization;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
@@ -37,6 +38,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.Obje
  * <p>Result <code>byte[]</code> messages can be deserialized using
  * {@link JsonRowDeserializationSchema}.
  */
+@PublicEvolving
 public class JsonRowSerializationSchema implements SerializationSchema<Row> {
 	/** Fields names in the input Row object. */
 	private final String[] fieldNames;

http://git-wip-us.apache.org/repos/asf/flink/blob/4ceabed9/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
index 234a96d..0ef6fd5 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.util.serialization;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 
 import java.io.IOException;
@@ -29,6 +30,7 @@ import java.io.Serializable;
  *
  * @param <T> The type created by the keyed deserialization schema.
  */
+@PublicEvolving
 public interface KeyedDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/4ceabed9/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java
index 93b4f68..06289e5 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.util.serialization;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 
@@ -27,6 +28,7 @@ import java.io.IOException;
  * interface.
  * @param <T> The type created by the deserialization schema.
  */
+@Internal
 public class KeyedDeserializationSchemaWrapper<T> implements KeyedDeserializationSchema<T> {
 
 	private static final long serialVersionUID = 2651665280744549932L;

http://git-wip-us.apache.org/repos/asf/flink/blob/4ceabed9/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java
index 12bcab9..2f610c2 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java
@@ -17,6 +17,8 @@
 
 package org.apache.flink.streaming.util.serialization;
 
+import org.apache.flink.annotation.PublicEvolving;
+
 import java.io.Serializable;
 
 /**
@@ -26,6 +28,7 @@ import java.io.Serializable;
  *
  * @param <T> The type to be serialized.
  */
+@PublicEvolving
 public interface KeyedSerializationSchema<T> extends Serializable {
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/4ceabed9/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java
index 70ae897..013ea62 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.util.serialization;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 
 /**
@@ -24,6 +25,7 @@ import org.apache.flink.api.common.serialization.SerializationSchema;
  * interface.
  * @param <T> The type to serialize
  */
+@Internal
 public class KeyedSerializationSchemaWrapper<T> implements KeyedSerializationSchema<T> {
 
 	private static final long serialVersionUID = 1351665280744549933L;

http://git-wip-us.apache.org/repos/asf/flink/blob/4ceabed9/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
index 96b8879..3be5779 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.util.serialization;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -36,6 +37,7 @@ import java.io.IOException;
  * @param <K> The key type to be serialized.
  * @param <V> The value type to be serialized.
  */
+@Internal
 public class TypeInformationKeyValueSerializationSchema<K, V> implements KeyedDeserializationSchema<Tuple2<K, V>>, KeyedSerializationSchema<Tuple2<K, V>> {
 
 	private static final long serialVersionUID = -5359448468131559102L;