Posted to commits@kafka.apache.org by ne...@apache.org on 2014/02/27 19:50:36 UTC

git commit: KAFKA-1260 Integration Test for New Producer Part II: Broker Failure Handling; reviewed by Jay Kreps, Neha Narkhede and Jun Rao

Repository: kafka
Updated Branches:
  refs/heads/trunk 57be6c81a -> 5e2a9a560


KAFKA-1260 Integration Test for New Producer Part II: Broker Failure Handling; reviewed by Jay Kreps, Neha Narkhede and Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5e2a9a56
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5e2a9a56
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5e2a9a56

Branch: refs/heads/trunk
Commit: 5e2a9a560d847bd0cf364d86bd6784f70d99c71a
Parents: 57be6c8
Author: Guozhang Wang <gu...@linkedin.com>
Authored: Thu Feb 27 10:50:15 2014 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Thu Feb 27 10:50:26 2014 -0800

----------------------------------------------------------------------
 .../kafka/clients/producer/KafkaProducer.java   |   8 +-
 .../kafka/clients/producer/MockProducer.java    |   2 +-
 .../kafka/clients/producer/RecordMetadata.java  |   8 +-
 .../internals/FutureRecordMetadata.java         |   2 +-
 .../clients/producer/internals/Metadata.java    |   4 +-
 .../clients/producer/internals/RecordBatch.java |  13 +-
 .../clients/producer/internals/Sender.java      |   2 +-
 .../apache/kafka/common/network/Selector.java   |   1 +
 .../apache/kafka/common/protocol/Errors.java    |  14 +-
 .../main/scala/kafka/api/ProducerResponse.scala |   2 +-
 .../src/main/scala/kafka/server/KafkaApis.scala |  85 +++---
 .../kafka/api/ProducerFailureHandlingTest.scala | 284 +++++++++++++++++++
 .../kafka/api/ProducerSendTest.scala            |  59 ++--
 .../test/scala/unit/kafka/utils/TestUtils.scala |  23 +-
 14 files changed, 392 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5e2a9a56/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index e4bc972..757f7a7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -33,6 +33,7 @@ import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
 import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.MetricConfig;
@@ -217,10 +218,14 @@ public class KafkaProducer implements Producer {
             FutureRecordMetadata future = accumulator.append(tp, record.key(), record.value(), CompressionType.NONE, callback);
             this.sender.wakeup();
             return future;
-        } catch (Exception e) {
+            // For API exceptions return them in the future;
+            // for other exceptions throw directly
+        } catch (ApiException e) {
             if (callback != null)
                 callback.onCompletion(null, e);
             return new FutureFailure(e);
+        } catch (InterruptedException e) {
+            throw new KafkaException(e);
         }
     }
 
@@ -255,7 +260,6 @@ public class KafkaProducer implements Producer {
      */
     @Override
     public void close() {
-        this.accumulator.close();
         this.sender.initiateClose();
         try {
             this.ioThread.join();
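
The hunk above splits the old catch-all handler: ApiExceptions are reported through the returned future (and the callback, if one was supplied), while an InterruptedException from the accumulator is rethrown synchronously as a KafkaException. A caller-side sketch of that contract, with a hypothetical broker address and topic name:

    Properties props = new Properties();
    props.put(ProducerConfig.BROKER_LIST_CONFIG, "localhost:9092");  // hypothetical address
    Producer producer = new KafkaProducer(props);
    ProducerRecord record = new ProducerRecord("my-topic", null, "key".getBytes(), "value".getBytes());
    Future<RecordMetadata> future = producer.send(record);  // API errors do not throw here
    try {
        System.out.println("acked at offset " + future.get().offset());
    } catch (ExecutionException e) {
        System.err.println("send failed: " + e.getCause());  // e.g. RecordTooLargeException
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }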

http://git-wip-us.apache.org/repos/asf/kafka/blob/5e2a9a56/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index f43da80..6a0f3b2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -108,7 +108,7 @@ public class MockProducer implements Producer {
         FutureRecordMetadata future = new FutureRecordMetadata(result, 0);
         TopicPartition topicPartition = new TopicPartition(record.topic(), partition);
         long offset = nextOffset(topicPartition);
-        Completion completion = new Completion(topicPartition, offset, new RecordMetadata(topicPartition, offset), result, callback);
+        Completion completion = new Completion(topicPartition, offset, new RecordMetadata(topicPartition, 0, offset), result, callback);
         this.sent.add(record);
         if (autoComplete)
             completion.complete(null);

http://git-wip-us.apache.org/repos/asf/kafka/blob/5e2a9a56/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java
index 8c77698..8015f0d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java
@@ -26,12 +26,18 @@ public final class RecordMetadata {
     private final long offset;
     private final TopicPartition topicPartition;
 
-    public RecordMetadata(TopicPartition topicPartition, long offset) {
+    private RecordMetadata(TopicPartition topicPartition, long offset) {
         super();
         this.offset = offset;
         this.topicPartition = topicPartition;
     }
 
+    public RecordMetadata(TopicPartition topicPartition, long baseOffset, long relativeOffset) {
+        // ignore the relativeOffset if the base offset is -1,
+        // since this indicates the offset is unknown
+        this(topicPartition, baseOffset == -1 ? baseOffset : baseOffset + relativeOffset);
+    }
+
     /**
      * The offset of the record in the topic/partition.
      */
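
The new three-argument constructor reflects how the broker acks a produce request: one base offset per batch, with each record's absolute position computed as base plus its index within the batch, and -1 reserved for "offset unknown" (e.g. acks=0). The arithmetic spelled out, with hypothetical values:

    long baseOffset = 42L;     // offset the broker assigned to the whole batch
    long relativeOffset = 3L;  // this record's index within the batch
    long offset = (baseOffset == -1) ? -1L : baseOffset + relativeOffset;  // 45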

http://git-wip-us.apache.org/repos/asf/kafka/blob/5e2a9a56/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
index 22d4c79..aec31c3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
@@ -60,7 +60,7 @@ public final class FutureRecordMetadata implements Future<RecordMetadata> {
         if (this.result.error() != null)
             throw new ExecutionException(this.result.error());
         else
-            return new RecordMetadata(result.topicPartition(), this.result.baseOffset() + this.relativeOffset);
+            return new RecordMetadata(result.topicPartition(), this.result.baseOffset(), this.relativeOffset);
     }
 
     public long relativeOffset() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/5e2a9a56/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
index 62613a3..ce23168 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
@@ -74,10 +74,10 @@ public final class Metadata {
      */
     public synchronized Cluster fetch(String topic, long maxWaitMs) {
         List<PartitionInfo> partitions = null;
+        long begin = System.currentTimeMillis();
         do {
             partitions = cluster.partitionsFor(topic);
             if (partitions == null) {
-                long begin = System.currentTimeMillis();
                 topics.add(topic);
                 forceUpdate = true;
                 try {
@@ -85,7 +85,7 @@ public final class Metadata {
                 } catch (InterruptedException e) { /* this is fine, just try again */
                 }
                 long ellapsed = System.currentTimeMillis() - begin;
-                if (ellapsed > maxWaitMs)
+                if (ellapsed >= maxWaitMs)
                     throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
             } else {
                 return cluster;
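
Both changes above repair the metadata wait loop: the start timestamp used to be re-taken on every iteration, so the elapsed time could never accumulate past maxWaitMs across repeated waits, and the boundary case of exactly maxWaitMs did not time out. A sketch of the corrected pattern, with awaitMetadataUpdate standing in as a hypothetical helper:

    long begin = System.currentTimeMillis();  // taken once, before the retry loop
    while (cluster.partitionsFor(topic) == null) {
        awaitMetadataUpdate(maxWaitMs);       // hypothetical helper; may return early
        long elapsed = System.currentTimeMillis() - begin;
        if (elapsed >= maxWaitMs)             // >= so hitting the deadline exactly also fails
            throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
    }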

http://git-wip-us.apache.org/repos/asf/kafka/blob/5e2a9a56/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
index eb16f6d..ef8e658 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
@@ -55,7 +55,7 @@ public final class RecordBatch {
             this.records.append(0L, key, value, compression);
             FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount);
             if (callback != null)
-                thunks.add(new Thunk(callback, this.recordCount));
+                thunks.add(new Thunk(callback, future));
             this.recordCount++;
             return future;
         }
@@ -74,8 +74,7 @@ public final class RecordBatch {
             try {
                 Thunk thunk = this.thunks.get(i);
                 if (exception == null)
-                    thunk.callback.onCompletion(new RecordMetadata(topicPartition, this.produceFuture.baseOffset() + thunk.relativeOffset),
-                                                null);
+                    thunk.callback.onCompletion(thunk.future.get(), null);
                 else
                     thunk.callback.onCompletion(null, exception);
             } catch (Exception e) {
@@ -85,15 +84,15 @@ public final class RecordBatch {
     }
 
     /**
-     * A callback and the associated RecordSend argument to pass to it.
+     * A callback and the associated FutureRecordMetadata argument to pass to it.
      */
     final private static class Thunk {
         final Callback callback;
-        final long relativeOffset;
+        final FutureRecordMetadata future;
 
-        public Thunk(Callback callback, long relativeOffset) {
+        public Thunk(Callback callback, FutureRecordMetadata future) {
             this.callback = callback;
-            this.relativeOffset = relativeOffset;
+            this.future = future;
         }
     }
 }
\ No newline at end of file
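
With each Thunk now holding the FutureRecordMetadata itself, the offset arithmetic (including the base == -1 case) lives in one place and callbacks observe exactly what future.get() would return. A usage sketch of the Callback contract these thunks drive; the record is hypothetical:

    producer.send(record, new Callback() {
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception != null)
                System.err.println("send failed: " + exception);  // metadata is null here
            else
                System.out.println("acked " + metadata.topic() + "-" + metadata.partition()
                                   + " at offset " + metadata.offset());
        }
    });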

http://git-wip-us.apache.org/repos/asf/kafka/blob/5e2a9a56/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index e373265..541c5e1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -324,6 +324,7 @@ public class Sender implements Runnable {
     private void handleDisconnects(List<Integer> disconnects, long now) {
         // clear out the in-flight requests for the disconnected broker
         for (int node : disconnects) {
+            nodeStates.disconnected(node);
             for (InFlightRequest request : this.inFlightRequests.clearAll(node)) {
                 if (request.batches != null) {
                     for (RecordBatch batch : request.batches.values()) {
@@ -335,7 +336,6 @@ public class Sender implements Runnable {
                         }
                     }
                 }
-                nodeStates.disconnected(request.request.destination());
             }
         }
         // we got a disconnect so we should probably refresh our metadata and see if that broker is dead
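
Hoisting nodeStates.disconnected(node) out of the inner loop fixes an ordering bug: the old code only marked a node disconnected while draining its in-flight requests, so a broker that dropped with nothing in flight was never marked down. The corrected shape, with failInFlightBatches as a hypothetical stand-in for the error-completion logic:

    for (int node : disconnects) {
        nodeStates.disconnected(node);  // unconditionally, even with nothing in flight
        for (InFlightRequest request : this.inFlightRequests.clearAll(node))
            failInFlightBatches(request);  // hypothetical: completes each batch with a NetworkException
    }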

http://git-wip-us.apache.org/repos/asf/kafka/blob/5e2a9a56/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index f1e474c..678bfcc 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -299,6 +299,7 @@ public class Selector implements Selectable {
         Transmissions trans = transmissions(key);
         if (trans != null) {
             this.disconnected.add(trans.id);
+            this.keys.remove(trans.id);
             trans.clearReceive();
             trans.clearSend();
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5e2a9a56/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index f88992a..3374bd9 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -41,17 +41,15 @@ import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 public enum Errors {
     UNKNOWN(-1, new UnknownServerException("The server experienced an unexpected error when processing the request")),
     NONE(0, null),
-    OFFSET_OUT_OF_RANGE(1,
-                        new OffsetOutOfRangeException("The requested offset is not within the range of offsets maintained by the server.")),
-    CORRUPT_MESSAGE(2,
-                    new CorruptRecordException("The message contents does not match the message CRC or the message is otherwise corrupt.")),
+    OFFSET_OUT_OF_RANGE(1, new OffsetOutOfRangeException("The requested offset is not within the range of offsets maintained by the server.")),
+    CORRUPT_MESSAGE(2, new CorruptRecordException("The message contents does not match the message CRC or the message is otherwise corrupt.")),
     UNKNOWN_TOPIC_OR_PARTITION(3, new UnknownTopicOrPartitionException("This server does not host this topic-partition.")),
-    LEADER_NOT_AVAILABLE(5,
-                         new LeaderNotAvailableException("There is no leader for this topic-partition as we are in the middle of a leadership election.")),
+    // TODO: errorCode 4 for InvalidFetchSize
+    LEADER_NOT_AVAILABLE(5, new LeaderNotAvailableException("There is no leader for this topic-partition as we are in the middle of a leadership election.")),
     NOT_LEADER_FOR_PARTITION(6, new NotLeaderForPartitionException("This server is not the leader for that topic-partition.")),
     REQUEST_TIMED_OUT(7, new TimeoutException("The request timed out.")),
-    MESSAGE_TOO_LARGE(10,
-                      new RecordTooLargeException("The request included a message larger than the max message size the server will accept.")),
+    // TODO: errorCode 8, 9, 11
+    MESSAGE_TOO_LARGE(10, new RecordTooLargeException("The request included a message larger than the max message size the server will accept.")),
     OFFSET_METADATA_TOO_LARGE(12, new OffsetMetadataTooLarge("The metadata field of the offset request was too large.")),
     NETWORK_EXCEPTION(13, new NetworkException("The server disconnected before a response was received."));
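
The reflowed enum keeps one constant per line and records the unmapped codes (4, 8, 9, 11) as TODOs instead of skipping them silently. A hedged sketch of turning a wire error code back into its typed exception; the code() and exception() accessors are assumed here, since they fall outside this hunk:

    static RuntimeException exceptionFor(short errorCode) {
        for (Errors error : Errors.values())
            if (error.code() == errorCode)
                return error.exception();  // note: Errors.NONE carries a null exception
        return Errors.UNKNOWN.exception();
    }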
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/5e2a9a56/core/src/main/scala/kafka/api/ProducerResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ProducerResponse.scala b/core/src/main/scala/kafka/api/ProducerResponse.scala
index 06261b9..5a1d801 100644
--- a/core/src/main/scala/kafka/api/ProducerResponse.scala
+++ b/core/src/main/scala/kafka/api/ProducerResponse.scala
@@ -41,7 +41,7 @@ object ProducerResponse {
   }
 }
 
-case class ProducerResponseStatus(error: Short, offset: Long)
+case class ProducerResponseStatus(var error: Short, offset: Long)
 
 case class ProducerResponse(override val correlationId: Int,
                             status: Map[TopicAndPartition, ProducerResponseStatus])

http://git-wip-us.apache.org/repos/asf/kafka/blob/5e2a9a56/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index ae2df20..215ac36 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -231,7 +231,8 @@ class KafkaApis(val requestChannel: RequestChannel,
       // create a list of (topic, partition) pairs to use as keys for this delayed request
       val producerRequestKeys = produceRequest.data.keys.map(
         topicAndPartition => new RequestKey(topicAndPartition)).toSeq
-      val statuses = localProduceResults.map(r => r.key -> ProducerResponseStatus(r.errorCode, r.end + 1)).toMap
+      val statuses = localProduceResults.map(r =>
+        r.key -> DelayedProduceResponseStatus(r.end + 1, ProducerResponseStatus(r.errorCode, r.start))).toMap
       val delayedProduce = new DelayedProduce(producerRequestKeys, 
                                               request,
                                               statuses,
@@ -255,7 +256,16 @@ class KafkaApis(val requestChannel: RequestChannel,
       produceRequest.emptyData()
     }
   }
-  
+
+  case class DelayedProduceResponseStatus(requiredOffset: Long,
+                                          status: ProducerResponseStatus) {
+    var acksPending = false
+
+    override def toString =
+      "acksPending:%b, error: %d, startOffset: %d, requiredOffset: %d".format(
+        acksPending, status.error, status.offset, requiredOffset)
+  }
+
   case class ProduceResult(key: TopicAndPartition, start: Long, end: Long, error: Option[Throwable] = None) {
     def this(key: TopicAndPartition, throwable: Throwable) = 
       this(key, -1L, -1L, Some(throwable))
@@ -762,41 +772,31 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   class DelayedProduce(keys: Seq[RequestKey],
                        request: RequestChannel.Request,
-                       initialErrorsAndOffsets: Map[TopicAndPartition, ProducerResponseStatus],
+                       val partitionStatus: Map[TopicAndPartition, DelayedProduceResponseStatus],
                        val produce: ProducerRequest,
                        delayMs: Long)
           extends DelayedRequest(keys, request, delayMs) with Logging {
 
-    /**
-     * Map of (topic, partition) -> partition status
-     * The values in this map don't need to be synchronized since updates to the
-     * values are effectively synchronized by the ProducerRequestPurgatory's
-     * update method
-     */
-    private [kafka] val partitionStatus = keys.map(requestKey => {
-      val producerResponseStatus = initialErrorsAndOffsets(TopicAndPartition(requestKey.topic, requestKey.partition))
-      // if there was an error in writing to the local replica's log, then don't
-      // wait for acks on this partition
-      val (acksPending, error, nextOffset) =
-        if (producerResponseStatus.error == ErrorMapping.NoError) {
-          // Timeout error state will be cleared when requiredAcks are received
-          (true, ErrorMapping.RequestTimedOutCode, producerResponseStatus.offset)
-        }
-        else (false, producerResponseStatus.error, producerResponseStatus.offset)
+    // first update the acks pending variable according to error code
+    partitionStatus foreach { case (topicAndPartition, delayedStatus) =>
+      if (delayedStatus.status.error == ErrorMapping.NoError) {
+        // Timeout error state will be cleared when requiredAcks are received
+        delayedStatus.acksPending = true
+        delayedStatus.status.error = ErrorMapping.RequestTimedOutCode
+      } else {
+        delayedStatus.acksPending = false
+      }
+
+      trace("Initial partition status for %s is %s".format(topicAndPartition, delayedStatus))
+    }
 
-      val initialStatus = PartitionStatus(acksPending, error, nextOffset)
-      trace("Initial partition status for %s = %s".format(requestKey.keyLabel, initialStatus))
-      (requestKey, initialStatus)
-    }).toMap
 
     def respond() {
-      val finalErrorsAndOffsets = initialErrorsAndOffsets.map(
-        status => {
-          val pstat = partitionStatus(new RequestKey(status._1))
-          (status._1, ProducerResponseStatus(pstat.error, pstat.requiredOffset))
-        })
-      
-      val response = ProducerResponse(produce.correlationId, finalErrorsAndOffsets)
+      val responseStatus = partitionStatus.map { case (topicAndPartition, delayedStatus) =>
+        topicAndPartition -> delayedStatus.status
+      }
+
+      val response = ProducerResponse(produce.correlationId, responseStatus)
 
       requestChannel.sendResponse(new RequestChannel.Response(
         request, new BoundedByteBufferSend(response)))
@@ -816,8 +816,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     def isSatisfied(followerFetchRequestKey: RequestKey) = {
       val topic = followerFetchRequestKey.topic
       val partitionId = followerFetchRequestKey.partition
-      val key = RequestKey(topic, partitionId)
-      val fetchPartitionStatus = partitionStatus(key)
+      val fetchPartitionStatus = partitionStatus(TopicAndPartition(topic, partitionId))
       trace("Checking producer request satisfaction for %s-%d, acksPending = %b"
         .format(topic, partitionId, fetchPartitionStatus.acksPending))
       if (fetchPartitionStatus.acksPending) {
@@ -830,10 +829,10 @@ class KafkaApis(val requestChannel: RequestChannel,
         }
         if (errorCode != ErrorMapping.NoError) {
           fetchPartitionStatus.acksPending = false
-          fetchPartitionStatus.error = errorCode
+          fetchPartitionStatus.status.error = errorCode
         } else if (hasEnough) {
           fetchPartitionStatus.acksPending = false
-          fetchPartitionStatus.error = ErrorMapping.NoError
+          fetchPartitionStatus.status.error = ErrorMapping.NoError
         }
         if (!fetchPartitionStatus.acksPending) {
           val messageSizeInBytes = produce.topicPartitionMessageSizeMap(followerFetchRequestKey.topicAndPartition)
@@ -846,20 +845,6 @@ class KafkaApis(val requestChannel: RequestChannel,
       trace("Producer request satisfaction for %s-%d = %b".format(topic, partitionId, satisfied))
       satisfied
     }
-
-    case class PartitionStatus(var acksPending: Boolean,
-                          var error: Short,
-                          requiredOffset: Long) {
-      def setThisBrokerNotLeader() {
-        error = ErrorMapping.NotLeaderForPartitionCode
-        acksPending = false
-      }
-
-      override def toString =
-        "acksPending:%b, error: %d, requiredOffset: %d".format(
-          acksPending, error, requiredOffset
-        )
-    }
   }
 
   /**
@@ -877,8 +862,8 @@ class KafkaApis(val requestChannel: RequestChannel,
      * Handle an expired delayed request
      */
     protected def expire(delayedProduce: DelayedProduce) {
-      for (partitionStatus <- delayedProduce.partitionStatus if partitionStatus._2.acksPending)
-        delayedRequestMetrics.recordDelayedProducerKeyExpired(partitionStatus._1)
+      for ((topicPartition, responseStatus) <- delayedProduce.partitionStatus if responseStatus.acksPending)
+        delayedRequestMetrics.recordDelayedProducerKeyExpired(RequestKey(topicPartition.topic, topicPartition.partition))
 
       delayedProduce.respond()
     }
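
The refactor moves the per-partition bookkeeping out of DelayedProduce's inner PartitionStatus into DelayedProduceResponseStatus, built by the caller and mutated in place as follower acks arrive, which is also why the error field of ProducerResponseStatus became a var above. A minimal Java mirror of the acks state machine, using the error codes shown in Errors.java earlier:

    final class DelayedStatus {
        static final short NONE = 0, REQUEST_TIMED_OUT = 7;  // codes from Errors above
        boolean acksPending;
        short error;
        final long requiredOffset;

        DelayedStatus(short localAppendError, long requiredOffset) {
            this.requiredOffset = requiredOffset;
            this.error = localAppendError;
            if (localAppendError == NONE) {
                // local append succeeded: wait for follower acks under a provisional
                // timeout error, cleared only if enough replicas catch up in time
                this.acksPending = true;
                this.error = REQUEST_TIMED_OUT;
            }
        }
    }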

http://git-wip-us.apache.org/repos/asf/kafka/blob/5e2a9a56/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
new file mode 100644
index 0000000..b8eb726
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -0,0 +1,284 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api.test
+
+import org.scalatest.junit.JUnit3Suite
+import org.junit.Test
+import org.junit.Assert._
+
+import java.util.Properties
+import java.lang.Integer
+import java.util.concurrent.{TimeoutException, TimeUnit, ExecutionException}
+
+import kafka.server.{KafkaConfig, KafkaServer}
+import kafka.utils.{Utils, TestUtils}
+import kafka.zk.ZooKeeperTestHarness
+import kafka.consumer.SimpleConsumer
+
+import org.apache.kafka.common.KafkaException
+import org.apache.kafka.clients.producer._
+
+class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness {
+  private val brokerId1 = 0
+  private val brokerId2 = 1
+  private val ports = TestUtils.choosePorts(2)
+  private val (port1, port2) = (ports(0), ports(1))
+  private var server1: KafkaServer = null
+  private var server2: KafkaServer = null
+  private var servers = List.empty[KafkaServer]
+
+  private var consumer1: SimpleConsumer = null
+  private var consumer2: SimpleConsumer = null
+
+  private var producer1: KafkaProducer = null
+  private var producer2: KafkaProducer = null
+  private var producer3: KafkaProducer = null
+  private var producer4: KafkaProducer = null
+
+  private val props1 = TestUtils.createBrokerConfig(brokerId1, port1)
+  private val props2 = TestUtils.createBrokerConfig(brokerId2, port2)
+  props1.put("auto.create.topics.enable", "false")
+  props2.put("auto.create.topics.enable", "false")
+  private val config1 = new KafkaConfig(props1)
+  private val config2 = new KafkaConfig(props2)
+  private val brokerList = TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))
+
+  private val bufferSize = 2 * config1.messageMaxBytes
+
+  private val topic1 = "topic-1"
+  private val topic2 = "topic-2"
+
+  // TODO: move this function to TestUtils once the server module can depend on the clients module
+  private def makeProducer(brokerList: String, acks: Int, metadataFetchTimeout: Long,
+                           blockOnBufferFull: Boolean, bufferSize: Long) : KafkaProducer = {
+    val producerProps = new Properties()
+    producerProps.put(ProducerConfig.BROKER_LIST_CONFIG, brokerList)
+    producerProps.put(ProducerConfig.REQUIRED_ACKS_CONFIG, acks.toString)
+    producerProps.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, metadataFetchTimeout.toString)
+    producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, blockOnBufferFull.toString)
+    producerProps.put(ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG, bufferSize.toString)
+    return new KafkaProducer(producerProps)
+  }
+
+  override def setUp() {
+    super.setUp()
+    server1 = TestUtils.createServer(config1)
+    server2 = TestUtils.createServer(config2)
+    servers = List(server1,server2)
+
+    // TODO: we need to migrate to new consumers when 0.9 is final
+    consumer1 = new SimpleConsumer("localhost", port1, 100, 1024*1024, "")
+    consumer2 = new SimpleConsumer("localhost", port2, 100, 1024*1024, "")
+
+    producer1 = makeProducer(brokerList, 0, 3000, false, bufferSize); // produce with ack=0
+    producer2 = makeProducer(brokerList, 1, 3000, false, bufferSize); // produce with ack=1
+    producer3 = makeProducer(brokerList, -1, 3000, false, bufferSize); // produce with ack=-1
+    producer4 = makeProducer("localhost:8686,localhost:4242", 1, 3000, false, bufferSize); // produce with incorrect broker list
+  }
+
+  override def tearDown() {
+    server1.shutdown; Utils.rm(server1.config.logDirs)
+    server2.shutdown; Utils.rm(server2.config.logDirs)
+
+    consumer1.close
+    consumer2.close
+
+    if (producer1 != null) producer1.close
+    if (producer2 != null) producer2.close
+    if (producer3 != null) producer3.close
+    if (producer4 != null) producer4.close
+
+    super.tearDown()
+  }
+
+  /**
+   * With ack == 0, the returned future should complete normally and carry offset -1
+   */
+  @Test
+  def testTooLargeRecordWithAckZero() {
+    // create topic
+    TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
+
+    // send a too-large record
+    val record = new ProducerRecord(topic1, null, "key".getBytes, new Array[Byte](config1.messageMaxBytes + 1))
+    assertEquals("Returned metadata should have offset -1", producer1.send(record).get.offset, -1L)
+  }
+
+  /**
+   * With ack == 1, the future's get() should throw an ExecutionException caused by a RecordTooLargeException
+   */
+  @Test
+  def testTooLargeRecordWithAckOne() {
+    // create topic
+    TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
+
+    // send a too-large record
+    val record = new ProducerRecord(topic1, null, "key".getBytes, new Array[Byte](config1.messageMaxBytes + 1))
+    intercept[ExecutionException] {
+      producer2.send(record).get
+    }
+  }
+
+  /**
+   * With a non-existent topic, the future's get() should throw an ExecutionException caused by a TimeoutException
+   */
+  @Test
+  def testNonExistTopic() {
+    // send a record with a non-existent topic
+    val record = new ProducerRecord(topic2, null, "key".getBytes, "value".getBytes)
+    intercept[ExecutionException] {
+      producer1.send(record).get
+    }
+  }
+
+  /**
+   * With an incorrect broker list, the future's get() should throw an ExecutionException caused by a TimeoutException
+   *
+   * TODO: other exceptions that can be thrown in ExecutionException:
+   *    UnknownTopicOrPartitionException
+   *    NotLeaderForPartitionException
+   *    LeaderNotAvailableException
+   *    CorruptRecordException
+   *    TimeoutException
+   */
+  @Test
+  def testWrongBrokerList() {
+    // create topic
+    TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
+
+    // send a record with incorrect broker list
+    val record = new ProducerRecord(topic1, null, "key".getBytes, "value".getBytes)
+    intercept[ExecutionException] {
+      producer4.send(record).get
+    }
+  }
+
+  /**
+   * 1. With ack=0, getting the future metadata should not block.
+   * 2. With ack=1, getting the future metadata should block,
+   *    and subsequent sends will eventually fill the buffer
+   */
+  @Test
+  def testNoResponse() {
+    // create topic
+    TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
+
+    // first send a message to make sure the metadata is refreshed
+    val record = new ProducerRecord(topic1, null, "key".getBytes, "value".getBytes)
+    producer1.send(record).get
+    producer2.send(record).get
+
+    // stop IO threads and request handling, but leave networking operational
+    // any requests should be accepted and queue up, but not handled
+    server1.requestHandlerPool.shutdown()
+    server2.requestHandlerPool.shutdown()
+
+    producer1.send(record).get(5000, TimeUnit.MILLISECONDS)
+
+    intercept[TimeoutException] {
+      producer2.send(record).get(5000, TimeUnit.MILLISECONDS)
+    }
+
+    // TODO: expose producer configs after creating them
+    // send enough messages to get buffer full
+    val tooManyRecords = bufferSize / ("key".getBytes.length + "value".getBytes.length)
+
+    intercept[KafkaException] {
+      for (i <- 1 to tooManyRecords)
+        producer2.send(record)
+    }
+
+    // do not close producer2 since it will block
+    // TODO: can we do better?
+    producer2 = null
+  }
+
+  /**
+   * A send call with an invalid partition id should throw a KafkaException caused by an IllegalArgumentException
+   */
+  @Test
+  def testInvalidPartition() {
+    // create topic
+    TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
+
+    // create a record with incorrect partition id, send should fail
+    val record = new ProducerRecord(topic1, new Integer(1), "key".getBytes, "value".getBytes)
+    intercept[KafkaException] {
+      producer1.send(record)
+    }
+    intercept[KafkaException] {
+      producer2.send(record)
+    }
+    intercept[KafkaException] {
+      producer3.send(record)
+    }
+  }
+
+  /**
+   * A send call after the producer is closed should throw a KafkaException caused by an IllegalStateException
+   */
+  @Test
+  def testSendAfterClosed() {
+    // create topic
+    TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
+
+    val record = new ProducerRecord(topic1, null, "key".getBytes, "value".getBytes)
+
+    // first send a message to make sure the metadata is refreshed
+    producer1.send(record).get
+    producer2.send(record).get
+    producer3.send(record).get
+
+    intercept[KafkaException] {
+      producer1.close
+      producer1.send(record)
+    }
+    intercept[KafkaException] {
+      producer2.close
+      producer2.send(record)
+    }
+    intercept[KafkaException] {
+      producer3.close
+      producer3.send(record)
+    }
+
+    // re-close producer is fine
+  }
+
+  /**
+   * With replication, the producer should be able to find the new leader after it detects broker failure
+   */
+  @Test
+  def testBrokerFailure() {
+    // create topic
+    val leaders = TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
+    val leader = leaders(0)
+    assertTrue("Leader of partition 0 of the topic should exist", leader.isDefined)
+
+    val record = new ProducerRecord(topic1, null, "key".getBytes, "value".getBytes)
+    assertEquals("Returned metadata should have offset 0", producer3.send(record).get.offset, 0L)
+
+    // shutdown broker
+    val serverToShutdown = if(leader.get == server1.config.brokerId) server1 else server2
+    serverToShutdown.shutdown()
+    serverToShutdown.awaitShutdown()
+
+    // send the message again; it should still succeed due to retries
+    assertEquals("Returned metadata should have offset 1", producer3.send(record).get.offset, 1L)
+  }
+}
\ No newline at end of file
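
For reference, a Java equivalent of the makeProducer helper above, using the same config keys that appear in the test; brokerList and bufferSize are as defined there, and the acks value of 1 is just an example:

    Properties producerProps = new Properties();
    producerProps.put(ProducerConfig.BROKER_LIST_CONFIG, brokerList);
    producerProps.put(ProducerConfig.REQUIRED_ACKS_CONFIG, "1");
    producerProps.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, "3000");
    producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "false");
    producerProps.put(ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG, String.valueOf(bufferSize));
    KafkaProducer producer = new KafkaProducer(producerProps);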

http://git-wip-us.apache.org/repos/asf/kafka/blob/5e2a9a56/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
index 34baa8c..66ea76b 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
@@ -15,12 +15,11 @@
  * limitations under the License.
  */
 
-package kafka.test
+package kafka.api.test
 
 import kafka.server.{KafkaConfig, KafkaServer}
-import kafka.utils.{ZkUtils, Utils, TestUtils, Logging}
+import kafka.utils.{Utils, TestUtils}
 import kafka.zk.ZooKeeperTestHarness
-import kafka.admin.AdminUtils
 import kafka.consumer.SimpleConsumer
 import kafka.api.FetchRequestBuilder
 import kafka.message.Message
@@ -33,7 +32,6 @@ import org.junit.Assert._
 
 import java.util.Properties
 import java.lang.{Integer, IllegalArgumentException}
-import org.apache.log4j.Logger
 
 
 class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
@@ -110,29 +108,25 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
 
       // send a normal record
       val record0 = new ProducerRecord(topic, new Integer(0), "key".getBytes, "value".getBytes)
-      val response0 = producer.send(record0, callback)
-      assertEquals("Should have offset 0", 0L, response0.get.offset)
+      assertEquals("Should have offset 0", 0L, producer.send(record0, callback).get.offset)
 
       // send a record with null value should be ok
       val record1 = new ProducerRecord(topic, new Integer(0), "key".getBytes, null)
-      val response1 = producer.send(record1, callback)
-      assertEquals("Should have offset 1", 1L, response1.get.offset)
+      assertEquals("Should have offset 1", 1L, producer.send(record1, callback).get.offset)
 
       // send a record with null key should be ok
       val record2 = new ProducerRecord(topic, new Integer(0), null, "value".getBytes)
-      val response2 = producer.send(record2, callback)
-      assertEquals("Should have offset 2", 2L, response2.get.offset)
+      assertEquals("Should have offset 2", 2L, producer.send(record2, callback).get.offset)
 
       // send a record with null part id should be ok
       val record3 = new ProducerRecord(topic, null, "key".getBytes, "value".getBytes)
-      val response3 = producer.send(record3, callback)
-      assertEquals("Should have offset 3", 3L, response3.get.offset)
+      assertEquals("Should have offset 3", 3L, producer.send(record3, callback).get.offset)
 
       // send a record with null topic should fail
       try {
         val record4 = new ProducerRecord(null, new Integer(0), "key".getBytes, "value".getBytes)
-        val response4 = producer.send(record4, callback)
-        response4.wait
+        producer.send(record4, callback)
+        fail("Should not allow sending a record without topic")
       } catch {
         case iae: IllegalArgumentException => // this is ok
         case e: Throwable => fail("Only expecting IllegalArgumentException", e)
@@ -143,8 +137,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
         producer.send(record0)
 
       // check that all messages have been acked via offset
-      val response5 = producer.send(record0, callback)
-      assertEquals("Should have offset " + (numRecords + 4), numRecords + 4L, response5.get.offset)
+      assertEquals("Should have offset " + (numRecords + 4), numRecords + 4L, producer.send(record0, callback).get.offset)
 
     } finally {
       if (producer != null) {
@@ -157,7 +150,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
   /**
    * testClose checks the closing behavior
    *
-   * 1. After close() returns, all messages should be sent with correct returned offset metadata
+   * After close() returns, all messages should be sent with correct returned offset metadata
    */
   @Test
   def testClose() {
@@ -195,7 +188,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
   /**
    * testSendToPartition checks the partitioning behavior
    *
-   * 1. The specified partition-id should be respected
+   * The specified partition-id should be respected
    */
   @Test
   def testSendToPartition() {
@@ -207,40 +200,40 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
     try {
       // create topic
       val leaders = TestUtils.createTopic(zkClient, topic, 2, 2, servers)
+      val partition = 1
 
       // make sure leaders exist
-      val leader1 = leaders.get(1)
+      val leader1 = leaders(partition)
       assertTrue("Leader for topic \"topic\" partition 1 should exist", leader1.isDefined)
 
-      val partition = 1
       val responses =
-        for (i <- 0 until numRecords)
+        for (i <- 1 to numRecords)
         yield producer.send(new ProducerRecord(topic, partition, null, ("value" + i).getBytes))
       val futures = responses.toList
-      futures.map(_.wait)
+      futures.map(_.get)
       for (future <- futures)
         assertTrue("Request should have completed", future.isDone)
 
       // make sure all of them end up in the same partition with increasing offset values
       for ((future, offset) <- futures zip (0 until numRecords)) {
-        assertEquals(offset, future.get.offset)
+        assertEquals(offset.toLong, future.get.offset)
         assertEquals(topic, future.get.topic)
-        assertEquals(1, future.get.partition)
+        assertEquals(partition, future.get.partition)
       }
 
       // make sure the fetched messages also respect the partitioning and ordering
       val fetchResponse1 = if(leader1.get == server1.config.brokerId) {
-        consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 1, 0, Int.MaxValue).build())
-      }else {
-        consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 1, 0, Int.MaxValue).build())
+        consumer1.fetch(new FetchRequestBuilder().addFetch(topic, partition, 0, Int.MaxValue).build())
+      } else {
+        consumer2.fetch(new FetchRequestBuilder().addFetch(topic, partition, 0, Int.MaxValue).build())
       }
-      val messageSet1 = fetchResponse1.messageSet(topic, 1).iterator.toBuffer
+      val messageSet1 = fetchResponse1.messageSet(topic, partition).iterator.toBuffer
       assertEquals("Should have fetched " + numRecords + " messages", numRecords, messageSet1.size)
 
       // TODO: also check topic and partition after they are added in the return messageSet
       for (i <- 0 to numRecords - 1) {
         assertEquals(new Message(bytes = ("value" + (i + 1)).getBytes), messageSet1(i).message)
-        assertEquals(i, messageSet1(i).offset)
+        assertEquals(i.toLong, messageSet1(i).offset)
       }
     } finally {
       if (producer != null) {
@@ -250,6 +243,11 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
     }
   }
 
+  /**
+   * testAutoCreateTopic
+   *
+   * The topic should be created upon sending the first message
+   */
   @Test
   def testAutoCreateTopic() {
     val props = new Properties()
@@ -259,8 +257,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
     try {
       // Send a message to auto-create the topic
       val record = new ProducerRecord(topic, null, "key".getBytes, "value".getBytes)
-      val response = producer.send(record)
-      assertEquals("Should have offset 0", 0L, response.get.offset)
+      assertEquals("Should have offset 0", 0L, producer.send(record).get.offset)
 
       // double check that the topic is created with leader elected
       assertTrue("Topic should already be created with leader", TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 0).isDefined)

http://git-wip-us.apache.org/repos/asf/kafka/blob/5e2a9a56/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 1c7a450..772d214 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -23,24 +23,27 @@ import java.nio._
 import java.nio.channels._
 import java.util.Random
 import java.util.Properties
-import junit.framework.AssertionFailedError
-import junit.framework.Assert._
+import java.util.concurrent.locks.ReentrantLock
+import java.util.concurrent.TimeUnit
+
+import collection.mutable.Map
+import collection.mutable.ListBuffer
+
+import org.I0Itec.zkclient.ZkClient
+
 import kafka.server._
 import kafka.producer._
 import kafka.message._
-import org.I0Itec.zkclient.ZkClient
+import kafka.api._
 import kafka.cluster.Broker
-import collection.mutable.ListBuffer
 import kafka.consumer.ConsumerConfig
-import java.util.concurrent.locks.ReentrantLock
-import java.util.concurrent.TimeUnit
-import kafka.api._
-import collection.mutable.Map
 import kafka.serializer.{StringEncoder, DefaultEncoder, Encoder}
 import kafka.common.TopicAndPartition
-import junit.framework.Assert
 import kafka.admin.AdminUtils
+import kafka.producer.ProducerConfig
 
+import junit.framework.AssertionFailedError
+import junit.framework.Assert._
 
 /**
  * Utility functions to help with testing
@@ -526,7 +529,7 @@ object TestUtils extends Logging {
   }
 
   def waitUntilMetadataIsPropagated(servers: Seq[KafkaServer], topic: String, partition: Int, timeout: Long) = {
-    Assert.assertTrue("Partition [%s,%d] metadata not propagated after timeout".format(topic, partition),
+    assertTrue("Partition [%s,%d] metadata not propagated after timeout".format(topic, partition),
       TestUtils.waitUntilTrue(() =>
         servers.foldLeft(true)(_ && _.apis.metadataCache.keySet.contains(TopicAndPartition(topic, partition))), timeout))
   }