You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jk...@apache.org on 2014/02/19 02:18:21 UTC
git commit: KAFKA-1252 Implement retries in new producer.
Repository: kafka
Updated Branches:
refs/heads/trunk f550cc76c -> 3f0b67b6a
KAFKA-1252 Implement retries in new producer.
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3f0b67b6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3f0b67b6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3f0b67b6
Branch: refs/heads/trunk
Commit: 3f0b67b6ac864befccfdd4bb5dee08c0b33c3b43
Parents: f550cc7
Author: Jay Kreps <ja...@gmail.com>
Authored: Thu Feb 13 13:48:21 2014 -0800
Committer: Jay Kreps <ja...@gmail.com>
Committed: Tue Feb 18 17:18:13 2014 -0800
----------------------------------------------------------------------
.../kafka/clients/producer/KafkaProducer.java | 5 +-
.../kafka/clients/producer/ProducerConfig.java | 25 +-
.../clients/producer/internals/Metadata.java | 32 +--
.../producer/internals/RecordAccumulator.java | 53 ++--
.../clients/producer/internals/RecordBatch.java | 26 +-
.../clients/producer/internals/Sender.java | 255 +++++++++++++++----
.../java/org/apache/kafka/common/Cluster.java | 40 +--
.../common/errors/CorruptRecordException.java | 30 +--
.../common/errors/InvalidMetadataException.java | 39 +++
.../errors/LeaderNotAvailableException.java | 38 ++-
.../kafka/common/errors/NetworkException.java | 30 +--
.../errors/NotLeaderForPartitionException.java | 29 +--
.../common/errors/OffsetMetadataTooLarge.java | 27 +-
.../errors/OffsetOutOfRangeException.java | 30 +--
.../common/errors/RecordTooLargeException.java | 27 +-
.../kafka/common/errors/RetriableException.java | 37 +++
.../kafka/common/errors/RetryableException.java | 47 ----
.../kafka/common/errors/TimeoutException.java | 29 +--
.../common/errors/UnknownServerException.java | 29 ++-
.../UnknownTopicOrPartitionException.java | 29 +--
.../apache/kafka/common/network/Selector.java | 30 +--
.../clients/producer/RecordAccumulatorTest.java | 27 +-
.../kafka/clients/producer/SenderTest.java | 139 ++++++++--
23 files changed, 652 insertions(+), 401 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/3f0b67b6/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 3d180e8..e4bc972 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -96,7 +96,7 @@ public class KafkaProducer implements Producer {
this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.MAX_PARTITION_SIZE_CONFIG),
this.totalMemorySize,
config.getLong(ProducerConfig.LINGER_MS_CONFIG),
- config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL),
+ config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG),
metrics,
new SystemTime());
List<InetSocketAddress> addresses = parseAndValidateAddresses(config.getList(ProducerConfig.BROKER_LIST_CONFIG));
@@ -108,7 +108,10 @@ public class KafkaProducer implements Producer {
config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
(short) config.getInt(ProducerConfig.REQUIRED_ACKS_CONFIG),
+ config.getInt(ProducerConfig.MAX_RETRIES_CONFIG),
config.getInt(ProducerConfig.REQUEST_TIMEOUT_CONFIG),
+ config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
+ config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
new SystemTime());
this.ioThread = new KafkaThread("kafka-network-thread", this.sender, true);
this.ioThread.start();
http://git-wip-us.apache.org/repos/asf/kafka/blob/3f0b67b6/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index dca9802..d8e35e7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -62,7 +62,7 @@ public class ProducerConfig extends AbstractConfig {
/**
* The total memory used by the producer to buffer records waiting to be sent to the server. If records are sent
* faster than they can be delivered to the server the producer will either block or throw an exception based on the
- * preference specified by {@link #BLOCK_ON_BUFFER_FULL}.
+ * preference specified by {@link #BLOCK_ON_BUFFER_FULL_CONFIG}.
*/
public static final String TOTAL_BUFFER_MEMORY_CONFIG = "total.memory.bytes";
@@ -107,6 +107,11 @@ public class ProducerConfig extends AbstractConfig {
public static final String SEND_BUFFER_CONFIG = "send.buffer.bytes";
/**
+ * The size of the TCP receive buffer to use when reading data (you generally shouldn't need to change this)
+ */
+ public static final String RECEIVE_BUFFER_CONFIG = "receive.buffer.bytes";
+
+ /**
* The maximum size of a request. This is also effectively a cap on the maximum record size. Note that the server
* has its own cap on record size which may be different from this.
*/
@@ -123,9 +128,17 @@ public class ProducerConfig extends AbstractConfig {
* this setting is true and we block, however users who want to guarantee we never block can turn this into an
* error.
*/
- public static final String BLOCK_ON_BUFFER_FULL = "block.on.buffer.full";
+ public static final String BLOCK_ON_BUFFER_FULL_CONFIG = "block.on.buffer.full";
+
+ /**
+ * The maximum number of times to attempt resending the request before giving up.
+ */
+ public static final String MAX_RETRIES_CONFIG = "request.retries";
- public static final String ENABLE_JMX = "enable.jmx";
+ /**
+ * Should we register the Kafka metrics as JMX mbeans?
+ */
+ public static final String ENABLE_JMX_CONFIG = "enable.jmx";
static {
/* TODO: add docs */
@@ -142,10 +155,12 @@ public class ProducerConfig extends AbstractConfig {
.define(METADATA_REFRESH_MS_CONFIG, Type.LONG, 10 * 60 * 1000, atLeast(-1L), "blah blah")
.define(CLIENT_ID_CONFIG, Type.STRING, "", "blah blah")
.define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(0), "blah blah")
+ .define(RECEIVE_BUFFER_CONFIG, Type.INT, 32 * 1024, atLeast(0), "blah blah")
.define(MAX_REQUEST_SIZE_CONFIG, Type.INT, 1 * 1024 * 1024, atLeast(0), "blah blah")
.define(RECONNECT_BACKOFF_MS_CONFIG, Type.LONG, 10L, atLeast(0L), "blah blah")
- .define(BLOCK_ON_BUFFER_FULL, Type.BOOLEAN, true, "blah blah")
- .define(ENABLE_JMX, Type.BOOLEAN, true, "");
+ .define(BLOCK_ON_BUFFER_FULL_CONFIG, Type.BOOLEAN, true, "blah blah")
+ .define(ENABLE_JMX_CONFIG, Type.BOOLEAN, true, "")
+ .define(MAX_RETRIES_CONFIG, Type.INT, 0, between(0, Integer.MAX_VALUE), "");
}
ProducerConfig(Map<? extends Object, ? extends Object> props) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/3f0b67b6/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
index 52d30a8..62613a3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
@@ -1,18 +1,14 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.clients.producer.internals;
@@ -24,7 +20,6 @@ import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.errors.TimeoutException;
-
/**
* A class encapsulating some of the logic around metadata.
* <p>
@@ -134,4 +129,11 @@ public final class Metadata {
notifyAll();
}
+ /**
+ * The last time metadata was updated.
+ */
+ public synchronized long lastUpdate() {
+ return this.lastRefresh;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3f0b67b6/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index be8a4a3..ce5cf27 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -1,25 +1,20 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.clients.producer.internals;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
@@ -39,10 +34,9 @@ import org.apache.kafka.common.utils.CopyOnWriteMap;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
-
/**
- * This class acts as a queue that accumulates records into {@link org.apache.kafka.common.record.MemoryRecords} instances to be
- * sent to the server.
+ * This class acts as a queue that accumulates records into {@link org.apache.kafka.common.record.MemoryRecords}
+ * instances to be sent to the server.
* <p>
* The accumulator uses a bounded amount of memory and append calls will block when that memory is exhausted, unless
* this behavior is explicitly disabled.
@@ -152,6 +146,17 @@ public final class RecordAccumulator {
}
/**
+ * Re-enqueue the given record batch in the accumulator to retry
+ */
+ public void reenqueue(RecordBatch batch, long now) {
+ batch.attempts++;
+ Deque<RecordBatch> deque = dequeFor(batch.topicPartition);
+ synchronized (deque) {
+ deque.addFirst(batch);
+ }
+ }
+
+ /**
* Get a list of topic-partitions which are ready to be sent.
* <p>
* A partition is ready if ANY of the following are true:
@@ -229,16 +234,10 @@ public final class RecordAccumulator {
}
/**
- * Deallocate the list of record batches
+ * Deallocate the record batch
*/
- public void deallocate(Collection<RecordBatch> batches) {
- ByteBuffer[] buffers = new ByteBuffer[batches.size()];
- int i = 0;
- for (RecordBatch batch : batches) {
- buffers[i] = batch.records.buffer();
- i++;
- }
- free.deallocate(buffers);
+ public void deallocate(RecordBatch batch) {
+ free.deallocate(batch.records.buffer());
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/3f0b67b6/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
index 7a440a3..eb16f6d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
@@ -1,18 +1,14 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.clients.producer.internals;
@@ -25,7 +21,6 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
-
/**
* A batch of records that is or will be sent.
*
@@ -33,6 +28,7 @@ import org.apache.kafka.common.record.MemoryRecords;
*/
public final class RecordBatch {
public int recordCount = 0;
+ public volatile int attempts = 0;
public final long created;
public final MemoryRecords records;
public final TopicPartition topicPartition;
http://git-wip-us.apache.org/repos/asf/kafka/blob/3f0b67b6/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index d93a455..e373265 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -22,12 +22,15 @@ import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Random;
import java.util.Set;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InvalidMetadataException;
import org.apache.kafka.common.errors.NetworkException;
+import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.network.NetworkSend;
import org.apache.kafka.common.network.Selectable;
@@ -41,6 +44,7 @@ import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.RequestSend;
import org.apache.kafka.common.requests.ResponseHeader;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
/**
* The background thread that handles the sending of produce requests to the Kafka cluster. This thread makes metadata
@@ -48,19 +52,55 @@ import org.apache.kafka.common.utils.Time;
*/
public class Sender implements Runnable {
- private final Map<Integer, NodeState> nodeState;
+ /* the state of each node's connection */
+ private final NodeStates nodeStates;
+
+ /* the record accumulator that batches records */
private final RecordAccumulator accumulator;
+
+ /* the selector used to perform network i/o */
private final Selectable selector;
+
+ /* the client id used to identify this client in requests to the server */
private final String clientId;
+
+ /* the maximum request size to attempt to send to the server */
private final int maxRequestSize;
- private final long reconnectBackoffMs;
+
+ /* the number of acknowledgements to request from the server */
private final short acks;
+
+ /* the max time in ms for the server to wait for acknowledgements */
private final int requestTimeout;
+
+ /* the number of times to retry a failed request before giving up */
+ private final int retries;
+
+ /* the socket send buffer size in bytes */
+ private final int socketSendBuffer;
+
+ /* the socket receive buffer size in bytes */
+ private final int socketReceiveBuffer;
+
+ /* the set of currently in-flight requests awaiting a response from the server */
private final InFlightRequests inFlightRequests;
+
+ /* a reference to the current Cluster instance */
private final Metadata metadata;
+
+ /* the clock instance used for getting the time */
private final Time time;
+
+ /* the current node to attempt to use for metadata requests (will round-robin over nodes) */
+ private int metadataFetchNodeIndex;
+
+ /* the current correlation id to use when sending requests to servers */
private int correlation;
+
+ /* true iff there is a metadata request that has been sent and for which we have not yet received a response */
private boolean metadataFetchInProgress;
+
+ /* true while the sender thread is still running */
private volatile boolean running;
public Sender(Selectable selector,
@@ -70,22 +110,28 @@ public class Sender implements Runnable {
int maxRequestSize,
long reconnectBackoffMs,
short acks,
+ int retries,
int requestTimeout,
+ int socketSendBuffer,
+ int socketReceiveBuffer,
Time time) {
- this.nodeState = new HashMap<Integer, NodeState>();
+ this.nodeStates = new NodeStates(reconnectBackoffMs);
this.accumulator = accumulator;
this.selector = selector;
this.maxRequestSize = maxRequestSize;
- this.reconnectBackoffMs = reconnectBackoffMs;
this.metadata = metadata;
this.clientId = clientId;
this.running = true;
this.requestTimeout = requestTimeout;
this.acks = acks;
+ this.retries = retries;
+ this.socketSendBuffer = socketSendBuffer;
+ this.socketReceiveBuffer = socketReceiveBuffer;
this.inFlightRequests = new InFlightRequests();
this.correlation = 0;
this.metadataFetchInProgress = false;
this.time = time;
+ this.metadataFetchNodeIndex = new Random().nextInt();
}
/**
@@ -130,11 +176,7 @@ public class Sender implements Runnable {
// should we update our metadata?
List<NetworkSend> sends = new ArrayList<NetworkSend>();
- InFlightRequest metadataReq = maybeMetadataRequest(cluster, now);
- if (metadataReq != null) {
- sends.add(metadataReq.request);
- this.inFlightRequests.add(metadataReq);
- }
+ maybeUpdateMetadata(cluster, sends, now);
// prune the list of ready topics to eliminate any that we aren't ready to send yet
List<TopicPartition> sendable = processReadyPartitions(cluster, ready, now);
@@ -158,43 +200,76 @@ public class Sender implements Runnable {
// handle responses, connections, and disconnections
handleSends(this.selector.completedSends());
handleResponses(this.selector.completedReceives(), now);
- handleDisconnects(this.selector.disconnected());
+ handleDisconnects(this.selector.disconnected(), now);
handleConnects(this.selector.connected());
return ready.size();
}
- private InFlightRequest maybeMetadataRequest(Cluster cluster, long now) {
+ /**
+ * Add a metadata request to the list of sends if we need to make one
+ */
+ private void maybeUpdateMetadata(Cluster cluster, List<NetworkSend> sends, long now) {
if (this.metadataFetchInProgress || !metadata.needsUpdate(now))
- return null;
+ return;
- Node node = nextFreeNode(cluster);
+ Node node = selectMetadataDestination(cluster);
if (node == null)
- return null;
+ return;
- NodeState state = nodeState.get(node.id());
- if (state == null || (state.state == ConnectionState.DISCONNECTED && now - state.lastConnectAttempt > this.reconnectBackoffMs)) {
+ if (nodeStates.isConnected(node.id())) {
+ this.metadataFetchInProgress = true;
+ InFlightRequest request = metadataRequest(node.id(), metadata.topics());
+ sends.add(request.request);
+ this.inFlightRequests.add(request);
+ } else if (nodeStates.canConnect(node.id(), now)) {
// we don't have a connection to this node right now, make one
initiateConnect(node, now);
- return null;
- } else if (state.state == ConnectionState.CONNECTED) {
- this.metadataFetchInProgress = true;
- return metadataRequest(node.id(), metadata.topics());
- } else {
- return null;
}
}
/**
+ * Find a good node to make a metadata request to. This method will first look for a node that has an existing
+ * connection and no outstanding requests. If there are no such nodes it will look for a node with no outstanding
+ * requests.
* @return A node with no requests currently being sent or null if no such node exists
*/
- private Node nextFreeNode(Cluster cluster) {
- for (int i = 0; i < cluster.nodes().size(); i++) {
- Node node = cluster.nextNode();
- if (this.inFlightRequests.canSendMore(node.id()))
+ private Node selectMetadataDestination(Cluster cluster) {
+ List<Node> nodes = cluster.nodes();
+
+ // first look for a node to which we are connected and have no outstanding requests
+ boolean connectionInProgress = false;
+ for (int i = 0; i < nodes.size(); i++) {
+ Node node = nodes.get(metadataNodeIndex(i, nodes.size()));
+ if (nodeStates.isConnected(node.id()) && this.inFlightRequests.canSendMore(node.id())) {
+ this.metadataFetchNodeIndex = metadataNodeIndex(i + 1, nodes.size());
+ return node;
+ } else if (nodeStates.isConnecting(node.id())) {
+ connectionInProgress = true;
+ }
+ }
+
+ // if a connection is already being established, just wait for it rather than starting another
+ if (connectionInProgress)
+ return null;
+
+ // okay, no luck; take the next node in round-robin order that can accept another request, connected or not
+ for (int i = 0; i < nodes.size(); i++) {
+ Node node = nodes.get(metadataNodeIndex(i, nodes.size()));
+ if (this.inFlightRequests.canSendMore(node.id())) {
+ this.metadataFetchNodeIndex = metadataNodeIndex(i + 1, nodes.size());
return node;
+ }
}
- return null;
+
+ return null; // we failed to find a good destination
+ }
+
+ /**
+ * Get the index in the node list of the node to use for the metadata request
+ */
+ private int metadataNodeIndex(int offset, int size) {
+ return Utils.abs(offset + this.metadataFetchNodeIndex) % size;
}
/**
@@ -209,7 +284,7 @@ public class Sender implements Runnable {
/**
* Process the set of topic-partitions with data ready to send. If we have a connection to the appropriate node, add
* it to the returned set. For any partitions we have no connection to either make one, fetch the appropriate
- * metdata to be able to do so
+ * metadata to be able to do so
*/
private List<TopicPartition> processReadyPartitions(Cluster cluster, List<TopicPartition> ready, long now) {
List<TopicPartition> sendable = new ArrayList<TopicPartition>(ready.size());
@@ -218,15 +293,11 @@ public class Sender implements Runnable {
if (node == null) {
// we don't know about this topic/partition or it has no leader, re-fetch metadata
metadata.forceUpdate();
- } else {
- NodeState state = nodeState.get(node.id());
- // TODO: encapsulate this logic somehow
- if (state == null || (state.state == ConnectionState.DISCONNECTED && now - state.lastConnectAttempt > this.reconnectBackoffMs)) {
- // we don't have a connection to this node right now, make one
- initiateConnect(node, now);
- } else if (state.state == ConnectionState.CONNECTED && inFlightRequests.canSendMore(node.id())) {
- sendable.add(tp);
- }
+ } else if (nodeStates.isConnected(node.id()) && inFlightRequests.canSendMore(node.id())) {
+ sendable.add(tp);
+ } else if (nodeStates.canConnect(node.id(), now)) {
+ // we don't have a connection to this node right now, make one
+ initiateConnect(node, now);
}
}
return sendable;
@@ -237,13 +308,11 @@ public class Sender implements Runnable {
*/
private void initiateConnect(Node node, long now) {
try {
- selector.connect(node.id(), new InetSocketAddress(node.host(), node.port()), 64 * 1024 * 1024, 64 * 1024 * 1024); // TODO
- // socket
- // buffers
- nodeState.put(node.id(), new NodeState(ConnectionState.CONNECTING, now));
+ selector.connect(node.id(), new InetSocketAddress(node.host(), node.port()), this.socketSendBuffer, this.socketReceiveBuffer);
+ this.nodeStates.connecting(node.id(), now);
} catch (IOException e) {
/* attempt failed, we'll try again after the backoff */
- nodeState.put(node.id(), new NodeState(ConnectionState.DISCONNECTED, now));
+ nodeStates.disconnected(node.id());
/* maybe the problem is our metadata, update it */
metadata.forceUpdate();
}
@@ -252,19 +321,26 @@ public class Sender implements Runnable {
/**
* Handle any closed connections
*/
- private void handleDisconnects(List<Integer> disconnects) {
+ private void handleDisconnects(List<Integer> disconnects, long now) {
+ // clear out the in-flight requests for the disconnected broker
for (int node : disconnects) {
for (InFlightRequest request : this.inFlightRequests.clearAll(node)) {
if (request.batches != null) {
- for (RecordBatch batch : request.batches.values())
- batch.done(-1L, new NetworkException("The server disconnected unexpectedly without sending a response."));
- this.accumulator.deallocate(request.batches.values());
+ for (RecordBatch batch : request.batches.values()) {
+ if (canRetry(batch, Errors.NETWORK_EXCEPTION)) {
+ this.accumulator.reenqueue(batch, now);
+ } else {
+ batch.done(-1L, new NetworkException("The server disconnected unexpectedly without sending a response."));
+ this.accumulator.deallocate(batch);
+ }
+ }
}
- NodeState state = this.nodeState.get(request.request.destination());
- if (state != null)
- state.state = ConnectionState.DISCONNECTED;
+ nodeStates.disconnected(request.request.destination());
}
}
+ // we got a disconnect so we should probably refresh our metadata and see if that broker is dead
+ if (disconnects.size() > 0)
+ this.metadata.forceUpdate();
}
/**
@@ -272,7 +348,7 @@ public class Sender implements Runnable {
*/
private void handleConnects(List<Integer> connects) {
for (Integer id : connects)
- this.nodeState.get(id).state = ConnectionState.CONNECTED;
+ this.nodeStates.connected(id);
}
/**
@@ -286,9 +362,10 @@ public class Sender implements Runnable {
if (!request.expectResponse) {
requests.pollFirst();
if (request.request.header().apiKey() == ApiKeys.PRODUCE.id) {
- for (RecordBatch batch : request.batches.values())
+ for (RecordBatch batch : request.batches.values()) {
batch.done(-1L, Errors.NONE.exception());
- this.accumulator.deallocate(request.batches.values());
+ this.accumulator.deallocate(batch);
+ }
}
}
}
@@ -306,7 +383,7 @@ public class Sender implements Runnable {
Struct body = (Struct) ProtoUtils.currentResponseSchema(apiKey).read(receive.payload());
correlate(req.request.header(), header);
if (req.request.header().apiKey() == ApiKeys.PRODUCE.id)
- handleProduceResponse(req, body);
+ handleProduceResponse(req, body, now);
else if (req.request.header().apiKey() == ApiKeys.METADATA.id)
handleMetadataResponse(body, now);
else
@@ -327,7 +404,7 @@ public class Sender implements Runnable {
/**
* Handle a produce response
*/
- private void handleProduceResponse(InFlightRequest request, Struct response) {
+ private void handleProduceResponse(InFlightRequest request, Struct response, long now) {
for (Object topicResponse : (Object[]) response.get("responses")) {
Struct topicRespStruct = (Struct) topicResponse;
String topic = (String) topicRespStruct.get("topic");
@@ -335,12 +412,31 @@ public class Sender implements Runnable {
Struct partRespStruct = (Struct) partResponse;
int partition = (Integer) partRespStruct.get("partition");
short errorCode = (Short) partRespStruct.get("error_code");
+
+ // if we got an error we may need to refresh our metadata
+ Errors error = Errors.forCode(errorCode);
+ if (error.exception() instanceof InvalidMetadataException)
+ metadata.forceUpdate();
+
long offset = (Long) partRespStruct.get("base_offset");
RecordBatch batch = request.batches.get(new TopicPartition(topic, partition));
- batch.done(offset, Errors.forCode(errorCode).exception());
+ if (canRetry(batch, error)) {
+ // retry
+ this.accumulator.reenqueue(batch, now);
+ } else {
+ // tell the user the result of their request
+ batch.done(offset, error.exception());
+ this.accumulator.deallocate(batch);
+ }
}
}
- this.accumulator.deallocate(request.batches.values());
+ }
+
+ /**
+ * We can retry a send if the error is transient and the number of attempts taken is fewer than the maximum allowed
+ */
+ private boolean canRetry(RecordBatch batch, Errors error) {
+ return batch.attempts < this.retries && error.exception() instanceof RetriableException;
}
/**
@@ -459,6 +555,53 @@ public class Sender implements Runnable {
}
}
+ private static class NodeStates {
+ private final long reconnectBackoffMs;
+ private final Map<Integer, NodeState> nodeState;
+
+ public NodeStates(long reconnectBackoffMs) {
+ this.reconnectBackoffMs = reconnectBackoffMs;
+ this.nodeState = new HashMap<Integer, NodeState>();
+ }
+
+ public boolean canConnect(int node, long now) {
+ NodeState state = nodeState.get(node);
+ if (state == null)
+ return true;
+ else
+ return state.state == ConnectionState.DISCONNECTED && now - state.lastConnectAttempt > this.reconnectBackoffMs;
+ }
+
+ public void connecting(int node, long now) {
+ nodeState.put(node, new NodeState(ConnectionState.CONNECTING, now));
+ }
+
+ public boolean isConnected(int node) {
+ NodeState state = nodeState.get(node);
+ return state != null && state.state == ConnectionState.CONNECTED;
+ }
+
+ public boolean isConnecting(int node) {
+ NodeState state = nodeState.get(node);
+ return state != null && state.state == ConnectionState.CONNECTING;
+ }
+
+ public void connected(int node) {
+ nodeState(node).state = ConnectionState.CONNECTED;
+ }
+
+ public void disconnected(int node) {
+ nodeState(node).state = ConnectionState.DISCONNECTED;
+ }
+
+ private NodeState nodeState(int node) {
+ NodeState state = this.nodeState.get(node);
+ if (state == null)
+ throw new IllegalStateException("No entry found for node " + node);
+ return state;
+ }
+ }
+
/**
* A request that hasn't been fully processed yet
*/
http://git-wip-us.apache.org/repos/asf/kafka/blob/3f0b67b6/clients/src/main/java/org/apache/kafka/common/Cluster.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java
index c17a8f8..5caaaae 100644
--- a/clients/src/main/java/org/apache/kafka/common/Cluster.java
+++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java
@@ -1,18 +1,14 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.common;
@@ -23,17 +19,12 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.kafka.common.utils.Utils;
-
/**
* A representation of a subset of the nodes, topics, and partitions in the Kafka cluster.
*/
public final class Cluster {
- private final AtomicInteger counter = new AtomicInteger(0);
private final List<Node> nodes;
private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
private final Map<String, List<PartitionInfo>> partitionsByTopic;
@@ -126,15 +117,4 @@ public final class Cluster {
return this.partitionsByTopic.get(topic);
}
- /**
- * Round-robin over the nodes in this cluster
- */
- public Node nextNode() {
- int size = nodes.size();
- if (size == 0)
- throw new IllegalStateException("No known nodes.");
- int idx = Utils.abs(counter.getAndIncrement()) % size;
- return this.nodes.get(idx);
- }
-
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3f0b67b6/clients/src/main/java/org/apache/kafka/common/errors/CorruptRecordException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/CorruptRecordException.java b/clients/src/main/java/org/apache/kafka/common/errors/CorruptRecordException.java
index 673f61d..eaccf27 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/CorruptRecordException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/CorruptRecordException.java
@@ -1,22 +1,22 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.common.errors;
-public class CorruptRecordException extends ApiException {
+/**
+ * This exception indicates a record has failed its internal CRC check; this generally indicates network or disk
+ * corruption.
+ */
+public class CorruptRecordException extends RetriableException {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/kafka/blob/3f0b67b6/clients/src/main/java/org/apache/kafka/common/errors/InvalidMetadataException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/InvalidMetadataException.java b/clients/src/main/java/org/apache/kafka/common/errors/InvalidMetadataException.java
new file mode 100644
index 0000000..8841bad
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/InvalidMetadataException.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.common.errors;
+
+/**
+ * An exception that may indicate the client's metadata is out of date
+ */
+public abstract class InvalidMetadataException extends RetriableException {
+
+ private static final long serialVersionUID = 1L;
+
+ public InvalidMetadataException() {
+ super();
+ }
+
+ public InvalidMetadataException(String message) {
+ super(message);
+ }
+
+ public InvalidMetadataException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public InvalidMetadataException(Throwable cause) {
+ super(cause);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3f0b67b6/clients/src/main/java/org/apache/kafka/common/errors/LeaderNotAvailableException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/LeaderNotAvailableException.java b/clients/src/main/java/org/apache/kafka/common/errors/LeaderNotAvailableException.java
index 0bde6b5..9d7ebd4 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/LeaderNotAvailableException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/LeaderNotAvailableException.java
@@ -1,35 +1,27 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.common.errors;
-public class LeaderNotAvailableException extends RetryableException {
+/**
+ * There is no currently available leader for the given partition (either because a leadership election is in progress
+ * or because all replicas are down).
+ */
+public class LeaderNotAvailableException extends InvalidMetadataException {
private static final long serialVersionUID = 1L;
- public LeaderNotAvailableException(String message, Throwable cause) {
- super(message, cause);
- }
-
public LeaderNotAvailableException(String message) {
super(message);
}
- public LeaderNotAvailableException(Throwable cause) {
- super(cause);
- }
-
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3f0b67b6/clients/src/main/java/org/apache/kafka/common/errors/NetworkException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/NetworkException.java b/clients/src/main/java/org/apache/kafka/common/errors/NetworkException.java
index 3a04159..f0baa98 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/NetworkException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/NetworkException.java
@@ -1,22 +1,22 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.common.errors;
-public class NetworkException extends ApiException {
+/**
+ * A misc. network-related IOException occurred when making a request. This could be because the client's metadata is
+ * out of date and it is making a request to a node that is now dead.
+ */
+public class NetworkException extends InvalidMetadataException {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/kafka/blob/3f0b67b6/clients/src/main/java/org/apache/kafka/common/errors/NotLeaderForPartitionException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/NotLeaderForPartitionException.java b/clients/src/main/java/org/apache/kafka/common/errors/NotLeaderForPartitionException.java
index 5adc72c..ad9c77c 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/NotLeaderForPartitionException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/NotLeaderForPartitionException.java
@@ -1,22 +1,21 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.common.errors;
-public class NotLeaderForPartitionException extends RetryableException {
+/**
+ * This server is not the leader for the given partition
+ */
+public class NotLeaderForPartitionException extends InvalidMetadataException {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/kafka/blob/3f0b67b6/clients/src/main/java/org/apache/kafka/common/errors/OffsetMetadataTooLarge.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/OffsetMetadataTooLarge.java b/clients/src/main/java/org/apache/kafka/common/errors/OffsetMetadataTooLarge.java
index a3159bb..0be2f50 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/OffsetMetadataTooLarge.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/OffsetMetadataTooLarge.java
@@ -1,21 +1,20 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.common.errors;
+/**
+ * The client has tried to save its offset with associated metadata larger than the maximum size allowed by the server.
+ */
public class OffsetMetadataTooLarge extends ApiException {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/kafka/blob/3f0b67b6/clients/src/main/java/org/apache/kafka/common/errors/OffsetOutOfRangeException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/OffsetOutOfRangeException.java b/clients/src/main/java/org/apache/kafka/common/errors/OffsetOutOfRangeException.java
index d01698a..fc7c6e3 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/OffsetOutOfRangeException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/OffsetOutOfRangeException.java
@@ -1,22 +1,22 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.common.errors;
-public class OffsetOutOfRangeException extends ApiException {
+/**
+ * This offset is either larger or smaller than the range of offsets the server has for the given partition.
+ *
+ */
+public class OffsetOutOfRangeException extends RetriableException {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/kafka/blob/3f0b67b6/clients/src/main/java/org/apache/kafka/common/errors/RecordTooLargeException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/RecordTooLargeException.java b/clients/src/main/java/org/apache/kafka/common/errors/RecordTooLargeException.java
index ce95ca0..737b7f0 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/RecordTooLargeException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/RecordTooLargeException.java
@@ -1,21 +1,20 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.common.errors;
+/**
+ * This record is larger than the maximum allowable size
+ */
public class RecordTooLargeException extends ApiException {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/kafka/blob/3f0b67b6/clients/src/main/java/org/apache/kafka/common/errors/RetriableException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/RetriableException.java b/clients/src/main/java/org/apache/kafka/common/errors/RetriableException.java
new file mode 100644
index 0000000..6c639a9
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/RetriableException.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * A retriable exception is a transient exception that, if retried, may succeed.
+ */
+public abstract class RetriableException extends ApiException {
+
+ private static final long serialVersionUID = 1L;
+
+ public RetriableException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public RetriableException(String message) {
+ super(message);
+ }
+
+ public RetriableException(Throwable cause) {
+ super(cause);
+ }
+
+ public RetriableException() {
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3f0b67b6/clients/src/main/java/org/apache/kafka/common/errors/RetryableException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/RetryableException.java b/clients/src/main/java/org/apache/kafka/common/errors/RetryableException.java
deleted file mode 100644
index c7f2f22..0000000
--- a/clients/src/main/java/org/apache/kafka/common/errors/RetryableException.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.errors;
-
-/**
- * A retryable exception is an exception that is safe to retry. To be retryable an exception should be
- * <ol>
- * <li>Transient, there is no point retrying a error due to a non-existant topic or message too large
- * <li>Idempotent, the exception is known to not change any state on the server
- * </ol>
- * A client may choose to retry any request they like, but exceptions extending this class are always safe and sane to
- * retry.
- */
-public abstract class RetryableException extends ApiException {
-
- private static final long serialVersionUID = 1L;
-
- public RetryableException(String message, Throwable cause) {
- super(message, cause);
- }
-
- public RetryableException(String message) {
- super(message);
- }
-
- public RetryableException(Throwable cause) {
- super(cause);
- }
-
- public RetryableException() {
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3f0b67b6/clients/src/main/java/org/apache/kafka/common/errors/TimeoutException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/TimeoutException.java b/clients/src/main/java/org/apache/kafka/common/errors/TimeoutException.java
index dffd64d..c7f569c 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/TimeoutException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/TimeoutException.java
@@ -1,22 +1,21 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.common.errors;
-public class TimeoutException extends ApiException {
+/**
+ * Indicates that a request timed out.
+ */
+public class TimeoutException extends RetriableException {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/kafka/blob/3f0b67b6/clients/src/main/java/org/apache/kafka/common/errors/UnknownServerException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/UnknownServerException.java b/clients/src/main/java/org/apache/kafka/common/errors/UnknownServerException.java
index a0690fe..963ef08 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/UnknownServerException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/UnknownServerException.java
@@ -1,21 +1,22 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.common.errors;
+/**
+ * An error occurred on the server for which the client doesn't have a corresponding error code. This is generally an
+ * unexpected error.
+ *
+ */
public class UnknownServerException extends ApiException {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/kafka/blob/3f0b67b6/clients/src/main/java/org/apache/kafka/common/errors/UnknownTopicOrPartitionException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/UnknownTopicOrPartitionException.java b/clients/src/main/java/org/apache/kafka/common/errors/UnknownTopicOrPartitionException.java
index 73d1953..ec423bd 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/UnknownTopicOrPartitionException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/UnknownTopicOrPartitionException.java
@@ -1,22 +1,21 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.common.errors;
-public class UnknownTopicOrPartitionException extends ApiException {
+/**
+ * This topic/partition doesn't exist
+ */
+public class UnknownTopicOrPartitionException extends RetriableException {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/kafka/blob/3f0b67b6/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 8ed4c73..f1e474c 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -1,18 +1,14 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.common.network;
@@ -32,7 +28,6 @@ import java.util.Set;
import org.apache.kafka.common.KafkaException;
-
/**
* A selector interface for doing non-blocking multi-connection network I/O.
* <p>
@@ -302,8 +297,11 @@ public class Selector implements Selectable {
private void close(SelectionKey key) throws IOException {
SocketChannel channel = channel(key);
Transmissions trans = transmissions(key);
- if (trans != null)
+ if (trans != null) {
this.disconnected.add(trans.id);
+ trans.clearReceive();
+ trans.clearSend();
+ }
key.attach(null);
key.cancel();
channel.socket().close();
http://git-wip-us.apache.org/repos/asf/kafka/blob/3f0b67b6/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
index 1bbe83c..a3bf07e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
@@ -1,18 +1,14 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.clients.producer;
@@ -25,7 +21,6 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
-
import org.apache.kafka.clients.producer.internals.RecordAccumulator;
import org.apache.kafka.clients.producer.internals.RecordBatch;
import org.apache.kafka.common.TopicPartition;
@@ -140,8 +135,8 @@ public class RecordAccumulatorTest {
for (RecordBatch batch : batches) {
for (LogEntry entry : batch.records)
read++;
+ accum.deallocate(batch);
}
- accum.deallocate(batches);
}
for (Thread t : threads)
http://git-wip-us.apache.org/repos/asf/kafka/blob/3f0b67b6/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
index 41c028b..19a0125 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
@@ -1,29 +1,25 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.clients.producer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
-
-import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.internals.Metadata;
import org.apache.kafka.clients.producer.internals.RecordAccumulator;
import org.apache.kafka.clients.producer.internals.Sender;
@@ -46,6 +42,16 @@ import org.junit.Test;
public class SenderTest {
+ private static final String CLIENT_ID = "";
+ private static final int MAX_REQUEST_SIZE = 1024 * 1024;
+ private static final long RECONNECT_BACKOFF_MS = 0L;
+ private static final short ACKS_ALL = -1;
+ private static final int MAX_RETRIES = 0;
+ private static final int REQUEST_TIMEOUT_MS = 10000;
+ private static final int SEND_BUFFER_SIZE = 64 * 1024;
+ private static final int RECEIVE_BUFFER_SIZE = 64 * 1024;
+
+ private TopicPartition tp = new TopicPartition("test", 0);
private MockTime time = new MockTime();
private MockSelector selector = new MockSelector(time);
private int batchSize = 16 * 1024;
@@ -53,7 +59,18 @@ public class SenderTest {
private Cluster cluster = TestUtils.singletonCluster("test", 1);
private Metrics metrics = new Metrics(time);
private RecordAccumulator accumulator = new RecordAccumulator(batchSize, 1024 * 1024, 0L, false, metrics, time);
- private Sender sender = new Sender(selector, metadata, this.accumulator, "", 1024 * 1024, 0L, (short) -1, 10000, time);
+ private Sender sender = new Sender(selector,
+ metadata,
+ this.accumulator,
+ CLIENT_ID,
+ MAX_REQUEST_SIZE,
+ RECONNECT_BACKOFF_MS,
+ ACKS_ALL,
+ MAX_RETRIES,
+ REQUEST_TIMEOUT_MS,
+ SEND_BUFFER_SIZE,
+ RECEIVE_BUFFER_SIZE,
+ time);
@Before
public void setup() {
@@ -62,7 +79,6 @@ public class SenderTest {
@Test
public void testSimple() throws Exception {
- TopicPartition tp = new TopicPartition("test", 0);
Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null);
sender.run(time.milliseconds());
assertEquals("We should have connected", 1, selector.connected().size());
@@ -83,6 +99,93 @@ public class SenderTest {
assertEquals(offset, future.get().offset());
}
+ @Test
+ public void testRetries() throws Exception {
+ // create a sender with retries = 1
+ int maxRetries = 1;
+ Sender sender = new Sender(selector,
+ metadata,
+ this.accumulator,
+ CLIENT_ID,
+ MAX_REQUEST_SIZE,
+ RECONNECT_BACKOFF_MS,
+ ACKS_ALL,
+ maxRetries,
+ REQUEST_TIMEOUT_MS,
+ SEND_BUFFER_SIZE,
+ RECEIVE_BUFFER_SIZE,
+ time);
+ Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null);
+ RequestSend request1 = completeSend(sender);
+ selector.clear();
+ selector.completeReceive(produceResponse(request1.header().correlationId(),
+ cluster.leaderFor(tp).id(),
+ tp.topic(),
+ tp.partition(),
+ -1,
+ Errors.REQUEST_TIMED_OUT.code()));
+ sender.run(time.milliseconds());
+ selector.clear();
+ sender.run(time.milliseconds());
+ RequestSend request2 = completeSend(sender);
+ selector.completeReceive(produceResponse(request2.header().correlationId(),
+ cluster.leaderFor(tp).id(),
+ tp.topic(),
+ tp.partition(),
+ 42,
+ Errors.NONE.code()));
+ sender.run(time.milliseconds());
+ assertTrue("Request should retry and complete", future.isDone());
+ assertEquals(42, future.get().offset());
+ }
+
+ @Test
+ public void testMetadataRefreshOnNoLeaderException() throws Exception {
+ Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null);
+ RequestSend request = completeSend();
+ selector.clear();
+ selector.completeReceive(produceResponse(request.header().correlationId(),
+ cluster.leaderFor(tp).id(),
+ tp.topic(),
+ tp.partition(),
+ -1,
+ Errors.NOT_LEADER_FOR_PARTITION.code()));
+ sender.run(time.milliseconds());
+ completedWithError(future, Errors.NOT_LEADER_FOR_PARTITION);
+ assertTrue("Error triggers a metadata update.", metadata.needsUpdate(time.milliseconds()));
+ }
+
+ @Test
+ public void testMetadataRefreshOnDisconnect() throws Exception {
+ Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null);
+ completeSend();
+ selector.clear();
+ selector.disconnect(cluster.leaderFor(tp).id());
+ sender.run(time.milliseconds());
+ completedWithError(future, Errors.NETWORK_EXCEPTION);
+ assertTrue("The disconnection triggers a metadata update.", metadata.needsUpdate(time.milliseconds()));
+ }
+
+ private void completedWithError(Future<RecordMetadata> future, Errors error) throws Exception {
+ assertTrue("Request should be completed", future.isDone());
+ try {
+ future.get();
+ fail("Should have thrown an exception.");
+ } catch (ExecutionException e) {
+ assertEquals(error.exception().getClass(), e.getCause().getClass());
+ }
+ }
+
+ private RequestSend completeSend() {
+ return completeSend(sender);
+ }
+
+ private RequestSend completeSend(Sender sender) {
+ while (selector.completedSends().size() == 0)
+ sender.run(time.milliseconds());
+ return (RequestSend) selector.completedSends().get(0);
+ }
+
private NetworkReceive produceResponse(int correlation, int source, String topic, int part, long offset, int error) {
Struct struct = new Struct(ProtoUtils.currentResponseSchema(ApiKeys.PRODUCE.id));
Struct response = struct.instance("responses");