You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by Jay Kreps <bo...@gmail.com> on 2014/01/23 21:54:57 UTC

Review Request 17263: New producer for Kafka.

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17263/
-----------------------------------------------------------

Review request for kafka.


Bugs: KAFKA-1227
    https://issues.apache.org/jira/browse/KAFKA-1227


Repository: kafka


Description
-------

KAFKA-1227 New producer!


Diffs
-----

  clients/build.sbt PRE-CREATION 
  clients/src/main/java/kafka/clients/producer/BufferExhaustedException.java PRE-CREATION 
  clients/src/main/java/kafka/clients/producer/Callback.java PRE-CREATION 
  clients/src/main/java/kafka/clients/producer/DefaultPartitioner.java PRE-CREATION 
  clients/src/main/java/kafka/clients/producer/KafkaProducer.java PRE-CREATION 
  clients/src/main/java/kafka/clients/producer/MockProducer.java PRE-CREATION 
  clients/src/main/java/kafka/clients/producer/Partitioner.java PRE-CREATION 
  clients/src/main/java/kafka/clients/producer/Producer.java PRE-CREATION 
  clients/src/main/java/kafka/clients/producer/ProducerConfig.java PRE-CREATION 
  clients/src/main/java/kafka/clients/producer/ProducerRecord.java PRE-CREATION 
  clients/src/main/java/kafka/clients/producer/RecordSend.java PRE-CREATION 
  clients/src/main/java/kafka/clients/producer/internals/BufferPool.java PRE-CREATION 
  clients/src/main/java/kafka/clients/producer/internals/Metadata.java PRE-CREATION 
  clients/src/main/java/kafka/clients/producer/internals/ProduceRequestResult.java PRE-CREATION 
  clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java PRE-CREATION 
  clients/src/main/java/kafka/clients/producer/internals/RecordBatch.java PRE-CREATION 
  clients/src/main/java/kafka/clients/producer/internals/Sender.java PRE-CREATION 
  clients/src/main/java/kafka/clients/tools/ProducerPerformance.java PRE-CREATION 
  clients/src/main/java/kafka/common/ByteSerialization.java PRE-CREATION 
  clients/src/main/java/kafka/common/Cluster.java PRE-CREATION 
  clients/src/main/java/kafka/common/Configurable.java PRE-CREATION 
  clients/src/main/java/kafka/common/Deserializer.java PRE-CREATION 
  clients/src/main/java/kafka/common/KafkaException.java PRE-CREATION 
  clients/src/main/java/kafka/common/Metric.java PRE-CREATION 
  clients/src/main/java/kafka/common/Node.java PRE-CREATION 
  clients/src/main/java/kafka/common/PartitionInfo.java PRE-CREATION 
  clients/src/main/java/kafka/common/Serializer.java PRE-CREATION 
  clients/src/main/java/kafka/common/StringSerialization.java PRE-CREATION 
  clients/src/main/java/kafka/common/TopicPartition.java PRE-CREATION 
  clients/src/main/java/kafka/common/config/AbstractConfig.java PRE-CREATION 
  clients/src/main/java/kafka/common/config/ConfigDef.java PRE-CREATION 
  clients/src/main/java/kafka/common/config/ConfigException.java PRE-CREATION 
  clients/src/main/java/kafka/common/errors/ApiException.java PRE-CREATION 
  clients/src/main/java/kafka/common/errors/CorruptMessageException.java PRE-CREATION 
  clients/src/main/java/kafka/common/errors/LeaderNotAvailableException.java PRE-CREATION 
  clients/src/main/java/kafka/common/errors/MessageTooLargeException.java PRE-CREATION 
  clients/src/main/java/kafka/common/errors/NetworkException.java PRE-CREATION 
  clients/src/main/java/kafka/common/errors/NotLeaderForPartitionException.java PRE-CREATION 
  clients/src/main/java/kafka/common/errors/OffsetMetadataTooLarge.java PRE-CREATION 
  clients/src/main/java/kafka/common/errors/OffsetOutOfRangeException.java PRE-CREATION 
  clients/src/main/java/kafka/common/errors/RetryableException.java PRE-CREATION 
  clients/src/main/java/kafka/common/errors/TimeoutException.java PRE-CREATION 
  clients/src/main/java/kafka/common/errors/UnknownServerException.java PRE-CREATION 
  clients/src/main/java/kafka/common/errors/UnknownTopicOrPartitionException.java PRE-CREATION 
  clients/src/main/java/kafka/common/metrics/CompoundStat.java PRE-CREATION 
  clients/src/main/java/kafka/common/metrics/JmxReporter.java PRE-CREATION 
  clients/src/main/java/kafka/common/metrics/KafkaMetric.java PRE-CREATION 
  clients/src/main/java/kafka/common/metrics/Measurable.java PRE-CREATION 
  clients/src/main/java/kafka/common/metrics/MeasurableStat.java PRE-CREATION 
  clients/src/main/java/kafka/common/metrics/MetricConfig.java PRE-CREATION 
  clients/src/main/java/kafka/common/metrics/Metrics.java PRE-CREATION 
  clients/src/main/java/kafka/common/metrics/MetricsReporter.java PRE-CREATION 
  clients/src/main/java/kafka/common/metrics/Quota.java PRE-CREATION 
  clients/src/main/java/kafka/common/metrics/QuotaViolationException.java PRE-CREATION 
  clients/src/main/java/kafka/common/metrics/Sensor.java PRE-CREATION 
  clients/src/main/java/kafka/common/metrics/Stat.java PRE-CREATION 
  clients/src/main/java/kafka/common/metrics/stats/Avg.java PRE-CREATION 
  clients/src/main/java/kafka/common/metrics/stats/Count.java PRE-CREATION 
  clients/src/main/java/kafka/common/metrics/stats/Histogram.java PRE-CREATION 
  clients/src/main/java/kafka/common/metrics/stats/Max.java PRE-CREATION 
  clients/src/main/java/kafka/common/metrics/stats/Min.java PRE-CREATION 
  clients/src/main/java/kafka/common/metrics/stats/Percentile.java PRE-CREATION 
  clients/src/main/java/kafka/common/metrics/stats/Percentiles.java PRE-CREATION 
  clients/src/main/java/kafka/common/metrics/stats/Rate.java PRE-CREATION 
  clients/src/main/java/kafka/common/metrics/stats/SampledStat.java PRE-CREATION 
  clients/src/main/java/kafka/common/metrics/stats/Total.java PRE-CREATION 
  clients/src/main/java/kafka/common/metrics/stats/WindowedStat.java PRE-CREATION 
  clients/src/main/java/kafka/common/network/ByteBufferReceive.java PRE-CREATION 
  clients/src/main/java/kafka/common/network/ByteBufferSend.java PRE-CREATION 
  clients/src/main/java/kafka/common/network/NetworkReceive.java PRE-CREATION 
  clients/src/main/java/kafka/common/network/NetworkSend.java PRE-CREATION 
  clients/src/main/java/kafka/common/network/Receive.java PRE-CREATION 
  clients/src/main/java/kafka/common/network/Selectable.java PRE-CREATION 
  clients/src/main/java/kafka/common/network/Selector.java PRE-CREATION 
  clients/src/main/java/kafka/common/network/Send.java PRE-CREATION 
  clients/src/main/java/kafka/common/protocol/ApiKeys.java PRE-CREATION 
  clients/src/main/java/kafka/common/protocol/Errors.java PRE-CREATION 
  clients/src/main/java/kafka/common/protocol/ProtoUtils.java PRE-CREATION 
  clients/src/main/java/kafka/common/protocol/Protocol.java PRE-CREATION 
  clients/src/main/java/kafka/common/protocol/types/ArrayOf.java PRE-CREATION 
  clients/src/main/java/kafka/common/protocol/types/Field.java PRE-CREATION 
  clients/src/main/java/kafka/common/protocol/types/Schema.java PRE-CREATION 
  clients/src/main/java/kafka/common/protocol/types/SchemaException.java PRE-CREATION 
  clients/src/main/java/kafka/common/protocol/types/Struct.java PRE-CREATION 
  clients/src/main/java/kafka/common/protocol/types/Type.java PRE-CREATION 
  clients/src/main/java/kafka/common/record/CompressionType.java PRE-CREATION 
  clients/src/main/java/kafka/common/record/InvalidRecordException.java PRE-CREATION 
  clients/src/main/java/kafka/common/record/LogEntry.java PRE-CREATION 
  clients/src/main/java/kafka/common/record/MemoryRecords.java PRE-CREATION 
  clients/src/main/java/kafka/common/record/Record.java PRE-CREATION 
  clients/src/main/java/kafka/common/record/Records.java PRE-CREATION 
  clients/src/main/java/kafka/common/requests/RequestHeader.java PRE-CREATION 
  clients/src/main/java/kafka/common/requests/RequestSend.java PRE-CREATION 
  clients/src/main/java/kafka/common/requests/ResponseHeader.java PRE-CREATION 
  clients/src/main/java/kafka/common/utils/AbstractIterator.java PRE-CREATION 
  clients/src/main/java/kafka/common/utils/CopyOnWriteMap.java PRE-CREATION 
  clients/src/main/java/kafka/common/utils/Crc32.java PRE-CREATION 
  clients/src/main/java/kafka/common/utils/KafkaThread.java PRE-CREATION 
  clients/src/main/java/kafka/common/utils/SystemTime.java PRE-CREATION 
  clients/src/main/java/kafka/common/utils/Time.java PRE-CREATION 
  clients/src/main/java/kafka/common/utils/Utils.java PRE-CREATION 
  clients/src/test/java/kafka/clients/common/network/SelectorTest.java PRE-CREATION 
  clients/src/test/java/kafka/clients/producer/BufferPoolTest.java PRE-CREATION 
  clients/src/test/java/kafka/clients/producer/MetadataTest.java PRE-CREATION 
  clients/src/test/java/kafka/clients/producer/MockProducerTest.java PRE-CREATION 
  clients/src/test/java/kafka/clients/producer/RecordAccumulatorTest.java PRE-CREATION 
  clients/src/test/java/kafka/clients/producer/RecordSendTest.java PRE-CREATION 
  clients/src/test/java/kafka/clients/producer/SenderTest.java PRE-CREATION 
  clients/src/test/java/kafka/common/config/ConfigDefTest.java PRE-CREATION 
  clients/src/test/java/kafka/common/metrics/JmxReporterTest.java PRE-CREATION 
  clients/src/test/java/kafka/common/metrics/MetricsTest.java PRE-CREATION 
  clients/src/test/java/kafka/common/metrics/stats/HistogramTest.java PRE-CREATION 
  clients/src/test/java/kafka/common/protocol/types/ProtocolSerializationTest.java PRE-CREATION 
  clients/src/test/java/kafka/common/record/MemoryRecordsTest.java PRE-CREATION 
  clients/src/test/java/kafka/common/record/RecordTest.java PRE-CREATION 
  clients/src/test/java/kafka/common/utils/AbstractIteratorTest.java PRE-CREATION 
  clients/src/test/java/kafka/common/utils/MockTime.java PRE-CREATION 
  clients/src/test/java/kafka/test/MetricsBench.java PRE-CREATION 
  clients/src/test/java/kafka/test/Microbenchmarks.java PRE-CREATION 
  clients/src/test/java/kafka/test/MockSelector.java PRE-CREATION 
  clients/src/test/java/kafka/test/TestUtils.java PRE-CREATION 
  project/Build.scala 098e874b8a14067739e6cd1f0a9ab9597ae84b6e 

Diff: https://reviews.apache.org/r/17263/diff/


Testing
-------


Thanks,

Jay Kreps


Re: Review Request 17263: New producer for Kafka.

Posted by Joel Koshy <jj...@gmail.com>.

> On Jan. 28, 2014, 8:48 p.m., Joel Koshy wrote:
> > clients/src/main/java/kafka/clients/producer/KafkaProducer.java, line 182
> > <https://reviews.apache.org/r/17263/diff/1/?file=436445#file436445line182>
> >
> >     This could add a couple minutes startup for producers that send to several
> >     topics (such as mirror-makers) since metadata lookup (at startup) will be
> >     issued serially for each topic.
> >
> 
> Jay Kreps wrote:
>     Wait, why? Shouldn't metadata fetch be very fast?

Yes - faster than I thought. i.e., I originally thought the send blocks
until a leader is elected for the topic but that does not seem to be the
case. It returns as soon as the topic is created. In any event, the
latencies can add up if you are sending for the first time to a lot of new
topics. I'm not sure this matters though.


> On Jan. 28, 2014, 8:48 p.m., Joel Koshy wrote:
> > clients/src/main/java/kafka/clients/producer/internals/Sender.java, line 299
> > <https://reviews.apache.org/r/17263/diff/1/?file=436457#file436457line299>
> >
> >     On retryable errors we should probably force a metadata refresh. This is
> >     separate from retries (which I don't see implemented in this patch). Are
> >     retries going to be in-built or is the client expected to handle retries?
> >     
> >
> 
> Jay Kreps wrote:
>     I don't think we have decided about retries.
>     
>     This is a good point about re-fetching metadata. I don't believe I fully understand when metadata refetch should happen.

if (Errors.forCode(errorCode).exception instanceof RetryableException)
  this.metadata.forceUpdate();

Otherwise, subsequent sends to this partition would remain in error
(unnecessarily for retryable errors) until the next metadata refresh.


> On Jan. 28, 2014, 8:48 p.m., Joel Koshy wrote:
> > clients/src/main/java/kafka/clients/producer/internals/Sender.java, line 448
> > <https://reviews.apache.org/r/17263/diff/1/?file=436457#file436457line448>
> >
> >     Why do we need a deque for inflight requests? Given that there can be at
> >     most one outstanding request a reference should have been sufficient - no?
> >
> 
> Jay Kreps wrote:
>     There can be only one request being written at any time but there can be any number of in-flight requests.

Yes, but in processReadyPartitions you filter out (by calling canSendMore) partitions destined for brokers that already have a request in the deque. I may be missing something but I don't see the deque growing > 1 (except that you currently allow a metadata request in as well).


- Joel


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17263/#review33033
-----------------------------------------------------------


On Jan. 23, 2014, 8:54 p.m., Jay Kreps wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/17263/
> -----------------------------------------------------------
> 
> (Updated Jan. 23, 2014, 8:54 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1227
>     https://issues.apache.org/jira/browse/KAFKA-1227
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-1227 New producer!
> 
> 
> Diffs
> -----
> 
>   clients/build.sbt PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/BufferExhaustedException.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/Callback.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/DefaultPartitioner.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/KafkaProducer.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/MockProducer.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/Partitioner.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/Producer.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/ProducerConfig.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/ProducerRecord.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/RecordSend.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/BufferPool.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/Metadata.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/ProduceRequestResult.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/RecordBatch.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/Sender.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/tools/ProducerPerformance.java PRE-CREATION 
>   clients/src/main/java/kafka/common/ByteSerialization.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Cluster.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Configurable.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Deserializer.java PRE-CREATION 
>   clients/src/main/java/kafka/common/KafkaException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Metric.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Node.java PRE-CREATION 
>   clients/src/main/java/kafka/common/PartitionInfo.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Serializer.java PRE-CREATION 
>   clients/src/main/java/kafka/common/StringSerialization.java PRE-CREATION 
>   clients/src/main/java/kafka/common/TopicPartition.java PRE-CREATION 
>   clients/src/main/java/kafka/common/config/AbstractConfig.java PRE-CREATION 
>   clients/src/main/java/kafka/common/config/ConfigDef.java PRE-CREATION 
>   clients/src/main/java/kafka/common/config/ConfigException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/ApiException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/CorruptMessageException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/LeaderNotAvailableException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/MessageTooLargeException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/NetworkException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/NotLeaderForPartitionException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/OffsetMetadataTooLarge.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/OffsetOutOfRangeException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/RetryableException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/TimeoutException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/UnknownServerException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/UnknownTopicOrPartitionException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/CompoundStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/JmxReporter.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/KafkaMetric.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Measurable.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/MeasurableStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/MetricConfig.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Metrics.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/MetricsReporter.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Quota.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/QuotaViolationException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Sensor.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Stat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Avg.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Count.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Histogram.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Max.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Min.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Percentile.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Percentiles.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Rate.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/SampledStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Total.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/WindowedStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/ByteBufferReceive.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/ByteBufferSend.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/NetworkReceive.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/NetworkSend.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Receive.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Selectable.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Selector.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Send.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/ApiKeys.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/Errors.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/ProtoUtils.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/Protocol.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/ArrayOf.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Field.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Schema.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/SchemaException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Struct.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Type.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/CompressionType.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/InvalidRecordException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/LogEntry.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/MemoryRecords.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/Record.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/Records.java PRE-CREATION 
>   clients/src/main/java/kafka/common/requests/RequestHeader.java PRE-CREATION 
>   clients/src/main/java/kafka/common/requests/RequestSend.java PRE-CREATION 
>   clients/src/main/java/kafka/common/requests/ResponseHeader.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/AbstractIterator.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/CopyOnWriteMap.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/Crc32.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/KafkaThread.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/SystemTime.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/Time.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/Utils.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/common/network/SelectorTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/BufferPoolTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/MetadataTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/MockProducerTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/RecordAccumulatorTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/RecordSendTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/SenderTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/config/ConfigDefTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/metrics/JmxReporterTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/metrics/MetricsTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/metrics/stats/HistogramTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/protocol/types/ProtocolSerializationTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/record/MemoryRecordsTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/record/RecordTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/utils/AbstractIteratorTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/utils/MockTime.java PRE-CREATION 
>   clients/src/test/java/kafka/test/MetricsBench.java PRE-CREATION 
>   clients/src/test/java/kafka/test/Microbenchmarks.java PRE-CREATION 
>   clients/src/test/java/kafka/test/MockSelector.java PRE-CREATION 
>   clients/src/test/java/kafka/test/TestUtils.java PRE-CREATION 
>   project/Build.scala 098e874b8a14067739e6cd1f0a9ab9597ae84b6e 
> 
> Diff: https://reviews.apache.org/r/17263/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jay Kreps
> 
>


Re: Review Request 17263: New producer for Kafka.

Posted by Jay Kreps <bo...@gmail.com>.

> On Jan. 28, 2014, 8:48 p.m., Joel Koshy wrote:
> > Overall, this looks great. Here are a few comments - I have some more
> > minor/stylistic comments (typos in comments and broken javadoc, code
> > conventions, minor javadoc edits and such) which I would rather defer for a
> > later more final patch.
> > 
> > I only did a cursory review of Protocol (which I on the surface looks very
> > promising) and did not review the unit tests and metrics package. WRT
> > metrics I'm a bit ambivalent - i.e., my preference would be to resolve that
> > first if you intend to switch to custom metrics or just stick to coda hale.
> > The main benefits I see are full control over how metrics are computed
> > (e.g., coda hale metrics at least in the version we are using makes it
> > difficult/impossible to configure reservoir samples sizes, decay constants,
> > reporting intervals, etc.) and predictable API.
> > 
> > A lot of logging and stats are missing (e.g., request rates, callback
> > execution time, ser-de time, etc.). I'm assuming these will be addressed in
> > your final patch.
> >

Yes, I held off on adding metrics and stats until we agree on how to do that.


> On Jan. 28, 2014, 8:48 p.m., Joel Koshy wrote:
> > clients/src/main/java/kafka/clients/producer/KafkaProducer.java, line 182
> > <https://reviews.apache.org/r/17263/diff/1/?file=436445#file436445line182>
> >
> >     This could add a couple minutes startup for producers that send to several
> >     topics (such as mirror-makers) since metadata lookup (at startup) will be
> >     issued serially for each topic.
> >

Wait, why? Shouldn't metadata fetch be very fast?


> On Jan. 28, 2014, 8:48 p.m., Joel Koshy wrote:
> > clients/src/main/java/kafka/clients/producer/KafkaProducer.java, line 186
> > <https://reviews.apache.org/r/17263/diff/1/?file=436445#file436445line186>
> >
> >     IIRC our current producer allows re-partitioning to an available partition.
> >     We can probably do something similar on producer errors, but arguably
> >     that approach is an artifact of longer than ideal leader election latency.
> >

You can kind of do this. That is you can mimic the 0.7 behavior by having a partitioner that only partitions to available partitions irrespective of key. Since there are no retries yet I think this is effectively as good as it gets.

I am a little averse to the idea of some kind of -1 partition that is handled magically by the producer but that is also an option.


> On Jan. 28, 2014, 8:48 p.m., Joel Koshy wrote:
> > clients/src/main/java/kafka/clients/producer/internals/BufferPool.java, line 21
> > <https://reviews.apache.org/r/17263/diff/1/?file=436452#file436452line21>
> >
> >     My reading of bufferPool - let me know if this is incorrect:
> >     - For most producers, most allocations will be of batch-size. (And most
> >       batches will not entirely fill the batch-size buffer).
> >     - Batch size needs to be properly tuned:
> >       - If too low, then you end up having (typically) one of the following:
> >         - Messages exceed batch size and thus poolable size and so you end up
> >           having to allocate on every record.
> >         - Messages fit but you have more sends than (probably) necessary given
> >           that you are sending smaller requests
> >       - If too high, then you could end up wasting a lot of memory especially
> >         for producers such as the mirror maker which produces to several
> >         partitions.
> >             
> >     It is an interesting approach, and I will think about it more. I would
> >     expect it to work very well for 90% of producers that send to a few topics,
> >     but it may be problematic for large producers such as mirror makers for
> >     which my intuition is that even very simple memory banks (i.e., a few levels
> >     of free lists) would address without fully reimplementing malloc. E.g., for
> >     each free list if we were to maintain counts on percentage of
> >     smaller-than-half allocations we could if necessary create a free list of
> >     smaller poolable sizes and keep a global cap on the number of free lists.
> >

Yeah I originally shared a lot of your concerns, but consider a few additional things:
1. I don't think that for large messages doing an allocation and send per-message is a problem because the sending of bytes will dominate. i.e. I think we could hard code a batch size of 16k and it would be close to optimal for most producers. Basically the goal is to avoid small allocations/sends not to avoid per-message allocations/sends.
2. The memory fragmentation problem is not as bad as it seems. Keep in mind that each java object significant overhead. Keeping around a list of ProducerRecord instances is actually pretty pricy if you sit and add up the actual per message overheads. I think also internally malloc/new must have fragmentation, that is when you ask for 15 bytes you often get more than 15 bytes. I'm not convinced this strategy is that much worse.
3. One optimization that Neha pointed out that would improve things would be to take all ready batches for a partition. I think this could be added later.
4. With 16k batch size and 1000 partitions you only need 16M of buffer space which is not unreasonable so I think this may work even for mirror-maker like cases.

That said if you have some thoughts on how the pooling could work I would be interested to hear it. I considered something like what you describe where I pick a few sizes to pool at, but it definitely adds some complications and corner cases so the challenge is to algorithmically work through some of these.


> On Jan. 28, 2014, 8:48 p.m., Joel Koshy wrote:
> > clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java, line 150
> > <https://reviews.apache.org/r/17263/diff/1/?file=436455#file436455line150>
> >
> >     Another simple approach is to just attach the last send time to each
> >     topic-partition and sort this in increasing order of last send time although
> >     a more ideal heuristic would be to also take into account the actual data
> >     available.
> >

Yeah this is interesting. What I interpret you to be saying is to sort all the batches by the created timestamp on the record batch which corresponds to the first message enqueue.

The advantage of this is that instead of round-robining in the check always chose the partition which has the oldest data.

Avoiding the sort on each iteration might require some kind of heap.

This would be a good optimization.


> On Jan. 28, 2014, 8:48 p.m., Joel Koshy wrote:
> > clients/src/main/java/kafka/clients/producer/internals/RecordBatch.java, line 81
> > <https://reviews.apache.org/r/17263/diff/1/?file=436456#file436456line81>
> >
> >     Would it be clearer to a user to have an "error callback" vs. a single
> >     callback? i.e., a user may want different handling for either scenario and
> >     has to "remember" to check hasError if there is only one callback. This
> >     probably folds into the API discussion which I have not yet looked over in
> >     great detail.
> >

The error will be thrown by await() or any accessor method on the send. There was some discussion on this as part of the api discussion...


> On Jan. 28, 2014, 8:48 p.m., Joel Koshy wrote:
> > clients/src/main/java/kafka/clients/producer/internals/Sender.java, line 299
> > <https://reviews.apache.org/r/17263/diff/1/?file=436457#file436457line299>
> >
> >     On retryable errors we should probably force a metadata refresh. This is
> >     separate from retries (which I don't see implemented in this patch). Are
> >     retries going to be in-built or is the client expected to handle retries?
> >     
> >

I don't think we have decided about retries.

This is a good point about re-fetching metadata. I don't believe I fully understand when metadata refetch should happen.


> On Jan. 28, 2014, 8:48 p.m., Joel Koshy wrote:
> > clients/src/main/java/kafka/clients/producer/internals/Sender.java, line 448
> > <https://reviews.apache.org/r/17263/diff/1/?file=436457#file436457line448>
> >
> >     Why do we need a deque for inflight requests? Given that there can be at
> >     most one outstanding request a reference should have been sufficient - no?
> >

There can be only one request being written at any time but there can be any number of in-flight requests.


> On Jan. 28, 2014, 8:48 p.m., Joel Koshy wrote:
> > clients/src/main/java/kafka/common/Node.java, line 41
> > <https://reviews.apache.org/r/17263/diff/1/?file=436465#file436465line41>
> >
> >     Can cache the result (as you do for TopicPartition).
> >

Agreed.


> On Jan. 28, 2014, 8:48 p.m., Joel Koshy wrote:
> > clients/src/main/java/kafka/common/StringSerialization.java, line 40
> > <https://reviews.apache.org/r/17263/diff/1/?file=436468#file436468line40>
> >
> >     What is the motivation for wrapping encoding exceptions with KafkaException?
> >

It is a checked exception. I guess the question is whether the serializer should be able to throw checked exceptions and then what we should do with those given that send doesn't.


> On Jan. 28, 2014, 8:48 p.m., Joel Koshy wrote:
> > clients/src/main/java/kafka/common/config/AbstractConfig.java, line 67
> > <https://reviews.apache.org/r/17263/diff/1/?file=436470#file436470line67>
> >
> >     ... is currently unused :) Assuming you will incorporate this in the final
> >     patch.
> >

Yeah...


> On Jan. 28, 2014, 8:48 p.m., Joel Koshy wrote:
> > clients/src/main/java/kafka/common/config/ConfigException.java, line 10
> > <https://reviews.apache.org/r/17263/diff/1/?file=436472#file436472line10>
> >
> >     How will these UIDs be used?
> >

Basically Exception is a serializable class so you get a warning if you don't have a serialVersionUID. I sincerely hope no one will serialize our exceptions.


> On Jan. 28, 2014, 8:48 p.m., Joel Koshy wrote:
> > clients/src/main/java/kafka/common/network/NetworkReceive.java, line 17
> > <https://reviews.apache.org/r/17263/diff/1/?file=436510#file436510line17>
> >
> >     Seems to be an unnecessary constructor.

I was using that in the unit tests to fake responses.


> On Jan. 28, 2014, 8:48 p.m., Joel Koshy wrote:
> > clients/src/main/java/kafka/common/network/Selector.java, line 102
> > <https://reviews.apache.org/r/17263/diff/1/?file=436514#file436514line102>
> >
> >     Shouldn't we check earlier in this method if we have an established
> >     connection?
> >     
> >     Also, the containsKey check should take the id.
> >

Good point.


> On Jan. 28, 2014, 8:48 p.m., Joel Koshy wrote:
> > clients/src/main/java/kafka/common/network/Send.java, line 30
> > <https://reviews.apache.org/r/17263/diff/1/?file=436515#file436515line30>
> >
> >     Not sure what the use-case for this would be.

It is sort of a complicated and unfinished thought. Basically if you want to directly carry the ByteBuffers in the request directly through to the network send without recopying you need a way to try to reach around the abstraction. I'll wrap that up and either use and explain that or remove it.


- Jay


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17263/#review33033
-----------------------------------------------------------


On Jan. 23, 2014, 8:54 p.m., Jay Kreps wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/17263/
> -----------------------------------------------------------
> 
> (Updated Jan. 23, 2014, 8:54 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1227
>     https://issues.apache.org/jira/browse/KAFKA-1227
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-1227 New producer!
> 
> 
> Diffs
> -----
> 
>   clients/build.sbt PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/BufferExhaustedException.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/Callback.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/DefaultPartitioner.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/KafkaProducer.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/MockProducer.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/Partitioner.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/Producer.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/ProducerConfig.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/ProducerRecord.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/RecordSend.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/BufferPool.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/Metadata.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/ProduceRequestResult.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/RecordBatch.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/Sender.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/tools/ProducerPerformance.java PRE-CREATION 
>   clients/src/main/java/kafka/common/ByteSerialization.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Cluster.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Configurable.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Deserializer.java PRE-CREATION 
>   clients/src/main/java/kafka/common/KafkaException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Metric.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Node.java PRE-CREATION 
>   clients/src/main/java/kafka/common/PartitionInfo.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Serializer.java PRE-CREATION 
>   clients/src/main/java/kafka/common/StringSerialization.java PRE-CREATION 
>   clients/src/main/java/kafka/common/TopicPartition.java PRE-CREATION 
>   clients/src/main/java/kafka/common/config/AbstractConfig.java PRE-CREATION 
>   clients/src/main/java/kafka/common/config/ConfigDef.java PRE-CREATION 
>   clients/src/main/java/kafka/common/config/ConfigException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/ApiException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/CorruptMessageException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/LeaderNotAvailableException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/MessageTooLargeException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/NetworkException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/NotLeaderForPartitionException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/OffsetMetadataTooLarge.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/OffsetOutOfRangeException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/RetryableException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/TimeoutException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/UnknownServerException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/UnknownTopicOrPartitionException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/CompoundStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/JmxReporter.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/KafkaMetric.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Measurable.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/MeasurableStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/MetricConfig.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Metrics.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/MetricsReporter.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Quota.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/QuotaViolationException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Sensor.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Stat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Avg.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Count.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Histogram.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Max.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Min.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Percentile.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Percentiles.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Rate.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/SampledStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Total.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/WindowedStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/ByteBufferReceive.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/ByteBufferSend.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/NetworkReceive.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/NetworkSend.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Receive.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Selectable.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Selector.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Send.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/ApiKeys.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/Errors.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/ProtoUtils.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/Protocol.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/ArrayOf.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Field.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Schema.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/SchemaException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Struct.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Type.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/CompressionType.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/InvalidRecordException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/LogEntry.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/MemoryRecords.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/Record.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/Records.java PRE-CREATION 
>   clients/src/main/java/kafka/common/requests/RequestHeader.java PRE-CREATION 
>   clients/src/main/java/kafka/common/requests/RequestSend.java PRE-CREATION 
>   clients/src/main/java/kafka/common/requests/ResponseHeader.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/AbstractIterator.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/CopyOnWriteMap.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/Crc32.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/KafkaThread.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/SystemTime.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/Time.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/Utils.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/common/network/SelectorTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/BufferPoolTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/MetadataTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/MockProducerTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/RecordAccumulatorTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/RecordSendTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/SenderTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/config/ConfigDefTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/metrics/JmxReporterTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/metrics/MetricsTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/metrics/stats/HistogramTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/protocol/types/ProtocolSerializationTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/record/MemoryRecordsTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/record/RecordTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/utils/AbstractIteratorTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/utils/MockTime.java PRE-CREATION 
>   clients/src/test/java/kafka/test/MetricsBench.java PRE-CREATION 
>   clients/src/test/java/kafka/test/Microbenchmarks.java PRE-CREATION 
>   clients/src/test/java/kafka/test/MockSelector.java PRE-CREATION 
>   clients/src/test/java/kafka/test/TestUtils.java PRE-CREATION 
>   project/Build.scala 098e874b8a14067739e6cd1f0a9ab9597ae84b6e 
> 
> Diff: https://reviews.apache.org/r/17263/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jay Kreps
> 
>


Re: Review Request 17263: New producer for Kafka.

Posted by Joel Koshy <jj...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17263/#review33033
-----------------------------------------------------------


Overall, this looks great. Here are a few comments - I have some more
minor/stylistic comments (typos in comments and broken javadoc, code
conventions, minor javadoc edits and such) which I would rather defer for a
later more final patch.

I only did a cursory review of Protocol (which I on the surface looks very
promising) and did not review the unit tests and metrics package. WRT
metrics I'm a bit ambivalent - i.e., my preference would be to resolve that
first if you intend to switch to custom metrics or just stick to coda hale.
The main benefits I see are full control over how metrics are computed
(e.g., coda hale metrics at least in the version we are using makes it
difficult/impossible to configure reservoir samples sizes, decay constants,
reporting intervals, etc.) and predictable API.

A lot of logging and stats are missing (e.g., request rates, callback
execution time, ser-de time, etc.). I'm assuming these will be addressed in
your final patch.



clients/src/main/java/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/17263/#comment62223>

    This could add a couple minutes startup for producers that send to several
    topics (such as mirror-makers) since metadata lookup (at startup) will be
    issued serially for each topic.
    



clients/src/main/java/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/17263/#comment62222>

    IIRC our current producer allows re-partitioning to an available partition.
    We can probably do something similar on producer errors, but arguably
    that approach is an artifact of longer than ideal leader election latency.
    



clients/src/main/java/kafka/clients/producer/internals/BufferPool.java
<https://reviews.apache.org/r/17263/#comment62233>

    My reading of bufferPool - let me know if this is incorrect:
    - For most producers, most allocations will be of batch-size. (And most
      batches will not entirely fill the batch-size buffer).
    - Batch size needs to be properly tuned:
      - If too low, then you end up having (typically) one of the following:
        - Messages exceed batch size and thus poolable size and so you end up
          having to allocate on every record.
        - Messages fit but you have more sends than (probably) necessary given
          that you are sending smaller requests
      - If too high, then you could end up wasting a lot of memory especially
        for producers such as the mirror maker which produces to several
        partitions.
            
    It is an interesting approach, and I will think about it more. I would
    expect it to work very well for 90% of producers that send to a few topics,
    but it may be problematic for large producers such as mirror makers for
    which my intuition is that even very simple memory banks (i.e., a few levels
    of free lists) would address without fully reimplementing malloc. E.g., for
    each free list if we were to maintain counts on percentage of
    smaller-than-half allocations we could if necessary create a free list of
    smaller poolable sizes and keep a global cap on the number of free lists.
    



clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java
<https://reviews.apache.org/r/17263/#comment62219>

    Another simple approach is to just attach the last send time to each
    topic-partition and sort this in increasing order of last send time although
    a more ideal heuristic would be to also take into account the actual data
    available.
    



clients/src/main/java/kafka/clients/producer/internals/RecordBatch.java
<https://reviews.apache.org/r/17263/#comment62237>

    Would it be clearer to a user to have an "error callback" vs. a single
    callback? i.e., a user may want different handling for either scenario and
    has to "remember" to check hasError if there is only one callback. This
    probably folds into the API discussion which I have not yet looked over in
    great detail.
    



clients/src/main/java/kafka/clients/producer/internals/Sender.java
<https://reviews.apache.org/r/17263/#comment62226>

    On retryable errors we should probably force a metadata refresh. This is
    separate from retries (which I don't see implemented in this patch). Are
    retries going to be in-built or is the client expected to handle retries?
    
    



clients/src/main/java/kafka/clients/producer/internals/Sender.java
<https://reviews.apache.org/r/17263/#comment62236>

    Why do we need a deque for inflight requests? Given that there can be at
    most one outstanding request a reference should have been sufficient - no?
    



clients/src/main/java/kafka/common/Node.java
<https://reviews.apache.org/r/17263/#comment62194>

    Can cache the result (as you do for TopicPartition).
    



clients/src/main/java/kafka/common/StringSerialization.java
<https://reviews.apache.org/r/17263/#comment62192>

    What is the motivation for wrapping encoding exceptions with KafkaException?
    



clients/src/main/java/kafka/common/config/AbstractConfig.java
<https://reviews.apache.org/r/17263/#comment62197>

    ... is currently unused :) Assuming you will incorporate this in the final
    patch.
    



clients/src/main/java/kafka/common/config/ConfigException.java
<https://reviews.apache.org/r/17263/#comment62205>

    How will these UIDs be used?
    



clients/src/main/java/kafka/common/network/ByteBufferReceive.java
<https://reviews.apache.org/r/17263/#comment62208>

    Unused



clients/src/main/java/kafka/common/network/ByteBufferReceive.java
<https://reviews.apache.org/r/17263/#comment62209>

    remaining needs to be updated



clients/src/main/java/kafka/common/network/NetworkReceive.java
<https://reviews.apache.org/r/17263/#comment62212>

    Seems to be an unnecessary constructor.



clients/src/main/java/kafka/common/network/Selector.java
<https://reviews.apache.org/r/17263/#comment62213>

    Shouldn't we check earlier in this method if we have an established
    connection?
    
    Also, the containsKey check should take the id.
    



clients/src/main/java/kafka/common/network/Send.java
<https://reviews.apache.org/r/17263/#comment62210>

    Not sure what the use-case for this would be.



clients/src/main/java/kafka/common/utils/CopyOnWriteMap.java
<https://reviews.apache.org/r/17263/#comment62214>

    Unused class.


- Joel Koshy


On Jan. 23, 2014, 8:54 p.m., Jay Kreps wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/17263/
> -----------------------------------------------------------
> 
> (Updated Jan. 23, 2014, 8:54 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1227
>     https://issues.apache.org/jira/browse/KAFKA-1227
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-1227 New producer!
> 
> 
> Diffs
> -----
> 
>   clients/build.sbt PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/BufferExhaustedException.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/Callback.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/DefaultPartitioner.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/KafkaProducer.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/MockProducer.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/Partitioner.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/Producer.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/ProducerConfig.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/ProducerRecord.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/RecordSend.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/BufferPool.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/Metadata.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/ProduceRequestResult.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/RecordBatch.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/Sender.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/tools/ProducerPerformance.java PRE-CREATION 
>   clients/src/main/java/kafka/common/ByteSerialization.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Cluster.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Configurable.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Deserializer.java PRE-CREATION 
>   clients/src/main/java/kafka/common/KafkaException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Metric.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Node.java PRE-CREATION 
>   clients/src/main/java/kafka/common/PartitionInfo.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Serializer.java PRE-CREATION 
>   clients/src/main/java/kafka/common/StringSerialization.java PRE-CREATION 
>   clients/src/main/java/kafka/common/TopicPartition.java PRE-CREATION 
>   clients/src/main/java/kafka/common/config/AbstractConfig.java PRE-CREATION 
>   clients/src/main/java/kafka/common/config/ConfigDef.java PRE-CREATION 
>   clients/src/main/java/kafka/common/config/ConfigException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/ApiException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/CorruptMessageException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/LeaderNotAvailableException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/MessageTooLargeException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/NetworkException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/NotLeaderForPartitionException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/OffsetMetadataTooLarge.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/OffsetOutOfRangeException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/RetryableException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/TimeoutException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/UnknownServerException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/UnknownTopicOrPartitionException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/CompoundStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/JmxReporter.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/KafkaMetric.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Measurable.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/MeasurableStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/MetricConfig.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Metrics.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/MetricsReporter.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Quota.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/QuotaViolationException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Sensor.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Stat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Avg.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Count.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Histogram.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Max.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Min.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Percentile.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Percentiles.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Rate.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/SampledStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Total.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/WindowedStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/ByteBufferReceive.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/ByteBufferSend.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/NetworkReceive.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/NetworkSend.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Receive.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Selectable.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Selector.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Send.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/ApiKeys.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/Errors.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/ProtoUtils.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/Protocol.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/ArrayOf.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Field.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Schema.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/SchemaException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Struct.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Type.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/CompressionType.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/InvalidRecordException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/LogEntry.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/MemoryRecords.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/Record.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/Records.java PRE-CREATION 
>   clients/src/main/java/kafka/common/requests/RequestHeader.java PRE-CREATION 
>   clients/src/main/java/kafka/common/requests/RequestSend.java PRE-CREATION 
>   clients/src/main/java/kafka/common/requests/ResponseHeader.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/AbstractIterator.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/CopyOnWriteMap.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/Crc32.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/KafkaThread.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/SystemTime.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/Time.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/Utils.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/common/network/SelectorTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/BufferPoolTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/MetadataTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/MockProducerTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/RecordAccumulatorTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/RecordSendTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/SenderTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/config/ConfigDefTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/metrics/JmxReporterTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/metrics/MetricsTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/metrics/stats/HistogramTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/protocol/types/ProtocolSerializationTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/record/MemoryRecordsTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/record/RecordTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/utils/AbstractIteratorTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/utils/MockTime.java PRE-CREATION 
>   clients/src/test/java/kafka/test/MetricsBench.java PRE-CREATION 
>   clients/src/test/java/kafka/test/Microbenchmarks.java PRE-CREATION 
>   clients/src/test/java/kafka/test/MockSelector.java PRE-CREATION 
>   clients/src/test/java/kafka/test/TestUtils.java PRE-CREATION 
>   project/Build.scala 098e874b8a14067739e6cd1f0a9ab9597ae84b6e 
> 
> Diff: https://reviews.apache.org/r/17263/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jay Kreps
> 
>


Re: Review Request 17263: New producer for Kafka.

Posted by Sriram Subramanian <sr...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17263/#review32863
-----------------------------------------------------------



clients/src/main/java/kafka/clients/producer/internals/BufferPool.java
<https://reviews.apache.org/r/17263/#comment61868>

    General comment - We are using the batch size to be the pool size. I had done a very similar implementation previously and had found it led to a lot of fragmentation. The main problem is that you dont want to let the pool size be determined by a batch size that can change. If the batch size is increased, a low volume topic partition is going to use up batch size of memory and other requests would stall. I am not proposing we implement malloc but I think we can do better than this. The poolable size should be a fixed value and much smaller than the batch size. The batch size can then be served as a multiple of the poolable size. This ensures that increasing batch size does not affect your allocation pattern and helps you to keep the poolable size much smaller which is important to support the low volume topic. At the least, we should track effective memory used Vs total available memory using metrics. 


- Sriram Subramanian


On Jan. 23, 2014, 8:54 p.m., Jay Kreps wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/17263/
> -----------------------------------------------------------
> 
> (Updated Jan. 23, 2014, 8:54 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1227
>     https://issues.apache.org/jira/browse/KAFKA-1227
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-1227 New producer!
> 
> 
> Diffs
> -----
> 
>   clients/build.sbt PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/BufferExhaustedException.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/Callback.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/DefaultPartitioner.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/KafkaProducer.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/MockProducer.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/Partitioner.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/Producer.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/ProducerConfig.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/ProducerRecord.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/RecordSend.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/BufferPool.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/Metadata.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/ProduceRequestResult.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/RecordBatch.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/Sender.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/tools/ProducerPerformance.java PRE-CREATION 
>   clients/src/main/java/kafka/common/ByteSerialization.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Cluster.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Configurable.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Deserializer.java PRE-CREATION 
>   clients/src/main/java/kafka/common/KafkaException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Metric.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Node.java PRE-CREATION 
>   clients/src/main/java/kafka/common/PartitionInfo.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Serializer.java PRE-CREATION 
>   clients/src/main/java/kafka/common/StringSerialization.java PRE-CREATION 
>   clients/src/main/java/kafka/common/TopicPartition.java PRE-CREATION 
>   clients/src/main/java/kafka/common/config/AbstractConfig.java PRE-CREATION 
>   clients/src/main/java/kafka/common/config/ConfigDef.java PRE-CREATION 
>   clients/src/main/java/kafka/common/config/ConfigException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/ApiException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/CorruptMessageException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/LeaderNotAvailableException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/MessageTooLargeException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/NetworkException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/NotLeaderForPartitionException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/OffsetMetadataTooLarge.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/OffsetOutOfRangeException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/RetryableException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/TimeoutException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/UnknownServerException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/UnknownTopicOrPartitionException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/CompoundStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/JmxReporter.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/KafkaMetric.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Measurable.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/MeasurableStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/MetricConfig.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Metrics.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/MetricsReporter.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Quota.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/QuotaViolationException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Sensor.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Stat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Avg.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Count.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Histogram.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Max.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Min.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Percentile.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Percentiles.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Rate.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/SampledStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Total.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/WindowedStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/ByteBufferReceive.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/ByteBufferSend.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/NetworkReceive.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/NetworkSend.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Receive.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Selectable.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Selector.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Send.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/ApiKeys.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/Errors.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/ProtoUtils.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/Protocol.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/ArrayOf.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Field.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Schema.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/SchemaException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Struct.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Type.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/CompressionType.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/InvalidRecordException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/LogEntry.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/MemoryRecords.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/Record.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/Records.java PRE-CREATION 
>   clients/src/main/java/kafka/common/requests/RequestHeader.java PRE-CREATION 
>   clients/src/main/java/kafka/common/requests/RequestSend.java PRE-CREATION 
>   clients/src/main/java/kafka/common/requests/ResponseHeader.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/AbstractIterator.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/CopyOnWriteMap.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/Crc32.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/KafkaThread.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/SystemTime.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/Time.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/Utils.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/common/network/SelectorTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/BufferPoolTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/MetadataTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/MockProducerTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/RecordAccumulatorTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/RecordSendTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/SenderTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/config/ConfigDefTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/metrics/JmxReporterTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/metrics/MetricsTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/metrics/stats/HistogramTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/protocol/types/ProtocolSerializationTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/record/MemoryRecordsTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/record/RecordTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/utils/AbstractIteratorTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/utils/MockTime.java PRE-CREATION 
>   clients/src/test/java/kafka/test/MetricsBench.java PRE-CREATION 
>   clients/src/test/java/kafka/test/Microbenchmarks.java PRE-CREATION 
>   clients/src/test/java/kafka/test/MockSelector.java PRE-CREATION 
>   clients/src/test/java/kafka/test/TestUtils.java PRE-CREATION 
>   project/Build.scala 098e874b8a14067739e6cd1f0a9ab9597ae84b6e 
> 
> Diff: https://reviews.apache.org/r/17263/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jay Kreps
> 
>


Re: Review Request 17263: New producer for Kafka.

Posted by Sriram Subramanian <sr...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17263/#review32806
-----------------------------------------------------------



clients/src/main/java/kafka/clients/producer/internals/BufferPool.java
<https://reviews.apache.org/r/17263/#comment61815>

    Ok makes sense. I am wondering if max memory needs to be hard bound or soft limit and just allocate the buffer for requests that are larger than the pool size and reduce complexity in this code. Not recommending anything specific. 



clients/src/main/java/kafka/clients/producer/internals/BufferPool.java
<https://reviews.apache.org/r/17263/#comment61814>

    I think there is a case here where two threads could just be accumulating memory and not reach their require capacity and always be in the loop. I dont think Condition.signal guarantees any order in which threads can be released.


- Sriram Subramanian


On Jan. 23, 2014, 8:54 p.m., Jay Kreps wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/17263/
> -----------------------------------------------------------
> 
> (Updated Jan. 23, 2014, 8:54 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1227
>     https://issues.apache.org/jira/browse/KAFKA-1227
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-1227 New producer!
> 
> 
> Diffs
> -----
> 
>   clients/build.sbt PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/BufferExhaustedException.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/Callback.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/DefaultPartitioner.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/KafkaProducer.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/MockProducer.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/Partitioner.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/Producer.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/ProducerConfig.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/ProducerRecord.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/RecordSend.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/BufferPool.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/Metadata.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/ProduceRequestResult.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/RecordBatch.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/Sender.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/tools/ProducerPerformance.java PRE-CREATION 
>   clients/src/main/java/kafka/common/ByteSerialization.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Cluster.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Configurable.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Deserializer.java PRE-CREATION 
>   clients/src/main/java/kafka/common/KafkaException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Metric.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Node.java PRE-CREATION 
>   clients/src/main/java/kafka/common/PartitionInfo.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Serializer.java PRE-CREATION 
>   clients/src/main/java/kafka/common/StringSerialization.java PRE-CREATION 
>   clients/src/main/java/kafka/common/TopicPartition.java PRE-CREATION 
>   clients/src/main/java/kafka/common/config/AbstractConfig.java PRE-CREATION 
>   clients/src/main/java/kafka/common/config/ConfigDef.java PRE-CREATION 
>   clients/src/main/java/kafka/common/config/ConfigException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/ApiException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/CorruptMessageException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/LeaderNotAvailableException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/MessageTooLargeException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/NetworkException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/NotLeaderForPartitionException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/OffsetMetadataTooLarge.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/OffsetOutOfRangeException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/RetryableException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/TimeoutException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/UnknownServerException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/UnknownTopicOrPartitionException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/CompoundStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/JmxReporter.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/KafkaMetric.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Measurable.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/MeasurableStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/MetricConfig.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Metrics.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/MetricsReporter.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Quota.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/QuotaViolationException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Sensor.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Stat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Avg.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Count.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Histogram.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Max.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Min.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Percentile.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Percentiles.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Rate.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/SampledStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Total.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/WindowedStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/ByteBufferReceive.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/ByteBufferSend.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/NetworkReceive.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/NetworkSend.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Receive.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Selectable.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Selector.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Send.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/ApiKeys.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/Errors.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/ProtoUtils.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/Protocol.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/ArrayOf.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Field.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Schema.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/SchemaException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Struct.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Type.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/CompressionType.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/InvalidRecordException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/LogEntry.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/MemoryRecords.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/Record.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/Records.java PRE-CREATION 
>   clients/src/main/java/kafka/common/requests/RequestHeader.java PRE-CREATION 
>   clients/src/main/java/kafka/common/requests/RequestSend.java PRE-CREATION 
>   clients/src/main/java/kafka/common/requests/ResponseHeader.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/AbstractIterator.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/CopyOnWriteMap.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/Crc32.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/KafkaThread.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/SystemTime.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/Time.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/Utils.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/common/network/SelectorTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/BufferPoolTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/MetadataTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/MockProducerTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/RecordAccumulatorTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/RecordSendTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/SenderTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/config/ConfigDefTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/metrics/JmxReporterTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/metrics/MetricsTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/metrics/stats/HistogramTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/protocol/types/ProtocolSerializationTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/record/MemoryRecordsTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/record/RecordTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/utils/AbstractIteratorTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/utils/MockTime.java PRE-CREATION 
>   clients/src/test/java/kafka/test/MetricsBench.java PRE-CREATION 
>   clients/src/test/java/kafka/test/Microbenchmarks.java PRE-CREATION 
>   clients/src/test/java/kafka/test/MockSelector.java PRE-CREATION 
>   clients/src/test/java/kafka/test/TestUtils.java PRE-CREATION 
>   project/Build.scala 098e874b8a14067739e6cd1f0a9ab9597ae84b6e 
> 
> Diff: https://reviews.apache.org/r/17263/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jay Kreps
> 
>


Re: Review Request 17263: New producer for Kafka.

Posted by Neha Narkhede <ne...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17263/#review33004
-----------------------------------------------------------


Some more comments regarding the blocking behavior


clients/src/main/java/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/17263/#comment62435>

    
    This is more of an artifact of the clunky auto create behavior - if messages are sent to a non-existing topic with auto.create turned off on the broker side, a producer will hang for 1 minute to figure out that the messages can never be sent until the topic is created. A cleaner solution to this is to fix the way we we perform admin operations and possibly expose create_topic, delete_topic etc as client APIs.
    Also, since create topic is not transactional, it is possible for clients to see a partial view of the topic when topic creation is in progress. This will affect partitioning since the initial messages might get mis-routed due to an incorrect view of total number of partitions. Both these behaviors seem very unintuitive from a client's perspective. 
    



clients/src/main/java/kafka/clients/producer/internals/Metadata.java
<https://reviews.apache.org/r/17263/#comment62434>

    Blocking waiting for metadata for a new topic seems unexpected behavior for a client that wants true async behavior. One way of handling is to have a per topic queue for partition "-1" that enqueues messages for topics being created. Once the topic's metadata is known, this queue is thrown away. That way even if metadata is not discovered before the queue fills up, the blocking behavior is exercised as configured through the "block.on.buffer.full" config. 
    
    



clients/src/main/java/kafka/clients/producer/internals/RecordBatch.java
<https://reviews.apache.org/r/17263/#comment62131>

    typo -> sufficent


- Neha Narkhede


On Jan. 23, 2014, 8:54 p.m., Jay Kreps wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/17263/
> -----------------------------------------------------------
> 
> (Updated Jan. 23, 2014, 8:54 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1227
>     https://issues.apache.org/jira/browse/KAFKA-1227
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-1227 New producer!
> 
> 
> Diffs
> -----
> 
>   clients/build.sbt PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/BufferExhaustedException.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/Callback.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/DefaultPartitioner.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/KafkaProducer.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/MockProducer.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/Partitioner.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/Producer.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/ProducerConfig.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/ProducerRecord.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/RecordSend.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/BufferPool.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/Metadata.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/ProduceRequestResult.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/RecordBatch.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/Sender.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/tools/ProducerPerformance.java PRE-CREATION 
>   clients/src/main/java/kafka/common/ByteSerialization.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Cluster.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Configurable.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Deserializer.java PRE-CREATION 
>   clients/src/main/java/kafka/common/KafkaException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Metric.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Node.java PRE-CREATION 
>   clients/src/main/java/kafka/common/PartitionInfo.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Serializer.java PRE-CREATION 
>   clients/src/main/java/kafka/common/StringSerialization.java PRE-CREATION 
>   clients/src/main/java/kafka/common/TopicPartition.java PRE-CREATION 
>   clients/src/main/java/kafka/common/config/AbstractConfig.java PRE-CREATION 
>   clients/src/main/java/kafka/common/config/ConfigDef.java PRE-CREATION 
>   clients/src/main/java/kafka/common/config/ConfigException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/ApiException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/CorruptMessageException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/LeaderNotAvailableException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/MessageTooLargeException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/NetworkException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/NotLeaderForPartitionException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/OffsetMetadataTooLarge.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/OffsetOutOfRangeException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/RetryableException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/TimeoutException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/UnknownServerException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/UnknownTopicOrPartitionException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/CompoundStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/JmxReporter.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/KafkaMetric.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Measurable.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/MeasurableStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/MetricConfig.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Metrics.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/MetricsReporter.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Quota.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/QuotaViolationException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Sensor.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Stat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Avg.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Count.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Histogram.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Max.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Min.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Percentile.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Percentiles.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Rate.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/SampledStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Total.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/WindowedStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/ByteBufferReceive.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/ByteBufferSend.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/NetworkReceive.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/NetworkSend.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Receive.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Selectable.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Selector.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Send.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/ApiKeys.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/Errors.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/ProtoUtils.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/Protocol.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/ArrayOf.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Field.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Schema.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/SchemaException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Struct.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Type.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/CompressionType.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/InvalidRecordException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/LogEntry.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/MemoryRecords.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/Record.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/Records.java PRE-CREATION 
>   clients/src/main/java/kafka/common/requests/RequestHeader.java PRE-CREATION 
>   clients/src/main/java/kafka/common/requests/RequestSend.java PRE-CREATION 
>   clients/src/main/java/kafka/common/requests/ResponseHeader.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/AbstractIterator.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/CopyOnWriteMap.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/Crc32.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/KafkaThread.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/SystemTime.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/Time.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/Utils.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/common/network/SelectorTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/BufferPoolTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/MetadataTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/MockProducerTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/RecordAccumulatorTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/RecordSendTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/SenderTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/config/ConfigDefTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/metrics/JmxReporterTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/metrics/MetricsTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/metrics/stats/HistogramTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/protocol/types/ProtocolSerializationTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/record/MemoryRecordsTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/record/RecordTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/utils/AbstractIteratorTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/utils/MockTime.java PRE-CREATION 
>   clients/src/test/java/kafka/test/MetricsBench.java PRE-CREATION 
>   clients/src/test/java/kafka/test/Microbenchmarks.java PRE-CREATION 
>   clients/src/test/java/kafka/test/MockSelector.java PRE-CREATION 
>   clients/src/test/java/kafka/test/TestUtils.java PRE-CREATION 
>   project/Build.scala 098e874b8a14067739e6cd1f0a9ab9597ae84b6e 
> 
> Diff: https://reviews.apache.org/r/17263/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jay Kreps
> 
>


Re: Review Request 17263: New producer for Kafka.

Posted by Jay Kreps <bo...@gmail.com>.

> On Jan. 28, 2014, 2:07 a.m., Neha Narkhede wrote:
> > clients/src/main/java/kafka/clients/producer/ProducerConfig.java, line 19
> > <https://reviews.apache.org/r/17263/diff/1/?file=436449#file436449line19>
> >
> >     I think someone on the mailing list suggested this as well and I agree. Instead of calling this metadata.broker.list, can we call it bootstrap.broker.list or something that can suggest this is the list of brokers used to bootstrap the producer?

Agreed. I used that name for compatibility with the existing configs, but given that the API is different I don't think config compatibility is such a concern and that name is better.


> On Jan. 28, 2014, 2:07 a.m., Neha Narkhede wrote:
> > clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java, line 167
> > <https://reviews.apache.org/r/17263/diff/1/?file=436455#file436455line167>
> >
> >     We make only one pass over the partitions and drain at most one batch at a time. However, there could be more batches ready for a partition while other partitions have very little data ready. So at the end of one pass, maxSize worth data is still not accumulated. Is it worth making n passes until maxSize data is accumulated? Essentially can we send more than one RecordBatch from a partition in one produce request? The use case I'm thinking about is where one partition is very high throughput compared to the rest.
> >     
> >     However, I think the above will require us to change quite a lot of the APIs that currently assume only one RecordBatch of a partition is in flight

I think there are suggestions there 
1. Take more than a single batch if more than a single batch is ready when checking the partitions
2. Do multiple iterations over the set of partitions.

(2) probably wouldn't be worth it as the loop is pretty tight (< 1 microseconds) so although it is possible we could complete another batch in that time it's unlikely (I think, anyway, would have to try it to know).

(1) does actually make sense. Essentially instead of taking one batch from each take all complete batches from each. The goal of this would be to give larger clumps for fewer partitions which might be friendlier for the server and generate fewer requests. I thought about this a bit but ended up going with what's there. There are a set of possible optimizations I have in mind but I think until we have it all up and working with scripted perf tests over a range of cases to get some data it is hard to say what will really pay off.


> On Jan. 28, 2014, 2:07 a.m., Neha Narkhede wrote:
> > clients/src/main/java/kafka/clients/producer/internals/RecordBatch.java, line 30
> > <https://reviews.apache.org/r/17263/diff/1/?file=436456#file436456line30>
> >
> >     Would it make more sense to initialize the list of thunks to queue size instead of 5 ?

Hmm, which queue?


> On Jan. 28, 2014, 2:07 a.m., Neha Narkhede wrote:
> > clients/src/main/java/kafka/clients/producer/internals/Sender.java, line 156
> > <https://reviews.apache.org/r/17263/diff/1/?file=436457#file436457line156>
> >
> >     Would it be cleaner to move this logic inside NodeState and expose it with some API like isConnectable()?

Yes that would be reasonable.


> On Jan. 28, 2014, 2:07 a.m., Neha Narkhede wrote:
> > clients/src/main/java/kafka/clients/producer/internals/Sender.java, line 207
> > <https://reviews.apache.org/r/17263/diff/1/?file=436457#file436457line207>
> >
> >     socket buffer sizes should be configurable, no? This is especially useful while tuning mirror makers for cross-colo data mirroring

Yes, had a todo there for that.


> On Jan. 28, 2014, 2:07 a.m., Neha Narkhede wrote:
> > clients/src/main/java/kafka/common/network/Selector.java, line 113
> > <https://reviews.apache.org/r/17263/diff/1/?file=436514#file436514line113>
> >
> >     shouldn't we remove the key from the map once the key is cancelled to prevent the map from growing to an unbounded size?

I think so, good point.


> On Jan. 28, 2014, 2:07 a.m., Neha Narkhede wrote:
> > project/Build.scala, line 141
> > <https://reviews.apache.org/r/17263/diff/1/?file=436562#file436562line141>
> >
> >     clients needs to be added to the aggregate list so that it will build as part of ./sbt package. Currently, sbt doesn't build clients because of this.

Yeah I was holding off on that since this jar is alpha and probably shouldn't be part of the release. It is still buildable in the interm by just doing "package kafka-clients".


- Jay


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17263/#review32850
-----------------------------------------------------------


On Jan. 23, 2014, 8:54 p.m., Jay Kreps wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/17263/
> -----------------------------------------------------------
> 
> (Updated Jan. 23, 2014, 8:54 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1227
>     https://issues.apache.org/jira/browse/KAFKA-1227
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-1227 New producer!
> 
> 
> Diffs
> -----
> 
>   clients/build.sbt PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/BufferExhaustedException.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/Callback.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/DefaultPartitioner.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/KafkaProducer.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/MockProducer.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/Partitioner.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/Producer.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/ProducerConfig.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/ProducerRecord.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/RecordSend.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/BufferPool.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/Metadata.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/ProduceRequestResult.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/RecordBatch.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/Sender.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/tools/ProducerPerformance.java PRE-CREATION 
>   clients/src/main/java/kafka/common/ByteSerialization.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Cluster.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Configurable.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Deserializer.java PRE-CREATION 
>   clients/src/main/java/kafka/common/KafkaException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Metric.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Node.java PRE-CREATION 
>   clients/src/main/java/kafka/common/PartitionInfo.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Serializer.java PRE-CREATION 
>   clients/src/main/java/kafka/common/StringSerialization.java PRE-CREATION 
>   clients/src/main/java/kafka/common/TopicPartition.java PRE-CREATION 
>   clients/src/main/java/kafka/common/config/AbstractConfig.java PRE-CREATION 
>   clients/src/main/java/kafka/common/config/ConfigDef.java PRE-CREATION 
>   clients/src/main/java/kafka/common/config/ConfigException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/ApiException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/CorruptMessageException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/LeaderNotAvailableException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/MessageTooLargeException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/NetworkException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/NotLeaderForPartitionException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/OffsetMetadataTooLarge.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/OffsetOutOfRangeException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/RetryableException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/TimeoutException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/UnknownServerException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/UnknownTopicOrPartitionException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/CompoundStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/JmxReporter.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/KafkaMetric.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Measurable.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/MeasurableStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/MetricConfig.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Metrics.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/MetricsReporter.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Quota.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/QuotaViolationException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Sensor.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Stat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Avg.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Count.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Histogram.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Max.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Min.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Percentile.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Percentiles.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Rate.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/SampledStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Total.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/WindowedStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/ByteBufferReceive.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/ByteBufferSend.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/NetworkReceive.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/NetworkSend.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Receive.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Selectable.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Selector.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Send.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/ApiKeys.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/Errors.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/ProtoUtils.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/Protocol.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/ArrayOf.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Field.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Schema.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/SchemaException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Struct.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Type.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/CompressionType.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/InvalidRecordException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/LogEntry.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/MemoryRecords.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/Record.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/Records.java PRE-CREATION 
>   clients/src/main/java/kafka/common/requests/RequestHeader.java PRE-CREATION 
>   clients/src/main/java/kafka/common/requests/RequestSend.java PRE-CREATION 
>   clients/src/main/java/kafka/common/requests/ResponseHeader.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/AbstractIterator.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/CopyOnWriteMap.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/Crc32.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/KafkaThread.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/SystemTime.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/Time.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/Utils.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/common/network/SelectorTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/BufferPoolTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/MetadataTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/MockProducerTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/RecordAccumulatorTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/RecordSendTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/SenderTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/config/ConfigDefTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/metrics/JmxReporterTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/metrics/MetricsTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/metrics/stats/HistogramTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/protocol/types/ProtocolSerializationTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/record/MemoryRecordsTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/record/RecordTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/utils/AbstractIteratorTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/utils/MockTime.java PRE-CREATION 
>   clients/src/test/java/kafka/test/MetricsBench.java PRE-CREATION 
>   clients/src/test/java/kafka/test/Microbenchmarks.java PRE-CREATION 
>   clients/src/test/java/kafka/test/MockSelector.java PRE-CREATION 
>   clients/src/test/java/kafka/test/TestUtils.java PRE-CREATION 
>   project/Build.scala 098e874b8a14067739e6cd1f0a9ab9597ae84b6e 
> 
> Diff: https://reviews.apache.org/r/17263/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jay Kreps
> 
>


Re: Review Request 17263: New producer for Kafka.

Posted by Neha Narkhede <ne...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17263/#review32850
-----------------------------------------------------------


Made a first pass over some parts of the code. Will follow up with more review comments tomorrow.


clients/src/main/java/kafka/clients/producer/Partitioner.java
<https://reviews.apache.org/r/17263/#comment61847>

    



clients/src/main/java/kafka/clients/producer/ProducerConfig.java
<https://reviews.apache.org/r/17263/#comment61848>

    I think someone on the mailing list suggested this as well and I agree. Instead of calling this metadata.broker.list, can we call it bootstrap.broker.list or something that can suggest this is the list of brokers used to bootstrap the producer?



clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java
<https://reviews.apache.org/r/17263/#comment61959>

    We make only one pass over the partitions and drain at most one batch at a time. However, there could be more batches ready for a partition while other partitions have very little data ready. So at the end of one pass, maxSize worth data is still not accumulated. Is it worth making n passes until maxSize data is accumulated? Essentially can we send more than one RecordBatch from a partition in one produce request? The use case I'm thinking about is where one partition is very high throughput compared to the rest.
    
    However, I think the above will require us to change quite a lot of the APIs that currently assume only one RecordBatch of a partition is in flight



clients/src/main/java/kafka/clients/producer/internals/RecordBatch.java
<https://reviews.apache.org/r/17263/#comment61866>

    Would it make more sense to initialize the list of thunks to queue size instead of 5 ?



clients/src/main/java/kafka/clients/producer/internals/Sender.java
<https://reviews.apache.org/r/17263/#comment61897>

    Would it be cleaner to move this logic inside NodeState and expose it with some API like isConnectable()?



clients/src/main/java/kafka/clients/producer/internals/Sender.java
<https://reviews.apache.org/r/17263/#comment61898>

    socket buffer sizes should be configurable, no? This is especially useful while tuning mirror makers for cross-colo data mirroring



clients/src/main/java/kafka/common/network/Selector.java
<https://reviews.apache.org/r/17263/#comment61905>

    shouldn't we remove the key from the map once the key is cancelled to prevent the map from growing to an unbounded size?



project/Build.scala
<https://reviews.apache.org/r/17263/#comment61888>

    clients needs to be added to the aggregate list so that it will build as part of ./sbt package. Currently, sbt doesn't build clients because of this.


- Neha Narkhede


On Jan. 23, 2014, 8:54 p.m., Jay Kreps wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/17263/
> -----------------------------------------------------------
> 
> (Updated Jan. 23, 2014, 8:54 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1227
>     https://issues.apache.org/jira/browse/KAFKA-1227
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-1227 New producer!
> 
> 
> Diffs
> -----
> 
>   clients/build.sbt PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/BufferExhaustedException.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/Callback.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/DefaultPartitioner.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/KafkaProducer.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/MockProducer.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/Partitioner.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/Producer.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/ProducerConfig.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/ProducerRecord.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/RecordSend.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/BufferPool.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/Metadata.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/ProduceRequestResult.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/RecordBatch.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/Sender.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/tools/ProducerPerformance.java PRE-CREATION 
>   clients/src/main/java/kafka/common/ByteSerialization.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Cluster.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Configurable.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Deserializer.java PRE-CREATION 
>   clients/src/main/java/kafka/common/KafkaException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Metric.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Node.java PRE-CREATION 
>   clients/src/main/java/kafka/common/PartitionInfo.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Serializer.java PRE-CREATION 
>   clients/src/main/java/kafka/common/StringSerialization.java PRE-CREATION 
>   clients/src/main/java/kafka/common/TopicPartition.java PRE-CREATION 
>   clients/src/main/java/kafka/common/config/AbstractConfig.java PRE-CREATION 
>   clients/src/main/java/kafka/common/config/ConfigDef.java PRE-CREATION 
>   clients/src/main/java/kafka/common/config/ConfigException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/ApiException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/CorruptMessageException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/LeaderNotAvailableException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/MessageTooLargeException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/NetworkException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/NotLeaderForPartitionException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/OffsetMetadataTooLarge.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/OffsetOutOfRangeException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/RetryableException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/TimeoutException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/UnknownServerException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/UnknownTopicOrPartitionException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/CompoundStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/JmxReporter.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/KafkaMetric.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Measurable.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/MeasurableStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/MetricConfig.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Metrics.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/MetricsReporter.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Quota.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/QuotaViolationException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Sensor.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Stat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Avg.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Count.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Histogram.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Max.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Min.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Percentile.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Percentiles.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Rate.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/SampledStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Total.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/WindowedStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/ByteBufferReceive.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/ByteBufferSend.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/NetworkReceive.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/NetworkSend.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Receive.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Selectable.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Selector.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Send.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/ApiKeys.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/Errors.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/ProtoUtils.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/Protocol.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/ArrayOf.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Field.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Schema.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/SchemaException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Struct.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Type.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/CompressionType.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/InvalidRecordException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/LogEntry.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/MemoryRecords.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/Record.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/Records.java PRE-CREATION 
>   clients/src/main/java/kafka/common/requests/RequestHeader.java PRE-CREATION 
>   clients/src/main/java/kafka/common/requests/RequestSend.java PRE-CREATION 
>   clients/src/main/java/kafka/common/requests/ResponseHeader.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/AbstractIterator.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/CopyOnWriteMap.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/Crc32.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/KafkaThread.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/SystemTime.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/Time.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/Utils.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/common/network/SelectorTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/BufferPoolTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/MetadataTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/MockProducerTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/RecordAccumulatorTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/RecordSendTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/SenderTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/config/ConfigDefTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/metrics/JmxReporterTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/metrics/MetricsTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/metrics/stats/HistogramTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/protocol/types/ProtocolSerializationTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/record/MemoryRecordsTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/record/RecordTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/utils/AbstractIteratorTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/utils/MockTime.java PRE-CREATION 
>   clients/src/test/java/kafka/test/MetricsBench.java PRE-CREATION 
>   clients/src/test/java/kafka/test/Microbenchmarks.java PRE-CREATION 
>   clients/src/test/java/kafka/test/MockSelector.java PRE-CREATION 
>   clients/src/test/java/kafka/test/TestUtils.java PRE-CREATION 
>   project/Build.scala 098e874b8a14067739e6cd1f0a9ab9597ae84b6e 
> 
> Diff: https://reviews.apache.org/r/17263/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jay Kreps
> 
>


Re: Review Request 17263: New producer for Kafka.

Posted by Sriram Subramanian <sr...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17263/#review32802
-----------------------------------------------------------



clients/src/main/java/kafka/common/PartitionInfo.java
<https://reviews.apache.org/r/17263/#comment61801>

    Why not use TopicPartition class here?



clients/src/main/java/kafka/common/config/ConfigDef.java
<https://reviews.apache.org/r/17263/#comment61802>

    Verifiable properties also specifies if a default value was overriden. This is useful when debugging. Could we have that here?



clients/src/main/java/kafka/common/metrics/CompoundStat.java
<https://reviews.apache.org/r/17263/#comment61803>

    Some documentation on the metrics stuff would be useful



clients/src/main/java/kafka/common/metrics/Metrics.java
<https://reviews.apache.org/r/17263/#comment61804>

    Should this be unmodifiable?



clients/src/main/java/kafka/common/network/NetworkSend.java
<https://reviews.apache.org/r/17263/#comment61806>

    It would be worth adding a comment on why this extends ByteBufferSend. All network sends are materialized into buffer first before being sent. Have you thought of avoiding that on send?



clients/src/main/java/kafka/common/network/NetworkSend.java
<https://reviews.apache.org/r/17263/#comment61805>

    Why do copy here? Instead why not maintain the buffers as a list and add size as the first buffer to it?



clients/src/main/java/kafka/common/network/Selector.java
<https://reviews.apache.org/r/17263/#comment61807>

    Typo. Not an interface.



clients/src/main/java/kafka/common/network/Selector.java
<https://reviews.apache.org/r/17263/#comment61808>

    Do you really want to return the actual modifiable lists here?



clients/src/main/java/kafka/common/protocol/Protocol.java
<https://reviews.apache.org/r/17263/#comment61809>

    Do you want error code in the response header?


- Sriram Subramanian


On Jan. 23, 2014, 8:54 p.m., Jay Kreps wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/17263/
> -----------------------------------------------------------
> 
> (Updated Jan. 23, 2014, 8:54 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1227
>     https://issues.apache.org/jira/browse/KAFKA-1227
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-1227 New producer!
> 
> 
> Diffs
> -----
> 
>   clients/build.sbt PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/BufferExhaustedException.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/Callback.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/DefaultPartitioner.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/KafkaProducer.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/MockProducer.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/Partitioner.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/Producer.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/ProducerConfig.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/ProducerRecord.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/RecordSend.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/BufferPool.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/Metadata.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/ProduceRequestResult.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/RecordBatch.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/Sender.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/tools/ProducerPerformance.java PRE-CREATION 
>   clients/src/main/java/kafka/common/ByteSerialization.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Cluster.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Configurable.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Deserializer.java PRE-CREATION 
>   clients/src/main/java/kafka/common/KafkaException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Metric.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Node.java PRE-CREATION 
>   clients/src/main/java/kafka/common/PartitionInfo.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Serializer.java PRE-CREATION 
>   clients/src/main/java/kafka/common/StringSerialization.java PRE-CREATION 
>   clients/src/main/java/kafka/common/TopicPartition.java PRE-CREATION 
>   clients/src/main/java/kafka/common/config/AbstractConfig.java PRE-CREATION 
>   clients/src/main/java/kafka/common/config/ConfigDef.java PRE-CREATION 
>   clients/src/main/java/kafka/common/config/ConfigException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/ApiException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/CorruptMessageException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/LeaderNotAvailableException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/MessageTooLargeException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/NetworkException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/NotLeaderForPartitionException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/OffsetMetadataTooLarge.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/OffsetOutOfRangeException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/RetryableException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/TimeoutException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/UnknownServerException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/UnknownTopicOrPartitionException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/CompoundStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/JmxReporter.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/KafkaMetric.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Measurable.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/MeasurableStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/MetricConfig.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Metrics.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/MetricsReporter.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Quota.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/QuotaViolationException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Sensor.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Stat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Avg.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Count.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Histogram.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Max.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Min.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Percentile.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Percentiles.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Rate.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/SampledStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Total.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/WindowedStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/ByteBufferReceive.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/ByteBufferSend.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/NetworkReceive.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/NetworkSend.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Receive.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Selectable.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Selector.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Send.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/ApiKeys.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/Errors.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/ProtoUtils.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/Protocol.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/ArrayOf.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Field.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Schema.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/SchemaException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Struct.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Type.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/CompressionType.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/InvalidRecordException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/LogEntry.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/MemoryRecords.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/Record.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/Records.java PRE-CREATION 
>   clients/src/main/java/kafka/common/requests/RequestHeader.java PRE-CREATION 
>   clients/src/main/java/kafka/common/requests/RequestSend.java PRE-CREATION 
>   clients/src/main/java/kafka/common/requests/ResponseHeader.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/AbstractIterator.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/CopyOnWriteMap.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/Crc32.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/KafkaThread.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/SystemTime.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/Time.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/Utils.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/common/network/SelectorTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/BufferPoolTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/MetadataTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/MockProducerTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/RecordAccumulatorTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/RecordSendTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/SenderTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/config/ConfigDefTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/metrics/JmxReporterTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/metrics/MetricsTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/metrics/stats/HistogramTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/protocol/types/ProtocolSerializationTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/record/MemoryRecordsTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/record/RecordTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/utils/AbstractIteratorTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/utils/MockTime.java PRE-CREATION 
>   clients/src/test/java/kafka/test/MetricsBench.java PRE-CREATION 
>   clients/src/test/java/kafka/test/Microbenchmarks.java PRE-CREATION 
>   clients/src/test/java/kafka/test/MockSelector.java PRE-CREATION 
>   clients/src/test/java/kafka/test/TestUtils.java PRE-CREATION 
>   project/Build.scala 098e874b8a14067739e6cd1f0a9ab9597ae84b6e 
> 
> Diff: https://reviews.apache.org/r/17263/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jay Kreps
> 
>


Re: Review Request 17263: New producer for Kafka.

Posted by Jay Kreps <bo...@gmail.com>.

> On March 20, 2014, 8:20 p.m., Guozhang Wang wrote:
> > clients/src/main/java/kafka/common/metrics/stats/SampledStat.java, lines 29-40
> > <https://reviews.apache.org/r/17263/diff/1/?file=436505#file436505line29>
> >
> >     This is not clear to me how the logic works for resetting old samples, could you add some comments?
> >     
> >     Since this.current will never be bigger than config.samples(), it seems this.current >= samples.size() will never be true?

It is true on initialization as there are no samples. Essentially this code expands the sample set until it reaches the desired size, then it just recycles old samples.


- Jay


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17263/#review37833
-----------------------------------------------------------


On Jan. 23, 2014, 8:54 p.m., Jay Kreps wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/17263/
> -----------------------------------------------------------
> 
> (Updated Jan. 23, 2014, 8:54 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1227
>     https://issues.apache.org/jira/browse/KAFKA-1227
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-1227 New producer!
> 
> 
> Diffs
> -----
> 
>   clients/build.sbt PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/BufferExhaustedException.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/Callback.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/DefaultPartitioner.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/KafkaProducer.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/MockProducer.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/Partitioner.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/Producer.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/ProducerConfig.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/ProducerRecord.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/RecordSend.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/BufferPool.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/Metadata.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/ProduceRequestResult.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/RecordBatch.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/Sender.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/tools/ProducerPerformance.java PRE-CREATION 
>   clients/src/main/java/kafka/common/ByteSerialization.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Cluster.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Configurable.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Deserializer.java PRE-CREATION 
>   clients/src/main/java/kafka/common/KafkaException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Metric.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Node.java PRE-CREATION 
>   clients/src/main/java/kafka/common/PartitionInfo.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Serializer.java PRE-CREATION 
>   clients/src/main/java/kafka/common/StringSerialization.java PRE-CREATION 
>   clients/src/main/java/kafka/common/TopicPartition.java PRE-CREATION 
>   clients/src/main/java/kafka/common/config/AbstractConfig.java PRE-CREATION 
>   clients/src/main/java/kafka/common/config/ConfigDef.java PRE-CREATION 
>   clients/src/main/java/kafka/common/config/ConfigException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/ApiException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/CorruptMessageException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/LeaderNotAvailableException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/MessageTooLargeException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/NetworkException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/NotLeaderForPartitionException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/OffsetMetadataTooLarge.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/OffsetOutOfRangeException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/RetryableException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/TimeoutException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/UnknownServerException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/UnknownTopicOrPartitionException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/CompoundStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/JmxReporter.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/KafkaMetric.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Measurable.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/MeasurableStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/MetricConfig.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Metrics.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/MetricsReporter.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Quota.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/QuotaViolationException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Sensor.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Stat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Avg.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Count.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Histogram.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Max.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Min.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Percentile.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Percentiles.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Rate.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/SampledStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Total.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/WindowedStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/ByteBufferReceive.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/ByteBufferSend.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/NetworkReceive.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/NetworkSend.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Receive.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Selectable.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Selector.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Send.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/ApiKeys.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/Errors.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/ProtoUtils.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/Protocol.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/ArrayOf.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Field.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Schema.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/SchemaException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Struct.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Type.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/CompressionType.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/InvalidRecordException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/LogEntry.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/MemoryRecords.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/Record.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/Records.java PRE-CREATION 
>   clients/src/main/java/kafka/common/requests/RequestHeader.java PRE-CREATION 
>   clients/src/main/java/kafka/common/requests/RequestSend.java PRE-CREATION 
>   clients/src/main/java/kafka/common/requests/ResponseHeader.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/AbstractIterator.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/CopyOnWriteMap.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/Crc32.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/KafkaThread.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/SystemTime.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/Time.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/Utils.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/common/network/SelectorTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/BufferPoolTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/MetadataTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/MockProducerTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/RecordAccumulatorTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/RecordSendTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/SenderTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/config/ConfigDefTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/metrics/JmxReporterTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/metrics/MetricsTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/metrics/stats/HistogramTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/protocol/types/ProtocolSerializationTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/record/MemoryRecordsTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/record/RecordTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/utils/AbstractIteratorTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/utils/MockTime.java PRE-CREATION 
>   clients/src/test/java/kafka/test/MetricsBench.java PRE-CREATION 
>   clients/src/test/java/kafka/test/Microbenchmarks.java PRE-CREATION 
>   clients/src/test/java/kafka/test/MockSelector.java PRE-CREATION 
>   clients/src/test/java/kafka/test/TestUtils.java PRE-CREATION 
>   project/Build.scala 098e874b8a14067739e6cd1f0a9ab9597ae84b6e 
> 
> Diff: https://reviews.apache.org/r/17263/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jay Kreps
> 
>


Re: Review Request 17263: New producer for Kafka.

Posted by Guozhang Wang <gu...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17263/#review37833
-----------------------------------------------------------


One general comment: can we refactor the structure of metrics/sensor/metric/stat a little bit, so that it is not so mixed? Currently each metrics/sensor/reporter keeps a list of metrics (and it seems the metrics list in sensors are not used anywhere?), is that true that the relationships between them are all many-to-many? From the code I summarized the structure as:

1. Each Report of the Metrics contain all the Metric(s).
2. Each Sensor will contain part of the Metric(s) in a Metrics, their Metric(s) set seems non-overlapping.



clients/src/main/java/kafka/common/metrics/MetricConfig.java
<https://reviews.apache.org/r/17263/#comment69583>

    Seems unit is not used anyway?



clients/src/main/java/kafka/common/metrics/Metrics.java
<https://reviews.apache.org/r/17263/#comment69597>

    Also initialize arraylist size to 0 for
    
    public Metrics(Time time)
    
    Otherwise initialize will be 10.



clients/src/main/java/kafka/common/metrics/stats/SampledStat.java
<https://reviews.apache.org/r/17263/#comment69761>

    This is not clear to me how the logic works for resetting old samples, could you add some comments?
    
    Since this.current will never be bigger than config.samples(), it seems this.current >= samples.size() will never be true?



clients/src/main/java/kafka/common/metrics/stats/SampledStat.java
<https://reviews.apache.org/r/17263/#comment69763>

    Ditto here.


- Guozhang Wang


On Jan. 23, 2014, 8:54 p.m., Jay Kreps wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/17263/
> -----------------------------------------------------------
> 
> (Updated Jan. 23, 2014, 8:54 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1227
>     https://issues.apache.org/jira/browse/KAFKA-1227
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-1227 New producer!
> 
> 
> Diffs
> -----
> 
>   clients/build.sbt PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/BufferExhaustedException.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/Callback.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/DefaultPartitioner.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/KafkaProducer.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/MockProducer.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/Partitioner.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/Producer.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/ProducerConfig.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/ProducerRecord.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/RecordSend.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/BufferPool.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/Metadata.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/ProduceRequestResult.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/RecordBatch.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/Sender.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/tools/ProducerPerformance.java PRE-CREATION 
>   clients/src/main/java/kafka/common/ByteSerialization.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Cluster.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Configurable.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Deserializer.java PRE-CREATION 
>   clients/src/main/java/kafka/common/KafkaException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Metric.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Node.java PRE-CREATION 
>   clients/src/main/java/kafka/common/PartitionInfo.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Serializer.java PRE-CREATION 
>   clients/src/main/java/kafka/common/StringSerialization.java PRE-CREATION 
>   clients/src/main/java/kafka/common/TopicPartition.java PRE-CREATION 
>   clients/src/main/java/kafka/common/config/AbstractConfig.java PRE-CREATION 
>   clients/src/main/java/kafka/common/config/ConfigDef.java PRE-CREATION 
>   clients/src/main/java/kafka/common/config/ConfigException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/ApiException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/CorruptMessageException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/LeaderNotAvailableException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/MessageTooLargeException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/NetworkException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/NotLeaderForPartitionException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/OffsetMetadataTooLarge.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/OffsetOutOfRangeException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/RetryableException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/TimeoutException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/UnknownServerException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/UnknownTopicOrPartitionException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/CompoundStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/JmxReporter.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/KafkaMetric.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Measurable.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/MeasurableStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/MetricConfig.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Metrics.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/MetricsReporter.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Quota.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/QuotaViolationException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Sensor.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Stat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Avg.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Count.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Histogram.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Max.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Min.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Percentile.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Percentiles.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Rate.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/SampledStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Total.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/WindowedStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/ByteBufferReceive.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/ByteBufferSend.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/NetworkReceive.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/NetworkSend.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Receive.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Selectable.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Selector.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Send.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/ApiKeys.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/Errors.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/ProtoUtils.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/Protocol.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/ArrayOf.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Field.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Schema.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/SchemaException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Struct.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Type.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/CompressionType.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/InvalidRecordException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/LogEntry.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/MemoryRecords.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/Record.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/Records.java PRE-CREATION 
>   clients/src/main/java/kafka/common/requests/RequestHeader.java PRE-CREATION 
>   clients/src/main/java/kafka/common/requests/RequestSend.java PRE-CREATION 
>   clients/src/main/java/kafka/common/requests/ResponseHeader.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/AbstractIterator.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/CopyOnWriteMap.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/Crc32.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/KafkaThread.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/SystemTime.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/Time.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/Utils.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/common/network/SelectorTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/BufferPoolTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/MetadataTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/MockProducerTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/RecordAccumulatorTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/RecordSendTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/SenderTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/config/ConfigDefTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/metrics/JmxReporterTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/metrics/MetricsTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/metrics/stats/HistogramTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/protocol/types/ProtocolSerializationTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/record/MemoryRecordsTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/record/RecordTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/utils/AbstractIteratorTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/utils/MockTime.java PRE-CREATION 
>   clients/src/test/java/kafka/test/MetricsBench.java PRE-CREATION 
>   clients/src/test/java/kafka/test/Microbenchmarks.java PRE-CREATION 
>   clients/src/test/java/kafka/test/MockSelector.java PRE-CREATION 
>   clients/src/test/java/kafka/test/TestUtils.java PRE-CREATION 
>   project/Build.scala 098e874b8a14067739e6cd1f0a9ab9597ae84b6e 
> 
> Diff: https://reviews.apache.org/r/17263/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jay Kreps
> 
>


Re: Review Request 17263: New producer for Kafka.

Posted by Sriram Subramanian <sr...@gmail.com>.

> On Jan. 26, 2014, 11:30 p.m., Sriram Subramanian wrote:
> > clients/src/main/java/kafka/clients/producer/internals/BufferPool.java, line 80
> > <https://reviews.apache.org/r/17263/diff/1/?file=436452#file436452line80>
> >
> >     What happens if freeUp actually got memory from the free list? We still seem to create a new buffer with the given size. Why are we not returning from the pool? It looks like availableMemory can go negative here.
> 
> Jay Kreps wrote:
>     Let me know if this is just confusion about how the pool works. The idea is that I want to optimize the common case of allocations that match the batch size, but I still want to allow large allocations of any size. I also want to avoid implementing malloc, as you rightly pointed out. The compromise I came up with was to special case the batch size and pool that size since that should be 99% of the allocations, when messages come in that are bigger then this i just directly allocate memory from them and don't attempt to pool it.
>     
>     But this means you can have a bunch of buffers in the free list (which are always exactly batchSize) that use up all available memory. So if you get a custom allocation size you need to toss out some pooled buffers to ensure you don't exceed your memory limit. Let me know if that makes sense and isn't totally insane.

I would be surprised if one sets the batch size and sends messages larger than the batch size. As you pointed out this would be rare. Does it make sense then to keep the max memory a soft limit and just allocate the large message and reduce complexity here. I am not recommending one way or the other though.


- Sriram


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17263/#review32800
-----------------------------------------------------------


On Jan. 23, 2014, 8:54 p.m., Jay Kreps wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/17263/
> -----------------------------------------------------------
> 
> (Updated Jan. 23, 2014, 8:54 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1227
>     https://issues.apache.org/jira/browse/KAFKA-1227
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-1227 New producer!
> 
> 
> Diffs
> -----
> 
>   clients/build.sbt PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/BufferExhaustedException.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/Callback.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/DefaultPartitioner.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/KafkaProducer.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/MockProducer.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/Partitioner.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/Producer.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/ProducerConfig.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/ProducerRecord.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/RecordSend.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/BufferPool.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/Metadata.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/ProduceRequestResult.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/RecordBatch.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/Sender.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/tools/ProducerPerformance.java PRE-CREATION 
>   clients/src/main/java/kafka/common/ByteSerialization.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Cluster.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Configurable.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Deserializer.java PRE-CREATION 
>   clients/src/main/java/kafka/common/KafkaException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Metric.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Node.java PRE-CREATION 
>   clients/src/main/java/kafka/common/PartitionInfo.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Serializer.java PRE-CREATION 
>   clients/src/main/java/kafka/common/StringSerialization.java PRE-CREATION 
>   clients/src/main/java/kafka/common/TopicPartition.java PRE-CREATION 
>   clients/src/main/java/kafka/common/config/AbstractConfig.java PRE-CREATION 
>   clients/src/main/java/kafka/common/config/ConfigDef.java PRE-CREATION 
>   clients/src/main/java/kafka/common/config/ConfigException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/ApiException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/CorruptMessageException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/LeaderNotAvailableException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/MessageTooLargeException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/NetworkException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/NotLeaderForPartitionException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/OffsetMetadataTooLarge.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/OffsetOutOfRangeException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/RetryableException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/TimeoutException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/UnknownServerException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/UnknownTopicOrPartitionException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/CompoundStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/JmxReporter.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/KafkaMetric.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Measurable.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/MeasurableStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/MetricConfig.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Metrics.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/MetricsReporter.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Quota.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/QuotaViolationException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Sensor.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Stat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Avg.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Count.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Histogram.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Max.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Min.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Percentile.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Percentiles.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Rate.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/SampledStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Total.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/WindowedStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/ByteBufferReceive.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/ByteBufferSend.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/NetworkReceive.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/NetworkSend.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Receive.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Selectable.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Selector.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Send.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/ApiKeys.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/Errors.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/ProtoUtils.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/Protocol.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/ArrayOf.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Field.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Schema.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/SchemaException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Struct.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Type.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/CompressionType.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/InvalidRecordException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/LogEntry.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/MemoryRecords.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/Record.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/Records.java PRE-CREATION 
>   clients/src/main/java/kafka/common/requests/RequestHeader.java PRE-CREATION 
>   clients/src/main/java/kafka/common/requests/RequestSend.java PRE-CREATION 
>   clients/src/main/java/kafka/common/requests/ResponseHeader.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/AbstractIterator.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/CopyOnWriteMap.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/Crc32.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/KafkaThread.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/SystemTime.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/Time.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/Utils.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/common/network/SelectorTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/BufferPoolTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/MetadataTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/MockProducerTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/RecordAccumulatorTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/RecordSendTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/SenderTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/config/ConfigDefTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/metrics/JmxReporterTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/metrics/MetricsTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/metrics/stats/HistogramTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/protocol/types/ProtocolSerializationTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/record/MemoryRecordsTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/record/RecordTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/utils/AbstractIteratorTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/utils/MockTime.java PRE-CREATION 
>   clients/src/test/java/kafka/test/MetricsBench.java PRE-CREATION 
>   clients/src/test/java/kafka/test/Microbenchmarks.java PRE-CREATION 
>   clients/src/test/java/kafka/test/MockSelector.java PRE-CREATION 
>   clients/src/test/java/kafka/test/TestUtils.java PRE-CREATION 
>   project/Build.scala 098e874b8a14067739e6cd1f0a9ab9597ae84b6e 
> 
> Diff: https://reviews.apache.org/r/17263/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jay Kreps
> 
>


Re: Review Request 17263: New producer for Kafka.

Posted by Jay Kreps <bo...@gmail.com>.

> On Jan. 26, 2014, 11:30 p.m., Sriram Subramanian wrote:
> > clients/src/main/java/kafka/clients/producer/Callback.java, line 14
> > <https://reviews.apache.org/r/17263/diff/1/?file=436443#file436443line14>
> >
> >     If the caller implements onCompletion to block for a while, the io thread would be blocked. Should this be mentioned somewhere in the interface?

I actually did mention that in the javadoc on that interface as well as in the producer javadoc. I considered the possibility of some kind of special thread pool for these but I think in most cases that would be overkill.


> On Jan. 26, 2014, 11:30 p.m., Sriram Subramanian wrote:
> > clients/src/main/java/kafka/clients/producer/KafkaProducer.java, line 150
> > <https://reviews.apache.org/r/17263/diff/1/?file=436445#file436445line150>
> >
> >     I think what you meant here when you said "the callback will execute in the I/O thread of the producer and so should be reasonably fast." is that the callback should be implemented such that it returns immediately. This is not very obvious from the current description.

Hmm, you don't think so? I will specifically mention blocking.


> On Jan. 26, 2014, 11:30 p.m., Sriram Subramanian wrote:
> > clients/src/main/java/kafka/clients/producer/KafkaProducer.java, line 216
> > <https://reviews.apache.org/r/17263/diff/1/?file=436445#file436445line216>
> >
> >     I am not a big fan of this api. I started on these lines for another project and it soon made me reimplement most of codahale features which really is not worth it. I understand the reasoning behind this (not have dependency on external libraries and have any reporters written) but it becomes a pain to maintain, reimplement and stabilize. Also, clients would now have to understand how to use this data to implement metrics. Codahale already supports quite a few reporters. An alternative to the current approach is to take a list of reporters from the config and instantiate them in code using codahale reporters. The project is very active and have done quite a few optimizations recently which are useful. We could end up being handicapped because we may not have the right metrics and only way to get it is to implement something on our own.

Yeah I think the metrics stuff is open to the accusation of reinventing the wheel. I will start a thread on this.


> On Jan. 26, 2014, 11:30 p.m., Sriram Subramanian wrote:
> > clients/src/main/java/kafka/clients/producer/RecordSend.java, line 34
> > <https://reviews.apache.org/r/17263/diff/1/?file=436451#file436451line34>
> >
> >     This seems a bit confusing. It would be useful to keep the usage of RecordSend in the callback and sync producer to be the same. Is there any reason that await throws on error when you have apis to check error?

I think I am confused. The usage in callback and directly is the same, no?

The question of whether to have await() throw an error or not is good. I think that is how futures in java work. It also forces the error versus making it possible to use the offset when there is an error. Basically I think I wanted to avoid the mongodb getLastError() where errors are silent unless you explicitly look for them.


> On Jan. 26, 2014, 11:30 p.m., Sriram Subramanian wrote:
> > clients/src/main/java/kafka/clients/producer/internals/BufferPool.java, line 29
> > <https://reviews.apache.org/r/17263/diff/1/?file=436452#file436452line29>
> >
> >     A little more information on how availableMemory and free list are used would be useful to comment about.

Agreed.


> On Jan. 26, 2014, 11:30 p.m., Sriram Subramanian wrote:
> > clients/src/main/java/kafka/clients/producer/internals/BufferPool.java, line 79
> > <https://reviews.apache.org/r/17263/diff/1/?file=436452#file436452line79>
> >
> >     Is this exception safe?

Hmm, I think what you are saying is that interrupting the thread may cause it to leak memory?


> On Jan. 26, 2014, 11:30 p.m., Sriram Subramanian wrote:
> > clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java, line 87
> > <https://reviews.apache.org/r/17263/diff/1/?file=436455#file436455line87>
> >
> >     We can allocate a batch larger than the batchSize?

Yeah. This isn't terribly self-explanatory. The idea here is that you want to encourage medium sized batches to avoid small sends. So maybe you set a batch size of 4k. If you get a 1k message, you allocate a 4k buffer and append it and hope more messages show up. If you get a large message you just allocate a buffer for it and send it right away. This let's us have no artificial bound on message size, but handle small messages efficiently.


> On Jan. 26, 2014, 11:30 p.m., Sriram Subramanian wrote:
> > clients/src/main/java/kafka/clients/producer/internals/BufferPool.java, line 80
> > <https://reviews.apache.org/r/17263/diff/1/?file=436452#file436452line80>
> >
> >     What happens if freeUp actually got memory from the free list? We still seem to create a new buffer with the given size. Why are we not returning from the pool? It looks like availableMemory can go negative here.

Let me know if this is just confusion about how the pool works. The idea is that I want to optimize the common case of allocations that match the batch size, but I still want to allow large allocations of any size. I also want to avoid implementing malloc, as you rightly pointed out. The compromise I came up with was to special case the batch size and pool that size since that should be 99% of the allocations, when messages come in that are bigger then this i just directly allocate memory from them and don't attempt to pool it.

But this means you can have a bunch of buffers in the free list (which are always exactly batchSize) that use up all available memory. So if you get a custom allocation size you need to toss out some pooled buffers to ensure you don't exceed your memory limit. Let me know if that makes sense and isn't totally insane.


> On Jan. 26, 2014, 11:30 p.m., Sriram Subramanian wrote:
> > clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java, line 108
> > <https://reviews.apache.org/r/17263/diff/1/?file=436455#file436455line108>
> >
> >     ready to be sent

fixed


> On Jan. 26, 2014, 11:30 p.m., Sriram Subramanian wrote:
> > clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java, line 145
> > <https://reviews.apache.org/r/17263/diff/1/?file=436455#file436455line145>
> >
> >     I am assuming this method is called only by a single thread and hence there is no requirement to synchronize here

Yes, the ready()/drain() interface only works if you have a single I/O thread, which I do. Basically I don't really want to have a pool of I/O threads unless we have to.

The weird use of drainIndex in that method is because I want to kind of randomize which partitions get chosen if they all have tons of data. I don't want it to be the case that I get the whole max request size worth of data from a single partition every time starving the other partitions.


- Jay


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17263/#review32800
-----------------------------------------------------------


On Jan. 23, 2014, 8:54 p.m., Jay Kreps wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/17263/
> -----------------------------------------------------------
> 
> (Updated Jan. 23, 2014, 8:54 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1227
>     https://issues.apache.org/jira/browse/KAFKA-1227
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-1227 New producer!
> 
> 
> Diffs
> -----
> 
>   clients/build.sbt PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/BufferExhaustedException.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/Callback.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/DefaultPartitioner.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/KafkaProducer.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/MockProducer.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/Partitioner.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/Producer.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/ProducerConfig.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/ProducerRecord.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/RecordSend.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/BufferPool.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/Metadata.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/ProduceRequestResult.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/RecordBatch.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/Sender.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/tools/ProducerPerformance.java PRE-CREATION 
>   clients/src/main/java/kafka/common/ByteSerialization.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Cluster.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Configurable.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Deserializer.java PRE-CREATION 
>   clients/src/main/java/kafka/common/KafkaException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Metric.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Node.java PRE-CREATION 
>   clients/src/main/java/kafka/common/PartitionInfo.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Serializer.java PRE-CREATION 
>   clients/src/main/java/kafka/common/StringSerialization.java PRE-CREATION 
>   clients/src/main/java/kafka/common/TopicPartition.java PRE-CREATION 
>   clients/src/main/java/kafka/common/config/AbstractConfig.java PRE-CREATION 
>   clients/src/main/java/kafka/common/config/ConfigDef.java PRE-CREATION 
>   clients/src/main/java/kafka/common/config/ConfigException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/ApiException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/CorruptMessageException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/LeaderNotAvailableException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/MessageTooLargeException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/NetworkException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/NotLeaderForPartitionException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/OffsetMetadataTooLarge.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/OffsetOutOfRangeException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/RetryableException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/TimeoutException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/UnknownServerException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/UnknownTopicOrPartitionException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/CompoundStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/JmxReporter.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/KafkaMetric.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Measurable.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/MeasurableStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/MetricConfig.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Metrics.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/MetricsReporter.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Quota.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/QuotaViolationException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Sensor.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Stat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Avg.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Count.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Histogram.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Max.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Min.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Percentile.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Percentiles.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Rate.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/SampledStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Total.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/WindowedStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/ByteBufferReceive.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/ByteBufferSend.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/NetworkReceive.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/NetworkSend.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Receive.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Selectable.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Selector.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Send.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/ApiKeys.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/Errors.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/ProtoUtils.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/Protocol.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/ArrayOf.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Field.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Schema.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/SchemaException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Struct.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Type.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/CompressionType.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/InvalidRecordException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/LogEntry.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/MemoryRecords.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/Record.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/Records.java PRE-CREATION 
>   clients/src/main/java/kafka/common/requests/RequestHeader.java PRE-CREATION 
>   clients/src/main/java/kafka/common/requests/RequestSend.java PRE-CREATION 
>   clients/src/main/java/kafka/common/requests/ResponseHeader.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/AbstractIterator.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/CopyOnWriteMap.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/Crc32.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/KafkaThread.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/SystemTime.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/Time.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/Utils.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/common/network/SelectorTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/BufferPoolTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/MetadataTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/MockProducerTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/RecordAccumulatorTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/RecordSendTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/SenderTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/config/ConfigDefTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/metrics/JmxReporterTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/metrics/MetricsTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/metrics/stats/HistogramTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/protocol/types/ProtocolSerializationTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/record/MemoryRecordsTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/record/RecordTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/utils/AbstractIteratorTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/utils/MockTime.java PRE-CREATION 
>   clients/src/test/java/kafka/test/MetricsBench.java PRE-CREATION 
>   clients/src/test/java/kafka/test/Microbenchmarks.java PRE-CREATION 
>   clients/src/test/java/kafka/test/MockSelector.java PRE-CREATION 
>   clients/src/test/java/kafka/test/TestUtils.java PRE-CREATION 
>   project/Build.scala 098e874b8a14067739e6cd1f0a9ab9597ae84b6e 
> 
> Diff: https://reviews.apache.org/r/17263/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jay Kreps
> 
>


Re: Review Request 17263: New producer for Kafka.

Posted by Sriram Subramanian <sr...@gmail.com>.

> On Jan. 26, 2014, 11:30 p.m., Sriram Subramanian wrote:
> > clients/src/main/java/kafka/clients/producer/KafkaProducer.java, line 150
> > <https://reviews.apache.org/r/17263/diff/1/?file=436445#file436445line150>
> >
> >     I think what you meant here when you said "the callback will execute in the I/O thread of the producer and so should be reasonably fast." is that the callback should be implemented such that it returns immediately. This is not very obvious from the current description.
> 
> Jay Kreps wrote:
>     Hmm, you don't think so? I will specifically mention blocking.

ok. You may want to update the callback interface with the same description.


> On Jan. 26, 2014, 11:30 p.m., Sriram Subramanian wrote:
> > clients/src/main/java/kafka/clients/producer/RecordSend.java, line 34
> > <https://reviews.apache.org/r/17263/diff/1/?file=436451#file436451line34>
> >
> >     This seems a bit confusing. It would be useful to keep the usage of RecordSend in the callback and sync producer to be the same. Is there any reason that await throws on error when you have apis to check error?
> 
> Jay Kreps wrote:
>     I think I am confused. The usage in callback and directly is the same, no?
>     
>     The question of whether to have await() throw an error or not is good. I think that is how futures in java work. It also forces the error versus making it possible to use the offset when there is an error. Basically I think I wanted to avoid the mongodb getLastError() where errors are silent unless you explicitly look for them.

I was referring to the following usage - 

1. Error need not have to be checked here
ProducerRecord record = new ProducerRecord("the-topic", "key, "value");
RecordSend send = producer.send(myRecord, null).await();
print send.offset()
     
2. Error has to be checked
ProducerRecord record = new ProducerRecord("the-topic", "key, "value");
producer.send(myRecord, new Callback() { 
                               public void onCompletion(RecordSend send) {
                                  try {
                                    if (send.hasError())
                                       print send.getError()
                                    else
                                       print send.offset();	
                                  } catch(KafkaException e) {
                                    e.printStackTrace();	
                               }
	
                             }
                         });

I was noting this inconsistency.
	


> On Jan. 26, 2014, 11:30 p.m., Sriram Subramanian wrote:
> > clients/src/main/java/kafka/clients/producer/internals/BufferPool.java, line 79
> > <https://reviews.apache.org/r/17263/diff/1/?file=436452#file436452line79>
> >
> >     Is this exception safe?
> 
> Jay Kreps wrote:
>     Hmm, I think what you are saying is that interrupting the thread may cause it to leak memory?

yes


> On Jan. 26, 2014, 11:30 p.m., Sriram Subramanian wrote:
> > clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java, line 87
> > <https://reviews.apache.org/r/17263/diff/1/?file=436455#file436455line87>
> >
> >     We can allocate a batch larger than the batchSize?
> 
> Jay Kreps wrote:
>     Yeah. This isn't terribly self-explanatory. The idea here is that you want to encourage medium sized batches to avoid small sends. So maybe you set a batch size of 4k. If you get a 1k message, you allocate a 4k buffer and append it and hope more messages show up. If you get a large message you just allocate a buffer for it and send it right away. This let's us have no artificial bound on message size, but handle small messages efficiently.

Makes sense


- Sriram


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17263/#review32800
-----------------------------------------------------------


On Jan. 23, 2014, 8:54 p.m., Jay Kreps wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/17263/
> -----------------------------------------------------------
> 
> (Updated Jan. 23, 2014, 8:54 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1227
>     https://issues.apache.org/jira/browse/KAFKA-1227
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-1227 New producer!
> 
> 
> Diffs
> -----
> 
>   clients/build.sbt PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/BufferExhaustedException.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/Callback.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/DefaultPartitioner.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/KafkaProducer.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/MockProducer.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/Partitioner.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/Producer.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/ProducerConfig.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/ProducerRecord.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/RecordSend.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/BufferPool.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/Metadata.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/ProduceRequestResult.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/RecordBatch.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/Sender.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/tools/ProducerPerformance.java PRE-CREATION 
>   clients/src/main/java/kafka/common/ByteSerialization.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Cluster.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Configurable.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Deserializer.java PRE-CREATION 
>   clients/src/main/java/kafka/common/KafkaException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Metric.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Node.java PRE-CREATION 
>   clients/src/main/java/kafka/common/PartitionInfo.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Serializer.java PRE-CREATION 
>   clients/src/main/java/kafka/common/StringSerialization.java PRE-CREATION 
>   clients/src/main/java/kafka/common/TopicPartition.java PRE-CREATION 
>   clients/src/main/java/kafka/common/config/AbstractConfig.java PRE-CREATION 
>   clients/src/main/java/kafka/common/config/ConfigDef.java PRE-CREATION 
>   clients/src/main/java/kafka/common/config/ConfigException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/ApiException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/CorruptMessageException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/LeaderNotAvailableException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/MessageTooLargeException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/NetworkException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/NotLeaderForPartitionException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/OffsetMetadataTooLarge.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/OffsetOutOfRangeException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/RetryableException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/TimeoutException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/UnknownServerException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/UnknownTopicOrPartitionException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/CompoundStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/JmxReporter.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/KafkaMetric.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Measurable.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/MeasurableStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/MetricConfig.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Metrics.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/MetricsReporter.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Quota.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/QuotaViolationException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Sensor.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Stat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Avg.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Count.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Histogram.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Max.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Min.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Percentile.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Percentiles.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Rate.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/SampledStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Total.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/WindowedStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/ByteBufferReceive.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/ByteBufferSend.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/NetworkReceive.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/NetworkSend.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Receive.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Selectable.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Selector.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Send.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/ApiKeys.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/Errors.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/ProtoUtils.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/Protocol.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/ArrayOf.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Field.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Schema.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/SchemaException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Struct.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Type.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/CompressionType.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/InvalidRecordException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/LogEntry.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/MemoryRecords.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/Record.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/Records.java PRE-CREATION 
>   clients/src/main/java/kafka/common/requests/RequestHeader.java PRE-CREATION 
>   clients/src/main/java/kafka/common/requests/RequestSend.java PRE-CREATION 
>   clients/src/main/java/kafka/common/requests/ResponseHeader.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/AbstractIterator.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/CopyOnWriteMap.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/Crc32.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/KafkaThread.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/SystemTime.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/Time.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/Utils.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/common/network/SelectorTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/BufferPoolTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/MetadataTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/MockProducerTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/RecordAccumulatorTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/RecordSendTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/SenderTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/config/ConfigDefTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/metrics/JmxReporterTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/metrics/MetricsTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/metrics/stats/HistogramTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/protocol/types/ProtocolSerializationTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/record/MemoryRecordsTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/record/RecordTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/utils/AbstractIteratorTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/utils/MockTime.java PRE-CREATION 
>   clients/src/test/java/kafka/test/MetricsBench.java PRE-CREATION 
>   clients/src/test/java/kafka/test/Microbenchmarks.java PRE-CREATION 
>   clients/src/test/java/kafka/test/MockSelector.java PRE-CREATION 
>   clients/src/test/java/kafka/test/TestUtils.java PRE-CREATION 
>   project/Build.scala 098e874b8a14067739e6cd1f0a9ab9597ae84b6e 
> 
> Diff: https://reviews.apache.org/r/17263/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jay Kreps
> 
>


Re: Review Request 17263: New producer for Kafka.

Posted by Jay Kreps <ja...@gmail.com>.
Yeah I will definitely do this. One todo is to work out the final config
names and document them all.

-Jay


On Wed, Jan 29, 2014 at 11:29 AM, Neha Narkhede <ne...@gmail.com>wrote:

>
>
> > On Jan. 26, 2014, 11:30 p.m., Sriram Subramanian wrote:
> > >
> clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java,
> line 87
> > > <
> https://reviews.apache.org/r/17263/diff/1/?file=436455#file436455line87>
> > >
> > >     We can allocate a batch larger than the batchSize?
> >
> > Jay Kreps wrote:
> >     Yeah. This isn't terribly self-explanatory. The idea here is that
> you want to encourage medium sized batches to avoid small sends. So maybe
> you set a batch size of 4k. If you get a 1k message, you allocate a 4k
> buffer and append it and hope more messages show up. If you get a large
> message you just allocate a buffer for it and send it right away. This
> let's us have no artificial bound on message size, but handle small
> messages efficiently.
> >
> > Sriram Subramanian wrote:
> >     Makes sense
>
> Can we put the above explanation as a comment in the code? Will be easier
> to understand down the road.
>
>
> - Neha
>
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/17263/#review32800
> -----------------------------------------------------------
>
>
> On Jan. 23, 2014, 8:54 p.m., Jay Kreps wrote:
> >
> > -----------------------------------------------------------
> > This is an automatically generated e-mail. To reply, visit:
> > https://reviews.apache.org/r/17263/
> > -----------------------------------------------------------
> >
> > (Updated Jan. 23, 2014, 8:54 p.m.)
> >
> >
> > Review request for kafka.
> >
> >
> > Bugs: KAFKA-1227
> >     https://issues.apache.org/jira/browse/KAFKA-1227
> >
> >
> > Repository: kafka
> >
> >
> > Description
> > -------
> >
> > KAFKA-1227 New producer!
> >
> >
> > Diffs
> > -----
> >
> >   clients/build.sbt PRE-CREATION
> >
> clients/src/main/java/kafka/clients/producer/BufferExhaustedException.java
> PRE-CREATION
> >   clients/src/main/java/kafka/clients/producer/Callback.java PRE-CREATION
> >   clients/src/main/java/kafka/clients/producer/DefaultPartitioner.java
> PRE-CREATION
> >   clients/src/main/java/kafka/clients/producer/KafkaProducer.java
> PRE-CREATION
> >   clients/src/main/java/kafka/clients/producer/MockProducer.java
> PRE-CREATION
> >   clients/src/main/java/kafka/clients/producer/Partitioner.java
> PRE-CREATION
> >   clients/src/main/java/kafka/clients/producer/Producer.java PRE-CREATION
> >   clients/src/main/java/kafka/clients/producer/ProducerConfig.java
> PRE-CREATION
> >   clients/src/main/java/kafka/clients/producer/ProducerRecord.java
> PRE-CREATION
> >   clients/src/main/java/kafka/clients/producer/RecordSend.java
> PRE-CREATION
> >   clients/src/main/java/kafka/clients/producer/internals/BufferPool.java
> PRE-CREATION
> >   clients/src/main/java/kafka/clients/producer/internals/Metadata.java
> PRE-CREATION
> >
> clients/src/main/java/kafka/clients/producer/internals/ProduceRequestResult.java
> PRE-CREATION
> >
> clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java
> PRE-CREATION
> >
> clients/src/main/java/kafka/clients/producer/internals/RecordBatch.java
> PRE-CREATION
> >   clients/src/main/java/kafka/clients/producer/internals/Sender.java
> PRE-CREATION
> >   clients/src/main/java/kafka/clients/tools/ProducerPerformance.java
> PRE-CREATION
> >   clients/src/main/java/kafka/common/ByteSerialization.java PRE-CREATION
> >   clients/src/main/java/kafka/common/Cluster.java PRE-CREATION
> >   clients/src/main/java/kafka/common/Configurable.java PRE-CREATION
> >   clients/src/main/java/kafka/common/Deserializer.java PRE-CREATION
> >   clients/src/main/java/kafka/common/KafkaException.java PRE-CREATION
> >   clients/src/main/java/kafka/common/Metric.java PRE-CREATION
> >   clients/src/main/java/kafka/common/Node.java PRE-CREATION
> >   clients/src/main/java/kafka/common/PartitionInfo.java PRE-CREATION
> >   clients/src/main/java/kafka/common/Serializer.java PRE-CREATION
> >   clients/src/main/java/kafka/common/StringSerialization.java
> PRE-CREATION
> >   clients/src/main/java/kafka/common/TopicPartition.java PRE-CREATION
> >   clients/src/main/java/kafka/common/config/AbstractConfig.java
> PRE-CREATION
> >   clients/src/main/java/kafka/common/config/ConfigDef.java PRE-CREATION
> >   clients/src/main/java/kafka/common/config/ConfigException.java
> PRE-CREATION
> >   clients/src/main/java/kafka/common/errors/ApiException.java
> PRE-CREATION
> >   clients/src/main/java/kafka/common/errors/CorruptMessageException.java
> PRE-CREATION
> >
> clients/src/main/java/kafka/common/errors/LeaderNotAvailableException.java
> PRE-CREATION
> >
> clients/src/main/java/kafka/common/errors/MessageTooLargeException.java
> PRE-CREATION
> >   clients/src/main/java/kafka/common/errors/NetworkException.java
> PRE-CREATION
> >
> clients/src/main/java/kafka/common/errors/NotLeaderForPartitionException.java
> PRE-CREATION
> >   clients/src/main/java/kafka/common/errors/OffsetMetadataTooLarge.java
> PRE-CREATION
> >
> clients/src/main/java/kafka/common/errors/OffsetOutOfRangeException.java
> PRE-CREATION
> >   clients/src/main/java/kafka/common/errors/RetryableException.java
> PRE-CREATION
> >   clients/src/main/java/kafka/common/errors/TimeoutException.java
> PRE-CREATION
> >   clients/src/main/java/kafka/common/errors/UnknownServerException.java
> PRE-CREATION
> >
> clients/src/main/java/kafka/common/errors/UnknownTopicOrPartitionException.java
> PRE-CREATION
> >   clients/src/main/java/kafka/common/metrics/CompoundStat.java
> PRE-CREATION
> >   clients/src/main/java/kafka/common/metrics/JmxReporter.java
> PRE-CREATION
> >   clients/src/main/java/kafka/common/metrics/KafkaMetric.java
> PRE-CREATION
> >   clients/src/main/java/kafka/common/metrics/Measurable.java PRE-CREATION
> >   clients/src/main/java/kafka/common/metrics/MeasurableStat.java
> PRE-CREATION
> >   clients/src/main/java/kafka/common/metrics/MetricConfig.java
> PRE-CREATION
> >   clients/src/main/java/kafka/common/metrics/Metrics.java PRE-CREATION
> >   clients/src/main/java/kafka/common/metrics/MetricsReporter.java
> PRE-CREATION
> >   clients/src/main/java/kafka/common/metrics/Quota.java PRE-CREATION
> >
> clients/src/main/java/kafka/common/metrics/QuotaViolationException.java
> PRE-CREATION
> >   clients/src/main/java/kafka/common/metrics/Sensor.java PRE-CREATION
> >   clients/src/main/java/kafka/common/metrics/Stat.java PRE-CREATION
> >   clients/src/main/java/kafka/common/metrics/stats/Avg.java PRE-CREATION
> >   clients/src/main/java/kafka/common/metrics/stats/Count.java
> PRE-CREATION
> >   clients/src/main/java/kafka/common/metrics/stats/Histogram.java
> PRE-CREATION
> >   clients/src/main/java/kafka/common/metrics/stats/Max.java PRE-CREATION
> >   clients/src/main/java/kafka/common/metrics/stats/Min.java PRE-CREATION
> >   clients/src/main/java/kafka/common/metrics/stats/Percentile.java
> PRE-CREATION
> >   clients/src/main/java/kafka/common/metrics/stats/Percentiles.java
> PRE-CREATION
> >   clients/src/main/java/kafka/common/metrics/stats/Rate.java PRE-CREATION
> >   clients/src/main/java/kafka/common/metrics/stats/SampledStat.java
> PRE-CREATION
> >   clients/src/main/java/kafka/common/metrics/stats/Total.java
> PRE-CREATION
> >   clients/src/main/java/kafka/common/metrics/stats/WindowedStat.java
> PRE-CREATION
> >   clients/src/main/java/kafka/common/network/ByteBufferReceive.java
> PRE-CREATION
> >   clients/src/main/java/kafka/common/network/ByteBufferSend.java
> PRE-CREATION
> >   clients/src/main/java/kafka/common/network/NetworkReceive.java
> PRE-CREATION
> >   clients/src/main/java/kafka/common/network/NetworkSend.java
> PRE-CREATION
> >   clients/src/main/java/kafka/common/network/Receive.java PRE-CREATION
> >   clients/src/main/java/kafka/common/network/Selectable.java PRE-CREATION
> >   clients/src/main/java/kafka/common/network/Selector.java PRE-CREATION
> >   clients/src/main/java/kafka/common/network/Send.java PRE-CREATION
> >   clients/src/main/java/kafka/common/protocol/ApiKeys.java PRE-CREATION
> >   clients/src/main/java/kafka/common/protocol/Errors.java PRE-CREATION
> >   clients/src/main/java/kafka/common/protocol/ProtoUtils.java
> PRE-CREATION
> >   clients/src/main/java/kafka/common/protocol/Protocol.java PRE-CREATION
> >   clients/src/main/java/kafka/common/protocol/types/ArrayOf.java
> PRE-CREATION
> >   clients/src/main/java/kafka/common/protocol/types/Field.java
> PRE-CREATION
> >   clients/src/main/java/kafka/common/protocol/types/Schema.java
> PRE-CREATION
> >   clients/src/main/java/kafka/common/protocol/types/SchemaException.java
> PRE-CREATION
> >   clients/src/main/java/kafka/common/protocol/types/Struct.java
> PRE-CREATION
> >   clients/src/main/java/kafka/common/protocol/types/Type.java
> PRE-CREATION
> >   clients/src/main/java/kafka/common/record/CompressionType.java
> PRE-CREATION
> >   clients/src/main/java/kafka/common/record/InvalidRecordException.java
> PRE-CREATION
> >   clients/src/main/java/kafka/common/record/LogEntry.java PRE-CREATION
> >   clients/src/main/java/kafka/common/record/MemoryRecords.java
> PRE-CREATION
> >   clients/src/main/java/kafka/common/record/Record.java PRE-CREATION
> >   clients/src/main/java/kafka/common/record/Records.java PRE-CREATION
> >   clients/src/main/java/kafka/common/requests/RequestHeader.java
> PRE-CREATION
> >   clients/src/main/java/kafka/common/requests/RequestSend.java
> PRE-CREATION
> >   clients/src/main/java/kafka/common/requests/ResponseHeader.java
> PRE-CREATION
> >   clients/src/main/java/kafka/common/utils/AbstractIterator.java
> PRE-CREATION
> >   clients/src/main/java/kafka/common/utils/CopyOnWriteMap.java
> PRE-CREATION
> >   clients/src/main/java/kafka/common/utils/Crc32.java PRE-CREATION
> >   clients/src/main/java/kafka/common/utils/KafkaThread.java PRE-CREATION
> >   clients/src/main/java/kafka/common/utils/SystemTime.java PRE-CREATION
> >   clients/src/main/java/kafka/common/utils/Time.java PRE-CREATION
> >   clients/src/main/java/kafka/common/utils/Utils.java PRE-CREATION
> >   clients/src/test/java/kafka/clients/common/network/SelectorTest.java
> PRE-CREATION
> >   clients/src/test/java/kafka/clients/producer/BufferPoolTest.java
> PRE-CREATION
> >   clients/src/test/java/kafka/clients/producer/MetadataTest.java
> PRE-CREATION
> >   clients/src/test/java/kafka/clients/producer/MockProducerTest.java
> PRE-CREATION
> >
> clients/src/test/java/kafka/clients/producer/RecordAccumulatorTest.java
> PRE-CREATION
> >   clients/src/test/java/kafka/clients/producer/RecordSendTest.java
> PRE-CREATION
> >   clients/src/test/java/kafka/clients/producer/SenderTest.java
> PRE-CREATION
> >   clients/src/test/java/kafka/common/config/ConfigDefTest.java
> PRE-CREATION
> >   clients/src/test/java/kafka/common/metrics/JmxReporterTest.java
> PRE-CREATION
> >   clients/src/test/java/kafka/common/metrics/MetricsTest.java
> PRE-CREATION
> >   clients/src/test/java/kafka/common/metrics/stats/HistogramTest.java
> PRE-CREATION
> >
> clients/src/test/java/kafka/common/protocol/types/ProtocolSerializationTest.java
> PRE-CREATION
> >   clients/src/test/java/kafka/common/record/MemoryRecordsTest.java
> PRE-CREATION
> >   clients/src/test/java/kafka/common/record/RecordTest.java PRE-CREATION
> >   clients/src/test/java/kafka/common/utils/AbstractIteratorTest.java
> PRE-CREATION
> >   clients/src/test/java/kafka/common/utils/MockTime.java PRE-CREATION
> >   clients/src/test/java/kafka/test/MetricsBench.java PRE-CREATION
> >   clients/src/test/java/kafka/test/Microbenchmarks.java PRE-CREATION
> >   clients/src/test/java/kafka/test/MockSelector.java PRE-CREATION
> >   clients/src/test/java/kafka/test/TestUtils.java PRE-CREATION
> >   project/Build.scala 098e874b8a14067739e6cd1f0a9ab9597ae84b6e
> >
> > Diff: https://reviews.apache.org/r/17263/diff/
> >
> >
> > Testing
> > -------
> >
> >
> > Thanks,
> >
> > Jay Kreps
> >
> >
>
>

Re: Review Request 17263: New producer for Kafka.

Posted by Neha Narkhede <ne...@gmail.com>.

> On Jan. 26, 2014, 11:30 p.m., Sriram Subramanian wrote:
> > clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java, line 87
> > <https://reviews.apache.org/r/17263/diff/1/?file=436455#file436455line87>
> >
> >     We can allocate a batch larger than the batchSize?
> 
> Jay Kreps wrote:
>     Yeah. This isn't terribly self-explanatory. The idea here is that you want to encourage medium sized batches to avoid small sends. So maybe you set a batch size of 4k. If you get a 1k message, you allocate a 4k buffer and append it and hope more messages show up. If you get a large message you just allocate a buffer for it and send it right away. This let's us have no artificial bound on message size, but handle small messages efficiently.
> 
> Sriram Subramanian wrote:
>     Makes sense

Can we put the above explanation as a comment in the code? Will be easier to understand down the road.


- Neha


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17263/#review32800
-----------------------------------------------------------


On Jan. 23, 2014, 8:54 p.m., Jay Kreps wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/17263/
> -----------------------------------------------------------
> 
> (Updated Jan. 23, 2014, 8:54 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1227
>     https://issues.apache.org/jira/browse/KAFKA-1227
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-1227 New producer!
> 
> 
> Diffs
> -----
> 
>   clients/build.sbt PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/BufferExhaustedException.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/Callback.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/DefaultPartitioner.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/KafkaProducer.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/MockProducer.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/Partitioner.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/Producer.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/ProducerConfig.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/ProducerRecord.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/RecordSend.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/BufferPool.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/Metadata.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/ProduceRequestResult.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/RecordBatch.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/Sender.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/tools/ProducerPerformance.java PRE-CREATION 
>   clients/src/main/java/kafka/common/ByteSerialization.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Cluster.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Configurable.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Deserializer.java PRE-CREATION 
>   clients/src/main/java/kafka/common/KafkaException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Metric.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Node.java PRE-CREATION 
>   clients/src/main/java/kafka/common/PartitionInfo.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Serializer.java PRE-CREATION 
>   clients/src/main/java/kafka/common/StringSerialization.java PRE-CREATION 
>   clients/src/main/java/kafka/common/TopicPartition.java PRE-CREATION 
>   clients/src/main/java/kafka/common/config/AbstractConfig.java PRE-CREATION 
>   clients/src/main/java/kafka/common/config/ConfigDef.java PRE-CREATION 
>   clients/src/main/java/kafka/common/config/ConfigException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/ApiException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/CorruptMessageException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/LeaderNotAvailableException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/MessageTooLargeException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/NetworkException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/NotLeaderForPartitionException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/OffsetMetadataTooLarge.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/OffsetOutOfRangeException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/RetryableException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/TimeoutException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/UnknownServerException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/UnknownTopicOrPartitionException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/CompoundStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/JmxReporter.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/KafkaMetric.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Measurable.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/MeasurableStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/MetricConfig.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Metrics.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/MetricsReporter.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Quota.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/QuotaViolationException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Sensor.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Stat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Avg.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Count.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Histogram.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Max.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Min.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Percentile.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Percentiles.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Rate.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/SampledStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Total.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/WindowedStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/ByteBufferReceive.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/ByteBufferSend.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/NetworkReceive.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/NetworkSend.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Receive.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Selectable.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Selector.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Send.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/ApiKeys.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/Errors.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/ProtoUtils.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/Protocol.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/ArrayOf.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Field.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Schema.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/SchemaException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Struct.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Type.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/CompressionType.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/InvalidRecordException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/LogEntry.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/MemoryRecords.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/Record.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/Records.java PRE-CREATION 
>   clients/src/main/java/kafka/common/requests/RequestHeader.java PRE-CREATION 
>   clients/src/main/java/kafka/common/requests/RequestSend.java PRE-CREATION 
>   clients/src/main/java/kafka/common/requests/ResponseHeader.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/AbstractIterator.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/CopyOnWriteMap.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/Crc32.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/KafkaThread.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/SystemTime.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/Time.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/Utils.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/common/network/SelectorTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/BufferPoolTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/MetadataTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/MockProducerTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/RecordAccumulatorTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/RecordSendTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/SenderTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/config/ConfigDefTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/metrics/JmxReporterTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/metrics/MetricsTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/metrics/stats/HistogramTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/protocol/types/ProtocolSerializationTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/record/MemoryRecordsTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/record/RecordTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/utils/AbstractIteratorTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/utils/MockTime.java PRE-CREATION 
>   clients/src/test/java/kafka/test/MetricsBench.java PRE-CREATION 
>   clients/src/test/java/kafka/test/Microbenchmarks.java PRE-CREATION 
>   clients/src/test/java/kafka/test/MockSelector.java PRE-CREATION 
>   clients/src/test/java/kafka/test/TestUtils.java PRE-CREATION 
>   project/Build.scala 098e874b8a14067739e6cd1f0a9ab9597ae84b6e 
> 
> Diff: https://reviews.apache.org/r/17263/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jay Kreps
> 
>


Re: Review Request 17263: New producer for Kafka.

Posted by Sriram Subramanian <sr...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17263/#review32800
-----------------------------------------------------------



clients/src/main/java/kafka/clients/producer/Callback.java
<https://reviews.apache.org/r/17263/#comment61778>

    If the caller implements onCompletion to block for a while, the io thread would be blocked. Should this be mentioned somewhere in the interface?



clients/src/main/java/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/17263/#comment61779>

    "you can easily achieve"



clients/src/main/java/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/17263/#comment61780>

    I think what you meant here when you said "the callback will execute in the I/O thread of the producer and so should be reasonably fast." is that the callback should be implemented such that it returns immediately. This is not very obvious from the current description.



clients/src/main/java/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/17263/#comment61781>

    I am not a big fan of this api. I started on these lines for another project and it soon made me reimplement most of codahale features which really is not worth it. I understand the reasoning behind this (not have dependency on external libraries and have any reporters written) but it becomes a pain to maintain, reimplement and stabilize. Also, clients would now have to understand how to use this data to implement metrics. Codahale already supports quite a few reporters. An alternative to the current approach is to take a list of reporters from the config and instantiate them in code using codahale reporters. The project is very active and have done quite a few optimizations recently which are useful. We could end up being handicapped because we may not have the right metrics and only way to get it is to implement something on our own. 



clients/src/main/java/kafka/clients/producer/Partitioner.java
<https://reviews.apache.org/r/17263/#comment61782>

    make use of the key



clients/src/main/java/kafka/clients/producer/RecordSend.java
<https://reviews.apache.org/r/17263/#comment61783>

    This seems a bit confusing. It would be useful to keep the usage of RecordSend in the callback and sync producer to be the same. Is there any reason that await throws on error when you have apis to check error? 



clients/src/main/java/kafka/clients/producer/internals/BufferPool.java
<https://reviews.apache.org/r/17263/#comment61786>

    A little more information on how availableMemory and free list are used would be useful to comment about.



clients/src/main/java/kafka/clients/producer/internals/BufferPool.java
<https://reviews.apache.org/r/17263/#comment61784>

    Is this exception safe?



clients/src/main/java/kafka/clients/producer/internals/BufferPool.java
<https://reviews.apache.org/r/17263/#comment61785>

    What happens if freeUp actually got memory from the free list? We still seem to create a new buffer with the given size. Why are we not returning from the pool? It looks like availableMemory can go negative here.



clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java
<https://reviews.apache.org/r/17263/#comment61787>

    We can allocate a batch larger than the batchSize?



clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java
<https://reviews.apache.org/r/17263/#comment61788>

    ready to be sent



clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java
<https://reviews.apache.org/r/17263/#comment61789>

    I am assuming this method is called only by a single thread and hence there is no requirement to synchronize here



clients/src/main/java/kafka/clients/producer/internals/RecordBatch.java
<https://reviews.apache.org/r/17263/#comment61790>

    magic number


- Sriram Subramanian


On Jan. 23, 2014, 8:54 p.m., Jay Kreps wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/17263/
> -----------------------------------------------------------
> 
> (Updated Jan. 23, 2014, 8:54 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1227
>     https://issues.apache.org/jira/browse/KAFKA-1227
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-1227 New producer!
> 
> 
> Diffs
> -----
> 
>   clients/build.sbt PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/BufferExhaustedException.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/Callback.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/DefaultPartitioner.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/KafkaProducer.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/MockProducer.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/Partitioner.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/Producer.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/ProducerConfig.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/ProducerRecord.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/RecordSend.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/BufferPool.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/Metadata.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/ProduceRequestResult.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/RecordBatch.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/Sender.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/tools/ProducerPerformance.java PRE-CREATION 
>   clients/src/main/java/kafka/common/ByteSerialization.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Cluster.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Configurable.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Deserializer.java PRE-CREATION 
>   clients/src/main/java/kafka/common/KafkaException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Metric.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Node.java PRE-CREATION 
>   clients/src/main/java/kafka/common/PartitionInfo.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Serializer.java PRE-CREATION 
>   clients/src/main/java/kafka/common/StringSerialization.java PRE-CREATION 
>   clients/src/main/java/kafka/common/TopicPartition.java PRE-CREATION 
>   clients/src/main/java/kafka/common/config/AbstractConfig.java PRE-CREATION 
>   clients/src/main/java/kafka/common/config/ConfigDef.java PRE-CREATION 
>   clients/src/main/java/kafka/common/config/ConfigException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/ApiException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/CorruptMessageException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/LeaderNotAvailableException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/MessageTooLargeException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/NetworkException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/NotLeaderForPartitionException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/OffsetMetadataTooLarge.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/OffsetOutOfRangeException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/RetryableException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/TimeoutException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/UnknownServerException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/UnknownTopicOrPartitionException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/CompoundStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/JmxReporter.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/KafkaMetric.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Measurable.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/MeasurableStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/MetricConfig.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Metrics.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/MetricsReporter.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Quota.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/QuotaViolationException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Sensor.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Stat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Avg.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Count.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Histogram.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Max.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Min.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Percentile.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Percentiles.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Rate.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/SampledStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Total.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/WindowedStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/ByteBufferReceive.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/ByteBufferSend.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/NetworkReceive.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/NetworkSend.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Receive.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Selectable.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Selector.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Send.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/ApiKeys.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/Errors.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/ProtoUtils.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/Protocol.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/ArrayOf.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Field.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Schema.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/SchemaException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Struct.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Type.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/CompressionType.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/InvalidRecordException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/LogEntry.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/MemoryRecords.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/Record.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/Records.java PRE-CREATION 
>   clients/src/main/java/kafka/common/requests/RequestHeader.java PRE-CREATION 
>   clients/src/main/java/kafka/common/requests/RequestSend.java PRE-CREATION 
>   clients/src/main/java/kafka/common/requests/ResponseHeader.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/AbstractIterator.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/CopyOnWriteMap.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/Crc32.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/KafkaThread.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/SystemTime.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/Time.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/Utils.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/common/network/SelectorTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/BufferPoolTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/MetadataTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/MockProducerTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/RecordAccumulatorTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/RecordSendTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/SenderTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/config/ConfigDefTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/metrics/JmxReporterTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/metrics/MetricsTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/metrics/stats/HistogramTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/protocol/types/ProtocolSerializationTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/record/MemoryRecordsTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/record/RecordTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/utils/AbstractIteratorTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/utils/MockTime.java PRE-CREATION 
>   clients/src/test/java/kafka/test/MetricsBench.java PRE-CREATION 
>   clients/src/test/java/kafka/test/Microbenchmarks.java PRE-CREATION 
>   clients/src/test/java/kafka/test/MockSelector.java PRE-CREATION 
>   clients/src/test/java/kafka/test/TestUtils.java PRE-CREATION 
>   project/Build.scala 098e874b8a14067739e6cd1f0a9ab9597ae84b6e 
> 
> Diff: https://reviews.apache.org/r/17263/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jay Kreps
> 
>


Re: Review Request 17263: New producer for Kafka.

Posted by Sriram Subramanian <sr...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17263/#review32808
-----------------------------------------------------------



clients/src/main/java/kafka/common/protocol/types/Struct.java
<https://reviews.apache.org/r/17263/#comment61817>

    The name seems too generic and does not seem to indicate the intent.



clients/src/main/java/kafka/common/record/Record.java
<https://reviews.apache.org/r/17263/#comment61844>

    It might be useful to have a actual message format displayed in the comment.
    
    /**
     *  - - - - - - - - - - - - -
     * |          |          |
     * | version  |   Magic  | . . .
     * |(2 bytes) | (1 byte) |
     *  - - - - - - - - - - - - -
     **/



clients/src/main/java/kafka/common/utils/CopyOnWriteMap.java
<https://reviews.apache.org/r/17263/#comment61865>

    full copy


- Sriram Subramanian


On Jan. 23, 2014, 8:54 p.m., Jay Kreps wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/17263/
> -----------------------------------------------------------
> 
> (Updated Jan. 23, 2014, 8:54 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1227
>     https://issues.apache.org/jira/browse/KAFKA-1227
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-1227 New producer!
> 
> 
> Diffs
> -----
> 
>   clients/build.sbt PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/BufferExhaustedException.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/Callback.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/DefaultPartitioner.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/KafkaProducer.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/MockProducer.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/Partitioner.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/Producer.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/ProducerConfig.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/ProducerRecord.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/RecordSend.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/BufferPool.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/Metadata.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/ProduceRequestResult.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/RecordBatch.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/Sender.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/tools/ProducerPerformance.java PRE-CREATION 
>   clients/src/main/java/kafka/common/ByteSerialization.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Cluster.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Configurable.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Deserializer.java PRE-CREATION 
>   clients/src/main/java/kafka/common/KafkaException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Metric.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Node.java PRE-CREATION 
>   clients/src/main/java/kafka/common/PartitionInfo.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Serializer.java PRE-CREATION 
>   clients/src/main/java/kafka/common/StringSerialization.java PRE-CREATION 
>   clients/src/main/java/kafka/common/TopicPartition.java PRE-CREATION 
>   clients/src/main/java/kafka/common/config/AbstractConfig.java PRE-CREATION 
>   clients/src/main/java/kafka/common/config/ConfigDef.java PRE-CREATION 
>   clients/src/main/java/kafka/common/config/ConfigException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/ApiException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/CorruptMessageException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/LeaderNotAvailableException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/MessageTooLargeException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/NetworkException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/NotLeaderForPartitionException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/OffsetMetadataTooLarge.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/OffsetOutOfRangeException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/RetryableException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/TimeoutException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/UnknownServerException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/UnknownTopicOrPartitionException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/CompoundStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/JmxReporter.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/KafkaMetric.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Measurable.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/MeasurableStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/MetricConfig.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Metrics.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/MetricsReporter.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Quota.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/QuotaViolationException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Sensor.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Stat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Avg.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Count.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Histogram.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Max.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Min.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Percentile.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Percentiles.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Rate.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/SampledStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Total.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/WindowedStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/ByteBufferReceive.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/ByteBufferSend.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/NetworkReceive.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/NetworkSend.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Receive.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Selectable.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Selector.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Send.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/ApiKeys.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/Errors.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/ProtoUtils.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/Protocol.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/ArrayOf.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Field.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Schema.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/SchemaException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Struct.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Type.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/CompressionType.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/InvalidRecordException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/LogEntry.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/MemoryRecords.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/Record.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/Records.java PRE-CREATION 
>   clients/src/main/java/kafka/common/requests/RequestHeader.java PRE-CREATION 
>   clients/src/main/java/kafka/common/requests/RequestSend.java PRE-CREATION 
>   clients/src/main/java/kafka/common/requests/ResponseHeader.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/AbstractIterator.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/CopyOnWriteMap.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/Crc32.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/KafkaThread.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/SystemTime.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/Time.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/Utils.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/common/network/SelectorTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/BufferPoolTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/MetadataTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/MockProducerTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/RecordAccumulatorTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/RecordSendTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/SenderTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/config/ConfigDefTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/metrics/JmxReporterTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/metrics/MetricsTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/metrics/stats/HistogramTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/protocol/types/ProtocolSerializationTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/record/MemoryRecordsTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/record/RecordTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/utils/AbstractIteratorTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/utils/MockTime.java PRE-CREATION 
>   clients/src/test/java/kafka/test/MetricsBench.java PRE-CREATION 
>   clients/src/test/java/kafka/test/Microbenchmarks.java PRE-CREATION 
>   clients/src/test/java/kafka/test/MockSelector.java PRE-CREATION 
>   clients/src/test/java/kafka/test/TestUtils.java PRE-CREATION 
>   project/Build.scala 098e874b8a14067739e6cd1f0a9ab9597ae84b6e 
> 
> Diff: https://reviews.apache.org/r/17263/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jay Kreps
> 
>