You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sa...@apache.org on 2022/03/17 21:31:25 UTC

[samza] branch master updated: SAMZA-2728: [Elasticity] improve distribution of messages across elastic tasks (#1589)

This is an automated email from the ASF dual-hosted git repository.

saniljain15 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new f41d29c  SAMZA-2728: [Elasticity] improve distribution of messages across elastic tasks (#1589)
f41d29c is described below

commit f41d29cc10556a150d5b380a15a126eafc290b97
Author: lakshmi-manasa-g <mg...@linkedin.com>
AuthorDate: Thu Mar 17 14:31:19 2022 -0700

    SAMZA-2728: [Elasticity] improve distribution of messages across elastic tasks (#1589)
    
    * SAMZA-2728: [Elasticity] improve distribution of messages across elastic tasks
    
    * add test for end-of-stream and elasticity
---
 .../samza/system/IncomingMessageEnvelope.java      |  5 +++-
 .../java/org/apache/samza/config/JobConfig.java    |  4 +--
 .../java/org/apache/samza/container/RunLoop.java   | 15 ++++++++--
 .../org/apache/samza/config/TestJobConfig.java     | 10 +++++++
 .../org/apache/samza/container/TestRunLoop.java    | 33 ++++++++++++++++++++++
 5 files changed, 62 insertions(+), 5 deletions(-)

diff --git a/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java b/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
index f2d07de..2112463 100644
--- a/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
+++ b/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
@@ -132,7 +132,10 @@ public class IncomingMessageEnvelope {
     if (envelopeKeyorOffset == null) {
       return new SystemStreamPartition(systemStreamPartition, 0);
     }
-    int keyBucket = Math.abs(envelopeKeyorOffset.hashCode()) % elasticityFactor;
+
+    // modulo 31 first to best spread out the hashcode and then modulo elasticityFactor for actual keyBucket
+    // Note: elasticityFactor <= 16 so modulo 31 is safe to do.
+    int keyBucket = (Math.abs(envelopeKeyorOffset.hashCode()) % 31) % elasticityFactor;
     return new SystemStreamPartition(systemStreamPartition, keyBucket);
   }
 
diff --git a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
index 59163b9..6384747 100644
--- a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
@@ -485,8 +485,8 @@ public class JobConfig extends MapConfig {
 
   public int getElasticityFactor() {
     int elasticityFactor = getInt(JOB_ELASTICITY_FACTOR, DEFAULT_JOB_ELASTICITY_FACTOR);
-    if (elasticityFactor < 1) {
-      throw new ConfigException("Elasticity factor can not be less than 1");
+    if (elasticityFactor < 1 || elasticityFactor > 16) {
+      throw new ConfigException("Elasticity factor can not be less than 1 or greater than 16");
     }
     return elasticityFactor;
   }
diff --git a/samza-core/src/main/java/org/apache/samza/container/RunLoop.java b/samza-core/src/main/java/org/apache/samza/container/RunLoop.java
index a830251..b7a80a3 100644
--- a/samza-core/src/main/java/org/apache/samza/container/RunLoop.java
+++ b/samza-core/src/main/java/org/apache/samza/container/RunLoop.java
@@ -26,6 +26,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -784,8 +785,18 @@ public class RunLoop implements Runnable, Throttleable {
         IncomingMessageEnvelope envelope = pendingEnvelope.envelope;
 
         if (envelope.isEndOfStream()) {
-          SystemStreamPartition ssp = envelope.getSystemStreamPartition(elasticityFactor);
-          processingSspSet.remove(ssp);
+          if (elasticityFactor <= 1) {
+            SystemStreamPartition ssp = envelope.getSystemStreamPartition();
+            processingSspSet.remove(ssp);
+          } else {
+            // if envelope is end of stream, the ssp of envelope should be removed from task's processing set irrespective of keyBucket
+            SystemStreamPartition sspOfEnvelope = envelope.getSystemStreamPartition();
+            Optional<SystemStreamPartition> ssp = processingSspSet.stream()
+                .filter(sspInSet -> sspInSet.getSystemStream().equals(sspOfEnvelope.getSystemStream())
+                    && sspInSet.getPartition().equals(sspOfEnvelope.getPartition()))
+                .findFirst();
+            ssp.ifPresent(processingSspSet::remove);
+          }
           if (!hasIntermediateStreams) {
             pendingEnvelopeQueue.remove();
           }
diff --git a/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
index 651d48d..4d17166 100644
--- a/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
+++ b/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
@@ -669,6 +669,16 @@ public class TestJobConfig {
     }
     assertTrue(exceptionCaught);
 
+    jobConfig =
+        new JobConfig(new MapConfig(ImmutableMap.of(JobConfig.JOB_ELASTICITY_FACTOR, Integer.toString(17))));
+    exceptionCaught = false;
+    try {
+      jobConfig.getElasticityFactor();
+    } catch (ConfigException e) {
+      exceptionCaught = true;
+    }
+    assertTrue(exceptionCaught);
+
     jobConfig = new JobConfig(new MapConfig());
     assertEquals(JobConfig.DEFAULT_JOB_ELASTICITY_FACTOR, jobConfig.getElasticityFactor());
   }
diff --git a/samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java b/samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java
index 9452ddf..7bd5f9a 100644
--- a/samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java
+++ b/samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java
@@ -212,6 +212,39 @@ public class TestRunLoop {
   }
 
   @Test
+  public void testEndOfStreamElasticityEnabled() {
+
+    TaskName taskName0 = new TaskName(p0.toString() + " 0");
+    TaskName taskName1 = new TaskName(p0.toString() + " 1");
+    SystemStreamPartition ssp = new SystemStreamPartition("testSystem", "testStream", p0);
+    SystemStreamPartition ssp0 = new SystemStreamPartition("testSystem", "testStream", p0, 0);
+    SystemStreamPartition ssp1 = new SystemStreamPartition("testSystem", "testStream", p0, 1);
+
+    // create an EOS IME such that its ssp keybucket maps to ssp0 and not to ssp1
+    // the run loop should still give this IME to both of its tasks
+    IncomingMessageEnvelope envelopeEOS = spy(IncomingMessageEnvelope.buildEndOfStreamEnvelope(ssp));
+    when(envelopeEOS.getSystemStreamPartition(2)).thenReturn(ssp0);
+
+
+    // two tasks in the run loop: task0 processes ssp0 (0th keybucket of ssp) and task1 processes ssp1 (1st keybucket of ssp)
+    // the EOS IME should be given to both tasks irrespective of the keybucket
+    RunLoopTask task0 = getMockRunLoopTask(taskName0, ssp0);
+    RunLoopTask task1 = getMockRunLoopTask(taskName1, ssp1);
+
+    SystemConsumers consumerMultiplexer = mock(SystemConsumers.class);
+    when(consumerMultiplexer.choose(false)).thenReturn(envelopeEOS).thenReturn(null);
+
+    Map<TaskName, RunLoopTask> tasks = ImmutableMap.of(taskName0, task0, taskName1, task1);
+    int maxMessagesInFlight = 1;
+    RunLoop runLoop = new RunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight, windowMs, commitMs,
+        callbackTimeoutMs, maxThrottlingDelayMs, 0, containerMetrics, () -> 0L, false, 2);
+    runLoop.run();
+
+    verify(task0).endOfStream(any());
+    verify(task1).endOfStream(any());
+  }
+
+  @Test
   public void testWindow() {
     SystemConsumers consumerMultiplexer = mock(SystemConsumers.class);