Posted to commits@kafka.apache.org by jg...@apache.org on 2017/05/06 18:51:15 UTC
[6/6] kafka git commit: KAFKA-5121; Implement transaction index for KIP-98

KAFKA-5121; Implement transaction index for KIP-98
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Ismael Juma <is...@juma.me.uk>, Jun Rao <ju...@gmail.com>
Closes #2910 from hachikuji/eos-txn-index
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e71dce89
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e71dce89
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e71dce89
Branch: refs/heads/trunk
Commit: e71dce89c0da50f3eccc47d0fc050c92d5a99b88
Parents: 29994dd
Author: Jason Gustafson <ja...@confluent.io>
Authored: Sat May 6 11:49:35 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Sat May 6 11:49:35 2017 -0700
----------------------------------------------------------------------
.../clients/consumer/internals/Fetcher.java | 79 +--
.../TransactionCoordinatorFencedException.java | 30 +
.../apache/kafka/common/protocol/Errors.java | 17 +-
.../apache/kafka/common/protocol/Protocol.java | 17 +-
.../record/AbstractLegacyRecordBatch.java | 2 +-
.../kafka/common/record/AbstractRecords.java | 9 +-
.../kafka/common/record/ControlRecordType.java | 24 +-
.../kafka/common/record/DefaultRecord.java | 50 +-
.../kafka/common/record/DefaultRecordBatch.java | 22 +-
.../common/record/EndTransactionMarker.java | 124 ++++
.../kafka/common/record/FileLogInputStream.java | 6 +
.../apache/kafka/common/record/FileRecords.java | 13 +-
.../kafka/common/record/MemoryRecords.java | 109 +++-
.../common/record/MemoryRecordsBuilder.java | 66 +-
.../org/apache/kafka/common/record/Record.java | 9 -
.../apache/kafka/common/record/RecordBatch.java | 8 +
.../common/requests/ListOffsetRequest.java | 34 +-
.../org/apache/kafka/common/utils/Utils.java | 5 +
.../clients/consumer/internals/FetcherTest.java | 97 +--
.../common/record/DefaultRecordBatchTest.java | 31 +-
.../kafka/common/record/DefaultRecordTest.java | 45 +-
.../common/record/EndTransactionMarkerTest.java | 70 +++
.../kafka/common/record/FileRecordsTest.java | 11 +-
.../common/record/MemoryRecordsBuilderTest.java | 93 ++-
.../kafka/common/record/MemoryRecordsTest.java | 80 ++-
.../common/requests/RequestResponseTest.java | 22 +-
.../main/scala/kafka/cluster/Partition.scala | 4 +-
core/src/main/scala/kafka/cluster/Replica.scala | 23 +-
.../group/GroupMetadataManager.scala | 27 +-
.../transaction/TransactionStateManager.scala | 19 +-
.../main/scala/kafka/log/AbstractIndex.scala | 23 +-
core/src/main/scala/kafka/log/Log.scala | 611 ++++++++++++-------
core/src/main/scala/kafka/log/LogCleaner.scala | 34 +-
core/src/main/scala/kafka/log/LogManager.scala | 6 +-
core/src/main/scala/kafka/log/LogSegment.scala | 132 ++--
.../src/main/scala/kafka/log/LogValidator.scala | 83 +--
core/src/main/scala/kafka/log/OffsetIndex.scala | 20 +-
.../scala/kafka/log/ProducerIdMapping.scala | 384 ------------
.../scala/kafka/log/ProducerStateManager.scala | 590 ++++++++++++++++++
core/src/main/scala/kafka/log/TimeIndex.scala | 7 +-
.../main/scala/kafka/log/TransactionIndex.scala | 243 ++++++++
.../main/scala/kafka/server/DelayedFetch.scala | 14 +-
.../main/scala/kafka/server/FetchDataInfo.scala | 4 +-
.../src/main/scala/kafka/server/KafkaApis.scala | 67 +-
.../scala/kafka/server/LogOffsetMetadata.scala | 8 +-
.../scala/kafka/server/ReplicaManager.scala | 31 +-
.../scala/kafka/tools/DumpLogSegments.scala | 29 +-
.../kafka/api/AuthorizerIntegrationTest.scala | 2 +-
.../group/GroupCoordinatorResponseTest.scala | 6 +-
.../group/GroupMetadataManagerTest.scala | 22 +-
.../TransactionStateManagerTest.scala | 7 +-
.../unit/kafka/log/BrokerCompressionTest.scala | 2 +-
.../kafka/log/LogCleanerIntegrationTest.scala | 2 +-
.../log/LogCleanerLagIntegrationTest.scala | 3 +-
.../scala/unit/kafka/log/LogCleanerTest.scala | 2 +-
.../scala/unit/kafka/log/LogManagerTest.scala | 5 +-
.../scala/unit/kafka/log/LogSegmentTest.scala | 104 +++-
.../src/test/scala/unit/kafka/log/LogTest.scala | 555 +++++++++++++++--
.../scala/unit/kafka/log/LogValidatorTest.scala | 209 +++++--
.../scala/unit/kafka/log/OffsetIndexTest.scala | 24 +-
.../unit/kafka/log/ProducerIdMappingTest.scala | 291 ---------
.../kafka/log/ProducerStateManagerTest.scala | 562 +++++++++++++++++
.../unit/kafka/log/TransactionIndexTest.scala | 173 ++++++
.../scala/unit/kafka/server/LogOffsetTest.scala | 12 +-
.../kafka/server/ReplicaManagerQuotasTest.scala | 7 +-
.../unit/kafka/server/ReplicaManagerTest.scala | 151 ++++-
.../unit/kafka/server/RequestQuotaTest.scala | 7 +-
.../unit/kafka/server/SimpleFetchTest.scala | 7 +-
.../test/scala/unit/kafka/utils/TestUtils.scala | 2 +-
69 files changed, 4099 insertions(+), 1488 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 0c5c385..dc6c338 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -29,7 +29,6 @@ import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.CorruptRecordException;
import org.apache.kafka.common.errors.InvalidMetadataException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.RecordTooLargeException;
@@ -669,7 +668,8 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
private RequestFuture<Map<TopicPartition, OffsetData>> sendListOffsetRequest(final Node node,
final Map<TopicPartition, Long> timestampsToSearch,
boolean requireTimestamp) {
- ListOffsetRequest.Builder builder = ListOffsetRequest.Builder.forConsumer(requireTimestamp)
+ ListOffsetRequest.Builder builder = ListOffsetRequest.Builder
+ .forConsumer(requireTimestamp, isolationLevel)
.setTargetTimes(timestampsToSearch);
log.trace("Sending ListOffsetRequest {} to broker {}", builder, node);
@@ -1003,12 +1003,18 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
return null;
}
currentBatch = batches.next();
-
maybeEnsureValid(currentBatch);
- if (isolationLevel == IsolationLevel.READ_COMMITTED && isBatchAborted(currentBatch)) {
- nextFetchOffset = currentBatch.lastOffset() + 1;
- continue;
+ if (isolationLevel == IsolationLevel.READ_COMMITTED && currentBatch.hasProducerId()) {
+ long producerId = currentBatch.producerId();
+ if (containsAbortMarker(currentBatch)) {
+ abortedProducerIds.remove(producerId);
+ } else if (isBatchAborted(currentBatch)) {
+ log.trace("Skipping aborted record batch with producerId {} and base offset {}, partition: {}",
+ producerId, currentBatch.baseOffset(), partition);
+ nextFetchOffset = currentBatch.lastOffset() + 1;
+ continue;
+ }
}
records = currentBatch.streamingIterator();
@@ -1022,7 +1028,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
nextFetchOffset = record.offset() + 1;
// control records are not returned to the user
- if (!record.isControlRecord())
+ if (!currentBatch.isControlBatch())
return record;
}
}
@@ -1046,7 +1052,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
}
private boolean isBatchAborted(RecordBatch batch) {
- /* When in READ_COMMITTED mode, we need to do the following for each incoming entry:
+ /* When in READ_COMMITTED mode, we need to do the following for each incoming entry:
* 0. Check whether the pid is in the 'abortedProducerIds' set && the entry does not include an abort marker.
* If so, skip the entry.
* 1. If the pid is in aborted pids and the entry contains an abort marker, remove the pid from
@@ -1056,47 +1062,48 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
* this means that the entry has been aborted. Add the pid to the aborted pids set, and remove
* the entry from the abort index.
*/
- FetchResponse.AbortedTransaction nextAbortedTransaction = abortedTransactions.peek();
- if (abortedProducerIds.contains(batch.producerId())
- || (nextAbortedTransaction != null && nextAbortedTransaction.producerId == batch.producerId() && nextAbortedTransaction.firstOffset <= batch.baseOffset())) {
- if (abortedProducerIds.contains(batch.producerId()) && containsAbortMarker(batch)) {
- abortedProducerIds.remove(batch.producerId());
- } else if (nextAbortedTransaction != null && nextAbortedTransaction.producerId == batch.producerId() && nextAbortedTransaction.firstOffset <= batch.baseOffset()) {
- abortedProducerIds.add(batch.producerId());
+ long producerId = batch.producerId();
+ if (abortedProducerIds.contains(producerId)) {
+ return true;
+ } else if (abortedTransactions != null && !abortedTransactions.isEmpty()) {
+ FetchResponse.AbortedTransaction nextAbortedTransaction = abortedTransactions.peek();
+ if (nextAbortedTransaction.producerId == producerId && nextAbortedTransaction.firstOffset <= batch.baseOffset()) {
+ abortedProducerIds.add(producerId);
abortedTransactions.poll();
+ return true;
}
- log.trace("Skipping aborted record batch with producerId {} and base offset {}, partition: {}", batch.producerId(), batch.baseOffset(), partition);
- return true;
}
return false;
}
private PriorityQueue<FetchResponse.AbortedTransaction> abortedTransactions(FetchResponse.PartitionData partition) {
- PriorityQueue<FetchResponse.AbortedTransaction> abortedTransactions = null;
- if (partition.abortedTransactions != null && !partition.abortedTransactions.isEmpty()) {
- abortedTransactions = new PriorityQueue<>(
- partition.abortedTransactions.size(),
- new Comparator<FetchResponse.AbortedTransaction>() {
- @Override
- public int compare(FetchResponse.AbortedTransaction o1, FetchResponse.AbortedTransaction o2) {
- return Long.compare(o1.firstOffset, o2.firstOffset);
- }
+ if (partition.abortedTransactions == null || partition.abortedTransactions.isEmpty())
+ return null;
+
+ PriorityQueue<FetchResponse.AbortedTransaction> abortedTransactions = new PriorityQueue<>(
+ partition.abortedTransactions.size(),
+ new Comparator<FetchResponse.AbortedTransaction>() {
+ @Override
+ public int compare(FetchResponse.AbortedTransaction o1, FetchResponse.AbortedTransaction o2) {
+ return Long.compare(o1.firstOffset, o2.firstOffset);
}
- );
- abortedTransactions.addAll(partition.abortedTransactions);
- } else {
- abortedTransactions = new PriorityQueue<>();
- }
+ }
+ );
+ abortedTransactions.addAll(partition.abortedTransactions);
return abortedTransactions;
}
private boolean containsAbortMarker(RecordBatch batch) {
+ if (!batch.isControlBatch())
+ return false;
+
Iterator<Record> batchIterator = batch.iterator();
- Record firstRecord = batchIterator.hasNext() ? batchIterator.next() : null;
- boolean containsAbortMarker = firstRecord != null && firstRecord.isControlRecord() && ControlRecordType.ABORT == ControlRecordType.parse(firstRecord.key());
- if (containsAbortMarker && batchIterator.hasNext())
- throw new CorruptRecordException("A record batch containing a control message contained more than one record. partition: " + partition + ", offset: " + batch.baseOffset());
- return containsAbortMarker;
+ if (!batchIterator.hasNext())
+ throw new InvalidRecordException("Invalid batch for partition " + partition + " at offset " +
+ batch.baseOffset() + " with control sequence set, but no records");
+
+ Record firstRecord = batchIterator.next();
+ return ControlRecordType.ABORT == ControlRecordType.parse(firstRecord.key());
}
}
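
The comment in isBatchAborted above lays out the READ_COMMITTED rule: skip a producer's batches once an aborted transaction for that producer begins, and stop skipping when its abort marker is reached. A standalone sketch of that rule follows; ReadCommittedFilterSketch, AbortedTx, Batch and shouldSkip are made-up stand-ins for illustration rather than types from the patch, although the abortedTransactions queue and abortedProducerIds set mirror the fields used in Fetcher.

import java.util.HashSet;
import java.util.PriorityQueue;
import java.util.Set;

// Standalone sketch of the READ_COMMITTED filtering rule described in the comment above.
// AbortedTx and Batch are simplified stand-ins for the Kafka client types; this is not
// the Fetcher's actual code path.
public class ReadCommittedFilterSketch {
    static class AbortedTx {
        final long producerId; final long firstOffset;
        AbortedTx(long producerId, long firstOffset) { this.producerId = producerId; this.firstOffset = firstOffset; }
    }
    static class Batch {
        final long producerId; final long baseOffset; final boolean isAbortMarker;
        Batch(long producerId, long baseOffset, boolean isAbortMarker) { this.producerId = producerId; this.baseOffset = baseOffset; this.isAbortMarker = isAbortMarker; }
    }

    private final Set<Long> abortedProducerIds = new HashSet<>();
    private final PriorityQueue<AbortedTx> abortedTransactions =
            new PriorityQueue<>((a, b) -> Long.compare(a.firstOffset, b.firstOffset));

    // Returns true if the batch belongs to an aborted transaction and should be skipped.
    boolean shouldSkip(Batch batch) {
        if (batch.isAbortMarker) {
            abortedProducerIds.remove(batch.producerId); // the marker closes the aborted range
            return false;                                // markers are control batches, not user data
        }
        AbortedTx next = abortedTransactions.peek();
        if (next != null && next.producerId == batch.producerId && next.firstOffset <= batch.baseOffset) {
            abortedProducerIds.add(batch.producerId);    // entered an aborted range for this producer
            abortedTransactions.poll();
        }
        return abortedProducerIds.contains(batch.producerId);
    }

    public static void main(String[] args) {
        ReadCommittedFilterSketch filter = new ReadCommittedFilterSketch();
        filter.abortedTransactions.add(new AbortedTx(1L, 0L));            // producer 1 aborted from offset 0
        System.out.println(filter.shouldSkip(new Batch(1L, 0L, false)));  // true: inside the aborted range
        System.out.println(filter.shouldSkip(new Batch(1L, 5L, true)));   // false: abort marker closes the range
        System.out.println(filter.shouldSkip(new Batch(1L, 6L, false)));  // false: new data after the marker
    }
}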
http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/main/java/org/apache/kafka/common/errors/TransactionCoordinatorFencedException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/TransactionCoordinatorFencedException.java b/clients/src/main/java/org/apache/kafka/common/errors/TransactionCoordinatorFencedException.java
new file mode 100644
index 0000000..583ce04
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/TransactionCoordinatorFencedException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class TransactionCoordinatorFencedException extends ApiException {
+
+ private static final long serialVersionUID = 1L;
+
+ public TransactionCoordinatorFencedException(String message) {
+ super(message);
+ }
+
+ public TransactionCoordinatorFencedException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 65bec4a..960fdda 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -21,11 +21,11 @@ import org.apache.kafka.common.errors.BrokerNotAvailableException;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.ConcurrentTransactionsException;
import org.apache.kafka.common.errors.ControllerMovedException;
+import org.apache.kafka.common.errors.CoordinatorLoadInProgressException;
+import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
import org.apache.kafka.common.errors.CorruptRecordException;
import org.apache.kafka.common.errors.DuplicateSequenceNumberException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
-import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
-import org.apache.kafka.common.errors.CoordinatorLoadInProgressException;
import org.apache.kafka.common.errors.IllegalGenerationException;
import org.apache.kafka.common.errors.IllegalSaslStateException;
import org.apache.kafka.common.errors.InconsistentGroupProtocolException;
@@ -39,12 +39,11 @@ import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
import org.apache.kafka.common.errors.InvalidReplicationFactorException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.InvalidRequiredAcksException;
-import org.apache.kafka.common.errors.InvalidTxnTimeoutException;
-import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.InvalidSessionTimeoutException;
import org.apache.kafka.common.errors.InvalidTimestampException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.InvalidTxnStateException;
+import org.apache.kafka.common.errors.InvalidTxnTimeoutException;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.NetworkException;
import org.apache.kafka.common.errors.NotControllerException;
@@ -54,6 +53,7 @@ import org.apache.kafka.common.errors.NotEnoughReplicasException;
import org.apache.kafka.common.errors.NotLeaderForPartitionException;
import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
import org.apache.kafka.common.errors.OffsetOutOfRangeException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.RebalanceInProgressException;
@@ -64,6 +64,7 @@ import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.TopicExistsException;
+import org.apache.kafka.common.errors.TransactionCoordinatorFencedException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
@@ -460,6 +461,14 @@ public enum Errors {
public ApiException build(String message) {
return new ConcurrentTransactionsException(message);
}
+ }),
+ TRANSACTION_COORDINATOR_FENCED(52, "Indicates that the transaction coordinator sending a WriteTxnMarker " +
+ "is no longer the current coordinator for a given producer",
+ new ApiExceptionBuilder() {
+ @Override
+ public ApiException build(String message) {
+ return new TransactionCoordinatorFencedException(message);
+ }
});
private interface ApiExceptionBuilder {
http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 16ec9ea..fb3c8c9 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -467,8 +467,21 @@ public class Protocol {
new ArrayOf(LIST_OFFSET_REQUEST_TOPIC_V1),
"Topics to list offsets."));
- /* v2 request is the same as v1. Throttle time has been added to response */
- public static final Schema LIST_OFFSET_REQUEST_V2 = LIST_OFFSET_REQUEST_V1;
+ public static final Schema LIST_OFFSET_REQUEST_V2 = new Schema(
+ new Field("replica_id",
+ INT32,
+ "Broker id of the follower. For normal consumers, use -1."),
+ new Field("isolation_level",
+ INT8,
+ "This setting controls the visibility of transactional records. Using READ_UNCOMMITTED " +
+ "(isolation_level = 0) makes all records visible. With READ_COMMITTED (isolation_level = 1), " +
+ "non-transactional and COMMITTED transactional records are visible. To be more concrete, " +
+ "READ_COMMITTED returns all data from offsets smaller than the current LSO (last stable offset), " +
+ "and enables the inclusion of the list of aborted transactions in the result, which allows " +
+ "consumers to discard ABORTED transactional records"),
+ new Field("topics",
+ new ArrayOf(LIST_OFFSET_REQUEST_TOPIC_V1),
+ "Topics to list offsets."));;
public static final Schema LIST_OFFSET_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
INT32,
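
The new isolation_level field in ListOffsetRequest v2 is driven on the client side by the consumer's isolation.level setting (read_uncommitted by default in 0.11+). A minimal sketch of a consumer that requests only committed transactional data is below; the broker address localhost:9092, the group id and the topic name my-topic are placeholder assumptions.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Hedged sketch: reading only committed transactional data with a plain consumer.
public class ReadCommittedConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "example-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // With read_committed, fetches (and ListOffsets, per the v2 schema above) are bounded
        // by the last stable offset, and aborted transactional records are filtered out.
        props.put("isolation.level", "read_committed");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));
            consumer.poll(1000).forEach(rec ->
                    System.out.printf("offset=%d value=%s%n", rec.offset(), rec.value()));
        }
    }
}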
http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
index 85fcb2a..7be4bdd 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
@@ -206,7 +206,7 @@ public abstract class AbstractLegacyRecordBatch extends AbstractRecordBatch impl
}
@Override
- public boolean isControlRecord() {
+ public boolean isControlBatch() {
return false;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
index 87df7e4..cfda8a4 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
@@ -56,6 +56,9 @@ public abstract class AbstractRecords implements Records {
int totalSizeEstimate = 0;
for (RecordBatch batch : batches) {
+ if (toMagic < RecordBatch.MAGIC_VALUE_V2 && batch.isControlBatch())
+ continue;
+
if (batch.magic() <= toMagic) {
totalSizeEstimate += batch.sizeInBytes();
recordBatchAndRecordsList.add(new RecordBatchAndRecords(batch, null, null));
@@ -94,12 +97,8 @@ public abstract class AbstractRecords implements Records {
MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, batch.compressionType(),
timestampType, recordBatchAndRecords.baseOffset, logAppendTime);
- for (Record record : recordBatchAndRecords.records) {
- // control messages are only supported in v2 and above, so skip when down-converting
- if (magic < RecordBatch.MAGIC_VALUE_V2 && record.isControlRecord())
- continue;
+ for (Record record : recordBatchAndRecords.records)
builder.append(record);
- }
builder.close();
return builder.buffer();
http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/main/java/org/apache/kafka/common/record/ControlRecordType.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/ControlRecordType.java b/clients/src/main/java/org/apache/kafka/common/record/ControlRecordType.java
index 790b2ee..d5ead14 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/ControlRecordType.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/ControlRecordType.java
@@ -49,6 +49,7 @@ public enum ControlRecordType {
private static final Logger log = LoggerFactory.getLogger(ControlRecordType.class);
static final short CURRENT_CONTROL_RECORD_KEY_VERSION = 0;
+ static final int CURRENT_CONTROL_RECORD_KEY_SIZE = 4;
private static final Schema CONTROL_RECORD_KEY_SCHEMA_VERSION_V0 = new Schema(
new Field("version", Type.INT16),
new Field("type", Type.INT16));
@@ -69,13 +70,24 @@ public enum ControlRecordType {
return struct;
}
- public static ControlRecordType parse(ByteBuffer key) {
+ public static short parseTypeId(ByteBuffer key) {
+ if (key.remaining() < CURRENT_CONTROL_RECORD_KEY_SIZE)
+ throw new InvalidRecordException("Invalid value size found for end control record key. Must have " +
+ "at least " + CURRENT_CONTROL_RECORD_KEY_SIZE + " bytes, but found only " + key.remaining());
+
short version = key.getShort(0);
+ if (version < 0)
+ throw new InvalidRecordException("Invalid version found for control record: " + version +
+ ". May indicate data corruption");
+
if (version != CURRENT_CONTROL_RECORD_KEY_VERSION)
- log.debug("Received unknown control record key version {}. Parsing as version {}", version,
+ log.debug("Received unknown control record key version {}. Parsing as version {}", version,
CURRENT_CONTROL_RECORD_KEY_VERSION);
- short type = key.getShort(2);
- switch (type) {
+ return key.getShort(2);
+ }
+
+ public static ControlRecordType fromTypeId(short typeId) {
+ switch (typeId) {
case 0:
return ABORT;
case 1:
@@ -84,4 +96,8 @@ public enum ControlRecordType {
return UNKNOWN;
}
}
+
+ public static ControlRecordType parse(ByteBuffer key) {
+ return fromTypeId(parseTypeId(key));
+ }
}
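
For reference, the control record key parsed above is a fixed 4-byte, big-endian struct: an int16 version followed by an int16 type (0 = ABORT, 1 = COMMIT). A small sketch of that layout, using a plain ByteBuffer rather than the Kafka classes:

import java.nio.ByteBuffer;

// Sketch of the control record key layout handled by ControlRecordType.parse above:
// a 4-byte key holding two big-endian int16 fields, [version][type].
public class ControlKeySketch {
    public static void main(String[] args) {
        ByteBuffer key = ByteBuffer.allocate(4);
        key.putShort((short) 0);  // version 0 (CURRENT_CONTROL_RECORD_KEY_VERSION)
        key.putShort((short) 0);  // type 0 = ABORT, type 1 = COMMIT per fromTypeId above
        key.flip();
        short version = key.getShort(0);
        short typeId = key.getShort(2);
        System.out.println("version=" + version + " typeId=" + typeId); // version=0 typeId=0
    }
}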
http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
index e0794d8..669c75d 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
@@ -55,11 +55,9 @@ import static org.apache.kafka.common.utils.Utils.wrapNullable;
*
* The current record attributes are depicted below:
*
- * -----------------------------------
- * | Unused (1-7) | Control Flag (0) |
- * -----------------------------------
- *
- * The control flag is used to implement control records (see {@link ControlRecordType}).
+ * ----------------
+ * | Unused (0-7) |
+ * ----------------
*
* The offset and timestamp deltas compute the difference relative to the base offset and
* base timestamp of the log entry that this record is contained in.
@@ -69,7 +67,6 @@ public class DefaultRecord implements Record {
// excluding key, value and headers: 5 bytes length + 10 bytes timestamp + 5 bytes offset + 1 byte attributes
public static final int MAX_RECORD_OVERHEAD = 21;
- private static final int CONTROL_FLAG_MASK = 0x01;
private static final int NULL_VARINT_SIZE_BYTES = ByteUtils.sizeOfVarint(-1);
private final int sizeInBytes;
@@ -180,7 +177,6 @@ public class DefaultRecord implements Record {
* Write the record to `out` and return its crc.
*/
public static long writeTo(DataOutputStream out,
- boolean isControlRecord,
int offsetDelta,
long timestampDelta,
ByteBuffer key,
@@ -189,7 +185,7 @@ public class DefaultRecord implements Record {
int sizeInBytes = sizeOfBodyInBytes(offsetDelta, timestampDelta, key, value, headers);
ByteUtils.writeVarint(sizeInBytes, out);
- byte attributes = computeAttributes(isControlRecord);
+ byte attributes = 0; // there are no used record attributes at the moment
out.write(attributes);
ByteUtils.writeVarlong(timestampDelta, out);
@@ -241,15 +237,14 @@ public class DefaultRecord implements Record {
* Write the record to `out` and return its crc.
*/
public static long writeTo(ByteBuffer out,
- boolean isControlRecord,
int offsetDelta,
long timestampDelta,
ByteBuffer key,
ByteBuffer value,
Header[] headers) {
try {
- return writeTo(new DataOutputStream(new ByteBufferOutputStream(out)), isControlRecord, offsetDelta,
- timestampDelta, key, value, headers);
+ return writeTo(new DataOutputStream(new ByteBufferOutputStream(out)), offsetDelta, timestampDelta,
+ key, value, headers);
} catch (IOException e) {
// cannot actually be raised by ByteBufferOutputStream
throw new IllegalStateException("Unexpected exception raised from ByteBufferOutputStream", e);
@@ -290,11 +285,6 @@ public class DefaultRecord implements Record {
}
@Override
- public boolean isControlRecord() {
- return (attributes & CONTROL_FLAG_MASK) != 0;
- }
-
- @Override
public String toString() {
return String.format("DefaultRecord(offset=%d, timestamp=%d, key=%d bytes, value=%d bytes)",
offset,
@@ -421,10 +411,6 @@ public class DefaultRecord implements Record {
return new DefaultRecord(sizeInBytes, attributes, offset, timestamp, sequence, key, value, headers);
}
- private static byte computeAttributes(boolean isControlRecord) {
- return isControlRecord ? CONTROL_FLAG_MASK : (byte) 0;
- }
-
public static int sizeInBytes(int offsetDelta,
long timestampDelta,
byte[] key,
@@ -441,19 +427,35 @@ public class DefaultRecord implements Record {
return bodySize + ByteUtils.sizeOfVarint(bodySize);
}
+ public static int sizeInBytes(int offsetDelta,
+ long timestampDelta,
+ int keySize,
+ int valueSize,
+ Header[] headers) {
+ int bodySize = sizeOfBodyInBytes(offsetDelta, timestampDelta, keySize, valueSize, headers);
+ return bodySize + ByteUtils.sizeOfVarint(bodySize);
+ }
+
private static int sizeOfBodyInBytes(int offsetDelta,
long timestampDelta,
ByteBuffer key,
ByteBuffer value,
Header[] headers) {
- int size = 1; // always one byte for attributes
- size += ByteUtils.sizeOfVarint(offsetDelta);
- size += ByteUtils.sizeOfVarlong(timestampDelta);
int keySize = key == null ? -1 : key.remaining();
int valueSize = value == null ? -1 : value.remaining();
- size += sizeOf(keySize, valueSize, headers);
+ return sizeOfBodyInBytes(offsetDelta, timestampDelta, keySize, valueSize, headers);
+ }
+ private static int sizeOfBodyInBytes(int offsetDelta,
+ long timestampDelta,
+ int keySize,
+ int valueSize,
+ Header[] headers) {
+ int size = 1; // always one byte for attributes
+ size += ByteUtils.sizeOfVarint(offsetDelta);
+ size += ByteUtils.sizeOfVarlong(timestampDelta);
+ size += sizeOf(keySize, valueSize, headers);
return size;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
index 93cd2eb..f321c3b 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
@@ -62,9 +62,9 @@ import static org.apache.kafka.common.record.Records.LOG_OVERHEAD;
*
* The current attributes are given below:
*
- * -----------------------------------------------------------------------------------
- * | Unused (5-15) | Transactional (4) | Timestamp Type (3) | Compression Type (0-2) |
- * -----------------------------------------------------------------------------------
+ * -------------------------------------------------------------------------------------------------
+ * | Unused (6-15) | Control (5) | Transactional (4) | Timestamp Type (3) | Compression Type (0-2) |
+ * -------------------------------------------------------------------------------------------------
*/
public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRecordBatch {
static final int BASE_OFFSET_OFFSET = 0;
@@ -98,6 +98,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
private static final byte COMPRESSION_CODEC_MASK = 0x07;
private static final byte TRANSACTIONAL_FLAG_MASK = 0x10;
+ private static final int CONTROL_FLAG_MASK = 0x20;
private static final byte TIMESTAMP_TYPE_MASK = 0x08;
private final ByteBuffer buffer;
@@ -203,6 +204,11 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
}
@Override
+ public boolean isControlBatch() {
+ return (attributes() & CONTROL_FLAG_MASK) > 0;
+ }
+
+ @Override
public int partitionLeaderEpoch() {
return buffer.getInt(PARTITION_LEADER_EPOCH_OFFSET);
}
@@ -284,7 +290,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
if (timestampType() == timestampType && currentMaxTimestamp == maxTimestamp)
return;
- byte attributes = computeAttributes(compressionType(), timestampType, isTransactional());
+ byte attributes = computeAttributes(compressionType(), timestampType, isTransactional(), isControlBatch());
buffer.putShort(ATTRIBUTES_OFFSET, attributes);
buffer.putLong(MAX_TIMESTAMP_OFFSET, maxTimestamp);
long crc = computeChecksum();
@@ -330,12 +336,15 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
return buffer != null ? buffer.hashCode() : 0;
}
- private static byte computeAttributes(CompressionType type, TimestampType timestampType, boolean isTransactional) {
+ private static byte computeAttributes(CompressionType type, TimestampType timestampType,
+ boolean isTransactional, boolean isControl) {
if (timestampType == TimestampType.NO_TIMESTAMP_TYPE)
throw new IllegalArgumentException("Timestamp type must be provided to compute attributes for message " +
"format v2 and above");
byte attributes = isTransactional ? TRANSACTIONAL_FLAG_MASK : 0;
+ if (isControl)
+ attributes |= CONTROL_FLAG_MASK;
if (type.id > 0)
attributes |= COMPRESSION_CODEC_MASK & type.id;
if (timestampType == TimestampType.LOG_APPEND_TIME)
@@ -356,6 +365,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
short epoch,
int sequence,
boolean isTransactional,
+ boolean isControlBatch,
int partitionLeaderEpoch,
int numRecords) {
if (magic < RecordBatch.CURRENT_MAGIC_VALUE)
@@ -363,7 +373,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
if (baseTimestamp < 0 && baseTimestamp != NO_TIMESTAMP)
throw new IllegalArgumentException("Invalid message timestamp " + baseTimestamp);
- short attributes = computeAttributes(compressionType, timestampType, isTransactional);
+ short attributes = computeAttributes(compressionType, timestampType, isTransactional, isControlBatch);
int position = buffer.position();
buffer.putLong(position + BASE_OFFSET_OFFSET, baseOffset);
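
The widened attribute layout above adds the control flag at bit 5 alongside the transactional flag at bit 4. The following standalone sketch reuses the same mask values to show a worked example: an uncompressed, CREATE_TIME, transactional control batch (such as a transaction marker) carries attributes 0x30.

// Worked example of the v2 batch attribute bits shown above (standalone sketch,
// using the mask values from the patch, not the Kafka classes themselves).
public class BatchAttributesSketch {
    static final byte COMPRESSION_CODEC_MASK = 0x07;   // bits 0-2
    static final byte TIMESTAMP_TYPE_MASK = 0x08;      // bit 3 (LOG_APPEND_TIME)
    static final byte TRANSACTIONAL_FLAG_MASK = 0x10;  // bit 4
    static final byte CONTROL_FLAG_MASK = 0x20;        // bit 5

    static byte attributes(int compressionId, boolean logAppendTime,
                           boolean transactional, boolean control) {
        byte attrs = 0;
        attrs |= (byte) (compressionId & COMPRESSION_CODEC_MASK);
        if (logAppendTime) attrs |= TIMESTAMP_TYPE_MASK;
        if (transactional) attrs |= TRANSACTIONAL_FLAG_MASK;
        if (control) attrs |= CONTROL_FLAG_MASK;
        return attrs;
    }

    public static void main(String[] args) {
        // Transactional bit (0x10) plus control bit (0x20), no compression, CREATE_TIME.
        System.out.printf("0x%02X%n", attributes(0, false, true, true)); // prints 0x30
    }
}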
http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/main/java/org/apache/kafka/common/record/EndTransactionMarker.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/EndTransactionMarker.java b/clients/src/main/java/org/apache/kafka/common/record/EndTransactionMarker.java
new file mode 100644
index 0000000..726b52a
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/EndTransactionMarker.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.protocol.types.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+
+/**
+ * This class represents the control record which is written to the log to indicate the completion
+ * of a transaction. The record key specifies the {@link ControlRecordType control type} and the
+ * value embeds information useful for write validation (for now, just the coordinator epoch).
+ */
+public class EndTransactionMarker {
+ private static final Logger log = LoggerFactory.getLogger(EndTransactionMarker.class);
+
+ private static final short CURRENT_END_TXN_MARKER_VERSION = 0;
+ private static final Schema END_TXN_MARKER_SCHEMA_VERSION_V0 = new Schema(
+ new Field("version", Type.INT16),
+ new Field("coordinator_epoch", Type.INT32));
+ static final int CURRENT_END_TXN_MARKER_VALUE_SIZE = 6;
+ static final int CURRENT_END_TXN_SCHEMA_RECORD_SIZE = DefaultRecord.sizeInBytes(0, 0L,
+ ControlRecordType.CURRENT_CONTROL_RECORD_KEY_SIZE,
+ EndTransactionMarker.CURRENT_END_TXN_MARKER_VALUE_SIZE,
+ Record.EMPTY_HEADERS);
+
+ private final ControlRecordType type;
+ private final int coordinatorEpoch;
+
+ public EndTransactionMarker(ControlRecordType type, int coordinatorEpoch) {
+ ensureTransactionMarkerControlType(type);
+ this.type = type;
+ this.coordinatorEpoch = coordinatorEpoch;
+ }
+
+ public int coordinatorEpoch() {
+ return coordinatorEpoch;
+ }
+
+ public ControlRecordType controlType() {
+ return type;
+ }
+
+ private Struct buildRecordValue() {
+ Struct struct = new Struct(END_TXN_MARKER_SCHEMA_VERSION_V0);
+ struct.set("version", CURRENT_END_TXN_MARKER_VERSION);
+ struct.set("coordinator_epoch", coordinatorEpoch);
+ return struct;
+ }
+
+ public ByteBuffer serializeValue() {
+ Struct valueStruct = buildRecordValue();
+ ByteBuffer value = ByteBuffer.allocate(valueStruct.sizeOf());
+ valueStruct.writeTo(value);
+ value.flip();
+ return value;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ EndTransactionMarker that = (EndTransactionMarker) o;
+ return coordinatorEpoch == that.coordinatorEpoch && type == that.type;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = type != null ? type.hashCode() : 0;
+ result = 31 * result + coordinatorEpoch;
+ return result;
+ }
+
+ private static void ensureTransactionMarkerControlType(ControlRecordType type) {
+ if (type != ControlRecordType.COMMIT && type != ControlRecordType.ABORT)
+ throw new IllegalArgumentException("Invalid control record type for end transaction marker" + type);
+ }
+
+ public static EndTransactionMarker deserialize(Record record) {
+ ControlRecordType type = ControlRecordType.parse(record.key());
+ return deserializeValue(type, record.value());
+ }
+
+ static EndTransactionMarker deserializeValue(ControlRecordType type, ByteBuffer value) {
+ ensureTransactionMarkerControlType(type);
+
+ if (value.remaining() < CURRENT_END_TXN_MARKER_VALUE_SIZE)
+ throw new InvalidRecordException("Invalid value size found for end transaction marker. Must have " +
+ "at least " + CURRENT_END_TXN_MARKER_VALUE_SIZE + " bytes, but found only " + value.remaining());
+
+ short version = value.getShort(0);
+ if (version < 0)
+ throw new InvalidRecordException("Invalid version found for end transaction marker: " + version +
+ ". May indicate data corruption");
+
+ if (version > CURRENT_END_TXN_MARKER_VERSION)
+ log.debug("Received end transaction marker value version {}. Parsing as version {}", version,
+ CURRENT_END_TXN_MARKER_VERSION);
+
+ int coordinatorEpoch = value.getInt(2);
+ return new EndTransactionMarker(type, coordinatorEpoch);
+ }
+
+}
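
A hedged usage sketch of the new class, assuming the kafka-clients artifact from this commit is on the classpath: the serialized marker value is the fixed 6-byte struct defined above, an int16 version followed by an int32 coordinator_epoch.

import java.nio.ByteBuffer;
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.record.EndTransactionMarker;

// Sketch: the serialized marker value is 6 bytes, int16 version then int32 coordinator_epoch.
public class EndTxnMarkerValueSketch {
    public static void main(String[] args) {
        EndTransactionMarker marker = new EndTransactionMarker(ControlRecordType.COMMIT, 42);
        ByteBuffer value = marker.serializeValue();
        System.out.println("size=" + value.remaining());           // 6
        System.out.println("version=" + value.getShort(0));        // 0
        System.out.println("coordinatorEpoch=" + value.getInt(2)); // 42
    }
}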
http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
index d5f10dc..1af5527 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
@@ -279,6 +279,12 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.Fil
}
@Override
+ public boolean isControlBatch() {
+ loadUnderlyingRecordBatch();
+ return underlying.isControlBatch();
+ }
+
+ @Override
public int partitionLeaderEpoch() {
loadUnderlyingRecordBatch();
return underlying.partitionLeaderEpoch();
http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
index dcd7845..16d3777 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
@@ -224,7 +224,6 @@ public class FileRecords extends AbstractRecords implements Closeable {
" size of this log segment is " + originalSize + " bytes.");
if (targetSize < (int) channel.size()) {
channel.truncate(targetSize);
- channel.position(targetSize);
size.set(targetSize);
}
return originalSize - targetSize;
@@ -276,11 +275,11 @@ public class FileRecords extends AbstractRecords implements Closeable {
* @param targetOffset The offset to search for.
* @param startingPosition The starting position in the file to begin searching from.
*/
- public LogEntryPosition searchForOffsetWithSize(long targetOffset, int startingPosition) {
+ public LogOffsetPosition searchForOffsetWithSize(long targetOffset, int startingPosition) {
for (FileChannelRecordBatch batch : batchesFrom(startingPosition)) {
long offset = batch.lastOffset();
if (offset >= targetOffset)
- return new LogEntryPosition(offset, batch.position(), batch.sizeInBytes());
+ return new LogOffsetPosition(offset, batch.position(), batch.sizeInBytes());
}
return null;
}
@@ -429,12 +428,12 @@ public class FileRecords extends AbstractRecords implements Closeable {
}
}
- public static class LogEntryPosition {
+ public static class LogOffsetPosition {
public final long offset;
public final int position;
public final int size;
- public LogEntryPosition(long offset, int position, int size) {
+ public LogOffsetPosition(long offset, int position, int size) {
this.offset = offset;
this.position = position;
this.size = size;
@@ -447,7 +446,7 @@ public class FileRecords extends AbstractRecords implements Closeable {
if (o == null || getClass() != o.getClass())
return false;
- LogEntryPosition that = (LogEntryPosition) o;
+ LogOffsetPosition that = (LogOffsetPosition) o;
return offset == that.offset &&
position == that.position &&
@@ -465,7 +464,7 @@ public class FileRecords extends AbstractRecords implements Closeable {
@Override
public String toString() {
- return "LogEntryPosition(" +
+ return "LogOffsetPosition(" +
"offset=" + offset +
", position=" + position +
", size=" + size +
http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index 548cd45..c8754c7 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -292,6 +292,16 @@ public class MemoryRecords extends AbstractRecords {
return builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, timestampType, baseOffset);
}
+ public static MemoryRecordsBuilder idempotentBuilder(ByteBuffer buffer,
+ CompressionType compressionType,
+ long baseOffset,
+ long producerId,
+ short producerEpoch,
+ int baseSequence) {
+ return builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME,
+ baseOffset, System.currentTimeMillis(), producerId, producerEpoch, baseSequence);
+ }
+
public static MemoryRecordsBuilder builder(ByteBuffer buffer,
byte magic,
CompressionType compressionType,
@@ -307,7 +317,8 @@ public class MemoryRecords extends AbstractRecords {
long baseOffset,
long logAppendTime) {
return builder(buffer, magic, compressionType, timestampType, baseOffset, logAppendTime,
- RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false, RecordBatch.NO_PARTITION_LEADER_EPOCH);
+ RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false,
+ RecordBatch.NO_PARTITION_LEADER_EPOCH);
}
public static MemoryRecordsBuilder builder(ByteBuffer buffer,
@@ -320,7 +331,8 @@ public class MemoryRecords extends AbstractRecords {
if (timestampType == TimestampType.LOG_APPEND_TIME)
logAppendTime = System.currentTimeMillis();
return builder(buffer, magic, compressionType, timestampType, baseOffset, logAppendTime,
- RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, isTransactional, RecordBatch.NO_PARTITION_LEADER_EPOCH);
+ RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, isTransactional,
+ RecordBatch.NO_PARTITION_LEADER_EPOCH);
}
public static MemoryRecordsBuilder builder(ByteBuffer buffer,
@@ -335,6 +347,18 @@ public class MemoryRecords extends AbstractRecords {
}
public static MemoryRecordsBuilder builder(ByteBuffer buffer,
+ CompressionType compressionType,
+ long baseOffset,
+ long producerId,
+ short producerEpoch,
+ int baseSequence,
+ boolean isTransactional) {
+ return builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME, baseOffset,
+ RecordBatch.NO_TIMESTAMP, producerId, producerEpoch, baseSequence, isTransactional,
+ RecordBatch.NO_PARTITION_LEADER_EPOCH);
+ }
+
+ public static MemoryRecordsBuilder builder(ByteBuffer buffer,
byte magic,
CompressionType compressionType,
TimestampType timestampType,
@@ -359,18 +383,18 @@ public class MemoryRecords extends AbstractRecords {
boolean isTransactional,
int partitionLeaderEpoch) {
return new MemoryRecordsBuilder(buffer, magic, compressionType, timestampType, baseOffset,
- logAppendTime, producerId, producerEpoch, baseSequence, isTransactional, partitionLeaderEpoch,
+ logAppendTime, producerId, producerEpoch, baseSequence, isTransactional, false, partitionLeaderEpoch,
buffer.remaining());
}
-
public static MemoryRecords withRecords(CompressionType compressionType, SimpleRecord... records) {
return withRecords(RecordBatch.CURRENT_MAGIC_VALUE, compressionType, records);
}
public static MemoryRecords withRecords(CompressionType compressionType, int partitionLeaderEpoch, SimpleRecord... records) {
- return withRecords(RecordBatch.CURRENT_MAGIC_VALUE, 0L, compressionType, TimestampType.CREATE_TIME, RecordBatch.NO_PRODUCER_ID,
- RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, partitionLeaderEpoch, records);
+ return withRecords(RecordBatch.CURRENT_MAGIC_VALUE, 0L, compressionType, TimestampType.CREATE_TIME,
+ RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
+ partitionLeaderEpoch, false, records);
}
public static MemoryRecords withRecords(byte magic, CompressionType compressionType, SimpleRecord... records) {
@@ -378,30 +402,52 @@ public class MemoryRecords extends AbstractRecords {
}
public static MemoryRecords withRecords(long initialOffset, CompressionType compressionType, SimpleRecord... records) {
- return withRecords(RecordBatch.CURRENT_MAGIC_VALUE, initialOffset, compressionType, TimestampType.CREATE_TIME, records);
+ return withRecords(RecordBatch.CURRENT_MAGIC_VALUE, initialOffset, compressionType, TimestampType.CREATE_TIME,
+ records);
}
public static MemoryRecords withRecords(long initialOffset, CompressionType compressionType, Integer partitionLeaderEpoch, SimpleRecord... records) {
return withRecords(RecordBatch.CURRENT_MAGIC_VALUE, initialOffset, compressionType, TimestampType.CREATE_TIME, RecordBatch.NO_PRODUCER_ID,
- RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, partitionLeaderEpoch, records);
+ RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, partitionLeaderEpoch, false, records);
}
- public static MemoryRecords withRecords(byte magic, long initialOffset, CompressionType compressionType,
- long producerId, short producerEpoch, int baseSequence,
- int partitionLeaderEpoch, SimpleRecord... records) {
+ public static MemoryRecords withIdempotentRecords(byte magic, long initialOffset, CompressionType compressionType,
+ long producerId, short producerEpoch, int baseSequence,
+ int partitionLeaderEpoch, SimpleRecord... records) {
return withRecords(magic, initialOffset, compressionType, TimestampType.CREATE_TIME, producerId, producerEpoch,
- baseSequence, partitionLeaderEpoch, records);
+ baseSequence, partitionLeaderEpoch, false, records);
+ }
+
+ public static MemoryRecords withIdempotentRecords(long initialOffset, CompressionType compressionType, long producerId,
+ short producerEpoch, int baseSequence, int partitionLeaderEpoch,
+ SimpleRecord... records) {
+ return withRecords(RecordBatch.CURRENT_MAGIC_VALUE, initialOffset, compressionType, TimestampType.CREATE_TIME,
+ producerId, producerEpoch, baseSequence, partitionLeaderEpoch, false, records);
+ }
+
+ public static MemoryRecords withTransactionalRecords(CompressionType compressionType, long producerId,
+ short producerEpoch, int baseSequence, SimpleRecord... records) {
+ return withRecords(RecordBatch.CURRENT_MAGIC_VALUE, 0L, compressionType, TimestampType.CREATE_TIME,
+ producerId, producerEpoch, baseSequence, RecordBatch.NO_PARTITION_LEADER_EPOCH, true, records);
+ }
+
+ public static MemoryRecords withTransactionalRecords(long initialOffset, CompressionType compressionType, long producerId,
+ short producerEpoch, int baseSequence, SimpleRecord... records) {
+ return withRecords(RecordBatch.CURRENT_MAGIC_VALUE, initialOffset, compressionType, TimestampType.CREATE_TIME,
+ producerId, producerEpoch, baseSequence, RecordBatch.NO_PARTITION_LEADER_EPOCH, true, records);
}
public static MemoryRecords withRecords(byte magic, long initialOffset, CompressionType compressionType,
TimestampType timestampType, SimpleRecord... records) {
return withRecords(magic, initialOffset, compressionType, timestampType, RecordBatch.NO_PRODUCER_ID,
- RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, RecordBatch.NO_PARTITION_LEADER_EPOCH, records);
+ RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, RecordBatch.NO_PARTITION_LEADER_EPOCH,
+ false, records);
}
- private static MemoryRecords withRecords(byte magic, long initialOffset, CompressionType compressionType,
- TimestampType timestampType, long producerId, short producerEpoch,
- int baseSequence, int partitionLeaderEpoch, SimpleRecord ... records) {
+ public static MemoryRecords withRecords(byte magic, long initialOffset, CompressionType compressionType,
+ TimestampType timestampType, long producerId, short producerEpoch,
+ int baseSequence, int partitionLeaderEpoch, boolean isTransactional,
+ SimpleRecord ... records) {
if (records.length == 0)
return MemoryRecords.EMPTY;
int sizeEstimate = AbstractRecords.estimateSizeInBytes(magic, compressionType, Arrays.asList(records));
@@ -409,11 +455,38 @@ public class MemoryRecords extends AbstractRecords {
long logAppendTime = RecordBatch.NO_TIMESTAMP;
if (timestampType == TimestampType.LOG_APPEND_TIME)
logAppendTime = System.currentTimeMillis();
- MemoryRecordsBuilder builder = builder(buffer, magic, compressionType, timestampType, initialOffset,
- logAppendTime, producerId, producerEpoch, baseSequence, false, partitionLeaderEpoch);
+ MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, magic, compressionType, timestampType,
+ initialOffset, logAppendTime, producerId, producerEpoch, baseSequence, isTransactional, false,
+ partitionLeaderEpoch, buffer.capacity());
for (SimpleRecord record : records)
builder.append(record);
return builder.build();
}
+ public static MemoryRecords withEndTransactionMarker(long producerId, short producerEpoch, EndTransactionMarker marker) {
+ return withEndTransactionMarker(0L, producerId, producerEpoch, marker);
+ }
+
+ public static MemoryRecords withEndTransactionMarker(long initialOffset, long producerId, short producerEpoch,
+ EndTransactionMarker marker) {
+ int endTxnMarkerBatchSize = DefaultRecordBatch.RECORD_BATCH_OVERHEAD +
+ EndTransactionMarker.CURRENT_END_TXN_SCHEMA_RECORD_SIZE;
+ ByteBuffer buffer = ByteBuffer.allocate(endTxnMarkerBatchSize);
+ writeEndTransactionalMarker(buffer, initialOffset, producerId, producerEpoch, marker);
+ buffer.flip();
+ return MemoryRecords.readableRecords(buffer);
+ }
+
+ public static void writeEndTransactionalMarker(ByteBuffer buffer, long initialOffset, long producerId,
+ short producerEpoch, EndTransactionMarker marker) {
+ boolean isTransactional = true;
+ boolean isControlBatch = true;
+ MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE,
+ TimestampType.CREATE_TIME, initialOffset, RecordBatch.NO_TIMESTAMP, producerId, producerEpoch,
+ RecordBatch.NO_SEQUENCE, isTransactional, isControlBatch, RecordBatch.NO_PARTITION_LEADER_EPOCH,
+ buffer.capacity());
+ builder.appendEndTxnMarker(System.currentTimeMillis(), marker);
+ builder.close();
+ }
+
}
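
Tying the pieces together, a hedged sketch (again assuming this commit's client API) that wraps a COMMIT marker into a single-record control batch via withEndTransactionMarker and reads it back with EndTransactionMarker.deserialize:

import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.record.EndTransactionMarker;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;

// Sketch: build a control batch containing a COMMIT marker and read it back.
public class EndTxnMarkerBatchSketch {
    public static void main(String[] args) {
        long producerId = 1L;
        short producerEpoch = 0;
        EndTransactionMarker marker = new EndTransactionMarker(ControlRecordType.COMMIT, 7);

        MemoryRecords records = MemoryRecords.withEndTransactionMarker(producerId, producerEpoch, marker);
        for (RecordBatch batch : records.batches()) {
            System.out.println("controlBatch=" + batch.isControlBatch()
                    + " transactional=" + batch.isTransactional());
            for (Record record : batch) {
                EndTransactionMarker parsed = EndTransactionMarker.deserialize(record);
                System.out.println("type=" + parsed.controlType()
                        + " coordinatorEpoch=" + parsed.coordinatorEpoch());
            }
        }
    }
}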
http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index b9d65a5..f7451cf 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -66,6 +66,7 @@ public class MemoryRecordsBuilder {
private final long baseOffset;
private final long logAppendTime;
private final boolean isTransactional;
+ private final boolean isControlBatch;
private final int partitionLeaderEpoch;
private final int writeLimit;
private final int initialCapacity;
@@ -112,17 +113,18 @@ public class MemoryRecordsBuilder {
short producerEpoch,
int baseSequence,
boolean isTransactional,
+ boolean isControlBatch,
int partitionLeaderEpoch,
int writeLimit) {
if (magic > RecordBatch.MAGIC_VALUE_V0 && timestampType == TimestampType.NO_TIMESTAMP_TYPE)
throw new IllegalArgumentException("TimestampType must be set for magic >= 0");
-
- if (isTransactional) {
- if (magic < RecordBatch.MAGIC_VALUE_V2)
- throw new IllegalArgumentException("Transactional messages are not supported for magic " + magic);
+ if (magic < RecordBatch.MAGIC_VALUE_V2) {
+ if (isTransactional)
+ throw new IllegalArgumentException("Transactional records are not supported for magic " + magic);
+ if (isControlBatch)
+ throw new IllegalArgumentException("Control records are not supported for magic " + magic);
}
-
this.magic = magic;
this.timestampType = timestampType;
this.compressionType = compressionType;
@@ -137,6 +139,7 @@ public class MemoryRecordsBuilder {
this.producerEpoch = producerEpoch;
this.baseSequence = baseSequence;
this.isTransactional = isTransactional;
+ this.isControlBatch = isControlBatch;
this.partitionLeaderEpoch = partitionLeaderEpoch;
this.writeLimit = writeLimit;
this.initialCapacity = buffer.capacity();
@@ -254,7 +257,7 @@ public class MemoryRecordsBuilder {
if (producerEpoch == RecordBatch.NO_PRODUCER_EPOCH)
throw new IllegalArgumentException("Invalid negative producer epoch");
- if (baseSequence == RecordBatch.NO_SEQUENCE)
+ if (baseSequence < 0 && !isControlBatch)
throw new IllegalArgumentException("Invalid negative sequence number used");
if (magic < RecordBatch.MAGIC_VALUE_V2)
@@ -298,7 +301,7 @@ public class MemoryRecordsBuilder {
}
DefaultRecordBatch.writeHeader(buffer, baseOffset, offsetDelta, size, magic, compressionType, timestampType,
- baseTimestamp, maxTimestamp, producerId, producerEpoch, baseSequence, isTransactional,
+ baseTimestamp, maxTimestamp, producerId, producerEpoch, baseSequence, isTransactional, isControlBatch,
partitionLeaderEpoch, numRecords);
buffer.position(pos);
@@ -326,26 +329,26 @@ public class MemoryRecordsBuilder {
}
private long appendWithOffset(long offset, boolean isControlRecord, long timestamp, ByteBuffer key,
- ByteBuffer value, Header[] headers) {
+ ByteBuffer value, Header[] headers) {
try {
+ if (isControlRecord != isControlBatch)
+ throw new IllegalArgumentException("Control records can only be appended to control batches");
+
if (lastOffset != null && offset <= lastOffset)
- throw new IllegalArgumentException(String.format("Illegal offset %s following previous offset %s (Offsets must increase monotonically).", offset, lastOffset));
+ throw new IllegalArgumentException(String.format("Illegal offset %s following previous offset %s " +
+ "(Offsets must increase monotonically).", offset, lastOffset));
if (timestamp < 0 && timestamp != RecordBatch.NO_TIMESTAMP)
throw new IllegalArgumentException("Invalid negative timestamp " + timestamp);
- if (magic < RecordBatch.MAGIC_VALUE_V2) {
- if (isControlRecord)
- throw new IllegalArgumentException("Magic v" + magic + " does not support control records");
- if (headers != null && headers.length > 0)
- throw new IllegalArgumentException("Magic v" + magic + " does not support record headers");
- }
+ if (magic < RecordBatch.MAGIC_VALUE_V2 && headers != null && headers.length > 0)
+ throw new IllegalArgumentException("Magic v" + magic + " does not support record headers");
if (baseTimestamp == null)
baseTimestamp = timestamp;
if (magic > RecordBatch.MAGIC_VALUE_V1)
- return appendDefaultRecord(offset, isControlRecord, timestamp, key, value, headers);
+ return appendDefaultRecord(offset, timestamp, key, value, headers);
else
return appendLegacyRecord(offset, timestamp, key, value);
} catch (IOException e) {
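After this refactor the caller no longer passes a per-record control flag on the public append
paths; a record is only legal if it matches the batch it is being written into. A hedged sketch
of the failure mode (controlBatchBuilder is assumed to have been created with isControlBatch =
true, and the byte[] append overload is assumed from the existing builder API):

    // Appending a regular record to a control batch fails fast.
    controlBatchBuilder.append(System.currentTimeMillis(), "key".getBytes(), "value".getBytes());
    // throws IllegalArgumentException("Control records can only be appended to control batches")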
@@ -388,7 +391,7 @@ public class MemoryRecordsBuilder {
* @return crc of the record
*/
public long appendWithOffset(long offset, long timestamp, byte[] key, byte[] value) {
- return appendWithOffset(offset, false, timestamp, wrapNullable(key), wrapNullable(value), Record.EMPTY_HEADERS);
+ return appendWithOffset(offset, timestamp, wrapNullable(key), wrapNullable(value), Record.EMPTY_HEADERS);
}
/**
@@ -400,7 +403,7 @@ public class MemoryRecordsBuilder {
* @return crc of the record
*/
public long appendWithOffset(long offset, long timestamp, ByteBuffer key, ByteBuffer value) {
- return appendWithOffset(offset, false, timestamp, key, value, Record.EMPTY_HEADERS);
+ return appendWithOffset(offset, timestamp, key, value, Record.EMPTY_HEADERS);
}
/**
@@ -410,7 +413,7 @@ public class MemoryRecordsBuilder {
* @return crc of the record
*/
public long appendWithOffset(long offset, SimpleRecord record) {
- return appendWithOffset(offset, false, record.timestamp(), record.key(), record.value(), record.headers());
+ return appendWithOffset(offset, record.timestamp(), record.key(), record.value(), record.headers());
}
@@ -434,7 +437,7 @@ public class MemoryRecordsBuilder {
* @return crc of the record
*/
public long append(long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers) {
- return appendWithOffset(nextSequentialOffset(), false, timestamp, key, value, headers);
+ return appendWithOffset(nextSequentialOffset(), timestamp, key, value, headers);
}
/**
@@ -476,7 +479,7 @@ public class MemoryRecordsBuilder {
* @param value The control record value
* @return crc of the record
*/
- public long appendControlRecord(long timestamp, ControlRecordType type, ByteBuffer value) {
+ private long appendControlRecord(long timestamp, ControlRecordType type, ByteBuffer value) {
Struct keyStruct = type.recordKey();
ByteBuffer key = ByteBuffer.allocate(keyStruct.sizeOf());
keyStruct.writeTo(key);
@@ -484,6 +487,15 @@ public class MemoryRecordsBuilder {
return appendWithOffset(nextSequentialOffset(), true, timestamp, key, value, Record.EMPTY_HEADERS);
}
+ public long appendEndTxnMarker(long timestamp, EndTransactionMarker marker) {
+ if (producerId == RecordBatch.NO_PRODUCER_ID)
+ throw new IllegalArgumentException("End transaction marker requires a valid producerId");
+ if (!isTransactional)
+ throw new IllegalArgumentException("End transaction marker depends on batch transactional flag being enabled");
+ ByteBuffer value = marker.serializeValue();
+ return appendControlRecord(timestamp, marker.controlType(), value);
+ }
+
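Since appendControlRecord is now private, appendEndTxnMarker is the public entry point for
writing control records from the builder, and it requires a transactional batch with a valid
producerId. A hedged usage sketch (the EndTransactionMarker constructor arguments are assumed
from the new class added in this commit; coordinatorEpoch is a placeholder):

    // Write a COMMIT marker into a transactional control batch.
    EndTransactionMarker marker = new EndTransactionMarker(ControlRecordType.COMMIT, coordinatorEpoch);
    builder.appendEndTxnMarker(System.currentTimeMillis(), marker);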
/**
* Add a legacy record without doing offset/magic validation (this should only be used in testing).
* @param offset The offset of the record
@@ -509,8 +521,7 @@ public class MemoryRecordsBuilder {
* @param record the record to add
*/
public void append(Record record) {
- appendWithOffset(record.offset(), record.isControlRecord(), record.timestamp(), record.key(), record.value(),
- record.headers());
+ appendWithOffset(record.offset(), record.timestamp(), record.key(), record.value(), record.headers());
}
/**
@@ -519,8 +530,7 @@ public class MemoryRecordsBuilder {
* @param record The record to add
*/
public void appendWithOffset(long offset, Record record) {
- appendWithOffset(offset, record.isControlRecord(), record.timestamp(), record.key(), record.value(),
- record.headers());
+ appendWithOffset(offset, record.timestamp(), record.key(), record.value(), record.headers());
}
/**
@@ -542,12 +552,12 @@ public class MemoryRecordsBuilder {
appendWithOffset(nextSequentialOffset(), record);
}
- private long appendDefaultRecord(long offset, boolean isControlRecord, long timestamp,
- ByteBuffer key, ByteBuffer value, Header[] headers) throws IOException {
+ private long appendDefaultRecord(long offset, long timestamp, ByteBuffer key, ByteBuffer value,
+ Header[] headers) throws IOException {
ensureOpenForRecordAppend();
int offsetDelta = (int) (offset - baseOffset);
long timestampDelta = timestamp - baseTimestamp;
- long crc = DefaultRecord.writeTo(appendStream, isControlRecord, offsetDelta, timestampDelta, key, value, headers);
+ long crc = DefaultRecord.writeTo(appendStream, offsetDelta, timestampDelta, key, value, headers);
// TODO: The crc is useless for the new message format. Maybe we should let writeTo return the written size?
recordWritten(offset, timestamp, DefaultRecord.sizeInBytes(offsetDelta, timestampDelta, key, value, headers));
return crc;
http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/main/java/org/apache/kafka/common/record/Record.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Record.java b/clients/src/main/java/org/apache/kafka/common/record/Record.java
index fdf41b3..cba6fc5 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Record.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Record.java
@@ -133,15 +133,6 @@ public interface Record {
boolean hasTimestampType(TimestampType timestampType);
/**
- * Check whether this is a control record (i.e. whether the control bit is set in the record attributes).
- * For magic versions prior to 2, this is always false.
- *
- * @return Whether this is a control record
- */
- boolean isControlRecord();
-
-
- /**
* Get the headers. For magic versions 1 and below, this always returns an empty array.
*
* @return the array of headers
http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
index 4fd03e1..c984deb 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
@@ -216,4 +216,12 @@ public interface RecordBatch extends Iterable<Record> {
* @return The closeable iterator
*/
CloseableIterator<Record> streamingIterator();
+
+ /**
+ * Check whether this is a control batch (i.e. whether the control bit is set in the batch attributes).
+ * For magic versions prior to 2, this is always false.
+ *
+ * @return Whether this is a batch containing control records
+ */
+ boolean isControlBatch();
}
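Because the control bit now lives in the batch attributes, readers check it once per batch
rather than once per record. A hedged sketch of a reader-side filter ('records' is any Records
implementation; process() is a placeholder for application handling):

    for (RecordBatch batch : records.batches()) {
        if (batch.isControlBatch())
            continue; // e.g. transaction markers are not surfaced to applications
        for (Record record : batch)
            process(record);
    }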
http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
index 7dbffd1..03f6ee5 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
@@ -40,6 +40,7 @@ public class ListOffsetRequest extends AbstractRequest {
public static final int DEBUGGING_REPLICA_ID = -2;
private static final String REPLICA_ID_KEY_NAME = "replica_id";
+ private static final String ISOLATION_LEVEL_KEY_NAME = "isolation_level";
private static final String TOPICS_KEY_NAME = "topics";
// topic level field names
@@ -52,6 +53,7 @@ public class ListOffsetRequest extends AbstractRequest {
private static final String MAX_NUM_OFFSETS_KEY_NAME = "max_num_offsets";
private final int replicaId;
+ private final IsolationLevel isolationLevel;
private final Map<TopicPartition, PartitionData> offsetData;
private final Map<TopicPartition, Long> partitionTimestamps;
private final Set<TopicPartition> duplicatePartitions;
@@ -59,23 +61,29 @@ public class ListOffsetRequest extends AbstractRequest {
public static class Builder extends AbstractRequest.Builder<ListOffsetRequest> {
private final int replicaId;
private final short minVersion;
+ private final IsolationLevel isolationLevel;
private Map<TopicPartition, PartitionData> offsetData = null;
private Map<TopicPartition, Long> partitionTimestamps = null;
public static Builder forReplica(short desiredVersion, int replicaId) {
- return new Builder((short) 0, desiredVersion, replicaId);
+ return new Builder((short) 0, desiredVersion, replicaId, IsolationLevel.READ_UNCOMMITTED);
}
- public static Builder forConsumer(boolean requireTimestamp) {
+ public static Builder forConsumer(boolean requireTimestamp, IsolationLevel isolationLevel) {
// If we need a timestamp in the response, the minimum RPC version we can send is v1. Otherwise, v0 is OK.
- short minVersion = requireTimestamp ? (short) 1 : (short) 0;
- return new Builder(minVersion, null, CONSUMER_REPLICA_ID);
+ short minVersion = 0;
+ if (isolationLevel == IsolationLevel.READ_COMMITTED)
+ minVersion = 2;
+ else if (requireTimestamp)
+ minVersion = 1;
+ return new Builder(minVersion, null, CONSUMER_REPLICA_ID, isolationLevel);
}
- private Builder(short minVersion, Short desiredVersion, int replicaId) {
+ private Builder(short minVersion, Short desiredVersion, int replicaId, IsolationLevel isolationLevel) {
super(ApiKeys.LIST_OFFSETS, desiredVersion);
this.minVersion = minVersion;
this.replicaId = replicaId;
+ this.isolationLevel = isolationLevel;
}
public Builder setOffsetData(Map<TopicPartition, PartitionData> offsetData) {
@@ -118,7 +126,7 @@ public class ListOffsetRequest extends AbstractRequest {
}
}
Map<TopicPartition, ?> m = (version == 0) ? offsetData : partitionTimestamps;
- return new ListOffsetRequest(replicaId, m, version);
+ return new ListOffsetRequest(replicaId, m, isolationLevel, version);
}
@Override
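On the consumer side, the isolation level now travels with the ListOffsets request and forces
protocol version 2 when read_committed is in use. A hedged usage sketch (setTargetTimes and
LATEST_TIMESTAMP are assumed from the existing builder API; the topic name is a placeholder):

    ListOffsetRequest request = ListOffsetRequest.Builder
            .forConsumer(true, IsolationLevel.READ_COMMITTED)
            .setTargetTimes(Collections.singletonMap(
                    new TopicPartition("my-topic", 0), ListOffsetRequest.LATEST_TIMESTAMP))
            .build();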
@@ -165,9 +173,10 @@ public class ListOffsetRequest extends AbstractRequest {
* Private constructor with a specified version.
*/
@SuppressWarnings("unchecked")
- private ListOffsetRequest(int replicaId, Map<TopicPartition, ?> targetTimes, short version) {
+ private ListOffsetRequest(int replicaId, Map<TopicPartition, ?> targetTimes, IsolationLevel isolationLevel, short version) {
super(version);
this.replicaId = replicaId;
+ this.isolationLevel = isolationLevel;
this.offsetData = version == 0 ? (Map<TopicPartition, PartitionData>) targetTimes : null;
this.partitionTimestamps = version >= 1 ? (Map<TopicPartition, Long>) targetTimes : null;
this.duplicatePartitions = Collections.emptySet();
@@ -177,6 +186,9 @@ public class ListOffsetRequest extends AbstractRequest {
super(version);
Set<TopicPartition> duplicatePartitions = new HashSet<>();
replicaId = struct.getInt(REPLICA_ID_KEY_NAME);
+ isolationLevel = struct.hasField(ISOLATION_LEVEL_KEY_NAME) ?
+ IsolationLevel.forId(struct.getByte(ISOLATION_LEVEL_KEY_NAME)) :
+ IsolationLevel.READ_UNCOMMITTED;
offsetData = new HashMap<>();
partitionTimestamps = new HashMap<>();
for (Object topicResponseObj : struct.getArray(TOPICS_KEY_NAME)) {
@@ -223,7 +235,6 @@ public class ListOffsetRequest extends AbstractRequest {
switch (versionId) {
case 0:
case 1:
- return new ListOffsetResponse(responseData);
case 2:
return new ListOffsetResponse(throttleTimeMs, responseData);
default:
@@ -236,6 +247,10 @@ public class ListOffsetRequest extends AbstractRequest {
return replicaId;
}
+ public IsolationLevel isolationLevel() {
+ return isolationLevel;
+ }
+
@Deprecated
public Map<TopicPartition, PartitionData> offsetData() {
return offsetData;
@@ -262,6 +277,9 @@ public class ListOffsetRequest extends AbstractRequest {
Map<String, Map<Integer, Object>> topicsData = CollectionUtils.groupDataByTopic(targetTimes);
struct.set(REPLICA_ID_KEY_NAME, replicaId);
+
+ if (struct.hasField(ISOLATION_LEVEL_KEY_NAME))
+ struct.set(ISOLATION_LEVEL_KEY_NAME, isolationLevel.id());
List<Struct> topicArray = new ArrayList<>();
for (Map.Entry<String, Map<Integer, Object>> topicEntry: topicsData.entrySet()) {
Struct topicData = struct.instance(TOPICS_KEY_NAME);
http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index cb0ff89..24ee788 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -762,6 +762,11 @@ public class Utils {
} while (bytesRead != -1 && destinationBuffer.hasRemaining());
}
+ public static void writeFully(FileChannel channel, ByteBuffer sourceBuffer) throws IOException {
+ while (sourceBuffer.hasRemaining())
+ channel.write(sourceBuffer);
+ }
+
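The new helper exists because a single FileChannel.write(ByteBuffer) call may return after a
partial write; looping until the buffer has no remaining bytes guarantees the whole payload
reaches the channel. A hedged usage sketch (the payload and channel are placeholders):

    ByteBuffer payload = ByteBuffer.wrap(serializedEntry); // serializedEntry: placeholder byte[]
    Utils.writeFully(channel, payload);                    // loops until payload.hasRemaining() is false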
/**
* Write the contents of a buffer to an output stream. The bytes are copied from the current position
* in the buffer.