You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/27 13:25:56 UTC
[39/51] [abbrv] flink git commit: [FLINK-2386] [kafka connector]
Remove copied Kafka code again. Implemented our own topic metadata retrieval.
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/OffsetFetchRequest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/OffsetFetchRequest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/OffsetFetchRequest.java
deleted file mode 100644
index e97ebc2..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/OffsetFetchRequest.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.requests;
-
-import org.apache.flink.kafka_backport.common.protocol.ProtoUtils;
-import org.apache.flink.kafka_backport.common.protocol.types.Schema;
-import org.apache.flink.kafka_backport.common.protocol.types.Struct;
-import org.apache.flink.kafka_backport.common.TopicPartition;
-import org.apache.flink.kafka_backport.common.protocol.ApiKeys;
-import org.apache.flink.kafka_backport.common.protocol.Errors;
-import org.apache.flink.kafka_backport.common.utils.CollectionUtils;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * This wrapper supports both v0 and v1 of OffsetFetchRequest.
- */
-public class OffsetFetchRequest extends AbstractRequest {
-
- private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.OFFSET_FETCH.id);
- private static final String GROUP_ID_KEY_NAME = "group_id";
- private static final String TOPICS_KEY_NAME = "topics";
-
- // topic level field names
- private static final String TOPIC_KEY_NAME = "topic";
- private static final String PARTITIONS_KEY_NAME = "partitions";
-
- // partition level field names
- private static final String PARTITION_KEY_NAME = "partition";
-
- private final String groupId;
- private final List<TopicPartition> partitions;
-
- public OffsetFetchRequest(String groupId, List<TopicPartition> partitions) {
- super(new Struct(CURRENT_SCHEMA));
-
- Map<String, List<Integer>> topicsData = CollectionUtils.groupDataByTopic(partitions);
-
- struct.set(GROUP_ID_KEY_NAME, groupId);
- List<Struct> topicArray = new ArrayList<Struct>();
- for (Map.Entry<String, List<Integer>> entries: topicsData.entrySet()) {
- Struct topicData = struct.instance(TOPICS_KEY_NAME);
- topicData.set(TOPIC_KEY_NAME, entries.getKey());
- List<Struct> partitionArray = new ArrayList<Struct>();
- for (Integer partiitonId : entries.getValue()) {
- Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
- partitionData.set(PARTITION_KEY_NAME, partiitonId);
- partitionArray.add(partitionData);
- }
- topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
- topicArray.add(topicData);
- }
- struct.set(TOPICS_KEY_NAME, topicArray.toArray());
- this.groupId = groupId;
- this.partitions = partitions;
- }
-
- public OffsetFetchRequest(Struct struct) {
- super(struct);
- partitions = new ArrayList<TopicPartition>();
- for (Object topicResponseObj : struct.getArray(TOPICS_KEY_NAME)) {
- Struct topicResponse = (Struct) topicResponseObj;
- String topic = topicResponse.getString(TOPIC_KEY_NAME);
- for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
- Struct partitionResponse = (Struct) partitionResponseObj;
- int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
- partitions.add(new TopicPartition(topic, partition));
- }
- }
- groupId = struct.getString(GROUP_ID_KEY_NAME);
- }
-
- @Override
- public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
- Map<TopicPartition, OffsetFetchResponse.PartitionData> responseData = new HashMap<TopicPartition, OffsetFetchResponse.PartitionData>();
-
- for (TopicPartition partition: partitions) {
- responseData.put(partition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET,
- OffsetFetchResponse.NO_METADATA,
- Errors.forException(e).code()));
- }
-
- switch (versionId) {
- // OffsetFetchResponseV0 == OffsetFetchResponseV1
- case 0:
- case 1:
- return new OffsetFetchResponse(responseData);
- default:
- throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
- versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.OFFSET_FETCH.id)));
- }
- }
-
- public String groupId() {
- return groupId;
- }
-
- public List<TopicPartition> partitions() {
- return partitions;
- }
-
- public static OffsetFetchRequest parse(ByteBuffer buffer, int versionId) {
- return new OffsetFetchRequest(ProtoUtils.parseRequest(ApiKeys.OFFSET_FETCH.id, versionId, buffer));
- }
-
- public static OffsetFetchRequest parse(ByteBuffer buffer) {
- return new OffsetFetchRequest((Struct) CURRENT_SCHEMA.read(buffer));
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/OffsetFetchResponse.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/OffsetFetchResponse.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/OffsetFetchResponse.java
deleted file mode 100644
index a1be70f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/OffsetFetchResponse.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.requests;
-
-import org.apache.flink.kafka_backport.common.protocol.types.Struct;
-import org.apache.flink.kafka_backport.common.TopicPartition;
-import org.apache.flink.kafka_backport.common.protocol.ApiKeys;
-import org.apache.flink.kafka_backport.common.protocol.Errors;
-import org.apache.flink.kafka_backport.common.protocol.ProtoUtils;
-import org.apache.flink.kafka_backport.common.protocol.types.Schema;
-import org.apache.flink.kafka_backport.common.utils.CollectionUtils;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-public class OffsetFetchResponse extends AbstractRequestResponse {
-
- private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.OFFSET_FETCH.id);
- private static final String RESPONSES_KEY_NAME = "responses";
-
- // topic level fields
- private static final String TOPIC_KEY_NAME = "topic";
- private static final String PARTITIONS_KEY_NAME = "partition_responses";
-
- // partition level fields
- private static final String PARTITION_KEY_NAME = "partition";
- private static final String COMMIT_OFFSET_KEY_NAME = "offset";
- private static final String METADATA_KEY_NAME = "metadata";
- private static final String ERROR_CODE_KEY_NAME = "error_code";
-
- public static final long INVALID_OFFSET = -1L;
- public static final String NO_METADATA = "";
-
- /**
- * Possible error codes:
- *
- * UNKNOWN_TOPIC_OR_PARTITION (3) <- only for request v0
- * OFFSET_LOAD_IN_PROGRESS (14)
- * NOT_COORDINATOR_FOR_CONSUMER (16)
- * ILLEGAL_GENERATION (22)
- * UNKNOWN_CONSUMER_ID (25)
- */
-
- private final Map<TopicPartition, PartitionData> responseData;
-
- public static final class PartitionData {
- public final long offset;
- public final String metadata;
- public final short errorCode;
-
- public PartitionData(long offset, String metadata, short errorCode) {
- this.offset = offset;
- this.metadata = metadata;
- this.errorCode = errorCode;
- }
-
- public boolean hasError() {
- return this.errorCode != Errors.NONE.code();
- }
- }
-
- public OffsetFetchResponse(Map<TopicPartition, PartitionData> responseData) {
- super(new Struct(CURRENT_SCHEMA));
-
- Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(responseData);
-
- List<Struct> topicArray = new ArrayList<Struct>();
- for (Map.Entry<String, Map<Integer, PartitionData>> entries : topicsData.entrySet()) {
- Struct topicData = struct.instance(RESPONSES_KEY_NAME);
- topicData.set(TOPIC_KEY_NAME, entries.getKey());
- List<Struct> partitionArray = new ArrayList<Struct>();
- for (Map.Entry<Integer, PartitionData> partitionEntry : entries.getValue().entrySet()) {
- PartitionData fetchPartitionData = partitionEntry.getValue();
- Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
- partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
- partitionData.set(COMMIT_OFFSET_KEY_NAME, fetchPartitionData.offset);
- partitionData.set(METADATA_KEY_NAME, fetchPartitionData.metadata);
- partitionData.set(ERROR_CODE_KEY_NAME, fetchPartitionData.errorCode);
- partitionArray.add(partitionData);
- }
- topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
- topicArray.add(topicData);
- }
- struct.set(RESPONSES_KEY_NAME, topicArray.toArray());
- this.responseData = responseData;
- }
-
- public OffsetFetchResponse(Struct struct) {
- super(struct);
- responseData = new HashMap<TopicPartition, PartitionData>();
- for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
- Struct topicResponse = (Struct) topicResponseObj;
- String topic = topicResponse.getString(TOPIC_KEY_NAME);
- for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
- Struct partitionResponse = (Struct) partitionResponseObj;
- int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
- long offset = partitionResponse.getLong(COMMIT_OFFSET_KEY_NAME);
- String metadata = partitionResponse.getString(METADATA_KEY_NAME);
- short errorCode = partitionResponse.getShort(ERROR_CODE_KEY_NAME);
- PartitionData partitionData = new PartitionData(offset, metadata, errorCode);
- responseData.put(new TopicPartition(topic, partition), partitionData);
- }
- }
- }
-
- public Map<TopicPartition, PartitionData> responseData() {
- return responseData;
- }
-
- public static OffsetFetchResponse parse(ByteBuffer buffer) {
- return new OffsetFetchResponse((Struct) CURRENT_SCHEMA.read(buffer));
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/ProduceRequest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/ProduceRequest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/ProduceRequest.java
deleted file mode 100644
index 55694fb..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/ProduceRequest.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package org.apache.flink.kafka_backport.common.requests;
-
-import org.apache.flink.kafka_backport.common.protocol.types.Struct;
-import org.apache.flink.kafka_backport.common.TopicPartition;
-import org.apache.flink.kafka_backport.common.protocol.ApiKeys;
-import org.apache.flink.kafka_backport.common.protocol.Errors;
-import org.apache.flink.kafka_backport.common.protocol.ProtoUtils;
-import org.apache.flink.kafka_backport.common.protocol.types.Schema;
-import org.apache.flink.kafka_backport.common.utils.CollectionUtils;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-public class ProduceRequest extends AbstractRequest {
-
- private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.PRODUCE.id);
- private static final String ACKS_KEY_NAME = "acks";
- private static final String TIMEOUT_KEY_NAME = "timeout";
- private static final String TOPIC_DATA_KEY_NAME = "topic_data";
-
- // topic level field names
- private static final String TOPIC_KEY_NAME = "topic";
- private static final String PARTITION_DATA_KEY_NAME = "data";
-
- // partition level field names
- private static final String PARTITION_KEY_NAME = "partition";
- private static final String RECORD_SET_KEY_NAME = "record_set";
-
- private final short acks;
- private final int timeout;
- private final Map<TopicPartition, ByteBuffer> partitionRecords;
-
- public ProduceRequest(short acks, int timeout, Map<TopicPartition, ByteBuffer> partitionRecords) {
- super(new Struct(CURRENT_SCHEMA));
- Map<String, Map<Integer, ByteBuffer>> recordsByTopic = CollectionUtils.groupDataByTopic(partitionRecords);
- struct.set(ACKS_KEY_NAME, acks);
- struct.set(TIMEOUT_KEY_NAME, timeout);
- List<Struct> topicDatas = new ArrayList<Struct>(recordsByTopic.size());
- for (Map.Entry<String, Map<Integer, ByteBuffer>> entry : recordsByTopic.entrySet()) {
- Struct topicData = struct.instance(TOPIC_DATA_KEY_NAME);
- topicData.set(TOPIC_KEY_NAME, entry.getKey());
- List<Struct> partitionArray = new ArrayList<Struct>();
- for (Map.Entry<Integer, ByteBuffer> partitionEntry : entry.getValue().entrySet()) {
- ByteBuffer buffer = partitionEntry.getValue().duplicate();
- Struct part = topicData.instance(PARTITION_DATA_KEY_NAME)
- .set(PARTITION_KEY_NAME, partitionEntry.getKey())
- .set(RECORD_SET_KEY_NAME, buffer);
- partitionArray.add(part);
- }
- topicData.set(PARTITION_DATA_KEY_NAME, partitionArray.toArray());
- topicDatas.add(topicData);
- }
- struct.set(TOPIC_DATA_KEY_NAME, topicDatas.toArray());
- this.acks = acks;
- this.timeout = timeout;
- this.partitionRecords = partitionRecords;
- }
-
- public ProduceRequest(Struct struct) {
- super(struct);
- partitionRecords = new HashMap<TopicPartition, ByteBuffer>();
- for (Object topicDataObj : struct.getArray(TOPIC_DATA_KEY_NAME)) {
- Struct topicData = (Struct) topicDataObj;
- String topic = topicData.getString(TOPIC_KEY_NAME);
- for (Object partitionResponseObj : topicData.getArray(PARTITION_DATA_KEY_NAME)) {
- Struct partitionResponse = (Struct) partitionResponseObj;
- int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
- ByteBuffer records = partitionResponse.getBytes(RECORD_SET_KEY_NAME);
- partitionRecords.put(new TopicPartition(topic, partition), records);
- }
- }
- acks = struct.getShort(ACKS_KEY_NAME);
- timeout = struct.getInt(TIMEOUT_KEY_NAME);
- }
-
- @Override
- public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
- /* In case the producer doesn't actually want any response */
- if (acks == 0)
- return null;
-
- Map<TopicPartition, ProduceResponse.PartitionResponse> responseMap = new HashMap<TopicPartition, ProduceResponse.PartitionResponse>();
-
- for (Map.Entry<TopicPartition, ByteBuffer> entry : partitionRecords.entrySet()) {
- responseMap.put(entry.getKey(), new ProduceResponse.PartitionResponse(Errors.forException(e).code(), ProduceResponse.INVALID_OFFSET));
- }
-
- switch (versionId) {
- case 0:
- return new ProduceResponse(responseMap);
- default:
- throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
- versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.PRODUCE.id)));
- }
- }
-
- public short acks() {
- return acks;
- }
-
- public int timeout() {
- return timeout;
- }
-
- public Map<TopicPartition, ByteBuffer> partitionRecords() {
- return partitionRecords;
- }
-
- public static ProduceRequest parse(ByteBuffer buffer, int versionId) {
- return new ProduceRequest(ProtoUtils.parseRequest(ApiKeys.PRODUCE.id, versionId, buffer));
- }
-
- public static ProduceRequest parse(ByteBuffer buffer) {
- return new ProduceRequest((Struct) CURRENT_SCHEMA.read(buffer));
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/ProduceResponse.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/ProduceResponse.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/ProduceResponse.java
deleted file mode 100644
index 0728d9a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/ProduceResponse.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.requests;
-
-import org.apache.flink.kafka_backport.common.protocol.types.Struct;
-import org.apache.flink.kafka_backport.common.TopicPartition;
-import org.apache.flink.kafka_backport.common.protocol.ApiKeys;
-import org.apache.flink.kafka_backport.common.protocol.ProtoUtils;
-import org.apache.flink.kafka_backport.common.protocol.types.Schema;
-import org.apache.flink.kafka_backport.common.utils.CollectionUtils;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-public class ProduceResponse extends AbstractRequestResponse {
-
- private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.PRODUCE.id);
- private static final String RESPONSES_KEY_NAME = "responses";
-
- // topic level field names
- private static final String TOPIC_KEY_NAME = "topic";
- private static final String PARTITION_RESPONSES_KEY_NAME = "partition_responses";
-
- // partition level field names
- private static final String PARTITION_KEY_NAME = "partition";
- private static final String ERROR_CODE_KEY_NAME = "error_code";
-
- public static final long INVALID_OFFSET = -1L;
-
- /**
- * Possible error codes:
- *
- * TODO
- */
-
- private static final String BASE_OFFSET_KEY_NAME = "base_offset";
-
- private final Map<TopicPartition, PartitionResponse> responses;
-
- public ProduceResponse(Map<TopicPartition, PartitionResponse> responses) {
- super(new Struct(CURRENT_SCHEMA));
- Map<String, Map<Integer, PartitionResponse>> responseByTopic = CollectionUtils.groupDataByTopic(responses);
- List<Struct> topicDatas = new ArrayList<Struct>(responseByTopic.size());
- for (Map.Entry<String, Map<Integer, PartitionResponse>> entry : responseByTopic.entrySet()) {
- Struct topicData = struct.instance(RESPONSES_KEY_NAME);
- topicData.set(TOPIC_KEY_NAME, entry.getKey());
- List<Struct> partitionArray = new ArrayList<Struct>();
- for (Map.Entry<Integer, PartitionResponse> partitionEntry : entry.getValue().entrySet()) {
- PartitionResponse part = partitionEntry.getValue();
- Struct partStruct = topicData.instance(PARTITION_RESPONSES_KEY_NAME)
- .set(PARTITION_KEY_NAME, partitionEntry.getKey())
- .set(ERROR_CODE_KEY_NAME, part.errorCode)
- .set(BASE_OFFSET_KEY_NAME, part.baseOffset);
- partitionArray.add(partStruct);
- }
- topicData.set(PARTITION_RESPONSES_KEY_NAME, partitionArray.toArray());
- topicDatas.add(topicData);
- }
- struct.set(RESPONSES_KEY_NAME, topicDatas.toArray());
- this.responses = responses;
- }
-
- public ProduceResponse(Struct struct) {
- super(struct);
- responses = new HashMap<TopicPartition, PartitionResponse>();
- for (Object topicResponse : struct.getArray("responses")) {
- Struct topicRespStruct = (Struct) topicResponse;
- String topic = topicRespStruct.getString("topic");
- for (Object partResponse : topicRespStruct.getArray("partition_responses")) {
- Struct partRespStruct = (Struct) partResponse;
- int partition = partRespStruct.getInt("partition");
- short errorCode = partRespStruct.getShort("error_code");
- long offset = partRespStruct.getLong("base_offset");
- TopicPartition tp = new TopicPartition(topic, partition);
- responses.put(tp, new PartitionResponse(errorCode, offset));
- }
- }
- }
-
- public Map<TopicPartition, PartitionResponse> responses() {
- return this.responses;
- }
-
- public static final class PartitionResponse {
- public short errorCode;
- public long baseOffset;
-
- public PartitionResponse(short errorCode, long baseOffset) {
- this.errorCode = errorCode;
- this.baseOffset = baseOffset;
- }
-
- @Override
- public String toString() {
- StringBuilder b = new StringBuilder();
- b.append('{');
- b.append("error: ");
- b.append(errorCode);
- b.append(",offset: ");
- b.append(baseOffset);
- b.append('}');
- return b.toString();
- }
- }
-
- public static ProduceResponse parse(ByteBuffer buffer) {
- return new ProduceResponse((Struct) CURRENT_SCHEMA.read(buffer));
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/RequestHeader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/RequestHeader.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/RequestHeader.java
deleted file mode 100644
index 82ef7c7..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/RequestHeader.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.requests;
-
-import org.apache.flink.kafka_backport.common.protocol.types.Struct;
-import org.apache.flink.kafka_backport.common.protocol.ProtoUtils;
-import org.apache.flink.kafka_backport.common.protocol.Protocol;
-import org.apache.flink.kafka_backport.common.protocol.types.Field;
-
-import java.nio.ByteBuffer;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * The header for a request in the Kafka protocol
- */
-public class RequestHeader extends AbstractRequestResponse {
-
- private static final Field API_KEY_FIELD = Protocol.REQUEST_HEADER.get("api_key");
- private static final Field API_VERSION_FIELD = Protocol.REQUEST_HEADER.get("api_version");
- private static final Field CLIENT_ID_FIELD = Protocol.REQUEST_HEADER.get("client_id");
- private static final Field CORRELATION_ID_FIELD = Protocol.REQUEST_HEADER.get("correlation_id");
-
- private final short apiKey;
- private final short apiVersion;
- private final String clientId;
- private final int correlationId;
-
- public RequestHeader(Struct header) {
- super(header);
- apiKey = struct.getShort(API_KEY_FIELD);
- apiVersion = struct.getShort(API_VERSION_FIELD);
- clientId = struct.getString(CLIENT_ID_FIELD);
- correlationId = struct.getInt(CORRELATION_ID_FIELD);
- }
-
- public RequestHeader(short apiKey, String client, int correlation) {
- this(apiKey, ProtoUtils.latestVersion(apiKey), client, correlation);
- }
-
- public RequestHeader(short apiKey, short version, String client, int correlation) {
- super(new Struct(Protocol.REQUEST_HEADER));
- struct.set(API_KEY_FIELD, apiKey);
- struct.set(API_VERSION_FIELD, version);
- struct.set(CLIENT_ID_FIELD, client);
- struct.set(CORRELATION_ID_FIELD, correlation);
- this.apiKey = apiKey;
- this.apiVersion = version;
- this.clientId = client;
- this.correlationId = correlation;
- }
-
- public short apiKey() {
- return apiKey;
- }
-
- public short apiVersion() {
- return apiVersion;
- }
-
- public String clientId() {
- return clientId;
- }
-
- public int correlationId() {
- return correlationId;
- }
-
- public static RequestHeader parse(ByteBuffer buffer) {
- return new RequestHeader((Struct) Protocol.REQUEST_HEADER.read(buffer));
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/RequestSend.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/RequestSend.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/RequestSend.java
deleted file mode 100644
index 1815005..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/RequestSend.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.requests;
-
-import org.apache.flink.kafka_backport.common.network.NetworkSend;
-import org.apache.flink.kafka_backport.common.protocol.types.Struct;
-
-import java.nio.ByteBuffer;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * A send object for a kafka request
- */
-public class RequestSend extends NetworkSend {
-
- private final RequestHeader header;
- private final Struct body;
-
- public RequestSend(String destination, RequestHeader header, Struct body) {
- super(destination, serialize(header, body));
- this.header = header;
- this.body = body;
- }
-
- private static ByteBuffer serialize(RequestHeader header, Struct body) {
- ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + body.sizeOf());
- header.writeTo(buffer);
- body.writeTo(buffer);
- buffer.rewind();
- return buffer;
- }
-
- public RequestHeader header() {
- return this.header;
- }
-
- public Struct body() {
- return body;
- }
-
- @Override
- public String toString() {
- return "RequestSend(header=" + header.toString() + ", body=" + body.toString() + ")";
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/ResponseHeader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/ResponseHeader.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/ResponseHeader.java
deleted file mode 100644
index a6aad5e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/ResponseHeader.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.requests;
-
-import org.apache.flink.kafka_backport.common.protocol.Protocol;
-import org.apache.flink.kafka_backport.common.protocol.types.Field;
-import org.apache.flink.kafka_backport.common.protocol.types.Struct;
-
-import java.nio.ByteBuffer;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * A response header in the kafka protocol.
- */
-public class ResponseHeader extends AbstractRequestResponse {
-
- private static final Field CORRELATION_KEY_FIELD = Protocol.RESPONSE_HEADER.get("correlation_id");
-
- private final int correlationId;
-
- public ResponseHeader(Struct header) {
- super(header);
- correlationId = struct.getInt(CORRELATION_KEY_FIELD);
- }
-
- public ResponseHeader(int correlationId) {
- super(new Struct(Protocol.RESPONSE_HEADER));
- struct.set(CORRELATION_KEY_FIELD, correlationId);
- this.correlationId = correlationId;
- }
-
- public int correlationId() {
- return correlationId;
- }
-
- public static ResponseHeader parse(ByteBuffer buffer) {
- return new ResponseHeader((Struct) Protocol.RESPONSE_HEADER.read(buffer));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/ResponseSend.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/ResponseSend.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/ResponseSend.java
deleted file mode 100644
index ee3b393..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/ResponseSend.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.requests;
-
-import org.apache.flink.kafka_backport.common.network.NetworkSend;
-import org.apache.flink.kafka_backport.common.protocol.types.Struct;
-
-import java.nio.ByteBuffer;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-public class ResponseSend extends NetworkSend {
-
- public ResponseSend(String destination, ResponseHeader header, Struct body) {
- super(destination, serialize(header, body));
- }
-
- public ResponseSend(String destination, ResponseHeader header, AbstractRequestResponse response) {
- this(destination, header, response.toStruct());
- }
-
- private static ByteBuffer serialize(ResponseHeader header, Struct body) {
- ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + body.sizeOf());
- header.writeTo(buffer);
- body.writeTo(buffer);
- buffer.rewind();
- return buffer;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/serialization/ByteArrayDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/serialization/ByteArrayDeserializer.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/serialization/ByteArrayDeserializer.java
deleted file mode 100644
index 7e0b3e9..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/serialization/ByteArrayDeserializer.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package org.apache.flink.kafka_backport.common.serialization;
-
-import java.util.Map;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-public class ByteArrayDeserializer implements Deserializer<byte[]> {
-
- @Override
- public void configure(Map<String, ?> configs, boolean isKey) {
- // nothing to do
- }
-
- @Override
- public byte[] deserialize(String topic, byte[] data) {
- return data;
- }
-
- @Override
- public void close() {
- // nothing to do
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/serialization/ByteArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/serialization/ByteArraySerializer.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/serialization/ByteArraySerializer.java
deleted file mode 100644
index f835375..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/serialization/ByteArraySerializer.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package org.apache.flink.kafka_backport.common.serialization;
-
-import java.util.Map;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-public class ByteArraySerializer implements Serializer<byte[]> {
-
- @Override
- public void configure(Map<String, ?> configs, boolean isKey) {
- // nothing to do
- }
-
- @Override
- public byte[] serialize(String topic, byte[] data) {
- return data;
- }
-
- @Override
- public void close() {
- // nothing to do
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/serialization/Deserializer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/serialization/Deserializer.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/serialization/Deserializer.java
deleted file mode 100644
index 4d2de90..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/serialization/Deserializer.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package org.apache.flink.kafka_backport.common.serialization;
-
-import java.io.Closeable;
-import java.util.Map;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- *
- * @param <T> Type to be deserialized into.
- *
- * A class that implements this interface is expected to have a constructor with no parameter.
- */
-public interface Deserializer<T> extends Closeable {
-
- /**
- * Configure this class.
- * @param configs configs in key/value pairs
- * @param isKey whether is for key or value
- */
- public void configure(Map<String, ?> configs, boolean isKey);
-
- /**
- *
- * @param topic topic associated with the data
- * @param data serialized bytes
- * @return deserialized typed data
- */
- public T deserialize(String topic, byte[] data);
-
- @Override
- public void close();
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/serialization/IntegerDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/serialization/IntegerDeserializer.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/serialization/IntegerDeserializer.java
deleted file mode 100644
index c5833d5..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/serialization/IntegerDeserializer.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.serialization;
-
-import org.apache.flink.kafka_backport.common.errors.SerializationException;
-
-import java.util.Map;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-public class IntegerDeserializer implements Deserializer<Integer> {
-
- public void configure(Map<String, ?> configs, boolean isKey) {
- // nothing to do
- }
-
- public Integer deserialize(String topic, byte[] data) {
- if (data == null)
- return null;
- if (data.length != 4) {
- throw new SerializationException("Size of data received by IntegerDeserializer is " +
- "not 4");
- }
-
- int value = 0;
- for (byte b : data) {
- value <<= 8;
- value |= b & 0xFF;
- }
- return value;
- }
-
- public void close() {
- // nothing to do
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/serialization/IntegerSerializer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/serialization/IntegerSerializer.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/serialization/IntegerSerializer.java
deleted file mode 100644
index dcbd7be..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/serialization/IntegerSerializer.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.serialization;
-
-import java.util.Map;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-public class IntegerSerializer implements Serializer<Integer> {
-
- public void configure(Map<String, ?> configs, boolean isKey) {
- // nothing to do
- }
-
- public byte[] serialize(String topic, Integer data) {
- if (data == null)
- return null;
-
- return new byte[] {
- (byte) (data >>> 24),
- (byte) (data >>> 16),
- (byte) (data >>> 8),
- data.byteValue()
- };
- }
-
- public void close() {
- // nothing to do
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/serialization/Serializer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/serialization/Serializer.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/serialization/Serializer.java
deleted file mode 100644
index 1725e36..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/serialization/Serializer.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package org.apache.flink.kafka_backport.common.serialization;
-
-import java.io.Closeable;
-import java.util.Map;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- *
- * @param <T> Type to be serialized from.
- *
- * A class that implements this interface is expected to have a constructor with no parameter.
- */
-public interface Serializer<T> extends Closeable {
-
- /**
- * Configure this class.
- * @param configs configs in key/value pairs
- * @param isKey whether is for key or value
- */
- public void configure(Map<String, ?> configs, boolean isKey);
-
- /**
- * @param topic topic associated with data
- * @param data typed data
- * @return serialized bytes
- */
- public byte[] serialize(String topic, T data);
-
-
- /**
- * Close this serializer.
- * This method has to be idempotent if the serializer is used in KafkaProducer because it might be called
- * multiple times.
- */
- @Override
- public void close();
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/serialization/StringDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/serialization/StringDeserializer.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/serialization/StringDeserializer.java
deleted file mode 100644
index 4d2ed4c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/serialization/StringDeserializer.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.serialization;
-
-import org.apache.flink.kafka_backport.common.errors.SerializationException;
-
-import java.io.UnsupportedEncodingException;
-import java.util.Map;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * String encoding defaults to UTF8 and can be customized by setting the property key.deserializer.encoding,
- * value.deserializer.encoding or deserializer.encoding. The first two take precedence over the last.
- */
-public class StringDeserializer implements Deserializer<String> {
- private String encoding = "UTF8";
-
- @Override
- public void configure(Map<String, ?> configs, boolean isKey) {
- String propertyName = isKey ? "key.deserializer.encoding" : "value.deserializer.encoding";
- Object encodingValue = configs.get(propertyName);
- if (encodingValue == null)
- encodingValue = configs.get("deserializer.encoding");
- if (encodingValue != null && encodingValue instanceof String)
- encoding = (String) encodingValue;
- }
-
- @Override
- public String deserialize(String topic, byte[] data) {
- try {
- if (data == null)
- return null;
- else
- return new String(data, encoding);
- } catch (UnsupportedEncodingException e) {
- throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + encoding);
- }
- }
-
- @Override
- public void close() {
- // nothing to do
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/serialization/StringSerializer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/serialization/StringSerializer.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/serialization/StringSerializer.java
deleted file mode 100644
index fae4c21..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/serialization/StringSerializer.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.serialization;
-
-import org.apache.flink.kafka_backport.common.errors.SerializationException;
-
-import java.io.UnsupportedEncodingException;
-import java.util.Map;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * String encoding defaults to UTF8 and can be customized by setting the property key.serializer.encoding,
- * value.serializer.encoding or serializer.encoding. The first two take precedence over the last.
- */
-public class StringSerializer implements Serializer<String> {
- private String encoding = "UTF8";
-
- @Override
- public void configure(Map<String, ?> configs, boolean isKey) {
- String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
- Object encodingValue = configs.get(propertyName);
- if (encodingValue == null)
- encodingValue = configs.get("serializer.encoding");
- if (encodingValue != null && encodingValue instanceof String)
- encoding = (String) encodingValue;
- }
-
- @Override
- public byte[] serialize(String topic, String data) {
- try {
- if (data == null)
- return null;
- else
- return data.getBytes(encoding);
- } catch (UnsupportedEncodingException e) {
- throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding);
- }
- }
-
- @Override
- public void close() {
- // nothing to do
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/utils/AbstractIterator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/utils/AbstractIterator.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/utils/AbstractIterator.java
deleted file mode 100644
index 2af94b6..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/utils/AbstractIterator.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.utils;
-
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * A base class that simplifies implementing an iterator.
- *
- * <p>Subclasses implement {@link #makeNext()} to produce one element per call and
- * return {@link #allDone()} from it once the sequence is exhausted. Removal is not
- * supported.
- *
- * <p>If {@code makeNext()} throws, the iterator stays in a failed state and every
- * later call to {@link #hasNext()} throws {@link IllegalStateException}.
- *
- * @param <T> The type of thing we are iterating over
- */
-public abstract class AbstractIterator<T> implements Iterator<T> {
-
-    private enum State {
-        READY, NOT_READY, DONE, FAILED
-    }
-
-    private State state = State.NOT_READY;
-    private T next;
-
-    /**
-     * Reports whether another element is available, computing it if necessary.
-     *
-     * @return true if an element is ready to be returned by {@link #next()}
-     * @throws IllegalStateException if an earlier {@code makeNext()} call failed
-     */
-    @Override
-    public boolean hasNext() {
-        switch (state) {
-            case FAILED:
-                throw new IllegalStateException("Iterator is in failed state");
-            case DONE:
-                return false;
-            case READY:
-                return true;
-            default:
-                return maybeComputeNext();
-        }
-    }
-
-    /**
-     * Returns the current element and marks it as consumed.
-     *
-     * @return the next element
-     * @throws NoSuchElementException if the iterator is exhausted
-     * @throws IllegalStateException  if {@code makeNext()} produced null without calling
-     *                                {@link #allDone()}
-     */
-    @Override
-    public T next() {
-        if (!hasNext())
-            throw new NoSuchElementException();
-        // Mark the element as consumed before the null check, so a null element
-        // does not leave the iterator stuck in READY.
-        state = State.NOT_READY;
-        if (next == null)
-            throw new IllegalStateException("Expected item but none found.");
-        return next;
-    }
-
-    /**
-     * Always throws; this iterator is read-only.
-     *
-     * @throws UnsupportedOperationException always
-     */
-    @Override
-    public void remove() {
-        throw new UnsupportedOperationException("Removal not supported");
-    }
-
-    /**
-     * Returns the element that the following {@link #next()} call would return,
-     * without consuming it.
-     *
-     * @return the upcoming element
-     * @throws NoSuchElementException if the iterator is exhausted
-     */
-    public T peek() {
-        if (!hasNext())
-            throw new NoSuchElementException();
-        return next;
-    }
-
-    /**
-     * Signals the end of the sequence. Intended to be used as
-     * {@code return allDone();} from within {@link #makeNext()}.
-     *
-     * @return always null
-     */
-    protected T allDone() {
-        state = State.DONE;
-        return null;
-    }
-
-    /**
-     * Produces the next element, or returns {@link #allDone()} when there is none.
-     *
-     * @return the next element
-     */
-    protected abstract T makeNext();
-
-    /**
-     * Invokes {@link #makeNext()} and updates the state accordingly.
-     *
-     * @return true if an element was produced, false if the sequence ended
-     */
-    private boolean maybeComputeNext() {
-        // Pessimistically mark as failed: if makeNext() throws, the state stays FAILED.
-        state = State.FAILED;
-        next = makeNext();
-        if (state == State.DONE) {
-            return false;
-        } else {
-            state = State.READY;
-            return true;
-        }
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/utils/CollectionUtils.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/utils/CollectionUtils.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/utils/CollectionUtils.java
deleted file mode 100644
index 7960331..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/utils/CollectionUtils.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.utils;
-
-import org.apache.flink.kafka_backport.common.TopicPartition;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-public class CollectionUtils {
-    /**
-     * Regroups a per-partition map into a two-level map: topic name first,
-     * partition number second.
-     *
-     * @param data values keyed by topic-partition
-     * @param <T> type of the per-partition value
-     * @return for every topic in {@code data}, its values keyed by partition number
-     */
-    public static <T> Map<String, Map<Integer, T>> groupDataByTopic(Map<TopicPartition, T> data) {
-        final Map<String, Map<Integer, T>> grouped = new HashMap<String, Map<Integer, T>>();
-        for (Map.Entry<TopicPartition, T> item : data.entrySet()) {
-            final TopicPartition key = item.getKey();
-            final String topicName = key.topic();
-            final int partitionId = key.partition();
-            Map<Integer, T> perPartition = grouped.get(topicName);
-            if (perPartition == null) {
-                // First entry seen for this topic: create its inner map.
-                perPartition = new HashMap<Integer, T>();
-                grouped.put(topicName, perPartition);
-            }
-            perPartition.put(partitionId, item.getValue());
-        }
-        return grouped;
-    }
-
-    /**
-     * Collects the partition numbers of the given topic-partitions per topic,
-     * keeping the order in which they appear in the input list.
-     *
-     * @param partitions the topic-partitions to group
-     * @return partitions per topic
-     */
-    public static Map<String, List<Integer>> groupDataByTopic(List<TopicPartition> partitions) {
-        final Map<String, List<Integer>> grouped = new HashMap<String, List<Integer>>();
-        for (TopicPartition current : partitions) {
-            final String topicName = current.topic();
-            List<Integer> ids = grouped.get(topicName);
-            if (ids == null) {
-                // First partition seen for this topic: create its list.
-                ids = new ArrayList<Integer>();
-                grouped.put(topicName, ids);
-            }
-            ids.add(current.partition());
-        }
-        return grouped;
-    }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/utils/CopyOnWriteMap.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/utils/CopyOnWriteMap.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/utils/CopyOnWriteMap.java
deleted file mode 100644
index dcf219a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/utils/CopyOnWriteMap.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.utils;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * A simple read-optimized map implementation that synchronizes only writes and does a full copy on each modification.
- *
- * <p>Read operations delegate to the current snapshot, which is held in a volatile field and never
- * modified after publication. Every mutating operation is synchronized, builds a new {@link HashMap}
- * from the current snapshot, applies the change and publishes the result as an unmodifiable map.
- * The views returned by {@link #entrySet()}, {@link #keySet()} and {@link #values()} therefore
- * reflect the snapshot that was current when they were obtained and cannot be modified.
- */
-public class CopyOnWriteMap<K, V> implements ConcurrentMap<K, V> {
-
-    private volatile Map<K, V> map;
-
-    /** Creates an empty map. */
-    public CopyOnWriteMap() {
-        this.map = Collections.emptyMap();
-    }
-
-    /**
-     * Creates a map containing the entries of {@code map}.
-     *
-     * <p>The entries are copied, so later changes to the argument do not affect this map.
-     *
-     * @param map the initial entries
-     */
-    public CopyOnWriteMap(Map<K, V> map) {
-        // Copy defensively: wrapping the caller's map would let outside mutations
-        // show through to readers of this map.
-        this.map = Collections.unmodifiableMap(new HashMap<K, V>(map));
-    }
-
-    @Override
-    public boolean containsKey(Object k) {
-        return map.containsKey(k);
-    }
-
-    @Override
-    public boolean containsValue(Object v) {
-        return map.containsValue(v);
-    }
-
-    @Override
-    public Set<Entry<K, V>> entrySet() {
-        return map.entrySet();
-    }
-
-    @Override
-    public V get(Object k) {
-        return map.get(k);
-    }
-
-    @Override
-    public boolean isEmpty() {
-        return map.isEmpty();
-    }
-
-    @Override
-    public Set<K> keySet() {
-        return map.keySet();
-    }
-
-    @Override
-    public int size() {
-        return map.size();
-    }
-
-    @Override
-    public Collection<V> values() {
-        return map.values();
-    }
-
-    @Override
-    public synchronized void clear() {
-        this.map = Collections.emptyMap();
-    }
-
-    @Override
-    public synchronized V put(K k, V v) {
-        Map<K, V> copy = new HashMap<K, V>(this.map);
-        V prev = copy.put(k, v);
-        this.map = Collections.unmodifiableMap(copy);
-        return prev;
-    }
-
-    @Override
-    public synchronized void putAll(Map<? extends K, ? extends V> entries) {
-        Map<K, V> copy = new HashMap<K, V>(this.map);
-        copy.putAll(entries);
-        this.map = Collections.unmodifiableMap(copy);
-    }
-
-    @Override
-    public synchronized V remove(Object key) {
-        Map<K, V> copy = new HashMap<K, V>(this.map);
-        V prev = copy.remove(key);
-        this.map = Collections.unmodifiableMap(copy);
-        return prev;
-    }
-
-    /**
-     * Associates {@code k} with {@code v} only if {@code k} has no mapping yet.
-     *
-     * @return the existing value if {@code k} was already mapped, otherwise null
-     */
-    @Override
-    public synchronized V putIfAbsent(K k, V v) {
-        if (!containsKey(k))
-            return put(k, v);
-        else
-            return get(k);
-    }
-
-    /**
-     * Removes the entry for {@code k} only if it is currently mapped to {@code v}.
-     * A key mapped to null matches only a null {@code v}.
-     *
-     * @return true if the entry was removed
-     */
-    @Override
-    public synchronized boolean remove(Object k, Object v) {
-        if (containsKey(k) && valuesEqual(get(k), v)) {
-            remove(k);
-            return true;
-        } else {
-            return false;
-        }
-    }
-
-    /**
-     * Replaces the value for {@code k} only if it is currently mapped to {@code original}.
-     * A key mapped to null matches only a null {@code original}.
-     *
-     * @return true if the value was replaced
-     */
-    @Override
-    public synchronized boolean replace(K k, V original, V replacement) {
-        if (containsKey(k) && valuesEqual(get(k), original)) {
-            put(k, replacement);
-            return true;
-        } else {
-            return false;
-        }
-    }
-
-    /**
-     * Replaces the value for {@code k} only if {@code k} is currently mapped.
-     *
-     * @return the previous value, or null if {@code k} was not mapped
-     */
-    @Override
-    public synchronized V replace(K k, V v) {
-        if (containsKey(k)) {
-            return put(k, v);
-        } else {
-            return null;
-        }
-    }
-
-    /** Null-safe equality: the stored value may be null because HashMap permits null values. */
-    private static boolean valuesEqual(Object stored, Object given) {
-        return stored == null ? given == null : stored.equals(given);
-    }
-
-}