Posted to commits@flink.apache.org by se...@apache.org on 2015/08/27 13:25:19 UTC

[02/51] [abbrv] flink git commit: [FLINK-2386] [kafka connector] Add comments to all backported kafka sources and move them to 'org.apache.flink.kafka_backport'

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/MetadataRequest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/MetadataRequest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/MetadataRequest.java
deleted file mode 100644
index a42333e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/MetadataRequest.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.common.requests;
-
-import org.apache.kafka.copied.common.Cluster;
-import org.apache.kafka.copied.common.Node;
-import org.apache.kafka.copied.common.PartitionInfo;
-import org.apache.kafka.copied.common.protocol.ApiKeys;
-import org.apache.kafka.copied.common.protocol.Errors;
-import org.apache.kafka.copied.common.protocol.ProtoUtils;
-import org.apache.kafka.copied.common.protocol.types.Schema;
-import org.apache.kafka.copied.common.protocol.types.Struct;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class MetadataRequest extends AbstractRequest {
-    
-    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.METADATA.id);
-    private static final String TOPICS_KEY_NAME = "topics";
-
-    private final List<String> topics;
-
-    public MetadataRequest(List<String> topics) {
-        super(new Struct(CURRENT_SCHEMA));
-        struct.set(TOPICS_KEY_NAME, topics.toArray());
-        this.topics = topics;
-    }
-
-    public MetadataRequest(Struct struct) {
-        super(struct);
-        Object[] topicArray = struct.getArray(TOPICS_KEY_NAME);
-        topics = new ArrayList<String>();
-        for (Object topicObj: topicArray) {
-            topics.add((String) topicObj);
-        }
-    }
-
-    @Override
-    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
-        Map<String, Errors> topicErrors = new HashMap<String, Errors>();
-        for (String topic : topics) {
-            topicErrors.put(topic, Errors.forException(e));
-        }
-
-        Cluster cluster = new Cluster(new ArrayList<Node>(), new ArrayList<PartitionInfo>());
-        switch (versionId) {
-            case 0:
-                return new MetadataResponse(cluster, topicErrors);
-            default:
-                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
-                        versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.METADATA.id)));
-        }
-    }
-
-    public List<String> topics() {
-        return topics;
-    }
-
-    public static MetadataRequest parse(ByteBuffer buffer, int versionId) {
-        return new MetadataRequest(ProtoUtils.parseRequest(ApiKeys.METADATA.id, versionId, buffer));
-    }
-
-    public static MetadataRequest parse(ByteBuffer buffer) {
-        return new MetadataRequest((Struct) CURRENT_SCHEMA.read(buffer));
-    }
-}

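For context, the deleted class above is a thin wrapper over a protocol Struct. A minimal usage sketch (not from the commit itself): the topic names are placeholders, and sizeOf()/writeTo() are assumed to be inherited from AbstractRequestResponse as in upstream Kafka. After this commit, the equivalent class lives under org.apache.flink.kafka_backport.

    import org.apache.kafka.copied.common.requests.MetadataRequest;

    import java.nio.ByteBuffer;
    import java.util.Arrays;

    // Build a metadata request for two placeholder topics, serialize it,
    // and parse it back from the wire representation.
    MetadataRequest request = new MetadataRequest(Arrays.asList("topic-a", "topic-b"));
    ByteBuffer buffer = ByteBuffer.allocate(request.sizeOf());   // assumed inherited helper
    request.writeTo(buffer);                                     // assumed inherited helper
    buffer.rewind();
    MetadataRequest parsed = MetadataRequest.parse(buffer);
    System.out.println(parsed.topics());                         // [topic-a, topic-b]
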
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/MetadataResponse.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/MetadataResponse.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/MetadataResponse.java
deleted file mode 100644
index b7db4fa..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/MetadataResponse.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.common.requests;
-
-import org.apache.kafka.copied.common.Cluster;
-import org.apache.kafka.copied.common.Node;
-import org.apache.kafka.copied.common.PartitionInfo;
-import org.apache.kafka.copied.common.protocol.ApiKeys;
-import org.apache.kafka.copied.common.protocol.Errors;
-import org.apache.kafka.copied.common.protocol.ProtoUtils;
-import org.apache.kafka.copied.common.protocol.types.Schema;
-import org.apache.kafka.copied.common.protocol.types.Struct;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class MetadataResponse extends AbstractRequestResponse {
-
-    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.METADATA.id);
-    private static final String BROKERS_KEY_NAME = "brokers";
-    private static final String TOPIC_METADATA_KEY_NAME = "topic_metadata";
-
-    // broker level field names
-    private static final String NODE_ID_KEY_NAME = "node_id";
-    private static final String HOST_KEY_NAME = "host";
-    private static final String PORT_KEY_NAME = "port";
-
-    // topic level field names
-    private static final String TOPIC_ERROR_CODE_KEY_NAME = "topic_error_code";
-
-    /**
-     * Possible error code:
-     *
-     * TODO
-     */
-
-    private static final String TOPIC_KEY_NAME = "topic";
-    private static final String PARTITION_METADATA_KEY_NAME = "partition_metadata";
-
-    // partition level field names
-    private static final String PARTITION_ERROR_CODE_KEY_NAME = "partition_error_code";
-
-    /**
-     * Possible error code:
-     *
-     * TODO
-     */
-
-    private static final String PARTITION_KEY_NAME = "partition_id";
-    private static final String LEADER_KEY_NAME = "leader";
-    private static final String REPLICAS_KEY_NAME = "replicas";
-    private static final String ISR_KEY_NAME = "isr";
-
-    private final Cluster cluster;
-    private final Map<String, Errors> errors;
-
-    /**
-     * Constructor for MetadataResponse where some of the topics have errors;
-     * for those topics, the error data takes precedence over the cluster information.
-     */
-    public MetadataResponse(Cluster cluster, Map<String, Errors> errors) {
-        super(new Struct(CURRENT_SCHEMA));
-
-        List<Struct> brokerArray = new ArrayList<Struct>();
-        for (Node node : cluster.nodes()) {
-            Struct broker = struct.instance(BROKERS_KEY_NAME);
-            broker.set(NODE_ID_KEY_NAME, node.id());
-            broker.set(HOST_KEY_NAME, node.host());
-            broker.set(PORT_KEY_NAME, node.port());
-            brokerArray.add(broker);
-        }
-        struct.set(BROKERS_KEY_NAME, brokerArray.toArray());
-
-        List<Struct> topicArray = new ArrayList<Struct>();
-        for (String topic : cluster.topics()) {
-            Struct topicData = struct.instance(TOPIC_METADATA_KEY_NAME);
-
-            topicData.set(TOPIC_KEY_NAME, topic);
-            if (errors.containsKey(topic)) {
-                topicData.set(TOPIC_ERROR_CODE_KEY_NAME, errors.get(topic).code());
-            } else {
-                topicData.set(TOPIC_ERROR_CODE_KEY_NAME, Errors.NONE.code());
-                List<Struct> partitionArray = new ArrayList<Struct>();
-                for (PartitionInfo fetchPartitionData : cluster.partitionsForTopic(topic)) {
-                    Struct partitionData = topicData.instance(PARTITION_METADATA_KEY_NAME);
-                    partitionData.set(PARTITION_ERROR_CODE_KEY_NAME, Errors.NONE.code());
-                    partitionData.set(PARTITION_KEY_NAME, fetchPartitionData.partition());
-                    partitionData.set(LEADER_KEY_NAME, fetchPartitionData.leader().id());
-                    ArrayList<Integer> replicas = new ArrayList<Integer>();
-                    for (Node node : fetchPartitionData.replicas())
-                        replicas.add(node.id());
-                    partitionData.set(REPLICAS_KEY_NAME, replicas.toArray());
-                    ArrayList<Integer> isr = new ArrayList<Integer>();
-                    for (Node node : fetchPartitionData.inSyncReplicas())
-                        isr.add(node.id());
-                    partitionData.set(ISR_KEY_NAME, isr.toArray());
-                    partitionArray.add(partitionData);
-                }
-                topicData.set(PARTITION_METADATA_KEY_NAME, partitionArray.toArray());
-            }
-
-            topicArray.add(topicData);
-        }
-        struct.set(TOPIC_METADATA_KEY_NAME, topicArray.toArray());
-
-        this.cluster = cluster;
-        this.errors = errors;
-    }
-
-    public MetadataResponse(Struct struct) {
-        super(struct);
-        Map<String, Errors> errors = new HashMap<String, Errors>();
-        Map<Integer, Node> brokers = new HashMap<Integer, Node>();
-        Object[] brokerStructs = (Object[]) struct.get(BROKERS_KEY_NAME);
-        for (int i = 0; i < brokerStructs.length; i++) {
-            Struct broker = (Struct) brokerStructs[i];
-            int nodeId = broker.getInt(NODE_ID_KEY_NAME);
-            String host = broker.getString(HOST_KEY_NAME);
-            int port = broker.getInt(PORT_KEY_NAME);
-            brokers.put(nodeId, new Node(nodeId, host, port));
-        }
-        List<PartitionInfo> partitions = new ArrayList<PartitionInfo>();
-        Object[] topicInfos = (Object[]) struct.get(TOPIC_METADATA_KEY_NAME);
-        for (int i = 0; i < topicInfos.length; i++) {
-            Struct topicInfo = (Struct) topicInfos[i];
-            short topicError = topicInfo.getShort(TOPIC_ERROR_CODE_KEY_NAME);
-            String topic = topicInfo.getString(TOPIC_KEY_NAME);
-            if (topicError == Errors.NONE.code()) {
-                Object[] partitionInfos = (Object[]) topicInfo.get(PARTITION_METADATA_KEY_NAME);
-                for (int j = 0; j < partitionInfos.length; j++) {
-                    Struct partitionInfo = (Struct) partitionInfos[j];
-                    int partition = partitionInfo.getInt(PARTITION_KEY_NAME);
-                    int leader = partitionInfo.getInt(LEADER_KEY_NAME);
-                    Node leaderNode = leader == -1 ? null : brokers.get(leader);
-                    Object[] replicas = (Object[]) partitionInfo.get(REPLICAS_KEY_NAME);
-                    Node[] replicaNodes = new Node[replicas.length];
-                    for (int k = 0; k < replicas.length; k++)
-                        replicaNodes[k] = brokers.get(replicas[k]);
-                    Object[] isr = (Object[]) partitionInfo.get(ISR_KEY_NAME);
-                    Node[] isrNodes = new Node[isr.length];
-                    for (int k = 0; k < isr.length; k++)
-                        isrNodes[k] = brokers.get(isr[k]);
-                    partitions.add(new PartitionInfo(topic, partition, leaderNode, replicaNodes, isrNodes));
-                }
-            } else {
-                errors.put(topic, Errors.forCode(topicError));
-            }
-        }
-        this.errors = errors;
-        this.cluster = new Cluster(brokers.values(), partitions);
-    }
-
-    public Map<String, Errors> errors() {
-        return this.errors;
-    }
-
-    public Cluster cluster() {
-        return this.cluster;
-    }
-
-    public static MetadataResponse parse(ByteBuffer buffer) {
-        return new MetadataResponse((Struct) CURRENT_SCHEMA.read(buffer));
-    }
-}
\ No newline at end of file

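A sketch of constructing the response wrapper deleted above. The broker address, topic names, and the Errors.UNKNOWN_TOPIC_OR_PARTITION constant are assumptions based on the surrounding code; the constructors for Node, PartitionInfo, and Cluster are the ones used inside the file itself.

    import org.apache.kafka.copied.common.Cluster;
    import org.apache.kafka.copied.common.Node;
    import org.apache.kafka.copied.common.PartitionInfo;
    import org.apache.kafka.copied.common.protocol.Errors;
    import org.apache.kafka.copied.common.requests.MetadataResponse;

    import java.util.Collections;
    import java.util.Map;

    // One broker leading a single partition of "topic-a".
    Node broker = new Node(0, "localhost", 9092);
    PartitionInfo partition = new PartitionInfo("topic-a", 0, broker,
            new Node[]{broker}, new Node[]{broker});
    Cluster cluster = new Cluster(Collections.singletonList(broker),
            Collections.singletonList(partition));

    // Topics that failed carry an error entry, which takes precedence
    // over any cluster information for that topic.
    Map<String, Errors> errors =
            Collections.singletonMap("missing-topic", Errors.UNKNOWN_TOPIC_OR_PARTITION);

    MetadataResponse response = new MetadataResponse(cluster, errors);
    System.out.println(response.errors());   // {missing-topic=UNKNOWN_TOPIC_OR_PARTITION}
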
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/OffsetCommitRequest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/OffsetCommitRequest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/OffsetCommitRequest.java
deleted file mode 100644
index 20e518f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/OffsetCommitRequest.java
+++ /dev/null
@@ -1,266 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.common.requests;
-
-import org.apache.kafka.copied.common.TopicPartition;
-import org.apache.kafka.copied.common.protocol.ApiKeys;
-import org.apache.kafka.copied.common.protocol.Errors;
-import org.apache.kafka.copied.common.protocol.ProtoUtils;
-import org.apache.kafka.copied.common.protocol.types.Schema;
-import org.apache.kafka.copied.common.protocol.types.Struct;
-import org.apache.kafka.copied.common.utils.CollectionUtils;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * This wrapper supports v0, v1, and v2 of OffsetCommitRequest.
- */
-public class OffsetCommitRequest extends AbstractRequest {
-    
-    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.OFFSET_COMMIT.id);
-    private static final String GROUP_ID_KEY_NAME = "group_id";
-    private static final String GENERATION_ID_KEY_NAME = "group_generation_id";
-    private static final String CONSUMER_ID_KEY_NAME = "consumer_id";
-    private static final String TOPICS_KEY_NAME = "topics";
-    private static final String RETENTION_TIME_KEY_NAME = "retention_time";
-
-    // topic level field names
-    private static final String TOPIC_KEY_NAME = "topic";
-    private static final String PARTITIONS_KEY_NAME = "partitions";
-
-    // partition level field names
-    private static final String PARTITION_KEY_NAME = "partition";
-    private static final String COMMIT_OFFSET_KEY_NAME = "offset";
-    private static final String METADATA_KEY_NAME = "metadata";
-
-    @Deprecated
-    private static final String TIMESTAMP_KEY_NAME = "timestamp";         // for v0, v1
-
-    // default values for the current version
-    public static final int DEFAULT_GENERATION_ID = -1;
-    public static final String DEFAULT_CONSUMER_ID = "";
-    public static final long DEFAULT_RETENTION_TIME = -1L;
-
-    // default values for old versions,
-    // will be removed after these versions are deprecated
-    @Deprecated
-    public static final long DEFAULT_TIMESTAMP = -1L;            // for V0, V1
-
-    private final String groupId;
-    private final String consumerId;
-    private final int generationId;
-    private final long retentionTime;
-    private final Map<TopicPartition, PartitionData> offsetData;
-
-    public static final class PartitionData {
-        @Deprecated
-        public final long timestamp;                // for V1
-
-        public final long offset;
-        public final String metadata;
-
-        @Deprecated
-        public PartitionData(long offset, long timestamp, String metadata) {
-            this.offset = offset;
-            this.timestamp = timestamp;
-            this.metadata = metadata;
-        }
-
-        public PartitionData(long offset, String metadata) {
-            this(offset, DEFAULT_TIMESTAMP, metadata);
-        }
-    }
-
-    /**
-     * Constructor for version 0.
-     * @param groupId
-     * @param offsetData
-     */
-    @Deprecated
-    public OffsetCommitRequest(String groupId, Map<TopicPartition, PartitionData> offsetData) {
-        super(new Struct(ProtoUtils.requestSchema(ApiKeys.OFFSET_COMMIT.id, 0)));
-
-        initCommonFields(groupId, offsetData);
-        this.groupId = groupId;
-        this.generationId = DEFAULT_GENERATION_ID;
-        this.consumerId = DEFAULT_CONSUMER_ID;
-        this.retentionTime = DEFAULT_RETENTION_TIME;
-        this.offsetData = offsetData;
-    }
-
-    /**
-     * Constructor for version 1.
-     * @param groupId
-     * @param generationId
-     * @param consumerId
-     * @param offsetData
-     */
-    @Deprecated
-    public OffsetCommitRequest(String groupId, int generationId, String consumerId, Map<TopicPartition, PartitionData> offsetData) {
-        super(new Struct(ProtoUtils.requestSchema(ApiKeys.OFFSET_COMMIT.id, 1)));
-
-        initCommonFields(groupId, offsetData);
-        struct.set(GENERATION_ID_KEY_NAME, generationId);
-        struct.set(CONSUMER_ID_KEY_NAME, consumerId);
-        this.groupId = groupId;
-        this.generationId = generationId;
-        this.consumerId = consumerId;
-        this.retentionTime = DEFAULT_RETENTION_TIME;
-        this.offsetData = offsetData;
-    }
-
-    /**
-     * Constructor for version 2.
-     * @param groupId
-     * @param generationId
-     * @param consumerId
-     * @param retentionTime
-     * @param offsetData
-     */
-    public OffsetCommitRequest(String groupId, int generationId, String consumerId, long retentionTime, Map<TopicPartition, PartitionData> offsetData) {
-        super(new Struct(CURRENT_SCHEMA));
-
-        initCommonFields(groupId, offsetData);
-        struct.set(GENERATION_ID_KEY_NAME, generationId);
-        struct.set(CONSUMER_ID_KEY_NAME, consumerId);
-        struct.set(RETENTION_TIME_KEY_NAME, retentionTime);
-        this.groupId = groupId;
-        this.generationId = generationId;
-        this.consumerId = consumerId;
-        this.retentionTime = retentionTime;
-        this.offsetData = offsetData;
-    }
-
-    private void initCommonFields(String groupId, Map<TopicPartition, PartitionData> offsetData) {
-        Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(offsetData);
-
-        struct.set(GROUP_ID_KEY_NAME, groupId);
-        List<Struct> topicArray = new ArrayList<Struct>();
-
-        for (Map.Entry<String, Map<Integer, PartitionData>> topicEntry: topicsData.entrySet()) {
-            Struct topicData = struct.instance(TOPICS_KEY_NAME);
-            topicData.set(TOPIC_KEY_NAME, topicEntry.getKey());
-            List<Struct> partitionArray = new ArrayList<Struct>();
-            for (Map.Entry<Integer, PartitionData> partitionEntry : topicEntry.getValue().entrySet()) {
-                PartitionData fetchPartitionData = partitionEntry.getValue();
-                Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
-                partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
-                partitionData.set(COMMIT_OFFSET_KEY_NAME, fetchPartitionData.offset);
-                // Only for v1
-                if (partitionData.hasField(TIMESTAMP_KEY_NAME))
-                    partitionData.set(TIMESTAMP_KEY_NAME, fetchPartitionData.timestamp);
-                partitionData.set(METADATA_KEY_NAME, fetchPartitionData.metadata);
-                partitionArray.add(partitionData);
-            }
-            topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
-            topicArray.add(topicData);
-        }
-        struct.set(TOPICS_KEY_NAME, topicArray.toArray());
-    }
-
-    public OffsetCommitRequest(Struct struct) {
-        super(struct);
-
-        groupId = struct.getString(GROUP_ID_KEY_NAME);
-        // This field exists in v1 and later.
-        if (struct.hasField(GENERATION_ID_KEY_NAME))
-            generationId = struct.getInt(GENERATION_ID_KEY_NAME);
-        else
-            generationId = DEFAULT_GENERATION_ID;
-
-        // This field exists in v1 and later.
-        if (struct.hasField(CONSUMER_ID_KEY_NAME))
-            consumerId = struct.getString(CONSUMER_ID_KEY_NAME);
-        else
-            consumerId = DEFAULT_CONSUMER_ID;
-
-        // This field only exists in v2
-        if (struct.hasField(RETENTION_TIME_KEY_NAME))
-            retentionTime = struct.getLong(RETENTION_TIME_KEY_NAME);
-        else
-            retentionTime = DEFAULT_RETENTION_TIME;
-
-        offsetData = new HashMap<TopicPartition, PartitionData>();
-        for (Object topicDataObj : struct.getArray(TOPICS_KEY_NAME)) {
-            Struct topicData = (Struct) topicDataObj;
-            String topic = topicData.getString(TOPIC_KEY_NAME);
-            for (Object partitionDataObj : topicData.getArray(PARTITIONS_KEY_NAME)) {
-                Struct partitionDataStruct = (Struct) partitionDataObj;
-                int partition = partitionDataStruct.getInt(PARTITION_KEY_NAME);
-                long offset = partitionDataStruct.getLong(COMMIT_OFFSET_KEY_NAME);
-                String metadata = partitionDataStruct.getString(METADATA_KEY_NAME);
-                PartitionData partitionOffset;
-                // This field only exists in v1
-                if (partitionDataStruct.hasField(TIMESTAMP_KEY_NAME)) {
-                    long timestamp = partitionDataStruct.getLong(TIMESTAMP_KEY_NAME);
-                    partitionOffset = new PartitionData(offset, timestamp, metadata);
-                } else {
-                    partitionOffset = new PartitionData(offset, metadata);
-                }
-                offsetData.put(new TopicPartition(topic, partition), partitionOffset);
-            }
-        }
-    }
-
-    @Override
-    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
-        Map<TopicPartition, Short> responseData = new HashMap<TopicPartition, Short>();
-        for (Map.Entry<TopicPartition, PartitionData> entry: offsetData.entrySet()) {
-            responseData.put(entry.getKey(), Errors.forException(e).code());
-        }
-
-        switch (versionId) {
-            // OffsetCommitResponseV0 == OffsetCommitResponseV1 == OffsetCommitResponseV2
-            case 0:
-            case 1:
-            case 2:
-                return new OffsetCommitResponse(responseData);
-            default:
-                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
-                        versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.OFFSET_COMMIT.id)));
-        }
-    }
-
-    public String groupId() {
-        return groupId;
-    }
-
-    public int generationId() {
-        return generationId;
-    }
-
-    public String consumerId() {
-        return consumerId;
-    }
-
-    public long retentionTime() {
-        return retentionTime;
-    }
-
-    public Map<TopicPartition, PartitionData> offsetData() {
-        return offsetData;
-    }
-
-    public static OffsetCommitRequest parse(ByteBuffer buffer, int versionId) {
-        Schema schema = ProtoUtils.requestSchema(ApiKeys.OFFSET_COMMIT.id, versionId);
-        return new OffsetCommitRequest((Struct) schema.read(buffer));
-    }
-
-    public static OffsetCommitRequest parse(ByteBuffer buffer) {
-        return new OffsetCommitRequest((Struct) CURRENT_SCHEMA.read(buffer));
-    }
-}

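The commit request above exists in three wire versions; a sketch of the v2 path follows. The group, generation, and consumer ids and the offset are placeholders; the constructor and PartitionData signatures are the ones defined in the file.

    import org.apache.kafka.copied.common.TopicPartition;
    import org.apache.kafka.copied.common.requests.OffsetCommitRequest;

    import java.util.HashMap;
    import java.util.Map;

    Map<TopicPartition, OffsetCommitRequest.PartitionData> offsets =
            new HashMap<TopicPartition, OffsetCommitRequest.PartitionData>();
    // v2 PartitionData carries no timestamp; the deprecated v1 constructor would.
    offsets.put(new TopicPartition("topic-a", 0),
            new OffsetCommitRequest.PartitionData(42L, ""));

    // v2 constructor: group id, generation id, consumer id, retention time, offsets.
    OffsetCommitRequest request = new OffsetCommitRequest("my-group", 1, "consumer-1",
            OffsetCommitRequest.DEFAULT_RETENTION_TIME, offsets);
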
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/OffsetCommitResponse.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/OffsetCommitResponse.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/OffsetCommitResponse.java
deleted file mode 100644
index 59f621e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/OffsetCommitResponse.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.common.requests;
-
-import org.apache.kafka.copied.common.TopicPartition;
-import org.apache.kafka.copied.common.protocol.ApiKeys;
-import org.apache.kafka.copied.common.protocol.ProtoUtils;
-import org.apache.kafka.copied.common.protocol.types.Schema;
-import org.apache.kafka.copied.common.protocol.types.Struct;
-import org.apache.kafka.copied.common.utils.CollectionUtils;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class OffsetCommitResponse extends AbstractRequestResponse {
-    
-    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.OFFSET_COMMIT.id);
-    private static final String RESPONSES_KEY_NAME = "responses";
-
-    // topic level fields
-    private static final String TOPIC_KEY_NAME = "topic";
-    private static final String PARTITIONS_KEY_NAME = "partition_responses";
-
-    // partition level fields
-    private static final String PARTITION_KEY_NAME = "partition";
-    private static final String ERROR_CODE_KEY_NAME = "error_code";
-
-    /**
-     * Possible error code:
-     *
-     * OFFSET_METADATA_TOO_LARGE (12)
-     * CONSUMER_COORDINATOR_NOT_AVAILABLE (15)
-     * NOT_COORDINATOR_FOR_CONSUMER (16)
-     * ILLEGAL_GENERATION (22)
-     * UNKNOWN_CONSUMER_ID (25)
-     * COMMITTING_PARTITIONS_NOT_ASSIGNED (27)
-     * INVALID_COMMIT_OFFSET_SIZE (28)
-     */
-
-    private final Map<TopicPartition, Short> responseData;
-
-    public OffsetCommitResponse(Map<TopicPartition, Short> responseData) {
-        super(new Struct(CURRENT_SCHEMA));
-
-        Map<String, Map<Integer, Short>> topicsData = CollectionUtils.groupDataByTopic(responseData);
-
-        List<Struct> topicArray = new ArrayList<Struct>();
-        for (Map.Entry<String, Map<Integer, Short>> entries: topicsData.entrySet()) {
-            Struct topicData = struct.instance(RESPONSES_KEY_NAME);
-            topicData.set(TOPIC_KEY_NAME, entries.getKey());
-            List<Struct> partitionArray = new ArrayList<Struct>();
-            for (Map.Entry<Integer, Short> partitionEntry : entries.getValue().entrySet()) {
-                Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
-                partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
-                partitionData.set(ERROR_CODE_KEY_NAME, partitionEntry.getValue());
-                partitionArray.add(partitionData);
-            }
-            topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
-            topicArray.add(topicData);
-        }
-        struct.set(RESPONSES_KEY_NAME, topicArray.toArray());
-        this.responseData = responseData;
-    }
-
-    public OffsetCommitResponse(Struct struct) {
-        super(struct);
-        responseData = new HashMap<TopicPartition, Short>();
-        for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
-            Struct topicResponse = (Struct) topicResponseObj;
-            String topic = topicResponse.getString(TOPIC_KEY_NAME);
-            for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
-                Struct partitionResponse = (Struct) partitionResponseObj;
-                int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
-                short errorCode = partitionResponse.getShort(ERROR_CODE_KEY_NAME);
-                responseData.put(new TopicPartition(topic, partition), errorCode);
-            }
-        }
-    }
-
-    public Map<TopicPartition, Short> responseData() {
-        return responseData;
-    }
-
-    public static OffsetCommitResponse parse(ByteBuffer buffer) {
-        return new OffsetCommitResponse((Struct) CURRENT_SCHEMA.read(buffer));
-    }
-}

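A sketch of inspecting per-partition commit results via the wrapper above; the topic name is a placeholder, and Errors.NONE/Errors.forCode are used exactly as elsewhere in this diff.

    import org.apache.kafka.copied.common.TopicPartition;
    import org.apache.kafka.copied.common.protocol.Errors;
    import org.apache.kafka.copied.common.requests.OffsetCommitResponse;

    import java.util.HashMap;
    import java.util.Map;

    Map<TopicPartition, Short> result = new HashMap<TopicPartition, Short>();
    result.put(new TopicPartition("topic-a", 0), Errors.NONE.code());
    OffsetCommitResponse response = new OffsetCommitResponse(result);

    // Each partition carries its own error code; non-zero means the commit failed.
    for (Map.Entry<TopicPartition, Short> entry : response.responseData().entrySet()) {
        if (entry.getValue() != Errors.NONE.code()) {
            System.out.println("commit failed for " + entry.getKey()
                    + ": " + Errors.forCode(entry.getValue()));
        }
    }
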
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/OffsetFetchRequest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/OffsetFetchRequest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/OffsetFetchRequest.java
deleted file mode 100644
index 242d491..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/OffsetFetchRequest.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.common.requests;
-
-import org.apache.kafka.copied.common.TopicPartition;
-import org.apache.kafka.copied.common.protocol.ApiKeys;
-import org.apache.kafka.copied.common.protocol.Errors;
-import org.apache.kafka.copied.common.protocol.ProtoUtils;
-import org.apache.kafka.copied.common.protocol.types.Schema;
-import org.apache.kafka.copied.common.protocol.types.Struct;
-import org.apache.kafka.copied.common.utils.CollectionUtils;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * This wrapper supports both v0 and v1 of OffsetFetchRequest.
- */
-public class OffsetFetchRequest extends AbstractRequest {
-    
-    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.OFFSET_FETCH.id);
-    private static final String GROUP_ID_KEY_NAME = "group_id";
-    private static final String TOPICS_KEY_NAME = "topics";
-
-    // topic level field names
-    private static final String TOPIC_KEY_NAME = "topic";
-    private static final String PARTITIONS_KEY_NAME = "partitions";
-
-    // partition level field names
-    private static final String PARTITION_KEY_NAME = "partition";
-
-    private final String groupId;
-    private final List<TopicPartition> partitions;
-
-    public OffsetFetchRequest(String groupId, List<TopicPartition> partitions) {
-        super(new Struct(CURRENT_SCHEMA));
-
-        Map<String, List<Integer>> topicsData = CollectionUtils.groupDataByTopic(partitions);
-
-        struct.set(GROUP_ID_KEY_NAME, groupId);
-        List<Struct> topicArray = new ArrayList<Struct>();
-        for (Map.Entry<String, List<Integer>> entries: topicsData.entrySet()) {
-            Struct topicData = struct.instance(TOPICS_KEY_NAME);
-            topicData.set(TOPIC_KEY_NAME, entries.getKey());
-            List<Struct> partitionArray = new ArrayList<Struct>();
-            for (Integer partitionId : entries.getValue()) {
-                Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
-                partitionData.set(PARTITION_KEY_NAME, partitionId);
-                partitionArray.add(partitionData);
-            }
-            topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
-            topicArray.add(topicData);
-        }
-        struct.set(TOPICS_KEY_NAME, topicArray.toArray());
-        this.groupId = groupId;
-        this.partitions = partitions;
-    }
-
-    public OffsetFetchRequest(Struct struct) {
-        super(struct);
-        partitions = new ArrayList<TopicPartition>();
-        for (Object topicResponseObj : struct.getArray(TOPICS_KEY_NAME)) {
-            Struct topicResponse = (Struct) topicResponseObj;
-            String topic = topicResponse.getString(TOPIC_KEY_NAME);
-            for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
-                Struct partitionResponse = (Struct) partitionResponseObj;
-                int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
-                partitions.add(new TopicPartition(topic, partition));
-            }
-        }
-        groupId = struct.getString(GROUP_ID_KEY_NAME);
-    }
-
-    @Override
-    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
-        Map<TopicPartition, OffsetFetchResponse.PartitionData> responseData = new HashMap<TopicPartition, OffsetFetchResponse.PartitionData>();
-
-        for (TopicPartition partition: partitions) {
-            responseData.put(partition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET,
-                    OffsetFetchResponse.NO_METADATA,
-                    Errors.forException(e).code()));
-        }
-
-        switch (versionId) {
-            // OffsetFetchResponseV0 == OffsetFetchResponseV1
-            case 0:
-            case 1:
-                return new OffsetFetchResponse(responseData);
-            default:
-                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
-                        versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.OFFSET_FETCH.id)));
-        }
-    }
-
-    public String groupId() {
-        return groupId;
-    }
-
-    public List<TopicPartition> partitions() {
-        return partitions;
-    }
-
-    public static OffsetFetchRequest parse(ByteBuffer buffer, int versionId) {
-        return new OffsetFetchRequest(ProtoUtils.parseRequest(ApiKeys.OFFSET_FETCH.id, versionId, buffer));
-    }
-
-    public static OffsetFetchRequest parse(ByteBuffer buffer) {
-        return new OffsetFetchRequest((Struct) CURRENT_SCHEMA.read(buffer));
-    }
-}

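A sketch of building the fetch request above for a consumer group's committed offsets; the group and topic names are placeholders.

    import org.apache.kafka.copied.common.TopicPartition;
    import org.apache.kafka.copied.common.requests.OffsetFetchRequest;

    import java.util.Arrays;
    import java.util.List;

    List<TopicPartition> partitions = Arrays.asList(
            new TopicPartition("topic-a", 0),
            new TopicPartition("topic-a", 1));
    OffsetFetchRequest request = new OffsetFetchRequest("my-group", partitions);
    System.out.println(request.groupId() + " -> " + request.partitions());
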
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/OffsetFetchResponse.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/OffsetFetchResponse.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/OffsetFetchResponse.java
deleted file mode 100644
index 8a214a8..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/OffsetFetchResponse.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.common.requests;
-
-import org.apache.kafka.copied.common.TopicPartition;
-import org.apache.kafka.copied.common.protocol.ApiKeys;
-import org.apache.kafka.copied.common.protocol.Errors;
-import org.apache.kafka.copied.common.protocol.ProtoUtils;
-import org.apache.kafka.copied.common.protocol.types.Schema;
-import org.apache.kafka.copied.common.protocol.types.Struct;
-import org.apache.kafka.copied.common.utils.CollectionUtils;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class OffsetFetchResponse extends AbstractRequestResponse {
-    
-    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.OFFSET_FETCH.id);
-    private static final String RESPONSES_KEY_NAME = "responses";
-
-    // topic level fields
-    private static final String TOPIC_KEY_NAME = "topic";
-    private static final String PARTITIONS_KEY_NAME = "partition_responses";
-
-    // partition level fields
-    private static final String PARTITION_KEY_NAME = "partition";
-    private static final String COMMIT_OFFSET_KEY_NAME = "offset";
-    private static final String METADATA_KEY_NAME = "metadata";
-    private static final String ERROR_CODE_KEY_NAME = "error_code";
-
-    public static final long INVALID_OFFSET = -1L;
-    public static final String NO_METADATA = "";
-
-    /**
-     * Possible error code:
-     *
-     *  UNKNOWN_TOPIC_OR_PARTITION (3)  <- only for request v0
-     *  OFFSET_LOAD_IN_PROGRESS (14)
-     *  NOT_COORDINATOR_FOR_CONSUMER (16)
-     *  ILLEGAL_GENERATION (22)
-     *  UNKNOWN_CONSUMER_ID (25)
-     */
-
-    private final Map<TopicPartition, PartitionData> responseData;
-
-    public static final class PartitionData {
-        public final long offset;
-        public final String metadata;
-        public final short errorCode;
-
-        public PartitionData(long offset, String metadata, short errorCode) {
-            this.offset = offset;
-            this.metadata = metadata;
-            this.errorCode = errorCode;
-        }
-
-        public boolean hasError() {
-            return this.errorCode != Errors.NONE.code();
-        }
-    }
-
-    public OffsetFetchResponse(Map<TopicPartition, PartitionData> responseData) {
-        super(new Struct(CURRENT_SCHEMA));
-
-        Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(responseData);
-
-        List<Struct> topicArray = new ArrayList<Struct>();
-        for (Map.Entry<String, Map<Integer, PartitionData>> entries : topicsData.entrySet()) {
-            Struct topicData = struct.instance(RESPONSES_KEY_NAME);
-            topicData.set(TOPIC_KEY_NAME, entries.getKey());
-            List<Struct> partitionArray = new ArrayList<Struct>();
-            for (Map.Entry<Integer, PartitionData> partitionEntry : entries.getValue().entrySet()) {
-                PartitionData fetchPartitionData = partitionEntry.getValue();
-                Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
-                partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
-                partitionData.set(COMMIT_OFFSET_KEY_NAME, fetchPartitionData.offset);
-                partitionData.set(METADATA_KEY_NAME, fetchPartitionData.metadata);
-                partitionData.set(ERROR_CODE_KEY_NAME, fetchPartitionData.errorCode);
-                partitionArray.add(partitionData);
-            }
-            topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
-            topicArray.add(topicData);
-        }
-        struct.set(RESPONSES_KEY_NAME, topicArray.toArray());
-        this.responseData = responseData;
-    }
-
-    public OffsetFetchResponse(Struct struct) {
-        super(struct);
-        responseData = new HashMap<TopicPartition, PartitionData>();
-        for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
-            Struct topicResponse = (Struct) topicResponseObj;
-            String topic = topicResponse.getString(TOPIC_KEY_NAME);
-            for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
-                Struct partitionResponse = (Struct) partitionResponseObj;
-                int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
-                long offset = partitionResponse.getLong(COMMIT_OFFSET_KEY_NAME);
-                String metadata = partitionResponse.getString(METADATA_KEY_NAME);
-                short errorCode = partitionResponse.getShort(ERROR_CODE_KEY_NAME);
-                PartitionData partitionData = new PartitionData(offset, metadata, errorCode);
-                responseData.put(new TopicPartition(topic, partition), partitionData);
-            }
-        }
-    }
-
-    public Map<TopicPartition, PartitionData> responseData() {
-        return responseData;
-    }
-
-    public static OffsetFetchResponse parse(ByteBuffer buffer) {
-        return new OffsetFetchResponse((Struct) CURRENT_SCHEMA.read(buffer));
-    }
-}

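A sketch of reading a fetched offset through the wrapper above; the 42L offset and topic name are placeholders, and TopicPartition is assumed to implement equals/hashCode (it is used as a map key throughout this diff).

    import org.apache.kafka.copied.common.TopicPartition;
    import org.apache.kafka.copied.common.protocol.Errors;
    import org.apache.kafka.copied.common.requests.OffsetFetchResponse;

    import java.util.HashMap;
    import java.util.Map;

    Map<TopicPartition, OffsetFetchResponse.PartitionData> data =
            new HashMap<TopicPartition, OffsetFetchResponse.PartitionData>();
    data.put(new TopicPartition("topic-a", 0), new OffsetFetchResponse.PartitionData(
            42L, OffsetFetchResponse.NO_METADATA, Errors.NONE.code()));
    OffsetFetchResponse response = new OffsetFetchResponse(data);

    OffsetFetchResponse.PartitionData pd =
            response.responseData().get(new TopicPartition("topic-a", 0));
    if (!pd.hasError()) {
        long committed = pd.offset;   // 42; INVALID_OFFSET (-1) when nothing was committed
    }
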
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/ProduceRequest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/ProduceRequest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/ProduceRequest.java
deleted file mode 100644
index 7a1c995..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/ProduceRequest.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package org.apache.kafka.copied.common.requests;
-
-import org.apache.kafka.copied.common.TopicPartition;
-import org.apache.kafka.copied.common.protocol.ApiKeys;
-import org.apache.kafka.copied.common.protocol.Errors;
-import org.apache.kafka.copied.common.protocol.ProtoUtils;
-import org.apache.kafka.copied.common.protocol.types.Schema;
-import org.apache.kafka.copied.common.protocol.types.Struct;
-import org.apache.kafka.copied.common.utils.CollectionUtils;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class ProduceRequest extends AbstractRequest {
-    
-    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.PRODUCE.id);
-    private static final String ACKS_KEY_NAME = "acks";
-    private static final String TIMEOUT_KEY_NAME = "timeout";
-    private static final String TOPIC_DATA_KEY_NAME = "topic_data";
-
-    // topic level field names
-    private static final String TOPIC_KEY_NAME = "topic";
-    private static final String PARTITION_DATA_KEY_NAME = "data";
-
-    // partition level field names
-    private static final String PARTITION_KEY_NAME = "partition";
-    private static final String RECORD_SET_KEY_NAME = "record_set";
-
-    private final short acks;
-    private final int timeout;
-    private final Map<TopicPartition, ByteBuffer> partitionRecords;
-
-    public ProduceRequest(short acks, int timeout, Map<TopicPartition, ByteBuffer> partitionRecords) {
-        super(new Struct(CURRENT_SCHEMA));
-        Map<String, Map<Integer, ByteBuffer>> recordsByTopic = CollectionUtils.groupDataByTopic(partitionRecords);
-        struct.set(ACKS_KEY_NAME, acks);
-        struct.set(TIMEOUT_KEY_NAME, timeout);
-        List<Struct> topicDatas = new ArrayList<Struct>(recordsByTopic.size());
-        for (Map.Entry<String, Map<Integer, ByteBuffer>> entry : recordsByTopic.entrySet()) {
-            Struct topicData = struct.instance(TOPIC_DATA_KEY_NAME);
-            topicData.set(TOPIC_KEY_NAME, entry.getKey());
-            List<Struct> partitionArray = new ArrayList<Struct>();
-            for (Map.Entry<Integer, ByteBuffer> partitionEntry : entry.getValue().entrySet()) {
-                ByteBuffer buffer = partitionEntry.getValue().duplicate();
-                Struct part = topicData.instance(PARTITION_DATA_KEY_NAME)
-                                       .set(PARTITION_KEY_NAME, partitionEntry.getKey())
-                                       .set(RECORD_SET_KEY_NAME, buffer);
-                partitionArray.add(part);
-            }
-            topicData.set(PARTITION_DATA_KEY_NAME, partitionArray.toArray());
-            topicDatas.add(topicData);
-        }
-        struct.set(TOPIC_DATA_KEY_NAME, topicDatas.toArray());
-        this.acks = acks;
-        this.timeout = timeout;
-        this.partitionRecords = partitionRecords;
-    }
-
-    public ProduceRequest(Struct struct) {
-        super(struct);
-        partitionRecords = new HashMap<TopicPartition, ByteBuffer>();
-        for (Object topicDataObj : struct.getArray(TOPIC_DATA_KEY_NAME)) {
-            Struct topicData = (Struct) topicDataObj;
-            String topic = topicData.getString(TOPIC_KEY_NAME);
-            for (Object partitionResponseObj : topicData.getArray(PARTITION_DATA_KEY_NAME)) {
-                Struct partitionResponse = (Struct) partitionResponseObj;
-                int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
-                ByteBuffer records = partitionResponse.getBytes(RECORD_SET_KEY_NAME);
-                partitionRecords.put(new TopicPartition(topic, partition), records);
-            }
-        }
-        acks = struct.getShort(ACKS_KEY_NAME);
-        timeout = struct.getInt(TIMEOUT_KEY_NAME);
-    }
-
-    @Override
-    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
-        /* In case the producer doesn't actually want any response */
-        if (acks == 0)
-            return null;
-
-        Map<TopicPartition, ProduceResponse.PartitionResponse> responseMap = new HashMap<TopicPartition, ProduceResponse.PartitionResponse>();
-
-        for (Map.Entry<TopicPartition, ByteBuffer> entry : partitionRecords.entrySet()) {
-            responseMap.put(entry.getKey(), new ProduceResponse.PartitionResponse(Errors.forException(e).code(), ProduceResponse.INVALID_OFFSET));
-        }
-
-        switch (versionId) {
-            case 0:
-                return new ProduceResponse(responseMap);
-            default:
-                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
-                        versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.PRODUCE.id)));
-        }
-    }
-
-    public short acks() {
-        return acks;
-    }
-
-    public int timeout() {
-        return timeout;
-    }
-
-    public Map<TopicPartition, ByteBuffer> partitionRecords() {
-        return partitionRecords;
-    }
-
-    public static ProduceRequest parse(ByteBuffer buffer, int versionId) {
-        return new ProduceRequest(ProtoUtils.parseRequest(ApiKeys.PRODUCE.id, versionId, buffer));
-    }
-
-    public static ProduceRequest parse(ByteBuffer buffer) {
-        return new ProduceRequest((Struct) CURRENT_SCHEMA.read(buffer));
-    }
-}

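A sketch of the produce request above; the record set here is an empty placeholder buffer rather than a real serialized message set. Note that with acks == 0 the broker sends no response at all, which is why getErrorResponse() in the file returns null in that case.

    import org.apache.kafka.copied.common.TopicPartition;
    import org.apache.kafka.copied.common.requests.ProduceRequest;

    import java.nio.ByteBuffer;
    import java.util.HashMap;
    import java.util.Map;

    // Placeholder for a serialized message set.
    ByteBuffer recordSet = ByteBuffer.wrap(new byte[0]);

    Map<TopicPartition, ByteBuffer> records = new HashMap<TopicPartition, ByteBuffer>();
    records.put(new TopicPartition("topic-a", 0), recordSet);

    // acks = 1 (leader acknowledgement only), 10 second timeout.
    ProduceRequest request = new ProduceRequest((short) 1, 10000, records);
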
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/ProduceResponse.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/ProduceResponse.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/ProduceResponse.java
deleted file mode 100644
index 47313ad..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/ProduceResponse.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.common.requests;
-
-import org.apache.kafka.copied.common.TopicPartition;
-import org.apache.kafka.copied.common.protocol.ApiKeys;
-import org.apache.kafka.copied.common.protocol.ProtoUtils;
-import org.apache.kafka.copied.common.protocol.types.Schema;
-import org.apache.kafka.copied.common.protocol.types.Struct;
-import org.apache.kafka.copied.common.utils.CollectionUtils;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class ProduceResponse extends AbstractRequestResponse {
-    
-    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.PRODUCE.id);
-    private static final String RESPONSES_KEY_NAME = "responses";
-
-    // topic level field names
-    private static final String TOPIC_KEY_NAME = "topic";
-    private static final String PARTITION_RESPONSES_KEY_NAME = "partition_responses";
-
-    // partition level field names
-    private static final String PARTITION_KEY_NAME = "partition";
-    private static final String ERROR_CODE_KEY_NAME = "error_code";
-
-    public static final long INVALID_OFFSET = -1L;
-
-    /**
-     * Possible error code:
-     *
-     * TODO
-     */
-
-    private static final String BASE_OFFSET_KEY_NAME = "base_offset";
-
-    private final Map<TopicPartition, PartitionResponse> responses;
-
-    public ProduceResponse(Map<TopicPartition, PartitionResponse> responses) {
-        super(new Struct(CURRENT_SCHEMA));
-        Map<String, Map<Integer, PartitionResponse>> responseByTopic = CollectionUtils.groupDataByTopic(responses);
-        List<Struct> topicDatas = new ArrayList<Struct>(responseByTopic.size());
-        for (Map.Entry<String, Map<Integer, PartitionResponse>> entry : responseByTopic.entrySet()) {
-            Struct topicData = struct.instance(RESPONSES_KEY_NAME);
-            topicData.set(TOPIC_KEY_NAME, entry.getKey());
-            List<Struct> partitionArray = new ArrayList<Struct>();
-            for (Map.Entry<Integer, PartitionResponse> partitionEntry : entry.getValue().entrySet()) {
-                PartitionResponse part = partitionEntry.getValue();
-                Struct partStruct = topicData.instance(PARTITION_RESPONSES_KEY_NAME)
-                                       .set(PARTITION_KEY_NAME, partitionEntry.getKey())
-                                       .set(ERROR_CODE_KEY_NAME, part.errorCode)
-                                       .set(BASE_OFFSET_KEY_NAME, part.baseOffset);
-                partitionArray.add(partStruct);
-            }
-            topicData.set(PARTITION_RESPONSES_KEY_NAME, partitionArray.toArray());
-            topicDatas.add(topicData);
-        }
-        struct.set(RESPONSES_KEY_NAME, topicDatas.toArray());
-        this.responses = responses;
-    }
-
-    public ProduceResponse(Struct struct) {
-        super(struct);
-        responses = new HashMap<TopicPartition, PartitionResponse>();
-        for (Object topicResponse : struct.getArray("responses")) {
-            Struct topicRespStruct = (Struct) topicResponse;
-            String topic = topicRespStruct.getString("topic");
-            for (Object partResponse : topicRespStruct.getArray("partition_responses")) {
-                Struct partRespStruct = (Struct) partResponse;
-                int partition = partRespStruct.getInt("partition");
-                short errorCode = partRespStruct.getShort("error_code");
-                long offset = partRespStruct.getLong("base_offset");
-                TopicPartition tp = new TopicPartition(topic, partition);
-                responses.put(tp, new PartitionResponse(errorCode, offset));
-            }
-        }
-    }
-
-    public Map<TopicPartition, PartitionResponse> responses() {
-        return this.responses;
-    }
-
-    public static final class PartitionResponse {
-        public short errorCode;
-        public long baseOffset;
-
-        public PartitionResponse(short errorCode, long baseOffset) {
-            this.errorCode = errorCode;
-            this.baseOffset = baseOffset;
-        }
-
-        @Override
-        public String toString() {
-            StringBuilder b = new StringBuilder();
-            b.append('{');
-            b.append("error: ");
-            b.append(errorCode);
-            b.append(",offset: ");
-            b.append(baseOffset);
-            b.append('}');
-            return b.toString();
-        }
-    }
-
-    public static ProduceResponse parse(ByteBuffer buffer) {
-        return new ProduceResponse((Struct) CURRENT_SCHEMA.read(buffer));
-    }
-}
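
The responses() map above is keyed by TopicPartition, and each PartitionResponse pairs a per-partition error code with the base offset the broker assigned to the appended batch (presumably INVALID_OFFSET on failure). A minimal consumption sketch, assuming the copied Errors enum matches upstream Kafka in exposing Errors.NONE.code():

    // given a ByteBuffer 'buffer' positioned at a serialized produce response
    ProduceResponse response = ProduceResponse.parse(buffer);
    for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry :
            response.responses().entrySet()) {
        ProduceResponse.PartitionResponse part = entry.getValue();
        if (part.errorCode == Errors.NONE.code()) {
            System.out.println(entry.getKey() + " appended at offset " + part.baseOffset);
        } else {
            // the broker rejected this partition's batch
            System.err.println("produce failed for " + entry.getKey() + ": " + part);
        }
    }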

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/RequestHeader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/RequestHeader.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/RequestHeader.java
deleted file mode 100644
index ee345c5..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/RequestHeader.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.common.requests;
-
-import org.apache.kafka.copied.common.protocol.ProtoUtils;
-import org.apache.kafka.copied.common.protocol.Protocol;
-import org.apache.kafka.copied.common.protocol.types.Field;
-import org.apache.kafka.copied.common.protocol.types.Struct;
-
-import java.nio.ByteBuffer;
-
-/**
- * The header for a request in the Kafka protocol
- */
-public class RequestHeader extends AbstractRequestResponse {
-
-    private static final Field API_KEY_FIELD = Protocol.REQUEST_HEADER.get("api_key");
-    private static final Field API_VERSION_FIELD = Protocol.REQUEST_HEADER.get("api_version");
-    private static final Field CLIENT_ID_FIELD = Protocol.REQUEST_HEADER.get("client_id");
-    private static final Field CORRELATION_ID_FIELD = Protocol.REQUEST_HEADER.get("correlation_id");
-
-    private final short apiKey;
-    private final short apiVersion;
-    private final String clientId;
-    private final int correlationId;
-
-    public RequestHeader(Struct header) {
-        super(header);
-        apiKey = struct.getShort(API_KEY_FIELD);
-        apiVersion = struct.getShort(API_VERSION_FIELD);
-        clientId = struct.getString(CLIENT_ID_FIELD);
-        correlationId = struct.getInt(CORRELATION_ID_FIELD);
-    }
-
-    public RequestHeader(short apiKey, String client, int correlation) {
-        this(apiKey, ProtoUtils.latestVersion(apiKey), client, correlation);
-    }
-
-    public RequestHeader(short apiKey, short version, String client, int correlation) {
-        super(new Struct(Protocol.REQUEST_HEADER));
-        struct.set(API_KEY_FIELD, apiKey);
-        struct.set(API_VERSION_FIELD, version);
-        struct.set(CLIENT_ID_FIELD, client);
-        struct.set(CORRELATION_ID_FIELD, correlation);
-        this.apiKey = apiKey;
-        this.apiVersion = version;
-        this.clientId = client;
-        this.correlationId = correlation;
-    }
-
-    public short apiKey() {
-        return apiKey;
-    }
-
-    public short apiVersion() {
-        return apiVersion;
-    }
-
-    public String clientId() {
-        return clientId;
-    }
-
-    public int correlationId() {
-        return correlationId;
-    }
-
-    public static RequestHeader parse(ByteBuffer buffer) {
-        return new RequestHeader((Struct) Protocol.REQUEST_HEADER.read(buffer));
-    }
-}
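
Because RequestHeader extends AbstractRequestResponse, it inherits the sizeOf() and writeTo() methods that RequestSend.serialize (next file) relies on, so a header round-trips in isolation. A sketch, with "flink-client" and the correlation id 42 as arbitrary placeholder values:

    RequestHeader header = new RequestHeader(ApiKeys.PRODUCE.id, "flink-client", 42);
    ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf());
    header.writeTo(buffer);
    buffer.rewind();
    RequestHeader parsed = RequestHeader.parse(buffer);
    // parsed.apiKey() == ApiKeys.PRODUCE.id and parsed.correlationId() == 42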

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/RequestSend.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/RequestSend.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/RequestSend.java
deleted file mode 100644
index 0db1af6..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/RequestSend.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.common.requests;
-
-import org.apache.kafka.copied.common.network.NetworkSend;
-import org.apache.kafka.copied.common.protocol.types.Struct;
-
-import java.nio.ByteBuffer;
-
-/**
- * A send object for a kafka request
- */
-public class RequestSend extends NetworkSend {
-
-    private final RequestHeader header;
-    private final Struct body;
-
-    public RequestSend(String destination, RequestHeader header, Struct body) {
-        super(destination, serialize(header, body));
-        this.header = header;
-        this.body = body;
-    }
-
-    private static ByteBuffer serialize(RequestHeader header, Struct body) {
-        ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + body.sizeOf());
-        header.writeTo(buffer);
-        body.writeTo(buffer);
-        buffer.rewind();
-        return buffer;
-    }
-
-    public RequestHeader header() {
-        return this.header;
-    }
-
-    public Struct body() {
-        return body;
-    }
-
-    @Override
-    public String toString() {
-        return "RequestSend(header=" + header.toString() + ", body=" + body.toString() + ")";
-    }
-
-}
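
serialize() lays the header bytes directly in front of the body bytes in a single buffer; the destination string is an opaque node id interpreted by the network layer. A hypothetical construction sketch, where produceRequest stands in for any AbstractRequestResponse and toStruct() is the accessor ResponseSend below uses:

    RequestHeader header = new RequestHeader(ApiKeys.PRODUCE.id, "flink-client", 7);
    RequestSend send = new RequestSend("broker-0", header, produceRequest.toStruct());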

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/ResponseHeader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/ResponseHeader.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/ResponseHeader.java
deleted file mode 100644
index 9ded109..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/ResponseHeader.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.copied.common.requests;
-
-import org.apache.kafka.copied.common.protocol.types.Field;
-import org.apache.kafka.copied.common.protocol.types.Struct;
-
-import java.nio.ByteBuffer;
-
-import static org.apache.kafka.copied.common.protocol.Protocol.RESPONSE_HEADER;
-
-
-/**
- * A response header in the kafka protocol.
- */
-public class ResponseHeader extends AbstractRequestResponse {
-
-    private static final Field CORRELATION_KEY_FIELD = RESPONSE_HEADER.get("correlation_id");
-
-    private final int correlationId;
-
-    public ResponseHeader(Struct header) {
-        super(header);
-        correlationId = struct.getInt(CORRELATION_KEY_FIELD);
-    }
-
-    public ResponseHeader(int correlationId) {
-        super(new Struct(RESPONSE_HEADER));
-        struct.set(CORRELATION_KEY_FIELD, correlationId);
-        this.correlationId = correlationId;
-    }
-
-    public int correlationId() {
-        return correlationId;
-    }
-
-    public static ResponseHeader parse(ByteBuffer buffer) {
-        return new ResponseHeader((Struct) RESPONSE_HEADER.read(buffer));
-    }
-
-}
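
The correlation id is what lets a client match a response coming off the wire to the request it sent. A read-side sketch, assuming (as in upstream Kafka) that the network layer has already stripped the size prefix and that the header bytes precede the body, mirroring the layout RequestSend.serialize produces; inFlightCorrelationId is a hypothetical local variable:

    ResponseHeader respHeader = ResponseHeader.parse(buffer);
    if (respHeader.correlationId() != inFlightCorrelationId) {
        throw new IllegalStateException("correlation id mismatch: expected "
                + inFlightCorrelationId + ", got " + respHeader.correlationId());
    }
    ProduceResponse body = ProduceResponse.parse(buffer); // remaining bytes are the body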

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/ResponseSend.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/ResponseSend.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/ResponseSend.java
deleted file mode 100644
index 08363d6..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/requests/ResponseSend.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.copied.common.requests;
-
-import org.apache.kafka.copied.common.network.NetworkSend;
-import org.apache.kafka.copied.common.protocol.types.Struct;
-
-import java.nio.ByteBuffer;
-
-public class ResponseSend extends NetworkSend {
-
-    public ResponseSend(String destination, ResponseHeader header, Struct body) {
-        super(destination, serialize(header, body));
-    }
-
-    public ResponseSend(String destination, ResponseHeader header, AbstractRequestResponse response) {
-        this(destination, header, response.toStruct());
-    }
-
-    private static ByteBuffer serialize(ResponseHeader header, Struct body) {
-        ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + body.sizeOf());
-        header.writeTo(buffer);
-        body.writeTo(buffer);
-        buffer.rewind();
-        return buffer;
-    }
-}
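
The second constructor is the convenient one: any AbstractRequestResponse, such as the ProduceResponse earlier in this commit, can be wrapped directly. A hypothetical sketch, with "client-7", requestHeader and produceResponse as placeholder names:

    // echo the request's correlation id back so the client can match the response
    ResponseSend send = new ResponseSend("client-7",
            new ResponseHeader(requestHeader.correlationId()), produceResponse);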

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/serialization/ByteArrayDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/serialization/ByteArrayDeserializer.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/serialization/ByteArrayDeserializer.java
deleted file mode 100644
index 276db18..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/serialization/ByteArrayDeserializer.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package org.apache.kafka.copied.common.serialization;
-
-import java.util.Map;
-
-public class ByteArrayDeserializer implements Deserializer<byte[]> {
-
-    @Override
-    public void configure(Map<String, ?> configs, boolean isKey) {
-        // nothing to do
-    }
-
-    @Override
-    public byte[] deserialize(String topic, byte[] data) {
-        return data;
-    }
-
-    @Override
-    public void close() {
-        // nothing to do
-    }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/serialization/ByteArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/serialization/ByteArraySerializer.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/serialization/ByteArraySerializer.java
deleted file mode 100644
index 1a364fd..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/serialization/ByteArraySerializer.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package org.apache.kafka.copied.common.serialization;
-
-import java.util.Map;
-
-public class ByteArraySerializer implements Serializer<byte[]> {
-
-    @Override
-    public void configure(Map<String, ?> configs, boolean isKey) {
-        // nothing to do
-    }
-
-    @Override
-    public byte[] serialize(String topic, byte[] data) {
-        return data;
-    }
-
-    @Override
-    public void close() {
-        // nothing to do
-    }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/serialization/Deserializer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/serialization/Deserializer.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/serialization/Deserializer.java
deleted file mode 100644
index baf56dc..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/serialization/Deserializer.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package org.apache.kafka.copied.common.serialization;
-
-import java.io.Closeable;
-import java.util.Map;
-
-/**
- * A class that implements this interface is expected to have a
- * constructor with no parameters.
- *
- * @param <T> Type to be deserialized into.
- */
-public interface Deserializer<T> extends Closeable {
-
-    /**
-     * Configure this class.
-     * @param configs configs in key/value pairs
-     * @param isKey whether the instance is used for keys or for values
-     */
-    public void configure(Map<String, ?> configs, boolean isKey);
-    
-    /**
-     *
-     * @param topic topic associated with the data
-     * @param data serialized bytes
-     * @return deserialized typed data
-     */
-    public T deserialize(String topic, byte[] data);
-
-    @Override
-    public void close();
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/serialization/IntegerDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/serialization/IntegerDeserializer.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/serialization/IntegerDeserializer.java
deleted file mode 100644
index f46077e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/serialization/IntegerDeserializer.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.common.serialization;
-
-import org.apache.kafka.copied.common.errors.SerializationException;
-
-import java.util.Map;
-
-public class IntegerDeserializer implements Deserializer<Integer> {
-
-    public void configure(Map<String, ?> configs, boolean isKey) {
-        // nothing to do
-    }
-
-    public Integer deserialize(String topic, byte[] data) {
-        if (data == null)
-            return null;
-        if (data.length != 4) {
-            throw new SerializationException("Size of data received by IntegerDeserializer is " +
-                    "not 4");
-        }
-
-        int value = 0;
-        for (byte b : data) {
-            value <<= 8;
-            value |= b & 0xFF;
-        }
-        return value;
-    }
-
-    public void close() {
-        // nothing to do
-    }
-}
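
The shift-and-or loop decodes the four bytes in big-endian (network) order, which is also ByteBuffer's default byte order; a quick equivalence check:

    byte[] data = {0x00, 0x00, 0x01, 0x02};
    Integer viaLoop = new IntegerDeserializer().deserialize("any-topic", data); // 258
    int viaBuffer = java.nio.ByteBuffer.wrap(data).getInt();                    // also 258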

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/serialization/IntegerSerializer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/serialization/IntegerSerializer.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/serialization/IntegerSerializer.java
deleted file mode 100644
index f333690..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/serialization/IntegerSerializer.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.common.serialization;
-
-import java.util.Map;
-
-public class IntegerSerializer implements Serializer<Integer> {
-
-    public void configure(Map<String, ?> configs, boolean isKey) {
-        // nothing to do
-    }
-
-    public byte[] serialize(String topic, Integer data) {
-        if (data == null)
-            return null;
-
-        return new byte[] {
-            (byte) (data >>> 24),
-            (byte) (data >>> 16),
-            (byte) (data >>> 8),
-            data.byteValue()
-        };
-    }
-
-    public void close() {
-        // nothing to do
-    }
-}
\ No newline at end of file
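
The serializer writes the same big-endian layout the deserializer above expects, so the two round-trip cleanly:

    byte[] wire = new IntegerSerializer().serialize("any-topic", 258);
    // wire == {0x00, 0x00, 0x01, 0x02}
    Integer roundTrip = new IntegerDeserializer().deserialize("any-topic", wire); // 258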

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/serialization/Serializer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/serialization/Serializer.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/serialization/Serializer.java
deleted file mode 100644
index 575c2e0..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/serialization/Serializer.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package org.apache.kafka.copied.common.serialization;
-
-import java.io.Closeable;
-import java.util.Map;
-
-/**
- * A class that implements this interface is expected to have a
- * constructor with no parameters.
- *
- * @param <T> Type to be serialized from.
- */
-public interface Serializer<T> extends Closeable {
-
-    /**
-     * Configure this class.
-     * @param configs configs in key/value pairs
-     * @param isKey whether the instance is used for keys or for values
-     */
-    public void configure(Map<String, ?> configs, boolean isKey);
-
-    /**
-     * @param topic topic associated with data
-     * @param data typed data
-     * @return serialized bytes
-     */
-    public byte[] serialize(String topic, T data);
-
-
-    /**
-     * Close this serializer.
-     * This method has to be idempotent if the serializer is used in KafkaProducer because it might be called
-     * multiple times.
-     */
-    @Override
-    public void close();
-}
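
The interface is small enough that custom implementations are only a few lines. A hypothetical ShortSerializer, not part of this commit, sketched after the pattern of IntegerSerializer above:

    public class ShortSerializer implements Serializer<Short> {
        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            // stateless, nothing to configure
        }

        @Override
        public byte[] serialize(String topic, Short data) {
            if (data == null)
                return null;
            // big-endian, mirroring IntegerSerializer
            return new byte[] { (byte) (data >>> 8), data.byteValue() };
        }

        @Override
        public void close() {
            // idempotent, as the contract above requires
        }
    }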

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/serialization/StringDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/serialization/StringDeserializer.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/serialization/StringDeserializer.java
deleted file mode 100644
index 2e17c9b..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/common/serialization/StringDeserializer.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.common.serialization;
-
-import org.apache.kafka.copied.common.errors.SerializationException;
-
-import java.io.UnsupportedEncodingException;
-import java.util.Map;
-
-/**
- *  String encoding defaults to UTF8 and can be customized by setting the property key.deserializer.encoding,
- *  value.deserializer.encoding or deserializer.encoding. The first two take precedence over the last.
- */
-public class StringDeserializer implements Deserializer<String> {
-    private String encoding = "UTF8";
-
-    @Override
-    public void configure(Map<String, ?> configs, boolean isKey) {
-        String propertyName = isKey ? "key.deserializer.encoding" : "value.deserializer.encoding";
-        Object encodingValue = configs.get(propertyName);
-        if (encodingValue == null)
-            encodingValue = configs.get("deserializer.encoding");
-        if (encodingValue != null && encodingValue instanceof String)
-            encoding = (String) encodingValue;
-    }
-
-    @Override
-    public String deserialize(String topic, byte[] data) {
-        try {
-            if (data == null)
-                return null;
-            else
-                return new String(data, encoding);
-        } catch (UnsupportedEncodingException e) {
-            throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + encoding);
-        }
-    }
-
-    @Override
-    public void close() {
-        // nothing to do
-    }
-}
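
The precedence in configure() means a per-key or per-value setting always beats the shared deserializer.encoding fallback. A usage sketch, where rawBytes is a hypothetical byte[] payload:

    Map<String, Object> configs = new HashMap<String, Object>();
    configs.put("deserializer.encoding", "UTF8");             // fallback for keys and values
    configs.put("value.deserializer.encoding", "ISO-8859-1"); // wins for values

    StringDeserializer valueDeserializer = new StringDeserializer();
    valueDeserializer.configure(configs, false); // isKey == false, so ISO-8859-1 applies
    String text = valueDeserializer.deserialize("any-topic", rawBytes);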