Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/06/29 18:13:04 UTC

[GitHub] [kafka] artemlivshits commented on a diff in pull request #12049: KAFKA-10888: Sticky partition leads to uneven produce msg

artemlivshits commented on code in PR #12049:
URL: https://github.com/apache/kafka/pull/12049#discussion_r910265594


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -160,91 +220,162 @@ public double measure(MetricConfig config, long now) {
         metrics.addMetric(metricName, availableBytes);
     }
 
+    private void setPartition(AppendCallbacks callbacks, int partition) {
+        if (callbacks != null)
+            callbacks.setPartition(partition);
+    }
+
     /**
      * Add a record to the accumulator, return the append result
      * <p>
      * The append result will contain the future metadata, and flag for whether the appended batch is full or a new batch is created
      * <p>
      *
-     * @param tp The topic/partition to which this record is being sent
+     * @param topic The topic to which this record is being sent
+     * @param partition The partition to which this record is being sent or RecordMetadata.UNKNOWN_PARTITION
+     *                  if any partition could be used
      * @param timestamp The timestamp of the record
      * @param key The key for the record
      * @param value The value for the record
      * @param headers the Headers for the record
-     * @param callback The user-supplied callback to execute when the request is complete
+     * @param callbacks The callbacks to execute
      * @param maxTimeToBlock The maximum time in milliseconds to block for buffer memory to be available
      * @param abortOnNewBatch A boolean that indicates returning before a new batch is created and
      *                        running the partitioner's onNewBatch method before trying to append again
      * @param nowMs The current time, in milliseconds
+     * @param cluster The cluster metadata
      */
-    public RecordAppendResult append(TopicPartition tp,
+    public RecordAppendResult append(String topic,
+                                     int partition,
                                      long timestamp,
                                      byte[] key,
                                      byte[] value,
                                      Header[] headers,
-                                     Callback callback,
+                                     AppendCallbacks callbacks,
                                      long maxTimeToBlock,
                                      boolean abortOnNewBatch,
-                                     long nowMs) throws InterruptedException {
+                                     long nowMs,
+                                     Cluster cluster) throws InterruptedException {
+        TopicInfo topicInfo = topicInfoMap.computeIfAbsent(topic, k -> new TopicInfo(logContext, k, batchSize));
+
         // We keep track of the number of appending thread to make sure we do not miss batches in
         // abortIncompleteBatches().
         appendsInProgress.incrementAndGet();
         ByteBuffer buffer = null;
         if (headers == null) headers = Record.EMPTY_HEADERS;
         try {
-            // check if we have an in-progress batch
-            Deque<ProducerBatch> dq = getOrCreateDeque(tp);
-            synchronized (dq) {
-                if (closed)
-                    throw new KafkaException("Producer closed while send in progress");
-                RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
-                if (appendResult != null)
-                    return appendResult;
-            }
+            // Loop to retry in case we encounter partitioner's race conditions.
+            while (true) {
+                // If the message doesn't have any partition affinity, we pick a partition based on broker
+                // availability and performance.  Note that here we peek at the current partition before we hold the
+                // deque lock, so we'll need to make sure that it hasn't changed while we were waiting for the
+                // deque lock.
+                final BuiltInPartitioner.StickyPartitionInfo partitionInfo;
+                final int effectivePartition;
+                if (partition == RecordMetadata.UNKNOWN_PARTITION) {
+                    partitionInfo = topicInfo.builtInPartitioner.peekCurrentPartitionInfo(cluster);
+                    effectivePartition = partitionInfo.partition();
+                } else {
+                    partitionInfo = null;
+                    effectivePartition = partition;
+                }
 
-            // we don't have an in-progress record batch try to allocate a new batch
-            if (abortOnNewBatch) {
-                // Return a result that will cause another call to append.
-                return new RecordAppendResult(null, false, false, true);
-            }
+                // Now that we know the effective partition, let the caller know.
+                setPartition(callbacks, effectivePartition);
+
+                // check if we have an in-progress batch
+                Deque<ProducerBatch> dq = topicInfo.batches.computeIfAbsent(effectivePartition, k -> new ArrayDeque<>());
+                synchronized (dq) {
+                    // After taking the lock, validate that the partition hasn't changed; if it has, retry.
+                    if (topicInfo.builtInPartitioner.isPartitionChanged(partitionInfo)) {
+                        log.trace("Partition {} for topic {} switched by a concurrent append, retrying",
+                                partitionInfo.partition(), topic);
+                        continue;
+                    }
+                    RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callbacks, dq, nowMs);
+                    if (appendResult != null) {
+                        topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo, appendResult.appendedBytes, cluster);
+                        return appendResult;
+                    }
+                }
 
-            byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
-            int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
-            log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, tp.topic(), tp.partition(), maxTimeToBlock);
-            buffer = free.allocate(size, maxTimeToBlock);
+                // we don't have an in-progress record batch, so try to allocate a new batch
+                if (abortOnNewBatch) {
+                    // Return a result that will cause another call to append.
+                    return new RecordAppendResult(null, false, false, true, 0);
+                }
 
-            // Update the current time in case the buffer allocation blocked above.
-            nowMs = time.milliseconds();
-            synchronized (dq) {
-                // Need to check if producer is closed again after grabbing the dequeue lock.
-                if (closed)
-                    throw new KafkaException("Producer closed while send in progress");
+                if (buffer == null) {
+                    byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
+                    int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
+                    log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, topic, partition, maxTimeToBlock);
+                    buffer = free.allocate(size, maxTimeToBlock);
+                }
 
-                RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
-                if (appendResult != null) {
-                    // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
+                synchronized (dq) {
+                    // After taking the lock, validate that the partition hasn't changed; if it has, retry.
+                    if (topicInfo.builtInPartitioner.isPartitionChanged(partitionInfo)) {
+                        log.trace("Partition {} for topic {} switched by a concurrent append, retrying",
+                                partitionInfo.partition(), topic);
+                        continue;
+                    }
+                    RecordAppendResult appendResult = appendNewBatch(topic, effectivePartition, dq, timestamp, key, value, headers, callbacks, buffer);
+                    // Set buffer to null, so that deallocate doesn't return it to the free pool, since it's used in the batch.
+                    if (appendResult.newBatchCreated)
+                        buffer = null;
+                    topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo, appendResult.appendedBytes, cluster);
                     return appendResult;
                 }
-
-                MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
-                ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, nowMs);
-                FutureRecordMetadata future = Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers,
-                        callback, nowMs));
-
-                dq.addLast(batch);
-                incomplete.add(batch);
-
-                // Don't deallocate this buffer in the finally block as it's being used in the record batch
-                buffer = null;
-                return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, false);
             }
         } finally {
-            if (buffer != null)
-                free.deallocate(buffer);
+            free.deallocate(buffer);
             appendsInProgress.decrementAndGet();
         }
     }
 
+    /**
+     * Append a new batch to the queue
+     *
+     * @param topic The topic
+     * @param partition The partition (cannot be RecordMetadata.UNKNOWN_PARTITION)
+     * @param dq The queue
+     * @param timestamp The timestamp of the record
+     * @param key The key for the record
+     * @param value The value for the record
+     * @param headers the Headers for the record
+     * @param callbacks The callbacks to execute
+     * @param buffer The buffer for the new batch
+     */
+    private RecordAppendResult appendNewBatch(String topic,
+                                              int partition,
+                                              Deque<ProducerBatch> dq,
+                                              long timestamp,
+                                              byte[] key,
+                                              byte[] value,
+                                              Header[] headers,
+                                              AppendCallbacks callbacks,
+                                              ByteBuffer buffer) {
+        assert partition != RecordMetadata.UNKNOWN_PARTITION;
+
+        // Update the current time in case the buffer allocation blocked above.
+        long nowMs = time.milliseconds();

Review Comment:
   Hmm, this call now happens under the queue lock, which might be the root cause of KAFKA-14020.  Will move it outside, just after the allocation of the new buffer at line 312.
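   
   For clarity, a minimal, self-contained sketch of the placement being proposed: read the clock right after the blocking buffer allocation and before taking the deque lock, then pass the timestamp into the method that runs under the lock. The names below (`TimestampOutsideLockSketch`, `allocate`, `System.currentTimeMillis`) are simplified stand-ins for `RecordAccumulator`, `BufferPool.allocate` and `Time.milliseconds`, not the actual implementation:
   
   ```java
   import java.nio.ByteBuffer;
   import java.util.ArrayDeque;
   import java.util.Deque;
   
   // Hypothetical, simplified stand-in for the RecordAccumulator flow -- not the actual
   // Kafka code.  It only illustrates where the clock read could live: after the
   // (potentially blocking) buffer allocation and before the deque lock is taken,
   // so that appendNewBatch() receives nowMs instead of calling the clock under the lock.
   public class TimestampOutsideLockSketch {
       private final Deque<ByteBuffer> dq = new ArrayDeque<>();
   
       public void append(int size) {
           ByteBuffer buffer = allocate(size);       // may block waiting for memory
           long nowMs = System.currentTimeMillis();  // proposed placement: read the clock here
           synchronized (dq) {
               appendNewBatch(buffer, nowMs);        // nowMs is passed in, not read under the lock
           }
       }
   
       private ByteBuffer allocate(int size) {
           // Stand-in for BufferPool.allocate(size, maxTimeToBlock).
           return ByteBuffer.allocate(size);
       }
   
       private void appendNewBatch(ByteBuffer buffer, long nowMs) {
           // Stand-in for RecordAccumulator.appendNewBatch(); uses the caller-provided timestamp.
           dq.addLast(buffer);
       }
   
       public static void main(String[] args) {
           new TimestampOutsideLockSketch().append(16 * 1024);
       }
   }
   ```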



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org