You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/03/10 19:20:37 UTC
[2/3] kafka git commit: KAFKA-1910;
Refactor new consumer and fixed a bunch of corner cases / unit tests;
reviewed by Onur Karaman and Jay Kreps
http://git-wip-us.apache.org/repos/asf/kafka/blob/0b92cec1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
new file mode 100644
index 0000000..e972efb
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
@@ -0,0 +1,595 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.RequestCompletionHandler;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Count;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.requests.ConsumerMetadataRequest;
+import org.apache.kafka.common.requests.ConsumerMetadataResponse;
+import org.apache.kafka.common.requests.HeartbeatRequest;
+import org.apache.kafka.common.requests.HeartbeatResponse;
+import org.apache.kafka.common.requests.JoinGroupRequest;
+import org.apache.kafka.common.requests.JoinGroupResponse;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.OffsetCommitResponse;
+import org.apache.kafka.common.requests.OffsetFetchRequest;
+import org.apache.kafka.common.requests.OffsetFetchResponse;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.requests.RequestSend;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class manages the coordination process with the consumer coordinator.
+ */
+public final class Coordinator {
+
+ private static final Logger log = LoggerFactory.getLogger(Coordinator.class);
+
+ // network client used to send requests to, and poll responses from, the brokers
+ private final KafkaClient client;
+
+ // clock used whenever the current time in milliseconds is needed
+ private final Time time;
+ // consumer group id; included in every group-related request built by this class
+ private final String groupId;
+ // cluster metadata; an update is requested when a coordinator lookup is disconnected
+ private final Metadata metadata;
+ // tracks when the next heartbeat is due and when the last one was sent
+ private final Heartbeat heartbeat;
+ // session timeout sent in the join-group request and used to construct the heartbeat tracker
+ private final long sessionTimeoutMs;
+ // partition assignment strategy name sent in the join-group request
+ private final String assignmentStrategy;
+ // subscription state that is notified about committed offsets and needed re-assignments
+ private final SubscriptionState subscriptions;
+ // metric sensors recorded by this coordinator
+ private final CoordinatorMetrics sensors;
+ // time to sleep / poll between retries
+ private final long retryBackoffMs;
+ // the currently known coordinator node; null while the coordinator is unknown
+ private Node consumerCoordinator;
+ // consumer id; starts as the empty string and is replaced by the id returned from a join-group response
+ private String consumerId;
+ // group generation; initialised to -1 and sent with heartbeats and offset commits
+ private int generation;
+
+ /**
+  * Create the coordination manager for one consumer group.
+  *
+  * The coordinator node starts out unknown, the consumer id starts out as the empty string and the
+  * generation starts at -1.
+  *
+  * @param client The network client used to talk to the brokers
+  * @param groupId The consumer group id
+  * @param retryBackoffMs The time to wait between retries
+  * @param sessionTimeoutMs The session timeout sent when joining the group
+  * @param assignmentStrategy The partition assignment strategy sent when joining the group
+  * @param metadata The cluster metadata
+  * @param subscriptions The subscription state shared with the rest of the consumer
+  * @param metrics The metrics registry the coordinator sensors are registered with
+  * @param metricGrpPrefix The prefix of the metric group name
+  * @param metricTags The tags attached to every metric
+  * @param time The clock
+  */
+ public Coordinator(KafkaClient client,
+                    String groupId,
+                    long retryBackoffMs,
+                    long sessionTimeoutMs,
+                    String assignmentStrategy,
+                    Metadata metadata,
+                    SubscriptionState subscriptions,
+                    Metrics metrics,
+                    String metricGrpPrefix,
+                    Map<String, String> metricTags,
+                    Time time) {
+
+     // collaborators
+     this.client = client;
+     this.time = time;
+     this.metadata = metadata;
+     this.subscriptions = subscriptions;
+
+     // static configuration
+     this.groupId = groupId;
+     this.assignmentStrategy = assignmentStrategy;
+     this.sessionTimeoutMs = sessionTimeoutMs;
+     this.retryBackoffMs = retryBackoffMs;
+
+     // group membership state: nothing is known until the first join / discovery
+     this.consumerCoordinator = null;
+     this.consumerId = "";
+     this.generation = -1;
+
+     // the heartbeat tracker and the sensors are created last because they read the fields above
+     this.heartbeat = new Heartbeat(this.sessionTimeoutMs, time.milliseconds());
+     this.sensors = new CoordinatorMetrics(metrics, metricGrpPrefix, metricTags);
+ }
+
+ /**
+  * Assign partitions for the subscribed topics by (re-)joining the consumer group.
+  *
+  * Sends a join-group request to the coordinator and blocks until a response arrives. On success the
+  * consumer id and the group generation returned by the coordinator are remembered so that subsequent
+  * heartbeats and offset commits carry the current membership information, and the subscription state is
+  * flagged to refresh its last committed offsets.
+  *
+  * @param subscribedTopics The subscribed topics list
+  * @param now The current time
+  * @return The assigned partition info
+  */
+ public List<TopicPartition> assignPartitions(List<String> subscribedTopics, long now) {
+
+     // send a join group request to the coordinator
+     log.debug("(Re-)joining group {} with subscribed topics {}", groupId, subscribedTopics);
+
+     JoinGroupRequest request = new JoinGroupRequest(groupId,
+                                                     (int) this.sessionTimeoutMs,
+                                                     subscribedTopics,
+                                                     this.consumerId,
+                                                     this.assignmentStrategy);
+     ClientResponse resp = this.blockingCoordinatorRequest(ApiKeys.JOIN_GROUP, request.toStruct(), null, now);
+
+     // process the response
+     JoinGroupResponse response = new JoinGroupResponse(resp.responseBody());
+     // TODO: needs to handle disconnects and errors
+     Errors.forCode(response.errorCode()).maybeThrow();
+
+     // remember the membership information handed out by the coordinator; without the generation the
+     // heartbeat and offset commit requests would keep sending the initial value of -1
+     this.consumerId = response.consumerId();
+     this.generation = response.generationId();
+
+     // set the flag to refresh last committed offsets
+     this.subscriptions.needRefreshCommits();
+
+     log.debug("Joined group: {}", response);
+
+     // record re-assignment time
+     this.sensors.partitionReassignments.record(time.milliseconds() - now);
+
+     // return assigned partitions
+     return response.assignedPartitions();
+ }
+
+ /**
+ * Commit offsets for the specified list of topics and partitions.
+ *
+ * A non-blocking commit will attempt to commit offsets asychronously. No error will be thrown if the commit fails.
+ * A blocking commit will wait for a response acknowledging the commit. In the event of an error it will retry until
+ * the commit succeeds.
+ *
+ * @param offsets The list of offsets per partition that should be committed.
+ * @param blocking Control whether the commit is blocking
+ * @param now The current time
+ */
+ public void commitOffsets(final Map<TopicPartition, Long> offsets, boolean blocking, long now) {
+ if (!offsets.isEmpty()) {
+ // create the offset commit request
+ Map<TopicPartition, OffsetCommitRequest.PartitionData> offsetData;
+ offsetData = new HashMap<TopicPartition, OffsetCommitRequest.PartitionData>(offsets.size());
+ for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet())
+ offsetData.put(entry.getKey(), new OffsetCommitRequest.PartitionData(entry.getValue(), now, ""));
+ OffsetCommitRequest req = new OffsetCommitRequest(this.groupId, this.generation, this.consumerId, offsetData);
+
+ // send request and possibly wait for response if it is blocking
+ RequestCompletionHandler handler = new CommitOffsetCompletionHandler(offsets);
+
+ if (blocking) {
+ boolean done;
+ do {
+ ClientResponse response = blockingCoordinatorRequest(ApiKeys.OFFSET_COMMIT, req.toStruct(), handler, now);
+
+ // check for errors
+ done = true;
+ OffsetCommitResponse commitResponse = new OffsetCommitResponse(response.responseBody());
+ for (short errorCode : commitResponse.responseData().values()) {
+ if (errorCode != Errors.NONE.code())
+ done = false;
+ }
+ if (!done) {
+ log.debug("Error in offset commit, backing off for {} ms before retrying again.",
+ this.retryBackoffMs);
+ Utils.sleep(this.retryBackoffMs);
+ }
+ } while (!done);
+ } else {
+ this.client.send(initiateCoordinatorRequest(ApiKeys.OFFSET_COMMIT, req.toStruct(), handler, now));
+ }
+ }
+ }
+
+ /**
+  * Fetch the committed offsets of the given set of partitions.
+  *
+  * Blocks until the coordinator returns a usable response. If any partition reports that offsets are still
+  * being loaded, or that the contacted node is not the coordinator for this group (in which case the
+  * coordinator is also marked dead so that it gets re-discovered), the whole fetch is retried after a
+  * single back-off. Partitions without a committed offset are left out of the result.
+  *
+  * @param partitions The list of partitions which need to ask for committed offsets
+  * @param now The current time
+  * @return The fetched offset values
+  * @throws IllegalStateException if a partition reports an error code that is not handled here
+  */
+ public Map<TopicPartition, Long> fetchOffsets(Set<TopicPartition> partitions, long now) {
+     log.debug("Fetching committed offsets for partitions: {}", Utils.join(partitions, ", "));
+
+     while (true) {
+         // construct the request
+         OffsetFetchRequest request = new OffsetFetchRequest(this.groupId, new ArrayList<TopicPartition>(partitions));
+
+         // send the request and block on waiting for response
+         ClientResponse resp = this.blockingCoordinatorRequest(ApiKeys.OFFSET_FETCH, request.toStruct(), null, now);
+
+         // parse the response to get the offsets
+         boolean offsetsReady = true;
+         OffsetFetchResponse response = new OffsetFetchResponse(resp.responseBody());
+         // TODO: needs to handle disconnects
+         Map<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>(response.responseData().size());
+         for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry : response.responseData().entrySet()) {
+             TopicPartition tp = entry.getKey();
+             OffsetFetchResponse.PartitionData data = entry.getValue();
+             if (data.hasError()) {
+                 log.debug("Error fetching offset for topic-partition {}: {}", tp, Errors.forCode(data.errorCode)
+                     .exception()
+                     .getMessage());
+                 if (data.errorCode == Errors.OFFSET_LOAD_IN_PROGRESS.code()) {
+                     // just retry
+                     offsetsReady = false;
+                 } else if (data.errorCode == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
+                     // re-discover the coordinator and retry
+                     coordinatorDead();
+                     offsetsReady = false;
+                 } else if (data.errorCode == Errors.NO_OFFSETS_FETCHABLE.code()
+                     || data.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
+                     // just ignore this partition
+                     log.debug("No committed offset for partition {}", tp);
+                 } else {
+                     throw new IllegalStateException("Unexpected error code " + data.errorCode + " while fetching offset");
+                 }
+             } else if (data.offset >= 0) {
+                 // record the position with the offset (-1 seems to indicate no
+                 // such offset known)
+                 offsets.put(tp, data.offset);
+             } else {
+                 log.debug("No committed offset for partition {}", tp);
+             }
+         }
+
+         if (offsetsReady)
+             return offsets;
+
+         // back off once per retry round rather than once per failing partition, so the
+         // delay does not grow with the number of partitions that reported a retriable error
+         Utils.sleep(this.retryBackoffMs);
+     }
+ }
+
+ /**
+  * Send a heartbeat to the consumer coordinator if the heartbeat tracker says one is due. The request is
+  * sent without waiting for the response; the response is processed by a {@link HeartbeatCompletionHandler}.
+  *
+  * @param now The current time
+  */
+ public void maybeHeartbeat(long now) {
+     if (!heartbeat.shouldHeartbeat(now))
+         return;
+
+     HeartbeatRequest heartbeatRequest = new HeartbeatRequest(this.groupId, this.generation, this.consumerId);
+     ClientRequest pending = initiateCoordinatorRequest(ApiKeys.HEARTBEAT,
+                                                        heartbeatRequest.toStruct(),
+                                                        new HeartbeatCompletionHandler(),
+                                                        now);
+     this.client.send(pending);
+     this.heartbeat.sentHeartbeat(now);
+ }
+
+ /**
+  * @return true if no coordinator node is currently known
+  */
+ public boolean coordinatorUnknown() {
+     return null == this.consumerCoordinator;
+ }
+
+ /**
+  * Send a request to the coordinator and wait for its response, re-sending it for as long as the attempt
+  * ends in a disconnect. Because the request may be sent more than once, it must be idempotent.
+  *
+  * @param api The API key of the request
+  * @param request The request body
+  * @param handler The completion handler attached to every attempt; may be null
+  * @param now The current time
+  * @return The first response that was not a disconnect
+  */
+ private ClientResponse blockingCoordinatorRequest(ApiKeys api,
+                                                   Struct request,
+                                                   RequestCompletionHandler handler,
+                                                   long now) {
+     for (;;) {
+         ClientRequest attempt = initiateCoordinatorRequest(api, request, handler, now);
+         ClientResponse outcome = sendAndReceive(attempt, now);
+         if (!outcome.wasDisconnected())
+             return outcome;
+
+         // the coordinator went away: forget it, back off and try again
+         handleCoordinatorDisconnect(outcome);
+         Utils.sleep(this.retryBackoffMs);
+     }
+ }
+
+ /**
+ * Ensure the consumer coordinator is known and we have a ready connection to it.
+ *
+ * Blocks until both conditions hold: the outer loop (re-)discovers the coordinator whenever it is unknown,
+ * the inner loop polls the client until the connection to the current coordinator is ready. If the
+ * connection attempt fails, the coordinator is marked dead and the outer loop starts over.
+ */
+ private void ensureCoordinatorReady() {
+ while (true) {
+ // discoverCoordinator() only returns once a coordinator has been found
+ if (this.consumerCoordinator == null)
+ discoverCoordinator();
+
+ while (true) {
+ boolean ready = this.client.ready(this.consumerCoordinator, time.milliseconds());
+ if (ready) {
+ return;
+ } else {
+ log.debug("No connection to coordinator, attempting to connect.");
+ // poll the client for up to retryBackoffMs before checking the connection again
+ this.client.poll(this.retryBackoffMs, time.milliseconds());
+
+ // if the coordinator connection has failed, we need to
+ // break the inner loop to re-discover the coordinator
+ if (this.client.connectionFailed(this.consumerCoordinator)) {
+ log.debug("Coordinator connection failed. Attempting to re-discover.");
+ coordinatorDead();
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ /**
+  * Forget the current coordinator so that it has to be re-discovered. Does nothing if no coordinator is known.
+  */
+ private void coordinatorDead() {
+     if (this.consumerCoordinator == null)
+         return;
+
+     log.info("Marking the coordinator {} dead.", this.consumerCoordinator.id());
+     this.consumerCoordinator = null;
+ }
+
+ /**
+  * Look up the consumer coordinator, backing off between unsuccessful attempts, and return only once a
+  * coordinator is known.
+  */
+ private void discoverCoordinator() {
+     while (this.consumerCoordinator == null) {
+         log.debug("No coordinator known, attempting to discover one.");
+         Node candidate = fetchConsumerCoordinator();
+
+         if (candidate != null) {
+             log.debug("Found coordinator: " + candidate);
+             this.consumerCoordinator = candidate;
+         } else {
+             log.debug("No coordinator found, backing off.");
+             Utils.sleep(this.retryBackoffMs);
+         }
+     }
+ }
+
+ /**
+  * Ask a broker for the current consumer coordinator via a consumer metadata request.
+  *
+  * @return the consumer coordinator node, or null if the request was disconnected (a metadata update is
+  *         requested in that case) or the response carried an error code
+  */
+ private Node fetchConsumerCoordinator() {
+
+     // send a consumer metadata request and wait for its response
+     ClientRequest lookup = initiateConsumerMetadataRequest();
+     ClientResponse answer = sendAndReceive(lookup, lookup.createdTime());
+
+     // a disconnect means our view of the cluster may be stale
+     if (answer.wasDisconnected()) {
+         this.metadata.requestUpdate();
+         return null;
+     }
+
+     ConsumerMetadataResponse consumerMetadataResponse = new ConsumerMetadataResponse(answer.responseBody());
+     if (consumerMetadataResponse.errorCode() != Errors.NONE.code())
+         return null;
+
+     // use MAX_VALUE - node.id as the coordinator id to mimic separate connections
+     // for the coordinator in the underlying network client layer
+     // TODO: this needs to be better handled in KAFKA-1935
+     return new Node(Integer.MAX_VALUE - consumerMetadataResponse.node().id(),
+                     consumerMetadataResponse.node().host(),
+                     consumerMetadataResponse.node().port());
+ }
+
+ /**
+  * React to a request that was cancelled because the coordinator disconnected: log the cancelled request
+  * and mark the coordinator as dead.
+  *
+  * @param response The disconnected response
+  */
+ private void handleCoordinatorDisconnect(ClientResponse response) {
+     RequestSend cancelled = response.request().request();
+     int correlation = cancelled.header().correlationId();
+     log.debug("Cancelled request {} with correlation id {} due to coordinator {} being disconnected",
+               response.request(),
+               correlation,
+               cancelled.destination());
+
+     // the coordinator has to be re-discovered before the next request
+     coordinatorDead();
+ }
+
+ /**
+  * Build a consumer metadata request addressed to the least loaded node, polling the client (and backing
+  * off when no node is available at all) until such a node is ready.
+  *
+  * @return The created request
+  */
+ private ClientRequest initiateConsumerMetadataRequest() {
+
+     // pick a node to ask about the coordinator and wait until we can send to it
+     Node target = this.client.leastLoadedNode(time.milliseconds());
+     while (target == null || !this.client.ready(target, time.milliseconds())) {
+         long pollTime = time.milliseconds();
+         this.client.poll(this.retryBackoffMs, pollTime);
+         target = this.client.leastLoadedNode(pollTime);
+
+         // if there is no ready node, backoff before retry
+         if (target == null)
+             Utils.sleep(this.retryBackoffMs);
+     }
+
+     log.debug("Issuing consumer metadata request to broker {}", target.id());
+
+     // wrap the consumer metadata request for the chosen node
+     ConsumerMetadataRequest metadataRequest = new ConsumerMetadataRequest(this.groupId);
+     RequestSend send = new RequestSend(target.id(),
+                                        this.client.nextRequestHeader(ApiKeys.CONSUMER_METADATA),
+                                        metadataRequest.toStruct());
+     return new ClientRequest(time.milliseconds(), true, send, null);
+ }
+
+ /**
+ * Initiate a request to the coordinator.
+ */
+ private ClientRequest initiateCoordinatorRequest(ApiKeys api, Struct request, RequestCompletionHandler handler, long now) {
+
+ // first make sure the coordinator is known and ready
+ ensureCoordinatorReady();
+
+ // create the request for the coordinator
+ log.debug("Issuing request ({}: {}) to coordinator {}", api, request, this.consumerCoordinator.id());
+
+ RequestHeader header = this.client.nextRequestHeader(api);
+ RequestSend send = new RequestSend(this.consumerCoordinator.id(), header, request);
+ return new ClientRequest(now, true, send, handler);
+ }
+
+ /**
+  * Send a request and wait until all outstanding responses from its destination node have been received.
+  *
+  * @param clientRequest The request to send
+  * @param now The current time
+  * @return The last response received from the destination node
+  * @throws IllegalStateException if no response at all was returned
+  */
+ private ClientResponse sendAndReceive(ClientRequest clientRequest, long now) {
+
+     this.client.send(clientRequest);
+
+     // drain all responses from the destination node
+     List<ClientResponse> drained = this.client.completeAll(clientRequest.request().destination(), now);
+
+     // other requests should be handled by the callback, and
+     // we only care about the response of the last request
+     if (!drained.isEmpty())
+         return drained.get(drained.size() - 1);
+
+     throw new IllegalStateException("This should not happen.");
+ }
+
+ private class HeartbeatCompletionHandler implements RequestCompletionHandler {
+ @Override
+ public void onComplete(ClientResponse resp) {
+ if (resp.wasDisconnected()) {
+ handleCoordinatorDisconnect(resp);
+ } else {
+ HeartbeatResponse response = new HeartbeatResponse(resp.responseBody());
+ if (response.errorCode() == Errors.NONE.code()) {
+ log.debug("Received successful heartbeat response.");
+ } else if (response.errorCode() == Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code()
+ || response.errorCode() == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
+ coordinatorDead();
+ } else if (response.errorCode() == Errors.ILLEGAL_GENERATION.code()) {
+ subscriptions.needReassignment();
+ } else {
+ throw new KafkaException("Unexpected error in heartbeat response: "
+ + Errors.forCode(response.errorCode()).exception().getMessage());
+ }
+ }
+ sensors.heartbeatLatency.record(resp.requestLatencyMs());
+ }
+ }
+
+ /**
+  * Completion handler for offset commit requests: records successfully committed offsets in the
+  * subscription state, reacts to coordinator errors and records the request latency.
+  */
+ private class CommitOffsetCompletionHandler implements RequestCompletionHandler {
+
+     // the offsets that were sent in the commit request, by partition
+     private final Map<TopicPartition, Long> offsets;
+
+     public CommitOffsetCompletionHandler(Map<TopicPartition, Long> offsets) {
+         this.offsets = offsets;
+     }
+
+     @Override
+     public void onComplete(ClientResponse resp) {
+         if (!resp.wasDisconnected()) {
+             OffsetCommitResponse response = new OffsetCommitResponse(resp.responseBody());
+             for (Map.Entry<TopicPartition, Short> result : response.responseData().entrySet())
+                 handlePartitionResult(result.getKey(), result.getValue());
+         } else {
+             handleCoordinatorDisconnect(resp);
+         }
+         sensors.commitLatency.record(resp.requestLatencyMs());
+     }
+
+     /**
+      * Act on the commit result of a single partition.
+      */
+     private void handlePartitionResult(TopicPartition tp, short errorCode) {
+         long offset = this.offsets.get(tp);
+         if (errorCode == Errors.NONE.code()) {
+             log.debug("Committed offset {} for partition {}", offset, tp);
+             subscriptions.committed(tp, offset);
+             return;
+         }
+
+         boolean coordinatorGone = errorCode == Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code()
+             || errorCode == Errors.NOT_COORDINATOR_FOR_CONSUMER.code();
+         if (coordinatorGone) {
+             coordinatorDead();
+         } else {
+             log.error("Error committing partition {} at offset {}: {}",
+                       tp,
+                       offset,
+                       Errors.forCode(errorCode).exception().getMessage());
+         }
+     }
+ }
+
+ /**
+  * The sensors and gauges published by the coordinator. All metrics are registered in the group
+  * "&lt;prefix&gt;-coordinator-metrics" and carry the given tags.
+  */
+ private class CoordinatorMetrics {
+     public final Metrics metrics;
+     public final String metricGrpName;
+
+     // latency of offset commit requests
+     public final Sensor commitLatency;
+     // latency of heartbeat requests
+     public final Sensor heartbeatLatency;
+     // time taken by partition re-assignments (group joins)
+     public final Sensor partitionReassignments;
+
+     /**
+      * Register all coordinator sensors and gauges.
+      *
+      * @param metrics The metrics registry
+      * @param metricGrpPrefix The prefix of the metric group name
+      * @param tags The tags attached to every metric
+      */
+     public CoordinatorMetrics(Metrics metrics, String metricGrpPrefix, Map<String, String> tags) {
+         this.metrics = metrics;
+         this.metricGrpName = metricGrpPrefix + "-coordinator-metrics";
+
+         this.commitLatency = metrics.sensor("commit-latency");
+         this.commitLatency.add(new MetricName("commit-latency-avg",
+                                               this.metricGrpName,
+                                               "The average time taken for a commit request",
+                                               tags), new Avg());
+         this.commitLatency.add(new MetricName("commit-latency-max",
+                                               this.metricGrpName,
+                                               "The max time taken for a commit request",
+                                               tags), new Max());
+         this.commitLatency.add(new MetricName("commit-rate",
+                                               this.metricGrpName,
+                                               "The number of commit calls per second",
+                                               tags), new Rate(new Count()));
+
+         this.heartbeatLatency = metrics.sensor("heartbeat-latency");
+         this.heartbeatLatency.add(new MetricName("heartbeat-response-time-max",
+                                                  this.metricGrpName,
+                                                  "The max time taken to receive a response to a hearbeat request",
+                                                  tags), new Max());
+         this.heartbeatLatency.add(new MetricName("heartbeat-rate",
+                                                  this.metricGrpName,
+                                                  "The average number of heartbeats per second",
+                                                  tags), new Rate(new Count()));
+
+         this.partitionReassignments = metrics.sensor("reassignment-latency");
+         this.partitionReassignments.add(new MetricName("reassignment-time-avg",
+                                                        this.metricGrpName,
+                                                        "The average time taken for a partition reassignment",
+                                                        tags), new Avg());
+         // the max metric must be backed by Max; it previously used Avg and so reported the average
+         this.partitionReassignments.add(new MetricName("reassignment-time-max",
+                                                        this.metricGrpName,
+                                                        "The max time taken for a partition reassignment",
+                                                        tags), new Max());
+         this.partitionReassignments.add(new MetricName("reassignment-rate",
+                                                        this.metricGrpName,
+                                                        "The number of partition reassignments per second",
+                                                        tags), new Rate(new Count()));
+
+         // gauge: number of partitions currently assigned, read from the subscription state on demand
+         Measurable numParts =
+             new Measurable() {
+                 @Override
+                 public double measure(MetricConfig config, long now) {
+                     return subscriptions.assignedPartitions().size();
+                 }
+             };
+         metrics.addMetric(new MetricName("assigned-partitions",
+                                          this.metricGrpName,
+                                          "The number of partitions currently assigned to this consumer",
+                                          tags),
+                           numParts);
+
+         // gauge: whole seconds elapsed since the last heartbeat was sent
+         Measurable lastHeartbeat =
+             new Measurable() {
+                 @Override
+                 public double measure(MetricConfig config, long now) {
+                     return TimeUnit.SECONDS.convert(now - heartbeat.lastHeartbeatSend(), TimeUnit.MILLISECONDS);
+                 }
+             };
+         metrics.addMetric(new MetricName("last-heartbeat-seconds-ago",
+                                          this.metricGrpName,
+                                          "The number of seconds since the last controller heartbeat",
+                                          tags),
+                           lastHeartbeat);
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0b92cec1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
new file mode 100644
index 0000000..27c78b8
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -0,0 +1,459 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.RequestCompletionHandler;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Count;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.LogEntry;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.requests.FetchResponse;
+import org.apache.kafka.common.requests.ListOffsetRequest;
+import org.apache.kafka.common.requests.ListOffsetResponse;
+import org.apache.kafka.common.requests.RequestSend;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * This class manages the fetching process with the brokers.
+ */
+public class Fetcher<K, V> {
+
+ private static final Logger log = LoggerFactory.getLogger(Fetcher.class);
+ // timestamps passed to offsetBefore() by resetOffset() for the EARLIEST / LATEST reset strategies
+ private static final long EARLIEST_OFFSET_TIMESTAMP = -2L;
+ private static final long LATEST_OFFSET_TIMESTAMP = -1L;
+
+
+ // network client used to send requests to, and poll responses from, the brokers
+ private final KafkaClient client;
+
+ // clock used whenever the current time in milliseconds is needed
+ private final Time time;
+ // NOTE(review): minBytes, maxWaitMs, fetchSize and checkCrcs are only stored in the part of the class
+ // visible here; presumably they parameterise fetch requests and record parsing -- confirm
+ private final int minBytes;
+ private final int maxWaitMs;
+ private final int fetchSize;
+ private final boolean checkCrcs;
+ // time to poll between retries while waiting for a partition leader to become ready
+ private final long retryBackoffMs;
+ // cluster metadata used to look up partition leaders
+ private final Metadata metadata;
+ // metric sensors of the fetcher
+ private final FetchManagerMetrics sensors;
+ // subscription state holding the assigned partitions and their consumed positions
+ private final SubscriptionState subscriptions;
+ // buffer of fetched records; drained and cleared by fetchedRecords()
+ private final List<PartitionRecords<K, V>> records;
+ // strategy applied by resetOffset() when a partition has no valid position
+ private final AutoOffsetResetStrategy offsetResetStrategy;
+ private final Deserializer<K> keyDeserializer;
+ private final Deserializer<V> valueDeserializer;
+
+
+ /**
+  * Create a fetcher.
+  *
+  * @param client The network client used to talk to the brokers
+  * @param retryBackoffMs The time to wait between retries
+  * @param minBytes The configured minimum fetch bytes
+  * @param maxWaitMs The configured maximum fetch wait time
+  * @param fetchSize The configured fetch size
+  * @param checkCrcs Whether record CRCs are to be checked
+  * @param offsetReset The name of the {@link AutoOffsetResetStrategy} constant to use
+  * @param keyDeserializer The deserializer for record keys
+  * @param valueDeserializer The deserializer for record values
+  * @param metadata The cluster metadata
+  * @param subscriptions The subscription state shared with the rest of the consumer
+  * @param metrics The metrics registry the fetcher sensors are registered with
+  * @param metricGrpPrefix The prefix of the metric group name
+  * @param metricTags The tags attached to every metric
+  * @param time The clock
+  */
+ public Fetcher(KafkaClient client,
+                long retryBackoffMs,
+                int minBytes,
+                int maxWaitMs,
+                int fetchSize,
+                boolean checkCrcs,
+                String offsetReset,
+                Deserializer<K> keyDeserializer,
+                Deserializer<V> valueDeserializer,
+                Metadata metadata,
+                SubscriptionState subscriptions,
+                Metrics metrics,
+                String metricGrpPrefix,
+                Map<String, String> metricTags,
+                Time time) {
+
+     // collaborators
+     this.client = client;
+     this.time = time;
+     this.metadata = metadata;
+     this.subscriptions = subscriptions;
+     this.keyDeserializer = keyDeserializer;
+     this.valueDeserializer = valueDeserializer;
+
+     // fetch configuration
+     this.minBytes = minBytes;
+     this.maxWaitMs = maxWaitMs;
+     this.fetchSize = fetchSize;
+     this.checkCrcs = checkCrcs;
+     this.retryBackoffMs = retryBackoffMs;
+     this.offsetResetStrategy = AutoOffsetResetStrategy.valueOf(offsetReset);
+
+     // the record buffer starts out empty
+     this.records = new LinkedList<PartitionRecords<K, V>>();
+     this.sensors = new FetchManagerMetrics(metrics, metricGrpPrefix, metricTags);
+ }
+
+ /**
+  * Create fetch requests and send each of them whose destination node is ready; requests to nodes that
+  * are not ready are dropped.
+  *
+  * @param cluster The current cluster metadata
+  * @param now The current time
+  */
+ public void initFetches(Cluster cluster, long now) {
+     for (ClientRequest fetch : createFetchRequests(cluster)) {
+         Node target = cluster.nodeById(fetch.request().destination());
+         if (!client.ready(target, now))
+             continue;
+
+         log.trace("Initiating fetch to node {}: {}", target.id(), fetch);
+         client.send(fetch);
+     }
+ }
+
+ /**
+  * Drain the record buffer: return the buffered records grouped by partition and advance the consumed
+  * position of every partition for which records are returned. Buffered records for partitions that are
+  * no longer assigned, or that do not start at the current consumed position, are dropped. Nothing is
+  * drained while a partition assignment is needed.
+  *
+  * @return The fetched records per partition
+  */
+ public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
+     if (this.subscriptions.partitionAssignmentNeeded())
+         return Collections.emptyMap();
+
+     Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<TopicPartition, List<ConsumerRecord<K, V>>>();
+     for (PartitionRecords<K, V> part : this.records) {
+         Long consumed = subscriptions.consumed(part.partition);
+         if (!this.subscriptions.assignedPartitions().contains(part.partition)
+             || (consumed != null && part.fetchOffset != consumed)) {
+             // these records aren't next in line based on the last consumed position, ignore them
+             // they must be from an obsolete request
+             log.debug("Ignoring fetched records for {} at offset {}", part.partition, part.fetchOffset);
+             continue;
+         }
+
+         // the first batch of a partition becomes its result list, later batches are appended to it
+         List<ConsumerRecord<K, V>> soFar = drained.get(part.partition);
+         if (soFar == null)
+             drained.put(part.partition, part.records);
+         else
+             soFar.addAll(part.records);
+
+         // the next record to consume follows the last record of this batch
+         ConsumerRecord<K, V> last = part.records.get(part.records.size() - 1);
+         subscriptions.consumed(part.partition, last.offset() + 1);
+     }
+     this.records.clear();
+     return drained;
+ }
+
+ /**
+  * Reset offsets for the given partition using the offset reset strategy: look up the earliest or the
+  * latest offset of the partition, depending on the strategy, and seek to it.
+  *
+  * @param partition The given partition that needs reset offset
+  * @throws org.apache.kafka.clients.consumer.NoOffsetForPartitionException If no offset reset strategy is defined
+  */
+ public void resetOffset(TopicPartition partition) {
+     long timestamp;
+     if (this.offsetResetStrategy == AutoOffsetResetStrategy.EARLIEST)
+         timestamp = EARLIEST_OFFSET_TIMESTAMP;
+     else if (this.offsetResetStrategy == AutoOffsetResetStrategy.LATEST)
+         timestamp = LATEST_OFFSET_TIMESTAMP;
+     else
+         throw new NoOffsetForPartitionException("No offset is set and no reset policy is defined");
+
+     // lower-case with a fixed locale: the default-locale variant turns the 'I' of "EARLIEST" into a
+     // dotless i under e.g. a Turkish default locale
+     log.debug("Resetting offset for partition {} to {} offset.", partition,
+               this.offsetResetStrategy.name().toLowerCase(java.util.Locale.ROOT));
+     this.subscriptions.seek(partition, offsetBefore(partition, timestamp));
+ }
+
+ /**
+ * Fetch a single offset before the given timestamp for the partition.
+ *
+ * Blocks until an offset has been obtained: the method keeps retrying while the partition or its leader
+ * is unknown, the leader is not ready, the request is disconnected, or the leader reports obsolete
+ * leadership information. Any other error code is raised via {@code Errors.maybeThrow()}.
+ *
+ * @param topicPartition The partition that needs fetching offset.
+ * @param timestamp The timestamp for fetching offset.
+ * @return The offset of the message that is published before the given timestamp
+ * @throws IllegalStateException if no response is returned or the response does not contain exactly one offset
+ */
+ public long offsetBefore(TopicPartition topicPartition, long timestamp) {
+ log.debug("Fetching offsets for partition {}.", topicPartition);
+ // NOTE(review): the second PartitionData argument is presumably the maximum number of offsets to
+ // return -- the response is required to contain exactly one offset below; confirm
+ Map<TopicPartition, ListOffsetRequest.PartitionData> partitions = new HashMap<TopicPartition, ListOffsetRequest.PartitionData>(1);
+ partitions.put(topicPartition, new ListOffsetRequest.PartitionData(timestamp, 1));
+ while (true) {
+ long now = time.milliseconds();
+ PartitionInfo info = metadata.fetch().partition(topicPartition);
+ if (info == null) {
+ // the topic is not part of the metadata yet: register it and wait for a refresh
+ metadata.add(topicPartition.topic());
+ log.debug("Partition {} is unknown for fetching offset, wait for metadata refresh", topicPartition);
+ awaitMetadataUpdate();
+ } else if (info.leader() == null) {
+ log.debug("Leader for partition {} unavailable for fetching offset, wait for metadata refresh", topicPartition);
+ awaitMetadataUpdate();
+ } else if (this.client.ready(info.leader(), now)) {
+ Node node = info.leader();
+ // NOTE(review): -1 is presumably the replica id used by ordinary consumers -- confirm
+ ListOffsetRequest request = new ListOffsetRequest(-1, partitions);
+ RequestSend send = new RequestSend(node.id(),
+ this.client.nextRequestHeader(ApiKeys.LIST_OFFSETS),
+ request.toStruct());
+ ClientRequest clientRequest = new ClientRequest(now, true, send, null);
+ this.client.send(clientRequest);
+ // wait for all outstanding responses from the leader; the last one is taken as the answer to this request
+ List<ClientResponse> responses = this.client.completeAll(node.id(), now);
+ if (responses.isEmpty())
+ throw new IllegalStateException("This should not happen.");
+ ClientResponse response = responses.get(responses.size() - 1);
+ if (response.wasDisconnected()) {
+ awaitMetadataUpdate();
+ } else {
+ ListOffsetResponse lor = new ListOffsetResponse(response.responseBody());
+ short errorCode = lor.responseData().get(topicPartition).errorCode;
+ if (errorCode == Errors.NONE.code()) {
+ List<Long> offsets = lor.responseData().get(topicPartition).offsets;
+ if (offsets.size() != 1)
+ throw new IllegalStateException("This should not happen.");
+ long offset = offsets.get(0);
+ log.debug("Fetched offset {} for partition {}", offset, topicPartition);
+ return offset;
+ } else if (errorCode == Errors.NOT_LEADER_FOR_PARTITION.code()
+ || errorCode == Errors.LEADER_NOT_AVAILABLE.code()) {
+ log.warn("Attempt to fetch offsets for partition {} failed due to obsolete leadership information, retrying.",
+ topicPartition);
+ awaitMetadataUpdate();
+ } else {
+ // any other error code is handed to maybeThrow()
+ Errors.forCode(errorCode).maybeThrow();
+ }
+ }
+ } else {
+ // the leader is known but its connection is not ready yet: poll the client and try again
+ log.debug("Leader for partition {} is not ready, retry fetching offsets", topicPartition);
+ client.poll(this.retryBackoffMs, now);
+ }
+ }
+ }
+
+ /**
+ * Create fetch requests for all nodes for which we have assigned partitions
+ * that have no existing requests in flight. All fetchable partitions led by the same node are grouped into a
+ * single request, each fetching from the position returned by subscriptions.fetched().
+ *
+ * @param cluster The cluster metadata used to look up the leader of each assigned partition
+ * @return One fetch request per leader node with fetchable partitions; empty if there are none
+ */
+ private List<ClientRequest> createFetchRequests(Cluster cluster) {
+ // create the fetch info
+ Map<Integer, Map<TopicPartition, FetchRequest.PartitionData>> fetchable = new HashMap<Integer, Map<TopicPartition, FetchRequest.PartitionData>>();
+ for (TopicPartition partition : subscriptions.assignedPartitions()) {
+ Node node = cluster.leaderFor(partition);
+ // if there is a leader and no in-flight requests, issue a new fetch
+ if (node != null && this.client.inFlightRequestCount(node.id()) == 0) {
+ Map<TopicPartition, FetchRequest.PartitionData> fetch = fetchable.get(node.id()); // the map is keyed by node id; looking it up by the Node object never matches and would keep only one partition per node
+ if (fetch == null) {
+ fetch = new HashMap<TopicPartition, FetchRequest.PartitionData>();
+ fetchable.put(node.id(), fetch);
+ }
+ long offset = this.subscriptions.fetched(partition);
+ fetch.put(partition, new FetchRequest.PartitionData(offset, this.fetchSize));
+ }
+ }
+
+ // create the requests
+ List<ClientRequest> requests = new ArrayList<ClientRequest>(fetchable.size());
+ for (Map.Entry<Integer, Map<TopicPartition, FetchRequest.PartitionData>> entry : fetchable.entrySet()) {
+ int nodeId = entry.getKey();
+ final FetchRequest fetch = new FetchRequest(this.maxWaitMs, this.minBytes, entry.getValue());
+ RequestSend send = new RequestSend(nodeId, this.client.nextRequestHeader(ApiKeys.FETCH), fetch.toStruct());
+ RequestCompletionHandler handler = new RequestCompletionHandler() {
+ public void onComplete(ClientResponse response) {
+ handleFetchResponse(response, fetch); // the request is passed along so the handler can read the offsets that were asked for
+ }
+ };
+ requests.add(new ClientRequest(time.milliseconds(), true, send, handler));
+ }
+ return requests;
+ }
+
+ /**
+ * The callback for fetch completion. For each partition in the response it either stores the parsed records and
+ * advances the fetch position, or reacts to the partition's error code; fetch metrics are recorded along the way.
+ *
+ * @param resp The client response for the fetch request
+ * @param request The fetch request that was sent, used to look up the offset each partition was fetched from
+ * @throws IllegalStateException if a partition carries an error code that is not handled here
+ */
+ private void handleFetchResponse(ClientResponse resp, FetchRequest request) {
+ if (resp.wasDisconnected()) {
+ int correlation = resp.request().request().header().correlationId();
+ log.debug("Cancelled fetch request {} with correlation id {} due to node {} being disconnected",
+ resp.request(), correlation, resp.request().request().destination());
+ } else {
+ int totalBytes = 0;
+ int totalCount = 0;
+ FetchResponse response = new FetchResponse(resp.responseBody());
+ for (Map.Entry<TopicPartition, FetchResponse.PartitionData> entry : response.responseData().entrySet()) {
+ TopicPartition tp = entry.getKey();
+ FetchResponse.PartitionData partition = entry.getValue();
+ if (!subscriptions.assignedPartitions().contains(tp)) {
+ log.debug("Ignoring fetched data for partition {} which is no longer assigned.", tp);
+ } else if (partition.errorCode == Errors.NONE.code()) {
+ int bytes = 0;
+ ByteBuffer buffer = partition.recordSet;
+ MemoryRecords records = MemoryRecords.readableRecords(buffer);
+ long fetchOffset = request.fetchData().get(tp).offset; // the offset this partition was requested from
+ List<ConsumerRecord<K, V>> parsed = new ArrayList<ConsumerRecord<K, V>>();
+ for (LogEntry logEntry : records) {
+ parsed.add(parseRecord(tp, logEntry));
+ bytes += logEntry.size();
+ }
+ if (parsed.size() > 0) {
+ ConsumerRecord<K, V> record = parsed.get(parsed.size() - 1);
+ this.subscriptions.fetched(tp, record.offset() + 1); // next fetch starts right after the last record parsed
+ this.records.add(new PartitionRecords<K, V>(fetchOffset, tp, parsed));
+ this.sensors.recordsFetchLag.record(partition.highWatermark - record.offset());
+ }
+ this.sensors.recordTopicFetchMetrics(tp.topic(), bytes, parsed.size());
+ totalBytes += bytes;
+ totalCount += parsed.size();
+ } else if (partition.errorCode == Errors.NOT_LEADER_FOR_PARTITION.code()
+ || partition.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
+ this.metadata.requestUpdate(); // only asks for a metadata refresh; nothing is fetched for this partition this round
+ } else if (partition.errorCode == Errors.OFFSET_OUT_OF_RANGE.code()) {
+ // TODO: this could be optimized by grouping all out-of-range partitions
+ log.info("Fetch offset {} is out of range, resetting offset", subscriptions.fetched(tp));
+ resetOffset(tp);
+ } else if (partition.errorCode == Errors.UNKNOWN.code()) {
+ log.warn("Unknown error fetching data for topic-partition {}", tp);
+ } else {
+ throw new IllegalStateException("Unexpected error code " + partition.errorCode + " while fetching data");
+ }
+ }
+ this.sensors.bytesFetched.record(totalBytes);
+ this.sensors.recordsFetched.record(totalCount);
+ }
+ this.sensors.fetchLatency.record(resp.requestLatencyMs()); // recorded for disconnected responses as well
+ }
+
+ /**
+ * Parse the record entry into a ConsumerRecord, deserializing the key / value fields if necessary.
+ * The record is validated first when checkCrcs is set; a null key or value is passed through as null
+ * without calling the deserializer.
+ *
+ * @param partition The partition the entry was fetched from; its topic is handed to the deserializers
+ * @param logEntry The log entry holding the record and its offset
+ * @return The consumer record carrying the entry's offset and the deserialized key and value
+ */
+ private ConsumerRecord<K, V> parseRecord(TopicPartition partition, LogEntry logEntry) {
+ if (this.checkCrcs)
+ logEntry.record().ensureValid();
+
+ long offset = logEntry.offset();
+ ByteBuffer keyBytes = logEntry.record().key();
+ K key = keyBytes == null ? null : this.keyDeserializer.deserialize(partition.topic(), Utils.toArray(keyBytes));
+ ByteBuffer valueBytes = logEntry.record().value();
+ V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), Utils.toArray(valueBytes));
+
+ return new ConsumerRecord<K, V>(partition.topic(), partition.partition(), offset, key, value);
+ }
+
+ /**
+ * Request a metadata update and wait until it has occurred, polling the network client for up to
+ * retryBackoffMs at a time. There is no timeout: the loop only ends once the metadata version changes.
+ */
+ private void awaitMetadataUpdate() {
+ int version = this.metadata.requestUpdate(); // value returned by requestUpdate(); compared against metadata.version() below
+ do {
+ long now = time.milliseconds();
+ this.client.poll(this.retryBackoffMs, now);
+ } while (this.metadata.version() == version);
+ }
+
+ private static class PartitionRecords<K, V> { // holder for the records parsed from one partition of a fetch response
+ public long fetchOffset; // the offset the fetch request asked for (taken from request.fetchData() in handleFetchResponse)
+ public TopicPartition partition; // the partition the records belong to
+ public List<ConsumerRecord<K, V>> records; // the parsed records, in the order they were read from the response
+
+ public PartitionRecords(long fetchOffset, TopicPartition partition, List<ConsumerRecord<K, V>> records) {
+ this.fetchOffset = fetchOffset;
+ this.partition = partition;
+ this.records = records;
+ }
+ }
+
+ private static enum AutoOffsetResetStrategy { // the reset policy; resetOffset logs its lower-cased name when resetting a partition
+ LATEST, EARLIEST, NONE // NOTE(review): NONE presumably means "no reset policy defined" -- confirm against resetOffset
+ }
+
+ private class FetchManagerMetrics { // sensors for fetch statistics; they are fed from handleFetchResponse
+ public final Metrics metrics;
+ public final String metricGrpName; // metricGrpPrefix + "-fetch-manager-metrics", used as the group of every metric below
+
+ public final Sensor bytesFetched; // total bytes parsed from one fetch response
+ public final Sensor recordsFetched; // total records parsed from one fetch response
+ public final Sensor fetchLatency; // requestLatencyMs() of each fetch response
+ public final Sensor recordsFetchLag; // high watermark minus the offset of the last record parsed for a partition
+
+
+ public FetchManagerMetrics(Metrics metrics, String metricGrpPrefix, Map<String, String> tags) {
+ this.metrics = metrics;
+ this.metricGrpName = metricGrpPrefix + "-fetch-manager-metrics";
+
+ this.bytesFetched = metrics.sensor("bytes-fetched");
+ this.bytesFetched.add(new MetricName("fetch-size-avg",
+ this.metricGrpName,
+ "The average number of bytes fetched per request",
+ tags), new Avg());
+ this.bytesFetched.add(new MetricName("fetch-size-max",
+ this.metricGrpName,
+ "The maximum number of bytes fetched per request",
+ tags), new Max());
+ this.bytesFetched.add(new MetricName("bytes-consumed-rate",
+ this.metricGrpName,
+ "The average number of bytes consumed per second",
+ tags), new Rate());
+
+ this.recordsFetched = metrics.sensor("records-fetched");
+ this.recordsFetched.add(new MetricName("records-per-request-avg",
+ this.metricGrpName,
+ "The average number of records in each request",
+ tags), new Avg());
+ this.recordsFetched.add(new MetricName("records-consumed-rate",
+ this.metricGrpName,
+ "The average number of records consumed per second",
+ tags), new Rate());
+
+ this.fetchLatency = metrics.sensor("fetch-latency");
+ this.fetchLatency.add(new MetricName("fetch-latency-avg",
+ this.metricGrpName,
+ "The average time taken for a fetch request.",
+ tags), new Avg());
+ this.fetchLatency.add(new MetricName("fetch-latency-max",
+ this.metricGrpName,
+ "The max time taken for any fetch request.",
+ tags), new Max());
+ this.fetchLatency.add(new MetricName("fetch-rate",
+ this.metricGrpName,
+ "The number of fetch requests per second.",
+ tags), new Rate(new Count()));
+
+ this.recordsFetchLag = metrics.sensor("records-lag");
+ this.recordsFetchLag.add(new MetricName("records-lag-max",
+ this.metricGrpName,
+ "The maximum lag in terms of number of records for any partition in this window",
+ tags), new Max());
+ }
+
+ public void recordTopicFetchMetrics(String topic, int bytes, int records) { // NOTE(review): the per-topic sensors below get no metric attached in this class -- confirm whether anything reports them
+ // record bytes fetched
+ String name = "topic." + topic + ".bytes-fetched";
+ Sensor bytesFetched = this.metrics.getSensor(name); // local lookup; shadows the bytesFetched field
+ if (bytesFetched == null)
+ bytesFetched = this.metrics.sensor(name);
+ bytesFetched.record(bytes);
+
+ // record records fetched
+ name = "topic." + topic + ".records-fetched";
+ Sensor recordsFetched = this.metrics.getSensor(name); // local lookup; shadows the recordsFetched field
+ if (recordsFetched == null)
+ recordsFetched = this.metrics.sensor(name);
+ recordsFetched.record(records);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0b92cec1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
index ee0751e..e7cfaaa 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
@@ -21,34 +21,20 @@ public final class Heartbeat {
* so, e.g., with a session timeout of 3 seconds we would attempt a heartbeat
* once per second.
*/
- private final static int HEARTBEATS_PER_SESSION_INTERVAL = 3;
+ public final static int HEARTBEATS_PER_SESSION_INTERVAL = 3;
private final long timeout;
private long lastHeartbeatSend;
- private long lastHeartbeatResponse;
public Heartbeat(long timeout, long now) {
this.timeout = timeout;
this.lastHeartbeatSend = now;
- this.lastHeartbeatResponse = now;
}
public void sentHeartbeat(long now) {
this.lastHeartbeatSend = now;
}
- public void receivedResponse(long now) {
- this.lastHeartbeatResponse = now;
- }
-
- public void markDead() {
- this.lastHeartbeatResponse = -1;
- }
-
- public boolean isAlive(long now) {
- return now - lastHeartbeatResponse <= timeout;
- }
-
public boolean shouldHeartbeat(long now) {
return now - lastHeartbeatSend > (1.0 / HEARTBEATS_PER_SESSION_INTERVAL) * this.timeout;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0b92cec1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index d41d306..cee7541 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -43,9 +43,12 @@ public class SubscriptionState {
/* the last committed offset for each partition */
private final Map<TopicPartition, Long> committed;
- /* do we need to request a partition assignment from the co-ordinator? */
+ /* do we need to request a partition assignment from the coordinator? */
private boolean needsPartitionAssignment;
+ /* do we need to request the latest committed offsets from the coordinator? */
+ private boolean needsFetchCommittedOffsets;
+
public SubscriptionState() {
this.subscribedTopics = new HashSet<String>();
this.subscribedPartitions = new HashSet<TopicPartition>();
@@ -54,6 +57,7 @@ public class SubscriptionState {
this.fetched = new HashMap<TopicPartition, Long>();
this.committed = new HashMap<TopicPartition, Long>();
this.needsPartitionAssignment = false;
+ this.needsFetchCommittedOffsets = true; // initialize to true for the consumers to fetch offset upon starting up
}
public void subscribe(String topic) {
@@ -75,6 +79,10 @@ public class SubscriptionState {
clearPartition(tp);
}
+ public void needReassignment() {
+ this.needsPartitionAssignment = true;
+ }
+
public void subscribe(TopicPartition tp) {
if (this.subscribedTopics.size() > 0)
throw new IllegalStateException("Subcription to topics and partitions are mutually exclusive");
@@ -119,11 +127,20 @@ public class SubscriptionState {
public void committed(TopicPartition tp, long offset) {
this.committed.put(tp, offset);
+ this.needsFetchCommittedOffsets = false;
}
public Long committed(TopicPartition tp) {
return this.committed.get(tp);
}
+
+ public void needRefreshCommits() {
+ this.needsFetchCommittedOffsets = true;
+ }
+
+ public boolean refreshCommitsNeeded() {
+ return this.needsFetchCommittedOffsets;
+ }
public void seek(TopicPartition tp, long offset) {
fetched(tp, offset);
@@ -162,7 +179,7 @@ public class SubscriptionState {
return copy;
}
- public boolean needsPartitionAssignment() {
+ public boolean partitionAssignmentNeeded() {
return this.needsPartitionAssignment;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0b92cec1/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 7397e56..feda9c9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -162,21 +162,10 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
* be called in the producer when the serializer is passed in directly.
*/
public KafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
- this(new ProducerConfig(addSerializerToConfig(configs, keySerializer, valueSerializer)),
+ this(new ProducerConfig(ProducerConfig.addSerializerToConfig(configs, keySerializer, valueSerializer)),
keySerializer, valueSerializer);
}
- private static Map<String, Object> addSerializerToConfig(Map<String, Object> configs,
- Serializer<?> keySerializer, Serializer<?> valueSerializer) {
- Map<String, Object> newConfigs = new HashMap<String, Object>();
- newConfigs.putAll(configs);
- if (keySerializer != null)
- newConfigs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer.getClass());
- if (valueSerializer != null)
- newConfigs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer.getClass());
- return newConfigs;
- }
-
/**
* A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
* are documented <a href="http://kafka.apache.org/documentation.html#producerconfigs">here</a>.
@@ -196,21 +185,10 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
* be called in the producer when the serializer is passed in directly.
*/
public KafkaProducer(Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
- this(new ProducerConfig(addSerializerToConfig(properties, keySerializer, valueSerializer)),
+ this(new ProducerConfig(ProducerConfig.addSerializerToConfig(properties, keySerializer, valueSerializer)),
keySerializer, valueSerializer);
}
- private static Properties addSerializerToConfig(Properties properties,
- Serializer<?> keySerializer, Serializer<?> valueSerializer) {
- Properties newProperties = new Properties();
- newProperties.putAll(properties);
- if (keySerializer != null)
- newProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer.getClass().getName());
- if (valueSerializer != null)
- newProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer.getClass().getName());
- return newProperties;
- }
-
@SuppressWarnings("unchecked")
private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
log.trace("Starting the Kafka producer");
http://git-wip-us.apache.org/repos/asf/kafka/blob/0b92cec1/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 122375c..fa9daae 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -16,13 +16,16 @@ import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Range.between;
import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
+import java.util.HashMap;
import java.util.Map;
+import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.serialization.Serializer;
/**
* Configuration for the Kafka Producer. Documentation for these configurations can be found in the <a
@@ -166,6 +169,7 @@ public class ProducerConfig extends AbstractConfig {
public static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer";
private static final String VALUE_SERIALIZER_CLASS_DOC = "Serializer class for value that implements the <code>Serializer</code> interface.";
+
static {
CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, CommonClientConfigs.BOOSTRAP_SERVERS_DOC)
.define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC)
@@ -217,6 +221,28 @@ public class ProducerConfig extends AbstractConfig {
.define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, VALUE_SERIALIZER_CLASS_DOC);
}
+ public static Map<String, Object> addSerializerToConfig(Map<String, Object> configs, // returns a new map; configs itself is not modified
+ Serializer<?> keySerializer, Serializer<?> valueSerializer) {
+ Map<String, Object> newConfigs = new HashMap<String, Object>();
+ newConfigs.putAll(configs);
+ if (keySerializer != null) // a null serializer leaves whatever configs already holds for that key
+ newConfigs.put(KEY_SERIALIZER_CLASS_CONFIG, keySerializer.getClass()); // stored as a Class object (the Properties overload stores the class name)
+ if (valueSerializer != null)
+ newConfigs.put(VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer.getClass());
+ return newConfigs;
+ }
+
+ public static Properties addSerializerToConfig(Properties properties, // returns a new Properties; the argument itself is not modified
+ Serializer<?> keySerializer, Serializer<?> valueSerializer) {
+ Properties newProperties = new Properties();
+ newProperties.putAll(properties); // NOTE(review): putAll copies only entries set directly on properties, not values from its defaults chain
+ if (keySerializer != null) // a null serializer leaves whatever properties already holds for that key
+ newProperties.put(KEY_SERIALIZER_CLASS_CONFIG, keySerializer.getClass().getName()); // stored as the class name (the Map overload stores the Class object)
+ if (valueSerializer != null)
+ newProperties.put(VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer.getClass().getName());
+ return newProperties;
+ }
+
ProducerConfig(Map<? extends Object, ? extends Object> props) {
super(CONFIG, props);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0b92cec1/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index ed9c63a..03df9ea 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -298,9 +298,8 @@ public class Sender implements Runnable {
final Map<TopicPartition, RecordBatch> recordsByPartition = new HashMap<TopicPartition, RecordBatch>(batches.size());
for (RecordBatch batch : batches) {
TopicPartition tp = batch.topicPartition;
- ByteBuffer recordsBuffer = batch.records.buffer();
- recordsBuffer.flip();
- produceRecordsByPartition.put(tp, recordsBuffer);
+ batch.records.rewind();
+ produceRecordsByPartition.put(tp, batch.records.buffer());
recordsByPartition.put(tp, batch);
}
ProduceRequest request = new ProduceRequest(acks, timeout, produceRecordsByPartition);
http://git-wip-us.apache.org/repos/asf/kafka/blob/0b92cec1/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 6baad93..57de058 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -14,6 +14,7 @@ package org.apache.kafka.common.network;
import java.io.EOFException;
import java.io.IOException;
+import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.channels.CancelledKeyException;
@@ -90,7 +91,7 @@ public class Selector implements Selectable {
/**
* Create a new selector
*/
- public Selector(Metrics metrics, Time time , String metricGrpPrefix , Map<String, String> metricTags) {
+ public Selector(Metrics metrics, Time time, String metricGrpPrefix, Map<String, String> metricTags) {
try {
this.selector = java.nio.channels.Selector.open();
} catch (IOException e) {
@@ -274,7 +275,7 @@ public class Selector implements Selectable {
}
} catch (IOException e) {
String desc = socketDescription(channel);
- if (e instanceof EOFException)
+ if (e instanceof EOFException || e instanceof ConnectException)
log.info("Connection {} disconnected", desc);
else
log.warn("Error in I/O with connection to {}", desc, e);
http://git-wip-us.apache.org/repos/asf/kafka/blob/0b92cec1/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index ad2171f..ce18a6c 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -41,14 +41,23 @@ public enum Errors {
new LeaderNotAvailableException("There is no leader for this topic-partition as we are in the middle of a leadership election.")),
NOT_LEADER_FOR_PARTITION(6,
new NotLeaderForPartitionException("This server is not the leader for that topic-partition.")),
- REQUEST_TIMED_OUT(7, new TimeoutException("The request timed out.")),
+ REQUEST_TIMED_OUT(7,
+ new TimeoutException("The request timed out.")),
+ // TODO: errorCode 8 for BrokerNotAvailable
+ REPLICA_NOT_AVAILABLE(9,
+ new ApiException("The replica is not available for the requested topic-partition")),
MESSAGE_TOO_LARGE(10,
new RecordTooLargeException("The request included a message larger than the max message size the server will accept.")),
- OFFSET_METADATA_TOO_LARGE(12, new OffsetMetadataTooLarge("The metadata field of the offset request was too large.")),
- NETWORK_EXCEPTION(13, new NetworkException("The server disconnected before a response was received.")),
- OFFSET_LOAD_IN_PROGRESS(14, new ApiException("The coordinator is loading offsets and can't process requests.")),
- CONSUMER_COORDINATOR_NOT_AVAILABLE(15, new ApiException("The coordinator is not available.")),
- NOT_COORDINATOR_FOR_CONSUMER(16, new ApiException("This is not the correct co-ordinator for this consumer.")),
+ OFFSET_METADATA_TOO_LARGE(12,
+ new OffsetMetadataTooLarge("The metadata field of the offset request was too large.")),
+ NETWORK_EXCEPTION(13,
+ new NetworkException("The server disconnected before a response was received.")),
+ OFFSET_LOAD_IN_PROGRESS(14,
+ new ApiException("The coordinator is loading offsets and can't process requests.")),
+ CONSUMER_COORDINATOR_NOT_AVAILABLE(15,
+ new ApiException("The coordinator is not available.")),
+ NOT_COORDINATOR_FOR_CONSUMER(16,
+ new ApiException("This is not the correct co-ordinator for this consumer.")),
INVALID_TOPIC_EXCEPTION(17,
new InvalidTopicException("The request attempted to perform an operation on an invalid topic.")),
RECORD_LIST_TOO_LARGE(18,
@@ -57,7 +66,12 @@ public enum Errors {
new NotEnoughReplicasException("Messages are rejected since there are fewer in-sync replicas than required.")),
NOT_ENOUGH_REPLICAS_AFTER_APPEND(20,
new NotEnoughReplicasAfterAppendException("Messages are written to the log, but to fewer in-sync replicas than required.")),
- INVALID_REQUIRED_ACKS(21, new InvalidRequiredAcksException("Produce request specified an invalid value for required acks."));
+ INVALID_REQUIRED_ACKS(21,
+ new InvalidRequiredAcksException("Produce request specified an invalid value for required acks.")),
+ ILLEGAL_GENERATION(22,
+ new ApiException("Specified consumer generation id is not valid.")),
+ NO_OFFSETS_FETCHABLE(23,
+ new ApiException("No offsets have been committed so far."));
private static Map<Class<?>, Errors> classToError = new HashMap<Class<?>, Errors>();
private static Map<Short, Errors> codeToError = new HashMap<Short, Errors>();
@@ -68,7 +82,6 @@ public enum Errors {
if (error.exception != null)
classToError.put(error.exception.getClass(), error);
}
-
}
private final short code;
http://git-wip-us.apache.org/repos/asf/kafka/blob/0b92cec1/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index 083e7a3..a412f61 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -120,6 +120,16 @@ public class MemoryRecords implements Records {
buffer = compressor.buffer();
}
+ /**
+ * Switch records that have been closed for writing to read mode by flipping the underlying buffer
+ *
+ * @throws IllegalStateException if the records are still writable
+ */
+ public void rewind() {
+ if (writable)
+ throw new IllegalStateException("The memory records need to be closed for write before rewinding for read");
+
+ buffer.flip();
+ }
+
/** Write the records in this set to the given channel */
public int writeTo(GatheringByteChannel channel) throws IOException {
return channel.write(buffer);
@@ -158,7 +168,7 @@ public class MemoryRecords implements Records {
@Override
public Iterator<LogEntry> iterator() {
- ByteBuffer copy = (ByteBuffer) this.buffer.duplicate().flip();
+ ByteBuffer copy = this.buffer.duplicate();
return new RecordsIterator(copy, CompressionType.NONE, false);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0b92cec1/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index e67c4c8..f020aaa 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -41,6 +41,17 @@ public class FetchResponse extends AbstractRequestResponse {
// partition level field names
private static final String PARTITION_KEY_NAME = "partition";
private static final String ERROR_CODE_KEY_NAME = "error_code";
+
+ /**
+ * Possible error code:
+ *
+ * OFFSET_OUT_OF_RANGE (1)
+ * UNKNOWN_TOPIC_OR_PARTITION (3)
+ * NOT_LEADER_FOR_PARTITION (6)
+ * REPLICA_NOT_AVAILABLE (9)
+ * UNKNOWN (-1)
+ */
+
private static final String HIGH_WATERMARK_KEY_NAME = "high_watermark";
private static final String RECORD_SET_KEY_NAME = "record_set";
http://git-wip-us.apache.org/repos/asf/kafka/blob/0b92cec1/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
index 0057496..f548cd0 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
@@ -24,6 +24,12 @@ public class HeartbeatResponse extends AbstractRequestResponse {
private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.HEARTBEAT.id);
private static final String ERROR_CODE_KEY_NAME = "error_code";
+ /**
+ * Possible error code:
+ *
+ * TODO
+ */
+
private final short errorCode;
public HeartbeatResponse(short errorCode) {
super(new Struct(CURRENT_SCHEMA));
http://git-wip-us.apache.org/repos/asf/kafka/blob/0b92cec1/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
index 8c50e9b..1ebc188 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
@@ -30,6 +30,8 @@ public class JoinGroupRequest extends AbstractRequestResponse {
private static final String CONSUMER_ID_KEY_NAME = "consumer_id";
private static final String STRATEGY_KEY_NAME = "partition_assignment_strategy";
+ public static final String UNKNOWN_CONSUMER_ID = "";
+
private final String groupId;
private final int sessionTimeout;
private final List<String> topics;
http://git-wip-us.apache.org/repos/asf/kafka/blob/0b92cec1/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
index 52b1803..fd9c545 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
@@ -26,6 +26,13 @@ public class JoinGroupResponse extends AbstractRequestResponse {
private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.JOIN_GROUP.id);
private static final String ERROR_CODE_KEY_NAME = "error_code";
+
+ /**
+ * Possible error code:
+ *
+ * TODO
+ */
+
private static final String GENERATION_ID_KEY_NAME = "group_generation_id";
private static final String CONSUMER_ID_KEY_NAME = "consumer_id";
private static final String ASSIGNED_PARTITIONS_KEY_NAME = "assigned_partitions";
http://git-wip-us.apache.org/repos/asf/kafka/blob/0b92cec1/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
index cfac47a..af704f3 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
@@ -41,6 +41,13 @@ public class ListOffsetResponse extends AbstractRequestResponse {
// partition level field names
private static final String PARTITION_KEY_NAME = "partition";
private static final String ERROR_CODE_KEY_NAME = "error_code";
+
+ /**
+ * Possible error code:
+ *
+ * TODO
+ */
+
private static final String OFFSETS_KEY_NAME = "offsets";
private final Map<TopicPartition, PartitionData> responseData;
http://git-wip-us.apache.org/repos/asf/kafka/blob/0b92cec1/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index 90f3141..36736ec 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -40,11 +40,25 @@ public class MetadataResponse extends AbstractRequestResponse {
// topic level field names
private static final String TOPIC_ERROR_CODE_KEY_NAME = "topic_error_code";
+
+ /**
+ * Possible error code:
+ *
+ * TODO
+ */
+
private static final String TOPIC_KEY_NAME = "topic";
private static final String PARTITION_METADATA_KEY_NAME = "partition_metadata";
// partition level field names
private static final String PARTITION_ERROR_CODE_KEY_NAME = "partition_error_code";
+
+ /**
+ * Possible error code:
+ *
+ * TODO
+ */
+
private static final String PARTITION_KEY_NAME = "partition_id";
private static final String LEADER_KEY_NAME = "leader";
private static final String REPLICAS_KEY_NAME = "replicas";
http://git-wip-us.apache.org/repos/asf/kafka/blob/0b92cec1/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
index 4d3b9ec..70844d6 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
@@ -38,6 +38,12 @@ public class OffsetCommitResponse extends AbstractRequestResponse {
private static final String PARTITION_KEY_NAME = "partition";
private static final String ERROR_CODE_KEY_NAME = "error_code";
+ /**
+ * Possible error code:
+ *
+ * TODO
+ */
+
private final Map<TopicPartition, Short> responseData;
public OffsetCommitResponse(Map<TopicPartition, Short> responseData) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/0b92cec1/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
index edbed58..f10c246 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
@@ -41,6 +41,15 @@ public class OffsetFetchResponse extends AbstractRequestResponse {
private static final String METADATA_KEY_NAME = "metadata";
private static final String ERROR_CODE_KEY_NAME = "error_code";
+ /**
+ * Possible error codes:
+ *
+ * UNKNOWN_TOPIC_OR_PARTITION (3)
+ * OFFSET_LOAD_IN_PROGRESS (14)
+ * NOT_COORDINATOR_FOR_CONSUMER (16)
+ * NO_OFFSETS_FETCHABLE (23)
+ */
+
private final Map<TopicPartition, PartitionData> responseData;
public static final class PartitionData {
http://git-wip-us.apache.org/repos/asf/kafka/blob/0b92cec1/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
index a00dcdf..4b67f70 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
@@ -37,6 +37,13 @@ public class ProduceResponse extends AbstractRequestResponse {
// partition level field names
private static final String PARTITION_KEY_NAME = "partition";
private static final String ERROR_CODE_KEY_NAME = "error_code";
+
+ /**
+ * Possible error code:
+ *
+ * TODO
+ */
+
private static final String BASE_OFFSET_KEY_NAME = "base_offset";
private final Map<TopicPartition, PartitionResponse> responses;
http://git-wip-us.apache.org/repos/asf/kafka/blob/0b92cec1/clients/src/test/java/org/apache/kafka/clients/MockClient.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 8f1a7a6..5e3fab1 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -35,11 +35,23 @@ import org.apache.kafka.common.utils.Time;
*/
public class MockClient implements KafkaClient {
+ private class FutureResponse {
+ public final Struct responseBody;
+ public final boolean disconnected;
+
+ public FutureResponse(Struct responseBody, boolean disconnected) {
+ this.responseBody = responseBody;
+ this.disconnected = disconnected;
+ }
+ }
+
private final Time time;
private int correlation = 0;
+ private Node node = null;
private final Set<Integer> ready = new HashSet<Integer>();
private final Queue<ClientRequest> requests = new ArrayDeque<ClientRequest>();
private final Queue<ClientResponse> responses = new ArrayDeque<ClientResponse>();
+ private final Queue<FutureResponse> futureResponses = new ArrayDeque<FutureResponse>();
public MockClient(Time time) {
this.time = time;
@@ -52,9 +64,8 @@ public class MockClient implements KafkaClient {
@Override
public boolean ready(Node node, long now) {
- boolean found = isReady(node, now);
ready.add(node.id());
- return found;
+ return true;
}
@Override
@@ -62,6 +73,11 @@ public class MockClient implements KafkaClient {
return 0;
}
+ @Override
+ public boolean connectionFailed(Node node) {
+ return false;
+ }
+
public void disconnect(Integer node) {
Iterator<ClientRequest> iter = requests.iterator();
while (iter.hasNext()) {
@@ -76,16 +92,25 @@ public class MockClient implements KafkaClient {
@Override
public void send(ClientRequest request) {
- this.requests.add(request);
+ if (!futureResponses.isEmpty()) {
+ FutureResponse futureResp = futureResponses.poll();
+ ClientResponse resp = new ClientResponse(request, time.milliseconds(), futureResp.disconnected, futureResp.responseBody);
+ responses.add(resp);
+ } else {
+ this.requests.add(request);
+ }
}
@Override
public List<ClientResponse> poll(long timeoutMs, long now) {
- for (ClientResponse response: this.responses)
- if (response.request().hasCallback())
+ List<ClientResponse> copy = new ArrayList<ClientResponse>(this.responses);
+
+ while (!this.responses.isEmpty()) {
+ ClientResponse response = this.responses.poll();
+ if (response.request().hasCallback())
response.request().callback().onComplete(response);
- List<ClientResponse> copy = new ArrayList<ClientResponse>();
- this.responses.clear();
+ }
+
return copy;
}
@@ -107,8 +132,24 @@ public class MockClient implements KafkaClient {
}
public void respond(Struct body) {
+ respond(body, false);
+ }
+
+ public void respond(Struct body, boolean disconnected) {
ClientRequest request = requests.remove();
- responses.add(new ClientResponse(request, time.milliseconds(), false, body));
+ responses.add(new ClientResponse(request, time.milliseconds(), disconnected, body));
+ }
+
+ public void prepareResponse(Struct body) {
+ prepareResponse(body, false);
+ }
+
+ public void prepareResponse(Struct body, boolean disconnected) {
+ futureResponses.add(new FutureResponse(body, disconnected));
+ }
+
+ public void setNode(Node node) {
+ this.node = node;
}
@Override
@@ -136,7 +177,7 @@ public class MockClient implements KafkaClient {
@Override
public Node leastLoadedNode(long now) {
- return null;
+ return this.node;
}
}