You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2014/10/10 03:26:50 UTC
git commit: kafka-1555;
provide strong consistency with reasonable availability;
patched by Gwen Shapira; reviewed by Joel Koshy and Jun Rao
Repository: kafka
Updated Branches:
refs/heads/trunk 58e58529b -> 043190c60
kafka-1555; provide strong consistency with reasonable availability; patched by Gwen Shapira; reviewed by Joel Koshy and Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/043190c6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/043190c6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/043190c6
Branch: refs/heads/trunk
Commit: 043190c601f37e42c32189f3df5ddd986e57da95
Parents: 58e5852
Author: Gwen Shapira <cs...@gmail.com>
Authored: Thu Oct 9 18:24:53 2014 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Thu Oct 9 18:26:03 2014 -0700
----------------------------------------------------------------------
.../kafka/clients/producer/ProducerConfig.java | 17 ++++--
.../apache/kafka/common/config/ConfigDef.java | 42 +++++++++++++
.../NotEnoughReplicasAfterAppendException.java | 43 ++++++++++++++
.../errors/NotEnoughReplicasException.java | 40 +++++++++++++
.../apache/kafka/common/protocol/Errors.java | 7 ++-
.../main/scala/kafka/cluster/Partition.scala | 27 ++++++++-
.../main/scala/kafka/common/ErrorMapping.scala | 20 ++++---
.../NotEnoughReplicasAfterAppendException.scala | 27 +++++++++
.../common/NotEnoughReplicasException.scala | 25 ++++++++
core/src/main/scala/kafka/log/LogConfig.scala | 62 +++++++++++++-------
.../kafka/producer/SyncProducerConfig.scala | 14 +++--
.../src/main/scala/kafka/server/KafkaApis.scala | 6 +-
.../main/scala/kafka/server/KafkaConfig.scala | 5 ++
.../kafka/api/ProducerFailureHandlingTest.scala | 57 +++++++++++++++++-
.../unit/kafka/producer/ProducerTest.scala | 27 ++++-----
.../unit/kafka/producer/SyncProducerTest.scala | 22 +++++++
.../test/scala/unit/kafka/utils/TestUtils.scala | 10 +++-
17 files changed, 384 insertions(+), 67 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/043190c6/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index f9de4af..79d57f9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -14,7 +14,10 @@ package org.apache.kafka.clients.producer;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Range.between;
+import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
+import java.util.Arrays;
+import java.util.List;
import java.util.Map;
import org.apache.kafka.common.config.AbstractConfig;
@@ -77,7 +80,8 @@ public class ProducerConfig extends AbstractConfig {
/** <code>acks</code> */
public static final String ACKS_CONFIG = "acks";
- private static final String ACKS_DOC = "The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the " + " durability of records that are sent. The following settings are common: "
+ private static final String ACKS_DOC = "The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the "
+ + " durability of records that are sent. The following settings are common: "
+ " <ul>"
+ " <li><code>acks=0</code> If set to zero then the producer will not wait for any acknowledgment from the"
+ " server at all. The record will be immediately added to the socket buffer and considered sent. No guarantee can be"
@@ -89,9 +93,7 @@ public class ProducerConfig extends AbstractConfig {
+ " acknowledging the record but before the followers have replicated it then the record will be lost."
+ " <li><code>acks=all</code> This means the leader will wait for the full set of in-sync replicas to"
+ " acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica"
- + " remains alive. This is the strongest available guarantee."
- + " <li>Other settings such as <code>acks=2</code> are also possible, and will require the given number of"
- + " acknowledgements but this is generally less useful.";
+ + " remains alive. This is the strongest available guarantee.";
/** <code>timeout.ms</code> */
public static final String TIMEOUT_CONFIG = "timeout.ms";
@@ -175,7 +177,12 @@ public class ProducerConfig extends AbstractConfig {
config = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, BOOSTRAP_SERVERS_DOC)
.define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC)
.define(RETRIES_CONFIG, Type.INT, 0, between(0, Integer.MAX_VALUE), Importance.HIGH, RETRIES_DOC)
- .define(ACKS_CONFIG, Type.STRING, "1", Importance.HIGH, ACKS_DOC)
+ .define(ACKS_CONFIG,
+ Type.STRING,
+ "1",
+ in(Arrays.asList("all","-1", "0", "1")),
+ Importance.HIGH,
+ ACKS_DOC)
.define(COMPRESSION_TYPE_CONFIG, Type.STRING, "none", Importance.HIGH, COMPRESSION_TYPE_DOC)
.define(BATCH_SIZE_CONFIG, Type.INT, 16384, atLeast(0), Importance.MEDIUM, BATCH_SIZE_DOC)
.define(TIMEOUT_CONFIG, Type.INT, 30 * 1000, atLeast(0), Importance.MEDIUM, TIMEOUT_DOC)
http://git-wip-us.apache.org/repos/asf/kafka/blob/043190c6/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index addc906..227309e 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -268,6 +268,48 @@ public class ConfigDef {
}
}
+ /**
+  * Validator that accepts only values contained in a fixed list of strings.
+  * Instances are obtained through {@link #in(List)}.
+  */
+ public static class ValidString implements Validator {
+     // Read-only snapshot of the permitted values (package visibility kept for compatibility).
+     final List<String> validStrings;
+
+     private ValidString(List<String> validStrings) {
+         // Copy the list so later changes made by the caller cannot alter what is accepted.
+         this.validStrings = java.util.Collections.unmodifiableList(new java.util.ArrayList<String>(validStrings));
+     }
+
+     /**
+      * Creates a validator that accepts exactly the given strings.
+      *
+      * @param validStrings the permitted values
+      * @return a validator for those values
+      */
+     public static ValidString in(List<String> validStrings) {
+         return new ValidString(validStrings);
+     }
+
+     /**
+      * Checks that the supplied value is one of the permitted strings.
+      *
+      * @param name the configuration name, used in the error report
+      * @param o the value to check; it is cast to String
+      * @throws ConfigException if the value is not in the permitted list
+      */
+     @Override
+     public void ensureValid(String name, Object o) {
+         String s = (String) o;
+         if (!validStrings.contains(s)) {
+             throw new ConfigException(name, o, "String must be one of: " + join(validStrings));
+         }
+     }
+
+     /** Returns the permitted values as a bracketed, comma-separated list. */
+     @Override
+     public String toString() {
+         return "[" + join(validStrings) + "]";
+     }
+
+     // Concatenates the items, separated by commas, with no leading or trailing separator.
+     private static String join(List<String> list) {
+         StringBuilder sb = new StringBuilder();
+         boolean first = true;
+         for (String item : list) {
+             if (first) {
+                 first = false;
+             } else {
+                 sb.append(",");
+             }
+             sb.append(item);
+         }
+         return sb.toString();
+     }
+ }
+
+
private static class ConfigKey {
public final String name;
public final Type type;
http://git-wip-us.apache.org/repos/asf/kafka/blob/043190c6/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java b/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java
new file mode 100644
index 0000000..75c80a9
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * Signals that the partition has fewer in-sync replicas than <code>min.insync.replicas</code>
+ * and that this was discovered only <em>after</em> the message had already been appended
+ * to the log. Because the message is already in the log, producer retries will cause duplicates.
+ */
+public class NotEnoughReplicasAfterAppendException extends RetriableException {
+    private static final long serialVersionUID = 1L;
+
+    /** Creates the exception without a message or cause. */
+    public NotEnoughReplicasAfterAppendException() {
+        super();
+    }
+
+    /** Creates the exception with a detail message. */
+    public NotEnoughReplicasAfterAppendException(String message) {
+        super(message);
+    }
+
+    /** Creates the exception wrapping an underlying cause. */
+    public NotEnoughReplicasAfterAppendException(Throwable cause) {
+        super(cause);
+    }
+
+    /** Creates the exception with a detail message and an underlying cause. */
+    public NotEnoughReplicasAfterAppendException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/043190c6/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasException.java b/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasException.java
new file mode 100644
index 0000000..486d515
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * Signals that the number of in-sync replicas for the partition is lower than
+ * <code>min.insync.replicas</code>.
+ */
+public class NotEnoughReplicasException extends RetriableException {
+    private static final long serialVersionUID = 1L;
+
+    /** Creates the exception without a message or cause. */
+    public NotEnoughReplicasException() {
+        super();
+    }
+
+    /** Creates the exception with a detail message. */
+    public NotEnoughReplicasException(String message) {
+        super(message);
+    }
+
+    /** Creates the exception wrapping an underlying cause. */
+    public NotEnoughReplicasException(Throwable cause) {
+        super(cause);
+    }
+
+    /** Creates the exception with a detail message and an underlying cause. */
+    public NotEnoughReplicasException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/043190c6/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index d5f5de3..3316b6a 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -25,7 +25,7 @@ import org.apache.kafka.common.errors.*;
/**
* This class contains all the client-server errors--those errors that must be sent from the server to the client. These
* are thus part of the protocol. The names can be changed but the error code cannot.
- *
+ *
* Do not add exceptions that occur only on the client or only on the server here.
*/
public enum Errors {
@@ -44,8 +44,9 @@ public enum Errors {
NETWORK_EXCEPTION(13, new NetworkException("The server disconnected before a response was received.")),
// TODO: errorCode 14, 15, 16
INVALID_TOPIC_EXCEPTION(17, new InvalidTopicException("The request attempted to perform an operation on an invalid topic.")),
- RECORD_LIST_TOO_LARGE(18, new RecordBatchTooLargeException("The request included message batch larger than the configured segment size on the server."));
-
+ RECORD_LIST_TOO_LARGE(18, new RecordBatchTooLargeException("The request included message batch larger than the configured segment size on the server.")),
+ NOT_ENOUGH_REPLICAS(19, new NotEnoughReplicasException("Messages are rejected since there are fewer in-sync replicas than required.")),
+ NOT_ENOUGH_REPLICAS_AFTER_APPEND(20, new NotEnoughReplicasAfterAppendException("Messages are written to the log, but to fewer in-sync replicas than required."));
private static Map<Class<?>, Errors> classToError = new HashMap<Class<?>, Errors>();
private static Map<Short, Errors> codeToError = new HashMap<Short, Errors>();
static {
http://git-wip-us.apache.org/repos/asf/kafka/blob/043190c6/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index ff106b4..e88ecf2 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -269,14 +269,26 @@ class Partition(val topic: String,
else
true /* also count the local (leader) replica */
})
+ val minIsr = leaderReplica.log.get.config.minInSyncReplicas
+
trace("%d/%d acks satisfied for %s-%d".format(numAcks, requiredAcks, topic, partitionId))
- if ((requiredAcks < 0 && leaderReplica.highWatermark.messageOffset >= requiredOffset) ||
- (requiredAcks > 0 && numAcks >= requiredAcks)) {
+ if (requiredAcks < 0 && leaderReplica.highWatermark.messageOffset >= requiredOffset ) {
/*
* requiredAcks < 0 means acknowledge after all replicas in ISR
* are fully caught up to the (local) leader's offset
* corresponding to this produce request.
+ *
+ * minIsr means that the topic is configured not to accept messages
+ * if there are not enough replicas in ISR.
+ * In this scenario the request was already appended locally and
+ * then added to the purgatory before the ISR was shrunk.
*/
+ if (minIsr <= curInSyncReplicas.size) {
+ (true, ErrorMapping.NoError)
+ } else {
+ (true, ErrorMapping.NotEnoughReplicasAfterAppendCode)
+ }
+ } else if (requiredAcks > 0 && numAcks >= requiredAcks) {
(true, ErrorMapping.NoError)
} else
(false, ErrorMapping.NoError)
@@ -350,12 +362,21 @@ class Partition(val topic: String,
stuckReplicas ++ slowReplicas
}
- def appendMessagesToLeader(messages: ByteBufferMessageSet) = {
+ def appendMessagesToLeader(messages: ByteBufferMessageSet, requiredAcks: Int=0) = {
inReadLock(leaderIsrUpdateLock) {
val leaderReplicaOpt = leaderReplicaIfLocal()
leaderReplicaOpt match {
case Some(leaderReplica) =>
val log = leaderReplica.log.get
+ val minIsr = log.config.minInSyncReplicas
+ val inSyncSize = inSyncReplicas.size
+
+ // Avoid writing to leader if there are not enough insync replicas to make it safe
+ if (inSyncSize < minIsr && requiredAcks == -1) {
+   // Arguments follow the placeholders in order: the current ISR size first, then the required minimum.
+   throw new NotEnoughReplicasException("Number of insync replicas for partition [%s,%d] is [%d], below required minimum [%d]"
+     .format(topic, partitionId, inSyncSize, minIsr))
+ }
+
val info = log.append(messages, assignOffsets = true)
// probably unblock some follower fetch requests since log end offset has been updated
replicaManager.unblockDelayedFetchRequests(new TopicPartitionRequestKey(this.topic, this.partitionId))
http://git-wip-us.apache.org/repos/asf/kafka/blob/043190c6/core/src/main/scala/kafka/common/ErrorMapping.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/ErrorMapping.scala b/core/src/main/scala/kafka/common/ErrorMapping.scala
index a190607..880ab4a 100644
--- a/core/src/main/scala/kafka/common/ErrorMapping.scala
+++ b/core/src/main/scala/kafka/common/ErrorMapping.scala
@@ -5,7 +5,7 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
@@ -22,7 +22,7 @@ import java.nio.ByteBuffer
import scala.Predef._
/**
- * A bi-directional mapping between error codes and exceptions x
+ * A bi-directional mapping between error codes and exceptions
*/
object ErrorMapping {
val EmptyByteBuffer = ByteBuffer.allocate(0)
@@ -47,8 +47,10 @@ object ErrorMapping {
val NotCoordinatorForConsumerCode: Short = 16
val InvalidTopicCode : Short = 17
val MessageSetSizeTooLargeCode: Short = 18
+ val NotEnoughReplicasCode : Short = 19
+ val NotEnoughReplicasAfterAppendCode: Short = 20
- private val exceptionToCode =
+ private val exceptionToCode =
Map[Class[Throwable], Short](
classOf[OffsetOutOfRangeException].asInstanceOf[Class[Throwable]] -> OffsetOutOfRangeCode,
classOf[InvalidMessageException].asInstanceOf[Class[Throwable]] -> InvalidMessageCode,
@@ -66,15 +68,17 @@ object ErrorMapping {
classOf[ConsumerCoordinatorNotAvailableException].asInstanceOf[Class[Throwable]] -> ConsumerCoordinatorNotAvailableCode,
classOf[NotCoordinatorForConsumerException].asInstanceOf[Class[Throwable]] -> NotCoordinatorForConsumerCode,
classOf[InvalidTopicException].asInstanceOf[Class[Throwable]] -> InvalidTopicCode,
- classOf[MessageSetSizeTooLargeException].asInstanceOf[Class[Throwable]] -> MessageSetSizeTooLargeCode
+ classOf[MessageSetSizeTooLargeException].asInstanceOf[Class[Throwable]] -> MessageSetSizeTooLargeCode,
+ classOf[NotEnoughReplicasException].asInstanceOf[Class[Throwable]] -> NotEnoughReplicasCode,
+ classOf[NotEnoughReplicasAfterAppendException].asInstanceOf[Class[Throwable]] -> NotEnoughReplicasAfterAppendCode
).withDefaultValue(UnknownCode)
-
+
/* invert the mapping */
- private val codeToException =
+ private val codeToException =
(Map[Short, Class[Throwable]]() ++ exceptionToCode.iterator.map(p => (p._2, p._1))).withDefaultValue(classOf[UnknownException])
-
+
def codeFor(exception: Class[Throwable]): Short = exceptionToCode(exception)
-
+
def maybeThrowException(code: Short) =
if(code != 0)
throw codeToException(code).newInstance()
http://git-wip-us.apache.org/repos/asf/kafka/blob/043190c6/core/src/main/scala/kafka/common/NotEnoughReplicasAfterAppendException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/NotEnoughReplicasAfterAppendException.scala b/core/src/main/scala/kafka/common/NotEnoughReplicasAfterAppendException.scala
new file mode 100644
index 0000000..c4f9def
--- /dev/null
+++ b/core/src/main/scala/kafka/common/NotEnoughReplicasAfterAppendException.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+/**
+ * Number of insync replicas for the partition is lower than min.insync.replicas.
+ * This exception is raised when the low ISR size is discovered *after* the message
+ * was already appended to the log. Producer retries will cause duplicates.
+ *
+ * @param message detail message handed to RuntimeException; null when the no-arg constructor is used
+ */
+class NotEnoughReplicasAfterAppendException(message: String) extends RuntimeException(message) {
+ // No-arg constructor; ErrorMapping.maybeThrowException creates exceptions through newInstance().
+ def this() = this(null)
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/043190c6/core/src/main/scala/kafka/common/NotEnoughReplicasException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/NotEnoughReplicasException.scala b/core/src/main/scala/kafka/common/NotEnoughReplicasException.scala
new file mode 100644
index 0000000..bfbe0ee
--- /dev/null
+++ b/core/src/main/scala/kafka/common/NotEnoughReplicasException.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+/**
+ * Message was rejected because number of insync replicas for the partition is lower than min.insync.replicas.
+ *
+ * @param message detail message handed to RuntimeException; null when the no-arg constructor is used
+ */
+class NotEnoughReplicasException(message: String) extends RuntimeException(message) {
+ // No-arg constructor; ErrorMapping.maybeThrowException creates exceptions through newInstance().
+ def this() = this(null)
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/043190c6/core/src/main/scala/kafka/log/LogConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index 5746ad4..d2cc9e3 100644
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -36,6 +36,7 @@ object Defaults {
val MinCleanableDirtyRatio = 0.5
val Compact = false
val UncleanLeaderElectionEnable = true
+ val MinInSyncReplicas = 1
}
/**
@@ -53,7 +54,9 @@ object Defaults {
* @param minCleanableRatio The ratio of bytes that are available for cleaning to the bytes already cleaned
* @param compact Should old segments in this log be deleted or deduplicated?
* @param uncleanLeaderElectionEnable Indicates whether unclean leader election is enabled; actually a controller-level property
- * but included here for topic-specific configuration validation purposes
+ * but included here for topic-specific configuration validation purposes
+ * @param minInSyncReplicas If number of insync replicas drops below this number, we stop accepting writes with -1 (or all) required acks
+ *
*/
case class LogConfig(val segmentSize: Int = Defaults.SegmentSize,
val segmentMs: Long = Defaults.SegmentMs,
@@ -68,8 +71,9 @@ case class LogConfig(val segmentSize: Int = Defaults.SegmentSize,
val deleteRetentionMs: Long = Defaults.DeleteRetentionMs,
val minCleanableRatio: Double = Defaults.MinCleanableDirtyRatio,
val compact: Boolean = Defaults.Compact,
- val uncleanLeaderElectionEnable: Boolean = Defaults.UncleanLeaderElectionEnable) {
-
+ val uncleanLeaderElectionEnable: Boolean = Defaults.UncleanLeaderElectionEnable,
+ val minInSyncReplicas: Int = Defaults.MinInSyncReplicas) {
+
def toProps: Properties = {
val props = new Properties()
import LogConfig._
@@ -87,9 +91,9 @@ case class LogConfig(val segmentSize: Int = Defaults.SegmentSize,
props.put(MinCleanableDirtyRatioProp, minCleanableRatio.toString)
props.put(CleanupPolicyProp, if(compact) "compact" else "delete")
props.put(UncleanLeaderElectionEnableProp, uncleanLeaderElectionEnable.toString)
+ props.put(MinInSyncReplicasProp, minInSyncReplicas.toString)
props
}
-
}
object LogConfig {
@@ -107,13 +111,14 @@ object LogConfig {
val MinCleanableDirtyRatioProp = "min.cleanable.dirty.ratio"
val CleanupPolicyProp = "cleanup.policy"
val UncleanLeaderElectionEnableProp = "unclean.leader.election.enable"
-
- val ConfigNames = Set(SegmentBytesProp,
- SegmentMsProp,
- SegmentIndexBytesProp,
- FlushMessagesProp,
- FlushMsProp,
- RetentionBytesProp,
+ val MinInSyncReplicasProp = "min.insync.replicas"
+
+ val ConfigNames = Set(SegmentBytesProp,
+ SegmentMsProp,
+ SegmentIndexBytesProp,
+ FlushMessagesProp,
+ FlushMsProp,
+ RetentionBytesProp,
RententionMsProp,
MaxMessageBytesProp,
IndexIntervalBytesProp,
@@ -121,9 +126,9 @@ object LogConfig {
DeleteRetentionMsProp,
MinCleanableDirtyRatioProp,
CleanupPolicyProp,
- UncleanLeaderElectionEnableProp)
-
-
+ UncleanLeaderElectionEnableProp,
+ MinInSyncReplicasProp)
+
/**
* Parse the given properties instance into a LogConfig object
*/
@@ -144,9 +149,10 @@ object LogConfig {
compact = props.getProperty(CleanupPolicyProp, if(Defaults.Compact) "compact" else "delete")
.trim.toLowerCase != "delete",
uncleanLeaderElectionEnable = props.getProperty(UncleanLeaderElectionEnableProp,
- Defaults.UncleanLeaderElectionEnable.toString).toBoolean)
+ Defaults.UncleanLeaderElectionEnable.toString).toBoolean,
+ minInSyncReplicas = props.getProperty(MinInSyncReplicasProp,Defaults.MinInSyncReplicas.toString).toInt)
}
-
+
/**
* Create a log config instance using the given properties and defaults
*/
@@ -155,7 +161,7 @@ object LogConfig {
props.putAll(overrides)
fromProps(props)
}
-
+
/**
* Check that property names are valid
*/
@@ -164,15 +170,27 @@ object LogConfig {
for(name <- props.keys)
require(LogConfig.ConfigNames.contains(name), "Unknown configuration \"%s\".".format(name))
}
-
+
/**
* Check that the given properties contain only valid log config names, and that all values can be parsed.
*/
def validate(props: Properties) {
validateNames(props)
+ validateMinInSyncReplicas(props)
LogConfig.fromProps(LogConfig().toProps, props) // check that we can parse the values
}
-
-}
-
-
\ No newline at end of file
+
+ /**
+  * Reject a min.insync.replicas setting that is lower than 1.
+  * Unfortunately, we can't validate that it's smaller than the number of replicas,
+  * since that information is not available here.
+  */
+ private def validateMinInSyncReplicas(props: Properties) {
+   // A missing property is fine; only a value that is present gets checked.
+   Option(props.getProperty(MinInSyncReplicasProp)).foreach { configured =>
+     if (configured.toInt < 1)
+       throw new InvalidConfigException("Wrong value " + configured + " of min.insync.replicas in topic configuration; " +
+         " Valid values are at least 1")
+   }
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/043190c6/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/SyncProducerConfig.scala b/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
index 69b2d0c..a08ce00 100644
--- a/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
+++ b/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
@@ -42,11 +42,15 @@ trait SyncProducerConfigShared {
val clientId = props.getString("client.id", SyncProducerConfig.DefaultClientId)
/*
- * The required acks of the producer requests - negative value means ack
- * after the replicas in ISR have caught up to the leader's offset
- * corresponding to this produce request.
+ * The number of acknowledgments the producer requires the leader to have received before considering a request complete.
+ * This controls the durability of the messages sent by the producer.
+ *
+ * request.required.acks = 0 - means the producer will not wait for any acknowledgement from the leader.
+ * request.required.acks = 1 - means the leader will write the message to its local log and immediately acknowledge
+ * request.required.acks = -1 - means the leader will wait for acknowledgement from all in-sync replicas before acknowledging the write
*/
- val requestRequiredAcks = props.getShort("request.required.acks", SyncProducerConfig.DefaultRequiredAcks)
+
+ val requestRequiredAcks = props.getShortInRange("request.required.acks", SyncProducerConfig.DefaultRequiredAcks,(-1,1))
/*
* The ack timeout of the producer requests. Value must be non-negative and non-zero
@@ -59,4 +63,4 @@ object SyncProducerConfig {
val DefaultClientId = ""
val DefaultRequiredAcks : Short = 0
val DefaultAckTimeoutMs = 10000
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/043190c6/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index c584b55..67f2833 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -248,7 +248,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition)
val info = partitionOpt match {
case Some(partition) =>
- partition.appendMessagesToLeader(messages.asInstanceOf[ByteBufferMessageSet])
+ partition.appendMessagesToLeader(messages.asInstanceOf[ByteBufferMessageSet],producerRequest.requiredAcks)
case None => throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d"
.format(topicAndPartition, brokerId))
}
@@ -284,6 +284,10 @@ class KafkaApis(val requestChannel: RequestChannel,
warn("Produce request with correlation id %d from client %s on partition %s failed due to %s".format(
producerRequest.correlationId, producerRequest.clientId, topicAndPartition, nle.getMessage))
new ProduceResult(topicAndPartition, nle)
+ case nere: NotEnoughReplicasException =>
+ warn("Produce request with correlation id %d from client %s on partition %s failed due to %s".format(
+ producerRequest.correlationId, producerRequest.clientId, topicAndPartition, nere.getMessage))
+ new ProduceResult(topicAndPartition, nere)
case e: Throwable =>
BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).failedProduceRequestRate.mark()
BrokerTopicStats.getBrokerAllTopicsStats.failedProduceRequestRate.mark()
http://git-wip-us.apache.org/repos/asf/kafka/blob/043190c6/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 165c816..90af698 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -199,6 +199,11 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
/* enable auto creation of topic on the server */
val autoCreateTopicsEnable = props.getBoolean("auto.create.topics.enable", true)
+ /* define the minimum number of replicas in ISR needed to satisfy a produce request with required.acks=-1 (or all) */
+ val minInSyncReplicas = props.getIntInRange("min.insync.replicas",1,(1,Int.MaxValue))
+
+
+
/*********** Replication configuration ***********/
/* the socket timeout for controller-to-broker channels */
http://git-wip-us.apache.org/repos/asf/kafka/blob/043190c6/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index 39f777b..209a409 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -18,12 +18,12 @@
package kafka.api
import kafka.common.Topic
-import org.apache.kafka.common.errors.InvalidTopicException
+import org.apache.kafka.common.errors.{InvalidTopicException,NotEnoughReplicasException}
import org.scalatest.junit.JUnit3Suite
import org.junit.Test
import org.junit.Assert._
-import java.util.Random
+import java.util.{Properties, Random}
import java.lang.Integer
import java.util.concurrent.{TimeoutException, TimeUnit, ExecutionException}
@@ -302,6 +302,59 @@ class ProducerFailureHandlingTest extends JUnit3Suite with KafkaServerTestHarnes
producer1.send(new ProducerRecord(Topic.InternalTopics.head, "test".getBytes, "test".getBytes)).get
}
+ @Test
+ def testNotEnoughReplicas() {
+ val topicName = "minisrtest"
+ val topicProps = new Properties();
+ topicProps.put("min.insync.replicas","3");
+
+
+ TestUtils.createTopic(zkClient, topicName, 1, 2, servers,topicProps)
+
+
+ val record = new ProducerRecord(topicName, null, "key".getBytes, "value".getBytes)
+ try {
+ producer3.send(record).get
+ fail("Expected exception when producing to topic with fewer brokers than min.insync.replicas")
+ } catch {
+ case e: ExecutionException =>
+ if (!e.getCause.isInstanceOf[NotEnoughReplicasException]) {
+ fail("Expected NotEnoughReplicasException when producing to topic with fewer brokers than min.insync.replicas")
+ }
+ }
+ }
+
+ @Test
+ def testNotEnoughReplicasAfterBrokerShutdown() {
+ val topicName = "minisrtest2"
+ val topicProps = new Properties();
+ topicProps.put("min.insync.replicas","2");
+
+
+ TestUtils.createTopic(zkClient, topicName, 1, 2, servers,topicProps)
+
+
+ val record = new ProducerRecord(topicName, null, "key".getBytes, "value".getBytes)
+ // This should work
+ producer3.send(record).get
+
+ //shut down one broker
+ servers.head.shutdown()
+ servers.head.awaitShutdown()
+ try {
+ producer3.send(record).get
+ fail("Expected exception when producing to topic with fewer brokers than min.insync.replicas")
+ } catch {
+ case e: ExecutionException =>
+ if (!e.getCause.isInstanceOf[NotEnoughReplicasException]) {
+ fail("Expected NotEnoughReplicasException when producing to topic with fewer brokers than min.insync.replicas")
+ }
+ }
+
+ servers.head.startup()
+
+ }
+
private class ProducerScheduler extends ShutdownableThread("daemon-producer", false)
{
val numRecords = 1000
http://git-wip-us.apache.org/repos/asf/kafka/blob/043190c6/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
index dd71d81..ce65dab 100644
--- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
@@ -17,6 +17,7 @@
package kafka.producer
+import org.apache.kafka.common.config.ConfigException
import org.scalatest.TestFailedException
import org.scalatest.junit.JUnit3Suite
import kafka.consumer.SimpleConsumer
@@ -143,7 +144,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
@Test
def testSendToNewTopic() {
val props1 = new util.Properties()
- props1.put("request.required.acks", "2")
+ props1.put("request.required.acks", "-1")
val topic = "new-topic"
// create topic with 1 partition and await leadership
@@ -181,24 +182,20 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
// no need to retry since the send will always fail
props2.put("message.send.max.retries", "0")
- val producer2 = TestUtils.createProducer[String, String](
- brokerList = TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)),
- encoder = classOf[StringEncoder].getName,
- keyEncoder = classOf[StringEncoder].getName,
- partitioner = classOf[StaticPartitioner].getName,
- producerProps = props2)
-
try {
- producer2.send(new KeyedMessage[String, String](topic, "test", "test2"))
- fail("Should have timed out for 3 acks.")
+ val producer2 = TestUtils.createProducer[String, String](
+ brokerList = TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)),
+ encoder = classOf[StringEncoder].getName,
+ keyEncoder = classOf[StringEncoder].getName,
+ partitioner = classOf[StaticPartitioner].getName,
+ producerProps = props2)
+ producer2.close
+ fail("we don't support request.required.acks greater than 1")
}
catch {
- case se: FailedToSendMessageException =>
- // this is expected
+ case iae: IllegalArgumentException => // this is expected
case e: Throwable => fail("Not expected", e)
- }
- finally {
- producer2.close()
+
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/043190c6/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
index 24deea0..fb61d55 100644
--- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
@@ -18,6 +18,7 @@
package kafka.producer
import java.net.SocketTimeoutException
+import java.util.Properties
import junit.framework.Assert
import kafka.admin.AdminUtils
import kafka.integration.KafkaServerTestHarness
@@ -113,6 +114,7 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
Assert.assertEquals(0, response2.status(TopicAndPartition("test", 0)).offset)
}
+
@Test
def testMessageSizeTooLargeWithAckZero() {
val server = servers.head
@@ -225,4 +227,24 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
val response = producer.send(emptyRequest)
Assert.assertTrue(response == null)
}
+
+ @Test
+ def testNotEnoughReplicas() {
+ val topicName = "minisrtest"
+ val server = servers.head
+
+ val props = TestUtils.getSyncProducerConfig(server.socketServer.port)
+ props.put("request.required.acks", "-1")
+
+ val producer = new SyncProducer(new SyncProducerConfig(props))
+ val topicProps = new Properties();
+ topicProps.put("min.insync.replicas","2");
+ AdminUtils.createTopic(zkClient, topicName, 1, 1,topicProps)
+ TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topicName, 0)
+
+ val response = producer.send(TestUtils.produceRequest(topicName, 0,
+ new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)),-1))
+
+ Assert.assertEquals(ErrorMapping.NotEnoughReplicasCode, response.status(TopicAndPartition(topicName, 0)).error)
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/043190c6/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 2dbdd3c..dd3640f 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -168,10 +168,14 @@ object TestUtils extends Logging {
* Wait until the leader is elected and the metadata is propagated to all brokers.
* Return the leader for each partition.
*/
- def createTopic(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicationFactor: Int = 1,
- servers: Seq[KafkaServer]) : scala.collection.immutable.Map[Int, Option[Int]] = {
+ def createTopic(zkClient: ZkClient,
+ topic: String,
+ numPartitions: Int = 1,
+ replicationFactor: Int = 1,
+ servers: Seq[KafkaServer],
+ topicConfig: Properties = new Properties) : scala.collection.immutable.Map[Int, Option[Int]] = {
// create topic
- AdminUtils.createTopic(zkClient, topic, numPartitions, replicationFactor)
+ AdminUtils.createTopic(zkClient, topic, numPartitions, replicationFactor, topicConfig)
// wait until the update metadata request for new topic reaches all servers
(0 until numPartitions).map { case i =>
TestUtils.waitUntilMetadataIsPropagated(servers, topic, i)