You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/27 13:25:38 UTC
[21/51] [abbrv] flink git commit: [FLINK-2386] [kafka connector] Add
comments to all backported kafka sources and move them to
'org.apache.flink.kafka_backport'
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/Coordinator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/Coordinator.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/Coordinator.java
new file mode 100644
index 0000000..1746c22
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/Coordinator.java
@@ -0,0 +1,791 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.clients.consumer.internals;
+
+import org.apache.flink.kafka_backport.clients.ClientResponse;
+import org.apache.flink.kafka_backport.clients.consumer.CommitType;
+import org.apache.flink.kafka_backport.clients.consumer.ConsumerCommitCallback;
+import org.apache.flink.kafka_backport.common.KafkaException;
+import org.apache.flink.kafka_backport.common.MetricName;
+import org.apache.flink.kafka_backport.common.Node;
+import org.apache.flink.kafka_backport.common.TopicPartition;
+import org.apache.flink.kafka_backport.common.errors.DisconnectException;
+import org.apache.flink.kafka_backport.common.metrics.Measurable;
+import org.apache.flink.kafka_backport.common.metrics.MetricConfig;
+import org.apache.flink.kafka_backport.common.metrics.Metrics;
+import org.apache.flink.kafka_backport.common.metrics.Sensor;
+import org.apache.flink.kafka_backport.common.metrics.stats.Avg;
+import org.apache.flink.kafka_backport.common.metrics.stats.Count;
+import org.apache.flink.kafka_backport.common.metrics.stats.Max;
+import org.apache.flink.kafka_backport.common.metrics.stats.Rate;
+import org.apache.flink.kafka_backport.common.protocol.ApiKeys;
+import org.apache.flink.kafka_backport.common.protocol.Errors;
+import org.apache.flink.kafka_backport.common.requests.ConsumerMetadataRequest;
+import org.apache.flink.kafka_backport.common.requests.ConsumerMetadataResponse;
+import org.apache.flink.kafka_backport.common.requests.HeartbeatRequest;
+import org.apache.flink.kafka_backport.common.requests.HeartbeatResponse;
+import org.apache.flink.kafka_backport.common.requests.JoinGroupRequest;
+import org.apache.flink.kafka_backport.common.requests.JoinGroupResponse;
+import org.apache.flink.kafka_backport.common.requests.OffsetCommitRequest;
+import org.apache.flink.kafka_backport.common.requests.OffsetCommitResponse;
+import org.apache.flink.kafka_backport.common.requests.OffsetFetchRequest;
+import org.apache.flink.kafka_backport.common.requests.OffsetFetchResponse;
+import org.apache.flink.kafka_backport.common.utils.Time;
+import org.apache.flink.kafka_backport.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class manages the coordination process with the consumer coordinator.
+ */
+public final class Coordinator {
+
+ private static final Logger log = LoggerFactory.getLogger(Coordinator.class);
+
+ private final ConsumerNetworkClient client; // used to send, poll and schedule every coordinator request
+ private final Time time; // clock used for heartbeat timing and retry scheduling
+ private final String groupId; // consumer group id placed in every group-related request
+ private final Heartbeat heartbeat; // tracks the session timeout and when the next heartbeat is due
+ private final HeartbeatTask heartbeatTask; // delayed task that sends heartbeats and reschedules itself
+ private final int sessionTimeoutMs; // session timeout sent in JoinGroup and used to build the Heartbeat
+ private final String assignmentStrategy; // partition assignment strategy name sent in JoinGroup
+ private final SubscriptionState subscriptions; // subscription, assignment and committed-offset state updated by this class
+ private final CoordinatorMetrics sensors; // latency/rate sensors recorded by the response handlers
+ private final long requestTimeoutMs; // poll timeout used while looking up the coordinator
+ private final long retryBackoffMs; // sleep/delay applied between retries of retriable failures
+ private final RebalanceCallback rebalanceCallback; // user callback run before and after a partition reassignment
+ private Node consumerCoordinator; // current coordinator node; null while the coordinator is unknown
+ private String consumerId; // id returned by JoinGroup; reset to JoinGroupRequest.UNKNOWN_CONSUMER_ID when rejected
+ private int generation; // generation id returned by JoinGroup; -1 until the first successful join
+
+
+ /**
+  * Create the coordination manager for one consumer group.
+  *
+  * The coordinator node starts out unknown, the consumer id starts as
+  * {@code JoinGroupRequest.UNKNOWN_CONSUMER_ID} and the generation as {@code -1}.
+  *
+  * @param client network client used for all requests to the coordinator
+  * @param groupId id of the consumer group
+  * @param sessionTimeoutMs session timeout sent with JoinGroup and used for heartbeat tracking
+  * @param assignmentStrategy name of the partition assignment strategy sent with JoinGroup
+  * @param subscriptions subscription state that this coordinator reads and updates
+  * @param metrics metrics registry in which the coordinator sensors are registered
+  * @param metricGrpPrefix prefix of the metric group name ("-coordinator-metrics" is appended)
+  * @param metricTags tags attached to every registered metric
+  * @param time clock used for heartbeat timing
+  * @param requestTimeoutMs poll timeout used while discovering the coordinator
+  * @param retryBackoffMs back-off applied between retries
+  * @param rebalanceCallback callback invoked before and after partition reassignment
+  */
+ public Coordinator(ConsumerNetworkClient client,
+                    String groupId,
+                    int sessionTimeoutMs,
+                    String assignmentStrategy,
+                    SubscriptionState subscriptions,
+                    Metrics metrics,
+                    String metricGrpPrefix,
+                    Map<String, String> metricTags,
+                    Time time,
+                    long requestTimeoutMs,
+                    long retryBackoffMs,
+                    RebalanceCallback rebalanceCallback) {
+
+     // collaborators
+     this.client = client;
+     this.time = time;
+     this.subscriptions = subscriptions;
+     this.rebalanceCallback = rebalanceCallback;
+
+     // static configuration
+     this.groupId = groupId;
+     this.assignmentStrategy = assignmentStrategy;
+     this.sessionTimeoutMs = sessionTimeoutMs;
+     this.requestTimeoutMs = requestTimeoutMs;
+     this.retryBackoffMs = retryBackoffMs;
+
+     // group membership is unknown until the first successful JoinGroup
+     this.consumerCoordinator = null;
+     this.consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID;
+     this.generation = -1;
+
+     // heartbeat machinery and metrics
+     this.heartbeat = new Heartbeat(sessionTimeoutMs, time.milliseconds());
+     this.heartbeatTask = new HeartbeatTask();
+     this.sensors = new CoordinatorMetrics(metrics, metricGrpPrefix, metricTags);
+ }
+
+ /**
+  * Refresh the committed offsets held in the subscription state, but only when the
+  * subscription state reports that a refresh is needed. The offsets of all currently assigned
+  * partitions are fetched with the blocking {@link #fetchCommittedOffsets(Set)}, stored in the
+  * subscription state, and the commits are then marked as refreshed.
+  */
+ public void refreshCommittedOffsetsIfNeeded() {
+     if (!subscriptions.refreshCommitsNeeded())
+         return;
+
+     Map<TopicPartition, Long> committed = fetchCommittedOffsets(subscriptions.assignedPartitions());
+     for (Map.Entry<TopicPartition, Long> committedEntry : committed.entrySet())
+         this.subscriptions.committed(committedEntry.getKey(), committedEntry.getValue());
+
+     this.subscriptions.commitsRefreshed();
+ }
+
+ /**
+  * Fetch the current committed offsets from the coordinator for a set of partitions.
+  * This call blocks: before every attempt it makes sure the coordinator is known and a
+  * partition assignment exists, then polls the fetch request to completion. Retriable
+  * failures are retried after sleeping for the retry back-off.
+  *
+  * @param partitions The partitions to fetch offsets for
+  * @return A map from partition to the committed offset
+  * @throws RuntimeException the failure reported by the request future when it is not retriable
+  */
+ public Map<TopicPartition, Long> fetchCommittedOffsets(Set<TopicPartition> partitions) {
+     for (;;) {
+         ensureCoordinatorKnown();
+         ensurePartitionAssignment();
+
+         // ask the coordinator and wait for the answer
+         RequestFuture<Map<TopicPartition, Long>> fetch = sendOffsetFetchRequest(partitions);
+         client.poll(fetch);
+
+         if (fetch.succeeded())
+             return fetch.value();
+
+         if (fetch.isRetriable()) {
+             Utils.sleep(retryBackoffMs);
+             continue;
+         }
+
+         throw fetch.exception();
+     }
+ }
+
+ /**
+  * Ensure that we have a valid partition assignment from the coordinator.
+  * Returns immediately when the subscription state does not need a new assignment. Otherwise
+  * the user callback is told about the revoked partitions, the group is (re-)joined (blocking),
+  * and the callback is told about the newly assigned partitions. Exceptions thrown by the
+  * callback are logged and do not abort the rebalance.
+  */
+ public void ensurePartitionAssignment() {
+     if (!subscriptions.partitionAssignmentNeeded())
+         return;
+
+     // notify the user before the rebalance; the callback receives a copy of the current assignment
+     log.debug("Revoking previously assigned partitions {}", this.subscriptions.assignedPartitions());
+     try {
+         Set<TopicPartition> previousAssignment = new HashSet<TopicPartition>(subscriptions.assignedPartitions());
+         rebalanceCallback.onPartitionsRevoked(previousAssignment);
+     } catch (Exception callbackFailure) {
+         log.error("User provided callback " + this.rebalanceCallback.getClass().getName()
+                 + " failed on partition revocation: ", callbackFailure);
+     }
+
+     reassignPartitions();
+
+     // notify the user after the rebalance; again with a copy of the assignment
+     log.debug("Setting newly assigned partitions {}", this.subscriptions.assignedPartitions());
+     try {
+         Set<TopicPartition> newAssignment = new HashSet<TopicPartition>(subscriptions.assignedPartitions());
+         rebalanceCallback.onPartitionsAssigned(newAssignment);
+     } catch (Exception callbackFailure) {
+         log.error("User provided callback " + this.rebalanceCallback.getClass().getName()
+                 + " failed on partition assignment: ", callbackFailure);
+     }
+ }
+
+ /**
+  * Repeatedly send JoinGroup requests until the subscription state no longer needs a
+  * partition assignment. Retriable failures are retried after the retry back-off; any other
+  * failure is thrown to the caller.
+  */
+ private void reassignPartitions() {
+     while (subscriptions.partitionAssignmentNeeded()) {
+         ensureCoordinatorKnown();
+
+         // wait out anything still in flight to the coordinator first, in particular
+         // so that a pending JoinGroup request is never sent a second time
+         if (client.pendingRequestCount(this.consumerCoordinator) > 0) {
+             client.awaitPendingRequests(this.consumerCoordinator);
+             continue;
+         }
+
+         RequestFuture<Void> join = sendJoinGroupRequest();
+         client.poll(join);
+
+         if (!join.failed())
+             continue;
+
+         if (!join.isRetriable())
+             throw join.exception();
+
+         Utils.sleep(retryBackoffMs);
+     }
+ }
+
+ /**
+  * Block until the coordinator for this group is known. Each round sends a consumer metadata
+  * request and polls it for at most the request timeout; when the request fails, the method
+  * waits for a metadata update before trying again.
+  */
+ public void ensureCoordinatorKnown() {
+     while (coordinatorUnknown()) {
+         RequestFuture<Void> lookup = sendConsumerMetadataRequest();
+         client.poll(lookup, requestTimeoutMs);
+
+         if (lookup.failed()) {
+             client.awaitMetadataUpdate();
+         }
+     }
+ }
+
+ /**
+ * Commit offsets. This call blocks (regardless of commitType) until the coordinator
+ * can receive the commit request. Once the request has been made, however, only the
+ * synchronous commits will wait for a successful response from the coordinator.
+ * @param offsets Offsets to commit.
+ * @param commitType Commit policy
+ * @param callback Callback to be executed when the commit request finishes
+ */
+ public void commitOffsets(Map<TopicPartition, Long> offsets, CommitType commitType, ConsumerCommitCallback callback) {
+ if (commitType == CommitType.ASYNC)
+ commitOffsetsAsync(offsets, callback);
+ else
+ commitOffsetsSync(offsets, callback);
+ }
+
+ /**
+  * Delayed task that sends heartbeats to the coordinator and re-schedules itself on the
+  * network client after every attempt.
+  */
+ private class HeartbeatTask implements DelayedTask {
+
+     /**
+      * Start or restart the task: reset the session timeout, remove any scheduled run and
+      * schedule the task for execution at the next chance.
+      */
+     public void reset() {
+         long current = time.milliseconds();
+         heartbeat.resetSessionTimeout(current);
+         client.unschedule(this);
+         client.schedule(this, current);
+     }
+
+     /**
+      * Send a heartbeat if one is due, otherwise re-schedule for when it will be.
+      * @param now current time in milliseconds
+      */
+     @Override
+     public void run(final long now) {
+         // heartbeats only make sense with auto-assignment, a completed rebalance
+         // and a known coordinator
+         boolean heartbeatsActive = subscriptions.partitionsAutoAssigned()
+                 && !subscriptions.partitionAssignmentNeeded()
+                 && !coordinatorUnknown();
+         if (!heartbeatsActive)
+             return;
+
+         if (heartbeat.sessionTimeoutExpired(now)) {
+             // no successful heartbeat within one session interval: give up on this coordinator
+             coordinatorDead();
+             return;
+         }
+
+         if (!heartbeat.shouldHeartbeat(now)) {
+             // not due yet, come back when it is
+             client.schedule(this, now + heartbeat.timeToNextHeartbeat(now));
+             return;
+         }
+
+         heartbeat.sentHeartbeat(now);
+         sendHeartbeatRequest().addListener(new RequestFutureListener<Void>() {
+             @Override
+             public void onSuccess(Void value) {
+                 long receivedAt = time.milliseconds();
+                 heartbeat.receiveHeartbeat(receivedAt);
+                 long nextRun = receivedAt + heartbeat.timeToNextHeartbeat(receivedAt);
+                 client.schedule(HeartbeatTask.this, nextRun);
+             }
+
+             @Override
+             public void onFailure(RuntimeException e) {
+                 // try again after the retry back-off
+                 client.schedule(HeartbeatTask.this, time.milliseconds() + retryBackoffMs);
+             }
+         });
+     }
+ }
+
+ /**
+  * Send a JoinGroup request to the coordinator in order to obtain a new partition assignment.
+  * The call does not block; the returned future has to be polled to learn the outcome. When
+  * the coordinator is unknown, a future failed with "coordinator not available" is returned
+  * and nothing is sent.
+  * @return A request future whose completion indicates the result of the JoinGroup request.
+  */
+ private RequestFuture<Void> sendJoinGroupRequest() {
+     if (coordinatorUnknown())
+         return RequestFuture.coordinatorNotAvailable();
+
+     List<String> topics = new ArrayList<String>(subscriptions.subscribedTopics());
+     log.debug("(Re-)joining group {} with subscribed topics {}", groupId, topics);
+
+     JoinGroupRequest joinRequest = new JoinGroupRequest(groupId,
+             this.sessionTimeoutMs,
+             topics,
+             this.consumerId,
+             this.assignmentStrategy);
+
+     // hand the request to the coordinator; the handler translates the response into the future
+     log.debug("Issuing request ({}: {}) to coordinator {}", ApiKeys.JOIN_GROUP, joinRequest, this.consumerCoordinator.id());
+     return client.send(consumerCoordinator, ApiKeys.JOIN_GROUP, joinRequest)
+             .compose(new JoinGroupResponseHandler());
+ }
+
+ private class JoinGroupResponseHandler extends CoordinatorResponseHandler<JoinGroupResponse, Void> {
+
+ @Override
+ public JoinGroupResponse parse(ClientResponse response) {
+ return new JoinGroupResponse(response.responseBody());
+ }
+
+ @Override
+ public void handle(JoinGroupResponse joinResponse, RequestFuture<Void> future) {
+ // process the response
+ short errorCode = joinResponse.errorCode();
+
+ if (errorCode == Errors.NONE.code()) {
+ Coordinator.this.consumerId = joinResponse.consumerId();
+ Coordinator.this.generation = joinResponse.generationId();
+
+ // set the flag to refresh last committed offsets
+ subscriptions.needRefreshCommits();
+
+ log.debug("Joined group: {}", joinResponse.toStruct());
+
+ // record re-assignment time
+ sensors.partitionReassignments.record(response.requestLatencyMs());
+
+ // update partition assignment
+ subscriptions.changePartitionAssignment(joinResponse.assignedPartitions());
+ heartbeatTask.reset();
+ future.complete(null);
+ } else if (errorCode == Errors.UNKNOWN_CONSUMER_ID.code()) {
+ // reset the consumer id and retry immediately
+ Coordinator.this.consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID;
+ log.info("Attempt to join group {} failed due to unknown consumer id, resetting and retrying.",
+ groupId);
+ future.raise(Errors.UNKNOWN_CONSUMER_ID);
+ } else if (errorCode == Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code()
+ || errorCode == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
+ // re-discover the coordinator and retry with backoff
+ coordinatorDead();
+ log.info("Attempt to join group {} failed due to obsolete coordinator information, retrying.",
+ groupId);
+ future.raise(Errors.forCode(errorCode));
+ } else if (errorCode == Errors.UNKNOWN_PARTITION_ASSIGNMENT_STRATEGY.code()
+ || errorCode == Errors.INCONSISTENT_PARTITION_ASSIGNMENT_STRATEGY.code()
+ || errorCode == Errors.INVALID_SESSION_TIMEOUT.code()) {
+ // log the error and re-throw the exception
+ Errors error = Errors.forCode(errorCode);
+ log.error("Attempt to join group {} failed due to: {}",
+ groupId, error.exception().getMessage());
+ future.raise(error);
+ } else {
+ // unexpected error, throw the exception
+ future.raise(new KafkaException("Unexpected error in join group response: "
+ + Errors.forCode(joinResponse.errorCode()).exception().getMessage()));
+ }
+ }
+ }
+
+ /**
+  * Send one offset commit request without waiting for the response. The subscription state is
+  * flagged for a refresh of the committed offsets first. When a callback is given it is invoked
+  * with the offsets and {@code null} on success, or with the offsets and the failure otherwise.
+  */
+ private void commitOffsetsAsync(final Map<TopicPartition, Long> offsets, final ConsumerCommitCallback callback) {
+     this.subscriptions.needRefreshCommits();
+     RequestFuture<Void> commit = sendOffsetCommitRequest(offsets);
+
+     if (callback == null)
+         return;
+
+     commit.addListener(new RequestFutureListener<Void>() {
+         @Override
+         public void onSuccess(Void value) {
+             callback.onComplete(offsets, null);
+         }
+
+         @Override
+         public void onFailure(RuntimeException e) {
+             callback.onComplete(offsets, e);
+         }
+     });
+ }
+
+ /**
+  * Commit offsets and block until the commit has succeeded or failed with a non-retriable
+  * error; retriable failures are retried after the retry back-off. The outcome is reported to
+  * the callback when one is given. Without a callback, a non-retriable failure is thrown.
+  */
+ private void commitOffsetsSync(Map<TopicPartition, Long> offsets, ConsumerCommitCallback callback) {
+     for (;;) {
+         ensureCoordinatorKnown();
+         ensurePartitionAssignment();
+
+         RequestFuture<Void> commit = sendOffsetCommitRequest(offsets);
+         client.poll(commit);
+
+         if (commit.succeeded()) {
+             if (callback != null)
+                 callback.onComplete(offsets, null);
+             return;
+         }
+
+         if (commit.isRetriable()) {
+             Utils.sleep(retryBackoffMs);
+             continue;
+         }
+
+         // non-retriable failure: throw if nobody is listening, otherwise tell the callback
+         if (callback == null)
+             throw commit.exception();
+         callback.onComplete(offsets, commit.exception());
+         return;
+     }
+ }
+
+ /**
+  * Send an offset commit request for the given partitions. The call does not block; the
+  * returned future can be polled for a synchronous commit or left alone for an asynchronous
+  * one. When the coordinator is unknown a failed future is returned; when there is nothing to
+  * commit an already successful future is returned. In both cases nothing is sent.
+  *
+  * @param offsets The list of offsets per partition that should be committed.
+  * @return A request future whose value indicates whether the commit was successful or not
+  */
+ private RequestFuture<Void> sendOffsetCommitRequest(final Map<TopicPartition, Long> offsets) {
+     if (coordinatorUnknown())
+         return RequestFuture.coordinatorNotAvailable();
+
+     if (offsets.isEmpty())
+         return RequestFuture.voidSuccess();
+
+     // one PartitionData (offset plus empty metadata string) per partition
+     Map<TopicPartition, OffsetCommitRequest.PartitionData> partitionData =
+             new HashMap<TopicPartition, OffsetCommitRequest.PartitionData>(offsets.size());
+     for (Map.Entry<TopicPartition, Long> toCommit : offsets.entrySet()) {
+         partitionData.put(toCommit.getKey(), new OffsetCommitRequest.PartitionData(toCommit.getValue(), ""));
+     }
+
+     OffsetCommitRequest commitRequest = new OffsetCommitRequest(this.groupId,
+             this.generation,
+             this.consumerId,
+             OffsetCommitRequest.DEFAULT_RETENTION_TIME,
+             partitionData);
+
+     return client.send(consumerCoordinator, ApiKeys.OFFSET_COMMIT, commitRequest)
+             .compose(new OffsetCommitResponseHandler(offsets));
+ }
+
+
+ private class OffsetCommitResponseHandler extends CoordinatorResponseHandler<OffsetCommitResponse, Void> {
+
+ private final Map<TopicPartition, Long> offsets;
+
+ public OffsetCommitResponseHandler(Map<TopicPartition, Long> offsets) {
+ this.offsets = offsets;
+ }
+
+ @Override
+ public OffsetCommitResponse parse(ClientResponse response) {
+ return new OffsetCommitResponse(response.responseBody());
+ }
+
+ @Override
+ public void handle(OffsetCommitResponse commitResponse, RequestFuture<Void> future) {
+ sensors.commitLatency.record(response.requestLatencyMs());
+ for (Map.Entry<TopicPartition, Short> entry : commitResponse.responseData().entrySet()) {
+ TopicPartition tp = entry.getKey();
+ long offset = this.offsets.get(tp);
+ short errorCode = entry.getValue();
+ if (errorCode == Errors.NONE.code()) {
+ log.debug("Committed offset {} for partition {}", offset, tp);
+ subscriptions.committed(tp, offset);
+ } else if (errorCode == Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code()
+ || errorCode == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
+ coordinatorDead();
+ future.raise(Errors.forCode(errorCode));
+ return;
+ } else if (errorCode == Errors.OFFSET_METADATA_TOO_LARGE.code()
+ || errorCode == Errors.INVALID_COMMIT_OFFSET_SIZE.code()) {
+ // do not need to throw the exception but just log the error
+ log.error("Error committing partition {} at offset {}: {}",
+ tp,
+ offset,
+ Errors.forCode(errorCode).exception().getMessage());
+ } else if (errorCode == Errors.UNKNOWN_CONSUMER_ID.code()
+ || errorCode == Errors.ILLEGAL_GENERATION.code()) {
+ // need to re-join group
+ subscriptions.needReassignment();
+ future.raise(Errors.forCode(errorCode));
+ return;
+ } else {
+ // do not need to throw the exception but just log the error
+ future.raise(Errors.forCode(errorCode));
+ log.error("Error committing partition {} at offset {}: {}",
+ tp,
+ offset,
+ Errors.forCode(errorCode).exception().getMessage());
+ }
+ }
+
+ future.complete(null);
+ }
+ }
+
+ /**
+  * Ask the coordinator for the committed offsets of a set of partitions. The call does not
+  * block; the returned future can be polled to obtain the offsets. When the coordinator is
+  * unknown a failed future is returned and nothing is sent.
+  *
+  * @param partitions The set of partitions to get offsets for.
+  * @return A request future containing the committed offsets.
+  */
+ private RequestFuture<Map<TopicPartition, Long>> sendOffsetFetchRequest(Set<TopicPartition> partitions) {
+     if (coordinatorUnknown())
+         return RequestFuture.coordinatorNotAvailable();
+
+     log.debug("Fetching committed offsets for partitions: {}", Utils.join(partitions, ", "));
+
+     // the request carries the partitions as a list
+     List<TopicPartition> requested = new ArrayList<TopicPartition>(partitions);
+     OffsetFetchRequest fetchRequest = new OffsetFetchRequest(this.groupId, requested);
+
+     // the handler turns the raw response into the offset map
+     return client.send(consumerCoordinator, ApiKeys.OFFSET_FETCH, fetchRequest)
+             .compose(new OffsetFetchResponseHandler());
+ }
+
+ /**
+  * Handles the response to an offset fetch request. The future is completed with the
+  * committed offsets of all partitions that have one (offset &gt;= 0), or failed on the first
+  * partition that reports an error.
+  */
+ private class OffsetFetchResponseHandler extends CoordinatorResponseHandler<OffsetFetchResponse, Map<TopicPartition, Long>> {
+
+     @Override
+     public OffsetFetchResponse parse(ClientResponse response) {
+         return new OffsetFetchResponse(response.responseBody());
+     }
+
+     @Override
+     public void handle(OffsetFetchResponse response, RequestFuture<Map<TopicPartition, Long>> future) {
+         Map<TopicPartition, Long> fetched = new HashMap<TopicPartition, Long>(response.responseData().size());
+
+         for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> fetchedEntry : response.responseData().entrySet()) {
+             TopicPartition partition = fetchedEntry.getKey();
+             OffsetFetchResponse.PartitionData partitionData = fetchedEntry.getValue();
+
+             if (!partitionData.hasError()) {
+                 if (partitionData.offset >= 0) {
+                     // a negative offset (-1) means there is no committed offset to record
+                     fetched.put(partition, partitionData.offset);
+                 } else {
+                     log.debug("No committed offset for partition " + partition);
+                 }
+                 continue;
+             }
+
+             // the first partition error fails the whole fetch
+             log.debug("Error fetching offset for topic-partition {}: {}", partition, Errors.forCode(partitionData.errorCode)
+                     .exception()
+                     .getMessage());
+
+             if (partitionData.errorCode == Errors.OFFSET_LOAD_IN_PROGRESS.code()) {
+                 // just retry
+                 future.raise(Errors.OFFSET_LOAD_IN_PROGRESS);
+             } else if (partitionData.errorCode == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
+                 // re-discover the coordinator and retry
+                 coordinatorDead();
+                 future.raise(Errors.NOT_COORDINATOR_FOR_CONSUMER);
+             } else if (partitionData.errorCode == Errors.UNKNOWN_CONSUMER_ID.code()
+                     || partitionData.errorCode == Errors.ILLEGAL_GENERATION.code()) {
+                 // the group has to be re-joined
+                 subscriptions.needReassignment();
+                 future.raise(Errors.forCode(partitionData.errorCode));
+             } else {
+                 future.raise(new KafkaException("Unexpected error in fetch offset response: "
+                         + Errors.forCode(partitionData.errorCode).exception().getMessage()));
+             }
+             return;
+         }
+
+         future.complete(fetched);
+     }
+ }
+
+ /**
+  * Send a heartbeat request now (visible only for testing). The request carries the group id,
+  * the current generation and the current consumer id.
+  * @return a request future that reflects the outcome of the heartbeat
+  */
+ public RequestFuture<Void> sendHeartbeatRequest() {
+     HeartbeatRequest heartbeatRequest = new HeartbeatRequest(this.groupId, this.generation, this.consumerId);
+     HeartbeatCompletionHandler completionHandler = new HeartbeatCompletionHandler();
+     return client.send(consumerCoordinator, ApiKeys.HEARTBEAT, heartbeatRequest)
+             .compose(completionHandler);
+ }
+
+ /**
+  * @return true while no coordinator node is set, i.e. before it has been discovered or
+  *         after it has been marked dead
+  */
+ public boolean coordinatorUnknown() {
+     return consumerCoordinator == null;
+ }
+
+ /**
+  * Discover the current coordinator for the consumer group by sending a ConsumerMetadata
+  * request to the least loaded broker. The returned future should be polled to get the result
+  * of the request. When no broker is available a failed future is returned instead.
+  * @return A request future which indicates the completion of the metadata request
+  */
+ private RequestFuture<Void> sendConsumerMetadataRequest() {
+     // pick a broker to ask about the coordinator
+     Node broker = this.client.leastLoadedNode();
+     if (broker == null) {
+         // TODO: If there are no brokers left, perhaps we should use the bootstrap set
+         // from configuration?
+         return RequestFuture.noBrokersAvailable();
+     }
+
+     log.debug("Issuing consumer metadata request to broker {}", broker.id());
+     ConsumerMetadataRequest lookupRequest = new ConsumerMetadataRequest(this.groupId);
+     return client.send(broker, ApiKeys.CONSUMER_METADATA, lookupRequest)
+             .compose(new RequestFutureAdapter<ClientResponse, Void>() {
+                 @Override
+                 public void onSuccess(ClientResponse response, RequestFuture<Void> future) {
+                     handleConsumerMetadataResponse(response, future);
+                 }
+             });
+ }
+
+ /**
+  * Process the response to a ConsumerMetadata request: fail the future on disconnect or on an
+  * error code, otherwise adopt the returned node as coordinator and restart heartbeating.
+  * A response that arrives after the coordinator is already known is ignored.
+  *
+  * @param resp the raw client response
+  * @param future the future of the metadata request to complete or fail
+  */
+ private void handleConsumerMetadataResponse(ClientResponse resp, RequestFuture<Void> future) {
+     log.debug("Consumer metadata response {}", resp);
+
+     if (resp.wasDisconnected()) {
+         // no response to parse
+         future.raise(new DisconnectException());
+         return;
+     }
+
+     if (!coordinatorUnknown()) {
+         // We already found the coordinator, so ignore the request
+         future.complete(null);
+         return;
+     }
+
+     ConsumerMetadataResponse metadata = new ConsumerMetadataResponse(resp.responseBody());
+     if (metadata.errorCode() != Errors.NONE.code()) {
+         future.raise(Errors.forCode(metadata.errorCode()));
+         return;
+     }
+
+     // use MAX_VALUE - node.id as the coordinator id to mimic separate connections
+     // for the coordinator in the underlying network client layer
+     // TODO: this needs to be better handled in KAFKA-1935
+     this.consumerCoordinator = new Node(Integer.MAX_VALUE - metadata.node().id(),
+             metadata.node().host(),
+             metadata.node().port());
+     heartbeatTask.reset();
+     future.complete(null);
+ }
+
+ /**
+  * Mark the current coordinator as dead by forgetting it, so that it has to be discovered
+  * again. Does nothing when no coordinator is currently known.
+  */
+ private void coordinatorDead() {
+     if (this.consumerCoordinator == null)
+         return;
+
+     log.info("Marking the coordinator {} dead.", this.consumerCoordinator.id());
+     this.consumerCoordinator = null;
+ }
+
+ /**
+  * Handles the response to a heartbeat request: completes the future on success, otherwise
+  * updates the coordinator / membership state according to the error and fails the future.
+  */
+ private class HeartbeatCompletionHandler extends CoordinatorResponseHandler<HeartbeatResponse, Void> {
+     @Override
+     public HeartbeatResponse parse(ClientResponse response) {
+         return new HeartbeatResponse(response.responseBody());
+     }
+
+     @Override
+     public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> future) {
+         sensors.heartbeatLatency.record(response.requestLatencyMs());
+         final short code = heartbeatResponse.errorCode();
+
+         if (code == Errors.NONE.code()) {
+             log.debug("Received successful heartbeat response.");
+             future.complete(null);
+             return;
+         }
+
+         if (code == Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code()
+                 || code == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
+             log.info("Attempt to heart beat failed since coordinator is either not started or not valid, marking it as dead.");
+             coordinatorDead();
+             future.raise(Errors.forCode(code));
+             return;
+         }
+
+         if (code == Errors.ILLEGAL_GENERATION.code()) {
+             log.info("Attempt to heart beat failed since generation id is not legal, try to re-join group.");
+             subscriptions.needReassignment();
+             future.raise(Errors.ILLEGAL_GENERATION);
+             return;
+         }
+
+         if (code == Errors.UNKNOWN_CONSUMER_ID.code()) {
+             log.info("Attempt to heart beat failed since consumer id is not valid, reset it and try to re-join group.");
+             consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID;
+             subscriptions.needReassignment();
+             future.raise(Errors.UNKNOWN_CONSUMER_ID);
+             return;
+         }
+
+         future.raise(new KafkaException("Unexpected error in heartbeat response: "
+                 + Errors.forCode(code).exception().getMessage()));
+     }
+ }
+
+ /**
+  * Base class of the handlers for responses from the coordinator. It takes care of
+  * disconnects (the coordinator is marked dead and the future fails with a
+  * {@code DisconnectException}) and otherwise delegates to {@link #parse(ClientResponse)} and
+  * {@link #handle(Object, RequestFuture)}.
+  *
+  * @param <R> type of the parsed response
+  * @param <T> value type of the future that is completed by the handler
+  */
+ private abstract class CoordinatorResponseHandler<R, T>
+         extends RequestFutureAdapter<ClientResponse, T> {
+     /** The raw response currently being handled; set at the start of {@code onSuccess}. */
+     protected ClientResponse response;
+
+     /** Convert the raw client response into the typed response. */
+     public abstract R parse(ClientResponse response);
+
+     /** Evaluate the typed response and complete or fail the given future. */
+     public abstract void handle(R response, RequestFuture<T> future);
+
+     @Override
+     public void onSuccess(ClientResponse clientResponse, RequestFuture<T> future) {
+         this.response = clientResponse;
+
+         if (!clientResponse.wasDisconnected()) {
+             R parsed = parse(clientResponse);
+             handle(parsed, future);
+             return;
+         }
+
+         int correlationId = response.request().request().header().correlationId();
+         log.debug("Cancelled request {} with correlation id {} due to coordinator {} being disconnected",
+                 response.request(),
+                 correlationId,
+                 response.request().request().destination());
+
+         // a disconnect means the coordinator has to be looked up again
+         coordinatorDead();
+         future.raise(new DisconnectException());
+     }
+
+     @Override
+     public void onFailure(RuntimeException e, RequestFuture<T> future) {
+         boolean disconnected = e instanceof DisconnectException;
+         if (disconnected) {
+             log.debug("Coordinator request failed", e);
+             coordinatorDead();
+         }
+         future.raise(e);
+     }
+ }
+
+ /**
+  * Callback through which the coordinator reports partition reassignments. Exceptions thrown
+  * by an implementation are caught and logged by the coordinator.
+  */
+ public interface RebalanceCallback {
+
+     /**
+      * Invoked before the group is re-joined.
+      * @param partitions copy of the partitions that were assigned before the rebalance
+      */
+     void onPartitionsRevoked(Collection<TopicPartition> partitions);
+
+     /**
+      * Invoked after the group has been re-joined.
+      * @param partitions copy of the partitions that are assigned after the rebalance
+      */
+     void onPartitionsAssigned(Collection<TopicPartition> partitions);
+ }
+
+ /**
+  * Registers and holds the sensors and metrics of the coordinator. All metrics are placed in
+  * the group {@code <metricGrpPrefix>-coordinator-metrics}.
+  */
+ private class CoordinatorMetrics {
+     public final Metrics metrics;
+     public final String metricGrpName;
+
+     /** Records the latency of offset commit requests. */
+     public final Sensor commitLatency;
+     /** Records the latency of heartbeat requests. */
+     public final Sensor heartbeatLatency;
+     /** Records the latency of successful JoinGroup requests. */
+     public final Sensor partitionReassignments;
+
+     /**
+      * @param metrics registry in which sensors and metrics are registered
+      * @param metricGrpPrefix prefix of the metric group name
+      * @param tags tags attached to every metric
+      */
+     public CoordinatorMetrics(Metrics metrics, String metricGrpPrefix, Map<String, String> tags) {
+         this.metrics = metrics;
+         this.metricGrpName = metricGrpPrefix + "-coordinator-metrics";
+
+         this.commitLatency = metrics.sensor("commit-latency");
+         this.commitLatency.add(new MetricName("commit-latency-avg",
+                 this.metricGrpName,
+                 "The average time taken for a commit request",
+                 tags), new Avg());
+         this.commitLatency.add(new MetricName("commit-latency-max",
+                 this.metricGrpName,
+                 "The max time taken for a commit request",
+                 tags), new Max());
+         this.commitLatency.add(new MetricName("commit-rate",
+                 this.metricGrpName,
+                 "The number of commit calls per second",
+                 tags), new Rate(new Count()));
+
+         this.heartbeatLatency = metrics.sensor("heartbeat-latency");
+         this.heartbeatLatency.add(new MetricName("heartbeat-response-time-max",
+                 this.metricGrpName,
+                 "The max time taken to receive a response to a hearbeat request",
+                 tags), new Max());
+         this.heartbeatLatency.add(new MetricName("heartbeat-rate",
+                 this.metricGrpName,
+                 "The average number of heartbeats per second",
+                 tags), new Rate(new Count()));
+
+         this.partitionReassignments = metrics.sensor("reassignment-latency");
+         this.partitionReassignments.add(new MetricName("reassignment-time-avg",
+                 this.metricGrpName,
+                 "The average time taken for a partition reassignment",
+                 tags), new Avg());
+         // the "max" metric must be backed by Max, not Avg, to report the maximum
+         this.partitionReassignments.add(new MetricName("reassignment-time-max",
+                 this.metricGrpName,
+                 "The max time taken for a partition reassignment",
+                 tags), new Max());
+         this.partitionReassignments.add(new MetricName("reassignment-rate",
+                 this.metricGrpName,
+                 "The number of partition reassignments per second",
+                 tags), new Rate(new Count()));
+
+         // gauge: number of partitions currently assigned, read on demand
+         Measurable numParts =
+             new Measurable() {
+                 public double measure(MetricConfig config, long now) {
+                     return subscriptions.assignedPartitions().size();
+                 }
+             };
+         metrics.addMetric(new MetricName("assigned-partitions",
+                 this.metricGrpName,
+                 "The number of partitions currently assigned to this consumer",
+                 tags),
+                 numParts);
+
+         // gauge: whole seconds elapsed since the last heartbeat was sent
+         Measurable lastHeartbeat =
+             new Measurable() {
+                 public double measure(MetricConfig config, long now) {
+                     return TimeUnit.SECONDS.convert(now - heartbeat.lastHeartbeatSend(), TimeUnit.MILLISECONDS);
+                 }
+             };
+         metrics.addMetric(new MetricName("last-heartbeat-seconds-ago",
+                 this.metricGrpName,
+                 "The number of seconds since the last controller heartbeat",
+                 tags),
+                 lastHeartbeat);
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/DelayedTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/DelayedTask.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/DelayedTask.java
new file mode 100644
index 0000000..7792dff
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/DelayedTask.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.flink.kafka_backport.clients.consumer.internals;
+
+
+ /**
+ * A task that can be scheduled for later execution, for example through a
+ * {@link DelayedTaskQueue}, which calls {@link #run(long)} once the scheduled
+ * time has been reached.
+ */
+ public interface DelayedTask {
+
+ /**
+ * Execute the task.
+ *
+ * @param now current time in milliseconds
+ */
+ void run(long now);
+ }
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/DelayedTaskQueue.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/DelayedTaskQueue.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/DelayedTaskQueue.java
new file mode 100644
index 0000000..fbd1e19
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/DelayedTaskQueue.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.flink.kafka_backport.clients.consumer.internals;
+
+import java.util.Iterator;
+import java.util.PriorityQueue;
+
+/**
+ * Tracks a set of tasks to be executed after a delay.
+ */
+public class DelayedTaskQueue {
+
+ private PriorityQueue<Entry> tasks;
+
+ public DelayedTaskQueue() {
+ tasks = new PriorityQueue<Entry>();
+ }
+
+ /**
+ * Schedule a task for execution in the future.
+ *
+ * @param task the task to execute
+ * @param at the time at which to
+ */
+ public void add(DelayedTask task, long at) {
+ tasks.add(new Entry(task, at));
+ }
+
+ /**
+ * Remove a task from the queue if it is present
+ * @param task the task to be removed
+ * @returns true if a task was removed as a result of this call
+ */
+ public boolean remove(DelayedTask task) {
+ boolean wasRemoved = false;
+ Iterator<Entry> iterator = tasks.iterator();
+ while (iterator.hasNext()) {
+ Entry entry = iterator.next();
+ if (entry.task.equals(task)) {
+ iterator.remove();
+ wasRemoved = true;
+ }
+ }
+ return wasRemoved;
+ }
+
+ /**
+ * Get amount of time in milliseconds until the next event. Returns Long.MAX_VALUE if no tasks are scheduled.
+ *
+ * @return the remaining time in milliseconds
+ */
+ public long nextTimeout(long now) {
+ if (tasks.isEmpty())
+ return Long.MAX_VALUE;
+ else
+ return Math.max(tasks.peek().timeout - now, 0);
+ }
+
+ /**
+ * Run any ready tasks.
+ *
+ * @param now the current time
+ */
+ public void poll(long now) {
+ while (!tasks.isEmpty() && tasks.peek().timeout <= now) {
+ Entry entry = tasks.poll();
+ entry.task.run(now);
+ }
+ }
+
+ private static class Entry implements Comparable<Entry> {
+ DelayedTask task;
+ long timeout;
+
+ public Entry(DelayedTask task, long timeout) {
+ this.task = task;
+ this.timeout = timeout;
+ }
+
+ @Override
+ public int compareTo(Entry entry) {
+ return Long.compare(timeout, entry.timeout);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/Fetcher.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/Fetcher.java
new file mode 100644
index 0000000..6962a54
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/Fetcher.java
@@ -0,0 +1,506 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.flink.kafka_backport.clients.consumer.internals;
+
+import org.apache.flink.kafka_backport.clients.ClientResponse;
+import org.apache.flink.kafka_backport.clients.Metadata;
+import org.apache.flink.kafka_backport.clients.consumer.NoOffsetForPartitionException;
+import org.apache.flink.kafka_backport.clients.consumer.OffsetResetStrategy;
+import org.apache.flink.kafka_backport.common.Node;
+import org.apache.flink.kafka_backport.common.PartitionInfo;
+import org.apache.flink.kafka_backport.common.TopicPartition;
+import org.apache.flink.kafka_backport.common.errors.InvalidMetadataException;
+import org.apache.flink.kafka_backport.common.metrics.Metrics;
+import org.apache.flink.kafka_backport.common.metrics.Sensor;
+import org.apache.flink.kafka_backport.common.metrics.stats.Avg;
+import org.apache.flink.kafka_backport.common.metrics.stats.Rate;
+import org.apache.flink.kafka_backport.common.record.LogEntry;
+import org.apache.flink.kafka_backport.common.record.MemoryRecords;
+import org.apache.flink.kafka_backport.common.requests.FetchRequest;
+import org.apache.flink.kafka_backport.common.requests.FetchResponse;
+import org.apache.flink.kafka_backport.common.serialization.Deserializer;
+import org.apache.flink.kafka_backport.common.utils.Utils;
+import org.apache.flink.kafka_backport.clients.consumer.ConsumerRecord;
+import org.apache.flink.kafka_backport.common.Cluster;
+import org.apache.flink.kafka_backport.common.MetricName;
+import org.apache.flink.kafka_backport.common.errors.DisconnectException;
+import org.apache.flink.kafka_backport.common.metrics.stats.Count;
+import org.apache.flink.kafka_backport.common.metrics.stats.Max;
+import org.apache.flink.kafka_backport.common.protocol.ApiKeys;
+import org.apache.flink.kafka_backport.common.protocol.Errors;
+import org.apache.flink.kafka_backport.common.requests.ListOffsetRequest;
+import org.apache.flink.kafka_backport.common.requests.ListOffsetResponse;
+import org.apache.flink.kafka_backport.common.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+ import java.nio.ByteBuffer;
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.LinkedList;
+ import java.util.List;
+ import java.util.Locale;
+ import java.util.Map;
+ import java.util.Set;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * This class manages the fetching process with the brokers.
+ */
+public class Fetcher<K, V> {
+ // Timestamp sentinels sent in list-offset requests by resetOffset():
+ // EARLIEST_OFFSET_TIMESTAMP is used for OffsetResetStrategy.EARLIEST and
+ // LATEST_OFFSET_TIMESTAMP for OffsetResetStrategy.LATEST.
+ private static final long EARLIEST_OFFSET_TIMESTAMP = -2L;
+ private static final long LATEST_OFFSET_TIMESTAMP = -1L;
+
+ private static final Logger log = LoggerFactory.getLogger(Fetcher.class);
+
+ // Client used to send fetch and list-offset requests and to poll their futures.
+ private final ConsumerNetworkClient client;
+ private final Time time;
+ // minBytes and maxWaitMs are passed to every FetchRequest; fetchSize to every per-partition fetch entry.
+ private final int minBytes;
+ private final int maxWaitMs;
+ private final int fetchSize;
+ // Sleep time between list-offset retries that are not caused by invalid metadata.
+ private final long retryBackoffMs;
+ // When true, each record is validated with ensureValid() before it is deserialized.
+ private final boolean checkCrcs;
+ private final Metadata metadata;
+ private final FetchManagerMetrics sensors;
+ private final SubscriptionState subscriptions;
+ // Parsed fetch results buffered by handleFetchResponse() until fetchedRecords() drains them.
+ private final List<PartitionRecords<K, V>> records;
+ private final Deserializer<K> keyDeserializer;
+ private final Deserializer<V> valueDeserializer;
+
+ /**
+  * Create a fetcher.
+  *
+  * @param client network client used to send requests and poll for their completion
+  * @param minBytes value passed as the minimum bytes setting of every fetch request
+  * @param maxWaitMs value passed as the maximum wait setting of every fetch request
+  * @param fetchSize value passed as the size of every per-partition fetch entry
+  * @param checkCrcs whether each record is validated before it is deserialized
+  * @param keyDeserializer deserializer applied to non-null record keys
+  * @param valueDeserializer deserializer applied to non-null record values
+  * @param metadata cluster metadata used to look up partitions and request updates
+  * @param subscriptions state holding assignments and fetch / consumed positions
+  * @param metrics registry in which the fetch sensors are created
+  * @param metricGrpPrefix prefix of the metric group name
+  * @param metricTags tags attached to every metric name
+  * @param time time source
+  * @param retryBackoffMs sleep time between list-offset retries
+  */
+ public Fetcher(ConsumerNetworkClient client,
+                int minBytes,
+                int maxWaitMs,
+                int fetchSize,
+                boolean checkCrcs,
+                Deserializer<K> keyDeserializer,
+                Deserializer<V> valueDeserializer,
+                Metadata metadata,
+                SubscriptionState subscriptions,
+                Metrics metrics,
+                String metricGrpPrefix,
+                Map<String, String> metricTags,
+                Time time,
+                long retryBackoffMs) {
+
+     // collaborators
+     this.client = client;
+     this.metadata = metadata;
+     this.subscriptions = subscriptions;
+     this.time = time;
+     this.keyDeserializer = keyDeserializer;
+     this.valueDeserializer = valueDeserializer;
+
+     // request settings
+     this.minBytes = minBytes;
+     this.maxWaitMs = maxWaitMs;
+     this.fetchSize = fetchSize;
+     this.checkCrcs = checkCrcs;
+     this.retryBackoffMs = retryBackoffMs;
+
+     // internal state
+     this.records = new LinkedList<PartitionRecords<K, V>>();
+     this.sensors = new FetchManagerMetrics(metrics, metricGrpPrefix, metricTags);
+ }
+
+ /**
+ * Set-up a fetch request for any node that we have assigned partitions for which doesn't have one.
+ *
+ * @param cluster The current cluster metadata
+ */
+ public void initFetches(Cluster cluster) {
+ for (Map.Entry<Node, FetchRequest> fetchEntry: createFetchRequests(cluster).entrySet()) {
+ final FetchRequest fetch = fetchEntry.getValue();
+ client.send(fetchEntry.getKey(), ApiKeys.FETCH, fetch)
+ .addListener(new RequestFutureListener<ClientResponse>() {
+ @Override
+ public void onSuccess(ClientResponse response) {
+ handleFetchResponse(response, fetch);
+ }
+
+ @Override
+ public void onFailure(RuntimeException e) {
+ log.debug("Fetch failed", e);
+ }
+ });
+ }
+ }
+
+ /**
+ * Update the fetch positions for the provided partitions.
+ * @param partitions
+ */
+ public void updateFetchPositions(Set<TopicPartition> partitions) {
+ // reset the fetch position to the committed position
+ for (TopicPartition tp : partitions) {
+ // skip if we already have a fetch position
+ if (subscriptions.fetched(tp) != null)
+ continue;
+
+ // TODO: If there are several offsets to reset, we could submit offset requests in parallel
+ if (subscriptions.isOffsetResetNeeded(tp)) {
+ resetOffset(tp);
+ } else if (subscriptions.committed(tp) == null) {
+ // there's no committed position, so we need to reset with the default strategy
+ subscriptions.needOffsetReset(tp);
+ resetOffset(tp);
+ } else {
+ log.debug("Resetting offset for partition {} to the committed offset {}",
+ tp, subscriptions.committed(tp));
+ subscriptions.seek(tp, subscriptions.committed(tp));
+ }
+ }
+ }
+
+ /**
+  * Reset the offset of the given partition using its offset reset strategy: the strategy
+  * is translated into a timestamp sentinel, the matching offset is looked up, and the
+  * subscription state is moved to that offset.
+  *
+  * @param partition The given partition that needs reset offset
+  * @throws org.apache.flink.kafka_backport.clients.consumer.NoOffsetForPartitionException If no offset reset strategy is defined
+  */
+ private void resetOffset(TopicPartition partition) {
+     OffsetResetStrategy strategy = subscriptions.resetStrategy(partition);
+     final long timestamp;
+     if (strategy == OffsetResetStrategy.EARLIEST)
+         timestamp = EARLIEST_OFFSET_TIMESTAMP;
+     else if (strategy == OffsetResetStrategy.LATEST)
+         timestamp = LATEST_OFFSET_TIMESTAMP;
+     else
+         throw new NoOffsetForPartitionException("No offset is set and no reset policy is defined");
+
+     // Lower-case with Locale.ROOT so the logged name does not depend on the default locale
+     // (e.g. a Turkish default locale would turn "EARLIEST" into "earlıest").
+     log.debug("Resetting offset for partition {} to {} offset.", partition, strategy.name().toLowerCase(Locale.ROOT));
+     long offset = listOffset(partition, timestamp);
+     this.subscriptions.seek(partition, offset);
+ }
+
+ /**
+  * Look up a single offset for the partition and timestamp, retrying until the lookup
+  * succeeds or fails with a non-retriable error. Between attempts this either waits for a
+  * metadata update (when the failure is an invalid-metadata error) or sleeps for the
+  * retry backoff.
+  *
+  * @param partition The partition that needs fetching offset.
+  * @param timestamp The timestamp for fetching offset.
+  * @return The offset returned by the successful lookup
+  */
+ private long listOffset(TopicPartition partition, long timestamp) {
+     for (;;) {
+         RequestFuture<Long> attempt = sendListOffsetRequest(partition, timestamp);
+         client.poll(attempt);
+
+         if (attempt.succeeded()) {
+             return attempt.value();
+         }
+         if (!attempt.isRetriable()) {
+             throw attempt.exception();
+         }
+
+         boolean metadataProblem = attempt.exception() instanceof InvalidMetadataException;
+         if (metadataProblem) {
+             client.awaitMetadataUpdate();
+         } else {
+             Utils.sleep(retryBackoffMs);
+         }
+     }
+ }
+
+ /**
+ * Return the fetched records, empty the record buffer and update the consumed position.
+ *
+ * @return The fetched records per partition
+ */
+ public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
+ if (this.subscriptions.partitionAssignmentNeeded()) {
+ return Collections.emptyMap();
+ } else {
+ Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<TopicPartition, List<ConsumerRecord<K, V>>>();
+ for (PartitionRecords<K, V> part : this.records) {
+ Long consumed = subscriptions.consumed(part.partition);
+ if (this.subscriptions.assignedPartitions().contains(part.partition)
+ && (consumed == null || part.fetchOffset == consumed)) {
+ List<ConsumerRecord<K, V>> records = drained.get(part.partition);
+ if (records == null) {
+ records = part.records;
+ drained.put(part.partition, records);
+ } else {
+ records.addAll(part.records);
+ }
+ subscriptions.consumed(part.partition, part.records.get(part.records.size() - 1).offset() + 1);
+ } else {
+ // these records aren't next in line based on the last consumed position, ignore them
+ // they must be from an obsolete request
+ log.debug("Ignoring fetched records for {} at offset {}", part.partition, part.fetchOffset);
+ }
+ }
+ this.records.clear();
+ return drained;
+ }
+ }
+
+ /**
+  * Send a list-offset request for one partition to the partition's leader. If the
+  * partition or its leader is not known from the current metadata, no request is sent
+  * and an already-failed future is returned instead.
+  *
+  * @param topicPartition The partition that needs fetching offset.
+  * @param timestamp The timestamp for fetching offset.
+  * @return A response which can be polled to obtain the corresponding offset.
+  */
+ private RequestFuture<Long> sendListOffsetRequest(final TopicPartition topicPartition, long timestamp) {
+     PartitionInfo info = metadata.fetch().partition(topicPartition);
+     if (info == null) {
+         metadata.add(topicPartition.topic());
+         log.debug("Partition {} is unknown for fetching offset, wait for metadata refresh", topicPartition);
+         return RequestFuture.staleMetadata();
+     }
+
+     Node leader = info.leader();
+     if (leader == null) {
+         log.debug("Leader for partition {} unavailable for fetching offset, wait for metadata refresh", topicPartition);
+         return RequestFuture.leaderNotAvailable();
+     }
+
+     Map<TopicPartition, ListOffsetRequest.PartitionData> partitions = new HashMap<TopicPartition, ListOffsetRequest.PartitionData>(1);
+     partitions.put(topicPartition, new ListOffsetRequest.PartitionData(timestamp, 1));
+     ListOffsetRequest request = new ListOffsetRequest(-1, partitions);
+
+     RequestFutureAdapter<ClientResponse, Long> adapter = new RequestFutureAdapter<ClientResponse, Long>() {
+         @Override
+         public void onSuccess(ClientResponse response, RequestFuture<Long> future) {
+             handleListOffsetResponse(topicPartition, response, future);
+         }
+     };
+     return client.send(leader, ApiKeys.LIST_OFFSETS, request).compose(adapter);
+ }
+
+ /**
+  * Callback for the response of the list offset call above. Completes the future with
+  * the returned offset, or raises an error on it.
+  *
+  * @param topicPartition The partition that was fetched
+  * @param clientResponse The response from the server.
+  * @param future The future to complete or fail with the outcome of the response
+  */
+ private void handleListOffsetResponse(TopicPartition topicPartition,
+                                       ClientResponse clientResponse,
+                                       RequestFuture<Long> future) {
+     if (clientResponse.wasDisconnected()) {
+         future.raise(new DisconnectException());
+         return;
+     }
+
+     ListOffsetResponse lor = new ListOffsetResponse(clientResponse.responseBody());
+     short errorCode = lor.responseData().get(topicPartition).errorCode;
+
+     if (errorCode == Errors.NONE.code()) {
+         List<Long> offsets = lor.responseData().get(topicPartition).offsets;
+         if (offsets.size() != 1)
+             throw new IllegalStateException("This should not happen.");
+         long offset = offsets.get(0);
+         log.debug("Fetched offset {} for partition {}", offset, topicPartition);
+
+         future.complete(offset);
+         return;
+     }
+
+     boolean obsoleteLeadership = errorCode == Errors.NOT_LEADER_FOR_PARTITION.code()
+             || errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code();
+     if (obsoleteLeadership) {
+         log.warn("Attempt to fetch offsets for partition {} failed due to obsolete leadership information, retrying.",
+                 topicPartition);
+         future.raise(Errors.forCode(errorCode));
+     } else {
+         log.error("Attempt to fetch offsets for partition {} failed due to: {}",
+                 topicPartition, Errors.forCode(errorCode).exception().getMessage());
+         future.raise(new StaleMetadataException());
+     }
+ }
+
+ /**
+  * Create fetch requests for all nodes for which we have assigned partitions
+  * that have no existing requests in flight. A metadata update is requested for
+  * every assigned partition whose leader is unknown.
+  *
+  * @param cluster The current cluster metadata
+  * @return one fetch request per node that should be fetched from
+  */
+ private Map<Node, FetchRequest> createFetchRequests(Cluster cluster) {
+     // group the per-partition fetch info by leader node
+     Map<Node, Map<TopicPartition, FetchRequest.PartitionData>> fetchable = new HashMap<Node, Map<TopicPartition, FetchRequest.PartitionData>>();
+     for (TopicPartition partition : subscriptions.assignedPartitions()) {
+         Node leader = cluster.leaderFor(partition);
+         if (leader == null) {
+             metadata.requestUpdate();
+             continue;
+         }
+         if (this.client.pendingRequestCount(leader) != 0) {
+             // a request to this node is still in flight, do not issue another fetch
+             continue;
+         }
+
+         Map<TopicPartition, FetchRequest.PartitionData> perNode = fetchable.get(leader);
+         if (perNode == null) {
+             perNode = new HashMap<TopicPartition, FetchRequest.PartitionData>();
+             fetchable.put(leader, perNode);
+         }
+         long offset = this.subscriptions.fetched(partition);
+         perNode.put(partition, new FetchRequest.PartitionData(offset, this.fetchSize));
+     }
+
+     // turn each node's partition map into a fetch request
+     Map<Node, FetchRequest> requests = new HashMap<Node, FetchRequest>();
+     for (Map.Entry<Node, Map<TopicPartition, FetchRequest.PartitionData>> entry : fetchable.entrySet()) {
+         requests.put(entry.getKey(), new FetchRequest(this.maxWaitMs, this.minBytes, entry.getValue()));
+     }
+     return requests;
+ }
+
+ /**
+ * The callback for fetch completion. Parses the records of every partition in the
+ * response, buffers them for {@code fetchedRecords()}, advances the fetch position and
+ * updates the fetch sensors; partition-level errors are handled per error code.
+ *
+ * @param resp the response received for the fetch request
+ * @param request the fetch request that was sent; used to look up the offset each partition was fetched from
+ * @throws IllegalStateException if a partition carries an error code that is not handled here
+ */
+ private void handleFetchResponse(ClientResponse resp, FetchRequest request) {
+ if (resp.wasDisconnected()) {
+ // nothing to parse for a disconnected request; it is only logged
+ int correlation = resp.request().request().header().correlationId();
+ log.debug("Cancelled fetch request {} with correlation id {} due to node {} being disconnected",
+ resp.request(), correlation, resp.request().request().destination());
+ } else {
+ int totalBytes = 0;
+ int totalCount = 0;
+ FetchResponse response = new FetchResponse(resp.responseBody());
+ for (Map.Entry<TopicPartition, FetchResponse.PartitionData> entry : response.responseData().entrySet()) {
+ TopicPartition tp = entry.getKey();
+ FetchResponse.PartitionData partition = entry.getValue();
+ if (!subscriptions.assignedPartitions().contains(tp)) {
+ log.debug("Ignoring fetched data for partition {} which is no longer assigned.", tp);
+ } else if (partition.errorCode == Errors.NONE.code()) {
+ int bytes = 0;
+ ByteBuffer buffer = partition.recordSet;
+ MemoryRecords records = MemoryRecords.readableRecords(buffer);
+ // the offset this partition was requested at; stored with the buffered records
+ long fetchOffset = request.fetchData().get(tp).offset;
+ List<ConsumerRecord<K, V>> parsed = new ArrayList<ConsumerRecord<K, V>>();
+ for (LogEntry logEntry : records) {
+ parsed.add(parseRecord(tp, logEntry));
+ bytes += logEntry.size();
+ }
+ if (parsed.size() > 0) {
+ // the next fetch starts right after the last record that was parsed
+ ConsumerRecord<K, V> record = parsed.get(parsed.size() - 1);
+ this.subscriptions.fetched(tp, record.offset() + 1);
+ this.records.add(new PartitionRecords<K, V>(fetchOffset, tp, parsed));
+ this.sensors.recordsFetchLag.record(partition.highWatermark - record.offset());
+ }
+ this.sensors.recordTopicFetchMetrics(tp.topic(), bytes, parsed.size());
+ totalBytes += bytes;
+ totalCount += parsed.size();
+ } else if (partition.errorCode == Errors.NOT_LEADER_FOR_PARTITION.code()
+ || partition.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
+ // leadership information is out of date; ask for fresh metadata
+ this.metadata.requestUpdate();
+ } else if (partition.errorCode == Errors.OFFSET_OUT_OF_RANGE.code()) {
+ // TODO: this could be optimized by grouping all out-of-range partitions
+ log.info("Fetch offset {} is out of range, resetting offset", subscriptions.fetched(tp));
+ subscriptions.needOffsetReset(tp);
+ } else if (partition.errorCode == Errors.UNKNOWN.code()) {
+ log.warn("Unknown error fetching data for topic-partition {}", tp);
+ } else {
+ throw new IllegalStateException("Unexpected error code " + partition.errorCode + " while fetching data");
+ }
+ }
+ this.sensors.bytesFetched.record(totalBytes);
+ this.sensors.recordsFetched.record(totalCount);
+ }
+ // latency is recorded for disconnected requests as well
+ this.sensors.fetchLatency.record(resp.requestLatencyMs());
+ }
+
+ /**
+  * Turn a log entry into a consumer record. The record is validated first when CRC
+  * checking is enabled; key and value are deserialized unless they are absent, in which
+  * case they stay {@code null}.
+  *
+  * @param partition the partition the entry was fetched from
+  * @param logEntry the raw log entry
+  * @return the consumer record carrying the entry's offset, key and value
+  */
+ private ConsumerRecord<K, V> parseRecord(TopicPartition partition, LogEntry logEntry) {
+     if (this.checkCrcs)
+         logEntry.record().ensureValid();
+
+     long offset = logEntry.offset();
+
+     K key = null;
+     ByteBuffer keyBytes = logEntry.record().key();
+     if (keyBytes != null)
+         key = this.keyDeserializer.deserialize(partition.topic(), Utils.toArray(keyBytes));
+
+     V value = null;
+     ByteBuffer valueBytes = logEntry.record().value();
+     if (valueBytes != null)
+         value = this.valueDeserializer.deserialize(partition.topic(), Utils.toArray(valueBytes));
+
+     return new ConsumerRecord<K, V>(partition.topic(), partition.partition(), offset, key, value);
+ }
+
+ /**
+  * The records parsed from one partition of a fetch response, together with the offset
+  * that the fetch request asked for.
+  */
+ private static class PartitionRecords<K, V> {
+     public final long fetchOffset;
+     public final TopicPartition partition;
+     public final List<ConsumerRecord<K, V>> records;
+
+     public PartitionRecords(long fetchOffset, TopicPartition partition, List<ConsumerRecord<K, V>> records) {
+         this.records = records;
+         this.partition = partition;
+         this.fetchOffset = fetchOffset;
+     }
+ }
+
+ /**
+  * The sensors recorded by the fetcher. Declared as a static nested class because it only
+  * works with its own fields and never needs the enclosing {@code Fetcher} instance.
+  */
+ private static class FetchManagerMetrics {
+     public final Metrics metrics;
+     public final String metricGrpName;
+
+     public final Sensor bytesFetched;
+     public final Sensor recordsFetched;
+     public final Sensor fetchLatency;
+     public final Sensor recordsFetchLag;
+
+     /**
+      * Create the fetch sensors and register their metrics.
+      *
+      * @param metrics the registry in which the sensors are created
+      * @param metricGrpPrefix prefix of the metric group name; "-fetch-manager-metrics" is appended
+      * @param tags tags attached to every metric name
+      */
+     public FetchManagerMetrics(Metrics metrics, String metricGrpPrefix, Map<String, String> tags) {
+         this.metrics = metrics;
+         this.metricGrpName = metricGrpPrefix + "-fetch-manager-metrics";
+
+         this.bytesFetched = metrics.sensor("bytes-fetched");
+         this.bytesFetched.add(new MetricName("fetch-size-avg",
+             this.metricGrpName,
+             "The average number of bytes fetched per request",
+             tags), new Avg());
+         this.bytesFetched.add(new MetricName("fetch-size-max",
+             this.metricGrpName,
+             "The maximum number of bytes fetched per request",
+             tags), new Max());
+         this.bytesFetched.add(new MetricName("bytes-consumed-rate",
+             this.metricGrpName,
+             "The average number of bytes consumed per second",
+             tags), new Rate());
+
+         this.recordsFetched = metrics.sensor("records-fetched");
+         this.recordsFetched.add(new MetricName("records-per-request-avg",
+             this.metricGrpName,
+             "The average number of records in each request",
+             tags), new Avg());
+         this.recordsFetched.add(new MetricName("records-consumed-rate",
+             this.metricGrpName,
+             "The average number of records consumed per second",
+             tags), new Rate());
+
+         this.fetchLatency = metrics.sensor("fetch-latency");
+         this.fetchLatency.add(new MetricName("fetch-latency-avg",
+             this.metricGrpName,
+             "The average time taken for a fetch request.",
+             tags), new Avg());
+         this.fetchLatency.add(new MetricName("fetch-latency-max",
+             this.metricGrpName,
+             "The max time taken for any fetch request.",
+             tags), new Max());
+         this.fetchLatency.add(new MetricName("fetch-rate",
+             this.metricGrpName,
+             "The number of fetch requests per second.",
+             tags), new Rate(new Count()));
+
+         this.recordsFetchLag = metrics.sensor("records-lag");
+         this.recordsFetchLag.add(new MetricName("records-lag-max",
+             this.metricGrpName,
+             "The maximum lag in terms of number of records for any partition in this window",
+             tags), new Max());
+     }
+
+     /**
+      * Record the bytes and the number of records fetched for a single topic. The per-topic
+      * sensors are looked up by name and created on first use.
+      *
+      * @param topic the topic the data was fetched from
+      * @param bytes the number of bytes fetched
+      * @param records the number of records fetched
+      */
+     public void recordTopicFetchMetrics(String topic, int bytes, int records) {
+         // record bytes fetched
+         String name = "topic." + topic + ".bytes-fetched";
+         Sensor bytesFetched = this.metrics.getSensor(name);
+         if (bytesFetched == null)
+             bytesFetched = this.metrics.sensor(name);
+         bytesFetched.record(bytes);
+
+         // record records fetched
+         name = "topic." + topic + ".records-fetched";
+         Sensor recordsFetched = this.metrics.getSensor(name);
+         if (recordsFetched == null)
+             recordsFetched = this.metrics.sensor(name);
+         recordsFetched.record(records);
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/Heartbeat.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/Heartbeat.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/Heartbeat.java
new file mode 100644
index 0000000..f412897
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/Heartbeat.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.clients.consumer.internals;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * A helper class for managing the heartbeat to the coordinator
+ */
+public final class Heartbeat {
+
+ /* The number of heartbeats to attempt to complete per session timeout interval.
+ * so, e.g., with a session timeout of 3 seconds we would attempt a heartbeat
+ * once per second.
+ */
+ public final static int HEARTBEATS_PER_SESSION_INTERVAL = 3;
+
+ private final long timeout;
+ private long lastHeartbeatSend;
+ private long lastHeartbeatReceive;
+ private long lastSessionReset;
+
+ public Heartbeat(long timeout, long now) {
+ this.timeout = timeout;
+ this.lastSessionReset = now;
+ }
+
+ public void sentHeartbeat(long now) {
+ this.lastHeartbeatSend = now;
+ }
+
+ public void receiveHeartbeat(long now) {
+ this.lastHeartbeatReceive = now;
+ }
+
+ public boolean shouldHeartbeat(long now) {
+ return timeToNextHeartbeat(now) == 0;
+ }
+
+ public long lastHeartbeatSend() {
+ return this.lastHeartbeatSend;
+ }
+
+ public long timeToNextHeartbeat(long now) {
+ long timeSinceLastHeartbeat = now - Math.max(lastHeartbeatSend, lastSessionReset);
+
+ long hbInterval = timeout / HEARTBEATS_PER_SESSION_INTERVAL;
+ if (timeSinceLastHeartbeat > hbInterval)
+ return 0;
+ else
+ return hbInterval - timeSinceLastHeartbeat;
+ }
+
+ public boolean sessionTimeoutExpired(long now) {
+ return now - Math.max(lastSessionReset, lastHeartbeatReceive) > timeout;
+ }
+
+ public long interval() {
+ return timeout / HEARTBEATS_PER_SESSION_INTERVAL;
+ }
+
+ public void resetSessionTimeout(long now) {
+ this.lastSessionReset = now;
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/NoAvailableBrokersException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/NoAvailableBrokersException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/NoAvailableBrokersException.java
new file mode 100644
index 0000000..421c64e
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/NoAvailableBrokersException.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.clients.consumer.internals;
+
+import org.apache.flink.kafka_backport.common.errors.InvalidMetadataException;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * No brokers were available to complete a request.
+ */
+public class NoAvailableBrokersException extends InvalidMetadataException {
+ private static final long serialVersionUID = 1L;
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/NoOpConsumerRebalanceCallback.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/NoOpConsumerRebalanceCallback.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/NoOpConsumerRebalanceCallback.java
new file mode 100644
index 0000000..6da31dd
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/NoOpConsumerRebalanceCallback.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.flink.kafka_backport.clients.consumer.internals;
+
+import org.apache.flink.kafka_backport.clients.consumer.ConsumerRebalanceCallback;
+import org.apache.flink.kafka_backport.clients.consumer.Consumer;
+import org.apache.flink.kafka_backport.common.TopicPartition;
+
+import java.util.Collection;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+public class NoOpConsumerRebalanceCallback implements ConsumerRebalanceCallback {
+
+ @Override
+ public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {}
+
+ @Override
+ public void onPartitionsRevoked(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {}
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/RequestFuture.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/RequestFuture.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/RequestFuture.java
new file mode 100644
index 0000000..7b6edc3
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/RequestFuture.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.clients.consumer.internals;
+
+import org.apache.flink.kafka_backport.common.errors.RetriableException;
+import org.apache.flink.kafka_backport.common.protocol.Errors;
+
+import java.util.ArrayList;
+import java.util.List;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * Result of an asynchronous request from {@link ConsumerNetworkClient}. Use {@link ConsumerNetworkClient#poll(long)}
+ * (and variants) to finish a request future. Use {@link #isDone()} to check if the future is complete, and
+ * {@link #succeeded()} to check if the request completed successfully. Typical usage might look like this:
+ *
+ * <pre>{@code
+ *     RequestFuture<ClientResponse> future = client.send(api, request);
+ *     client.poll(future);
+ *
+ *     if (future.succeeded()) {
+ *         ClientResponse response = future.value();
+ *         // Handle response
+ *     } else {
+ *         throw future.exception();
+ *     }
+ * }</pre>
+ *
+ * <p>This class performs no synchronization of its own, and neither {@link #complete(Object)} nor
+ * {@link #raise(RuntimeException)} checks whether the future has already been completed.
+ *
+ * @param <T> Return type of the result (Can be Void if there is no response)
+ */
+public class RequestFuture<T> {
+
+    private boolean isDone = false;
+    private T value;
+    private RuntimeException exception;
+    // Listeners registered before completion; the list itself is never replaced.
+    private final List<RequestFutureListener<T>> listeners = new ArrayList<RequestFutureListener<T>>();
+
+    /**
+     * Check whether the response is ready to be handled.
+     * @return true if the response is ready, false otherwise
+     */
+    public boolean isDone() {
+        return isDone;
+    }
+
+    /**
+     * Get the value corresponding to this request (only available if the request succeeded).
+     * @return the value if it exists or null
+     */
+    public T value() {
+        return value;
+    }
+
+    /**
+     * Check if the request succeeded.
+     * @return true if the request completed and was successful
+     */
+    public boolean succeeded() {
+        return isDone && exception == null;
+    }
+
+    /**
+     * Check if the request failed.
+     * @return true if the request completed with a failure
+     */
+    public boolean failed() {
+        return isDone && exception != null;
+    }
+
+    /**
+     * Check if the request is retriable (convenience method for checking if
+     * the exception is an instance of {@link org.apache.flink.kafka_backport.common.errors.RetriableException}).
+     * @return true if it is retriable, false otherwise
+     */
+    public boolean isRetriable() {
+        return exception instanceof RetriableException;
+    }
+
+    /**
+     * Get the exception from a failed result (only available if the request failed).
+     * @return The exception if it exists or null
+     */
+    public RuntimeException exception() {
+        return exception;
+    }
+
+    /**
+     * Complete the request successfully. After this call, {@link #succeeded()} will return true
+     * and the value can be obtained through {@link #value()}. Listeners registered so far are
+     * notified through {@link RequestFutureListener#onSuccess(Object)}.
+     * @param value corresponding value (or null if there is none)
+     */
+    public void complete(T value) {
+        this.value = value;
+        this.isDone = true;
+        fireSuccess();
+    }
+
+    /**
+     * Raise an exception. The request will be marked as failed, and the caller can either
+     * handle the exception or throw it. Listeners registered so far are notified through
+     * {@link RequestFutureListener#onFailure(RuntimeException)}.
+     * @param e corresponding exception to be passed to caller
+     */
+    public void raise(RuntimeException e) {
+        this.exception = e;
+        this.isDone = true;
+        fireFailure();
+    }
+
+    /**
+     * Raise an error. The request will be marked as failed with the exception
+     * returned by {@code error.exception()}.
+     * @param error corresponding error to be passed to caller
+     */
+    public void raise(Errors error) {
+        raise(error.exception());
+    }
+
+    /**
+     * Notify every registered listener of the successful result.
+     */
+    private void fireSuccess() {
+        // Typed iteration (rather than the raw listener type) keeps onSuccess(value) a checked call.
+        for (RequestFutureListener<T> listener : listeners) {
+            listener.onSuccess(value);
+        }
+    }
+
+    /**
+     * Notify every registered listener of the failure.
+     */
+    private void fireFailure() {
+        for (RequestFutureListener<T> listener : listeners) {
+            listener.onFailure(exception);
+        }
+    }
+
+    /**
+     * Add a listener which will be notified when the future completes. If the future is
+     * already complete, the listener is invoked immediately and is not stored.
+     * @param listener the listener to notify of success or failure
+     */
+    public void addListener(RequestFutureListener<T> listener) {
+        if (isDone) {
+            if (exception != null) {
+                listener.onFailure(exception);
+            } else {
+                listener.onSuccess(value);
+            }
+        } else {
+            this.listeners.add(listener);
+        }
+    }
+
+    /**
+     * Convert from a request future of one type to another type.
+     * @param adapter The adapter which does the conversion
+     * @param <S> The type of the future adapted to
+     * @return The new future
+     */
+    public <S> RequestFuture<S> compose(final RequestFutureAdapter<T, S> adapter) {
+        final RequestFuture<S> adapted = new RequestFuture<S>();
+        // Forward this future's outcome to the adapter, which decides how to finish the adapted future.
+        addListener(new RequestFutureListener<T>() {
+            @Override
+            public void onSuccess(T value) {
+                adapter.onSuccess(value, adapted);
+            }
+
+            @Override
+            public void onFailure(RuntimeException e) {
+                adapter.onFailure(e, adapted);
+            }
+        });
+        return adapted;
+    }
+
+    /**
+     * Create a future on which the given exception has already been raised.
+     * @param e the exception to raise on the new future
+     * @param <T> the value type of the new future
+     * @return the new, already completed future
+     */
+    public static <T> RequestFuture<T> failure(RuntimeException e) {
+        RequestFuture<T> future = new RequestFuture<T>();
+        future.raise(e);
+        return future;
+    }
+
+    /**
+     * Create a future that has already been completed successfully with a null value.
+     * @return the new, already completed future
+     */
+    public static RequestFuture<Void> voidSuccess() {
+        RequestFuture<Void> future = new RequestFuture<Void>();
+        future.complete(null);
+        return future;
+    }
+
+    /**
+     * Create a future already failed with the exception of {@code Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE}.
+     * @param <T> the value type of the new future
+     * @return the new, already completed future
+     */
+    public static <T> RequestFuture<T> coordinatorNotAvailable() {
+        return failure(Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.exception());
+    }
+
+    /**
+     * Create a future already failed with the exception of {@code Errors.LEADER_NOT_AVAILABLE}.
+     * @param <T> the value type of the new future
+     * @return the new, already completed future
+     */
+    public static <T> RequestFuture<T> leaderNotAvailable() {
+        return failure(Errors.LEADER_NOT_AVAILABLE.exception());
+    }
+
+    /**
+     * Create a future already failed with a new {@link NoAvailableBrokersException}.
+     * @param <T> the value type of the new future
+     * @return the new, already completed future
+     */
+    public static <T> RequestFuture<T> noBrokersAvailable() {
+        return failure(new NoAvailableBrokersException());
+    }
+
+    /**
+     * Create a future already failed with a new {@link StaleMetadataException}.
+     * @param <T> the value type of the new future
+     * @return the new, already completed future
+     */
+    public static <T> RequestFuture<T> staleMetadata() {
+        return failure(new StaleMetadataException());
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/RequestFutureAdapter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/RequestFutureAdapter.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/RequestFutureAdapter.java
new file mode 100644
index 0000000..b34c2da
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/RequestFutureAdapter.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.clients.consumer.internals;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * Adapt from a request future of one type to another.
+ *
+ * <p>An adapter is handed to {@link RequestFuture#compose(RequestFutureAdapter)}, which calls
+ * back into it with the outcome of the source future together with the adapted future.
+ *
+ * @param <F> Type to adapt from
+ * @param <T> Type to adapt to
+ */
+public abstract class RequestFutureAdapter<F, T> {
+
+    /**
+     * Handle a successful result of the source future.
+     *
+     * @param value the value the source future completed with
+     * @param future the adapted future handed to this adapter
+     */
+    public abstract void onSuccess(F value, RequestFuture<T> future);
+
+    /**
+     * Handle a failure of the source future. This default implementation raises the
+     * same exception on the adapted future.
+     *
+     * @param e the exception the source future failed with
+     * @param future the adapted future on which the exception is raised
+     */
+    public void onFailure(RuntimeException e, RequestFuture<T> future) {
+        future.raise(e);
+    }
+
+}