Posted to commits@kafka.apache.org by ju...@apache.org on 2014/10/10 03:26:50 UTC

git commit: kafka-1555; provide strong consistency with reasonable availability; patched by Gwen Shapira; reviewed by Joel Koshy and Jun Rao

Repository: kafka
Updated Branches:
  refs/heads/trunk 58e58529b -> 043190c60


kafka-1555; provide strong consistency with reasonable availability; patched by Gwen Shapira; reviewed by Joel Koshy and Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/043190c6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/043190c6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/043190c6

Branch: refs/heads/trunk
Commit: 043190c601f37e42c32189f3df5ddd986e57da95
Parents: 58e5852
Author: Gwen Shapira <cs...@gmail.com>
Authored: Thu Oct 9 18:24:53 2014 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Thu Oct 9 18:26:03 2014 -0700

----------------------------------------------------------------------
 .../kafka/clients/producer/ProducerConfig.java  | 17 ++++--
 .../apache/kafka/common/config/ConfigDef.java   | 42 +++++++++++++
 .../NotEnoughReplicasAfterAppendException.java  | 43 ++++++++++++++
 .../errors/NotEnoughReplicasException.java      | 40 +++++++++++++
 .../apache/kafka/common/protocol/Errors.java    |  7 ++-
 .../main/scala/kafka/cluster/Partition.scala    | 27 ++++++++-
 .../main/scala/kafka/common/ErrorMapping.scala  | 20 ++++---
 .../NotEnoughReplicasAfterAppendException.scala | 27 +++++++++
 .../common/NotEnoughReplicasException.scala     | 25 ++++++++
 core/src/main/scala/kafka/log/LogConfig.scala   | 62 +++++++++++++-------
 .../kafka/producer/SyncProducerConfig.scala     | 14 +++--
 .../src/main/scala/kafka/server/KafkaApis.scala |  6 +-
 .../main/scala/kafka/server/KafkaConfig.scala   |  5 ++
 .../kafka/api/ProducerFailureHandlingTest.scala | 57 +++++++++++++++++-
 .../unit/kafka/producer/ProducerTest.scala      | 27 ++++-----
 .../unit/kafka/producer/SyncProducerTest.scala  | 22 +++++++
 .../test/scala/unit/kafka/utils/TestUtils.scala | 10 +++-
 17 files changed, 384 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/043190c6/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index f9de4af..79d57f9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -14,7 +14,10 @@ package org.apache.kafka.clients.producer;
 
 import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
 import static org.apache.kafka.common.config.ConfigDef.Range.between;
+import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
 
+import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.kafka.common.config.AbstractConfig;
@@ -77,7 +80,8 @@ public class ProducerConfig extends AbstractConfig {
 
     /** <code>acks</code> */
     public static final String ACKS_CONFIG = "acks";
-    private static final String ACKS_DOC = "The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the " + " durability of records that are sent. The following settings are common: "
+    private static final String ACKS_DOC = "The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the"
+                                           + " durability of records that are sent. The following settings are common: "
                                            + " <ul>"
                                            + " <li><code>acks=0</code> If set to zero then the producer will not wait for any acknowledgment from the"
                                            + " server at all. The record will be immediately added to the socket buffer and considered sent. No guarantee can be"
@@ -89,9 +93,7 @@ public class ProducerConfig extends AbstractConfig {
                                            + " acknowledging the record but before the followers have replicated it then the record will be lost."
                                            + " <li><code>acks=all</code> This means the leader will wait for the full set of in-sync replicas to"
                                            + " acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica"
-                                           + " remains alive. This is the strongest available guarantee."
-                                           + " <li>Other settings such as <code>acks=2</code> are also possible, and will require the given number of"
-                                           + " acknowledgements but this is generally less useful.";
+                                           + " remains alive. This is the strongest available guarantee.";
 
     /** <code>timeout.ms</code> */
     public static final String TIMEOUT_CONFIG = "timeout.ms";
@@ -175,7 +177,12 @@ public class ProducerConfig extends AbstractConfig {
         config = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, BOOSTRAP_SERVERS_DOC)
                                 .define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC)
                                 .define(RETRIES_CONFIG, Type.INT, 0, between(0, Integer.MAX_VALUE), Importance.HIGH, RETRIES_DOC)
-                                .define(ACKS_CONFIG, Type.STRING, "1", Importance.HIGH, ACKS_DOC)
+                                .define(ACKS_CONFIG,
+                                        Type.STRING,
+                                        "1",
+                                        in(Arrays.asList("all", "-1", "0", "1")),
+                                        Importance.HIGH,
+                                        ACKS_DOC)
                                 .define(COMPRESSION_TYPE_CONFIG, Type.STRING, "none", Importance.HIGH, COMPRESSION_TYPE_DOC)
                                 .define(BATCH_SIZE_CONFIG, Type.INT, 16384, atLeast(0), Importance.MEDIUM, BATCH_SIZE_DOC)
                                 .define(TIMEOUT_CONFIG, Type.INT, 30 * 1000, atLeast(0), Importance.MEDIUM, TIMEOUT_DOC)
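
The visible effect for client code is that acks is now validated when the producer is constructed, instead of failing obscurely at the broker. A minimal sketch of a producer opting into the strongest setting, assuming a hypothetical broker at localhost:9092 and a hypothetical topic "my-topic" (the trunk producer at this point still sends raw byte arrays):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class AcksAllExample {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical broker
            props.put(ProducerConfig.ACKS_CONFIG, "all"); // "all", "-1", "0" and "1" are the only accepted values
            // props.put(ProducerConfig.ACKS_CONFIG, "2"); // previously tolerated, now fails fast with a ConfigException

            KafkaProducer producer = new KafkaProducer(props);
            producer.send(new ProducerRecord("my-topic", "key".getBytes(), "value".getBytes())).get();
            producer.close();
        }
    }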

http://git-wip-us.apache.org/repos/asf/kafka/blob/043190c6/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index addc906..227309e 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -268,6 +268,48 @@ public class ConfigDef {
         }
     }
 
+    public static class ValidString implements Validator {
+        private final List<String> validStrings;
+
+        private ValidString(List<String> validStrings) {
+            this.validStrings = validStrings;
+        }
+
+        public static ValidString in(List<String> validStrings) {
+            return new ValidString(validStrings);
+        }
+
+        @Override
+        public void ensureValid(String name, Object o) {
+            String s = (String) o;
+            if (!validStrings.contains(s)) {
+                throw new ConfigException(name, o, "String must be one of: " + join(validStrings));
+            }
+        }
+
+        @Override
+        public String toString() {
+            return "[" + join(validStrings) + "]";
+        }
+
+        private String join(List<String> list) {
+            StringBuilder sb = new StringBuilder();
+            boolean first = true;
+            for (String item : list) {
+                if (first)
+                    first = false;
+                else
+                    sb.append(",");
+                sb.append(item);
+            }
+            return sb.toString();
+        }
+    }
+
     private static class ConfigKey {
         public final String name;
         public final Type type;
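
ValidString plugs into the Validator hook that ConfigDef already invokes during parse(), so an out-of-range value fails fast with a ConfigException. A toy sketch, using a hypothetical config key and doc string:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.common.config.ConfigDef;

    public class ValidStringDemo {
        public static void main(String[] args) {
            ConfigDef def = new ConfigDef().define("cleanup.mode", ConfigDef.Type.STRING, "delete",
                    ConfigDef.ValidString.in(Arrays.asList("delete", "compact")),
                    ConfigDef.Importance.MEDIUM, "hypothetical doc string");

            Map<String, Object> raw = new HashMap<String, Object>();
            raw.put("cleanup.mode", "archive");
            def.parse(raw); // throws ConfigException: String must be one of: delete,compact
        }
    }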

http://git-wip-us.apache.org/repos/asf/kafka/blob/043190c6/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java b/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java
new file mode 100644
index 0000000..75c80a9
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * The number of in-sync replicas for the partition is lower than min.insync.replicas.
+ * This exception is raised when the low ISR size is discovered *after* the message
+ * was already appended to the log, so retries by the producer will cause duplicates.
+ */
+public class NotEnoughReplicasAfterAppendException extends RetriableException {
+    private static final long serialVersionUID = 1L;
+
+    public NotEnoughReplicasAfterAppendException() {
+        super();
+    }
+
+    public NotEnoughReplicasAfterAppendException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public NotEnoughReplicasAfterAppendException(String message) {
+        super(message);
+    }
+
+    public NotEnoughReplicasAfterAppendException(Throwable cause) {
+        super(cause);
+    }
+}
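
Because the exception is retriable, a producer configured with retries > 0 will resend these failures automatically, and since the append already happened on the leader a retry can write the message twice. A hedged sketch of application-side handling for callers that cannot tolerate duplicates, reusing the producer and record from the earlier sketch:

    import java.util.concurrent.ExecutionException;
    import org.apache.kafka.common.errors.NotEnoughReplicasAfterAppendException;

    try {
        producer.send(record).get();
    } catch (ExecutionException e) {
        if (e.getCause() instanceof NotEnoughReplicasAfterAppendException) {
            // the write already landed on the leader's log; resending may duplicate it,
            // so deduplicate downstream or accept at-least-once semantics here
        }
    }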

http://git-wip-us.apache.org/repos/asf/kafka/blob/043190c6/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasException.java b/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasException.java
new file mode 100644
index 0000000..486d515
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * The number of in-sync replicas for the partition is lower than min.insync.replicas.
+ */
+public class NotEnoughReplicasException extends RetriableException {
+    private static final long serialVersionUID = 1L;
+
+    public NotEnoughReplicasException() {
+        super();
+    }
+
+    public NotEnoughReplicasException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public NotEnoughReplicasException(String message) {
+        super(message);
+    }
+
+    public NotEnoughReplicasException(Throwable cause) {
+        super(cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/043190c6/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index d5f5de3..3316b6a 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -25,7 +25,7 @@ import org.apache.kafka.common.errors.*;
 /**
  * This class contains all the client-server errors--those errors that must be sent from the server to the client. These
  * are thus part of the protocol. The names can be changed but the error code cannot.
- * 
+ *
  * Do not add exceptions that occur only on the client or only on the server here.
  */
 public enum Errors {
@@ -44,8 +44,9 @@ public enum Errors {
     NETWORK_EXCEPTION(13, new NetworkException("The server disconnected before a response was received.")),
     // TODO: errorCode 14, 15, 16
     INVALID_TOPIC_EXCEPTION(17, new InvalidTopicException("The request attempted to perform an operation on an invalid topic.")),
-    RECORD_LIST_TOO_LARGE(18, new RecordBatchTooLargeException("The request included message batch larger than the configured segment size on the server."));
-
+    RECORD_LIST_TOO_LARGE(18, new RecordBatchTooLargeException("The request included message batch larger than the configured segment size on the server.")),
+    NOT_ENOUGH_REPLICAS(19, new NotEnoughReplicasException("Messages are rejected since there are fewer in-sync replicas than required.")),
+    NOT_ENOUGH_REPLICAS_AFTER_APPEND(20, new NotEnoughReplicasAfterAppendException("Messages are written to the log, but to fewer in-sync replicas than required."));
+
     private static Map<Class<?>, Errors> classToError = new HashMap<Class<?>, Errors>();
     private static Map<Short, Errors> codeToError = new HashMap<Short, Errors>();
     static {

http://git-wip-us.apache.org/repos/asf/kafka/blob/043190c6/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index ff106b4..e88ecf2 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -269,14 +269,26 @@ class Partition(val topic: String,
           else
             true /* also count the local (leader) replica */
         })
+        val minIsr = leaderReplica.log.get.config.minInSyncReplicas
+
         trace("%d/%d acks satisfied for %s-%d".format(numAcks, requiredAcks, topic, partitionId))
-        if ((requiredAcks < 0 && leaderReplica.highWatermark.messageOffset >= requiredOffset) ||
-          (requiredAcks > 0 && numAcks >= requiredAcks)) {
+        if (requiredAcks < 0 && leaderReplica.highWatermark.messageOffset >= requiredOffset) {
           /*
           * requiredAcks < 0 means acknowledge after all replicas in ISR
           * are fully caught up to the (local) leader's offset
           * corresponding to this produce request.
+          *
+          * minIsr means that the topic is configured not to accept messages
+          * if there are not enough replicas in the ISR; in this scenario the
+          * request was already appended locally and then added to the
+          * purgatory before the ISR shrank
           */
+          if (minIsr <= curInSyncReplicas.size) {
+            (true, ErrorMapping.NoError)
+          } else {
+            (true, ErrorMapping.NotEnoughReplicasAfterAppendCode)
+          }
+        } else if (requiredAcks > 0 && numAcks >= requiredAcks) {
           (true, ErrorMapping.NoError)
         } else
           (false, ErrorMapping.NoError)
@@ -350,12 +362,21 @@ class Partition(val topic: String,
     stuckReplicas ++ slowReplicas
   }
 
-  def appendMessagesToLeader(messages: ByteBufferMessageSet) = {
+  def appendMessagesToLeader(messages: ByteBufferMessageSet, requiredAcks: Int = 0) = {
     inReadLock(leaderIsrUpdateLock) {
       val leaderReplicaOpt = leaderReplicaIfLocal()
       leaderReplicaOpt match {
         case Some(leaderReplica) =>
           val log = leaderReplica.log.get
+          val minIsr = log.config.minInSyncReplicas
+          val inSyncSize = inSyncReplicas.size
+
+          // Avoid writing to the leader if there are not enough in-sync replicas to make it safe
+          if (inSyncSize < minIsr && requiredAcks == -1) {
+            throw new NotEnoughReplicasException("Number of in-sync replicas for partition [%s,%d] is [%d], below required minimum [%d]"
+              .format(topic, partitionId, inSyncSize, minIsr))
+          }
+
           val info = log.append(messages, assignOffsets = true)
           // probably unblock some follower fetch requests since log end offset has been updated
           replicaManager.unblockDelayedFetchRequests(new TopicPartitionRequestKey(this.topic, this.partitionId))
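
Taken together, the broker now consults min.insync.replicas at two moments, and only the first can refuse cleanly. A condensed sketch of the two checks (simplified names and plain error codes, not the actual Kafka code):

    // 1. Before appending (appendMessagesToLeader): nothing is written yet,
    //    so an acks=-1 request can be rejected with no side effects.
    static void checkBeforeAppend(int requiredAcks, int isrSize, int minIsr) {
        if (requiredAcks == -1 && isrSize < minIsr)
            throw new RuntimeException("NotEnoughReplicas (error code 19)");
    }

    // 2. When deciding whether an acks=-1 request is satisfied: the message is
    //    already in the log, so if the ISR shrank in the meantime all the broker
    //    can do is flag the write as under-replicated.
    static short checkAfterAppend(int isrSize, int minIsr) {
        return isrSize >= minIsr
            ? (short) 0    // NoError
            : (short) 20;  // NotEnoughReplicasAfterAppendCode
    }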

http://git-wip-us.apache.org/repos/asf/kafka/blob/043190c6/core/src/main/scala/kafka/common/ErrorMapping.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/ErrorMapping.scala b/core/src/main/scala/kafka/common/ErrorMapping.scala
index a190607..880ab4a 100644
--- a/core/src/main/scala/kafka/common/ErrorMapping.scala
+++ b/core/src/main/scala/kafka/common/ErrorMapping.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -22,7 +22,7 @@ import java.nio.ByteBuffer
 import scala.Predef._
 
 /**
- * A bi-directional mapping between error codes and exceptions x  
+ * A bi-directional mapping between error codes and exceptions
  */
 object ErrorMapping {
   val EmptyByteBuffer = ByteBuffer.allocate(0)
@@ -47,8 +47,10 @@ object ErrorMapping {
   val NotCoordinatorForConsumerCode: Short = 16
   val InvalidTopicCode : Short = 17
   val MessageSetSizeTooLargeCode: Short = 18
+  val NotEnoughReplicasCode : Short = 19
+  val NotEnoughReplicasAfterAppendCode: Short = 20
 
-  private val exceptionToCode = 
+  private val exceptionToCode =
     Map[Class[Throwable], Short](
       classOf[OffsetOutOfRangeException].asInstanceOf[Class[Throwable]] -> OffsetOutOfRangeCode,
       classOf[InvalidMessageException].asInstanceOf[Class[Throwable]] -> InvalidMessageCode,
@@ -66,15 +68,17 @@ object ErrorMapping {
       classOf[ConsumerCoordinatorNotAvailableException].asInstanceOf[Class[Throwable]] -> ConsumerCoordinatorNotAvailableCode,
       classOf[NotCoordinatorForConsumerException].asInstanceOf[Class[Throwable]] -> NotCoordinatorForConsumerCode,
       classOf[InvalidTopicException].asInstanceOf[Class[Throwable]] -> InvalidTopicCode,
-      classOf[MessageSetSizeTooLargeException].asInstanceOf[Class[Throwable]] -> MessageSetSizeTooLargeCode
+      classOf[MessageSetSizeTooLargeException].asInstanceOf[Class[Throwable]] -> MessageSetSizeTooLargeCode,
+      classOf[NotEnoughReplicasException].asInstanceOf[Class[Throwable]] -> NotEnoughReplicasCode,
+      classOf[NotEnoughReplicasAfterAppendException].asInstanceOf[Class[Throwable]] -> NotEnoughReplicasAfterAppendCode
     ).withDefaultValue(UnknownCode)
-  
+
   /* invert the mapping */
-  private val codeToException = 
+  private val codeToException =
     (Map[Short, Class[Throwable]]() ++ exceptionToCode.iterator.map(p => (p._2, p._1))).withDefaultValue(classOf[UnknownException])
-  
+
   def codeFor(exception: Class[Throwable]): Short = exceptionToCode(exception)
-  
+
   def maybeThrowException(code: Short) =
     if(code != 0)
       throw codeToException(code).newInstance()

http://git-wip-us.apache.org/repos/asf/kafka/blob/043190c6/core/src/main/scala/kafka/common/NotEnoughReplicasAfterAppendException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/NotEnoughReplicasAfterAppendException.scala b/core/src/main/scala/kafka/common/NotEnoughReplicasAfterAppendException.scala
new file mode 100644
index 0000000..c4f9def
--- /dev/null
+++ b/core/src/main/scala/kafka/common/NotEnoughReplicasAfterAppendException.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+/**
+ * The number of in-sync replicas for the partition is lower than min.insync.replicas.
+ * This exception is raised when the low ISR size is discovered *after* the message
+ * was already appended to the log, so retries by the producer will cause duplicates.
+ */
+class NotEnoughReplicasAfterAppendException(message: String) extends RuntimeException(message) {
+  def this() = this(null)
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/043190c6/core/src/main/scala/kafka/common/NotEnoughReplicasException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/NotEnoughReplicasException.scala b/core/src/main/scala/kafka/common/NotEnoughReplicasException.scala
new file mode 100644
index 0000000..bfbe0ee
--- /dev/null
+++ b/core/src/main/scala/kafka/common/NotEnoughReplicasException.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+/**
+ * Message was rejected because the number of in-sync replicas for the partition is lower than min.insync.replicas.
+ */
+class NotEnoughReplicasException(message: String) extends RuntimeException(message) {
  def this() = this(null)
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/043190c6/core/src/main/scala/kafka/log/LogConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index 5746ad4..d2cc9e3 100644
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -36,6 +36,7 @@ object Defaults {
   val MinCleanableDirtyRatio = 0.5
   val Compact = false
   val UncleanLeaderElectionEnable = true
+  val MinInSyncReplicas = 1
 }
 
 /**
@@ -53,7 +54,9 @@ object Defaults {
  * @param minCleanableRatio The ratio of bytes that are available for cleaning to the bytes already cleaned
  * @param compact Should old segments in this log be deleted or deduplicated?
  * @param uncleanLeaderElectionEnable Indicates whether unclean leader election is enabled; actually a controller-level property
- *   but included here for topic-specific configuration validation purposes
+ *                                    but included here for topic-specific configuration validation purposes
+ * @param minInSyncReplicas If the number of in-sync replicas drops below this number, we stop accepting writes with -1 (or all) required acks
+ *
  */
 case class LogConfig(val segmentSize: Int = Defaults.SegmentSize,
                      val segmentMs: Long = Defaults.SegmentMs,
@@ -68,8 +71,9 @@ case class LogConfig(val segmentSize: Int = Defaults.SegmentSize,
                      val deleteRetentionMs: Long = Defaults.DeleteRetentionMs,
                      val minCleanableRatio: Double = Defaults.MinCleanableDirtyRatio,
                      val compact: Boolean = Defaults.Compact,
-                     val uncleanLeaderElectionEnable: Boolean = Defaults.UncleanLeaderElectionEnable) {
-  
+                     val uncleanLeaderElectionEnable: Boolean = Defaults.UncleanLeaderElectionEnable,
+                     val minInSyncReplicas: Int = Defaults.MinInSyncReplicas) {
+
   def toProps: Properties = {
     val props = new Properties()
     import LogConfig._
@@ -87,9 +91,9 @@ case class LogConfig(val segmentSize: Int = Defaults.SegmentSize,
     props.put(MinCleanableDirtyRatioProp, minCleanableRatio.toString)
     props.put(CleanupPolicyProp, if(compact) "compact" else "delete")
     props.put(UncleanLeaderElectionEnableProp, uncleanLeaderElectionEnable.toString)
+    props.put(MinInSyncReplicasProp, minInSyncReplicas.toString)
     props
   }
-  
 }
 
 object LogConfig {
@@ -107,13 +111,14 @@ object LogConfig {
   val MinCleanableDirtyRatioProp = "min.cleanable.dirty.ratio"
   val CleanupPolicyProp = "cleanup.policy"
   val UncleanLeaderElectionEnableProp = "unclean.leader.election.enable"
-  
-  val ConfigNames = Set(SegmentBytesProp, 
-                        SegmentMsProp, 
-                        SegmentIndexBytesProp, 
-                        FlushMessagesProp, 
-                        FlushMsProp, 
-                        RetentionBytesProp, 
+  val MinInSyncReplicasProp = "min.insync.replicas"
+
+  val ConfigNames = Set(SegmentBytesProp,
+                        SegmentMsProp,
+                        SegmentIndexBytesProp,
+                        FlushMessagesProp,
+                        FlushMsProp,
+                        RetentionBytesProp,
                         RententionMsProp,
                         MaxMessageBytesProp,
                         IndexIntervalBytesProp,
@@ -121,9 +126,9 @@ object LogConfig {
                         DeleteRetentionMsProp,
                         MinCleanableDirtyRatioProp,
                         CleanupPolicyProp,
-                        UncleanLeaderElectionEnableProp)
-    
-  
+                        UncleanLeaderElectionEnableProp,
+                        MinInSyncReplicasProp)
+
   /**
    * Parse the given properties instance into a LogConfig object
    */
@@ -144,9 +149,10 @@ object LogConfig {
                   compact = props.getProperty(CleanupPolicyProp, if(Defaults.Compact) "compact" else "delete")
                     .trim.toLowerCase != "delete",
                   uncleanLeaderElectionEnable = props.getProperty(UncleanLeaderElectionEnableProp,
-                    Defaults.UncleanLeaderElectionEnable.toString).toBoolean)
+                    Defaults.UncleanLeaderElectionEnable.toString).toBoolean,
+                  minInSyncReplicas = props.getProperty(MinInSyncReplicasProp, Defaults.MinInSyncReplicas.toString).toInt)
   }
-  
+
   /**
    * Create a log config instance using the given properties and defaults
    */
@@ -155,7 +161,7 @@ object LogConfig {
     props.putAll(overrides)
     fromProps(props)
   }
-  
+
   /**
    * Check that property names are valid
    */
@@ -164,15 +170,27 @@ object LogConfig {
     for(name <- props.keys)
       require(LogConfig.ConfigNames.contains(name), "Unknown configuration \"%s\".".format(name))
   }
-  
+
   /**
    * Check that the given properties contain only valid log config names, and that all values can be parsed.
    */
   def validate(props: Properties) {
     validateNames(props)
+    validateMinInSyncReplicas(props)
     LogConfig.fromProps(LogConfig().toProps, props) // check that we can parse the values
   }
-  
-}
-                      
-                     
\ No newline at end of file
+
+  /**
+   * Check that MinInSyncReplicas is reasonable.
+   * Unfortunately, we can't validate that it is smaller than the number of replicas
+   * since we don't have this information here.
+   */
+  private def validateMinInSyncReplicas(props: Properties) {
+    val minIsr = props.getProperty(MinInSyncReplicasProp)
+    if (minIsr != null && minIsr.toInt < 1) {
+      throw new InvalidConfigException("Wrong value " + minIsr + " for min.insync.replicas in topic configuration; " +
+        "valid values must be at least 1")
+    }
+  }
+
+}
\ No newline at end of file
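
For sizing, min.insync.replicas trades availability for durability relative to the replication factor. A sketch with hypothetical values: replication factor 3 with min.insync.replicas=2 keeps acks=-1 writes available with one replica down, while two missing replicas block producers (NotEnoughReplicasException) without affecting reads:

    import java.util.Properties;

    // Hypothetical per-topic override; values below 1 are rejected by
    // LogConfig.validate with an InvalidConfigException.
    Properties topicConfig = new Properties();
    topicConfig.put("min.insync.replicas", "2");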

http://git-wip-us.apache.org/repos/asf/kafka/blob/043190c6/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/SyncProducerConfig.scala b/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
index 69b2d0c..a08ce00 100644
--- a/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
+++ b/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
@@ -42,11 +42,15 @@ trait SyncProducerConfigShared {
   val clientId = props.getString("client.id", SyncProducerConfig.DefaultClientId)
 
   /*
-   * The required acks of the producer requests - negative value means ack
-   * after the replicas in ISR have caught up to the leader's offset
-   * corresponding to this produce request.
+   * The number of acknowledgments the producer requires the leader to have received before considering a request complete.
+   * This controls the durability of the messages sent by the producer.
+   *
+   * request.required.acks = 0 - the producer will not wait for any acknowledgment from the leader.
+   * request.required.acks = 1 - the leader will write the message to its local log and immediately acknowledge it.
+   * request.required.acks = -1 - the leader will wait for acknowledgment from all in-sync replicas before acknowledging the write.
    */
-  val requestRequiredAcks = props.getShort("request.required.acks", SyncProducerConfig.DefaultRequiredAcks)
+  val requestRequiredAcks = props.getShortInRange("request.required.acks", SyncProducerConfig.DefaultRequiredAcks, (-1, 1))
 
   /*
    * The ack timeout of the producer requests. Value must be non-negative and non-zero
@@ -59,4 +63,4 @@ object SyncProducerConfig {
   val DefaultClientId = ""
   val DefaultRequiredAcks : Short = 0
   val DefaultAckTimeoutMs = 10000
-}
\ No newline at end of file
+}
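
The old producer keeps its request.required.acks knob, but the accepted range is now checked up front. A minimal sketch, assuming a hypothetical broker list (an out-of-range value fails in the config constructor, which is what the updated ProducerTest below asserts):

    import java.util.Properties;
    import kafka.producer.ProducerConfig;

    Properties props = new Properties();
    props.put("metadata.broker.list", "localhost:9092"); // hypothetical broker
    props.put("request.required.acks", "-1");            // -1, 0 and 1 are accepted
    // props.put("request.required.acks", "2");          // now throws IllegalArgumentException
    ProducerConfig config = new ProducerConfig(props);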

http://git-wip-us.apache.org/repos/asf/kafka/blob/043190c6/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index c584b55..67f2833 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -248,7 +248,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition)
         val info = partitionOpt match {
           case Some(partition) =>
-            partition.appendMessagesToLeader(messages.asInstanceOf[ByteBufferMessageSet])
+            partition.appendMessagesToLeader(messages.asInstanceOf[ByteBufferMessageSet], producerRequest.requiredAcks)
           case None => throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d"
             .format(topicAndPartition, brokerId))
         }
@@ -284,6 +284,10 @@ class KafkaApis(val requestChannel: RequestChannel,
           warn("Produce request with correlation id %d from client %s on partition %s failed due to %s".format(
                producerRequest.correlationId, producerRequest.clientId, topicAndPartition, nle.getMessage))
           new ProduceResult(topicAndPartition, nle)
+        case nere: NotEnoughReplicasException =>
+          warn("Produce request with correlation id %d from client %s on partition %s failed due to %s".format(
+            producerRequest.correlationId, producerRequest.clientId, topicAndPartition, nere.getMessage))
+          new ProduceResult(topicAndPartition, nere)
         case e: Throwable =>
           BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).failedProduceRequestRate.mark()
           BrokerTopicStats.getBrokerAllTopicsStats.failedProduceRequestRate.mark()

http://git-wip-us.apache.org/repos/asf/kafka/blob/043190c6/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 165c816..90af698 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -199,6 +199,11 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   /* enable auto creation of topic on the server */
   val autoCreateTopicsEnable = props.getBoolean("auto.create.topics.enable", true)
 
+  /* define the minimum number of replicas in ISR needed to satisfy a produce request with required.acks=-1 (or all) */
+  val minInSyncReplicas = props.getIntInRange("min.insync.replicas", 1, (1, Int.MaxValue))
+
   /*********** Replication configuration ***********/
 
   /* the socket timeout for controller-to-broker channels */

http://git-wip-us.apache.org/repos/asf/kafka/blob/043190c6/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index 39f777b..209a409 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -18,12 +18,12 @@
 package kafka.api
 
 import kafka.common.Topic
-import org.apache.kafka.common.errors.InvalidTopicException
+import org.apache.kafka.common.errors.{InvalidTopicException, NotEnoughReplicasException}
 import org.scalatest.junit.JUnit3Suite
 import org.junit.Test
 import org.junit.Assert._
 
-import java.util.Random
+import java.util.{Properties, Random}
 import java.lang.Integer
 import java.util.concurrent.{TimeoutException, TimeUnit, ExecutionException}
 
@@ -302,6 +302,59 @@ class ProducerFailureHandlingTest extends JUnit3Suite with KafkaServerTestHarnes
     producer1.send(new ProducerRecord(Topic.InternalTopics.head, "test".getBytes, "test".getBytes)).get
   }
 
+  @Test
+  def testNotEnoughReplicas() {
+    val topicName = "minisrtest"
+    val topicProps = new Properties()
+    topicProps.put("min.insync.replicas", "3")
+
+    TestUtils.createTopic(zkClient, topicName, 1, 2, servers, topicProps)
+
+    val record = new ProducerRecord(topicName, null, "key".getBytes, "value".getBytes)
+    try {
+      producer3.send(record).get
+      fail("Expected exception when producing to topic with fewer brokers than min.insync.replicas")
+    } catch {
+      case e: ExecutionException =>
+        if (!e.getCause.isInstanceOf[NotEnoughReplicasException]) {
+          fail("Expected NotEnoughReplicasException when producing to topic with fewer brokers than min.insync.replicas")
+        }
+    }
+  }
+
+  @Test
+  def testNotEnoughReplicasAfterBrokerShutdown() {
+    val topicName = "minisrtest2"
+    val topicProps = new Properties()
+    topicProps.put("min.insync.replicas", "2")
+
+    TestUtils.createTopic(zkClient, topicName, 1, 2, servers, topicProps)
+
+    val record = new ProducerRecord(topicName, null, "key".getBytes, "value".getBytes)
+    // This should work
+    producer3.send(record).get
+
+    // shut down one broker
+    servers.head.shutdown()
+    servers.head.awaitShutdown()
+    try {
+      producer3.send(record).get
+      fail("Expected exception when producing to topic with fewer brokers than min.insync.replicas")
+    } catch {
+      case e: ExecutionException =>
+        if (!e.getCause.isInstanceOf[NotEnoughReplicasException]) {
+          fail("Expected NotEnoughReplicasException when producing to topic with fewer brokers than min.insync.replicas")
+        }
+    }
+
+    servers.head.startup()
+  }
+
   private class ProducerScheduler extends ShutdownableThread("daemon-producer", false)
   {
     val numRecords = 1000

http://git-wip-us.apache.org/repos/asf/kafka/blob/043190c6/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
index dd71d81..ce65dab 100644
--- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
@@ -17,6 +17,7 @@
 
 package kafka.producer
 
+import org.apache.kafka.common.config.ConfigException
 import org.scalatest.TestFailedException
 import org.scalatest.junit.JUnit3Suite
 import kafka.consumer.SimpleConsumer
@@ -143,7 +144,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
   @Test
   def testSendToNewTopic() {
     val props1 = new util.Properties()
-    props1.put("request.required.acks", "2")
+    props1.put("request.required.acks", "-1")
 
     val topic = "new-topic"
     // create topic with 1 partition and await leadership
@@ -181,24 +182,20 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
     // no need to retry since the send will always fail
     props2.put("message.send.max.retries", "0")
 
-    val producer2 = TestUtils.createProducer[String, String](
-      brokerList = TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)),
-      encoder = classOf[StringEncoder].getName,
-      keyEncoder = classOf[StringEncoder].getName,
-      partitioner = classOf[StaticPartitioner].getName,
-      producerProps = props2)
-
     try {
-      producer2.send(new KeyedMessage[String, String](topic, "test", "test2"))
-      fail("Should have timed out for 3 acks.")
+      val producer2 = TestUtils.createProducer[String, String](
+        brokerList = TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)),
+        encoder = classOf[StringEncoder].getName,
+        keyEncoder = classOf[StringEncoder].getName,
+        partitioner = classOf[StaticPartitioner].getName,
+        producerProps = props2)
+      producer2.close()
+      fail("Should have thrown an exception: request.required.acks greater than 1 is not supported")
     }
     catch {
-      case se: FailedToSendMessageException =>
-        // this is expected
+      case iae: IllegalArgumentException => // this is expected
       case e: Throwable => fail("Not expected", e)
-    }
-    finally {
-      producer2.close()
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/043190c6/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
index 24deea0..fb61d55 100644
--- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
@@ -18,6 +18,7 @@
 package kafka.producer
 
 import java.net.SocketTimeoutException
+import java.util.Properties
 import junit.framework.Assert
 import kafka.admin.AdminUtils
 import kafka.integration.KafkaServerTestHarness
@@ -225,4 +227,24 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
     val response = producer.send(emptyRequest)
     Assert.assertTrue(response == null)
   }
+
+  @Test
+  def testNotEnoughReplicas() {
+    val topicName = "minisrtest"
+    val server = servers.head
+
+    val props = TestUtils.getSyncProducerConfig(server.socketServer.port)
+    props.put("request.required.acks", "-1")
+
+    val producer = new SyncProducer(new SyncProducerConfig(props))
+    val topicProps = new Properties()
+    topicProps.put("min.insync.replicas", "2")
+    AdminUtils.createTopic(zkClient, topicName, 1, 1, topicProps)
+    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topicName, 0)
+
+    val response = producer.send(TestUtils.produceRequest(topicName, 0,
+      new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)), -1))
+
+    Assert.assertEquals(ErrorMapping.NotEnoughReplicasCode, response.status(TopicAndPartition(topicName, 0)).error)
+  }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/043190c6/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 2dbdd3c..dd3640f 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -168,10 +168,14 @@ object TestUtils extends Logging {
    * Wait until the leader is elected and the metadata is propagated to all brokers.
    * Return the leader for each partition.
    */
-  def createTopic(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicationFactor: Int = 1,
-                  servers: Seq[KafkaServer]) : scala.collection.immutable.Map[Int, Option[Int]] = {
+  def createTopic(zkClient: ZkClient,
+                  topic: String,
+                  numPartitions: Int = 1,
+                  replicationFactor: Int = 1,
+                  servers: Seq[KafkaServer],
+                  topicConfig: Properties = new Properties) : scala.collection.immutable.Map[Int, Option[Int]] = {
     // create topic
-    AdminUtils.createTopic(zkClient, topic, numPartitions, replicationFactor)
+    AdminUtils.createTopic(zkClient, topic, numPartitions, replicationFactor, topicConfig)
     // wait until the update metadata request for new topic reaches all servers
     (0 until numPartitions).map { case i =>
       TestUtils.waitUntilMetadataIsPropagated(servers, topic, i)