You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/27 13:25:31 UTC
[14/51] [abbrv] flink git commit: [FLINK-2386] [kafka connector] Add
comments to all backported kafka sources and move them to
'org.apache.flink.kafka_backport'
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/FetchRequest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/FetchRequest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/FetchRequest.java
new file mode 100644
index 0000000..f797ebe
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/FetchRequest.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.requests;
+
+import org.apache.flink.kafka_backport.common.protocol.ProtoUtils;
+import org.apache.flink.kafka_backport.common.protocol.types.Schema;
+import org.apache.flink.kafka_backport.common.protocol.types.Struct;
+import org.apache.flink.kafka_backport.common.TopicPartition;
+import org.apache.flink.kafka_backport.common.protocol.ApiKeys;
+import org.apache.flink.kafka_backport.common.protocol.Errors;
+import org.apache.flink.kafka_backport.common.utils.CollectionUtils;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+public class FetchRequest extends AbstractRequest {
+
+ public static final int CONSUMER_REPLICA_ID = -1;
+ private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.FETCH.id);
+ private static final String REPLICA_ID_KEY_NAME = "replica_id";
+ private static final String MAX_WAIT_KEY_NAME = "max_wait_time";
+ private static final String MIN_BYTES_KEY_NAME = "min_bytes";
+ private static final String TOPICS_KEY_NAME = "topics";
+
+ // topic level field names
+ private static final String TOPIC_KEY_NAME = "topic";
+ private static final String PARTITIONS_KEY_NAME = "partitions";
+
+ // partition level field names
+ private static final String PARTITION_KEY_NAME = "partition";
+ private static final String FETCH_OFFSET_KEY_NAME = "fetch_offset";
+ private static final String MAX_BYTES_KEY_NAME = "max_bytes";
+
+ private final int replicaId;
+ private final int maxWait;
+ private final int minBytes;
+ private final Map<TopicPartition, PartitionData> fetchData;
+
+ public static final class PartitionData {
+ public final long offset;
+ public final int maxBytes;
+
+ public PartitionData(long offset, int maxBytes) {
+ this.offset = offset;
+ this.maxBytes = maxBytes;
+ }
+ }
+
+ /**
+ * Create a non-replica fetch request
+ */
+ public FetchRequest(int maxWait, int minBytes, Map<TopicPartition, PartitionData> fetchData) {
+ this(CONSUMER_REPLICA_ID, maxWait, minBytes, fetchData);
+ }
+
+ /**
+ * Create a replica fetch request
+ */
+ public FetchRequest(int replicaId, int maxWait, int minBytes, Map<TopicPartition, PartitionData> fetchData) {
+ super(new Struct(CURRENT_SCHEMA));
+ Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(fetchData);
+
+ struct.set(REPLICA_ID_KEY_NAME, replicaId);
+ struct.set(MAX_WAIT_KEY_NAME, maxWait);
+ struct.set(MIN_BYTES_KEY_NAME, minBytes);
+ List<Struct> topicArray = new ArrayList<Struct>();
+ for (Map.Entry<String, Map<Integer, PartitionData>> topicEntry : topicsData.entrySet()) {
+ Struct topicData = struct.instance(TOPICS_KEY_NAME);
+ topicData.set(TOPIC_KEY_NAME, topicEntry.getKey());
+ List<Struct> partitionArray = new ArrayList<Struct>();
+ for (Map.Entry<Integer, PartitionData> partitionEntry : topicEntry.getValue().entrySet()) {
+ PartitionData fetchPartitionData = partitionEntry.getValue();
+ Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
+ partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
+ partitionData.set(FETCH_OFFSET_KEY_NAME, fetchPartitionData.offset);
+ partitionData.set(MAX_BYTES_KEY_NAME, fetchPartitionData.maxBytes);
+ partitionArray.add(partitionData);
+ }
+ topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
+ topicArray.add(topicData);
+ }
+ struct.set(TOPICS_KEY_NAME, topicArray.toArray());
+ this.replicaId = replicaId;
+ this.maxWait = maxWait;
+ this.minBytes = minBytes;
+ this.fetchData = fetchData;
+ }
+
+ public FetchRequest(Struct struct) {
+ super(struct);
+ replicaId = struct.getInt(REPLICA_ID_KEY_NAME);
+ maxWait = struct.getInt(MAX_WAIT_KEY_NAME);
+ minBytes = struct.getInt(MIN_BYTES_KEY_NAME);
+ fetchData = new HashMap<TopicPartition, PartitionData>();
+ for (Object topicResponseObj : struct.getArray(TOPICS_KEY_NAME)) {
+ Struct topicResponse = (Struct) topicResponseObj;
+ String topic = topicResponse.getString(TOPIC_KEY_NAME);
+ for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
+ Struct partitionResponse = (Struct) partitionResponseObj;
+ int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
+ long offset = partitionResponse.getLong(FETCH_OFFSET_KEY_NAME);
+ int maxBytes = partitionResponse.getInt(MAX_BYTES_KEY_NAME);
+ PartitionData partitionData = new PartitionData(offset, maxBytes);
+ fetchData.put(new TopicPartition(topic, partition), partitionData);
+ }
+ }
+ }
+
+ @Override
+ public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+ Map<TopicPartition, FetchResponse.PartitionData> responseData = new HashMap<TopicPartition, FetchResponse.PartitionData>();
+
+ for (Map.Entry<TopicPartition, PartitionData> entry: fetchData.entrySet()) {
+ FetchResponse.PartitionData partitionResponse = new FetchResponse.PartitionData(Errors.forException(e).code(),
+ FetchResponse.INVALID_HIGHWATERMARK,
+ FetchResponse.EMPTY_RECORD_SET);
+ responseData.put(entry.getKey(), partitionResponse);
+ }
+
+ switch (versionId) {
+ case 0:
+ return new FetchResponse(responseData);
+ default:
+ throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+ versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.FETCH.id)));
+ }
+ }
+
+ public int replicaId() {
+ return replicaId;
+ }
+
+ public int maxWait() {
+ return maxWait;
+ }
+
+ public int minBytes() {
+ return minBytes;
+ }
+
+ public Map<TopicPartition, PartitionData> fetchData() {
+ return fetchData;
+ }
+
+ public static FetchRequest parse(ByteBuffer buffer, int versionId) {
+ return new FetchRequest(ProtoUtils.parseRequest(ApiKeys.FETCH.id, versionId, buffer));
+ }
+
+ public static FetchRequest parse(ByteBuffer buffer) {
+ return new FetchRequest((Struct) CURRENT_SCHEMA.read(buffer));
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/FetchResponse.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/FetchResponse.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/FetchResponse.java
new file mode 100644
index 0000000..158833e
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/FetchResponse.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.requests;
+
+import org.apache.flink.kafka_backport.common.protocol.types.Struct;
+import org.apache.flink.kafka_backport.common.TopicPartition;
+import org.apache.flink.kafka_backport.common.protocol.ApiKeys;
+import org.apache.flink.kafka_backport.common.protocol.ProtoUtils;
+import org.apache.flink.kafka_backport.common.protocol.types.Schema;
+import org.apache.flink.kafka_backport.common.utils.CollectionUtils;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+public class FetchResponse extends AbstractRequestResponse {
+
+ private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.FETCH.id);
+ private static final String RESPONSES_KEY_NAME = "responses";
+
+ // topic level field names
+ private static final String TOPIC_KEY_NAME = "topic";
+ private static final String PARTITIONS_KEY_NAME = "partition_responses";
+
+ // partition level field names
+ private static final String PARTITION_KEY_NAME = "partition";
+ private static final String ERROR_CODE_KEY_NAME = "error_code";
+
+ /**
+ * Possible error code:
+ *
+ * OFFSET_OUT_OF_RANGE (1)
+ * UNKNOWN_TOPIC_OR_PARTITION (3)
+ * NOT_LEADER_FOR_PARTITION (6)
+ * REPLICA_NOT_AVAILABLE (9)
+ * UNKNOWN (-1)
+ */
+
+ private static final String HIGH_WATERMARK_KEY_NAME = "high_watermark";
+ private static final String RECORD_SET_KEY_NAME = "record_set";
+
+ public static final long INVALID_HIGHWATERMARK = -1L;
+ public static final ByteBuffer EMPTY_RECORD_SET = ByteBuffer.allocate(0);
+
+ private final Map<TopicPartition, PartitionData> responseData;
+
+ public static final class PartitionData {
+ public final short errorCode;
+ public final long highWatermark;
+ public final ByteBuffer recordSet;
+
+ public PartitionData(short errorCode, long highWatermark, ByteBuffer recordSet) {
+ this.errorCode = errorCode;
+ this.highWatermark = highWatermark;
+ this.recordSet = recordSet;
+ }
+ }
+
+ public FetchResponse(Map<TopicPartition, PartitionData> responseData) {
+ super(new Struct(CURRENT_SCHEMA));
+ Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(responseData);
+
+ List<Struct> topicArray = new ArrayList<Struct>();
+ for (Map.Entry<String, Map<Integer, PartitionData>> topicEntry: topicsData.entrySet()) {
+ Struct topicData = struct.instance(RESPONSES_KEY_NAME);
+ topicData.set(TOPIC_KEY_NAME, topicEntry.getKey());
+ List<Struct> partitionArray = new ArrayList<Struct>();
+ for (Map.Entry<Integer, PartitionData> partitionEntry : topicEntry.getValue().entrySet()) {
+ PartitionData fetchPartitionData = partitionEntry.getValue();
+ Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
+ partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
+ partitionData.set(ERROR_CODE_KEY_NAME, fetchPartitionData.errorCode);
+ partitionData.set(HIGH_WATERMARK_KEY_NAME, fetchPartitionData.highWatermark);
+ partitionData.set(RECORD_SET_KEY_NAME, fetchPartitionData.recordSet);
+ partitionArray.add(partitionData);
+ }
+ topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
+ topicArray.add(topicData);
+ }
+ struct.set(RESPONSES_KEY_NAME, topicArray.toArray());
+ this.responseData = responseData;
+ }
+
+ public FetchResponse(Struct struct) {
+ super(struct);
+ responseData = new HashMap<TopicPartition, PartitionData>();
+ for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
+ Struct topicResponse = (Struct) topicResponseObj;
+ String topic = topicResponse.getString(TOPIC_KEY_NAME);
+ for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
+ Struct partitionResponse = (Struct) partitionResponseObj;
+ int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
+ short errorCode = partitionResponse.getShort(ERROR_CODE_KEY_NAME);
+ long highWatermark = partitionResponse.getLong(HIGH_WATERMARK_KEY_NAME);
+ ByteBuffer recordSet = partitionResponse.getBytes(RECORD_SET_KEY_NAME);
+ PartitionData partitionData = new PartitionData(errorCode, highWatermark, recordSet);
+ responseData.put(new TopicPartition(topic, partition), partitionData);
+ }
+ }
+ }
+
+ public Map<TopicPartition, PartitionData> responseData() {
+ return responseData;
+ }
+
+ public static FetchResponse parse(ByteBuffer buffer) {
+ return new FetchResponse((Struct) CURRENT_SCHEMA.read(buffer));
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/HeartbeatRequest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/HeartbeatRequest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/HeartbeatRequest.java
new file mode 100644
index 0000000..c8abb67
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/HeartbeatRequest.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.requests;
+
+import org.apache.flink.kafka_backport.common.protocol.ProtoUtils;
+import org.apache.flink.kafka_backport.common.protocol.types.Schema;
+import org.apache.flink.kafka_backport.common.protocol.types.Struct;
+import org.apache.flink.kafka_backport.common.protocol.ApiKeys;
+import org.apache.flink.kafka_backport.common.protocol.Errors;
+
+import java.nio.ByteBuffer;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+public class HeartbeatRequest extends AbstractRequest {
+
+ private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.HEARTBEAT.id);
+ private static final String GROUP_ID_KEY_NAME = "group_id";
+ private static final String GROUP_GENERATION_ID_KEY_NAME = "group_generation_id";
+ private static final String CONSUMER_ID_KEY_NAME = "consumer_id";
+
+ private final String groupId;
+ private final int groupGenerationId;
+ private final String consumerId;
+
+ public HeartbeatRequest(String groupId, int groupGenerationId, String consumerId) {
+ super(new Struct(CURRENT_SCHEMA));
+ struct.set(GROUP_ID_KEY_NAME, groupId);
+ struct.set(GROUP_GENERATION_ID_KEY_NAME, groupGenerationId);
+ struct.set(CONSUMER_ID_KEY_NAME, consumerId);
+ this.groupId = groupId;
+ this.groupGenerationId = groupGenerationId;
+ this.consumerId = consumerId;
+ }
+
+ public HeartbeatRequest(Struct struct) {
+ super(struct);
+ groupId = struct.getString(GROUP_ID_KEY_NAME);
+ groupGenerationId = struct.getInt(GROUP_GENERATION_ID_KEY_NAME);
+ consumerId = struct.getString(CONSUMER_ID_KEY_NAME);
+ }
+
+ @Override
+ public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+ switch (versionId) {
+ case 0:
+ return new HeartbeatResponse(Errors.forException(e).code());
+ default:
+ throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+ versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.HEARTBEAT.id)));
+ }
+ }
+
+ public String groupId() {
+ return groupId;
+ }
+
+ public int groupGenerationId() {
+ return groupGenerationId;
+ }
+
+ public String consumerId() {
+ return consumerId;
+ }
+
+ public static HeartbeatRequest parse(ByteBuffer buffer, int versionId) {
+ return new HeartbeatRequest(ProtoUtils.parseRequest(ApiKeys.HEARTBEAT.id, versionId, buffer));
+ }
+
+ public static HeartbeatRequest parse(ByteBuffer buffer) {
+ return new HeartbeatRequest((Struct) CURRENT_SCHEMA.read(buffer));
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/HeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/HeartbeatResponse.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/HeartbeatResponse.java
new file mode 100644
index 0000000..4bf6669
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/HeartbeatResponse.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.requests;
+
+import org.apache.flink.kafka_backport.common.protocol.ApiKeys;
+import org.apache.flink.kafka_backport.common.protocol.ProtoUtils;
+import org.apache.flink.kafka_backport.common.protocol.types.Schema;
+import org.apache.flink.kafka_backport.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+public class HeartbeatResponse extends AbstractRequestResponse {
+
+ private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.HEARTBEAT.id);
+ private static final String ERROR_CODE_KEY_NAME = "error_code";
+
+ /**
+ * Possible error code:
+ *
+ * CONSUMER_COORDINATOR_NOT_AVAILABLE (15)
+ * NOT_COORDINATOR_FOR_CONSUMER (16)
+ * ILLEGAL_GENERATION (22)
+ * UNKNOWN_CONSUMER_ID (25)
+ */
+
+ private final short errorCode;
+ public HeartbeatResponse(short errorCode) {
+ super(new Struct(CURRENT_SCHEMA));
+ struct.set(ERROR_CODE_KEY_NAME, errorCode);
+ this.errorCode = errorCode;
+ }
+
+ public HeartbeatResponse(Struct struct) {
+ super(struct);
+ errorCode = struct.getShort(ERROR_CODE_KEY_NAME);
+ }
+
+ public short errorCode() {
+ return errorCode;
+ }
+
+ public static HeartbeatResponse parse(ByteBuffer buffer) {
+ return new HeartbeatResponse((Struct) CURRENT_SCHEMA.read(buffer));
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/JoinGroupRequest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/JoinGroupRequest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/JoinGroupRequest.java
new file mode 100644
index 0000000..f098d18
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/JoinGroupRequest.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.requests;
+
+import org.apache.flink.kafka_backport.common.protocol.ProtoUtils;
+import org.apache.flink.kafka_backport.common.protocol.types.Schema;
+import org.apache.flink.kafka_backport.common.protocol.types.Struct;
+import org.apache.flink.kafka_backport.common.TopicPartition;
+import org.apache.flink.kafka_backport.common.protocol.ApiKeys;
+import org.apache.flink.kafka_backport.common.protocol.Errors;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+public class JoinGroupRequest extends AbstractRequest {
+
+ private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.JOIN_GROUP.id);
+ private static final String GROUP_ID_KEY_NAME = "group_id";
+ private static final String SESSION_TIMEOUT_KEY_NAME = "session_timeout";
+ private static final String TOPICS_KEY_NAME = "topics";
+ private static final String CONSUMER_ID_KEY_NAME = "consumer_id";
+ private static final String STRATEGY_KEY_NAME = "partition_assignment_strategy";
+
+ public static final String UNKNOWN_CONSUMER_ID = "";
+
+ private final String groupId;
+ private final int sessionTimeout;
+ private final List<String> topics;
+ private final String consumerId;
+ private final String strategy;
+
+ public JoinGroupRequest(String groupId, int sessionTimeout, List<String> topics, String consumerId, String strategy) {
+ super(new Struct(CURRENT_SCHEMA));
+ struct.set(GROUP_ID_KEY_NAME, groupId);
+ struct.set(SESSION_TIMEOUT_KEY_NAME, sessionTimeout);
+ struct.set(TOPICS_KEY_NAME, topics.toArray());
+ struct.set(CONSUMER_ID_KEY_NAME, consumerId);
+ struct.set(STRATEGY_KEY_NAME, strategy);
+ this.groupId = groupId;
+ this.sessionTimeout = sessionTimeout;
+ this.topics = topics;
+ this.consumerId = consumerId;
+ this.strategy = strategy;
+ }
+
+ public JoinGroupRequest(Struct struct) {
+ super(struct);
+ groupId = struct.getString(GROUP_ID_KEY_NAME);
+ sessionTimeout = struct.getInt(SESSION_TIMEOUT_KEY_NAME);
+ Object[] topicsArray = struct.getArray(TOPICS_KEY_NAME);
+ topics = new ArrayList<String>();
+ for (Object topic: topicsArray)
+ topics.add((String) topic);
+ consumerId = struct.getString(CONSUMER_ID_KEY_NAME);
+ strategy = struct.getString(STRATEGY_KEY_NAME);
+ }
+
+ @Override
+ public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+ switch (versionId) {
+ case 0:
+ return new JoinGroupResponse(
+ Errors.forException(e).code(),
+ JoinGroupResponse.UNKNOWN_GENERATION_ID,
+ JoinGroupResponse.UNKNOWN_CONSUMER_ID,
+ Collections.<TopicPartition>emptyList());
+ default:
+ throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+ versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.JOIN_GROUP.id)));
+ }
+ }
+
+ public String groupId() {
+ return groupId;
+ }
+
+ public int sessionTimeout() {
+ return sessionTimeout;
+ }
+
+ public List<String> topics() {
+ return topics;
+ }
+
+ public String consumerId() {
+ return consumerId;
+ }
+
+ public String strategy() {
+ return strategy;
+ }
+
+ public static JoinGroupRequest parse(ByteBuffer buffer, int versionId) {
+ return new JoinGroupRequest(ProtoUtils.parseRequest(ApiKeys.JOIN_GROUP.id, versionId, buffer));
+ }
+
+ public static JoinGroupRequest parse(ByteBuffer buffer) {
+ return new JoinGroupRequest((Struct) CURRENT_SCHEMA.read(buffer));
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/JoinGroupResponse.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/JoinGroupResponse.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/JoinGroupResponse.java
new file mode 100644
index 0000000..7d9b647
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/JoinGroupResponse.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.requests;
+
+import org.apache.flink.kafka_backport.common.protocol.ProtoUtils;
+import org.apache.flink.kafka_backport.common.protocol.types.Schema;
+import org.apache.flink.kafka_backport.common.protocol.types.Struct;
+import org.apache.flink.kafka_backport.common.TopicPartition;
+import org.apache.flink.kafka_backport.common.protocol.ApiKeys;
+import org.apache.flink.kafka_backport.common.utils.CollectionUtils;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+public class JoinGroupResponse extends AbstractRequestResponse {
+
+ private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.JOIN_GROUP.id);
+ private static final String ERROR_CODE_KEY_NAME = "error_code";
+
+ /**
+ * Possible error code:
+ *
+ * CONSUMER_COORDINATOR_NOT_AVAILABLE (15)
+ * NOT_COORDINATOR_FOR_CONSUMER (16)
+ * INCONSISTENT_PARTITION_ASSIGNMENT_STRATEGY (23)
+ * UNKNOWN_PARTITION_ASSIGNMENT_STRATEGY (24)
+ * UNKNOWN_CONSUMER_ID (25)
+ * INVALID_SESSION_TIMEOUT (26)
+ */
+
+ private static final String GENERATION_ID_KEY_NAME = "group_generation_id";
+ private static final String CONSUMER_ID_KEY_NAME = "consumer_id";
+ private static final String ASSIGNED_PARTITIONS_KEY_NAME = "assigned_partitions";
+ private static final String TOPIC_KEY_NAME = "topic";
+ private static final String PARTITIONS_KEY_NAME = "partitions";
+
+ public static final int UNKNOWN_GENERATION_ID = -1;
+ public static final String UNKNOWN_CONSUMER_ID = "";
+
+ private final short errorCode;
+ private final int generationId;
+ private final String consumerId;
+ private final List<TopicPartition> assignedPartitions;
+
+ public JoinGroupResponse(short errorCode, int generationId, String consumerId, List<TopicPartition> assignedPartitions) {
+ super(new Struct(CURRENT_SCHEMA));
+
+ Map<String, List<Integer>> partitionsByTopic = CollectionUtils.groupDataByTopic(assignedPartitions);
+
+ struct.set(ERROR_CODE_KEY_NAME, errorCode);
+ struct.set(GENERATION_ID_KEY_NAME, generationId);
+ struct.set(CONSUMER_ID_KEY_NAME, consumerId);
+ List<Struct> topicArray = new ArrayList<Struct>();
+ for (Map.Entry<String, List<Integer>> entries: partitionsByTopic.entrySet()) {
+ Struct topicData = struct.instance(ASSIGNED_PARTITIONS_KEY_NAME);
+ topicData.set(TOPIC_KEY_NAME, entries.getKey());
+ topicData.set(PARTITIONS_KEY_NAME, entries.getValue().toArray());
+ topicArray.add(topicData);
+ }
+ struct.set(ASSIGNED_PARTITIONS_KEY_NAME, topicArray.toArray());
+
+ this.errorCode = errorCode;
+ this.generationId = generationId;
+ this.consumerId = consumerId;
+ this.assignedPartitions = assignedPartitions;
+ }
+
+ public JoinGroupResponse(Struct struct) {
+ super(struct);
+ assignedPartitions = new ArrayList<TopicPartition>();
+ for (Object topicDataObj : struct.getArray(ASSIGNED_PARTITIONS_KEY_NAME)) {
+ Struct topicData = (Struct) topicDataObj;
+ String topic = topicData.getString(TOPIC_KEY_NAME);
+ for (Object partitionObj : topicData.getArray(PARTITIONS_KEY_NAME))
+ assignedPartitions.add(new TopicPartition(topic, (Integer) partitionObj));
+ }
+ errorCode = struct.getShort(ERROR_CODE_KEY_NAME);
+ generationId = struct.getInt(GENERATION_ID_KEY_NAME);
+ consumerId = struct.getString(CONSUMER_ID_KEY_NAME);
+ }
+
+ public short errorCode() {
+ return errorCode;
+ }
+
+ public int generationId() {
+ return generationId;
+ }
+
+ public String consumerId() {
+ return consumerId;
+ }
+
+ public List<TopicPartition> assignedPartitions() {
+ return assignedPartitions;
+ }
+
+ public static JoinGroupResponse parse(ByteBuffer buffer) {
+ return new JoinGroupResponse((Struct) CURRENT_SCHEMA.read(buffer));
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/ListOffsetRequest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/ListOffsetRequest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/ListOffsetRequest.java
new file mode 100644
index 0000000..069e06d
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/ListOffsetRequest.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.requests;
+
+import org.apache.flink.kafka_backport.common.TopicPartition;
+import org.apache.flink.kafka_backport.common.protocol.ProtoUtils;
+import org.apache.flink.kafka_backport.common.protocol.types.Schema;
+import org.apache.flink.kafka_backport.common.protocol.types.Struct;
+import org.apache.flink.kafka_backport.common.protocol.ApiKeys;
+import org.apache.flink.kafka_backport.common.protocol.Errors;
+import org.apache.flink.kafka_backport.common.utils.CollectionUtils;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+public class ListOffsetRequest extends AbstractRequest {
+
+ private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.LIST_OFFSETS.id);
+ private static final String REPLICA_ID_KEY_NAME = "replica_id";
+ private static final String TOPICS_KEY_NAME = "topics";
+
+ // topic level field names
+ private static final String TOPIC_KEY_NAME = "topic";
+ private static final String PARTITIONS_KEY_NAME = "partitions";
+
+ // partition level field names
+ private static final String PARTITION_KEY_NAME = "partition";
+ private static final String TIMESTAMP_KEY_NAME = "timestamp";
+ private static final String MAX_NUM_OFFSETS_KEY_NAME = "max_num_offsets";
+
+ private final int replicaId;
+ private final Map<TopicPartition, PartitionData> offsetData;
+
+ public static final class PartitionData {
+ public final long timestamp;
+ public final int maxNumOffsets;
+
+ public PartitionData(long timestamp, int maxNumOffsets) {
+ this.timestamp = timestamp;
+ this.maxNumOffsets = maxNumOffsets;
+ }
+ }
+
+ public ListOffsetRequest(Map<TopicPartition, PartitionData> offsetData) {
+ this(-1, offsetData);
+ }
+
+ public ListOffsetRequest(int replicaId, Map<TopicPartition, PartitionData> offsetData) {
+ super(new Struct(CURRENT_SCHEMA));
+ Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(offsetData);
+
+ struct.set(REPLICA_ID_KEY_NAME, replicaId);
+ List<Struct> topicArray = new ArrayList<Struct>();
+ for (Map.Entry<String, Map<Integer, PartitionData>> topicEntry: topicsData.entrySet()) {
+ Struct topicData = struct.instance(TOPICS_KEY_NAME);
+ topicData.set(TOPIC_KEY_NAME, topicEntry.getKey());
+ List<Struct> partitionArray = new ArrayList<Struct>();
+ for (Map.Entry<Integer, PartitionData> partitionEntry : topicEntry.getValue().entrySet()) {
+ PartitionData offsetPartitionData = partitionEntry.getValue();
+ Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
+ partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
+ partitionData.set(TIMESTAMP_KEY_NAME, offsetPartitionData.timestamp);
+ partitionData.set(MAX_NUM_OFFSETS_KEY_NAME, offsetPartitionData.maxNumOffsets);
+ partitionArray.add(partitionData);
+ }
+ topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
+ topicArray.add(topicData);
+ }
+ struct.set(TOPICS_KEY_NAME, topicArray.toArray());
+ this.replicaId = replicaId;
+ this.offsetData = offsetData;
+ }
+
+ public ListOffsetRequest(Struct struct) {
+ super(struct);
+ replicaId = struct.getInt(REPLICA_ID_KEY_NAME);
+ offsetData = new HashMap<TopicPartition, PartitionData>();
+ for (Object topicResponseObj : struct.getArray(TOPICS_KEY_NAME)) {
+ Struct topicResponse = (Struct) topicResponseObj;
+ String topic = topicResponse.getString(TOPIC_KEY_NAME);
+ for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
+ Struct partitionResponse = (Struct) partitionResponseObj;
+ int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
+ long timestamp = partitionResponse.getLong(TIMESTAMP_KEY_NAME);
+ int maxNumOffsets = partitionResponse.getInt(MAX_NUM_OFFSETS_KEY_NAME);
+ PartitionData partitionData = new PartitionData(timestamp, maxNumOffsets);
+ offsetData.put(new TopicPartition(topic, partition), partitionData);
+ }
+ }
+ }
+
+ @Override
+ public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+ Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = new HashMap<TopicPartition, ListOffsetResponse.PartitionData>();
+
+ for (Map.Entry<TopicPartition, PartitionData> entry: offsetData.entrySet()) {
+ ListOffsetResponse.PartitionData partitionResponse = new ListOffsetResponse.PartitionData(Errors.forException(e).code(), new ArrayList<Long>());
+ responseData.put(entry.getKey(), partitionResponse);
+ }
+
+ switch (versionId) {
+ case 0:
+ return new ListOffsetResponse(responseData);
+ default:
+ throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+ versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.LIST_OFFSETS.id)));
+ }
+ }
+
+ public int replicaId() {
+ return replicaId;
+ }
+
+ public Map<TopicPartition, PartitionData> offsetData() {
+ return offsetData;
+ }
+
+ public static ListOffsetRequest parse(ByteBuffer buffer, int versionId) {
+ return new ListOffsetRequest(ProtoUtils.parseRequest(ApiKeys.LIST_OFFSETS.id, versionId, buffer));
+ }
+
+ public static ListOffsetRequest parse(ByteBuffer buffer) {
+ return new ListOffsetRequest((Struct) CURRENT_SCHEMA.read(buffer));
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/ListOffsetResponse.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/ListOffsetResponse.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/ListOffsetResponse.java
new file mode 100644
index 0000000..b831f61
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/ListOffsetResponse.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.requests;
+
+import org.apache.flink.kafka_backport.common.TopicPartition;
+import org.apache.flink.kafka_backport.common.protocol.ProtoUtils;
+import org.apache.flink.kafka_backport.common.protocol.types.Schema;
+import org.apache.flink.kafka_backport.common.protocol.types.Struct;
+import org.apache.flink.kafka_backport.common.protocol.ApiKeys;
+import org.apache.flink.kafka_backport.common.utils.CollectionUtils;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+public class ListOffsetResponse extends AbstractRequestResponse {
+
+ private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.LIST_OFFSETS.id);
+ private static final String RESPONSES_KEY_NAME = "responses";
+
+ // topic level field names
+ private static final String TOPIC_KEY_NAME = "topic";
+ private static final String PARTITIONS_KEY_NAME = "partition_responses";
+
+ // partition level field names
+ private static final String PARTITION_KEY_NAME = "partition";
+ private static final String ERROR_CODE_KEY_NAME = "error_code";
+
+ /**
+ * Possible error code:
+ *
+ * UNKNOWN_TOPIC_OR_PARTITION (3)
+ * NOT_LEADER_FOR_PARTITION (6)
+ * UNKNOWN (-1)
+ */
+
+ private static final String OFFSETS_KEY_NAME = "offsets";
+
+ private final Map<TopicPartition, PartitionData> responseData;
+
+ public static final class PartitionData {
+ public final short errorCode;
+ public final List<Long> offsets;
+
+ public PartitionData(short errorCode, List<Long> offsets) {
+ this.errorCode = errorCode;
+ this.offsets = offsets;
+ }
+ }
+
+ public ListOffsetResponse(Map<TopicPartition, PartitionData> responseData) {
+ super(new Struct(CURRENT_SCHEMA));
+ Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(responseData);
+
+ List<Struct> topicArray = new ArrayList<Struct>();
+ for (Map.Entry<String, Map<Integer, PartitionData>> topicEntry: topicsData.entrySet()) {
+ Struct topicData = struct.instance(RESPONSES_KEY_NAME);
+ topicData.set(TOPIC_KEY_NAME, topicEntry.getKey());
+ List<Struct> partitionArray = new ArrayList<Struct>();
+ for (Map.Entry<Integer, PartitionData> partitionEntry : topicEntry.getValue().entrySet()) {
+ PartitionData offsetPartitionData = partitionEntry.getValue();
+ Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
+ partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
+ partitionData.set(ERROR_CODE_KEY_NAME, offsetPartitionData.errorCode);
+ partitionData.set(OFFSETS_KEY_NAME, offsetPartitionData.offsets.toArray());
+ partitionArray.add(partitionData);
+ }
+ topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
+ topicArray.add(topicData);
+ }
+ struct.set(RESPONSES_KEY_NAME, topicArray.toArray());
+ this.responseData = responseData;
+ }
+
+ public ListOffsetResponse(Struct struct) {
+ super(struct);
+ responseData = new HashMap<TopicPartition, PartitionData>();
+ for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
+ Struct topicResponse = (Struct) topicResponseObj;
+ String topic = topicResponse.getString(TOPIC_KEY_NAME);
+ for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
+ Struct partitionResponse = (Struct) partitionResponseObj;
+ int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
+ short errorCode = partitionResponse.getShort(ERROR_CODE_KEY_NAME);
+ Object[] offsets = partitionResponse.getArray(OFFSETS_KEY_NAME);
+ List<Long> offsetsList = new ArrayList<Long>();
+ for (Object offset: offsets)
+ offsetsList.add((Long) offset);
+ PartitionData partitionData = new PartitionData(errorCode, offsetsList);
+ responseData.put(new TopicPartition(topic, partition), partitionData);
+ }
+ }
+ }
+
+ public Map<TopicPartition, PartitionData> responseData() {
+ return responseData;
+ }
+
+ public static ListOffsetResponse parse(ByteBuffer buffer) {
+ return new ListOffsetResponse((Struct) CURRENT_SCHEMA.read(buffer));
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/MetadataRequest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/MetadataRequest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/MetadataRequest.java
new file mode 100644
index 0000000..2820fcd
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/MetadataRequest.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.requests;
+
+import org.apache.flink.kafka_backport.common.Node;
+import org.apache.flink.kafka_backport.common.PartitionInfo;
+import org.apache.flink.kafka_backport.common.protocol.ProtoUtils;
+import org.apache.flink.kafka_backport.common.protocol.types.Schema;
+import org.apache.flink.kafka_backport.common.protocol.types.Struct;
+import org.apache.flink.kafka_backport.common.Cluster;
+import org.apache.flink.kafka_backport.common.protocol.ApiKeys;
+import org.apache.flink.kafka_backport.common.protocol.Errors;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+public class MetadataRequest extends AbstractRequest {
+
+ private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.METADATA.id);
+ private static final String TOPICS_KEY_NAME = "topics";
+
+ private final List<String> topics;
+
+ public MetadataRequest(List<String> topics) {
+ super(new Struct(CURRENT_SCHEMA));
+ struct.set(TOPICS_KEY_NAME, topics.toArray());
+ this.topics = topics;
+ }
+
+ public MetadataRequest(Struct struct) {
+ super(struct);
+ Object[] topicArray = struct.getArray(TOPICS_KEY_NAME);
+ topics = new ArrayList<String>();
+ for (Object topicObj: topicArray) {
+ topics.add((String) topicObj);
+ }
+ }
+
+ @Override
+ public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+ Map<String, Errors> topicErrors = new HashMap<String, Errors>();
+ for (String topic : topics) {
+ topicErrors.put(topic, Errors.forException(e));
+ }
+
+ Cluster cluster = new Cluster(new ArrayList<Node>(), new ArrayList<PartitionInfo>());
+ switch (versionId) {
+ case 0:
+ return new MetadataResponse(cluster, topicErrors);
+ default:
+ throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+ versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.METADATA.id)));
+ }
+ }
+
+ public List<String> topics() {
+ return topics;
+ }
+
+ public static MetadataRequest parse(ByteBuffer buffer, int versionId) {
+ return new MetadataRequest(ProtoUtils.parseRequest(ApiKeys.METADATA.id, versionId, buffer));
+ }
+
+ public static MetadataRequest parse(ByteBuffer buffer) {
+ return new MetadataRequest((Struct) CURRENT_SCHEMA.read(buffer));
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/MetadataResponse.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/MetadataResponse.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/MetadataResponse.java
new file mode 100644
index 0000000..83d7290
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/MetadataResponse.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.requests;
+
+import org.apache.flink.kafka_backport.common.Node;
+import org.apache.flink.kafka_backport.common.protocol.types.Struct;
+import org.apache.flink.kafka_backport.common.Cluster;
+import org.apache.flink.kafka_backport.common.PartitionInfo;
+import org.apache.flink.kafka_backport.common.protocol.ApiKeys;
+import org.apache.flink.kafka_backport.common.protocol.Errors;
+import org.apache.flink.kafka_backport.common.protocol.ProtoUtils;
+import org.apache.flink.kafka_backport.common.protocol.types.Schema;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+public class MetadataResponse extends AbstractRequestResponse {
+
+ private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.METADATA.id);
+ private static final String BROKERS_KEY_NAME = "brokers";
+ private static final String TOPIC_METATDATA_KEY_NAME = "topic_metadata";
+
+ // broker level field names
+ private static final String NODE_ID_KEY_NAME = "node_id";
+ private static final String HOST_KEY_NAME = "host";
+ private static final String PORT_KEY_NAME = "port";
+
+ // topic level field names
+ private static final String TOPIC_ERROR_CODE_KEY_NAME = "topic_error_code";
+
+ /**
+ * Possible error code:
+ *
+ * TODO
+ */
+
+ private static final String TOPIC_KEY_NAME = "topic";
+ private static final String PARTITION_METADATA_KEY_NAME = "partition_metadata";
+
+ // partition level field names
+ private static final String PARTITION_ERROR_CODE_KEY_NAME = "partition_error_code";
+
+ /**
+ * Possible error code:
+ *
+ * TODO
+ */
+
+ private static final String PARTITION_KEY_NAME = "partition_id";
+ private static final String LEADER_KEY_NAME = "leader";
+ private static final String REPLICAS_KEY_NAME = "replicas";
+ private static final String ISR_KEY_NAME = "isr";
+
+ private final Cluster cluster;
+ private final Map<String, Errors> errors;
+
+ /**
+ * Constructor for MetadataResponse where there are errors for some of the topics,
+ * error data take precedence over cluster information for particular topic
+ */
+ public MetadataResponse(Cluster cluster, Map<String, Errors> errors) {
+ super(new Struct(CURRENT_SCHEMA));
+
+ List<Struct> brokerArray = new ArrayList<Struct>();
+ for (Node node : cluster.nodes()) {
+ Struct broker = struct.instance(BROKERS_KEY_NAME);
+ broker.set(NODE_ID_KEY_NAME, node.id());
+ broker.set(HOST_KEY_NAME, node.host());
+ broker.set(PORT_KEY_NAME, node.port());
+ brokerArray.add(broker);
+ }
+ struct.set(BROKERS_KEY_NAME, brokerArray.toArray());
+
+ List<Struct> topicArray = new ArrayList<Struct>();
+ for (String topic : cluster.topics()) {
+ Struct topicData = struct.instance(TOPIC_METATDATA_KEY_NAME);
+
+ topicData.set(TOPIC_KEY_NAME, topic);
+ if (errors.containsKey(topic)) {
+ topicData.set(TOPIC_ERROR_CODE_KEY_NAME, errors.get(topic).code());
+ } else {
+ topicData.set(TOPIC_ERROR_CODE_KEY_NAME, Errors.NONE.code());
+ List<Struct> partitionArray = new ArrayList<Struct>();
+ for (PartitionInfo fetchPartitionData : cluster.partitionsForTopic(topic)) {
+ Struct partitionData = topicData.instance(PARTITION_METADATA_KEY_NAME);
+ partitionData.set(PARTITION_ERROR_CODE_KEY_NAME, Errors.NONE.code());
+ partitionData.set(PARTITION_KEY_NAME, fetchPartitionData.partition());
+ partitionData.set(LEADER_KEY_NAME, fetchPartitionData.leader().id());
+ ArrayList<Integer> replicas = new ArrayList<Integer>();
+ for (Node node : fetchPartitionData.replicas())
+ replicas.add(node.id());
+ partitionData.set(REPLICAS_KEY_NAME, replicas.toArray());
+ ArrayList<Integer> isr = new ArrayList<Integer>();
+ for (Node node : fetchPartitionData.inSyncReplicas())
+ isr.add(node.id());
+ partitionData.set(ISR_KEY_NAME, isr.toArray());
+ partitionArray.add(partitionData);
+ }
+ topicData.set(PARTITION_METADATA_KEY_NAME, partitionArray.toArray());
+ }
+
+ topicArray.add(topicData);
+ }
+ struct.set(TOPIC_METATDATA_KEY_NAME, topicArray.toArray());
+
+ this.cluster = cluster;
+ this.errors = new HashMap<String, Errors>();
+ }
+
+ public MetadataResponse(Struct struct) {
+ super(struct);
+ Map<String, Errors> errors = new HashMap<String, Errors>();
+ Map<Integer, Node> brokers = new HashMap<Integer, Node>();
+ Object[] brokerStructs = (Object[]) struct.get(BROKERS_KEY_NAME);
+ for (int i = 0; i < brokerStructs.length; i++) {
+ Struct broker = (Struct) brokerStructs[i];
+ int nodeId = broker.getInt(NODE_ID_KEY_NAME);
+ String host = broker.getString(HOST_KEY_NAME);
+ int port = broker.getInt(PORT_KEY_NAME);
+ brokers.put(nodeId, new Node(nodeId, host, port));
+ }
+ List<PartitionInfo> partitions = new ArrayList<PartitionInfo>();
+ Object[] topicInfos = (Object[]) struct.get(TOPIC_METATDATA_KEY_NAME);
+ for (int i = 0; i < topicInfos.length; i++) {
+ Struct topicInfo = (Struct) topicInfos[i];
+ short topicError = topicInfo.getShort(TOPIC_ERROR_CODE_KEY_NAME);
+ String topic = topicInfo.getString(TOPIC_KEY_NAME);
+ if (topicError == Errors.NONE.code()) {
+ Object[] partitionInfos = (Object[]) topicInfo.get(PARTITION_METADATA_KEY_NAME);
+ for (int j = 0; j < partitionInfos.length; j++) {
+ Struct partitionInfo = (Struct) partitionInfos[j];
+ int partition = partitionInfo.getInt(PARTITION_KEY_NAME);
+ int leader = partitionInfo.getInt(LEADER_KEY_NAME);
+ Node leaderNode = leader == -1 ? null : brokers.get(leader);
+ Object[] replicas = (Object[]) partitionInfo.get(REPLICAS_KEY_NAME);
+ Node[] replicaNodes = new Node[replicas.length];
+ for (int k = 0; k < replicas.length; k++)
+ replicaNodes[k] = brokers.get(replicas[k]);
+ Object[] isr = (Object[]) partitionInfo.get(ISR_KEY_NAME);
+ Node[] isrNodes = new Node[isr.length];
+ for (int k = 0; k < isr.length; k++)
+ isrNodes[k] = brokers.get(isr[k]);
+ partitions.add(new PartitionInfo(topic, partition, leaderNode, replicaNodes, isrNodes));
+ }
+ } else {
+ errors.put(topic, Errors.forCode(topicError));
+ }
+ }
+ this.errors = errors;
+ this.cluster = new Cluster(brokers.values(), partitions);
+ }
+
+ public Map<String, Errors> errors() {
+ return this.errors;
+ }
+
+ public Cluster cluster() {
+ return this.cluster;
+ }
+
+ public static MetadataResponse parse(ByteBuffer buffer) {
+ return new MetadataResponse((Struct) CURRENT_SCHEMA.read(buffer));
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/OffsetCommitRequest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/OffsetCommitRequest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/OffsetCommitRequest.java
new file mode 100644
index 0000000..b33d2c1
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/OffsetCommitRequest.java
@@ -0,0 +1,275 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.requests;
+
+import org.apache.flink.kafka_backport.common.TopicPartition;
+import org.apache.flink.kafka_backport.common.protocol.ProtoUtils;
+import org.apache.flink.kafka_backport.common.protocol.types.Schema;
+import org.apache.flink.kafka_backport.common.protocol.types.Struct;
+import org.apache.flink.kafka_backport.common.protocol.ApiKeys;
+import org.apache.flink.kafka_backport.common.protocol.Errors;
+import org.apache.flink.kafka_backport.common.utils.CollectionUtils;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * This wrapper supports both v0 and v1 of OffsetCommitRequest.
+ */
+public class OffsetCommitRequest extends AbstractRequest {
+
+ private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.OFFSET_COMMIT.id);
+ private static final String GROUP_ID_KEY_NAME = "group_id";
+ private static final String GENERATION_ID_KEY_NAME = "group_generation_id";
+ private static final String CONSUMER_ID_KEY_NAME = "consumer_id";
+ private static final String TOPICS_KEY_NAME = "topics";
+ private static final String RETENTION_TIME_KEY_NAME = "retention_time";
+
+ // topic level field names
+ private static final String TOPIC_KEY_NAME = "topic";
+ private static final String PARTITIONS_KEY_NAME = "partitions";
+
+ // partition level field names
+ private static final String PARTITION_KEY_NAME = "partition";
+ private static final String COMMIT_OFFSET_KEY_NAME = "offset";
+ private static final String METADATA_KEY_NAME = "metadata";
+
+ @Deprecated
+ private static final String TIMESTAMP_KEY_NAME = "timestamp"; // for v0, v1
+
+ // default values for the current version
+ public static final int DEFAULT_GENERATION_ID = -1;
+ public static final String DEFAULT_CONSUMER_ID = "";
+ public static final long DEFAULT_RETENTION_TIME = -1L;
+
+ // default values for old versions,
+ // will be removed after these versions are deprecated
+ @Deprecated
+ public static final long DEFAULT_TIMESTAMP = -1L; // for V0, V1
+
+ private final String groupId;
+ private final String consumerId;
+ private final int generationId;
+ private final long retentionTime;
+ private final Map<TopicPartition, PartitionData> offsetData;
+
+ public static final class PartitionData {
+ @Deprecated
+ public final long timestamp; // for V1
+
+ public final long offset;
+ public final String metadata;
+
+ @Deprecated
+ public PartitionData(long offset, long timestamp, String metadata) {
+ this.offset = offset;
+ this.timestamp = timestamp;
+ this.metadata = metadata;
+ }
+
+ public PartitionData(long offset, String metadata) {
+ this(offset, DEFAULT_TIMESTAMP, metadata);
+ }
+ }
+
+ /**
+ * Constructor for version 0.
+ * @param groupId
+ * @param offsetData
+ */
+ @Deprecated
+ public OffsetCommitRequest(String groupId, Map<TopicPartition, PartitionData> offsetData) {
+ super(new Struct(ProtoUtils.requestSchema(ApiKeys.OFFSET_COMMIT.id, 0)));
+
+ initCommonFields(groupId, offsetData);
+ this.groupId = groupId;
+ this.generationId = DEFAULT_GENERATION_ID;
+ this.consumerId = DEFAULT_CONSUMER_ID;
+ this.retentionTime = DEFAULT_RETENTION_TIME;
+ this.offsetData = offsetData;
+ }
+
+ /**
+ * Constructor for version 1.
+ * @param groupId
+ * @param generationId
+ * @param consumerId
+ * @param offsetData
+ */
+ @Deprecated
+ public OffsetCommitRequest(String groupId, int generationId, String consumerId, Map<TopicPartition, PartitionData> offsetData) {
+ super(new Struct(ProtoUtils.requestSchema(ApiKeys.OFFSET_COMMIT.id, 1)));
+
+ initCommonFields(groupId, offsetData);
+ struct.set(GENERATION_ID_KEY_NAME, generationId);
+ struct.set(CONSUMER_ID_KEY_NAME, consumerId);
+ this.groupId = groupId;
+ this.generationId = generationId;
+ this.consumerId = consumerId;
+ this.retentionTime = DEFAULT_RETENTION_TIME;
+ this.offsetData = offsetData;
+ }
+
+ /**
+ * Constructor for version 2.
+ * @param groupId
+ * @param generationId
+ * @param consumerId
+ * @param retentionTime
+ * @param offsetData
+ */
+ public OffsetCommitRequest(String groupId, int generationId, String consumerId, long retentionTime, Map<TopicPartition, PartitionData> offsetData) {
+ super(new Struct(CURRENT_SCHEMA));
+
+ initCommonFields(groupId, offsetData);
+ struct.set(GENERATION_ID_KEY_NAME, generationId);
+ struct.set(CONSUMER_ID_KEY_NAME, consumerId);
+ struct.set(RETENTION_TIME_KEY_NAME, retentionTime);
+ this.groupId = groupId;
+ this.generationId = generationId;
+ this.consumerId = consumerId;
+ this.retentionTime = retentionTime;
+ this.offsetData = offsetData;
+ }
+
+ private void initCommonFields(String groupId, Map<TopicPartition, PartitionData> offsetData) {
+ Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(offsetData);
+
+ struct.set(GROUP_ID_KEY_NAME, groupId);
+ List<Struct> topicArray = new ArrayList<Struct>();
+
+ for (Map.Entry<String, Map<Integer, PartitionData>> topicEntry: topicsData.entrySet()) {
+ Struct topicData = struct.instance(TOPICS_KEY_NAME);
+ topicData.set(TOPIC_KEY_NAME, topicEntry.getKey());
+ List<Struct> partitionArray = new ArrayList<Struct>();
+ for (Map.Entry<Integer, PartitionData> partitionEntry : topicEntry.getValue().entrySet()) {
+ PartitionData fetchPartitionData = partitionEntry.getValue();
+ Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
+ partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
+ partitionData.set(COMMIT_OFFSET_KEY_NAME, fetchPartitionData.offset);
+ // Only for v1
+ if (partitionData.hasField(TIMESTAMP_KEY_NAME))
+ partitionData.set(TIMESTAMP_KEY_NAME, fetchPartitionData.timestamp);
+ partitionData.set(METADATA_KEY_NAME, fetchPartitionData.metadata);
+ partitionArray.add(partitionData);
+ }
+ topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
+ topicArray.add(topicData);
+ }
+ struct.set(TOPICS_KEY_NAME, topicArray.toArray());
+ }
+
+ public OffsetCommitRequest(Struct struct) {
+ super(struct);
+
+ groupId = struct.getString(GROUP_ID_KEY_NAME);
+ // This field only exists in v1.
+ if (struct.hasField(GENERATION_ID_KEY_NAME))
+ generationId = struct.getInt(GENERATION_ID_KEY_NAME);
+ else
+ generationId = DEFAULT_GENERATION_ID;
+
+ // This field only exists in v1.
+ if (struct.hasField(CONSUMER_ID_KEY_NAME))
+ consumerId = struct.getString(CONSUMER_ID_KEY_NAME);
+ else
+ consumerId = DEFAULT_CONSUMER_ID;
+
+ // This field only exists in v2
+ if (struct.hasField(RETENTION_TIME_KEY_NAME))
+ retentionTime = struct.getLong(RETENTION_TIME_KEY_NAME);
+ else
+ retentionTime = DEFAULT_RETENTION_TIME;
+
+ offsetData = new HashMap<TopicPartition, PartitionData>();
+ for (Object topicDataObj : struct.getArray(TOPICS_KEY_NAME)) {
+ Struct topicData = (Struct) topicDataObj;
+ String topic = topicData.getString(TOPIC_KEY_NAME);
+ for (Object partitionDataObj : topicData.getArray(PARTITIONS_KEY_NAME)) {
+ Struct partitionDataStruct = (Struct) partitionDataObj;
+ int partition = partitionDataStruct.getInt(PARTITION_KEY_NAME);
+ long offset = partitionDataStruct.getLong(COMMIT_OFFSET_KEY_NAME);
+ String metadata = partitionDataStruct.getString(METADATA_KEY_NAME);
+ PartitionData partitionOffset;
+ // This field only exists in v1
+ if (partitionDataStruct.hasField(TIMESTAMP_KEY_NAME)) {
+ long timestamp = partitionDataStruct.getLong(TIMESTAMP_KEY_NAME);
+ partitionOffset = new PartitionData(offset, timestamp, metadata);
+ } else {
+ partitionOffset = new PartitionData(offset, metadata);
+ }
+ offsetData.put(new TopicPartition(topic, partition), partitionOffset);
+ }
+ }
+ }
+
+ @Override
+ public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+ Map<TopicPartition, Short> responseData = new HashMap<TopicPartition, Short>();
+ for (Map.Entry<TopicPartition, PartitionData> entry: offsetData.entrySet()) {
+ responseData.put(entry.getKey(), Errors.forException(e).code());
+ }
+
+ switch (versionId) {
+ // OffsetCommitResponseV0 == OffsetCommitResponseV1 == OffsetCommitResponseV2
+ case 0:
+ case 1:
+ case 2:
+ return new OffsetCommitResponse(responseData);
+ default:
+ throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+ versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.OFFSET_COMMIT.id)));
+ }
+ }
+
+ public String groupId() {
+ return groupId;
+ }
+
+ public int generationId() {
+ return generationId;
+ }
+
+ public String consumerId() {
+ return consumerId;
+ }
+
+ public long retentionTime() {
+ return retentionTime;
+ }
+
+ public Map<TopicPartition, PartitionData> offsetData() {
+ return offsetData;
+ }
+
+ public static OffsetCommitRequest parse(ByteBuffer buffer, int versionId) {
+ Schema schema = ProtoUtils.requestSchema(ApiKeys.OFFSET_COMMIT.id, versionId);
+ return new OffsetCommitRequest((Struct) schema.read(buffer));
+ }
+
+ public static OffsetCommitRequest parse(ByteBuffer buffer) {
+ return new OffsetCommitRequest((Struct) CURRENT_SCHEMA.read(buffer));
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/OffsetCommitResponse.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/OffsetCommitResponse.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/OffsetCommitResponse.java
new file mode 100644
index 0000000..5f14b63
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/OffsetCommitResponse.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.requests;
+
+import org.apache.flink.kafka_backport.common.protocol.types.Struct;
+import org.apache.flink.kafka_backport.common.TopicPartition;
+import org.apache.flink.kafka_backport.common.protocol.ApiKeys;
+import org.apache.flink.kafka_backport.common.protocol.ProtoUtils;
+import org.apache.flink.kafka_backport.common.protocol.types.Schema;
+import org.apache.flink.kafka_backport.common.utils.CollectionUtils;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+public class OffsetCommitResponse extends AbstractRequestResponse {
+
+ private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.OFFSET_COMMIT.id);
+ private static final String RESPONSES_KEY_NAME = "responses";
+
+ // topic level fields
+ private static final String TOPIC_KEY_NAME = "topic";
+ private static final String PARTITIONS_KEY_NAME = "partition_responses";
+
+ // partition level fields
+ private static final String PARTITION_KEY_NAME = "partition";
+ private static final String ERROR_CODE_KEY_NAME = "error_code";
+
+ /**
+ * Possible error code:
+ *
+ * OFFSET_METADATA_TOO_LARGE (12)
+ * CONSUMER_COORDINATOR_NOT_AVAILABLE (15)
+ * NOT_COORDINATOR_FOR_CONSUMER (16)
+ * ILLEGAL_GENERATION (22)
+ * UNKNOWN_CONSUMER_ID (25)
+ * COMMITTING_PARTITIONS_NOT_ASSIGNED (27)
+ * INVALID_COMMIT_OFFSET_SIZE (28)
+ */
+
+ private final Map<TopicPartition, Short> responseData;
+
+ public OffsetCommitResponse(Map<TopicPartition, Short> responseData) {
+ super(new Struct(CURRENT_SCHEMA));
+
+ Map<String, Map<Integer, Short>> topicsData = CollectionUtils.groupDataByTopic(responseData);
+
+ List<Struct> topicArray = new ArrayList<Struct>();
+ for (Map.Entry<String, Map<Integer, Short>> entries: topicsData.entrySet()) {
+ Struct topicData = struct.instance(RESPONSES_KEY_NAME);
+ topicData.set(TOPIC_KEY_NAME, entries.getKey());
+ List<Struct> partitionArray = new ArrayList<Struct>();
+ for (Map.Entry<Integer, Short> partitionEntry : entries.getValue().entrySet()) {
+ Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
+ partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
+ partitionData.set(ERROR_CODE_KEY_NAME, partitionEntry.getValue());
+ partitionArray.add(partitionData);
+ }
+ topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
+ topicArray.add(topicData);
+ }
+ struct.set(RESPONSES_KEY_NAME, topicArray.toArray());
+ this.responseData = responseData;
+ }
+
+ public OffsetCommitResponse(Struct struct) {
+ super(struct);
+ responseData = new HashMap<TopicPartition, Short>();
+ for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
+ Struct topicResponse = (Struct) topicResponseObj;
+ String topic = topicResponse.getString(TOPIC_KEY_NAME);
+ for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
+ Struct partitionResponse = (Struct) partitionResponseObj;
+ int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
+ short errorCode = partitionResponse.getShort(ERROR_CODE_KEY_NAME);
+ responseData.put(new TopicPartition(topic, partition), errorCode);
+ }
+ }
+ }
+
+ public Map<TopicPartition, Short> responseData() {
+ return responseData;
+ }
+
+ public static OffsetCommitResponse parse(ByteBuffer buffer) {
+ return new OffsetCommitResponse((Struct) CURRENT_SCHEMA.read(buffer));
+ }
+}