You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/27 13:26:03 UTC

[46/51] [abbrv] flink git commit: [FLINK-2386] [kafka connector] Remove copied Kafka code again. Implemented our own topic metadata retrieval.

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/RequestFutureListener.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/RequestFutureListener.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/RequestFutureListener.java
deleted file mode 100644
index 90a1cfa..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/RequestFutureListener.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.clients.consumer.internals;
-
-// ----------------------------------------------------------------------------
-//  This class is copied from the Apache Kafka project.
-// 
-//  The class is part of a "backport" of the new consumer API, in order to
-//  give Flink access to its functionality until the API is properly released.
-// 
-//  This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * Listener interface to hook into RequestFuture completion. Presumably exactly one of the two callbacks is invoked per future -- that contract lives in RequestFuture, which is not shown here (TODO confirm).
- */
-public interface RequestFutureListener<T> {
-    /** Success callback; receives a value of this listener's type parameter {@code T}. */
-    void onSuccess(T value);
-    /** Failure callback; receives the failure as an unchecked exception. */
-    void onFailure(RuntimeException e);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/SendFailedException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/SendFailedException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/SendFailedException.java
deleted file mode 100644
index d94486e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/SendFailedException.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.clients.consumer.internals;
-
-import org.apache.flink.kafka_backport.common.errors.RetriableException;
-
-// ----------------------------------------------------------------------------
-//  This class is copied from the Apache Kafka project.
-// 
-//  The class is part of a "backport" of the new consumer API, in order to
-//  give Flink access to its functionality until the API is properly released.
-// 
-//  This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * Exception used in {@link ConsumerNetworkClient} to indicate the failure
- * to transmit a request to the networking layer. This could be either because
- * the client is still connecting to the given host or its send buffer is full.
- * No constructor is declared, so only the implicit no-arg constructor exists; a shared {@link #INSTANCE} is provided. */
-public class SendFailedException extends RetriableException {
-    public static final SendFailedException INSTANCE = new SendFailedException(); // shared instance, created once when the class is initialized
-    // NOTE(review): a shared instance would carry whatever stack trace was captured at class initialization, not at the failing send -- confirm how RetriableException handles stack traces.
-    private static final long serialVersionUID = 1L; // serialization version id
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/StaleMetadataException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/StaleMetadataException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/StaleMetadataException.java
deleted file mode 100644
index adff6e0..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/StaleMetadataException.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.clients.consumer.internals;
-
-import org.apache.flink.kafka_backport.common.errors.InvalidMetadataException;
-
-// ----------------------------------------------------------------------------
-//  This class is copied from the Apache Kafka project.
-// 
-//  The class is part of a "backport" of the new consumer API, in order to
-//  give Flink access to its functionality until the API is properly released.
-// 
-//  This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * Thrown when metadata is old and needs to be refreshed. Declares no state and no constructors of its own; everything else comes from {@link InvalidMetadataException}.
- */
-public class StaleMetadataException extends InvalidMetadataException {
-    private static final long serialVersionUID = 1L; // serialization version id
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/SubscriptionState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/SubscriptionState.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/SubscriptionState.java
deleted file mode 100644
index f5e8802..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/SubscriptionState.java
+++ /dev/null
@@ -1,242 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.clients.consumer.internals;
-
-import org.apache.flink.kafka_backport.clients.consumer.OffsetResetStrategy;
-import org.apache.flink.kafka_backport.common.TopicPartition;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-// ----------------------------------------------------------------------------
-//  This class is copied from the Apache Kafka project.
-// 
-//  The class is part of a "backport" of the new consumer API, in order to
-//  give Flink access to its functionality until the API is properly released.
-// 
-//  This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * A class for tracking the topics, partitions, and offsets for the consumer. Subscribing by topic and subscribing by explicit partition are mutually exclusive (both subscribe methods enforce this), and no method here performs any synchronization.
- */
-public class SubscriptionState {
-
-    /* the topics the user has requested; filled by subscribe(String) */
-    private final Set<String> subscribedTopics;
-
-    /* the partitions the user has requested explicitly; filled by subscribe(TopicPartition) */
-    private final Set<TopicPartition> subscribedPartitions;
-
-    /* the partitions currently assigned (explicitly subscribed partitions are added here right away) */
-    private final Set<TopicPartition> assignedPartitions;
-
-    /* the offset exposed to the user */
-    private final Map<TopicPartition, Long> consumed;
-
-    /* the current point we have fetched up to */
-    private final Map<TopicPartition, Long> fetched;
-
-    /* the last committed offset for each partition */
-    private final Map<TopicPartition, Long> committed;
-
-    /* do we need to request a partition assignment from the coordinator? */
-    private boolean needsPartitionAssignment;
-
-    /* do we need to request the latest committed offsets from the coordinator? */
-    private boolean needsFetchCommittedOffsets;
-
-    /* Partitions that need to be reset before fetching, mapped to the reset strategy to apply */
-    private Map<TopicPartition, OffsetResetStrategy> resetPartitions;
-
-    /* Default offset reset strategy, used by needOffsetReset(TopicPartition) */
-    private OffsetResetStrategy offsetResetStrategy;
-    /** Creates an empty state with the given default reset strategy: no subscriptions, no assignment, no offsets, and no assignment request pending. */
-    public SubscriptionState(OffsetResetStrategy offsetResetStrategy) {
-        this.offsetResetStrategy = offsetResetStrategy;
-        this.subscribedTopics = new HashSet<String>();
-        this.subscribedPartitions = new HashSet<TopicPartition>();
-        this.assignedPartitions = new HashSet<TopicPartition>();
-        this.consumed = new HashMap<TopicPartition, Long>();
-        this.fetched = new HashMap<TopicPartition, Long>();
-        this.committed = new HashMap<TopicPartition, Long>();
-        this.needsPartitionAssignment = false;
-        this.needsFetchCommittedOffsets = true; // initialize to true for the consumers to fetch offset upon starting up
-        this.resetPartitions = new HashMap<TopicPartition, OffsetResetStrategy>();
-    }
-    /** Adds a topic subscription and flags that a partition assignment is needed; re-subscribing a known topic changes nothing. Throws IllegalStateException if any explicit partition is subscribed. */
-    public void subscribe(String topic) {
-        if (this.subscribedPartitions.size() > 0)
-            throw new IllegalStateException("Subcription to topics and partitions are mutually exclusive"); // NOTE(review): message misspells "Subscription"
-        if (!this.subscribedTopics.contains(topic)) {
-            this.subscribedTopics.add(topic);
-            this.needsPartitionAssignment = true;
-        }
-    }
-    /** Removes a topic subscription, flags that a partition assignment is needed and clears the state of assigned partitions of that topic. Throws IllegalStateException if the topic is not subscribed. */
-    public void unsubscribe(String topic) {
-        if (!this.subscribedTopics.contains(topic))
-            throw new IllegalStateException("Topic " + topic + " was never subscribed to.");
-        this.subscribedTopics.remove(topic);
-        this.needsPartitionAssignment = true;
-        for (TopicPartition tp: assignedPartitions()) // NOTE(review): clearPartition() below removes from this same HashSet while it is being iterated -- looks like this can throw ConcurrentModificationException when further elements follow; confirm
-            if (topic.equals(tp.topic()))
-                clearPartition(tp);
-    }
-    /** Flags that a partition assignment has to be requested. */
-    public void needReassignment() {
-        this.needsPartitionAssignment = true;
-    }
-    /** Subscribes to an explicit partition, which is recorded as assigned immediately. Throws IllegalStateException if any topic is subscribed. */
-    public void subscribe(TopicPartition tp) {
-        if (this.subscribedTopics.size() > 0)
-            throw new IllegalStateException("Subcription to topics and partitions are mutually exclusive"); // NOTE(review): message misspells "Subscription"
-        this.subscribedPartitions.add(tp);
-        this.assignedPartitions.add(tp);
-    }
-    /** Removes an explicit partition subscription and clears everything tracked for that partition. Throws IllegalStateException if the partition is not subscribed. */
-    public void unsubscribe(TopicPartition partition) {
-        if (!subscribedPartitions.contains(partition))
-            throw new IllegalStateException("Partition " + partition + " was never subscribed to.");
-        subscribedPartitions.remove(partition);
-        clearPartition(partition);
-    }
-    /** Drops the partition from the assignment and forgets its committed, fetched, consumed and pending-reset entries. */
-    private void clearPartition(TopicPartition tp) {
-        this.assignedPartitions.remove(tp);
-        this.committed.remove(tp);
-        this.fetched.remove(tp);
-        this.consumed.remove(tp);
-        this.resetPartitions.remove(tp);
-    }
-    /** Clears the whole assignment and all committed/fetched/consumed offsets; a new assignment is flagged as needed only while topics are subscribed. NOTE(review): resetPartitions is not cleared here -- confirm that is intended. */
-    public void clearAssignment() {
-        this.assignedPartitions.clear();
-        this.committed.clear();
-        this.fetched.clear();
-        this.consumed.clear();
-        this.needsPartitionAssignment = !subscribedTopics().isEmpty();
-    }
-    /** Returns the internal set of subscribed topics itself (not a copy). */
-    public Set<String> subscribedTopics() {
-        return this.subscribedTopics;
-    }
-    /** Returns the fetch position recorded for the partition, or null if none is recorded. */
-    public Long fetched(TopicPartition tp) {
-        return this.fetched.get(tp);
-    }
-    /** Records the fetch position of a partition. Throws IllegalArgumentException if the partition is not currently assigned. */
-    public void fetched(TopicPartition tp, long offset) {
-        if (!this.assignedPartitions.contains(tp))
-            throw new IllegalArgumentException("Can't change the fetch position for a partition you are not currently subscribed to.");
-        this.fetched.put(tp, offset);
-    }
-    /** Records the committed offset of a partition; unlike fetched/consumed, no check is made that the partition is assigned. */
-    public void committed(TopicPartition tp, long offset) {
-        this.committed.put(tp, offset);
-    }
-    /** Returns the committed offset recorded for the partition, or null if none is recorded. */
-    public Long committed(TopicPartition tp) {
-        return this.committed.get(tp);
-    }
-    /** Flags that the committed offsets have to be fetched again. */
-    public void needRefreshCommits() {
-        this.needsFetchCommittedOffsets = true;
-    }
-    /** Returns whether the committed offsets are flagged as needing a fetch. */
-    public boolean refreshCommitsNeeded() {
-        return this.needsFetchCommittedOffsets;
-    }
-    /** Clears the flag set by needRefreshCommits() (and by the constructor). */
-    public void commitsRefreshed() {
-        this.needsFetchCommittedOffsets = false;
-    }
-    /** Sets both the fetch and the consumed position of a partition to the offset and cancels any pending reset for it. Throws IllegalArgumentException if the partition is not currently assigned. */
-    public void seek(TopicPartition tp, long offset) {
-        fetched(tp, offset);
-        consumed(tp, offset);
-        resetPartitions.remove(tp);
-    }
-    /** Returns the internal set of assigned partitions itself (not a copy). */
-    public Set<TopicPartition> assignedPartitions() {
-        return this.assignedPartitions;
-    }
-    /** Returns true when at least one topic is subscribed. */
-    public boolean partitionsAutoAssigned() {
-        return !this.subscribedTopics.isEmpty();
-    }
-    /** Records the consumed position of a partition. Throws IllegalArgumentException if the partition is not currently assigned. */
-    public void consumed(TopicPartition tp, long offset) {
-        if (!this.assignedPartitions.contains(tp))
-            throw new IllegalArgumentException("Can't change the consumed position for a partition you are not currently subscribed to.");
-        this.consumed.put(tp, offset);
-    }
-    /** Returns the consumed position recorded for the partition, or null if none is recorded. */
-    public Long consumed(TopicPartition partition) {
-        return this.consumed.get(partition);
-    }
-    /** Returns the internal partition-to-consumed-position map itself (not a copy). */
-    public Map<TopicPartition, Long> allConsumed() {
-        return this.consumed;
-    }
-    /** Marks the partition as needing an offset reset with the given strategy and discards its fetched and consumed positions. */
-    public void needOffsetReset(TopicPartition partition, OffsetResetStrategy offsetResetStrategy) {
-        this.resetPartitions.put(partition, offsetResetStrategy);
-        this.fetched.remove(partition);
-        this.consumed.remove(partition);
-    }
-    /** Same as the two-argument overload, using the default strategy passed to the constructor. */
-    public void needOffsetReset(TopicPartition partition) {
-        needOffsetReset(partition, offsetResetStrategy);
-    }
-    /** Returns whether an offset reset is pending for the given partition. */
-    public boolean isOffsetResetNeeded(TopicPartition partition) {
-        return resetPartitions.containsKey(partition);
-    }
-    /** Returns whether an offset reset is pending for any partition. */
-    public boolean isOffsetResetNeeded() {
-        return !resetPartitions.isEmpty();
-    }
-    /** Returns the strategy of the pending reset for the partition, or null if no reset is pending for it. */
-    public OffsetResetStrategy resetStrategy(TopicPartition partition) {
-        return resetPartitions.get(partition);
-    }
-    /** Size-based check: true when at least as many fetch positions are recorded as partitions are assigned; the actual keys are not compared. */
-    public boolean hasAllFetchPositions() {
-        return this.fetched.size() >= this.assignedPartitions.size();
-    }
-    /** Returns a new set holding the assigned partitions that have no recorded fetch position. */
-    public Set<TopicPartition> missingFetchPositions() {
-        Set<TopicPartition> copy = new HashSet<TopicPartition>(this.assignedPartitions);
-        copy.removeAll(this.fetched.keySet());
-        return copy;
-    }
-    /** Returns whether a partition assignment is flagged as needed. */
-    public boolean partitionAssignmentNeeded() {
-        return this.needsPartitionAssignment;
-    }
-    /** Replaces the assignment with the given partitions, clearing previously tracked offsets. Throws IllegalArgumentException, before any state is changed, if a partition belongs to a topic that is not subscribed. */
-    public void changePartitionAssignment(List<TopicPartition> assignments) {
-        for (TopicPartition tp : assignments)
-            if (!this.subscribedTopics.contains(tp.topic()))
-                throw new IllegalArgumentException("Assigned partition " + tp + " for non-subscribed topic.");
-        this.clearAssignment();
-        this.assignedPartitions.addAll(assignments);
-        this.needsPartitionAssignment = false;
-    }
-
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/Cluster.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/Cluster.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/Cluster.java
deleted file mode 100644
index f5e12d3..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/Cluster.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common;
-
-import org.apache.flink.kafka_backport.common.utils.Utils;
-
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-// ----------------------------------------------------------------------------
-//  This class is copied from the Apache Kafka project.
-// 
-//  The class is part of a "backport" of the new consumer API, in order to
-//  give Flink access to its functionality until the API is properly released.
-// 
-//  This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * A representation of a subset of the nodes, topics, and partitions in the Kafka cluster. Every lookup index is built once in the constructor and kept in a final field; no method of this class modifies them afterwards.
- */
-public final class Cluster {
-
-    private final List<Node> nodes; // shuffled, unmodifiable copy of the constructor argument
-    private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition; // every given partition, keyed by (topic, partition)
-    private final Map<String, List<PartitionInfo>> partitionsByTopic; // topic -> unmodifiable list of all its given partitions
-    private final Map<String, List<PartitionInfo>> availablePartitionsByTopic; // topic -> unmodifiable list of its partitions that have a non-null leader
-    private final Map<Integer, List<PartitionInfo>> partitionsByNode; // node id -> unmodifiable list of the partitions that node leads
-    private final Map<Integer, Node> nodesById; // node id -> node
-
-    /**
-     * Create a new cluster with the given nodes and partitions. Neither collection is retained (their elements are); all lookup indexes are built here.
-     * @param nodes The nodes in the cluster
-     * @param partitions Information about a subset of the topic-partitions this cluster hosts
-     */
-    public Cluster(Collection<Node> nodes, Collection<PartitionInfo> partitions) {
-        // make a randomized, unmodifiable copy of the nodes
-        List<Node> copy = new ArrayList<Node>(nodes);
-        Collections.shuffle(copy);
-        this.nodes = Collections.unmodifiableList(copy);
-        // index the nodes by id (iterates the caller's collection, not the shuffled copy)
-        this.nodesById = new HashMap<Integer, Node>();
-        for (Node node: nodes)
-            this.nodesById.put(node.id(), node);
-
-        // index the partitions by topic/partition for quick lookup
-        this.partitionsByTopicPartition = new HashMap<TopicPartition, PartitionInfo>(partitions.size());
-        for (PartitionInfo p : partitions)
-            this.partitionsByTopicPartition.put(new TopicPartition(p.topic(), p.partition()), p);
-
-        // index the partitions by topic and node respectively, and make the lists
-        // unmodifiable so we can hand them out in user-facing apis without risk
-        // of the client modifying the contents
-        HashMap<String, List<PartitionInfo>> partsForTopic = new HashMap<String, List<PartitionInfo>>();
-        HashMap<Integer, List<PartitionInfo>> partsForNode = new HashMap<Integer, List<PartitionInfo>>();
-        for (Node n : this.nodes) { // every known node gets a list up front, so a node that leads nothing maps to an empty list rather than null
-            partsForNode.put(n.id(), new ArrayList<PartitionInfo>());
-        }
-        for (PartitionInfo p : partitions) {
-            if (!partsForTopic.containsKey(p.topic()))
-                partsForTopic.put(p.topic(), new ArrayList<PartitionInfo>());
-            List<PartitionInfo> psTopic = partsForTopic.get(p.topic());
-            psTopic.add(p);
-
-            if (p.leader() != null) {
-                List<PartitionInfo> psNode = Utils.notNull(partsForNode.get(p.leader().id())); // NOTE(review): lookup is null when the leader is not among the given nodes; presumably Utils.notNull then throws -- confirm
-                psNode.add(p);
-            }
-        }
-        this.partitionsByTopic = new HashMap<String, List<PartitionInfo>>(partsForTopic.size());
-        this.availablePartitionsByTopic = new HashMap<String, List<PartitionInfo>>(partsForTopic.size());
-        for (Map.Entry<String, List<PartitionInfo>> entry : partsForTopic.entrySet()) {
-            String topic = entry.getKey();
-            List<PartitionInfo> partitionList = entry.getValue();
-            this.partitionsByTopic.put(topic, Collections.unmodifiableList(partitionList));
-            List<PartitionInfo> availablePartitions = new ArrayList<PartitionInfo>(); // "available" here means: has a non-null leader
-            for (PartitionInfo part : partitionList) {
-                if (part.leader() != null)
-                    availablePartitions.add(part);
-            }
-            this.availablePartitionsByTopic.put(topic, Collections.unmodifiableList(availablePartitions));
-        }
-        this.partitionsByNode = new HashMap<Integer, List<PartitionInfo>>(partsForNode.size());
-        for (Map.Entry<Integer, List<PartitionInfo>> entry : partsForNode.entrySet())
-            this.partitionsByNode.put(entry.getKey(), Collections.unmodifiableList(entry.getValue()));
-
-    }
-
-    /**
-     * Create an empty cluster instance with no nodes and no topic-partitions. Each call builds a new instance.
-     */
-    public static Cluster empty() {
-        return new Cluster(new ArrayList<Node>(0), new ArrayList<PartitionInfo>(0));
-    }
-
-    /**
-     * Create a "bootstrap" cluster using the given list of host/ports. Nodes get the placeholder ids -1, -2, ... in list order; no partitions are registered.
-     * @param addresses The addresses
-     * @return A cluster for these hosts/ports
-     */
-    public static Cluster bootstrap(List<InetSocketAddress> addresses) {
-        List<Node> nodes = new ArrayList<Node>();
-        int nodeId = -1;
-        for (InetSocketAddress address : addresses)
-            nodes.add(new Node(nodeId--, address.getHostName(), address.getPort()));
-        return new Cluster(nodes, new ArrayList<PartitionInfo>(0));
-    }
-
-    /**
-     * @return The known set of nodes, as an unmodifiable list in the shuffled order chosen at construction
-     */
-    public List<Node> nodes() {
-        return this.nodes;
-    }
-    
-    /**
-     * Get the node by the node id (or null if no such node exists)
-     * @param id The id of the node
-     * @return The node, or null if no such node exists
-     */
-    public Node nodeById(int id) {
-        return this.nodesById.get(id);
-    }
-
-    /**
-     * Get the current leader for the given topic-partition
-     * @param topicPartition The topic and partition we want to know the leader for
-     * @return The node that is the leader for this topic-partition, or null if there is currently no leader (also null when the partition itself is unknown)
-     */
-    public Node leaderFor(TopicPartition topicPartition) {
-        PartitionInfo info = partitionsByTopicPartition.get(topicPartition);
-        if (info == null)
-            return null;
-        else
-            return info.leader();
-    }
-
-    /**
-     * Get the metadata for the specified partition
-     * @param topicPartition The topic and partition to fetch info for
-     * @return The metadata about the given topic and partition, or null if the partition is unknown
-     */
-    public PartitionInfo partition(TopicPartition topicPartition) {
-        return partitionsByTopicPartition.get(topicPartition);
-    }
-
-    /**
-     * Get the list of partitions for this topic
-     * @param topic The topic name
-     * @return An unmodifiable list of partitions, or null if the topic is unknown
-     */
-    public List<PartitionInfo> partitionsForTopic(String topic) {
-        return this.partitionsByTopic.get(topic);
-    }
-
-    /**
-     * Get the list of available partitions for this topic, i.e. those with a non-null leader
-     * @param topic The topic name
-     * @return An unmodifiable list of partitions, or null if the topic is unknown
-     */
-    public List<PartitionInfo> availablePartitionsForTopic(String topic) {
-        return this.availablePartitionsByTopic.get(topic);
-    }
-
-    /**
-     * Get the list of partitions whose leader is this node
-     * @param nodeId The node id
-     * @return An unmodifiable list of partitions (empty if the node leads none), or null if the node id is unknown
-     */
-    public List<PartitionInfo> partitionsForNode(int nodeId) {
-        return this.partitionsByNode.get(nodeId);
-    }
-
-    /**
-     * Get all topics.
-     * @return a set of all topics (the key-set view of the internal topic index, not a copy)
-     */
-    public Set<String> topics() {
-        return this.partitionsByTopic.keySet();
-    }
-    /** Renders the node list and all known partition infos. */
-    @Override
-    public String toString() {
-        return "Cluster(nodes = " + this.nodes + ", partitions = " + this.partitionsByTopicPartition.values() + ")";
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/Configurable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/Configurable.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/Configurable.java
deleted file mode 100644
index fef2136..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/Configurable.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.kafka_backport.common;
-
-import java.util.Map;
-
-// ----------------------------------------------------------------------------
-//  This class is copied from the Apache Kafka project.
-// 
-//  The class is part of a "backport" of the new consumer API, in order to
-//  give Flink access to its functionality until the API is properly released.
-// 
-//  This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * A Mix-in style interface for classes that are instantiated by reflection and need to take configuration parameters. It declares a single method; who calls it, and when, is up to the instantiating code (not shown here).
- */
-public interface Configurable {
-
-    /** Configure this class with the given key-value pairs.
-     * @param configs configuration entries keyed by name; the value type is unconstrained
-     */
-    public void configure(Map<String, ?> configs);
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/KafkaException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/KafkaException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/KafkaException.java
deleted file mode 100644
index d9df6e8..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/KafkaException.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.kafka_backport.common;
-
-// ----------------------------------------------------------------------------
-//  This class is copied from the Apache Kafka project.
-// 
-//  The class is part of a "backport" of the new consumer API, in order to
-//  give Flink access to its functionality until the API is properly released.
-// 
-//  This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * The base class of all other Kafka exceptions
- */
-public class KafkaException extends RuntimeException {
-
-    private final static long serialVersionUID = 1L;
-
-    public KafkaException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    public KafkaException(String message) {
-        super(message);
-    }
-
-    public KafkaException(Throwable cause) {
-        super(cause);
-    }
-
-    public KafkaException() {
-        super();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/Metric.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/Metric.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/Metric.java
deleted file mode 100644
index 8858ffe..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/Metric.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.kafka_backport.common;
-
-// ----------------------------------------------------------------------------
-//  This class is copied from the Apache Kafka project.
-// 
-//  The class is part of a "backport" of the new consumer API, in order to
-//  give Flink access to its functionality until the API is properly released.
-// 
-//  This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * A numerical metric tracked for monitoring purposes
- */
-public interface Metric {
-
-    /**
-     * A name for this metric
-     */
-    public MetricName metricName();
-
-    /**
-     * The value of the metric
-     */
-    public double value();
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/MetricName.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/MetricName.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/MetricName.java
deleted file mode 100644
index 18dd955..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/MetricName.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common;
-
-import org.apache.flink.kafka_backport.common.utils.Utils;
-
-import java.util.HashMap;
-import java.util.Map;
-
-// ----------------------------------------------------------------------------
-//  This class is copied from the Apache Kafka project.
-// 
-//  The class is part of a "backport" of the new consumer API, in order to
-//  give Flink access to its functionality until the API is properly released.
-// 
-//  This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * The <code>MetricName</code> class encapsulates a metric's name, logical group and its related attributes
- * <p>
- * This class captures the following parameters
- * <pre>
- *  <b>name</b> The name of the metric
- *  <b>group</b> logical group name of the metrics to which this metric belongs.
- *  <b>description</b> A human-readable description to include in the metric. This is optional.
- *  <b>tags</b> additional key/value attributes of the metric. This is optional.
- * </pre>
- * group, tags parameters can be used to create unique metric names while reporting in JMX or any custom reporting.
- * <p>
- * Ex: standard JMX MBean can be constructed like  <b>domainName:type=group,key1=val1,key2=val2</b>
- * <p>
- * Usage looks something like this:
- * <pre>{@code
- * // set up metrics:
- * Metrics metrics = new Metrics(); // this is the global repository of metrics and sensors
- * Sensor sensor = metrics.sensor("message-sizes");
- *
- * Map<String, String> metricTags = new LinkedHashMap<String, String>();
- * metricTags.put("client-id", "producer-1");
- * metricTags.put("topic", "topic");
- *
- * MetricName metricName = new MetricName("message-size-avg", "producer-metrics", "average message size", metricTags);
- * sensor.add(metricName, new Avg());
- *
- * metricName = new MetricName("message-size-max", "producer-metrics", metricTags);
- * sensor.add(metricName, new Max());
- *
- * metricName = new MetricName("message-size-min", "producer-metrics", "message minimum size", "client-id", "my-client", "topic", "my-topic");
- * sensor.add(metricName, new Min());
- *
- * // as messages are sent we record the sizes
- * sensor.record(messageSize);
- * }</pre>
- */
-public final class MetricName {
-
-    private final String name;
-    private final String group;
-    private final String description;
-    private Map<String, String> tags;
-    private int hash = 0;
-
-    /**
-     * @param name        The name of the metric
-     * @param group       logical group name of the metrics to which this metric belongs
-     * @param description A human-readable description to include in the metric
-     * @param tags        additional key/value attributes of the metric
-     */
-    public MetricName(String name, String group, String description, Map<String, String> tags) {
-        this.name = Utils.notNull(name);
-        this.group = Utils.notNull(group);
-        this.description = Utils.notNull(description);
-        this.tags = Utils.notNull(tags);
-    }
-
-    /**
-     * @param name          The name of the metric
-     * @param group         logical group name of the metrics to which this metric belongs
-     * @param description   A human-readable description to include in the metric
-     * @param keyValue      additional key/value attributes of the metric (must come in pairs)
-     */
-    public MetricName(String name, String group, String description, String... keyValue) {
-        this(name, group, description, getTags(keyValue));
-    }
-
-    private static Map<String, String> getTags(String... keyValue) { // flat k1, v1, k2, v2, ... -> tag map
-        if ((keyValue.length % 2) != 0)
-            throw new IllegalArgumentException("keyValue needs to be specified in pairs");
-        Map<String, String> tags = new HashMap<String, String>();
-        // Advance two slots per entry: the even index holds the key, the next (odd) index its value.
-        for (int i = 0; i < keyValue.length; i += 2)
-            tags.put(keyValue[i], keyValue[i + 1]);
-        return tags;
-    }
-
-    /**
-     * @param name  The name of the metric
-     * @param group logical group name of the metrics to which this metric belongs
-     * @param tags  key/value attributes of the metric
-     */
-    public MetricName(String name, String group, Map<String, String> tags) {
-        this(name, group, "", tags);
-    }
-
-    /**
-     * @param name        The name of the metric
-     * @param group       logical group name of the metrics to which this metric belongs
-     * @param description A human-readable description to include in the metric
-     */
-    public MetricName(String name, String group, String description) {
-        this(name, group, description, new HashMap<String, String>());
-    }
-
-    /**
-     * @param name  The name of the metric
-     * @param group logical group name of the metrics to which this metric belongs
-     */
-    public MetricName(String name, String group) {
-        this(name, group, "", new HashMap<String, String>());
-    }
-
-    public String name() {
-        return this.name;
-    }
-
-    public String group() {
-        return this.group;
-    }
-
-    public Map<String, String> tags() {
-        return this.tags;
-    }
-
-    public String description() {
-        return this.description;
-    }
-
-    @Override
-    public int hashCode() {
-        if (hash != 0)
-            return hash;
-        final int prime = 31;
-        int result = 1;
-        result = prime * result + ((group == null) ? 0 : group.hashCode());
-        result = prime * result + ((name == null) ? 0 : name.hashCode());
-        result = prime * result + ((tags == null) ? 0 : tags.hashCode());
-        this.hash = result;
-        return result;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (this == obj)
-            return true;
-        if (obj == null)
-            return false;
-        if (getClass() != obj.getClass())
-            return false;
-        MetricName other = (MetricName) obj;
-        if (group == null) {
-            if (other.group != null)
-                return false;
-        } else if (!group.equals(other.group))
-            return false;
-        if (name == null) {
-            if (other.name != null)
-                return false;
-        } else if (!name.equals(other.name))
-            return false;
-        if (tags == null) {
-            if (other.tags != null)
-                return false;
-        } else if (!tags.equals(other.tags))
-            return false;
-        return true;
-    }
-
-    @Override
-    public String toString() {
-        return "MetricName [name=" + name + ", group=" + group + ", description="
-                + description + ", tags=" + tags + "]";
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/Node.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/Node.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/Node.java
deleted file mode 100644
index dd0537e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/Node.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common;
-
-import java.io.Serializable;
-
-// ----------------------------------------------------------------------------
-//  This class is copied from the Apache Kafka project.
-// 
-//  The class is part of a "backport" of the new consumer API, in order to
-//  give Flink access to its functionality until the API is properly released.
-// 
-//  This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * Information about a Kafka node
- */
-public class Node implements Serializable {
-
-    private final int id;
-    private final String idString;
-    private final String host;
-    private final int port;
-
-    public Node(int id, String host, int port) {
-        super();
-        this.id = id;
-        this.idString = Integer.toString(id);
-        this.host = host;
-        this.port = port;
-    }
-
-    public static Node noNode() {
-        return new Node(-1, "", -1);
-    }
-
-    /**
-     * The node id of this node
-     */
-    public int id() {
-        return id;
-    }
-
-    /**
-     * String representation of the node id.
-     * Typically the integer id is used to serialize over the wire, the string representation is used as an identifier with NetworkClient code
-     */
-    public String idString() {
-        return idString;
-    }
-
-    /**
-     * The host name for this node
-     */
-    public String host() {
-        return host;
-    }
-
-    /**
-     * The port for this node
-     */
-    public int port() {
-        return port;
-    }
-
-    @Override
-    public int hashCode() {
-        final int prime = 31;
-        int result = 1;
-        result = prime * result + ((host == null) ? 0 : host.hashCode());
-        result = prime * result + id;
-        result = prime * result + port;
-        return result;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (this == obj)
-            return true;
-        if (obj == null)
-            return false;
-        if (getClass() != obj.getClass())
-            return false;
-        Node other = (Node) obj;
-        if (host == null) {
-            if (other.host != null)
-                return false;
-        } else if (!host.equals(other.host))
-            return false;
-        if (id != other.id)
-            return false;
-        if (port != other.port)
-            return false;
-        return true;
-    }
-
-    @Override
-    public String toString() {
-        return "Node(" + id + ", " + host + ", " + port + ")";
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/PartitionInfo.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/PartitionInfo.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/PartitionInfo.java
deleted file mode 100644
index ac7cc61..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/PartitionInfo.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common;
-
-// ----------------------------------------------------------------------------
-//  This class is copied from the Apache Kafka project.
-// 
-//  The class is part of a "backport" of the new consumer API, in order to
-//  give Flink access to its functionality until the API is properly released.
-// 
-//  This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * Information about a topic-partition.
- */
-public class PartitionInfo {
-
-    private final String topic;
-    private final int partition;
-    private final Node leader;
-    private final Node[] replicas;
-    private final Node[] inSyncReplicas;
-
-    public PartitionInfo(String topic, int partition, Node leader, Node[] replicas, Node[] inSyncReplicas) {
-        this.topic = topic;
-        this.partition = partition;
-        this.leader = leader;
-        this.replicas = replicas;
-        this.inSyncReplicas = inSyncReplicas;
-    }
-
-    /**
-     * The topic name
-     */
-    public String topic() {
-        return topic;
-    }
-
-    /**
-     * The partition id
-     */
-    public int partition() {
-        return partition;
-    }
-
-    /**
-     * The node currently acting as the leader for this partition (may be null when there is no leader)
-     */
-    public Node leader() {
-        return leader;
-    }
-
-    /**
-     * The complete set of replicas for this partition regardless of whether they are alive or up-to-date
-     */
-    public Node[] replicas() {
-        return replicas;
-    }
-
-    /**
-     * The subset of the replicas that are in sync, that is caught-up to the leader and ready to take over as leader if
-     * the leader should fail
-     */
-    public Node[] inSyncReplicas() {
-        return inSyncReplicas;
-    }
-
-    @Override
-    public String toString() {
-        return String.format("Partition(topic = %s, partition = %d, leader = %s, replicas = %s, isr = %s)",
-                topic,
-                partition,
-                leader == null ? "none" : leader.id(), // a missing leader is rendered as "none"
-                fmtNodeIds(replicas),
-                fmtNodeIds(inSyncReplicas));
-    }
-
-    /* Render the ids of the given nodes as "[id1,id2,...]" for display; an empty array yields "[]". */
-    private String fmtNodeIds(Node[] nodes) {
-        StringBuilder b = new StringBuilder("[");
-        for (int i = 0; i < nodes.length; i++) {
-            // The separator goes between ids only, so the last id is not
-            // followed by a dangling comma.
-            if (i > 0) {
-                b.append(',');
-            }
-            b.append(nodes[i].id());
-        }
-        b.append("]");
-        return b.toString();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/TopicPartition.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/TopicPartition.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/TopicPartition.java
deleted file mode 100644
index cfb4848..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/TopicPartition.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.kafka_backport.common;
-
-import java.io.Serializable;
-
-// ----------------------------------------------------------------------------
-//  This class is copied from the Apache Kafka project.
-// 
-//  The class is part of a "backport" of the new consumer API, in order to
-//  give Flink access to its functionality until the API is properly released.
-// 
-//  This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * A topic name and partition number
- */
-public final class TopicPartition implements Serializable {
-
-    private int hash = 0;
-    private final int partition;
-    private final String topic;
-
-    public TopicPartition(String topic, int partition) {
-        this.partition = partition;
-        this.topic = topic;
-    }
-
-    public int partition() {
-        return partition;
-    }
-
-    public String topic() {
-        return topic;
-    }
-
-    @Override
-    public int hashCode() {
-        if (hash != 0)
-            return hash;
-        final int prime = 31;
-        int result = 1;
-        result = prime * result + partition;
-        result = prime * result + ((topic == null) ? 0 : topic.hashCode());
-        this.hash = result;
-        return result;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (this == obj)
-            return true;
-        if (obj == null)
-            return false;
-        if (getClass() != obj.getClass())
-            return false;
-        TopicPartition other = (TopicPartition) obj;
-        if (partition != other.partition)
-            return false;
-        if (topic == null) {
-            if (other.topic != null)
-                return false;
-        } else if (!topic.equals(other.topic))
-            return false;
-        return true;
-    }
-
-    @Override
-    public String toString() {
-        return topic + "-" + partition;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/config/AbstractConfig.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/config/AbstractConfig.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/config/AbstractConfig.java
deleted file mode 100644
index 1b5cbc9..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/config/AbstractConfig.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.config;
-
-import org.apache.flink.kafka_backport.common.Configurable;
-import org.apache.flink.kafka_backport.common.KafkaException;
-import org.apache.flink.kafka_backport.common.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-// ----------------------------------------------------------------------------
-//  This class is copied from the Apache Kafka project.
-// 
-//  The class is part of a "backport" of the new consumer API, in order to
-//  give Flink access to its functionality until the API is properly released.
-// 
-//  This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * A convenient base class for configurations to extend.
- * <p>
- * This class holds both the original configuration that was provided as well as the parsed values.
- */
-public class AbstractConfig {
-
-    private final Logger log = LoggerFactory.getLogger(getClass());
-
-    /* configs for which values have been requested, used to detect unused configs */
-    private final Set<String> used;
-
-    /* the original values passed in by the user */
-    private final Map<String, ?> originals;
-
-    /* the parsed values */
-    private final Map<String, Object> values;
-
-    @SuppressWarnings("unchecked")
-    public AbstractConfig(ConfigDef definition, Map<?, ?> originals) {
-        /* check that all the keys are really strings */
-        for (Object key : originals.keySet())
-            if (!(key instanceof String))
-                throw new ConfigException(key.toString(), originals.get(key), "Key must be a string.");
-        this.originals = (Map<String, ?>) originals;
-        this.values = definition.parse(this.originals);
-        this.used = Collections.synchronizedSet(new HashSet<String>());
-        logAll();
-    }
-
-    protected Object get(String key) {
-        if (!values.containsKey(key))
-            throw new ConfigException(String.format("Unknown configuration '%s'", key));
-        used.add(key);
-        return values.get(key);
-    }
-
-    public Short getShort(String key) {
-        return (Short) get(key);
-    }
-
-    public Integer getInt(String key) {
-        return (Integer) get(key);
-    }
-
-    public Long getLong(String key) {
-        return (Long) get(key);
-    }
-
-    public Double getDouble(String key) {
-        return (Double) get(key);
-    }
-
-    @SuppressWarnings("unchecked")
-    public List<String> getList(String key) {
-        return (List<String>) get(key);
-    }
-
-    public boolean getBoolean(String key) {
-        return (Boolean) get(key);
-    }
-
-    public String getString(String key) {
-        return (String) get(key);
-    }
-
-    public Class<?> getClass(String key) {
-        return (Class<?>) get(key);
-    }
-
-    public Set<String> unused() {
-        Set<String> keys = new HashSet<String>(originals.keySet());
-        keys.removeAll(used);
-        return keys;
-    }
-
-    public Map<String, Object> originals() {
-        Map<String, Object> copy = new HashMap<String, Object>();
-        copy.putAll(originals);
-        return copy;
-    }
-
-    private void logAll() {
-        StringBuilder b = new StringBuilder();
-        b.append(getClass().getSimpleName());
-        b.append(" values: ");
-        b.append(Utils.NL);
-        for (Map.Entry<String, Object> entry : this.values.entrySet()) {
-            b.append('\t');
-            b.append(entry.getKey());
-            b.append(" = ");
-            b.append(entry.getValue());
-            b.append(Utils.NL);
-        }
-        log.info(b.toString());
-    }
-
-    /**
-     * Log a warning for each supplied key that was never read through this config, showing the value as supplied.
-     */
-    public void logUnused() {
-        for (String key : unused())
-            log.warn("The configuration {} = {} was supplied but isn't a known config.", key, this.originals.get(key));
-    }
-
-    /**
-     * Get a configured instance of the given class specified by the given configuration key. If the object implements
-     * Configurable configure it using the configuration.
-     * 
-     * @param key The configuration key for the class
-     * @param t The interface the class should implement
-     * @return A configured instance of the class
-     */
-    public <T> T getConfiguredInstance(String key, Class<T> t) {
-        Class<?> c = getClass(key);
-        if (c == null)
-            return null;
-        Object o = Utils.newInstance(c);
-        if (!t.isInstance(o))
-            throw new KafkaException(c.getName() + " is not an instance of " + t.getName());
-        if (o instanceof Configurable)
-            ((Configurable) o).configure(this.originals);
-        return t.cast(o);
-    }
-
-    /** Instantiate each class named in the list config {@code key}, check it against {@code t}, and configure it. */
-    public <T> List<T> getConfiguredInstances(String key, Class<T> t) {
-        List<String> klasses = getList(key);
-        List<T> objects = new ArrayList<T>();
-        for (String klass : klasses) {
-            Class<?> c;
-            try {
-                c = Class.forName(klass);
-            } catch (ClassNotFoundException e) {
-                throw new ConfigException(key, klass, "Class " + klass + " could not be found.");
-            }
-            // Class.forName either returns a class or throws, so c is never null at this point.
-            Object o = Utils.newInstance(c);
-            if (!t.isInstance(o))
-                throw new KafkaException(c.getName() + " is not an instance of " + t.getName());
-            if (o instanceof Configurable)
-                ((Configurable) o).configure(this.originals);
-            objects.add(t.cast(o));
-        }
-        return objects;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/config/ConfigDef.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/config/ConfigDef.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/config/ConfigDef.java
deleted file mode 100644
index 1bbe891..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/config/ConfigDef.java
+++ /dev/null
@@ -1,456 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.config;
-
-import org.apache.flink.kafka_backport.common.utils.Utils;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-// ----------------------------------------------------------------------------
-//  This class is copied from the Apache Kafka project.
-// 
-//  The class is part of a "backport" of the new consumer API, in order to
-//  give Flink access to its functionality until the API is properly released.
-// 
-//  This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * This class is used for specifying the set of expected configurations, their type, their defaults, their
- * documentation, and any special validation logic used for checking the correctness of the values the user provides.
- * <p/>
- * Usage of this class looks something like this:
- * <p/>
- * <pre>
- * ConfigDef defs = new ConfigDef();
- * defs.define(&quot;config_name&quot;, Type.STRING, &quot;default string value&quot;, Importance.HIGH, &quot;This configuration is used for blah blah blah.&quot;);
- * defs.define(&quot;another_config_name&quot;, Type.INT, 42, Range.atLeast(0), Importance.MEDIUM, &quot;More documentation on this config&quot;);
- *
- * Properties props = new Properties();
- * props.setProperty(&quot;config_name&quot;, &quot;some value&quot;);
- * Map&lt;String, Object&gt; configs = defs.parse(props);
- *
- * String someConfig = (String) configs.get(&quot;config_name&quot;); // will return &quot;some value&quot;
- * int anotherConfig = (Integer) configs.get(&quot;another_config_name&quot;); // will return default value of 42
- * </pre>
- * <p/>
- * This class can be used stand-alone or in combination with {@link AbstractConfig} which provides some additional
- * functionality for accessing configs.
- */
-public class ConfigDef {
-
-    // Sentinel meaning "no default value was supplied". It is recognized by reference comparison (==),
-    // never by equals(); new String("") gives it an identity distinct from any interned "" literal,
-    // so an empty-string default passed by a caller is not mistaken for the sentinel.
-    private static final Object NO_DEFAULT_VALUE = new String("");
-
-    // All configs defined so far, keyed by config name.
-    private final Map<String, ConfigKey> configKeys = new HashMap<String, ConfigKey>();
-
-    /**
-     * Returns unmodifiable set of properties names defined in this {@linkplain ConfigDef}
-     *
-     * @return new unmodifiable {@link Set} instance containing the keys
-     */
-    public Set<String> names() {
-        // Expose the key set of the definitions through a read-only wrapper.
-        final Set<String> definedNames = configKeys.keySet();
-        return Collections.unmodifiableSet(definedNames);
-    }
-
-    /**
-     * Define a new configuration
-     *
-     * @param name          The name of the config parameter
-     * @param type          The type of the config
-     * @param defaultValue  The default value to use if this config isn't present
-     * @param validator     A validator to use in checking the correctness of the config
-     * @param importance    The importance of this config: is this something you will likely need to change.
-     * @param documentation The documentation string for the config
-     * @param required      Should the config fail if given property is not set and doesn't have default value specified
-     * @return This ConfigDef so you can chain calls
-     */
-    public ConfigDef define(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation,
-                            boolean required) {
-        // A config name may only be registered once.
-        if (configKeys.containsKey(name)) {
-            throw new ConfigException("Configuration " + name + " is defined twice.");
-        }
-
-        // The "no default" sentinel is stored untouched; any real default is converted to the declared type.
-        final Object parsedDefault;
-        if (defaultValue == NO_DEFAULT_VALUE) {
-            parsedDefault = NO_DEFAULT_VALUE;
-        } else {
-            parsedDefault = parseType(name, defaultValue, type);
-        }
-
-        final ConfigKey configKey = new ConfigKey(name, type, parsedDefault, validator, importance, documentation, required);
-        configKeys.put(name, configKey);
-        return this;
-    }
-
-    /**
-     * Define a new required configuration
-     *
-     * @param name          The name of the config parameter
-     * @param type          The type of the config
-     * @param defaultValue  The default value to use if this config isn't present
-     * @param validator     A validator to use in checking the correctness of the config
-     * @param importance    The importance of this config: is this something you will likely need to change.
-     * @param documentation The documentation string for the config
-     * @return This ConfigDef so you can chain calls
-     */
-    public ConfigDef define(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation) {
-        // Configs defined through this overload are always marked as required.
-        final boolean required = true;
-        return define(name, type, defaultValue, validator, importance, documentation, required);
-    }
-
-    /**
-     * Define a new configuration with no special validation logic
-     *
-     * @param name          The name of the config parameter
-     * @param type          The type of the config
-     * @param defaultValue  The default value to use if this config isn't present
-     * @param importance    The importance of this config: is this something you will likely need to change.
-     * @param documentation The documentation string for the config
-     * @return This ConfigDef so you can chain calls
-     */
-    public ConfigDef define(String name, Type type, Object defaultValue, Importance importance, String documentation) {
-        // No validator is attached, and the config is marked as required.
-        final Validator noValidator = null;
-        final boolean required = true;
-        return define(name, type, defaultValue, noValidator, importance, documentation, required);
-    }
-
-    /**
-     * Define a required parameter with no default value
-     *
-     * @param name          The name of the config parameter
-     * @param type          The type of the config
-     * @param validator     A validator to use in checking the correctness of the config
-     * @param importance    The importance of this config: is this something you will likely need to change.
-     * @param documentation The documentation string for the config
-     * @return This ConfigDef so you can chain calls
-     */
-    public ConfigDef define(String name, Type type, Validator validator, Importance importance, String documentation) {
-        // The sentinel records that this config has no default; the config is marked as required.
-        final Object noDefault = NO_DEFAULT_VALUE;
-        final boolean required = true;
-        return define(name, type, noDefault, validator, importance, documentation, required);
-    }
-
-    /**
-     * Define a required parameter with no default value and no special validation logic
-     *
-     * @param name          The name of the config parameter
-     * @param type          The type of the config
-     * @param importance    The importance of this config: is this something you will likely need to change.
-     * @param documentation The documentation string for the config
-     * @return This ConfigDef so you can chain calls
-     */
-    public ConfigDef define(String name, Type type, Importance importance, String documentation) {
-        // No default, no validator, and the config is marked as required.
-        final Object noDefault = NO_DEFAULT_VALUE;
-        final Validator noValidator = null;
-        final boolean required = true;
-        return define(name, type, noDefault, noValidator, importance, documentation, required);
-    }
-
-    /**
-     * Define a parameter with no default value and no special validation logic, stating explicitly whether it is required
-     *
-     * @param name          The name of the config parameter
-     * @param type          The type of the config
-     * @param importance    The importance of this config: is this something you will likely need to change.
-     * @param documentation The documentation string for the config
-     * @param required      Should the config fail if given property is not set and doesn't have default value specified
-     * @return This ConfigDef so you can chain calls
-     */
-    public ConfigDef define(String name, Type type, Importance importance, String documentation, boolean required) {
-        // No default and no validator; the caller decides whether the config is required.
-        final Object noDefault = NO_DEFAULT_VALUE;
-        final Validator noValidator = null;
-        return define(name, type, noDefault, noValidator, importance, documentation, required);
-    }
-
-
-    /**
-     * Parse and validate configs against this configuration definition. The input is a map of configs. It is expected
-     * that the keys of the map are strings, but the values can either be strings or they may already be of the
-     * appropriate type (int, string, etc). This will work equally well with either java.util.Properties instances or a
-     * programmatically constructed map.
-     *
-     * @param props The configs to parse and validate
-     * @return Parsed and validated configs. The key will be the config name and the value will be the value parsed into
-     * the appropriate type (int, string, etc)
-     */
-    public Map<String, Object> parse(Map<?, ?> props) {
-        /* parse all known keys */
-        Map<String, Object> values = new HashMap<String, Object>();
-        for (ConfigKey key : configKeys.values()) {
-            Object value;
-            // props map contains setting - assign ConfigKey value
-            if (props.containsKey(key.name))
-                value = parseType(key.name, props.get(key.name), key.type);
-                // props map doesn't contain setting, the key is required and no default value specified - it's an error
-            else if (key.defaultValue == NO_DEFAULT_VALUE && key.required)
-                throw new ConfigException("Missing required configuration \"" + key.name + "\" which has no default value.");
-                // props map doesn't contain setting, no default value specified and the key is not required - assign it to null
-            else if (!key.hasDefault() && !key.required)
-                value = null;
-                // otherwise assign setting it's default value
-            else
-                value = key.defaultValue;
-            if (key.validator != null)
-                key.validator.ensureValid(key.name, value);
-            values.put(key.name, value);
-        }
-        return values;
-    }
-
-    /**
-     * Parse a value according to its expected type.
-     *
-     * @param name  The config name
-     * @param value The config value
-     * @param type  The expected type
-     * @return The parsed object
-     */
-    private Object parseType(String name, Object value, Type type) {
-        try {
-            // String inputs are trimmed once up front; for non-string inputs this stays null and is never read
-            String trimmed = null;
-            if (value instanceof String)
-                trimmed = ((String) value).trim();
-            switch (type) {
-                case BOOLEAN:
-                    if (value instanceof String) {
-                        if (trimmed.equalsIgnoreCase("true"))
-                            return true;
-                        else if (trimmed.equalsIgnoreCase("false"))
-                            return false;
-                        else
-                            throw new ConfigException(name, value, "Expected value to be either true or false");
-                    } else if (value instanceof Boolean)
-                        return value;
-                    else
-                        throw new ConfigException(name, value, "Expected value to be either true or false");
-                case STRING:
-                    if (value instanceof String)
-                        return trimmed;
-                    else if (value == null)
-                        // value.getClass() would throw a NullPointerException; report a config error like the other types do
-                        throw new ConfigException(name, value, "Expected value to be a string, but it was null");
-                    else
-                        throw new ConfigException(name, value, "Expected value to be a string, but it was a " + value.getClass().getName());
-                case INT:
-                    if (value instanceof Integer) {
-                        return (Integer) value;
-                    } else if (value instanceof String) {
-                        return Integer.parseInt(trimmed);
-                    } else {
-                        throw new ConfigException(name, value, "Expected value to be an number.");
-                    }
-                case SHORT:
-                    if (value instanceof Short) {
-                        return (Short) value;
-                    } else if (value instanceof String) {
-                        return Short.parseShort(trimmed);
-                    } else {
-                        throw new ConfigException(name, value, "Expected value to be an number.");
-                    }
-                case LONG:
-                    // an Integer is widened so that callers may pass int literals for long configs
-                    if (value instanceof Integer)
-                        return ((Integer) value).longValue();
-                    if (value instanceof Long)
-                        return (Long) value;
-                    else if (value instanceof String)
-                        return Long.parseLong(trimmed);
-                    else
-                        throw new ConfigException(name, value, "Expected value to be an number.");
-                case DOUBLE:
-                    if (value instanceof Number)
-                        return ((Number) value).doubleValue();
-                    else if (value instanceof String)
-                        return Double.parseDouble(trimmed);
-                    else
-                        throw new ConfigException(name, value, "Expected value to be an number.");
-                case LIST:
-                    if (value instanceof List) {
-                        return (List<?>) value;
-                    } else if (value instanceof String) {
-                        // an empty string is an empty list; otherwise split on commas, dropping whitespace around them
-                        if (trimmed.isEmpty())
-                            return Collections.emptyList();
-                        else
-                            return Arrays.asList(trimmed.split("\\s*,\\s*", -1));
-                    } else {
-                        throw new ConfigException(name, value, "Expected a comma separated list.");
-                    }
-                case CLASS:
-                    if (value instanceof Class)
-                        return (Class<?>) value;
-                    else if (value instanceof String)
-                        return Class.forName(trimmed);
-                    else
-                        throw new ConfigException(name, value, "Expected a Class instance or class name.");
-                default:
-                    throw new IllegalStateException("Unknown type.");
-            }
-        } catch (NumberFormatException e) {
-            // raised by the parseInt/parseShort/parseLong/parseDouble calls above
-            throw new ConfigException(name, value, "Not a number of type " + type);
-        } catch (ClassNotFoundException e) {
-            // raised by Class.forName in the CLASS case
-            throw new ConfigException(name, value, "Class " + value + " could not be found.");
-        }
-    }
-
-    /**
-     * The config types
-     */
-    public enum Type {
-        // parseType switches on this value to decide how a raw config value is converted.
-        BOOLEAN, STRING, INT, SHORT, LONG, DOUBLE, LIST, CLASS;
-    }
-
-    /**
-     * The importance of a config. {@code toHtmlTable} compares configs by this enum's declaration
-     * order (so HIGH sorts before MEDIUM, and MEDIUM before LOW) once configs without a default
-     * value have been placed first.
-     */
-    public enum Importance {
-        HIGH, MEDIUM, LOW
-    }
-
-    /**
-     * Validation logic the user may provide
-     */
-    public interface Validator {
-        /**
-         * Check a single config value. The implementations in this class ({@link Range} and
-         * {@link ValidString}) signal a rejected value by throwing a {@link ConfigException}.
-         *
-         * @param name The name of the config the value belongs to
-         * @param o    The value to check
-         */
-        public void ensureValid(String name, Object o);
-    }
-
-    /**
-     * Validation logic for numeric ranges
-     */
-    public static class Range implements Validator {
-        private final Number min;
-        private final Number max;
-
-        private Range(Number min, Number max) {
-            this.min = min;
-            this.max = max;
-        }
-
-        /**
-         * A numeric range that checks only the lower bound
-         *
-         * @param min The minimum acceptable value
-         */
-        public static Range atLeast(Number min) {
-            return new Range(min, null);
-        }
-
-        /**
-         * A numeric range that checks both the upper and lower bound
-         */
-        public static Range between(Number min, Number max) {
-            return new Range(min, max);
-        }
-
-        public void ensureValid(String name, Object o) {
-            Number n = (Number) o;
-            if (min != null && n.doubleValue() < min.doubleValue())
-                throw new ConfigException(name, o, "Value must be at least " + min);
-            if (max != null && n.doubleValue() > max.doubleValue())
-                throw new ConfigException(name, o, "Value must be no more than " + max);
-        }
-
-        public String toString() {
-            if (min == null)
-                return "[...," + max + "]";
-            else if (max == null)
-                return "[" + min + ",...]";
-            else
-                return "[" + min + ",...," + max + "]";
-        }
-    }
-
-    /**
-     * Validation logic that accepts only values contained in a fixed list of strings
-     */
-    public static class ValidString implements Validator {
-        List<String> validStrings;
-
-        private ValidString(List<String> validStrings) {
-            this.validStrings = validStrings;
-        }
-
-        /**
-         * A validator accepting exactly the given strings
-         *
-         * @param validStrings The acceptable values
-         */
-        public static ValidString in(String... validStrings) {
-            List<String> accepted = Arrays.asList(validStrings);
-            return new ValidString(accepted);
-        }
-
-        @Override
-        public void ensureValid(String name, Object o) {
-            String candidate = (String) o;
-            if (validStrings.contains(candidate)) {
-                return;
-            }
-            throw new ConfigException(name, o, "String must be one of: " + Utils.join(validStrings, ", "));
-        }
-
-        public String toString() {
-            StringBuilder text = new StringBuilder("[");
-            text.append(Utils.join(validStrings, ", "));
-            text.append("]");
-            return text.toString();
-        }
-    }
-
-    /**
-     * The immutable definition of a single config: its name, type, default value, validator,
-     * importance, documentation and whether it is required.
-     */
-    private static class ConfigKey {
-        public final String name;
-        public final Type type;
-        public final String documentation;
-        // Either the default value or the NO_DEFAULT_VALUE sentinel; see hasDefault()
-        public final Object defaultValue;
-        public final Validator validator;
-        public final Importance importance;
-        public final boolean required;
-
-        /**
-         * Creates the definition and, when both a validator and a default value are given, checks the
-         * default value with the validator.
-         */
-        public ConfigKey(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation, boolean required) {
-            this.name = name;
-            this.type = type;
-            this.defaultValue = defaultValue;
-            this.validator = validator;
-            this.importance = importance;
-            // Only a real default can be validated. The NO_DEFAULT_VALUE sentinel is an empty String,
-            // so handing it to e.g. a Range validator would fail with a ClassCastException and make
-            // it impossible to define a config that has a validator but no default.
-            if (this.validator != null && defaultValue != NO_DEFAULT_VALUE)
-                this.validator.ensureValid(name, defaultValue);
-            this.documentation = documentation;
-            this.required = required;
-        }
-
-        /**
-         * @return true unless the default value is the NO_DEFAULT_VALUE sentinel
-         */
-        public boolean hasDefault() {
-            return this.defaultValue != NO_DEFAULT_VALUE;
-        }
-
-    }
-
-    /**
-     * Renders all defined configs as an HTML table with the columns name, type, default,
-     * importance and description. Configs without a default value come first; the remaining order
-     * is by importance and then by name.
-     *
-     * @return The HTML markup of the table
-     */
-    public String toHtmlTable() {
-        // sort configs without a default value first, then by importance, then by name
-        List<ConfigKey> configs = new ArrayList<ConfigKey>(this.configKeys.values());
-        Collections.sort(configs, new Comparator<ConfigKey>() {
-            public int compare(ConfigDef.ConfigKey k1, ConfigDef.ConfigKey k2) {
-                // first take anything with no default value
-                if (!k1.hasDefault() && k2.hasDefault())
-                    return -1;
-                else if (!k2.hasDefault() && k1.hasDefault())
-                    return 1;
-
-                // then sort by importance
-                int cmp = k1.importance.compareTo(k2.importance);
-                if (cmp == 0)
-                    // then sort in alphabetical order
-                    return k1.name.compareTo(k2.name);
-                else
-                    return cmp;
-            }
-        });
-        StringBuilder b = new StringBuilder();
-        b.append("<table>\n");
-        b.append("<tr>\n");
-        b.append("<th>Name</th>\n");
-        b.append("<th>Type</th>\n");
-        b.append("<th>Default</th>\n");
-        b.append("<th>Importance</th>\n");
-        b.append("<th>Description</th>\n");
-        b.append("</tr>\n");
-        for (ConfigKey def : configs) {
-            b.append("<tr>\n");
-            b.append("<td>");
-            b.append(def.name);
-            b.append("</td>");
-            b.append("<td>");
-            // Locale.ROOT keeps the enum names stable; the default locale could alter them
-            // (a Turkish locale lower-cases "INT" to "\u0131nt")
-            b.append(def.type.toString().toLowerCase(java.util.Locale.ROOT));
-            b.append("</td>");
-            b.append("<td>");
-            // a missing default is the empty-string sentinel, so it renders as an empty cell
-            b.append(def.defaultValue == null ? "" : def.defaultValue);
-            b.append("</td>");
-            b.append("<td>");
-            b.append(def.importance.toString().toLowerCase(java.util.Locale.ROOT));
-            b.append("</td>");
-            b.append("<td>");
-            b.append(def.documentation);
-            b.append("</td>");
-            b.append("</tr>\n");
-        }
-        b.append("</table>");
-        return b.toString();
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/config/ConfigException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/config/ConfigException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/config/ConfigException.java
deleted file mode 100644
index 13b9410..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/config/ConfigException.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.config;
-
-import org.apache.flink.kafka_backport.common.KafkaException;
-
-// ----------------------------------------------------------------------------
-//  This class is copied from the Apache Kafka project.
-// 
-//  The class is part of a "backport" of the new consumer API, in order to
-//  give Flink access to its functionality until the API is properly released.
-// 
-//  This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * Thrown if the user supplies an invalid configuration
- */
-public class ConfigException extends KafkaException {
-
-    private static final long serialVersionUID = 1L;
-
-    public ConfigException(String message) {
-        super(message);
-    }
-
-    public ConfigException(String name, Object value) {
-        this(name, value, null);
-    }
-
-    public ConfigException(String name, Object value, String message) {
-        super("Invalid value " + value + " for configuration " + name + (message == null ? "" : ": " + message));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/ApiException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/ApiException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/ApiException.java
deleted file mode 100644
index 1e6f7ec..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/ApiException.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.errors;
-
-import org.apache.flink.kafka_backport.common.KafkaException;
-
-// ----------------------------------------------------------------------------
-//  This class is copied from the Apache Kafka project.
-// 
-//  The class is part of a "backport" of the new consumer API, in order to
-//  give Flink access to its functionality until the API is properly released.
-// 
-//  This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * Any API exception that is part of the public protocol should be a subclass of this class and be part of this
- * package.
- */
-public class ApiException extends KafkaException {
-
-    private static final long serialVersionUID = 1L;
-
-    /** Creates an exception with neither a message nor a cause. */
-    public ApiException() {
-        super();
-    }
-
-    /**
-     * @param message The error message
-     */
-    public ApiException(String message) {
-        super(message);
-    }
-
-    /**
-     * @param cause The underlying cause
-     */
-    public ApiException(Throwable cause) {
-        super(cause);
-    }
-
-    /**
-     * @param message The error message
-     * @param cause   The underlying cause
-     */
-    public ApiException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    /**
-     * Returns this exception without capturing a stack trace, which is expensive and useless for
-     * api exceptions.
-     */
-    @Override
-    public Throwable fillInStackTrace() {
-        return this;
-    }
-
-}