You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/12/02 20:27:41 UTC
kafka git commit: KAFKA-2880: consumer should handle
disconnect/timeout for metadata requests
Repository: kafka
Updated Branches:
refs/heads/trunk 8c3c9548b -> 9fb1e2573
KAFKA-2880: consumer should handle disconnect/timeout for metadata requests
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Ismael Juma, Guozhang Wang
Closes #581 from hachikuji/KAFKA-2880
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9fb1e257
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9fb1e257
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9fb1e257
Branch: refs/heads/trunk
Commit: 9fb1e25738f89b75a36ef69f730b0e138ccd55b1
Parents: 8c3c954
Author: Jason Gustafson <ja...@confluent.io>
Authored: Wed Dec 2 11:27:36 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Dec 2 11:27:36 2015 -0800
----------------------------------------------------------------------
.../kafka/clients/consumer/KafkaConsumer.java | 6 +
.../consumer/internals/AbstractCoordinator.java | 35 +--
.../consumer/internals/ConsumerCoordinator.java | 5 +-
.../internals/ConsumerNetworkClient.java | 23 +-
.../clients/consumer/internals/Fetcher.java | 267 ++++++++++---------
.../common/errors/DisconnectException.java | 1 +
.../kafka/common/requests/MetadataResponse.java | 37 ++-
.../clients/consumer/internals/FetcherTest.java | 67 ++++-
.../org/apache/kafka/common/utils/MockTime.java | 8 +
.../main/scala/kafka/admin/AdminClient.scala | 6 +-
.../kafka/api/AuthorizerIntegrationTest.scala | 14 +
.../kafka/api/PlaintextConsumerTest.scala | 16 +-
12 files changed, 313 insertions(+), 172 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/9fb1e257/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 6c15df3..9b36af6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1136,6 +1136,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
* function is called
* @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the specified topic
+ * @throws org.apache.kafka.common.errors.TimeoutException if the topic metadata could not be fetched before
+ * expiration of the configured request timeout
+ * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
*/
@Override
public List<PartitionInfo> partitionsFor(String topic) {
@@ -1160,6 +1163,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* @return The map of topics and its partitions
* @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
* function is called
+ * @throws org.apache.kafka.common.errors.TimeoutException if the topic metadata could not be fetched before
+ * expiration of the configured request timeout
+ * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
*/
@Override
public Map<String, List<PartitionInfo>> listTopics() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/9fb1e257/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 9aa1aaf..322de5c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -43,7 +43,6 @@ import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.SyncGroupRequest;
import org.apache.kafka.common.requests.SyncGroupResponse;
import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -235,7 +234,7 @@ public abstract class AbstractCoordinator implements Closeable {
continue;
else if (!future.isRetriable())
throw exception;
- Utils.sleep(retryBackoffMs);
+ time.sleep(retryBackoffMs);
}
}
}
@@ -484,11 +483,7 @@ public abstract class AbstractCoordinator implements Closeable {
private void handleGroupMetadataResponse(ClientResponse resp, RequestFuture<Void> future) {
log.debug("Group metadata response {}", resp);
- // parse the response to get the coordinator info if it is not disconnected,
- // otherwise we need to request metadata update
- if (resp.wasDisconnected()) {
- future.raise(new DisconnectException());
- } else if (!coordinatorUnknown()) {
+ if (!coordinatorUnknown()) {
// We already found the coordinator, so ignore the request
future.complete(null);
} else {
@@ -661,25 +656,19 @@ public abstract class AbstractCoordinator implements Closeable {
public abstract void handle(R response, RequestFuture<T> future);
@Override
- public void onSuccess(ClientResponse clientResponse, RequestFuture<T> future) {
- this.response = clientResponse;
-
- if (clientResponse.wasDisconnected()) {
- int correlation = response.request().request().header().correlationId();
- log.debug("Cancelled request {} with correlation id {} due to coordinator {} being disconnected",
- response.request(),
- correlation,
- response.request().request().destination());
-
- // mark the coordinator as dead
+ public void onFailure(RuntimeException e, RequestFuture<T> future) {
+ // mark the coordinator as dead
+ if (e instanceof DisconnectException)
coordinatorDead();
- future.raise(new DisconnectException());
- return;
- }
+ future.raise(e);
+ }
+ @Override
+ public void onSuccess(ClientResponse clientResponse, RequestFuture<T> future) {
try {
- R response = parse(clientResponse);
- handle(response, future);
+ this.response = clientResponse;
+ R responseObj = parse(clientResponse);
+ handle(responseObj, future);
} catch (RuntimeException e) {
if (!future.isDone())
future.raise(e);
http://git-wip-us.apache.org/repos/asf/kafka/blob/9fb1e257/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index f6d1029..8453c7b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -42,7 +42,6 @@ import org.apache.kafka.common.requests.OffsetCommitResponse;
import org.apache.kafka.common.requests.OffsetFetchRequest;
import org.apache.kafka.common.requests.OffsetFetchResponse;
import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -294,7 +293,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
if (!future.isRetriable())
throw future.exception();
- Utils.sleep(retryBackoffMs);
+ time.sleep(retryBackoffMs);
}
}
@@ -358,7 +357,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
if (!future.isRetriable())
throw future.exception();
- Utils.sleep(retryBackoffMs);
+ time.sleep(retryBackoffMs);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9fb1e257/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index f2e215d..84c312e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -18,12 +18,15 @@ import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.RequestSend;
import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
@@ -45,6 +48,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
* How we do this may depend on KAFKA-2120, so for now, we retain the simplistic behavior.
*/
public class ConsumerNetworkClient implements Closeable {
+ private static final Logger log = LoggerFactory.getLogger(ConsumerNetworkClient.class);
+
private final KafkaClient client;
private final AtomicBoolean wakeup = new AtomicBoolean(false);
private final DelayedTaskQueue delayedTasks = new DelayedTaskQueue();
@@ -88,7 +93,9 @@ public class ConsumerNetworkClient implements Closeable {
* Send a new request. Note that the request is not actually transmitted on the
* network until one of the {@link #poll(long)} variants is invoked. At this
* point the request will either be transmitted successfully or will fail.
- * Use the returned future to obtain the result of the send.
+ * Use the returned future to obtain the result of the send. Note that there is no
+ * need to check for disconnects explicitly on the {@link ClientResponse} object;
+ * instead, the future will be failed with a {@link DisconnectException}.
* @param node The destination of the request
* @param api The Kafka API call
* @param request The request payload
@@ -258,7 +265,7 @@ public class ConsumerNetworkClient implements Closeable {
for (ClientRequest request : requestEntry.getValue()) {
RequestFutureCompletionHandler handler =
(RequestFutureCompletionHandler) request.callback();
- handler.complete(new ClientResponse(request, now, true, null));
+ handler.onComplete(new ClientResponse(request, now, true, null));
}
iterator.remove();
}
@@ -350,7 +357,17 @@ public class ConsumerNetworkClient implements Closeable {
@Override
public void onComplete(ClientResponse response) {
- complete(response);
+ if (response.wasDisconnected()) {
+ ClientRequest request = response.request();
+ RequestSend send = request.request();
+ ApiKeys api = ApiKeys.forId(send.header().apiKey());
+ int correlation = send.header().correlationId();
+ log.debug("Cancelled {} request {} with correlation id {} due to node {} being disconnected",
+ api, request, correlation, send.destination());
+ raise(DisconnectException.INSTANCE);
+ } else {
+ complete(response);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9fb1e257/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index a034264..5708869 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -25,9 +25,11 @@ import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.InvalidMetadataException;
+import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
@@ -113,7 +115,7 @@ public class Fetcher<K, V> {
this.keyDeserializer = keyDeserializer;
this.valueDeserializer = valueDeserializer;
- this.records = new LinkedList<PartitionRecords<K, V>>();
+ this.records = new LinkedList<>();
this.offsetOutOfRangePartitions = new HashMap<>();
this.unauthorizedTopics = new HashSet<>();
this.recordTooLargePartitions = new HashMap<>();
@@ -191,45 +193,82 @@ public class Fetcher<K, V> {
if (topics != null && topics.isEmpty())
return Collections.emptyMap();
- final HashMap<String, List<PartitionInfo>> topicsPartitionInfos = new HashMap<>();
- long startTime = time.milliseconds();
+ long start = time.milliseconds();
+ long remaining = timeout;
- while (time.milliseconds() - startTime < timeout) {
- RequestFuture<ClientResponse> requestFuture = sendMetadataRequest(topics);
- if (requestFuture != null) {
- client.poll(requestFuture);
+ do {
+ RequestFuture<ClientResponse> future = sendMetadataRequest(topics);
+ client.poll(future, remaining);
- if (requestFuture.succeeded()) {
- MetadataResponse response =
- new MetadataResponse(requestFuture.value().responseBody());
+ if (future.failed() && !future.isRetriable())
+ throw future.exception();
- for (String topic : response.cluster().topics())
- topicsPartitionInfos.put(
- topic, response.cluster().availablePartitionsForTopic(topic));
+ if (future.succeeded()) {
+ MetadataResponse response = new MetadataResponse(future.value().responseBody());
+ Cluster cluster = response.cluster();
+
+ Set<String> unauthorizedTopics = cluster.unauthorizedTopics();
+ if (!unauthorizedTopics.isEmpty())
+ throw new TopicAuthorizationException(unauthorizedTopics);
+
+ boolean shouldRetry = false;
+ if (!response.errors().isEmpty()) {
+ // if there were errors, we need to check whether they were fatal or whether
+ // we should just retry
+
+ log.debug("Topic metadata fetch included errors: {}", response.errors());
+
+ for (Map.Entry<String, Errors> errorEntry : response.errors().entrySet()) {
+ String topic = errorEntry.getKey();
+ Errors error = errorEntry.getValue();
+
+ if (error == Errors.INVALID_TOPIC_EXCEPTION)
+ throw new InvalidTopicException("Topic '" + topic + "' is invalid");
+ else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION)
+ // if a requested topic is unknown, we just continue and let it be absent
+ // in the returned map
+ continue;
+ else if (error.exception() instanceof RetriableException)
+ shouldRetry = true;
+ else
+ throw new KafkaException("Unexpected error fetching metadata for topic " + topic,
+ error.exception());
+ }
+ }
+ if (!shouldRetry) {
+ HashMap<String, List<PartitionInfo>> topicsPartitionInfos = new HashMap<>();
+ for (String topic : cluster.topics())
+ topicsPartitionInfos.put(topic, cluster.availablePartitionsForTopic(topic));
return topicsPartitionInfos;
}
-
- if (!requestFuture.isRetriable())
- throw requestFuture.exception();
}
- Utils.sleep(retryBackoffMs);
- }
+ long elapsed = time.milliseconds() - start;
+ remaining = timeout - elapsed;
- return topicsPartitionInfos;
+ if (remaining > 0) {
+ long backoff = Math.min(remaining, retryBackoffMs);
+ time.sleep(backoff);
+ remaining -= backoff;
+ }
+ } while (remaining > 0);
+
+ throw new TimeoutException("Timeout expired while fetching topic metadata");
}
/**
* Send Metadata Request to least loaded node in Kafka cluster asynchronously
* @return A future that indicates result of sent metadata request
*/
- public RequestFuture<ClientResponse> sendMetadataRequest(List<String> topics) {
+ private RequestFuture<ClientResponse> sendMetadataRequest(List<String> topics) {
if (topics == null)
topics = Collections.emptyList();
final Node node = client.leastLoadedNode();
- return node == null ? null :
- client.send(node, ApiKeys.METADATA, new MetadataRequest(topics));
+ if (node == null)
+ return RequestFuture.noBrokersAvailable();
+ else
+ return client.send(node, ApiKeys.METADATA, new MetadataRequest(topics));
}
/**
@@ -277,7 +316,7 @@ public class Fetcher<K, V> {
if (future.exception() instanceof InvalidMetadataException)
client.awaitMetadataUpdate();
else
- Utils.sleep(retryBackoffMs);
+ time.sleep(retryBackoffMs);
}
}
@@ -435,29 +474,25 @@ public class Fetcher<K, V> {
private void handleListOffsetResponse(TopicPartition topicPartition,
ClientResponse clientResponse,
RequestFuture<Long> future) {
- if (clientResponse.wasDisconnected()) {
- future.raise(new DisconnectException());
+ ListOffsetResponse lor = new ListOffsetResponse(clientResponse.responseBody());
+ short errorCode = lor.responseData().get(topicPartition).errorCode;
+ if (errorCode == Errors.NONE.code()) {
+ List<Long> offsets = lor.responseData().get(topicPartition).offsets;
+ if (offsets.size() != 1)
+ throw new IllegalStateException("This should not happen.");
+ long offset = offsets.get(0);
+ log.debug("Fetched offset {} for partition {}", offset, topicPartition);
+
+ future.complete(offset);
+ } else if (errorCode == Errors.NOT_LEADER_FOR_PARTITION.code()
+ || errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
+ log.warn("Attempt to fetch offsets for partition {} failed due to obsolete leadership information, retrying.",
+ topicPartition);
+ future.raise(Errors.forCode(errorCode));
} else {
- ListOffsetResponse lor = new ListOffsetResponse(clientResponse.responseBody());
- short errorCode = lor.responseData().get(topicPartition).errorCode;
- if (errorCode == Errors.NONE.code()) {
- List<Long> offsets = lor.responseData().get(topicPartition).offsets;
- if (offsets.size() != 1)
- throw new IllegalStateException("This should not happen.");
- long offset = offsets.get(0);
- log.debug("Fetched offset {} for partition {}", offset, topicPartition);
-
- future.complete(offset);
- } else if (errorCode == Errors.NOT_LEADER_FOR_PARTITION.code()
- || errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
- log.warn("Attempt to fetch offsets for partition {} failed due to obsolete leadership information, retrying.",
- topicPartition);
- future.raise(Errors.forCode(errorCode));
- } else {
- log.error("Attempt to fetch offsets for partition {} failed due to: {}",
- topicPartition, Errors.forCode(errorCode).exception().getMessage());
- future.raise(new StaleMetadataException());
- }
+ log.error("Attempt to fetch offsets for partition {} failed due to: {}",
+ topicPartition, Errors.forCode(errorCode).exception().getMessage());
+ future.raise(new StaleMetadataException());
}
}
@@ -502,84 +537,78 @@ public class Fetcher<K, V> {
* The callback for fetch completion
*/
private void handleFetchResponse(ClientResponse resp, FetchRequest request) {
- if (resp.wasDisconnected()) {
- int correlation = resp.request().request().header().correlationId();
- log.debug("Cancelled fetch request {} with correlation id {} due to node {} being disconnected",
- resp.request(), correlation, resp.request().request().destination());
- } else {
- int totalBytes = 0;
- int totalCount = 0;
- FetchResponse response = new FetchResponse(resp.responseBody());
- for (Map.Entry<TopicPartition, FetchResponse.PartitionData> entry : response.responseData().entrySet()) {
- TopicPartition tp = entry.getKey();
- FetchResponse.PartitionData partition = entry.getValue();
- if (!subscriptions.isFetchable(tp)) {
- // this can happen when a rebalance happened or a partition consumption paused
- // while fetch is still in-flight
- log.debug("Ignoring fetched records for partition {} since it is no longer fetchable", tp);
- } else if (partition.errorCode == Errors.NONE.code()) {
- long fetchOffset = request.fetchData().get(tp).offset;
-
- // we are interested in this fetch only if the beginning offset matches the
- // current consumed position
- Long consumed = subscriptions.consumed(tp);
- if (consumed == null) {
- continue;
- } else if (consumed != fetchOffset) {
- // the fetched position has gotten out of sync with the consumed position
- // (which might happen when a rebalance occurs with a fetch in-flight),
- // so we need to reset the fetch position so the next fetch is right
- subscriptions.fetched(tp, consumed);
- continue;
- }
-
- int bytes = 0;
- ByteBuffer buffer = partition.recordSet;
- MemoryRecords records = MemoryRecords.readableRecords(buffer);
- List<ConsumerRecord<K, V>> parsed = new ArrayList<ConsumerRecord<K, V>>();
- for (LogEntry logEntry : records) {
- parsed.add(parseRecord(tp, logEntry));
- bytes += logEntry.size();
- }
+ int totalBytes = 0;
+ int totalCount = 0;
+ FetchResponse response = new FetchResponse(resp.responseBody());
+ for (Map.Entry<TopicPartition, FetchResponse.PartitionData> entry : response.responseData().entrySet()) {
+ TopicPartition tp = entry.getKey();
+ FetchResponse.PartitionData partition = entry.getValue();
+ if (!subscriptions.isFetchable(tp)) {
+ // this can happen when a rebalance happened or a partition consumption paused
+ // while fetch is still in-flight
+ log.debug("Ignoring fetched records for partition {} since it is no longer fetchable", tp);
+ } else if (partition.errorCode == Errors.NONE.code()) {
+ long fetchOffset = request.fetchData().get(tp).offset;
+
+ // we are interested in this fetch only if the beginning offset matches the
+ // current consumed position
+ Long consumed = subscriptions.consumed(tp);
+ if (consumed == null) {
+ continue;
+ } else if (consumed != fetchOffset) {
+ // the fetched position has gotten out of sync with the consumed position
+ // (which might happen when a rebalance occurs with a fetch in-flight),
+ // so we need to reset the fetch position so the next fetch is right
+ subscriptions.fetched(tp, consumed);
+ continue;
+ }
- if (!parsed.isEmpty()) {
- ConsumerRecord<K, V> record = parsed.get(parsed.size() - 1);
- this.subscriptions.fetched(tp, record.offset() + 1);
- this.records.add(new PartitionRecords<>(fetchOffset, tp, parsed));
- this.sensors.recordsFetchLag.record(partition.highWatermark - record.offset());
- } else if (buffer.limit() > 0) {
- // we did not read a single message from a non-empty buffer
- // because that message's size is larger than fetch size, in this case
- // record this exception
- this.recordTooLargePartitions.put(tp, fetchOffset);
- }
+ int bytes = 0;
+ ByteBuffer buffer = partition.recordSet;
+ MemoryRecords records = MemoryRecords.readableRecords(buffer);
+ List<ConsumerRecord<K, V>> parsed = new ArrayList<ConsumerRecord<K, V>>();
+ for (LogEntry logEntry : records) {
+ parsed.add(parseRecord(tp, logEntry));
+ bytes += logEntry.size();
+ }
- this.sensors.recordTopicFetchMetrics(tp.topic(), bytes, parsed.size());
- totalBytes += bytes;
- totalCount += parsed.size();
- } else if (partition.errorCode == Errors.NOT_LEADER_FOR_PARTITION.code()
- || partition.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
- this.metadata.requestUpdate();
- } else if (partition.errorCode == Errors.OFFSET_OUT_OF_RANGE.code()) {
- long fetchOffset = request.fetchData().get(tp).offset;
- if (subscriptions.hasDefaultOffsetResetPolicy())
- subscriptions.needOffsetReset(tp);
- else
- this.offsetOutOfRangePartitions.put(tp, fetchOffset);
- log.info("Fetch offset {} is out of range, resetting offset", subscriptions.fetched(tp));
- } else if (partition.errorCode == Errors.TOPIC_AUTHORIZATION_FAILED.code()) {
- log.warn("Not authorized to read from topic {}.", tp.topic());
- unauthorizedTopics.add(tp.topic());
- } else if (partition.errorCode == Errors.UNKNOWN.code()) {
- log.warn("Unknown error fetching data for topic-partition {}", tp);
- } else {
- throw new IllegalStateException("Unexpected error code " + partition.errorCode + " while fetching data");
+ if (!parsed.isEmpty()) {
+ ConsumerRecord<K, V> record = parsed.get(parsed.size() - 1);
+ this.subscriptions.fetched(tp, record.offset() + 1);
+ this.records.add(new PartitionRecords<>(fetchOffset, tp, parsed));
+ this.sensors.recordsFetchLag.record(partition.highWatermark - record.offset());
+ } else if (buffer.limit() > 0) {
+ // we did not read a single message from a non-empty buffer
+ // because that message's size is larger than fetch size, in this case
+ // record this exception
+ this.recordTooLargePartitions.put(tp, fetchOffset);
}
+
+ this.sensors.recordTopicFetchMetrics(tp.topic(), bytes, parsed.size());
+ totalBytes += bytes;
+ totalCount += parsed.size();
+ } else if (partition.errorCode == Errors.NOT_LEADER_FOR_PARTITION.code()
+ || partition.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
+ this.metadata.requestUpdate();
+ } else if (partition.errorCode == Errors.OFFSET_OUT_OF_RANGE.code()) {
+ long fetchOffset = request.fetchData().get(tp).offset;
+ if (subscriptions.hasDefaultOffsetResetPolicy())
+ subscriptions.needOffsetReset(tp);
+ else
+ this.offsetOutOfRangePartitions.put(tp, fetchOffset);
+ log.info("Fetch offset {} is out of range, resetting offset", subscriptions.fetched(tp));
+ } else if (partition.errorCode == Errors.TOPIC_AUTHORIZATION_FAILED.code()) {
+ log.warn("Not authorized to read from topic {}.", tp.topic());
+ unauthorizedTopics.add(tp.topic());
+ } else if (partition.errorCode == Errors.UNKNOWN.code()) {
+ log.warn("Unknown error fetching data for topic-partition {}", tp);
+ } else {
+ throw new IllegalStateException("Unexpected error code " + partition.errorCode + " while fetching data");
}
- this.sensors.bytesFetched.record(totalBytes);
- this.sensors.recordsFetched.record(totalCount);
- this.sensors.fetchThrottleTimeSensor.record(response.getThrottleTime());
}
+ this.sensors.bytesFetched.record(totalBytes);
+ this.sensors.recordsFetched.record(totalCount);
+ this.sensors.fetchThrottleTimeSensor.record(response.getThrottleTime());
this.sensors.fetchLatency.record(resp.requestLatencyMs());
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9fb1e257/clients/src/main/java/org/apache/kafka/common/errors/DisconnectException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/DisconnectException.java b/clients/src/main/java/org/apache/kafka/common/errors/DisconnectException.java
index 18d61a2..557681c 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/DisconnectException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/DisconnectException.java
@@ -17,6 +17,7 @@ package org.apache.kafka.common.errors;
* Server disconnected before a request could be completed.
*/
public class DisconnectException extends RetriableException {
+ public static final DisconnectException INSTANCE = new DisconnectException();
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/kafka/blob/9fb1e257/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index eb163dd..170e4b8 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -45,9 +45,12 @@ public class MetadataResponse extends AbstractRequestResponse {
private static final String TOPIC_ERROR_CODE_KEY_NAME = "topic_error_code";
/**
- * Possible error code:
+ * Possible error codes:
*
- * TODO
+ * UnknownTopic (3)
+ * LeaderNotAvailable (5)
+ * InvalidTopic (17)
+ * TopicAuthorizationFailed (29)
*/
private static final String TOPIC_KEY_NAME = "topic";
@@ -57,9 +60,10 @@ public class MetadataResponse extends AbstractRequestResponse {
private static final String PARTITION_ERROR_CODE_KEY_NAME = "partition_error_code";
/**
- * Possible error code:
+ * Possible error codes:
*
- * TODO
+ * LeaderNotAvailable (5)
+ * ReplicaNotAvailable (9)
*/
private static final String PARTITION_KEY_NAME = "partition_id";
@@ -87,14 +91,20 @@ public class MetadataResponse extends AbstractRequestResponse {
}
struct.set(BROKERS_KEY_NAME, brokerArray.toArray());
+
List<Struct> topicArray = new ArrayList<Struct>();
- for (String topic : cluster.topics()) {
+ for (Map.Entry<String, Errors> errorEntry : errors.entrySet()) {
Struct topicData = struct.instance(TOPIC_METATDATA_KEY_NAME);
+ topicData.set(TOPIC_KEY_NAME, errorEntry.getKey());
+ topicData.set(TOPIC_ERROR_CODE_KEY_NAME, errorEntry.getValue().code());
+ topicData.set(PARTITION_METADATA_KEY_NAME, new Struct[0]);
+ topicArray.add(topicData);
+ }
- topicData.set(TOPIC_KEY_NAME, topic);
- if (errors.containsKey(topic)) {
- topicData.set(TOPIC_ERROR_CODE_KEY_NAME, errors.get(topic).code());
- } else {
+ for (String topic : cluster.topics()) {
+ if (!errors.containsKey(topic)) {
+ Struct topicData = struct.instance(TOPIC_METATDATA_KEY_NAME);
+ topicData.set(TOPIC_KEY_NAME, topic);
topicData.set(TOPIC_ERROR_CODE_KEY_NAME, Errors.NONE.code());
List<Struct> partitionArray = new ArrayList<Struct>();
for (PartitionInfo fetchPartitionData : cluster.partitionsForTopic(topic)) {
@@ -113,14 +123,13 @@ public class MetadataResponse extends AbstractRequestResponse {
partitionArray.add(partitionData);
}
topicData.set(PARTITION_METADATA_KEY_NAME, partitionArray.toArray());
+ topicArray.add(topicData);
}
-
- topicArray.add(topicData);
}
struct.set(TOPIC_METATDATA_KEY_NAME, topicArray.toArray());
this.cluster = cluster;
- this.errors = new HashMap<String, Errors>();
+ this.errors = errors;
}
public MetadataResponse(Struct struct) {
@@ -183,6 +192,10 @@ public class MetadataResponse extends AbstractRequestResponse {
return this.errors;
}
+ public boolean hasError(String topic) {
+ return this.errors.containsKey(topic);
+ }
+
public Cluster cluster() {
return this.cluster;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9fb1e257/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index fe9a6aa..1ffff4a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -22,14 +22,16 @@ import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
+import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
@@ -73,7 +75,7 @@ public class FetcherTest {
private int maxWaitMs = 0;
private int fetchSize = 1000;
private long retryBackoffMs = 100;
- private MockTime time = new MockTime();
+ private MockTime time = new MockTime(1);
private MockClient client = new MockClient(time);
private Metadata metadata = new Metadata(0, Long.MAX_VALUE);
private Cluster cluster = TestUtils.singletonCluster(topicName, 1);
@@ -395,7 +397,7 @@ public class FetcherTest {
}
@Test
- public void testGetAllTopics() throws InterruptedException {
+ public void testGetAllTopics() {
// sending response before request, as getTopicMetadata is a blocking call
client.prepareResponse(
new MetadataResponse(cluster, Collections.<String, Errors>emptyMap()).toStruct());
@@ -405,6 +407,63 @@ public class FetcherTest {
assertEquals(cluster.topics().size(), allTopics.size());
}
+ @Test
+ public void testGetAllTopicsDisconnect() {
+ // first try gets a disconnect, next succeeds
+ client.prepareResponse(null, true);
+ client.prepareResponse(new MetadataResponse(cluster, Collections.<String, Errors>emptyMap()).toStruct());
+ Map<String, List<PartitionInfo>> allTopics = fetcher.getAllTopicMetadata(5000L);
+ assertEquals(cluster.topics().size(), allTopics.size());
+ }
+
+ @Test(expected = TimeoutException.class)
+ public void testGetAllTopicsTimeout() {
+ // since no response is prepared, the request should timeout
+ fetcher.getAllTopicMetadata(50L);
+ }
+
+ @Test
+ public void testGetAllTopicsUnauthorized() {
+ client.prepareResponse(new MetadataResponse(cluster,
+ Collections.singletonMap(topicName, Errors.TOPIC_AUTHORIZATION_FAILED)).toStruct());
+ try {
+ fetcher.getAllTopicMetadata(10L);
+ fail();
+ } catch (TopicAuthorizationException e) {
+ assertEquals(Collections.singleton(topicName), e.unauthorizedTopics());
+ }
+ }
+
+ @Test(expected = InvalidTopicException.class)
+ public void testGetTopicMetadataInvalidTopic() {
+ client.prepareResponse(new MetadataResponse(cluster,
+ Collections.singletonMap(topicName, Errors.INVALID_TOPIC_EXCEPTION)).toStruct());
+ fetcher.getTopicMetadata(Collections.singletonList(topicName), 5000L);
+ }
+
+ @Test
+ public void testGetTopicMetadataUnknownTopic() {
+ Cluster emptyCluster = new Cluster(this.cluster.nodes(), Collections.<PartitionInfo>emptyList(),
+ Collections.<String>emptySet());
+ client.prepareResponse(new MetadataResponse(emptyCluster,
+ Collections.singletonMap(topicName, Errors.UNKNOWN_TOPIC_OR_PARTITION)).toStruct());
+
+ Map<String, List<PartitionInfo>> topicMetadata = fetcher.getTopicMetadata(Collections.singletonList(topicName), 5000L);
+ assertNull(topicMetadata.get(topicName));
+ }
+
+ @Test
+ public void testGetTopicMetadataLeaderNotAvailable() {
+ Cluster emptyCluster = new Cluster(this.cluster.nodes(), Collections.<PartitionInfo>emptyList(),
+ Collections.<String>emptySet());
+ client.prepareResponse(new MetadataResponse(emptyCluster,
+ Collections.singletonMap(topicName, Errors.LEADER_NOT_AVAILABLE)).toStruct());
+ client.prepareResponse(new MetadataResponse(this.cluster,
+ Collections.<String, Errors>emptyMap()).toStruct());
+ Map<String, List<PartitionInfo>> topicMetadata = fetcher.getTopicMetadata(Collections.singletonList(topicName), 5000L);
+ assertTrue(topicMetadata.containsKey(topicName));
+ }
+
/*
* Send multiple requests. Verify that the client side quota metrics have the right values
*/
@@ -457,7 +516,7 @@ public class FetcherTest {
}
private Fetcher<byte[], byte[]> createFetcher(SubscriptionState subscriptions, Metrics metrics) {
- return new Fetcher<byte[], byte[]>(consumerClient,
+ return new Fetcher<>(consumerClient,
minBytes,
maxWaitMs,
fetchSize,
http://git-wip-us.apache.org/repos/asf/kafka/blob/9fb1e257/clients/src/test/java/org/apache/kafka/common/utils/MockTime.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/MockTime.java b/clients/src/test/java/org/apache/kafka/common/utils/MockTime.java
index 387e48f..533f869 100644
--- a/clients/src/test/java/org/apache/kafka/common/utils/MockTime.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/MockTime.java
@@ -20,18 +20,26 @@ import java.util.concurrent.TimeUnit;
public class MockTime implements Time {
private long nanos = 0;
+ private long autoTickMs = 0;
public MockTime() {
this.nanos = System.nanoTime();
}
+ public MockTime(long autoTickMs) {
+ this.nanos = System.nanoTime();
+ this.autoTickMs = autoTickMs;
+ }
+
@Override
public long milliseconds() {
+ this.sleep(autoTickMs);
return TimeUnit.MILLISECONDS.convert(this.nanos, TimeUnit.NANOSECONDS);
}
@Override
public long nanoseconds() {
+ this.sleep(autoTickMs);
return nanos;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9fb1e257/core/src/main/scala/kafka/admin/AdminClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index 53b6fdb..3a7e6de 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -52,11 +52,7 @@ class AdminClient(val time: Time,
client.poll(future)
if (future.succeeded())
- return if (future.value().wasDisconnected()) {
- throw new DisconnectException()
- } else {
- future.value().responseBody()
- }
+ return future.value().responseBody()
now = time.milliseconds()
} while (now < deadline && future.exception().isInstanceOf[SendFailedException])
http://git-wip-us.apache.org/repos/asf/kafka/blob/9fb1e257/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index c8ca2a3..26ab885 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -499,6 +499,20 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
this.consumers.head.position(tp)
}
+ @Test
+ def testListOffsetsWithNoTopicAccess() {
+ val e = intercept[TopicAuthorizationException] {
+ this.consumers.head.partitionsFor(topic);
+ }
+ assertEquals(Set(topic), e.unauthorizedTopics().asScala)
+ }
+
+ @Test
+ def testListOfsetsWithTopicDescribe() {
+ addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource)
+ this.consumers.head.partitionsFor(topic);
+ }
+
def removeAllAcls() = {
servers.head.apis.authorizer.get.getAcls().keys.foreach { resource =>
servers.head.apis.authorizer.get.removeAcls(resource)
http://git-wip-us.apache.org/repos/asf/kafka/blob/9fb1e257/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 6fabfdc..90e9562 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -18,9 +18,9 @@ import kafka.server.KafkaConfig
import kafka.utils.TestUtils
import org.apache.kafka.clients.consumer._
import org.apache.kafka.clients.producer.ProducerRecord
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.common.serialization.ByteArrayDeserializer
-import org.apache.kafka.common.errors.RecordTooLargeException
+import org.apache.kafka.common.errors.{InvalidTopicException, RecordTooLargeException}
import org.junit.Assert._
import org.junit.Test
import scala.collection.mutable.Buffer
@@ -249,7 +249,17 @@ class PlaintextConsumerTest extends BaseConsumerTest {
val parts = this.consumers(0).partitionsFor("part-test")
assertNotNull(parts)
assertEquals(2, parts.size)
- assertNull(this.consumers(0).partitionsFor("non-exist-topic"))
+ }
+
+ @Test
+ def testPartitionsForAutoCreate() {
+ val partitions = this.consumers(0).partitionsFor("non-exist-topic")
+ assertFalse(partitions.isEmpty)
+ }
+
+ @Test(expected=classOf[InvalidTopicException])
+ def testPartitionsForInvalidTopic() {
+ this.consumers(0).partitionsFor(";3# ads,{234")
}
@Test