You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2014/03/27 05:49:32 UTC
git commit: KAFKA-1253 Compression in the new producer;
reviewed by Jay Kreps and Jun Rao
Repository: kafka
Updated Branches:
refs/heads/trunk 455c490f6 -> 466a83b78
KAFKA-1253 Compression in the new producer; reviewed by Jay Kreps and Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/466a83b7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/466a83b7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/466a83b7
Branch: refs/heads/trunk
Commit: 466a83b78c2bfcb9ac3116748394e7845a99bf7a
Parents: 455c490
Author: Guozhang Wang <gu...@linkedin.com>
Authored: Wed Mar 26 21:48:55 2014 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Wed Mar 26 21:49:06 2014 -0700
----------------------------------------------------------------------
build.gradle | 3 +-
.../kafka/clients/producer/KafkaProducer.java | 4 +-
.../kafka/clients/producer/ProducerConfig.java | 10 +-
.../clients/producer/internals/BufferPool.java | 31 ++--
.../producer/internals/RecordAccumulator.java | 43 +++--
.../clients/producer/internals/RecordBatch.java | 7 +-
.../clients/tools/ProducerPerformance.java | 6 +-
.../kafka/common/record/CompressionType.java | 7 +-
.../kafka/common/record/MemoryRecords.java | 168 +++++++++++++++----
.../org/apache/kafka/common/record/Record.java | 122 +++++++++-----
.../org/apache/kafka/common/utils/Crc32.java | 36 ++++
.../org/apache/kafka/common/utils/Utils.java | 24 ---
.../kafka/common/record/MemoryRecordsTest.java | 32 +++-
.../apache/kafka/common/record/RecordTest.java | 13 +-
.../java/org/apache/kafka/test/TestUtils.java | 2 +-
.../scala/kafka/producer/ConsoleProducer.scala | 4 +-
.../kafka/api/ProducerFailureHandlingTest.scala | 6 +-
.../kafka/api/ProducerSendTest.scala | 26 ++-
.../scala/kafka/perf/ProducerPerformance.scala | 1 +
19 files changed, 367 insertions(+), 178 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/466a83b7/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index d6fd287..5432c0c 100644
--- a/build.gradle
+++ b/build.gradle
@@ -151,7 +151,6 @@ project(':core') {
compile 'com.101tec:zkclient:0.3'
compile 'com.yammer.metrics:metrics-core:2.2.0'
compile 'net.sf.jopt-simple:jopt-simple:3.2'
- compile 'org.xerial.snappy:snappy-java:1.0.5'
testCompile 'junit:junit:4.1'
testCompile 'org.easymock:easymock:3.0'
@@ -317,6 +316,8 @@ project(':clients') {
dependencies {
compile "org.slf4j:slf4j-api:1.7.6"
+ compile 'org.xerial.snappy:snappy-java:1.0.5'
+
testCompile 'com.novocode:junit-interface:0.9'
testRuntime "$slf4jlog4j"
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/466a83b7/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 1ac6943..1ff9174 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -69,6 +69,7 @@ public class KafkaProducer implements Producer {
private final Sender sender;
private final Metrics metrics;
private final Thread ioThread;
+ private final CompressionType compressionType;
/**
* A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
@@ -99,6 +100,7 @@ public class KafkaProducer implements Producer {
config.getLong(ProducerConfig.METADATA_EXPIRY_CONFIG));
this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
this.totalMemorySize = config.getLong(ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG);
+ this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.MAX_PARTITION_SIZE_CONFIG),
this.totalMemorySize,
config.getLong(ProducerConfig.LINGER_MS_CONFIG),
@@ -224,7 +226,7 @@ public class KafkaProducer implements Producer {
ensureValidSize(record.key(), record.value());
TopicPartition tp = new TopicPartition(record.topic(), partition);
log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
- FutureRecordMetadata future = accumulator.append(tp, record.key(), record.value(), CompressionType.NONE, callback);
+ FutureRecordMetadata future = accumulator.append(tp, record.key(), record.value(), compressionType, callback);
this.sender.wakeup();
return future;
// For API exceptions return them in the future;
http://git-wip-us.apache.org/repos/asf/kafka/blob/466a83b7/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 32e12ad..48706ba 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -136,6 +136,11 @@ public class ProducerConfig extends AbstractConfig {
public static final String RETRY_BACKOFF_MS_CONFIG = "retry.backoff.ms";
/**
+ * The compression type for all data generated. The default is none (i.e. no compression)
+ */
+ public static final String COMPRESSION_TYPE_CONFIG = "compression.type";
+
+ /**
* Should we register the Kafka metrics as JMX mbeans?
*/
public static final String ENABLE_JMX_CONFIG = "enable.jmx";
@@ -158,9 +163,10 @@ public class ProducerConfig extends AbstractConfig {
.define(MAX_REQUEST_SIZE_CONFIG, Type.INT, 1 * 1024 * 1024, atLeast(0), "blah blah")
.define(RECONNECT_BACKOFF_MS_CONFIG, Type.LONG, 10L, atLeast(0L), "blah blah")
.define(BLOCK_ON_BUFFER_FULL_CONFIG, Type.BOOLEAN, true, "blah blah")
- .define(ENABLE_JMX_CONFIG, Type.BOOLEAN, true, "")
.define(MAX_RETRIES_CONFIG, Type.INT, 0, between(0, Integer.MAX_VALUE), "")
- .define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, 100L, atLeast(0L), "blah blah");
+ .define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, 100L, atLeast(0L), "blah blah")
+ .define(COMPRESSION_TYPE_CONFIG, Type.STRING, "none", "blah blah")
+ .define(ENABLE_JMX_CONFIG, Type.BOOLEAN, true, "");
}
ProducerConfig(Map<? extends Object, ? extends Object> props) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/466a83b7/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
index b69866a..d1d6c4b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
@@ -71,7 +71,7 @@ public final class BufferPool {
* @param size The buffer size to allocate in bytes
* @return The buffer
* @throws InterruptedException If the thread is interrupted while blocked
- * @throws IllegalArgument if size is larger than the total memory controlled by the pool (and hence we would block
+ * @throws IllegalArgumentException if size is larger than the total memory controlled by the pool (and hence we would block
* forever)
* @throws BufferExhaustedException if the pool is in non-blocking mode and size exceeds the free memory in the pool
*/
@@ -167,28 +167,31 @@ public final class BufferPool {
* Return buffers to the pool. If they are of the poolable size add them to the free list, otherwise just mark the
* memory as free.
*
- * @param buffers The buffers to return
+ * @param buffer The buffer to return
+ * @param size The size of the buffer to mark as deallocated, note that this maybe smaller than buffer.capacity
+ * since the buffer may re-allocate itself during in-place compression
*/
- public void deallocate(ByteBuffer... buffers) {
+ public void deallocate(ByteBuffer buffer, int size) {
lock.lock();
try {
- for (int i = 0; i < buffers.length; i++) {
- int size = buffers[i].capacity();
- if (size == this.poolableSize) {
- buffers[i].clear();
- this.free.add(buffers[i]);
- } else {
- this.availableMemory += size;
- }
- Condition moreMem = this.waiters.peekFirst();
- if (moreMem != null)
- moreMem.signal();
+ if (size == this.poolableSize && size == buffer.capacity()) {
+ buffer.clear();
+ this.free.add(buffer);
+ } else {
+ this.availableMemory += size;
}
+ Condition moreMem = this.waiters.peekFirst();
+ if (moreMem != null)
+ moreMem.signal();
} finally {
lock.unlock();
}
}
+ public void deallocate(ByteBuffer buffer) {
+ deallocate(buffer, buffer.capacity());
+ }
+
/**
* the total free memory both unallocated and in the free list
*/
http://git-wip-us.apache.org/repos/asf/kafka/blob/466a83b7/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 673b296..50bf95f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -28,8 +28,8 @@ import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
-import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.utils.CopyOnWriteMap;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
@@ -91,26 +91,26 @@ public final class RecordAccumulator {
private void registerMetrics(Metrics metrics) {
metrics.addMetric("blocked_threads",
- "The number of user threads blocked waiting for buffer memory to enqueue their records",
- new Measurable() {
+ "The number of user threads blocked waiting for buffer memory to enqueue their records",
+ new Measurable() {
public double measure(MetricConfig config, long now) {
return free.queued();
}
});
metrics.addMetric("buffer_total_bytes",
- "The total amount of buffer memory that is available (not currently used for buffering records).",
- new Measurable() {
+ "The total amount of buffer memory that is available (not currently used for buffering records).",
+ new Measurable() {
public double measure(MetricConfig config, long now) {
return free.totalMemory();
}
});
metrics.addMetric("buffer_available_bytes",
- "The total amount of buffer memory that is available (not currently used for buffering records).",
- new Measurable() {
- public double measure(MetricConfig config, long now) {
- return free.availableMemory();
- }
- });
+ "The total amount of buffer memory that is available (not currently used for buffering records).",
+ new Measurable() {
+ public double measure(MetricConfig config, long now) {
+ return free.availableMemory();
+ }
+ });
}
/**
@@ -132,7 +132,7 @@ public final class RecordAccumulator {
synchronized (dq) {
RecordBatch batch = dq.peekLast();
if (batch != null) {
- FutureRecordMetadata future = batch.tryAppend(key, value, compression, callback);
+ FutureRecordMetadata future = batch.tryAppend(key, value, callback);
if (future != null)
return future;
}
@@ -145,7 +145,7 @@ public final class RecordAccumulator {
synchronized (dq) {
RecordBatch last = dq.peekLast();
if (last != null) {
- FutureRecordMetadata future = last.tryAppend(key, value, compression, callback);
+ FutureRecordMetadata future = last.tryAppend(key, value, callback);
if (future != null) {
// Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen
// often...
@@ -153,8 +153,10 @@ public final class RecordAccumulator {
return future;
}
}
- RecordBatch batch = new RecordBatch(tp, new MemoryRecords(buffer), time.milliseconds());
- FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, compression, callback));
+ MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression);
+ RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
+ FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, callback));
+
dq.addLast(batch);
return future;
}
@@ -193,7 +195,7 @@ public final class RecordAccumulator {
RecordBatch batch = deque.peekFirst();
if (batch != null) {
boolean backingOff = batch.attempts > 0 && batch.lastAttempt + retryBackoffMs > now;
- boolean full = deque.size() > 1 || !batch.records.buffer().hasRemaining();
+ boolean full = deque.size() > 1 || batch.records.isFull();
boolean expired = now - batch.created >= lingerMs;
boolean sendable = full || expired || exhausted || closed;
if (sendable && !backingOff)
@@ -239,10 +241,15 @@ public final class RecordAccumulator {
Deque<RecordBatch> deque = dequeFor(tp);
if (deque != null) {
synchronized (deque) {
- if (size + deque.peekFirst().records.sizeInBytes() > maxSize) {
+ RecordBatch first = deque.peekFirst();
+ if (size + first.records.sizeInBytes() > maxSize && !ready.isEmpty()) {
+ // there is a rare case that a single batch size is larger than the request size due
+ // to compression; in this case we will still eventually send this batch in a single
+ // request
return ready;
} else {
RecordBatch batch = deque.pollFirst();
+ batch.records.close();
size += batch.records.sizeInBytes();
ready.add(batch);
}
@@ -269,7 +276,7 @@ public final class RecordAccumulator {
* Deallocate the record batch
*/
public void deallocate(RecordBatch batch) {
- free.deallocate(batch.records.buffer());
+ free.deallocate(batch.records.buffer(), batch.records.capacity());
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/466a83b7/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
index 038a05a..35f1d7a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
@@ -17,7 +17,6 @@ import java.util.List;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,11 +53,11 @@ public final class RecordBatch {
*
* @return The RecordSend corresponding to this record or null if there isn't sufficient room.
*/
- public FutureRecordMetadata tryAppend(byte[] key, byte[] value, CompressionType compression, Callback callback) {
+ public FutureRecordMetadata tryAppend(byte[] key, byte[] value, Callback callback) {
if (!this.records.hasRoomFor(key, value)) {
return null;
} else {
- this.records.append(0L, key, value, compression);
+ this.records.append(0L, key, value);
FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount);
if (callback != null)
thunks.add(new Thunk(callback, future));
@@ -71,7 +70,7 @@ public final class RecordBatch {
* Complete the request
*
* @param baseOffset The base offset of the messages assigned by the server
- * @param errorCode The error code or 0 if no error
+ * @param exception The exception returned or null if no exception
*/
public void done(long baseOffset, RuntimeException exception) {
this.produceFuture.done(topicPartition, baseOffset, exception);
http://git-wip-us.apache.org/repos/asf/kafka/blob/466a83b7/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java b/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
index 3ebbb80..05085e0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
+++ b/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
@@ -29,8 +29,8 @@ import org.apache.kafka.common.record.Records;
public class ProducerPerformance {
public static void main(String[] args) throws Exception {
- if (args.length != 5) {
- System.err.println("USAGE: java " + ProducerPerformance.class.getName() + " url topic_name num_records record_size acks");
+ if (args.length < 5) {
+ System.err.println("USAGE: java " + ProducerPerformance.class.getName() + " url topic_name num_records record_size acks [compression_type]");
System.exit(1);
}
String url = args[0];
@@ -45,6 +45,8 @@ public class ProducerPerformance {
props.setProperty(ProducerConfig.REQUEST_TIMEOUT_CONFIG, Integer.toString(Integer.MAX_VALUE));
props.setProperty(ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG, Integer.toString(256 * 1024 * 1024));
props.setProperty(ProducerConfig.MAX_PARTITION_SIZE_CONFIG, Integer.toString(256 * 1024));
+ if (args.length == 6)
+ props.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, args[5]);
KafkaProducer producer = new KafkaProducer(props);
Callback callback = new Callback() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/466a83b7/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
index 906da02..c557e44 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
@@ -20,14 +20,16 @@ package org.apache.kafka.common.record;
* The compression type to use
*/
public enum CompressionType {
- NONE(0, "none"), GZIP(1, "gzip"), SNAPPY(2, "snappy");
+ NONE(0, "none", 1.0f), GZIP(1, "gzip", 0.5f), SNAPPY(2, "snappy", 0.5f);
public final int id;
public final String name;
+ public final float rate;
- private CompressionType(int id, String name) {
+ private CompressionType(int id, String name, float rate) {
this.id = id;
this.name = name;
+ this.rate = rate;
}
public static CompressionType forId(int id) {
@@ -53,4 +55,5 @@ public enum CompressionType {
else
throw new IllegalArgumentException("Unknown compression name: " + name);
}
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/466a83b7/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index 9d8935f..428968c 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -16,53 +16,99 @@
*/
package org.apache.kafka.common.record;
+import java.io.DataInputStream;
+import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;
import java.util.Iterator;
+import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.utils.AbstractIterator;
-
/**
* A {@link Records} implementation backed by a ByteBuffer.
*/
public class MemoryRecords implements Records {
- private final ByteBuffer buffer;
+ private final Compressor compressor;
+ private final int capacity;
+ private ByteBuffer buffer;
+ private boolean writable;
+
+ // Construct a writable memory records
+ private MemoryRecords(ByteBuffer buffer, CompressionType type, boolean writable) {
+ this.writable = writable;
+ this.capacity = buffer.capacity();
+ if (this.writable) {
+ this.buffer = null;
+ this.compressor = new Compressor(buffer, type);
+ } else {
+ this.buffer = buffer;
+ this.compressor = null;
+ }
+ }
- public MemoryRecords(int size) {
- this(ByteBuffer.allocate(size));
+ public static MemoryRecords emptyRecords(ByteBuffer buffer, CompressionType type) {
+ return new MemoryRecords(buffer, type, true);
}
- public MemoryRecords(ByteBuffer buffer) {
- this.buffer = buffer;
+ public static MemoryRecords iterableRecords(ByteBuffer buffer) {
+ return new MemoryRecords(buffer, CompressionType.NONE, false);
}
/**
* Append the given record and offset to the buffer
*/
public void append(long offset, Record record) {
- buffer.putLong(offset);
- buffer.putInt(record.size());
- buffer.put(record.buffer());
+ if (!writable)
+ throw new IllegalStateException("Memory records is not writable");
+
+ int size = record.size();
+ compressor.putLong(offset);
+ compressor.putInt(size);
+ compressor.put(record.buffer());
+ compressor.recordWritten(size + Records.LOG_OVERHEAD);
record.buffer().rewind();
}
/**
* Append a new record and offset to the buffer
*/
- public void append(long offset, byte[] key, byte[] value, CompressionType type) {
- buffer.putLong(offset);
- buffer.putInt(Record.recordSize(key, value));
- Record.write(this.buffer, key, value, type);
+ public void append(long offset, byte[] key, byte[] value) {
+ if (!writable)
+ throw new IllegalStateException("Memory records is not writable");
+
+ int size = Record.recordSize(key, value);
+ compressor.putLong(offset);
+ compressor.putInt(size);
+ compressor.putRecord(key, value);
+ compressor.recordWritten(size + Records.LOG_OVERHEAD);
}
/**
* Check if we have room for a new record containing the given key/value pair
+ *
+ * Note that the return value is based on the estimate of the bytes written to the compressor,
+ * which may not be accurate if compression is really used. When this happens, the following
+ * append may cause dynamic buffer re-allocation in the underlying byte buffer stream.
*/
public boolean hasRoomFor(byte[] key, byte[] value) {
- return this.buffer.remaining() >= Records.LOG_OVERHEAD + Record.recordSize(key, value);
+ return this.writable &&
+ this.capacity >= this.compressor.estimatedBytesWritten() + Records.LOG_OVERHEAD + Record.recordSize(key, value);
+ }
+
+ public boolean isFull() {
+ return !this.writable || this.capacity <= this.compressor.estimatedBytesWritten();
+ }
+
+ /**
+ * Close this batch for no more appends
+ */
+ public void close() {
+ compressor.close();
+ writable = false;
+ buffer = compressor.buffer();
}
/** Write the records in this set to the given channel */
@@ -74,7 +120,14 @@ public class MemoryRecords implements Records {
* The size of this record set
*/
public int sizeInBytes() {
- return this.buffer.position();
+ return compressor.buffer().position();
+ }
+
+ /**
+ * Return the capacity of the buffer
+ */
+ public int capacity() {
+ return this.capacity;
}
/**
@@ -86,34 +139,79 @@ public class MemoryRecords implements Records {
@Override
public Iterator<LogEntry> iterator() {
- return new RecordsIterator(this.buffer);
+ ByteBuffer copy = (ByteBuffer) this.buffer.duplicate().flip();
+ return new RecordsIterator(copy, CompressionType.NONE, false);
}
- /* TODO: allow reuse of the buffer used for iteration */
public static class RecordsIterator extends AbstractIterator<LogEntry> {
private final ByteBuffer buffer;
-
- public RecordsIterator(ByteBuffer buffer) {
- ByteBuffer copy = buffer.duplicate();
- copy.flip();
- this.buffer = copy;
+ private final DataInputStream stream;
+ private final CompressionType type;
+ private final boolean shallow;
+ private RecordsIterator innerIter;
+
+ public RecordsIterator(ByteBuffer buffer, CompressionType type, boolean shallow) {
+ this.type = type;
+ this.buffer = buffer;
+ this.shallow = shallow;
+ stream = Compressor.wrapForInput(new ByteBufferInputStream(this.buffer), type);
}
+ /*
+ * Read the next record from the buffer.
+ *
+ * Note that in the compressed message set, each message value size is set as the size
+ * of the un-compressed version of the message value, so when we do de-compression
+ * allocating an array of the specified size for reading compressed value data is sufficient.
+ */
@Override
protected LogEntry makeNext() {
- if (buffer.remaining() < Records.LOG_OVERHEAD)
- return allDone();
- long offset = buffer.getLong();
- int size = buffer.getInt();
- if (size < 0)
- throw new IllegalStateException("Record with size " + size);
- if (buffer.remaining() < size)
- return allDone();
- ByteBuffer rec = buffer.slice();
- rec.limit(size);
- this.buffer.position(this.buffer.position() + size);
- return new LogEntry(offset, new Record(rec));
+ if (innerDone()) {
+ try {
+ // read the offset
+ long offset = stream.readLong();
+ // read record size
+ int size = stream.readInt();
+ if (size < 0)
+ throw new IllegalStateException("Record with size " + size);
+ // read the record, if compression is used we cannot depend on size
+ // and hence has to do extra copy
+ ByteBuffer rec;
+ if (type == CompressionType.NONE) {
+ rec = buffer.slice();
+ buffer.position(buffer.position() + size);
+ rec.limit(size);
+ } else {
+ byte[] recordBuffer = new byte[size];
+ stream.read(recordBuffer, 0, size);
+ rec = ByteBuffer.wrap(recordBuffer);
+ }
+ LogEntry entry = new LogEntry(offset, new Record(rec));
+ entry.record().ensureValid();
+
+ // decide whether to go shallow or deep iteration if it is compressed
+ CompressionType compression = entry.record().compressionType();
+ if (compression == CompressionType.NONE || shallow) {
+ return entry;
+ } else {
+ // init the inner iterator with the value payload of the message,
+ // which will de-compress the payload to a set of messages
+ ByteBuffer value = entry.record().value();
+ innerIter = new RecordsIterator(value, compression, true);
+ return innerIter.next();
+ }
+ } catch (EOFException e) {
+ return allDone();
+ } catch (IOException e) {
+ throw new KafkaException(e);
+ }
+ } else {
+ return innerIter.next();
+ }
}
- }
+ private boolean innerDone() {
+ return (innerIter == null || !innerIter.hasNext());
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/466a83b7/clients/src/main/java/org/apache/kafka/common/record/Record.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Record.java b/clients/src/main/java/org/apache/kafka/common/record/Record.java
index f1dc977..ce1177e 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Record.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Record.java
@@ -18,6 +18,7 @@ package org.apache.kafka.common.record;
import java.nio.ByteBuffer;
+import org.apache.kafka.common.utils.Crc32;
import org.apache.kafka.common.utils.Utils;
@@ -40,13 +41,15 @@ public final class Record {
public static final int KEY_OFFSET = KEY_SIZE_OFFSET + KEY_SIZE_LENGTH;
public static final int VALUE_SIZE_LENGTH = 4;
- /** The amount of overhead bytes in a record */
- public static final int RECORD_OVERHEAD = KEY_OFFSET + VALUE_SIZE_LENGTH;
+ /**
+ * The size for the record header
+ */
+ public static final int HEADER_SIZE = CRC_LENGTH + MAGIC_LENGTH + ATTRIBUTE_LENGTH;
/**
- * The minimum valid size for the record header
+ * The amount of overhead bytes in a record
*/
- public static final int MIN_HEADER_SIZE = CRC_LENGTH + MAGIC_LENGTH + ATTRIBUTE_LENGTH + KEY_SIZE_LENGTH + VALUE_SIZE_LENGTH;
+ public static final int RECORD_OVERHEAD = HEADER_SIZE + KEY_SIZE_LENGTH + VALUE_SIZE_LENGTH;
/**
* The current "magic" value
@@ -71,27 +74,29 @@ public final class Record {
}
/**
- * A constructor to create a LogRecord
+ * A constructor to create a LogRecord. If the record's compression type is not none, then
+ * its value payload should be already compressed with the specified type; the constructor
+ * would always write the value payload as is and will not do the compression itself.
*
* @param key The key of the record (null, if none)
* @param value The record value
- * @param codec The compression codec used on the contents of the record (if any)
+ * @param type The compression type used on the contents of the record (if any)
* @param valueOffset The offset into the payload array used to extract payload
* @param valueSize The size of the payload to use
*/
- public Record(byte[] key, byte[] value, CompressionType codec, int valueOffset, int valueSize) {
- this(ByteBuffer.allocate(recordSize(key == null ? 0 : key.length, value == null ? 0 : valueSize >= 0 ? valueSize
- : value.length - valueOffset)));
- write(this.buffer, key, value, codec, valueOffset, valueSize);
+ public Record(byte[] key, byte[] value, CompressionType type, int valueOffset, int valueSize) {
+ this(ByteBuffer.allocate(recordSize(key == null ? 0 : key.length,
+ value == null ? 0 : valueSize >= 0 ? valueSize : value.length - valueOffset)));
+ write(this.buffer, key, value, type, valueOffset, valueSize);
this.buffer.rewind();
}
- public Record(byte[] key, byte[] value, CompressionType codec) {
- this(key, value, codec, 0, -1);
+ public Record(byte[] key, byte[] value, CompressionType type) {
+ this(key, value, type, 0, -1);
}
- public Record(byte[] value, CompressionType codec) {
- this(null, value, codec);
+ public Record(byte[] value, CompressionType type) {
+ this(null, value, type);
}
public Record(byte[] key, byte[] value) {
@@ -102,40 +107,37 @@ public final class Record {
this(null, value, CompressionType.NONE);
}
- public static void write(ByteBuffer buffer, byte[] key, byte[] value, CompressionType codec, int valueOffset, int valueSize) {
- // skip crc, we will fill that in at the end
- int pos = buffer.position();
- buffer.position(pos + MAGIC_OFFSET);
- buffer.put(CURRENT_MAGIC_VALUE);
- byte attributes = 0;
- if (codec.id > 0)
- attributes = (byte) (attributes | (COMPRESSION_CODEC_MASK & codec.id));
- buffer.put(attributes);
+ // Write a record to the buffer, if the record's compression type is none, then
+ // its value payload should be already compressed with the specified type
+ public static void write(ByteBuffer buffer, byte[] key, byte[] value, CompressionType type, int valueOffset, int valueSize) {
+ // construct the compressor with compression type none since this function will not do any
+ //compression according to the input type, it will just write the record's payload as is
+ Compressor compressor = new Compressor(buffer, CompressionType.NONE, buffer.capacity());
+ compressor.putRecord(key, value, type, valueOffset, valueSize);
+ }
+
+ public static void write(Compressor compressor, long crc, byte attributes, byte[] key, byte[] value, int valueOffset, int valueSize) {
+ // write crc
+ compressor.putInt((int) (crc & 0xffffffffL));
+ // write magic value
+ compressor.putByte(CURRENT_MAGIC_VALUE);
+ // write attributes
+ compressor.putByte(attributes);
// write the key
if (key == null) {
- buffer.putInt(-1);
+ compressor.putInt(-1);
} else {
- buffer.putInt(key.length);
- buffer.put(key, 0, key.length);
+ compressor.putInt(key.length);
+ compressor.put(key, 0, key.length);
}
// write the value
if (value == null) {
- buffer.putInt(-1);
+ compressor.putInt(-1);
} else {
int size = valueSize >= 0 ? valueSize : (value.length - valueOffset);
- buffer.putInt(size);
- buffer.put(value, valueOffset, size);
+ compressor.putInt(size);
+ compressor.put(value, valueOffset, size);
}
-
- // now compute the checksum and fill it in
- long crc = computeChecksum(buffer,
- buffer.arrayOffset() + pos + MAGIC_OFFSET,
- buffer.position() - pos - MAGIC_OFFSET - buffer.arrayOffset());
- Utils.writeUnsignedInt(buffer, pos + CRC_OFFSET, crc);
- }
-
- public static void write(ByteBuffer buffer, byte[] key, byte[] value, CompressionType codec) {
- write(buffer, key, value, codec, 0, -1);
}
public static int recordSize(byte[] key, byte[] value) {
@@ -150,13 +152,51 @@ public final class Record {
return this.buffer;
}
+ public static byte computeAttributes(CompressionType type) {
+ byte attributes = 0;
+ if (type.id > 0)
+ attributes = (byte) (attributes | (COMPRESSION_CODEC_MASK & type.id));
+ return attributes;
+ }
+
/**
* Compute the checksum of the record from the record contents
*/
public static long computeChecksum(ByteBuffer buffer, int position, int size) {
- return Utils.crc32(buffer.array(), buffer.arrayOffset() + position, size - buffer.arrayOffset());
+ Crc32 crc = new Crc32();
+ crc.update(buffer.array(), buffer.arrayOffset() + position, size);
+ return crc.getValue();
+ }
+
+ /**
+ * Compute the checksum of the record from the attributes, key and value payloads
+ */
+ public static long computeChecksum(byte[] key, byte[] value, CompressionType type, int valueOffset, int valueSize) {
+ Crc32 crc = new Crc32();
+ crc.update(CURRENT_MAGIC_VALUE);
+ byte attributes = 0;
+ if (type.id > 0)
+ attributes = (byte) (attributes | (COMPRESSION_CODEC_MASK & type.id));
+ crc.update(attributes);
+ // update for the key
+ if (key == null) {
+ crc.updateInt(-1);
+ } else {
+ crc.updateInt(key.length);
+ crc.update(key, 0, key.length);
+ }
+ // update for the value
+ if (value == null) {
+ crc.updateInt(-1);
+ } else {
+ int size = valueSize >= 0 ? valueSize : (value.length - valueOffset);
+ crc.updateInt(size);
+ crc.update(value, valueOffset, size);
+ }
+ return crc.getValue();
}
+
/**
* Compute the checksum of the record from the record contents
*/
@@ -239,7 +279,7 @@ public final class Record {
}
/**
- * The compression codec used with this record
+ * The compression type used with this record
*/
public CompressionType compressionType() {
return CompressionType.forId(buffer.get(ATTRIBUTES_OFFSET) & COMPRESSION_CODEC_MASK);
http://git-wip-us.apache.org/repos/asf/kafka/blob/466a83b7/clients/src/main/java/org/apache/kafka/common/utils/Crc32.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Crc32.java b/clients/src/main/java/org/apache/kafka/common/utils/Crc32.java
index 153c5a6..047ca98 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Crc32.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Crc32.java
@@ -28,6 +28,30 @@ import java.util.zip.Checksum;
*/
public class Crc32 implements Checksum {
+ /**
+ * Compute the CRC32 of the byte array
+ *
+ * @param bytes The array to compute the checksum for
+ * @return The CRC32
+ */
+ public static long crc32(byte[] bytes) {
+ return crc32(bytes, 0, bytes.length);
+ }
+
+ /**
+ * Compute the CRC32 of the segment of the byte array given by the specified size and offset
+ *
+ * @param bytes The bytes to checksum
+ * @param offset the offset at which to begin checksumming
+ * @param size the number of bytes to checksum
+ * @return The CRC32
+ */
+ public static long crc32(byte[] bytes, int offset, int size) {
+ Crc32 crc = new Crc32();
+ crc.update(bytes, offset, size);
+ return crc.getValue();
+ }
+
/** the current CRC value, bit-flipped */
private int crc;
@@ -97,6 +121,18 @@ public class Crc32 implements Checksum {
crc = (crc >>> 8) ^ T[T8_0_start + ((crc ^ b) & 0xff)];
}
+ /**
+ * Update the CRC32 given an integer
+ */
+ final public void updateInt(int input) {
+ update((byte) (input >> 24));
+ update((byte) (input >> 16));
+ update((byte) (input >> 8));
+ update((byte) input /* >> 0 */);
+ }
+
+
+
/*
* CRC-32 lookup tables generated by the polynomial 0xEDB88320. See also TestPureJavaCrc32.Table.
*/
http://git-wip-us.apache.org/repos/asf/kafka/blob/466a83b7/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 0c6b365..50af601 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -92,30 +92,6 @@ public class Utils {
}
/**
- * Compute the CRC32 of the byte array
- *
- * @param bytes The array to compute the checksum for
- * @return The CRC32
- */
- public static long crc32(byte[] bytes) {
- return crc32(bytes, 0, bytes.length);
- }
-
- /**
- * Compute the CRC32 of the segment of the byte array given by the specificed size and offset
- *
- * @param bytes The bytes to checksum
- * @param offset the offset at which to begin checksumming
- * @param size the number of bytes to checksum
- * @return The CRC32
- */
- public static long crc32(byte[] bytes, int offset, int size) {
- Crc32 crc = new Crc32();
- crc.update(bytes, offset, size);
- return crc.getValue();
- }
-
- /**
* Get the absolute value of the given number. If the number is Int.MinValue return 0. This is different from
* java.lang.Math.abs or scala.math.abs in that they return Int.MinValue (!).
*/
http://git-wip-us.apache.org/repos/asf/kafka/blob/466a83b7/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
index b0745b5..94a1112 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
@@ -22,29 +22,35 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
+import java.util.*;
-import org.apache.kafka.common.record.LogEntry;
-import org.apache.kafka.common.record.MemoryRecords;
-import org.apache.kafka.common.record.Record;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+@RunWith(value = Parameterized.class)
public class MemoryRecordsTest {
+ private CompressionType compression;
+
+ public MemoryRecordsTest(CompressionType compression) {
+ this.compression = compression;
+ }
+
@Test
public void testIterator() {
- MemoryRecords recs1 = new MemoryRecords(ByteBuffer.allocate(1024));
- MemoryRecords recs2 = new MemoryRecords(ByteBuffer.allocate(1024));
+ MemoryRecords recs1 = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), compression);
+ MemoryRecords recs2 = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), compression);
List<Record> list = Arrays.asList(new Record("a".getBytes(), "1".getBytes()),
new Record("b".getBytes(), "2".getBytes()),
new Record("c".getBytes(), "3".getBytes()));
for (int i = 0; i < list.size(); i++) {
Record r = list.get(i);
recs1.append(i, r);
- recs2.append(i, toArray(r.key()), toArray(r.value()), r.compressionType());
+ recs2.append(i, toArray(r.key()), toArray(r.value()));
}
+ recs1.close();
+ recs2.close();
for (int iteration = 0; iteration < 2; iteration++) {
for (MemoryRecords recs : Arrays.asList(recs1, recs2)) {
@@ -54,10 +60,18 @@ public class MemoryRecordsTest {
LogEntry entry = iter.next();
assertEquals((long) i, entry.offset());
assertEquals(list.get(i), entry.record());
+ entry.record().ensureValid();
}
assertFalse(iter.hasNext());
}
}
}
+ @Parameterized.Parameters
+ public static Collection<Object[]> data() {
+ List<Object[]> values = new ArrayList<Object[]>();
+ for (CompressionType type: CompressionType.values())
+ values.add(new Object[] { type });
+ return values;
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/466a83b7/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java
index ae54d67..2765913 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java
@@ -27,9 +27,6 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.List;
-import org.apache.kafka.common.record.CompressionType;
-import org.apache.kafka.common.record.InvalidRecordException;
-import org.apache.kafka.common.record.Record;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -66,6 +63,10 @@ public class RecordTest {
@Test
public void testChecksum() {
assertEquals(record.checksum(), record.computeChecksum());
+ assertEquals(record.checksum(), record.computeChecksum(
+ this.key == null ? null : this.key.array(),
+ this.value == null ? null : this.value.array(),
+ this.compression, 0, -1));
assertTrue(record.isValid());
for (int i = Record.CRC_OFFSET + Record.CRC_LENGTH; i < record.size(); i++) {
Record copy = copyOf(record);
@@ -95,9 +96,11 @@ public class RecordTest {
@Parameters
public static Collection<Object[]> data() {
+ byte[] payload = new byte[1000];
+ Arrays.fill(payload, (byte) 1);
List<Object[]> values = new ArrayList<Object[]>();
- for (byte[] key : Arrays.asList(null, "".getBytes(), "key".getBytes()))
- for (byte[] value : Arrays.asList(null, "".getBytes(), "value".getBytes()))
+ for (byte[] key : Arrays.asList(null, "".getBytes(), "key".getBytes(), payload))
+ for (byte[] value : Arrays.asList(null, "".getBytes(), "value".getBytes(), payload))
for (CompressionType compression : CompressionType.values())
values.add(new Object[] { key, value, compression });
return values;
http://git-wip-us.apache.org/repos/asf/kafka/blob/466a83b7/clients/src/test/java/org/apache/kafka/test/TestUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index 36cfc0f..76a17e8 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -88,7 +88,7 @@ public class TestUtils {
/**
* Generate an array of random bytes
*
- * @param numBytes The size of the array
+ * @param size The size of the array
*/
public static byte[] randomBytes(int size) {
byte[] bytes = new byte[size];
http://git-wip-us.apache.org/repos/asf/kafka/blob/466a83b7/core/src/main/scala/kafka/producer/ConsoleProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ConsoleProducer.scala b/core/src/main/scala/kafka/producer/ConsoleProducer.scala
index dd39ff2..57386b1 100644
--- a/core/src/main/scala/kafka/producer/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/producer/ConsoleProducer.scala
@@ -255,8 +255,8 @@ object ConsoleProducer {
class NewShinyProducer(producerConfig: ProducerConfig) extends Producer {
val props = new Properties()
props.put("metadata.broker.list", producerConfig.brokerList)
- val codec = if(producerConfig.compress) DefaultCompressionCodec.codec else NoCompressionCodec.codec
- props.put("compression.codec", codec.toString)
+ val compression = if(producerConfig.compress) DefaultCompressionCodec.name else NoCompressionCodec.name
+ props.put("compression.type", compression)
props.put("send.buffer.bytes", producerConfig.socketBuffer.toString)
props.put("metadata.fetch.backoff.ms", producerConfig.retryBackoffMs.toString)
props.put("metadata.expiry.ms", producerConfig.metadataExpiryMs.toString)
http://git-wip-us.apache.org/repos/asf/kafka/blob/466a83b7/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index c002f5e..525a060 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -319,7 +319,6 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness
producerProps.put(ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG, bufferSize.toString)
producerProps.put(ProducerConfig.MAX_RETRIES_CONFIG, 10.toString)
producerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000.toString)
-
val producer = new KafkaProducer(producerProps)
override def doWork(): Unit = {
@@ -335,5 +334,10 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness
case e : Exception => failed = true
}
}
+
+ override def shutdown(){
+ super.shutdown()
+ producer.close
+ }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/466a83b7/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
index 66ea76b..3c37330 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
@@ -17,21 +17,20 @@
package kafka.api.test
-import kafka.server.{KafkaConfig, KafkaServer}
-import kafka.utils.{Utils, TestUtils}
-import kafka.zk.ZooKeeperTestHarness
-import kafka.consumer.SimpleConsumer
-import kafka.api.FetchRequestBuilder
-import kafka.message.Message
+import java.util.Properties
+import java.lang.{Integer, IllegalArgumentException}
import org.apache.kafka.clients.producer._
-
import org.scalatest.junit.JUnit3Suite
import org.junit.Test
import org.junit.Assert._
-import java.util.Properties
-import java.lang.{Integer, IllegalArgumentException}
+import kafka.server.{KafkaConfig, KafkaServer}
+import kafka.utils.{Utils, TestUtils}
+import kafka.zk.ZooKeeperTestHarness
+import kafka.consumer.SimpleConsumer
+import kafka.api.FetchRequestBuilder
+import kafka.message.Message
class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
@@ -76,15 +75,10 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
super.tearDown()
}
- class PrintOffsetCallback extends Callback {
+ class CheckErrorCallback extends Callback {
def onCompletion(metadata: RecordMetadata, exception: Exception) {
if (exception != null)
fail("Send callback returns the following exception", exception)
- try {
- System.out.println("The message we just sent is marked as [" + metadata.partition + "] : " + metadata.offset);
- } catch {
- case e: Throwable => fail("Should succeed sending the message", e)
- }
}
}
@@ -100,7 +94,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
props.put(ProducerConfig.BROKER_LIST_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
var producer = new KafkaProducer(props)
- val callback = new PrintOffsetCallback
+ val callback = new CheckErrorCallback
try {
// create topic
http://git-wip-us.apache.org/repos/asf/kafka/blob/466a83b7/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
----------------------------------------------------------------------
diff --git a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
index 3df0d13..9e4ebaf 100644
--- a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
+++ b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
@@ -212,6 +212,7 @@ object ProducerPerformance extends Logging {
props.put("request.timeout.ms", config.producerRequestTimeoutMs.toString)
props.put("request.retries", config.producerNumRetries.toString)
props.put("retry.backoff.ms", config.producerRetryBackoffMs.toString)
+ props.put("compression.type", config.compressionCodec.name)
val producer = new KafkaProducer(props)
def send(topic: String, partition: Long, bytes: Array[Byte]) {