You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2014/02/27 19:50:36 UTC
git commit: KAFKA-1260 Integration Test for New Producer Part II:
Broker Failure Handling; reviewed by Jay Kreps, Neha Narkhede and Jun Rao
Repository: kafka
Updated Branches:
refs/heads/trunk 57be6c81a -> 5e2a9a560
KAFKA-1260 Integration Test for New Producer Part II: Broker Failure Handling; reviewed by Jay Kreps, Neha Narkhede and Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5e2a9a56
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5e2a9a56
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5e2a9a56
Branch: refs/heads/trunk
Commit: 5e2a9a560d847bd0cf364d86bd6784f70d99c71a
Parents: 57be6c8
Author: Guozhang Wang <gu...@linkedin.com>
Authored: Thu Feb 27 10:50:15 2014 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Thu Feb 27 10:50:26 2014 -0800
----------------------------------------------------------------------
.../kafka/clients/producer/KafkaProducer.java | 8 +-
.../kafka/clients/producer/MockProducer.java | 2 +-
.../kafka/clients/producer/RecordMetadata.java | 8 +-
.../internals/FutureRecordMetadata.java | 2 +-
.../clients/producer/internals/Metadata.java | 4 +-
.../clients/producer/internals/RecordBatch.java | 13 +-
.../clients/producer/internals/Sender.java | 2 +-
.../apache/kafka/common/network/Selector.java | 1 +
.../apache/kafka/common/protocol/Errors.java | 14 +-
.../main/scala/kafka/api/ProducerResponse.scala | 2 +-
.../src/main/scala/kafka/server/KafkaApis.scala | 85 +++---
.../kafka/api/ProducerFailureHandlingTest.scala | 284 +++++++++++++++++++
.../kafka/api/ProducerSendTest.scala | 59 ++--
.../test/scala/unit/kafka/utils/TestUtils.scala | 23 +-
14 files changed, 392 insertions(+), 115 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/5e2a9a56/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index e4bc972..757f7a7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -33,6 +33,7 @@ import org.apache.kafka.common.Metric;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricConfig;
@@ -217,10 +218,14 @@ public class KafkaProducer implements Producer {
FutureRecordMetadata future = accumulator.append(tp, record.key(), record.value(), CompressionType.NONE, callback);
this.sender.wakeup();
return future;
- } catch (Exception e) {
+ // For API exceptions return them in the future;
+ // for other exceptions throw directly
+ } catch (ApiException e) {
if (callback != null)
callback.onCompletion(null, e);
return new FutureFailure(e);
+ } catch (InterruptedException e) {
+ throw new KafkaException(e);
}
}
@@ -255,7 +260,6 @@ public class KafkaProducer implements Producer {
*/
@Override
public void close() {
- this.accumulator.close();
this.sender.initiateClose();
try {
this.ioThread.join();
http://git-wip-us.apache.org/repos/asf/kafka/blob/5e2a9a56/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index f43da80..6a0f3b2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -108,7 +108,7 @@ public class MockProducer implements Producer {
FutureRecordMetadata future = new FutureRecordMetadata(result, 0);
TopicPartition topicPartition = new TopicPartition(record.topic(), partition);
long offset = nextOffset(topicPartition);
- Completion completion = new Completion(topicPartition, offset, new RecordMetadata(topicPartition, offset), result, callback);
+ Completion completion = new Completion(topicPartition, offset, new RecordMetadata(topicPartition, 0, offset), result, callback);
this.sent.add(record);
if (autoComplete)
completion.complete(null);
http://git-wip-us.apache.org/repos/asf/kafka/blob/5e2a9a56/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java
index 8c77698..8015f0d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java
@@ -26,12 +26,18 @@ public final class RecordMetadata {
private final long offset;
private final TopicPartition topicPartition;
- public RecordMetadata(TopicPartition topicPartition, long offset) {
+ private RecordMetadata(TopicPartition topicPartition, long offset) {
super();
this.offset = offset;
this.topicPartition = topicPartition;
}
+ public RecordMetadata(TopicPartition topicPartition, long baseOffset, long relativeOffset) {
+ // ignore the relativeOffset if the base offset is -1,
+ // since this indicates the offset is unknown
+ this(topicPartition, baseOffset == -1 ? baseOffset : baseOffset + relativeOffset);
+ }
+
/**
* The offset of the record in the topic/partition.
*/
http://git-wip-us.apache.org/repos/asf/kafka/blob/5e2a9a56/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
index 22d4c79..aec31c3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
@@ -60,7 +60,7 @@ public final class FutureRecordMetadata implements Future<RecordMetadata> {
if (this.result.error() != null)
throw new ExecutionException(this.result.error());
else
- return new RecordMetadata(result.topicPartition(), this.result.baseOffset() + this.relativeOffset);
+ return new RecordMetadata(result.topicPartition(), this.result.baseOffset(), this.relativeOffset);
}
public long relativeOffset() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/5e2a9a56/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
index 62613a3..ce23168 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
@@ -74,10 +74,10 @@ public final class Metadata {
*/
public synchronized Cluster fetch(String topic, long maxWaitMs) {
List<PartitionInfo> partitions = null;
+ long begin = System.currentTimeMillis();
do {
partitions = cluster.partitionsFor(topic);
if (partitions == null) {
- long begin = System.currentTimeMillis();
topics.add(topic);
forceUpdate = true;
try {
@@ -85,7 +85,7 @@ public final class Metadata {
} catch (InterruptedException e) { /* this is fine, just try again */
}
long ellapsed = System.currentTimeMillis() - begin;
- if (ellapsed > maxWaitMs)
+ if (ellapsed >= maxWaitMs)
throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
} else {
return cluster;
http://git-wip-us.apache.org/repos/asf/kafka/blob/5e2a9a56/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
index eb16f6d..ef8e658 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
@@ -55,7 +55,7 @@ public final class RecordBatch {
this.records.append(0L, key, value, compression);
FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount);
if (callback != null)
- thunks.add(new Thunk(callback, this.recordCount));
+ thunks.add(new Thunk(callback, future));
this.recordCount++;
return future;
}
@@ -74,8 +74,7 @@ public final class RecordBatch {
try {
Thunk thunk = this.thunks.get(i);
if (exception == null)
- thunk.callback.onCompletion(new RecordMetadata(topicPartition, this.produceFuture.baseOffset() + thunk.relativeOffset),
- null);
+ thunk.callback.onCompletion(thunk.future.get(), null);
else
thunk.callback.onCompletion(null, exception);
} catch (Exception e) {
@@ -85,15 +84,15 @@ public final class RecordBatch {
}
/**
- * A callback and the associated RecordSend argument to pass to it.
+ * A callback and the associated FutureRecordMetadata argument to pass to it.
*/
final private static class Thunk {
final Callback callback;
- final long relativeOffset;
+ final FutureRecordMetadata future;
- public Thunk(Callback callback, long relativeOffset) {
+ public Thunk(Callback callback, FutureRecordMetadata future) {
this.callback = callback;
- this.relativeOffset = relativeOffset;
+ this.future = future;
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/5e2a9a56/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index e373265..541c5e1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -324,6 +324,7 @@ public class Sender implements Runnable {
private void handleDisconnects(List<Integer> disconnects, long now) {
// clear out the in-flight requests for the disconnected broker
for (int node : disconnects) {
+ nodeStates.disconnected(node);
for (InFlightRequest request : this.inFlightRequests.clearAll(node)) {
if (request.batches != null) {
for (RecordBatch batch : request.batches.values()) {
@@ -335,7 +336,6 @@ public class Sender implements Runnable {
}
}
}
- nodeStates.disconnected(request.request.destination());
}
}
// we got a disconnect so we should probably refresh our metadata and see if that broker is dead
http://git-wip-us.apache.org/repos/asf/kafka/blob/5e2a9a56/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index f1e474c..678bfcc 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -299,6 +299,7 @@ public class Selector implements Selectable {
Transmissions trans = transmissions(key);
if (trans != null) {
this.disconnected.add(trans.id);
+ this.keys.remove(trans.id);
trans.clearReceive();
trans.clearSend();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5e2a9a56/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index f88992a..3374bd9 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -41,17 +41,15 @@ import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
public enum Errors {
UNKNOWN(-1, new UnknownServerException("The server experienced an unexpected error when processing the request")),
NONE(0, null),
- OFFSET_OUT_OF_RANGE(1,
- new OffsetOutOfRangeException("The requested offset is not within the range of offsets maintained by the server.")),
- CORRUPT_MESSAGE(2,
- new CorruptRecordException("The message contents does not match the message CRC or the message is otherwise corrupt.")),
+ OFFSET_OUT_OF_RANGE(1, new OffsetOutOfRangeException("The requested offset is not within the range of offsets maintained by the server.")),
+ CORRUPT_MESSAGE(2, new CorruptRecordException("The message contents does not match the message CRC or the message is otherwise corrupt.")),
UNKNOWN_TOPIC_OR_PARTITION(3, new UnknownTopicOrPartitionException("This server does not host this topic-partition.")),
- LEADER_NOT_AVAILABLE(5,
- new LeaderNotAvailableException("There is no leader for this topic-partition as we are in the middle of a leadership election.")),
+ // TODO: errorCode 4 for InvalidFetchSize
+ LEADER_NOT_AVAILABLE(5, new LeaderNotAvailableException("There is no leader for this topic-partition as we are in the middle of a leadership election.")),
NOT_LEADER_FOR_PARTITION(6, new NotLeaderForPartitionException("This server is not the leader for that topic-partition.")),
REQUEST_TIMED_OUT(7, new TimeoutException("The request timed out.")),
- MESSAGE_TOO_LARGE(10,
- new RecordTooLargeException("The request included a message larger than the max message size the server will accept.")),
+ // TODO: errorCode 8, 9, 11
+ MESSAGE_TOO_LARGE(10, new RecordTooLargeException("The request included a message larger than the max message size the server will accept.")),
OFFSET_METADATA_TOO_LARGE(12, new OffsetMetadataTooLarge("The metadata field of the offset request was too large.")),
NETWORK_EXCEPTION(13, new NetworkException("The server disconnected before a response was received."));
http://git-wip-us.apache.org/repos/asf/kafka/blob/5e2a9a56/core/src/main/scala/kafka/api/ProducerResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ProducerResponse.scala b/core/src/main/scala/kafka/api/ProducerResponse.scala
index 06261b9..5a1d801 100644
--- a/core/src/main/scala/kafka/api/ProducerResponse.scala
+++ b/core/src/main/scala/kafka/api/ProducerResponse.scala
@@ -41,7 +41,7 @@ object ProducerResponse {
}
}
-case class ProducerResponseStatus(error: Short, offset: Long)
+case class ProducerResponseStatus(var error: Short, offset: Long)
case class ProducerResponse(override val correlationId: Int,
status: Map[TopicAndPartition, ProducerResponseStatus])
http://git-wip-us.apache.org/repos/asf/kafka/blob/5e2a9a56/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index ae2df20..215ac36 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -231,7 +231,8 @@ class KafkaApis(val requestChannel: RequestChannel,
// create a list of (topic, partition) pairs to use as keys for this delayed request
val producerRequestKeys = produceRequest.data.keys.map(
topicAndPartition => new RequestKey(topicAndPartition)).toSeq
- val statuses = localProduceResults.map(r => r.key -> ProducerResponseStatus(r.errorCode, r.end + 1)).toMap
+ val statuses = localProduceResults.map(r =>
+ r.key -> DelayedProduceResponseStatus(r.end + 1, ProducerResponseStatus(r.errorCode, r.start))).toMap
val delayedProduce = new DelayedProduce(producerRequestKeys,
request,
statuses,
@@ -255,7 +256,16 @@ class KafkaApis(val requestChannel: RequestChannel,
produceRequest.emptyData()
}
}
-
+
+ case class DelayedProduceResponseStatus(requiredOffset: Long,
+ status: ProducerResponseStatus) {
+ var acksPending = false
+
+ override def toString =
+ "acksPending:%b, error: %d, startOffset: %d, requiredOffset: %d".format(
+ acksPending, status.error, status.offset, requiredOffset)
+ }
+
case class ProduceResult(key: TopicAndPartition, start: Long, end: Long, error: Option[Throwable] = None) {
def this(key: TopicAndPartition, throwable: Throwable) =
this(key, -1L, -1L, Some(throwable))
@@ -762,41 +772,31 @@ class KafkaApis(val requestChannel: RequestChannel,
class DelayedProduce(keys: Seq[RequestKey],
request: RequestChannel.Request,
- initialErrorsAndOffsets: Map[TopicAndPartition, ProducerResponseStatus],
+ val partitionStatus: Map[TopicAndPartition, DelayedProduceResponseStatus],
val produce: ProducerRequest,
delayMs: Long)
extends DelayedRequest(keys, request, delayMs) with Logging {
- /**
- * Map of (topic, partition) -> partition status
- * The values in this map don't need to be synchronized since updates to the
- * values are effectively synchronized by the ProducerRequestPurgatory's
- * update method
- */
- private [kafka] val partitionStatus = keys.map(requestKey => {
- val producerResponseStatus = initialErrorsAndOffsets(TopicAndPartition(requestKey.topic, requestKey.partition))
- // if there was an error in writing to the local replica's log, then don't
- // wait for acks on this partition
- val (acksPending, error, nextOffset) =
- if (producerResponseStatus.error == ErrorMapping.NoError) {
- // Timeout error state will be cleared when requiredAcks are received
- (true, ErrorMapping.RequestTimedOutCode, producerResponseStatus.offset)
- }
- else (false, producerResponseStatus.error, producerResponseStatus.offset)
+ // first update the acks pending variable according to error code
+ partitionStatus foreach { case (topicAndPartition, delayedStatus) =>
+ if (delayedStatus.status.error == ErrorMapping.NoError) {
+ // Timeout error state will be cleared when requiredAcks are received
+ delayedStatus.acksPending = true
+ delayedStatus.status.error = ErrorMapping.RequestTimedOutCode
+ } else {
+ delayedStatus.acksPending = false
+ }
+
+ trace("Initial partition status for %s is %s".format(topicAndPartition, delayedStatus))
+ }
- val initialStatus = PartitionStatus(acksPending, error, nextOffset)
- trace("Initial partition status for %s = %s".format(requestKey.keyLabel, initialStatus))
- (requestKey, initialStatus)
- }).toMap
def respond() {
- val finalErrorsAndOffsets = initialErrorsAndOffsets.map(
- status => {
- val pstat = partitionStatus(new RequestKey(status._1))
- (status._1, ProducerResponseStatus(pstat.error, pstat.requiredOffset))
- })
-
- val response = ProducerResponse(produce.correlationId, finalErrorsAndOffsets)
+ val responseStatus = partitionStatus.map { case (topicAndPartition, delayedStatus) =>
+ topicAndPartition -> delayedStatus.status
+ }
+
+ val response = ProducerResponse(produce.correlationId, responseStatus)
requestChannel.sendResponse(new RequestChannel.Response(
request, new BoundedByteBufferSend(response)))
@@ -816,8 +816,7 @@ class KafkaApis(val requestChannel: RequestChannel,
def isSatisfied(followerFetchRequestKey: RequestKey) = {
val topic = followerFetchRequestKey.topic
val partitionId = followerFetchRequestKey.partition
- val key = RequestKey(topic, partitionId)
- val fetchPartitionStatus = partitionStatus(key)
+ val fetchPartitionStatus = partitionStatus(TopicAndPartition(topic, partitionId))
trace("Checking producer request satisfaction for %s-%d, acksPending = %b"
.format(topic, partitionId, fetchPartitionStatus.acksPending))
if (fetchPartitionStatus.acksPending) {
@@ -830,10 +829,10 @@ class KafkaApis(val requestChannel: RequestChannel,
}
if (errorCode != ErrorMapping.NoError) {
fetchPartitionStatus.acksPending = false
- fetchPartitionStatus.error = errorCode
+ fetchPartitionStatus.status.error = errorCode
} else if (hasEnough) {
fetchPartitionStatus.acksPending = false
- fetchPartitionStatus.error = ErrorMapping.NoError
+ fetchPartitionStatus.status.error = ErrorMapping.NoError
}
if (!fetchPartitionStatus.acksPending) {
val messageSizeInBytes = produce.topicPartitionMessageSizeMap(followerFetchRequestKey.topicAndPartition)
@@ -846,20 +845,6 @@ class KafkaApis(val requestChannel: RequestChannel,
trace("Producer request satisfaction for %s-%d = %b".format(topic, partitionId, satisfied))
satisfied
}
-
- case class PartitionStatus(var acksPending: Boolean,
- var error: Short,
- requiredOffset: Long) {
- def setThisBrokerNotLeader() {
- error = ErrorMapping.NotLeaderForPartitionCode
- acksPending = false
- }
-
- override def toString =
- "acksPending:%b, error: %d, requiredOffset: %d".format(
- acksPending, error, requiredOffset
- )
- }
}
/**
@@ -877,8 +862,8 @@ class KafkaApis(val requestChannel: RequestChannel,
* Handle an expired delayed request
*/
protected def expire(delayedProduce: DelayedProduce) {
- for (partitionStatus <- delayedProduce.partitionStatus if partitionStatus._2.acksPending)
- delayedRequestMetrics.recordDelayedProducerKeyExpired(partitionStatus._1)
+ for ((topicPartition, responseStatus) <- delayedProduce.partitionStatus if responseStatus.acksPending)
+ delayedRequestMetrics.recordDelayedProducerKeyExpired(RequestKey(topicPartition.topic, topicPartition.partition))
delayedProduce.respond()
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5e2a9a56/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
new file mode 100644
index 0000000..b8eb726
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -0,0 +1,284 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api.test
+
+import org.scalatest.junit.JUnit3Suite
+import org.junit.Test
+import org.junit.Assert._
+
+import java.util.Properties
+import java.lang.Integer
+import java.util.concurrent.{TimeoutException, TimeUnit, ExecutionException}
+
+import kafka.server.{KafkaConfig, KafkaServer}
+import kafka.utils.{Utils, TestUtils}
+import kafka.zk.ZooKeeperTestHarness
+import kafka.consumer.SimpleConsumer
+
+import org.apache.kafka.common.KafkaException
+import org.apache.kafka.clients.producer._
+
+class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness {
+ private val brokerId1 = 0
+ private val brokerId2 = 1
+ private val ports = TestUtils.choosePorts(2)
+ private val (port1, port2) = (ports(0), ports(1))
+ private var server1: KafkaServer = null
+ private var server2: KafkaServer = null
+ private var servers = List.empty[KafkaServer]
+
+ private var consumer1: SimpleConsumer = null
+ private var consumer2: SimpleConsumer = null
+
+ private var producer1: KafkaProducer = null
+ private var producer2: KafkaProducer = null
+ private var producer3: KafkaProducer = null
+ private var producer4: KafkaProducer = null
+
+ private val props1 = TestUtils.createBrokerConfig(brokerId1, port1)
+ private val props2 = TestUtils.createBrokerConfig(brokerId2, port2)
+ props1.put("auto.create.topics.enable", "false")
+ props2.put("auto.create.topics.enable", "false")
+ private val config1 = new KafkaConfig(props1)
+ private val config2 = new KafkaConfig(props2)
+ private val brokerList = TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))
+
+ private val bufferSize = 2 * config1.messageMaxBytes
+
+ private val topic1 = "topic-1"
+ private val topic2 = "topic-2"
+
+ // TODO: move this function to TestUtils after we have server dependent on clients
+ private def makeProducer(brokerList: String, acks: Int, metadataFetchTimeout: Long,
+ blockOnBufferFull: Boolean, bufferSize: Long) : KafkaProducer = {
+ val producerProps = new Properties()
+ producerProps.put(ProducerConfig.BROKER_LIST_CONFIG, brokerList)
+ producerProps.put(ProducerConfig.REQUIRED_ACKS_CONFIG, acks.toString)
+ producerProps.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, metadataFetchTimeout.toString)
+ producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, blockOnBufferFull.toString)
+ producerProps.put(ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG, bufferSize.toString)
+ return new KafkaProducer(producerProps)
+ }
+
+ override def setUp() {
+ super.setUp()
+ server1 = TestUtils.createServer(config1)
+ server2 = TestUtils.createServer(config2)
+ servers = List(server1,server2)
+
+ // TODO: we need to migrate to new consumers when 0.9 is final
+ consumer1 = new SimpleConsumer("localhost", port1, 100, 1024*1024, "")
+ consumer2 = new SimpleConsumer("localhost", port2, 100, 1024*1024, "")
+
+ producer1 = makeProducer(brokerList, 0, 3000, false, bufferSize); // produce with ack=0
+ producer2 = makeProducer(brokerList, 1, 3000, false, bufferSize); // produce with ack=1
+ producer3 = makeProducer(brokerList, -1, 3000, false, bufferSize); // produce with ack=-1
+ producer4 = makeProducer("localhost:8686,localhost:4242", 1, 3000, false, bufferSize); // produce with incorrect broker list
+ }
+
+ override def tearDown() {
+ server1.shutdown; Utils.rm(server1.config.logDirs)
+ server2.shutdown; Utils.rm(server2.config.logDirs)
+
+ consumer1.close
+ consumer2.close
+
+ if (producer1 != null) producer1.close
+ if (producer2 != null) producer2.close
+ if (producer3 != null) producer3.close
+ if (producer4 != null) producer4.close
+
+ super.tearDown()
+ }
+
+ /**
+ * With ack == 0 the future completes without an exception and the returned metadata has offset -1 (offset unknown)
+ */
+ @Test
+ def testTooLargeRecordWithAckZero() {
+ // create topic
+ TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
+
+ // send a record whose value is one byte larger than the broker's configured max message size
+ val record = new ProducerRecord(topic1, null, "key".getBytes, new Array[Byte](config1.messageMaxBytes + 1))
+ assertEquals("Returned metadata should have offset -1", -1L, producer1.send(record).get.offset)
+ }
+
+ /**
+ * With ack == 1 the future metadata will throw ExecutionException caused by RecordTooLargeException
+ */
+ @Test
+ def testTooLargeRecordWithAckOne() {
+ // create topic
+ TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
+
+ // send a too-large record
+ val record = new ProducerRecord(topic1, null, "key".getBytes, new Array[Byte](config1.messageMaxBytes + 1))
+ intercept[ExecutionException] {
+ producer2.send(record).get
+ }
+ }
+
+ /**
+ * With a non-existent topic the future metadata should throw ExecutionException caused by TimeoutException
+ */
+ @Test
+ def testNonExistTopic() {
+ // send a record with non-exist topic
+ val record = new ProducerRecord(topic2, null, "key".getBytes, "value".getBytes)
+ intercept[ExecutionException] {
+ producer1.send(record).get
+ }
+ }
+
+ /**
+ * With incorrect broker-list the future metadata should return ExecutionException caused by TimeoutException
+ *
+ * TODO: other exceptions that can be thrown in ExecutionException:
+ * UnknownTopicOrPartitionException
+ * NotLeaderForPartitionException
+ * LeaderNotAvailableException
+ * CorruptRecordException
+ * TimeoutException
+ */
+ @Test
+ def testWrongBrokerList() {
+ // create topic
+ TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
+
+ // send a record with incorrect broker list
+ val record = new ProducerRecord(topic1, null, "key".getBytes, "value".getBytes)
+ intercept[ExecutionException] {
+ producer4.send(record).get
+ }
+ }
+
+ /**
+ * 1. With ack=0, the future metadata should not be blocked.
+ * 2. With ack=1, the future metadata should block,
+ * and subsequent calls will eventually cause buffer full
+ */
+ @Test
+ def testNoResponse() {
+ // create topic
+ TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
+
+ // first send a message to make sure the metadata is refreshed
+ val record = new ProducerRecord(topic1, null, "key".getBytes, "value".getBytes)
+ producer1.send(record).get
+ producer2.send(record).get
+
+ // stop IO threads and request handling, but leave networking operational
+ // any requests should be accepted and queue up, but not handled
+ server1.requestHandlerPool.shutdown()
+ server2.requestHandlerPool.shutdown()
+
+ producer1.send(record).get(5000, TimeUnit.MILLISECONDS)
+
+ intercept[TimeoutException] {
+ producer2.send(record).get(5000, TimeUnit.MILLISECONDS)
+ }
+
+ // TODO: expose producer configs after creating them
+ // send enough messages to get buffer full
+ val tooManyRecords = bufferSize / ("key".getBytes.length + "value".getBytes.length)
+
+ intercept[KafkaException] {
+ for (i <- 1 to tooManyRecords)
+ producer2.send(record)
+ }
+
+ // do not close producer2 since it will block
+ // TODO: can we do better?
+ producer2 = null
+ }
+
+ /**
+ * The send call with invalid partition id should throw KafkaException caused by IllegalArgumentException
+ */
+ @Test
+ def testInvalidPartition() {
+ // create topic
+ TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
+
+ // create a record with incorrect partition id, send should fail
+ val record = new ProducerRecord(topic1, new Integer(1), "key".getBytes, "value".getBytes)
+ intercept[KafkaException] {
+ producer1.send(record)
+ }
+ intercept[KafkaException] {
+ producer2.send(record)
+ }
+ intercept[KafkaException] {
+ producer3.send(record)
+ }
+ }
+
+ /**
+ * The send call after producer closed should throw KafkaException caused by IllegalStateException
+ */
+ @Test
+ def testSendAfterClosed() {
+ // create topic
+ TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
+
+ val record = new ProducerRecord(topic1, null, "key".getBytes, "value".getBytes)
+
+ // first send a message to make sure the metadata is refreshed
+ producer1.send(record).get
+ producer2.send(record).get
+ producer3.send(record).get
+
+ intercept[KafkaException] {
+ producer1.close
+ producer1.send(record)
+ }
+ intercept[KafkaException] {
+ producer2.close
+ producer2.send(record)
+ }
+ intercept[KafkaException] {
+ producer3.close
+ producer3.send(record)
+ }
+
+ // re-close producer is fine
+ }
+
+ /**
+ * With replication, the producer should be able to find the new leader after it detects a broker failure
+ */
+ @Test
+ def testBrokerFailure() {
+ // create topic
+ val leaders = TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
+ val leader = leaders(0)
+ assertTrue("Leader of partition 0 of the topic should exist", leader.isDefined)
+
+ val record = new ProducerRecord(topic1, null, "key".getBytes, "value".getBytes)
+ assertEquals("Returned metadata should have offset 0", 0L, producer3.send(record).get.offset)
+
+ // shut down whichever broker is currently the leader of partition 0
+ val serverToShutdown = if(leader.get == server1.config.brokerId) server1 else server2
+ serverToShutdown.shutdown()
+ serverToShutdown.awaitShutdown()
+
+ // send the message again, it should still succeed due to retry
+ assertEquals("Returned metadata should have offset 1", 1L, producer3.send(record).get.offset)
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/5e2a9a56/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
index 34baa8c..66ea76b 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
@@ -15,12 +15,11 @@
* limitations under the License.
*/
-package kafka.test
+package kafka.api.test
import kafka.server.{KafkaConfig, KafkaServer}
-import kafka.utils.{ZkUtils, Utils, TestUtils, Logging}
+import kafka.utils.{Utils, TestUtils}
import kafka.zk.ZooKeeperTestHarness
-import kafka.admin.AdminUtils
import kafka.consumer.SimpleConsumer
import kafka.api.FetchRequestBuilder
import kafka.message.Message
@@ -33,7 +32,6 @@ import org.junit.Assert._
import java.util.Properties
import java.lang.{Integer, IllegalArgumentException}
-import org.apache.log4j.Logger
class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
@@ -110,29 +108,25 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
// send a normal record
val record0 = new ProducerRecord(topic, new Integer(0), "key".getBytes, "value".getBytes)
- val response0 = producer.send(record0, callback)
- assertEquals("Should have offset 0", 0L, response0.get.offset)
+ assertEquals("Should have offset 0", 0L, producer.send(record0, callback).get.offset)
// send a record with null value should be ok
val record1 = new ProducerRecord(topic, new Integer(0), "key".getBytes, null)
- val response1 = producer.send(record1, callback)
- assertEquals("Should have offset 1", 1L, response1.get.offset)
+ assertEquals("Should have offset 1", 1L, producer.send(record1, callback).get.offset)
// send a record with null key should be ok
val record2 = new ProducerRecord(topic, new Integer(0), null, "value".getBytes)
- val response2 = producer.send(record2, callback)
- assertEquals("Should have offset 2", 2L, response2.get.offset)
+ assertEquals("Should have offset 2", 2L, producer.send(record2, callback).get.offset)
// send a record with null part id should be ok
val record3 = new ProducerRecord(topic, null, "key".getBytes, "value".getBytes)
- val response3 = producer.send(record3, callback)
- assertEquals("Should have offset 3", 3L, response3.get.offset)
+ assertEquals("Should have offset 3", 3L, producer.send(record3, callback).get.offset)
// send a record with null topic should fail
try {
val record4 = new ProducerRecord(null, new Integer(0), "key".getBytes, "value".getBytes)
- val response4 = producer.send(record4, callback)
- response4.wait
+ producer.send(record4, callback)
+ fail("Should not allow sending a record without topic")
} catch {
case iae: IllegalArgumentException => // this is ok
case e: Throwable => fail("Only expecting IllegalArgumentException", e)
@@ -143,8 +137,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
producer.send(record0)
// check that all messages have been acked via offset
- val response5 = producer.send(record0, callback)
- assertEquals("Should have offset " + (numRecords + 4), numRecords + 4L, response5.get.offset)
+ assertEquals("Should have offset " + (numRecords + 4), numRecords + 4L, producer.send(record0, callback).get.offset)
} finally {
if (producer != null) {
@@ -157,7 +150,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
/**
* testClose checks the closing behavior
*
- * 1. After close() returns, all messages should be sent with correct returned offset metadata
+ * After close() returns, all messages should be sent with correct returned offset metadata
*/
@Test
def testClose() {
@@ -195,7 +188,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
/**
* testSendToPartition checks the partitioning behavior
*
- * 1. The specified partition-id should be respected
+ * The specified partition-id should be respected
*/
@Test
def testSendToPartition() {
@@ -207,40 +200,40 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
try {
// create topic
val leaders = TestUtils.createTopic(zkClient, topic, 2, 2, servers)
+ val partition = 1
// make sure leaders exist
- val leader1 = leaders.get(1)
+ val leader1 = leaders(partition)
assertTrue("Leader for topic \"topic\" partition 1 should exist", leader1.isDefined)
- val partition = 1
val responses =
- for (i <- 0 until numRecords)
+ for (i <- 1 to numRecords)
yield producer.send(new ProducerRecord(topic, partition, null, ("value" + i).getBytes))
val futures = responses.toList
- futures.map(_.wait)
+ futures.map(_.get)
for (future <- futures)
assertTrue("Request should have completed", future.isDone)
// make sure all of them end up in the same partition with increasing offset values
for ((future, offset) <- futures zip (0 until numRecords)) {
- assertEquals(offset, future.get.offset)
+ assertEquals(offset.toLong, future.get.offset)
assertEquals(topic, future.get.topic)
- assertEquals(1, future.get.partition)
+ assertEquals(partition, future.get.partition)
}
// make sure the fetched messages also respect the partitioning and ordering
val fetchResponse1 = if(leader1.get == server1.config.brokerId) {
- consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 1, 0, Int.MaxValue).build())
- }else {
- consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 1, 0, Int.MaxValue).build())
+ consumer1.fetch(new FetchRequestBuilder().addFetch(topic, partition, 0, Int.MaxValue).build())
+ } else {
+ consumer2.fetch(new FetchRequestBuilder().addFetch(topic, partition, 0, Int.MaxValue).build())
}
- val messageSet1 = fetchResponse1.messageSet(topic, 1).iterator.toBuffer
+ val messageSet1 = fetchResponse1.messageSet(topic, partition).iterator.toBuffer
assertEquals("Should have fetched " + numRecords + " messages", numRecords, messageSet1.size)
// TODO: also check topic and partition after they are added in the return messageSet
for (i <- 0 to numRecords - 1) {
assertEquals(new Message(bytes = ("value" + (i + 1)).getBytes), messageSet1(i).message)
- assertEquals(i, messageSet1(i).offset)
+ assertEquals(i.toLong, messageSet1(i).offset)
}
} finally {
if (producer != null) {
@@ -250,6 +243,11 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
}
}
+ /**
+ * testAutoCreateTopic
+ *
+ * The topic should be created upon sending the first message
+ */
@Test
def testAutoCreateTopic() {
val props = new Properties()
@@ -259,8 +257,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
try {
// Send a message to auto-create the topic
val record = new ProducerRecord(topic, null, "key".getBytes, "value".getBytes)
- val response = producer.send(record)
- assertEquals("Should have offset 0", 0L, response.get.offset)
+ assertEquals("Should have offset 0", 0L, producer.send(record).get.offset)
// double check that the topic is created with leader elected
assertTrue("Topic should already be created with leader", TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 0).isDefined)
http://git-wip-us.apache.org/repos/asf/kafka/blob/5e2a9a56/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 1c7a450..772d214 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -23,24 +23,27 @@ import java.nio._
import java.nio.channels._
import java.util.Random
import java.util.Properties
-import junit.framework.AssertionFailedError
-import junit.framework.Assert._
+import java.util.concurrent.locks.ReentrantLock
+import java.util.concurrent.TimeUnit
+
+import collection.mutable.Map
+import collection.mutable.ListBuffer
+
+import org.I0Itec.zkclient.ZkClient
+
import kafka.server._
import kafka.producer._
import kafka.message._
-import org.I0Itec.zkclient.ZkClient
+import kafka.api._
import kafka.cluster.Broker
-import collection.mutable.ListBuffer
import kafka.consumer.ConsumerConfig
-import java.util.concurrent.locks.ReentrantLock
-import java.util.concurrent.TimeUnit
-import kafka.api._
-import collection.mutable.Map
import kafka.serializer.{StringEncoder, DefaultEncoder, Encoder}
import kafka.common.TopicAndPartition
-import junit.framework.Assert
import kafka.admin.AdminUtils
+import kafka.producer.ProducerConfig
+import junit.framework.AssertionFailedError
+import junit.framework.Assert._
/**
* Utility functions to help with testing
@@ -526,7 +529,7 @@ object TestUtils extends Logging {
}
def waitUntilMetadataIsPropagated(servers: Seq[KafkaServer], topic: String, partition: Int, timeout: Long) = {
- Assert.assertTrue("Partition [%s,%d] metadata not propagated after timeout".format(topic, partition),
+ assertTrue("Partition [%s,%d] metadata not propagated after timeout".format(topic, partition),
TestUtils.waitUntilTrue(() =>
servers.foldLeft(true)(_ && _.apis.metadataCache.keySet.contains(TopicAndPartition(topic, partition))), timeout))
}