Posted to commits@kafka.apache.org by ju...@apache.org on 2016/03/02 18:40:40 UTC
kafka git commit: KAFKA-3196; Added checksum and size to RecordMetadata and ConsumerRecord
Repository: kafka
Updated Branches:
refs/heads/trunk 23f239b06 -> 002b377da
KAFKA-3196; Added checksum and size to RecordMetadata and ConsumerRecord
This is the second (remaining) part of KIP-42. See https://cwiki.apache.org/confluence/display/KAFKA/KIP-42%3A+Add+Producer+and+Consumer+Interceptors
Author: Anna Povzner <an...@confluent.io>
Reviewers: Ismael Juma <is...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>, Jun Rao <ju...@gmail.com>
Closes #951 from apovzner/kafka-3196
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/002b377d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/002b377d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/002b377d
Branch: refs/heads/trunk
Commit: 002b377dad9c956cd0ae0597981f29698883b6d5
Parents: 23f239b
Author: Anna Povzner <an...@confluent.io>
Authored: Wed Mar 2 09:40:34 2016 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Mar 2 09:40:34 2016 -0800
----------------------------------------------------------------------
.../clients/consumer/ConsumerInterceptor.java | 5 ++-
.../kafka/clients/consumer/ConsumerRecord.java | 37 ++++++++++++++++++-
.../clients/consumer/internals/Fetcher.java | 14 +++++---
.../kafka/clients/producer/MockProducer.java | 6 ++--
.../kafka/clients/producer/RecordMetadata.java | 38 ++++++++++++++++++--
.../internals/FutureRecordMetadata.java | 24 +++++++++++--
.../clients/producer/internals/RecordBatch.java | 12 +++++--
.../apache/kafka/common/record/Compressor.java | 16 ++++++---
.../kafka/common/record/MemoryRecords.java | 6 ++--
.../clients/consumer/MockConsumerTest.java | 4 +--
.../internals/ConsumerInterceptorsTest.java | 8 +++--
.../kafka/clients/producer/RecordSendTest.java | 9 +++--
.../internals/ProducerInterceptorsTest.java | 2 +-
.../kafka/test/MockConsumerInterceptor.java | 6 ++--
.../connect/runtime/WorkerSinkTaskTest.java | 2 +-
.../runtime/WorkerSinkTaskThreadedTest.java | 4 +--
.../connect/runtime/WorkerSourceTaskTest.java | 3 +-
.../connect/storage/KafkaConfigStorageTest.java | 24 ++++++-------
.../storage/KafkaOffsetBackingStoreTest.java | 16 ++++-----
.../storage/KafkaStatusBackingStoreTest.java | 2 +-
.../kafka/connect/util/KafkaBasedLogTest.java | 16 ++++-----
.../scala/kafka/tools/ConsoleConsumer.scala | 4 +--
.../scala/kafka/tools/SimpleConsumerShell.scala | 4 ++-
.../kafka/api/BaseConsumerTest.scala | 3 ++
.../kafka/api/BaseProducerSendTest.scala | 7 ++++
.../processor/internals/RecordQueue.java | 7 ++--
.../processor/internals/PartitionGroupTest.java | 12 +++----
.../internals/ProcessorStateManagerTest.java | 6 ++--
.../processor/internals/RecordQueueTest.java | 18 +++++-----
.../processor/internals/StandbyTaskTest.java | 18 +++++-----
.../processor/internals/StreamTaskTest.java | 30 ++++++++--------
.../kafka/test/ProcessorTopologyTestDriver.java | 2 +-
32 files changed, 251 insertions(+), 114 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java
index 5c13a41..70ea444 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java
@@ -38,7 +38,10 @@ public interface ConsumerInterceptor<K, V> extends Configurable {
/**
* This is called just before the records are returned by {@link org.apache.kafka.clients.consumer.KafkaConsumer#poll(long)}
* <p>
- * This method is allowed to modify consumer records, in which case the new records will be returned.
+ * This method is allowed to modify consumer records, in which case the new records will be
+ * returned. There is no limitation on the number of records that may be returned from this
+ * method, i.e. the interceptor can filter the records or generate new records.
+ * <p>
* Any exception thrown by this method will be caught by the caller, logged, but not propagated to the client.
* <p>
* Since the consumer may run multiple interceptors, a particular interceptor's onConsume() callback will be called
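For illustration only (not part of this commit), an onConsume() implementation that filters records in the way the updated javadoc permits could look like the following sketch; the class name and the null-value filtering policy are hypothetical:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerInterceptor;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public class NonNullValueInterceptor implements ConsumerInterceptor<String, String> {
        @Override
        public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
            // Drop records with null values; returning fewer (or more) records than
            // were passed in is allowed, per the javadoc above.
            Map<TopicPartition, List<ConsumerRecord<String, String>>> filtered = new HashMap<>();
            for (TopicPartition tp : records.partitions()) {
                List<ConsumerRecord<String, String>> kept = new ArrayList<>();
                for (ConsumerRecord<String, String> record : records.records(tp))
                    if (record.value() != null)
                        kept.add(record);
                if (!kept.isEmpty())
                    filtered.put(tp, kept);
            }
            return new ConsumerRecords<>(filtered);
        }

        @Override
        public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) { }

        @Override
        public void close() { }

        @Override
        public void configure(Map<String, ?> configs) { }
    }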
http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
index 42e0a90..4165534 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
@@ -24,6 +24,9 @@ public final class ConsumerRecord<K, V> {
private final long offset;
private final long timestamp;
private final TimestampType timestampType;
+ private final long checksum;
+ private final int serializedKeySize;
+ private final int serializedValueSize;
private final K key;
private final V value;
@@ -43,6 +46,9 @@ public final class ConsumerRecord<K, V> {
long offset,
long timestamp,
TimestampType timestampType,
+ long checksum,
+ int serializedKeySize,
+ int serializedValueSize,
K key,
V value) {
if (topic == null)
@@ -52,6 +58,9 @@ public final class ConsumerRecord<K, V> {
this.offset = offset;
this.timestamp = timestamp;
this.timestampType = timestampType;
+ this.checksum = checksum;
+ this.serializedKeySize = serializedKeySize;
+ this.serializedValueSize = serializedValueSize;
this.key = key;
this.value = value;
}
@@ -105,9 +114,35 @@ public final class ConsumerRecord<K, V> {
return timestampType;
}
+ /**
+ * The checksum (CRC32) of the record.
+ */
+ public long checksum() {
+ return this.checksum;
+ }
+
+ /**
+ * The size of the serialized, uncompressed key in bytes. If key is null, the returned size
+ * is -1.
+ */
+ public int serializedKeySize() {
+ return this.serializedKeySize;
+ }
+
+ /**
+ * The size of the serialized, uncompressed value in bytes. If value is null, the
+ * returned size is -1.
+ */
+ public int serializedValueSize() {
+ return this.serializedValueSize;
+ }
+
@Override
public String toString() {
return "ConsumerRecord(topic = " + topic() + ", partition = " + partition() + ", offset = " + offset()
- + ", " + timestampType + " = " + timestamp + ", key = " + key + ", value = " + value + ")";
+ + ", " + timestampType + " = " + timestamp + ", checksum = " + checksum
+ + ", serialized key size = " + serializedKeySize
+ + ", serialized value size = " + serializedValueSize
+ + ", key = " + key + ", value = " + value + ")";
}
}
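Usage sketch (not part of this commit): the new accessors are read off each polled record. Broker address, group id, and topic below are placeholders; the byte-array deserializer is used so the reported sizes line up with the raw payload:

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class RecordSizeAudit {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");  // placeholder
            props.put("group.id", "size-audit");               // placeholder
            props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("audit-topic"));  // placeholder topic
                for (ConsumerRecord<byte[], byte[]> record : consumer.poll(1000)) {
                    // serializedKeySize()/serializedValueSize() are -1 when the key/value is null
                    System.out.printf("offset=%d crc=%d keyBytes=%d valueBytes=%d%n",
                        record.offset(), record.checksum(),
                        record.serializedKeySize(), record.serializedValueSize());
                }
            }
        }
    }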
http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 5d92a76..e2a5548 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -644,11 +644,17 @@ public class Fetcher<K, V> {
long timestamp = logEntry.record().timestamp();
TimestampType timestampType = logEntry.record().timestampType();
ByteBuffer keyBytes = logEntry.record().key();
- K key = keyBytes == null ? null : this.keyDeserializer.deserialize(partition.topic(), Utils.toArray(keyBytes));
+ byte[] keyByteArray = keyBytes == null ? null : Utils.toArray(keyBytes);
+ K key = keyBytes == null ? null : this.keyDeserializer.deserialize(partition.topic(), keyByteArray);
ByteBuffer valueBytes = logEntry.record().value();
- V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), Utils.toArray(valueBytes));
-
- return new ConsumerRecord<>(partition.topic(), partition.partition(), offset, timestamp, timestampType, key, value);
+ byte[] valueByteArray = valueBytes == null ? null : Utils.toArray(valueBytes);
+ V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), valueByteArray);
+
+ return new ConsumerRecord<>(partition.topic(), partition.partition(), offset,
+ timestamp, timestampType, logEntry.record().checksum(),
+ keyByteArray == null ? -1 : keyByteArray.length,
+ valueByteArray == null ? -1 : valueByteArray.length,
+ key, value);
} catch (KafkaException e) {
throw e;
} catch (RuntimeException e) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index 5f97bae..109b0ca 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -117,10 +117,12 @@ public class MockProducer<K, V> implements Producer<K, V> {
if (this.cluster.partitionsForTopic(record.topic()) != null)
partition = partition(record, this.cluster);
ProduceRequestResult result = new ProduceRequestResult();
- FutureRecordMetadata future = new FutureRecordMetadata(result, 0, Record.NO_TIMESTAMP);
+ FutureRecordMetadata future = new FutureRecordMetadata(result, 0, Record.NO_TIMESTAMP, 0, 0, 0);
TopicPartition topicPartition = new TopicPartition(record.topic(), partition);
long offset = nextOffset(topicPartition);
- Completion completion = new Completion(topicPartition, offset, new RecordMetadata(topicPartition, 0, offset, Record.NO_TIMESTAMP), result, callback);
+ Completion completion = new Completion(topicPartition, offset,
+ new RecordMetadata(topicPartition, 0, offset, Record.NO_TIMESTAMP, 0, 0, 0),
+ result, callback);
this.sent.add(record);
if (autoComplete)
completion.complete(null);
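With autoComplete enabled, the mock now completes sends with these zero-filled fields, so a unit test can exercise the new accessors without a broker. A sketch, assuming the MockProducer constructor taking (autoComplete, keySerializer, valueSerializer) available around this time:

    import org.apache.kafka.clients.producer.MockProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class MockProducerMetadataSketch {
        public static void main(String[] args) throws Exception {
            MockProducer<String, String> producer =
                new MockProducer<>(true, new StringSerializer(), new StringSerializer());
            RecordMetadata metadata =
                producer.send(new ProducerRecord<>("topic", "key", "value")).get();
            // Per this patch, MockProducer fills checksum and the serialized sizes with 0
            System.out.println(metadata.checksum() + " " + metadata.serializedKeySize()
                + " " + metadata.serializedValueSize());
            producer.close();
        }
    }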
http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java
index d9ea239..c60a53d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java
@@ -30,19 +30,28 @@ public final class RecordMetadata {
// user provided one. Otherwise, it will be the producer local time when the producer record was handed to the
// producer.
private final long timestamp;
+ private final long checksum;
+ private final int serializedKeySize;
+ private final int serializedValueSize;
private final TopicPartition topicPartition;
- private RecordMetadata(TopicPartition topicPartition, long offset, long timestamp) {
+ private RecordMetadata(TopicPartition topicPartition, long offset, long timestamp,
+ long checksum, int serializedKeySize, int serializedValueSize) {
super();
this.offset = offset;
this.timestamp = timestamp;
+ this.checksum = checksum;
+ this.serializedKeySize = serializedKeySize;
+ this.serializedValueSize = serializedValueSize;
this.topicPartition = topicPartition;
}
- public RecordMetadata(TopicPartition topicPartition, long baseOffset, long relativeOffset, long timestamp) {
+ public RecordMetadata(TopicPartition topicPartition, long baseOffset, long relativeOffset,
+ long timestamp, long checksum, int serializedKeySize, int serializedValueSize) {
// ignore the relativeOffset if the base offset is -1,
// since this indicates the offset is unknown
- this(topicPartition, baseOffset == -1 ? baseOffset : baseOffset + relativeOffset, timestamp);
+ this(topicPartition, baseOffset == -1 ? baseOffset : baseOffset + relativeOffset,
+ timestamp, checksum, serializedKeySize, serializedValueSize);
}
/**
@@ -60,6 +69,29 @@ public final class RecordMetadata {
}
/**
+ * The checksum (CRC32) of the record.
+ */
+ public long checksum() {
+ return this.checksum;
+ }
+
+ /**
+ * The size of the serialized, uncompressed key in bytes. If key is null, the returned size
+ * is -1.
+ */
+ public int serializedKeySize() {
+ return this.serializedKeySize;
+ }
+
+ /**
+ * The size of the serialized, uncompressed value in bytes. If value is null, the returned
+ * size is -1.
+ */
+ public int serializedValueSize() {
+ return this.serializedValueSize;
+ }
+
+ /**
* The topic the record was appended to
*/
public String topic() {
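On the producer side the same fields arrive in the send callback. A sketch against a real broker (connection settings and topic are placeholders):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    public class SendMetadataSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");  // placeholder
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                producer.send(new ProducerRecord<>("audit-topic", "key", "value"), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (exception == null)
                            // sizes are the serialized, uncompressed byte counts; -1 for a null key/value
                            System.out.printf("crc=%d keyBytes=%d valueBytes=%d%n",
                                metadata.checksum(), metadata.serializedKeySize(),
                                metadata.serializedValueSize());
                    }
                });
            }
        }
    }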
http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
index a140371..d5995a3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
@@ -27,11 +27,18 @@ public final class FutureRecordMetadata implements Future<RecordMetadata> {
private final ProduceRequestResult result;
private final long relativeOffset;
private final long timestamp;
+ private final long checksum;
+ private final int serializedKeySize;
+ private final int serializedValueSize;
- public FutureRecordMetadata(ProduceRequestResult result, long relativeOffset, long timestamp) {
+ public FutureRecordMetadata(ProduceRequestResult result, long relativeOffset, long timestamp,
+ long checksum, int serializedKeySize, int serializedValueSize) {
this.result = result;
this.relativeOffset = relativeOffset;
this.timestamp = timestamp;
+ this.checksum = checksum;
+ this.serializedKeySize = serializedKeySize;
+ this.serializedValueSize = serializedValueSize;
}
@Override
@@ -61,7 +68,8 @@ public final class FutureRecordMetadata implements Future<RecordMetadata> {
}
RecordMetadata value() {
- return new RecordMetadata(result.topicPartition(), this.result.baseOffset(), this.relativeOffset, this.timestamp);
+ return new RecordMetadata(result.topicPartition(), this.result.baseOffset(), this.relativeOffset,
+ this.timestamp, this.checksum, this.serializedKeySize, this.serializedValueSize);
}
public long relativeOffset() {
@@ -72,6 +80,18 @@ public final class FutureRecordMetadata implements Future<RecordMetadata> {
return this.timestamp;
}
+ public long checksum() {
+ return this.checksum;
+ }
+
+ public int serializedKeySize() {
+ return this.serializedKeySize;
+ }
+
+ public int serializedValueSize() {
+ return this.serializedValueSize;
+ }
+
@Override
public boolean isCancelled() {
return false;
http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
index af9095d..7b5fbbe 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
@@ -67,10 +67,13 @@ public final class RecordBatch {
if (!this.records.hasRoomFor(key, value)) {
return null;
} else {
- this.records.append(offsetCounter++, timestamp, key, value);
+ long checksum = this.records.append(offsetCounter++, timestamp, key, value);
this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));
this.lastAppendTime = now;
- FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount, timestamp);
+ FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
+ timestamp, checksum,
+ key == null ? -1 : key.length,
+ value == null ? -1 : value.length);
if (callback != null)
thunks.add(new Thunk(callback, future));
this.recordCount++;
@@ -97,7 +100,10 @@ public final class RecordBatch {
if (exception == null) {
// If the timestamp returned by server is NoTimestamp, that means CreateTime is used. Otherwise LogAppendTime is used.
RecordMetadata metadata = new RecordMetadata(this.topicPartition, baseOffset, thunk.future.relativeOffset(),
- timestamp == Record.NO_TIMESTAMP ? thunk.future.timestamp() : timestamp);
+ timestamp == Record.NO_TIMESTAMP ? thunk.future.timestamp() : timestamp,
+ thunk.future.checksum(),
+ thunk.future.serializedKeySize(),
+ thunk.future.serializedValueSize());
thunk.callback.onCompletion(metadata, null);
} else {
thunk.callback.onCompletion(null, exception);
http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
index cde2178..afa85a4 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
@@ -201,16 +201,24 @@ public class Compressor {
}
}
- public void putRecord(long timestamp, byte[] key, byte[] value, CompressionType type, int valueOffset, int valueSize) {
+ /**
+ * @return CRC of the record
+ */
+ public long putRecord(long timestamp, byte[] key, byte[] value, CompressionType type,
+ int valueOffset, int valueSize) {
// put a record as un-compressed into the underlying stream
long crc = Record.computeChecksum(timestamp, key, value, type, valueOffset, valueSize);
byte attributes = Record.computeAttributes(type);
putRecord(crc, attributes, timestamp, key, value, valueOffset, valueSize);
-
+ return crc;
}
- public void putRecord(long timestamp, byte[] key, byte[] value) {
- putRecord(timestamp, key, value, CompressionType.NONE, 0, -1);
+ /**
+ * Put a record as uncompressed into the underlying stream
+ * @return CRC of the record
+ */
+ public long putRecord(long timestamp, byte[] key, byte[] value) {
+ return putRecord(timestamp, key, value, CompressionType.NONE, 0, -1);
}
private void putRecord(final long crc, final byte attributes, final long timestamp, final byte[] key, final byte[] value, final int valueOffset, final int valueSize) {
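For context on the CRC values now threaded back to callers: the checksum is a CRC32 as computed by Record.computeChecksum over the record's serialized fields. A conceptual sketch with java.util.zip.CRC32 (the exact field layout lives in Record.java and is not reproduced here):

    import java.util.zip.CRC32;

    public class Crc32Sketch {
        // Conceptual only: Kafka's Record.computeChecksum runs CRC32 over the
        // record's attributes, timestamp, key, and value in a defined layout;
        // this helper just shows the CRC32 mechanics on an arbitrary payload.
        public static long checksumOf(byte[] payload) {
            CRC32 crc = new CRC32();
            crc.update(payload, 0, payload.length);
            return crc.getValue();  // unsigned 32-bit value widened to long
        }

        public static void main(String[] args) {
            System.out.println(checksumOf("value".getBytes()));
        }
    }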
http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index 01da1e2..f37ef39 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -88,16 +88,18 @@ public class MemoryRecords implements Records {
/**
* Append a new record and offset to the buffer
+ * @return crc of the record
*/
- public void append(long offset, long timestamp, byte[] key, byte[] value) {
+ public long append(long offset, long timestamp, byte[] key, byte[] value) {
if (!writable)
throw new IllegalStateException("Memory records is not writable");
int size = Record.recordSize(key, value);
compressor.putLong(offset);
compressor.putInt(size);
- compressor.putRecord(timestamp, key, value);
+ long crc = compressor.putRecord(timestamp, key, value);
compressor.recordWritten(size + Records.LOG_OVERHEAD);
+ return crc;
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
index 3ef5c8b..70b1c09 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
@@ -43,8 +43,8 @@ public class MockConsumerTest {
beginningOffsets.put(new TopicPartition("test", 1), 0L);
consumer.updateBeginningOffsets(beginningOffsets);
consumer.seek(new TopicPartition("test", 0), 0);
- ConsumerRecord<String, String> rec1 = new ConsumerRecord<String, String>("test", 0, 0, 0L, TimestampType.CREATE_TIME, "key1", "value1");
- ConsumerRecord<String, String> rec2 = new ConsumerRecord<String, String>("test", 0, 1, 0L, TimestampType.CREATE_TIME, "key2", "value2");
+ ConsumerRecord<String, String> rec1 = new ConsumerRecord<String, String>("test", 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, "key1", "value1");
+ ConsumerRecord<String, String> rec2 = new ConsumerRecord<String, String>("test", 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, "key2", "value2");
consumer.addRecord(rec1);
consumer.addRecord(rec2);
ConsumerRecords<String, String> recs = consumer.poll(1);
http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptorsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptorsTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptorsTest.java
index 25843c7..4259c75 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptorsTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptorsTest.java
@@ -44,7 +44,7 @@ public class ConsumerInterceptorsTest {
private final TopicPartition filterTopicPart1 = new TopicPartition("test5", filterPartition1);
private final TopicPartition filterTopicPart2 = new TopicPartition("test6", filterPartition2);
private final ConsumerRecord<Integer, Integer> consumerRecord =
- new ConsumerRecord<>(topic, partition, 0, 0L, TimestampType.CREATE_TIME, 1, 1);
+ new ConsumerRecord<>(topic, partition, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 1, 1);
private int onCommitCount = 0;
private int onConsumeCount = 0;
@@ -117,9 +117,11 @@ public class ConsumerInterceptorsTest {
List<ConsumerRecord<Integer, Integer>> list1 = new ArrayList<>();
list1.add(consumerRecord);
List<ConsumerRecord<Integer, Integer>> list2 = new ArrayList<>();
- list2.add(new ConsumerRecord<>(filterTopicPart1.topic(), filterTopicPart1.partition(), 0, 0L, TimestampType.CREATE_TIME, 1, 1));
+ list2.add(new ConsumerRecord<>(filterTopicPart1.topic(), filterTopicPart1.partition(), 0,
+ 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 1, 1));
List<ConsumerRecord<Integer, Integer>> list3 = new ArrayList<>();
- list3.add(new ConsumerRecord<>(filterTopicPart2.topic(), filterTopicPart2.partition(), 0, 0L, TimestampType.CREATE_TIME, 1, 1));
+ list3.add(new ConsumerRecord<>(filterTopicPart2.topic(), filterTopicPart2.partition(), 0,
+ 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 1, 1));
records.put(tp, list1);
records.put(filterTopicPart1, list2);
records.put(filterTopicPart2, list3);
http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java
index 5591129..d820dab 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java
@@ -45,7 +45,8 @@ public class RecordSendTest {
@Test
public void testTimeout() throws Exception {
ProduceRequestResult request = new ProduceRequestResult();
- FutureRecordMetadata future = new FutureRecordMetadata(request, relOffset, Record.NO_TIMESTAMP);
+ FutureRecordMetadata future = new FutureRecordMetadata(request, relOffset,
+ Record.NO_TIMESTAMP, 0, 0, 0);
assertFalse("Request is not completed", future.isDone());
try {
future.get(5, TimeUnit.MILLISECONDS);
@@ -63,7 +64,8 @@ public class RecordSendTest {
*/
@Test(expected = ExecutionException.class)
public void testError() throws Exception {
- FutureRecordMetadata future = new FutureRecordMetadata(asyncRequest(baseOffset, new CorruptRecordException(), 50L), relOffset, Record.NO_TIMESTAMP);
+ FutureRecordMetadata future = new FutureRecordMetadata(asyncRequest(baseOffset, new CorruptRecordException(), 50L),
+ relOffset, Record.NO_TIMESTAMP, 0, 0, 0);
future.get();
}
@@ -72,7 +74,8 @@ public class RecordSendTest {
*/
@Test
public void testBlocking() throws Exception {
- FutureRecordMetadata future = new FutureRecordMetadata(asyncRequest(baseOffset, null, 50L), relOffset, Record.NO_TIMESTAMP);
+ FutureRecordMetadata future = new FutureRecordMetadata(asyncRequest(baseOffset, null, 50L),
+ relOffset, Record.NO_TIMESTAMP, 0, 0, 0);
assertEquals(baseOffset + relOffset, future.get().offset());
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java
index 26d15d0..5a32dda 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java
@@ -128,7 +128,7 @@ public class ProducerInterceptorsTest {
ProducerInterceptors<Integer, String> interceptors = new ProducerInterceptors<>(interceptorList);
// verify onAck is called on all interceptors
- RecordMetadata meta = new RecordMetadata(tp, 0, 0, 0);
+ RecordMetadata meta = new RecordMetadata(tp, 0, 0, 0, 0, 0, 0);
interceptors.onAcknowledgement(meta, null);
assertEquals(2, onAckCount);
http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java b/clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java
index 3246578..0c187cb 100644
--- a/clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java
+++ b/clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java
@@ -25,7 +25,6 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
-import org.apache.kafka.common.record.TimestampType;
import java.util.ArrayList;
import java.util.HashMap;
@@ -57,7 +56,10 @@ public class MockConsumerInterceptor implements ConsumerInterceptor<String, Stri
List<ConsumerRecord<String, String>> lst = new ArrayList<>();
for (ConsumerRecord<String, String> record: records.records(tp)) {
lst.add(new ConsumerRecord<>(record.topic(), record.partition(), record.offset(),
- 0L, TimestampType.CREATE_TIME, record.key(), record.value().toUpperCase()));
+ record.timestamp(), record.timestampType(),
+ record.checksum(), record.serializedKeySize(),
+ record.serializedValueSize(),
+ record.key(), record.value().toUpperCase()));
}
recordMap.put(tp, lst);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index 6721609..aef3344 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -293,7 +293,7 @@ public class WorkerSinkTaskTest {
public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
for (int i = 0; i < numMessages; i++)
- records.add(new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned + i, 0L, TimestampType.CREATE_TIME, RAW_KEY, RAW_VALUE));
+ records.add(new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned + i, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, RAW_KEY, RAW_VALUE));
recordsReturned += numMessages;
return new ConsumerRecords<>(
numMessages > 0 ?
http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
index 77f1ed0..b37b34f 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
@@ -520,7 +520,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
Collections.singletonMap(
new TopicPartition(TOPIC, PARTITION),
Arrays.asList(
- new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, 0L, TimestampType.CREATE_TIME, RAW_KEY, RAW_VALUE)
+ new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, RAW_KEY, RAW_VALUE)
)));
recordsReturned++;
return records;
@@ -548,7 +548,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
Collections.singletonMap(
new TopicPartition(TOPIC, PARTITION),
Arrays.asList(
- new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, 0L, TimestampType.CREATE_TIME, RAW_KEY, RAW_VALUE)
+ new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, RAW_KEY, RAW_VALUE)
)));
recordsReturned++;
return records;
http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 8e9eb72..8fb8bb5 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -408,7 +408,8 @@ public class WorkerSourceTaskTest extends ThreadedTest {
public Future<RecordMetadata> answer() throws Throwable {
synchronized (producerCallbacks) {
for (org.apache.kafka.clients.producer.Callback cb : producerCallbacks.getValues()) {
- cb.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 0, 0, 0L), null);
+ cb.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 0, 0,
+ 0L, 0L, 0, 0), null);
}
producerCallbacks.reset();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigStorageTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigStorageTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigStorageTest.java
index e878e12..cfc713f 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigStorageTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigStorageTest.java
@@ -289,14 +289,14 @@ public class KafkaConfigStorageTest {
expectConfigure();
// Overwrite each type at least once to ensure we see the latest data after loading
List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
- new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)),
- new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)),
- new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)),
- new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(3)),
- new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4)),
+ new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)),
+ new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)),
+ new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)),
+ new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(3)),
+ new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4)),
// Connector after root update should make it through, task update shouldn't
- new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(5)),
- new ConsumerRecord<>(TOPIC, 0, 6, 0L, TimestampType.CREATE_TIME, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(6)));
+ new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(5)),
+ new ConsumerRecord<>(TOPIC, 0, 6, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(6)));
LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap();
deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
@@ -343,12 +343,12 @@ public class KafkaConfigStorageTest {
expectConfigure();
List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
- new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)),
+ new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)),
// This is the record that has been compacted:
//new ConsumerRecord<>(TOPIC, 0, 1, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)),
- new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)),
- new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4)),
- new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(5)));
+ new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)),
+ new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4)),
+ new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(5)));
LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap();
deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
@@ -484,7 +484,7 @@ public class KafkaConfigStorageTest {
public Future<Void> answer() throws Throwable {
TestFuture<Void> future = new TestFuture<Void>();
for (Map.Entry<String, byte[]> entry : serializedConfigs.entrySet())
- capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, logOffset++, 0L, TimestampType.CREATE_TIME, entry.getKey(), entry.getValue()));
+ capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, logOffset++, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, entry.getKey(), entry.getValue()));
future.resolveOnGet((Void) null);
return future;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
index 61763a8..22bb376 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
@@ -126,10 +126,10 @@ public class KafkaOffsetBackingStoreTest {
public void testReloadOnStart() throws Exception {
expectConfigure();
expectStart(Arrays.asList(
- new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, TP0_KEY.array(), TP0_VALUE.array()),
- new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, TP1_KEY.array(), TP1_VALUE.array()),
- new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, TP0_KEY.array(), TP0_VALUE_NEW.array()),
- new ConsumerRecord<>(TOPIC, 1, 1, 0L, TimestampType.CREATE_TIME, TP1_KEY.array(), TP1_VALUE_NEW.array())
+ new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP0_KEY.array(), TP0_VALUE.array()),
+ new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP1_KEY.array(), TP1_VALUE.array()),
+ new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP0_KEY.array(), TP0_VALUE_NEW.array()),
+ new ConsumerRecord<>(TOPIC, 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP1_KEY.array(), TP1_VALUE_NEW.array())
));
expectStop();
@@ -177,8 +177,8 @@ public class KafkaOffsetBackingStoreTest {
PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
@Override
public Object answer() throws Throwable {
- capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, TP0_KEY.array(), TP0_VALUE.array()));
- capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, TP1_KEY.array(), TP1_VALUE.array()));
+ capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP0_KEY.array(), TP0_VALUE.array()));
+ capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP1_KEY.array(), TP1_VALUE.array()));
secondGetReadToEndCallback.getValue().onCompletion(null, null);
return null;
}
@@ -190,8 +190,8 @@ public class KafkaOffsetBackingStoreTest {
PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
@Override
public Object answer() throws Throwable {
- capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, TP0_KEY.array(), TP0_VALUE_NEW.array()));
- capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 1, 1, 0L, TimestampType.CREATE_TIME, TP1_KEY.array(), TP1_VALUE_NEW.array()));
+ capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP0_KEY.array(), TP0_VALUE_NEW.array()));
+ capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP1_KEY.array(), TP1_VALUE_NEW.array()));
thirdGetReadToEndCallback.getValue().onCompletion(null, null);
return null;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java
index cdbab64..8acd31f 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java
@@ -367,7 +367,7 @@ public class KafkaStatusBackingStoreTest extends EasyMockSupport {
private static ConsumerRecord<String, byte[]> consumerRecord(long offset, String key, byte[] value) {
return new ConsumerRecord<>(STATUS_TOPIC, 0, offset, System.currentTimeMillis(),
- TimestampType.CREATE_TIME, key, value);
+ TimestampType.CREATE_TIME, 0L, 0, 0, key, value);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
index b2246f5..ec58cb6 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
@@ -183,7 +183,7 @@ public class KafkaBasedLogTest {
consumer.schedulePollTask(new Runnable() {
@Override
public void run() {
- consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, TP0_KEY, TP0_VALUE));
+ consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP0_KEY, TP0_VALUE));
}
});
consumer.scheduleNopPollTask();
@@ -191,7 +191,7 @@ public class KafkaBasedLogTest {
consumer.schedulePollTask(new Runnable() {
@Override
public void run() {
- consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, TP1_KEY, TP1_VALUE));
+ consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP1_KEY, TP1_VALUE));
}
});
consumer.schedulePollTask(new Runnable() {
@@ -298,16 +298,16 @@ public class KafkaBasedLogTest {
consumer.schedulePollTask(new Runnable() {
@Override
public void run() {
- consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, TP0_KEY, TP0_VALUE));
- consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, TP0_KEY, TP0_VALUE_NEW));
- consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, TP1_KEY, TP1_VALUE));
+ consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP0_KEY, TP0_VALUE));
+ consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP0_KEY, TP0_VALUE_NEW));
+ consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP1_KEY, TP1_VALUE));
}
});
consumer.schedulePollTask(new Runnable() {
@Override
public void run() {
- consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 1, 0L, TimestampType.CREATE_TIME, TP1_KEY, TP1_VALUE_NEW));
+ consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP1_KEY, TP1_VALUE_NEW));
}
});
@@ -363,8 +363,8 @@ public class KafkaBasedLogTest {
consumer.schedulePollTask(new Runnable() {
@Override
public void run() {
- consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, TP0_KEY, TP0_VALUE_NEW));
- consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, TP0_KEY, TP0_VALUE_NEW));
+ consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP0_KEY, TP0_VALUE_NEW));
+ consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP0_KEY, TP0_VALUE_NEW));
}
});
http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 855025e..50add72 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -127,8 +127,8 @@ object ConsoleConsumer extends Logging {
}
messageCount += 1
try {
- formatter.writeTo(new ConsumerRecord(msg.topic, msg.partition, msg.offset, msg.timestamp, msg.timestampType,
- msg.key, msg.value), System.out)
+ formatter.writeTo(new ConsumerRecord(msg.topic, msg.partition, msg.offset, msg.timestamp,
+ msg.timestampType, 0, 0, 0, msg.key, msg.value), System.out)
} catch {
case e: Throwable =>
if (skipMessageOnError) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
index b4b68e0..6ad68b6 100755
--- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
@@ -223,8 +223,10 @@ object SimpleConsumerShell extends Logging {
val message = messageAndOffset.message
val key = if (message.hasKey) Utils.readBytes(message.key) else null
val value = if (message.isNull) null else Utils.readBytes(message.payload)
+ val serializedKeySize = if (message.hasKey) key.size else -1
+ val serializedValueSize = if (message.isNull) -1 else value.size
formatter.writeTo(new ConsumerRecord(topic, partitionId, offset, message.timestamp,
- message.timestampType, key, value), System.out)
+ message.timestampType, message.checksum, serializedKeySize, serializedValueSize, key, value), System.out)
numMessagesConsumed += 1
} catch {
case e: Throwable =>
http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index 19a8882..684b38f 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -309,6 +309,9 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
val keyAndValueIndex = startingKeyAndValueIndex + i
assertEquals(s"key $keyAndValueIndex", new String(record.key))
assertEquals(s"value $keyAndValueIndex", new String(record.value))
+ // this is true only because K and V are byte arrays
+ assertEquals(s"key $keyAndValueIndex".length, record.serializedKeySize)
+ assertEquals(s"value $keyAndValueIndex".length, record.serializedValueSize)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index 807b8bb..2d89bf8 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -96,6 +96,13 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
assertEquals(offset, metadata.offset())
assertEquals(topic, metadata.topic())
assertEquals(partition, metadata.partition())
+ offset match {
+ case 0 => assertEquals(metadata.serializedKeySize + metadata.serializedValueSize, "key".getBytes.length + "value".getBytes.length)
+ case 1 => assertEquals(metadata.serializedKeySize(), "key".getBytes.length)
+ case 2 => assertEquals(metadata.serializedValueSize, "value".getBytes.length)
+ case _ => assertTrue(metadata.serializedValueSize > 0)
+ }
+ assertNotEquals(metadata.checksum(), 0)
offset += 1
} else {
fail("Send callback returns the following exception", exception)
http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
index 62bf307..6911a45 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
@@ -77,8 +77,11 @@ public class RecordQueue {
Object key = source.deserializeKey(rawRecord.topic(), rawRecord.key());
Object value = source.deserializeValue(rawRecord.topic(), rawRecord.value());
- ConsumerRecord<Object, Object> record = new ConsumerRecord<>(rawRecord.topic(), rawRecord.partition(),
- rawRecord.offset(), rawRecord.timestamp(), TimestampType.CREATE_TIME, key, value);
+ ConsumerRecord<Object, Object> record = new ConsumerRecord<>(rawRecord.topic(), rawRecord.partition(), rawRecord.offset(),
+ rawRecord.timestamp(), TimestampType.CREATE_TIME,
+ rawRecord.checksum(),
+ rawRecord.serializedKeySize(),
+ rawRecord.serializedValueSize(), key, value);
long timestamp = timestampExtractor.extract(record);
StampedRecord stampedRecord = new StampedRecord(record, timestamp);
http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
index 61f6dbf..5bf1b5e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
@@ -60,17 +60,17 @@ public class PartitionGroupTest {
// add three records with timestamps 1, 3, 5 to partition-1
List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
- new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, recordKey, recordValue),
- new ConsumerRecord<>("topic", 1, 3, 0L, TimestampType.CREATE_TIME, recordKey, recordValue),
- new ConsumerRecord<>("topic", 1, 5, 0L, TimestampType.CREATE_TIME, recordKey, recordValue));
+ new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+ new ConsumerRecord<>("topic", 1, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+ new ConsumerRecord<>("topic", 1, 5, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue));
group.addRawRecords(partition1, list1);
// add three records with timestamps 2, 4, 6 to partition-2
List<ConsumerRecord<byte[], byte[]>> list2 = Arrays.asList(
- new ConsumerRecord<>("topic", 1, 2, 0L, TimestampType.CREATE_TIME, recordKey, recordValue),
- new ConsumerRecord<>("topic", 1, 4, 0L, TimestampType.CREATE_TIME, recordKey, recordValue),
- new ConsumerRecord<>("topic", 1, 6, 0L, TimestampType.CREATE_TIME, recordKey, recordValue));
+ new ConsumerRecord<>("topic", 1, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+ new ConsumerRecord<>("topic", 1, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+ new ConsumerRecord<>("topic", 1, 6, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue));
group.addRawRecords(partition2, list2);
http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index 916079d..14cb493 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -85,7 +85,7 @@ public class ProcessorStateManagerTest {
public void bufferRecord(ConsumerRecord<Integer, Integer> record) {
recordBuffer.add(
new ConsumerRecord<>(record.topic(), record.partition(), record.offset(), 0L,
- TimestampType.CREATE_TIME,
+ TimestampType.CREATE_TIME, 0L, 0, 0,
serializer.serialize(record.topic(), record.key()),
serializer.serialize(record.topic(), record.value())));
endOffset = record.offset();
@@ -269,7 +269,7 @@ public class ProcessorStateManagerTest {
int key = i * 10;
expectedKeys.add(key);
restoreConsumer.bufferRecord(
- new ConsumerRecord<>(persistentStoreTopicName, 2, 0L, offset, TimestampType.CREATE_TIME, key, 0)
+ new ConsumerRecord<>(persistentStoreTopicName, 2, 0L, offset, TimestampType.CREATE_TIME, 0L, 0, 0, key, 0)
);
}
@@ -322,7 +322,7 @@ public class ProcessorStateManagerTest {
int key = i;
expectedKeys.add(i);
restoreConsumer.bufferRecord(
- new ConsumerRecord<>(nonPersistentStoreTopicName, 2, 0L, offset, TimestampType.CREATE_TIME, key, 0)
+ new ConsumerRecord<>(nonPersistentStoreTopicName, 2, 0L, offset, TimestampType.CREATE_TIME, 0L, 0, 0, key, 0)
);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
index 36f38e6..8d9c91c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
@@ -51,9 +51,9 @@ public class RecordQueueTest {
// add three out-of-order records with timestamps 2, 1, 3
List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
- new ConsumerRecord<>("topic", 1, 2, 0L, TimestampType.CREATE_TIME, recordKey, recordValue),
- new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, recordKey, recordValue),
- new ConsumerRecord<>("topic", 1, 3, 0L, TimestampType.CREATE_TIME, recordKey, recordValue));
+ new ConsumerRecord<>("topic", 1, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+ new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+ new ConsumerRecord<>("topic", 1, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue));
queue.addRawRecords(list1, timestampExtractor);
@@ -73,9 +73,9 @@ public class RecordQueueTest {
// add three out-of-order records with timestamps 4, 1, 2
// now with 3, 4, 1, 2
List<ConsumerRecord<byte[], byte[]>> list2 = Arrays.asList(
- new ConsumerRecord<>("topic", 1, 4, 0L, TimestampType.CREATE_TIME, recordKey, recordValue),
- new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, recordKey, recordValue),
- new ConsumerRecord<>("topic", 1, 2, 0L, TimestampType.CREATE_TIME, recordKey, recordValue));
+ new ConsumerRecord<>("topic", 1, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+ new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+ new ConsumerRecord<>("topic", 1, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue));
queue.addRawRecords(list2, timestampExtractor);
@@ -100,9 +100,9 @@ public class RecordQueueTest {
// add three more records with 4, 5, 6
List<ConsumerRecord<byte[], byte[]>> list3 = Arrays.asList(
- new ConsumerRecord<>("topic", 1, 4, 0L, TimestampType.CREATE_TIME, recordKey, recordValue),
- new ConsumerRecord<>("topic", 1, 5, 0L, TimestampType.CREATE_TIME, recordKey, recordValue),
- new ConsumerRecord<>("topic", 1, 6, 0L, TimestampType.CREATE_TIME, recordKey, recordValue));
+ new ConsumerRecord<>("topic", 1, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+ new ConsumerRecord<>("topic", 1, 5, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+ new ConsumerRecord<>("topic", 1, 6, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue));
queue.addRawRecords(list3, timestampExtractor);
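For code that reads these new fields rather than constructing them, the natural counterpart is the accessor side. The ConsumerRecord.java hunk that adds the getters appears earlier in this patch, not in this excerpt, so treat the method names below as assumptions consistent with the fields the commit adds:

    // Assumed accessor names: checksum(), serializedKeySize(), serializedValueSize();
    // 'record' is a ConsumerRecord as constructed in the sketch above.
    long crc = record.checksum();                  // CRC of the record
    int keySize = record.serializedKeySize();      // a negative size is the usual Kafka convention for a null key
    int valueSize = record.serializedValueSize();  // likewise for a null value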
http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index e0be587..295f0dd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -153,7 +153,7 @@ public class StandbyTaskTest {
restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions()));
task.update(partition1,
- records(new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, 0L, TimestampType.CREATE_TIME, recordKey, recordValue))
+ records(new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue))
);
} finally {
@@ -172,9 +172,9 @@ public class StandbyTaskTest {
restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions()));
for (ConsumerRecord<Integer, Integer> record : Arrays.asList(
- new ConsumerRecord<>(partition2.topic(), partition2.partition(), 10, 0L, TimestampType.CREATE_TIME, 1, 100),
- new ConsumerRecord<>(partition2.topic(), partition2.partition(), 20, 0L, TimestampType.CREATE_TIME, 2, 100),
- new ConsumerRecord<>(partition2.topic(), partition2.partition(), 30, 0L, TimestampType.CREATE_TIME, 3, 100))) {
+ new ConsumerRecord<>(partition2.topic(), partition2.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 1, 100),
+ new ConsumerRecord<>(partition2.topic(), partition2.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 2, 100),
+ new ConsumerRecord<>(partition2.topic(), partition2.partition(), 30, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 3, 100))) {
restoreStateConsumer.bufferRecord(record);
}
@@ -235,11 +235,11 @@ public class StandbyTaskTest {
restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions()));
for (ConsumerRecord<Integer, Integer> record : Arrays.asList(
- new ConsumerRecord<>(ktable.topic(), ktable.partition(), 10, 0L, TimestampType.CREATE_TIME, 1, 100),
- new ConsumerRecord<>(ktable.topic(), ktable.partition(), 20, 0L, TimestampType.CREATE_TIME, 2, 100),
- new ConsumerRecord<>(ktable.topic(), ktable.partition(), 30, 0L, TimestampType.CREATE_TIME, 3, 100),
- new ConsumerRecord<>(ktable.topic(), ktable.partition(), 40, 0L, TimestampType.CREATE_TIME, 4, 100),
- new ConsumerRecord<>(ktable.topic(), ktable.partition(), 50, 0L, TimestampType.CREATE_TIME, 5, 100))) {
+ new ConsumerRecord<>(ktable.topic(), ktable.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 1, 100),
+ new ConsumerRecord<>(ktable.topic(), ktable.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 2, 100),
+ new ConsumerRecord<>(ktable.topic(), ktable.partition(), 30, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 3, 100),
+ new ConsumerRecord<>(ktable.topic(), ktable.partition(), 40, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 4, 100),
+ new ConsumerRecord<>(ktable.topic(), ktable.partition(), 50, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 5, 100))) {
restoreStateConsumer.bufferRecord(record);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 0430566..1f401db 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -108,15 +108,15 @@ public class StreamTaskTest {
StreamTask task = new StreamTask(new TaskId(0, 0), "jobId", partitions, topology, consumer, producer, restoreStateConsumer, config, null);
task.addRecords(partition1, records(
- new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, 0L, TimestampType.CREATE_TIME, recordKey, recordValue),
- new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, recordKey, recordValue),
- new ConsumerRecord<>(partition1.topic(), partition1.partition(), 30, 0L, TimestampType.CREATE_TIME, recordKey, recordValue)
+ new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+ new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+ new ConsumerRecord<>(partition1.topic(), partition1.partition(), 30, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
));
task.addRecords(partition2, records(
- new ConsumerRecord<>(partition2.topic(), partition2.partition(), 25, 0L, TimestampType.CREATE_TIME, recordKey, recordValue),
- new ConsumerRecord<>(partition2.topic(), partition2.partition(), 35, 0L, TimestampType.CREATE_TIME, recordKey, recordValue),
- new ConsumerRecord<>(partition2.topic(), partition2.partition(), 45, 0L, TimestampType.CREATE_TIME, recordKey, recordValue)
+ new ConsumerRecord<>(partition2.topic(), partition2.partition(), 25, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+ new ConsumerRecord<>(partition2.topic(), partition2.partition(), 35, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+ new ConsumerRecord<>(partition2.topic(), partition2.partition(), 45, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
));
assertEquals(5, task.process());
@@ -159,15 +159,15 @@ public class StreamTaskTest {
StreamTask task = new StreamTask(new TaskId(1, 1), "jobId", partitions, topology, consumer, producer, restoreStateConsumer, config, null);
task.addRecords(partition1, records(
- new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, 0L, TimestampType.CREATE_TIME, recordKey, recordValue),
- new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, recordKey, recordValue)
+ new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+ new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
));
task.addRecords(partition2, records(
- new ConsumerRecord<>(partition2.topic(), partition2.partition(), 35, 0L, TimestampType.CREATE_TIME, recordKey, recordValue),
- new ConsumerRecord<>(partition2.topic(), partition2.partition(), 45, 0L, TimestampType.CREATE_TIME, recordKey, recordValue),
- new ConsumerRecord<>(partition2.topic(), partition2.partition(), 55, 0L, TimestampType.CREATE_TIME, recordKey, recordValue),
- new ConsumerRecord<>(partition2.topic(), partition2.partition(), 65, 0L, TimestampType.CREATE_TIME, recordKey, recordValue)
+ new ConsumerRecord<>(partition2.topic(), partition2.partition(), 35, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+ new ConsumerRecord<>(partition2.topic(), partition2.partition(), 45, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+ new ConsumerRecord<>(partition2.topic(), partition2.partition(), 55, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+ new ConsumerRecord<>(partition2.topic(), partition2.partition(), 65, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
));
assertEquals(5, task.process());
@@ -178,9 +178,9 @@ public class StreamTaskTest {
assertTrue(consumer.paused().contains(partition2));
task.addRecords(partition1, records(
- new ConsumerRecord<>(partition1.topic(), partition1.partition(), 30, 0L, TimestampType.CREATE_TIME, recordKey, recordValue),
- new ConsumerRecord<>(partition1.topic(), partition1.partition(), 40, 0L, TimestampType.CREATE_TIME, recordKey, recordValue),
- new ConsumerRecord<>(partition1.topic(), partition1.partition(), 50, 0L, TimestampType.CREATE_TIME, recordKey, recordValue)
+ new ConsumerRecord<>(partition1.topic(), partition1.partition(), 30, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+ new ConsumerRecord<>(partition1.topic(), partition1.partition(), 40, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+ new ConsumerRecord<>(partition1.topic(), partition1.partition(), 50, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
));
assertEquals(2, consumer.paused().size());
http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index e414d80..34fd10c 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -201,7 +201,7 @@ public class ProcessorTopologyTestDriver {
}
// Add the record ...
long offset = offsetsByTopicPartition.get(tp).incrementAndGet();
- task.addRecords(tp, records(new ConsumerRecord<byte[], byte[]>(tp.topic(), tp.partition(), offset, 0L, TimestampType.CREATE_TIME, key, value)));
+ task.addRecords(tp, records(new ConsumerRecord<byte[], byte[]>(tp.topic(), tp.partition(), offset, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, key, value)));
producer.clear();
// Process the record ...
task.process();
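Since every test site above now repeats the same 0L, 0, 0 placeholder triple, a small factory would cut the noise. The helper below is purely hypothetical (it is not part of this commit) but shows one way a test utility could wrap the widened constructor:

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.record.TimestampType;

    final class TestRecords {
        // Hypothetical helper, not in this commit: defaults the new
        // checksum/serialized-size arguments that these tests never assert on.
        static <K, V> ConsumerRecord<K, V> record(String topic, int partition,
                                                  long offset, K key, V value) {
            return new ConsumerRecord<>(topic, partition, offset, 0L,
                    TimestampType.CREATE_TIME, 0L, 0, 0, key, value);
        }
    }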