Posted to commits@kafka.apache.org by jg...@apache.org on 2016/12/13 18:41:36 UTC
[9/9] kafka git commit: KAFKA-4390; Replace MessageSet usage with client-side alternatives
KAFKA-4390; Replace MessageSet usage with client-side alternatives
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Ismael Juma <is...@juma.me.uk>, Guozhang Wang <wa...@gmail.com>, Jun Rao <ju...@gmail.com>
Closes #2140 from hachikuji/KAFKA4390
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/67f1e5b9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/67f1e5b9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/67f1e5b9
Branch: refs/heads/trunk
Commit: 67f1e5b91bf073151ff57d5d656693e385726697
Parents: 6626b05
Author: Jason Gustafson <ja...@confluent.io>
Authored: Tue Dec 13 10:26:25 2016 -0800
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Tue Dec 13 10:26:25 2016 -0800
----------------------------------------------------------------------
checkstyle/import-control.xml | 1 +
.../clients/consumer/internals/Fetcher.java | 7 +-
.../clients/producer/internals/BufferPool.java | 2 +-
.../producer/internals/RecordAccumulator.java | 30 +-
.../clients/producer/internals/RecordBatch.java | 58 +-
.../clients/producer/internals/Sender.java | 10 +-
.../kafka/common/record/AbstractRecords.java | 92 +++
.../common/record/ByteBufferInputStream.java | 37 +-
.../common/record/ByteBufferLogInputStream.java | 119 ++++
.../common/record/ByteBufferOutputStream.java | 54 +-
.../kafka/common/record/CompressionType.java | 2 +-
.../apache/kafka/common/record/Compressor.java | 332 ----------
.../kafka/common/record/FileLogInputStream.java | 166 +++++
.../apache/kafka/common/record/FileRecords.java | 465 ++++++++++++--
.../common/record/InvalidRecordException.java | 4 +-
.../apache/kafka/common/record/LogEntry.java | 151 ++++-
.../kafka/common/record/LogInputStream.java | 12 +-
.../kafka/common/record/MemoryRecords.java | 443 +++++++++-----
.../common/record/MemoryRecordsBuilder.java | 461 ++++++++++++++
.../org/apache/kafka/common/record/Record.java | 538 +++++++++++-----
.../org/apache/kafka/common/record/Records.java | 64 +-
.../kafka/common/record/RecordsIterator.java | 179 ++++--
.../kafka/common/record/TimestampType.java | 1 +
.../org/apache/kafka/common/utils/Utils.java | 51 ++
.../clients/consumer/KafkaConsumerTest.java | 7 +-
.../clients/consumer/internals/FetcherTest.java | 100 +--
.../internals/RecordAccumulatorTest.java | 51 +-
.../record/ByteBufferLogInputStreamTest.java | 110 ++++
.../kafka/common/record/FileRecordsTest.java | 410 +++++++++++++
.../common/record/MemoryRecordsBuilderTest.java | 253 ++++++++
.../kafka/common/record/MemoryRecordsTest.java | 194 +++++-
.../apache/kafka/common/record/RecordTest.java | 46 +-
.../kafka/common/record/SimpleRecordTest.java | 52 +-
.../kafka/common/record/TimestampTypeTest.java | 37 ++
.../java/org/apache/kafka/test/TestUtils.java | 32 +-
core/src/main/scala/kafka/api/ApiVersion.scala | 20 +-
.../main/scala/kafka/cluster/Partition.scala | 10 +-
.../kafka/consumer/ConsumerFetcherThread.scala | 10 +-
.../kafka/coordinator/GroupCoordinator.scala | 6 +-
.../coordinator/GroupMetadataManager.scala | 170 +++--
.../main/scala/kafka/log/FileMessageSet.scala | 445 --------------
core/src/main/scala/kafka/log/Log.scala | 128 ++--
core/src/main/scala/kafka/log/LogCleaner.scala | 55 +-
core/src/main/scala/kafka/log/LogManager.scala | 2 +-
core/src/main/scala/kafka/log/LogSegment.scala | 98 +--
.../src/main/scala/kafka/log/LogValidator.scala | 239 ++++++++
core/src/main/scala/kafka/log/TimeIndex.scala | 6 +-
.../kafka/message/ByteBufferMessageSet.scala | 613 +------------------
core/src/main/scala/kafka/message/Message.scala | 64 +-
.../scala/kafka/message/MessageAndOffset.scala | 14 +-
.../main/scala/kafka/message/MessageSet.scala | 8 +-
.../kafka/server/AbstractFetcherThread.scala | 10 +-
.../main/scala/kafka/server/DelayedFetch.scala | 5 +-
.../main/scala/kafka/server/FetchDataInfo.scala | 7 +-
.../src/main/scala/kafka/server/KafkaApis.scala | 32 +-
.../kafka/server/ReplicaFetcherThread.scala | 22 +-
.../scala/kafka/server/ReplicaManager.scala | 85 +--
.../scala/kafka/tools/DumpLogSegments.scala | 150 ++---
.../kafka/tools/ReplicaVerificationTool.scala | 52 +-
.../api/GroupCoordinatorIntegrationTest.scala | 6 +-
.../scala/kafka/tools/TestLogCleaning.scala | 19 +-
.../test/scala/other/kafka/StressTestLog.scala | 22 +-
.../other/kafka/TestLinearWriteSpeed.scala | 19 +-
.../unit/kafka/admin/DeleteTopicTest.scala | 2 +-
.../GroupCoordinatorResponseTest.scala | 30 +-
.../coordinator/GroupMetadataManagerTest.scala | 100 ++-
.../unit/kafka/log/BrokerCompressionTest.scala | 15 +-
.../unit/kafka/log/FileMessageSetTest.scala | 354 -----------
.../kafka/log/LogCleanerIntegrationTest.scala | 66 +-
.../log/LogCleanerLagIntegrationTest.scala | 24 +-
.../unit/kafka/log/LogCleanerManagerTest.scala | 29 +-
.../scala/unit/kafka/log/LogCleanerTest.scala | 275 ++++-----
.../scala/unit/kafka/log/LogManagerTest.scala | 22 +-
.../scala/unit/kafka/log/LogSegmentTest.scala | 136 ++--
.../src/test/scala/unit/kafka/log/LogTest.scala | 204 +++---
.../scala/unit/kafka/log/LogValidatorTest.scala | 395 ++++++++++++
.../kafka/message/BaseMessageSetTestCases.scala | 20 +-
.../message/ByteBufferMessageSetTest.scala | 348 -----------
.../kafka/message/MessageCompressionTest.scala | 2 +-
.../scala/unit/kafka/message/MessageTest.scala | 38 +-
.../server/AbstractFetcherThreadTest.scala | 32 +-
.../unit/kafka/server/FetchRequestTest.scala | 53 +-
.../unit/kafka/server/ISRExpirationTest.scala | 27 +-
.../scala/unit/kafka/server/LogOffsetTest.scala | 14 +-
.../unit/kafka/server/ProduceRequestTest.scala | 8 +-
.../kafka/server/ReplicaManagerQuotasTest.scala | 33 +-
.../unit/kafka/server/ReplicaManagerTest.scala | 28 +-
.../unit/kafka/server/SimpleFetchTest.scala | 21 +-
.../test/scala/unit/kafka/utils/TestUtils.scala | 19 +-
89 files changed, 5269 insertions(+), 3914 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 8eebdb5..62cd77a 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -89,6 +89,7 @@
<allow pkg="net.jpountz" />
<allow pkg="org.apache.kafka.common.record" />
<allow pkg="org.apache.kafka.common.network" />
+ <allow pkg="org.apache.kafka.common.errors" />
</subpackage>
<subpackage name="requests">
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 6fb4229..3b9d49c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -61,6 +61,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
@@ -686,11 +687,13 @@ public class Fetcher<K, V> {
}
List<ConsumerRecord<K, V>> parsed = new ArrayList<>();
- for (LogEntry logEntry : partition.records) {
+ Iterator<LogEntry> deepIterator = partition.records.deepIterator();
+ while (deepIterator.hasNext()) {
+ LogEntry logEntry = deepIterator.next();
// Skip the messages earlier than current position.
if (logEntry.offset() >= position) {
parsed.add(parseRecord(tp, logEntry));
- bytes += logEntry.size();
+ bytes += logEntry.sizeInBytes();
}
}
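(Illustration only, not part of the commit: a minimal sketch of the deep-iteration API the Fetcher now uses, assuming deepIterator() is exposed by the Records interface as the change above suggests.)

    import java.util.Iterator;
    import org.apache.kafka.common.record.LogEntry;
    import org.apache.kafka.common.record.Records;

    // Sum the size of all entries at or beyond `position`, mirroring the loop above.
    static int bytesFrom(Records records, long position) {
        int bytes = 0;
        Iterator<LogEntry> deep = records.deepIterator();
        while (deep.hasNext()) {
            LogEntry entry = deep.next();
            if (entry.offset() >= position)
                bytes += entry.sizeInBytes();   // per-entry size now comes from sizeInBytes()
        }
        return bytes;
    }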
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
index b42b0ec..077215c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
@@ -198,7 +198,7 @@ public final class BufferPool {
* memory as free.
*
* @param buffer The buffer to return
- * @param size The size of the buffer to mark as deallocated, note that this maybe smaller than buffer.capacity
+ * @param size The size of the buffer to mark as deallocated, note that this may be smaller than buffer.capacity
* since the buffer may re-allocate itself during in-place compression
*/
public void deallocate(ByteBuffer buffer, int size) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index fa1e513..06d39ec 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -27,8 +27,10 @@ import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.CopyOnWriteMap;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
@@ -49,7 +51,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
/**
- * This class acts as a queue that accumulates records into {@link org.apache.kafka.common.record.MemoryRecords}
+ * This class acts as a queue that accumulates records into {@link MemoryRecords}
* instances to be sent to the server.
* <p>
* The accumulator uses a bounded amount of memory and append calls will block when that memory is exhausted, unless
@@ -77,7 +79,7 @@ public final class RecordAccumulator {
/**
* Create a new record accumulator
*
- * @param batchSize The size to use when allocating {@link org.apache.kafka.common.record.MemoryRecords} instances
+ * @param batchSize The size to use when allocating {@link MemoryRecords} instances
* @param totalSize The maximum memory the record accumulator can use.
* @param compression The compression codec for the records
* @param lingerMs An artificial delay time to add before declaring a records instance that isn't full ready for
@@ -190,13 +192,13 @@ public final class RecordAccumulator {
free.deallocate(buffer);
return appendResult;
}
- MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
- RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
+ MemoryRecordsBuilder recordsBuilder = MemoryRecords.builder(buffer, compression, TimestampType.CREATE_TIME, this.batchSize);
+ RecordBatch batch = new RecordBatch(tp, recordsBuilder, time.milliseconds());
FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));
dq.addLast(batch);
incomplete.add(batch);
- return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
+ return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
}
} finally {
appendsInProgress.decrementAndGet();
@@ -212,9 +214,9 @@ public final class RecordAccumulator {
if (last != null) {
FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
if (future == null)
- last.records.close();
+ last.close();
else
- return new RecordAppendResult(future, deque.size() > 1 || last.records.isFull(), false);
+ return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false);
}
return null;
}
@@ -240,7 +242,7 @@ public final class RecordAccumulator {
Iterator<RecordBatch> batchIterator = dq.iterator();
while (batchIterator.hasNext()) {
RecordBatch batch = batchIterator.next();
- boolean isFull = batch != lastBatch || batch.records.isFull();
+ boolean isFull = batch != lastBatch || batch.isFull();
// check if the batch is expired
if (batch.maybeExpire(requestTimeout, retryBackoffMs, now, this.lingerMs, isFull)) {
expiredBatches.add(batch);
@@ -319,7 +321,7 @@ public final class RecordAccumulator {
long waitedTimeMs = nowMs - batch.lastAttemptMs;
long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
- boolean full = deque.size() > 1 || batch.records.isFull();
+ boolean full = deque.size() > 1 || batch.isFull();
boolean expired = waitedTimeMs >= timeToWaitMs;
boolean sendable = full || expired || exhausted || closed || flushInProgress();
if (sendable && !backingOff) {
@@ -389,15 +391,15 @@ public final class RecordAccumulator {
boolean backoff = first.attempts > 0 && first.lastAttemptMs + retryBackoffMs > now;
// Only drain the batch if it is not during backoff period.
if (!backoff) {
- if (size + first.records.sizeInBytes() > maxSize && !ready.isEmpty()) {
+ if (size + first.sizeInBytes() > maxSize && !ready.isEmpty()) {
// there is a rare case that a single batch size is larger than the request size due
// to compression; in this case we will still eventually send this batch in a single
// request
break;
} else {
RecordBatch batch = deque.pollFirst();
- batch.records.close();
- size += batch.records.sizeInBytes();
+ batch.close();
+ size += batch.sizeInBytes();
ready.add(batch);
batch.drainedMs = now;
}
@@ -437,7 +439,7 @@ public final class RecordAccumulator {
*/
public void deallocate(RecordBatch batch) {
incomplete.remove(batch);
- free.deallocate(batch.records.buffer(), batch.records.initialCapacity());
+ free.deallocate(batch.buffer(), batch.initialCapacity());
}
/**
@@ -507,7 +509,7 @@ public final class RecordAccumulator {
Deque<RecordBatch> dq = getDeque(batch.topicPartition);
// Close the batch before aborting
synchronized (dq) {
- batch.records.close();
+ batch.close();
dq.remove(batch);
}
batch.done(-1L, Record.NO_TIMESTAMP, new IllegalStateException("Producer is closed forcefully."));
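(Illustration only, not from the patch: a minimal sketch of the MemoryRecordsBuilder flow the accumulator now follows; the buffer size and record contents are made up.)

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.MemoryRecords;
    import org.apache.kafka.common.record.MemoryRecordsBuilder;
    import org.apache.kafka.common.record.TimestampType;

    static MemoryRecords buildExample() {
        ByteBuffer buffer = ByteBuffer.allocate(16 * 1024);   // hypothetical batch size
        MemoryRecordsBuilder builder =
            MemoryRecords.builder(buffer, CompressionType.NONE, TimestampType.CREATE_TIME, 16 * 1024);
        builder.append(0L, System.currentTimeMillis(), "key".getBytes(), "value".getBytes());
        builder.close();                                       // seal the batch
        return builder.build();                                // what Sender hands to the request
    }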
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
index 6706bfd..e9ef441 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
@@ -12,18 +12,20 @@
*/
package org.apache.kafka.clients.producer.internals;
-import java.util.ArrayList;
-import java.util.List;
-
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
/**
* A batch of records that is or will be sent.
*
@@ -39,21 +41,21 @@ public final class RecordBatch {
public final long createdMs;
public long drainedMs;
public long lastAttemptMs;
- public final MemoryRecords records;
public final TopicPartition topicPartition;
public final ProduceRequestResult produceFuture;
public long lastAppendTime;
private final List<Thunk> thunks;
private long offsetCounter = 0L;
private boolean retry;
+ private final MemoryRecordsBuilder recordsBuilder;
- public RecordBatch(TopicPartition tp, MemoryRecords records, long now) {
+ public RecordBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, long now) {
this.createdMs = now;
this.lastAttemptMs = now;
- this.records = records;
+ this.recordsBuilder = recordsBuilder;
this.topicPartition = tp;
this.produceFuture = new ProduceRequestResult();
- this.thunks = new ArrayList<Thunk>();
+ this.thunks = new ArrayList<>();
this.lastAppendTime = createdMs;
this.retry = false;
}
@@ -64,10 +66,10 @@ public final class RecordBatch {
* @return The RecordSend corresponding to this record or null if there isn't sufficient room.
*/
public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, long now) {
- if (!this.records.hasRoomFor(key, value)) {
+ if (!recordsBuilder.hasRoomFor(key, value)) {
return null;
} else {
- long checksum = this.records.append(offsetCounter++, timestamp, key, value);
+ long checksum = this.recordsBuilder.append(offsetCounter++, timestamp, key, value);
this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));
this.lastAppendTime = now;
FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
@@ -94,9 +96,8 @@ public final class RecordBatch {
baseOffset,
exception);
// execute callbacks
- for (int i = 0; i < this.thunks.size(); i++) {
+ for (Thunk thunk : thunks) {
try {
- Thunk thunk = this.thunks.get(i);
if (exception == null) {
// If the timestamp returned by server is NoTimestamp, that means CreateTime is used. Otherwise LogAppendTime is used.
RecordMetadata metadata = new RecordMetadata(this.topicPartition, baseOffset, thunk.future.relativeOffset(),
@@ -156,7 +157,7 @@ public final class RecordBatch {
}
if (expire) {
- this.records.close();
+ close();
this.done(-1L, Record.NO_TIMESTAMP,
new TimeoutException("Expiring " + recordCount + " record(s) for " + topicPartition + " due to " + errorMessage));
}
@@ -177,4 +178,37 @@ public final class RecordBatch {
public void setRetry() {
this.retry = true;
}
+
+ public MemoryRecords records() {
+ return recordsBuilder.build();
+ }
+
+ public int sizeInBytes() {
+ return recordsBuilder.sizeInBytes();
+ }
+
+ public double compressionRate() {
+ return recordsBuilder.compressionRate();
+ }
+
+ public boolean isFull() {
+ return recordsBuilder.isFull();
+ }
+
+ public void close() {
+ recordsBuilder.close();
+ }
+
+ public ByteBuffer buffer() {
+ return recordsBuilder.buffer();
+ }
+
+ public int initialCapacity() {
+ return recordsBuilder.initialCapacity();
+ }
+
+ public boolean isWritable() {
+ return !recordsBuilder.isClosed();
+ }
+
}
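(Illustration only: the null-return contract of tryAppend as handled in the RecordAccumulator hunk above; `batch`, `allocateNewBatch` and the arguments are placeholders.)

    // tryAppend returns null when the underlying builder has no room left,
    // so the caller seals the batch and starts a new one.
    FutureRecordMetadata future = batch.tryAppend(timestamp, key, value, callback, now);
    if (future == null) {
        batch.close();            // no more appends; the builder becomes read-only
        // allocateNewBatch(...)  // hypothetical: allocate a fresh buffer and RecordBatch
    }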
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 7555b71..1f54c0b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -345,7 +345,7 @@ public class Sender implements Runnable {
final Map<TopicPartition, RecordBatch> recordsByPartition = new HashMap<>(batches.size());
for (RecordBatch batch : batches) {
TopicPartition tp = batch.topicPartition;
- produceRecordsByPartition.put(tp, batch.records);
+ produceRecordsByPartition.put(tp, batch.records());
recordsByPartition.put(tp, batch);
}
@@ -505,17 +505,17 @@ public class Sender implements Runnable {
// per-topic bytes send rate
String topicByteRateName = "topic." + topic + ".bytes";
Sensor topicByteRate = Utils.notNull(this.metrics.getSensor(topicByteRateName));
- topicByteRate.record(batch.records.sizeInBytes());
+ topicByteRate.record(batch.sizeInBytes());
// per-topic compression rate
String topicCompressionRateName = "topic." + topic + ".compression-rate";
Sensor topicCompressionRate = Utils.notNull(this.metrics.getSensor(topicCompressionRateName));
- topicCompressionRate.record(batch.records.compressionRate());
+ topicCompressionRate.record(batch.compressionRate());
// global metrics
- this.batchSizeSensor.record(batch.records.sizeInBytes(), now);
+ this.batchSizeSensor.record(batch.sizeInBytes(), now);
this.queueTimeSensor.record(batch.drainedMs - batch.createdMs, now);
- this.compressionRateSensor.record(batch.records.compressionRate());
+ this.compressionRateSensor.record(batch.compressionRate());
this.maxRecordSizeSensor.record(batch.maxRecordSize, now);
records += batch.recordCount;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
new file mode 100644
index 0000000..3794dc6
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.common.record;
+
+import org.apache.kafka.common.utils.AbstractIterator;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+public abstract class AbstractRecords implements Records {
+
+ @Override
+ public boolean hasMatchingShallowMagic(byte magic) {
+ Iterator<? extends LogEntry> iterator = shallowIterator();
+ while (iterator.hasNext())
+ if (iterator.next().magic() != magic)
+ return false;
+ return true;
+ }
+
+ /**
+ * Convert this message set to use the specified message format.
+ */
+ @Override
+ public Records toMessageFormat(byte toMagic) {
+ List<LogEntry> converted = new ArrayList<>();
+ Iterator<LogEntry> deepIterator = deepIterator();
+ while (deepIterator.hasNext()) {
+ LogEntry entry = deepIterator.next();
+ converted.add(LogEntry.create(entry.offset(), entry.record().convert(toMagic)));
+ }
+
+ if (converted.isEmpty()) {
+ // This indicates that the message is too large, which indicates that the buffer is not large
+ // enough to hold a full log entry. We just return all the bytes in the file message set.
+ // Even though the message set does not have the right format version, we expect old clients
+ // to raise an error to the user after reading the message size and seeing that there
+ // are not enough available bytes in the response to read the full message.
+ return this;
+ } else {
+ // We use the first message to determine the compression type for the resulting message set.
+ // This could result in message sets which are either larger or smaller than the original size.
+ // For example, it could end up larger if most messages were previously compressed, but
+ // it just so happens that the first one is not. There is also some risk that this can
+ // cause some timestamp information to be lost (e.g. if the timestamp type was changed) since
+ // we are essentially merging multiple message sets. However, currently this method is only
+ // used for down-conversion, so we've ignored the problem.
+ CompressionType compressionType = shallowIterator().next().record().compressionType();
+ return MemoryRecords.withLogEntries(compressionType, converted);
+ }
+ }
+
+ public static int estimatedSize(CompressionType compressionType, Iterable<LogEntry> entries) {
+ int size = 0;
+ for (LogEntry entry : entries)
+ size += entry.sizeInBytes();
+ // NOTE: 1024 is the minimum block size for snappy encoding
+ return compressionType == CompressionType.NONE ? size : Math.min(Math.max(size / 2, 1024), 1 << 16);
+ }
+
+ /**
+ * Get an iterator over the deep records.
+ * @return An iterator over the records
+ */
+ public Iterator<Record> records() {
+ return new AbstractIterator<Record>() {
+ private final Iterator<? extends LogEntry> deepEntries = deepIterator();
+ @Override
+ protected Record makeNext() {
+ if (deepEntries.hasNext())
+ return deepEntries.next().record();
+ return allDone();
+ }
+ };
+ }
+
+}
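(Sketch only, not part of the commit: one way the conversion helpers above can be combined; targetMagic would be the older format's magic byte, e.g. Record.MAGIC_VALUE_V0 referenced elsewhere in the patch.)

    import org.apache.kafka.common.record.Records;

    // Down-convert only if some shallow entry still carries a different magic byte.
    static Records maybeDownConvert(Records records, byte targetMagic) {
        if (records.hasMatchingShallowMagic(targetMagic))
            return records;                        // already in the requested format
        return records.toMessageFormat(targetMagic);
    }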
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java
index 84668a5..b25f949 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java
@@ -16,34 +16,41 @@
*/
package org.apache.kafka.common.record;
+import java.io.DataInputStream;
import java.io.InputStream;
import java.nio.ByteBuffer;
/**
* A byte buffer backed input inputStream
*/
-public class ByteBufferInputStream extends InputStream {
-
- private ByteBuffer buffer;
+public class ByteBufferInputStream extends DataInputStream {
public ByteBufferInputStream(ByteBuffer buffer) {
- this.buffer = buffer;
+ super(new UnderlyingInputStream(buffer));
}
- public int read() {
- if (!buffer.hasRemaining()) {
- return -1;
+ private static class UnderlyingInputStream extends InputStream {
+ private ByteBuffer buffer;
+
+ public UnderlyingInputStream(ByteBuffer buffer) {
+ this.buffer = buffer;
}
- return buffer.get() & 0xFF;
- }
- public int read(byte[] bytes, int off, int len) {
- if (!buffer.hasRemaining()) {
- return -1;
+ public int read() {
+ if (!buffer.hasRemaining()) {
+ return -1;
+ }
+ return buffer.get() & 0xFF;
}
- len = Math.min(len, buffer.remaining());
- buffer.get(bytes, off, len);
- return len;
+ public int read(byte[] bytes, int off, int len) {
+ if (!buffer.hasRemaining()) {
+ return -1;
+ }
+
+ len = Math.min(len, buffer.remaining());
+ buffer.get(bytes, off, len);
+ return len;
+ }
}
}
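(Illustration only, not from the patch: since ByteBufferInputStream now extends DataInputStream, primitives can be read straight off a buffer; the values here are arbitrary.)

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import org.apache.kafka.common.record.ByteBufferInputStream;

    static void readDemo() throws IOException {
        ByteBuffer buf = ByteBuffer.allocate(12);
        buf.putLong(42L).putInt(7);
        buf.flip();
        ByteBufferInputStream in = new ByteBufferInputStream(buf);
        long offset = in.readLong();   // 42
        int size = in.readInt();       // 7
    }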
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java
new file mode 100644
index 0000000..ae0c91b
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.common.record;
+
+import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import static org.apache.kafka.common.record.Records.LOG_OVERHEAD;
+import static org.apache.kafka.common.record.Records.OFFSET_OFFSET;
+
+/**
+ * A byte buffer backed log input stream. This class avoids the need to copy records by returning
+ * slices from the underlying byte buffer.
+ */
+class ByteBufferLogInputStream implements LogInputStream<ByteBufferLogInputStream.ByteBufferLogEntry> {
+ private final ByteBuffer buffer;
+ private final int maxMessageSize;
+
+ ByteBufferLogInputStream(ByteBuffer buffer, int maxMessageSize) {
+ this.buffer = buffer;
+ this.maxMessageSize = maxMessageSize;
+ }
+
+ public ByteBufferLogEntry nextEntry() throws IOException {
+ int remaining = buffer.remaining();
+ if (remaining < LOG_OVERHEAD)
+ return null;
+
+ int recordSize = buffer.getInt(buffer.position() + Records.SIZE_OFFSET);
+ if (recordSize < Record.RECORD_OVERHEAD_V0)
+ throw new CorruptRecordException(String.format("Record size is less than the minimum record overhead (%d)", Record.RECORD_OVERHEAD_V0));
+ if (recordSize > maxMessageSize)
+ throw new CorruptRecordException(String.format("Record size exceeds the largest allowable message size (%d).", maxMessageSize));
+
+ int entrySize = recordSize + LOG_OVERHEAD;
+ if (remaining < entrySize)
+ return null;
+
+ ByteBuffer entrySlice = buffer.slice();
+ entrySlice.limit(entrySize);
+ buffer.position(buffer.position() + entrySize);
+ return new ByteBufferLogEntry(entrySlice);
+ }
+
+ public static class ByteBufferLogEntry extends LogEntry {
+ private final ByteBuffer buffer;
+ private final Record record;
+
+ private ByteBufferLogEntry(ByteBuffer buffer) {
+ this.buffer = buffer;
+ buffer.position(LOG_OVERHEAD);
+ this.record = new Record(buffer.slice());
+ buffer.position(OFFSET_OFFSET);
+ }
+
+ @Override
+ public long offset() {
+ return buffer.getLong(OFFSET_OFFSET);
+ }
+
+ @Override
+ public Record record() {
+ return record;
+ }
+
+ public void setOffset(long offset) {
+ buffer.putLong(OFFSET_OFFSET, offset);
+ }
+
+ public void setCreateTime(long timestamp) {
+ if (record.magic() == Record.MAGIC_VALUE_V0)
+ throw new IllegalArgumentException("Cannot set timestamp for a record with magic = 0");
+
+ long currentTimestamp = record.timestamp();
+ // We don't need to recompute crc if the timestamp is not updated.
+ if (record.timestampType() == TimestampType.CREATE_TIME && currentTimestamp == timestamp)
+ return;
+
+ byte attributes = record.attributes();
+ buffer.put(LOG_OVERHEAD + Record.ATTRIBUTES_OFFSET, TimestampType.CREATE_TIME.updateAttributes(attributes));
+ buffer.putLong(LOG_OVERHEAD + Record.TIMESTAMP_OFFSET, timestamp);
+ long crc = record.computeChecksum();
+ Utils.writeUnsignedInt(buffer, LOG_OVERHEAD + Record.CRC_OFFSET, crc);
+ }
+
+ public void setLogAppendTime(long timestamp) {
+ if (record.magic() == Record.MAGIC_VALUE_V0)
+ throw new IllegalArgumentException("Cannot set timestamp for a record with magic = 0");
+
+ byte attributes = record.attributes();
+ buffer.put(LOG_OVERHEAD + Record.ATTRIBUTES_OFFSET, TimestampType.LOG_APPEND_TIME.updateAttributes(attributes));
+ buffer.putLong(LOG_OVERHEAD + Record.TIMESTAMP_OFFSET, timestamp);
+ long crc = record.computeChecksum();
+ Utils.writeUnsignedInt(buffer, LOG_OVERHEAD + Record.CRC_OFFSET, crc);
+ }
+
+ public ByteBuffer buffer() {
+ return buffer;
+ }
+ }
+
+}
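(Sketch only: iterating shallow entries with the class above. It is package-private, so this would have to live in org.apache.kafka.common.record; the buffer and maxMessageSize are placeholders.)

    // package org.apache.kafka.common.record;  (class is package-private)
    import java.io.IOException;
    import java.nio.ByteBuffer;

    static void printEntries(ByteBuffer buffer) throws IOException {
        ByteBufferLogInputStream stream = new ByteBufferLogInputStream(buffer, Integer.MAX_VALUE);
        ByteBufferLogInputStream.ByteBufferLogEntry entry;
        while ((entry = stream.nextEntry()) != null)
            System.out.println(entry.offset() + " -> " + entry.record());
    }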
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java
index 1c9fbaa..3fb7f49 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java
@@ -16,42 +16,54 @@
*/
package org.apache.kafka.common.record;
+import java.io.DataOutputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
/**
* A byte buffer backed output outputStream
*/
-public class ByteBufferOutputStream extends OutputStream {
+public class ByteBufferOutputStream extends DataOutputStream {
private static final float REALLOCATION_FACTOR = 1.1f;
- private ByteBuffer buffer;
-
public ByteBufferOutputStream(ByteBuffer buffer) {
- this.buffer = buffer;
+ super(new UnderlyingOutputStream(buffer));
}
- public void write(int b) {
- if (buffer.remaining() < 1)
- expandBuffer(buffer.capacity() + 1);
- buffer.put((byte) b);
+ public ByteBuffer buffer() {
+ return ((UnderlyingOutputStream) out).buffer;
}
- public void write(byte[] bytes, int off, int len) {
- if (buffer.remaining() < len)
- expandBuffer(buffer.capacity() + len);
- buffer.put(bytes, off, len);
- }
+ public static class UnderlyingOutputStream extends OutputStream {
+ private ByteBuffer buffer;
- public ByteBuffer buffer() {
- return buffer;
- }
+ public UnderlyingOutputStream(ByteBuffer buffer) {
+ this.buffer = buffer;
+ }
- private void expandBuffer(int size) {
- int expandSize = Math.max((int) (buffer.capacity() * REALLOCATION_FACTOR), size);
- ByteBuffer temp = ByteBuffer.allocate(expandSize);
- temp.put(buffer.array(), buffer.arrayOffset(), buffer.position());
- buffer = temp;
+ public void write(int b) {
+ if (buffer.remaining() < 1)
+ expandBuffer(buffer.capacity() + 1);
+ buffer.put((byte) b);
+ }
+
+ public void write(byte[] bytes, int off, int len) {
+ if (buffer.remaining() < len)
+ expandBuffer(buffer.capacity() + len);
+ buffer.put(bytes, off, len);
+ }
+
+ public ByteBuffer buffer() {
+ return buffer;
+ }
+
+ private void expandBuffer(int size) {
+ int expandSize = Math.max((int) (buffer.capacity() * REALLOCATION_FACTOR), size);
+ ByteBuffer temp = ByteBuffer.allocate(expandSize);
+ temp.put(buffer.array(), buffer.arrayOffset(), buffer.position());
+ buffer = temp;
+ }
}
+
}
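(Illustration only, not from the patch: the stream grows its backing buffer on demand, so callers must re-fetch buffer() after writing; the sizes here are arbitrary.)

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import org.apache.kafka.common.record.ByteBufferOutputStream;

    static ByteBuffer writeDemo() throws IOException {
        ByteBufferOutputStream out = new ByteBufferOutputStream(ByteBuffer.allocate(8));
        out.writeLong(0L);
        out.writeInt(1234);      // exceeds the initial 8 bytes, triggers expandBuffer
        return out.buffer();     // the originally supplied ByteBuffer reference may now be stale
    }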
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
index 65a7e43..e1d4754 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
@@ -26,7 +26,7 @@ public enum CompressionType {
public final String name;
public final float rate;
- private CompressionType(int id, String name, float rate) {
+ CompressionType(int id, String name, float rate) {
this.id = id;
this.name = name;
this.rate = rate;
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
deleted file mode 100644
index a806975..0000000
--- a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
+++ /dev/null
@@ -1,332 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.record;
-
-import java.lang.reflect.Constructor;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.utils.Utils;
-
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.zip.GZIPInputStream;
-import java.util.zip.GZIPOutputStream;
-
-public class Compressor {
-
- static private final float COMPRESSION_RATE_DAMPING_FACTOR = 0.9f;
- static private final float COMPRESSION_RATE_ESTIMATION_FACTOR = 1.05f;
- static private final int COMPRESSION_DEFAULT_BUFFER_SIZE = 1024;
-
- private static final float[] TYPE_TO_RATE;
-
- static {
- int maxTypeId = -1;
- for (CompressionType type : CompressionType.values())
- maxTypeId = Math.max(maxTypeId, type.id);
- TYPE_TO_RATE = new float[maxTypeId + 1];
- for (CompressionType type : CompressionType.values()) {
- TYPE_TO_RATE[type.id] = type.rate;
- }
- }
-
- // dynamically load the snappy and lz4 classes to avoid runtime dependency if we are not using compression
- // caching constructors to avoid invoking of Class.forName method for each batch
- private static MemoizingConstructorSupplier snappyOutputStreamSupplier = new MemoizingConstructorSupplier(new ConstructorSupplier() {
- @Override
- public Constructor get() throws ClassNotFoundException, NoSuchMethodException {
- return Class.forName("org.xerial.snappy.SnappyOutputStream")
- .getConstructor(OutputStream.class, Integer.TYPE);
- }
- });
-
- private static MemoizingConstructorSupplier lz4OutputStreamSupplier = new MemoizingConstructorSupplier(new ConstructorSupplier() {
- @Override
- public Constructor get() throws ClassNotFoundException, NoSuchMethodException {
- return Class.forName("org.apache.kafka.common.record.KafkaLZ4BlockOutputStream")
- .getConstructor(OutputStream.class);
- }
- });
-
- private static MemoizingConstructorSupplier snappyInputStreamSupplier = new MemoizingConstructorSupplier(new ConstructorSupplier() {
- @Override
- public Constructor get() throws ClassNotFoundException, NoSuchMethodException {
- return Class.forName("org.xerial.snappy.SnappyInputStream")
- .getConstructor(InputStream.class);
- }
- });
-
- private static MemoizingConstructorSupplier lz4InputStreamSupplier = new MemoizingConstructorSupplier(new ConstructorSupplier() {
- @Override
- public Constructor get() throws ClassNotFoundException, NoSuchMethodException {
- return Class.forName("org.apache.kafka.common.record.KafkaLZ4BlockInputStream")
- .getConstructor(InputStream.class, Boolean.TYPE);
- }
- });
-
- private final CompressionType type;
- private final DataOutputStream appendStream;
- private final ByteBufferOutputStream bufferStream;
- private final int initPos;
-
- public long writtenUncompressed;
- public long numRecords;
- public float compressionRate;
- public long maxTimestamp;
-
- public Compressor(ByteBuffer buffer, CompressionType type) {
- this.type = type;
- this.initPos = buffer.position();
-
- this.numRecords = 0;
- this.writtenUncompressed = 0;
- this.compressionRate = 1;
- this.maxTimestamp = Record.NO_TIMESTAMP;
-
- if (type != CompressionType.NONE) {
- // for compressed records, leave space for the header and the shallow message metadata
- // and move the starting position to the value payload offset
- buffer.position(initPos + Records.LOG_OVERHEAD + Record.RECORD_OVERHEAD);
- }
-
- // create the stream
- bufferStream = new ByteBufferOutputStream(buffer);
- appendStream = wrapForOutput(bufferStream, type, COMPRESSION_DEFAULT_BUFFER_SIZE);
- }
-
- public ByteBuffer buffer() {
- return bufferStream.buffer();
- }
-
- public double compressionRate() {
- return compressionRate;
- }
-
- public void close() {
- try {
- appendStream.close();
- } catch (IOException e) {
- throw new KafkaException(e);
- }
-
- if (type != CompressionType.NONE) {
- ByteBuffer buffer = bufferStream.buffer();
- int pos = buffer.position();
- // write the header, for the end offset write as number of records - 1
- buffer.position(initPos);
- buffer.putLong(numRecords - 1);
- buffer.putInt(pos - initPos - Records.LOG_OVERHEAD);
- // write the shallow message (the crc and value size are not correct yet)
- Record.write(buffer, maxTimestamp, null, null, type, 0, -1);
- // compute the fill the value size
- int valueSize = pos - initPos - Records.LOG_OVERHEAD - Record.RECORD_OVERHEAD;
- buffer.putInt(initPos + Records.LOG_OVERHEAD + Record.KEY_OFFSET_V1, valueSize);
- // compute and fill the crc at the beginning of the message
- long crc = Record.computeChecksum(buffer,
- initPos + Records.LOG_OVERHEAD + Record.MAGIC_OFFSET,
- pos - initPos - Records.LOG_OVERHEAD - Record.MAGIC_OFFSET);
- Utils.writeUnsignedInt(buffer, initPos + Records.LOG_OVERHEAD + Record.CRC_OFFSET, crc);
- // reset the position
- buffer.position(pos);
-
- // update the compression ratio
- this.compressionRate = (float) buffer.position() / this.writtenUncompressed;
- TYPE_TO_RATE[type.id] = TYPE_TO_RATE[type.id] * COMPRESSION_RATE_DAMPING_FACTOR +
- compressionRate * (1 - COMPRESSION_RATE_DAMPING_FACTOR);
- }
- }
-
- // Note that for all the write operations below, IO exceptions should
- // never be thrown since the underlying ByteBufferOutputStream does not throw IOException;
- // therefore upon encountering this issue we just close the append stream.
-
- public void putLong(final long value) {
- try {
- appendStream.writeLong(value);
- } catch (IOException e) {
- throw new KafkaException("I/O exception when writing to the append stream, closing", e);
- }
- }
-
- public void putInt(final int value) {
- try {
- appendStream.writeInt(value);
- } catch (IOException e) {
- throw new KafkaException("I/O exception when writing to the append stream, closing", e);
- }
- }
-
- public void put(final ByteBuffer buffer) {
- try {
- appendStream.write(buffer.array(), buffer.arrayOffset(), buffer.limit());
- } catch (IOException e) {
- throw new KafkaException("I/O exception when writing to the append stream, closing", e);
- }
- }
-
- public void putByte(final byte value) {
- try {
- appendStream.write(value);
- } catch (IOException e) {
- throw new KafkaException("I/O exception when writing to the append stream, closing", e);
- }
- }
-
- public void put(final byte[] bytes, final int offset, final int len) {
- try {
- appendStream.write(bytes, offset, len);
- } catch (IOException e) {
- throw new KafkaException("I/O exception when writing to the append stream, closing", e);
- }
- }
-
- /**
- * @return CRC of the record
- */
- public long putRecord(long timestamp, byte[] key, byte[] value, CompressionType type,
- int valueOffset, int valueSize) {
- // put a record as un-compressed into the underlying stream
- long crc = Record.computeChecksum(timestamp, key, value, type, valueOffset, valueSize);
- byte attributes = Record.computeAttributes(type);
- putRecord(crc, attributes, timestamp, key, value, valueOffset, valueSize);
- return crc;
- }
-
- /**
- * Put a record as uncompressed into the underlying stream
- * @return CRC of the record
- */
- public long putRecord(long timestamp, byte[] key, byte[] value) {
- return putRecord(timestamp, key, value, CompressionType.NONE, 0, -1);
- }
-
- private void putRecord(final long crc, final byte attributes, final long timestamp, final byte[] key, final byte[] value, final int valueOffset, final int valueSize) {
- maxTimestamp = Math.max(maxTimestamp, timestamp);
- Record.write(this, crc, attributes, timestamp, key, value, valueOffset, valueSize);
- }
-
- public void recordWritten(int size) {
- numRecords += 1;
- writtenUncompressed += size;
- }
-
- public long numRecordsWritten() {
- return numRecords;
- }
-
- public long estimatedBytesWritten() {
- if (type == CompressionType.NONE) {
- return bufferStream.buffer().position();
- } else {
- // estimate the written bytes to the underlying byte buffer based on uncompressed written bytes
- return (long) (writtenUncompressed * TYPE_TO_RATE[type.id] * COMPRESSION_RATE_ESTIMATION_FACTOR);
- }
- }
-
- // the following two functions also need to be public since they are used in MemoryRecords.iteration
-
- public static DataOutputStream wrapForOutput(ByteBufferOutputStream buffer, CompressionType type, int bufferSize) {
- try {
- switch (type) {
- case NONE:
- return new DataOutputStream(buffer);
- case GZIP:
- return new DataOutputStream(new GZIPOutputStream(buffer, bufferSize));
- case SNAPPY:
- try {
- OutputStream stream = (OutputStream) snappyOutputStreamSupplier.get().newInstance(buffer, bufferSize);
- return new DataOutputStream(stream);
- } catch (Exception e) {
- throw new KafkaException(e);
- }
- case LZ4:
- try {
- OutputStream stream = (OutputStream) lz4OutputStreamSupplier.get().newInstance(buffer);
- return new DataOutputStream(stream);
- } catch (Exception e) {
- throw new KafkaException(e);
- }
- default:
- throw new IllegalArgumentException("Unknown compression type: " + type);
- }
- } catch (IOException e) {
- throw new KafkaException(e);
- }
- }
-
- public static DataInputStream wrapForInput(ByteBufferInputStream buffer, CompressionType type, byte messageVersion) {
- try {
- switch (type) {
- case NONE:
- return new DataInputStream(buffer);
- case GZIP:
- return new DataInputStream(new GZIPInputStream(buffer));
- case SNAPPY:
- try {
- InputStream stream = (InputStream) snappyInputStreamSupplier.get().newInstance(buffer);
- return new DataInputStream(stream);
- } catch (Exception e) {
- throw new KafkaException(e);
- }
- case LZ4:
- try {
- InputStream stream = (InputStream) lz4InputStreamSupplier.get().newInstance(buffer,
- messageVersion == Record.MAGIC_VALUE_V0);
- return new DataInputStream(stream);
- } catch (Exception e) {
- throw new KafkaException(e);
- }
- default:
- throw new IllegalArgumentException("Unknown compression type: " + type);
- }
- } catch (IOException e) {
- throw new KafkaException(e);
- }
- }
-
- private interface ConstructorSupplier {
- Constructor get() throws ClassNotFoundException, NoSuchMethodException;
- }
-
- // this code is based on Guava's @see{com.google.common.base.Suppliers.MemoizingSupplier}
- private static class MemoizingConstructorSupplier {
- final ConstructorSupplier delegate;
- transient volatile boolean initialized;
- transient Constructor value;
-
- public MemoizingConstructorSupplier(ConstructorSupplier delegate) {
- this.delegate = delegate;
- }
-
- public Constructor get() throws NoSuchMethodException, ClassNotFoundException {
- if (!initialized) {
- synchronized (this) {
- if (!initialized) {
- Constructor constructor = delegate.get();
- value = constructor;
- initialized = true;
- return constructor;
- }
- }
- }
- return value;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
new file mode 100644
index 0000000..ae393b0
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.common.record;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.CorruptRecordException;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+/**
+ * A log input stream which is backed by a {@link FileChannel}.
+ */
+public class FileLogInputStream implements LogInputStream<FileLogInputStream.FileChannelLogEntry> {
+ private int position;
+ private final int end;
+ private final FileChannel channel;
+ private final int maxRecordSize;
+ private final ByteBuffer logHeaderBuffer = ByteBuffer.allocate(Records.LOG_OVERHEAD);
+
+ /**
+ * Create a new log input stream over the FileChannel
+ * @param channel Underlying FileChannel
+ * @param maxRecordSize Maximum size of records
+ * @param start Position in the file channel to start from
+ * @param end Position in the file channel not to read past
+ */
+ public FileLogInputStream(FileChannel channel,
+ int maxRecordSize,
+ int start,
+ int end) {
+ this.channel = channel;
+ this.maxRecordSize = maxRecordSize;
+ this.position = start;
+ this.end = end;
+ }
+
+ @Override
+ public FileChannelLogEntry nextEntry() throws IOException {
+ if (position + Records.LOG_OVERHEAD >= end)
+ return null;
+
+ logHeaderBuffer.rewind();
+ channel.read(logHeaderBuffer, position);
+ if (logHeaderBuffer.hasRemaining())
+ return null;
+
+ logHeaderBuffer.rewind();
+ long offset = logHeaderBuffer.getLong();
+ int size = logHeaderBuffer.getInt();
+
+ if (size < Record.RECORD_OVERHEAD_V0)
+ throw new CorruptRecordException(String.format("Record size is smaller than minimum record overhead (%d).", Record.RECORD_OVERHEAD_V0));
+
+ if (size > maxRecordSize)
+ throw new CorruptRecordException(String.format("Record size exceeds the largest allowable message size (%d).", maxRecordSize));
+
+ if (position + Records.LOG_OVERHEAD + size > end)
+ return null;
+
+ FileChannelLogEntry logEntry = new FileChannelLogEntry(offset, channel, position, size);
+ position += logEntry.sizeInBytes();
+ return logEntry;
+ }
+
+ /**
+ * Log entry backed by an underlying FileChannel. This allows iteration over the shallow log
+ * entries without needing to read the record data into memory until it is needed. The downside
+ * is that entries will generally no longer be readable when the underlying channel is closed.
+ */
+ public static class FileChannelLogEntry extends LogEntry {
+ private final long offset;
+ private final FileChannel channel;
+ private final int position;
+ private final int recordSize;
+ private Record record = null;
+
+ private FileChannelLogEntry(long offset,
+ FileChannel channel,
+ int position,
+ int recordSize) {
+ this.offset = offset;
+ this.channel = channel;
+ this.position = position;
+ this.recordSize = recordSize;
+ }
+
+ @Override
+ public long offset() {
+ return offset;
+ }
+
+ public int position() {
+ return position;
+ }
+
+ @Override
+ public byte magic() {
+ if (record != null)
+ return record.magic();
+
+ try {
+ byte[] magic = new byte[1];
+ ByteBuffer buf = ByteBuffer.wrap(magic);
+ channel.read(buf, position + Records.LOG_OVERHEAD + Record.MAGIC_OFFSET);
+ if (buf.hasRemaining())
+ throw new KafkaException("Failed to read magic byte from FileChannel " + channel);
+ return magic[0];
+ } catch (IOException e) {
+ throw new KafkaException(e);
+ }
+ }
+
+ /**
+ * Force load the record and its data (key and value) into memory.
+ * @return The resulting record
+ * @throws IOException for any IO errors reading from the underlying file
+ */
+ private Record loadRecord() throws IOException {
+ if (record != null)
+ return record;
+
+ ByteBuffer recordBuffer = ByteBuffer.allocate(recordSize);
+ channel.read(recordBuffer, position + Records.LOG_OVERHEAD);
+ if (recordBuffer.hasRemaining())
+ throw new IOException("Failed to read full record from channel " + channel);
+
+ recordBuffer.rewind();
+ record = new Record(recordBuffer);
+ return record;
+ }
+
+ @Override
+ public Record record() {
+ if (record != null)
+ return record;
+
+ try {
+ return loadRecord();
+ } catch (IOException e) {
+ throw new KafkaException(e);
+ }
+ }
+
+ @Override
+ public int sizeInBytes() {
+ return Records.LOG_OVERHEAD + recordSize;
+ }
+
+ }
+}
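(Sketch only, not part of the commit: lazily iterating shallow entries from a segment file with the class above; the path argument is hypothetical, and record data is only read when record() is called.)

    import java.io.IOException;
    import java.nio.channels.FileChannel;
    import java.nio.file.Paths;
    import org.apache.kafka.common.record.FileLogInputStream;

    static void printShallowEntries(String segmentPath) throws IOException {
        try (FileChannel channel = FileChannel.open(Paths.get(segmentPath))) {
            FileLogInputStream stream =
                new FileLogInputStream(channel, Integer.MAX_VALUE, 0, (int) channel.size());
            FileLogInputStream.FileChannelLogEntry entry;
            while ((entry = stream.nextEntry()) != null)
                System.out.println(entry.offset() + " @ position " + entry.position());
        }
    }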
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
index bdae08d..faf61e9 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
@@ -18,22 +18,31 @@ package org.apache.kafka.common.record;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.network.TransportLayer;
+import org.apache.kafka.common.record.FileLogInputStream.FileChannelLogEntry;
+import org.apache.kafka.common.utils.Utils;
+import java.io.Closeable;
import java.io.File;
+import java.io.FileInputStream;
import java.io.IOException;
+import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.GatheringByteChannel;
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicInteger;
/**
- * File-backed record set.
+ * A {@link Records} implementation backed by a file. An optional start and end position can be applied to this
+ * instance to enable slicing a range of the log records.
*/
-public class FileRecords implements Records {
- private final File file;
+public class FileRecords extends AbstractRecords implements Closeable {
+ private final boolean isSlice;
private final FileChannel channel;
- private final long start;
- private final long end;
- private final long size;
+ private final int start;
+ private final int end;
+ private volatile File file;
+ private final AtomicInteger size;
public FileRecords(File file,
FileChannel channel,
@@ -44,83 +53,435 @@ public class FileRecords implements Records {
this.channel = channel;
this.start = start;
this.end = end;
+ this.isSlice = isSlice;
+ this.size = new AtomicInteger();
- if (isSlice)
- this.size = end - start;
- else
- this.size = Math.min(channel.size(), end) - start;
+ // set the initial size of the buffer
+ resize();
+ }
+
+ public void resize() throws IOException {
+ if (isSlice) {
+ size.set(end - start);
+ } else {
+ int limit = Math.min((int) channel.size(), end);
+ size.set(limit - start);
+
+ // if this is not a slice, update the file pointer to the end of the file
+ // set the file position to the last byte in the file
+ channel.position(limit);
+ }
}
@Override
public int sizeInBytes() {
- return (int) size;
+ return size.get();
+ }
+
+ /**
+ * Get the underlying file.
+ * @return The file
+ */
+ public File file() {
+ return file;
+ }
+
+ /**
+ * Get the underlying file channel.
+ * @return The file channel
+ */
+ public FileChannel channel() {
+ return channel;
+ }
+
+ /**
+ * Read log entries into a given buffer.
+ * @param buffer The buffer to write the entries to
+ * @param position Position in the buffer to read from
+ * @return The same buffer
+ * @throws IOException
+ */
+ public ByteBuffer readInto(ByteBuffer buffer, int position) throws IOException {
+ channel.read(buffer, position + this.start);
+ buffer.flip();
+ return buffer;
+ }
+
+ /**
+ * Return a slice of records from this instance, which is a view into this set starting from the given position
+ * and with the given size limit.
+ *
+ * If the size is beyond the end of the file, the end will be based on the size of the file at the time of the read.
+ *
+ * If this message set is already sliced, the position will be taken relative to that slicing.
+ *
+ * @param position The start position to begin the read from
+ * @param size The number of bytes after the start position to include
+ * @return A sliced wrapper on this message set, limited to the given position and size
+ */
+ public FileRecords read(int position, int size) throws IOException {
+ if (position < 0)
+ throw new IllegalArgumentException("Invalid position: " + position);
+ if (size < 0)
+ throw new IllegalArgumentException("Invalid size: " + size);
+
+ final int end;
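+ // if computing the end position overflows an int, fall back to the full size of this record set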
+ if (this.start + position + size < 0)
+ end = sizeInBytes();
+ else
+ end = Math.min(this.start + position + size, sizeInBytes());
+ return new FileRecords(file, channel, this.start + position, end, true);
+ }
+
+ /**
+ * Append log entries to the underlying file
+ * @param records The records to append
+ * @return the number of bytes written to the underlying file
+ */
+ public int append(MemoryRecords records) throws IOException {
+ int written = records.writeFullyTo(channel);
+ size.getAndAdd(written);
+ return written;
+ }
+
+ /**
+ * Commit all written data to the physical disk
+ */
+ public void flush() throws IOException {
+ channel.force(true);
+ }
+
+ /**
+ * Close this record set
+ */
+ public void close() throws IOException {
+ flush();
+ trim();
+ channel.close();
+ }
+
+ /**
+ * Delete this message set from the filesystem
+ * @return True iff this message set was deleted.
+ */
+ public boolean delete() {
+ Utils.closeQuietly(channel, "FileChannel");
+ return file.delete();
+ }
+
+ /**
+ * Trim the file to its logical size, e.g. when closing it or rolling to the next file
+ */
+ public void trim() throws IOException {
+ truncateTo(sizeInBytes());
+ }
+
+ /**
+ * Update the file reference (to be used with caution since this does not reopen the file channel)
+ * @param file The new file to use
+ */
+ public void setFile(File file) {
+ this.file = file;
+ }
+
+ /**
+ * Rename the file that backs this message set
+ * @throws IOException if rename fails.
+ */
+ public void renameTo(File f) throws IOException {
+ try {
+ Utils.atomicMoveWithFallback(file.toPath(), f.toPath());
+ } finally {
+ this.file = f;
+ }
+ }
+
+ /**
+ * Truncate this file message set to the given size in bytes. Note that this API does no checking that the
+ * given size falls on a valid message boundary.
+ * In some versions of the JDK truncating to the same size as the file message set will cause an
+ * update of the file's mtime, so truncation is only performed if the targetSize is smaller than the
+ * size of the underlying FileChannel.
+ * It is expected that no other threads will write to the log when this function is called.
+ * @param targetSize The size to truncate to. Must be between 0 and sizeInBytes.
+ * @return The number of bytes truncated off
+ */
+ public int truncateTo(int targetSize) throws IOException {
+ int originalSize = sizeInBytes();
+ if (targetSize > originalSize || targetSize < 0)
+ throw new KafkaException("Attempt to truncate log segment to " + targetSize + " bytes failed, " +
+ " size of this log segment is " + originalSize + " bytes.");
+ if (targetSize < (int) channel.size()) {
+ channel.truncate(targetSize);
+ channel.position(targetSize);
+ size.set(targetSize);
+ }
+ return originalSize - targetSize;
}
@Override
public long writeTo(GatheringByteChannel destChannel, long offset, int length) throws IOException {
long newSize = Math.min(channel.size(), end) - start;
- if (newSize < size)
+ if (newSize < size.get())
-            throw new KafkaException(String.format("Size of FileRecords %s has been truncated during write: old size %d, new size %d", file.getAbsolutePath(), size, newSize));
+            throw new KafkaException(String.format("Size of FileRecords %s has been truncated during write: old size %d, new size %d", file.getAbsolutePath(), size.get(), newSize));
- if (offset > size)
- throw new KafkaException(String.format("The requested offset %d is out of range. The size of this FileRecords is %d.", offset, size));
-
long position = start + offset;
- long count = Math.min(length, this.size - offset);
+ long count = Math.min(length, size.get());
+ final long bytesTransferred;
if (destChannel instanceof TransportLayer) {
TransportLayer tl = (TransportLayer) destChannel;
- return tl.transferFrom(this.channel, position, count);
+ bytesTransferred = tl.transferFrom(channel, position, count);
} else {
- return this.channel.transferTo(position, count, destChannel);
+ bytesTransferred = channel.transferTo(position, count, destChannel);
+ }
+ return bytesTransferred;
+ }
+
+ /**
+ * Search forward for the first entry whose offset is greater than or equal to the target offset,
+ * and return that offset together with its physical position and the size of the entry (including
+ * log overhead). If no such entry is found, return null.
+ *
+ * @param targetOffset The offset to search for.
+ * @param startingPosition The starting position in the file to begin searching from.
+ */
+ public LogEntryPosition searchForOffsetWithSize(long targetOffset, int startingPosition) {
+ Iterator<FileChannelLogEntry> iterator = shallowIteratorFrom(Integer.MAX_VALUE, startingPosition);
+ while (iterator.hasNext()) {
+ FileChannelLogEntry entry = iterator.next();
+ long offset = entry.offset();
+ if (offset >= targetOffset)
+ return new LogEntryPosition(offset, entry.position(), entry.sizeInBytes());
+ }
+ return null;
+ }
+
+ /**
+ * Search forward for the first message whose timestamp is greater than or equal to the target timestamp.
+ *
+ * @param targetTimestamp The timestamp to search for.
+ * @param startingPosition The starting position to search from.
+ * @return The timestamp and offset of the message found, or null if no such message is found.
+ */
+ public TimestampAndOffset searchForTimestamp(long targetTimestamp, int startingPosition) {
+ Iterator<FileChannelLogEntry> shallowIterator = shallowIteratorFrom(startingPosition);
+ while (shallowIterator.hasNext()) {
+ LogEntry shallowEntry = shallowIterator.next();
+ Record shallowRecord = shallowEntry.record();
+ if (shallowRecord.timestamp() >= targetTimestamp) {
+ // We found a message
+ for (LogEntry deepLogEntry : shallowEntry) {
+ long timestamp = deepLogEntry.record().timestamp();
+ if (timestamp >= targetTimestamp)
+ return new TimestampAndOffset(timestamp, deepLogEntry.offset());
+ }
+ throw new IllegalStateException(String.format("The message set (max timestamp = %s, max offset = %s)" +
+ " should contain target timestamp %s, but does not.", shallowRecord.timestamp(),
+ shallowEntry.offset(), targetTimestamp));
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Return the largest timestamp of the messages after a given position in this file message set.
+ * @param startingPosition The starting position.
+ * @return The largest timestamp of the messages after the given position.
+ */
+ public TimestampAndOffset largestTimestampAfter(int startingPosition) {
+ long maxTimestamp = Record.NO_TIMESTAMP;
+ long offsetOfMaxTimestamp = -1L;
+
+ Iterator<FileChannelLogEntry> shallowIterator = shallowIteratorFrom(startingPosition);
+ while (shallowIterator.hasNext()) {
+ LogEntry shallowEntry = shallowIterator.next();
+ long timestamp = shallowEntry.record().timestamp();
+ if (timestamp > maxTimestamp) {
+ maxTimestamp = timestamp;
+ offsetOfMaxTimestamp = shallowEntry.offset();
+ }
}
+ return new TimestampAndOffset(maxTimestamp, offsetOfMaxTimestamp);
+ }
+
+ /**
+ * Get an iterator over the shallow entries in the file. Note that the entries are
+ * backed by the open file channel. When the channel is closed (i.e. when this instance
+ * is closed), the entries will generally no longer be readable.
+ * @return An iterator over the shallow entries
+ */
+ @Override
+ public Iterator<FileChannelLogEntry> shallowIterator() {
+ return shallowIteratorFrom(start);
+ }
+
+ /**
+ * Get an iterator over the shallow entries, enforcing a maximum record size
+ * @param maxRecordSize The maximum allowable size of individual records (including compressed record sets)
+ * @return An iterator over the shallow entries
+ */
+ public Iterator<FileChannelLogEntry> shallowIterator(int maxRecordSize) {
+ return shallowIteratorFrom(maxRecordSize, start);
+ }
+
+ private Iterator<FileChannelLogEntry> shallowIteratorFrom(int start) {
+ return shallowIteratorFrom(Integer.MAX_VALUE, start);
+ }
+
+ private Iterator<FileChannelLogEntry> shallowIteratorFrom(int maxRecordSize, int start) {
+ final int end;
+ if (isSlice)
+ end = this.end;
+ else
+ end = this.sizeInBytes();
+ FileLogInputStream inputStream = new FileLogInputStream(channel, maxRecordSize, start, end);
+ return RecordsIterator.shallowIterator(inputStream);
}
@Override
- public RecordsIterator iterator() {
- return new RecordsIterator(new FileLogInputStream(channel, start, end), false);
+ public Iterator<LogEntry> deepIterator() {
+ final int end;
+ if (isSlice)
+ end = this.end;
+ else
+ end = this.sizeInBytes();
+ FileLogInputStream inputStream = new FileLogInputStream(channel, Integer.MAX_VALUE, start, end);
+ return new RecordsIterator(inputStream, false, false, Integer.MAX_VALUE);
+ }
+
+ public static FileRecords open(File file,
+ boolean mutable,
+ boolean fileAlreadyExists,
+ int initFileSize,
+ boolean preallocate) throws IOException {
+ FileChannel channel = openChannel(file, mutable, fileAlreadyExists, initFileSize, preallocate);
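+ // a newly created, preallocated file contains no records yet, so its logical end starts at 0; otherwise read up to the end of the file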
+ int end = (!fileAlreadyExists && preallocate) ? 0 : Integer.MAX_VALUE;
+ return new FileRecords(file, channel, 0, end, false);
+ }
+
+ public static FileRecords open(File file,
+ boolean fileAlreadyExists,
+ int initFileSize,
+ boolean preallocate) throws IOException {
+ return open(file, true, fileAlreadyExists, initFileSize, preallocate);
+ }
+
+ public static FileRecords open(File file, boolean mutable) throws IOException {
+ return open(file, mutable, false, 0, false);
+ }
+
+ public static FileRecords open(File file) throws IOException {
+ return open(file, true);
+ }
+
+ /**
+ * Open a channel for the given file.
+ * On Windows NTFS and some older Linux filesystems, setting preallocate to true and giving initFileSize
+ * a suitable value (for example 512 * 1024 * 1024) can improve Kafka produce performance.
+ * @param file File path
+ * @param mutable Whether the channel should be opened for writing
+ * @param fileAlreadyExists Whether the file already exists
+ * @param initFileSize The size used to preallocate the file, for example 512 * 1024 * 1024
+ * @param preallocate Whether to preallocate the file, taken from configuration.
+ */
+ private static FileChannel openChannel(File file,
+ boolean mutable,
+ boolean fileAlreadyExists,
+ int initFileSize,
+ boolean preallocate) throws IOException {
+ if (mutable) {
+ if (fileAlreadyExists) {
+ return new RandomAccessFile(file, "rw").getChannel();
+ } else {
+ if (preallocate) {
+ RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");
+ randomAccessFile.setLength(initFileSize);
+ return randomAccessFile.getChannel();
+ } else {
+ return new RandomAccessFile(file, "rw").getChannel();
+ }
+ }
+ } else {
+ return new FileInputStream(file).getChannel();
+ }
}
- private static class FileLogInputStream implements LogInputStream {
- private long position;
- protected final long end;
- protected final FileChannel channel;
- private final ByteBuffer logHeaderBuffer = ByteBuffer.allocate(Records.LOG_OVERHEAD);
+ public static class LogEntryPosition {
+ public final long offset;
+ public final int position;
+ public final int size;
- public FileLogInputStream(FileChannel channel, long start, long end) {
- this.channel = channel;
- this.position = start;
- this.end = end;
+ public LogEntryPosition(long offset, int position, int size) {
+ this.offset = offset;
+ this.position = position;
+ this.size = size;
}
@Override
- public LogEntry nextEntry() throws IOException {
- if (position + Records.LOG_OVERHEAD >= end)
- return null;
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
- logHeaderBuffer.rewind();
- channel.read(logHeaderBuffer, position);
- if (logHeaderBuffer.hasRemaining())
- return null;
+ LogEntryPosition that = (LogEntryPosition) o;
- logHeaderBuffer.rewind();
- long offset = logHeaderBuffer.getLong();
- int size = logHeaderBuffer.getInt();
- if (size < 0)
- throw new IllegalStateException("Record with size " + size);
+ if (offset != that.offset) return false;
+ if (position != that.position) return false;
+ return size == that.size;
- if (position + Records.LOG_OVERHEAD + size > end)
- return null;
+ }
- ByteBuffer recordBuffer = ByteBuffer.allocate(size);
- channel.read(recordBuffer, position + Records.LOG_OVERHEAD);
- if (recordBuffer.hasRemaining())
- return null;
- recordBuffer.rewind();
+ @Override
+ public int hashCode() {
+ int result = (int) (offset ^ (offset >>> 32));
+ result = 31 * result + position;
+ result = 31 * result + size;
+ return result;
+ }
- Record record = new Record(recordBuffer);
- LogEntry logEntry = new LogEntry(offset, record);
- position += logEntry.size();
- return logEntry;
+ @Override
+ public String toString() {
+ return "LogEntryPosition(" +
+ "offset=" + offset +
+ ", position=" + position +
+ ", size=" + size +
+ ')';
}
}
+
+ public static class TimestampAndOffset {
+ public final long timestamp;
+ public final long offset;
+
+ public TimestampAndOffset(long timestamp, long offset) {
+ this.timestamp = timestamp;
+ this.offset = offset;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ TimestampAndOffset that = (TimestampAndOffset) o;
+
+ if (timestamp != that.timestamp) return false;
+ return offset == that.offset;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = (int) (timestamp ^ (timestamp >>> 32));
+ result = 31 * result + (int) (offset ^ (offset >>> 32));
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "TimestampAndOffset(" +
+ "timestamp=" + timestamp +
+ ", offset=" + offset +
+ ')';
+ }
+ }
+
}
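For reference, here is a minimal sketch of how the reworked FileRecords API above can be exercised
end to end. It is not part of this patch: Record.create(...) and MemoryRecords.withRecords(...) are
assumed helpers from the reworked Record and MemoryRecords classes, so their exact signatures may
differ from the committed code.

    import java.io.File;
    import java.util.Iterator;
    import org.apache.kafka.common.record.FileLogInputStream.FileChannelLogEntry;
    import org.apache.kafka.common.record.FileRecords;
    import org.apache.kafka.common.record.MemoryRecords;
    import org.apache.kafka.common.record.Record;

    public class FileRecordsSketch {
        public static void main(String[] args) throws Exception {
            File file = File.createTempFile("example-segment", ".log");      // hypothetical scratch file
            FileRecords records = FileRecords.open(file);                    // mutable, no preallocation

            // append an in-memory batch; append() returns the number of bytes written
            Record record = Record.create("value".getBytes());               // assumed factory
            int written = records.append(MemoryRecords.withRecords(record)); // assumed factory
            records.flush();

            // slice a view over the appended bytes and walk its shallow entries
            FileRecords slice = records.read(0, written);
            Iterator<FileChannelLogEntry> iterator = slice.shallowIterator();
            while (iterator.hasNext()) {
                FileChannelLogEntry entry = iterator.next();
                System.out.println(entry.offset() + " @ file position " + entry.position());
            }

            records.close();   // flushes, trims the file to its logical size, and closes the channel
            records.delete();
        }
    }

Note that the slice returned by read() is only a view: it shares the parent's file channel, so its
entries stop being readable once the parent FileRecords is closed.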
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/main/java/org/apache/kafka/common/record/InvalidRecordException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/InvalidRecordException.java b/clients/src/main/java/org/apache/kafka/common/record/InvalidRecordException.java
index a1009ca..ee60713 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/InvalidRecordException.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/InvalidRecordException.java
@@ -16,9 +16,9 @@
*/
package org.apache.kafka.common.record;
-import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.CorruptRecordException;
-public class InvalidRecordException extends KafkaException {
+public class InvalidRecordException extends CorruptRecordException {
private static final long serialVersionUID = 1;
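The practical effect of the new parent type is that handlers written against the broader
CorruptRecordException now also cover InvalidRecordException. A small sketch (not part of this
patch; the String constructor is assumed to be unchanged):

    import org.apache.kafka.common.errors.CorruptRecordException;
    import org.apache.kafka.common.record.InvalidRecordException;

    public class InvalidRecordSketch {
        public static void main(String[] args) {
            try {
                throw new InvalidRecordException("stored crc does not match computed crc");
            } catch (CorruptRecordException e) {
                // with the previous KafkaException parent this handler would not have matched
                System.out.println("handled as corrupt record: " + e.getMessage());
            }
        }
    }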
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/main/java/org/apache/kafka/common/record/LogEntry.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/LogEntry.java b/clients/src/main/java/org/apache/kafka/common/record/LogEntry.java
index 2e54b56..d2db356 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/LogEntry.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/LogEntry.java
@@ -16,33 +16,156 @@
*/
package org.apache.kafka.common.record;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Iterator;
+
+import static org.apache.kafka.common.record.Records.LOG_OVERHEAD;
+
/**
* An offset and record pair
*/
-public final class LogEntry {
+public abstract class LogEntry implements Iterable<LogEntry> {
- private final long offset;
- private final Record record;
+ /**
+ * Get the offset of this entry. Note that if this entry contains a compressed
+ * message set, then this offset will be the last offset of the nested entries.
+ * @return the last offset contained in this entry
+ */
+ public abstract long offset();
- public LogEntry(long offset, Record record) {
- this.offset = offset;
- this.record = record;
+ /**
+ * Get the shallow record for this log entry.
+ * @return the shallow record
+ */
+ public abstract Record record();
+
+ /**
+ * Get the first offset of the records contained in this entry. Note that this
+ * generally requires deep iteration, which requires decompression, so this should
+ * be used with caution.
+ * @return The first offset contained in this entry
+ */
+ public long firstOffset() {
+ return iterator().next().offset();
}
- public long offset() {
- return this.offset;
+ /**
+ * Get the offset following this entry (i.e. the last offset contained in this entry plus one).
+ * @return the next consecutive offset following this entry
+ */
+ public long nextOffset() {
+ return offset() + 1;
}
- public Record record() {
- return this.record;
+ /**
+ * Get the message format version of this entry (i.e. its magic value).
+ * @return the magic byte
+ */
+ public byte magic() {
+ return record().magic();
}
@Override
public String toString() {
- return "LogEntry(" + offset + ", " + record + ")";
+ return "LogEntry(" + offset() + ", " + record() + ")";
+ }
+
+ /**
+ * Get the size in bytes of this entry, including the size of the record and the log overhead.
+ * @return The size in bytes of this entry
+ */
+ public int sizeInBytes() {
+ return record().sizeInBytes() + LOG_OVERHEAD;
+ }
+
+ /**
+ * Check whether this entry contains a compressed message set.
+ * @return true if so, false otherwise
+ */
+ public boolean isCompressed() {
+ return record().compressionType() != CompressionType.NONE;
+ }
+
+ /**
+ * Write this entry into a buffer.
+ * @param buffer The buffer to write the entry to
+ */
+ public void writeTo(ByteBuffer buffer) {
+ writeHeader(buffer, offset(), record().sizeInBytes());
+ buffer.put(record().buffer().duplicate());
+ }
+
+ /**
+ * Get an iterator for the nested entries contained within this log entry. Note that
+ * if the entry is not compressed, then this method will return an iterator over the
+ * shallow entry only (i.e. this object).
+ * @return An iterator over the entries contained within this log entry
+ */
+ @Override
+ public Iterator<LogEntry> iterator() {
+ if (isCompressed())
+ return new RecordsIterator.DeepRecordsIterator(this, false, Integer.MAX_VALUE);
+ return Collections.singletonList(this).iterator();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || !(o instanceof LogEntry)) return false;
+
+ LogEntry that = (LogEntry) o;
+
+ if (offset() != that.offset()) return false;
+ Record thisRecord = record();
+ Record thatRecord = that.record();
+ return thisRecord != null ? thisRecord.equals(thatRecord) : thatRecord == null;
+ }
+
+ @Override
+ public int hashCode() {
+ long offset = offset();
+ Record record = record();
+ int result = (int) (offset ^ (offset >>> 32));
+ result = 31 * result + (record != null ? record.hashCode() : 0);
+ return result;
+ }
+
+ public static void writeHeader(ByteBuffer buffer, long offset, int size) {
+ buffer.putLong(offset);
+ buffer.putInt(size);
+ }
+
+ public static void writeHeader(DataOutputStream out, long offset, int size) throws IOException {
+ out.writeLong(offset);
+ out.writeInt(size);
}
-
- public int size() {
- return record.size() + Records.LOG_OVERHEAD;
+
+ private static class SimpleLogEntry extends LogEntry {
+ private final long offset;
+ private final Record record;
+
+ public SimpleLogEntry(long offset, Record record) {
+ this.offset = offset;
+ this.record = record;
+ }
+
+ @Override
+ public long offset() {
+ return offset;
+ }
+
+ @Override
+ public Record record() {
+ return record;
+ }
+
+ }
+
+ public static LogEntry create(long offset, Record record) {
+ return new SimpleLogEntry(offset, record);
}
+
}
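To make the new abstract LogEntry contract concrete, here is a minimal sketch built on the
SimpleLogEntry factory added above. It is not part of this patch, and Record.create(...) is an
assumed helper from the reworked Record class.

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.record.LogEntry;
    import org.apache.kafka.common.record.Record;

    public class LogEntrySketch {
        public static void main(String[] args) {
            Record record = Record.create("value".getBytes());   // assumed factory
            LogEntry entry = LogEntry.create(5L, record);

            // sizeInBytes() includes the 12-byte offset/size header (Records.LOG_OVERHEAD)
            ByteBuffer buffer = ByteBuffer.allocate(entry.sizeInBytes());
            entry.writeTo(buffer);

            // an uncompressed entry iterates over just itself; a compressed wrapper entry
            // would be decompressed and yield its nested entries instead
            for (LogEntry nested : entry)
                System.out.println(nested.offset() + " (next offset = " + nested.nextOffset() + ")");
        }
    }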