You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sa...@apache.org on 2022/03/17 21:31:25 UTC
[samza] branch master updated: SAMZA-2728: [Elasticity] improve distribution of messages across elastic tasks (#1589)
This is an automated email from the ASF dual-hosted git repository.
saniljain15 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new f41d29c SAMZA-2728: [Elasticity] improve distribution of messages across elastic tasks (#1589)
f41d29c is described below
commit f41d29cc10556a150d5b380a15a126eafc290b97
Author: lakshmi-manasa-g <mg...@linkedin.com>
AuthorDate: Thu Mar 17 14:31:19 2022 -0700
SAMZA-2728: [Elasticity] improve distribution of messages across elastic tasks (#1589)
* SAMZA-2728: [Elasticity] improve distribution of messages across elastic tasks
* add test for end-of-stream and elasticity
---
.../samza/system/IncomingMessageEnvelope.java | 5 +++-
.../java/org/apache/samza/config/JobConfig.java | 4 +--
.../java/org/apache/samza/container/RunLoop.java | 15 ++++++++--
.../org/apache/samza/config/TestJobConfig.java | 10 +++++++
.../org/apache/samza/container/TestRunLoop.java | 33 ++++++++++++++++++++++
5 files changed, 62 insertions(+), 5 deletions(-)
diff --git a/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java b/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
index f2d07de..2112463 100644
--- a/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
+++ b/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
@@ -132,7 +132,10 @@ public class IncomingMessageEnvelope {
if (envelopeKeyorOffset == null) {
return new SystemStreamPartition(systemStreamPartition, 0);
}
- int keyBucket = Math.abs(envelopeKeyorOffset.hashCode()) % elasticityFactor;
+
+ // modulo 31 first to best spread out the hashcode and then modulo elasticityFactor for actual keyBucket
+ // Note: elasticityFactor <= 16 so modulo 31 is safe to do.
+ int keyBucket = (Math.abs(envelopeKeyorOffset.hashCode()) % 31) % elasticityFactor;
return new SystemStreamPartition(systemStreamPartition, keyBucket);
}
diff --git a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
index 59163b9..6384747 100644
--- a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
@@ -485,8 +485,8 @@ public class JobConfig extends MapConfig {
public int getElasticityFactor() {
int elasticityFactor = getInt(JOB_ELASTICITY_FACTOR, DEFAULT_JOB_ELASTICITY_FACTOR);
- if (elasticityFactor < 1) {
- throw new ConfigException("Elasticity factor can not be less than 1");
+ if (elasticityFactor < 1 || elasticityFactor > 16) { // upper bound 16: IncomingMessageEnvelope's keyBucket (abs(hashCode) % 31 % elasticityFactor) assumes elasticityFactor <= 16
+ throw new ConfigException("Elasticity factor can not be less than 1 or greater than 16");
}
return elasticityFactor;
}
diff --git a/samza-core/src/main/java/org/apache/samza/container/RunLoop.java b/samza-core/src/main/java/org/apache/samza/container/RunLoop.java
index a830251..b7a80a3 100644
--- a/samza-core/src/main/java/org/apache/samza/container/RunLoop.java
+++ b/samza-core/src/main/java/org/apache/samza/container/RunLoop.java
@@ -26,6 +26,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -784,8 +785,18 @@ public class RunLoop implements Runnable, Throttleable {
IncomingMessageEnvelope envelope = pendingEnvelope.envelope;
if (envelope.isEndOfStream()) {
- SystemStreamPartition ssp = envelope.getSystemStreamPartition(elasticityFactor);
- processingSspSet.remove(ssp);
+ if (elasticityFactor <= 1) {
+ SystemStreamPartition ssp = envelope.getSystemStreamPartition();
+ processingSspSet.remove(ssp);
+ } else {
+ // if envelope is end of stream, the ssp of envelope should be removed from task's processing set irresp of keyBucket
+ SystemStreamPartition sspOfEnvelope = envelope.getSystemStreamPartition();
+ Optional<SystemStreamPartition> ssp = processingSspSet.stream()
+ .filter(sspInSet -> sspInSet.getSystemStream().equals(sspOfEnvelope.getSystemStream())
+ && sspInSet.getPartition().equals(sspOfEnvelope.getPartition()))
+ .findFirst();
+ ssp.ifPresent(processingSspSet::remove);
+ }
if (!hasIntermediateStreams) {
pendingEnvelopeQueue.remove();
}
diff --git a/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
index 651d48d..4d17166 100644
--- a/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
+++ b/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
@@ -669,6 +669,16 @@ public class TestJobConfig {
}
assertTrue(exceptionCaught);
+ jobConfig =
+ new JobConfig(new MapConfig(ImmutableMap.of(JobConfig.JOB_ELASTICITY_FACTOR, Integer.toString(17))));
+ exceptionCaught = false;
+ try {
+ jobConfig.getElasticityFactor();
+ } catch (ConfigException e) {
+ exceptionCaught = true;
+ }
+ assertTrue(exceptionCaught);
+
jobConfig = new JobConfig(new MapConfig());
assertEquals(JobConfig.DEFAULT_JOB_ELASTICITY_FACTOR, jobConfig.getElasticityFactor());
}
diff --git a/samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java b/samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java
index 9452ddf..7bd5f9a 100644
--- a/samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java
+++ b/samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java
@@ -212,6 +212,39 @@ public class TestRunLoop {
}
@Test
+ public void testEndOfStreamElasticityEnabled() {
+ // Verifies that an end-of-stream envelope reaches every elastic task (keyBuckets 0 and 1) of partition p0.
+ TaskName taskName0 = new TaskName(p0.toString() + " 0");
+ TaskName taskName1 = new TaskName(p0.toString() + " 1");
+ SystemStreamPartition ssp = new SystemStreamPartition("testSystem", "testStream", p0);
+ SystemStreamPartition ssp0 = new SystemStreamPartition("testSystem", "testStream", p0, 0);
+ SystemStreamPartition ssp1 = new SystemStreamPartition("testSystem", "testStream", p0, 1);
+
+ // create an EOS IME whose keyBucket ssp (for elasticityFactor 2) maps to ssp0 and not to ssp1
+ // the run loop should still deliver this EOS IME to both of its tasks
+ IncomingMessageEnvelope envelopeEOS = spy(IncomingMessageEnvelope.buildEndOfStreamEnvelope(ssp));
+ when(envelopeEOS.getSystemStreamPartition(2)).thenReturn(ssp0);
+
+
+ // two tasks in the run loop: task0 processes ssp0 (0th keyBucket of ssp), task1 processes ssp1 (1st keyBucket of ssp)
+ // the EOS IME should be given to both tasks irrespective of its keyBucket
+ RunLoopTask task0 = getMockRunLoopTask(taskName0, ssp0);
+ RunLoopTask task1 = getMockRunLoopTask(taskName1, ssp1);
+
+ SystemConsumers consumerMultiplexer = mock(SystemConsumers.class);
+ when(consumerMultiplexer.choose(false)).thenReturn(envelopeEOS).thenReturn(null);
+
+ Map<TaskName, RunLoopTask> tasks = ImmutableMap.of(taskName0, task0, taskName1, task1);
+ int maxMessagesInFlight = 1;
+ RunLoop runLoop = new RunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight, windowMs, commitMs,
+ callbackTimeoutMs, maxThrottlingDelayMs, 0, containerMetrics, () -> 0L, false, 2); // NOTE(review): presumably the final arg (2) is the elasticity factor — confirm against the RunLoop constructor
+ runLoop.run();
+
+ verify(task0).endOfStream(any());
+ verify(task1).endOfStream(any());
+ }
+
+ @Test
public void testWindow() {
SystemConsumers consumerMultiplexer = mock(SystemConsumers.class);