Posted to commits@kafka.apache.org by jg...@apache.org on 2016/12/13 18:41:35 UTC
[8/9] kafka git commit: KAFKA-4390; Replace MessageSet usage with client-side alternatives
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/main/java/org/apache/kafka/common/record/LogInputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/LogInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/LogInputStream.java
index 4a4d569..a9af651 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/LogInputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/LogInputStream.java
@@ -20,10 +20,14 @@ import java.io.IOException;
/**
* An abstraction between an underlying input stream and record iterators, a LogInputStream
- * returns only the shallow log entries, depending on {@link org.apache.kafka.common.record.RecordsIterator.DeepRecordsIterator}
- * for the deep iteration.
+ * returns only the shallow log entries, depending on {@link RecordsIterator.DeepRecordsIterator}
+ * for the deep iteration. The generic typing allows implementations to present only
+ * a view of the log entries, enabling more efficient iteration when the record data is
+ * not actually needed. See for example {@link org.apache.kafka.common.record.FileLogInputStream.FileChannelLogEntry},
+ * in which the record is not brought into memory until needed.
+ * @param <T> Type parameter of the log entry
*/
-interface LogInputStream {
+interface LogInputStream<T extends LogEntry> {
/**
* Get the next log entry from the underlying input stream.
@@ -31,5 +35,5 @@ interface LogInputStream {
* @return The next log entry or null if there is none
* @throws IOException for any IO errors
*/
- LogEntry nextEntry() throws IOException;
+ T nextEntry() throws IOException;
}
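
As a minimal sketch of the generified interface (hypothetical, not part of this patch): a trivial LogInputStream that serves pre-built shallow entries from memory, e.g. for tests. The class name is invented and it is assumed to live in the org.apache.kafka.common.record package, since the interface is package-private; the real implementations, ByteBufferLogInputStream and FileLogInputStream, instead parse entries lazily from their underlying buffer or channel.

import java.util.Iterator;
import java.util.List;

// Hypothetical example only: serves shallow entries that were already built in memory.
class PreBuiltLogInputStream implements LogInputStream<LogEntry> {
    private final Iterator<LogEntry> entries;

    PreBuiltLogInputStream(List<LogEntry> entries) {
        this.entries = entries.iterator();
    }

    @Override
    public LogEntry nextEntry() {
        // Per the interface contract, return null once no entries remain
        return entries.hasNext() ? entries.next() : null;
    }
}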
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index 65ccf98..b945062 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -12,197 +12,185 @@
*/
package org.apache.kafka.common.record;
-import java.io.DataInputStream;
+import org.apache.kafka.common.record.ByteBufferLogInputStream.ByteBufferLogEntry;
+
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Iterator;
+import java.util.List;
/**
- * A {@link Records} implementation backed by a ByteBuffer.
+ * A {@link Records} implementation backed by a ByteBuffer. This is used only for reading or
+ * modifying an existing buffer of log entries in place. To create a new buffer see {@link MemoryRecordsBuilder},
+ * or one of the {@link #builder(ByteBuffer, byte, CompressionType, TimestampType) builder} variants.
*/
-public class MemoryRecords implements Records {
+public class MemoryRecords extends AbstractRecords {
public final static MemoryRecords EMPTY = MemoryRecords.readableRecords(ByteBuffer.allocate(0));
- private final static int WRITE_LIMIT_FOR_READABLE_ONLY = -1;
-
- // the compressor used for appends-only
- private final Compressor compressor;
-
- // the write limit for writable buffer, which may be smaller than the buffer capacity
- private final int writeLimit;
-
- // the capacity of the initial buffer, which is only used for de-allocation of writable records
- private final int initialCapacity;
-
// the underlying buffer used for read; while the records are still writable it is null
private ByteBuffer buffer;
-
- // indicate if the memory records is writable or not (i.e. used for appends or read-only)
- private boolean writable;
+ private int validBytes = -1;
// Construct a writable memory records
- private MemoryRecords(ByteBuffer buffer, CompressionType type, boolean writable, int writeLimit) {
- this.writable = writable;
- this.writeLimit = writeLimit;
- this.initialCapacity = buffer.capacity();
- if (this.writable) {
- this.buffer = null;
- this.compressor = new Compressor(buffer, type);
- } else {
- this.buffer = buffer;
- this.compressor = null;
- }
- }
-
- public static MemoryRecords emptyRecords(ByteBuffer buffer, CompressionType type, int writeLimit) {
- return new MemoryRecords(buffer, type, true, writeLimit);
- }
-
- public static MemoryRecords emptyRecords(ByteBuffer buffer, CompressionType type) {
- // use the buffer capacity as the default write limit
- return emptyRecords(buffer, type, buffer.capacity());
+ private MemoryRecords(ByteBuffer buffer) {
+ this.buffer = buffer;
}
- public static MemoryRecords readableRecords(ByteBuffer buffer) {
- return new MemoryRecords(buffer, CompressionType.NONE, false, WRITE_LIMIT_FOR_READABLE_ONLY);
+ @Override
+ public int sizeInBytes() {
+ return buffer.limit();
}
- /**
- * Append the given record and offset to the buffer
- */
- public void append(long offset, Record record) {
- if (!writable)
- throw new IllegalStateException("Memory records is not writable");
-
- int size = record.size();
- compressor.putLong(offset);
- compressor.putInt(size);
- compressor.put(record.buffer());
- compressor.recordWritten(size + Records.LOG_OVERHEAD);
- record.buffer().rewind();
+ @Override
+ public long writeTo(GatheringByteChannel channel, long position, int length) throws IOException {
+ ByteBuffer dup = buffer.duplicate();
+ int pos = (int) position;
+ dup.position(pos);
+ dup.limit(pos + length);
+ return channel.write(dup);
}
/**
- * Append a new record and offset to the buffer
- * @return crc of the record
+ * Write all records to the given channel (including partial records).
+ * @param channel The channel to write to
+ * @return The number of bytes written
+ * @throws IOException For any IO errors writing to the channel
*/
- public long append(long offset, long timestamp, byte[] key, byte[] value) {
- if (!writable)
- throw new IllegalStateException("Memory records is not writable");
-
- int size = Record.recordSize(key, value);
- compressor.putLong(offset);
- compressor.putInt(size);
- long crc = compressor.putRecord(timestamp, key, value);
- compressor.recordWritten(size + Records.LOG_OVERHEAD);
- return crc;
+ public int writeFullyTo(GatheringByteChannel channel) throws IOException {
+ buffer.mark();
+ int written = 0;
+ while (written < sizeInBytes())
+ written += channel.write(buffer);
+ buffer.reset();
+ return written;
}
/**
- * Check if we have room for a new record containing the given key/value pair
- *
- * Note that the return value is based on the estimate of the bytes written to the compressor, which may not be
- * accurate if compression is really used. When this happens, the following append may cause dynamic buffer
- * re-allocation in the underlying byte buffer stream.
- *
- * There is an exceptional case when appending a single message whose size is larger than the batch size, the
- * capacity will be the message size which is larger than the write limit, i.e. the batch size. In this case
- * the checking should be based on the capacity of the initialized buffer rather than the write limit in order
- * to accept this single record.
+ * The total number of bytes in this message set not including any partial, trailing messages. This
+ * may be smaller than what is returned by {@link #sizeInBytes()}.
+ * @return The number of valid bytes
*/
- public boolean hasRoomFor(byte[] key, byte[] value) {
- if (!this.writable)
- return false;
+ public int validBytes() {
+ if (validBytes >= 0)
+ return validBytes;
- return this.compressor.numRecordsWritten() == 0 ?
- this.initialCapacity >= Records.LOG_OVERHEAD + Record.recordSize(key, value) :
- this.writeLimit >= this.compressor.estimatedBytesWritten() + Records.LOG_OVERHEAD + Record.recordSize(key, value);
- }
+ int bytes = 0;
+ Iterator<ByteBufferLogEntry> iterator = shallowIterator();
+ while (iterator.hasNext())
+ bytes += iterator.next().sizeInBytes();
- public boolean isFull() {
- return !this.writable || this.writeLimit <= this.compressor.estimatedBytesWritten();
+ this.validBytes = bytes;
+ return bytes;
}
/**
- * Close this batch for no more appends
+ * Filter the records into the provided ByteBuffer.
+ * @param filter The filter function
+ * @param buffer The byte buffer to write the filtered records to
+ * @return A FilterResult with a summary of the output (for metrics)
*/
- public void close() {
- if (writable) {
- // close the compressor to fill-in wrapper message metadata if necessary
- compressor.close();
-
- // flip the underlying buffer to be ready for reads
- buffer = compressor.buffer();
- buffer.flip();
-
- // reset the writable flag
- writable = false;
+ public FilterResult filterTo(LogEntryFilter filter, ByteBuffer buffer) {
+ long maxTimestamp = Record.NO_TIMESTAMP;
+ long shallowOffsetOfMaxTimestamp = -1L;
+ int messagesRead = 0;
+ int bytesRead = 0;
+ int messagesRetained = 0;
+ int bytesRetained = 0;
+
+ Iterator<ByteBufferLogEntry> shallowIterator = shallowIterator();
+ while (shallowIterator.hasNext()) {
+ ByteBufferLogEntry shallowEntry = shallowIterator.next();
+ bytesRead += shallowEntry.sizeInBytes();
+
+ // We use the absolute offset to decide whether to retain the message or not (this is handled by the
+ // deep iterator). Because of KAFKA-4298, we have to allow for the possibility that a previous version
+ // corrupted the log by writing a compressed message set with a wrapper magic value not matching the magic
+ // of the inner messages. This will be fixed as we recopy the messages to the destination buffer.
+
+ Record shallowRecord = shallowEntry.record();
+ byte shallowMagic = shallowRecord.magic();
+ boolean writeOriginalEntry = true;
+ List<LogEntry> retainedEntries = new ArrayList<>();
+
+ for (LogEntry deepEntry : shallowEntry) {
+ Record deepRecord = deepEntry.record();
+ messagesRead += 1;
+
+ if (filter.shouldRetain(deepEntry)) {
+ // Check for log corruption due to KAFKA-4298. If we find it, make sure that we overwrite
+ // the corrupted entry with correct data.
+ if (shallowMagic != deepRecord.magic())
+ writeOriginalEntry = false;
+
+ retainedEntries.add(deepEntry);
+ } else {
+ writeOriginalEntry = false;
+ }
+ }
+
+ if (writeOriginalEntry) {
+ // No messages were compacted out and no message format conversion is needed, so write the original message set back
+ shallowEntry.writeTo(buffer);
+ messagesRetained += retainedEntries.size();
+ bytesRetained += shallowEntry.sizeInBytes();
+
+ if (shallowRecord.timestamp() > maxTimestamp) {
+ maxTimestamp = shallowRecord.timestamp();
+ shallowOffsetOfMaxTimestamp = shallowEntry.offset();
+ }
+ } else if (!retainedEntries.isEmpty()) {
+ ByteBuffer slice = buffer.slice();
+ MemoryRecordsBuilder builder = builderWithEntries(slice, shallowRecord.timestampType(), shallowRecord.compressionType(),
+ shallowRecord.timestamp(), retainedEntries);
+ MemoryRecords records = builder.build();
+ buffer.position(buffer.position() + slice.position());
+ messagesRetained += retainedEntries.size();
+ bytesRetained += records.sizeInBytes();
+
+ MemoryRecordsBuilder.RecordsInfo info = builder.info();
+ if (info.maxTimestamp > maxTimestamp) {
+ maxTimestamp = info.maxTimestamp;
+ shallowOffsetOfMaxTimestamp = info.shallowOffsetOfMaxTimestamp;
+ }
+ }
}
+
+ return new FilterResult(messagesRead, bytesRead, messagesRetained, bytesRetained, maxTimestamp, shallowOffsetOfMaxTimestamp);
}
/**
- * The size of this record set
+ * Get the byte buffer that backs this instance for reading.
*/
- @Override
- public int sizeInBytes() {
- if (writable) {
- return compressor.buffer().position();
- } else {
- return buffer.limit();
- }
+ public ByteBuffer buffer() {
+ return buffer.duplicate();
}
@Override
- public long writeTo(GatheringByteChannel channel, long offset, int length) throws IOException {
- ByteBuffer dup = buffer.duplicate();
- int position = (int) offset;
- dup.position(position);
- dup.limit(position + length);
- return channel.write(dup);
+ public Iterator<ByteBufferLogEntry> shallowIterator() {
+ return RecordsIterator.shallowIterator(new ByteBufferLogInputStream(buffer.duplicate(), Integer.MAX_VALUE));
}
- /**
- * The compression rate of this record set
- */
- public double compressionRate() {
- if (compressor == null)
- return 1.0;
- else
- return compressor.compressionRate();
+ @Override
+ public Iterator<LogEntry> deepIterator() {
+ return deepIterator(false);
}
- /**
- * Return the capacity of the initial buffer, for writable records
- * it may be different from the current buffer's capacity
- */
- public int initialCapacity() {
- return this.initialCapacity;
+ public Iterator<LogEntry> deepIterator(boolean ensureMatchingMagic) {
+ return deepIterator(ensureMatchingMagic, Integer.MAX_VALUE);
}
- /**
- * Get the byte buffer that backs this records instance for reading
- */
- public ByteBuffer buffer() {
- if (writable)
- throw new IllegalStateException("The memory records must not be writable any more before getting its underlying buffer");
-
- return buffer.duplicate();
+ public Iterator<LogEntry> deepIterator(boolean ensureMatchingMagic, int maxMessageSize) {
+ return new RecordsIterator(new ByteBufferLogInputStream(buffer.duplicate(), maxMessageSize), false,
+ ensureMatchingMagic, maxMessageSize);
}
@Override
- public Iterator<LogEntry> iterator() {
- ByteBuffer input = this.buffer.duplicate();
- if (writable)
- // flip on a duplicate buffer for reading
- input.flip();
- return new RecordsIterator(new ByteBufferLogInputStream(input), false);
- }
-
- @Override
public String toString() {
- Iterator<LogEntry> iter = iterator();
+ Iterator<LogEntry> iter = deepIterator();
StringBuilder builder = new StringBuilder();
builder.append('[');
while (iter.hasNext()) {
@@ -214,16 +202,13 @@ public class MemoryRecords implements Records {
builder.append("record=");
builder.append(entry.record());
builder.append(")");
+ if (iter.hasNext())
+ builder.append(", ");
}
builder.append(']');
return builder.toString();
}
- /** Visible for testing */
- public boolean isWritable() {
- return writable;
- }
-
@Override
public boolean equals(Object o) {
if (this == o) return true;
@@ -232,7 +217,6 @@ public class MemoryRecords implements Records {
MemoryRecords that = (MemoryRecords) o;
return buffer.equals(that.buffer);
-
}
@Override
@@ -240,28 +224,153 @@ public class MemoryRecords implements Records {
return buffer.hashCode();
}
- private static class ByteBufferLogInputStream implements LogInputStream {
- private final DataInputStream stream;
- private final ByteBuffer buffer;
+ public interface LogEntryFilter {
+ boolean shouldRetain(LogEntry entry);
+ }
- private ByteBufferLogInputStream(ByteBuffer buffer) {
- this.stream = new DataInputStream(new ByteBufferInputStream(buffer));
- this.buffer = buffer;
+ public static class FilterResult {
+ public final int messagesRead;
+ public final int bytesRead;
+ public final int messagesRetained;
+ public final int bytesRetained;
+ public final long maxTimestamp;
+ public final long shallowOffsetOfMaxTimestamp;
+
+ public FilterResult(int messagesRead,
+ int bytesRead,
+ int messagesRetained,
+ int bytesRetained,
+ long maxTimestamp,
+ long shallowOffsetOfMaxTimestamp) {
+ this.messagesRead = messagesRead;
+ this.bytesRead = bytesRead;
+ this.messagesRetained = messagesRetained;
+ this.bytesRetained = bytesRetained;
+ this.maxTimestamp = maxTimestamp;
+ this.shallowOffsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp;
}
+ }
- public LogEntry nextEntry() throws IOException {
- long offset = stream.readLong();
- int size = stream.readInt();
- if (size < 0)
- throw new IllegalStateException("Record with size " + size);
-
- ByteBuffer slice = buffer.slice();
- int newPos = buffer.position() + size;
- if (newPos > buffer.limit())
- return null;
- buffer.position(newPos);
- slice.limit(size);
- return new LogEntry(offset, new Record(slice));
- }
+ public static MemoryRecordsBuilder builder(ByteBuffer buffer,
+ CompressionType compressionType,
+ TimestampType timestampType,
+ int writeLimit) {
+ return new MemoryRecordsBuilder(buffer, Record.CURRENT_MAGIC_VALUE, compressionType, timestampType, 0L, System.currentTimeMillis(), writeLimit);
+ }
+
+ public static MemoryRecordsBuilder builder(ByteBuffer buffer,
+ byte magic,
+ CompressionType compressionType,
+ TimestampType timestampType,
+ long baseOffset,
+ long logAppendTime) {
+ return new MemoryRecordsBuilder(buffer, magic, compressionType, timestampType, baseOffset, logAppendTime, buffer.capacity());
+ }
+
+ public static MemoryRecordsBuilder builder(ByteBuffer buffer,
+ CompressionType compressionType,
+ TimestampType timestampType) {
+ // use the buffer capacity as the default write limit
+ return builder(buffer, compressionType, timestampType, buffer.capacity());
+ }
+
+ public static MemoryRecordsBuilder builder(ByteBuffer buffer,
+ byte magic,
+ CompressionType compressionType,
+ TimestampType timestampType) {
+ return builder(buffer, magic, compressionType, timestampType, 0L);
+ }
+
+ public static MemoryRecordsBuilder builder(ByteBuffer buffer,
+ byte magic,
+ CompressionType compressionType,
+ TimestampType timestampType,
+ long baseOffset) {
+ return builder(buffer, magic, compressionType, timestampType, baseOffset, System.currentTimeMillis());
+ }
+
+ public static MemoryRecords readableRecords(ByteBuffer buffer) {
+ return new MemoryRecords(buffer);
+ }
+
+ public static MemoryRecords withLogEntries(CompressionType compressionType, List<LogEntry> entries) {
+ return withLogEntries(TimestampType.CREATE_TIME, compressionType, System.currentTimeMillis(), entries);
+ }
+
+ public static MemoryRecords withLogEntries(LogEntry ... entries) {
+ return withLogEntries(CompressionType.NONE, Arrays.asList(entries));
+ }
+
+ public static MemoryRecords withRecords(CompressionType compressionType, long initialOffset, List<Record> records) {
+ return withRecords(initialOffset, TimestampType.CREATE_TIME, compressionType, System.currentTimeMillis(), records);
}
+
+ public static MemoryRecords withRecords(Record ... records) {
+ return withRecords(CompressionType.NONE, 0L, Arrays.asList(records));
+ }
+
+ public static MemoryRecords withRecords(long initialOffset, Record ... records) {
+ return withRecords(CompressionType.NONE, initialOffset, Arrays.asList(records));
+ }
+
+ public static MemoryRecords withRecords(CompressionType compressionType, Record ... records) {
+ return withRecords(compressionType, 0L, Arrays.asList(records));
+ }
+
+ public static MemoryRecords withRecords(TimestampType timestampType, CompressionType compressionType, Record ... records) {
+ return withRecords(0L, timestampType, compressionType, System.currentTimeMillis(), Arrays.asList(records));
+ }
+
+ public static MemoryRecords withRecords(long initialOffset,
+ TimestampType timestampType,
+ CompressionType compressionType,
+ long logAppendTime,
+ List<Record> records) {
+ return withLogEntries(timestampType, compressionType, logAppendTime, buildLogEntries(initialOffset, records));
+ }
+
+ private static MemoryRecords withLogEntries(TimestampType timestampType,
+ CompressionType compressionType,
+ long logAppendTime,
+ List<LogEntry> entries) {
+ if (entries.isEmpty())
+ return MemoryRecords.EMPTY;
+ return builderWithEntries(timestampType, compressionType, logAppendTime, entries).build();
+ }
+
+ private static List<LogEntry> buildLogEntries(long initialOffset, List<Record> records) {
+ List<LogEntry> entries = new ArrayList<>();
+ for (Record record : records)
+ entries.add(LogEntry.create(initialOffset++, record));
+ return entries;
+ }
+
+ public static MemoryRecordsBuilder builderWithEntries(TimestampType timestampType,
+ CompressionType compressionType,
+ long logAppendTime,
+ List<LogEntry> entries) {
+ ByteBuffer buffer = ByteBuffer.allocate(estimatedSize(compressionType, entries));
+ return builderWithEntries(buffer, timestampType, compressionType, logAppendTime, entries);
+ }
+
+ private static MemoryRecordsBuilder builderWithEntries(ByteBuffer buffer,
+ TimestampType timestampType,
+ CompressionType compressionType,
+ long logAppendTime,
+ List<LogEntry> entries) {
+ if (entries.isEmpty())
+ throw new IllegalArgumentException();
+
+ LogEntry firstEntry = entries.iterator().next();
+ long firstOffset = firstEntry.offset();
+ byte magic = firstEntry.record().magic();
+
+ MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compressionType, timestampType,
+ firstOffset, logAppendTime);
+ for (LogEntry entry : entries)
+ builder.append(entry);
+
+ return builder;
+ }
+
}
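
The new read/modify path can be exercised through the filterTo API above. The following is a usage sketch only (not part of this patch; the class name, payloads and buffer size are arbitrary): it builds a small read-only MemoryRecords instance and drops records with null values, roughly the way a log cleaner discards tombstones.

import java.nio.ByteBuffer;
import org.apache.kafka.common.record.*;

public class FilterToExample {
    public static void main(String[] args) {
        // Two uncompressed records at offsets 0 and 1; the second has a null value (a tombstone)
        MemoryRecords records = MemoryRecords.withRecords(
                Record.create("k1".getBytes(), "v1".getBytes()),
                Record.create("k2".getBytes(), null));

        ByteBuffer out = ByteBuffer.allocate(records.sizeInBytes());
        MemoryRecords.FilterResult result = records.filterTo(new MemoryRecords.LogEntryFilter() {
            @Override
            public boolean shouldRetain(LogEntry entry) {
                // keep only entries whose record still carries a value
                return !entry.record().hasNullValue();
            }
        }, out);

        out.flip();
        MemoryRecords filtered = MemoryRecords.readableRecords(out);
        System.out.println("read=" + result.messagesRead + " retained=" + result.messagesRetained
                + " filteredBytes=" + filtered.sizeInBytes());
    }
}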
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
new file mode 100644
index 0000000..b90a9e6
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -0,0 +1,461 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import org.apache.kafka.common.KafkaException;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.lang.reflect.Constructor;
+import java.nio.ByteBuffer;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+/**
+ * This class is used to write new log data in memory, i.e. this is the write path for {@link MemoryRecords}.
+ * It transparently handles compression and exposes methods for appending new entries, possibly with message
+ * format conversion.
+ */
+public class MemoryRecordsBuilder {
+
+ static private final float COMPRESSION_RATE_DAMPING_FACTOR = 0.9f;
+ static private final float COMPRESSION_RATE_ESTIMATION_FACTOR = 1.05f;
+ static private final int COMPRESSION_DEFAULT_BUFFER_SIZE = 1024;
+
+ private static final float[] TYPE_TO_RATE;
+
+ static {
+ int maxTypeId = -1;
+ for (CompressionType type : CompressionType.values())
+ maxTypeId = Math.max(maxTypeId, type.id);
+ TYPE_TO_RATE = new float[maxTypeId + 1];
+ for (CompressionType type : CompressionType.values()) {
+ TYPE_TO_RATE[type.id] = type.rate;
+ }
+ }
+
+ // dynamically load the snappy and lz4 classes to avoid runtime dependency if we are not using compression
+ // caching constructors to avoid invoking Class.forName for each batch
+ private static MemoizingConstructorSupplier snappyOutputStreamSupplier = new MemoizingConstructorSupplier(new ConstructorSupplier() {
+ @Override
+ public Constructor get() throws ClassNotFoundException, NoSuchMethodException {
+ return Class.forName("org.xerial.snappy.SnappyOutputStream")
+ .getConstructor(OutputStream.class, Integer.TYPE);
+ }
+ });
+
+ private static MemoizingConstructorSupplier lz4OutputStreamSupplier = new MemoizingConstructorSupplier(new ConstructorSupplier() {
+ @Override
+ public Constructor get() throws ClassNotFoundException, NoSuchMethodException {
+ return Class.forName("org.apache.kafka.common.record.KafkaLZ4BlockOutputStream")
+ .getConstructor(OutputStream.class, Boolean.TYPE);
+ }
+ });
+
+ private static MemoizingConstructorSupplier snappyInputStreamSupplier = new MemoizingConstructorSupplier(new ConstructorSupplier() {
+ @Override
+ public Constructor get() throws ClassNotFoundException, NoSuchMethodException {
+ return Class.forName("org.xerial.snappy.SnappyInputStream")
+ .getConstructor(InputStream.class);
+ }
+ });
+
+ private static MemoizingConstructorSupplier lz4InputStreamSupplier = new MemoizingConstructorSupplier(new ConstructorSupplier() {
+ @Override
+ public Constructor get() throws ClassNotFoundException, NoSuchMethodException {
+ return Class.forName("org.apache.kafka.common.record.KafkaLZ4BlockInputStream")
+ .getConstructor(InputStream.class, Boolean.TYPE);
+ }
+ });
+
+ private final TimestampType timestampType;
+ private final CompressionType compressionType;
+ private final DataOutputStream appendStream;
+ private final ByteBufferOutputStream bufferStream;
+ private final byte magic;
+ private final int initPos;
+ private final long baseOffset;
+ private final long logAppendTime;
+ private final int writeLimit;
+ private final int initialCapacity;
+
+ private MemoryRecords builtRecords;
+ private long writtenUncompressed;
+ private long numRecords;
+ private float compressionRate;
+ private long maxTimestamp;
+ private long offsetOfMaxTimestamp;
+ private long lastOffset = -1;
+
+ public MemoryRecordsBuilder(ByteBuffer buffer,
+ byte magic,
+ CompressionType compressionType,
+ TimestampType timestampType,
+ long baseOffset,
+ long logAppendTime,
+ int writeLimit) {
+ this.magic = magic;
+ this.timestampType = timestampType;
+ this.compressionType = compressionType;
+ this.baseOffset = baseOffset;
+ this.logAppendTime = logAppendTime;
+ this.initPos = buffer.position();
+ this.numRecords = 0;
+ this.writtenUncompressed = 0;
+ this.compressionRate = 1;
+ this.maxTimestamp = Record.NO_TIMESTAMP;
+ this.writeLimit = writeLimit;
+ this.initialCapacity = buffer.capacity();
+
+ if (compressionType != CompressionType.NONE) {
+ // for compressed records, leave space for the header and the shallow message metadata
+ // and move the starting position to the value payload offset
+ buffer.position(initPos + Records.LOG_OVERHEAD + Record.recordOverhead(magic));
+ }
+
+ // create the stream
+ bufferStream = new ByteBufferOutputStream(buffer);
+ appendStream = wrapForOutput(bufferStream, compressionType, magic, COMPRESSION_DEFAULT_BUFFER_SIZE);
+ }
+
+ public ByteBuffer buffer() {
+ return bufferStream.buffer();
+ }
+
+ public int initialCapacity() {
+ return initialCapacity;
+ }
+
+ public double compressionRate() {
+ return compressionRate;
+ }
+
+ /**
+ * Close this builder and return the resulting buffer.
+ * @return The built log buffer
+ */
+ public MemoryRecords build() {
+ close();
+ return builtRecords;
+ }
+
+ /**
+ * Get the max timestamp and its offset. If the log append time is used, then the offset will
+ * be either the first offset in the set if no compression is used or the last offset otherwise.
+ * @return The max timestamp and its offset
+ */
+ public RecordsInfo info() {
+ if (timestampType == TimestampType.LOG_APPEND_TIME)
+ return new RecordsInfo(logAppendTime, lastOffset);
+ else
+ return new RecordsInfo(maxTimestamp, compressionType == CompressionType.NONE ? offsetOfMaxTimestamp : lastOffset);
+ }
+
+ public void close() {
+ if (builtRecords != null)
+ return;
+
+ try {
+ appendStream.close();
+ } catch (IOException e) {
+ throw new KafkaException(e);
+ }
+
+ if (compressionType != CompressionType.NONE)
+ writerCompressedWrapperHeader();
+
+ ByteBuffer buffer = buffer().duplicate();
+ buffer.flip();
+ buffer.position(initPos);
+ builtRecords = MemoryRecords.readableRecords(buffer.slice());
+ }
+
+ private void writerCompressedWrapperHeader() {
+ ByteBuffer buffer = bufferStream.buffer();
+ int pos = buffer.position();
+ buffer.position(initPos);
+
+ int wrapperSize = pos - initPos - Records.LOG_OVERHEAD;
+ int writtenCompressed = wrapperSize - Record.recordOverhead(magic);
+ LogEntry.writeHeader(buffer, lastOffset, wrapperSize);
+
+ long timestamp = timestampType == TimestampType.LOG_APPEND_TIME ? logAppendTime : maxTimestamp;
+ Record.writeCompressedRecordHeader(buffer, magic, wrapperSize, timestamp, compressionType, timestampType);
+
+ buffer.position(pos);
+
+ // update the compression ratio
+ this.compressionRate = (float) writtenCompressed / this.writtenUncompressed;
+ TYPE_TO_RATE[compressionType.id] = TYPE_TO_RATE[compressionType.id] * COMPRESSION_RATE_DAMPING_FACTOR +
+ compressionRate * (1 - COMPRESSION_RATE_DAMPING_FACTOR);
+ }
+
+ /**
+ * Append a new record and offset to the buffer
+ * @param offset The absolute offset of the record in the log buffer
+ * @param timestamp The record timestamp
+ * @param key The record key
+ * @param value The record value
+ * @return crc of the record
+ */
+ public long append(long offset, long timestamp, byte[] key, byte[] value) {
+ try {
+ if (lastOffset > 0 && offset <= lastOffset)
+ throw new IllegalArgumentException(String.format("Illegal offset %s following previous offset %s (Offsets must increase monotonically).", offset, lastOffset));
+
+ int size = Record.recordSize(magic, key, value);
+ LogEntry.writeHeader(appendStream, toInnerOffset(offset), size);
+
+ if (timestampType == TimestampType.LOG_APPEND_TIME)
+ timestamp = logAppendTime;
+ long crc = Record.write(appendStream, magic, timestamp, key, value, CompressionType.NONE, timestampType);
+ recordWritten(offset, timestamp, size + Records.LOG_OVERHEAD);
+ return crc;
+ } catch (IOException e) {
+ throw new KafkaException("I/O exception when writing to the append stream, closing", e);
+ }
+ }
+
+ /**
+ * Add the record, converting to the desired magic value if necessary.
+ * @param offset The offset of the record
+ * @param record The record to add
+ */
+ public void convertAndAppend(long offset, Record record) {
+ if (magic == record.magic()) {
+ append(offset, record);
+ return;
+ }
+
+ if (lastOffset > 0 && offset <= lastOffset)
+ throw new IllegalArgumentException(String.format("Illegal offset %s following previous offset %s (Offsets must increase monotonically).", offset, lastOffset));
+
+ try {
+ int size = record.convertedSize(magic);
+ LogEntry.writeHeader(appendStream, toInnerOffset(offset), size);
+ long timestamp = timestampType == TimestampType.LOG_APPEND_TIME ? logAppendTime : record.timestamp();
+ record.convertTo(appendStream, magic, timestamp, timestampType);
+ recordWritten(offset, timestamp, size + Records.LOG_OVERHEAD);
+ } catch (IOException e) {
+ throw new KafkaException("I/O exception when writing to the append stream, closing", e);
+ }
+ }
+
+ /**
+ * Add a record without doing offset/magic validation (this should only be used in testing).
+ * @param offset The offset of the record
+ * @param record The record to add
+ */
+ public void appendUnchecked(long offset, Record record) {
+ try {
+ int size = record.sizeInBytes();
+ LogEntry.writeHeader(appendStream, toInnerOffset(offset), size);
+
+ ByteBuffer buffer = record.buffer().duplicate();
+ appendStream.write(buffer.array(), buffer.arrayOffset(), buffer.limit());
+
+ recordWritten(offset, record.timestamp(), size + Records.LOG_OVERHEAD);
+ } catch (IOException e) {
+ throw new KafkaException("I/O exception when writing to the append stream, closing", e);
+ }
+ }
+
+ /**
+ * Append the given log entry. The entry's record must have a magic value matching the magic used to
+ * construct this builder, and its offset must be greater than that of the last appended entry.
+ * @param entry The entry to append
+ */
+ public void append(LogEntry entry) {
+ append(entry.offset(), entry.record());
+ }
+
+ /**
+ * Add a record with a given offset. The record must have a magic value matching the magic used to
+ * construct this builder, and its offset must be greater than that of the last appended entry.
+ * @param offset The offset of the record
+ * @param record The record to add
+ */
+ public void append(long offset, Record record) {
+ if (record.magic() != magic)
+ throw new IllegalArgumentException("Inner log entries must have matching magic values as the wrapper");
+ if (lastOffset > 0 && offset <= lastOffset)
+ throw new IllegalArgumentException(String.format("Illegal offset %s following previous offset %s (Offsets must increase monotonically).", offset, lastOffset));
+ appendUnchecked(offset, record);
+ }
+
+ private long toInnerOffset(long offset) {
+ // use relative offsets for compressed messages with magic v1
+ if (magic > 0 && compressionType != CompressionType.NONE)
+ return offset - baseOffset;
+ return offset;
+ }
+
+ private void recordWritten(long offset, long timestamp, int size) {
+ numRecords += 1;
+ writtenUncompressed += size;
+ lastOffset = offset;
+
+ if (timestamp > maxTimestamp) {
+ maxTimestamp = timestamp;
+ offsetOfMaxTimestamp = offset;
+ }
+ }
+
+ /**
+ * Get an estimate of the number of bytes written (based on the estimation factor hard-coded in {@link CompressionType}).
+ * @return The estimated number of bytes written
+ */
+ private int estimatedBytesWritten() {
+ if (compressionType == CompressionType.NONE) {
+ return buffer().position();
+ } else {
+ // estimate the written bytes to the underlying byte buffer based on uncompressed written bytes
+ return (int) (writtenUncompressed * TYPE_TO_RATE[compressionType.id] * COMPRESSION_RATE_ESTIMATION_FACTOR);
+ }
+ }
+
+ /**
+ * Check if we have room for a new record containing the given key/value pair.
+ *
+ * Note that the return value is based on an estimate of the bytes written to the compressor, which may not be
+ * accurate if compression is actually used. When this happens, the following append may cause dynamic buffer
+ * re-allocation in the underlying byte buffer stream.
+ *
+ * There is one exceptional case: when appending a single message whose size is larger than the batch size, the
+ * buffer capacity will be the message size, which is larger than the write limit (i.e. the batch size). In this
+ * case the check is based on the capacity of the initial buffer rather than the write limit, so that this single
+ * record can be accepted.
+ */
+ public boolean hasRoomFor(byte[] key, byte[] value) {
+ return !isFull() && (numRecords == 0 ?
+ this.initialCapacity >= Records.LOG_OVERHEAD + Record.recordSize(magic, key, value) :
+ this.writeLimit >= estimatedBytesWritten() + Records.LOG_OVERHEAD + Record.recordSize(magic, key, value));
+ }
+
+ public boolean isClosed() {
+ return builtRecords != null;
+ }
+
+ public boolean isFull() {
+ return isClosed() || this.writeLimit <= estimatedBytesWritten();
+ }
+
+ public int sizeInBytes() {
+ return builtRecords != null ? builtRecords.sizeInBytes() : estimatedBytesWritten();
+ }
+
+ private static DataOutputStream wrapForOutput(ByteBufferOutputStream buffer, CompressionType type, byte messageVersion, int bufferSize) {
+ try {
+ switch (type) {
+ case NONE:
+ return buffer;
+ case GZIP:
+ return new DataOutputStream(new GZIPOutputStream(buffer, bufferSize));
+ case SNAPPY:
+ try {
+ OutputStream stream = (OutputStream) snappyOutputStreamSupplier.get().newInstance(buffer, bufferSize);
+ return new DataOutputStream(stream);
+ } catch (Exception e) {
+ throw new KafkaException(e);
+ }
+ case LZ4:
+ try {
+ OutputStream stream = (OutputStream) lz4OutputStreamSupplier.get().newInstance(buffer,
+ messageVersion == Record.MAGIC_VALUE_V0);
+ return new DataOutputStream(stream);
+ } catch (Exception e) {
+ throw new KafkaException(e);
+ }
+ default:
+ throw new IllegalArgumentException("Unknown compression type: " + type);
+ }
+ } catch (IOException e) {
+ throw new KafkaException(e);
+ }
+ }
+
+ public static DataInputStream wrapForInput(ByteBufferInputStream buffer, CompressionType type, byte messageVersion) {
+ try {
+ switch (type) {
+ case NONE:
+ return buffer;
+ case GZIP:
+ return new DataInputStream(new GZIPInputStream(buffer));
+ case SNAPPY:
+ try {
+ InputStream stream = (InputStream) snappyInputStreamSupplier.get().newInstance(buffer);
+ return new DataInputStream(stream);
+ } catch (Exception e) {
+ throw new KafkaException(e);
+ }
+ case LZ4:
+ try {
+ InputStream stream = (InputStream) lz4InputStreamSupplier.get().newInstance(buffer,
+ messageVersion == Record.MAGIC_VALUE_V0);
+ return new DataInputStream(stream);
+ } catch (Exception e) {
+ throw new KafkaException(e);
+ }
+ default:
+ throw new IllegalArgumentException("Unknown compression type: " + type);
+ }
+ } catch (IOException e) {
+ throw new KafkaException(e);
+ }
+ }
+
+ private interface ConstructorSupplier {
+ Constructor get() throws ClassNotFoundException, NoSuchMethodException;
+ }
+
+ // this code is based on Guava's @see{com.google.common.base.Suppliers.MemoizingSupplier}
+ private static class MemoizingConstructorSupplier {
+ final ConstructorSupplier delegate;
+ transient volatile boolean initialized;
+ transient Constructor value;
+
+ public MemoizingConstructorSupplier(ConstructorSupplier delegate) {
+ this.delegate = delegate;
+ }
+
+ public Constructor get() throws NoSuchMethodException, ClassNotFoundException {
+ if (!initialized) {
+ synchronized (this) {
+ if (!initialized) {
+ value = delegate.get();
+ initialized = true;
+ }
+ }
+ }
+ return value;
+ }
+ }
+
+ public static class RecordsInfo {
+ public final long maxTimestamp;
+ public final long shallowOffsetOfMaxTimestamp;
+
+ public RecordsInfo(long maxTimestamp,
+ long shallowOffsetOfMaxTimestamp) {
+ this.maxTimestamp = maxTimestamp;
+ this.shallowOffsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp;
+ }
+ }
+}
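
Putting the builder together, a rough usage sketch of the write path (again not part of this patch; the buffer size, compression type and payloads are arbitrary): records are appended until hasRoomFor reports the batch is full, then build() closes the builder, fills in the compressed wrapper header, and returns a read-only MemoryRecords.

import java.nio.ByteBuffer;
import org.apache.kafka.common.record.*;

public class BuilderExample {
    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        MemoryRecordsBuilder builder =
                MemoryRecords.builder(buffer, CompressionType.GZIP, TimestampType.CREATE_TIME);

        long offset = 0L;
        byte[] key = "key".getBytes();
        byte[] value = "value".getBytes();
        // append until the estimated bytes written reach the write limit (the buffer capacity here)
        while (builder.hasRoomFor(key, value))
            builder.append(offset++, System.currentTimeMillis(), key, value);

        MemoryRecords records = builder.build();   // closes the builder and writes the wrapper header
        MemoryRecordsBuilder.RecordsInfo info = builder.info();
        System.out.println(records.sizeInBytes() + " bytes, maxTimestamp=" + info.maxTimestamp
                + ", compressionRate=" + builder.compressionRate());
    }
}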
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/main/java/org/apache/kafka/common/record/Record.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Record.java b/clients/src/main/java/org/apache/kafka/common/record/Record.java
index 09cb80d..0c0fa3c 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Record.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Record.java
@@ -16,11 +16,15 @@
*/
package org.apache.kafka.common.record;
-import java.nio.ByteBuffer;
-
+import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.utils.Crc32;
import org.apache.kafka.common.utils.Utils;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import static org.apache.kafka.common.utils.Utils.wrapNullable;
/**
* A record: a serialized key and value along with the associated CRC and other fields
@@ -53,7 +57,12 @@ public final class Record {
/**
* The amount of overhead bytes in a record
*/
- public static final int RECORD_OVERHEAD = HEADER_SIZE + TIMESTAMP_LENGTH + KEY_SIZE_LENGTH + VALUE_SIZE_LENGTH;
+ public static final int RECORD_OVERHEAD_V0 = HEADER_SIZE + KEY_SIZE_LENGTH + VALUE_SIZE_LENGTH;
+
+ /**
+ * The amount of overhead bytes in a record using the v1 message format (which includes the timestamp)
+ */
+ public static final int RECORD_OVERHEAD_V1 = HEADER_SIZE + TIMESTAMP_LENGTH + KEY_SIZE_LENGTH + VALUE_SIZE_LENGTH;
/**
* The "magic" values
@@ -80,11 +89,6 @@ public final class Record {
public static final int TIMESTAMP_TYPE_ATTRIBUTE_OFFSET = 3;
/**
- * Compression code for uncompressed records
- */
- public static final int NO_COMPRESSION = 0;
-
- /**
* Timestamp value for records without a timestamp
*/
public static final long NO_TIMESTAMP = -1L;
@@ -94,155 +98,20 @@ public final class Record {
private final TimestampType wrapperRecordTimestampType;
public Record(ByteBuffer buffer) {
- this.buffer = buffer;
- this.wrapperRecordTimestamp = null;
- this.wrapperRecordTimestampType = null;
+ this(buffer, null, null);
}
- // Package private constructor for inner iteration.
- Record(ByteBuffer buffer, Long wrapperRecordTimestamp, TimestampType wrapperRecordTimestampType) {
+ public Record(ByteBuffer buffer, Long wrapperRecordTimestamp, TimestampType wrapperRecordTimestampType) {
this.buffer = buffer;
this.wrapperRecordTimestamp = wrapperRecordTimestamp;
this.wrapperRecordTimestampType = wrapperRecordTimestampType;
}
/**
- * A constructor to create a LogRecord. If the record's compression type is not none, then
- * its value payload should be already compressed with the specified type; the constructor
- * would always write the value payload as is and will not do the compression itself.
- *
- * @param timestamp The timestamp of the record
- * @param key The key of the record (null, if none)
- * @param value The record value
- * @param type The compression type used on the contents of the record (if any)
- * @param valueOffset The offset into the payload array used to extract payload
- * @param valueSize The size of the payload to use
- */
- public Record(long timestamp, byte[] key, byte[] value, CompressionType type, int valueOffset, int valueSize) {
- this(ByteBuffer.allocate(recordSize(key == null ? 0 : key.length,
- value == null ? 0 : valueSize >= 0 ? valueSize : value.length - valueOffset)));
- write(this.buffer, timestamp, key, value, type, valueOffset, valueSize);
- this.buffer.rewind();
- }
-
- public Record(long timestamp, byte[] key, byte[] value, CompressionType type) {
- this(timestamp, key, value, type, 0, -1);
- }
-
- public Record(long timestamp, byte[] value, CompressionType type) {
- this(timestamp, null, value, type);
- }
-
- public Record(long timestamp, byte[] key, byte[] value) {
- this(timestamp, key, value, CompressionType.NONE);
- }
-
- public Record(long timestamp, byte[] value) {
- this(timestamp, null, value, CompressionType.NONE);
- }
-
- // Write a record to the buffer, if the record's compression type is none, then
- // its value payload should be already compressed with the specified type
- public static void write(ByteBuffer buffer, long timestamp, byte[] key, byte[] value, CompressionType type, int valueOffset, int valueSize) {
- // construct the compressor with compression type none since this function will not do any
- //compression according to the input type, it will just write the record's payload as is
- Compressor compressor = new Compressor(buffer, CompressionType.NONE);
- try {
- compressor.putRecord(timestamp, key, value, type, valueOffset, valueSize);
- } finally {
- compressor.close();
- }
- }
-
- public static void write(Compressor compressor, long crc, byte attributes, long timestamp, byte[] key, byte[] value, int valueOffset, int valueSize) {
- // write crc
- compressor.putInt((int) (crc & 0xffffffffL));
- // write magic value
- compressor.putByte(CURRENT_MAGIC_VALUE);
- // write attributes
- compressor.putByte(attributes);
- // write timestamp
- compressor.putLong(timestamp);
- // write the key
- if (key == null) {
- compressor.putInt(-1);
- } else {
- compressor.putInt(key.length);
- compressor.put(key, 0, key.length);
- }
- // write the value
- if (value == null) {
- compressor.putInt(-1);
- } else {
- int size = valueSize >= 0 ? valueSize : (value.length - valueOffset);
- compressor.putInt(size);
- compressor.put(value, valueOffset, size);
- }
- }
-
- public static int recordSize(byte[] key, byte[] value) {
- return recordSize(key == null ? 0 : key.length, value == null ? 0 : value.length);
- }
-
- public static int recordSize(int keySize, int valueSize) {
- return CRC_LENGTH + MAGIC_LENGTH + ATTRIBUTE_LENGTH + TIMESTAMP_LENGTH + KEY_SIZE_LENGTH + keySize + VALUE_SIZE_LENGTH + valueSize;
- }
-
- public ByteBuffer buffer() {
- return this.buffer;
- }
-
- public static byte computeAttributes(CompressionType type) {
- byte attributes = 0;
- if (type.id > 0)
- attributes = (byte) (attributes | (COMPRESSION_CODEC_MASK & type.id));
- return attributes;
- }
-
- /**
- * Compute the checksum of the record from the record contents
- */
- public static long computeChecksum(ByteBuffer buffer, int position, int size) {
- Crc32 crc = new Crc32();
- crc.update(buffer.array(), buffer.arrayOffset() + position, size);
- return crc.getValue();
- }
-
- /**
- * Compute the checksum of the record from the attributes, key and value payloads
- */
- public static long computeChecksum(long timestamp, byte[] key, byte[] value, CompressionType type, int valueOffset, int valueSize) {
- Crc32 crc = new Crc32();
- crc.update(CURRENT_MAGIC_VALUE);
- byte attributes = 0;
- if (type.id > 0)
- attributes = (byte) (attributes | (COMPRESSION_CODEC_MASK & type.id));
- crc.update(attributes);
- crc.updateLong(timestamp);
- // update for the key
- if (key == null) {
- crc.updateInt(-1);
- } else {
- crc.updateInt(key.length);
- crc.update(key, 0, key.length);
- }
- // update for the value
- if (value == null) {
- crc.updateInt(-1);
- } else {
- int size = valueSize >= 0 ? valueSize : (value.length - valueOffset);
- crc.updateInt(size);
- crc.update(value, valueOffset, size);
- }
- return crc.getValue();
- }
-
-
- /**
* Compute the checksum of the record from the record contents
*/
public long computeChecksum() {
- return computeChecksum(buffer, MAGIC_OFFSET, buffer.limit() - MAGIC_OFFSET);
+ return Utils.computeChecksum(buffer, MAGIC_OFFSET, buffer.limit() - MAGIC_OFFSET);
}
/**
@@ -256,7 +125,15 @@ public final class Record {
* Returns true if the crc stored with the record matches the crc computed off the record contents
*/
public boolean isValid() {
- return size() >= CRC_LENGTH && checksum() == computeChecksum();
+ return sizeInBytes() >= CRC_LENGTH && checksum() == computeChecksum();
+ }
+
+ public Long wrapperRecordTimestamp() {
+ return wrapperRecordTimestamp;
+ }
+
+ public TimestampType wrapperRecordTimestampType() {
+ return wrapperRecordTimestampType;
}
/**
@@ -264,9 +141,9 @@ public final class Record {
*/
public void ensureValid() {
if (!isValid()) {
- if (size() < CRC_LENGTH)
+ if (sizeInBytes() < CRC_LENGTH)
throw new InvalidRecordException("Record is corrupt (crc could not be retrieved as the record is too "
- + "small, size = " + size() + ")");
+ + "small, size = " + sizeInBytes() + ")");
else
throw new InvalidRecordException("Record is corrupt (stored crc = " + checksum()
+ ", computed crc = " + computeChecksum() + ")");
@@ -274,14 +151,17 @@ public final class Record {
}
/**
- * The complete serialized size of this record in bytes (including crc, header attributes, etc)
+ * The complete serialized size of this record in bytes (including crc, header attributes, etc), but
+ * excluding the log overhead (offset and record size).
+ * @return the size in bytes
*/
- public int size() {
+ public int sizeInBytes() {
return buffer.limit();
}
/**
* The length of the key in bytes
+ * @return the size in bytes of the key (-1 if the key is null)
*/
public int keySize() {
if (magic() == MAGIC_VALUE_V0)
@@ -292,6 +172,7 @@ public final class Record {
/**
* Does the record have a key?
+ * @return true if so, false otherwise
*/
public boolean hasKey() {
return keySize() >= 0;
@@ -309,13 +190,23 @@ public final class Record {
/**
* The length of the value in bytes
+ * @return the size in bytes of the value (-1 if the value is null)
*/
public int valueSize() {
return buffer.getInt(valueSizeOffset());
}
/**
- * The magic version of this record
+ * Check whether the value field of this record is null.
+ * @return true if the value is null, false otherwise
+ */
+ public boolean hasNullValue() {
+ return valueSize() < 0;
+ }
+
+ /**
+ * The magic value (i.e. message format version) of this record
+ * @return the magic value
*/
public byte magic() {
return buffer.get(MAGIC_OFFSET);
@@ -323,6 +214,7 @@ public final class Record {
/**
* The attributes stored with this record
+ * @return the attributes
*/
public byte attributes() {
return buffer.get(ATTRIBUTES_OFFSET);
@@ -333,6 +225,8 @@ public final class Record {
* 1. wrapperRecordTimestampType = null and wrapperRecordTimestamp is null - Uncompressed message, timestamp is in the message.
* 2. wrapperRecordTimestampType = LOG_APPEND_TIME and WrapperRecordTimestamp is not null - Compressed message using LOG_APPEND_TIME
* 3. wrapperRecordTimestampType = CREATE_TIME and wrapperRecordTimestamp is not null - Compressed message using CREATE_TIME
+ *
+ * @return the timestamp as determined above
*/
public long timestamp() {
if (magic() == MAGIC_VALUE_V0)
@@ -349,6 +243,8 @@ public final class Record {
/**
* The timestamp of the message.
+ * @return the timestamp type or {@link TimestampType#NO_TIMESTAMP_TYPE} if the magic is 0 or the message has
+ * been up-converted.
*/
public TimestampType timestampType() {
if (magic() == 0)
@@ -366,36 +262,30 @@ public final class Record {
/**
* A ByteBuffer containing the value of this record
+ * @return the value or null if the value for this record is null
*/
public ByteBuffer value() {
- return sliceDelimited(valueSizeOffset());
+ return Utils.sizeDelimited(buffer, valueSizeOffset());
}
/**
* A ByteBuffer containing the message key
+ * @return the buffer or null if the key for this record is null
*/
public ByteBuffer key() {
if (magic() == MAGIC_VALUE_V0)
- return sliceDelimited(KEY_SIZE_OFFSET_V0);
+ return Utils.sizeDelimited(buffer, KEY_SIZE_OFFSET_V0);
else
- return sliceDelimited(KEY_SIZE_OFFSET_V1);
+ return Utils.sizeDelimited(buffer, KEY_SIZE_OFFSET_V1);
}
/**
- * Read a size-delimited byte buffer starting at the given offset
+ * Get the underlying buffer backing this record instance.
+ *
+ * @return the buffer
*/
- private ByteBuffer sliceDelimited(int start) {
- int size = buffer.getInt(start);
- if (size < 0) {
- return null;
- } else {
- ByteBuffer b = buffer.duplicate();
- b.position(start + 4);
- b = b.slice();
- b.limit(size);
- b.rewind();
- return b;
- }
+ public ByteBuffer buffer() {
+ return this.buffer;
}
public String toString() {
@@ -434,4 +324,316 @@ public final class Record {
return buffer.hashCode();
}
+ /**
+ * Get the size of this record if converted to the given format.
+ *
+ * @param toMagic The target magic version to convert to
+ * @return The size in bytes after conversion
+ */
+ public int convertedSize(byte toMagic) {
+ return recordSize(toMagic, Math.max(0, keySize()), Math.max(0, valueSize()));
+ }
+
+ /**
+ * Convert this record to another message format.
+ *
+ * @param toMagic The target magic version to convert to
+ * @return A new record instance with a freshly allocated ByteBuffer.
+ */
+ public Record convert(byte toMagic) {
+ if (toMagic == magic())
+ return this;
+
+ ByteBuffer buffer = ByteBuffer.allocate(convertedSize(toMagic));
+ TimestampType timestampType = wrapperRecordTimestampType != null ?
+ wrapperRecordTimestampType : TimestampType.forAttributes(attributes());
+ convertTo(buffer, toMagic, timestamp(), timestampType);
+ buffer.rewind();
+ return new Record(buffer);
+ }
+
+ private void convertTo(ByteBuffer buffer, byte toMagic, long timestamp, TimestampType timestampType) {
+ if (compressionType() != CompressionType.NONE)
+ throw new IllegalArgumentException("Cannot use convertTo for deep conversion");
+
+ write(buffer, toMagic, timestamp, key(), value(), CompressionType.NONE, timestampType);
+ }
+
+ /**
+ * Convert this record to another message format and write the converted data to the provided output stream.
+ *
+ * @param out The output stream to write the converted data to
+ * @param toMagic The target magic version for conversion
+ * @param timestamp The timestamp to use in the converted record (for up-conversion)
+ * @param timestampType The timestamp type to use in the converted record (for up-conversion)
+ * @throws IOException for any IO errors writing the converted record.
+ */
+ public void convertTo(DataOutputStream out, byte toMagic, long timestamp, TimestampType timestampType) throws IOException {
+ if (compressionType() != CompressionType.NONE)
+ throw new IllegalArgumentException("Cannot use convertTo for deep conversion");
+
+ write(out, toMagic, timestamp, key(), value(), CompressionType.NONE, timestampType);
+ }
+
+ /**
+ * Create a new record instance. If the record's compression type is not none, then
+ * its value payload should already be compressed with the specified type; this method
+ * always writes the value payload as is and does not do the compression itself.
+ *
+ * @param magic The magic value to use
+ * @param timestamp The timestamp of the record
+ * @param key The key of the record (null, if none)
+ * @param value The record value
+ * @param compressionType The compression type used on the contents of the record (if any)
+ * @param timestampType The timestamp type to be used for this record
+ */
+ public static Record create(byte magic,
+ long timestamp,
+ byte[] key,
+ byte[] value,
+ CompressionType compressionType,
+ TimestampType timestampType) {
+ int keySize = key == null ? 0 : key.length;
+ int valueSize = value == null ? 0 : value.length;
+ ByteBuffer buffer = ByteBuffer.allocate(recordSize(magic, keySize, valueSize));
+ write(buffer, magic, timestamp, wrapNullable(key), wrapNullable(value), compressionType, timestampType);
+ buffer.rewind();
+ return new Record(buffer);
+ }
+
+ public static Record create(long timestamp, byte[] key, byte[] value) {
+ return create(CURRENT_MAGIC_VALUE, timestamp, key, value, CompressionType.NONE, TimestampType.CREATE_TIME);
+ }
+
+ public static Record create(byte magic, long timestamp, byte[] key, byte[] value) {
+ return create(magic, timestamp, key, value, CompressionType.NONE, TimestampType.CREATE_TIME);
+ }
+
+ public static Record create(byte magic, TimestampType timestampType, long timestamp, byte[] key, byte[] value) {
+ return create(magic, timestamp, key, value, CompressionType.NONE, timestampType);
+ }
+
+ public static Record create(byte magic, long timestamp, byte[] value) {
+ return create(magic, timestamp, null, value, CompressionType.NONE, TimestampType.CREATE_TIME);
+ }
+
+ public static Record create(byte magic, byte[] key, byte[] value) {
+ return create(magic, NO_TIMESTAMP, key, value);
+ }
+
+ public static Record create(byte[] key, byte[] value) {
+ return create(NO_TIMESTAMP, key, value);
+ }
+
+ public static Record create(byte[] value) {
+ return create(CURRENT_MAGIC_VALUE, NO_TIMESTAMP, null, value, CompressionType.NONE, TimestampType.CREATE_TIME);
+ }
+
+ /**
+ * Write the header for a compressed record set in-place (i.e. assuming the compressed record data has already
+ * been written at the value offset in a wrapped record). This lets you dynamically create a compressed message
+ * set, and then go back later and fill in its size and CRC, which saves the need for copying to another buffer.
+ *
+ * @param buffer The buffer containing the compressed record data, positioned at the start of the wrapper record
+ * @param magic The magic value of the record set
+ * @param recordSize The size of the record (including record overhead)
+ * @param timestamp The timestamp of the wrapper record
+ * @param compressionType The compression type used
+ * @param timestampType The timestamp type of the wrapper record
+ */
+ public static void writeCompressedRecordHeader(ByteBuffer buffer,
+ byte magic,
+ int recordSize,
+ long timestamp,
+ CompressionType compressionType,
+ TimestampType timestampType) {
+ int recordPosition = buffer.position();
+ int valueSize = recordSize - recordOverhead(magic);
+
+ // write the record header with a null value (the key is always null for the wrapper)
+ write(buffer, magic, timestamp, null, null, compressionType, timestampType);
+
+ // now fill in the value size
+ buffer.putInt(recordPosition + keyOffset(magic), valueSize);
+
+ // compute and fill the crc from the beginning of the message
+ long crc = Utils.computeChecksum(buffer, recordPosition + MAGIC_OFFSET, recordSize - MAGIC_OFFSET);
+ Utils.writeUnsignedInt(buffer, recordPosition + CRC_OFFSET, crc);
+ }
+
+ private static void write(ByteBuffer buffer,
+ byte magic,
+ long timestamp,
+ ByteBuffer key,
+ ByteBuffer value,
+ CompressionType compressionType,
+ TimestampType timestampType) {
+ try {
+ ByteBufferOutputStream out = new ByteBufferOutputStream(buffer);
+ write(out, magic, timestamp, key, value, compressionType, timestampType);
+ } catch (IOException e) {
+ throw new KafkaException(e);
+ }
+ }
+
+ /**
+ * Write the record data with the given compression type and return the computed crc.
+ *
+ * @param out The output stream to write to
+ * @param magic The magic value to be used
+ * @param timestamp The timestamp of the record
+ * @param key The record key
+ * @param value The record value
+ * @param compressionType The compression type
+ * @param timestampType The timestamp type
+ * @return the computed CRC for this record.
+ * @throws IOException for any IO errors writing to the output stream.
+ */
+ public static long write(DataOutputStream out,
+ byte magic,
+ long timestamp,
+ byte[] key,
+ byte[] value,
+ CompressionType compressionType,
+ TimestampType timestampType) throws IOException {
+ return write(out, magic, timestamp, wrapNullable(key), wrapNullable(value), compressionType, timestampType);
+ }
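For example, a single record can be serialized straight to any stream and the returned CRC kept for a later sanity check (a sketch only; the key/value bytes are placeholders):

    // Illustrative sketch; assumes java.io.ByteArrayOutputStream/DataOutputStream and may throw IOException.
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    long crc = Record.write(new DataOutputStream(baos), Record.MAGIC_VALUE_V1,
            System.currentTimeMillis(), "key".getBytes(), "value".getBytes(),
            CompressionType.NONE, TimestampType.CREATE_TIME);
    // baos.toByteArray() now holds the serialized record; crc covers everything after the CRC field itself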
+
+ private static long write(DataOutputStream out,
+ byte magic,
+ long timestamp,
+ ByteBuffer key,
+ ByteBuffer value,
+ CompressionType compressionType,
+ TimestampType timestampType) throws IOException {
+ byte attributes = computeAttributes(magic, compressionType, timestampType);
+ long crc = computeChecksum(magic, attributes, timestamp, key, value);
+ write(out, magic, crc, attributes, timestamp, key, value);
+ return crc;
+ }
+
+ /**
+ * Write a record using raw fields (without validation). This should only be used in testing.
+ */
+ public static void write(DataOutputStream out,
+ byte magic,
+ long crc,
+ byte attributes,
+ long timestamp,
+ byte[] key,
+ byte[] value) throws IOException {
+ write(out, magic, crc, attributes, timestamp, wrapNullable(key), wrapNullable(value));
+ }
+
+ // Write a record to the output stream; if the record's compression type is not none, then
+ // its value payload should already have been compressed with the specified type
+ private static void write(DataOutputStream out,
+ byte magic,
+ long crc,
+ byte attributes,
+ long timestamp,
+ ByteBuffer key,
+ ByteBuffer value) throws IOException {
+ if (magic != MAGIC_VALUE_V0 && magic != MAGIC_VALUE_V1)
+ throw new IllegalArgumentException("Invalid magic value " + magic);
+ if (timestamp < 0 && timestamp != NO_TIMESTAMP)
+ throw new IllegalArgumentException("Invalid message timestamp " + timestamp);
+
+ // write crc
+ out.writeInt((int) (crc & 0xffffffffL));
+ // write magic value
+ out.writeByte(magic);
+ // write attributes
+ out.writeByte(attributes);
+
+ // maybe write timestamp
+ if (magic > 0)
+ out.writeLong(timestamp);
+
+ // write the key
+ if (key == null) {
+ out.writeInt(-1);
+ } else {
+ int size = key.remaining();
+ out.writeInt(size);
+ out.write(key.array(), key.arrayOffset(), size);
+ }
+ // write the value
+ if (value == null) {
+ out.writeInt(-1);
+ } else {
+ int size = value.remaining();
+ out.writeInt(size);
+ out.write(value.array(), value.arrayOffset(), size);
+ }
+ }
+
+ public static int recordSize(byte[] key, byte[] value) {
+ return recordSize(CURRENT_MAGIC_VALUE, key, value);
+ }
+
+ public static int recordSize(byte magic, byte[] key, byte[] value) {
+ return recordSize(magic, key == null ? 0 : key.length, value == null ? 0 : value.length);
+ }
+
+ private static int recordSize(byte magic, int keySize, int valueSize) {
+ return recordOverhead(magic) + keySize + valueSize;
+ }
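For example, under magic v1 the record overhead is 22 bytes (4-byte CRC, 1-byte magic, 1-byte attributes, 8-byte timestamp, and two 4-byte length fields), so a record with a 3-byte key and a 5-byte value has recordSize = 22 + 3 + 5 = 30 bytes; framing it as a log entry adds the 12-byte LOG_OVERHEAD for the offset and size fields.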
+
+ // visible only for testing
+ public static byte computeAttributes(byte magic, CompressionType type, TimestampType timestampType) {
+ byte attributes = 0;
+ if (type.id > 0)
+ attributes = (byte) (attributes | (COMPRESSION_CODEC_MASK & type.id));
+ if (magic > 0)
+ return timestampType.updateAttributes(attributes);
+ return attributes;
+ }
+
+ // visible only for testing
+ public static long computeChecksum(byte magic, byte attributes, long timestamp, byte[] key, byte[] value) {
+ return computeChecksum(magic, attributes, timestamp, wrapNullable(key), wrapNullable(value));
+ }
+
+ /**
+ * Compute the checksum of the record from the attributes, key and value payloads
+ */
+ private static long computeChecksum(byte magic, byte attributes, long timestamp, ByteBuffer key, ByteBuffer value) {
+ Crc32 crc = new Crc32();
+ crc.update(magic);
+ crc.update(attributes);
+ if (magic > 0)
+ crc.updateLong(timestamp);
+ // update for the key
+ if (key == null) {
+ crc.updateInt(-1);
+ } else {
+ int size = key.remaining();
+ crc.updateInt(size);
+ crc.update(key.array(), key.arrayOffset(), size);
+ }
+ // update for the value
+ if (value == null) {
+ crc.updateInt(-1);
+ } else {
+ int size = value.remaining();
+ crc.updateInt(size);
+ crc.update(value.array(), value.arrayOffset(), size);
+ }
+ return crc.getValue();
+ }
+
+ public static int recordOverhead(byte magic) {
+ if (magic == 0)
+ return RECORD_OVERHEAD_V0;
+ return RECORD_OVERHEAD_V1;
+ }
+
+ private static int keyOffset(byte magic) {
+ if (magic == 0)
+ return KEY_OFFSET_V0;
+ return KEY_OFFSET_V1;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/main/java/org/apache/kafka/common/record/Records.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Records.java b/clients/src/main/java/org/apache/kafka/common/record/Records.java
index 3bc043f..823d2b7 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Records.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Records.java
@@ -18,32 +18,74 @@ package org.apache.kafka.common.record;
import java.io.IOException;
import java.nio.channels.GatheringByteChannel;
+import java.util.Iterator;
/**
- * A binary format which consists of a 4 byte size, an 8 byte offset, and the record bytes. See {@link MemoryRecords}
- * for the in-memory representation.
+ * Interface for accessing the records contained in a log. The log itself is represented as a sequence of log entries.
+ * Each log entry consists of an 8 byte offset, a 4 byte record size, and a "shallow" {@link Record record}.
+ * If the entry is not compressed, then each entry will have only the shallow record contained inside it. If it is
+ * compressed, the entry contains "deep" records, which are packed into the value field of the shallow record. To iterate
+ * over the shallow records, use {@link #shallowIterator()}; for the deep records, use {@link #deepIterator()}. Note
+ * that the deep iterator handles both compressed and non-compressed entries: if the entry is not compressed, the
+ * shallow record is returned; otherwise, the shallow record is decompressed and the deep entries are returned.
+ * See {@link MemoryRecords} for the in-memory representation and {@link FileRecords} for the on-disk representation.
*/
-public interface Records extends Iterable<LogEntry> {
+public interface Records {
- int SIZE_LENGTH = 4;
+ int OFFSET_OFFSET = 0;
int OFFSET_LENGTH = 8;
- int LOG_OVERHEAD = SIZE_LENGTH + OFFSET_LENGTH;
+ int SIZE_OFFSET = OFFSET_OFFSET + OFFSET_LENGTH;
+ int SIZE_LENGTH = 4;
+ int LOG_OVERHEAD = SIZE_OFFSET + SIZE_LENGTH;
/**
- * The size of these records in bytes
- * @return The size in bytes
+ * The size of these records in bytes.
+ * @return The size in bytes of the records
*/
int sizeInBytes();
/**
- * Write the messages in this set to the given channel starting at the given offset byte.
+ * Write the contents of this buffer to a channel.
* @param channel The channel to write to
- * @param position The position within this record set to begin writing from
+ * @param position The position in the buffer to write from
* @param length The number of bytes to write
- * @return The number of bytes written to the channel (which may be fewer than requested)
- * @throws IOException For any IO errors copying the
+ * @return The number of bytes written
+ * @throws IOException For any IO errors
*/
long writeTo(GatheringByteChannel channel, long position, int length) throws IOException;
+ /**
+ * Get the shallow log entries in this log buffer. Note that the signature allows subclasses
+ * to return a more specific log entry type. This enables optimizations such as in-place offset
+ * assignment (see {@link ByteBufferLogInputStream.ByteBufferLogEntry}), and partial reading of
+ * record data (see {@link FileLogInputStream.FileChannelLogEntry#magic()}).
+ * @return An iterator over the shallow entries of the log
+ */
+ Iterator<? extends LogEntry> shallowIterator();
+
+ /**
+ * Get the deep log entries (i.e. descend into compressed message sets). For the deep records,
+ * there are fewer options for optimization since the data must be decompressed before it can be
+ * returned. Hence there is little advantage in allowing subclasses to return a more specific type
+ * as we do for {@link #shallowIterator()}.
+ * @return An iterator over the deep entries of the log
+ */
+ Iterator<LogEntry> deepIterator();
+
+ /**
+ * Check whether all shallow entries in this buffer have a certain magic value.
+ * @param magic The magic value to check
+ * @return true if all shallow entries have a matching magic value, false otherwise
+ */
+ boolean hasMatchingShallowMagic(byte magic);
+
+ /**
+ * Convert all entries in this buffer to the format passed as a parameter. Note that this requires
+ * deep iteration since all of the deep records must also be converted to the desired format.
+ * @param toMagic The magic value to convert to
+ * @return A Records instance (which may or may not be the same as this one)
+ */
+ Records toMessageFormat(byte toMagic);
}
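To make the shallow/deep split concrete, here is a minimal sketch of how a caller might walk a buffer of log entries through this interface (the method wrapper and buffer source are assumed, not part of the patch):

    // Assumes imports of java.nio.ByteBuffer, java.util.Iterator and org.apache.kafka.common.record.*.
    static void dumpEntries(ByteBuffer buffer) {
        Records records = MemoryRecords.readableRecords(buffer);

        // shallow iteration: a compressed wrapper shows up as a single entry
        Iterator<? extends LogEntry> shallow = records.shallowIterator();
        while (shallow.hasNext()) {
            LogEntry entry = shallow.next();
            System.out.println(entry.offset() + ": " + entry.record().sizeInBytes() + " bytes");
        }

        // deep iteration: compressed wrappers are decompressed and their inner records returned
        Iterator<LogEntry> deep = records.deepIterator();
        while (deep.hasNext()) {
            Record record = deep.next().record();
            // record.key() and record.value() are ByteBuffers and may be null
        }

        // down-convert the whole buffer when an older message format is required
        Records downConverted = records.toMessageFormat(Record.MAGIC_VALUE_V0);
    }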