Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2022/11/01 16:57:15 UTC

[GitHub] [samza] ajothomas opened a new pull request, #1639: SAMZA-2741: [Pipeline Drain] Fix processing of Drain messages for High-Level and Low-level API

ajothomas opened a new pull request, #1639:
URL: https://github.com/apache/samza/pull/1639

   # Symptoms and Cause:
   This PR fixes a few issues with the processing of drain messages. The following issues were encountered during end-to-end testing with test pipelines.
   
   1. It was observed that the `RunLoop` was running into an exception when tasks tried to consume from the intermediate stream.
   This happened because we stop all consumers when we encounter drain, which also stops the consumers that ingest data from the intermediate streams.
   
   After issue 1 was resolved, additional issues were observed:
   2. The pipeline was getting stuck after partially consuming data after the last shuffle stage.
   
   This was happening because we were removing SSPs from a task's processing SSP set even for high-level API processing, which led to unprocessed drain messages and left the pipeline stuck.
   
   3. The pipeline was stuck if a drain message was encountered prior to the start of the pipeline. This happened because we set `SystemConsumers` to drain mode before it is even started by `SamzaContainer`.
   
   # Changes:
   1. Changed the `SystemConsumers` code to remove the line that stopped registered `SystemConsumer`s on drain.
   
   2. Restricted the removal of an SSP from a task's processing SSP set to the low-level API, so that for the high-level API the drain message is left to be processed by the operator DAG. Additionally, the `TaskCoordinator` is asked to commit the task once all streams for the task have drained.
   
   3. To fix the pipeline getting stuck when a drain control message is present prior to container start, we now write the drain and watermark control messages on `SystemConsumers` start if it is already in drain mode; a rough sketch of this ordering idea follows below.
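   
   The following sketch is illustrative only and does not use real Samza class or method names; it is a minimal rendering of the ordering fix in change 3, assuming a hypothetical `DrainAwareConsumers` wrapper: if drain is requested before start, emitting the control messages is deferred to start instead of being lost.
   
   ```java
   // Hypothetical, simplified sketch of change 3 (names are illustrative, not Samza APIs):
   // if drain mode was requested before start(), emit the drain/watermark control
   // envelopes once the consumers actually start, instead of silently doing nothing.
   final class DrainAwareConsumers {
     private final Runnable emitDrainAndWatermarkEnvelopes;
     private boolean drainRequested = false;
     private boolean started = false;

     DrainAwareConsumers(Runnable emitDrainAndWatermarkEnvelopes) {
       this.emitDrainAndWatermarkEnvelopes = emitDrainAndWatermarkEnvelopes;
     }

     synchronized void requestDrain() {
       drainRequested = true;
       if (started) {
         // Already running: emit the control messages immediately.
         emitDrainAndWatermarkEnvelopes.run();
       }
       // Not yet started: defer to start(). Previously nothing was written at start,
       // which is what left the pipeline stuck (symptom 3).
     }

     synchronized void start() {
       started = true;
       if (drainRequested) {
         // Drain was requested before the container started us: emit the control
         // messages now so the run loop can observe them.
         emitDrainAndWatermarkEnvelopes.run();
       }
     }
   }
   ```
   
   The point is simply that `start()` re-checks the drain flag, mirroring the behaviour described in change 3.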
   
   Additionally, the drain-related integration tests (`DrainLowLevelApiIntegrationTest` and `DrainHighLevelApiIntegrationTest`) have been enabled, since most of the flakiness and edge cases have been fixed. There is still a small chance that both tests are flaky on rare occasions, because `TestRunner.run` and the in-memory metadata store writes happen in separate threads. Despite a generous 5-second delay for the metadata store writes, the order can still get mixed up, which causes the tests to fail. A `RetryRule` test rule was added to retry the tests if they fail the first time; an illustrative sketch of such a rule follows.
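   
   For context, a retry rule like the one mentioned above can be built on JUnit 4's `TestRule`. This is a minimal illustrative sketch, not necessarily Samza's actual `RetryRule`; the class name and attempt count are assumptions.
   
   ```java
   import org.junit.rules.TestRule;
   import org.junit.runner.Description;
   import org.junit.runners.model.Statement;

   // Minimal retry rule sketch: re-run a failed test body up to maxAttempts times
   // and only surface the last failure if every attempt fails.
   public class SimpleRetryRule implements TestRule {
     private final int maxAttempts;

     public SimpleRetryRule(int maxAttempts) {
       this.maxAttempts = maxAttempts;
     }

     @Override
     public Statement apply(Statement base, Description description) {
       return new Statement() {
         @Override
         public void evaluate() throws Throwable {
           Throwable lastFailure = null;
           for (int attempt = 1; attempt <= maxAttempts; attempt++) {
             try {
               base.evaluate(); // run the actual test body
               return;          // success: stop retrying
             } catch (Throwable t) {
               lastFailure = t; // remember the failure and try again
             }
           }
           throw lastFailure;   // every attempt failed
         }
       };
     }
   }
   ```
   
   A test class would attach it with something like `@Rule public final SimpleRetryRule retry = new SimpleRetryRule(2);`.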
   
   # Tests:
   - End-to-end tests for both the high-level and low-level API
   - Unit tests for drain in `RunLoop`
   - Integration tests for drain for the high-level and low-level API: `DrainLowLevelApiIntegrationTest` and `DrainHighLevelApiIntegrationTest`
   
   # API changes:
   None
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@samza.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [samza] xinyuiscool commented on a diff in pull request #1639: SAMZA-2741: [Pipeline Drain] Fix processing of Drain messages for High-Level and Low-level API

Posted by GitBox <gi...@apache.org>.
xinyuiscool commented on code in PR #1639:
URL: https://github.com/apache/samza/pull/1639#discussion_r1010901435


##########
samza-test/src/test/java/org/apache/samza/test/drain/DrainHighLevelApiIntegrationTest.java:
##########
@@ -74,20 +74,30 @@ public void describe(StreamApplicationDescriptor appDescriptor) {
           .map(KV::getValue)
           .partitionBy(PageView::getMemberId, pv -> pv,
               KVSerde.of(new IntegerSerde(), new TestTableData.PageViewJsonSerde()), "p1")
+          .map(kv -> KV.of(kv.getKey() * 31, kv.getValue()))
+          .partitionBy(KV::getKey, KV::getValue, KVSerde.of(new IntegerSerde(), new TestTableData.PageViewJsonSerde()), "p2")
           .sink((m, collector, coordinator) -> {
             RECEIVED.add(m.getValue());
           });
     }
   }
 
-  // The test can be occasionally flaky, so we set Ignore annotation
-  // Remove ignore annotation and run the test as follows:
-  // ./gradlew :samza-test:test --tests org.apache.samza.test.drain.DrainHighLevelApiIntegrationTest -PscalaSuffix=2.12
+  /**
+   * This test will test drain and consumption of some messages from the in-memory topic.
+   * In order to simulate the real-world behaviour of drain, the test adds messages to the in-memory topic buffer periodically
+   * in a delayed fashion instead of all at once. The test then writes the drain notification message to the in-memory
+   * metadata store to drain and stop the pipeline. This write is done shortly after the pipeline starts and before all
+   * the messages are written to the topic's buffer. As a result, the total count of the processed messages will be less
+   * than the expected count of messages.
+   * */
   @Ignore
   @Test
-  public void testPipeline() {
+  public void testDrain() {

Review Comment:
   Seems most of the integration tests are ignored because they rely on wall-clock time. Is it possible to enable a couple so we don't need this timed wait? Not sure whether this TestRunner supports waitForFinish(), but Samza runners have this API, so in theory we should be able to use it.
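   
   As an aside, a condition-based wait along the lines below would avoid a fixed sleep; this helper is illustrative only and is not a Samza or TestRunner API.
   
   ```java
   import java.time.Duration;
   import java.util.function.BooleanSupplier;

   // Illustrative polling helper: re-check a completion condition until a deadline
   // instead of sleeping for a fixed wall-clock interval.
   final class Await {
     static void until(BooleanSupplier condition, Duration timeout) throws InterruptedException {
       final long deadlineNanos = System.nanoTime() + timeout.toNanos();
       while (!condition.getAsBoolean()) {
         if (System.nanoTime() > deadlineNanos) {
           throw new AssertionError("Condition not met within " + timeout);
         }
         Thread.sleep(50); // poll periodically; total wait is bounded by the timeout
       }
     }
   }
   ```
   
   A test could then wait with, for example, `Await.until(() -> RECEIVED.size() >= expectedCount, Duration.ofSeconds(30))`, where `expectedCount` is a hypothetical expected message count.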



##########
samza-core/src/main/java/org/apache/samza/container/RunLoop.java:
##########
@@ -875,49 +876,40 @@ private boolean shouldDrain() {
         return false;
       }
 
-      if (!pendingEnvelopeQueue.isEmpty()) {
-        PendingEnvelope pendingEnvelope = pendingEnvelopeQueue.peek();
-        IncomingMessageEnvelope envelope = pendingEnvelope.envelope;
+      if (pendingEnvelopeQueue.size() > 0) {
+        final PendingEnvelope pendingEnvelope = pendingEnvelopeQueue.peek();
+        final IncomingMessageEnvelope envelope = pendingEnvelope.envelope;
 
         if (envelope.isDrain()) {
           final DrainMessage message = (DrainMessage) envelope.getMessage();
           if (!message.getRunId().equals(runId)) {
-            // Removing the drain message from the pending queue as it doesn't match with the current runId
-            // Removing it will ensure that it is not picked up by process()
-            pendingEnvelopeQueue.remove();
+            // Removing the drain message from the pending queue as it doesn't match with the current deployment
+            final PendingEnvelope discardedDrainMessage = pendingEnvelopeQueue.remove();
+            consumerMultiplexer.tryUpdate(discardedDrainMessage.envelope.getSystemStreamPartition());
           } else {
+            // Found drain message matching the current deployment
+
             // set the RunLoop to drain mode
             if (!isDraining) {
               drain();
             }
 
-            if (elasticityFactor <= 1) {
-              SystemStreamPartition ssp = envelope.getSystemStreamPartition();
-              processingSspSetToDrain.remove(ssp);
-            } else {
-              // SystemConsumers will write only one envelope (enclosing DrainMessage) per SSP in its buffer.
-              // This envelope doesn't have keybucket info it's SSP. With elasticity, the same SSP can be processed by
-              // multiple tasks. Therefore, if envelope contains drain message, the ssp of envelope should be removed
-              // from task's processing set irrespective of keyBucket.
-              SystemStreamPartition sspOfEnvelope = envelope.getSystemStreamPartition();
-              Optional<SystemStreamPartition> ssp = processingSspSetToDrain.stream()
-                  .filter(sspInSet -> sspInSet.getSystemStream().equals(sspOfEnvelope.getSystemStream())
-                      && sspInSet.getPartition().equals(sspOfEnvelope.getPartition()))
-                  .findFirst();
-              ssp.ifPresent(processingSspSetToDrain::remove);
-            }
 
             if (!hasIntermediateStreams) {
-              // Don't remove from the pending queue as we want the DAG to pick up Drain message and propagate it to
-              // intermediate streams
+              // The flow below only applies to samza low-level API
+
+              // For high-level API, we do not remove the message from pending queue.
+              // It will be picked by the process flow instead of drain flow, as we want the drain control message
+              // to be processed by the High-level API Operator DAG.
+
+              processingSspSetToDrain.remove(envelope.getSystemStreamPartition());
               pendingEnvelopeQueue.remove();
+              return processingSspSetToDrain.isEmpty();

Review Comment:
   Seems we will return this anyway at the end. Do we need another return here?



##########
samza-core/src/main/java/org/apache/samza/container/RunLoop.java:
##########
@@ -875,49 +876,40 @@ private boolean shouldDrain() {
         return false;
       }
 
-      if (!pendingEnvelopeQueue.isEmpty()) {
-        PendingEnvelope pendingEnvelope = pendingEnvelopeQueue.peek();
-        IncomingMessageEnvelope envelope = pendingEnvelope.envelope;
+      if (pendingEnvelopeQueue.size() > 0) {
+        final PendingEnvelope pendingEnvelope = pendingEnvelopeQueue.peek();
+        final IncomingMessageEnvelope envelope = pendingEnvelope.envelope;
 
         if (envelope.isDrain()) {
           final DrainMessage message = (DrainMessage) envelope.getMessage();
           if (!message.getRunId().equals(runId)) {
-            // Removing the drain message from the pending queue as it doesn't match with the current runId
-            // Removing it will ensure that it is not picked up by process()
-            pendingEnvelopeQueue.remove();
+            // Removing the drain message from the pending queue as it doesn't match with the current deployment
+            final PendingEnvelope discardedDrainMessage = pendingEnvelopeQueue.remove();
+            consumerMultiplexer.tryUpdate(discardedDrainMessage.envelope.getSystemStreamPartition());
           } else {
+            // Found drain message matching the current deployment
+
             // set the RunLoop to drain mode
             if (!isDraining) {
               drain();
             }
 
-            if (elasticityFactor <= 1) {
-              SystemStreamPartition ssp = envelope.getSystemStreamPartition();
-              processingSspSetToDrain.remove(ssp);
-            } else {
-              // SystemConsumers will write only one envelope (enclosing DrainMessage) per SSP in its buffer.
-              // This envelope doesn't have keybucket info it's SSP. With elasticity, the same SSP can be processed by
-              // multiple tasks. Therefore, if envelope contains drain message, the ssp of envelope should be removed
-              // from task's processing set irrespective of keyBucket.
-              SystemStreamPartition sspOfEnvelope = envelope.getSystemStreamPartition();
-              Optional<SystemStreamPartition> ssp = processingSspSetToDrain.stream()
-                  .filter(sspInSet -> sspInSet.getSystemStream().equals(sspOfEnvelope.getSystemStream())
-                      && sspInSet.getPartition().equals(sspOfEnvelope.getPartition()))
-                  .findFirst();
-              ssp.ifPresent(processingSspSetToDrain::remove);
-            }
 
             if (!hasIntermediateStreams) {

Review Comment:
   It's not very safe to use intermediate streams to decide whether a job is high-level or low-level. It's possible that a high-level job has no intermediate streams but has state that needs to be drained.
   
   Do we even support drain on the low-level API? I think we don't have such support, since watermarks don't mean anything there. If we only use drain on the high-level API, I think we can safely delete this check.





[GitHub] [samza] ajothomas commented on a diff in pull request #1639: SAMZA-2741: [Pipeline Drain] Fix processing of Drain messages for High-Level and Low-level API

Posted by GitBox <gi...@apache.org>.
ajothomas commented on code in PR #1639:
URL: https://github.com/apache/samza/pull/1639#discussion_r1013123456


##########
samza-core/src/main/java/org/apache/samza/container/RunLoop.java:
##########
@@ -875,49 +876,40 @@ private boolean shouldDrain() {
         return false;
       }
 
-      if (!pendingEnvelopeQueue.isEmpty()) {
-        PendingEnvelope pendingEnvelope = pendingEnvelopeQueue.peek();
-        IncomingMessageEnvelope envelope = pendingEnvelope.envelope;
+      if (pendingEnvelopeQueue.size() > 0) {
+        final PendingEnvelope pendingEnvelope = pendingEnvelopeQueue.peek();
+        final IncomingMessageEnvelope envelope = pendingEnvelope.envelope;
 
         if (envelope.isDrain()) {
           final DrainMessage message = (DrainMessage) envelope.getMessage();
           if (!message.getRunId().equals(runId)) {
-            // Removing the drain message from the pending queue as it doesn't match with the current runId
-            // Removing it will ensure that it is not picked up by process()
-            pendingEnvelopeQueue.remove();
+            // Removing the drain message from the pending queue as it doesn't match with the current deployment
+            final PendingEnvelope discardedDrainMessage = pendingEnvelopeQueue.remove();
+            consumerMultiplexer.tryUpdate(discardedDrainMessage.envelope.getSystemStreamPartition());
           } else {
+            // Found drain message matching the current deployment
+
             // set the RunLoop to drain mode
             if (!isDraining) {
               drain();
             }
 
-            if (elasticityFactor <= 1) {
-              SystemStreamPartition ssp = envelope.getSystemStreamPartition();
-              processingSspSetToDrain.remove(ssp);
-            } else {
-              // SystemConsumers will write only one envelope (enclosing DrainMessage) per SSP in its buffer.
-              // This envelope doesn't have keybucket info it's SSP. With elasticity, the same SSP can be processed by
-              // multiple tasks. Therefore, if envelope contains drain message, the ssp of envelope should be removed
-              // from task's processing set irrespective of keyBucket.
-              SystemStreamPartition sspOfEnvelope = envelope.getSystemStreamPartition();
-              Optional<SystemStreamPartition> ssp = processingSspSetToDrain.stream()
-                  .filter(sspInSet -> sspInSet.getSystemStream().equals(sspOfEnvelope.getSystemStream())
-                      && sspInSet.getPartition().equals(sspOfEnvelope.getPartition()))
-                  .findFirst();
-              ssp.ifPresent(processingSspSetToDrain::remove);
-            }
 
             if (!hasIntermediateStreams) {
-              // Don't remove from the pending queue as we want the DAG to pick up Drain message and propagate it to
-              // intermediate streams
+              // The flow below only applies to samza low-level API
+
+              // For high-level API, we do not remove the message from pending queue.
+              // It will be picked by the process flow instead of drain flow, as we want the drain control message
+              // to be processed by the High-level API Operator DAG.
+
+              processingSspSetToDrain.remove(envelope.getSystemStreamPartition());
               pendingEnvelopeQueue.remove();
+              return processingSspSetToDrain.isEmpty();

Review Comment:
   Changed it.





[GitHub] [samza] ajothomas commented on a diff in pull request #1639: SAMZA-2741: [Pipeline Drain] Fix processing of Drain messages for High-Level and Low-level API

Posted by GitBox <gi...@apache.org>.
ajothomas commented on code in PR #1639:
URL: https://github.com/apache/samza/pull/1639#discussion_r1013123301


##########
samza-core/src/main/java/org/apache/samza/container/RunLoop.java:
##########
@@ -875,49 +876,40 @@ private boolean shouldDrain() {
         return false;
       }
 
-      if (!pendingEnvelopeQueue.isEmpty()) {
-        PendingEnvelope pendingEnvelope = pendingEnvelopeQueue.peek();
-        IncomingMessageEnvelope envelope = pendingEnvelope.envelope;
+      if (pendingEnvelopeQueue.size() > 0) {
+        final PendingEnvelope pendingEnvelope = pendingEnvelopeQueue.peek();
+        final IncomingMessageEnvelope envelope = pendingEnvelope.envelope;
 
         if (envelope.isDrain()) {
           final DrainMessage message = (DrainMessage) envelope.getMessage();
           if (!message.getRunId().equals(runId)) {
-            // Removing the drain message from the pending queue as it doesn't match with the current runId
-            // Removing it will ensure that it is not picked up by process()
-            pendingEnvelopeQueue.remove();
+            // Removing the drain message from the pending queue as it doesn't match with the current deployment
+            final PendingEnvelope discardedDrainMessage = pendingEnvelopeQueue.remove();
+            consumerMultiplexer.tryUpdate(discardedDrainMessage.envelope.getSystemStreamPartition());
           } else {
+            // Found drain message matching the current deployment
+
             // set the RunLoop to drain mode
             if (!isDraining) {
               drain();
             }
 
-            if (elasticityFactor <= 1) {
-              SystemStreamPartition ssp = envelope.getSystemStreamPartition();
-              processingSspSetToDrain.remove(ssp);
-            } else {
-              // SystemConsumers will write only one envelope (enclosing DrainMessage) per SSP in its buffer.
-              // This envelope doesn't have keybucket info it's SSP. With elasticity, the same SSP can be processed by
-              // multiple tasks. Therefore, if envelope contains drain message, the ssp of envelope should be removed
-              // from task's processing set irrespective of keyBucket.
-              SystemStreamPartition sspOfEnvelope = envelope.getSystemStreamPartition();
-              Optional<SystemStreamPartition> ssp = processingSspSetToDrain.stream()
-                  .filter(sspInSet -> sspInSet.getSystemStream().equals(sspOfEnvelope.getSystemStream())
-                      && sspInSet.getPartition().equals(sspOfEnvelope.getPartition()))
-                  .findFirst();
-              ssp.ifPresent(processingSspSetToDrain::remove);
-            }
 
             if (!hasIntermediateStreams) {

Review Comment:
   Changed this.





[GitHub] [samza] ajothomas commented on a diff in pull request #1639: SAMZA-2741: [Pipeline Drain] Fix processing of Drain messages for High-Level and Low-level API

Posted by GitBox <gi...@apache.org>.
ajothomas commented on code in PR #1639:
URL: https://github.com/apache/samza/pull/1639#discussion_r1013123821


##########
samza-test/src/test/java/org/apache/samza/test/drain/DrainHighLevelApiIntegrationTest.java:
##########
@@ -74,20 +74,30 @@ public void describe(StreamApplicationDescriptor appDescriptor) {
           .map(KV::getValue)
           .partitionBy(PageView::getMemberId, pv -> pv,
               KVSerde.of(new IntegerSerde(), new TestTableData.PageViewJsonSerde()), "p1")
+          .map(kv -> KV.of(kv.getKey() * 31, kv.getValue()))
+          .partitionBy(KV::getKey, KV::getValue, KVSerde.of(new IntegerSerde(), new TestTableData.PageViewJsonSerde()), "p2")
           .sink((m, collector, coordinator) -> {
             RECEIVED.add(m.getValue());
           });
     }
   }
 
-  // The test can be occasionally flaky, so we set Ignore annotation
-  // Remove ignore annotation and run the test as follows:
-  // ./gradlew :samza-test:test --tests org.apache.samza.test.drain.DrainHighLevelApiIntegrationTest -PscalaSuffix=2.12
+  /**
+   * This test will test drain and consumption of some messages from the in-memory topic.
+   * In order to simulate the real-world behaviour of drain, the test adds messages to the in-memory topic buffer periodically
+   * in a delayed fashion instead of all at once. The test then writes the drain notification message to the in-memory
+   * metadata store to drain and stop the pipeline. This write is done shortly after the pipeline starts and before all
+   * the messages are written to the topic's buffer. As a result, the total count of the processed messages will be less
+   * than the expected count of messages.
+   * */
   @Ignore
   @Test
-  public void testPipeline() {
+  public void testDrain() {

Review Comment:
   Tests have been turned on now after a few tweaks.





[GitHub] [samza] xinyuiscool merged pull request #1639: SAMZA-2741: [Pipeline Drain] Fix processing of Drain messages for High-Level and Low-level API

Posted by GitBox <gi...@apache.org>.
xinyuiscool merged PR #1639:
URL: https://github.com/apache/samza/pull/1639

