Posted to commits@kafka.apache.org by jk...@apache.org on 2014/02/07 01:26:44 UTC
[13/13] git commit: Rename client package from kafka.* to org.apache.kafka.*
Rename client package from kafka.* to org.apache.kafka.*
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fa6339c1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fa6339c1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fa6339c1
Branch: refs/heads/trunk
Commit: fa6339c19cd06880d32ec9a5ee6b66e7f1488dcf
Parents: 3220af1
Author: Jay Kreps <ja...@gmail.com>
Authored: Thu Feb 6 16:25:11 2014 -0800
Committer: Jay Kreps <ja...@gmail.com>
Committed: Thu Feb 6 16:25:11 2014 -0800
----------------------------------------------------------------------
.../producer/BufferExhaustedException.java | 17 -
.../java/kafka/clients/producer/Callback.java | 18 -
.../kafka/clients/producer/KafkaProducer.java | 286 ---
.../kafka/clients/producer/MockProducer.java | 199 --
.../java/kafka/clients/producer/Producer.java | 48 -
.../kafka/clients/producer/ProducerConfig.java | 130 --
.../kafka/clients/producer/ProducerRecord.java | 84 -
.../kafka/clients/producer/RecordMetadata.java | 39 -
.../clients/producer/internals/BufferPool.java | 223 --
.../internals/FutureRecordMetadata.java | 63 -
.../clients/producer/internals/Metadata.java | 120 -
.../clients/producer/internals/Partitioner.java | 55 -
.../internals/ProduceRequestResult.java | 81 -
.../producer/internals/RecordAccumulator.java | 234 --
.../clients/producer/internals/RecordBatch.java | 86 -
.../clients/producer/internals/Sender.java | 504 ----
.../clients/tools/ProducerPerformance.java | 65 -
clients/src/main/java/kafka/common/Cluster.java | 123 -
.../main/java/kafka/common/Configurable.java | 15 -
.../main/java/kafka/common/KafkaException.java | 26 -
clients/src/main/java/kafka/common/Metric.java | 23 -
clients/src/main/java/kafka/common/Node.java | 76 -
.../main/java/kafka/common/PartitionInfo.java | 58 -
.../main/java/kafka/common/TopicPartition.java | 61 -
.../kafka/common/config/AbstractConfig.java | 93 -
.../java/kafka/common/config/ConfigDef.java | 253 --
.../kafka/common/config/ConfigException.java | 24 -
.../java/kafka/common/errors/ApiException.java | 35 -
.../common/errors/CorruptRecordException.java | 23 -
.../errors/LeaderNotAvailableException.java | 19 -
.../kafka/common/errors/NetworkException.java | 23 -
.../errors/NotLeaderForPartitionException.java | 23 -
.../common/errors/OffsetMetadataTooLarge.java | 22 -
.../errors/OffsetOutOfRangeException.java | 22 -
.../common/errors/RecordTooLargeException.java | 23 -
.../kafka/common/errors/RetryableException.java | 31 -
.../kafka/common/errors/TimeoutException.java | 23 -
.../common/errors/UnknownServerException.java | 22 -
.../UnknownTopicOrPartitionException.java | 22 -
.../java/kafka/common/metrics/CompoundStat.java | 40 -
.../java/kafka/common/metrics/JmxReporter.java | 184 --
.../java/kafka/common/metrics/KafkaMetric.java | 55 -
.../java/kafka/common/metrics/Measurable.java | 16 -
.../kafka/common/metrics/MeasurableStat.java | 10 -
.../java/kafka/common/metrics/MetricConfig.java | 71 -
.../main/java/kafka/common/metrics/Metrics.java | 190 --
.../kafka/common/metrics/MetricsReporter.java | 27 -
.../main/java/kafka/common/metrics/Quota.java | 36 -
.../common/metrics/QuotaViolationException.java | 16 -
.../main/java/kafka/common/metrics/Sensor.java | 171 --
.../main/java/kafka/common/metrics/Stat.java | 16 -
.../java/kafka/common/metrics/stats/Avg.java | 33 -
.../java/kafka/common/metrics/stats/Count.java | 29 -
.../kafka/common/metrics/stats/Histogram.java | 141 --
.../java/kafka/common/metrics/stats/Max.java | 29 -
.../java/kafka/common/metrics/stats/Min.java | 29 -
.../kafka/common/metrics/stats/Percentile.java | 32 -
.../kafka/common/metrics/stats/Percentiles.java | 104 -
.../java/kafka/common/metrics/stats/Rate.java | 85 -
.../kafka/common/metrics/stats/SampledStat.java | 110 -
.../java/kafka/common/metrics/stats/Total.java | 31 -
.../kafka/common/network/ByteBufferReceive.java | 45 -
.../kafka/common/network/ByteBufferSend.java | 54 -
.../kafka/common/network/NetworkReceive.java | 74 -
.../java/kafka/common/network/NetworkSend.java | 26 -
.../main/java/kafka/common/network/Receive.java | 35 -
.../java/kafka/common/network/Selectable.java | 68 -
.../java/kafka/common/network/Selector.java | 349 ---
.../main/java/kafka/common/network/Send.java | 41 -
.../java/kafka/common/protocol/ApiKeys.java | 35 -
.../main/java/kafka/common/protocol/Errors.java | 97 -
.../java/kafka/common/protocol/ProtoUtils.java | 97 -
.../java/kafka/common/protocol/Protocol.java | 130 --
.../kafka/common/protocol/types/ArrayOf.java | 63 -
.../java/kafka/common/protocol/types/Field.java | 48 -
.../kafka/common/protocol/types/Schema.java | 134 --
.../common/protocol/types/SchemaException.java | 13 -
.../kafka/common/protocol/types/Struct.java | 227 --
.../java/kafka/common/protocol/types/Type.java | 216 --
.../kafka/common/record/CompressionType.java | 40 -
.../common/record/InvalidRecordException.java | 11 -
.../main/java/kafka/common/record/LogEntry.java | 28 -
.../java/kafka/common/record/MemoryRecords.java | 102 -
.../main/java/kafka/common/record/Record.java | 286 ---
.../main/java/kafka/common/record/Records.java | 29 -
.../kafka/common/requests/RequestHeader.java | 68 -
.../java/kafka/common/requests/RequestSend.java | 38 -
.../kafka/common/requests/ResponseHeader.java | 45 -
.../kafka/common/utils/AbstractIterator.java | 72 -
.../java/kafka/common/utils/CopyOnWriteMap.java | 130 --
.../src/main/java/kafka/common/utils/Crc32.java | 2169 ------------------
.../java/kafka/common/utils/KafkaThread.java | 18 -
.../java/kafka/common/utils/SystemTime.java | 26 -
.../src/main/java/kafka/common/utils/Time.java | 23 -
.../src/main/java/kafka/common/utils/Utils.java | 230 --
.../producer/BufferExhaustedException.java | 17 +
.../apache/kafka/clients/producer/Callback.java | 18 +
.../kafka/clients/producer/KafkaProducer.java | 287 +++
.../kafka/clients/producer/MockProducer.java | 200 ++
.../apache/kafka/clients/producer/Producer.java | 49 +
.../kafka/clients/producer/ProducerConfig.java | 131 ++
.../kafka/clients/producer/ProducerRecord.java | 84 +
.../kafka/clients/producer/RecordMetadata.java | 39 +
.../clients/producer/internals/BufferPool.java | 224 ++
.../internals/FutureRecordMetadata.java | 64 +
.../clients/producer/internals/Metadata.java | 121 +
.../clients/producer/internals/Partitioner.java | 56 +
.../internals/ProduceRequestResult.java | 82 +
.../producer/internals/RecordAccumulator.java | 235 ++
.../clients/producer/internals/RecordBatch.java | 87 +
.../clients/producer/internals/Sender.java | 505 ++++
.../clients/tools/ProducerPerformance.java | 66 +
.../java/org/apache/kafka/common/Cluster.java | 124 +
.../org/apache/kafka/common/Configurable.java | 15 +
.../org/apache/kafka/common/KafkaException.java | 26 +
.../java/org/apache/kafka/common/Metric.java | 23 +
.../main/java/org/apache/kafka/common/Node.java | 76 +
.../org/apache/kafka/common/PartitionInfo.java | 58 +
.../org/apache/kafka/common/TopicPartition.java | 61 +
.../kafka/common/config/AbstractConfig.java | 94 +
.../apache/kafka/common/config/ConfigDef.java | 253 ++
.../kafka/common/config/ConfigException.java | 24 +
.../kafka/common/errors/ApiException.java | 35 +
.../common/errors/CorruptRecordException.java | 23 +
.../errors/LeaderNotAvailableException.java | 19 +
.../kafka/common/errors/NetworkException.java | 23 +
.../errors/NotLeaderForPartitionException.java | 23 +
.../common/errors/OffsetMetadataTooLarge.java | 22 +
.../errors/OffsetOutOfRangeException.java | 22 +
.../common/errors/RecordTooLargeException.java | 23 +
.../kafka/common/errors/RetryableException.java | 31 +
.../kafka/common/errors/TimeoutException.java | 23 +
.../common/errors/UnknownServerException.java | 22 +
.../UnknownTopicOrPartitionException.java | 22 +
.../kafka/common/metrics/CompoundStat.java | 40 +
.../kafka/common/metrics/JmxReporter.java | 185 ++
.../kafka/common/metrics/KafkaMetric.java | 55 +
.../apache/kafka/common/metrics/Measurable.java | 16 +
.../kafka/common/metrics/MeasurableStat.java | 10 +
.../kafka/common/metrics/MetricConfig.java | 71 +
.../apache/kafka/common/metrics/Metrics.java | 191 ++
.../kafka/common/metrics/MetricsReporter.java | 27 +
.../org/apache/kafka/common/metrics/Quota.java | 36 +
.../common/metrics/QuotaViolationException.java | 16 +
.../org/apache/kafka/common/metrics/Sensor.java | 172 ++
.../org/apache/kafka/common/metrics/Stat.java | 16 +
.../apache/kafka/common/metrics/stats/Avg.java | 34 +
.../kafka/common/metrics/stats/Count.java | 30 +
.../kafka/common/metrics/stats/Histogram.java | 141 ++
.../apache/kafka/common/metrics/stats/Max.java | 30 +
.../apache/kafka/common/metrics/stats/Min.java | 30 +
.../kafka/common/metrics/stats/Percentile.java | 32 +
.../kafka/common/metrics/stats/Percentiles.java | 105 +
.../apache/kafka/common/metrics/stats/Rate.java | 90 +
.../kafka/common/metrics/stats/SampledStat.java | 111 +
.../kafka/common/metrics/stats/Total.java | 31 +
.../kafka/common/network/ByteBufferReceive.java | 45 +
.../kafka/common/network/ByteBufferSend.java | 54 +
.../kafka/common/network/NetworkReceive.java | 74 +
.../kafka/common/network/NetworkSend.java | 26 +
.../apache/kafka/common/network/Receive.java | 35 +
.../apache/kafka/common/network/Selectable.java | 68 +
.../apache/kafka/common/network/Selector.java | 350 +++
.../org/apache/kafka/common/network/Send.java | 41 +
.../apache/kafka/common/protocol/ApiKeys.java | 35 +
.../apache/kafka/common/protocol/Errors.java | 98 +
.../kafka/common/protocol/ProtoUtils.java | 98 +
.../apache/kafka/common/protocol/Protocol.java | 131 ++
.../kafka/common/protocol/types/ArrayOf.java | 63 +
.../kafka/common/protocol/types/Field.java | 48 +
.../kafka/common/protocol/types/Schema.java | 134 ++
.../common/protocol/types/SchemaException.java | 13 +
.../kafka/common/protocol/types/Struct.java | 227 ++
.../kafka/common/protocol/types/Type.java | 217 ++
.../kafka/common/record/CompressionType.java | 40 +
.../common/record/InvalidRecordException.java | 11 +
.../apache/kafka/common/record/LogEntry.java | 28 +
.../kafka/common/record/MemoryRecords.java | 103 +
.../org/apache/kafka/common/record/Record.java | 287 +++
.../org/apache/kafka/common/record/Records.java | 29 +
.../kafka/common/requests/RequestHeader.java | 69 +
.../kafka/common/requests/RequestSend.java | 39 +
.../kafka/common/requests/ResponseHeader.java | 46 +
.../kafka/common/utils/AbstractIterator.java | 72 +
.../kafka/common/utils/CopyOnWriteMap.java | 130 ++
.../org/apache/kafka/common/utils/Crc32.java | 2169 ++++++++++++++++++
.../apache/kafka/common/utils/KafkaThread.java | 18 +
.../apache/kafka/common/utils/SystemTime.java | 26 +
.../org/apache/kafka/common/utils/Time.java | 23 +
.../org/apache/kafka/common/utils/Utils.java | 231 ++
.../clients/common/network/SelectorTest.java | 292 ---
.../kafka/clients/producer/BufferPoolTest.java | 170 --
.../kafka/clients/producer/MetadataTest.java | 49 -
.../clients/producer/MockProducerTest.java | 63 -
.../kafka/clients/producer/PartitionerTest.java | 54 -
.../clients/producer/RecordAccumulatorTest.java | 135 --
.../kafka/clients/producer/RecordSendTest.java | 78 -
.../java/kafka/clients/producer/SenderTest.java | 87 -
.../java/kafka/common/config/ConfigDefTest.java | 88 -
.../kafka/common/metrics/JmxReporterTest.java | 21 -
.../java/kafka/common/metrics/MetricsTest.java | 210 --
.../common/metrics/stats/HistogramTest.java | 86 -
.../types/ProtocolSerializationTest.java | 96 -
.../kafka/common/record/MemoryRecordsTest.java | 44 -
.../java/kafka/common/record/RecordTest.java | 87 -
.../common/utils/AbstractIteratorTest.java | 54 -
.../test/java/kafka/common/utils/MockTime.java | 28 -
.../src/test/java/kafka/test/MetricsBench.java | 38 -
.../test/java/kafka/test/Microbenchmarks.java | 143 --
.../src/test/java/kafka/test/MockSelector.java | 87 -
clients/src/test/java/kafka/test/TestUtils.java | 95 -
.../kafka/clients/producer/BufferPoolTest.java | 171 ++
.../kafka/clients/producer/MetadataTest.java | 49 +
.../clients/producer/MockProducerTest.java | 66 +
.../kafka/clients/producer/PartitionerTest.java | 55 +
.../clients/producer/RecordAccumulatorTest.java | 135 ++
.../kafka/clients/producer/RecordSendTest.java | 78 +
.../kafka/clients/producer/SenderTest.java | 88 +
.../kafka/common/config/ConfigDefTest.java | 90 +
.../kafka/common/metrics/JmxReporterTest.java | 24 +
.../kafka/common/metrics/MetricsTest.java | 218 ++
.../common/metrics/stats/HistogramTest.java | 87 +
.../kafka/common/network/SelectorTest.java | 292 +++
.../types/ProtocolSerializationTest.java | 102 +
.../kafka/common/record/MemoryRecordsTest.java | 47 +
.../apache/kafka/common/record/RecordTest.java | 90 +
.../common/utils/AbstractIteratorTest.java | 55 +
.../org/apache/kafka/common/utils/MockTime.java | 30 +
.../org/apache/kafka/test/MetricsBench.java | 39 +
.../org/apache/kafka/test/Microbenchmarks.java | 144 ++
.../org/apache/kafka/test/MockSelector.java | 88 +
.../java/org/apache/kafka/test/TestUtils.java | 96 +
232 files changed, 11898 insertions(+), 11819 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/clients/producer/BufferExhaustedException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/clients/producer/BufferExhaustedException.java b/clients/src/main/java/kafka/clients/producer/BufferExhaustedException.java
deleted file mode 100644
index d1aa4b6..0000000
--- a/clients/src/main/java/kafka/clients/producer/BufferExhaustedException.java
+++ /dev/null
@@ -1,17 +0,0 @@
-package kafka.clients.producer;
-
-import kafka.common.KafkaException;
-
-/**
- * This exception is thrown if the producer is in non-blocking mode and the rate of data production exceeds the rate at
- * which data can be sent for long enough for the allotted buffer to be exhausted.
- */
-public class BufferExhaustedException extends KafkaException {
-
- private static final long serialVersionUID = 1L;
-
- public BufferExhaustedException(String message) {
- super(message);
- }
-
-}
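
As a usage note: with block.on.buffer.full=false this exception surfaces through the returned future rather than from send() itself, since KafkaProducer.send() converts synchronous failures into a failed future (see the KafkaProducer hunk below). A minimal sketch, assuming an already-configured producer and a placeholder topic name:

    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    import kafka.clients.producer.BufferExhaustedException;
    import kafka.clients.producer.ProducerRecord;
    import kafka.clients.producer.RecordMetadata;

    Future<RecordMetadata> future =
        producer.send(new ProducerRecord("my-topic", "value".getBytes()));
    try {
        future.get();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    } catch (ExecutionException e) {
        if (e.getCause() instanceof BufferExhaustedException) {
            // the allotted buffer was exhausted faster than the I/O thread
            // could drain it: back off, drop the record, or raise the limit
        }
    }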
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/clients/producer/Callback.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/clients/producer/Callback.java b/clients/src/main/java/kafka/clients/producer/Callback.java
deleted file mode 100644
index d287d78..0000000
--- a/clients/src/main/java/kafka/clients/producer/Callback.java
+++ /dev/null
@@ -1,18 +0,0 @@
-package kafka.clients.producer;
-
-/**
- * A callback interface that the user can implement to allow code to execute when the request is complete. This callback
- * will generally execute in the background I/O thread so it should be fast.
- */
-public interface Callback {
-
- /**
- * A callback method the user can implement to provide asynchronous handling of request completion. This method will
- * be called when the record sent to the server has been acknowledged. Exactly one of the arguments will be
- * non-null.
- * @param metadata The metadata for the record that was sent (i.e. the partition and offset). Null if an error
- * occurred.
- * @param exception The exception thrown during processing of this record. Null if no error occurred.
- */
- public void onCompletion(RecordMetadata metadata, Exception exception);
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/kafka/clients/producer/KafkaProducer.java
deleted file mode 100644
index 1dd63fc..0000000
--- a/clients/src/main/java/kafka/clients/producer/KafkaProducer.java
+++ /dev/null
@@ -1,286 +0,0 @@
-package kafka.clients.producer;
-
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-import kafka.clients.producer.internals.FutureRecordMetadata;
-import kafka.clients.producer.internals.Metadata;
-import kafka.clients.producer.internals.Partitioner;
-import kafka.clients.producer.internals.RecordAccumulator;
-import kafka.clients.producer.internals.Sender;
-import kafka.common.Cluster;
-import kafka.common.KafkaException;
-import kafka.common.Metric;
-import kafka.common.PartitionInfo;
-import kafka.common.TopicPartition;
-import kafka.common.config.ConfigException;
-import kafka.common.errors.RecordTooLargeException;
-import kafka.common.metrics.JmxReporter;
-import kafka.common.metrics.MetricConfig;
-import kafka.common.metrics.Metrics;
-import kafka.common.metrics.MetricsReporter;
-import kafka.common.network.Selector;
-import kafka.common.record.CompressionType;
-import kafka.common.record.Record;
-import kafka.common.record.Records;
-import kafka.common.utils.KafkaThread;
-import kafka.common.utils.SystemTime;
-
-/**
- * A Kafka client that publishes records to the Kafka cluster.
- * <P>
- * The producer is <i>thread safe</i> and should generally be shared among all threads for best performance.
- * <p>
- * The producer manages a single background thread that does I/O as well as a TCP connection to each of the brokers it
- * needs to communicate with. Failure to close the producer after use will leak these resources.
- */
-public class KafkaProducer implements Producer {
-
- private final Partitioner partitioner;
- private final int maxRequestSize;
- private final long metadataFetchTimeoutMs;
- private final long totalMemorySize;
- private final Metadata metadata;
- private final RecordAccumulator accumulator;
- private final Sender sender;
- private final Metrics metrics;
- private final Thread ioThread;
-
- /**
- * A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
- * are documented <a href="http://kafka.apache.org/documentation.html#producerconfigs">here</a>. Values can be
- * either strings or Objects of the appropriate type (for example a numeric configuration would accept either the
- * string "42" or the integer 42).
- */
- public KafkaProducer(Map<String, Object> configs) {
- this(new ProducerConfig(configs));
- }
-
- /**
- * A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
- * are documented <a href="http://kafka.apache.org/documentation.html#producerconfigs">here</a>.
- */
- public KafkaProducer(Properties properties) {
- this(new ProducerConfig(properties));
- }
-
- private KafkaProducer(ProducerConfig config) {
- this.metrics = new Metrics(new MetricConfig(),
- Collections.singletonList((MetricsReporter) new JmxReporter("kafka.producer.")),
- new SystemTime());
- this.partitioner = new Partitioner();
- this.metadataFetchTimeoutMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
- this.metadata = new Metadata();
- this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
- this.totalMemorySize = config.getLong(ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG);
- this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.MAX_PARTITION_SIZE_CONFIG),
- this.totalMemorySize,
- config.getLong(ProducerConfig.LINGER_MS_CONFIG),
- config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL),
- metrics,
- new SystemTime());
- List<InetSocketAddress> addresses = parseAndValidateAddresses(config.getList(ProducerConfig.BROKER_LIST_CONFIG));
- this.metadata.update(Cluster.bootstrap(addresses), System.currentTimeMillis());
- this.sender = new Sender(new Selector(),
- this.metadata,
- this.accumulator,
- config.getString(ProducerConfig.CLIENT_ID_CONFIG),
- config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
- config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
- (short) config.getInt(ProducerConfig.REQUIRED_ACKS_CONFIG),
- config.getInt(ProducerConfig.REQUEST_TIMEOUT_CONFIG),
- new SystemTime());
- this.ioThread = new KafkaThread("kafka-network-thread", this.sender, true);
- this.ioThread.start();
- }
-
- private static List<InetSocketAddress> parseAndValidateAddresses(List<String> urls) {
- List<InetSocketAddress> addresses = new ArrayList<InetSocketAddress>();
- for (String url : urls) {
- if (url != null && url.length() > 0) {
- String[] pieces = url.split(":");
- if (pieces.length != 2)
- throw new ConfigException("Invalid url in metadata.broker.list: " + url);
- try {
- InetSocketAddress address = new InetSocketAddress(pieces[0], Integer.parseInt(pieces[1]));
- if (address.isUnresolved())
- throw new ConfigException("DNS resolution failed for metadata bootstrap url: " + url);
- addresses.add(address);
- } catch (NumberFormatException e) {
- throw new ConfigException("Invalid port in metadata.broker.list: " + url);
- }
- }
- }
- if (addresses.size() < 1)
- throw new ConfigException("No bootstrap urls given in metadata.broker.list.");
- return addresses;
- }
-
- /**
- * Asynchronously send a record to a topic. Equivalent to {@link #send(ProducerRecord, Callback) send(record, null)}
- */
- @Override
- public Future<RecordMetadata> send(ProducerRecord record) {
- return send(record, null);
- }
-
- /**
- * Asynchronously send a record to a topic and invoke the provided callback when the send has been acknowledged.
- * <p>
- * The send is asynchronous and this method will return immediately once the record has been stored in the buffer of
- * records waiting to be sent. This allows sending many records in parallel without blocking to wait for the
- * response after each one.
- * <p>
- * The result of the send is a {@link RecordMetadata} specifying the partition the record was sent to and the offset
- * it was assigned.
- * <p>
- * Since the send call is asynchronous it returns a {@link java.util.concurrent.Future Future} for the
- * {@link RecordMetadata} that will be assigned to this record. Invoking {@link java.util.concurrent.Future#get()
- * get()} on this future will result in the metadata for the record or throw any exception that occurred while
- * sending the record.
- * <p>
- * If you want to simulate a simple blocking call you can do the following:
- *
- * <pre>
- * producer.send(new ProducerRecord(&quot;the-topic&quot;, &quot;key&quot;.getBytes(), &quot;value&quot;.getBytes())).get();
- * </pre>
- * <p>
- * Those desiring fully non-blocking usage can make use of the {@link Callback} parameter to provide a callback that
- * will be invoked when the request is complete.
- *
- * <pre>
- * ProducerRecord record = new ProducerRecord(&quot;the-topic&quot;, &quot;key&quot;.getBytes(), &quot;value&quot;.getBytes());
- * producer.send(record,
- * new Callback() {
- * public void onCompletion(RecordMetadata metadata, Exception e) {
- * if(e != null)
- * e.printStackTrace();
- * System.out.println("The offset of the record we just sent is: " + metadata.offset());
- * }
- * });
- * </pre>
- *
- * Callbacks for records being sent to the same partition are guaranteed to execute in order. That is, in the
- * following example <code>callback1</code> is guaranteed to execute before <code>callback2</code>:
- *
- * <pre>
- * producer.send(new ProducerRecord(topic, partition, key, value), callback1);
- * producer.send(new ProducerRecord(topic, partition, key2, value2), callback2);
- * </pre>
- * <p>
- * Note that callbacks will generally execute in the I/O thread of the producer and so should be reasonably fast or
- * they will delay the sending of messages from other threads. If you want to execute blocking or computationally
- * expensive callbacks it is recommended to use your own {@link java.util.concurrent.Executor} in the callback body
- * to parallelize processing.
- * <p>
- * The producer manages a buffer of records waiting to be sent. This buffer has a hard limit on its size, which is
- * controlled by the configuration <code>total.memory.bytes</code>. If <code>send()</code> is called faster than the
- * I/O thread can transfer data to the brokers the buffer will eventually run out of space. The default behavior in
- * this case is to block the send call until the I/O thread catches up and more buffer space is available. However
- * in cases where non-blocking usage is desired the setting <code>block.on.buffer.full=false</code> will cause the
- * producer to instead throw an exception when buffer memory is exhausted.
- *
- * @param record The record to send
- * @param callback A user-supplied callback to execute when the record has been acknowledged by the server (null
- * indicates no callback)
- */
- @Override
- public Future<RecordMetadata> send(ProducerRecord record, Callback callback) {
- try {
- Cluster cluster = metadata.fetch(record.topic(), this.metadataFetchTimeoutMs);
- int partition = partitioner.partition(record, cluster);
- ensureValidSize(record.key(), record.value());
- TopicPartition tp = new TopicPartition(record.topic(), partition);
- FutureRecordMetadata future = accumulator.append(tp, record.key(), record.value(), CompressionType.NONE, callback);
- this.sender.wakeup();
- return future;
- } catch (Exception e) {
- if (callback != null)
- callback.onCompletion(null, e);
- return new FutureFailure(e);
- }
- }
-
- /**
- * Check that this key-value pair will have a serialized size small enough
- */
- private void ensureValidSize(byte[] key, byte[] value) {
- int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(key, value);
- if (serializedSize > this.maxRequestSize)
- throw new RecordTooLargeException("The message is " + serializedSize
- + " bytes when serialized which is larger than the maximum request size you have configured with the "
- + ProducerConfig.MAX_REQUEST_SIZE_CONFIG
- + " configuration.");
- if (serializedSize > this.totalMemorySize)
- throw new RecordTooLargeException("The message is " + serializedSize
- + " bytes when serialized which is larger than the total memory buffer you have configured with the "
- + ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG
- + " configuration.");
- }
-
- public List<PartitionInfo> partitionsFor(String topic) {
- return this.metadata.fetch(topic, this.metadataFetchTimeoutMs).partitionsFor(topic);
- }
-
- @Override
- public Map<String, ? extends Metric> metrics() {
- return Collections.unmodifiableMap(this.metrics.metrics());
- }
-
- /**
- * Close this producer. This method blocks until all in-flight requests complete.
- */
- @Override
- public void close() {
- this.sender.initiateClose();
- try {
- this.ioThread.join();
- } catch (InterruptedException e) {
- throw new KafkaException(e);
- }
- this.metrics.close();
- }
-
- private static class FutureFailure implements Future<RecordMetadata> {
-
- private final ExecutionException exception;
-
- public FutureFailure(Exception exception) {
- this.exception = new ExecutionException(exception);
- }
-
- @Override
- public boolean cancel(boolean interrupt) {
- return false;
- }
-
- @Override
- public RecordMetadata get() throws ExecutionException {
- throw this.exception;
- }
-
- @Override
- public RecordMetadata get(long timeout, TimeUnit unit) throws ExecutionException {
- throw this.exception;
- }
-
- @Override
- public boolean isCancelled() {
- return false;
- }
-
- @Override
- public boolean isDone() {
- return true;
- }
-
- }
-
-}
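
For orientation, a minimal end-to-end sketch of the lifecycle documented above: construct from Properties, send with the blocking idiom, and close so the background I/O thread and TCP connections are not leaked. The broker address and topic are placeholders:

    import java.util.Properties;
    import kafka.clients.producer.KafkaProducer;
    import kafka.clients.producer.Producer;
    import kafka.clients.producer.ProducerConfig;
    import kafka.clients.producer.ProducerRecord;

    public class ProducerExample {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(ProducerConfig.BROKER_LIST_CONFIG, "broker1:9092"); // placeholder
            Producer producer = new KafkaProducer(props);
            try {
                // the "simple blocking call" idiom from the Javadoc above
                producer.send(new ProducerRecord("the-topic",
                                                 "key".getBytes(),
                                                 "value".getBytes())).get();
            } finally {
                producer.close(); // blocks until in-flight requests complete
            }
        }
    }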
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/clients/producer/MockProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/clients/producer/MockProducer.java b/clients/src/main/java/kafka/clients/producer/MockProducer.java
deleted file mode 100644
index ab83d5f..0000000
--- a/clients/src/main/java/kafka/clients/producer/MockProducer.java
+++ /dev/null
@@ -1,199 +0,0 @@
-package kafka.clients.producer;
-
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Future;
-
-import kafka.clients.producer.internals.FutureRecordMetadata;
-import kafka.clients.producer.internals.Partitioner;
-import kafka.clients.producer.internals.ProduceRequestResult;
-import kafka.common.Cluster;
-import kafka.common.Metric;
-import kafka.common.PartitionInfo;
-import kafka.common.TopicPartition;
-
-/**
- * A mock of the producer interface you can use for testing code that uses Kafka.
- * <p>
- * By default this mock will synchronously complete each send call successfully. However it can be configured to allow
- * the user to control the completion of the call and supply an optional error for the producer to throw.
- */
-public class MockProducer implements Producer {
-
- private final Cluster cluster;
- private final Partitioner partitioner = new Partitioner();
- private final List<ProducerRecord> sent;
- private final Deque<Completion> completions;
- private boolean autoComplete;
- private Map<TopicPartition, Long> offsets;
-
- /**
- * Create a mock producer
- *
- * @param cluster The cluster holding metadata for this producer
- * @param autoComplete If true automatically complete all requests successfully and execute the callback. Otherwise
- * the user must call {@link #completeNext()} or {@link #errorNext(RuntimeException)} after
- * {@link #send(ProducerRecord) send()} to complete the call and unblock the {@link
- * java.util.concurrent.Future Future<RecordMetadata>} that is returned.
- */
- public MockProducer(Cluster cluster, boolean autoComplete) {
- this.cluster = cluster;
- this.autoComplete = autoComplete;
- this.offsets = new HashMap<TopicPartition, Long>();
- this.sent = new ArrayList<ProducerRecord>();
- this.completions = new ArrayDeque<Completion>();
- }
-
- /**
- * Create a new mock producer with invented metadata and the given autoComplete setting.
- *
- * Equivalent to {@link #MockProducer(Cluster, boolean) new MockProducer(Cluster.empty(), autoComplete)}
- */
- public MockProducer(boolean autoComplete) {
- this(Cluster.empty(), autoComplete);
- }
-
- /**
- * Create a new auto completing mock producer
- *
- * Equivalent to {@link #MockProducer(boolean) new MockProducer(true)}
- */
- public MockProducer() {
- this(true);
- }
-
- /**
- * Adds the record to the list of sent records. The {@link RecordMetadata} returned will be immediately satisfied.
- *
- * @see #history()
- */
- @Override
- public synchronized Future<RecordMetadata> send(ProducerRecord record) {
- return send(record, null);
- }
-
- /**
- * Adds the record to the list of sent records.
- *
- * @see #history()
- */
- @Override
- public synchronized Future<RecordMetadata> send(ProducerRecord record, Callback callback) {
- int partition = 0;
- if (this.cluster.partitionsFor(record.topic()) != null)
- partition = partitioner.partition(record, this.cluster);
- ProduceRequestResult result = new ProduceRequestResult();
- FutureRecordMetadata future = new FutureRecordMetadata(result, 0);
- TopicPartition topicPartition = new TopicPartition(record.topic(), partition);
- long offset = nextOffset(topicPartition);
- Completion completion = new Completion(topicPartition, offset, new RecordMetadata(topicPartition, offset), result, callback);
- this.sent.add(record);
- if (autoComplete)
- completion.complete(null);
- else
- this.completions.addLast(completion);
- return future;
- }
-
- /**
- * Get the next offset for this topic/partition
- */
- private long nextOffset(TopicPartition tp) {
- Long offset = this.offsets.get(tp);
- if (offset == null) {
- this.offsets.put(tp, 1L);
- return 0L;
- } else {
- Long next = offset + 1;
- this.offsets.put(tp, next);
- return offset;
- }
- }
-
- public List<PartitionInfo> partitionsFor(String topic) {
- return this.cluster.partitionsFor(topic);
- }
-
- public Map<String, Metric> metrics() {
- return Collections.emptyMap();
- }
-
- @Override
- public void close() {
- }
-
- /**
- * Get the list of sent records since the last call to {@link #clear()}
- */
- public synchronized List<ProducerRecord> history() {
- return new ArrayList<ProducerRecord>(this.sent);
- }
-
- /**
- * Clear the stored history of sent records
- */
- public synchronized void clear() {
- this.sent.clear();
- this.completions.clear();
- }
-
- /**
- * Complete the earliest uncompleted call successfully.
- *
- * @return true if there was an uncompleted call to complete
- */
- public synchronized boolean completeNext() {
- return errorNext(null);
- }
-
- /**
- * Complete the earliest uncompleted call with the given error.
- *
- * @return true if there was an uncompleted call to complete
- */
- public synchronized boolean errorNext(RuntimeException e) {
- Completion completion = this.completions.pollFirst();
- if (completion != null) {
- completion.complete(e);
- return true;
- } else {
- return false;
- }
- }
-
- private static class Completion {
- private final long offset;
- private final RecordMetadata metadata;
- private final ProduceRequestResult result;
- private final Callback callback;
- private final TopicPartition topicPartition;
-
- public Completion(TopicPartition topicPartition,
- long offset,
- RecordMetadata metadata,
- ProduceRequestResult result,
- Callback callback) {
- this.metadata = metadata;
- this.offset = offset;
- this.result = result;
- this.callback = callback;
- this.topicPartition = topicPartition;
- }
-
- public void complete(RuntimeException e) {
- result.done(topicPartition, e == null ? offset : -1L, e);
- if (callback != null) {
- if (e == null)
- callback.onCompletion(metadata, null);
- else
- callback.onCompletion(null, e);
- }
- }
- }
-
-}
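
A sketch of the manual-completion mode described above (autoComplete = false), where the test controls when each send completes; assumes a test method declared to throw Exception:

    import java.util.concurrent.Future;
    import kafka.clients.producer.MockProducer;
    import kafka.clients.producer.ProducerRecord;
    import kafka.clients.producer.RecordMetadata;

    MockProducer producer = new MockProducer(false); // caller completes each send
    Future<RecordMetadata> future =
        producer.send(new ProducerRecord("test-topic", "v".getBytes()));
    // future.isDone() is false here; the call is parked in the completions deque
    producer.completeNext();                // or errorNext(new RuntimeException())
    RecordMetadata metadata = future.get(); // the first record gets offset 0
    assert producer.history().size() == 1;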
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/clients/producer/Producer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/clients/producer/Producer.java b/clients/src/main/java/kafka/clients/producer/Producer.java
deleted file mode 100644
index f149f3a..0000000
--- a/clients/src/main/java/kafka/clients/producer/Producer.java
+++ /dev/null
@@ -1,48 +0,0 @@
-package kafka.clients.producer;
-
-import java.io.Closeable;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Future;
-
-import kafka.common.Metric;
-import kafka.common.PartitionInfo;
-
-/**
- * The interface for the {@link KafkaProducer}
- *
- * @see KafkaProducer
- * @see MockProducer
- */
-public interface Producer extends Closeable {
-
- /**
- * Send the given record asynchronously and return a future which will eventually contain the response information.
- *
- * @param record The record to send
- * @return A future which will eventually contain the response information
- */
- public Future<RecordMetadata> send(ProducerRecord record);
-
- /**
- * Send a record and invoke the given callback when the record has been acknowledged by the server
- */
- public Future<RecordMetadata> send(ProducerRecord record, Callback callback);
-
- /**
- * Get a list of partitions for the given topic for custom partition assignment. The partition metadata will change
- * over time so this list should not be cached.
- */
- public List<PartitionInfo> partitionsFor(String topic);
-
- /**
- * Return a map of metrics maintained by the producer
- */
- public Map<String, ? extends Metric> metrics();
-
- /**
- * Close this producer
- */
- public void close();
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/clients/producer/ProducerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/kafka/clients/producer/ProducerConfig.java
deleted file mode 100644
index a94afc7..0000000
--- a/clients/src/main/java/kafka/clients/producer/ProducerConfig.java
+++ /dev/null
@@ -1,130 +0,0 @@
-package kafka.clients.producer;
-
-import static kafka.common.config.ConfigDef.Range.atLeast;
-import static kafka.common.config.ConfigDef.Range.between;
-
-import java.util.Map;
-
-import kafka.common.config.AbstractConfig;
-import kafka.common.config.ConfigDef;
-import kafka.common.config.ConfigDef.Type;
-
-/**
- * The producer configuration keys
- */
-public class ProducerConfig extends AbstractConfig {
-
- private static final ConfigDef config;
-
- /**
- * A list of URLs to use for establishing the initial connection to the cluster. This list should be in the form
- * <code>host1:port1,host2:port2,...</code>. These urls are just used for the initial connection to discover the
- * full cluster membership (which may change dynamically) so this list need not contain the full set of servers (you
- * may want more than one, though, in case a server is down).
- */
- public static final String BROKER_LIST_CONFIG = "metadata.broker.list";
-
- /**
- * The amount of time to block waiting to fetch metadata about a topic the first time a record is sent to that
- * topic.
- */
- public static final String METADATA_FETCH_TIMEOUT_CONFIG = "metadata.fetch.timeout.ms";
-
- /**
- * The buffer size allocated for a partition. When records are received which are smaller than this size the
- * producer will attempt to optimistically group them together until this size is reached.
- */
- public static final String MAX_PARTITION_SIZE_CONFIG = "max.partition.bytes";
-
- /**
- * The total memory used by the producer to buffer records waiting to be sent to the server. If records are sent
- * faster than they can be delivered to the server the producer will either block or throw an exception based on the
- * preference specified by {@link #BLOCK_ON_BUFFER_FULL}.
- */
- public static final String TOTAL_BUFFER_MEMORY_CONFIG = "total.memory.bytes";
-
- /**
- * The number of acknowledgments the producer requires from the server before considering a request complete.
- */
- public static final String REQUIRED_ACKS_CONFIG = "request.required.acks";
-
- /**
- * The maximum amount of time the server will wait for acknowledgments from followers to meet the acknowledgment
- * requirements the producer has specified. If the requested number of acknowledgments are not met an error will be
- * returned.
- */
- public static final String REQUEST_TIMEOUT_CONFIG = "request.timeout.ms";
-
- /**
- * The producer groups together any records that arrive in between request sends. Normally this occurs only under
- * load when records arrive faster than they can be sent out. However the client can reduce the number of requests
- * and increase throughput by adding a small amount of artificial delay to force more records to batch together.
- * This setting gives an upper bound on this delay. If we get {@link #MAX_PARTITION_SIZE_CONFIG} worth of records
- * for a partition it will be sent immediately regardless of this setting; however, if we have fewer than this many
- * bytes accumulated for this partition we will "linger" for the specified time waiting for more records to show up.
- * This setting defaults to 0.
- */
- public static final String LINGER_MS_CONFIG = "linger.ms";
-
- /**
- * Force a refresh of the cluster metadata after this period of time. This ensures that changes to the number of
- * partitions or other settings will be taken up by producers without restart.
- */
- public static final String METADATA_REFRESH_MS_CONFIG = "topic.metadata.refresh.interval.ms";
-
- /**
- * The id string to pass to the server when making requests. The purpose of this is to be able to track the source
- * of requests beyond just ip/port by allowing a logical application name to be included.
- */
- public static final String CLIENT_ID_CONFIG = "client.id";
-
- /**
- * The size of the TCP send buffer to use when sending data
- */
- public static final String SEND_BUFFER_CONFIG = "send.buffer.bytes";
-
- /**
- * The maximum size of a request. This is also effectively a cap on the maximum record size. Note that the server
- * has its own cap on record size which may be different from this.
- */
- public static final String MAX_REQUEST_SIZE_CONFIG = "max.request.size";
-
- /**
- * The amount of time to wait before attempting to reconnect to a given host. This avoids repeated connecting to a
- * host in a tight loop.
- */
- public static final String RECONNECT_BACKOFF_MS_CONFIG = "reconnect.backoff.ms";
-
- /**
- * When our memory buffer is exhausted we must either stop accepting new records (block) or throw errors. By default
- * this setting is true and we block, however users who want to guarantee we never block can turn this into an
- * error.
- */
- public static final String BLOCK_ON_BUFFER_FULL = "block.on.buffer.full";
-
- public static final String ENABLE_JMX = "enable.jmx";
-
- static {
- /* TODO: add docs */
- config = new ConfigDef().define(BROKER_LIST_CONFIG, Type.LIST, "blah blah")
- .define(METADATA_FETCH_TIMEOUT_CONFIG, Type.LONG, 60 * 1000, atLeast(0), "blah blah")
- .define(MAX_PARTITION_SIZE_CONFIG, Type.INT, 16384, atLeast(0), "blah blah")
- .define(TOTAL_BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), "blah blah")
- /* TODO: should be a string to handle acks=in-sync */
- .define(REQUIRED_ACKS_CONFIG, Type.INT, 1, between(-1, Short.MAX_VALUE), "blah blah")
- .define(REQUEST_TIMEOUT_CONFIG, Type.INT, 30 * 1000, atLeast(0), "blah blah")
- .define(LINGER_MS_CONFIG, Type.LONG, 0, atLeast(0L), "blah blah")
- .define(METADATA_REFRESH_MS_CONFIG, Type.LONG, 10 * 60 * 1000, atLeast(-1L), "blah blah")
- .define(CLIENT_ID_CONFIG, Type.STRING, "", "blah blah")
- .define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(0), "blah blah")
- .define(MAX_REQUEST_SIZE_CONFIG, Type.INT, 1 * 1024 * 1024, atLeast(0), "blah blah")
- .define(RECONNECT_BACKOFF_MS_CONFIG, Type.LONG, 10L, atLeast(0L), "blah blah")
- .define(BLOCK_ON_BUFFER_FULL, Type.BOOLEAN, true, "blah blah")
- .define(ENABLE_JMX, Type.BOOLEAN, true, "");
- }
-
- ProducerConfig(Map<? extends Object, ? extends Object> props) {
- super(config, props);
- }
-
-}
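
For reference, a sketch wiring a few of the keys defined above into a Properties object; the broker hosts are placeholders and the values are illustrative, not recommendations:

    import java.util.Properties;
    import kafka.clients.producer.ProducerConfig;

    Properties props = new Properties();
    props.put(ProducerConfig.BROKER_LIST_CONFIG, "broker1:9092,broker2:9092");
    props.put(ProducerConfig.REQUIRED_ACKS_CONFIG, 1);     // leader ack only
    props.put(ProducerConfig.LINGER_MS_CONFIG, 5L);        // small batching delay
    props.put(ProducerConfig.BLOCK_ON_BUFFER_FULL, false); // error when buffer is full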
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/clients/producer/ProducerRecord.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/clients/producer/ProducerRecord.java b/clients/src/main/java/kafka/clients/producer/ProducerRecord.java
deleted file mode 100644
index b9c20bc..0000000
--- a/clients/src/main/java/kafka/clients/producer/ProducerRecord.java
+++ /dev/null
@@ -1,84 +0,0 @@
-package kafka.clients.producer;
-
-/**
- * A key/value pair to be sent to Kafka. This consists of a topic name to which the record is being sent, an optional
- * partition number, and an optional key and value.
- * <p>
- * If a valid partition number is specified that partition will be used when sending the record. If no partition is
- * specified but a key is present a partition will be chosen using a hash of the key. If neither key nor partition is
- * present a partition will be assigned in a round-robin fashion.
- */
-public final class ProducerRecord {
-
- private final String topic;
- private final Integer partition;
- private final byte[] key;
- private final byte[] value;
-
- /**
- * Creates a record to be sent to a specified topic and partition
- *
- * @param topic The topic the record will be appended to
- * @param partition The partition to which the record should be sent
- * @param key The key that will be included in the record
- * @param value The record contents
- */
- public ProducerRecord(String topic, Integer partition, byte[] key, byte[] value) {
- if (topic == null)
- throw new IllegalArgumentException("Topic cannot be null");
- this.topic = topic;
- this.partition = partition;
- this.key = key;
- this.value = value;
- }
-
- /**
- * Create a record to be sent to Kafka
- *
- * @param topic The topic the record will be appended to
- * @param key The key that will be included in the record
- * @param value The record contents
- */
- public ProducerRecord(String topic, byte[] key, byte[] value) {
- this(topic, null, key, value);
- }
-
- /**
- * Create a record with no key
- *
- * @param topic The topic this record should be sent to
- * @param value The record contents
- */
- public ProducerRecord(String topic, byte[] value) {
- this(topic, null, value);
- }
-
- /**
- * The topic this record is being sent to
- */
- public String topic() {
- return topic;
- }
-
- /**
- * The key (or null if no key is specified)
- */
- public byte[] key() {
- return key;
- }
-
- /**
- * @return The value
- */
- public byte[] value() {
- return value;
- }
-
- /**
- * The partition to which the record will be sent (or null if no partition was specified)
- */
- public Integer partition() {
- return partition;
- }
-
-}
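
The three constructors above correspond to the partitioning rules in the class Javadoc; a quick sketch:

    import kafka.clients.producer.ProducerRecord;

    // explicit partition: the record always goes to partition 0
    ProducerRecord r1 = new ProducerRecord("t", 0, "k".getBytes(), "v".getBytes());
    // key but no partition: partition chosen by hashing the key
    ProducerRecord r2 = new ProducerRecord("t", "k".getBytes(), "v".getBytes());
    // neither key nor partition: assigned round-robin
    ProducerRecord r3 = new ProducerRecord("t", "v".getBytes());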
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/clients/producer/RecordMetadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/clients/producer/RecordMetadata.java b/clients/src/main/java/kafka/clients/producer/RecordMetadata.java
deleted file mode 100644
index 1486586..0000000
--- a/clients/src/main/java/kafka/clients/producer/RecordMetadata.java
+++ /dev/null
@@ -1,39 +0,0 @@
-package kafka.clients.producer;
-
-import kafka.common.TopicPartition;
-
-/**
- * The metadata for a record that has been acknowledged by the server
- */
-public final class RecordMetadata {
-
- private final long offset;
- private final TopicPartition topicPartition;
-
- public RecordMetadata(TopicPartition topicPartition, long offset) {
- super();
- this.offset = offset;
- this.topicPartition = topicPartition;
- }
-
- /**
- * The offset of the record in the topic/partition.
- */
- public long offset() {
- return this.offset;
- }
-
- /**
- * The topic the record was appended to
- */
- public String topic() {
- return this.topicPartition.topic();
- }
-
- /**
- * The partition the record was sent to
- */
- public int partition() {
- return this.topicPartition.partition();
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/clients/producer/internals/BufferPool.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/clients/producer/internals/BufferPool.java b/clients/src/main/java/kafka/clients/producer/internals/BufferPool.java
deleted file mode 100644
index c222ca0..0000000
--- a/clients/src/main/java/kafka/clients/producer/internals/BufferPool.java
+++ /dev/null
@@ -1,223 +0,0 @@
-package kafka.clients.producer.internals;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayDeque;
-import java.util.Deque;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
-
-import kafka.clients.producer.BufferExhaustedException;
-
-/**
- * A pool of ByteBuffers kept under a given memory limit. This class is fairly specific to the needs of the producer. In
- * particular it has the following properties:
- * <ol>
- * <li>There is a special "poolable size" and buffers of this size are kept in a free list and recycled
- * <li>It is fair. That is, all memory is given to the longest waiting thread until it has sufficient memory. This
- * prevents starvation or deadlock when a thread asks for a large chunk of memory and needs to block until multiple
- * buffers are deallocated.
- * </ol>
- */
-public final class BufferPool {
-
- private final long totalMemory;
- private final int poolableSize;
- private final boolean blockOnExhaustion;
- private final ReentrantLock lock;
- private final Deque<ByteBuffer> free;
- private final Deque<Condition> waiters;
- private long availableMemory;
-
- /**
- * Create a new buffer pool
- *
- * @param memory The maximum amount of memory that this buffer pool can allocate
- * @param poolableSize The buffer size to cache in the free list rather than deallocating
- * @param blockOnExhaustion This controls the behavior when the buffer pool is out of memory. If true the
- * {@link #allocate(int)} call will block and wait for memory to be returned to the pool. If false
- * {@link #allocate(int)} will throw an exception if the buffer is out of memory.
- */
- public BufferPool(long memory, int poolableSize, boolean blockOnExhaustion) {
- this.poolableSize = poolableSize;
- this.blockOnExhaustion = blockOnExhaustion;
- this.lock = new ReentrantLock();
- this.free = new ArrayDeque<ByteBuffer>();
- this.waiters = new ArrayDeque<Condition>();
- this.totalMemory = memory;
- this.availableMemory = memory;
- }
-
- /**
- * Allocate a buffer of the given size
- *
- * @param size The buffer size to allocate in bytes
- * @return The buffer
- * @throws InterruptedException If the thread is interrupted while blocked
- * @throws IllegalArgumentException if size is larger than the total memory controlled by the pool (and hence we would block
- * forever)
- * @throws BufferExhaustedException if the pool is in non-blocking mode and size exceeds the free memory in the pool
- */
- public ByteBuffer allocate(int size) throws InterruptedException {
- if (size > this.totalMemory)
- throw new IllegalArgumentException("Attempt to allocate " + size
- + " bytes, but there is a hard limit of "
- + this.totalMemory
- + " on memory allocations.");
-
- this.lock.lock();
- try {
- // check if we have a free buffer of the right size pooled
- if (size == poolableSize && !this.free.isEmpty())
- return this.free.pollFirst();
-
- // now check if the request is immediately satisfiable with the
- // memory on hand or if we need to block
- int freeListSize = this.free.size() * this.poolableSize;
- if (this.availableMemory + freeListSize >= size) {
- // we have enough unallocated or pooled memory to immediately
- // satisfy the request
- freeUp(size);
- this.availableMemory -= size;
- lock.unlock();
- return ByteBuffer.allocate(size);
- } else if (!blockOnExhaustion) {
- throw new BufferExhaustedException("You have exhausted the " + this.totalMemory
- + " bytes of memory you configured for the client and the client is configured to error"
- + " rather than block when memory is exhausted.");
- } else {
- // we are out of memory and will have to block
- int accumulated = 0;
- ByteBuffer buffer = null;
- Condition moreMemory = this.lock.newCondition();
- this.waiters.addLast(moreMemory);
- // loop over and over until we have a buffer or have reserved
- // enough memory to allocate one
- while (accumulated < size) {
- moreMemory.await();
- // check if we can satisfy this request from the free list,
- // otherwise allocate memory
- if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
- // just grab a buffer from the free list
- buffer = this.free.pollFirst();
- accumulated = size;
- } else {
- // we'll need to allocate memory, but we may only get
- // part of what we need on this iteration
- freeUp(size - accumulated);
- int got = (int) Math.min(size - accumulated, this.availableMemory);
- this.availableMemory -= got;
- accumulated += got;
- }
- }
-
- // remove the condition for this thread to let the next thread
- // in line start getting memory
- Condition removed = this.waiters.removeFirst();
- if (removed != moreMemory)
- throw new IllegalStateException("Wrong condition: this shouldn't happen.");
-
- // signal any additional waiters if there is more memory left
- // over for them
- if (this.availableMemory > 0 || !this.free.isEmpty()) {
- if (!this.waiters.isEmpty())
- this.waiters.peekFirst().signal();
- }
-
- // unlock and return the buffer
- lock.unlock();
- if (buffer == null)
- return ByteBuffer.allocate(size);
- else
- return buffer;
- }
- } finally {
- if (lock.isHeldByCurrentThread())
- lock.unlock();
- }
- }
-
- /**
- * Attempt to ensure we have at least the requested number of bytes of memory for allocation by deallocating pooled
- * buffers (if needed)
- */
- private void freeUp(int size) {
- while (!this.free.isEmpty() && this.availableMemory < size)
- this.availableMemory += this.free.pollLast().capacity();
- }
-
- /**
- * Return buffers to the pool. If they are of the poolable size add them to the free list, otherwise just mark the
- * memory as free.
- *
- * @param buffers The buffers to return
- */
- public void deallocate(ByteBuffer... buffers) {
- lock.lock();
- try {
- for (int i = 0; i < buffers.length; i++) {
- int size = buffers[i].capacity();
- if (size == this.poolableSize) {
- buffers[i].clear();
- this.free.add(buffers[i]);
- } else {
- this.availableMemory += size;
- }
- Condition moreMem = this.waiters.peekFirst();
- if (moreMem != null)
- moreMem.signal();
- }
- } finally {
- lock.unlock();
- }
- }
-
- /**
- * the total free memory both unallocated and in the free list
- */
- public long availableMemory() {
- lock.lock();
- try {
- return this.availableMemory + this.free.size() * this.poolableSize;
- } finally {
- lock.unlock();
- }
- }
-
- /**
- * Get the unallocated memory (not in the free list or in use)
- */
- public long unallocatedMemory() {
- lock.lock();
- try {
- return this.availableMemory;
- } finally {
- lock.unlock();
- }
- }
-
- /**
- * The number of threads blocked waiting on memory
- */
- public int queued() {
- lock.lock();
- try {
- return this.waiters.size();
- } finally {
- lock.unlock();
- }
- }
-
- /**
- * The buffer size that will be retained in the free list after use
- */
- public int poolableSize() {
- return this.poolableSize;
- }
-
- /**
- * The total memory managed by this pool
- */
- public long totalMemory() {
- return this.totalMemory;
- }
-}
\ No newline at end of file
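
A sketch of the allocate/deallocate contract above for a pool in blocking mode; note that allocate(int) can throw InterruptedException:

    import java.nio.ByteBuffer;
    import kafka.clients.producer.internals.BufferPool;

    BufferPool pool = new BufferPool(64 * 1024, 16 * 1024, true); // 64 KB total, 16 KB poolable
    ByteBuffer buffer = pool.allocate(16 * 1024);
    try {
        buffer.putInt(42); // ... use the buffer ...
    } finally {
        pool.deallocate(buffer); // poolable-sized buffers go back on the free list
    }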
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/clients/producer/internals/FutureRecordMetadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/clients/producer/internals/FutureRecordMetadata.java b/clients/src/main/java/kafka/clients/producer/internals/FutureRecordMetadata.java
deleted file mode 100644
index 43b4c5d..0000000
--- a/clients/src/main/java/kafka/clients/producer/internals/FutureRecordMetadata.java
+++ /dev/null
@@ -1,63 +0,0 @@
-package kafka.clients.producer.internals;
-
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import kafka.clients.producer.RecordMetadata;
-
-/**
- * The future result of a record send
- */
-public final class FutureRecordMetadata implements Future<RecordMetadata> {
-
- private final ProduceRequestResult result;
- private final long relativeOffset;
-
- public FutureRecordMetadata(ProduceRequestResult result, long relativeOffset) {
- this.result = result;
- this.relativeOffset = relativeOffset;
- }
-
- @Override
- public boolean cancel(boolean interrupt) {
- return false;
- }
-
- @Override
- public RecordMetadata get() throws InterruptedException, ExecutionException {
- this.result.await();
- return valueOrError();
- }
-
- @Override
- public RecordMetadata get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
- boolean occurred = this.result.await(timeout, unit);
- if (!occurred)
- throw new TimeoutException("Timeout after waiting for " + TimeUnit.MILLISECONDS.convert(timeout, unit) + " ms.");
- return valueOrError();
- }
-
- private RecordMetadata valueOrError() throws ExecutionException {
- if (this.result.error() != null)
- throw new ExecutionException(this.result.error());
- else
- return new RecordMetadata(result.topicPartition(), this.result.baseOffset() + this.relativeOffset);
- }
-
- public long relativeOffset() {
- return this.relativeOffset;
- }
-
- @Override
- public boolean isCancelled() {
- return false;
- }
-
- @Override
- public boolean isDone() {
- return this.result.completed();
- }
-
-}
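
From the caller's side this is an ordinary Future; a sketch of the bounded wait that the get(timeout, unit) overload above supports, with producer and record assumed to exist:

    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    import kafka.clients.producer.RecordMetadata;

    Future<RecordMetadata> future = producer.send(record);
    try {
        RecordMetadata metadata = future.get(5, TimeUnit.SECONDS);
        System.out.println("acked at offset " + metadata.offset());
    } catch (TimeoutException e) {
        // the underlying ProduceRequestResult did not complete in time
    } catch (InterruptedException | ExecutionException e) {
        // interrupted while waiting, or the send itself failed
    }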
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/clients/producer/internals/Metadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/clients/producer/internals/Metadata.java b/clients/src/main/java/kafka/clients/producer/internals/Metadata.java
deleted file mode 100644
index f5f8b9b..0000000
--- a/clients/src/main/java/kafka/clients/producer/internals/Metadata.java
+++ /dev/null
@@ -1,120 +0,0 @@
-package kafka.clients.producer.internals;
-
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import kafka.common.Cluster;
-import kafka.common.PartitionInfo;
-import kafka.common.errors.TimeoutException;
-
-/**
- * A class encapsulating some of the logic around metadata.
- * <p>
- * This class is shared by the client thread (for partitioning) and the background sender thread.
- *
- * Metadata is maintained for only a subset of topics, which can be added to over time. Requesting metadata for a
- * topic we don't yet have any metadata for will trigger a metadata update.
- */
-public final class Metadata {
-
- private final long refreshBackoffMs;
- private final long metadataExpireMs;
- private long lastRefresh;
- private Cluster cluster;
- private boolean forceUpdate;
- private final Set<String> topics;
-
- /**
- * Create a metadata instance with reasonable defaults
- */
- public Metadata() {
- this(100L, 60 * 60 * 1000L);
- }
-
- /**
- * Create a new Metadata instance
- * @param refreshBackoffMs The minimum amount of time that must expire between metadata refreshes to avoid busy
- * polling
- * @param metadataExpireMs The maximum amount of time that metadata can be retained without refresh
- */
- public Metadata(long refreshBackoffMs, long metadataExpireMs) {
- this.refreshBackoffMs = refreshBackoffMs;
- this.metadataExpireMs = metadataExpireMs;
- this.lastRefresh = 0L;
- this.cluster = Cluster.empty();
- this.forceUpdate = false;
- this.topics = new HashSet<String>();
- }
-
- /**
- * Get the current cluster info without blocking
- */
- public synchronized Cluster fetch() {
- return this.cluster;
- }
-
- /**
- * Fetch cluster metadata including partitions for the given topic. If there is no metadata for the given topic,
- * block waiting for an update.
- * @param topic The topic we want metadata for
- * @param maxWaitMs The maximum amount of time to block waiting for metadata
- */
- public synchronized Cluster fetch(String topic, long maxWaitMs) {
- List<PartitionInfo> partitions = null;
- do {
- partitions = cluster.partitionsFor(topic);
- if (partitions == null) {
- long begin = System.currentTimeMillis();
- topics.add(topic);
- forceUpdate = true;
- try {
- wait(maxWaitMs);
- } catch (InterruptedException e) { /* this is fine, just try again */
- }
- long elapsed = System.currentTimeMillis() - begin;
- if (elapsed > maxWaitMs)
- throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
- } else {
- return cluster;
- }
- } while (true);
- }
-
- /**
- * Does the current cluster info need to be updated? An update is needed if it has been at least refreshBackoffMs
- * since our last update and either (1) an update has been requested or (2) the current metadata has expired (more
- * than metadataExpireMs has passed since the last refresh)
- */
- public synchronized boolean needsUpdate(long now) {
- long msSinceLastUpdate = now - this.lastRefresh;
- boolean updateAllowed = msSinceLastUpdate >= this.refreshBackoffMs;
- boolean updateNeeded = this.forceUpdate || msSinceLastUpdate >= this.metadataExpireMs;
- return updateAllowed && updateNeeded;
- }
-
- /**
- * Force an update of the current cluster info
- */
- public synchronized void forceUpdate() {
- this.forceUpdate = true;
- }
-
- /**
- * Get the list of topics we are currently maintaining metadata for
- */
- public synchronized Set<String> topics() {
- return new HashSet<String>(this.topics);
- }
-
- /**
- * Update the cluster metadata
- */
- public synchronized void update(Cluster cluster, long now) {
- this.forceUpdate = false;
- this.lastRefresh = now;
- this.cluster = cluster;
- notifyAll();
- }
-
-}
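
The synchronized fetch/update pair above is the entire handoff between the client thread and the background sender. A minimal sketch of the refresh cycle the sender drives, using only methods shown in this hunk:

    import kafka.clients.producer.internals.Metadata;
    import kafka.common.Cluster;

    public class MetadataSketch {
        public static void main(String[] args) {
            // 100 ms refresh backoff, 1 hour expiry (the defaults above)
            Metadata metadata = new Metadata(100L, 60 * 60 * 1000L);

            metadata.forceUpdate();
            long now = System.currentTimeMillis();
            if (metadata.needsUpdate(now)) {
                // the sender would fetch real cluster metadata from a broker here;
                // an empty cluster stands in for that response in this sketch
                metadata.update(Cluster.empty(), now);
            }
            System.out.println(metadata.topics()); // topics we maintain metadata for
        }
    }
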
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/clients/producer/internals/Partitioner.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/clients/producer/internals/Partitioner.java b/clients/src/main/java/kafka/clients/producer/internals/Partitioner.java
deleted file mode 100644
index 6d2188e..0000000
--- a/clients/src/main/java/kafka/clients/producer/internals/Partitioner.java
+++ /dev/null
@@ -1,55 +0,0 @@
-package kafka.clients.producer.internals;
-
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import kafka.clients.producer.ProducerRecord;
-import kafka.common.Cluster;
-import kafka.common.PartitionInfo;
-import kafka.common.utils.Utils;
-
-/**
- * The default partitioning strategy:
- * <ul>
- * <li>If a partition is specified in the record, use it
- * <li>If no partition is specified but a key is present, choose a partition based on a hash of the key
- * <li>If no partition or key is present, choose a partition in a round-robin fashion
- * </ul>
- */
-public class Partitioner {
-
- private final AtomicInteger counter = new AtomicInteger(new Random().nextInt());
-
- /**
- * Compute the partition for the given record.
- *
- * @param record The record being sent
- * @param cluster The current cluster metadata, used to look up the partitions for the record's topic
- */
- public int partition(ProducerRecord record, Cluster cluster) {
- List<PartitionInfo> partitions = cluster.partitionsFor(record.topic());
- int numPartitions = partitions.size();
- if (record.partition() != null) {
- // they have given us a partition, use it
- if (record.partition() < 0 || record.partition() >= numPartitions)
- throw new IllegalArgumentException("Invalid partition given with record: " + record.partition()
- + " is not in the range [0..."
- + (numPartitions - 1)
- + "].");
- return record.partition();
- } else if (record.key() == null) {
- // choose the next available node in a round-robin fashion
- for (int i = 0; i < numPartitions; i++) {
- int partition = Utils.abs(counter.getAndIncrement()) % numPartitions;
- if (partitions.get(partition).leader() != null)
- return partition;
- }
- // no partition has an available leader, so fall back to one that may be unavailable
- return Utils.abs(counter.getAndIncrement()) % numPartitions;
- } else {
- // hash the key to choose a partition
- return Utils.abs(Utils.murmur2(record.key())) % numPartitions;
- }
- }
-
-}
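
A standalone restatement of the three branches above, so it runs without the Cluster/PartitionInfo classes from this commit. Arrays.hashCode stands in for Utils.murmur2, Math.abs for Utils.abs (which also handles Integer.MIN_VALUE), and the leader-availability check in the round-robin branch is omitted:

    import java.util.Arrays;
    import java.util.concurrent.atomic.AtomicInteger;

    public class PartitionerSketch {
        private final AtomicInteger counter = new AtomicInteger(0); // the real code seeds this randomly

        public int partition(Integer partition, byte[] key, int numPartitions) {
            if (partition != null) {
                if (partition < 0 || partition >= numPartitions)
                    throw new IllegalArgumentException("Invalid partition: " + partition);
                return partition;                                           // 1. explicit partition wins
            } else if (key == null) {
                return Math.abs(counter.getAndIncrement()) % numPartitions; // 2. round-robin
            } else {
                return Math.abs(Arrays.hashCode(key)) % numPartitions;      // 3. stable key hash
            }
        }

        public static void main(String[] args) {
            PartitionerSketch p = new PartitionerSketch();
            System.out.println(p.partition(null, null, 3));            // 0, then 1, 2, 0, ...
            System.out.println(p.partition(null, "k".getBytes(), 3));  // stable for "k"
        }
    }
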
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/clients/producer/internals/ProduceRequestResult.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/clients/producer/internals/ProduceRequestResult.java b/clients/src/main/java/kafka/clients/producer/internals/ProduceRequestResult.java
deleted file mode 100644
index cdae00a..0000000
--- a/clients/src/main/java/kafka/clients/producer/internals/ProduceRequestResult.java
+++ /dev/null
@@ -1,81 +0,0 @@
-package kafka.clients.producer.internals;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import kafka.clients.producer.RecordMetadata;
-import kafka.common.TopicPartition;
-
-/**
- * A class that models the future completion of a produce request for a single partition. There is one of these per
- * partition in a produce request and it is shared by all the {@link FutureRecordMetadata} instances that are batched
- * together
- * for the same partition in the request.
- */
-public final class ProduceRequestResult {
-
- private final CountDownLatch latch = new CountDownLatch(1);
- private volatile TopicPartition topicPartition;
- private volatile long baseOffset = -1L;
- private volatile RuntimeException error;
-
- public ProduceRequestResult() {
- }
-
- /**
- * Mark this request as complete and unblock any threads waiting on its completion.
- * @param topicPartition The topic and partition to which this record set was sent
- * @param baseOffset The base offset assigned to the record set
- * @param error The error that occurred if there was one, or null.
- */
- public void done(TopicPartition topicPartition, long baseOffset, RuntimeException error) {
- this.topicPartition = topicPartition;
- this.baseOffset = baseOffset;
- this.error = error;
- this.latch.countDown();
- }
-
- /**
- * Await the completion of this request
- */
- public void await() throws InterruptedException {
- latch.await();
- }
-
- /**
- * Await the completion of this request (up to the given time interval)
- * @param timeout The maximum time to wait
- * @param unit The unit for the max time
- * @return true if the request completed, false if we timed out
- */
- public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
- return latch.await(timeout, unit);
- }
-
- /**
- * The base offset for the request (the first offset in the record set)
- */
- public long baseOffset() {
- return baseOffset;
- }
-
- /**
- * The error thrown (generally on the server) while processing this request
- */
- public RuntimeException error() {
- return error;
- }
-
- /**
- * The topic and partition to which the record was appended
- */
- public TopicPartition topicPartition() {
- return topicPartition;
- }
-
- /**
- * Has the request completed?
- */
- public boolean completed() {
- return this.latch.getCount() == 0L;
- }
-}
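
One ProduceRequestResult fans out to every future created for the same batch, so completing it releases all of them at once. A minimal sketch of that fan-out, assuming RecordMetadata exposes the offset it was constructed with via an offset() accessor:

    import kafka.clients.producer.internals.FutureRecordMetadata;
    import kafka.clients.producer.internals.ProduceRequestResult;
    import kafka.common.TopicPartition;

    public class RequestResultSketch {
        public static void main(String[] args) throws Exception {
            ProduceRequestResult result = new ProduceRequestResult();
            // two records batched into the same partition share one result
            FutureRecordMetadata first = new FutureRecordMetadata(result, 0);
            FutureRecordMetadata second = new FutureRecordMetadata(result, 1);

            result.done(new TopicPartition("my-topic", 0), 42L, null);

            // both futures complete together: base offset plus relative offset
            System.out.println(first.get().offset());  // 42
            System.out.println(second.get().offset()); // 43
        }
    }
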
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java
deleted file mode 100644
index c22939f..0000000
--- a/clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java
+++ /dev/null
@@ -1,234 +0,0 @@
-package kafka.clients.producer.internals;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Deque;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentMap;
-
-import kafka.clients.producer.Callback;
-import kafka.common.TopicPartition;
-import kafka.common.metrics.Measurable;
-import kafka.common.metrics.MetricConfig;
-import kafka.common.metrics.Metrics;
-import kafka.common.record.CompressionType;
-import kafka.common.record.MemoryRecords;
-import kafka.common.record.Record;
-import kafka.common.record.Records;
-import kafka.common.utils.CopyOnWriteMap;
-import kafka.common.utils.Time;
-import kafka.common.utils.Utils;
-
-/**
- * This class acts as a queue that accumulates records into {@link kafka.common.record.MemoryRecords} instances to be
- * sent to the server.
- * <p>
- * The accumulator uses a bounded amount of memory and append calls will block when that memory is exhausted, unless
- * this behavior is explicitly disabled.
- */
-public final class RecordAccumulator {
-
- private volatile boolean closed;
- private int drainIndex;
- private final int batchSize;
- private final long lingerMs;
- private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;
- private final BufferPool free;
- private final Time time;
-
- /**
- * Create a new record accumulator
- *
- * @param batchSize The size to use when allocating {@link kafka.common.record.MemoryRecords} instances
- * @param totalSize The maximum memory the record accumulator can use.
- * @param lingerMs An artificial delay time to add before declaring a records instance that isn't full ready for
- * sending. This allows time for more records to arrive. Setting a non-zero lingerMs will trade off some
- * latency for potentially better throughput due to more batching (and hence fewer, larger requests).
- * @param blockOnBufferFull If true block when we are out of memory; if false throw an exception when we are out of
- * memory
- * @param metrics The metrics
- * @param time The time instance to use
- */
- public RecordAccumulator(int batchSize, long totalSize, long lingerMs, boolean blockOnBufferFull, Metrics metrics, Time time) {
- this.drainIndex = 0;
- this.closed = false;
- this.batchSize = batchSize;
- this.lingerMs = lingerMs;
- this.batches = new CopyOnWriteMap<TopicPartition, Deque<RecordBatch>>();
- this.free = new BufferPool(totalSize, batchSize, blockOnBufferFull);
- this.time = time;
- registerMetrics(metrics);
- }
-
- private void registerMetrics(Metrics metrics) {
- metrics.addMetric("blocked_threads",
- "The number of user threads blocked waiting for buffer memory to enqueue their records",
- new Measurable() {
- public double measure(MetricConfig config, long now) {
- return free.queued();
- }
- });
- metrics.addMetric("buffer_total_bytes",
- "The total amount of buffer memory that is available (not currently used for buffering records).",
- new Measurable() {
- public double measure(MetricConfig config, long now) {
- return free.totalMemory();
- }
- });
- metrics.addMetric("buffer_available_bytes",
- "The total amount of buffer memory that is available (not currently used for buffering records).",
- new Measurable() {
- public double measure(MetricConfig config, long now) {
- return free.availableMemory();
- }
- });
- }
-
- /**
- * Add a record to the accumulator.
- * <p>
- * This method will block if sufficient memory isn't available for the record unless blocking has been disabled.
- *
- * @param tp The topic/partition to which this record is being sent
- * @param key The key for the record
- * @param value The value for the record
- * @param compression The compression codec for the record
- * @param callback The user-supplied callback to execute when the request is complete
- */
- public FutureRecordMetadata append(TopicPartition tp, byte[] key, byte[] value, CompressionType compression, Callback callback) throws InterruptedException {
- if (closed)
- throw new IllegalStateException("Cannot send after the producer is closed.");
- // check if we have an in-progress batch
- Deque<RecordBatch> dq = dequeFor(tp);
- synchronized (dq) {
- RecordBatch batch = dq.peekLast();
- if (batch != null) {
- FutureRecordMetadata future = batch.tryAppend(key, value, compression, callback);
- if (future != null)
- return future;
- }
- }
-
- // we don't have an in-progress record batch, so try to allocate a new one
- int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
- ByteBuffer buffer = free.allocate(size);
- synchronized (dq) {
- RecordBatch first = dq.peekLast();
- if (first != null) {
- FutureRecordMetadata future = first.tryAppend(key, value, compression, callback);
- if (future != null) {
- // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen
- // often...
- free.deallocate(buffer);
- return future;
- }
- }
- RecordBatch batch = new RecordBatch(tp, new MemoryRecords(buffer), time.milliseconds());
- FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, compression, callback));
- dq.addLast(batch);
- return future;
- }
- }
-
- /**
- * Get a list of topic-partitions which are ready to be sent.
- * <p>
- * A partition is ready if ANY of the following are true:
- * <ol>
- * <li>The record set is full
- * <li>The record set has sat in the accumulator for at least lingerMs milliseconds
- * <li>The accumulator is out of memory and threads are blocked waiting for memory (in this case all partitions are
- * immediately considered ready).
- * <li>The accumulator has been closed
- * </ol>
- */
- public List<TopicPartition> ready(long now) {
- List<TopicPartition> ready = new ArrayList<TopicPartition>();
- boolean exhausted = this.free.queued() > 0;
- for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
- Deque<RecordBatch> deque = entry.getValue();
- synchronized (deque) {
- RecordBatch batch = deque.peekFirst();
- if (batch != null) {
- boolean full = deque.size() > 1 || !batch.records.buffer().hasRemaining();
- boolean expired = now - batch.created >= lingerMs;
- if (full || expired || exhausted || closed)
- ready.add(batch.topicPartition);
- }
- }
- }
- return ready;
- }
-
- /**
- * Drain all the data for the given topic-partitions that will fit within the specified size. This method attempts
- * to avoid choosing the same topic-partitions over and over.
- *
- * @param partitions The list of partitions to drain
- * @param maxSize The maximum number of bytes to drain
- * @return A list of {@link RecordBatch} for the specified partitions, with total size less than the requested maxSize.
- * TODO: There may be a starvation issue due to iteration order
- */
- public List<RecordBatch> drain(List<TopicPartition> partitions, int maxSize) {
- if (partitions.isEmpty())
- return Collections.emptyList();
- int size = 0;
- List<RecordBatch> ready = new ArrayList<RecordBatch>();
- /* to make starvation less likely this loop doesn't start at 0 */
- int start = drainIndex = drainIndex % partitions.size();
- do {
- TopicPartition tp = partitions.get(drainIndex);
- Deque<RecordBatch> deque = dequeFor(tp);
- if (deque != null) {
- synchronized (deque) {
- RecordBatch first = deque.peekFirst();
- // the deque may have been emptied since ready() ran
- if (first != null) {
- if (size + first.records.sizeInBytes() > maxSize) {
- return ready;
- } else {
- RecordBatch batch = deque.pollFirst();
- size += batch.records.sizeInBytes();
- ready.add(batch);
- }
- }
- }
- }
- this.drainIndex = (this.drainIndex + 1) % partitions.size();
- } while (start != drainIndex);
- return ready;
- }
-
- /**
- * Get the deque for the given topic-partition, creating it if necessary. Since new topics will only be added rarely,
- * we copy-on-write the hashmap.
- */
- private Deque<RecordBatch> dequeFor(TopicPartition tp) {
- Deque<RecordBatch> d = this.batches.get(tp);
- if (d != null)
- return d;
- this.batches.putIfAbsent(tp, new ArrayDeque<RecordBatch>());
- return this.batches.get(tp);
- }
-
- /**
- * Deallocate the list of record batches
- */
- public void deallocate(Collection<RecordBatch> batches) {
- ByteBuffer[] buffers = new ByteBuffer[batches.size()];
- int i = 0;
- for (RecordBatch batch : batches) {
- buffers[i] = batch.records.buffer();
- i++;
- }
- free.deallocate(buffers);
- }
-
- /**
- * Close this accumulator and force all the record buffers to be drained
- */
- public void close() {
- this.closed = true;
- }
-
-}
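
Taken together, append/ready/drain/deallocate form the accumulator's whole lifecycle. A minimal sketch of one pass through it; CompressionType.NONE, the no-arg Metrics constructor, and a SystemTime implementation of Time are assumptions here, since none of them appears in this hunk:

    import java.util.List;
    import kafka.clients.producer.internals.RecordAccumulator;
    import kafka.clients.producer.internals.RecordBatch;
    import kafka.common.TopicPartition;
    import kafka.common.metrics.Metrics;
    import kafka.common.record.CompressionType;
    import kafka.common.utils.SystemTime;

    public class AccumulatorSketch {
        public static void main(String[] args) throws InterruptedException {
            // 16 KB batches, 1 MB total buffer, 10 ms linger, block when full
            RecordAccumulator accum = new RecordAccumulator(16 * 1024, 1024 * 1024,
                    10L, true, new Metrics(), new SystemTime());
            TopicPartition tp = new TopicPartition("my-topic", 0);

            accum.append(tp, "key".getBytes(), "value".getBytes(),
                         CompressionType.NONE, null);

            Thread.sleep(20); // let lingerMs expire so the partition becomes ready
            List<TopicPartition> ready = accum.ready(System.currentTimeMillis());
            List<RecordBatch> batches = accum.drain(ready, 4 * 1024 * 1024);
            System.out.println(batches.size() + " batch(es) drained");
            accum.deallocate(batches); // return the buffers to the pool
        }
    }
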
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/clients/producer/internals/RecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/kafka/clients/producer/internals/RecordBatch.java
deleted file mode 100644
index 633f4af..0000000
--- a/clients/src/main/java/kafka/clients/producer/internals/RecordBatch.java
+++ /dev/null
@@ -1,86 +0,0 @@
-package kafka.clients.producer.internals;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import kafka.clients.producer.Callback;
-import kafka.clients.producer.RecordMetadata;
-import kafka.common.TopicPartition;
-import kafka.common.record.CompressionType;
-import kafka.common.record.MemoryRecords;
-
-/**
- * A batch of records that is or will be sent.
- *
- * This class is not thread safe; external synchronization must be used when modifying it.
- */
-public final class RecordBatch {
- public int recordCount = 0;
- public final long created;
- public final MemoryRecords records;
- public final TopicPartition topicPartition;
- private final ProduceRequestResult produceFuture;
- private final List<Thunk> thunks;
-
- public RecordBatch(TopicPartition tp, MemoryRecords records, long now) {
- this.created = now;
- this.records = records;
- this.topicPartition = tp;
- this.produceFuture = new ProduceRequestResult();
- this.thunks = new ArrayList<Thunk>();
- }
-
- /**
- * Append the record to the current record set and return the relative offset within that record set
- *
- * @return The {@link FutureRecordMetadata} corresponding to this record, or null if there isn't sufficient room.
- */
- public FutureRecordMetadata tryAppend(byte[] key, byte[] value, CompressionType compression, Callback callback) {
- if (!this.records.hasRoomFor(key, value)) {
- return null;
- } else {
- this.records.append(0L, key, value, compression);
- FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount);
- if (callback != null)
- thunks.add(new Thunk(callback, this.recordCount));
- this.recordCount++;
- return future;
- }
- }
-
- /**
- * Complete the request
- *
- * @param offset The base offset assigned to the record set
- * @param exception The exception that occurred while processing the request, or null if none
- */
- public void done(long offset, RuntimeException exception) {
- this.produceFuture.done(topicPartition, offset, exception);
- // execute callbacks
- for (int i = 0; i < this.thunks.size(); i++) {
- try {
- Thunk thunk = this.thunks.get(i);
- if (exception == null)
- thunk.callback.onCompletion(new RecordMetadata(topicPartition, this.produceFuture.baseOffset() + thunk.relativeOffset),
- null);
- else
- thunk.callback.onCompletion(null, exception);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
-
- /**
- * A callback and the relative offset used to build the {@link RecordMetadata} passed to it.
- */
- private static final class Thunk {
- final Callback callback;
- final long relativeOffset;
-
- public Thunk(Callback callback, long relativeOffset) {
- this.callback = callback;
- this.relativeOffset = relativeOffset;
- }
- }
-}
\ No newline at end of file
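
tryAppend either places the record and returns a future or returns null when the buffer is full, which is exactly what drives the allocate-a-new-batch path in RecordAccumulator.append above. A minimal sketch, reusing the MemoryRecords-over-ByteBuffer construction shown there; CompressionType.NONE is again an assumption:

    import java.nio.ByteBuffer;
    import kafka.clients.producer.internals.FutureRecordMetadata;
    import kafka.clients.producer.internals.RecordBatch;
    import kafka.common.TopicPartition;
    import kafka.common.record.CompressionType;
    import kafka.common.record.MemoryRecords;

    public class RecordBatchSketch {
        public static void main(String[] args) {
            TopicPartition tp = new TopicPartition("my-topic", 0);
            RecordBatch batch = new RecordBatch(tp,
                    new MemoryRecords(ByteBuffer.allocate(16 * 1024)),
                    System.currentTimeMillis());

            FutureRecordMetadata future = batch.tryAppend("k".getBytes(),
                    "v".getBytes(), CompressionType.NONE, null);
            if (future == null) {
                // buffer full: the caller must allocate a fresh batch
                throw new IllegalStateException("unexpected for an empty 16 KB buffer");
            }

            batch.done(10L, null); // completes the shared future and runs callbacks
        }
    }
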