You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/27 13:25:30 UTC
[13/51] [abbrv] flink git commit: [FLINK-2386] [kafka connector] Add
comments to all backported kafka sources and move them to
'org.apache.flink.kafka_backport'
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/OffsetFetchRequest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/OffsetFetchRequest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/OffsetFetchRequest.java
new file mode 100644
index 0000000..e97ebc2
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/OffsetFetchRequest.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.requests;
+
+import org.apache.flink.kafka_backport.common.protocol.ProtoUtils;
+import org.apache.flink.kafka_backport.common.protocol.types.Schema;
+import org.apache.flink.kafka_backport.common.protocol.types.Struct;
+import org.apache.flink.kafka_backport.common.TopicPartition;
+import org.apache.flink.kafka_backport.common.protocol.ApiKeys;
+import org.apache.flink.kafka_backport.common.protocol.Errors;
+import org.apache.flink.kafka_backport.common.utils.CollectionUtils;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * This wrapper supports both v0 and v1 of OffsetFetchRequest.
+ */
+public class OffsetFetchRequest extends AbstractRequest {
+
+ private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.OFFSET_FETCH.id);
+ private static final String GROUP_ID_KEY_NAME = "group_id";
+ private static final String TOPICS_KEY_NAME = "topics";
+
+ // topic level field names
+ private static final String TOPIC_KEY_NAME = "topic";
+ private static final String PARTITIONS_KEY_NAME = "partitions";
+
+ // partition level field names
+ private static final String PARTITION_KEY_NAME = "partition";
+
+ private final String groupId;
+ private final List<TopicPartition> partitions;
+
+ public OffsetFetchRequest(String groupId, List<TopicPartition> partitions) {
+ super(new Struct(CURRENT_SCHEMA));
+
+ Map<String, List<Integer>> topicsData = CollectionUtils.groupDataByTopic(partitions);
+
+ struct.set(GROUP_ID_KEY_NAME, groupId);
+ List<Struct> topicArray = new ArrayList<Struct>();
+ for (Map.Entry<String, List<Integer>> entries: topicsData.entrySet()) {
+ Struct topicData = struct.instance(TOPICS_KEY_NAME);
+ topicData.set(TOPIC_KEY_NAME, entries.getKey());
+ List<Struct> partitionArray = new ArrayList<Struct>();
+ for (Integer partiitonId : entries.getValue()) {
+ Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
+ partitionData.set(PARTITION_KEY_NAME, partiitonId);
+ partitionArray.add(partitionData);
+ }
+ topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
+ topicArray.add(topicData);
+ }
+ struct.set(TOPICS_KEY_NAME, topicArray.toArray());
+ this.groupId = groupId;
+ this.partitions = partitions;
+ }
+
+ public OffsetFetchRequest(Struct struct) {
+ super(struct);
+ partitions = new ArrayList<TopicPartition>();
+ for (Object topicResponseObj : struct.getArray(TOPICS_KEY_NAME)) {
+ Struct topicResponse = (Struct) topicResponseObj;
+ String topic = topicResponse.getString(TOPIC_KEY_NAME);
+ for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
+ Struct partitionResponse = (Struct) partitionResponseObj;
+ int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
+ partitions.add(new TopicPartition(topic, partition));
+ }
+ }
+ groupId = struct.getString(GROUP_ID_KEY_NAME);
+ }
+
+ @Override
+ public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+ Map<TopicPartition, OffsetFetchResponse.PartitionData> responseData = new HashMap<TopicPartition, OffsetFetchResponse.PartitionData>();
+
+ for (TopicPartition partition: partitions) {
+ responseData.put(partition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET,
+ OffsetFetchResponse.NO_METADATA,
+ Errors.forException(e).code()));
+ }
+
+ switch (versionId) {
+ // OffsetFetchResponseV0 == OffsetFetchResponseV1
+ case 0:
+ case 1:
+ return new OffsetFetchResponse(responseData);
+ default:
+ throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+ versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.OFFSET_FETCH.id)));
+ }
+ }
+
+ public String groupId() {
+ return groupId;
+ }
+
+ public List<TopicPartition> partitions() {
+ return partitions;
+ }
+
+ public static OffsetFetchRequest parse(ByteBuffer buffer, int versionId) {
+ return new OffsetFetchRequest(ProtoUtils.parseRequest(ApiKeys.OFFSET_FETCH.id, versionId, buffer));
+ }
+
+ public static OffsetFetchRequest parse(ByteBuffer buffer) {
+ return new OffsetFetchRequest((Struct) CURRENT_SCHEMA.read(buffer));
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/OffsetFetchResponse.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/OffsetFetchResponse.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/OffsetFetchResponse.java
new file mode 100644
index 0000000..a1be70f
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/OffsetFetchResponse.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.requests;
+
+import org.apache.flink.kafka_backport.common.protocol.types.Struct;
+import org.apache.flink.kafka_backport.common.TopicPartition;
+import org.apache.flink.kafka_backport.common.protocol.ApiKeys;
+import org.apache.flink.kafka_backport.common.protocol.Errors;
+import org.apache.flink.kafka_backport.common.protocol.ProtoUtils;
+import org.apache.flink.kafka_backport.common.protocol.types.Schema;
+import org.apache.flink.kafka_backport.common.utils.CollectionUtils;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+public class OffsetFetchResponse extends AbstractRequestResponse {
+
+ private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.OFFSET_FETCH.id);
+ private static final String RESPONSES_KEY_NAME = "responses";
+
+ // topic level fields
+ private static final String TOPIC_KEY_NAME = "topic";
+ private static final String PARTITIONS_KEY_NAME = "partition_responses";
+
+ // partition level fields
+ private static final String PARTITION_KEY_NAME = "partition";
+ private static final String COMMIT_OFFSET_KEY_NAME = "offset";
+ private static final String METADATA_KEY_NAME = "metadata";
+ private static final String ERROR_CODE_KEY_NAME = "error_code";
+
+ public static final long INVALID_OFFSET = -1L;
+ public static final String NO_METADATA = "";
+
+ /**
+ * Possible error code:
+ *
+ * UNKNOWN_TOPIC_OR_PARTITION (3) <- only for request v0
+ * OFFSET_LOAD_IN_PROGRESS (14)
+ * NOT_COORDINATOR_FOR_CONSUMER (16)
+ * ILLEGAL_GENERATION (22)
+ * UNKNOWN_CONSUMER_ID (25)
+ */
+
+ private final Map<TopicPartition, PartitionData> responseData;
+
+ public static final class PartitionData {
+ public final long offset;
+ public final String metadata;
+ public final short errorCode;
+
+ public PartitionData(long offset, String metadata, short errorCode) {
+ this.offset = offset;
+ this.metadata = metadata;
+ this.errorCode = errorCode;
+ }
+
+ public boolean hasError() {
+ return this.errorCode != Errors.NONE.code();
+ }
+ }
+
+ public OffsetFetchResponse(Map<TopicPartition, PartitionData> responseData) {
+ super(new Struct(CURRENT_SCHEMA));
+
+ Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(responseData);
+
+ List<Struct> topicArray = new ArrayList<Struct>();
+ for (Map.Entry<String, Map<Integer, PartitionData>> entries : topicsData.entrySet()) {
+ Struct topicData = struct.instance(RESPONSES_KEY_NAME);
+ topicData.set(TOPIC_KEY_NAME, entries.getKey());
+ List<Struct> partitionArray = new ArrayList<Struct>();
+ for (Map.Entry<Integer, PartitionData> partitionEntry : entries.getValue().entrySet()) {
+ PartitionData fetchPartitionData = partitionEntry.getValue();
+ Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
+ partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
+ partitionData.set(COMMIT_OFFSET_KEY_NAME, fetchPartitionData.offset);
+ partitionData.set(METADATA_KEY_NAME, fetchPartitionData.metadata);
+ partitionData.set(ERROR_CODE_KEY_NAME, fetchPartitionData.errorCode);
+ partitionArray.add(partitionData);
+ }
+ topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
+ topicArray.add(topicData);
+ }
+ struct.set(RESPONSES_KEY_NAME, topicArray.toArray());
+ this.responseData = responseData;
+ }
+
+ public OffsetFetchResponse(Struct struct) {
+ super(struct);
+ responseData = new HashMap<TopicPartition, PartitionData>();
+ for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
+ Struct topicResponse = (Struct) topicResponseObj;
+ String topic = topicResponse.getString(TOPIC_KEY_NAME);
+ for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
+ Struct partitionResponse = (Struct) partitionResponseObj;
+ int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
+ long offset = partitionResponse.getLong(COMMIT_OFFSET_KEY_NAME);
+ String metadata = partitionResponse.getString(METADATA_KEY_NAME);
+ short errorCode = partitionResponse.getShort(ERROR_CODE_KEY_NAME);
+ PartitionData partitionData = new PartitionData(offset, metadata, errorCode);
+ responseData.put(new TopicPartition(topic, partition), partitionData);
+ }
+ }
+ }
+
+ public Map<TopicPartition, PartitionData> responseData() {
+ return responseData;
+ }
+
+ public static OffsetFetchResponse parse(ByteBuffer buffer) {
+ return new OffsetFetchResponse((Struct) CURRENT_SCHEMA.read(buffer));
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/ProduceRequest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/ProduceRequest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/ProduceRequest.java
new file mode 100644
index 0000000..55694fb
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/ProduceRequest.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.flink.kafka_backport.common.requests;
+
+import org.apache.flink.kafka_backport.common.protocol.types.Struct;
+import org.apache.flink.kafka_backport.common.TopicPartition;
+import org.apache.flink.kafka_backport.common.protocol.ApiKeys;
+import org.apache.flink.kafka_backport.common.protocol.Errors;
+import org.apache.flink.kafka_backport.common.protocol.ProtoUtils;
+import org.apache.flink.kafka_backport.common.protocol.types.Schema;
+import org.apache.flink.kafka_backport.common.utils.CollectionUtils;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+public class ProduceRequest extends AbstractRequest {
+
+ private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.PRODUCE.id);
+ private static final String ACKS_KEY_NAME = "acks";
+ private static final String TIMEOUT_KEY_NAME = "timeout";
+ private static final String TOPIC_DATA_KEY_NAME = "topic_data";
+
+ // topic level field names
+ private static final String TOPIC_KEY_NAME = "topic";
+ private static final String PARTITION_DATA_KEY_NAME = "data";
+
+ // partition level field names
+ private static final String PARTITION_KEY_NAME = "partition";
+ private static final String RECORD_SET_KEY_NAME = "record_set";
+
+ private final short acks;
+ private final int timeout;
+ private final Map<TopicPartition, ByteBuffer> partitionRecords;
+
+ public ProduceRequest(short acks, int timeout, Map<TopicPartition, ByteBuffer> partitionRecords) {
+ super(new Struct(CURRENT_SCHEMA));
+ Map<String, Map<Integer, ByteBuffer>> recordsByTopic = CollectionUtils.groupDataByTopic(partitionRecords);
+ struct.set(ACKS_KEY_NAME, acks);
+ struct.set(TIMEOUT_KEY_NAME, timeout);
+ List<Struct> topicDatas = new ArrayList<Struct>(recordsByTopic.size());
+ for (Map.Entry<String, Map<Integer, ByteBuffer>> entry : recordsByTopic.entrySet()) {
+ Struct topicData = struct.instance(TOPIC_DATA_KEY_NAME);
+ topicData.set(TOPIC_KEY_NAME, entry.getKey());
+ List<Struct> partitionArray = new ArrayList<Struct>();
+ for (Map.Entry<Integer, ByteBuffer> partitionEntry : entry.getValue().entrySet()) {
+ ByteBuffer buffer = partitionEntry.getValue().duplicate();
+ Struct part = topicData.instance(PARTITION_DATA_KEY_NAME)
+ .set(PARTITION_KEY_NAME, partitionEntry.getKey())
+ .set(RECORD_SET_KEY_NAME, buffer);
+ partitionArray.add(part);
+ }
+ topicData.set(PARTITION_DATA_KEY_NAME, partitionArray.toArray());
+ topicDatas.add(topicData);
+ }
+ struct.set(TOPIC_DATA_KEY_NAME, topicDatas.toArray());
+ this.acks = acks;
+ this.timeout = timeout;
+ this.partitionRecords = partitionRecords;
+ }
+
+ public ProduceRequest(Struct struct) {
+ super(struct);
+ partitionRecords = new HashMap<TopicPartition, ByteBuffer>();
+ for (Object topicDataObj : struct.getArray(TOPIC_DATA_KEY_NAME)) {
+ Struct topicData = (Struct) topicDataObj;
+ String topic = topicData.getString(TOPIC_KEY_NAME);
+ for (Object partitionResponseObj : topicData.getArray(PARTITION_DATA_KEY_NAME)) {
+ Struct partitionResponse = (Struct) partitionResponseObj;
+ int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
+ ByteBuffer records = partitionResponse.getBytes(RECORD_SET_KEY_NAME);
+ partitionRecords.put(new TopicPartition(topic, partition), records);
+ }
+ }
+ acks = struct.getShort(ACKS_KEY_NAME);
+ timeout = struct.getInt(TIMEOUT_KEY_NAME);
+ }
+
+ @Override
+ public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+ /* In case the producer doesn't actually want any response */
+ if (acks == 0)
+ return null;
+
+ Map<TopicPartition, ProduceResponse.PartitionResponse> responseMap = new HashMap<TopicPartition, ProduceResponse.PartitionResponse>();
+
+ for (Map.Entry<TopicPartition, ByteBuffer> entry : partitionRecords.entrySet()) {
+ responseMap.put(entry.getKey(), new ProduceResponse.PartitionResponse(Errors.forException(e).code(), ProduceResponse.INVALID_OFFSET));
+ }
+
+ switch (versionId) {
+ case 0:
+ return new ProduceResponse(responseMap);
+ default:
+ throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+ versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.PRODUCE.id)));
+ }
+ }
+
+ public short acks() {
+ return acks;
+ }
+
+ public int timeout() {
+ return timeout;
+ }
+
+ public Map<TopicPartition, ByteBuffer> partitionRecords() {
+ return partitionRecords;
+ }
+
+ public static ProduceRequest parse(ByteBuffer buffer, int versionId) {
+ return new ProduceRequest(ProtoUtils.parseRequest(ApiKeys.PRODUCE.id, versionId, buffer));
+ }
+
+ public static ProduceRequest parse(ByteBuffer buffer) {
+ return new ProduceRequest((Struct) CURRENT_SCHEMA.read(buffer));
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/ProduceResponse.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/ProduceResponse.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/ProduceResponse.java
new file mode 100644
index 0000000..0728d9a
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/ProduceResponse.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.requests;
+
+import org.apache.flink.kafka_backport.common.protocol.types.Struct;
+import org.apache.flink.kafka_backport.common.TopicPartition;
+import org.apache.flink.kafka_backport.common.protocol.ApiKeys;
+import org.apache.flink.kafka_backport.common.protocol.ProtoUtils;
+import org.apache.flink.kafka_backport.common.protocol.types.Schema;
+import org.apache.flink.kafka_backport.common.utils.CollectionUtils;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+public class ProduceResponse extends AbstractRequestResponse {
+
+ private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.PRODUCE.id);
+ private static final String RESPONSES_KEY_NAME = "responses";
+
+ // topic level field names
+ private static final String TOPIC_KEY_NAME = "topic";
+ private static final String PARTITION_RESPONSES_KEY_NAME = "partition_responses";
+
+ // partition level field names
+ private static final String PARTITION_KEY_NAME = "partition";
+ private static final String ERROR_CODE_KEY_NAME = "error_code";
+
+ public static final long INVALID_OFFSET = -1L;
+
+ /**
+ * Possible error code:
+ *
+ * TODO
+ */
+
+ private static final String BASE_OFFSET_KEY_NAME = "base_offset";
+
+ private final Map<TopicPartition, PartitionResponse> responses;
+
+ public ProduceResponse(Map<TopicPartition, PartitionResponse> responses) {
+ super(new Struct(CURRENT_SCHEMA));
+ Map<String, Map<Integer, PartitionResponse>> responseByTopic = CollectionUtils.groupDataByTopic(responses);
+ List<Struct> topicDatas = new ArrayList<Struct>(responseByTopic.size());
+ for (Map.Entry<String, Map<Integer, PartitionResponse>> entry : responseByTopic.entrySet()) {
+ Struct topicData = struct.instance(RESPONSES_KEY_NAME);
+ topicData.set(TOPIC_KEY_NAME, entry.getKey());
+ List<Struct> partitionArray = new ArrayList<Struct>();
+ for (Map.Entry<Integer, PartitionResponse> partitionEntry : entry.getValue().entrySet()) {
+ PartitionResponse part = partitionEntry.getValue();
+ Struct partStruct = topicData.instance(PARTITION_RESPONSES_KEY_NAME)
+ .set(PARTITION_KEY_NAME, partitionEntry.getKey())
+ .set(ERROR_CODE_KEY_NAME, part.errorCode)
+ .set(BASE_OFFSET_KEY_NAME, part.baseOffset);
+ partitionArray.add(partStruct);
+ }
+ topicData.set(PARTITION_RESPONSES_KEY_NAME, partitionArray.toArray());
+ topicDatas.add(topicData);
+ }
+ struct.set(RESPONSES_KEY_NAME, topicDatas.toArray());
+ this.responses = responses;
+ }
+
+ public ProduceResponse(Struct struct) {
+ super(struct);
+ responses = new HashMap<TopicPartition, PartitionResponse>();
+ for (Object topicResponse : struct.getArray("responses")) {
+ Struct topicRespStruct = (Struct) topicResponse;
+ String topic = topicRespStruct.getString("topic");
+ for (Object partResponse : topicRespStruct.getArray("partition_responses")) {
+ Struct partRespStruct = (Struct) partResponse;
+ int partition = partRespStruct.getInt("partition");
+ short errorCode = partRespStruct.getShort("error_code");
+ long offset = partRespStruct.getLong("base_offset");
+ TopicPartition tp = new TopicPartition(topic, partition);
+ responses.put(tp, new PartitionResponse(errorCode, offset));
+ }
+ }
+ }
+
+ public Map<TopicPartition, PartitionResponse> responses() {
+ return this.responses;
+ }
+
+ public static final class PartitionResponse {
+ public short errorCode;
+ public long baseOffset;
+
+ public PartitionResponse(short errorCode, long baseOffset) {
+ this.errorCode = errorCode;
+ this.baseOffset = baseOffset;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder b = new StringBuilder();
+ b.append('{');
+ b.append("error: ");
+ b.append(errorCode);
+ b.append(",offset: ");
+ b.append(baseOffset);
+ b.append('}');
+ return b.toString();
+ }
+ }
+
+ public static ProduceResponse parse(ByteBuffer buffer) {
+ return new ProduceResponse((Struct) CURRENT_SCHEMA.read(buffer));
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/RequestHeader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/RequestHeader.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/RequestHeader.java
new file mode 100644
index 0000000..82ef7c7
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/RequestHeader.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.requests;
+
+import org.apache.flink.kafka_backport.common.protocol.types.Struct;
+import org.apache.flink.kafka_backport.common.protocol.ProtoUtils;
+import org.apache.flink.kafka_backport.common.protocol.Protocol;
+import org.apache.flink.kafka_backport.common.protocol.types.Field;
+
+import java.nio.ByteBuffer;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * The header for a request in the Kafka protocol
+ */
+public class RequestHeader extends AbstractRequestResponse {
+
+ private static final Field API_KEY_FIELD = Protocol.REQUEST_HEADER.get("api_key");
+ private static final Field API_VERSION_FIELD = Protocol.REQUEST_HEADER.get("api_version");
+ private static final Field CLIENT_ID_FIELD = Protocol.REQUEST_HEADER.get("client_id");
+ private static final Field CORRELATION_ID_FIELD = Protocol.REQUEST_HEADER.get("correlation_id");
+
+ private final short apiKey;
+ private final short apiVersion;
+ private final String clientId;
+ private final int correlationId;
+
+ public RequestHeader(Struct header) {
+ super(header);
+ apiKey = struct.getShort(API_KEY_FIELD);
+ apiVersion = struct.getShort(API_VERSION_FIELD);
+ clientId = struct.getString(CLIENT_ID_FIELD);
+ correlationId = struct.getInt(CORRELATION_ID_FIELD);
+ }
+
+ public RequestHeader(short apiKey, String client, int correlation) {
+ this(apiKey, ProtoUtils.latestVersion(apiKey), client, correlation);
+ }
+
+ public RequestHeader(short apiKey, short version, String client, int correlation) {
+ super(new Struct(Protocol.REQUEST_HEADER));
+ struct.set(API_KEY_FIELD, apiKey);
+ struct.set(API_VERSION_FIELD, version);
+ struct.set(CLIENT_ID_FIELD, client);
+ struct.set(CORRELATION_ID_FIELD, correlation);
+ this.apiKey = apiKey;
+ this.apiVersion = version;
+ this.clientId = client;
+ this.correlationId = correlation;
+ }
+
+ public short apiKey() {
+ return apiKey;
+ }
+
+ public short apiVersion() {
+ return apiVersion;
+ }
+
+ public String clientId() {
+ return clientId;
+ }
+
+ public int correlationId() {
+ return correlationId;
+ }
+
+ public static RequestHeader parse(ByteBuffer buffer) {
+ return new RequestHeader((Struct) Protocol.REQUEST_HEADER.read(buffer));
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/RequestSend.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/RequestSend.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/RequestSend.java
new file mode 100644
index 0000000..1815005
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/RequestSend.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.requests;
+
+import org.apache.flink.kafka_backport.common.network.NetworkSend;
+import org.apache.flink.kafka_backport.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * A send object for a kafka request
+ */
+public class RequestSend extends NetworkSend {
+
+ private final RequestHeader header;
+ private final Struct body;
+
+ public RequestSend(String destination, RequestHeader header, Struct body) {
+ super(destination, serialize(header, body));
+ this.header = header;
+ this.body = body;
+ }
+
+ private static ByteBuffer serialize(RequestHeader header, Struct body) {
+ ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + body.sizeOf());
+ header.writeTo(buffer);
+ body.writeTo(buffer);
+ buffer.rewind();
+ return buffer;
+ }
+
+ public RequestHeader header() {
+ return this.header;
+ }
+
+ public Struct body() {
+ return body;
+ }
+
+ @Override
+ public String toString() {
+ return "RequestSend(header=" + header.toString() + ", body=" + body.toString() + ")";
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/ResponseHeader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/ResponseHeader.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/ResponseHeader.java
new file mode 100644
index 0000000..a6aad5e
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/ResponseHeader.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.requests;
+
+import org.apache.flink.kafka_backport.common.protocol.Protocol;
+import org.apache.flink.kafka_backport.common.protocol.types.Field;
+import org.apache.flink.kafka_backport.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * A response header in the kafka protocol.
+ */
+public class ResponseHeader extends AbstractRequestResponse {
+
+ private static final Field CORRELATION_KEY_FIELD = Protocol.RESPONSE_HEADER.get("correlation_id");
+
+ private final int correlationId;
+
+ public ResponseHeader(Struct header) {
+ super(header);
+ correlationId = struct.getInt(CORRELATION_KEY_FIELD);
+ }
+
+ public ResponseHeader(int correlationId) {
+ super(new Struct(Protocol.RESPONSE_HEADER));
+ struct.set(CORRELATION_KEY_FIELD, correlationId);
+ this.correlationId = correlationId;
+ }
+
+ public int correlationId() {
+ return correlationId;
+ }
+
+ public static ResponseHeader parse(ByteBuffer buffer) {
+ return new ResponseHeader((Struct) Protocol.RESPONSE_HEADER.read(buffer));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/ResponseSend.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/ResponseSend.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/ResponseSend.java
new file mode 100644
index 0000000..ee3b393
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/ResponseSend.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.requests;
+
+import org.apache.flink.kafka_backport.common.network.NetworkSend;
+import org.apache.flink.kafka_backport.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+public class ResponseSend extends NetworkSend {
+
+ public ResponseSend(String destination, ResponseHeader header, Struct body) {
+ super(destination, serialize(header, body));
+ }
+
+ public ResponseSend(String destination, ResponseHeader header, AbstractRequestResponse response) {
+ this(destination, header, response.toStruct());
+ }
+
+ private static ByteBuffer serialize(ResponseHeader header, Struct body) {
+ ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + body.sizeOf());
+ header.writeTo(buffer);
+ body.writeTo(buffer);
+ buffer.rewind();
+ return buffer;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/serialization/ByteArrayDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/serialization/ByteArrayDeserializer.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/serialization/ByteArrayDeserializer.java
new file mode 100644
index 0000000..7e0b3e9
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/serialization/ByteArrayDeserializer.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.flink.kafka_backport.common.serialization;
+
+import java.util.Map;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+public class ByteArrayDeserializer implements Deserializer<byte[]> {
+
+ @Override
+ public void configure(Map<String, ?> configs, boolean isKey) {
+ // nothing to do
+ }
+
+ @Override
+ public byte[] deserialize(String topic, byte[] data) {
+ return data;
+ }
+
+ @Override
+ public void close() {
+ // nothing to do
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/serialization/ByteArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/serialization/ByteArraySerializer.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/serialization/ByteArraySerializer.java
new file mode 100644
index 0000000..f835375
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/serialization/ByteArraySerializer.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.flink.kafka_backport.common.serialization;
+
+import java.util.Map;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+public class ByteArraySerializer implements Serializer<byte[]> {
+
+ @Override
+ public void configure(Map<String, ?> configs, boolean isKey) {
+ // nothing to do
+ }
+
+ @Override
+ public byte[] serialize(String topic, byte[] data) {
+ return data;
+ }
+
+ @Override
+ public void close() {
+ // nothing to do
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/serialization/Deserializer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/serialization/Deserializer.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/serialization/Deserializer.java
new file mode 100644
index 0000000..4d2de90
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/serialization/Deserializer.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.flink.kafka_backport.common.serialization;
+
+import java.io.Closeable;
+import java.util.Map;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ *
+ * @param <T> Type to be deserialized into.
+ *
+ * A class that implements this interface is expected to have a constructor with no parameter.
+ */
+public interface Deserializer<T> extends Closeable {
+
+ /**
+ * Configure this class.
+ * @param configs configs in key/value pairs
+ * @param isKey whether is for key or value
+ */
+ public void configure(Map<String, ?> configs, boolean isKey);
+
+ /**
+ *
+ * @param topic topic associated with the data
+ * @param data serialized bytes
+ * @return deserialized typed data
+ */
+ public T deserialize(String topic, byte[] data);
+
+ @Override
+ public void close();
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/serialization/IntegerDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/serialization/IntegerDeserializer.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/serialization/IntegerDeserializer.java
new file mode 100644
index 0000000..c5833d5
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/serialization/IntegerDeserializer.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.serialization;
+
+import org.apache.flink.kafka_backport.common.errors.SerializationException;
+
+import java.util.Map;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+public class IntegerDeserializer implements Deserializer<Integer> {
+
+ public void configure(Map<String, ?> configs, boolean isKey) {
+ // nothing to do
+ }
+
+ public Integer deserialize(String topic, byte[] data) {
+ if (data == null)
+ return null;
+ if (data.length != 4) {
+ throw new SerializationException("Size of data received by IntegerDeserializer is " +
+ "not 4");
+ }
+
+ int value = 0;
+ for (byte b : data) {
+ value <<= 8;
+ value |= b & 0xFF;
+ }
+ return value;
+ }
+
+ public void close() {
+ // nothing to do
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/serialization/IntegerSerializer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/serialization/IntegerSerializer.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/serialization/IntegerSerializer.java
new file mode 100644
index 0000000..dcbd7be
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/serialization/IntegerSerializer.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.serialization;
+
+import java.util.Map;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+public class IntegerSerializer implements Serializer<Integer> {
+
+ public void configure(Map<String, ?> configs, boolean isKey) {
+ // nothing to do
+ }
+
+ public byte[] serialize(String topic, Integer data) {
+ if (data == null)
+ return null;
+
+ return new byte[] {
+ (byte) (data >>> 24),
+ (byte) (data >>> 16),
+ (byte) (data >>> 8),
+ data.byteValue()
+ };
+ }
+
+ public void close() {
+ // nothing to do
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/serialization/Serializer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/serialization/Serializer.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/serialization/Serializer.java
new file mode 100644
index 0000000..1725e36
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/serialization/Serializer.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.flink.kafka_backport.common.serialization;
+
+import java.io.Closeable;
+import java.util.Map;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ *
+ * @param <T> Type to be serialized from.
+ *
+ * A class that implements this interface is expected to have a constructor with no parameter.
+ */
+public interface Serializer<T> extends Closeable {
+
+ /**
+ * Configure this class.
+ * @param configs configs in key/value pairs
+ * @param isKey whether is for key or value
+ */
+ public void configure(Map<String, ?> configs, boolean isKey);
+
+ /**
+ * @param topic topic associated with data
+ * @param data typed data
+ * @return serialized bytes
+ */
+ public byte[] serialize(String topic, T data);
+
+
+ /**
+ * Close this serializer.
+ * This method has to be idempotent if the serializer is used in KafkaProducer because it might be called
+ * multiple times.
+ */
+ @Override
+ public void close();
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/serialization/StringDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/serialization/StringDeserializer.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/serialization/StringDeserializer.java
new file mode 100644
index 0000000..4d2ed4c
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/serialization/StringDeserializer.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.serialization;
+
+import org.apache.flink.kafka_backport.common.errors.SerializationException;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Map;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * String encoding defaults to UTF8 and can be customized by setting the property key.deserializer.encoding,
+ * value.deserializer.encoding or deserializer.encoding. The first two take precedence over the last.
+ */
+public class StringDeserializer implements Deserializer<String> {
+ private String encoding = "UTF8";
+
+ @Override
+ public void configure(Map<String, ?> configs, boolean isKey) {
+ String propertyName = isKey ? "key.deserializer.encoding" : "value.deserializer.encoding";
+ Object encodingValue = configs.get(propertyName);
+ if (encodingValue == null)
+ encodingValue = configs.get("deserializer.encoding");
+ if (encodingValue != null && encodingValue instanceof String)
+ encoding = (String) encodingValue;
+ }
+
+ @Override
+ public String deserialize(String topic, byte[] data) {
+ try {
+ if (data == null)
+ return null;
+ else
+ return new String(data, encoding);
+ } catch (UnsupportedEncodingException e) {
+ throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + encoding);
+ }
+ }
+
+ @Override
+ public void close() {
+ // nothing to do
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/serialization/StringSerializer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/serialization/StringSerializer.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/serialization/StringSerializer.java
new file mode 100644
index 0000000..fae4c21
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/serialization/StringSerializer.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.serialization;
+
+import org.apache.flink.kafka_backport.common.errors.SerializationException;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Map;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * String encoding defaults to UTF8 and can be customized by setting the property key.serializer.encoding,
+ * value.serializer.encoding or serializer.encoding. The first two take precedence over the last.
+ */
+public class StringSerializer implements Serializer<String> {
+ private String encoding = "UTF8";
+
+ @Override
+ public void configure(Map<String, ?> configs, boolean isKey) {
+ String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
+ Object encodingValue = configs.get(propertyName);
+ if (encodingValue == null)
+ encodingValue = configs.get("serializer.encoding");
+ if (encodingValue != null && encodingValue instanceof String)
+ encoding = (String) encodingValue;
+ }
+
+ @Override
+ public byte[] serialize(String topic, String data) {
+ try {
+ if (data == null)
+ return null;
+ else
+ return data.getBytes(encoding);
+ } catch (UnsupportedEncodingException e) {
+ throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding);
+ }
+ }
+
+ @Override
+ public void close() {
+ // nothing to do
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/utils/AbstractIterator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/utils/AbstractIterator.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/utils/AbstractIterator.java
new file mode 100644
index 0000000..2af94b6
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/utils/AbstractIterator.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.utils;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * A base class that simplifies implementing an iterator
+ * @param <T> The type of thing we are iterating over
+ */
+public abstract class AbstractIterator<T> implements Iterator<T> {
+
+ private static enum State {
+ READY, NOT_READY, DONE, FAILED
+ };
+
+ private State state = State.NOT_READY;
+ private T next;
+
+ @Override
+ public boolean hasNext() {
+ switch (state) {
+ case FAILED:
+ throw new IllegalStateException("Iterator is in failed state");
+ case DONE:
+ return false;
+ case READY:
+ return true;
+ default:
+ return maybeComputeNext();
+ }
+ }
+
+ @Override
+ public T next() {
+ if (!hasNext())
+ throw new NoSuchElementException();
+ state = State.NOT_READY;
+ if (next == null)
+ throw new IllegalStateException("Expected item but none found.");
+ return next;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("Removal not supported");
+ }
+
+ public T peek() {
+ if (!hasNext())
+ throw new NoSuchElementException();
+ return next;
+ }
+
+ protected T allDone() {
+ state = State.DONE;
+ return null;
+ }
+
+ protected abstract T makeNext();
+
+ private Boolean maybeComputeNext() {
+ state = State.FAILED;
+ next = makeNext();
+ if (state == State.DONE) {
+ return false;
+ } else {
+ state = State.READY;
+ return true;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/utils/CollectionUtils.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/utils/CollectionUtils.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/utils/CollectionUtils.java
new file mode 100644
index 0000000..7960331
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/utils/CollectionUtils.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.utils;
+
+import org.apache.flink.kafka_backport.common.TopicPartition;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+public class CollectionUtils {
+ /**
+ * group data by topic
+ * @param data Data to be partitioned
+ * @param <T> Partition data type
+ * @return partitioned data
+ */
+ public static <T> Map<String, Map<Integer, T>> groupDataByTopic(Map<TopicPartition, T> data) {
+ Map<String, Map<Integer, T>> dataByTopic = new HashMap<String, Map<Integer, T>>();
+ for (Map.Entry<TopicPartition, T> entry: data.entrySet()) {
+ String topic = entry.getKey().topic();
+ int partition = entry.getKey().partition();
+ Map<Integer, T> topicData = dataByTopic.get(topic);
+ if (topicData == null) {
+ topicData = new HashMap<Integer, T>();
+ dataByTopic.put(topic, topicData);
+ }
+ topicData.put(partition, entry.getValue());
+ }
+ return dataByTopic;
+ }
+
+ /**
+ * group partitions by topic
+ * @param partitions
+ * @return partitions per topic
+ */
+ public static Map<String, List<Integer>> groupDataByTopic(List<TopicPartition> partitions) {
+ Map<String, List<Integer>> partitionsByTopic = new HashMap<String, List<Integer>>();
+ for (TopicPartition tp: partitions) {
+ String topic = tp.topic();
+ List<Integer> topicData = partitionsByTopic.get(topic);
+ if (topicData == null) {
+ topicData = new ArrayList<Integer>();
+ partitionsByTopic.put(topic, topicData);
+ }
+ topicData.add(tp.partition());
+ }
+ return partitionsByTopic;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/utils/CopyOnWriteMap.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/utils/CopyOnWriteMap.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/utils/CopyOnWriteMap.java
new file mode 100644
index 0000000..dcf219a
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/utils/CopyOnWriteMap.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.utils;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * A simple read-optimized map implementation that synchronizes only writes and does a full copy on each modification
+ */
+public class CopyOnWriteMap<K, V> implements ConcurrentMap<K, V> {
+
+ private volatile Map<K, V> map;
+
+ public CopyOnWriteMap() {
+ this.map = Collections.emptyMap();
+ }
+
+ public CopyOnWriteMap(Map<K, V> map) {
+ this.map = Collections.unmodifiableMap(map);
+ }
+
+ @Override
+ public boolean containsKey(Object k) {
+ return map.containsKey(k);
+ }
+
+ @Override
+ public boolean containsValue(Object v) {
+ return map.containsValue(v);
+ }
+
+ @Override
+ public Set<Entry<K, V>> entrySet() {
+ return map.entrySet();
+ }
+
+ @Override
+ public V get(Object k) {
+ return map.get(k);
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return map.isEmpty();
+ }
+
+ @Override
+ public Set<K> keySet() {
+ return map.keySet();
+ }
+
+ @Override
+ public int size() {
+ return map.size();
+ }
+
+ @Override
+ public Collection<V> values() {
+ return map.values();
+ }
+
+ @Override
+ public synchronized void clear() {
+ this.map = Collections.emptyMap();
+ }
+
+ @Override
+ public synchronized V put(K k, V v) {
+ Map<K, V> copy = new HashMap<K, V>(this.map);
+ V prev = copy.put(k, v);
+ this.map = Collections.unmodifiableMap(copy);
+ return prev;
+ }
+
+ @Override
+ public synchronized void putAll(Map<? extends K, ? extends V> entries) {
+ Map<K, V> copy = new HashMap<K, V>(this.map);
+ copy.putAll(entries);
+ this.map = Collections.unmodifiableMap(copy);
+ }
+
+ @Override
+ public synchronized V remove(Object key) {
+ Map<K, V> copy = new HashMap<K, V>(this.map);
+ V prev = copy.remove(key);
+ this.map = Collections.unmodifiableMap(copy);
+ return prev;
+ }
+
+ @Override
+ public synchronized V putIfAbsent(K k, V v) {
+ if (!containsKey(k))
+ return put(k, v);
+ else
+ return get(k);
+ }
+
+ @Override
+ public synchronized boolean remove(Object k, Object v) {
+ if (containsKey(k) && get(k).equals(v)) {
+ remove(k);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public synchronized boolean replace(K k, V original, V replacement) {
+ if (containsKey(k) && get(k).equals(original)) {
+ put(k, replacement);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public synchronized V replace(K k, V v) {
+ if (containsKey(k)) {
+ return put(k, v);
+ } else {
+ return null;
+ }
+ }
+
+}