Posted to commits@flink.apache.org by se...@apache.org on 2015/08/27 13:25:33 UTC

[16/51] [abbrv] flink git commit: [FLINK-2386] [kafka connector] Add comments to all backported kafka sources and move them to 'org.apache.flink.kafka_backport'

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/Protocol.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/Protocol.java
new file mode 100644
index 0000000..f7c8981
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/Protocol.java
@@ -0,0 +1,474 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.protocol;
+
+import org.apache.flink.kafka_backport.common.protocol.types.ArrayOf;
+import org.apache.flink.kafka_backport.common.protocol.types.Field;
+import org.apache.flink.kafka_backport.common.protocol.types.Schema;
+import org.apache.flink.kafka_backport.common.protocol.types.Type;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+public class Protocol {
+
+    public static final Schema REQUEST_HEADER = new Schema(new Field("api_key", Type.INT16, "The id of the request type."),
+                                                           new Field("api_version", Type.INT16, "The version of the API."),
+                                                           new Field("correlation_id",
+                                                                     Type.INT32,
+                                                                     "A user-supplied integer value that will be passed back with the response"),
+                                                           new Field("client_id",
+                                                                     Type.STRING,
+                                                                     "A user-specified identifier for the client making the request."));
+
+    public static final Schema RESPONSE_HEADER = new Schema(new Field("correlation_id",
+                                                                      Type.INT32,
+                                                                      "The user-supplied value passed in with the request"));
+
+    /* Metadata api */
+
+    public static final Schema METADATA_REQUEST_V0 = new Schema(new Field("topics",
+                                                                          new ArrayOf(Type.STRING),
+                                                                          "An array of topics to fetch metadata for. If no topics are specified, fetch metadata for all topics."));
+
+    public static final Schema BROKER = new Schema(new Field("node_id", Type.INT32, "The broker id."),
+                                                   new Field("host", Type.STRING, "The hostname of the broker."),
+                                                   new Field("port",
+                                                             Type.INT32,
+                                                             "The port on which the broker accepts requests."));
+
+    public static final Schema PARTITION_METADATA_V0 = new Schema(new Field("partition_error_code",
+                                                                            Type.INT16,
+                                                                            "The error code for the partition, if any."),
+                                                                  new Field("partition_id",
+                                                                            Type.INT32,
+                                                                            "The id of the partition."),
+                                                                  new Field("leader",
+                                                                            Type.INT32,
+                                                                            "The id of the broker acting as leader for this partition."),
+                                                                  new Field("replicas",
+                                                                            new ArrayOf(Type.INT32),
+                                                                            "The set of all nodes that host this partition."),
+                                                                  new Field("isr",
+                                                                            new ArrayOf(Type.INT32),
+                                                                            "The set of nodes that are in sync with the leader for this partition."));
+
+    public static final Schema TOPIC_METADATA_V0 = new Schema(new Field("topic_error_code",
+                                                                        Type.INT16,
+                                                                        "The error code for the given topic."),
+                                                              new Field("topic", Type.STRING, "The name of the topic"),
+                                                              new Field("partition_metadata",
+                                                                        new ArrayOf(PARTITION_METADATA_V0),
+                                                                        "Metadata for each partition of the topic."));
+
+    public static final Schema METADATA_RESPONSE_V0 = new Schema(new Field("brokers",
+                                                                           new ArrayOf(BROKER),
+                                                                           "Host and port information for all brokers."),
+                                                                 new Field("topic_metadata",
+                                                                           new ArrayOf(TOPIC_METADATA_V0)));
+
+    public static final Schema[] METADATA_REQUEST = new Schema[] {METADATA_REQUEST_V0};
+    public static final Schema[] METADATA_RESPONSE = new Schema[] {METADATA_RESPONSE_V0};
+
+    /* Produce api */
+
+    public static final Schema TOPIC_PRODUCE_DATA_V0 = new Schema(new Field("topic", Type.STRING),
+                                                                  new Field("data", new ArrayOf(new Schema(new Field("partition", Type.INT32),
+                                                                                                     new Field("record_set", Type.BYTES)))));
+
+    public static final Schema PRODUCE_REQUEST_V0 = new Schema(new Field("acks",
+                                                                   Type.INT16,
+                                                                   "The number of nodes that should replicate the produce before returning. -1 indicates the full ISR."),
+                                                               new Field("timeout", Type.INT32, "The time to await a response in ms."),
+                                                               new Field("topic_data", new ArrayOf(TOPIC_PRODUCE_DATA_V0)));
+
+    public static final Schema PRODUCE_RESPONSE_V0 = new Schema(new Field("responses",
+                                                                    new ArrayOf(new Schema(new Field("topic", Type.STRING),
+                                                                                           new Field("partition_responses",
+                                                                                                     new ArrayOf(new Schema(new Field("partition",
+                                                                                                                                      Type.INT32),
+                                                                                                                            new Field("error_code",
+                                                                                                                                      Type.INT16),
+                                                                                                                            new Field("base_offset",
+                                                                                                                                      Type.INT64))))))));
+
+    public static final Schema[] PRODUCE_REQUEST = new Schema[] {PRODUCE_REQUEST_V0};
+    public static final Schema[] PRODUCE_RESPONSE = new Schema[] {PRODUCE_RESPONSE_V0};
+
+    /* Offset commit api */
+    public static final Schema OFFSET_COMMIT_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
+                                                                                         Type.INT32,
+                                                                                         "Topic partition id."),
+                                                                               new Field("offset",
+                                                                                         Type.INT64,
+                                                                                         "Message offset to be committed."),
+                                                                               new Field("metadata",
+                                                                                         Type.STRING,
+                                                                                         "Any associated metadata the client wants to keep."));
+
+    public static final Schema OFFSET_COMMIT_REQUEST_PARTITION_V1 = new Schema(new Field("partition",
+                                                                                         Type.INT32,
+                                                                                         "Topic partition id."),
+                                                                               new Field("offset",
+                                                                                         Type.INT64,
+                                                                                         "Message offset to be committed."),
+                                                                               new Field("timestamp",
+                                                                                         Type.INT64,
+                                                                                         "Timestamp of the commit."),
+                                                                               new Field("metadata",
+                                                                                         Type.STRING,
+                                                                                         "Any associated metadata the client wants to keep."));
+
+    public static final Schema OFFSET_COMMIT_REQUEST_PARTITION_V2 = new Schema(new Field("partition",
+                                                                                         Type.INT32,
+                                                                                         "Topic partition id."),
+                                                                               new Field("offset",
+                                                                                         Type.INT64,
+                                                                                         "Message offset to be committed."),
+                                                                               new Field("metadata",
+                                                                                         Type.STRING,
+                                                                                         "Any associated metadata the client wants to keep."));
+
+    public static final Schema OFFSET_COMMIT_REQUEST_TOPIC_V0 = new Schema(new Field("topic",
+                                                                                     Type.STRING,
+                                                                                     "Topic to commit."),
+                                                                           new Field("partitions",
+                                                                                     new ArrayOf(OFFSET_COMMIT_REQUEST_PARTITION_V0),
+                                                                                     "Partitions to commit offsets."));
+
+    public static final Schema OFFSET_COMMIT_REQUEST_TOPIC_V1 = new Schema(new Field("topic",
+                                                                                     Type.STRING,
+                                                                                     "Topic to commit."),
+                                                                           new Field("partitions",
+                                                                                     new ArrayOf(OFFSET_COMMIT_REQUEST_PARTITION_V1),
+                                                                                     "Partitions to commit offsets."));
+
+    public static final Schema OFFSET_COMMIT_REQUEST_TOPIC_V2 = new Schema(new Field("topic",
+                                                                                     Type.STRING,
+                                                                                     "Topic to commit."),
+                                                                           new Field("partitions",
+                                                                                     new ArrayOf(OFFSET_COMMIT_REQUEST_PARTITION_V2),
+                                                                                     "Partitions to commit offsets."));
+
+    public static final Schema OFFSET_COMMIT_REQUEST_V0 = new Schema(new Field("group_id",
+                                                                               Type.STRING,
+                                                                               "The consumer group id."),
+                                                                     new Field("topics",
+                                                                               new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V0),
+                                                                               "Topics to commit offsets."));
+
+    public static final Schema OFFSET_COMMIT_REQUEST_V1 = new Schema(new Field("group_id",
+                                                                               Type.STRING,
+                                                                               "The consumer group id."),
+                                                                     new Field("group_generation_id",
+                                                                               Type.INT32,
+                                                                               "The generation of the consumer group."),
+                                                                     new Field("consumer_id",
+                                                                               Type.STRING,
+                                                                               "The consumer id assigned by the group coordinator."),
+                                                                     new Field("topics",
+                                                                               new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V1),
+                                                                               "Topics to commit offsets."));
+
+    public static final Schema OFFSET_COMMIT_REQUEST_V2 = new Schema(new Field("group_id",
+                                                                               Type.STRING,
+                                                                               "The consumer group id."),
+                                                                     new Field("group_generation_id",
+                                                                               Type.INT32,
+                                                                               "The generation of the consumer group."),
+                                                                     new Field("consumer_id",
+                                                                               Type.STRING,
+                                                                               "The consumer id assigned by the group coordinator."),
+                                                                     new Field("retention_time",
+                                                                               Type.INT64,
+                                                                               "Time period in ms to retain the offset."),
+                                                                     new Field("topics",
+                                                                               new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V2),
+                                                                               "Topics to commit offsets."));
+
+    public static final Schema OFFSET_COMMIT_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
+                                                                                          Type.INT32,
+                                                                                          "Topic partition id."),
+                                                                                new Field("error_code",
+                                                                                          Type.INT16));
+
+    public static final Schema OFFSET_COMMIT_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", Type.STRING),
+                                                                            new Field("partition_responses",
+                                                                                      new ArrayOf(OFFSET_COMMIT_RESPONSE_PARTITION_V0)));
+
+    public static final Schema OFFSET_COMMIT_RESPONSE_V0 = new Schema(new Field("responses",
+                                                                                new ArrayOf(OFFSET_COMMIT_RESPONSE_TOPIC_V0)));
+
+    public static final Schema[] OFFSET_COMMIT_REQUEST = new Schema[] {OFFSET_COMMIT_REQUEST_V0, OFFSET_COMMIT_REQUEST_V1, OFFSET_COMMIT_REQUEST_V2};
+
+    /* The response types for V0, V1 and V2 of OFFSET_COMMIT_REQUEST are the same. */
+    public static final Schema OFFSET_COMMIT_RESPONSE_V1 = OFFSET_COMMIT_RESPONSE_V0;
+    public static final Schema OFFSET_COMMIT_RESPONSE_V2 = OFFSET_COMMIT_RESPONSE_V0;
+
+    public static final Schema[] OFFSET_COMMIT_RESPONSE = new Schema[] {OFFSET_COMMIT_RESPONSE_V0, OFFSET_COMMIT_RESPONSE_V1, OFFSET_COMMIT_RESPONSE_V2};
+
+    /* Offset fetch api */
+
+    /*
+     * The wire formats of versions 0 and 1 are identical, but their behavior differs:
+     * version 0 reads the offsets from ZooKeeper;
+     * version 1 reads the offsets from Kafka.
+     */
+    public static final Schema OFFSET_FETCH_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
+                                                                                        Type.INT32,
+                                                                                        "Topic partition id."));
+
+    public static final Schema OFFSET_FETCH_REQUEST_TOPIC_V0 = new Schema(new Field("topic",
+                                                                                    Type.STRING,
+                                                                                    "Topic to fetch offset."),
+                                                                          new Field("partitions",
+                                                                                    new ArrayOf(OFFSET_FETCH_REQUEST_PARTITION_V0),
+                                                                                    "Partitions to fetch offsets."));
+
+    public static final Schema OFFSET_FETCH_REQUEST_V0 = new Schema(new Field("group_id",
+                                                                              Type.STRING,
+                                                                              "The consumer group id."),
+                                                                    new Field("topics",
+                                                                              new ArrayOf(OFFSET_FETCH_REQUEST_TOPIC_V0),
+                                                                              "Topics to fetch offsets."));
+
+    public static final Schema OFFSET_FETCH_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
+                                                                                         Type.INT32,
+                                                                                         "Topic partition id."),
+                                                                               new Field("offset",
+                                                                                         Type.INT64,
+                                                                                         "Last committed message offset."),
+                                                                               new Field("metadata",
+                                                                                         Type.STRING,
+                                                                                         "Any associated metadata the client wants to keep."),
+                                                                               new Field("error_code", Type.INT16));
+
+    public static final Schema OFFSET_FETCH_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", Type.STRING),
+                                                                           new Field("partition_responses",
+                                                                                     new ArrayOf(OFFSET_FETCH_RESPONSE_PARTITION_V0)));
+
+    public static final Schema OFFSET_FETCH_RESPONSE_V0 = new Schema(new Field("responses",
+                                                                               new ArrayOf(OFFSET_FETCH_RESPONSE_TOPIC_V0)));
+
+    public static final Schema OFFSET_FETCH_REQUEST_V1 = OFFSET_FETCH_REQUEST_V0;
+    public static final Schema OFFSET_FETCH_RESPONSE_V1 = OFFSET_FETCH_RESPONSE_V0;
+
+    public static final Schema[] OFFSET_FETCH_REQUEST = new Schema[] {OFFSET_FETCH_REQUEST_V0, OFFSET_FETCH_REQUEST_V1};
+    public static final Schema[] OFFSET_FETCH_RESPONSE = new Schema[] {OFFSET_FETCH_RESPONSE_V0, OFFSET_FETCH_RESPONSE_V1};
+
+    /* List offset api */
+    public static final Schema LIST_OFFSET_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
+                                                                                       Type.INT32,
+                                                                                       "Topic partition id."),
+                                                                             new Field("timestamp", Type.INT64, "Timestamp."),
+                                                                             new Field("max_num_offsets",
+                                                                                       Type.INT32,
+                                                                                       "Maximum offsets to return."));
+
+    public static final Schema LIST_OFFSET_REQUEST_TOPIC_V0 = new Schema(new Field("topic",
+                                                                                   Type.STRING,
+                                                                                   "Topic to list offset."),
+                                                                         new Field("partitions",
+                                                                                   new ArrayOf(LIST_OFFSET_REQUEST_PARTITION_V0),
+                                                                                   "Partitions to list offset."));
+
+    public static final Schema LIST_OFFSET_REQUEST_V0 = new Schema(new Field("replica_id",
+                                                                             Type.INT32,
+                                                                             "Broker id of the follower. For normal consumers, use -1."),
+                                                                   new Field("topics",
+                                                                             new ArrayOf(LIST_OFFSET_REQUEST_TOPIC_V0),
+                                                                             "Topics to list offsets."));
+
+    public static final Schema LIST_OFFSET_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
+                                                                                        Type.INT32,
+                                                                                        "Topic partition id."),
+                                                                              new Field("error_code", Type.INT16),
+                                                                              new Field("offsets",
+                                                                                        new ArrayOf(Type.INT64),
+                                                                                        "A list of offsets."));
+
+    public static final Schema LIST_OFFSET_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", Type.STRING),
+                                                                          new Field("partition_responses",
+                                                                                    new ArrayOf(LIST_OFFSET_RESPONSE_PARTITION_V0)));
+
+    public static final Schema LIST_OFFSET_RESPONSE_V0 = new Schema(new Field("responses",
+                                                                              new ArrayOf(LIST_OFFSET_RESPONSE_TOPIC_V0)));
+
+    public static final Schema[] LIST_OFFSET_REQUEST = new Schema[] {LIST_OFFSET_REQUEST_V0};
+    public static final Schema[] LIST_OFFSET_RESPONSE = new Schema[] {LIST_OFFSET_RESPONSE_V0};
+
+    /* Fetch api */
+    public static final Schema FETCH_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
+                                                                                 Type.INT32,
+                                                                                 "Topic partition id."),
+                                                                       new Field("fetch_offset",
+                                                                                 Type.INT64,
+                                                                                 "Message offset."),
+                                                                       new Field("max_bytes",
+                                                                                 Type.INT32,
+                                                                                 "Maximum bytes to fetch."));
+
+    public static final Schema FETCH_REQUEST_TOPIC_V0 = new Schema(new Field("topic", Type.STRING, "Topic to fetch."),
+                                                                   new Field("partitions",
+                                                                             new ArrayOf(FETCH_REQUEST_PARTITION_V0),
+                                                                             "Partitions to fetch."));
+
+    public static final Schema FETCH_REQUEST_V0 = new Schema(new Field("replica_id",
+                                                                       Type.INT32,
+                                                                       "Broker id of the follower. For normal consumers, use -1."),
+                                                             new Field("max_wait_time",
+                                                                       Type.INT32,
+                                                                       "Maximum time in ms to wait for the response."),
+                                                             new Field("min_bytes",
+                                                                       Type.INT32,
+                                                                       "Minimum bytes to accumulate in the response."),
+                                                             new Field("topics",
+                                                                       new ArrayOf(FETCH_REQUEST_TOPIC_V0),
+                                                                       "Topics to fetch."));
+
+    public static final Schema FETCH_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
+                                                                                  Type.INT32,
+                                                                                  "Topic partition id."),
+                                                                        new Field("error_code", Type.INT16),
+                                                                        new Field("high_watermark",
+                                                                                  Type.INT64,
+                                                                                  "Last committed offset."),
+                                                                        new Field("record_set", Type.BYTES));
+
+    public static final Schema FETCH_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", Type.STRING),
+                                                                    new Field("partition_responses",
+                                                                              new ArrayOf(FETCH_RESPONSE_PARTITION_V0)));
+
+    public static final Schema FETCH_RESPONSE_V0 = new Schema(new Field("responses",
+                                                                        new ArrayOf(FETCH_RESPONSE_TOPIC_V0)));
+
+    public static final Schema[] FETCH_REQUEST = new Schema[] {FETCH_REQUEST_V0};
+    public static final Schema[] FETCH_RESPONSE = new Schema[] {FETCH_RESPONSE_V0};
+
+    /* Consumer metadata api */
+    public static final Schema CONSUMER_METADATA_REQUEST_V0 = new Schema(new Field("group_id",
+                                                                                   Type.STRING,
+                                                                                   "The consumer group id."));
+
+    public static final Schema CONSUMER_METADATA_RESPONSE_V0 = new Schema(new Field("error_code", Type.INT16),
+                                                                          new Field("coordinator",
+                                                                                    BROKER,
+                                                                                    "Host and port information for the coordinator for a consumer group."));
+
+    public static final Schema[] CONSUMER_METADATA_REQUEST = new Schema[] {CONSUMER_METADATA_REQUEST_V0};
+    public static final Schema[] CONSUMER_METADATA_RESPONSE = new Schema[] {CONSUMER_METADATA_RESPONSE_V0};
+
+    /* Join group api */
+    public static final Schema JOIN_GROUP_REQUEST_V0 = new Schema(new Field("group_id",
+                                                                            Type.STRING,
+                                                                            "The consumer group id."),
+                                                                  new Field("session_timeout",
+                                                                            Type.INT32,
+                                                                            "The coordinator considers the consumer dead if it receives no heartbeat after this timeout in ms."),
+                                                                  new Field("topics",
+                                                                            new ArrayOf(Type.STRING),
+                                                                            "An array of topics to subscribe to."),
+                                                                  new Field("consumer_id",
+                                                                            Type.STRING,
+                                                                            "The assigned consumer id or an empty string for a new consumer."),
+                                                                  new Field("partition_assignment_strategy",
+                                                                            Type.STRING,
+                                                                            "The strategy for the coordinator to assign partitions."));
+
+    public static final Schema JOIN_GROUP_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", Type.STRING),
+                                                                         new Field("partitions", new ArrayOf(Type.INT32)));
+    public static final Schema JOIN_GROUP_RESPONSE_V0 = new Schema(new Field("error_code", Type.INT16),
+                                                                   new Field("group_generation_id",
+                                                                             Type.INT32,
+                                                                             "The generation of the consumer group."),
+                                                                   new Field("consumer_id",
+                                                                             Type.STRING,
+                                                                             "The consumer id assigned by the group coordinator."),
+                                                                   new Field("assigned_partitions",
+                                                                             new ArrayOf(JOIN_GROUP_RESPONSE_TOPIC_V0)));
+
+    public static final Schema[] JOIN_GROUP_REQUEST = new Schema[] {JOIN_GROUP_REQUEST_V0};
+    public static final Schema[] JOIN_GROUP_RESPONSE = new Schema[] {JOIN_GROUP_RESPONSE_V0};
+
+    /* Heartbeat api */
+    public static final Schema HEARTBEAT_REQUEST_V0 = new Schema(new Field("group_id", Type.STRING, "The consumer group id."),
+                                                                 new Field("group_generation_id",
+                                                                           Type.INT32,
+                                                                           "The generation of the consumer group."),
+                                                                 new Field("consumer_id",
+                                                                           Type.STRING,
+                                                                           "The consumer id assigned by the group coordinator."));
+
+    public static final Schema HEARTBEAT_RESPONSE_V0 = new Schema(new Field("error_code", Type.INT16));
+
+    public static final Schema[] HEARTBEAT_REQUEST = new Schema[] {HEARTBEAT_REQUEST_V0};
+    public static final Schema[] HEARTBEAT_RESPONSE = new Schema[] {HEARTBEAT_RESPONSE_V0};
+
+    /* an array of all requests and responses with all schema versions */
+    public static final Schema[][] REQUESTS = new Schema[ApiKeys.MAX_API_KEY + 1][];
+    public static final Schema[][] RESPONSES = new Schema[ApiKeys.MAX_API_KEY + 1][];
+
+    /* the latest version of each api */
+    public static final short[] CURR_VERSION = new short[ApiKeys.MAX_API_KEY + 1];
+
+    static {
+        REQUESTS[ApiKeys.PRODUCE.id] = PRODUCE_REQUEST;
+        REQUESTS[ApiKeys.FETCH.id] = FETCH_REQUEST;
+        REQUESTS[ApiKeys.LIST_OFFSETS.id] = LIST_OFFSET_REQUEST;
+        REQUESTS[ApiKeys.METADATA.id] = METADATA_REQUEST;
+        REQUESTS[ApiKeys.LEADER_AND_ISR.id] = new Schema[] {};
+        REQUESTS[ApiKeys.STOP_REPLICA.id] = new Schema[] {};
+        REQUESTS[ApiKeys.UPDATE_METADATA_KEY.id] = new Schema[] {};
+        REQUESTS[ApiKeys.CONTROLLED_SHUTDOWN_KEY.id] = new Schema[] {};
+        REQUESTS[ApiKeys.OFFSET_COMMIT.id] = OFFSET_COMMIT_REQUEST;
+        REQUESTS[ApiKeys.OFFSET_FETCH.id] = OFFSET_FETCH_REQUEST;
+        REQUESTS[ApiKeys.CONSUMER_METADATA.id] = CONSUMER_METADATA_REQUEST;
+        REQUESTS[ApiKeys.JOIN_GROUP.id] = JOIN_GROUP_REQUEST;
+        REQUESTS[ApiKeys.HEARTBEAT.id] = HEARTBEAT_REQUEST;
+
+        RESPONSES[ApiKeys.PRODUCE.id] = PRODUCE_RESPONSE;
+        RESPONSES[ApiKeys.FETCH.id] = FETCH_RESPONSE;
+        RESPONSES[ApiKeys.LIST_OFFSETS.id] = LIST_OFFSET_RESPONSE;
+        RESPONSES[ApiKeys.METADATA.id] = METADATA_RESPONSE;
+        RESPONSES[ApiKeys.LEADER_AND_ISR.id] = new Schema[] {};
+        RESPONSES[ApiKeys.STOP_REPLICA.id] = new Schema[] {};
+        RESPONSES[ApiKeys.UPDATE_METADATA_KEY.id] = new Schema[] {};
+        RESPONSES[ApiKeys.CONTROLLED_SHUTDOWN_KEY.id] = new Schema[] {};
+        RESPONSES[ApiKeys.OFFSET_COMMIT.id] = OFFSET_COMMIT_RESPONSE;
+        RESPONSES[ApiKeys.OFFSET_FETCH.id] = OFFSET_FETCH_RESPONSE;
+        RESPONSES[ApiKeys.CONSUMER_METADATA.id] = CONSUMER_METADATA_RESPONSE;
+        RESPONSES[ApiKeys.JOIN_GROUP.id] = JOIN_GROUP_RESPONSE;
+        RESPONSES[ApiKeys.HEARTBEAT.id] = HEARTBEAT_RESPONSE;
+
+        /* set the maximum version of each api */
+        for (ApiKeys api : ApiKeys.values())
+            CURR_VERSION[api.id] = (short) (REQUESTS[api.id].length - 1);
+
+        /* sanity check that we have the same number of request and response versions for each api */
+        for (ApiKeys api : ApiKeys.values())
+            if (REQUESTS[api.id].length != RESPONSES[api.id].length)
+                throw new IllegalStateException(REQUESTS[api.id].length + " request versions for api " + api.name
+                        + " but " + RESPONSES[api.id].length + " response versions.");
+    }
+
+}
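
For reference, the REQUESTS/RESPONSES tables above are indexed first by API key id and then by version. A minimal lookup sketch (illustrative, not part of the commit; ApiKeys is the companion enum used in the static block, assumed to live in the same backported package):

    import org.apache.flink.kafka_backport.common.protocol.ApiKeys;
    import org.apache.flink.kafka_backport.common.protocol.Protocol;
    import org.apache.flink.kafka_backport.common.protocol.types.Schema;

    public class ProtocolLookupExample {
        public static void main(String[] args) {
            // Latest registered version of the offset-commit API (2 in this backport).
            short version = Protocol.CURR_VERSION[ApiKeys.OFFSET_COMMIT.id];

            // Request and response wire schemas for that version.
            Schema request = Protocol.REQUESTS[ApiKeys.OFFSET_COMMIT.id][version];
            Schema response = Protocol.RESPONSES[ApiKeys.OFFSET_COMMIT.id][version];

            // Schema.toString() prints the field layout, e.g. {group_id:STRING,...}.
            System.out.println(request + " / " + response);
        }
    }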

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/SecurityProtocol.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/SecurityProtocol.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/SecurityProtocol.java
new file mode 100644
index 0000000..ab5a607
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/SecurityProtocol.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.protocol;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+public enum SecurityProtocol {
+    /** Un-authenticated, non-encrypted channel */
+    PLAINTEXT(0, "PLAINTEXT"),
+    /** Currently identical to PLAINTEXT and used for testing only. We may implement extra instrumentation when testing channel code. */
+    TRACE(Short.MAX_VALUE, "TRACE");
+
+    private static final Map<Short, SecurityProtocol> CODE_TO_SECURITY_PROTOCOL = new HashMap<Short, SecurityProtocol>();
+    private static final List<String> NAMES = new ArrayList<String>();
+
+    static {
+        for (SecurityProtocol proto: SecurityProtocol.values()) {
+            CODE_TO_SECURITY_PROTOCOL.put(proto.id, proto);
+            NAMES.add(proto.name);
+        }
+    }
+
+    /** The permanent and immutable id of a security protocol -- this can't change, and must match kafka.cluster.SecurityProtocol  */
+    public final short id;
+
+    /** Name of the security protocol. This may be used by client configuration. */
+    public final String name;
+
+    private SecurityProtocol(int id, String name) {
+        this.id = (short) id;
+        this.name = name;
+    }
+
+    public static String getName(int id) {
+        return CODE_TO_SECURITY_PROTOCOL.get((short) id).name;
+    }
+
+    public static List<String> getNames() {
+        return NAMES;
+    }
+
+    public static SecurityProtocol forId(Short id) {
+        return CODE_TO_SECURITY_PROTOCOL.get(id);
+    }
+
+}
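
A short usage sketch for the enum's lookup helpers (illustrative only):

    import java.util.List;
    import org.apache.flink.kafka_backport.common.protocol.SecurityProtocol;

    public class SecurityProtocolExample {
        public static void main(String[] args) {
            SecurityProtocol p = SecurityProtocol.forId((short) 0);  // PLAINTEXT
            System.out.println(p.name);                              // "PLAINTEXT"
            System.out.println(SecurityProtocol.getName(0));         // "PLAINTEXT"
            List<String> names = SecurityProtocol.getNames();        // [PLAINTEXT, TRACE]
            System.out.println(names);
        }
    }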

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/ArrayOf.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/ArrayOf.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/ArrayOf.java
new file mode 100644
index 0000000..d2468d8
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/ArrayOf.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.protocol.types;
+
+import java.nio.ByteBuffer;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * Represents an array of a particular element type
+ */
+public class ArrayOf extends Type {
+
+    private final Type type;
+
+    public ArrayOf(Type type) {
+        this.type = type;
+    }
+
+    @Override
+    public void write(ByteBuffer buffer, Object o) {
+        Object[] objs = (Object[]) o;
+        int size = objs.length;
+        buffer.putInt(size);
+        for (int i = 0; i < size; i++)
+            type.write(buffer, objs[i]);
+    }
+
+    @Override
+    public Object read(ByteBuffer buffer) {
+        int size = buffer.getInt();
+        Object[] objs = new Object[size];
+        for (int i = 0; i < size; i++)
+            objs[i] = type.read(buffer);
+        return objs;
+    }
+
+    @Override
+    public int sizeOf(Object o) {
+        Object[] objs = (Object[]) o;
+        int size = 4;
+        for (int i = 0; i < objs.length; i++)
+            size += type.sizeOf(objs[i]);
+        return size;
+    }
+
+    public Type type() {
+        return type;
+    }
+
+    @Override
+    public String toString() {
+        return "ARRAY(" + type + ")";
+    }
+
+    @Override
+    public Object[] validate(Object item) {
+        try {
+            Object[] array = (Object[]) item;
+            for (int i = 0; i < array.length; i++)
+                type.validate(array[i]);
+            return array;
+        } catch (ClassCastException e) {
+            throw new SchemaException("Not an Object[].");
+        }
+    }
+}
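
A round-trip sketch, assuming Type.INT32 (defined in Type.java, outside this hunk) encodes each element as a 4-byte integer:

    import java.nio.ByteBuffer;
    import org.apache.flink.kafka_backport.common.protocol.types.ArrayOf;
    import org.apache.flink.kafka_backport.common.protocol.types.Type;

    public class ArrayOfExample {
        public static void main(String[] args) {
            ArrayOf int32Array = new ArrayOf(Type.INT32);
            Object[] values = {1, 2, 3};                 // autoboxed Integers

            // sizeOf = 4-byte element count plus each element's encoding.
            ByteBuffer buffer = ByteBuffer.allocate(int32Array.sizeOf(values));
            int32Array.write(buffer, values);
            buffer.flip();

            Object[] decoded = (Object[]) int32Array.read(buffer);
            System.out.println(decoded.length);          // 3
        }
    }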

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/Field.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/Field.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/Field.java
new file mode 100644
index 0000000..b7d7720
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/Field.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.protocol.types;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * A field in a schema
+ */
+public class Field {
+
+    public static final Object NO_DEFAULT = new Object();
+
+    final int index;
+    public final String name;
+    public final Type type;
+    public final Object defaultValue;
+    public final String doc;
+    final Schema schema;
+
+    /**
+     * Create the field.
+     *
+     * @throws SchemaException If a default value is given and fails validation
+     */
+    public Field(int index, String name, Type type, String doc, Object defaultValue, Schema schema) {
+        this.index = index;
+        this.name = name;
+        this.type = type;
+        this.doc = doc;
+        this.defaultValue = defaultValue;
+        this.schema = schema;
+        if (defaultValue != NO_DEFAULT)
+            type.validate(defaultValue);
+    }
+
+    public Field(int index, String name, Type type, String doc, Object defaultValue) {
+        this(index, name, type, doc, defaultValue, null);
+    }
+
+    public Field(String name, Type type, String doc, Object defaultValue) {
+        this(-1, name, type, doc, defaultValue);
+    }
+
+    public Field(String name, Type type, String doc) {
+        this(name, type, doc, NO_DEFAULT);
+    }
+
+    public Field(String name, Type type) {
+        this(name, type, "");
+    }
+
+    public Type type() {
+        return type;
+    }
+
+}
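
The overloaded constructors simply fill in defaults down the chain: slot index -1, empty doc string, and NO_DEFAULT. A short sketch (illustrative only):

    import org.apache.flink.kafka_backport.common.protocol.types.Field;
    import org.apache.flink.kafka_backport.common.protocol.types.Type;

    public class FieldExample {
        public static void main(String[] args) {
            // Shorthand for new Field("partition", Type.INT32, "Topic partition id.", Field.NO_DEFAULT).
            // The slot index stays -1 until the field is placed into a Schema, which
            // re-creates it with its actual position (see the Schema constructor below).
            Field partition = new Field("partition", Type.INT32, "Topic partition id.");
            System.out.println(partition.name + " : " + partition.type());
        }
    }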

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/Schema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/Schema.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/Schema.java
new file mode 100644
index 0000000..7adac52
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/Schema.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.protocol.types;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * The schema for a compound record definition
+ */
+public class Schema extends Type {
+
+    private final Field[] fields;
+    private final Map<String, Field> fieldsByName;
+
+    /**
+     * Construct the schema from a given list of fields
+     *
+     * @throws SchemaException If the given list has duplicate fields
+     */
+    public Schema(Field... fs) {
+        this.fields = new Field[fs.length];
+        this.fieldsByName = new HashMap<String, Field>();
+        for (int i = 0; i < this.fields.length; i++) {
+            Field field = fs[i];
+            if (fieldsByName.containsKey(field.name))
+                throw new SchemaException("Schema contains a duplicate field: " + field.name);
+            this.fields[i] = new Field(i, field.name, field.type, field.doc, field.defaultValue, this);
+            this.fieldsByName.put(fs[i].name, this.fields[i]);
+        }
+    }
+
+    /**
+     * Write a struct to the buffer
+     */
+    public void write(ByteBuffer buffer, Object o) {
+        Struct r = (Struct) o;
+        for (int i = 0; i < fields.length; i++) {
+            Field f = fields[i];
+            try {
+                Object value = f.type().validate(r.get(f));
+                f.type.write(buffer, value);
+            } catch (Exception e) {
+                throw new SchemaException("Error writing field '" + f.name +
+                                          "': " +
+                                          (e.getMessage() == null ? e.getClass().getName() : e.getMessage()));
+            }
+        }
+    }
+
+    /**
+     * Read a struct from the buffer
+     */
+    public Object read(ByteBuffer buffer) {
+        Object[] objects = new Object[fields.length];
+        for (int i = 0; i < fields.length; i++) {
+            try {
+                objects[i] = fields[i].type.read(buffer);
+            } catch (Exception e) {
+                throw new SchemaException("Error reading field '" + fields[i].name +
+                                          "': " +
+                                          (e.getMessage() == null ? e.getClass().getName() : e.getMessage()));
+            }
+        }
+        return new Struct(this, objects);
+    }
+
+    /**
+     * The size of the given record
+     */
+    public int sizeOf(Object o) {
+        int size = 0;
+        Struct r = (Struct) o;
+        for (int i = 0; i < fields.length; i++)
+            size += fields[i].type.sizeOf(r.get(fields[i]));
+        return size;
+    }
+
+    /**
+     * The number of fields in this schema
+     */
+    public int numFields() {
+        return this.fields.length;
+    }
+
+    /**
+     * Get a field by its slot in the record array
+     * 
+     * @param slot The slot at which this field sits
+     * @return The field
+     */
+    public Field get(int slot) {
+        return this.fields[slot];
+    }
+
+    /**
+     * Get a field by its name
+     * 
+     * @param name The name of the field
+     * @return The field
+     */
+    public Field get(String name) {
+        return this.fieldsByName.get(name);
+    }
+
+    /**
+     * Get all the fields in this schema
+     */
+    public Field[] fields() {
+        return this.fields;
+    }
+
+    /**
+     * Display a string representation of the schema
+     */
+    public String toString() {
+        StringBuilder b = new StringBuilder();
+        b.append('{');
+        for (int i = 0; i < this.fields.length; i++) {
+            b.append(this.fields[i].name);
+            b.append(':');
+            b.append(this.fields[i].type());
+            if (i < this.fields.length - 1)
+                b.append(',');
+        }
+        b.append("}");
+        return b.toString();
+    }
+
+    @Override
+    public Struct validate(Object item) {
+        try {
+            Struct struct = (Struct) item;
+            for (int i = 0; i < this.fields.length; i++) {
+                Field field = this.fields[i];
+                try {
+                    field.type.validate(struct.get(field));
+                } catch (SchemaException e) {
+                    throw new SchemaException("Invalid value for field '" + field.name + "': " + e.getMessage());
+                }
+            }
+            return struct;
+        } catch (ClassCastException e) {
+            throw new SchemaException("Not a Struct.");
+        }
+    }
+
+}
\ No newline at end of file
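
A note on usage (this example is not part of the commit): the Schema above pairs
with the Struct and Type classes further down in this change. A schema defines the
fields, a struct holds the values, and the schema serializes the struct. A minimal
round-trip sketch against the backported API, assuming Field's (name, type, doc)
convenience constructor:

    import java.nio.ByteBuffer;
    import org.apache.flink.kafka_backport.common.protocol.types.Field;
    import org.apache.flink.kafka_backport.common.protocol.types.Schema;
    import org.apache.flink.kafka_backport.common.protocol.types.Struct;
    import org.apache.flink.kafka_backport.common.protocol.types.Type;

    public class SchemaRoundTrip {
        public static void main(String[] args) {
            Schema schema = new Schema(
                    new Field("id", Type.INT32, "A numeric id."),
                    new Field("name", Type.STRING, "A display name."));

            Struct struct = new Struct(schema)
                    .set("id", 42)
                    .set("name", "flink");

            // sizeOf() walks the schema to compute the exact serialized size.
            ByteBuffer buffer = ByteBuffer.allocate(struct.sizeOf());
            struct.writeTo(buffer);
            buffer.flip();

            // read() returns a Struct bound to the same schema.
            Struct copy = (Struct) schema.read(buffer);
            System.out.println(copy.getInt("id") + " / " + copy.getString("name")); // 42 / flink
        }
    }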

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/SchemaException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/SchemaException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/SchemaException.java
new file mode 100644
index 0000000..86c141e
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/SchemaException.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.protocol.types;
+
+import org.apache.flink.kafka_backport.common.KafkaException;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ *  Thrown if protocol schema validation fails while parsing a request or response.
+ */
+public class SchemaException extends KafkaException {
+
+    private static final long serialVersionUID = 1L;
+
+    public SchemaException(String message) {
+        super(message);
+    }
+
+}
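
As a sketch of where this exception surfaces (not part of the commit): validating
a struct that is missing a value for a field without a default raises a
SchemaException, wrapped with the offending field's name.

    Schema schema = new Schema(new Field("name", Type.STRING, "A required field."));
    Struct struct = new Struct(schema);   // "name" is never set and has no default
    try {
        struct.validate();                // delegates to schema.validate(struct)
    } catch (SchemaException e) {
        System.out.println("Invalid struct: " + e.getMessage());
    }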

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/Struct.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/Struct.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/Struct.java
new file mode 100644
index 0000000..482fe9d
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/Struct.java
@@ -0,0 +1,338 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.protocol.types;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * A record that can be serialized and deserialized according to a pre-defined schema
+ */
+public class Struct {
+    private final Schema schema;
+    private final Object[] values;
+
+    Struct(Schema schema, Object[] values) {
+        this.schema = schema;
+        this.values = values;
+    }
+
+    public Struct(Schema schema) {
+        this.schema = schema;
+        this.values = new Object[this.schema.numFields()];
+    }
+
+    /**
+     * The schema for this struct.
+     */
+    public Schema schema() {
+        return this.schema;
+    }
+
+    /**
+     * Return the value of the given pre-validated field, or the field's default value if no value has been set.
+     * 
+     * @param field The field whose value (or default) to return
+     * @throws SchemaException if the field has no value and has no default.
+     */
+    private Object getFieldOrDefault(Field field) {
+        Object value = this.values[field.index];
+        if (value != null)
+            return value;
+        else if (field.defaultValue != Field.NO_DEFAULT)
+            return field.defaultValue;
+        else
+            throw new SchemaException("Missing value for field '" + field.name + "' which has no default value.");
+    }
+
+    /**
+     * Get the value for the field directly by the field index with no lookup needed (faster!)
+     * 
+     * @param field The field to look up
+     * @return The value for that field.
+     * @throws SchemaException if the field has no value and has no default.
+     */
+    public Object get(Field field) {
+        validateField(field);
+        return getFieldOrDefault(field);
+    }
+
+    /**
+     * Get the record value for the field with the given name by doing a hash table lookup (slower!)
+     * 
+     * @param name The name of the field
+     * @return The value in the record
+     * @throws SchemaException If no such field exists
+     */
+    public Object get(String name) {
+        Field field = schema.get(name);
+        if (field == null)
+            throw new SchemaException("No such field: " + name);
+        return getFieldOrDefault(field);
+    }
+
+    /**
+     * Check if the struct contains a field.
+     * @param name The name of the field to check for
+     * @return Whether a field with that name exists.
+     */
+    public boolean hasField(String name) {
+        return schema.get(name) != null;
+    }
+
+    public Struct getStruct(Field field) {
+        return (Struct) get(field);
+    }
+
+    public Struct getStruct(String name) {
+        return (Struct) get(name);
+    }
+
+    public Short getShort(Field field) {
+        return (Short) get(field);
+    }
+
+    public Short getShort(String name) {
+        return (Short) get(name);
+    }
+
+    public Integer getInt(Field field) {
+        return (Integer) get(field);
+    }
+
+    public Integer getInt(String name) {
+        return (Integer) get(name);
+    }
+
+    public Long getLong(Field field) {
+        return (Long) get(field);
+    }
+
+    public Long getLong(String name) {
+        return (Long) get(name);
+    }
+
+    public Object[] getArray(Field field) {
+        return (Object[]) get(field);
+    }
+
+    public Object[] getArray(String name) {
+        return (Object[]) get(name);
+    }
+
+    public String getString(Field field) {
+        return (String) get(field);
+    }
+
+    public String getString(String name) {
+        return (String) get(name);
+    }
+
+    public ByteBuffer getBytes(Field field) {
+        return (ByteBuffer) get(field);
+    }
+
+    public ByteBuffer getBytes(String name) {
+        return (ByteBuffer) get(name);
+    }
+
+    /**
+     * Set the given field to the specified value
+     * 
+     * @param field The field
+     * @param value The value
+     * @throws SchemaException If the validation of the field failed
+     */
+    public Struct set(Field field, Object value) {
+        validateField(field);
+        this.values[field.index] = value;
+        return this;
+    }
+
+    /**
+     * Set the field specified by the given name to the value
+     * 
+     * @param name The name of the field
+     * @param value The value to set
+     * @throws SchemaException If the field is not known
+     */
+    public Struct set(String name, Object value) {
+        Field field = this.schema.get(name);
+        if (field == null)
+            throw new SchemaException("Unknown field: " + name);
+        this.values[field.index] = value;
+        return this;
+    }
+
+    /**
+     * Create a struct for the schema of a container type (struct or array). Note that for an array type, this method
+     * assumes the type is an array of schemas and creates a struct of that element schema. Arrays of other types cannot
+     * be instantiated with this method.
+     * 
+     * @param field The field to create an instance of
+     * @return The struct
+     * @throws SchemaException If the given field is not a container type
+     */
+    public Struct instance(Field field) {
+        validateField(field);
+        if (field.type() instanceof Schema) {
+            return new Struct((Schema) field.type());
+        } else if (field.type() instanceof ArrayOf) {
+            ArrayOf array = (ArrayOf) field.type();
+            return new Struct((Schema) array.type());
+        } else {
+            throw new SchemaException("Field '" + field.name + "' is not a container type, it is of type " + field.type());
+        }
+    }
+
+    /**
+     * Create a struct instance for the given field which must be a container type (struct or array)
+     * 
+     * @param field The name of the field to create (must be a container type)
+     * @return The struct
+     * @throws SchemaException If the given field is not a container type
+     */
+    public Struct instance(String field) {
+        return instance(schema.get(field));
+    }
+
+    /**
+     * Empty all the values from this record
+     */
+    public void clear() {
+        Arrays.fill(this.values, null);
+    }
+
+    /**
+     * Get the serialized size of this object
+     */
+    public int sizeOf() {
+        return this.schema.sizeOf(this);
+    }
+
+    /**
+     * Write this struct to a buffer
+     */
+    public void writeTo(ByteBuffer buffer) {
+        this.schema.write(buffer, this);
+    }
+
+    /**
+     * Ensure the user doesn't try to access fields from the wrong schema
+     *
+     * @throws SchemaException If validation fails
+     */
+    private void validateField(Field field) {
+        if (this.schema != field.schema)
+            throw new SchemaException("Attempt to access field '" + field.name + "' from a different schema instance.");
+        if (field.index >= values.length)
+            throw new SchemaException("Invalid field index: " + field.index);
+    }
+
+    /**
+     * Validate the contents of this struct against its schema
+     *
+     * @throws SchemaException If validation fails
+     */
+    public void validate() {
+        this.schema.validate(this);
+    }
+
+    /**
+     * Create a byte buffer containing the serialized form of the values in this struct. This method can choose to break
+     * the struct into multiple ByteBuffers if need be.
+     */
+    public ByteBuffer[] toBytes() {
+        ByteBuffer buffer = ByteBuffer.allocate(sizeOf());
+        writeTo(buffer);
+        return new ByteBuffer[] {buffer};
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder b = new StringBuilder();
+        b.append('{');
+        for (int i = 0; i < this.values.length; i++) {
+            Field f = this.schema.get(i);
+            b.append(f.name);
+            b.append('=');
+            if (f.type() instanceof ArrayOf) {
+                Object[] arrayValue = (Object[]) this.values[i];
+                b.append('[');
+                for (int j = 0; j < arrayValue.length; j++) {
+                    b.append(arrayValue[j]);
+                    if (j < arrayValue.length - 1)
+                        b.append(',');
+                }
+                b.append(']');
+            } else
+                b.append(this.values[i]);
+            if (i < this.values.length - 1)
+                b.append(',');
+        }
+        b.append('}');
+        return b.toString();
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        for (int i = 0; i < this.values.length; i++) {
+            Field f = this.schema.get(i);
+            if (f.type() instanceof ArrayOf) {
+                Object[] arrayObject = (Object[]) this.get(f);
+                for (Object arrayItem: arrayObject)
+                    result = prime * result + arrayItem.hashCode();
+            } else {
+                result = prime * result + this.get(f).hashCode();
+            }
+        }
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        Struct other = (Struct) obj;
+        if (schema != other.schema)
+            return false;
+        for (int i = 0; i < this.values.length; i++) {
+            Field f = this.schema.get(i);
+            boolean result;
+            if (f.type() instanceof ArrayOf) {
+                result = Arrays.equals((Object[]) this.get(f), (Object[]) other.get(f));
+            } else {
+                result = this.get(f).equals(other.get(f));
+            }
+            if (!result)
+                return false;
+        }
+        return true;
+    }
+
+}
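
A hypothetical sketch of the instance(...) helper for container fields (the schema
and field names below are invented for illustration; they are not part of this
commit):

    Schema partitionSchema = new Schema(
            new Field("partition", Type.INT32, "The partition id."),
            new Field("offset", Type.INT64, "The committed offset."));
    Schema requestSchema = new Schema(
            new Field("topic", Type.STRING, "The topic name."),
            new Field("partitions", new ArrayOf(partitionSchema), "Per-partition entries."));

    Struct request = new Struct(requestSchema).set("topic", "events");
    // instance(...) creates a Struct of the array's element schema.
    Struct entry = request.instance("partitions")
                          .set("partition", 0)
                          .set("offset", 123L);
    request.set("partitions", new Object[] { entry });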

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/Type.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/Type.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/Type.java
new file mode 100644
index 0000000..26bdd2f
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/Type.java
@@ -0,0 +1,259 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.protocol.types;
+
+import org.apache.flink.kafka_backport.common.utils.Utils;
+
+import java.nio.ByteBuffer;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * A serializable type
+ */
+public abstract class Type {
+
+    /**
+     * Write the typed object to the buffer
+     *
+     * @throws SchemaException If the object is not valid for its type
+     */
+    public abstract void write(ByteBuffer buffer, Object o);
+
+    /**
+     * Read the typed object from the buffer
+     *
+     * @throws SchemaException If the object is not valid for its type
+     */
+    public abstract Object read(ByteBuffer buffer);
+
+    /**
+     * Validate the object. If validation succeeds, return the typed object.
+     *
+     * @throws SchemaException If validation failed
+     */
+    public abstract Object validate(Object o);
+
+    /**
+     * Return the size of the object in bytes
+     */
+    public abstract int sizeOf(Object o);
+
+    public static final Type INT8 = new Type() {
+        @Override
+        public void write(ByteBuffer buffer, Object o) {
+            buffer.put((Byte) o);
+        }
+
+        @Override
+        public Object read(ByteBuffer buffer) {
+            return buffer.get();
+        }
+
+        @Override
+        public int sizeOf(Object o) {
+            return 1;
+        }
+
+        @Override
+        public String toString() {
+            return "INT8";
+        }
+
+        @Override
+        public Byte validate(Object item) {
+            if (item instanceof Byte)
+                return (Byte) item;
+            else
+                throw new SchemaException(item + " is not a Byte.");
+        }
+    };
+
+    public static final Type INT16 = new Type() {
+        @Override
+        public void write(ByteBuffer buffer, Object o) {
+            buffer.putShort((Short) o);
+        }
+
+        @Override
+        public Object read(ByteBuffer buffer) {
+            return buffer.getShort();
+        }
+
+        @Override
+        public int sizeOf(Object o) {
+            return 2;
+        }
+
+        @Override
+        public String toString() {
+            return "INT16";
+        }
+
+        @Override
+        public Short validate(Object item) {
+            if (item instanceof Short)
+                return (Short) item;
+            else
+                throw new SchemaException(item + " is not a Short.");
+        }
+    };
+
+    public static final Type INT32 = new Type() {
+        @Override
+        public void write(ByteBuffer buffer, Object o) {
+            buffer.putInt((Integer) o);
+        }
+
+        @Override
+        public Object read(ByteBuffer buffer) {
+            return buffer.getInt();
+        }
+
+        @Override
+        public int sizeOf(Object o) {
+            return 4;
+        }
+
+        @Override
+        public String toString() {
+            return "INT32";
+        }
+
+        @Override
+        public Integer validate(Object item) {
+            if (item instanceof Integer)
+                return (Integer) item;
+            else
+                throw new SchemaException(item + " is not an Integer.");
+        }
+    };
+
+    public static final Type INT64 = new Type() {
+        @Override
+        public void write(ByteBuffer buffer, Object o) {
+            buffer.putLong((Long) o);
+        }
+
+        @Override
+        public Object read(ByteBuffer buffer) {
+            return buffer.getLong();
+        }
+
+        @Override
+        public int sizeOf(Object o) {
+            return 8;
+        }
+
+        @Override
+        public String toString() {
+            return "INT64";
+        }
+
+        @Override
+        public Long validate(Object item) {
+            if (item instanceof Long)
+                return (Long) item;
+            else
+                throw new SchemaException(item + " is not a Long.");
+        }
+    };
+
+    public static final Type STRING = new Type() {
+        @Override
+        public void write(ByteBuffer buffer, Object o) {
+            byte[] bytes = Utils.utf8((String) o);
+            if (bytes.length > Short.MAX_VALUE)
+                throw new SchemaException("String is longer than the maximum string length.");
+            buffer.putShort((short) bytes.length);
+            buffer.put(bytes);
+        }
+
+        @Override
+        public Object read(ByteBuffer buffer) {
+            int length = buffer.getShort();
+            byte[] bytes = new byte[length];
+            buffer.get(bytes);
+            return Utils.utf8(bytes);
+        }
+
+        @Override
+        public int sizeOf(Object o) {
+            return 2 + Utils.utf8Length((String) o);
+        }
+
+        @Override
+        public String toString() {
+            return "STRING";
+        }
+
+        @Override
+        public String validate(Object item) {
+            if (item instanceof String)
+                return (String) item;
+            else
+                throw new SchemaException(item + " is not a String.");
+        }
+    };
+
+    public static final Type BYTES = new Type() {
+        @Override
+        public void write(ByteBuffer buffer, Object o) {
+            ByteBuffer arg = (ByteBuffer) o;
+            int pos = arg.position();
+            buffer.putInt(arg.remaining());
+            buffer.put(arg);
+            arg.position(pos);
+        }
+
+        @Override
+        public Object read(ByteBuffer buffer) {
+            int size = buffer.getInt();
+            ByteBuffer val = buffer.slice();
+            val.limit(size);
+            buffer.position(buffer.position() + size);
+            return val;
+        }
+
+        @Override
+        public int sizeOf(Object o) {
+            ByteBuffer buffer = (ByteBuffer) o;
+            return 4 + buffer.remaining();
+        }
+
+        @Override
+        public String toString() {
+            return "BYTES";
+        }
+
+        @Override
+        public ByteBuffer validate(Object item) {
+            if (item instanceof ByteBuffer)
+                return (ByteBuffer) item;
+            else
+                throw new SchemaException(item + " is not a java.nio.ByteBuffer.");
+        }
+    };
+
+}
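
The singleton types above also pin down the wire format: the fixed-width integers
are written in the ByteBuffer's default big-endian order, STRING is a 2-byte
length followed by UTF-8 bytes, and BYTES is a 4-byte length followed by the
payload. A small sketch (not part of the commit):

    ByteBuffer buf = ByteBuffer.allocate(Type.STRING.sizeOf("kafka"));
    Type.STRING.write(buf, "kafka");           // writes (short) 5, then 5 UTF-8 bytes
    buf.flip();
    String s = (String) Type.STRING.read(buf); // "kafka"

    // Every Type knows its serialized size up front, which is how
    // Schema.sizeOf() computes exact buffer sizes field by field.
    int size = Type.INT32.sizeOf(7) + Type.INT64.sizeOf(7L); // 4 + 8 = 12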

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/record/ByteBufferInputStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/record/ByteBufferInputStream.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/record/ByteBufferInputStream.java
new file mode 100644
index 0000000..99a20a3
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/record/ByteBufferInputStream.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.record;
+
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * A byte buffer backed InputStream
+ */
+public class ByteBufferInputStream extends InputStream {
+
+    private ByteBuffer buffer;
+
+    public ByteBufferInputStream(ByteBuffer buffer) {
+        this.buffer = buffer;
+    }
+
+    public int read() {
+        if (!buffer.hasRemaining()) {
+            return -1;
+        }
+        return buffer.get() & 0xFF;
+    }
+
+    public int read(byte[] bytes, int off, int len) {
+        if (!buffer.hasRemaining()) {
+            return -1;
+        }
+
+        len = Math.min(len, buffer.remaining());
+        buffer.get(bytes, off, len);
+        return len;
+    }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/record/ByteBufferOutputStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/record/ByteBufferOutputStream.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/record/ByteBufferOutputStream.java
new file mode 100644
index 0000000..a334755
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/record/ByteBufferOutputStream.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.record;
+
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * A byte buffer backed OutputStream
+ */
+public class ByteBufferOutputStream extends OutputStream {
+
+    private static final float REALLOCATION_FACTOR = 1.1f;
+
+    private ByteBuffer buffer;
+
+    public ByteBufferOutputStream(ByteBuffer buffer) {
+        this.buffer = buffer;
+    }
+
+    public void write(int b) {
+        if (buffer.remaining() < 1)
+            expandBuffer(buffer.capacity() + 1);
+        buffer.put((byte) b);
+    }
+
+    public void write(byte[] bytes, int off, int len) {
+        if (buffer.remaining() < len)
+            expandBuffer(buffer.capacity() + len);
+        buffer.put(bytes, off, len);
+    }
+
+    public ByteBuffer buffer() {
+        return buffer;
+    }
+
+    private void expandBuffer(int size) {
+        int expandSize = Math.max((int) (buffer.capacity() * REALLOCATION_FACTOR), size);
+        ByteBuffer temp = ByteBuffer.allocate(expandSize);
+        temp.put(buffer.array(), buffer.arrayOffset(), buffer.position());
+        buffer = temp;
+    }
+}
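
Together with ByteBufferInputStream above, this adapter makes ByteBuffers usable
with standard java.io stream APIs, e.g. the JDK's GZIP streams; the output variant
grows its buffer by REALLOCATION_FACTOR when it runs out of room. A sketch (not
part of the commit), assuming heap buffers so that array() is available during
reallocation:

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import java.util.zip.GZIPInputStream;
    import java.util.zip.GZIPOutputStream;

    public class BufferStreamsDemo {
        public static void main(String[] args) throws IOException {
            // Compress into a growable ByteBuffer.
            ByteBufferOutputStream out = new ByteBufferOutputStream(ByteBuffer.allocate(16));
            try (GZIPOutputStream gzOut = new GZIPOutputStream(out)) {
                gzOut.write("hello kafka".getBytes(StandardCharsets.UTF_8));
            }
            ByteBuffer compressed = out.buffer(); // possibly a reallocated, larger buffer
            compressed.flip();

            // Decompress straight from the buffer, no copy into a byte[] first.
            try (GZIPInputStream gzIn = new GZIPInputStream(new ByteBufferInputStream(compressed))) {
                byte[] chunk = new byte[64];
                int n = gzIn.read(chunk); // the payload is tiny, so one read suffices here
                System.out.println(new String(chunk, 0, n, StandardCharsets.UTF_8));
            }
        }
    }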

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/record/CompressionType.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/record/CompressionType.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/record/CompressionType.java
new file mode 100644
index 0000000..9961766
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/record/CompressionType.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.record;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * The compression type to use
+ */
+public enum CompressionType {
+    NONE(0, "none", 1.0f), GZIP(1, "gzip", 0.5f), SNAPPY(2, "snappy", 0.5f), LZ4(3, "lz4", 0.5f);
+
+    public final int id;
+    public final String name;
+    public final float rate;
+
+    private CompressionType(int id, String name, float rate) {
+        this.id = id;
+        this.name = name;
+        this.rate = rate;
+    }
+
+    public static CompressionType forId(int id) {
+        switch (id) {
+            case 0:
+                return NONE;
+            case 1:
+                return GZIP;
+            case 2:
+                return SNAPPY;
+            case 3:
+                return LZ4;
+            default:
+                throw new IllegalArgumentException("Unknown compression type id: " + id);
+        }
+    }
+
+    public static CompressionType forName(String name) {
+        if (NONE.name.equals(name))
+            return NONE;
+        else if (GZIP.name.equals(name))
+            return GZIP;
+        else if (SNAPPY.name.equals(name))
+            return SNAPPY;
+        else if (LZ4.name.equals(name))
+            return LZ4;
+        else
+            throw new IllegalArgumentException("Unknown compression name: " + name);
+    }
+
+}
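
A short sketch of the two lookup paths (not part of the commit): the numeric id
identifies the codec compactly on the wire, while the lowercase name suits
human-readable configuration strings.

    CompressionType byId = CompressionType.forId(1);             // GZIP
    CompressionType byName = CompressionType.forName("snappy");  // SNAPPY
    System.out.println(byId.name + " / " + byName.id);           // prints: gzip / 2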