Posted to commits@flink.apache.org by se...@apache.org on 2015/08/27 13:25:26 UTC
[09/51] [abbrv] flink git commit: [FLINK-2386] [kafka connector] Add comments to all backported kafka sources and move them to 'org.apache.flink.kafka_backport'
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/consumer/internals/ConsumerNetworkClient.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/consumer/internals/ConsumerNetworkClient.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/consumer/internals/ConsumerNetworkClient.java
deleted file mode 100644
index 5499a5d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/consumer/internals/ConsumerNetworkClient.java
+++ /dev/null
@@ -1,296 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.clients.consumer.internals;
-
-import org.apache.kafka.copied.clients.ClientRequest;
-import org.apache.kafka.copied.clients.ClientResponse;
-import org.apache.kafka.copied.clients.KafkaClient;
-import org.apache.kafka.copied.clients.Metadata;
-import org.apache.kafka.copied.clients.RequestCompletionHandler;
-import org.apache.kafka.copied.clients.consumer.ConsumerWakeupException;
-import org.apache.kafka.copied.common.Node;
-import org.apache.kafka.copied.common.protocol.ApiKeys;
-import org.apache.kafka.copied.common.requests.AbstractRequest;
-import org.apache.kafka.copied.common.requests.RequestHeader;
-import org.apache.kafka.copied.common.requests.RequestSend;
-import org.apache.kafka.copied.common.utils.Time;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * Higher level consumer access to the network layer with basic support for futures and
- * task scheduling. NOT thread-safe!
- *
- * TODO: The current implementation is simplistic in that it provides a facility for queueing requests
- * prior to delivery, but it makes no effort to retry requests which cannot be sent at the time
- * {@link #poll(long)} is called. This makes the behavior of the queue predictable and easy to
- * understand, but there are opportunities to provide timeout or retry capabilities in the future.
- * How we do this may depend on KAFKA-2120, so for now, we retain the simplistic behavior.
- */
-public class ConsumerNetworkClient implements Closeable {
- private final KafkaClient client;
- private final AtomicBoolean wakeup = new AtomicBoolean(false);
- private final DelayedTaskQueue delayedTasks = new DelayedTaskQueue();
- private final Map<Node, List<ClientRequest>> unsent = new HashMap<Node, List<ClientRequest>>();
- private final Metadata metadata;
- private final Time time;
- private final long retryBackoffMs;
-
- public ConsumerNetworkClient(KafkaClient client,
- Metadata metadata,
- Time time,
- long retryBackoffMs) {
- this.client = client;
- this.metadata = metadata;
- this.time = time;
- this.retryBackoffMs = retryBackoffMs;
- }
-
- /**
- * Schedule a new task to be executed at the given time. This is "best-effort" scheduling and
- * should only be used for coarse synchronization.
- * @param task The task to be scheduled
- * @param at The time it should run
- */
- public void schedule(DelayedTask task, long at) {
- delayedTasks.add(task, at);
- }
-
- /**
- * Unschedule a task. This will remove all instances of the task from the task queue.
- * This is a no-op if the task is not scheduled.
- * @param task The task to be unscheduled.
- */
- public void unschedule(DelayedTask task) {
- delayedTasks.remove(task);
- }
-
- /**
- * Send a new request. Note that the request is not actually transmitted on the
- * network until one of the {@link #poll(long)} variants is invoked. At this
- * point the request will either be transmitted successfully or will fail.
- * Use the returned future to obtain the result of the send.
- * @param node The destination of the request
- * @param api The Kafka API call
- * @param request The request payload
- * @return A future which indicates the result of the send.
- */
- public RequestFuture<ClientResponse> send(Node node,
- ApiKeys api,
- AbstractRequest request) {
- long now = time.milliseconds();
- RequestFutureCompletionHandler future = new RequestFutureCompletionHandler();
- RequestHeader header = client.nextRequestHeader(api);
- RequestSend send = new RequestSend(node.idString(), header, request.toStruct());
- put(node, new ClientRequest(now, true, send, future));
- return future;
- }
-
- private void put(Node node, ClientRequest request) {
- List<ClientRequest> nodeUnsent = unsent.get(node);
- if (nodeUnsent == null) {
- nodeUnsent = new ArrayList<ClientRequest>();
- unsent.put(node, nodeUnsent);
- }
- nodeUnsent.add(request);
- }
-
- public Node leastLoadedNode() {
- return client.leastLoadedNode(time.milliseconds());
- }
-
- /**
- * Block until the metadata has been refreshed.
- */
- public void awaitMetadataUpdate() {
- int version = this.metadata.requestUpdate();
- do {
- poll(Long.MAX_VALUE);
- } while (this.metadata.version() == version);
- }
-
- /**
- * Wake up an active poll. This will cause the polling thread to throw an exception either
- * on the current poll if one is active, or the next poll.
- */
- public void wakeup() {
- this.wakeup.set(true);
- this.client.wakeup();
- }
-
- /**
- * Block indefinitely until the given request future has finished.
- * @param future The request future to await.
- * @throws ConsumerWakeupException if {@link #wakeup()} is called from another thread
- */
- public void poll(RequestFuture<?> future) {
- while (!future.isDone())
- poll(Long.MAX_VALUE);
- }
-
- /**
- * Block until the provided request future has finished or the timeout has expired.
- * @param future The request future to wait for
- * @param timeout The maximum duration (in ms) to wait for the request
- * @return true if the future is done, false otherwise
- * @throws ConsumerWakeupException if {@link #wakeup()} is called from another thread
- */
- public boolean poll(RequestFuture<?> future, long timeout) {
- long now = time.milliseconds();
- long deadline = now + timeout;
- while (!future.isDone() && now < deadline) {
- poll(deadline - now, now);
- now = time.milliseconds();
- }
- return future.isDone();
- }
-
- /**
- * Poll for any network IO. All send requests will either be transmitted on the network
- * or failed when this call completes.
- * @param timeout The maximum time to wait for an IO event.
- * @throws ConsumerWakeupException if {@link #wakeup()} is called from another thread
- */
- public void poll(long timeout) {
- poll(timeout, time.milliseconds());
- }
-
- private void poll(long timeout, long now) {
- // send all the requests we can send now
- pollUnsentRequests(now);
-
- // ensure we don't poll any longer than the deadline for
- // the next scheduled task
- timeout = Math.min(timeout, delayedTasks.nextTimeout(now));
- clientPoll(timeout, now);
-
- // execute scheduled tasks
- now = time.milliseconds();
- delayedTasks.poll(now);
-
- // try again to send requests since buffer space may have been
- // cleared or a connect finished in the poll
- pollUnsentRequests(now);
-
- // fail all requests that couldn't be sent
- clearUnsentRequests(now);
-
- }
-
- /**
- * Block until all pending requests from the given node have finished.
- * @param node The node to await requests from
- */
- public void awaitPendingRequests(Node node) {
- while (pendingRequestCount(node) > 0)
- poll(retryBackoffMs);
- }
-
- /**
- * Get the count of pending requests to the given node. This includes both requests that
- * have been transmitted (i.e. in-flight requests) and those which are awaiting transmission.
- * @param node The node in question
- * @return The number of pending requests
- */
- public int pendingRequestCount(Node node) {
- List<ClientRequest> pending = unsent.get(node);
- int unsentCount = pending == null ? 0 : pending.size();
- return unsentCount + client.inFlightRequestCount(node.idString());
- }
-
- /**
- * Get the total count of pending requests from all nodes. This includes both requests that
- * have been transmitted (i.e. in-flight requests) and those which are awaiting transmission.
- * @return The total count of pending requests
- */
- public int pendingRequestCount() {
- int total = 0;
- for (List<ClientRequest> requests: unsent.values())
- total += requests.size();
- return total + client.inFlightRequestCount();
- }
-
- private void pollUnsentRequests(long now) {
- while (trySend(now))
- clientPoll(0, now);
- }
-
- private void clearUnsentRequests(long now) {
- // clear all unsent requests and fail their corresponding futures
- for (Map.Entry<Node, List<ClientRequest>> requestEntry: unsent.entrySet()) {
- Iterator<ClientRequest> iterator = requestEntry.getValue().iterator();
- while (iterator.hasNext()) {
- ClientRequest request = iterator.next();
- RequestFutureCompletionHandler handler =
- (RequestFutureCompletionHandler) request.callback();
- handler.raise(SendFailedException.INSTANCE);
- iterator.remove();
- }
- }
- unsent.clear();
- }
-
- private boolean trySend(long now) {
- // send any requests that can be sent now
- boolean requestsSent = false;
- for (Map.Entry<Node, List<ClientRequest>> requestEntry: unsent.entrySet()) {
- Node node = requestEntry.getKey();
- Iterator<ClientRequest> iterator = requestEntry.getValue().iterator();
- while (iterator.hasNext()) {
- ClientRequest request = iterator.next();
- if (client.ready(node, now)) {
- client.send(request);
- iterator.remove();
- requestsSent = true;
- } else if (client.connectionFailed(node)) {
- RequestFutureCompletionHandler handler =
- (RequestFutureCompletionHandler) request.callback();
- handler.onComplete(new ClientResponse(request, now, true, null));
- iterator.remove();
- }
- }
- }
- return requestsSent;
- }
-
- private void clientPoll(long timeout, long now) {
- client.poll(timeout, now);
- if (wakeup.get()) {
- clearUnsentRequests(now);
- wakeup.set(false);
- throw new ConsumerWakeupException();
- }
- }
-
- @Override
- public void close() throws IOException {
- client.close();
- }
-
- public static class RequestFutureCompletionHandler
- extends RequestFuture<ClientResponse>
- implements RequestCompletionHandler {
-
- @Override
- public void onComplete(ClientResponse response) {
- complete(response);
- }
- }
-}
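
The class above is driven by a send-then-poll pattern: send() only queues the
request, and one of the poll() variants must run before anything reaches the
wire. A minimal sketch, assuming a KafkaClient, Metadata, Time, target Node,
and AbstractRequest are already constructed by the surrounding consumer code:

    ConsumerNetworkClient networkClient =
            new ConsumerNetworkClient(client, metadata, time, 100L); // 100 ms retry backoff

    // queue a request; it is transmitted (or failed) on the next poll
    RequestFuture<ClientResponse> future =
            networkClient.send(node, ApiKeys.HEARTBEAT, request);

    networkClient.poll(future); // block until the future is done
    if (future.succeeded()) {
        ClientResponse response = future.value();
        // ... handle the response ...
    } else if (!future.isRetriable()) {
        throw future.exception(); // non-retriable failures propagate
    }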
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/consumer/internals/Coordinator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/consumer/internals/Coordinator.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/consumer/internals/Coordinator.java
deleted file mode 100644
index b28f6bc..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/consumer/internals/Coordinator.java
+++ /dev/null
@@ -1,791 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.clients.consumer.internals;
-
-import org.apache.kafka.copied.clients.ClientResponse;
-import org.apache.kafka.copied.clients.consumer.CommitType;
-import org.apache.kafka.copied.clients.consumer.ConsumerCommitCallback;
-import org.apache.kafka.copied.common.KafkaException;
-import org.apache.kafka.copied.common.MetricName;
-import org.apache.kafka.copied.common.Node;
-import org.apache.kafka.copied.common.TopicPartition;
-import org.apache.kafka.copied.common.errors.DisconnectException;
-import org.apache.kafka.copied.common.metrics.Measurable;
-import org.apache.kafka.copied.common.metrics.MetricConfig;
-import org.apache.kafka.copied.common.metrics.Metrics;
-import org.apache.kafka.copied.common.metrics.Sensor;
-import org.apache.kafka.copied.common.metrics.stats.Avg;
-import org.apache.kafka.copied.common.metrics.stats.Count;
-import org.apache.kafka.copied.common.metrics.stats.Max;
-import org.apache.kafka.copied.common.metrics.stats.Rate;
-import org.apache.kafka.copied.common.protocol.ApiKeys;
-import org.apache.kafka.copied.common.protocol.Errors;
-import org.apache.kafka.copied.common.requests.ConsumerMetadataRequest;
-import org.apache.kafka.copied.common.requests.ConsumerMetadataResponse;
-import org.apache.kafka.copied.common.requests.HeartbeatRequest;
-import org.apache.kafka.copied.common.requests.HeartbeatResponse;
-import org.apache.kafka.copied.common.requests.JoinGroupRequest;
-import org.apache.kafka.copied.common.requests.JoinGroupResponse;
-import org.apache.kafka.copied.common.requests.OffsetCommitRequest;
-import org.apache.kafka.copied.common.requests.OffsetCommitResponse;
-import org.apache.kafka.copied.common.requests.OffsetFetchRequest;
-import org.apache.kafka.copied.common.requests.OffsetFetchResponse;
-import org.apache.kafka.copied.common.utils.Time;
-import org.apache.kafka.copied.common.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-/**
- * This class manages the coordination process with the consumer coordinator.
- */
-public final class Coordinator {
-
- private static final Logger log = LoggerFactory.getLogger(Coordinator.class);
-
- private final ConsumerNetworkClient client;
- private final Time time;
- private final String groupId;
- private final Heartbeat heartbeat;
- private final HeartbeatTask heartbeatTask;
- private final int sessionTimeoutMs;
- private final String assignmentStrategy;
- private final SubscriptionState subscriptions;
- private final CoordinatorMetrics sensors;
- private final long requestTimeoutMs;
- private final long retryBackoffMs;
- private final RebalanceCallback rebalanceCallback;
- private Node consumerCoordinator;
- private String consumerId;
- private int generation;
-
-
- /**
- * Initialize the coordination manager.
- */
- public Coordinator(ConsumerNetworkClient client,
- String groupId,
- int sessionTimeoutMs,
- String assignmentStrategy,
- SubscriptionState subscriptions,
- Metrics metrics,
- String metricGrpPrefix,
- Map<String, String> metricTags,
- Time time,
- long requestTimeoutMs,
- long retryBackoffMs,
- RebalanceCallback rebalanceCallback) {
-
- this.client = client;
- this.time = time;
- this.generation = -1;
- this.consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID;
- this.groupId = groupId;
- this.consumerCoordinator = null;
- this.subscriptions = subscriptions;
- this.sessionTimeoutMs = sessionTimeoutMs;
- this.assignmentStrategy = assignmentStrategy;
- this.heartbeat = new Heartbeat(this.sessionTimeoutMs, time.milliseconds());
- this.heartbeatTask = new HeartbeatTask();
- this.sensors = new CoordinatorMetrics(metrics, metricGrpPrefix, metricTags);
- this.requestTimeoutMs = requestTimeoutMs;
- this.retryBackoffMs = retryBackoffMs;
- this.rebalanceCallback = rebalanceCallback;
- }
-
- /**
- * Refresh the committed offsets for the provided partitions.
- */
- public void refreshCommittedOffsetsIfNeeded() {
- if (subscriptions.refreshCommitsNeeded()) {
- Map<TopicPartition, Long> offsets = fetchCommittedOffsets(subscriptions.assignedPartitions());
- for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) {
- TopicPartition tp = entry.getKey();
- this.subscriptions.committed(tp, entry.getValue());
- }
- this.subscriptions.commitsRefreshed();
- }
- }
-
- /**
- * Fetch the current committed offsets from the coordinator for a set of partitions.
- * @param partitions The partitions to fetch offsets for
- * @return A map from partition to the committed offset
- */
- public Map<TopicPartition, Long> fetchCommittedOffsets(Set<TopicPartition> partitions) {
- while (true) {
- ensureCoordinatorKnown();
- ensurePartitionAssignment();
-
- // contact coordinator to fetch committed offsets
- RequestFuture<Map<TopicPartition, Long>> future = sendOffsetFetchRequest(partitions);
- client.poll(future);
-
- if (future.succeeded())
- return future.value();
-
- if (!future.isRetriable())
- throw future.exception();
-
- Utils.sleep(retryBackoffMs);
- }
- }
-
- /**
- * Ensure that we have a valid partition assignment from the coordinator.
- */
- public void ensurePartitionAssignment() {
- if (!subscriptions.partitionAssignmentNeeded())
- return;
-
- // execute the user's callback before rebalance
- log.debug("Revoking previously assigned partitions {}", this.subscriptions.assignedPartitions());
- try {
- Set<TopicPartition> revoked = new HashSet<TopicPartition>(subscriptions.assignedPartitions());
- rebalanceCallback.onPartitionsRevoked(revoked);
- } catch (Exception e) {
- log.error("User provided callback " + this.rebalanceCallback.getClass().getName()
- + " failed on partition revocation: ", e);
- }
-
- reassignPartitions();
-
- // execute the user's callback after rebalance
- log.debug("Setting newly assigned partitions {}", this.subscriptions.assignedPartitions());
- try {
- Set<TopicPartition> assigned = new HashSet<TopicPartition>(subscriptions.assignedPartitions());
- rebalanceCallback.onPartitionsAssigned(assigned);
- } catch (Exception e) {
- log.error("User provided callback " + this.rebalanceCallback.getClass().getName()
- + " failed on partition assignment: ", e);
- }
- }
-
- private void reassignPartitions() {
- while (subscriptions.partitionAssignmentNeeded()) {
- ensureCoordinatorKnown();
-
- // ensure that there are no pending requests to the coordinator. This is important
- // in particular to avoid resending a pending JoinGroup request.
- if (client.pendingRequestCount(this.consumerCoordinator) > 0) {
- client.awaitPendingRequests(this.consumerCoordinator);
- continue;
- }
-
- RequestFuture<Void> future = sendJoinGroupRequest();
- client.poll(future);
-
- if (future.failed()) {
- if (!future.isRetriable())
- throw future.exception();
- Utils.sleep(retryBackoffMs);
- }
- }
- }
-
- /**
- * Block until the coordinator for this group is known.
- */
- public void ensureCoordinatorKnown() {
- while (coordinatorUnknown()) {
- RequestFuture<Void> future = sendConsumerMetadataRequest();
- client.poll(future, requestTimeoutMs);
-
- if (future.failed())
- client.awaitMetadataUpdate();
- }
- }
-
- /**
- * Commit offsets. This call blocks (regardless of commitType) until the coordinator
- * can receive the commit request. Once the request has been made, however, only the
- * synchronous commits will wait for a successful response from the coordinator.
- * @param offsets Offsets to commit.
- * @param commitType Commit policy
- * @param callback Callback to be executed when the commit request finishes
- */
- public void commitOffsets(Map<TopicPartition, Long> offsets, CommitType commitType, ConsumerCommitCallback callback) {
- if (commitType == CommitType.ASYNC)
- commitOffsetsAsync(offsets, callback);
- else
- commitOffsetsSync(offsets, callback);
- }
-
- private class HeartbeatTask implements DelayedTask {
-
- public void reset() {
- // start or restart the heartbeat task to be executed at the next chance
- long now = time.milliseconds();
- heartbeat.resetSessionTimeout(now);
- client.unschedule(this);
- client.schedule(this, now);
- }
-
- @Override
- public void run(final long now) {
- if (!subscriptions.partitionsAutoAssigned() ||
- subscriptions.partitionAssignmentNeeded() ||
- coordinatorUnknown())
- // no need to send if we're not using auto-assignment or if we are
- // awaiting a rebalance
- return;
-
- if (heartbeat.sessionTimeoutExpired(now)) {
- // we haven't received a successful heartbeat in one session interval
- // so mark the coordinator dead
- coordinatorDead();
- return;
- }
-
- if (!heartbeat.shouldHeartbeat(now)) {
- // we don't need to heartbeat now, so reschedule for when we do
- client.schedule(this, now + heartbeat.timeToNextHeartbeat(now));
- } else {
- heartbeat.sentHeartbeat(now);
- RequestFuture<Void> future = sendHeartbeatRequest();
- future.addListener(new RequestFutureListener<Void>() {
- @Override
- public void onSuccess(Void value) {
- long now = time.milliseconds();
- heartbeat.receiveHeartbeat(now);
- long nextHeartbeatTime = now + heartbeat.timeToNextHeartbeat(now);
- client.schedule(HeartbeatTask.this, nextHeartbeatTime);
- }
-
- @Override
- public void onFailure(RuntimeException e) {
- client.schedule(HeartbeatTask.this, time.milliseconds() + retryBackoffMs);
- }
- });
- }
- }
- }
-
- /**
- * Send a request to get a new partition assignment. This is a non-blocking call which sends
- * a JoinGroup request to the coordinator (if it is available). The returned future must
- * be polled to see if the request completed successfully.
- * @return A request future whose completion indicates the result of the JoinGroup request.
- */
- private RequestFuture<Void> sendJoinGroupRequest() {
- if (coordinatorUnknown())
- return RequestFuture.coordinatorNotAvailable();
-
- // send a join group request to the coordinator
- List<String> subscribedTopics = new ArrayList<String>(subscriptions.subscribedTopics());
- log.debug("(Re-)joining group {} with subscribed topics {}", groupId, subscribedTopics);
-
- JoinGroupRequest request = new JoinGroupRequest(groupId,
- this.sessionTimeoutMs,
- subscribedTopics,
- this.consumerId,
- this.assignmentStrategy);
-
- // create the request for the coordinator
- log.debug("Issuing request ({}: {}) to coordinator {}", ApiKeys.JOIN_GROUP, request, this.consumerCoordinator.id());
- return client.send(consumerCoordinator, ApiKeys.JOIN_GROUP, request)
- .compose(new JoinGroupResponseHandler());
- }
-
- private class JoinGroupResponseHandler extends CoordinatorResponseHandler<JoinGroupResponse, Void> {
-
- @Override
- public JoinGroupResponse parse(ClientResponse response) {
- return new JoinGroupResponse(response.responseBody());
- }
-
- @Override
- public void handle(JoinGroupResponse joinResponse, RequestFuture<Void> future) {
- // process the response
- short errorCode = joinResponse.errorCode();
-
- if (errorCode == Errors.NONE.code()) {
- Coordinator.this.consumerId = joinResponse.consumerId();
- Coordinator.this.generation = joinResponse.generationId();
-
- // set the flag to refresh last committed offsets
- subscriptions.needRefreshCommits();
-
- log.debug("Joined group: {}", joinResponse.toStruct());
-
- // record re-assignment time
- sensors.partitionReassignments.record(response.requestLatencyMs());
-
- // update partition assignment
- subscriptions.changePartitionAssignment(joinResponse.assignedPartitions());
- heartbeatTask.reset();
- future.complete(null);
- } else if (errorCode == Errors.UNKNOWN_CONSUMER_ID.code()) {
- // reset the consumer id and retry immediately
- Coordinator.this.consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID;
- log.info("Attempt to join group {} failed due to unknown consumer id, resetting and retrying.",
- groupId);
- future.raise(Errors.UNKNOWN_CONSUMER_ID);
- } else if (errorCode == Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code()
- || errorCode == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
- // re-discover the coordinator and retry with backoff
- coordinatorDead();
- log.info("Attempt to join group {} failed due to obsolete coordinator information, retrying.",
- groupId);
- future.raise(Errors.forCode(errorCode));
- } else if (errorCode == Errors.UNKNOWN_PARTITION_ASSIGNMENT_STRATEGY.code()
- || errorCode == Errors.INCONSISTENT_PARTITION_ASSIGNMENT_STRATEGY.code()
- || errorCode == Errors.INVALID_SESSION_TIMEOUT.code()) {
- // log the error and re-throw the exception
- Errors error = Errors.forCode(errorCode);
- log.error("Attempt to join group {} failed due to: {}",
- groupId, error.exception().getMessage());
- future.raise(error);
- } else {
- // unexpected error, throw the exception
- future.raise(new KafkaException("Unexpected error in join group response: "
- + Errors.forCode(joinResponse.errorCode()).exception().getMessage()));
- }
- }
- }
-
- private void commitOffsetsAsync(final Map<TopicPartition, Long> offsets, final ConsumerCommitCallback callback) {
- this.subscriptions.needRefreshCommits();
- RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
- if (callback != null) {
- future.addListener(new RequestFutureListener<Void>() {
- @Override
- public void onSuccess(Void value) {
- callback.onComplete(offsets, null);
- }
-
- @Override
- public void onFailure(RuntimeException e) {
- callback.onComplete(offsets, e);
- }
- });
- }
- }
-
- private void commitOffsetsSync(Map<TopicPartition, Long> offsets, ConsumerCommitCallback callback) {
- while (true) {
- ensureCoordinatorKnown();
- ensurePartitionAssignment();
-
- RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
- client.poll(future);
-
- if (future.succeeded()) {
- if (callback != null)
- callback.onComplete(offsets, null);
- return;
- }
-
- if (!future.isRetriable()) {
- if (callback == null)
- throw future.exception();
- else
- callback.onComplete(offsets, future.exception());
- return;
- }
-
- Utils.sleep(retryBackoffMs);
- }
- }
-
- /**
- * Commit offsets for the specified list of topics and partitions. This is a non-blocking call
- * which returns a request future that can be polled in the case of a synchronous commit or ignored in the
- * asynchronous case.
- *
- * @param offsets The list of offsets per partition that should be committed.
- * @return A request future whose value indicates whether the commit was successful or not
- */
- private RequestFuture<Void> sendOffsetCommitRequest(final Map<TopicPartition, Long> offsets) {
- if (coordinatorUnknown())
- return RequestFuture.coordinatorNotAvailable();
-
- if (offsets.isEmpty())
- return RequestFuture.voidSuccess();
-
- // create the offset commit request
- Map<TopicPartition, OffsetCommitRequest.PartitionData> offsetData;
- offsetData = new HashMap<TopicPartition, OffsetCommitRequest.PartitionData>(offsets.size());
- for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet())
- offsetData.put(entry.getKey(), new OffsetCommitRequest.PartitionData(entry.getValue(), ""));
- OffsetCommitRequest req = new OffsetCommitRequest(this.groupId,
- this.generation,
- this.consumerId,
- OffsetCommitRequest.DEFAULT_RETENTION_TIME,
- offsetData);
-
- return client.send(consumerCoordinator, ApiKeys.OFFSET_COMMIT, req)
- .compose(new OffsetCommitResponseHandler(offsets));
- }
-
-
- private class OffsetCommitResponseHandler extends CoordinatorResponseHandler<OffsetCommitResponse, Void> {
-
- private final Map<TopicPartition, Long> offsets;
-
- public OffsetCommitResponseHandler(Map<TopicPartition, Long> offsets) {
- this.offsets = offsets;
- }
-
- @Override
- public OffsetCommitResponse parse(ClientResponse response) {
- return new OffsetCommitResponse(response.responseBody());
- }
-
- @Override
- public void handle(OffsetCommitResponse commitResponse, RequestFuture<Void> future) {
- sensors.commitLatency.record(response.requestLatencyMs());
- for (Map.Entry<TopicPartition, Short> entry : commitResponse.responseData().entrySet()) {
- TopicPartition tp = entry.getKey();
- long offset = this.offsets.get(tp);
- short errorCode = entry.getValue();
- if (errorCode == Errors.NONE.code()) {
- log.debug("Committed offset {} for partition {}", offset, tp);
- subscriptions.committed(tp, offset);
- } else if (errorCode == Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code()
- || errorCode == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
- coordinatorDead();
- future.raise(Errors.forCode(errorCode));
- return;
- } else if (errorCode == Errors.OFFSET_METADATA_TOO_LARGE.code()
- || errorCode == Errors.INVALID_COMMIT_OFFSET_SIZE.code()) {
- // do not need to throw the exception but just log the error
- log.error("Error committing partition {} at offset {}: {}",
- tp,
- offset,
- Errors.forCode(errorCode).exception().getMessage());
- } else if (errorCode == Errors.UNKNOWN_CONSUMER_ID.code()
- || errorCode == Errors.ILLEGAL_GENERATION.code()) {
- // need to re-join group
- subscriptions.needReassignment();
- future.raise(Errors.forCode(errorCode));
- return;
- } else {
- // do not need to throw the exception but just log the error
- future.raise(Errors.forCode(errorCode));
- log.error("Error committing partition {} at offset {}: {}",
- tp,
- offset,
- Errors.forCode(errorCode).exception().getMessage());
- }
- }
-
- future.complete(null);
- }
- }
-
- /**
- * Fetch the committed offsets for a set of partitions. This is a non-blocking call. The
- * returned future can be polled to get the actual offsets returned from the broker.
- *
- * @param partitions The set of partitions to get offsets for.
- * @return A request future containing the committed offsets.
- */
- private RequestFuture<Map<TopicPartition, Long>> sendOffsetFetchRequest(Set<TopicPartition> partitions) {
- if (coordinatorUnknown())
- return RequestFuture.coordinatorNotAvailable();
-
- log.debug("Fetching committed offsets for partitions: {}", Utils.join(partitions, ", "));
- // construct the request
- OffsetFetchRequest request = new OffsetFetchRequest(this.groupId, new ArrayList<TopicPartition>(partitions));
-
- // send the request with a callback
- return client.send(consumerCoordinator, ApiKeys.OFFSET_FETCH, request)
- .compose(new OffsetFetchResponseHandler());
- }
-
- private class OffsetFetchResponseHandler extends CoordinatorResponseHandler<OffsetFetchResponse, Map<TopicPartition, Long>> {
-
- @Override
- public OffsetFetchResponse parse(ClientResponse response) {
- return new OffsetFetchResponse(response.responseBody());
- }
-
- @Override
- public void handle(OffsetFetchResponse response, RequestFuture<Map<TopicPartition, Long>> future) {
- Map<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>(response.responseData().size());
- for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry : response.responseData().entrySet()) {
- TopicPartition tp = entry.getKey();
- OffsetFetchResponse.PartitionData data = entry.getValue();
- if (data.hasError()) {
- log.debug("Error fetching offset for topic-partition {}: {}", tp, Errors.forCode(data.errorCode)
- .exception()
- .getMessage());
- if (data.errorCode == Errors.OFFSET_LOAD_IN_PROGRESS.code()) {
- // just retry
- future.raise(Errors.OFFSET_LOAD_IN_PROGRESS);
- } else if (data.errorCode == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
- // re-discover the coordinator and retry
- coordinatorDead();
- future.raise(Errors.NOT_COORDINATOR_FOR_CONSUMER);
- } else if (data.errorCode == Errors.UNKNOWN_CONSUMER_ID.code()
- || data.errorCode == Errors.ILLEGAL_GENERATION.code()) {
- // need to re-join group
- subscriptions.needReassignment();
- future.raise(Errors.forCode(data.errorCode));
- } else {
- future.raise(new KafkaException("Unexpected error in fetch offset response: "
- + Errors.forCode(data.errorCode).exception().getMessage()));
- }
- return;
- } else if (data.offset >= 0) {
- // record the position with the offset (-1 indicates no committed offset to fetch)
- offsets.put(tp, data.offset);
- } else {
- log.debug("No committed offset for partition " + tp);
- }
- }
-
- future.complete(offsets);
- }
- }
-
- /**
- * Send a heartbeat request now (visible only for testing).
- */
- public RequestFuture<Void> sendHeartbeatRequest() {
- HeartbeatRequest req = new HeartbeatRequest(this.groupId, this.generation, this.consumerId);
- return client.send(consumerCoordinator, ApiKeys.HEARTBEAT, req)
- .compose(new HeartbeatCompletionHandler());
- }
-
- public boolean coordinatorUnknown() {
- return this.consumerCoordinator == null;
- }
-
- /**
- * Discover the current coordinator for the consumer group. Sends a ConsumerMetadata request to
- * one of the brokers. The returned future should be polled to get the result of the request.
- * @return A request future which indicates the completion of the metadata request
- */
- private RequestFuture<Void> sendConsumerMetadataRequest() {
- // initiate the consumer metadata request
- // find a node to ask about the coordinator
- Node node = this.client.leastLoadedNode();
- if (node == null) {
- // TODO: If there are no brokers left, perhaps we should use the bootstrap set
- // from configuration?
- return RequestFuture.noBrokersAvailable();
- } else {
- // create a consumer metadata request
- log.debug("Issuing consumer metadata request to broker {}", node.id());
- ConsumerMetadataRequest metadataRequest = new ConsumerMetadataRequest(this.groupId);
- return client.send(node, ApiKeys.CONSUMER_METADATA, metadataRequest)
- .compose(new RequestFutureAdapter<ClientResponse, Void>() {
- @Override
- public void onSuccess(ClientResponse response, RequestFuture<Void> future) {
- handleConsumerMetadataResponse(response, future);
- }
- });
- }
- }
-
- private void handleConsumerMetadataResponse(ClientResponse resp, RequestFuture<Void> future) {
- log.debug("Consumer metadata response {}", resp);
-
- // parse the response to get the coordinator info if it is not disconnected,
- // otherwise we need to request metadata update
- if (resp.wasDisconnected()) {
- future.raise(new DisconnectException());
- } else if (!coordinatorUnknown()) {
- // We already found the coordinator, so ignore the request
- future.complete(null);
- } else {
- ConsumerMetadataResponse consumerMetadataResponse = new ConsumerMetadataResponse(resp.responseBody());
- // use MAX_VALUE - node.id as the coordinator id to mimic separate connections
- // for the coordinator in the underlying network client layer
- // TODO: this needs to be better handled in KAFKA-1935
- if (consumerMetadataResponse.errorCode() == Errors.NONE.code()) {
- this.consumerCoordinator = new Node(Integer.MAX_VALUE - consumerMetadataResponse.node().id(),
- consumerMetadataResponse.node().host(),
- consumerMetadataResponse.node().port());
- heartbeatTask.reset();
- future.complete(null);
- } else {
- future.raise(Errors.forCode(consumerMetadataResponse.errorCode()));
- }
- }
- }
-
- /**
- * Mark the current coordinator as dead.
- */
- private void coordinatorDead() {
- if (this.consumerCoordinator != null) {
- log.info("Marking the coordinator {} dead.", this.consumerCoordinator.id());
- this.consumerCoordinator = null;
- }
- }
-
- private class HeartbeatCompletionHandler extends CoordinatorResponseHandler<HeartbeatResponse, Void> {
- @Override
- public HeartbeatResponse parse(ClientResponse response) {
- return new HeartbeatResponse(response.responseBody());
- }
-
- @Override
- public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> future) {
- sensors.heartbeatLatency.record(response.requestLatencyMs());
- short error = heartbeatResponse.errorCode();
- if (error == Errors.NONE.code()) {
- log.debug("Received successful heartbeat response.");
- future.complete(null);
- } else if (error == Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code()
- || error == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
- log.info("Attempt to heart beat failed since coordinator is either not started or not valid, marking it as dead.");
- coordinatorDead();
- future.raise(Errors.forCode(error));
- } else if (error == Errors.ILLEGAL_GENERATION.code()) {
- log.info("Attempt to heart beat failed since generation id is not legal, try to re-join group.");
- subscriptions.needReassignment();
- future.raise(Errors.ILLEGAL_GENERATION);
- } else if (error == Errors.UNKNOWN_CONSUMER_ID.code()) {
- log.info("Attempt to heart beat failed since consumer id is not valid, reset it and try to re-join group.");
- consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID;
- subscriptions.needReassignment();
- future.raise(Errors.UNKNOWN_CONSUMER_ID);
- } else {
- future.raise(new KafkaException("Unexpected error in heartbeat response: "
- + Errors.forCode(error).exception().getMessage()));
- }
- }
- }
-
- private abstract class CoordinatorResponseHandler<R, T>
- extends RequestFutureAdapter<ClientResponse, T> {
- protected ClientResponse response;
-
- public abstract R parse(ClientResponse response);
-
- public abstract void handle(R response, RequestFuture<T> future);
-
- @Override
- public void onSuccess(ClientResponse clientResponse, RequestFuture<T> future) {
- this.response = clientResponse;
-
- if (clientResponse.wasDisconnected()) {
- int correlation = response.request().request().header().correlationId();
- log.debug("Cancelled request {} with correlation id {} due to coordinator {} being disconnected",
- response.request(),
- correlation,
- response.request().request().destination());
-
- // mark the coordinator as dead
- coordinatorDead();
- future.raise(new DisconnectException());
- return;
- }
-
- R response = parse(clientResponse);
- handle(response, future);
- }
-
- @Override
- public void onFailure(RuntimeException e, RequestFuture<T> future) {
- if (e instanceof DisconnectException) {
- log.debug("Coordinator request failed", e);
- coordinatorDead();
- }
- future.raise(e);
- }
- }
-
- public interface RebalanceCallback {
- void onPartitionsAssigned(Collection<TopicPartition> partitions);
- void onPartitionsRevoked(Collection<TopicPartition> partitions);
- }
-
- private class CoordinatorMetrics {
- public final Metrics metrics;
- public final String metricGrpName;
-
- public final Sensor commitLatency;
- public final Sensor heartbeatLatency;
- public final Sensor partitionReassignments;
-
- public CoordinatorMetrics(Metrics metrics, String metricGrpPrefix, Map<String, String> tags) {
- this.metrics = metrics;
- this.metricGrpName = metricGrpPrefix + "-coordinator-metrics";
-
- this.commitLatency = metrics.sensor("commit-latency");
- this.commitLatency.add(new MetricName("commit-latency-avg",
- this.metricGrpName,
- "The average time taken for a commit request",
- tags), new Avg());
- this.commitLatency.add(new MetricName("commit-latency-max",
- this.metricGrpName,
- "The max time taken for a commit request",
- tags), new Max());
- this.commitLatency.add(new MetricName("commit-rate",
- this.metricGrpName,
- "The number of commit calls per second",
- tags), new Rate(new Count()));
-
- this.heartbeatLatency = metrics.sensor("heartbeat-latency");
- this.heartbeatLatency.add(new MetricName("heartbeat-response-time-max",
- this.metricGrpName,
- "The max time taken to receive a response to a hearbeat request",
- tags), new Max());
- this.heartbeatLatency.add(new MetricName("heartbeat-rate",
- this.metricGrpName,
- "The average number of heartbeats per second",
- tags), new Rate(new Count()));
-
- this.partitionReassignments = metrics.sensor("reassignment-latency");
- this.partitionReassignments.add(new MetricName("reassignment-time-avg",
- this.metricGrpName,
- "The average time taken for a partition reassignment",
- tags), new Avg());
- this.partitionReassignments.add(new MetricName("reassignment-time-max",
- this.metricGrpName,
- "The max time taken for a partition reassignment",
- tags), new Max());
- this.partitionReassignments.add(new MetricName("reassignment-rate",
- this.metricGrpName,
- "The number of partition reassignments per second",
- tags), new Rate(new Count()));
-
- Measurable numParts =
- new Measurable() {
- public double measure(MetricConfig config, long now) {
- return subscriptions.assignedPartitions().size();
- }
- };
- metrics.addMetric(new MetricName("assigned-partitions",
- this.metricGrpName,
- "The number of partitions currently assigned to this consumer",
- tags),
- numParts);
-
- Measurable lastHeartbeat =
- new Measurable() {
- public double measure(MetricConfig config, long now) {
- return TimeUnit.SECONDS.convert(now - heartbeat.lastHeartbeatSend(), TimeUnit.MILLISECONDS);
- }
- };
- metrics.addMetric(new MetricName("last-heartbeat-seconds-ago",
- this.metricGrpName,
- "The number of seconds since the last controller heartbeat",
- tags),
- lastHeartbeat);
- }
- }
-}
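
All of the blocking entry points above (fetchCommittedOffsets,
commitOffsetsSync, reassignPartitions) share the same retry loop around the
network client. A condensed sketch of that pattern, with sendSomeRequest() as
a hypothetical stand-in for any of the send* methods defined above:

    while (true) {
        ensureCoordinatorKnown();                       // (re)discover the coordinator if needed

        RequestFuture<Void> future = sendSomeRequest(); // hypothetical stand-in
        client.poll(future);                            // block until the future completes

        if (future.succeeded())
            return;                                     // done
        if (!future.isRetriable())
            throw future.exception();                   // fatal errors propagate to the caller

        Utils.sleep(retryBackoffMs);                    // retriable error: back off, then retry
    }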
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/consumer/internals/DelayedTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/consumer/internals/DelayedTask.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/consumer/internals/DelayedTask.java
deleted file mode 100644
index 6cc07f7..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/consumer/internals/DelayedTask.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package org.apache.kafka.copied.clients.consumer.internals;
-
-
-public interface DelayedTask {
-
- /**
- * Execute the task.
- * @param now current time in milliseconds
- */
- void run(long now);
-}
\ No newline at end of file
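
DelayedTask is a single-method callback, so an implementation only has to
accept the scheduler's notion of "now"; the HeartbeatTask inner class in
Coordinator.java above is the real example in this commit. A trivial sketch:

    public class LoggingTask implements DelayedTask {
        @Override
        public void run(long now) {
            // 'now' is the current time in milliseconds, as passed by the scheduler
            System.out.println("task ran at " + now);
        }
    }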
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/consumer/internals/DelayedTaskQueue.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/consumer/internals/DelayedTaskQueue.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/consumer/internals/DelayedTaskQueue.java
deleted file mode 100644
index dd02559..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/consumer/internals/DelayedTaskQueue.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package org.apache.kafka.copied.clients.consumer.internals;
-
-import java.util.Iterator;
-import java.util.PriorityQueue;
-
-/**
- * Tracks a set of tasks to be executed after a delay.
- */
-public class DelayedTaskQueue {
-
- private PriorityQueue<Entry> tasks;
-
- public DelayedTaskQueue() {
- tasks = new PriorityQueue<Entry>();
- }
-
- /**
- * Schedule a task for execution in the future.
- *
- * @param task the task to execute
- * @param at the time at which to execute the task
- */
- public void add(DelayedTask task, long at) {
- tasks.add(new Entry(task, at));
- }
-
- /**
- * Remove a task from the queue if it is present
- * @param task the task to be removed
- * @return true if a task was removed as a result of this call
- */
- public boolean remove(DelayedTask task) {
- boolean wasRemoved = false;
- Iterator<Entry> iterator = tasks.iterator();
- while (iterator.hasNext()) {
- Entry entry = iterator.next();
- if (entry.task.equals(task)) {
- iterator.remove();
- wasRemoved = true;
- }
- }
- return wasRemoved;
- }
-
- /**
- * Get the amount of time in milliseconds until the next event. Returns Long.MAX_VALUE if no tasks are scheduled.
- *
- * @param now the current time in milliseconds
- * @return the remaining time in milliseconds
- */
- public long nextTimeout(long now) {
- if (tasks.isEmpty())
- return Long.MAX_VALUE;
- else
- return Math.max(tasks.peek().timeout - now, 0);
- }
-
- /**
- * Run any ready tasks.
- *
- * @param now the current time
- */
- public void poll(long now) {
- while (!tasks.isEmpty() && tasks.peek().timeout <= now) {
- Entry entry = tasks.poll();
- entry.task.run(now);
- }
- }
-
- private static class Entry implements Comparable<Entry> {
- DelayedTask task;
- long timeout;
-
- public Entry(DelayedTask task, long timeout) {
- this.task = task;
- this.timeout = timeout;
- }
-
- @Override
- public int compareTo(Entry entry) {
- return Long.compare(timeout, entry.timeout);
- }
- }
-}
\ No newline at end of file
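
A minimal sketch of the event loop this queue is built for, using only the
methods defined above (ConsumerNetworkClient.poll() above implements exactly
this bound-then-drain sequence); 'time' is assumed to be the caller's Time
instance and LoggingTask any DelayedTask implementation:

    DelayedTaskQueue queue = new DelayedTaskQueue();
    queue.add(new LoggingTask(), time.milliseconds() + 500); // due ~500 ms from now

    long now = time.milliseconds();
    long timeout = queue.nextTimeout(now); // bound any blocking wait by the next deadline
    // ... block on network I/O for at most 'timeout' ms ...
    queue.poll(time.milliseconds());       // run every task whose deadline has passed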
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/consumer/internals/Fetcher.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/consumer/internals/Fetcher.java
deleted file mode 100644
index 4767f3f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/consumer/internals/Fetcher.java
+++ /dev/null
@@ -1,498 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package org.apache.kafka.copied.clients.consumer.internals;
-
-import org.apache.kafka.copied.clients.ClientResponse;
-import org.apache.kafka.copied.clients.Metadata;
-import org.apache.kafka.copied.clients.consumer.ConsumerRecord;
-import org.apache.kafka.copied.clients.consumer.NoOffsetForPartitionException;
-import org.apache.kafka.copied.clients.consumer.OffsetResetStrategy;
-import org.apache.kafka.copied.common.Cluster;
-import org.apache.kafka.copied.common.MetricName;
-import org.apache.kafka.copied.common.Node;
-import org.apache.kafka.copied.common.PartitionInfo;
-import org.apache.kafka.copied.common.TopicPartition;
-import org.apache.kafka.copied.common.errors.DisconnectException;
-import org.apache.kafka.copied.common.errors.InvalidMetadataException;
-import org.apache.kafka.copied.common.metrics.Metrics;
-import org.apache.kafka.copied.common.metrics.Sensor;
-import org.apache.kafka.copied.common.metrics.stats.Avg;
-import org.apache.kafka.copied.common.metrics.stats.Count;
-import org.apache.kafka.copied.common.metrics.stats.Max;
-import org.apache.kafka.copied.common.metrics.stats.Rate;
-import org.apache.kafka.copied.common.protocol.ApiKeys;
-import org.apache.kafka.copied.common.protocol.Errors;
-import org.apache.kafka.copied.common.record.LogEntry;
-import org.apache.kafka.copied.common.record.MemoryRecords;
-import org.apache.kafka.copied.common.requests.FetchRequest;
-import org.apache.kafka.copied.common.requests.FetchResponse;
-import org.apache.kafka.copied.common.requests.ListOffsetRequest;
-import org.apache.kafka.copied.common.requests.ListOffsetResponse;
-import org.apache.kafka.copied.common.serialization.Deserializer;
-import org.apache.kafka.copied.common.utils.Time;
-import org.apache.kafka.copied.common.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-
-/**
- * This class manages the fetching process with the brokers.
- */
-public class Fetcher<K, V> {
- private static final long EARLIEST_OFFSET_TIMESTAMP = -2L;
- private static final long LATEST_OFFSET_TIMESTAMP = -1L;
-
- private static final Logger log = LoggerFactory.getLogger(Fetcher.class);
-
- private final ConsumerNetworkClient client;
- private final Time time;
- private final int minBytes;
- private final int maxWaitMs;
- private final int fetchSize;
- private final long retryBackoffMs;
- private final boolean checkCrcs;
- private final Metadata metadata;
- private final FetchManagerMetrics sensors;
- private final SubscriptionState subscriptions;
- private final List<PartitionRecords<K, V>> records;
- private final Deserializer<K> keyDeserializer;
- private final Deserializer<V> valueDeserializer;
-
- public Fetcher(ConsumerNetworkClient client,
- int minBytes,
- int maxWaitMs,
- int fetchSize,
- boolean checkCrcs,
- Deserializer<K> keyDeserializer,
- Deserializer<V> valueDeserializer,
- Metadata metadata,
- SubscriptionState subscriptions,
- Metrics metrics,
- String metricGrpPrefix,
- Map<String, String> metricTags,
- Time time,
- long retryBackoffMs) {
-
- this.time = time;
- this.client = client;
- this.metadata = metadata;
- this.subscriptions = subscriptions;
- this.minBytes = minBytes;
- this.maxWaitMs = maxWaitMs;
- this.fetchSize = fetchSize;
- this.checkCrcs = checkCrcs;
-
- this.keyDeserializer = keyDeserializer;
- this.valueDeserializer = valueDeserializer;
-
- this.records = new LinkedList<PartitionRecords<K, V>>();
-
- this.sensors = new FetchManagerMetrics(metrics, metricGrpPrefix, metricTags);
- this.retryBackoffMs = retryBackoffMs;
- }
-
- /**
- * Set up a fetch request for any node hosting assigned partitions that does not already have a fetch in flight.
- *
- * @param cluster The current cluster metadata
- */
- public void initFetches(Cluster cluster) {
- for (Map.Entry<Node, FetchRequest> fetchEntry: createFetchRequests(cluster).entrySet()) {
- final FetchRequest fetch = fetchEntry.getValue();
- client.send(fetchEntry.getKey(), ApiKeys.FETCH, fetch)
- .addListener(new RequestFutureListener<ClientResponse>() {
- @Override
- public void onSuccess(ClientResponse response) {
- handleFetchResponse(response, fetch);
- }
-
- @Override
- public void onFailure(RuntimeException e) {
- log.debug("Fetch failed", e);
- }
- });
- }
- }
-
- /**
- * Update the fetch positions for the provided partitions.
- * @param partitions the partitions whose fetch positions should be updated
- */
- public void updateFetchPositions(Set<TopicPartition> partitions) {
- // reset the fetch position to the committed position
- for (TopicPartition tp : partitions) {
- // skip if we already have a fetch position
- if (subscriptions.fetched(tp) != null)
- continue;
-
- // TODO: If there are several offsets to reset, we could submit offset requests in parallel
- if (subscriptions.isOffsetResetNeeded(tp)) {
- resetOffset(tp);
- } else if (subscriptions.committed(tp) == null) {
- // there's no committed position, so we need to reset with the default strategy
- subscriptions.needOffsetReset(tp);
- resetOffset(tp);
- } else {
- log.debug("Resetting offset for partition {} to the committed offset {}",
- tp, subscriptions.committed(tp));
- subscriptions.seek(tp, subscriptions.committed(tp));
- }
- }
- }
-
- /**
- * Reset offsets for the given partition using the offset reset strategy.
- *
- * @param partition The partition whose offset should be reset
- * @throws NoOffsetForPartitionException If no offset reset strategy is defined
- */
- private void resetOffset(TopicPartition partition) {
- OffsetResetStrategy strategy = subscriptions.resetStrategy(partition);
- final long timestamp;
- if (strategy == OffsetResetStrategy.EARLIEST)
- timestamp = EARLIEST_OFFSET_TIMESTAMP;
- else if (strategy == OffsetResetStrategy.LATEST)
- timestamp = LATEST_OFFSET_TIMESTAMP;
- else
- throw new NoOffsetForPartitionException("No offset is set and no reset policy is defined");
-
- log.debug("Resetting offset for partition {} to {} offset.", partition, strategy.name().toLowerCase());
- long offset = listOffset(partition, timestamp);
- this.subscriptions.seek(partition, offset);
- }
-
- /**
- * Fetch a single offset before the given timestamp for the partition.
- *
- * @param partition The partition to fetch the offset for.
- * @param timestamp The target timestamp for the offset lookup.
- * @return The offset of the message published before the given timestamp
- */
- private long listOffset(TopicPartition partition, long timestamp) {
- while (true) {
- RequestFuture<Long> future = sendListOffsetRequest(partition, timestamp);
- client.poll(future);
-
- if (future.succeeded())
- return future.value();
-
- if (!future.isRetriable())
- throw future.exception();
-
- if (future.exception() instanceof InvalidMetadataException)
- client.awaitMetadataUpdate();
- else
- Utils.sleep(retryBackoffMs);
- }
- }
-
- /**
- * Return the fetched records, empty the record buffer and update the consumed position.
- *
- * @return The fetched records per partition
- */
- public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
- if (this.subscriptions.partitionAssignmentNeeded()) {
- return Collections.emptyMap();
- } else {
- Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<TopicPartition, List<ConsumerRecord<K, V>>>();
- for (PartitionRecords<K, V> part : this.records) {
- Long consumed = subscriptions.consumed(part.partition);
- if (this.subscriptions.assignedPartitions().contains(part.partition)
- && (consumed == null || part.fetchOffset == consumed)) {
- List<ConsumerRecord<K, V>> records = drained.get(part.partition);
- if (records == null) {
- records = part.records;
- drained.put(part.partition, records);
- } else {
- records.addAll(part.records);
- }
- subscriptions.consumed(part.partition, part.records.get(part.records.size() - 1).offset() + 1);
- } else {
- // these records aren't next in line based on the last consumed position;
- // ignore them, as they must be from an obsolete request
- log.debug("Ignoring fetched records for {} at offset {}", part.partition, part.fetchOffset);
- }
- }
- this.records.clear();
- return drained;
- }
- }
-
- /**
- * Send an asynchronous request to fetch a single offset before the given timestamp for the partition.
- *
- * @param topicPartition The partition for which to fetch an offset.
- * @param timestamp The timestamp to fetch an offset before.
- * @return A future which can be polled to obtain the corresponding offset.
- */
- private RequestFuture<Long> sendListOffsetRequest(final TopicPartition topicPartition, long timestamp) {
- Map<TopicPartition, ListOffsetRequest.PartitionData> partitions = new HashMap<TopicPartition, ListOffsetRequest.PartitionData>(1);
- partitions.put(topicPartition, new ListOffsetRequest.PartitionData(timestamp, 1));
- PartitionInfo info = metadata.fetch().partition(topicPartition);
- if (info == null) {
- metadata.add(topicPartition.topic());
- log.debug("Partition {} is unknown for fetching offset, wait for metadata refresh", topicPartition);
- return RequestFuture.staleMetadata();
- } else if (info.leader() == null) {
- log.debug("Leader for partition {} unavailable for fetching offset, wait for metadata refresh", topicPartition);
- return RequestFuture.leaderNotAvailable();
- } else {
- Node node = info.leader();
- ListOffsetRequest request = new ListOffsetRequest(-1, partitions);
- return client.send(node, ApiKeys.LIST_OFFSETS, request)
- .compose(new RequestFutureAdapter<ClientResponse, Long>() {
- @Override
- public void onSuccess(ClientResponse response, RequestFuture<Long> future) {
- handleListOffsetResponse(topicPartition, response, future);
- }
- });
- }
- }
-
- /**
- * Callback for the response of the list offset call above.
- * @param topicPartition The partition whose offset was fetched
- * @param clientResponse The response from the server.
- * @param future The future to complete with the fetched offset.
- */
- private void handleListOffsetResponse(TopicPartition topicPartition,
- ClientResponse clientResponse,
- RequestFuture<Long> future) {
- if (clientResponse.wasDisconnected()) {
- future.raise(new DisconnectException());
- } else {
- ListOffsetResponse lor = new ListOffsetResponse(clientResponse.responseBody());
- short errorCode = lor.responseData().get(topicPartition).errorCode;
- if (errorCode == Errors.NONE.code()) {
- List<Long> offsets = lor.responseData().get(topicPartition).offsets;
- if (offsets.size() != 1)
- throw new IllegalStateException("Expected exactly one offset in the list offset response, but got " + offsets.size());
- long offset = offsets.get(0);
- log.debug("Fetched offset {} for partition {}", offset, topicPartition);
-
- future.complete(offset);
- } else if (errorCode == Errors.NOT_LEADER_FOR_PARTITION.code()
- || errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
- log.warn("Attempt to fetch offsets for partition {} failed due to obsolete leadership information, retrying.",
- topicPartition);
- future.raise(Errors.forCode(errorCode));
- } else {
- log.error("Attempt to fetch offsets for partition {} failed due to: {}",
- topicPartition, Errors.forCode(errorCode).exception().getMessage());
- future.raise(new StaleMetadataException());
- }
- }
- }
-
- /**
- * Create fetch requests for all nodes for which we have assigned partitions
- * that have no existing requests in flight.
- */
- private Map<Node, FetchRequest> createFetchRequests(Cluster cluster) {
- // create the fetch info
- Map<Node, Map<TopicPartition, FetchRequest.PartitionData>> fetchable = new HashMap<Node, Map<TopicPartition, FetchRequest.PartitionData>>();
- for (TopicPartition partition : subscriptions.assignedPartitions()) {
- Node node = cluster.leaderFor(partition);
- if (node == null) {
- metadata.requestUpdate();
- } else if (this.client.pendingRequestCount(node) == 0) {
- // if there is a leader and no in-flight requests, issue a new fetch
- Map<TopicPartition, FetchRequest.PartitionData> fetch = fetchable.get(node);
- if (fetch == null) {
- fetch = new HashMap<TopicPartition, FetchRequest.PartitionData>();
- fetchable.put(node, fetch);
- }
- long offset = this.subscriptions.fetched(partition);
- fetch.put(partition, new FetchRequest.PartitionData(offset, this.fetchSize));
- }
- }
-
- // create the fetches
- Map<Node, FetchRequest> requests = new HashMap<Node, FetchRequest>();
- for (Map.Entry<Node, Map<TopicPartition, FetchRequest.PartitionData>> entry : fetchable.entrySet()) {
- Node node = entry.getKey();
- FetchRequest fetch = new FetchRequest(this.maxWaitMs, this.minBytes, entry.getValue());
- requests.put(node, fetch);
- }
- return requests;
- }
-
- /**
- * The callback for fetch completion
- */
- private void handleFetchResponse(ClientResponse resp, FetchRequest request) {
- if (resp.wasDisconnected()) {
- int correlation = resp.request().request().header().correlationId();
- log.debug("Cancelled fetch request {} with correlation id {} due to node {} being disconnected",
- resp.request(), correlation, resp.request().request().destination());
- } else {
- int totalBytes = 0;
- int totalCount = 0;
- FetchResponse response = new FetchResponse(resp.responseBody());
- for (Map.Entry<TopicPartition, FetchResponse.PartitionData> entry : response.responseData().entrySet()) {
- TopicPartition tp = entry.getKey();
- FetchResponse.PartitionData partition = entry.getValue();
- if (!subscriptions.assignedPartitions().contains(tp)) {
- log.debug("Ignoring fetched data for partition {} which is no longer assigned.", tp);
- } else if (partition.errorCode == Errors.NONE.code()) {
- int bytes = 0;
- ByteBuffer buffer = partition.recordSet;
- MemoryRecords records = MemoryRecords.readableRecords(buffer);
- long fetchOffset = request.fetchData().get(tp).offset;
- List<ConsumerRecord<K, V>> parsed = new ArrayList<ConsumerRecord<K, V>>();
- for (LogEntry logEntry : records) {
- parsed.add(parseRecord(tp, logEntry));
- bytes += logEntry.size();
- }
- if (parsed.size() > 0) {
- ConsumerRecord<K, V> record = parsed.get(parsed.size() - 1);
- this.subscriptions.fetched(tp, record.offset() + 1);
- this.records.add(new PartitionRecords<K, V>(fetchOffset, tp, parsed));
- this.sensors.recordsFetchLag.record(partition.highWatermark - record.offset());
- }
- this.sensors.recordTopicFetchMetrics(tp.topic(), bytes, parsed.size());
- totalBytes += bytes;
- totalCount += parsed.size();
- } else if (partition.errorCode == Errors.NOT_LEADER_FOR_PARTITION.code()
- || partition.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
- this.metadata.requestUpdate();
- } else if (partition.errorCode == Errors.OFFSET_OUT_OF_RANGE.code()) {
- // TODO: this could be optimized by grouping all out-of-range partitions
- log.info("Fetch offset {} is out of range, resetting offset", subscriptions.fetched(tp));
- subscriptions.needOffsetReset(tp);
- } else if (partition.errorCode == Errors.UNKNOWN.code()) {
- log.warn("Unknown error fetching data for topic-partition {}", tp);
- } else {
- throw new IllegalStateException("Unexpected error code " + partition.errorCode + " while fetching data");
- }
- }
- this.sensors.bytesFetched.record(totalBytes);
- this.sensors.recordsFetched.record(totalCount);
- }
- this.sensors.fetchLatency.record(resp.requestLatencyMs());
- }
-
- /**
- * Parse the record entry, deserializing the key / value fields if necessary
- */
- private ConsumerRecord<K, V> parseRecord(TopicPartition partition, LogEntry logEntry) {
- if (this.checkCrcs)
- logEntry.record().ensureValid();
-
- long offset = logEntry.offset();
- ByteBuffer keyBytes = logEntry.record().key();
- K key = keyBytes == null ? null : this.keyDeserializer.deserialize(partition.topic(), Utils.toArray(keyBytes));
- ByteBuffer valueBytes = logEntry.record().value();
- V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), Utils.toArray(valueBytes));
-
- return new ConsumerRecord<K, V>(partition.topic(), partition.partition(), offset, key, value);
- }
-
- private static class PartitionRecords<K, V> {
- public long fetchOffset;
- public TopicPartition partition;
- public List<ConsumerRecord<K, V>> records;
-
- public PartitionRecords(long fetchOffset, TopicPartition partition, List<ConsumerRecord<K, V>> records) {
- this.fetchOffset = fetchOffset;
- this.partition = partition;
- this.records = records;
- }
- }
-
- private class FetchManagerMetrics {
- public final Metrics metrics;
- public final String metricGrpName;
-
- public final Sensor bytesFetched;
- public final Sensor recordsFetched;
- public final Sensor fetchLatency;
- public final Sensor recordsFetchLag;
-
- public FetchManagerMetrics(Metrics metrics, String metricGrpPrefix, Map<String, String> tags) {
- this.metrics = metrics;
- this.metricGrpName = metricGrpPrefix + "-fetch-manager-metrics";
-
- this.bytesFetched = metrics.sensor("bytes-fetched");
- this.bytesFetched.add(new MetricName("fetch-size-avg",
- this.metricGrpName,
- "The average number of bytes fetched per request",
- tags), new Avg());
- this.bytesFetched.add(new MetricName("fetch-size-max",
- this.metricGrpName,
- "The maximum number of bytes fetched per request",
- tags), new Max());
- this.bytesFetched.add(new MetricName("bytes-consumed-rate",
- this.metricGrpName,
- "The average number of bytes consumed per second",
- tags), new Rate());
-
- this.recordsFetched = metrics.sensor("records-fetched");
- this.recordsFetched.add(new MetricName("records-per-request-avg",
- this.metricGrpName,
- "The average number of records in each request",
- tags), new Avg());
- this.recordsFetched.add(new MetricName("records-consumed-rate",
- this.metricGrpName,
- "The average number of records consumed per second",
- tags), new Rate());
-
- this.fetchLatency = metrics.sensor("fetch-latency");
- this.fetchLatency.add(new MetricName("fetch-latency-avg",
- this.metricGrpName,
- "The average time taken for a fetch request.",
- tags), new Avg());
- this.fetchLatency.add(new MetricName("fetch-latency-max",
- this.metricGrpName,
- "The max time taken for any fetch request.",
- tags), new Max());
- this.fetchLatency.add(new MetricName("fetch-rate",
- this.metricGrpName,
- "The number of fetch requests per second.",
- tags), new Rate(new Count()));
-
- this.recordsFetchLag = metrics.sensor("records-lag");
- this.recordsFetchLag.add(new MetricName("records-lag-max",
- this.metricGrpName,
- "The maximum lag in terms of number of records for any partition in this window",
- tags), new Max());
- }
-
- public void recordTopicFetchMetrics(String topic, int bytes, int records) {
- // record bytes fetched
- String name = "topic." + topic + ".bytes-fetched";
- Sensor bytesFetched = this.metrics.getSensor(name);
- if (bytesFetched == null)
- bytesFetched = this.metrics.sensor(name);
- bytesFetched.record(bytes);
-
- // record records fetched
- name = "topic." + topic + ".records-fetched";
- Sensor recordsFetched = this.metrics.getSensor(name);
- if (recordsFetched == null)
- recordsFetched = this.metrics.sensor(name);
- recordsFetched.record(records);
- }
- }
-}
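
Taken together, the deleted fetcher internals above form a simple cycle: update fetch positions, enqueue fetch requests, poll the network client, and drain the buffered records. The following is a minimal sketch of that driver loop, not the actual consumer code: initFetches(Cluster) is an assumed public wrapper around the private createFetchRequests(...), and the fetcher, client, subscriptions and metadata collaborators are taken as given.

    // Hypothetical driver loop around the fetcher internals shown above.
    // initFetches(...) is an assumed entry point; all collaborators are
    // presumed to be fields of the enclosing consumer.
    private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeoutMs) {
        // 1. Ensure every assigned partition has a valid fetch position,
        //    resetting via the configured OffsetResetStrategy where needed.
        fetcher.updateFetchPositions(subscriptions.assignedPartitions());

        // 2. Enqueue one fetch request per partition leader that has no
        //    request currently in flight.
        fetcher.initFetches(metadata.fetch());

        // 3. Drive the network layer so that responses arrive and
        //    handleFetchResponse(...) can buffer the parsed records.
        client.poll(timeoutMs);

        // 4. Drain the buffer; this also advances the consumed position
        //    past the last returned offset of each partition.
        return fetcher.fetchedRecords();
    }
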
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/consumer/internals/Heartbeat.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/consumer/internals/Heartbeat.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/consumer/internals/Heartbeat.java
deleted file mode 100644
index c5282a5..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/consumer/internals/Heartbeat.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.clients.consumer.internals;
-
-/**
- * A helper class for managing the heartbeat to the coordinator
- */
-public final class Heartbeat {
-
- /* The number of heartbeats to attempt to complete per session timeout interval;
- * so, e.g., with a session timeout of 3 seconds we would attempt a heartbeat
- * once per second.
- */
- public static final int HEARTBEATS_PER_SESSION_INTERVAL = 3;
-
- private final long timeout;
- private long lastHeartbeatSend;
- private long lastHeartbeatReceive;
- private long lastSessionReset;
-
- public Heartbeat(long timeout, long now) {
- this.timeout = timeout;
- this.lastSessionReset = now;
- }
-
- public void sentHeartbeat(long now) {
- this.lastHeartbeatSend = now;
- }
-
- public void receiveHeartbeat(long now) {
- this.lastHeartbeatReceive = now;
- }
-
- public boolean shouldHeartbeat(long now) {
- return timeToNextHeartbeat(now) == 0;
- }
-
- public long lastHeartbeatSend() {
- return this.lastHeartbeatSend;
- }
-
- public long timeToNextHeartbeat(long now) {
- long timeSinceLastHeartbeat = now - Math.max(lastHeartbeatSend, lastSessionReset);
-
- long hbInterval = timeout / HEARTBEATS_PER_SESSION_INTERVAL;
- if (timeSinceLastHeartbeat > hbInterval)
- return 0;
- else
- return hbInterval - timeSinceLastHeartbeat;
- }
-
- public boolean sessionTimeoutExpired(long now) {
- return now - Math.max(lastSessionReset, lastHeartbeatReceive) > timeout;
- }
-
- public long interval() {
- return timeout / HEARTBEATS_PER_SESSION_INTERVAL;
- }
-
- public void resetSessionTimeout(long now) {
- this.lastSessionReset = now;
- }
-
-}
\ No newline at end of file
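
Because Heartbeat above has no dependencies, its timing arithmetic can be checked in isolation. A small, self-contained example (assuming the class is on the classpath under the backported package):

    import org.apache.kafka.copied.clients.consumer.internals.Heartbeat;

    public class HeartbeatTimingDemo {
        public static void main(String[] args) {
            long now = System.currentTimeMillis();
            // A 3000 ms session timeout yields a 1000 ms heartbeat interval
            // (HEARTBEATS_PER_SESSION_INTERVAL == 3).
            Heartbeat heartbeat = new Heartbeat(3000, now);

            System.out.println(heartbeat.interval());               // 1000
            System.out.println(heartbeat.shouldHeartbeat(now));     // false: session just reset
            System.out.println(heartbeat.timeToNextHeartbeat(now)); // 1000

            // Once the interval elapses without a heartbeat being sent,
            // shouldHeartbeat flips to true ...
            System.out.println(heartbeat.shouldHeartbeat(now + 1500)); // true

            // ... and after a full timeout without receiving a heartbeat,
            // the session is considered expired.
            System.out.println(heartbeat.sessionTimeoutExpired(now + 3001)); // true
        }
    }
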
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/consumer/internals/NoAvailableBrokersException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/consumer/internals/NoAvailableBrokersException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/consumer/internals/NoAvailableBrokersException.java
deleted file mode 100644
index 63ad46b..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/consumer/internals/NoAvailableBrokersException.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.clients.consumer.internals;
-
-import org.apache.kafka.copied.common.errors.InvalidMetadataException;
-
-/**
- * No brokers were available to complete a request.
- */
-public class NoAvailableBrokersException extends InvalidMetadataException {
- private static final long serialVersionUID = 1L;
-
-}
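
The only subtlety here is the parent type: by extending InvalidMetadataException, a "no brokers" failure is automatically treated as a stale-metadata condition, so callers can recover with a metadata refresh instead of failing outright, exactly as the listOffset(...) retry loop above does. A hedged sketch of that pattern, where ensureCoordinatorKnown() is a hypothetical operation and the client and retry plumbing are assumed:

    // Sketch only: ensureCoordinatorKnown() is hypothetical; 'client' is
    // the ConsumerNetworkClient from the deleted sources.
    private void callWithMetadataRetry() {
        while (true) {
            try {
                ensureCoordinatorKnown();
                return;
            } catch (InvalidMetadataException e) {
                // Covers NoAvailableBrokersException as well: wait for a
                // fresh cluster view, then retry the operation.
                client.awaitMetadataUpdate();
            }
        }
    }
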
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/consumer/internals/NoOpConsumerRebalanceCallback.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/consumer/internals/NoOpConsumerRebalanceCallback.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/consumer/internals/NoOpConsumerRebalanceCallback.java
deleted file mode 100644
index d694dbf..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/consumer/internals/NoOpConsumerRebalanceCallback.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package org.apache.kafka.copied.clients.consumer.internals;
-
-import org.apache.kafka.copied.clients.consumer.Consumer;
-import org.apache.kafka.copied.clients.consumer.ConsumerRebalanceCallback;
-import org.apache.kafka.copied.common.TopicPartition;
-
-import java.util.Collection;
-
-public class NoOpConsumerRebalanceCallback implements ConsumerRebalanceCallback {
-
- @Override
- public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {}
-
- @Override
- public void onPartitionsRevoked(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {}
-
-}
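
This no-op class is presumably what the consumer uses when no user callback is supplied. A user-defined callback implements the same two methods against the old backported interface; a minimal illustrative sketch follows (the logging is arbitrary, and a real implementation would typically flush or commit state in onPartitionsRevoked):

    import org.apache.kafka.copied.clients.consumer.Consumer;
    import org.apache.kafka.copied.clients.consumer.ConsumerRebalanceCallback;
    import org.apache.kafka.copied.common.TopicPartition;

    import java.util.Collection;

    public class LoggingRebalanceCallback implements ConsumerRebalanceCallback {

        @Override
        public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
            System.out.println("Assigned partitions: " + partitions);
        }

        @Override
        public void onPartitionsRevoked(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
            // Typically: commit offsets / flush state before losing ownership.
            System.out.println("Revoking partitions: " + partitions);
        }
    }
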