Posted to commits@flink.apache.org by se...@apache.org on 2015/08/27 13:25:31 UTC

[14/51] [abbrv] flink git commit: [FLINK-2386] [kafka connector] Add comments to all backported kafka sources and move them to 'org.apache.flink.kafka_backport'

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/FetchRequest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/FetchRequest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/FetchRequest.java
new file mode 100644
index 0000000..f797ebe
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/FetchRequest.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.requests;
+
+import org.apache.flink.kafka_backport.common.protocol.ProtoUtils;
+import org.apache.flink.kafka_backport.common.protocol.types.Schema;
+import org.apache.flink.kafka_backport.common.protocol.types.Struct;
+import org.apache.flink.kafka_backport.common.TopicPartition;
+import org.apache.flink.kafka_backport.common.protocol.ApiKeys;
+import org.apache.flink.kafka_backport.common.protocol.Errors;
+import org.apache.flink.kafka_backport.common.utils.CollectionUtils;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+public class FetchRequest extends AbstractRequest {
+    
+    public static final int CONSUMER_REPLICA_ID = -1;
+    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.FETCH.id);
+    private static final String REPLICA_ID_KEY_NAME = "replica_id";
+    private static final String MAX_WAIT_KEY_NAME = "max_wait_time";
+    private static final String MIN_BYTES_KEY_NAME = "min_bytes";
+    private static final String TOPICS_KEY_NAME = "topics";
+
+    // topic level field names
+    private static final String TOPIC_KEY_NAME = "topic";
+    private static final String PARTITIONS_KEY_NAME = "partitions";
+
+    // partition level field names
+    private static final String PARTITION_KEY_NAME = "partition";
+    private static final String FETCH_OFFSET_KEY_NAME = "fetch_offset";
+    private static final String MAX_BYTES_KEY_NAME = "max_bytes";
+
+    private final int replicaId;
+    private final int maxWait;
+    private final int minBytes;
+    private final Map<TopicPartition, PartitionData> fetchData;
+
+    public static final class PartitionData {
+        public final long offset;
+        public final int maxBytes;
+
+        public PartitionData(long offset, int maxBytes) {
+            this.offset = offset;
+            this.maxBytes = maxBytes;
+        }
+    }
+
+    /**
+     * Create a non-replica fetch request
+     */
+    public FetchRequest(int maxWait, int minBytes, Map<TopicPartition, PartitionData> fetchData) {
+        this(CONSUMER_REPLICA_ID, maxWait, minBytes, fetchData);
+    }
+
+    /**
+     * Create a replica fetch request
+     */
+    public FetchRequest(int replicaId, int maxWait, int minBytes, Map<TopicPartition, PartitionData> fetchData) {
+        super(new Struct(CURRENT_SCHEMA));
+        Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(fetchData);
+
+        struct.set(REPLICA_ID_KEY_NAME, replicaId);
+        struct.set(MAX_WAIT_KEY_NAME, maxWait);
+        struct.set(MIN_BYTES_KEY_NAME, minBytes);
+        List<Struct> topicArray = new ArrayList<Struct>();
+        for (Map.Entry<String, Map<Integer, PartitionData>> topicEntry : topicsData.entrySet()) {
+            Struct topicData = struct.instance(TOPICS_KEY_NAME);
+            topicData.set(TOPIC_KEY_NAME, topicEntry.getKey());
+            List<Struct> partitionArray = new ArrayList<Struct>();
+            for (Map.Entry<Integer, PartitionData> partitionEntry : topicEntry.getValue().entrySet()) {
+                PartitionData fetchPartitionData = partitionEntry.getValue();
+                Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
+                partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
+                partitionData.set(FETCH_OFFSET_KEY_NAME, fetchPartitionData.offset);
+                partitionData.set(MAX_BYTES_KEY_NAME, fetchPartitionData.maxBytes);
+                partitionArray.add(partitionData);
+            }
+            topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
+            topicArray.add(topicData);
+        }
+        struct.set(TOPICS_KEY_NAME, topicArray.toArray());
+        this.replicaId = replicaId;
+        this.maxWait = maxWait;
+        this.minBytes = minBytes;
+        this.fetchData = fetchData;
+    }
+
+    public FetchRequest(Struct struct) {
+        super(struct);
+        replicaId = struct.getInt(REPLICA_ID_KEY_NAME);
+        maxWait = struct.getInt(MAX_WAIT_KEY_NAME);
+        minBytes = struct.getInt(MIN_BYTES_KEY_NAME);
+        fetchData = new HashMap<TopicPartition, PartitionData>();
+        for (Object topicResponseObj : struct.getArray(TOPICS_KEY_NAME)) {
+            Struct topicResponse = (Struct) topicResponseObj;
+            String topic = topicResponse.getString(TOPIC_KEY_NAME);
+            for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
+                Struct partitionResponse = (Struct) partitionResponseObj;
+                int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
+                long offset = partitionResponse.getLong(FETCH_OFFSET_KEY_NAME);
+                int maxBytes = partitionResponse.getInt(MAX_BYTES_KEY_NAME);
+                PartitionData partitionData = new PartitionData(offset, maxBytes);
+                fetchData.put(new TopicPartition(topic, partition), partitionData);
+            }
+        }
+    }
+
+    @Override
+    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+        Map<TopicPartition, FetchResponse.PartitionData> responseData = new HashMap<TopicPartition, FetchResponse.PartitionData>();
+
+        for (Map.Entry<TopicPartition, PartitionData> entry: fetchData.entrySet()) {
+            FetchResponse.PartitionData partitionResponse = new FetchResponse.PartitionData(Errors.forException(e).code(),
+                    FetchResponse.INVALID_HIGHWATERMARK,
+                    FetchResponse.EMPTY_RECORD_SET);
+            responseData.put(entry.getKey(), partitionResponse);
+        }
+
+        switch (versionId) {
+            case 0:
+                return new FetchResponse(responseData);
+            default:
+                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+                        versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.FETCH.id)));
+        }
+    }
+
+    public int replicaId() {
+        return replicaId;
+    }
+
+    public int maxWait() {
+        return maxWait;
+    }
+
+    public int minBytes() {
+        return minBytes;
+    }
+
+    public Map<TopicPartition, PartitionData> fetchData() {
+        return fetchData;
+    }
+
+    public static FetchRequest parse(ByteBuffer buffer, int versionId) {
+        return new FetchRequest(ProtoUtils.parseRequest(ApiKeys.FETCH.id, versionId, buffer));
+    }
+
+    public static FetchRequest parse(ByteBuffer buffer) {
+        return new FetchRequest((Struct) CURRENT_SCHEMA.read(buffer));
+    }
+}
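
A minimal, hypothetical usage sketch for this class: a consumer-side caller builds a fetch request and round-trips it through the wire format. The topic name, offset, and sizes are illustrative, and the sketch assumes the backported AbstractRequestResponse exposes sizeOf() and writeTo(ByteBuffer) like its upstream Kafka counterpart:

    import org.apache.flink.kafka_backport.common.TopicPartition;
    import org.apache.flink.kafka_backport.common.requests.FetchRequest;

    import java.nio.ByteBuffer;
    import java.util.HashMap;
    import java.util.Map;

    public class FetchRequestSketch {
        public static void main(String[] args) {
            // Fetch up to 64 KB from partition 0 of "my-topic", starting at offset 42.
            Map<TopicPartition, FetchRequest.PartitionData> fetchData =
                    new HashMap<TopicPartition, FetchRequest.PartitionData>();
            fetchData.put(new TopicPartition("my-topic", 0),
                    new FetchRequest.PartitionData(42L, 64 * 1024));

            // The three-argument constructor marks this as a consumer fetch
            // (replicaId = CONSUMER_REPLICA_ID); maxWait = 500 ms, minBytes = 1.
            FetchRequest request = new FetchRequest(500, 1, fetchData);

            // Serialize and parse back (sizeOf()/writeTo() assumed from the base class).
            ByteBuffer buffer = ByteBuffer.allocate(request.sizeOf());
            request.writeTo(buffer);
            buffer.rewind();
            FetchRequest parsed = FetchRequest.parse(buffer);

            System.out.println(parsed.replicaId()); // -1 for consumers
        }
    }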

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/FetchResponse.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/FetchResponse.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/FetchResponse.java
new file mode 100644
index 0000000..158833e
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/FetchResponse.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.requests;
+
+import org.apache.flink.kafka_backport.common.protocol.types.Struct;
+import org.apache.flink.kafka_backport.common.TopicPartition;
+import org.apache.flink.kafka_backport.common.protocol.ApiKeys;
+import org.apache.flink.kafka_backport.common.protocol.ProtoUtils;
+import org.apache.flink.kafka_backport.common.protocol.types.Schema;
+import org.apache.flink.kafka_backport.common.utils.CollectionUtils;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+public class FetchResponse extends AbstractRequestResponse {
+    
+    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.FETCH.id);
+    private static final String RESPONSES_KEY_NAME = "responses";
+
+    // topic level field names
+    private static final String TOPIC_KEY_NAME = "topic";
+    private static final String PARTITIONS_KEY_NAME = "partition_responses";
+
+    // partition level field names
+    private static final String PARTITION_KEY_NAME = "partition";
+    private static final String ERROR_CODE_KEY_NAME = "error_code";
+
+    /**
+     * Possible error codes:
+     *
+     *  OFFSET_OUT_OF_RANGE (1)
+     *  UNKNOWN_TOPIC_OR_PARTITION (3)
+     *  NOT_LEADER_FOR_PARTITION (6)
+     *  REPLICA_NOT_AVAILABLE (9)
+     *  UNKNOWN (-1)
+     */
+
+    private static final String HIGH_WATERMARK_KEY_NAME = "high_watermark";
+    private static final String RECORD_SET_KEY_NAME = "record_set";
+
+    public static final long INVALID_HIGHWATERMARK = -1L;
+    public static final ByteBuffer EMPTY_RECORD_SET = ByteBuffer.allocate(0);
+
+    private final Map<TopicPartition, PartitionData> responseData;
+
+    public static final class PartitionData {
+        public final short errorCode;
+        public final long highWatermark;
+        public final ByteBuffer recordSet;
+
+        public PartitionData(short errorCode, long highWatermark, ByteBuffer recordSet) {
+            this.errorCode = errorCode;
+            this.highWatermark = highWatermark;
+            this.recordSet = recordSet;
+        }
+    }
+
+    public FetchResponse(Map<TopicPartition, PartitionData> responseData) {
+        super(new Struct(CURRENT_SCHEMA));
+        Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(responseData);
+
+        List<Struct> topicArray = new ArrayList<Struct>();
+        for (Map.Entry<String, Map<Integer, PartitionData>> topicEntry: topicsData.entrySet()) {
+            Struct topicData = struct.instance(RESPONSES_KEY_NAME);
+            topicData.set(TOPIC_KEY_NAME, topicEntry.getKey());
+            List<Struct> partitionArray = new ArrayList<Struct>();
+            for (Map.Entry<Integer, PartitionData> partitionEntry : topicEntry.getValue().entrySet()) {
+                PartitionData fetchPartitionData = partitionEntry.getValue();
+                Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
+                partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
+                partitionData.set(ERROR_CODE_KEY_NAME, fetchPartitionData.errorCode);
+                partitionData.set(HIGH_WATERMARK_KEY_NAME, fetchPartitionData.highWatermark);
+                partitionData.set(RECORD_SET_KEY_NAME, fetchPartitionData.recordSet);
+                partitionArray.add(partitionData);
+            }
+            topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
+            topicArray.add(topicData);
+        }
+        struct.set(RESPONSES_KEY_NAME, topicArray.toArray());
+        this.responseData = responseData;
+    }
+
+    public FetchResponse(Struct struct) {
+        super(struct);
+        responseData = new HashMap<TopicPartition, PartitionData>();
+        for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
+            Struct topicResponse = (Struct) topicResponseObj;
+            String topic = topicResponse.getString(TOPIC_KEY_NAME);
+            for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
+                Struct partitionResponse = (Struct) partitionResponseObj;
+                int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
+                short errorCode = partitionResponse.getShort(ERROR_CODE_KEY_NAME);
+                long highWatermark = partitionResponse.getLong(HIGH_WATERMARK_KEY_NAME);
+                ByteBuffer recordSet = partitionResponse.getBytes(RECORD_SET_KEY_NAME);
+                PartitionData partitionData = new PartitionData(errorCode, highWatermark, recordSet);
+                responseData.put(new TopicPartition(topic, partition), partitionData);
+            }
+        }
+    }
+
+    public Map<TopicPartition, PartitionData> responseData() {
+        return responseData;
+    }
+
+    public static FetchResponse parse(ByteBuffer buffer) {
+        return new FetchResponse((Struct) CURRENT_SCHEMA.read(buffer));
+    }
+}
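
A matching broker-side sketch, assembling the response for one partition. The error code 0 ("no error"), high watermark, and use of the empty record set are illustrative:

    import org.apache.flink.kafka_backport.common.TopicPartition;
    import org.apache.flink.kafka_backport.common.requests.FetchResponse;

    import java.util.HashMap;
    import java.util.Map;

    public class FetchResponseSketch {
        public static void main(String[] args) {
            Map<TopicPartition, FetchResponse.PartitionData> data =
                    new HashMap<TopicPartition, FetchResponse.PartitionData>();

            // errorCode 0 = no error; high watermark 100 and an empty record set
            // stand in for real log data.
            data.put(new TopicPartition("my-topic", 0),
                    new FetchResponse.PartitionData((short) 0, 100L,
                            FetchResponse.EMPTY_RECORD_SET));

            FetchResponse response = new FetchResponse(data);
            FetchResponse.PartitionData pd =
                    response.responseData().get(new TopicPartition("my-topic", 0));
            System.out.println(pd.highWatermark); // 100
        }
    }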

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/HeartbeatRequest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/HeartbeatRequest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/HeartbeatRequest.java
new file mode 100644
index 0000000..c8abb67
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/HeartbeatRequest.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.requests;
+
+import org.apache.flink.kafka_backport.common.protocol.ProtoUtils;
+import org.apache.flink.kafka_backport.common.protocol.types.Schema;
+import org.apache.flink.kafka_backport.common.protocol.types.Struct;
+import org.apache.flink.kafka_backport.common.protocol.ApiKeys;
+import org.apache.flink.kafka_backport.common.protocol.Errors;
+
+import java.nio.ByteBuffer;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+public class HeartbeatRequest extends AbstractRequest {
+    
+    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.HEARTBEAT.id);
+    private static final String GROUP_ID_KEY_NAME = "group_id";
+    private static final String GROUP_GENERATION_ID_KEY_NAME = "group_generation_id";
+    private static final String CONSUMER_ID_KEY_NAME = "consumer_id";
+
+    private final String groupId;
+    private final int groupGenerationId;
+    private final String consumerId;
+
+    public HeartbeatRequest(String groupId, int groupGenerationId, String consumerId) {
+        super(new Struct(CURRENT_SCHEMA));
+        struct.set(GROUP_ID_KEY_NAME, groupId);
+        struct.set(GROUP_GENERATION_ID_KEY_NAME, groupGenerationId);
+        struct.set(CONSUMER_ID_KEY_NAME, consumerId);
+        this.groupId = groupId;
+        this.groupGenerationId = groupGenerationId;
+        this.consumerId = consumerId;
+    }
+
+    public HeartbeatRequest(Struct struct) {
+        super(struct);
+        groupId = struct.getString(GROUP_ID_KEY_NAME);
+        groupGenerationId = struct.getInt(GROUP_GENERATION_ID_KEY_NAME);
+        consumerId = struct.getString(CONSUMER_ID_KEY_NAME);
+    }
+
+    @Override
+    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+        switch (versionId) {
+            case 0:
+                return new HeartbeatResponse(Errors.forException(e).code());
+            default:
+                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+                        versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.HEARTBEAT.id)));
+        }
+    }
+
+    public String groupId() {
+        return groupId;
+    }
+
+    public int groupGenerationId() {
+        return groupGenerationId;
+    }
+
+    public String consumerId() {
+        return consumerId;
+    }
+
+    public static HeartbeatRequest parse(ByteBuffer buffer, int versionId) {
+        return new HeartbeatRequest(ProtoUtils.parseRequest(ApiKeys.HEARTBEAT.id, versionId, buffer));
+    }
+
+    public static HeartbeatRequest parse(ByteBuffer buffer) {
+        return new HeartbeatRequest((Struct) CURRENT_SCHEMA.read(buffer));
+    }
+}
\ No newline at end of file
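
A hypothetical sketch of the heartbeat a consumer sends to its coordinator; the group id, generation id, and consumer id are made up, and sizeOf()/writeTo(ByteBuffer) are again assumed from the backported AbstractRequestResponse:

    import org.apache.flink.kafka_backport.common.requests.HeartbeatRequest;

    import java.nio.ByteBuffer;

    public class HeartbeatRequestSketch {
        public static void main(String[] args) {
            // Consumer "consumer-1" of group "my-group", generation 3.
            HeartbeatRequest request = new HeartbeatRequest("my-group", 3, "consumer-1");

            // Round-trip through the wire format.
            ByteBuffer buffer = ByteBuffer.allocate(request.sizeOf());
            request.writeTo(buffer);
            buffer.rewind();
            HeartbeatRequest parsed = HeartbeatRequest.parse(buffer);

            System.out.println(parsed.groupId() + " / " + parsed.groupGenerationId());
        }
    }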

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/HeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/HeartbeatResponse.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/HeartbeatResponse.java
new file mode 100644
index 0000000..4bf6669
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/HeartbeatResponse.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.requests;
+
+import org.apache.flink.kafka_backport.common.protocol.ApiKeys;
+import org.apache.flink.kafka_backport.common.protocol.ProtoUtils;
+import org.apache.flink.kafka_backport.common.protocol.types.Schema;
+import org.apache.flink.kafka_backport.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+public class HeartbeatResponse extends AbstractRequestResponse {
+    
+    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.HEARTBEAT.id);
+    private static final String ERROR_CODE_KEY_NAME = "error_code";
+
+    /**
+     * Possible error codes:
+     *
+     * CONSUMER_COORDINATOR_NOT_AVAILABLE (15)
+     * NOT_COORDINATOR_FOR_CONSUMER (16)
+     * ILLEGAL_GENERATION (22)
+     * UNKNOWN_CONSUMER_ID (25)
+     */
+
+    private final short errorCode;
+    public HeartbeatResponse(short errorCode) {
+        super(new Struct(CURRENT_SCHEMA));
+        struct.set(ERROR_CODE_KEY_NAME, errorCode);
+        this.errorCode = errorCode;
+    }
+
+    public HeartbeatResponse(Struct struct) {
+        super(struct);
+        errorCode = struct.getShort(ERROR_CODE_KEY_NAME);
+    }
+
+    public short errorCode() {
+        return errorCode;
+    }
+
+    public static HeartbeatResponse parse(ByteBuffer buffer) {
+        return new HeartbeatResponse((Struct) CURRENT_SCHEMA.read(buffer));
+    }
+}
\ No newline at end of file
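
A tiny illustrative sketch of the response side, using one of the error codes listed in the class's doc comment:

    import org.apache.flink.kafka_backport.common.requests.HeartbeatResponse;

    public class HeartbeatResponseSketch {
        public static void main(String[] args) {
            // 22 = ILLEGAL_GENERATION per the list above; on seeing it, a consumer
            // would rejoin the group to obtain a fresh generation id.
            HeartbeatResponse response = new HeartbeatResponse((short) 22);
            System.out.println(response.errorCode()); // 22
        }
    }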

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/JoinGroupRequest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/JoinGroupRequest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/JoinGroupRequest.java
new file mode 100644
index 0000000..f098d18
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/JoinGroupRequest.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.requests;
+
+import org.apache.flink.kafka_backport.common.protocol.ProtoUtils;
+import org.apache.flink.kafka_backport.common.protocol.types.Schema;
+import org.apache.flink.kafka_backport.common.protocol.types.Struct;
+import org.apache.flink.kafka_backport.common.TopicPartition;
+import org.apache.flink.kafka_backport.common.protocol.ApiKeys;
+import org.apache.flink.kafka_backport.common.protocol.Errors;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+public class JoinGroupRequest extends AbstractRequest {
+    
+    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.JOIN_GROUP.id);
+    private static final String GROUP_ID_KEY_NAME = "group_id";
+    private static final String SESSION_TIMEOUT_KEY_NAME = "session_timeout";
+    private static final String TOPICS_KEY_NAME = "topics";
+    private static final String CONSUMER_ID_KEY_NAME = "consumer_id";
+    private static final String STRATEGY_KEY_NAME = "partition_assignment_strategy";
+
+    public static final String UNKNOWN_CONSUMER_ID = "";
+
+    private final String groupId;
+    private final int sessionTimeout;
+    private final List<String> topics;
+    private final String consumerId;
+    private final String strategy;
+
+    public JoinGroupRequest(String groupId, int sessionTimeout, List<String> topics, String consumerId, String strategy) {
+        super(new Struct(CURRENT_SCHEMA));
+        struct.set(GROUP_ID_KEY_NAME, groupId);
+        struct.set(SESSION_TIMEOUT_KEY_NAME, sessionTimeout);
+        struct.set(TOPICS_KEY_NAME, topics.toArray());
+        struct.set(CONSUMER_ID_KEY_NAME, consumerId);
+        struct.set(STRATEGY_KEY_NAME, strategy);
+        this.groupId = groupId;
+        this.sessionTimeout = sessionTimeout;
+        this.topics = topics;
+        this.consumerId = consumerId;
+        this.strategy = strategy;
+    }
+
+    public JoinGroupRequest(Struct struct) {
+        super(struct);
+        groupId = struct.getString(GROUP_ID_KEY_NAME);
+        sessionTimeout = struct.getInt(SESSION_TIMEOUT_KEY_NAME);
+        Object[] topicsArray = struct.getArray(TOPICS_KEY_NAME);
+        topics = new ArrayList<String>();
+        for (Object topic: topicsArray)
+            topics.add((String) topic);
+        consumerId = struct.getString(CONSUMER_ID_KEY_NAME);
+        strategy = struct.getString(STRATEGY_KEY_NAME);
+    }
+
+    @Override
+    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+        switch (versionId) {
+            case 0:
+                return new JoinGroupResponse(
+                        Errors.forException(e).code(),
+                        JoinGroupResponse.UNKNOWN_GENERATION_ID,
+                        JoinGroupResponse.UNKNOWN_CONSUMER_ID,
+                        Collections.<TopicPartition>emptyList());
+            default:
+                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+                        versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.JOIN_GROUP.id)));
+        }
+    }
+
+    public String groupId() {
+        return groupId;
+    }
+
+    public int sessionTimeout() {
+        return sessionTimeout;
+    }
+
+    public List<String> topics() {
+        return topics;
+    }
+
+    public String consumerId() {
+        return consumerId;
+    }
+
+    public String strategy() {
+        return strategy;
+    }
+
+    public static JoinGroupRequest parse(ByteBuffer buffer, int versionId) {
+        return new JoinGroupRequest(ProtoUtils.parseRequest(ApiKeys.JOIN_GROUP.id, versionId, buffer));
+    }
+
+    public static JoinGroupRequest parse(ByteBuffer buffer) {
+        return new JoinGroupRequest((Struct) CURRENT_SCHEMA.read(buffer));
+    }
+}
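
A hypothetical sketch of a first-time join: the consumer has no id yet, so it sends UNKNOWN_CONSUMER_ID and lets the coordinator assign one. The group, topic, and strategy names are illustrative ("range" is one of the assignment strategies upstream Kafka ships):

    import org.apache.flink.kafka_backport.common.requests.JoinGroupRequest;

    import java.util.Arrays;

    public class JoinGroupRequestSketch {
        public static void main(String[] args) {
            JoinGroupRequest request = new JoinGroupRequest(
                    "my-group",                           // group id
                    30000,                                // session timeout (ms)
                    Arrays.asList("my-topic"),            // subscribed topics
                    JoinGroupRequest.UNKNOWN_CONSUMER_ID, // no consumer id yet
                    "range");                             // assignment strategy

            System.out.println(request.topics()); // [my-topic]
        }
    }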

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/JoinGroupResponse.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/JoinGroupResponse.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/JoinGroupResponse.java
new file mode 100644
index 0000000..7d9b647
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/JoinGroupResponse.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.requests;
+
+import org.apache.flink.kafka_backport.common.protocol.ProtoUtils;
+import org.apache.flink.kafka_backport.common.protocol.types.Schema;
+import org.apache.flink.kafka_backport.common.protocol.types.Struct;
+import org.apache.flink.kafka_backport.common.TopicPartition;
+import org.apache.flink.kafka_backport.common.protocol.ApiKeys;
+import org.apache.flink.kafka_backport.common.utils.CollectionUtils;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+public class JoinGroupResponse extends AbstractRequestResponse {
+    
+    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.JOIN_GROUP.id);
+    private static final String ERROR_CODE_KEY_NAME = "error_code";
+
+    /**
+     * Possible error codes:
+     *
+     * CONSUMER_COORDINATOR_NOT_AVAILABLE (15)
+     * NOT_COORDINATOR_FOR_CONSUMER (16)
+     * INCONSISTENT_PARTITION_ASSIGNMENT_STRATEGY (23)
+     * UNKNOWN_PARTITION_ASSIGNMENT_STRATEGY (24)
+     * UNKNOWN_CONSUMER_ID (25)
+     * INVALID_SESSION_TIMEOUT (26)
+     */
+
+    private static final String GENERATION_ID_KEY_NAME = "group_generation_id";
+    private static final String CONSUMER_ID_KEY_NAME = "consumer_id";
+    private static final String ASSIGNED_PARTITIONS_KEY_NAME = "assigned_partitions";
+    private static final String TOPIC_KEY_NAME = "topic";
+    private static final String PARTITIONS_KEY_NAME = "partitions";
+
+    public static final int UNKNOWN_GENERATION_ID = -1;
+    public static final String UNKNOWN_CONSUMER_ID = "";
+
+    private final short errorCode;
+    private final int generationId;
+    private final String consumerId;
+    private final List<TopicPartition> assignedPartitions;
+
+    public JoinGroupResponse(short errorCode, int generationId, String consumerId, List<TopicPartition> assignedPartitions) {
+        super(new Struct(CURRENT_SCHEMA));
+
+        Map<String, List<Integer>> partitionsByTopic = CollectionUtils.groupDataByTopic(assignedPartitions);
+
+        struct.set(ERROR_CODE_KEY_NAME, errorCode);
+        struct.set(GENERATION_ID_KEY_NAME, generationId);
+        struct.set(CONSUMER_ID_KEY_NAME, consumerId);
+        List<Struct> topicArray = new ArrayList<Struct>();
+        for (Map.Entry<String, List<Integer>> entries: partitionsByTopic.entrySet()) {
+            Struct topicData = struct.instance(ASSIGNED_PARTITIONS_KEY_NAME);
+            topicData.set(TOPIC_KEY_NAME, entries.getKey());
+            topicData.set(PARTITIONS_KEY_NAME, entries.getValue().toArray());
+            topicArray.add(topicData);
+        }
+        struct.set(ASSIGNED_PARTITIONS_KEY_NAME, topicArray.toArray());
+
+        this.errorCode = errorCode;
+        this.generationId = generationId;
+        this.consumerId = consumerId;
+        this.assignedPartitions = assignedPartitions;
+    }
+
+    public JoinGroupResponse(Struct struct) {
+        super(struct);
+        assignedPartitions = new ArrayList<TopicPartition>();
+        for (Object topicDataObj : struct.getArray(ASSIGNED_PARTITIONS_KEY_NAME)) {
+            Struct topicData = (Struct) topicDataObj;
+            String topic = topicData.getString(TOPIC_KEY_NAME);
+            for (Object partitionObj : topicData.getArray(PARTITIONS_KEY_NAME))
+                assignedPartitions.add(new TopicPartition(topic, (Integer) partitionObj));
+        }
+        errorCode = struct.getShort(ERROR_CODE_KEY_NAME);
+        generationId = struct.getInt(GENERATION_ID_KEY_NAME);
+        consumerId = struct.getString(CONSUMER_ID_KEY_NAME);
+    }
+
+    public short errorCode() {
+        return errorCode;
+    }
+
+    public int generationId() {
+        return generationId;
+    }
+
+    public String consumerId() {
+        return consumerId;
+    }
+
+    public List<TopicPartition> assignedPartitions() {
+        return assignedPartitions;
+    }
+
+    public static JoinGroupResponse parse(ByteBuffer buffer) {
+        return new JoinGroupResponse((Struct) CURRENT_SCHEMA.read(buffer));
+    }
+}
\ No newline at end of file
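
An illustrative sketch of the coordinator's reply to a successful join: error code 0, a fresh generation, an assigned consumer id, and two partitions handed to this consumer (all values made up):

    import org.apache.flink.kafka_backport.common.TopicPartition;
    import org.apache.flink.kafka_backport.common.requests.JoinGroupResponse;

    import java.util.Arrays;

    public class JoinGroupResponseSketch {
        public static void main(String[] args) {
            JoinGroupResponse response = new JoinGroupResponse(
                    (short) 0,            // no error
                    1,                    // group generation id
                    "consumer-1",         // coordinator-assigned consumer id
                    Arrays.asList(new TopicPartition("my-topic", 0),
                                  new TopicPartition("my-topic", 1)));

            System.out.println(response.assignedPartitions().size()); // 2
        }
    }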

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/ListOffsetRequest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/ListOffsetRequest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/ListOffsetRequest.java
new file mode 100644
index 0000000..069e06d
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/ListOffsetRequest.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.requests;
+
+import org.apache.flink.kafka_backport.common.TopicPartition;
+import org.apache.flink.kafka_backport.common.protocol.ProtoUtils;
+import org.apache.flink.kafka_backport.common.protocol.types.Schema;
+import org.apache.flink.kafka_backport.common.protocol.types.Struct;
+import org.apache.flink.kafka_backport.common.protocol.ApiKeys;
+import org.apache.flink.kafka_backport.common.protocol.Errors;
+import org.apache.flink.kafka_backport.common.utils.CollectionUtils;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+public class ListOffsetRequest extends AbstractRequest {
+    
+    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.LIST_OFFSETS.id);
+    private static final String REPLICA_ID_KEY_NAME = "replica_id";
+    private static final String TOPICS_KEY_NAME = "topics";
+
+    // topic level field names
+    private static final String TOPIC_KEY_NAME = "topic";
+    private static final String PARTITIONS_KEY_NAME = "partitions";
+
+    // partition level field names
+    private static final String PARTITION_KEY_NAME = "partition";
+    private static final String TIMESTAMP_KEY_NAME = "timestamp";
+    private static final String MAX_NUM_OFFSETS_KEY_NAME = "max_num_offsets";
+
+    private final int replicaId;
+    private final Map<TopicPartition, PartitionData> offsetData;
+
+    public static final class PartitionData {
+        public final long timestamp;
+        public final int maxNumOffsets;
+
+        public PartitionData(long timestamp, int maxNumOffsets) {
+            this.timestamp = timestamp;
+            this.maxNumOffsets = maxNumOffsets;
+        }
+    }
+    
+    public ListOffsetRequest(Map<TopicPartition, PartitionData> offsetData) {
+        this(-1, offsetData);
+    }
+
+    public ListOffsetRequest(int replicaId, Map<TopicPartition, PartitionData> offsetData) {
+        super(new Struct(CURRENT_SCHEMA));
+        Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(offsetData);
+
+        struct.set(REPLICA_ID_KEY_NAME, replicaId);
+        List<Struct> topicArray = new ArrayList<Struct>();
+        for (Map.Entry<String, Map<Integer, PartitionData>> topicEntry: topicsData.entrySet()) {
+            Struct topicData = struct.instance(TOPICS_KEY_NAME);
+            topicData.set(TOPIC_KEY_NAME, topicEntry.getKey());
+            List<Struct> partitionArray = new ArrayList<Struct>();
+            for (Map.Entry<Integer, PartitionData> partitionEntry : topicEntry.getValue().entrySet()) {
+                PartitionData offsetPartitionData = partitionEntry.getValue();
+                Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
+                partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
+                partitionData.set(TIMESTAMP_KEY_NAME, offsetPartitionData.timestamp);
+                partitionData.set(MAX_NUM_OFFSETS_KEY_NAME, offsetPartitionData.maxNumOffsets);
+                partitionArray.add(partitionData);
+            }
+            topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
+            topicArray.add(topicData);
+        }
+        struct.set(TOPICS_KEY_NAME, topicArray.toArray());
+        this.replicaId = replicaId;
+        this.offsetData = offsetData;
+    }
+
+    public ListOffsetRequest(Struct struct) {
+        super(struct);
+        replicaId = struct.getInt(REPLICA_ID_KEY_NAME);
+        offsetData = new HashMap<TopicPartition, PartitionData>();
+        for (Object topicResponseObj : struct.getArray(TOPICS_KEY_NAME)) {
+            Struct topicResponse = (Struct) topicResponseObj;
+            String topic = topicResponse.getString(TOPIC_KEY_NAME);
+            for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
+                Struct partitionResponse = (Struct) partitionResponseObj;
+                int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
+                long timestamp = partitionResponse.getLong(TIMESTAMP_KEY_NAME);
+                int maxNumOffsets = partitionResponse.getInt(MAX_NUM_OFFSETS_KEY_NAME);
+                PartitionData partitionData = new PartitionData(timestamp, maxNumOffsets);
+                offsetData.put(new TopicPartition(topic, partition), partitionData);
+            }
+        }
+    }
+
+    @Override
+    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+        Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = new HashMap<TopicPartition, ListOffsetResponse.PartitionData>();
+
+        for (Map.Entry<TopicPartition, PartitionData> entry: offsetData.entrySet()) {
+            ListOffsetResponse.PartitionData partitionResponse = new ListOffsetResponse.PartitionData(Errors.forException(e).code(), new ArrayList<Long>());
+            responseData.put(entry.getKey(), partitionResponse);
+        }
+
+        switch (versionId) {
+            case 0:
+                return new ListOffsetResponse(responseData);
+            default:
+                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+                        versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.LIST_OFFSETS.id)));
+        }
+    }
+
+    public int replicaId() {
+        return replicaId;
+    }
+
+    public Map<TopicPartition, PartitionData> offsetData() {
+        return offsetData;
+    }
+
+    public static ListOffsetRequest parse(ByteBuffer buffer, int versionId) {
+        return new ListOffsetRequest(ProtoUtils.parseRequest(ApiKeys.LIST_OFFSETS.id, versionId, buffer));
+    }
+
+    public static ListOffsetRequest parse(ByteBuffer buffer) {
+        return new ListOffsetRequest((Struct) CURRENT_SCHEMA.read(buffer));
+    }
+}
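
A hypothetical sketch of asking for the latest offset of one partition. Kafka's convention (assumed here) is that timestamp -1 requests the latest offset and -2 the earliest; maxNumOffsets = 1 returns just that single offset:

    import org.apache.flink.kafka_backport.common.TopicPartition;
    import org.apache.flink.kafka_backport.common.requests.ListOffsetRequest;

    import java.util.HashMap;
    import java.util.Map;

    public class ListOffsetRequestSketch {
        public static void main(String[] args) {
            Map<TopicPartition, ListOffsetRequest.PartitionData> offsetData =
                    new HashMap<TopicPartition, ListOffsetRequest.PartitionData>();

            // timestamp -1 = latest offset; return at most one offset.
            offsetData.put(new TopicPartition("my-topic", 0),
                    new ListOffsetRequest.PartitionData(-1L, 1));

            // The single-argument constructor fills in replicaId = -1 (consumer).
            ListOffsetRequest request = new ListOffsetRequest(offsetData);
            System.out.println(request.replicaId()); // -1
        }
    }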

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/ListOffsetResponse.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/ListOffsetResponse.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/ListOffsetResponse.java
new file mode 100644
index 0000000..b831f61
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/ListOffsetResponse.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.requests;
+
+import org.apache.flink.kafka_backport.common.TopicPartition;
+import org.apache.flink.kafka_backport.common.protocol.ProtoUtils;
+import org.apache.flink.kafka_backport.common.protocol.types.Schema;
+import org.apache.flink.kafka_backport.common.protocol.types.Struct;
+import org.apache.flink.kafka_backport.common.protocol.ApiKeys;
+import org.apache.flink.kafka_backport.common.utils.CollectionUtils;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+public class ListOffsetResponse extends AbstractRequestResponse {
+    
+    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.LIST_OFFSETS.id);
+    private static final String RESPONSES_KEY_NAME = "responses";
+
+    // topic level field names
+    private static final String TOPIC_KEY_NAME = "topic";
+    private static final String PARTITIONS_KEY_NAME = "partition_responses";
+
+    // partition level field names
+    private static final String PARTITION_KEY_NAME = "partition";
+    private static final String ERROR_CODE_KEY_NAME = "error_code";
+
+    /**
+     * Possible error codes:
+     *
+     *  UNKNOWN_TOPIC_OR_PARTITION (3)
+     *  NOT_LEADER_FOR_PARTITION (6)
+     *  UNKNOWN (-1)
+     */
+
+    private static final String OFFSETS_KEY_NAME = "offsets";
+
+    private final Map<TopicPartition, PartitionData> responseData;
+
+    public static final class PartitionData {
+        public final short errorCode;
+        public final List<Long> offsets;
+
+        public PartitionData(short errorCode, List<Long> offsets) {
+            this.errorCode = errorCode;
+            this.offsets = offsets;
+        }
+    }
+
+    public ListOffsetResponse(Map<TopicPartition, PartitionData> responseData) {
+        super(new Struct(CURRENT_SCHEMA));
+        Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(responseData);
+
+        List<Struct> topicArray = new ArrayList<Struct>();
+        for (Map.Entry<String, Map<Integer, PartitionData>> topicEntry: topicsData.entrySet()) {
+            Struct topicData = struct.instance(RESPONSES_KEY_NAME);
+            topicData.set(TOPIC_KEY_NAME, topicEntry.getKey());
+            List<Struct> partitionArray = new ArrayList<Struct>();
+            for (Map.Entry<Integer, PartitionData> partitionEntry : topicEntry.getValue().entrySet()) {
+                PartitionData offsetPartitionData = partitionEntry.getValue();
+                Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
+                partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
+                partitionData.set(ERROR_CODE_KEY_NAME, offsetPartitionData.errorCode);
+                partitionData.set(OFFSETS_KEY_NAME, offsetPartitionData.offsets.toArray());
+                partitionArray.add(partitionData);
+            }
+            topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
+            topicArray.add(topicData);
+        }
+        struct.set(RESPONSES_KEY_NAME, topicArray.toArray());
+        this.responseData = responseData;
+    }
+
+    public ListOffsetResponse(Struct struct) {
+        super(struct);
+        responseData = new HashMap<TopicPartition, PartitionData>();
+        for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
+            Struct topicResponse = (Struct) topicResponseObj;
+            String topic = topicResponse.getString(TOPIC_KEY_NAME);
+            for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
+                Struct partitionResponse = (Struct) partitionResponseObj;
+                int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
+                short errorCode = partitionResponse.getShort(ERROR_CODE_KEY_NAME);
+                Object[] offsets = partitionResponse.getArray(OFFSETS_KEY_NAME);
+                List<Long> offsetsList = new ArrayList<Long>();
+                for (Object offset: offsets)
+                    offsetsList.add((Long) offset);
+                PartitionData partitionData = new PartitionData(errorCode, offsetsList);
+                responseData.put(new TopicPartition(topic, partition), partitionData);
+            }
+        }
+    }
+
+    public Map<TopicPartition, PartitionData> responseData() {
+        return responseData;
+    }
+
+    public static ListOffsetResponse parse(ByteBuffer buffer) {
+        return new ListOffsetResponse((Struct) CURRENT_SCHEMA.read(buffer));
+    }
+}
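
The matching response, sketched with made-up values: error code 0 and a single offset for the queried partition:

    import org.apache.flink.kafka_backport.common.TopicPartition;
    import org.apache.flink.kafka_backport.common.requests.ListOffsetResponse;

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;

    public class ListOffsetResponseSketch {
        public static void main(String[] args) {
            Map<TopicPartition, ListOffsetResponse.PartitionData> data =
                    new HashMap<TopicPartition, ListOffsetResponse.PartitionData>();

            // error code 0 (none) and one offset, 4711, for the queried partition.
            data.put(new TopicPartition("my-topic", 0),
                    new ListOffsetResponse.PartitionData((short) 0,
                            Arrays.asList(4711L)));

            ListOffsetResponse response = new ListOffsetResponse(data);
            System.out.println(response.responseData()
                    .get(new TopicPartition("my-topic", 0)).offsets); // [4711]
        }
    }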

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/MetadataRequest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/MetadataRequest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/MetadataRequest.java
new file mode 100644
index 0000000..2820fcd
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/MetadataRequest.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.requests;
+
+import org.apache.flink.kafka_backport.common.Node;
+import org.apache.flink.kafka_backport.common.PartitionInfo;
+import org.apache.flink.kafka_backport.common.protocol.ProtoUtils;
+import org.apache.flink.kafka_backport.common.protocol.types.Schema;
+import org.apache.flink.kafka_backport.common.protocol.types.Struct;
+import org.apache.flink.kafka_backport.common.Cluster;
+import org.apache.flink.kafka_backport.common.protocol.ApiKeys;
+import org.apache.flink.kafka_backport.common.protocol.Errors;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+public class MetadataRequest extends AbstractRequest {
+    
+    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.METADATA.id);
+    private static final String TOPICS_KEY_NAME = "topics";
+
+    private final List<String> topics;
+
+    public MetadataRequest(List<String> topics) {
+        super(new Struct(CURRENT_SCHEMA));
+        struct.set(TOPICS_KEY_NAME, topics.toArray());
+        this.topics = topics;
+    }
+
+    public MetadataRequest(Struct struct) {
+        super(struct);
+        Object[] topicArray = struct.getArray(TOPICS_KEY_NAME);
+        topics = new ArrayList<String>();
+        for (Object topicObj: topicArray) {
+            topics.add((String) topicObj);
+        }
+    }
+
+    @Override
+    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+        Map<String, Errors> topicErrors = new HashMap<String, Errors>();
+        for (String topic : topics) {
+            topicErrors.put(topic, Errors.forException(e));
+        }
+
+        Cluster cluster = new Cluster(new ArrayList<Node>(), new ArrayList<PartitionInfo>());
+        switch (versionId) {
+            case 0:
+                return new MetadataResponse(cluster, topicErrors);
+            default:
+                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+                        versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.METADATA.id)));
+        }
+    }
+
+    public List<String> topics() {
+        return topics;
+    }
+
+    public static MetadataRequest parse(ByteBuffer buffer, int versionId) {
+        return new MetadataRequest(ProtoUtils.parseRequest(ApiKeys.METADATA.id, versionId, buffer));
+    }
+
+    public static MetadataRequest parse(ByteBuffer buffer) {
+        return new MetadataRequest((Struct) CURRENT_SCHEMA.read(buffer));
+    }
+}
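
A hypothetical sketch of requesting metadata for one topic, plus the error-response path that getErrorResponse() provides (the topic name and exception are illustrative):

    import org.apache.flink.kafka_backport.common.requests.MetadataRequest;
    import org.apache.flink.kafka_backport.common.requests.MetadataResponse;

    import java.util.Arrays;

    public class MetadataRequestSketch {
        public static void main(String[] args) {
            MetadataRequest request = new MetadataRequest(Arrays.asList("my-topic"));

            // getErrorResponse() builds a version-0 MetadataResponse with an empty
            // cluster and the same error mapped to every requested topic.
            MetadataResponse errorResponse = (MetadataResponse)
                    request.getErrorResponse(0, new RuntimeException("boom"));

            System.out.println(request.topics()); // [my-topic]
        }
    }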

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/MetadataResponse.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/MetadataResponse.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/MetadataResponse.java
new file mode 100644
index 0000000..83d7290
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/MetadataResponse.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.requests;
+
+import org.apache.flink.kafka_backport.common.Node;
+import org.apache.flink.kafka_backport.common.protocol.types.Struct;
+import org.apache.flink.kafka_backport.common.Cluster;
+import org.apache.flink.kafka_backport.common.PartitionInfo;
+import org.apache.flink.kafka_backport.common.protocol.ApiKeys;
+import org.apache.flink.kafka_backport.common.protocol.Errors;
+import org.apache.flink.kafka_backport.common.protocol.ProtoUtils;
+import org.apache.flink.kafka_backport.common.protocol.types.Schema;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+public class MetadataResponse extends AbstractRequestResponse {
+
+    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.METADATA.id);
+    private static final String BROKERS_KEY_NAME = "brokers";
+    private static final String TOPIC_METATDATA_KEY_NAME = "topic_metadata";
+
+    // broker level field names
+    private static final String NODE_ID_KEY_NAME = "node_id";
+    private static final String HOST_KEY_NAME = "host";
+    private static final String PORT_KEY_NAME = "port";
+
+    // topic level field names
+    private static final String TOPIC_ERROR_CODE_KEY_NAME = "topic_error_code";
+
+    /**
+     * Possible error code:
+     *
+     * TODO
+     */
+
+    private static final String TOPIC_KEY_NAME = "topic";
+    private static final String PARTITION_METADATA_KEY_NAME = "partition_metadata";
+
+    // partition level field names
+    private static final String PARTITION_ERROR_CODE_KEY_NAME = "partition_error_code";
+
+    /**
+     * Possible error code:
+     *
+     * TODO
+     */
+
+    private static final String PARTITION_KEY_NAME = "partition_id";
+    private static final String LEADER_KEY_NAME = "leader";
+    private static final String REPLICAS_KEY_NAME = "replicas";
+    private static final String ISR_KEY_NAME = "isr";
+
+    private final Cluster cluster;
+    private final Map<String, Errors> errors;
+
+    /**
+     * Constructor for a MetadataResponse in which some topics have errors;
+     * for those topics, the error data takes precedence over the cluster information.
+     */
+    public MetadataResponse(Cluster cluster, Map<String, Errors> errors) {
+        super(new Struct(CURRENT_SCHEMA));
+
+        List<Struct> brokerArray = new ArrayList<Struct>();
+        for (Node node : cluster.nodes()) {
+            Struct broker = struct.instance(BROKERS_KEY_NAME);
+            broker.set(NODE_ID_KEY_NAME, node.id());
+            broker.set(HOST_KEY_NAME, node.host());
+            broker.set(PORT_KEY_NAME, node.port());
+            brokerArray.add(broker);
+        }
+        struct.set(BROKERS_KEY_NAME, brokerArray.toArray());
+
+        List<Struct> topicArray = new ArrayList<Struct>();
+        for (String topic : cluster.topics()) {
+            Struct topicData = struct.instance(TOPIC_METATDATA_KEY_NAME);
+
+            topicData.set(TOPIC_KEY_NAME, topic);
+            if (errors.containsKey(topic)) {
+                topicData.set(TOPIC_ERROR_CODE_KEY_NAME, errors.get(topic).code());
+            } else {
+                topicData.set(TOPIC_ERROR_CODE_KEY_NAME, Errors.NONE.code());
+                List<Struct> partitionArray = new ArrayList<Struct>();
+                for (PartitionInfo fetchPartitionData : cluster.partitionsForTopic(topic)) {
+                    Struct partitionData = topicData.instance(PARTITION_METADATA_KEY_NAME);
+                    partitionData.set(PARTITION_ERROR_CODE_KEY_NAME, Errors.NONE.code());
+                    partitionData.set(PARTITION_KEY_NAME, fetchPartitionData.partition());
+                    partitionData.set(LEADER_KEY_NAME, fetchPartitionData.leader().id());
+                    ArrayList<Integer> replicas = new ArrayList<Integer>();
+                    for (Node node : fetchPartitionData.replicas())
+                        replicas.add(node.id());
+                    partitionData.set(REPLICAS_KEY_NAME, replicas.toArray());
+                    ArrayList<Integer> isr = new ArrayList<Integer>();
+                    for (Node node : fetchPartitionData.inSyncReplicas())
+                        isr.add(node.id());
+                    partitionData.set(ISR_KEY_NAME, isr.toArray());
+                    partitionArray.add(partitionData);
+                }
+                topicData.set(PARTITION_METADATA_KEY_NAME, partitionArray.toArray());
+            }
+
+            topicArray.add(topicData);
+        }
+        struct.set(TOPIC_METATDATA_KEY_NAME, topicArray.toArray());
+
+        this.cluster = cluster;
+        // keep the supplied error mapping so that errors() reflects this response
+        this.errors = errors;
+    }
+
+    public MetadataResponse(Struct struct) {
+        super(struct);
+        Map<String, Errors> errors = new HashMap<String, Errors>();
+        Map<Integer, Node> brokers = new HashMap<Integer, Node>();
+        Object[] brokerStructs = (Object[]) struct.get(BROKERS_KEY_NAME);
+        for (int i = 0; i < brokerStructs.length; i++) {
+            Struct broker = (Struct) brokerStructs[i];
+            int nodeId = broker.getInt(NODE_ID_KEY_NAME);
+            String host = broker.getString(HOST_KEY_NAME);
+            int port = broker.getInt(PORT_KEY_NAME);
+            brokers.put(nodeId, new Node(nodeId, host, port));
+        }
+        List<PartitionInfo> partitions = new ArrayList<PartitionInfo>();
+        Object[] topicInfos = (Object[]) struct.get(TOPIC_METATDATA_KEY_NAME);
+        for (int i = 0; i < topicInfos.length; i++) {
+            Struct topicInfo = (Struct) topicInfos[i];
+            short topicError = topicInfo.getShort(TOPIC_ERROR_CODE_KEY_NAME);
+            String topic = topicInfo.getString(TOPIC_KEY_NAME);
+            if (topicError == Errors.NONE.code()) {
+                Object[] partitionInfos = (Object[]) topicInfo.get(PARTITION_METADATA_KEY_NAME);
+                for (int j = 0; j < partitionInfos.length; j++) {
+                    Struct partitionInfo = (Struct) partitionInfos[j];
+                    int partition = partitionInfo.getInt(PARTITION_KEY_NAME);
+                    int leader = partitionInfo.getInt(LEADER_KEY_NAME);
+                    Node leaderNode = leader == -1 ? null : brokers.get(leader);
+                    Object[] replicas = (Object[]) partitionInfo.get(REPLICAS_KEY_NAME);
+                    Node[] replicaNodes = new Node[replicas.length];
+                    for (int k = 0; k < replicas.length; k++)
+                        replicaNodes[k] = brokers.get(replicas[k]);
+                    Object[] isr = (Object[]) partitionInfo.get(ISR_KEY_NAME);
+                    Node[] isrNodes = new Node[isr.length];
+                    for (int k = 0; k < isr.length; k++)
+                        isrNodes[k] = brokers.get(isr[k]);
+                    partitions.add(new PartitionInfo(topic, partition, leaderNode, replicaNodes, isrNodes));
+                }
+            } else {
+                errors.put(topic, Errors.forCode(topicError));
+            }
+        }
+        this.errors = errors;
+        this.cluster = new Cluster(brokers.values(), partitions);
+    }
+
+    public Map<String, Errors> errors() {
+        return this.errors;
+    }
+
+    public Cluster cluster() {
+        return this.cluster;
+    }
+
+    public static MetadataResponse parse(ByteBuffer buffer) {
+        return new MetadataResponse((Struct) CURRENT_SCHEMA.read(buffer));
+    }
+}
\ No newline at end of file
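
The two constructors above mirror each other: one serializes a Cluster plus a per-topic error map into the Struct, the other rebuilds both from a parsed Struct. A construction sketch, reusing the Node, PartitionInfo, and Cluster signatures that appear in the parsing constructor; the host and topic names are illustrative, and the error constant is assumed to exist in the backported Errors enum as it does upstream:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.Map;

    import org.apache.flink.kafka_backport.common.Cluster;
    import org.apache.flink.kafka_backport.common.Node;
    import org.apache.flink.kafka_backport.common.PartitionInfo;
    import org.apache.flink.kafka_backport.common.protocol.Errors;
    import org.apache.flink.kafka_backport.common.requests.MetadataResponse;

    public class MetadataResponseSketch {
        public static void main(String[] args) {
            Node broker = new Node(0, "broker-0.example.com", 9092);
            PartitionInfo p0 = new PartitionInfo("events", 0, broker,
                    new Node[] { broker },    // replicas
                    new Node[] { broker });   // in-sync replicas
            Cluster cluster = new Cluster(Arrays.asList(broker), Arrays.asList(p0));

            Map<String, Errors> errors =
                    Collections.singletonMap("gone-topic", Errors.UNKNOWN_TOPIC_OR_PARTITION);

            MetadataResponse response = new MetadataResponse(cluster, errors);
            System.out.println(response.cluster().partitionsForTopic("events"));
            System.out.println(response.errors());
        }
    }

Note that the serialization loop walks cluster.topics(), so an error entry for a topic that has no partitions in the cluster (like "gone-topic" here) stays on the in-memory object but is never written to the wire.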

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/OffsetCommitRequest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/OffsetCommitRequest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/OffsetCommitRequest.java
new file mode 100644
index 0000000..b33d2c1
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/OffsetCommitRequest.java
@@ -0,0 +1,275 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.requests;
+
+import org.apache.flink.kafka_backport.common.TopicPartition;
+import org.apache.flink.kafka_backport.common.protocol.ProtoUtils;
+import org.apache.flink.kafka_backport.common.protocol.types.Schema;
+import org.apache.flink.kafka_backport.common.protocol.types.Struct;
+import org.apache.flink.kafka_backport.common.protocol.ApiKeys;
+import org.apache.flink.kafka_backport.common.protocol.Errors;
+import org.apache.flink.kafka_backport.common.utils.CollectionUtils;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * This wrapper supports v0, v1, and v2 of OffsetCommitRequest.
+ */
+public class OffsetCommitRequest extends AbstractRequest {
+    
+    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.OFFSET_COMMIT.id);
+    private static final String GROUP_ID_KEY_NAME = "group_id";
+    private static final String GENERATION_ID_KEY_NAME = "group_generation_id";
+    private static final String CONSUMER_ID_KEY_NAME = "consumer_id";
+    private static final String TOPICS_KEY_NAME = "topics";
+    private static final String RETENTION_TIME_KEY_NAME = "retention_time";
+
+    // topic level field names
+    private static final String TOPIC_KEY_NAME = "topic";
+    private static final String PARTITIONS_KEY_NAME = "partitions";
+
+    // partition level field names
+    private static final String PARTITION_KEY_NAME = "partition";
+    private static final String COMMIT_OFFSET_KEY_NAME = "offset";
+    private static final String METADATA_KEY_NAME = "metadata";
+
+    @Deprecated
+    private static final String TIMESTAMP_KEY_NAME = "timestamp";         // present only in the v1 schema
+
+    // default values for the current version
+    public static final int DEFAULT_GENERATION_ID = -1;
+    public static final String DEFAULT_CONSUMER_ID = "";
+    public static final long DEFAULT_RETENTION_TIME = -1L;
+
+    // default values for old versions,
+    // will be removed after these versions are deprecated
+    @Deprecated
+    public static final long DEFAULT_TIMESTAMP = -1L;            // for V0, V1
+
+    private final String groupId;
+    private final String consumerId;
+    private final int generationId;
+    private final long retentionTime;
+    private final Map<TopicPartition, PartitionData> offsetData;
+
+    public static final class PartitionData {
+        @Deprecated
+        public final long timestamp;                // for V1
+
+        public final long offset;
+        public final String metadata;
+
+        @Deprecated
+        public PartitionData(long offset, long timestamp, String metadata) {
+            this.offset = offset;
+            this.timestamp = timestamp;
+            this.metadata = metadata;
+        }
+
+        public PartitionData(long offset, String metadata) {
+            this(offset, DEFAULT_TIMESTAMP, metadata);
+        }
+    }
+
+    /**
+     * Constructor for version 0.
+     * @param groupId the consumer group these offsets belong to
+     * @param offsetData the offset and metadata to commit, per topic partition
+     */
+    @Deprecated
+    public OffsetCommitRequest(String groupId, Map<TopicPartition, PartitionData> offsetData) {
+        super(new Struct(ProtoUtils.requestSchema(ApiKeys.OFFSET_COMMIT.id, 0)));
+
+        initCommonFields(groupId, offsetData);
+        this.groupId = groupId;
+        this.generationId = DEFAULT_GENERATION_ID;
+        this.consumerId = DEFAULT_CONSUMER_ID;
+        this.retentionTime = DEFAULT_RETENTION_TIME;
+        this.offsetData = offsetData;
+    }
+
+    /**
+     * Constructor for version 1.
+     * @param groupId the consumer group these offsets belong to
+     * @param generationId the group generation, as assigned by the coordinator
+     * @param consumerId the consumer id, as assigned by the coordinator
+     * @param offsetData the offset and metadata to commit, per topic partition
+     */
+    @Deprecated
+    public OffsetCommitRequest(String groupId, int generationId, String consumerId, Map<TopicPartition, PartitionData> offsetData) {
+        super(new Struct(ProtoUtils.requestSchema(ApiKeys.OFFSET_COMMIT.id, 1)));
+
+        initCommonFields(groupId, offsetData);
+        struct.set(GENERATION_ID_KEY_NAME, generationId);
+        struct.set(CONSUMER_ID_KEY_NAME, consumerId);
+        this.groupId = groupId;
+        this.generationId = generationId;
+        this.consumerId = consumerId;
+        this.retentionTime = DEFAULT_RETENTION_TIME;
+        this.offsetData = offsetData;
+    }
+
+    /**
+     * Constructor for version 2.
+     * @param groupId the consumer group these offsets belong to
+     * @param generationId the group generation, as assigned by the coordinator
+     * @param consumerId the consumer id, as assigned by the coordinator
+     * @param retentionTime offset retention time in milliseconds; -1 applies the broker default
+     * @param offsetData the offset and metadata to commit, per topic partition
+     */
+    public OffsetCommitRequest(String groupId, int generationId, String consumerId, long retentionTime, Map<TopicPartition, PartitionData> offsetData) {
+        super(new Struct(CURRENT_SCHEMA));
+
+        initCommonFields(groupId, offsetData);
+        struct.set(GENERATION_ID_KEY_NAME, generationId);
+        struct.set(CONSUMER_ID_KEY_NAME, consumerId);
+        struct.set(RETENTION_TIME_KEY_NAME, retentionTime);
+        this.groupId = groupId;
+        this.generationId = generationId;
+        this.consumerId = consumerId;
+        this.retentionTime = retentionTime;
+        this.offsetData = offsetData;
+    }
+
+    private void initCommonFields(String groupId, Map<TopicPartition, PartitionData> offsetData) {
+        Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(offsetData);
+
+        struct.set(GROUP_ID_KEY_NAME, groupId);
+        List<Struct> topicArray = new ArrayList<Struct>();
+
+        for (Map.Entry<String, Map<Integer, PartitionData>> topicEntry: topicsData.entrySet()) {
+            Struct topicData = struct.instance(TOPICS_KEY_NAME);
+            topicData.set(TOPIC_KEY_NAME, topicEntry.getKey());
+            List<Struct> partitionArray = new ArrayList<Struct>();
+            for (Map.Entry<Integer, PartitionData> partitionEntry : topicEntry.getValue().entrySet()) {
+                PartitionData fetchPartitionData = partitionEntry.getValue();
+                Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
+                partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
+                partitionData.set(COMMIT_OFFSET_KEY_NAME, fetchPartitionData.offset);
+                // Only for v1
+                if (partitionData.hasField(TIMESTAMP_KEY_NAME))
+                    partitionData.set(TIMESTAMP_KEY_NAME, fetchPartitionData.timestamp);
+                partitionData.set(METADATA_KEY_NAME, fetchPartitionData.metadata);
+                partitionArray.add(partitionData);
+            }
+            topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
+            topicArray.add(topicData);
+        }
+        struct.set(TOPICS_KEY_NAME, topicArray.toArray());
+    }
+
+    public OffsetCommitRequest(Struct struct) {
+        super(struct);
+
+        groupId = struct.getString(GROUP_ID_KEY_NAME);
+        // This field does not exist in v0.
+        if (struct.hasField(GENERATION_ID_KEY_NAME))
+            generationId = struct.getInt(GENERATION_ID_KEY_NAME);
+        else
+            generationId = DEFAULT_GENERATION_ID;
+
+        // This field does not exist in v0.
+        if (struct.hasField(CONSUMER_ID_KEY_NAME))
+            consumerId = struct.getString(CONSUMER_ID_KEY_NAME);
+        else
+            consumerId = DEFAULT_CONSUMER_ID;
+
+        // This field only exists in v2
+        if (struct.hasField(RETENTION_TIME_KEY_NAME))
+            retentionTime = struct.getLong(RETENTION_TIME_KEY_NAME);
+        else
+            retentionTime = DEFAULT_RETENTION_TIME;
+
+        offsetData = new HashMap<TopicPartition, PartitionData>();
+        for (Object topicDataObj : struct.getArray(TOPICS_KEY_NAME)) {
+            Struct topicData = (Struct) topicDataObj;
+            String topic = topicData.getString(TOPIC_KEY_NAME);
+            for (Object partitionDataObj : topicData.getArray(PARTITIONS_KEY_NAME)) {
+                Struct partitionDataStruct = (Struct) partitionDataObj;
+                int partition = partitionDataStruct.getInt(PARTITION_KEY_NAME);
+                long offset = partitionDataStruct.getLong(COMMIT_OFFSET_KEY_NAME);
+                String metadata = partitionDataStruct.getString(METADATA_KEY_NAME);
+                PartitionData partitionOffset;
+                // This field only exists in v1
+                if (partitionDataStruct.hasField(TIMESTAMP_KEY_NAME)) {
+                    long timestamp = partitionDataStruct.getLong(TIMESTAMP_KEY_NAME);
+                    partitionOffset = new PartitionData(offset, timestamp, metadata);
+                } else {
+                    partitionOffset = new PartitionData(offset, metadata);
+                }
+                offsetData.put(new TopicPartition(topic, partition), partitionOffset);
+            }
+        }
+    }
+
+    @Override
+    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+        Map<TopicPartition, Short> responseData = new HashMap<TopicPartition, Short>();
+        for (Map.Entry<TopicPartition, PartitionData> entry: offsetData.entrySet()) {
+            responseData.put(entry.getKey(), Errors.forException(e).code());
+        }
+
+        switch (versionId) {
+            // OffsetCommitResponseV0 == OffsetCommitResponseV1 == OffsetCommitResponseV2
+            case 0:
+            case 1:
+            case 2:
+                return new OffsetCommitResponse(responseData);
+            default:
+                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+                        versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.OFFSET_COMMIT.id)));
+        }
+    }
+
+    public String groupId() {
+        return groupId;
+    }
+
+    public int generationId() {
+        return generationId;
+    }
+
+    public String consumerId() {
+        return consumerId;
+    }
+
+    public long retentionTime() {
+        return retentionTime;
+    }
+
+    public Map<TopicPartition, PartitionData> offsetData() {
+        return offsetData;
+    }
+
+    public static OffsetCommitRequest parse(ByteBuffer buffer, int versionId) {
+        Schema schema = ProtoUtils.requestSchema(ApiKeys.OFFSET_COMMIT.id, versionId);
+        return new OffsetCommitRequest((Struct) schema.read(buffer));
+    }
+
+    public static OffsetCommitRequest parse(ByteBuffer buffer) {
+        return new OffsetCommitRequest((Struct) CURRENT_SCHEMA.read(buffer));
+    }
+}
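
In practice only the v2 constructor is needed for new code; the deprecated v0/v1 constructors exist so the old wire formats can still be produced, and the Struct-based constructor infers the version by probing hasField(...) for the fields that were added later. A short usage sketch for the current version, with illustrative group, topic, and metadata strings:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.flink.kafka_backport.common.TopicPartition;
    import org.apache.flink.kafka_backport.common.requests.OffsetCommitRequest;

    public class OffsetCommitRequestSketch {
        public static void main(String[] args) {
            Map<TopicPartition, OffsetCommitRequest.PartitionData> offsets =
                    new HashMap<TopicPartition, OffsetCommitRequest.PartitionData>();
            // v2 carries no per-partition timestamp, so the two-argument
            // PartitionData constructor is the right one here.
            offsets.put(new TopicPartition("events", 0),
                    new OffsetCommitRequest.PartitionData(4711L, "checkpoint-42"));

            OffsetCommitRequest request = new OffsetCommitRequest(
                    "flink-group",                              // group id
                    OffsetCommitRequest.DEFAULT_GENERATION_ID,  // no coordinator-assigned generation
                    OffsetCommitRequest.DEFAULT_CONSUMER_ID,    // no coordinator-assigned consumer id
                    OffsetCommitRequest.DEFAULT_RETENTION_TIME, // broker-side default retention
                    offsets);

            System.out.println(request.offsetData().size()); // prints 1
        }
    }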

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/OffsetCommitResponse.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/OffsetCommitResponse.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/OffsetCommitResponse.java
new file mode 100644
index 0000000..5f14b63
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/OffsetCommitResponse.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.requests;
+
+import org.apache.flink.kafka_backport.common.protocol.types.Struct;
+import org.apache.flink.kafka_backport.common.TopicPartition;
+import org.apache.flink.kafka_backport.common.protocol.ApiKeys;
+import org.apache.flink.kafka_backport.common.protocol.ProtoUtils;
+import org.apache.flink.kafka_backport.common.protocol.types.Schema;
+import org.apache.flink.kafka_backport.common.utils.CollectionUtils;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+public class OffsetCommitResponse extends AbstractRequestResponse {
+    
+    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.OFFSET_COMMIT.id);
+    private static final String RESPONSES_KEY_NAME = "responses";
+
+    // topic level fields
+    private static final String TOPIC_KEY_NAME = "topic";
+    private static final String PARTITIONS_KEY_NAME = "partition_responses";
+
+    // partition level fields
+    private static final String PARTITION_KEY_NAME = "partition";
+    private static final String ERROR_CODE_KEY_NAME = "error_code";
+
+    /**
+     * Possible error code:
+     *
+     * OFFSET_METADATA_TOO_LARGE (12)
+     * CONSUMER_COORDINATOR_NOT_AVAILABLE (15)
+     * NOT_COORDINATOR_FOR_CONSUMER (16)
+     * ILLEGAL_GENERATION (22)
+     * UNKNOWN_CONSUMER_ID (25)
+     * COMMITTING_PARTITIONS_NOT_ASSIGNED (27)
+     * INVALID_COMMIT_OFFSET_SIZE (28)
+     */
+
+    private final Map<TopicPartition, Short> responseData;
+
+    public OffsetCommitResponse(Map<TopicPartition, Short> responseData) {
+        super(new Struct(CURRENT_SCHEMA));
+
+        Map<String, Map<Integer, Short>> topicsData = CollectionUtils.groupDataByTopic(responseData);
+
+        List<Struct> topicArray = new ArrayList<Struct>();
+        for (Map.Entry<String, Map<Integer, Short>> entries: topicsData.entrySet()) {
+            Struct topicData = struct.instance(RESPONSES_KEY_NAME);
+            topicData.set(TOPIC_KEY_NAME, entries.getKey());
+            List<Struct> partitionArray = new ArrayList<Struct>();
+            for (Map.Entry<Integer, Short> partitionEntry : entries.getValue().entrySet()) {
+                Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
+                partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
+                partitionData.set(ERROR_CODE_KEY_NAME, partitionEntry.getValue());
+                partitionArray.add(partitionData);
+            }
+            topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
+            topicArray.add(topicData);
+        }
+        struct.set(RESPONSES_KEY_NAME, topicArray.toArray());
+        this.responseData = responseData;
+    }
+
+    public OffsetCommitResponse(Struct struct) {
+        super(struct);
+        responseData = new HashMap<TopicPartition, Short>();
+        for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
+            Struct topicResponse = (Struct) topicResponseObj;
+            String topic = topicResponse.getString(TOPIC_KEY_NAME);
+            for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
+                Struct partitionResponse = (Struct) partitionResponseObj;
+                int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
+                short errorCode = partitionResponse.getShort(ERROR_CODE_KEY_NAME);
+                responseData.put(new TopicPartition(topic, partition), errorCode);
+            }
+        }
+    }
+
+    public Map<TopicPartition, Short> responseData() {
+        return responseData;
+    }
+
+    public static OffsetCommitResponse parse(ByteBuffer buffer) {
+        return new OffsetCommitResponse((Struct) CURRENT_SCHEMA.read(buffer));
+    }
+}
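
Since the response stores raw error codes rather than Errors values, callers map them back with Errors.forCode(short) before acting on them. A small sketch that builds a response and scans it for failed partitions, using an illustrative topic name and one of the error codes listed in the javadoc above:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.flink.kafka_backport.common.TopicPartition;
    import org.apache.flink.kafka_backport.common.protocol.Errors;
    import org.apache.flink.kafka_backport.common.requests.OffsetCommitResponse;

    public class OffsetCommitResponseSketch {
        public static void main(String[] args) {
            Map<TopicPartition, Short> results = new HashMap<TopicPartition, Short>();
            results.put(new TopicPartition("events", 0), Errors.NONE.code());
            results.put(new TopicPartition("events", 1),
                    Errors.OFFSET_METADATA_TOO_LARGE.code());

            OffsetCommitResponse response = new OffsetCommitResponse(results);
            for (Map.Entry<TopicPartition, Short> entry : response.responseData().entrySet()) {
                if (entry.getValue() != Errors.NONE.code()) {
                    System.out.println("commit failed for " + entry.getKey()
                            + ": " + Errors.forCode(entry.getValue()));
                }
            }
        }
    }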