You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@kafka.apache.org by ju...@apache.org on 2017/04/03 02:41:49 UTC
[3/4] kafka git commit: KAFKA-4817; Add idempotent producer semantics
http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index ab81bfe..d238093 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -56,14 +56,14 @@ public class MemoryRecordsBuilder {
private final int initPos;
private final long baseOffset;
private final long logAppendTime;
- private final long producerId;
- private final short producerEpoch;
- private final int baseSequence;
private final boolean isTransactional;
private final int partitionLeaderEpoch;
private final int writeLimit;
private final int initialCapacity;
+ private long producerId;
+ private short producerEpoch;
+ private int baseSequence;
private long writtenUncompressed = 0;
private int numRecords = 0;
private float compressionRate = 1;
@@ -193,6 +193,19 @@ public class MemoryRecordsBuilder {
return new RecordsInfo(maxTimestamp, compressionType == CompressionType.NONE ? offsetOfMaxTimestamp : lastOffset);
}
+ public void setProducerState(long pid, short epoch, int baseSequence) { // Sets the producer id, epoch and base sequence for the batch being built; rejected once the builder is closed.
+ if (isClosed()) {
+ // Sequence numbers are assigned when the batch is closed while the accumulator is being drained.
+ // If the resulting ProduceRequest to the partition leader fails with a retriable error, the batch will
+ // be re-queued. In that case we must not set the state again: changing the producer id and sequence
+ // once a batch has been sent to the broker risks introducing duplicates.
+ throw new IllegalStateException("Trying to set producer state of an already closed batch. This indicates a bug on the client.");
+ }
+ this.producerId = pid;
+ this.producerEpoch = epoch;
+ this.baseSequence = baseSequence;
+ }
+
public void close() {
if (builtRecords != null)
return;
@@ -577,4 +590,11 @@ public class MemoryRecordsBuilder {
this.shallowOffsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp;
}
}
+
+ /**
+ * Return the producer id (PID) of the RecordBatches created by this builder; it may be changed via setProducerState until the builder is closed.
+ */
+ public long producerId() {
+ return this.producerId;
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
index 90f1486..ae4a225 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
@@ -146,6 +146,11 @@ public interface RecordBatch extends Iterable<Record> {
short producerEpoch();
/**
+ * Check whether this batch has a valid producer id set.
+ */
+ boolean hasProducerId(); // NOTE(review): presumably true iff producerId() != RecordBatch.NO_PRODUCER_ID — confirm in implementations
+
+ /**
* Get the first sequence number of this record batch.
* @return The first sequence number or -1 if there is none
*/
http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index 3a99a8a..1638556 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -171,6 +171,9 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
case DELETE_RECORDS:
request = new DeleteRecordsRequest(struct, version);
break;
+ case INIT_PRODUCER_ID:
+ request = new InitPidRequest(struct, version);
+ break;
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `getRequest`, the " +
"code should be updated to do so.", apiKey));
http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index a5d0dc4..314aa42 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -93,6 +93,8 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
return new DeleteTopicsResponse(struct);
case DELETE_RECORDS:
return new DeleteRecordsResponse(struct);
+ case INIT_PRODUCER_ID:
+ return new InitPidResponse(struct);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `getResponse`, the " +
"code should be updated to do so.", apiKey));
http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/main/java/org/apache/kafka/common/requests/InitPidRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/InitPidRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/InitPidRequest.java
new file mode 100644
index 0000000..284107f
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/InitPidRequest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+
+public class InitPidRequest extends AbstractRequest {
+ private static final String TRANSACTIONAL_ID_KEY_NAME = "transactional_id";
+
+ private final String transactionalId; // may be null; the Builder rejects the empty string
+
+ public static class Builder extends AbstractRequest.Builder<InitPidRequest> {
+ private final String transactionalId;
+ public Builder(String transactionalId) {
+ super(ApiKeys.INIT_PRODUCER_ID);
+ if (transactionalId != null && transactionalId.isEmpty()) // null is allowed, "" is not
+ throw new IllegalArgumentException("Must set either a null or a non-empty transactional id.");
+ this.transactionalId = transactionalId;
+ }
+
+ @Override
+ public InitPidRequest build(short version) {
+ return new InitPidRequest(this.transactionalId, version);
+ }
+
+ @Override
+ public String toString() { // include the transactional id so logged requests can be told apart
+ return "(type=InitPidRequest, transactionalId=" + transactionalId + ")";
+ }
+
+ }
+
+ public InitPidRequest(Struct struct, short version) {
+ super(version);
+ this.transactionalId = struct.getString(TRANSACTIONAL_ID_KEY_NAME);
+ }
+
+ private InitPidRequest(String transactionalId, short version) {
+ super(version);
+ this.transactionalId = transactionalId;
+ }
+
+ @Override
+ public AbstractResponse getErrorResponse(Throwable e) {
+ return new InitPidResponse(Errors.forException(e)); // error-only response: carries RecordBatch.NO_PRODUCER_ID
+ }
+
+ public static InitPidRequest parse(ByteBuffer buffer, short version) {
+ return new InitPidRequest(ApiKeys.INIT_PRODUCER_ID.parseRequest(version, buffer), version);
+ }
+
+ public String transactionalId() {
+ return transactionalId;
+ }
+
+ @Override
+ protected Struct toStruct() {
+ Struct struct = new Struct(ApiKeys.INIT_PRODUCER_ID.requestSchema(version()));
+ struct.set(TRANSACTIONAL_ID_KEY_NAME, transactionalId);
+ return struct;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/main/java/org/apache/kafka/common/requests/InitPidResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/InitPidResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/InitPidResponse.java
new file mode 100644
index 0000000..ee92375
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/InitPidResponse.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.record.RecordBatch;
+
+import java.nio.ByteBuffer;
+
+public class InitPidResponse extends AbstractResponse {
+ /**
+ * Possible error codes:
+ * NONE on success, or the error mapped from an exception via InitPidRequest#getErrorResponse.
+ * NOTE(review): the full set of codes the broker may return is not listed here — confirm.
+ */
+ private static final String PRODUCER_ID_KEY_NAME = "pid"; // field names of the INIT_PRODUCER_ID response schema
+ private static final String EPOCH_KEY_NAME = "epoch";
+ private static final String ERROR_CODE_KEY_NAME = "error_code";
+ private final Errors error;
+ private final long producerId;
+ private final short epoch;
+
+ public InitPidResponse(Errors error, long producerId, short epoch) {
+ this.error = error;
+ this.producerId = producerId;
+ this.epoch = epoch;
+ }
+
+ public InitPidResponse(Struct struct) {
+ this.error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
+ this.producerId = struct.getLong(PRODUCER_ID_KEY_NAME);
+ this.epoch = struct.getShort(EPOCH_KEY_NAME);
+ }
+
+ public InitPidResponse(Errors errors) { // error-only response: no producer id is assigned
+ this(errors, RecordBatch.NO_PRODUCER_ID, (short) 0); // NOTE(review): epoch 0 rather than a "no epoch" sentinel — confirm intended
+ }
+
+ public long producerId() {
+ return producerId;
+ }
+
+ public Errors error() {
+ return error;
+ }
+
+ public short epoch() {
+ return epoch;
+ }
+
+ @Override
+ protected Struct toStruct(short version) {
+ Struct struct = new Struct(ApiKeys.INIT_PRODUCER_ID.responseSchema(version));
+ struct.set(PRODUCER_ID_KEY_NAME, producerId);
+ struct.set(EPOCH_KEY_NAME, epoch);
+ struct.set(ERROR_CODE_KEY_NAME, error.code());
+ return struct;
+ }
+
+ public static InitPidResponse parse(ByteBuffer buffer, short version) {
+ return new InitPidResponse(ApiKeys.INIT_PRODUCER_ID.parseResponse(version, buffer));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java b/clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java
index 50c90a8..6efe311 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java
@@ -31,6 +31,16 @@ public final class ByteUtils {
private ByteUtils() {}
/**
+ * Read an unsigned 32-bit integer (in the buffer's current byte order) from the current position in the buffer, incrementing the position by 4 bytes.
+ *
+ * @param buffer The buffer to read from
+ * @return The value read, in the range [0, 2^32 - 1], widened to a long so it is never negative
+ */
+ public static long readUnsignedInt(ByteBuffer buffer) {
+ return buffer.getInt() & 0xffffffffL;
+ }
+
+ /**
* Read an unsigned integer from the given position without modifying the buffers position
*
* @param buffer the buffer to read from
http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index 9cc863b..9117e16 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -17,18 +17,23 @@
package org.apache.kafka.clients.producer.internals;
import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.clients.producer.TransactionState;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.DefaultRecordBatch;
import org.apache.kafka.common.record.DefaultRecord;
import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.junit.After;
@@ -89,7 +94,7 @@ public class RecordAccumulatorTest {
int batchSize = 1025;
RecordAccumulator accum = new RecordAccumulator(batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10L * batchSize,
- CompressionType.NONE, 10L, 100L, metrics, time, new ApiVersions());
+ CompressionType.NONE, 10L, 100L, metrics, time, new ApiVersions(), null);
int appends = expectedNumAppends(batchSize);
for (int i = 0; i < appends; i++) {
// append to the first batch
@@ -108,7 +113,6 @@ public class RecordAccumulatorTest {
Deque<ProducerBatch> partitionBatches = accum.batches().get(tp1);
assertEquals(2, partitionBatches.size());
Iterator<ProducerBatch> partitionBatchesIterator = partitionBatches.iterator();
- assertFalse(partitionBatchesIterator.next().isWritable());
assertTrue(partitionBatchesIterator.next().isWritable());
assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
@@ -129,7 +133,7 @@ public class RecordAccumulatorTest {
public void testAppendLarge() throws Exception {
int batchSize = 512;
RecordAccumulator accum = new RecordAccumulator(batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024,
- CompressionType.NONE, 0L, 100L, metrics, time, new ApiVersions());
+ CompressionType.NONE, 0L, 100L, metrics, time, new ApiVersions(), null);
accum.append(tp1, 0L, key, new byte[2 * batchSize], null, maxBlockTimeMs);
assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
}
@@ -138,7 +142,7 @@ public class RecordAccumulatorTest {
public void testLinger() throws Exception {
long lingerMs = 10L;
RecordAccumulator accum = new RecordAccumulator(1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024,
- CompressionType.NONE, lingerMs, 100L, metrics, time, new ApiVersions());
+ CompressionType.NONE, lingerMs, 100L, metrics, time, new ApiVersions(), null);
accum.append(tp1, 0L, key, value, null, maxBlockTimeMs);
assertEquals("No partitions should be ready", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size());
time.sleep(10);
@@ -157,7 +161,7 @@ public class RecordAccumulatorTest {
@Test
public void testPartialDrain() throws Exception {
RecordAccumulator accum = new RecordAccumulator(1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024,
- CompressionType.NONE, 10L, 100L, metrics, time, new ApiVersions());
+ CompressionType.NONE, 10L, 100L, metrics, time, new ApiVersions(), null);
int appends = 1024 / msgSize + 1;
List<TopicPartition> partitions = asList(tp1, tp2);
for (TopicPartition tp : partitions) {
@@ -177,7 +181,7 @@ public class RecordAccumulatorTest {
final int msgs = 10000;
final int numParts = 2;
final RecordAccumulator accum = new RecordAccumulator(1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024,
- CompressionType.NONE, 0L, 100L, metrics, time, new ApiVersions());
+ CompressionType.NONE, 0L, 100L, metrics, time, new ApiVersions(), null);
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < numThreads; i++) {
threads.add(new Thread() {
@@ -222,7 +226,7 @@ public class RecordAccumulatorTest {
int batchSize = 1025;
RecordAccumulator accum = new RecordAccumulator(batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize,
- CompressionType.NONE, lingerMs, 100L, metrics, time, new ApiVersions());
+ CompressionType.NONE, lingerMs, 100L, metrics, time, new ApiVersions(), null);
// Just short of going over the limit so we trigger linger time
int appends = expectedNumAppends(batchSize);
@@ -257,7 +261,7 @@ public class RecordAccumulatorTest {
long lingerMs = Long.MAX_VALUE / 4;
long retryBackoffMs = Long.MAX_VALUE / 2;
final RecordAccumulator accum = new RecordAccumulator(1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024,
- CompressionType.NONE, lingerMs, retryBackoffMs, metrics, time, new ApiVersions());
+ CompressionType.NONE, lingerMs, retryBackoffMs, metrics, time, new ApiVersions(), null);
long now = time.milliseconds();
accum.append(tp1, 0L, key, value, null, maxBlockTimeMs);
@@ -295,7 +299,7 @@ public class RecordAccumulatorTest {
public void testFlush() throws Exception {
long lingerMs = Long.MAX_VALUE;
final RecordAccumulator accum = new RecordAccumulator(4 * 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024,
- CompressionType.NONE, lingerMs, 100L, metrics, time, new ApiVersions());
+ CompressionType.NONE, lingerMs, 100L, metrics, time, new ApiVersions(), null);
for (int i = 0; i < 100; i++)
accum.append(new TopicPartition(topic, i % 3), 0L, key, value, null, maxBlockTimeMs);
RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
@@ -329,7 +333,7 @@ public class RecordAccumulatorTest {
@Test
public void testAwaitFlushComplete() throws Exception {
RecordAccumulator accum = new RecordAccumulator(4 * 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024,
- CompressionType.NONE, Long.MAX_VALUE, 100L, metrics, time, new ApiVersions());
+ CompressionType.NONE, Long.MAX_VALUE, 100L, metrics, time, new ApiVersions(), null);
accum.append(new TopicPartition(topic, 0), 0L, key, value, null, maxBlockTimeMs);
accum.beginFlush();
@@ -349,7 +353,7 @@ public class RecordAccumulatorTest {
long lingerMs = Long.MAX_VALUE;
final AtomicInteger numExceptionReceivedInCallback = new AtomicInteger(0);
final RecordAccumulator accum = new RecordAccumulator(4 * 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024,
- CompressionType.NONE, lingerMs, 100L, metrics, time, new ApiVersions());
+ CompressionType.NONE, lingerMs, 100L, metrics, time, new ApiVersions(), null);
class TestCallback implements Callback {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
@@ -378,7 +382,7 @@ public class RecordAccumulatorTest {
int batchSize = 1025;
RecordAccumulator accum = new RecordAccumulator(batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize,
- CompressionType.NONE, lingerMs, retryBackoffMs, metrics, time, new ApiVersions());
+ CompressionType.NONE, lingerMs, retryBackoffMs, metrics, time, new ApiVersions(), null);
int appends = expectedNumAppends(batchSize);
// Test batches not in retry
@@ -449,7 +453,7 @@ public class RecordAccumulatorTest {
int messagesPerBatch = expectedNumAppends(1024);
final RecordAccumulator accum = new RecordAccumulator(1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024,
- CompressionType.NONE, lingerMs, retryBackoffMs, metrics, time, new ApiVersions());
+ CompressionType.NONE, lingerMs, retryBackoffMs, metrics, time, new ApiVersions(), null);
final AtomicInteger expiryCallbackCount = new AtomicInteger();
final AtomicReference<Exception> unexpectedException = new AtomicReference<>();
Callback callback = new Callback() {
@@ -490,7 +494,7 @@ public class RecordAccumulatorTest {
int batchSize = 1025;
RecordAccumulator accum = new RecordAccumulator(batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize,
- CompressionType.NONE, 10, 100L, metrics, time, new ApiVersions());
+ CompressionType.NONE, 10, 100L, metrics, time, new ApiVersions(), null);
int appends = expectedNumAppends(batchSize);
for (int i = 0; i < appends; i++) {
accum.append(tp1, 0L, key, value, null, maxBlockTimeMs);
@@ -519,6 +523,18 @@ public class RecordAccumulatorTest {
assertTrue("The batch should have been drained.", drained.get(node1.id()).size() > 0);
}
+ @Test(expected = UnsupportedVersionException.class)
+ public void testIdempotenceWithOldMagic() throws InterruptedException {
+ // Simulate talking to an older broker, i.e. one which only supports a lower magic value.
+ ApiVersions apiVersions = new ApiVersions();
+ int batchSize = 1025;
+ apiVersions.update("foobar", NodeApiVersions.create(Arrays.asList(new ApiVersionsResponse.ApiVersion(ApiKeys.PRODUCE.id,
+ (short) 0, (short) 2)))); // the simulated broker advertises PRODUCE versions 0-2 only
+ RecordAccumulator accum = new RecordAccumulator(batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize,
+ CompressionType.NONE, 10, 100L, metrics, time, apiVersions, new TransactionState(time)); // a TransactionState is supplied
+ accum.append(tp1, 0L, key, value, null, 0); // expected to throw UnsupportedVersionException
+ }
+
/**
* Return the offset delta.
*/
http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 0dea6b6..0d19aa0 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.clients.producer.TransactionState;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
@@ -32,11 +33,14 @@ import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.ProduceRequest;
+import org.apache.kafka.common.requests.InitPidRequest;
+import org.apache.kafka.common.requests.InitPidResponse;
import org.apache.kafka.common.requests.ProduceResponse;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.test.TestUtils;
@@ -46,6 +50,7 @@ import org.junit.Test;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
@@ -81,24 +86,7 @@ public class SenderTest {
@Before
public void setup() {
- Map<String, String> metricTags = new LinkedHashMap<>();
- metricTags.put("client-id", CLIENT_ID);
- MetricConfig metricConfig = new MetricConfig().tags(metricTags);
- metrics = new Metrics(metricConfig, time);
- accumulator = new RecordAccumulator(batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, metrics, time, apiVersions);
- sender = new Sender(client,
- metadata,
- this.accumulator,
- true,
- MAX_REQUEST_SIZE,
- ACKS_ALL,
- MAX_RETRIES,
- metrics,
- time,
- REQUEST_TIMEOUT,
- apiVersions);
-
- metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+ setupWithTransactionState(null); // default fixture: accumulator and sender built without a TransactionState
}
@After
@@ -244,16 +232,19 @@ public class SenderTest {
Metrics m = new Metrics();
try {
Sender sender = new Sender(client,
- metadata,
- this.accumulator,
- false,
- MAX_REQUEST_SIZE,
- ACKS_ALL,
- maxRetries,
- m,
- time,
- REQUEST_TIMEOUT,
- apiVersions);
+ metadata,
+ this.accumulator,
+ false,
+ MAX_REQUEST_SIZE,
+ ACKS_ALL,
+ maxRetries,
+ m,
+ time,
+ REQUEST_TIMEOUT,
+ 50,
+ null,
+ apiVersions
+ );
// do a successful retry
Future<RecordMetadata> future = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
sender.run(time.milliseconds()); // connect
@@ -300,17 +291,19 @@ public class SenderTest {
Metrics m = new Metrics();
try {
Sender sender = new Sender(client,
- metadata,
- this.accumulator,
- true,
- MAX_REQUEST_SIZE,
- ACKS_ALL,
- maxRetries,
- m,
- time,
- REQUEST_TIMEOUT,
- apiVersions);
-
+ metadata,
+ this.accumulator,
+ true,
+ MAX_REQUEST_SIZE,
+ ACKS_ALL,
+ maxRetries,
+ m,
+ time,
+ REQUEST_TIMEOUT,
+ 50,
+ null,
+ apiVersions
+ );
// Create a two broker cluster, with partition 0 on broker 0 and partition 1 on broker 1
Cluster cluster1 = TestUtils.clusterWith(2, "test", 2);
metadata.update(cluster1, Collections.<String>emptySet(), time.milliseconds());
@@ -381,6 +374,164 @@ public class SenderTest {
assertTrue("Request should be completed", future.isDone());
}
+ @Test
+ public void testInitPidRequest() throws Exception {
+ final long producerId = 343434L;
+ TransactionState transactionState = new TransactionState(new MockTime()); // starts without a pid
+ setupWithTransactionState(transactionState);
+ client.setNode(new Node(1, "localhost", 33343));
+ client.prepareResponse(new MockClient.RequestMatcher() {
+ @Override
+ public boolean matches(AbstractRequest body) {
+ return body instanceof InitPidRequest;
+ }
+ }, new InitPidResponse(Errors.NONE, producerId, (short) 0));
+ sender.run(time.milliseconds()); // a single iteration is expected to send the InitPidRequest and apply the prepared response
+ assertTrue(transactionState.hasPid());
+ assertEquals(producerId, transactionState.pidAndEpoch().producerId);
+ assertEquals((short) 0, transactionState.pidAndEpoch().epoch);
+ }
+
+ @Test
+ public void testSequenceNumberIncrement() throws InterruptedException {
+ final long producerId = 343434L;
+ TransactionState transactionState = new TransactionState(new MockTime());
+ transactionState.setPidAndEpoch(producerId, (short) 0);
+ setupWithTransactionState(transactionState);
+ client.setNode(new Node(1, "localhost", 33343));
+
+ int maxRetries = 10;
+ Metrics m = new Metrics(); // metrics for the test-local sender below
+ Sender sender = new Sender(client,
+ metadata,
+ this.accumulator,
+ true,
+ MAX_REQUEST_SIZE,
+ ACKS_ALL,
+ maxRetries,
+ m,
+ time,
+ REQUEST_TIMEOUT,
+ 50,
+ transactionState,
+ apiVersions
+ );
+
+ Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
+ client.prepareResponse(new MockClient.RequestMatcher() {
+ @Override
+ public boolean matches(AbstractRequest body) {
+ if (body instanceof ProduceRequest) {
+ ProduceRequest request = (ProduceRequest) body;
+ MemoryRecords records = request.partitionRecordsOrFail().get(tp0);
+ Iterator<MutableRecordBatch> batchIterator = records.batches().iterator();
+ assertTrue(batchIterator.hasNext());
+ RecordBatch batch = batchIterator.next();
+ assertFalse(batchIterator.hasNext()); // exactly one batch for tp0
+ assertEquals(0, batch.baseSequence()); // first batch starts at sequence 0
+ assertEquals(producerId, batch.producerId());
+ assertEquals(0, batch.producerEpoch());
+ return true;
+ }
+ return false;
+ }
+ }, produceResponse(tp0, 0, Errors.NONE, 0));
+
+ sender.run(time.milliseconds()); // connect.
+ sender.run(time.milliseconds()); // send.
+
+ sender.run(time.milliseconds()); // receive response
+ assertTrue(responseFuture.isDone());
+ assertEquals(1L, (long) transactionState.sequenceNumber(tp0)); // expected value first, so failures report correctly
+ }
+
+ @Test
+ public void testAbortRetryWhenPidChanges() throws InterruptedException {
+ final long producerId = 343434L;
+ TransactionState transactionState = new TransactionState(new MockTime());
+ transactionState.setPidAndEpoch(producerId, (short) 0);
+ setupWithTransactionState(transactionState);
+ client.setNode(new Node(1, "localhost", 33343));
+
+ int maxRetries = 10;
+ Metrics m = new Metrics(); // inspected below for the record-error-rate metric
+ Sender sender = new Sender(client,
+ metadata,
+ this.accumulator,
+ true,
+ MAX_REQUEST_SIZE,
+ ACKS_ALL,
+ maxRetries,
+ m,
+ time,
+ REQUEST_TIMEOUT,
+ 50,
+ transactionState,
+ apiVersions
+ );
+
+ Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
+ sender.run(time.milliseconds()); // connect.
+ sender.run(time.milliseconds()); // send.
+ String id = client.requests().peek().destination();
+ Node node = new Node(Integer.parseInt(id), "localhost", 0);
+ assertEquals(1, client.inFlightRequestCount());
+ assertTrue("Client ready status should be true", client.isReady(node, 0L));
+ client.disconnect(id);
+ assertEquals(0, client.inFlightRequestCount());
+ assertFalse("Client ready status should be false", client.isReady(node, 0L));
+
+ transactionState.setPidAndEpoch(producerId + 1, (short) 0); // change the pid while the batch is pending retry
+ sender.run(time.milliseconds()); // receive error
+ sender.run(time.milliseconds()); // reconnect
+ sender.run(time.milliseconds()); // nothing to do, since the pid has changed. We should check the metrics for errors.
+ assertEquals("Expected requests to be aborted after pid change", 0, client.inFlightRequestCount());
+
+ KafkaMetric recordErrors = m.metrics().get(m.metricName("record-error-rate", METRIC_GROUP, ""));
+ assertTrue("Expected non-zero value for record send errors", recordErrors.value() > 0);
+
+ assertTrue(responseFuture.isDone());
+ assertEquals(0L, (long) transactionState.sequenceNumber(tp0)); // the aborted batch must not advance the sequence
+ }
+
+ @Test
+ public void testResetWhenOutOfOrderSequenceReceived() throws InterruptedException {
+ final long producerId = 343434L;
+ TransactionState transactionState = new TransactionState(new MockTime());
+ transactionState.setPidAndEpoch(producerId, (short) 0);
+ setupWithTransactionState(transactionState);
+ client.setNode(new Node(1, "localhost", 33343));
+
+ int maxRetries = 10;
+ Metrics m = new Metrics();
+ Sender sender = new Sender(client,
+ metadata,
+ this.accumulator,
+ true,
+ MAX_REQUEST_SIZE,
+ ACKS_ALL,
+ maxRetries,
+ m,
+ time,
+ REQUEST_TIMEOUT,
+ 50,
+ transactionState,
+ apiVersions
+ );
+
+ Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
+ sender.run(time.milliseconds()); // connect.
+ sender.run(time.milliseconds()); // send.
+
+ assertEquals(1, client.inFlightRequestCount());
+
+ client.respond(produceResponse(tp0, 0, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, 0)); // broker rejects the batch's sequence
+
+ sender.run(time.milliseconds()); // process the error response
+ assertTrue(responseFuture.isDone());
+ assertFalse("Expected transaction state to be reset upon receiving an OutOfOrderSequenceException", transactionState.hasPid());
+ }
+
private void completedWithError(Future<RecordMetadata> future, Errors error) throws Exception {
assertTrue("Request should be completed", future.isDone());
try {
@@ -397,4 +548,25 @@ public class SenderTest {
return new ProduceResponse(partResp, throttleTimeMs);
}
+    /**
+     * Replaces the test's metrics registry, record accumulator and sender with fresh instances that
+     * share the given transaction state, then updates the metadata with the test cluster.
+     *
+     * @param transactionState the producer id / sequence state used by both accumulator and sender
+     */
+    private void setupWithTransactionState(TransactionState transactionState) {
+        Map<String, String> tags = new LinkedHashMap<>();
+        tags.put("client-id", CLIENT_ID);
+        this.metrics = new Metrics(new MetricConfig().tags(tags), time);
+        this.accumulator = new RecordAccumulator(batchSize, 1024 * 1024, CompressionType.NONE,
+                0L, 0L, metrics, time, apiVersions, transactionState);
+        this.sender = new Sender(this.client, this.metadata, this.accumulator, true,
+                MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, this.metrics, this.time,
+                REQUEST_TIMEOUT, 50, transactionState, apiVersions);
+        this.metadata.update(this.cluster, Collections.<String>emptySet(), time.milliseconds());
+    }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionStateTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionStateTest.java
new file mode 100644
index 0000000..a8a1716
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionStateTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+
+import org.apache.kafka.clients.producer.TransactionState;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.MockTime;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class TransactionStateTest {
+
+    private TopicPartition topicPartition;
+
+    @Before
+    public void setUp() {
+        topicPartition = new TopicPartition("topic-0", 0);
+    }
+
+    /**
+     * Incrementing the sequence of a partition whose sequence number was never read is expected to
+     * throw (presumably because no sequence entry exists for the partition yet).
+     */
+    @Test(expected = IllegalStateException.class)
+    public void testInvalidSequenceIncrement() {
+        TransactionState transactionState = new TransactionState(new MockTime());
+        transactionState.incrementSequenceNumber(topicPartition, 3333);
+    }
+
+    /** A partition's sequence number starts at 0 and advances by the requested increment. */
+    @Test
+    public void testDefaultSequenceNumber() {
+        TransactionState transactionState = new TransactionState(new MockTime());
+        assertEquals(0, (int) transactionState.sequenceNumber(topicPartition));
+        transactionState.incrementSequenceNumber(topicPartition, 3);
+        assertEquals(3, (int) transactionState.sequenceNumber(topicPartition));
+    }
+
+    /** Resetting the producer id brings the partition's sequence number back to 0. */
+    @Test
+    public void testProducerIdReset() {
+        TransactionState transactionState = new TransactionState(new MockTime());
+        assertEquals(0, (int) transactionState.sequenceNumber(topicPartition));
+        transactionState.incrementSequenceNumber(topicPartition, 3);
+        assertEquals(3, (int) transactionState.sequenceNumber(topicPartition));
+        transactionState.resetProducerId();
+        assertEquals(0, (int) transactionState.sequenceNumber(topicPartition));
+    }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
index 2dd5ab0..a2c761f 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
@@ -355,7 +355,7 @@ public class MemoryRecordsTest {
private static class RetainNonNullKeysFilter implements MemoryRecords.RecordFilter {
@Override
- public boolean shouldRetain(Record record) {
+ public boolean shouldRetain(RecordBatch batch, Record record) {
+ // The enclosing batch is ignored: a record is retained only if it has a key.
return record.hasKey();
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 2024f90..8a7633e 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -136,6 +136,9 @@ public class RequestResponseTest {
checkRequest(createDeleteTopicsRequest());
checkErrorResponse(createDeleteTopicsRequest(), new UnknownServerException());
checkResponse(createDeleteTopicsResponse(), 0);
+ checkRequest(createInitPidRequest());
+ checkErrorResponse(createInitPidRequest(), new UnknownServerException());
+ checkResponse(createInitPidResponse(), 0);
checkOlderFetchVersions();
checkResponse(createMetadataResponse(), 0);
checkResponse(createMetadataResponse(), 1);
@@ -787,6 +790,14 @@ public class RequestResponseTest {
return new DeleteTopicsResponse(errors);
}
+    /** Builds an InitPid request with a null builder argument (presumably no transactional id). */
+    private InitPidRequest createInitPidRequest() {
+        InitPidRequest.Builder builder = new InitPidRequest.Builder(null);
+        return builder.build();
+    }
+
+    /** Builds a successful InitPid response carrying pid 3332 and epoch 3. */
+    private InitPidResponse createInitPidResponse() {
+        short epoch = 3;
+        return new InitPidResponse(Errors.NONE, 3332, epoch);
+    }
+
private static class ByteBufferChannel implements GatheringByteChannel {
private final ByteBuffer buf;
private boolean closed = false;
http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java
index 0a082fb..ce23a33 100644
--- a/clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java
@@ -65,6 +65,16 @@ public class ByteUtilsTest {
}
@Test
+    public void testReadUnsignedInt() {
+        // Round-trip small values as well as values that do not fit in a signed int, which is the
+        // range an unsigned read exists for (0xFFFFFFFF would read back negative if sign-extended).
+        long[] writeValues = {0L, 133444L, Integer.MAX_VALUE + 1L, 0xFFFFFFFFL};
+        for (long writeValue : writeValues) {
+            ByteBuffer buffer = ByteBuffer.allocate(4);
+            ByteUtils.writeUnsignedInt(buffer, writeValue);
+            buffer.flip();
+            long readValue = ByteUtils.readUnsignedInt(buffer);
+            assertEquals(writeValue, readValue);
+        }
+    }
+
+ @Test
public void testWriteUnsignedIntLEToArray() {
int value1 = 0x04030201;
http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index a2308b2..194cfcc 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -24,9 +24,9 @@ import kafka.cluster.Broker
import kafka.common.{KafkaException, TopicAndPartition}
import kafka.server.KafkaConfig
import kafka.utils._
-import org.apache.kafka.clients.{ApiVersions, ClientResponse, ManualMetadataUpdater, NetworkClient}
+import org.apache.kafka.clients._
import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.network.{ChannelBuilders, ListenerName, NetworkReceive, Selectable, Selector}
+import org.apache.kafka.common.network._
import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
import org.apache.kafka.common.requests
import org.apache.kafka.common.requests.{UpdateMetadataRequest, _}
@@ -180,7 +180,6 @@ class RequestSendThread(val controllerId: Int,
def backoff(): Unit = CoreUtils.swallowTrace(Thread.sleep(100))
val QueueItem(apiKey, requestBuilder, callback) = queue.take()
- import NetworkClientBlockingOps._
var clientResponse: ClientResponse = null
try {
lock synchronized {
@@ -196,7 +195,7 @@ class RequestSendThread(val controllerId: Int,
else {
val clientRequest = networkClient.newClientRequest(brokerNode.idString, requestBuilder,
time.milliseconds(), true)
- clientResponse = networkClient.blockingSendAndReceive(clientRequest)(time)
+ clientResponse = NetworkClientUtils.sendAndReceive(networkClient, clientRequest, time)
isSendSuccessful = true
}
} catch {
@@ -233,10 +232,9 @@ class RequestSendThread(val controllerId: Int,
}
private def brokerReady(): Boolean = {
- import NetworkClientBlockingOps._
try {
- if (!networkClient.isReady(brokerNode)(time)) {
- if (!networkClient.blockingReady(brokerNode, socketTimeoutMs)(time))
+ if (!NetworkClientUtils.isReady(networkClient, brokerNode, time.milliseconds())) {
+ if (!NetworkClientUtils.awaitReady(networkClient, brokerNode, time, socketTimeoutMs))
throw new SocketTimeoutException(s"Failed to connect within $socketTimeoutMs ms")
info("Controller %d connected to %s for sending state change requests".format(controllerId, brokerNode.toString))
http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index 9d62924..2bc0c21 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -69,7 +69,7 @@ class GroupMetadataManager(val brokerId: Int,
private val shuttingDown = new AtomicBoolean(false)
/* number of partitions for the consumer metadata topic */
- private val groupMetadataTopicPartitionCount = getOffsetsTopicPartitionCount
+ private val groupMetadataTopicPartitionCount = getGroupMetadataTopicPartitionCount
/* single-thread scheduler to handle offset/group metadata cache loading and unloading */
private val scheduler = new KafkaScheduler(threads = 1, threadNamePrefix = "group-metadata-manager-")
@@ -667,16 +667,11 @@ class GroupMetadataManager(val brokerId: Int,
}
/**
- * Gets the partition count of the offsets topic from ZooKeeper.
+ * Gets the partition count of the group metadata topic from ZooKeeper.
* If the topic does not exist, the configured partition count is returned.
*/
- private def getOffsetsTopicPartitionCount = {
- val topic = Topic.GroupMetadataTopicName
- val topicData = zkUtils.getPartitionAssignmentForTopics(Seq(topic))
- if (topicData(topic).nonEmpty)
- topicData(topic).size
- else
- config.offsetsTopicNumPartitions
+  private def getGroupMetadataTopicPartitionCount: Int =
+    zkUtils.getTopicPartitionCount(Topic.GroupMetadataTopicName) match {
+      case Some(partitionCount) => partitionCount
+      // topic not present in ZooKeeper: fall back to the configured partition count
+      case None => config.offsetsTopicNumPartitions
+    }
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/main/scala/kafka/coordinator/PidMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/PidMetadata.scala b/core/src/main/scala/kafka/coordinator/PidMetadata.scala
new file mode 100644
index 0000000..fa58add
--- /dev/null
+++ b/core/src/main/scala/kafka/coordinator/PidMetadata.scala
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.coordinator
+
+import kafka.utils.nonthreadsafe
+
+@nonthreadsafe
+private[coordinator] class PidMetadata(val pid: Long) {
+
+  /* current epoch number of the PID */
+  var epoch: Short = 0
+
+  /** Two instances are equal when both the PID and the current epoch match. */
+  override def equals(that: Any): Boolean = that match {
+    case other: PidMetadata => pid == other.pid && epoch == other.epoch
+    case _ => false
+  }
+
+  // Must agree with equals: instances with the same pid and epoch hash identically.
+  // Since epoch is mutable, an instance must not be mutated while stored as a hash key.
+  override def hashCode: Int = 31 * (pid ^ (pid >>> 32)).toInt + epoch
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/main/scala/kafka/coordinator/ProducerIdManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/ProducerIdManager.scala b/core/src/main/scala/kafka/coordinator/ProducerIdManager.scala
new file mode 100644
index 0000000..43b05a4
--- /dev/null
+++ b/core/src/main/scala/kafka/coordinator/ProducerIdManager.scala
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.coordinator
+
+import kafka.common.KafkaException
+import kafka.utils.{Json, Logging, ZkUtils}
+
+/**
+ * Pid manager is part of the transaction coordinator that provides PIDs in a unique way such that the same PID will not be
+ * assigned twice across multiple transaction coordinators.
+ *
+ * Pids are managed via ZooKeeper: the latest pid block is written to the pid block ZK path by the manager that
+ * claims it, and the written block_start and block_end pids are both inclusive.
+ */
+object ProducerIdManager extends Logging {
+  val CurrentVersion: Long = 1L
+  val PidBlockSize: Long = 1000L
+
+  /**
+   * Serializes a pid block to the JSON stored in ZooKeeper. The start and end pids are written as strings.
+   */
+  def generatePidBlockJson(pidBlock: ProducerIdBlock): String = {
+    Json.encode(Map("version" -> CurrentVersion,
+      "broker" -> pidBlock.brokerId,
+      "block_start" -> pidBlock.blockStartPid.toString,
+      "block_end" -> pidBlock.blockEndPid.toString)
+    )
+  }
+
+  /**
+   * Parses JSON produced by [[generatePidBlockJson]] back into a [[ProducerIdBlock]].
+   *
+   * @throws KafkaException if `Json.parseFull` yields no value for `jsonData`
+   * @throws NumberFormatException if a stored pid is not a valid Long
+   */
+  def parsePidBlockData(jsonData: String): ProducerIdBlock = {
+    try {
+      Json.parseFull(jsonData).map { m =>
+        val pidBlockInfo = m.asInstanceOf[Map[String, Any]]
+        val brokerId = pidBlockInfo("broker").asInstanceOf[Int]
+        val blockStartPID = pidBlockInfo("block_start").asInstanceOf[String].toLong
+        val blockEndPID = pidBlockInfo("block_end").asInstanceOf[String].toLong
+        ProducerIdBlock(brokerId, blockStartPID, blockEndPID)
+      }.getOrElse(throw new KafkaException(s"Failed to parse the pid block json $jsonData"))
+    } catch {
+      case e: java.lang.NumberFormatException =>
+        // this should never happen: the written data has exceeded long type limit
+        fatal(s"Read json data $jsonData contains pids that have exceeded long type limit")
+        throw e
+    }
+  }
+}
+
+/** A contiguous range of pids claimed by `brokerId`; both bounds are inclusive. */
+case class ProducerIdBlock(brokerId: Int, blockStartPid: Long, blockEndPid: Long) {
+  override def toString: String =
+    s"(brokerId:$brokerId,blockStartPID:$blockStartPid,blockEndPID:$blockEndPid)"
+}
+
+class ProducerIdManager(val brokerId: Int, val zkUtils: ZkUtils) extends Logging {
+
+  this.logIdent = "[ProducerId Manager " + brokerId + "]: "
+
+  // Block currently being handed out and the next pid to return from it; both are mutated only
+  // while holding this instance's monitor.
+  private var currentPIDBlock: ProducerIdBlock = null
+  private var nextPID: Long = -1L
+
+  // grab the first block of PIDs
+  this synchronized {
+    getNewPidBlock()
+    nextPID = currentPIDBlock.blockStartPid
+  }
+
+  /**
+   * Claims a new pid block by conditionally writing it to ZooKeeper, retrying until the versioned
+   * write succeeds. The new block starts right after the end of the block currently stored in ZK,
+   * or at 0 if no block has been written yet.
+   *
+   * @throws KafkaException if allocating another block would overflow the Long pid space
+   */
+  private def getNewPidBlock(): Unit = {
+    var zkWriteComplete = false
+    while (!zkWriteComplete) {
+      // refresh current pid block from zookeeper again
+      val (dataOpt, zkVersion) = zkUtils.readDataAndVersionMaybeNull(ZkUtils.PidBlockPath)
+
+      // generate the new pid block
+      currentPIDBlock = dataOpt match {
+        case Some(data) =>
+          val currPIDBlock = ProducerIdManager.parsePidBlockData(data)
+          debug(s"Read current pid block $currPIDBlock, Zk path version $zkVersion")
+
+          // equivalent to blockEndPid + PidBlockSize > Long.MaxValue, written so it cannot overflow
+          if (currPIDBlock.blockEndPid > Long.MaxValue - ProducerIdManager.PidBlockSize) {
+            // we have exhausted all pids (wow!), treat it as a fatal error
+            fatal(s"Exhausted all pids as the next block's end pid would exceed the long type limit (current block end pid is ${currPIDBlock.blockEndPid})")
+            throw new KafkaException("Have exhausted all pids.")
+          }
+
+          ProducerIdBlock(brokerId, currPIDBlock.blockEndPid + 1L, currPIDBlock.blockEndPid + ProducerIdManager.PidBlockSize)
+        case None =>
+          debug(s"There is no pid block yet (Zk path version $zkVersion), creating the first block")
+          ProducerIdBlock(brokerId, 0L, ProducerIdManager.PidBlockSize - 1)
+      }
+
+      val newPIDBlockData = ProducerIdManager.generatePidBlockJson(currentPIDBlock)
+
+      // try to write the new pid block into zookeeper
+      val (succeeded, version) = zkUtils.conditionalUpdatePersistentPath(ZkUtils.PidBlockPath, newPIDBlockData, zkVersion, Some(checkPidBlockZkData))
+      zkWriteComplete = succeeded
+
+      if (zkWriteComplete)
+        info(s"Acquired new pid block $currentPIDBlock by writing to Zk with path version $version")
+    }
+  }
+
+  /**
+   * Data checker handed to `conditionalUpdatePersistentPath`. It reports whether the data currently stored
+   * at `path` parses to the same pid block as `expectedData`, together with that node's version.
+   * It returns (false, -1) when the node is missing or any error occurs.
+   */
+  private def checkPidBlockZkData(zkUtils: ZkUtils, path: String, expectedData: String): (Boolean, Int) = {
+    try {
+      val expectedPidBlock = ProducerIdManager.parsePidBlockData(expectedData)
+      // Read the path we were asked to verify; the only caller in this class passes ZkUtils.PidBlockPath.
+      val (dataOpt, zkVersion) = zkUtils.readDataAndVersionMaybeNull(path)
+      dataOpt match {
+        case Some(data) =>
+          val currPIDBlock = ProducerIdManager.parsePidBlockData(data)
+          (currPIDBlock.equals(expectedPidBlock), zkVersion)
+        case None =>
+          (false, -1)
+      }
+    } catch {
+      case e: Exception =>
+        warn(s"Error while checking for pid block Zk data on path $path: expected data $expectedData", e)
+
+        (false, -1)
+    }
+  }
+
+  /** Returns the next unused pid, claiming a fresh block from ZooKeeper once the current block is exhausted. */
+  def nextPid(): Long = {
+    this synchronized {
+      // grab a new block of PIDs if this block has been exhausted
+      if (nextPID > currentPIDBlock.blockEndPid) {
+        getNewPidBlock()
+        nextPID = currentPIDBlock.blockStartPid + 1
+      } else {
+        nextPID += 1
+      }
+
+      nextPID - 1
+    }
+  }
+
+  def shutdown() {
+    // NOTE(review): nextPID is the next pid to be handed out, i.e. one past the last assigned pid.
+    info(s"Shutdown complete: last PID assigned $nextPID")
+  }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/main/scala/kafka/coordinator/TransactionCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/TransactionCoordinator.scala
new file mode 100644
index 0000000..41b4323
--- /dev/null
+++ b/core/src/main/scala/kafka/coordinator/TransactionCoordinator.scala
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.coordinator
+
+import java.util.concurrent.atomic.AtomicBoolean
+
+import kafka.server.KafkaConfig
+import kafka.utils.{Logging, ZkUtils}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.record.RecordBatch
+import org.apache.kafka.common.utils.Time
+
+/**
+ * The transaction coordinator handles transaction-related requests from producers and communicates with
+ * brokers to update the status of ongoing transactions.
+ *
+ * Each Kafka server instantiates one transaction coordinator, which is responsible for a set of producers.
+ * Producers with a specific transactional id are assigned to their corresponding coordinator; producers
+ * without one may talk to any broker's coordinator.
+ */
+object TransactionCoordinator {
+
+  /** Creates a coordinator for this broker backed by a ZooKeeper-based pid manager. `time` is currently unused. */
+  def apply(config: KafkaConfig, zkUtils: ZkUtils, time: Time): TransactionCoordinator =
+    new TransactionCoordinator(config.brokerId, new ProducerIdManager(config.brokerId, zkUtils))
+}
+
+class TransactionCoordinator(val brokerId: Int,
+                             val pidManager: ProducerIdManager) extends Logging {
+
+  this.logIdent = "[Transaction Coordinator " + brokerId + "]: "
+
+  type InitPidCallback = InitPidResult => Unit
+
+  /* Active flag of the coordinator */
+  private val isActive = new AtomicBoolean(false)
+
+  /**
+   * Answers an InitPid request through `responseCallback`.
+   *
+   * A request without a transactional id (null or empty) is always accepted and gets a fresh pid with
+   * epoch 0. A request with a transactional id is always answered with NOT_COORDINATOR_FOR_GROUP.
+   */
+  def handleInitPid(transactionalId: String,
+                    responseCallback: InitPidCallback): Unit = {
+    val hasTransactionalId = transactionalId != null && transactionalId.nonEmpty
+    // TODO: when a transactional id is present, check whether this broker is its assigned coordinator
+    val result =
+      if (hasTransactionalId) initPidError(Errors.NOT_COORDINATOR_FOR_GROUP)
+      else InitPidResult(pidManager.nextPid(), epoch = 0, Errors.NONE)
+    responseCallback(result)
+  }
+
+  /** Marks the coordinator active; invoked when the server starts up. */
+  def startup() {
+    info("Starting up.")
+    isActive.set(true)
+    info("Startup complete.")
+  }
+
+  /**
+   * Marks the coordinator inactive and shuts down the pid manager; invoked when the server shuts down.
+   * Steps run in the reverse order of startup.
+   */
+  def shutdown() {
+    info("Shutting down.")
+    isActive.set(false)
+    pidManager.shutdown()
+    info("Shutdown complete.")
+  }
+
+  /** Result carrying `error` together with the "no producer id" / "no producer epoch" sentinels. */
+  private def initPidError(error: Errors): InitPidResult =
+    InitPidResult(pid = RecordBatch.NO_PRODUCER_ID, epoch = RecordBatch.NO_PRODUCER_EPOCH, error)
+
+}
+
+case class InitPidResult(pid: Long, epoch: Short, error: Errors)
http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 2a81f26..95a6896 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -17,30 +17,30 @@
package kafka.log
+import java.io.{File, IOException}
+import java.text.NumberFormat
+import java.util.concurrent.atomic._
+import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap, TimeUnit}
+
+import com.yammer.metrics.core.Gauge
import kafka.api.KAFKA_0_10_0_IV0
-import kafka.utils._
import kafka.common._
+import kafka.message.{BrokerCompressionCodec, CompressionCodec, NoCompressionCodec}
import kafka.metrics.KafkaMetricsGroup
import kafka.server.{BrokerTopicStats, FetchDataInfo, LogOffsetMetadata}
-import java.io.{File, IOException}
-import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap}
-import java.util.concurrent.atomic._
-import java.text.NumberFormat
-
+import kafka.utils._
+import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.{CorruptRecordException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException, UnsupportedForMessageFormatException}
import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.ListOffsetRequest
+import org.apache.kafka.common.utils.{Time, Utils}
-import scala.collection.Seq
import scala.collection.JavaConverters._
-import com.yammer.metrics.core.Gauge
-import org.apache.kafka.common.utils.{Time, Utils}
-import kafka.message.{BrokerCompressionCodec, CompressionCodec, NoCompressionCodec}
-import org.apache.kafka.common.TopicPartition
+import scala.collection.{Seq, mutable}
object LogAppendInfo {
+ // Placeholder ("unknown") append info: -1 offsets and counts, no timestamps, no compression codecs,
+ // non-monotonic offsets and an empty producer append info map.
val UnknownLogAppendInfo = LogAppendInfo(-1, -1, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP,
- NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false)
+ NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false, Map.empty[Long, ProducerAppendInfo], false)
}
/**
@@ -56,6 +56,9 @@ object LogAppendInfo {
* @param shallowCount The number of shallow messages
* @param validBytes The number of valid bytes
* @param offsetsMonotonic Are the offsets in this message set monotonically increasing
+ * @param producerAppendInfos A map from a Pid to a ProducerAppendInfo, which is used to validate each Record in a
+ * RecordBatch and keep track of metadata across Records in a RecordBatch.
+ * @param isDuplicate Indicates whether the message set is a duplicate of a message at the tail of the log.
*/
case class LogAppendInfo(var firstOffset: Long,
var lastOffset: Long,
@@ -66,8 +69,9 @@ case class LogAppendInfo(var firstOffset: Long,
targetCodec: CompressionCodec,
shallowCount: Int,
validBytes: Int,
- offsetsMonotonic: Boolean)
-
+ offsetsMonotonic: Boolean,
+ producerAppendInfos: Map[Long, ProducerAppendInfo],
+ isDuplicate: Boolean = false)
/**
* An append-only log for storing messages.
@@ -93,7 +97,8 @@ case class LogAppendInfo(var firstOffset: Long,
* @param recoveryPoint The offset at which to begin recovery--i.e. the first offset which has not been flushed to disk
* @param scheduler The thread pool scheduler used for background actions
* @param time The time instance used for checking the clock
- *
+ * @param maxPidExpirationMs The maximum amount of time to wait before a PID is considered expired
+ * @param pidExpirationCheckIntervalMs How often to check for PIDs which need to be expired
*/
@threadsafe
class Log(@volatile var dir: File,
@@ -101,7 +106,10 @@ class Log(@volatile var dir: File,
@volatile var logStartOffset: Long = 0L,
@volatile var recoveryPoint: Long = 0L,
scheduler: Scheduler,
- time: Time = Time.SYSTEM) extends Logging with KafkaMetricsGroup {
+ time: Time = Time.SYSTEM,
+ val maxPidExpirationMs: Int = 60 * 60 * 1000,
+ val pidExpirationCheckIntervalMs: Int = 10 * 60 * 1000,
+ val pidSnapshotCreationIntervalMs: Int = 60 * 1000) extends Logging with KafkaMetricsGroup {
import kafka.log.Log._
@@ -118,10 +126,16 @@ class Log(@volatile var dir: File,
0
}
+ val topicPartition: TopicPartition = Log.parseTopicPartitionName(dir)
+
@volatile private var nextOffsetMetadata: LogOffsetMetadata = _
+ /* Construct and load PID map */
+ private val pidMap = new ProducerIdMapping(config, topicPartition, dir, maxPidExpirationMs)
+
/* the actual segments of the log */
private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
+
locally {
val startMs = time.milliseconds
@@ -131,13 +145,12 @@ class Log(@volatile var dir: File,
activeSegment.size.toInt)
logStartOffset = math.max(logStartOffset, segments.firstEntry().getValue.baseOffset)
+ buildAndRecoverPidMap(logEndOffset)
info("Completed load of log %s with %d log segments, log start offset %d and log end offset %d in %d ms"
.format(name, segments.size(), logStartOffset, logEndOffset, time.milliseconds - startMs))
}
- val topicPartition: TopicPartition = Log.parseTopicPartitionName(dir)
-
private val tags = Map("topic" -> topicPartition.topic, "partition" -> topicPartition.partition.toString)
newGauge("NumLogSegments",
@@ -164,6 +177,19 @@ class Log(@volatile var dir: File,
},
tags)
+ // Every pidExpirationCheckIntervalMs, ask the pid map to expire pids relative to the current time (under the log lock).
+ scheduler.schedule(name = "PeriodicPidExpirationCheck", fun = () => {
+ lock synchronized {
+ pidMap.checkForExpiredPids(time.milliseconds)
+ }
+ }, period = pidExpirationCheckIntervalMs, unit = TimeUnit.MILLISECONDS)
+
+ // Every pidSnapshotCreationIntervalMs, call pidMap.maybeTakeSnapshot() (under the log lock).
+ scheduler.schedule(name = "PeriodicPidSnapshotTask", fun = () => {
+ lock synchronized {
+ pidMap.maybeTakeSnapshot()
+ }
+ }, period = pidSnapshotCreationIntervalMs, unit = TimeUnit.MILLISECONDS)
+
+
/** The name of this log */
def name = dir.getName()
@@ -332,6 +358,47 @@ class Log(@volatile var dir: File,
}
/**
+   * Rebuilds the pid map for this log. The map is first reset with `truncateAndReload(lastOffset, now)`.
+   * Per the design, the map then starts from a snapshot taken strictly before the log end offset, so
+   * the tail of the log must be replayed. Every batch that carries a producer id, from the map's end
+   * offset up to `lastOffset`, is loaded into the map. Finally `cleanFrom(logStartOffset)` is applied.
+   * All of this happens while holding the log lock.
+   *
+   * @param lastOffset the offset up to which segments are read and replayed into the pid map
+   *                   (the log end offset when called while loading the log)
+   */
+  private def buildAndRecoverPidMap(lastOffset: Long): Unit = {
+    lock synchronized {
+      val currentTimeMs = time.milliseconds
+      pidMap.truncateAndReload(lastOffset, currentTimeMs)
+      logSegments(pidMap.mapEndOffset, lastOffset).foreach { segment =>
+        val startOffset = math.max(segment.baseOffset, pidMap.mapEndOffset)
+        val fetchDataInfo = segment.read(startOffset, Some(lastOffset), Int.MaxValue)
+        val records = fetchDataInfo.records
+        records.batches.asScala.foreach { batch =>
+          if (batch.hasProducerId) {
+            // TODO: Currently accessing any of the batch-level headers other than the offset
+            // or magic causes us to load the full entry into memory. It would be better if we
+            // only loaded the header
+            val numRecords = (batch.lastOffset - batch.baseOffset + 1).toInt
+            val pidEntry = ProducerIdEntry(batch.producerEpoch, batch.lastSequence, batch.lastOffset,
+              numRecords, batch.maxTimestamp)
+            pidMap.load(batch.producerId, pidEntry, currentTimeMs)
+          }
+        }
+      }
+      pidMap.cleanFrom(logStartOffset)
+    }
+  }
+
+  /** The pid map's currently active pids and their entries, read while holding the log lock. */
+  private[log] def activePids: Map[Long, ProducerIdEntry] =
+    lock.synchronized(pidMap.activePids)
+
+ /**
* Check if we have the "clean shutdown" file
*/
private def hasCleanShutdownFile = new File(dir.getParentFile, CleanShutdownFile).exists()
@@ -364,10 +431,11 @@ class Log(@volatile var dir: File,
* @return Information about the appended messages including the first and last offset.
*/
def append(records: MemoryRecords, assignOffsets: Boolean = true): LogAppendInfo = {
- val appendInfo = analyzeAndValidateRecords(records)
- // if we have any valid messages, append them to the log
- if (appendInfo.shallowCount == 0)
+ val appendInfo = analyzeAndValidateRecords(records, isFromClient = assignOffsets)
+
+ // return if we have no valid messages or if this is a duplicate of the last appended entry
+ if (appendInfo.shallowCount == 0 || appendInfo.isDuplicate)
return appendInfo
// trim any invalid bytes or partial messages before appending it to the on-disk log
@@ -433,7 +501,6 @@ class Log(@volatile var dir: File,
maxTimestampInMessages = appendInfo.maxTimestamp,
maxOffsetInMessages = appendInfo.lastOffset)
-
// now append to the log
segment.append(firstOffset = appendInfo.firstOffset,
largestOffset = appendInfo.lastOffset,
@@ -441,6 +508,15 @@ class Log(@volatile var dir: File,
shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp,
records = validRecords)
+ // update the PID sequence mapping
+ for ((pid, producerAppendInfo) <- appendInfo.producerAppendInfos) {
+ trace(s"Updating pid with sequence: $pid -> ${producerAppendInfo.lastEntry}")
+
+ if (assignOffsets)
+ producerAppendInfo.assignLastOffsetAndTimestamp(appendInfo.lastOffset, appendInfo.maxTimestamp)
+ pidMap.update(producerAppendInfo)
+ }
+
// increment the log end offset
updateLogEndOffset(appendInfo.lastOffset + 1)
@@ -457,7 +533,7 @@ class Log(@volatile var dir: File,
}
}
- /*
+ /**
* Increment the log start offset if the provided offset is larger.
*/
def maybeIncrementLogStartOffset(offset: Long) {
@@ -476,6 +552,7 @@ class Log(@volatile var dir: File,
* <ol>
* <li> each message matches its CRC
* <li> each message size is valid
+ * <li> the sequence numbers of the incoming record batches are consistent with the existing state and with each other.
* </ol>
*
* Also compute the following quantities:
@@ -488,7 +565,7 @@ class Log(@volatile var dir: File,
* <li> Whether any compression codec is used (if many are used, then the last one is given)
* </ol>
*/
- private def analyzeAndValidateRecords(records: MemoryRecords): LogAppendInfo = {
+ private def analyzeAndValidateRecords(records: MemoryRecords, isFromClient: Boolean): LogAppendInfo = {
var shallowMessageCount = 0
var validBytesCount = 0
var firstOffset = -1L
@@ -497,8 +574,12 @@ class Log(@volatile var dir: File,
var monotonic = true
var maxTimestamp = RecordBatch.NO_TIMESTAMP
var offsetOfMaxTimestamp = -1L
+ var isDuplicate = false
+ val producerAppendInfos = mutable.Map[Long, ProducerAppendInfo]()
for (batch <- records.batches.asScala) {
+ if (isFromClient && batch.magic >= RecordBatch.MAGIC_VALUE_V2 && shallowMessageCount > 0)
+ throw new InvalidRecordException("Client produce requests should not have more than one batch")
// update the first offset if on the first message. For magic versions older than 2, we use the last offset
// to avoid the need to decompress the data (the last offset can be obtained directly from the wrapper message).
// For magic version 2, we can get the first offset directly from the batch header.
@@ -508,6 +589,7 @@ class Log(@volatile var dir: File,
// check that offsets are monotonically increasing
if (lastOffset >= batch.lastOffset)
monotonic = false
+
// update the last offset seen
lastOffset = batch.lastOffset
@@ -527,19 +609,43 @@ class Log(@volatile var dir: File,
maxTimestamp = batch.maxTimestamp
offsetOfMaxTimestamp = lastOffset
}
+
shallowMessageCount += 1
validBytesCount += batchSize
val messageCodec = CompressionCodec.getCompressionCodec(batch.compressionType.id)
if (messageCodec != NoCompressionCodec)
sourceCodec = messageCodec
+
+ val pid = batch.producerId
+ if (pid != RecordBatch.NO_PRODUCER_ID) {
+ producerAppendInfos.get(pid) match {
+ case Some(appendInfo) => appendInfo.append(batch)
+ case None =>
+ val lastEntry = pidMap.lastEntry(pid).getOrElse(ProducerIdEntry.Empty)
+ if (isFromClient && lastEntry.isDuplicate(batch)) {
+ // This request is a duplicate so return the information about the existing entry. Note that for requests
+ // coming from the client, there will only be one RecordBatch per request, so there will be only one iteration
+ // of the loop and the values below will not be updated more than once.
+ isDuplicate = true
+ firstOffset = lastEntry.firstOffset
+ lastOffset = lastEntry.lastOffset
+ maxTimestamp = lastEntry.timestamp
+ info(s"Detected a duplicate at (firstOffset, lastOffset): (${firstOffset}, ${lastOffset}). Ignoring the incoming record.")
+ } else {
+ val producerAppendInfo = new ProducerAppendInfo(pid, lastEntry)
+ producerAppendInfos.put(pid, producerAppendInfo)
+ producerAppendInfo.append(batch)
+ }
+ }
+ }
}
// Apply broker-side compression if any
val targetCodec = BrokerCompressionCodec.getTargetCompressionCodec(config.compressionType, sourceCodec)
LogAppendInfo(firstOffset, lastOffset, maxTimestamp, offsetOfMaxTimestamp, RecordBatch.NO_TIMESTAMP, sourceCodec,
- targetCodec, shallowMessageCount, validBytesCount, monotonic)
+ targetCodec, shallowMessageCount, validBytesCount, monotonic, producerAppendInfos.toMap, isDuplicate)
}
/**
@@ -941,6 +1047,7 @@ class Log(@volatile var dir: File,
this.recoveryPoint = math.min(targetOffset, this.recoveryPoint)
this.logStartOffset = math.min(targetOffset, this.logStartOffset)
}
+ buildAndRecoverPidMap(targetOffset)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 8ddeca9..830f906 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -26,7 +26,7 @@ import com.yammer.metrics.core.Gauge
import kafka.common._
import kafka.metrics.KafkaMetricsGroup
import kafka.utils._
-import org.apache.kafka.common.record.{FileRecords, MemoryRecords, Record}
+import org.apache.kafka.common.record.{FileRecords, MemoryRecords, Record, RecordBatch}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.record.MemoryRecords.RecordFilter
@@ -219,8 +219,7 @@ class LogCleaner(val config: CleanerConfig,
override def doWork() {
cleanOrSleep()
}
-
-
+
override def shutdown() = {
initiateShutdown()
backOffWaitLatch.countDown()
@@ -402,7 +401,7 @@ private[log] class Cleaner(val id: Int,
val retainDeletes = old.lastModified > deleteHorizonMs
info("Cleaning segment %s in log %s (largest timestamp %s) into %s, %s deletes."
.format(old.baseOffset, log.name, new Date(old.largestTimestamp), cleaned.baseOffset, if(retainDeletes) "retaining" else "discarding"))
- cleanInto(log.topicPartition, old, cleaned, map, retainDeletes, log.config.maxMessageSize, stats)
+ cleanInto(log.topicPartition, old, cleaned, map, retainDeletes, log.config.maxMessageSize, log.activePids, stats)
}
// trim excess index
@@ -449,9 +448,10 @@ private[log] class Cleaner(val id: Int,
map: OffsetMap,
retainDeletes: Boolean,
maxLogMessageSize: Int,
+ activePids: Map[Long, ProducerIdEntry],
stats: CleanerStats) {
val logCleanerFilter = new RecordFilter {
- def shouldRetain(record: Record): Boolean = shouldRetainMessage(source, map, retainDeletes, record, stats)
+ def shouldRetain(recordBatch: RecordBatch, record: Record): Boolean = shouldRetainMessage(source, map, retainDeletes, record, stats, activePids, recordBatch.producerId)
}
var position = 0
@@ -493,10 +493,17 @@ private[log] class Cleaner(val id: Int,
map: kafka.log.OffsetMap,
retainDeletes: Boolean,
record: Record,
- stats: CleanerStats): Boolean = {
+ stats: CleanerStats,
+ activePids: Map[Long, ProducerIdEntry],
+ pid: Long): Boolean = {
if (record.isControlRecord)
return true
+ // retain the entry if it is the last one produced by an active idempotent producer to ensure that
+ // the PID is not removed from the log before it has expired
+ if (RecordBatch.NO_PRODUCER_ID < pid && activePids.get(pid).exists(_.lastOffset == record.offset))
+ return true
+
val pastLatestOffset = record.offset > map.latestOffset
if (pastLatestOffset)
return true
http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/main/scala/kafka/log/LogConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index 55669c0..30bc26b 100755
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -56,6 +56,7 @@ object Defaults {
val MessageTimestampDifferenceMaxMs = kafka.server.Defaults.LogMessageTimestampDifferenceMaxMs
val LeaderReplicationThrottledReplicas = Collections.emptyList[String]()
val FollowerReplicationThrottledReplicas = Collections.emptyList[String]()
+ val MaxIdMapSnapshots = kafka.server.Defaults.MaxIdMapSnapshots
}
case class LogConfig(props: java.util.Map[_, _]) extends AbstractConfig(LogConfig.configDef, props, false) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index a555420..ec164e2 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -25,9 +25,10 @@ import kafka.utils._
import scala.collection._
import scala.collection.JavaConverters._
import kafka.common.{KafkaException, KafkaStorageException}
-import kafka.server.{BrokerState, OffsetCheckpoint, RecoveringFromUncleanShutdown}
+import kafka.server._
import java.util.concurrent.{ExecutionException, ExecutorService, Executors, Future}
+import kafka.admin.AdminUtils
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.utils.Time
@@ -51,6 +52,7 @@ class LogManager(val logDirs: Array[File],
val flushRecoveryOffsetCheckpointMs: Long,
val flushStartOffsetCheckpointMs: Long,
val retentionCheckMs: Long,
+ val maxPidExpirationMs: Int,
scheduler: Scheduler,
val brokerState: BrokerState,
time: Time) extends Logging {
@@ -166,7 +168,14 @@ class LogManager(val logDirs: Array[File],
val logRecoveryPoint = recoveryPoints.getOrElse(topicPartition, 0L)
val logStartOffset = logStartOffsets.getOrElse(topicPartition, 0L)
- val current = new Log(logDir, config, logStartOffset, logRecoveryPoint, scheduler, time)
+ val current = new Log(
+ dir = logDir,
+ config = config,
+ logStartOffset = logStartOffset,
+ recoveryPoint = logRecoveryPoint,
+ maxPidExpirationMs = maxPidExpirationMs,
+ scheduler = scheduler,
+ time = time)
if (logDir.getName.endsWith(Log.DeleteDirSuffix)) {
this.logsToBeDeleted.add(current)
} else {
@@ -401,7 +410,15 @@ class LogManager(val logDirs: Array[File],
val dataDir = nextLogDir()
val dir = new File(dataDir, topicPartition.topic + "-" + topicPartition.partition)
dir.mkdirs()
- val log = new Log(dir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler, time)
+
+ val log = new Log(
+ dir = dir,
+ config = config,
+ logStartOffset = 0L,
+ recoveryPoint = 0L,
+ maxPidExpirationMs = maxPidExpirationMs,
+ scheduler = scheduler,
+ time = time)
logs.put(topicPartition, log)
info("Created log for partition [%s,%d] in %s with properties {%s}."
.format(topicPartition.topic,
@@ -493,7 +510,7 @@ class LogManager(val logDirs: Array[File],
// count the number of logs in each parent directory (including 0 for empty directories
val logCounts = allLogs.groupBy(_.dir.getParent).mapValues(_.size)
val zeros = logDirs.map(dir => (dir.getPath, 0)).toMap
- var dirCounts = (zeros ++ logCounts).toBuffer
+ val dirCounts = (zeros ++ logCounts).toBuffer
// choose the directory with the least logs in it
val leastLoaded = dirCounts.sortBy(_._2).head
@@ -556,3 +573,42 @@ class LogManager(val logDirs: Array[File],
}
}
}
+
+object LogManager {
+
+  /**
+   * Builds a [[LogManager]] from the broker's [[KafkaConfig]].
+   *
+   * The default log config is derived from the broker config. Per-topic log configs are built from those
+   * defaults plus each topic's configuration fetched from ZooKeeper. Log cleaner settings come from the broker config.
+   *
+   * @param config         broker configuration
+   * @param zkUtils        ZooKeeper client used only to fetch per-topic configs
+   * @param brokerState    passed through to the LogManager
+   * @param kafkaScheduler passed through to the LogManager as its scheduler
+   * @param time           passed through to the LogManager
+   * @return a new LogManager over `config.logDirs`
+   */
+  def apply(config: KafkaConfig,
+            zkUtils: ZkUtils,
+            brokerState: BrokerState,
+            kafkaScheduler: KafkaScheduler,
+            time: Time): LogManager = {
+    val defaultProps = KafkaServer.copyKafkaConfigToLog(config)
+    val defaultLogConfig = LogConfig(defaultProps)
+
+    // read the per-topic log configurations from zookeeper, combining each with the broker defaults
+    val topicConfigs = AdminUtils.fetchAllTopicConfigs(zkUtils).map { case (topic, configs) =>
+      topic -> LogConfig.fromProps(defaultProps, configs)
+    }
+
+    // log cleaner settings are taken directly from the broker config
+    val cleanerConfig = CleanerConfig(numThreads = config.logCleanerThreads,
+      dedupeBufferSize = config.logCleanerDedupeBufferSize,
+      dedupeBufferLoadFactor = config.logCleanerDedupeBufferLoadFactor,
+      ioBufferSize = config.logCleanerIoBufferSize,
+      maxMessageSize = config.messageMaxBytes,
+      maxIoBytesPerSecond = config.logCleanerIoMaxBytesPerSecond,
+      backOffMs = config.logCleanerBackoffMs,
+      enableCleaner = config.logCleanerEnable)
+
+    new LogManager(logDirs = config.logDirs.map(new File(_)).toArray,
+      topicConfigs = topicConfigs,
+      defaultConfig = defaultLogConfig,
+      cleanerConfig = cleanerConfig,
+      ioThreads = config.numRecoveryThreadsPerDataDir,
+      flushCheckMs = config.logFlushSchedulerIntervalMs,
+      flushRecoveryOffsetCheckpointMs = config.logFlushOffsetCheckpointIntervalMs,
+      flushStartOffsetCheckpointMs = config.logFlushStartOffsetCheckpointIntervalMs,
+      retentionCheckMs = config.logCleanupIntervalMs,
+      // PID expiration reuses the transactional id expiration setting
+      maxPidExpirationMs = config.transactionIdExpirationMs,
+      scheduler = kafkaScheduler,
+      brokerState = brokerState,
+      time = time)
+  }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/main/scala/kafka/log/LogValidator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index 94e3608..c01a5de 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -62,6 +62,7 @@ private[kafka] object LogValidator extends Logging {
assignOffsetsNonCompressed(records, offsetCounter, now, compactedTopic, messageTimestampType,
messageTimestampDiffMaxMs)
} else {
+
validateMessagesAndAssignOffsetsCompressed(records, offsetCounter, now, sourceCodec, targetCodec, compactedTopic,
messageFormatVersion, messageTimestampType, messageTimestampDiffMaxMs)
}
@@ -214,8 +215,15 @@ private[kafka] object LogValidator extends Logging {
}
if (!inPlaceAssignment) {
+ val (pid, epoch, sequence) = {
+ // note that we only reassign offsets for requests coming straight from a producer. For records with MagicV2,
+ // there should be exactly one RecordBatch per request, so the following is all we need to do. For Records
+ // with older magic versions, this will always be NO_PRODUCER_ID, etc.
+ val first = records.batches.asScala.head
+ (first.producerId, first.producerEpoch, first.baseSequence)
+ }
buildRecordsAndAssignOffsets(messageFormatVersion, offsetCounter, messageTimestampType,
- CompressionType.forId(targetCodec.codec), currentTimestamp, validatedRecords)
+ CompressionType.forId(targetCodec.codec), currentTimestamp, validatedRecords, pid, epoch, sequence)
} else {
// we can update the batch only and write the compressed payload as is
val batch = records.batches.iterator.next()
@@ -238,10 +246,12 @@ private[kafka] object LogValidator extends Logging {
private def buildRecordsAndAssignOffsets(magic: Byte, offsetCounter: LongRef, timestampType: TimestampType,
compressionType: CompressionType, logAppendTime: Long,
- validatedRecords: Seq[Record]): ValidationAndOffsetAssignResult = {
+ validatedRecords: Seq[Record],
+ producerId: Long, epoch: Short, baseSequence: Int): ValidationAndOffsetAssignResult = {
val estimatedSize = AbstractRecords.estimateSizeInBytes(magic, offsetCounter.value, compressionType, validatedRecords.asJava)
val buffer = ByteBuffer.allocate(estimatedSize)
- val builder = MemoryRecords.builder(buffer, magic, compressionType, timestampType, offsetCounter.value, logAppendTime)
+ val builder = MemoryRecords.builder(buffer, magic, compressionType, timestampType, offsetCounter.value,
+ logAppendTime, producerId, epoch, baseSequence)
validatedRecords.foreach { record =>
builder.appendWithOffset(offsetCounter.getAndIncrement(), record)