You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@samza.apache.org by ra...@apache.org on 2022/05/17 22:02:44 UTC

[samza] 01/01: Revert "SAMZA-2734: [Elasticity] Update last processed offset after an envelope is finished processing when elasticity is enabled"

This is an automated email from the ASF dual-hosted git repository.

rayman pushed a commit to branch revert-1599-elasticity-checkpoint-readWrite
in repository https://gitbox.apache.org/repos/asf/samza.git

commit 8d192767cdc08b9f5f0277bd91f25334b0ac1793
Author: rayman7718 <ra...@gmail.com>
AuthorDate: Tue May 17 15:02:40 2022 -0700

    Revert "SAMZA-2734: [Elasticity] Update last processed offset after an envelope is finished processing when elasticity is enabled"
---
 .../apache/samza/checkpoint/OffsetManager.scala    | 12 +------
 .../org/apache/samza/system/SystemConsumers.scala  |  4 +--
 .../samza/checkpoint/TestOffsetManager.scala       | 41 ----------------------
 3 files changed, 3 insertions(+), 54 deletions(-)

diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
index 1bd5561c7..7a12625f0 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
@@ -216,19 +216,9 @@ class OffsetManager(
    * Set the last processed offset for a given SystemStreamPartition.
    */
   def update(taskName: TaskName, systemStreamPartition: SystemStreamPartition, offset: String) {
-    // without elasticity enabled, there is exactly one entry of an ssp in the systemStreamPartitions map for a taskName
-    // with elasticity enabled, there is exactly one of the keyBuckets of an ssp that a task consumes
-    // and hence exactly one entry of an ssp with keyBucket in in the systemStreamPartitions map for a taskName
-    // hence from the given ssp, find its sspWithKeybucket for the task and use that for updating lastProcessedOffsets
-    val sspWithKeyBucket = systemStreamPartitions.getOrElse(taskName,
-      throw new SamzaException("No SSPs registered for task: " + taskName))
-      .filter(ssp => ssp.getSystemStream.equals(systemStreamPartition.getSystemStream)
-        && ssp.getPartition.equals(systemStreamPartition.getPartition))
-      .toIterator.next()
-
     lastProcessedOffsets.putIfAbsent(taskName, new ConcurrentHashMap[SystemStreamPartition, String]())
     if (offset != null && !offset.equals(IncomingMessageEnvelope.END_OF_STREAM_OFFSET)) {
-      lastProcessedOffsets.get(taskName).put(sspWithKeyBucket, offset)
+      lastProcessedOffsets.get(taskName).put(systemStreamPartition, offset)
     }
   }
 
diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
index 4a24303cb..9367cd73a 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
@@ -188,8 +188,6 @@ class SystemConsumers (
       // but the actual systemConsumer which consumes from the input does not know about KeyBucket.
       // hence, use an SSP without KeyBucket
       consumer.register(removeKeyBucket(systemStreamPartition), offset)
-      chooser.register(removeKeyBucket(systemStreamPartition), offset)
-      debug("consumer.register and chooser.register for ssp: %s with offset %s" format (systemStreamPartition, offset))
     }
 
     debug("Starting consumers.")
@@ -246,6 +244,8 @@ class SystemConsumers (
     metrics.registerSystemStreamPartition(systemStreamPartition)
     unprocessedMessagesBySSP.put(systemStreamPartition, new ArrayDeque[IncomingMessageEnvelope]())
 
+    chooser.register(systemStreamPartition, offset)
+
     try {
       val consumer = consumers(systemStreamPartition.getSystem)
       val existingOffset = sspToRegisteredOffsets.get(systemStreamPartition)
diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
index 3c226c4c6..3949ecf18 100644
--- a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
+++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
@@ -579,47 +579,6 @@ class TestOffsetManager {
     assertEquals("60", modifiedOffsets.get(ssp6))
   }
 
-  @Test
-  def testElasticityUpdateWithoutKeyBucket: Unit = {
-    // When elasticity is enabled, task consumes a part of the full SSP represented by SSP With KeyBucket.
-    // OffsetManager tracks the set of SSP with KeyBucket consumed by a task.
-    // However, after an IME processing is complete, OffsetManager.update is called without KeyBuket.
-    // OffsetManager has to find and udpate the last processed offset for the task correctly for its SSP with KeyBucket.
-    val taskName = new TaskName("c")
-    val systemStream = new SystemStream("test-system", "test-stream")
-    val partition = new Partition(0)
-    val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
-    val systemStreamPartitionWithKeyBucket = new SystemStreamPartition(systemStreamPartition, 0);
-    val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")).asJava)
-    val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
-    val checkpointManager = getCheckpointManager(systemStreamPartition, taskName)
-    val startpointManagerUtil = getStartpointManagerUtil()
-    val systemAdmins = mock(classOf[SystemAdmins])
-    when(systemAdmins.getSystemAdmin("test-system")).thenReturn(getSystemAdmin)
-    val offsetManager = OffsetManager(systemStreamMetadata, new MapConfig, checkpointManager, startpointManagerUtil.getStartpointManager, systemAdmins, Map(), new OffsetManagerMetrics)
-    // register task and its input SSP with KeyBucket
-    offsetManager.register(taskName, Set(systemStreamPartitionWithKeyBucket))
-
-    offsetManager.start
-
-    // update is called with only the full SSP and no keyBucket information.
-    offsetManager.update(taskName, systemStreamPartition, "46")
-    // Get checkpoint snapshot like we do at the beginning of TaskInstance.commit()
-    val checkpoint46 = offsetManager.getLastProcessedOffsets(taskName)
-    offsetManager.update(taskName, systemStreamPartition, "47") // Offset updated before checkpoint
-    offsetManager.writeCheckpoint(taskName, new CheckpointV1(checkpoint46))
-    // OffsetManager correctly updates the lastProcessedOffset and checkpoint for the task and input SSP with KeyBucket.
-    assertEquals(Some("47"), offsetManager.getLastProcessedOffset(taskName, systemStreamPartitionWithKeyBucket))
-    assertEquals("46", offsetManager.offsetManagerMetrics.checkpointedOffsets.get(systemStreamPartitionWithKeyBucket).getValue)
-
-    // Now write the checkpoint for the latest offset
-    val checkpoint47 = offsetManager.getLastProcessedOffsets(taskName)
-    offsetManager.writeCheckpoint(taskName, new CheckpointV1(checkpoint47))
-
-    assertEquals(Some("47"), offsetManager.getLastProcessedOffset(taskName, systemStreamPartitionWithKeyBucket))
-    assertEquals("47", offsetManager.offsetManagerMetrics.checkpointedOffsets.get(systemStreamPartitionWithKeyBucket).getValue)
-  }
-
   // Utility method to create and write checkpoint in one statement
   def checkpoint(offsetManager: OffsetManager, taskName: TaskName): Unit = {
     offsetManager.writeCheckpoint(taskName, new CheckpointV1(offsetManager.getLastProcessedOffsets(taskName)))