You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2022/11/03 16:18:09 UTC

[GitHub] [samza] ajothomas commented on a diff in pull request #1639: SAMZA-2741: [Pipeline Drain] Fix processing of Drain messages for High-Level and Low-level API

ajothomas commented on code in PR #1639:
URL: https://github.com/apache/samza/pull/1639#discussion_r1013123301


##########
samza-core/src/main/java/org/apache/samza/container/RunLoop.java:
##########
@@ -875,49 +876,40 @@ private boolean shouldDrain() {
         return false;
       }
 
-      if (!pendingEnvelopeQueue.isEmpty()) {
-        PendingEnvelope pendingEnvelope = pendingEnvelopeQueue.peek();
-        IncomingMessageEnvelope envelope = pendingEnvelope.envelope;
+      if (pendingEnvelopeQueue.size() > 0) {
+        final PendingEnvelope pendingEnvelope = pendingEnvelopeQueue.peek();
+        final IncomingMessageEnvelope envelope = pendingEnvelope.envelope;
 
         if (envelope.isDrain()) {
           final DrainMessage message = (DrainMessage) envelope.getMessage();
           if (!message.getRunId().equals(runId)) {
-            // Removing the drain message from the pending queue as it doesn't match with the current runId
-            // Removing it will ensure that it is not picked up by process()
-            pendingEnvelopeQueue.remove();
+            // Removing the drain message from the pending queue as it doesn't match with the current deployment
+            final PendingEnvelope discardedDrainMessage = pendingEnvelopeQueue.remove();
+            consumerMultiplexer.tryUpdate(discardedDrainMessage.envelope.getSystemStreamPartition());
           } else {
+            // Found drain message matching the current deployment
+
             // set the RunLoop to drain mode
             if (!isDraining) {
               drain();
             }
 
-            if (elasticityFactor <= 1) {
-              SystemStreamPartition ssp = envelope.getSystemStreamPartition();
-              processingSspSetToDrain.remove(ssp);
-            } else {
-              // SystemConsumers will write only one envelope (enclosing DrainMessage) per SSP in its buffer.
-              // This envelope doesn't have keybucket info in its SSP. With elasticity, the same SSP can be processed by
-              // multiple tasks. Therefore, if envelope contains drain message, the ssp of envelope should be removed
-              // from task's processing set irrespective of keyBucket.
-              SystemStreamPartition sspOfEnvelope = envelope.getSystemStreamPartition();
-              Optional<SystemStreamPartition> ssp = processingSspSetToDrain.stream()
-                  .filter(sspInSet -> sspInSet.getSystemStream().equals(sspOfEnvelope.getSystemStream())
-                      && sspInSet.getPartition().equals(sspOfEnvelope.getPartition()))
-                  .findFirst();
-              ssp.ifPresent(processingSspSetToDrain::remove);
-            }
 
             if (!hasIntermediateStreams) {

Review Comment:
   Changed this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@samza.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org