You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/27 13:26:04 UTC

[47/51] [abbrv] flink git commit: [FLINK-2386] [kafka connector] Remove copied Kafka code again. Implemented our own topic metadata retrieval.

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/Coordinator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/Coordinator.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/Coordinator.java
deleted file mode 100644
index 1746c22..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/Coordinator.java
+++ /dev/null
@@ -1,791 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.clients.consumer.internals;
-
-import org.apache.flink.kafka_backport.clients.ClientResponse;
-import org.apache.flink.kafka_backport.clients.consumer.CommitType;
-import org.apache.flink.kafka_backport.clients.consumer.ConsumerCommitCallback;
-import org.apache.flink.kafka_backport.common.KafkaException;
-import org.apache.flink.kafka_backport.common.MetricName;
-import org.apache.flink.kafka_backport.common.Node;
-import org.apache.flink.kafka_backport.common.TopicPartition;
-import org.apache.flink.kafka_backport.common.errors.DisconnectException;
-import org.apache.flink.kafka_backport.common.metrics.Measurable;
-import org.apache.flink.kafka_backport.common.metrics.MetricConfig;
-import org.apache.flink.kafka_backport.common.metrics.Metrics;
-import org.apache.flink.kafka_backport.common.metrics.Sensor;
-import org.apache.flink.kafka_backport.common.metrics.stats.Avg;
-import org.apache.flink.kafka_backport.common.metrics.stats.Count;
-import org.apache.flink.kafka_backport.common.metrics.stats.Max;
-import org.apache.flink.kafka_backport.common.metrics.stats.Rate;
-import org.apache.flink.kafka_backport.common.protocol.ApiKeys;
-import org.apache.flink.kafka_backport.common.protocol.Errors;
-import org.apache.flink.kafka_backport.common.requests.ConsumerMetadataRequest;
-import org.apache.flink.kafka_backport.common.requests.ConsumerMetadataResponse;
-import org.apache.flink.kafka_backport.common.requests.HeartbeatRequest;
-import org.apache.flink.kafka_backport.common.requests.HeartbeatResponse;
-import org.apache.flink.kafka_backport.common.requests.JoinGroupRequest;
-import org.apache.flink.kafka_backport.common.requests.JoinGroupResponse;
-import org.apache.flink.kafka_backport.common.requests.OffsetCommitRequest;
-import org.apache.flink.kafka_backport.common.requests.OffsetCommitResponse;
-import org.apache.flink.kafka_backport.common.requests.OffsetFetchRequest;
-import org.apache.flink.kafka_backport.common.requests.OffsetFetchResponse;
-import org.apache.flink.kafka_backport.common.utils.Time;
-import org.apache.flink.kafka_backport.common.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-/**
- * This class manages the coordination process with the consumer coordinator.
- */
-public final class Coordinator {
-
-    private static final Logger log = LoggerFactory.getLogger(Coordinator.class);
-
-    private final ConsumerNetworkClient client; // used to send, poll and schedule all coordinator requests
-    private final Time time; // clock; read through time.milliseconds()
-    private final String groupId; // consumer group id placed in every coordinator request
-    private final Heartbeat heartbeat; // heartbeat / session-timeout bookkeeping
-    private final HeartbeatTask heartbeatTask; // delayed task that sends heartbeats via the client
-    private final int sessionTimeoutMs; // sent in JoinGroup requests and handed to Heartbeat
-    private final String assignmentStrategy; // sent in JoinGroup requests
-    private final SubscriptionState subscriptions; // assignment and committed-offset state updated by this class
-    private final CoordinatorMetrics sensors; // latency / reassignment sensors recorded by the response handlers
-    private final long requestTimeoutMs; // poll timeout used while looking up the coordinator
-    private final long retryBackoffMs; // pause between retries of failed requests
-    private final RebalanceCallback rebalanceCallback; // user callback invoked around partition reassignment
-    private Node consumerCoordinator; // current coordinator node; null while unknown or marked dead
-    private String consumerId; // starts as JoinGroupRequest.UNKNOWN_CONSUMER_ID, replaced by the join response
-    private int generation; // starts at -1, replaced by the generation id of the join response
-
-
-    /**
-     * Create the coordination manager for one consumer group.
-     *
-     * The coordinator node starts out unknown, the consumer id starts as
-     * {@link JoinGroupRequest#UNKNOWN_CONSUMER_ID} and the generation as -1; they are
-     * filled in once the coordinator has been discovered and the group has been joined.
-     *
-     * @param client network client used for all coordinator requests
-     * @param groupId id of the consumer group
-     * @param sessionTimeoutMs session timeout sent when joining the group
-     * @param assignmentStrategy partition assignment strategy sent when joining the group
-     * @param subscriptions subscription state updated with assignments and committed offsets
-     * @param metrics metrics registry the coordinator sensors are created with
-     * @param metricGrpPrefix prefix passed on to the coordinator sensors
-     * @param metricTags tags passed on to the coordinator sensors
-     * @param time clock used for heartbeat timing
-     * @param requestTimeoutMs poll timeout used while discovering the coordinator
-     * @param retryBackoffMs pause between retries of failed requests
-     * @param rebalanceCallback user callback invoked before and after a reassignment
-     */
-    public Coordinator(ConsumerNetworkClient client,
-                       String groupId,
-                       int sessionTimeoutMs,
-                       String assignmentStrategy,
-                       SubscriptionState subscriptions,
-                       Metrics metrics,
-                       String metricGrpPrefix,
-                       Map<String, String> metricTags,
-                       Time time,
-                       long requestTimeoutMs,
-                       long retryBackoffMs,
-                       RebalanceCallback rebalanceCallback) {
-
-        // collaborators and configuration supplied by the caller
-        this.client = client;
-        this.time = time;
-        this.groupId = groupId;
-        this.subscriptions = subscriptions;
-        this.rebalanceCallback = rebalanceCallback;
-        this.sessionTimeoutMs = sessionTimeoutMs;
-        this.assignmentStrategy = assignmentStrategy;
-        this.requestTimeoutMs = requestTimeoutMs;
-        this.retryBackoffMs = retryBackoffMs;
-
-        // group membership is unknown until the first successful join
-        this.consumerCoordinator = null;
-        this.consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID;
-        this.generation = -1;
-
-        // internal helpers
-        this.heartbeat = new Heartbeat(this.sessionTimeoutMs, time.milliseconds());
-        this.heartbeatTask = new HeartbeatTask();
-        this.sensors = new CoordinatorMetrics(metrics, metricGrpPrefix, metricTags);
-    }
-
-    /**
-     * Re-read the committed offsets of all assigned partitions from the coordinator,
-     * but only when the subscription state reports that a refresh is needed.
-     */
-    public void refreshCommittedOffsetsIfNeeded() {
-        if (!subscriptions.refreshCommitsNeeded())
-            return;
-
-        Map<TopicPartition, Long> committed = fetchCommittedOffsets(subscriptions.assignedPartitions());
-        for (TopicPartition partition : committed.keySet())
-            this.subscriptions.committed(partition, committed.get(partition));
-
-        // clear the refresh flag now that the subscription state is up to date
-        this.subscriptions.commitsRefreshed();
-    }
-
-    /**
-     * Fetch the committed offsets for a set of partitions from the coordinator.
-     * Blocks and retries (sleeping for the retry backoff in between) for as long as
-     * the failure is retriable; a non-retriable failure is rethrown.
-     *
-     * @param partitions The partitions to fetch offsets for
-     * @return A map from partition to the committed offset
-     */
-    public Map<TopicPartition, Long> fetchCommittedOffsets(Set<TopicPartition> partitions) {
-        for (;;) {
-            ensureCoordinatorKnown();
-            ensurePartitionAssignment();
-
-            // ask the coordinator and wait for its answer
-            RequestFuture<Map<TopicPartition, Long>> fetch = sendOffsetFetchRequest(partitions);
-            client.poll(fetch);
-
-            if (fetch.succeeded()) {
-                return fetch.value();
-            } else if (!fetch.isRetriable()) {
-                throw fetch.exception();
-            }
-
-            // retriable failure: back off before the next attempt
-            Utils.sleep(retryBackoffMs);
-        }
-    }
-
-    /**
-     * Make sure a partition assignment is in place. When the subscription state asks for
-     * a new assignment, the user callback is told about the revoked partitions, the group
-     * is (re-)joined, and the callback is then told about the newly assigned partitions.
-     * Exceptions thrown by the user callback are logged and otherwise ignored.
-     */
-    public void ensurePartitionAssignment() {
-        if (subscriptions.partitionAssignmentNeeded()) {
-            notifyPartitionsRevoked();
-            reassignPartitions();
-            notifyPartitionsAssigned();
-        }
-    }
-
-    /** Run the user's revocation callback with a copy of the current assignment. */
-    private void notifyPartitionsRevoked() {
-        log.debug("Revoking previously assigned partitions {}", this.subscriptions.assignedPartitions());
-        try {
-            rebalanceCallback.onPartitionsRevoked(new HashSet<TopicPartition>(subscriptions.assignedPartitions()));
-        } catch (Exception e) {
-            log.error("User provided callback " + this.rebalanceCallback.getClass().getName()
-                    + " failed on partition revocation: ", e);
-        }
-    }
-
-    /** Run the user's assignment callback with a copy of the new assignment. */
-    private void notifyPartitionsAssigned() {
-        log.debug("Setting newly assigned partitions {}", this.subscriptions.assignedPartitions());
-        try {
-            rebalanceCallback.onPartitionsAssigned(new HashSet<TopicPartition>(subscriptions.assignedPartitions()));
-        } catch (Exception e) {
-            log.error("User provided callback " + this.rebalanceCallback.getClass().getName()
-                    + " failed on partition assignment: ", e);
-        }
-    }
-
-    /**
-     * Keep sending JoinGroup requests until the subscription state no longer needs a
-     * partition assignment. Retriable failures are retried after the retry backoff;
-     * any other failure is rethrown.
-     */
-    private void reassignPartitions() {
-        while (subscriptions.partitionAssignmentNeeded()) {
-            ensureCoordinatorKnown();
-
-            // Wait out anything still in flight to the coordinator first; in particular
-            // this avoids resending a JoinGroup request that is still pending.
-            boolean requestsInFlight = client.pendingRequestCount(this.consumerCoordinator) > 0;
-            if (requestsInFlight) {
-                client.awaitPendingRequests(this.consumerCoordinator);
-            } else {
-                RequestFuture<Void> join = sendJoinGroupRequest();
-                client.poll(join);
-
-                if (!join.failed())
-                    continue;
-                if (!join.isRetriable())
-                    throw join.exception();
-                Utils.sleep(retryBackoffMs);
-            }
-        }
-    }
-
-    /**
-     * Block until a coordinator for this group has been found. Each failed lookup is
-     * followed by waiting for a metadata update before the next attempt.
-     */
-    public void ensureCoordinatorKnown() {
-        while (coordinatorUnknown()) {
-            RequestFuture<Void> lookup = sendConsumerMetadataRequest();
-            client.poll(lookup, requestTimeoutMs);
-
-            if (!lookup.failed())
-                continue;
-
-            client.awaitMetadataUpdate();
-        }
-    }
-
-    /**
-     * Commit offsets. This call blocks (regardless of commitType) until the coordinator
-     * can receive the commit request. Once the request has been made, however, only the
-     * synchronous commits will wait for a successful response from the coordinator.
-     * @param offsets Offsets to commit.
-     * @param commitType Commit policy
-     * @param callback Callback to be executed when the commit request finishes
-     */
-    public void commitOffsets(Map<TopicPartition, Long> offsets, CommitType commitType, ConsumerCommitCallback callback) {
-        if (commitType == CommitType.ASYNC)
-            commitOffsetsAsync(offsets, callback);
-        else
-            commitOffsetsSync(offsets, callback);
-    }
-
-    /**
-     * Delayed task that sends heartbeats to the coordinator and reschedules itself on
-     * the network client according to the heartbeat timing.
-     */
-    private class HeartbeatTask implements DelayedTask {
-
-        /**
-         * Start or restart the task: reset the session timeout and schedule the task
-         * to run at the next chance.
-         */
-        public void reset() {
-            long currentMs = time.milliseconds();
-            heartbeat.resetSessionTimeout(currentMs);
-            client.unschedule(this);
-            client.schedule(this, currentMs);
-        }
-
-        @Override
-        public void run(final long now) {
-            // heartbeats only apply with auto-assignment, outside of a rebalance,
-            // and while a coordinator is known
-            boolean heartbeatsApply = subscriptions.partitionsAutoAssigned()
-                    && !subscriptions.partitionAssignmentNeeded()
-                    && !coordinatorUnknown();
-            if (!heartbeatsApply)
-                return;
-
-            // no successful heartbeat within one session interval: give up on this coordinator
-            if (heartbeat.sessionTimeoutExpired(now)) {
-                coordinatorDead();
-                return;
-            }
-
-            // not due yet: come back when the next heartbeat is due
-            if (!heartbeat.shouldHeartbeat(now)) {
-                client.schedule(this, now + heartbeat.timeToNextHeartbeat(now));
-                return;
-            }
-
-            heartbeat.sentHeartbeat(now);
-            sendHeartbeatRequest().addListener(new RequestFutureListener<Void>() {
-                @Override
-                public void onSuccess(Void value) {
-                    long receivedMs = time.milliseconds();
-                    heartbeat.receiveHeartbeat(receivedMs);
-                    client.schedule(HeartbeatTask.this, receivedMs + heartbeat.timeToNextHeartbeat(receivedMs));
-                }
-
-                @Override
-                public void onFailure(RuntimeException e) {
-                    // try again after the retry backoff
-                    long retryAtMs = time.milliseconds() + retryBackoffMs;
-                    client.schedule(HeartbeatTask.this, retryAtMs);
-                }
-            });
-        }
-    }
-
-    /**
-     * Ask the coordinator for a new partition assignment. Non-blocking: a JoinGroup
-     * request is sent (when a coordinator is known) and the returned future has to be
-     * polled to learn the outcome.
-     *
-     * @return A request future whose completion indicates the result of the JoinGroup request;
-     *         an already failed "coordinator not available" future when no coordinator is known.
-     */
-    private RequestFuture<Void> sendJoinGroupRequest() {
-        if (coordinatorUnknown())
-            return RequestFuture.coordinatorNotAvailable();
-
-        // snapshot the subscribed topics for the request
-        List<String> topics = new ArrayList<String>(subscriptions.subscribedTopics());
-        log.debug("(Re-)joining group {} with subscribed topics {}", groupId, topics);
-
-        JoinGroupRequest joinRequest = new JoinGroupRequest(
-                groupId, this.sessionTimeoutMs, topics, this.consumerId, this.assignmentStrategy);
-
-        // hand the request to the network client; the handler interprets the response
-        log.debug("Issuing request ({}: {}) to coordinator {}", ApiKeys.JOIN_GROUP, joinRequest, this.consumerCoordinator.id());
-        return client.send(consumerCoordinator, ApiKeys.JOIN_GROUP, joinRequest)
-                .compose(new JoinGroupResponseHandler());
-    }
-
-    private class JoinGroupResponseHandler extends CoordinatorResponseHandler<JoinGroupResponse, Void> {
-
-        @Override
-        public JoinGroupResponse parse(ClientResponse response) {
-            return new JoinGroupResponse(response.responseBody());
-        }
-
-        @Override
-        public void handle(JoinGroupResponse joinResponse, RequestFuture<Void> future) {
-            // process the response
-            short errorCode = joinResponse.errorCode();
-
-            if (errorCode == Errors.NONE.code()) {
-                Coordinator.this.consumerId = joinResponse.consumerId();
-                Coordinator.this.generation = joinResponse.generationId();
-
-                // set the flag to refresh last committed offsets
-                subscriptions.needRefreshCommits();
-
-                log.debug("Joined group: {}", joinResponse.toStruct());
-
-                // record re-assignment time
-                sensors.partitionReassignments.record(response.requestLatencyMs());
-
-                // update partition assignment
-                subscriptions.changePartitionAssignment(joinResponse.assignedPartitions());
-                heartbeatTask.reset();
-                future.complete(null);
-            } else if (errorCode == Errors.UNKNOWN_CONSUMER_ID.code()) {
-                // reset the consumer id and retry immediately
-                Coordinator.this.consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID;
-                log.info("Attempt to join group {} failed due to unknown consumer id, resetting and retrying.",
-                        groupId);
-                future.raise(Errors.UNKNOWN_CONSUMER_ID);
-            } else if (errorCode == Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code()
-                    || errorCode == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
-                // re-discover the coordinator and retry with backoff
-                coordinatorDead();
-                log.info("Attempt to join group {} failed due to obsolete coordinator information, retrying.",
-                        groupId);
-                future.raise(Errors.forCode(errorCode));
-            } else if (errorCode == Errors.UNKNOWN_PARTITION_ASSIGNMENT_STRATEGY.code()
-                    || errorCode == Errors.INCONSISTENT_PARTITION_ASSIGNMENT_STRATEGY.code()
-                    || errorCode == Errors.INVALID_SESSION_TIMEOUT.code()) {
-                // log the error and re-throw the exception
-                Errors error = Errors.forCode(errorCode);
-                log.error("Attempt to join group {} failed due to: {}",
-                        groupId, error.exception().getMessage());
-                future.raise(error);
-            } else {
-                // unexpected error, throw the exception
-                future.raise(new KafkaException("Unexpected error in join group response: "
-                        + Errors.forCode(joinResponse.errorCode()).exception().getMessage()));
-            }
-        }
-    }
-
-    /**
-     * Send an offset commit without waiting for the response. The subscription state is
-     * flagged for a committed-offset refresh, and the callback (if any) is invoked with
-     * the offsets and either {@code null} or the failure once the request finishes.
-     *
-     * @param offsets offsets to commit
-     * @param callback optional callback to notify; may be null
-     */
-    private void commitOffsetsAsync(final Map<TopicPartition, Long> offsets, final ConsumerCommitCallback callback) {
-        this.subscriptions.needRefreshCommits();
-        RequestFuture<Void> commit = sendOffsetCommitRequest(offsets);
-
-        // nobody to notify
-        if (callback == null)
-            return;
-
-        commit.addListener(new RequestFutureListener<Void>() {
-            @Override
-            public void onSuccess(Void value) {
-                callback.onComplete(offsets, null);
-            }
-
-            @Override
-            public void onFailure(RuntimeException e) {
-                callback.onComplete(offsets, e);
-            }
-        });
-    }
-
-    /**
-     * Commit offsets and wait for the outcome, retrying retriable failures after the
-     * retry backoff. On success the callback (if any) is invoked with a null exception.
-     * A non-retriable failure is handed to the callback when one is given and is thrown
-     * otherwise.
-     *
-     * @param offsets offsets to commit
-     * @param callback optional callback to notify; may be null
-     */
-    private void commitOffsetsSync(Map<TopicPartition, Long> offsets, ConsumerCommitCallback callback) {
-        for (;;) {
-            ensureCoordinatorKnown();
-            ensurePartitionAssignment();
-
-            RequestFuture<Void> commit = sendOffsetCommitRequest(offsets);
-            client.poll(commit);
-
-            if (commit.succeeded()) {
-                if (callback != null)
-                    callback.onComplete(offsets, null);
-                return;
-            }
-
-            if (commit.isRetriable()) {
-                Utils.sleep(retryBackoffMs);
-                continue;
-            }
-
-            // non-retriable failure: report through the callback, or throw without one
-            if (callback != null) {
-                callback.onComplete(offsets, commit.exception());
-                return;
-            }
-            throw commit.exception();
-        }
-    }
-
-    /**
-     * Commit offsets for the given partitions. Non-blocking: the returned future can be
-     * polled for a synchronous commit or ignored for an asynchronous one.
-     *
-     * @param offsets The offsets per partition that should be committed.
-     * @return A request future whose value indicates whether the commit was successful or not.
-     *         It is an already failed "coordinator not available" future when no coordinator
-     *         is known, and an already successful one when {@code offsets} is empty.
-     */
-    private RequestFuture<Void> sendOffsetCommitRequest(final Map<TopicPartition, Long> offsets) {
-        if (coordinatorUnknown())
-            return RequestFuture.coordinatorNotAvailable();
-
-        if (offsets.isEmpty())
-            return RequestFuture.voidSuccess();
-
-        // translate the plain offsets into request partition data (with empty metadata)
-        Map<TopicPartition, OffsetCommitRequest.PartitionData> payload =
-                new HashMap<TopicPartition, OffsetCommitRequest.PartitionData>(offsets.size());
-        for (TopicPartition partition : offsets.keySet()) {
-            Long offset = offsets.get(partition);
-            payload.put(partition, new OffsetCommitRequest.PartitionData(offset, ""));
-        }
-
-        OffsetCommitRequest commitRequest = new OffsetCommitRequest(
-                this.groupId,
-                this.generation,
-                this.consumerId,
-                OffsetCommitRequest.DEFAULT_RETENTION_TIME,
-                payload);
-
-        return client.send(consumerCoordinator, ApiKeys.OFFSET_COMMIT, commitRequest)
-                .compose(new OffsetCommitResponseHandler(offsets));
-    }
-
-
-    private class OffsetCommitResponseHandler extends CoordinatorResponseHandler<OffsetCommitResponse, Void> {
-
-        private final Map<TopicPartition, Long> offsets;
-
-        public OffsetCommitResponseHandler(Map<TopicPartition, Long> offsets) {
-            this.offsets = offsets;
-        }
-
-        @Override
-        public OffsetCommitResponse parse(ClientResponse response) {
-            return new OffsetCommitResponse(response.responseBody());
-        }
-
-        @Override
-        public void handle(OffsetCommitResponse commitResponse, RequestFuture<Void> future) {
-            sensors.commitLatency.record(response.requestLatencyMs());
-            for (Map.Entry<TopicPartition, Short> entry : commitResponse.responseData().entrySet()) {
-                TopicPartition tp = entry.getKey();
-                long offset = this.offsets.get(tp);
-                short errorCode = entry.getValue();
-                if (errorCode == Errors.NONE.code()) {
-                    log.debug("Committed offset {} for partition {}", offset, tp);
-                    subscriptions.committed(tp, offset);
-                } else if (errorCode == Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code()
-                        || errorCode == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
-                    coordinatorDead();
-                    future.raise(Errors.forCode(errorCode));
-                    return;
-                } else if (errorCode == Errors.OFFSET_METADATA_TOO_LARGE.code()
-                        || errorCode == Errors.INVALID_COMMIT_OFFSET_SIZE.code()) {
-                    // do not need to throw the exception but just log the error
-                    log.error("Error committing partition {} at offset {}: {}",
-                            tp,
-                            offset,
-                            Errors.forCode(errorCode).exception().getMessage());
-                } else if (errorCode == Errors.UNKNOWN_CONSUMER_ID.code()
-                        || errorCode == Errors.ILLEGAL_GENERATION.code()) {
-                    // need to re-join group
-                    subscriptions.needReassignment();
-                    future.raise(Errors.forCode(errorCode));
-                    return;
-                } else {
-                    // do not need to throw the exception but just log the error
-                    future.raise(Errors.forCode(errorCode));
-                    log.error("Error committing partition {} at offset {}: {}",
-                            tp,
-                            offset,
-                            Errors.forCode(errorCode).exception().getMessage());
-                }
-            }
-
-            future.complete(null);
-        }
-    }
-
-    /**
-     * Request the committed offsets for a set of partitions. Non-blocking: the returned
-     * future can be polled to obtain the offsets returned from the broker.
-     *
-     * @param partitions The set of partitions to get offsets for.
-     * @return A request future containing the committed offsets; an already failed
-     *         "coordinator not available" future when no coordinator is known.
-     */
-    private RequestFuture<Map<TopicPartition, Long>> sendOffsetFetchRequest(Set<TopicPartition> partitions) {
-        if (coordinatorUnknown())
-            return RequestFuture.coordinatorNotAvailable();
-
-        log.debug("Fetching committed offsets for partitions: {}",  Utils.join(partitions, ", "));
-
-        // the request carries a list copy of the partition set
-        List<TopicPartition> requested = new ArrayList<TopicPartition>(partitions);
-        OffsetFetchRequest fetchRequest = new OffsetFetchRequest(this.groupId, requested);
-
-        // the handler turns the raw response into the offset map
-        return client.send(consumerCoordinator, ApiKeys.OFFSET_FETCH, fetchRequest)
-                .compose(new OffsetFetchResponseHandler());
-    }
-
-    private class OffsetFetchResponseHandler extends CoordinatorResponseHandler<OffsetFetchResponse, Map<TopicPartition, Long>> {
-
-        @Override
-        public OffsetFetchResponse parse(ClientResponse response) {
-            return new OffsetFetchResponse(response.responseBody());
-        }
-
-        @Override
-        public void handle(OffsetFetchResponse response, RequestFuture<Map<TopicPartition, Long>> future) {
-            Map<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>(response.responseData().size());
-            for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry : response.responseData().entrySet()) {
-                TopicPartition tp = entry.getKey();
-                OffsetFetchResponse.PartitionData data = entry.getValue();
-                if (data.hasError()) {
-                    log.debug("Error fetching offset for topic-partition {}: {}", tp, Errors.forCode(data.errorCode)
-                            .exception()
-                            .getMessage());
-                    if (data.errorCode == Errors.OFFSET_LOAD_IN_PROGRESS.code()) {
-                        // just retry
-                        future.raise(Errors.OFFSET_LOAD_IN_PROGRESS);
-                    } else if (data.errorCode == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
-                        // re-discover the coordinator and retry
-                        coordinatorDead();
-                        future.raise(Errors.NOT_COORDINATOR_FOR_CONSUMER);
-                    } else if (data.errorCode == Errors.UNKNOWN_CONSUMER_ID.code()
-                            || data.errorCode == Errors.ILLEGAL_GENERATION.code()) {
-                        // need to re-join group
-                        subscriptions.needReassignment();
-                        future.raise(Errors.forCode(data.errorCode));
-                    } else {
-                        future.raise(new KafkaException("Unexpected error in fetch offset response: "
-                                + Errors.forCode(data.errorCode).exception().getMessage()));
-                    }
-                    return;
-                } else if (data.offset >= 0) {
-                    // record the position with the offset (-1 indicates no committed offset to fetch)
-                    offsets.put(tp, data.offset);
-                } else {
-                    log.debug("No committed offset for partition " + tp);
-                }
-            }
-
-            future.complete(offsets);
-        }
-    }
-
-    /**
-     * Send a heartbeat request to the current coordinator now (visible only for testing).
-     *
-     * @return a future that reflects the outcome of the heartbeat
-     */
-    public RequestFuture<Void> sendHeartbeatRequest() {
-        // the heartbeat identifies this consumer by group, generation and consumer id
-        HeartbeatRequest heartbeatRequest = new HeartbeatRequest(this.groupId, this.generation, this.consumerId);
-        return client
-                .send(consumerCoordinator, ApiKeys.HEARTBEAT, heartbeatRequest)
-                .compose(new HeartbeatCompletionHandler());
-    }
-
-    public boolean coordinatorUnknown() {
-        return this.consumerCoordinator == null;
-    }
-
-    /**
-     * Discover the current coordinator for the consumer group by sending a ConsumerMetadata
-     * request to the least loaded broker. The returned future should be polled to get the
-     * result of the request.
-     *
-     * @return A request future which indicates the completion of the metadata request;
-     *         an already failed "no brokers available" future when there is no node to ask.
-     */
-    private RequestFuture<Void> sendConsumerMetadataRequest() {
-        // pick a broker to ask about the coordinator
-        Node target = this.client.leastLoadedNode();
-        if (target == null) {
-            // TODO: If there are no brokers left, perhaps we should use the bootstrap set
-            // from configuration?
-            return RequestFuture.noBrokersAvailable();
-        }
-
-        log.debug("Issuing consumer metadata request to broker {}", target.id());
-        ConsumerMetadataRequest lookupRequest = new ConsumerMetadataRequest(this.groupId);
-        return client.send(target, ApiKeys.CONSUMER_METADATA, lookupRequest)
-                .compose(new RequestFutureAdapter<ClientResponse, Void>() {
-                    @Override
-                    public void onSuccess(ClientResponse response, RequestFuture<Void> future) {
-                        handleConsumerMetadataResponse(response, future);
-                    }
-                });
-    }
-
-    /**
-     * Process a ConsumerMetadata response: fail the future on a disconnect or on an error
-     * code, ignore the response when a coordinator is already known, and otherwise store
-     * the reported node as the coordinator and restart the heartbeat task.
-     *
-     * @param resp the raw client response
-     * @param future the future to complete or fail
-     */
-    private void handleConsumerMetadataResponse(ClientResponse resp, RequestFuture<Void> future) {
-        log.debug("Consumer metadata response {}", resp);
-
-        // a disconnected response carries no coordinator info
-        if (resp.wasDisconnected()) {
-            future.raise(new DisconnectException());
-            return;
-        }
-
-        // We already found the coordinator, so ignore the request
-        if (!coordinatorUnknown()) {
-            future.complete(null);
-            return;
-        }
-
-        ConsumerMetadataResponse metadata = new ConsumerMetadataResponse(resp.responseBody());
-        if (metadata.errorCode() != Errors.NONE.code()) {
-            future.raise(Errors.forCode(metadata.errorCode()));
-            return;
-        }
-
-        // use MAX_VALUE - node.id as the coordinator id to mimic separate connections
-        // for the coordinator in the underlying network client layer
-        // TODO: this needs to be better handled in KAFKA-1935
-        Node reported = metadata.node();
-        this.consumerCoordinator = new Node(Integer.MAX_VALUE - reported.id(), reported.host(), reported.port());
-        heartbeatTask.reset();
-        future.complete(null);
-    }
-
-    /**
-     * Mark the current coordinator as dead by clearing it; does nothing when no
-     * coordinator is set.
-     */
-    private void coordinatorDead() {
-        if (this.consumerCoordinator == null)
-            return;
-
-        log.info("Marking the coordinator {} dead.", this.consumerCoordinator.id());
-        this.consumerCoordinator = null;
-    }
-
-    /**
-     * Interprets Heartbeat responses: completes the future on success, and otherwise
-     * marks the coordinator dead and/or requests a reassignment (depending on the error)
-     * before failing the future.
-     */
-    private class HeartbeatCompletionHandler extends CoordinatorResponseHandler<HeartbeatResponse, Void> {
-        @Override
-        public HeartbeatResponse parse(ClientResponse response) {
-            return new HeartbeatResponse(response.responseBody());
-        }
-
-        @Override
-        public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> future) {
-            sensors.heartbeatLatency.record(response.requestLatencyMs());
-            final short code = heartbeatResponse.errorCode();
-
-            if (code == Errors.NONE.code()) {
-                log.debug("Received successful heartbeat response.");
-                future.complete(null);
-                return;
-            }
-
-            boolean coordinatorGone = code == Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code()
-                    || code == Errors.NOT_COORDINATOR_FOR_CONSUMER.code();
-            if (coordinatorGone) {
-                log.info("Attempt to heart beat failed since coordinator is either not started or not valid, marking it as dead.");
-                coordinatorDead();
-                future.raise(Errors.forCode(code));
-                return;
-            }
-
-            if (code == Errors.ILLEGAL_GENERATION.code()) {
-                log.info("Attempt to heart beat failed since generation id is not legal, try to re-join group.");
-                subscriptions.needReassignment();
-                future.raise(Errors.ILLEGAL_GENERATION);
-                return;
-            }
-
-            if (code == Errors.UNKNOWN_CONSUMER_ID.code()) {
-                log.info("Attempt to heart beat failed since consumer id is not valid, reset it and try to re-join group.");
-                consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID;
-                subscriptions.needReassignment();
-                future.raise(Errors.UNKNOWN_CONSUMER_ID);
-                return;
-            }
-
-            // anything else is unexpected
-            future.raise(new KafkaException("Unexpected error in heartbeat response: "
-                    + Errors.forCode(code).exception().getMessage()));
-        }
-    }
-
-    private abstract class CoordinatorResponseHandler<R, T>
-            extends RequestFutureAdapter<ClientResponse, T> {
-        // the raw response most recently passed to onSuccess; subclasses may read it
-        protected ClientResponse response;
-
-        /** Convert the raw client response into the typed response {@code R}. */
-        public abstract R parse(ClientResponse response);
-
-        /** Process the typed response; receives the future that tracks the request. */
-        public abstract void handle(R response, RequestFuture<T> future);
-
-        @Override
-        public void onSuccess(ClientResponse clientResponse, RequestFuture<T> future) {
-            this.response = clientResponse;
-
-            if (!clientResponse.wasDisconnected()) {
-                R parsed = parse(clientResponse);
-                handle(parsed, future);
-                return;
-            }
-
-            // the connection was lost: drop the coordinator and fail the request
-            int correlationId = clientResponse.request().request().header().correlationId();
-            log.debug("Cancelled request {} with correlation id {} due to coordinator {} being disconnected",
-                    clientResponse.request(),
-                    correlationId,
-                    clientResponse.request().request().destination());
-            coordinatorDead();
-            future.raise(new DisconnectException());
-        }
-
-        @Override
-        public void onFailure(RuntimeException e, RequestFuture<T> future) {
-            final boolean disconnected = e instanceof DisconnectException;
-            if (disconnected) {
-                log.debug("Coordinator request failed", e);
-                coordinatorDead();
-            }
-            // the failure is always propagated, whatever its type
-            future.raise(e);
-        }
-    }
-
-    /**
-     * Callback interface with one hook for assigned partitions and one for revoked partitions.
-     * NOTE(review): the call sites are outside this excerpt; presumably invoked around a group
-     * rebalance - confirm.
-     */
-    public interface RebalanceCallback {
-        /** @param partitions the partitions that were assigned */
-        void onPartitionsAssigned(Collection<TopicPartition> partitions);
-        /** @param partitions the partitions that were revoked */
-        void onPartitionsRevoked(Collection<TopicPartition> partitions);
-    }
-
-    /**
-     * Registers the coordinator's sensors and gauges under the group
-     * {@code <metricGrpPrefix>-coordinator-metrics} and keeps references to the sensors
-     * so that other code in this class can record into them.
-     */
-    private class CoordinatorMetrics {
-        public final Metrics metrics;
-        public final String metricGrpName;
-
-        public final Sensor commitLatency;
-        public final Sensor heartbeatLatency;
-        public final Sensor partitionReassignments;
-
-        /**
-         * @param metrics the registry in which sensors and metrics are created
-         * @param metricGrpPrefix prefix of the metric group name
-         * @param tags tags attached to every metric name created here
-         */
-        public CoordinatorMetrics(Metrics metrics, String metricGrpPrefix, Map<String, String> tags) {
-            this.metrics = metrics;
-            this.metricGrpName = metricGrpPrefix + "-coordinator-metrics";
-
-            this.commitLatency = metrics.sensor("commit-latency");
-            this.commitLatency.add(new MetricName("commit-latency-avg",
-                this.metricGrpName,
-                "The average time taken for a commit request",
-                tags), new Avg());
-            this.commitLatency.add(new MetricName("commit-latency-max",
-                this.metricGrpName,
-                "The max time taken for a commit request",
-                tags), new Max());
-            this.commitLatency.add(new MetricName("commit-rate",
-                this.metricGrpName,
-                "The number of commit calls per second",
-                tags), new Rate(new Count()));
-
-            this.heartbeatLatency = metrics.sensor("heartbeat-latency");
-            this.heartbeatLatency.add(new MetricName("heartbeat-response-time-max",
-                this.metricGrpName,
-                "The max time taken to receive a response to a hearbeat request",
-                tags), new Max());
-            this.heartbeatLatency.add(new MetricName("heartbeat-rate",
-                this.metricGrpName,
-                "The average number of heartbeats per second",
-                tags), new Rate(new Count()));
-
-            this.partitionReassignments = metrics.sensor("reassignment-latency");
-            this.partitionReassignments.add(new MetricName("reassignment-time-avg",
-                this.metricGrpName,
-                "The average time taken for a partition reassignment",
-                tags), new Avg());
-            // the "max" metric must be backed by Max; it previously used Avg and therefore
-            // reported the same value as reassignment-time-avg
-            this.partitionReassignments.add(new MetricName("reassignment-time-max",
-                this.metricGrpName,
-                "The max time taken for a partition reassignment",
-                tags), new Max());
-            this.partitionReassignments.add(new MetricName("reassignment-rate",
-                this.metricGrpName,
-                "The number of partition reassignments per second",
-                tags), new Rate(new Count()));
-
-            // gauge: evaluated on demand from the current subscription state
-            Measurable numParts =
-                new Measurable() {
-                    public double measure(MetricConfig config, long now) {
-                        return subscriptions.assignedPartitions().size();
-                    }
-                };
-            metrics.addMetric(new MetricName("assigned-partitions",
-                this.metricGrpName,
-                "The number of partitions currently assigned to this consumer",
-                tags),
-                numParts);
-
-            // gauge: whole seconds elapsed between the last heartbeat send and "now"
-            Measurable lastHeartbeat =
-                new Measurable() {
-                    public double measure(MetricConfig config, long now) {
-                        return TimeUnit.SECONDS.convert(now - heartbeat.lastHeartbeatSend(), TimeUnit.MILLISECONDS);
-                    }
-                };
-            metrics.addMetric(new MetricName("last-heartbeat-seconds-ago",
-                this.metricGrpName,
-                "The number of seconds since the last controller heartbeat",
-                tags),
-                lastHeartbeat);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/DelayedTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/DelayedTask.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/DelayedTask.java
deleted file mode 100644
index 7792dff..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/DelayedTask.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package org.apache.flink.kafka_backport.clients.consumer.internals;
-
-
-/**
- * A unit of work that is run with the current time as its only argument.
- */
-public interface DelayedTask {
-
-    /**
-     * Execute the task.
-     * @param now current time in milliseconds
-     */
-    void run(long now);
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/DelayedTaskQueue.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/DelayedTaskQueue.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/DelayedTaskQueue.java
deleted file mode 100644
index fbd1e19..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/DelayedTaskQueue.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package org.apache.flink.kafka_backport.clients.consumer.internals;
-
-import java.util.Iterator;
-import java.util.PriorityQueue;
-
-/**
- * Tracks a set of tasks to be executed after a delay. Tasks are kept in a priority queue ordered
- * by their scheduled time, so the earliest task is always at the head.
- */
-public class DelayedTaskQueue {
-
-    private final PriorityQueue<Entry> tasks;
-
-    public DelayedTaskQueue() {
-        tasks = new PriorityQueue<Entry>();
-    }
-
-    /**
-     * Schedule a task for execution in the future.
-     *
-     * @param task the task to execute
-     * @param at the time at which the task becomes ready to run (compared against the
-     *           {@code now} value passed to {@link #poll(long)})
-     */
-    public void add(DelayedTask task, long at) {
-        tasks.add(new Entry(task, at));
-    }
-
-    /**
-     * Remove a task from the queue if it is present. Every queued entry whose task equals the
-     * given task is removed.
-     *
-     * @param task the task to be removed
-     * @return true if a task was removed as a result of this call
-     */
-    public boolean remove(DelayedTask task) {
-        boolean wasRemoved = false;
-        Iterator<Entry> iterator = tasks.iterator();
-        while (iterator.hasNext()) {
-            Entry entry = iterator.next();
-            if (entry.task.equals(task)) {
-                iterator.remove();
-                wasRemoved = true;
-            }
-        }
-        return wasRemoved;
-    }
-
-    /**
-     * Get amount of time in milliseconds until the next event. Returns Long.MAX_VALUE if no tasks are scheduled.
-     *
-     * @param now the current time
-     * @return the remaining time in milliseconds, never negative
-     */
-    public long nextTimeout(long now) {
-        if (tasks.isEmpty())
-            return Long.MAX_VALUE;
-        else
-            return Math.max(tasks.peek().timeout - now, 0);
-    }
-
-    /**
-     * Run any ready tasks, i.e. all tasks whose scheduled time is not after {@code now},
-     * earliest first. Each task is taken off the queue before it is run.
-     *
-     * @param now the current time
-     */
-    public void poll(long now) {
-        while (!tasks.isEmpty() && tasks.peek().timeout <= now) {
-            Entry entry = tasks.poll();
-            entry.task.run(now);
-        }
-    }
-
-    /**
-     * A scheduled task together with its due time. Immutable: changing the timeout of an entry
-     * that is already queued would break the ordering of the priority queue.
-     */
-    private static class Entry implements Comparable<Entry> {
-        final DelayedTask task;
-        final long timeout;
-
-        public Entry(DelayedTask task, long timeout) {
-            this.task = task;
-            this.timeout = timeout;
-        }
-
-        @Override
-        public int compareTo(Entry entry) {
-            return Long.compare(timeout, entry.timeout);
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/Fetcher.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/Fetcher.java
deleted file mode 100644
index 6962a54..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/Fetcher.java
+++ /dev/null
@@ -1,506 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package org.apache.flink.kafka_backport.clients.consumer.internals;
-
-import org.apache.flink.kafka_backport.clients.ClientResponse;
-import org.apache.flink.kafka_backport.clients.Metadata;
-import org.apache.flink.kafka_backport.clients.consumer.NoOffsetForPartitionException;
-import org.apache.flink.kafka_backport.clients.consumer.OffsetResetStrategy;
-import org.apache.flink.kafka_backport.common.Node;
-import org.apache.flink.kafka_backport.common.PartitionInfo;
-import org.apache.flink.kafka_backport.common.TopicPartition;
-import org.apache.flink.kafka_backport.common.errors.InvalidMetadataException;
-import org.apache.flink.kafka_backport.common.metrics.Metrics;
-import org.apache.flink.kafka_backport.common.metrics.Sensor;
-import org.apache.flink.kafka_backport.common.metrics.stats.Avg;
-import org.apache.flink.kafka_backport.common.metrics.stats.Rate;
-import org.apache.flink.kafka_backport.common.record.LogEntry;
-import org.apache.flink.kafka_backport.common.record.MemoryRecords;
-import org.apache.flink.kafka_backport.common.requests.FetchRequest;
-import org.apache.flink.kafka_backport.common.requests.FetchResponse;
-import org.apache.flink.kafka_backport.common.serialization.Deserializer;
-import org.apache.flink.kafka_backport.common.utils.Utils;
-import org.apache.flink.kafka_backport.clients.consumer.ConsumerRecord;
-import org.apache.flink.kafka_backport.common.Cluster;
-import org.apache.flink.kafka_backport.common.MetricName;
-import org.apache.flink.kafka_backport.common.errors.DisconnectException;
-import org.apache.flink.kafka_backport.common.metrics.stats.Count;
-import org.apache.flink.kafka_backport.common.metrics.stats.Max;
-import org.apache.flink.kafka_backport.common.protocol.ApiKeys;
-import org.apache.flink.kafka_backport.common.protocol.Errors;
-import org.apache.flink.kafka_backport.common.requests.ListOffsetRequest;
-import org.apache.flink.kafka_backport.common.requests.ListOffsetResponse;
-import org.apache.flink.kafka_backport.common.utils.Time;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-// ----------------------------------------------------------------------------
-//  This class is copied from the Apache Kafka project.
-// 
-//  The class is part of a "backport" of the new consumer API, in order to
-//  give Flink access to its functionality until the API is properly released.
-// 
-//  This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * This class manages the fetching process with the brokers.
- */
-public class Fetcher<K, V> {
-    // timestamp values sent in a list-offset request when resetting to the EARLIEST / LATEST strategy
-    private static final long EARLIEST_OFFSET_TIMESTAMP = -2L;
-    private static final long LATEST_OFFSET_TIMESTAMP = -1L;
-
-    private static final Logger log = LoggerFactory.getLogger(Fetcher.class);
-
-    private final ConsumerNetworkClient client;
-    private final Time time;
-    // passed to every FetchRequest that is created
-    private final int minBytes;
-    private final int maxWaitMs;
-    // passed as the per-partition size of every fetch
-    private final int fetchSize;
-    // sleep time between retriable list-offset attempts
-    private final long retryBackoffMs;
-    // when true, each record is validated before it is parsed
-    private final boolean checkCrcs;
-    private final Metadata metadata;
-    private final FetchManagerMetrics sensors;
-    private final SubscriptionState subscriptions;
-    // parsed records buffered by the fetch response handler until fetchedRecords() drains them
-    private final List<PartitionRecords<K, V>> records;
-    private final Deserializer<K> keyDeserializer;
-    private final Deserializer<V> valueDeserializer;
-
-    /**
-     * Create a fetcher. All arguments are stored as-is; the record buffer starts out empty and
-     * the fetch sensors are created from the given metrics registry.
-     *
-     * @param client the network client used to send fetch and list-offset requests
-     * @param minBytes the min-bytes value put into every fetch request
-     * @param maxWaitMs the max-wait value put into every fetch request
-     * @param fetchSize the per-partition size put into every fetch request
-     * @param checkCrcs whether each fetched record is validated before it is parsed
-     * @param keyDeserializer deserializer applied to non-null record keys
-     * @param valueDeserializer deserializer applied to non-null record values
-     * @param metadata the cluster metadata, refreshed on demand when leadership is unknown
-     * @param subscriptions the subscription state holding assigned partitions and their positions
-     * @param metrics the metrics registry handed to the fetch sensors
-     * @param metricGrpPrefix the metric group prefix handed to the fetch sensors
-     * @param metricTags the metric tags handed to the fetch sensors
-     * @param time the time source
-     * @param retryBackoffMs how long to sleep between retriable list-offset attempts
-     */
-    public Fetcher(ConsumerNetworkClient client,
-                   int minBytes,
-                   int maxWaitMs,
-                   int fetchSize,
-                   boolean checkCrcs,
-                   Deserializer<K> keyDeserializer,
-                   Deserializer<V> valueDeserializer,
-                   Metadata metadata,
-                   SubscriptionState subscriptions,
-                   Metrics metrics,
-                   String metricGrpPrefix,
-                   Map<String, String> metricTags,
-                   Time time,
-                   long retryBackoffMs) {
-
-        this.time = time;
-        this.client = client;
-        this.metadata = metadata;
-        this.subscriptions = subscriptions;
-        this.minBytes = minBytes;
-        this.maxWaitMs = maxWaitMs;
-        this.fetchSize = fetchSize;
-        this.checkCrcs = checkCrcs;
-
-        this.keyDeserializer = keyDeserializer;
-        this.valueDeserializer = valueDeserializer;
-
-        this.records = new LinkedList<PartitionRecords<K, V>>();
-
-        this.sensors = new FetchManagerMetrics(metrics, metricGrpPrefix, metricTags);
-        this.retryBackoffMs = retryBackoffMs;
-    }
-
-    /**
-     * Set-up a fetch request for any node that we have assigned partitions for which doesn't have one.
-     *
-     * @param cluster The current cluster metadata
-     */
-    public void initFetches(Cluster cluster) {
-        for (Map.Entry<Node, FetchRequest> fetchEntry: createFetchRequests(cluster).entrySet()) {
-            final FetchRequest fetch = fetchEntry.getValue();
-            client.send(fetchEntry.getKey(), ApiKeys.FETCH, fetch)
-                    .addListener(new RequestFutureListener<ClientResponse>() {
-                        @Override
-                        public void onSuccess(ClientResponse response) {
-                            handleFetchResponse(response, fetch);
-                        }
-
-                        @Override
-                        public void onFailure(RuntimeException e) {
-                            log.debug("Fetch failed", e);
-                        }
-                    });
-        }
-    }
-
-    /**
-     * Make sure each of the given partitions has a fetch position. Partitions that already have
-     * one are left alone; the others are either reset through the offset reset strategy or
-     * positioned at their committed offset.
-     *
-     * @param partitions the partitions whose fetch positions should be initialized
-     */
-    public void updateFetchPositions(Set<TopicPartition> partitions) {
-        for (TopicPartition tp : partitions) {
-            // nothing to do when a fetch position is already known
-            if (subscriptions.fetched(tp) != null)
-                continue;
-
-            // TODO: If there are several offsets to reset, we could submit offset requests in parallel
-            if (subscriptions.isOffsetResetNeeded(tp)) {
-                resetOffset(tp);
-                continue;
-            }
-
-            if (subscriptions.committed(tp) != null) {
-                log.debug("Resetting offset for partition {} to the committed offset {}",
-                        tp, subscriptions.committed(tp));
-                subscriptions.seek(tp, subscriptions.committed(tp));
-                continue;
-            }
-
-            // no committed position either: fall back to the default reset strategy
-            subscriptions.needOffsetReset(tp);
-            resetOffset(tp);
-        }
-    }
-
-    /**
-     * Reset offsets for the given partition using the offset reset strategy. The strategy is
-     * translated into the matching list-offset timestamp, the offset is looked up (blocking until
-     * the lookup succeeds or fails with a non-retriable error) and the partition is seeked to it.
-     *
-     * @param partition The given partition that needs reset offset
-     * @throws org.apache.flink.kafka_backport.clients.consumer.NoOffsetForPartitionException If no offset reset strategy is defined
-     */
-    private void resetOffset(TopicPartition partition) {
-        OffsetResetStrategy strategy = subscriptions.resetStrategy(partition);
-        final long timestamp;
-        if (strategy == OffsetResetStrategy.EARLIEST)
-            timestamp = EARLIEST_OFFSET_TIMESTAMP;
-        else if (strategy == OffsetResetStrategy.LATEST)
-            timestamp = LATEST_OFFSET_TIMESTAMP;
-        else
-            throw new NoOffsetForPartitionException("No offset is set and no reset policy is defined");
-
-        // lower-case with a fixed locale: the default-locale variant would turn "EARLIEST" into
-        // "earl\u0131est" on e.g. a Turkish JVM
-        log.debug("Resetting offset for partition {} to {} offset.", partition,
-                strategy.name().toLowerCase(java.util.Locale.ROOT));
-        long offset = listOffset(partition, timestamp);
-        this.subscriptions.seek(partition, offset);
-    }
-
-    /**
-     * Fetch a single offset before the given timestamp for the partition.
-     *
-     * @param partition The partition that needs fetching offset.
-     * @param timestamp The timestamp for fetching offset.
-     * @return The offset of the message that is published before the given timestamp
-     */
-    private long listOffset(TopicPartition partition, long timestamp) {
-        while (true) {
-            RequestFuture<Long> future = sendListOffsetRequest(partition, timestamp);
-            client.poll(future);
-
-            if (future.succeeded())
-                return future.value();
-
-            if (!future.isRetriable())
-                throw future.exception();
-
-            if (future.exception() instanceof InvalidMetadataException)
-                client.awaitMetadataUpdate();
-            else
-                Utils.sleep(retryBackoffMs);
-        }
-    }
-
-    /**
-     * Return the fetched records, empty the record buffer and update the consumed position.
-     *
-     * @return The fetched records per partition
-     */
-    public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
-        if (this.subscriptions.partitionAssignmentNeeded()) {
-            return Collections.emptyMap();
-        } else {
-            Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<TopicPartition, List<ConsumerRecord<K, V>>>();
-            for (PartitionRecords<K, V> part : this.records) {
-                Long consumed = subscriptions.consumed(part.partition);
-                if (this.subscriptions.assignedPartitions().contains(part.partition)
-                    && (consumed == null || part.fetchOffset == consumed)) {
-                    List<ConsumerRecord<K, V>> records = drained.get(part.partition);
-                    if (records == null) {
-                        records = part.records;
-                        drained.put(part.partition, records);
-                    } else {
-                        records.addAll(part.records);
-                    }
-                    subscriptions.consumed(part.partition, part.records.get(part.records.size() - 1).offset() + 1);
-                } else {
-                    // these records aren't next in line based on the last consumed position, ignore them
-                    // they must be from an obsolete request
-                    log.debug("Ignoring fetched records for {} at offset {}", part.partition, part.fetchOffset);
-                }
-            }
-            this.records.clear();
-            return drained;
-        }
-    }
-
-    /**
-     * Send a list-offset request for a single offset of the partition to the partition's leader.
-     * When the partition or its leader is not known, no request is sent and an already-failed
-     * future is returned instead.
-     *
-     * @param topicPartition The partition that needs fetching offset.
-     * @param timestamp The timestamp for fetching offset.
-     * @return A response which can be polled to obtain the corresponding offset.
-     */
-    private RequestFuture<Long> sendListOffsetRequest(final TopicPartition topicPartition, long timestamp) {
-        Map<TopicPartition, ListOffsetRequest.PartitionData> requestData =
-                new HashMap<TopicPartition, ListOffsetRequest.PartitionData>(1);
-        requestData.put(topicPartition, new ListOffsetRequest.PartitionData(timestamp, 1));
-
-        PartitionInfo info = metadata.fetch().partition(topicPartition);
-        if (info == null) {
-            // register the topic so that the next metadata refresh includes it
-            metadata.add(topicPartition.topic());
-            log.debug("Partition {} is unknown for fetching offset, wait for metadata refresh", topicPartition);
-            return RequestFuture.staleMetadata();
-        }
-        if (info.leader() == null) {
-            log.debug("Leader for partition {} unavailable for fetching offset, wait for metadata refresh", topicPartition);
-            return RequestFuture.leaderNotAvailable();
-        }
-
-        Node leader = info.leader();
-        ListOffsetRequest listOffsetRequest = new ListOffsetRequest(-1, requestData);
-        RequestFutureAdapter<ClientResponse, Long> adapter = new RequestFutureAdapter<ClientResponse, Long>() {
-            @Override
-            public void onSuccess(ClientResponse response, RequestFuture<Long> future) {
-                handleListOffsetResponse(topicPartition, response, future);
-            }
-        };
-        return client.send(leader, ApiKeys.LIST_OFFSETS, listOffsetRequest).compose(adapter);
-    }
-
-    /**
-     * Callback for the response of the list offset call above.
-     * @param topicPartition The partition that was fetched
-     * @param clientResponse The response from the server.
-     */
-    private void handleListOffsetResponse(TopicPartition topicPartition,
-                                          ClientResponse clientResponse,
-                                          RequestFuture<Long> future) {
-        if (clientResponse.wasDisconnected()) {
-            future.raise(new DisconnectException());
-        } else {
-            ListOffsetResponse lor = new ListOffsetResponse(clientResponse.responseBody());
-            short errorCode = lor.responseData().get(topicPartition).errorCode;
-            if (errorCode == Errors.NONE.code()) {
-                List<Long> offsets = lor.responseData().get(topicPartition).offsets;
-                if (offsets.size() != 1)
-                    throw new IllegalStateException("This should not happen.");
-                long offset = offsets.get(0);
-                log.debug("Fetched offset {} for partition {}", offset, topicPartition);
-
-                future.complete(offset);
-            } else if (errorCode == Errors.NOT_LEADER_FOR_PARTITION.code()
-                    || errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
-                log.warn("Attempt to fetch offsets for partition {} failed due to obsolete leadership information, retrying.",
-                        topicPartition);
-                future.raise(Errors.forCode(errorCode));
-            } else {
-                log.error("Attempt to fetch offsets for partition {} failed due to: {}",
-                        topicPartition, Errors.forCode(errorCode).exception().getMessage());
-                future.raise(new StaleMetadataException());
-            }
-        }
-    }
-
-    /**
-     * Build one fetch request per node, covering every assigned partition whose leader is known
-     * and has no requests pending. Partitions without a known leader trigger a metadata update
-     * request instead.
-     *
-     * @param cluster The current cluster metadata
-     * @return the fetch request to send to each node
-     */
-    private Map<Node, FetchRequest> createFetchRequests(Cluster cluster) {
-        // group the per-partition fetch data by leader node
-        Map<Node, Map<TopicPartition, FetchRequest.PartitionData>> dataByNode =
-                new HashMap<Node, Map<TopicPartition, FetchRequest.PartitionData>>();
-        for (TopicPartition tp : subscriptions.assignedPartitions()) {
-            Node leader = cluster.leaderFor(tp);
-            if (leader == null) {
-                metadata.requestUpdate();
-                continue;
-            }
-            // only nodes without in-flight requests get a new fetch
-            if (this.client.pendingRequestCount(leader) != 0) {
-                continue;
-            }
-
-            Map<TopicPartition, FetchRequest.PartitionData> nodeData = dataByNode.get(leader);
-            if (nodeData == null) {
-                nodeData = new HashMap<TopicPartition, FetchRequest.PartitionData>();
-                dataByNode.put(leader, nodeData);
-            }
-            long position = this.subscriptions.fetched(tp);
-            nodeData.put(tp, new FetchRequest.PartitionData(position, this.fetchSize));
-        }
-
-        // wrap each node's fetch data in a request
-        Map<Node, FetchRequest> result = new HashMap<Node, FetchRequest>();
-        for (Map.Entry<Node, Map<TopicPartition, FetchRequest.PartitionData>> nodeEntry : dataByNode.entrySet()) {
-            result.put(nodeEntry.getKey(),
-                    new FetchRequest(this.maxWaitMs, this.minBytes, nodeEntry.getValue()));
-        }
-        return result;
-    }
-
-    /**
-     * The callback for fetch completion. For a disconnected response only a debug message is
-     * logged. Otherwise each partition in the response is processed: records of still-assigned
-     * partitions are parsed and buffered, and partition-level errors are translated into metadata
-     * updates or offset resets. The fetch latency is recorded in both cases.
-     *
-     * @param resp the raw client response
-     * @param request the fetch request that the response belongs to
-     * @throws IllegalStateException if a partition carries an error code that is not handled here
-     */
-    private void handleFetchResponse(ClientResponse resp, FetchRequest request) {
-        if (resp.wasDisconnected()) {
-            int correlation = resp.request().request().header().correlationId();
-            log.debug("Cancelled fetch request {} with correlation id {} due to node {} being disconnected",
-                resp.request(), correlation, resp.request().request().destination());
-        } else {
-            // totals over all partitions of this response, fed into the sensors below
-            int totalBytes = 0;
-            int totalCount = 0;
-            FetchResponse response = new FetchResponse(resp.responseBody());
-            for (Map.Entry<TopicPartition, FetchResponse.PartitionData> entry : response.responseData().entrySet()) {
-                TopicPartition tp = entry.getKey();
-                FetchResponse.PartitionData partition = entry.getValue();
-                if (!subscriptions.assignedPartitions().contains(tp)) {
-                    log.debug("Ignoring fetched data for partition {} which is no longer assigned.", tp);
-                } else if (partition.errorCode == Errors.NONE.code()) {
-                    int bytes = 0;
-                    ByteBuffer buffer = partition.recordSet;
-                    MemoryRecords records = MemoryRecords.readableRecords(buffer);
-                    // the offset this partition was requested at; stored with the batch so that
-                    // fetchedRecords() can compare it against the consumed position
-                    long fetchOffset = request.fetchData().get(tp).offset;
-                    List<ConsumerRecord<K, V>> parsed = new ArrayList<ConsumerRecord<K, V>>();
-                    for (LogEntry logEntry : records) {
-                        parsed.add(parseRecord(tp, logEntry));
-                        bytes += logEntry.size();
-                    }
-                    if (parsed.size() > 0) {
-                        // advance the fetch position past the last record and buffer the batch
-                        ConsumerRecord<K, V> record = parsed.get(parsed.size() - 1);
-                        this.subscriptions.fetched(tp, record.offset() + 1);
-                        this.records.add(new PartitionRecords<K, V>(fetchOffset, tp, parsed));
-                        this.sensors.recordsFetchLag.record(partition.highWatermark - record.offset());
-                    }
-                    this.sensors.recordTopicFetchMetrics(tp.topic(), bytes, parsed.size());
-                    totalBytes += bytes;
-                    totalCount += parsed.size();
-                } else if (partition.errorCode == Errors.NOT_LEADER_FOR_PARTITION.code()
-                    || partition.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
-                    this.metadata.requestUpdate();
-                } else if (partition.errorCode == Errors.OFFSET_OUT_OF_RANGE.code()) {
-                    // TODO: this could be optimized by grouping all out-of-range partitions
-                    log.info("Fetch offset {} is out of range, resetting offset", subscriptions.fetched(tp));
-                    subscriptions.needOffsetReset(tp);
-                } else if (partition.errorCode == Errors.UNKNOWN.code()) {
-                    log.warn("Unknown error fetching data for topic-partition {}", tp);
-                } else {
-                    throw new IllegalStateException("Unexpected error code " + partition.errorCode + " while fetching data");
-                }
-            }
-            this.sensors.bytesFetched.record(totalBytes);
-            this.sensors.recordsFetched.record(totalCount);
-        }
-        // latency is recorded for disconnected responses as well
-        this.sensors.fetchLatency.record(resp.requestLatencyMs());
-    }
-
-    /**
-     * Build a {@link ConsumerRecord} from a single log entry of the given partition.
-     *
-     * If {@code checkCrcs} is set, {@code ensureValid()} is invoked on the entry's record before
-     * anything else is read. A {@code null} key or value buffer yields a {@code null} key or value;
-     * a non-null buffer is converted with {@code Utils.toArray} and passed, together with the topic
-     * name, to the matching deserializer.
-     *
-     * @param partition partition the entry belongs to; supplies the topic name and partition number
-     * @param logEntry  entry supplying the offset and the raw key / value buffers
-     * @return a record built from the topic, partition number, entry offset, key and value
-     */
-    private ConsumerRecord<K, V> parseRecord(TopicPartition partition, LogEntry logEntry) {
-        if (this.checkCrcs) {
-            logEntry.record().ensureValid();
-        }
-
-        final long entryOffset = logEntry.offset();
-
-        // Key: stays null unless the entry actually carries a key buffer.
-        K key = null;
-        final ByteBuffer rawKey = logEntry.record().key();
-        if (rawKey != null) {
-            key = this.keyDeserializer.deserialize(partition.topic(), Utils.toArray(rawKey));
-        }
-
-        // Value: same treatment as the key.
-        V value = null;
-        final ByteBuffer rawValue = logEntry.record().value();
-        if (rawValue != null) {
-            value = this.valueDeserializer.deserialize(partition.topic(), Utils.toArray(rawValue));
-        }
-
-        return new ConsumerRecord<K, V>(partition.topic(), partition.partition(), entryOffset, key, value);
-    }
-
-    /**
-     * Simple holder that groups a list of parsed records with the partition they belong to
-     * and the fetch offset passed in when the holder was created.
-     *
-     * @param <K> key type of the held records
-     * @param <V> value type of the held records
-     */
-    private static class PartitionRecords<K, V> {
-        /** Partition the records belong to. */
-        public TopicPartition partition;
-        /** Fetch offset handed to the constructor. */
-        public long fetchOffset;
-        /** The parsed records. */
-        public List<ConsumerRecord<K, V>> records;
-
-        public PartitionRecords(long fetchOffset, TopicPartition partition, List<ConsumerRecord<K, V>> records) {
-            this.partition = partition;
-            this.records = records;
-            this.fetchOffset = fetchOffset;
-        }
-    }
-
-    /**
-     * Sensors describing fetch activity. Every metric created here is registered under the
-     * group name {@code <metricGrpPrefix>-fetch-manager-metrics}.
-     *
-     * The class is {@code static} because nothing in it touches the enclosing instance, so it
-     * does not need to hold a hidden reference to it.
-     */
-    private static class FetchManagerMetrics {
-        public final Metrics metrics;
-        public final String metricGrpName;
-
-        public final Sensor bytesFetched;
-        public final Sensor recordsFetched;
-        public final Sensor fetchLatency;
-        public final Sensor recordsFetchLag;
-
-        /**
-         * Create the four fetch sensors and attach their metrics.
-         *
-         * @param metrics         registry the sensors are created in
-         * @param metricGrpPrefix prefix of the metric group name
-         * @param tags            tags attached to every metric name created here
-         */
-        public FetchManagerMetrics(Metrics metrics, String metricGrpPrefix, Map<String, String> tags) {
-            this.metrics = metrics;
-            this.metricGrpName = metricGrpPrefix + "-fetch-manager-metrics";
-
-            // Bytes per fetch: average, maximum and rate.
-            this.bytesFetched = metrics.sensor("bytes-fetched");
-            this.bytesFetched.add(
-                new MetricName("fetch-size-avg", this.metricGrpName,
-                    "The average number of bytes fetched per request", tags),
-                new Avg());
-            this.bytesFetched.add(
-                new MetricName("fetch-size-max", this.metricGrpName,
-                    "The maximum number of bytes fetched per request", tags),
-                new Max());
-            this.bytesFetched.add(
-                new MetricName("bytes-consumed-rate", this.metricGrpName,
-                    "The average number of bytes consumed per second", tags),
-                new Rate());
-
-            // Records per fetch: average and rate.
-            this.recordsFetched = metrics.sensor("records-fetched");
-            this.recordsFetched.add(
-                new MetricName("records-per-request-avg", this.metricGrpName,
-                    "The average number of records in each request", tags),
-                new Avg());
-            this.recordsFetched.add(
-                new MetricName("records-consumed-rate", this.metricGrpName,
-                    "The average number of records consumed per second", tags),
-                new Rate());
-
-            // Fetch latency: average, maximum, and a count-based rate of requests.
-            this.fetchLatency = metrics.sensor("fetch-latency");
-            this.fetchLatency.add(
-                new MetricName("fetch-latency-avg", this.metricGrpName,
-                    "The average time taken for a fetch request.", tags),
-                new Avg());
-            this.fetchLatency.add(
-                new MetricName("fetch-latency-max", this.metricGrpName,
-                    "The max time taken for any fetch request.", tags),
-                new Max());
-            this.fetchLatency.add(
-                new MetricName("fetch-rate", this.metricGrpName,
-                    "The number of fetch requests per second.", tags),
-                new Rate(new Count()));
-
-            // Record lag: maximum only.
-            this.recordsFetchLag = metrics.sensor("records-lag");
-            this.recordsFetchLag.add(
-                new MetricName("records-lag-max", this.metricGrpName,
-                    "The maximum lag in terms of number of records for any partition in this window", tags),
-                new Max());
-        }
-
-        /**
-         * Record the byte count and record count of a fetch against the per-topic sensors
-         * {@code topic.<topic>.bytes-fetched} and {@code topic.<topic>.records-fetched}.
-         *
-         * NOTE(review): no metric is attached to the per-topic sensors in this class, so unless
-         * something else attaches one the recorded values are presumably not exposed - confirm.
-         *
-         * @param topic   topic the data was fetched from
-         * @param bytes   number of bytes fetched for the topic
-         * @param records number of records fetched for the topic
-         */
-        public void recordTopicFetchMetrics(String topic, int bytes, int records) {
-            topicSensor("topic." + topic + ".bytes-fetched").record(bytes);
-            topicSensor("topic." + topic + ".records-fetched").record(records);
-        }
-
-        /** Look up the sensor with the given name, creating it when the registry has none yet. */
-        private Sensor topicSensor(String name) {
-            Sensor sensor = this.metrics.getSensor(name);
-            if (sensor == null) {
-                sensor = this.metrics.sensor(name);
-            }
-            return sensor;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/Heartbeat.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/Heartbeat.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/Heartbeat.java
deleted file mode 100644
index f412897..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/Heartbeat.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.clients.consumer.internals;
-
-// ----------------------------------------------------------------------------
-//  This class is copied from the Apache Kafka project.
-// 
-//  The class is part of a "backport" of the new consumer API, in order to
-//  give Flink access to its functionality until the API is properly released.
-// 
-//  This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * Bookkeeping for the heartbeat to the coordinator: remembers when a heartbeat was last sent
- * and received and when the session was last reset, and derives from those timestamps when the
- * next heartbeat is due and whether the session timeout has elapsed.
- *
- * Every timestamp is supplied by the caller; this class never reads a clock itself.
- */
-public final class Heartbeat {
-
-    /*
-     * The number of heartbeats to attempt to complete per session timeout interval.
-     * So, e.g., with a session timeout of 3 seconds we would attempt a heartbeat
-     * once per second.
-     */
-    public final static int HEARTBEATS_PER_SESSION_INTERVAL = 3;
-
-    private final long timeout;
-    private long lastSessionReset;
-    private long lastHeartbeatSend;
-    private long lastHeartbeatReceive;
-
-    /**
-     * @param timeout session timeout, in the same unit as every {@code now} argument
-     * @param now     current time; stored as the initial session reset time
-     */
-    public Heartbeat(long timeout, long now) {
-        this.lastSessionReset = now;
-        this.timeout = timeout;
-    }
-
-    /** Remember {@code now} as the time the most recent heartbeat was sent. */
-    public void sentHeartbeat(long now) {
-        lastHeartbeatSend = now;
-    }
-
-    /** Remember {@code now} as the time the most recent heartbeat was received. */
-    public void receiveHeartbeat(long now) {
-        lastHeartbeatReceive = now;
-    }
-
-    /** Remember {@code now} as the time of the most recent session reset. */
-    public void resetSessionTimeout(long now) {
-        lastSessionReset = now;
-    }
-
-    /** @return the time passed to the latest {@link #sentHeartbeat(long)} call (0 if never called) */
-    public long lastHeartbeatSend() {
-        return lastHeartbeatSend;
-    }
-
-    /** @return the heartbeat interval: the timeout divided by {@link #HEARTBEATS_PER_SESSION_INTERVAL} */
-    public long interval() {
-        return timeout / HEARTBEATS_PER_SESSION_INTERVAL;
-    }
-
-    /**
-     * Time left until the next heartbeat is due, measured from the later of the last send and
-     * the last session reset.
-     *
-     * @return 0 once more than one interval has elapsed, otherwise the remaining part of the interval
-     */
-    public long timeToNextHeartbeat(long now) {
-        final long reference = Math.max(lastHeartbeatSend, lastSessionReset);
-        final long elapsed = now - reference;
-        final long hbInterval = interval();
-        return elapsed > hbInterval ? 0 : hbInterval - elapsed;
-    }
-
-    /** @return true when {@link #timeToNextHeartbeat(long)} reports no remaining wait */
-    public boolean shouldHeartbeat(long now) {
-        return timeToNextHeartbeat(now) == 0;
-    }
-
-    /**
-     * @return true when more than the timeout has passed since the later of the last session
-     *         reset and the last received heartbeat
-     */
-    public boolean sessionTimeoutExpired(long now) {
-        final long lastActivity = Math.max(lastSessionReset, lastHeartbeatReceive);
-        return now - lastActivity > timeout;
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/NoAvailableBrokersException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/NoAvailableBrokersException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/NoAvailableBrokersException.java
deleted file mode 100644
index 421c64e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/NoAvailableBrokersException.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.clients.consumer.internals;
-
-import org.apache.flink.kafka_backport.common.errors.InvalidMetadataException;
-
-// ----------------------------------------------------------------------------
-//  This class is copied from the Apache Kafka project.
-// 
-//  The class is part of a "backport" of the new consumer API, in order to
-//  give Flink access to its functionality until the API is properly released.
-// 
-//  This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * Signals that no broker was available to complete a request.
- *
- * Adds nothing to {@link InvalidMetadataException} beyond its own type and serial version id.
- */
-public class NoAvailableBrokersException extends InvalidMetadataException {
-
-    private static final long serialVersionUID = 1L;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/NoOpConsumerRebalanceCallback.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/NoOpConsumerRebalanceCallback.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/NoOpConsumerRebalanceCallback.java
deleted file mode 100644
index 6da31dd..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/NoOpConsumerRebalanceCallback.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package org.apache.flink.kafka_backport.clients.consumer.internals;
-
-import org.apache.flink.kafka_backport.clients.consumer.ConsumerRebalanceCallback;
-import org.apache.flink.kafka_backport.clients.consumer.Consumer;
-import org.apache.flink.kafka_backport.common.TopicPartition;
-
-import java.util.Collection;
-
-// ----------------------------------------------------------------------------
-//  This class is copied from the Apache Kafka project.
-// 
-//  The class is part of a "backport" of the new consumer API, in order to
-//  give Flink access to its functionality until the API is properly released.
-// 
-//  This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * A {@link ConsumerRebalanceCallback} whose two callbacks have empty bodies.
- */
-public class NoOpConsumerRebalanceCallback implements ConsumerRebalanceCallback {
-
-    @Override
-    public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
-        // intentionally empty
-    }
-
-    @Override
-    public void onPartitionsRevoked(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
-        // intentionally empty
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/RequestFuture.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/RequestFuture.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/RequestFuture.java
deleted file mode 100644
index 7b6edc3..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/RequestFuture.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.clients.consumer.internals;
-
-import org.apache.flink.kafka_backport.common.errors.RetriableException;
-import org.apache.flink.kafka_backport.common.protocol.Errors;
-
-import java.util.ArrayList;
-import java.util.List;
-
-// ----------------------------------------------------------------------------
-//  This class is copied from the Apache Kafka project.
-// 
-//  The class is part of a "backport" of the new consumer API, in order to
-//  give Flink access to its functionality until the API is properly released.
-// 
-//  This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * Result of an asynchronous request from {@link ConsumerNetworkClient}. Use {@link ConsumerNetworkClient#poll(long)}
- * (and variants) to finish a request future. Use {@link #isDone()} to check if the future is complete, and
- * {@link #succeeded()} to check if the request completed successfully. Typical usage might look like this:
- *
- * <pre>{@code
- *     RequestFuture<ClientResponse> future = client.send(api, request);
- *     client.poll(future);
- *
- *     if (future.succeeded()) {
- *         ClientResponse response = future.value();
- *         // Handle response
- *     } else {
- *         throw future.exception();
- *     }
- * }</pre>
- *
- * NOTE(review): no synchronization is visible in this class; presumably it is completed and
- * read from a single thread - confirm against the callers.
- *
- * @param <T> Return type of the result (Can be Void if there is no response)
- */
-public class RequestFuture<T> {
-
-    private boolean isDone = false;
-    private T value;
-    private RuntimeException exception;
-    // Listeners registered before completion; never reassigned, hence final.
-    private final List<RequestFutureListener<T>> listeners = new ArrayList<RequestFutureListener<T>>();
-
-
-    /**
-     * Check whether the response is ready to be handled
-     * @return true if the response is ready, false otherwise
-     */
-    public boolean isDone() {
-        return isDone;
-    }
-
-    /**
-     * Get the value corresponding to this request (only available if the request succeeded)
-     * @return the value if it exists or null
-     */
-    public T value() {
-        return value;
-    }
-
-    /**
-     * Check if the request succeeded.
-     * @return true if the request completed and was successful
-     */
-    public boolean succeeded() {
-        return isDone && exception == null;
-    }
-
-    /**
-     * Check if the request failed.
-     * @return true if the request completed with a failure
-     */
-    public boolean failed() {
-        return isDone && exception != null;
-    }
-
-    /**
-     * Check if the request is retriable (convenience method for checking if
-     * the exception is an instance of {@link org.apache.flink.kafka_backport.common.errors.RetriableException}).
-     * @return true if it is retriable, false otherwise (including when no exception has been raised)
-     */
-    public boolean isRetriable() {
-        return exception instanceof RetriableException;
-    }
-
-    /**
-     * Get the exception from a failed result (only available if the request failed)
-     * @return The exception if it exists or null
-     */
-    public RuntimeException exception() {
-        return exception;
-    }
-
-    /**
-     * Complete the request successfully. After this call, {@link #succeeded()} will return true
-     * and the value can be obtained through {@link #value()}. Listeners registered so far are
-     * notified through {@code onSuccess}.
-     * @param value corresponding value (or null if there is none)
-     */
-    public void complete(T value) {
-        this.value = value;
-        // Mark as done before notifying, so a listener added from within a callback is
-        // invoked directly by addListener instead of being appended to the list being iterated.
-        this.isDone = true;
-        fireSuccess();
-    }
-
-    /**
-     * Raise an exception. The request will be marked as failed, and the caller can either
-     * handle the exception or throw it. Listeners registered so far are notified through
-     * {@code onFailure}.
-     * @param e corresponding exception to be passed to caller
-     */
-    public void raise(RuntimeException e) {
-        this.exception = e;
-        this.isDone = true;
-        fireFailure();
-    }
-
-    /**
-     * Raise an error. The request will be marked as failed.
-     * @param error corresponding error to be passed to caller
-     */
-    public void raise(Errors error) {
-        raise(error.exception());
-    }
-
-    /** Notify every registered listener of the successful value. */
-    private void fireSuccess() {
-        for (RequestFutureListener<T> listener : listeners) {
-            listener.onSuccess(value);
-        }
-    }
-
-    /** Notify every registered listener of the failure. */
-    private void fireFailure() {
-        for (RequestFutureListener<T> listener : listeners) {
-            listener.onFailure(exception);
-        }
-    }
-
-    /**
-     * Add a listener which will be notified when the future completes. If the future is already
-     * complete, the listener is invoked immediately and is not stored.
-     * @param listener the listener to notify of success or failure
-     */
-    public void addListener(RequestFutureListener<T> listener) {
-        if (isDone) {
-            if (exception != null) {
-                listener.onFailure(exception);
-            } else {
-                listener.onSuccess(value);
-            }
-        } else {
-            this.listeners.add(listener);
-        }
-    }
-
-    /**
-     * Convert from a request future of one type to another type
-     * @param adapter The adapter which does the conversion
-     * @param <S> The type of the future adapted to
-     * @return The new future
-     */
-    public <S> RequestFuture<S> compose(final RequestFutureAdapter<T, S> adapter) {
-        final RequestFuture<S> adapted = new RequestFuture<S>();
-        addListener(new RequestFutureListener<T>() {
-            @Override
-            public void onSuccess(T value) {
-                adapter.onSuccess(value, adapted);
-            }
-
-            @Override
-            public void onFailure(RuntimeException e) {
-                adapter.onFailure(e, adapted);
-            }
-        });
-        return adapted;
-    }
-
-    /**
-     * Create a future that has already failed with the given exception.
-     * @param e the exception the future is failed with
-     * @param <T> value type of the returned future
-     * @return the failed future
-     */
-    public static <T> RequestFuture<T> failure(RuntimeException e) {
-        RequestFuture<T> future = new RequestFuture<T>();
-        future.raise(e);
-        return future;
-    }
-
-    /**
-     * Create a future that has already completed successfully with a null value.
-     * @return the completed future
-     */
-    public static RequestFuture<Void> voidSuccess() {
-        RequestFuture<Void> future = new RequestFuture<Void>();
-        future.complete(null);
-        return future;
-    }
-
-    /** @return a future already failed with the exception of {@code Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE} */
-    public static <T> RequestFuture<T> coordinatorNotAvailable() {
-        return failure(Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.exception());
-    }
-
-    /** @return a future already failed with the exception of {@code Errors.LEADER_NOT_AVAILABLE} */
-    public static <T> RequestFuture<T> leaderNotAvailable() {
-        return failure(Errors.LEADER_NOT_AVAILABLE.exception());
-    }
-
-    /** @return a future already failed with a new {@link NoAvailableBrokersException} */
-    public static <T> RequestFuture<T> noBrokersAvailable() {
-        return failure(new NoAvailableBrokersException());
-    }
-
-    /** @return a future already failed with a new {@link StaleMetadataException} */
-    public static <T> RequestFuture<T> staleMetadata() {
-        return failure(new StaleMetadataException());
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/RequestFutureAdapter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/RequestFutureAdapter.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/RequestFutureAdapter.java
deleted file mode 100644
index b34c2da..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/RequestFutureAdapter.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.clients.consumer.internals;
-
-// ----------------------------------------------------------------------------
-//  This class is copied from the Apache Kafka project.
-// 
-//  The class is part of a "backport" of the new consumer API, in order to
-//  give Flink access to its functionality until the API is properly released.
-// 
-//  This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * Bridges a {@link RequestFuture} of one value type to a future of another value type.
- *
- * Subclasses decide how a successful source value is turned into a result on the target
- * future; failures are forwarded to the target future unless {@link #onFailure} is overridden.
- *
- * @param <F> Type to adapt from
- * @param <T> Type to adapt to
- */
-public abstract class RequestFutureAdapter<F, T> {
-
-    /**
-     * Handle a successful source value.
-     *
-     * @param value  value the source future completed with
-     * @param future target future handed to the adapter
-     */
-    public abstract void onSuccess(F value, RequestFuture<T> future);
-
-    /**
-     * Handle a failure of the source future. This default raises the same exception on the
-     * target future.
-     *
-     * @param e      exception the source future failed with
-     * @param future target future handed to the adapter
-     */
-    public void onFailure(final RuntimeException e, final RequestFuture<T> future) {
-        future.raise(e);
-    }
-}