You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bh...@apache.org on 2022/02/04 19:55:09 UTC
[samza] branch master updated: SAMZA-2688 [Elasticity] introduce elasticity factor config and key bucket within SSP (#1576)
This is an automated email from the ASF dual-hosted git repository.
bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 3eaa697 SAMZA-2688 [Elasticity] introduce elasticity factor config and key bucket within SSP (#1576)
3eaa697 is described below
commit 3eaa697d791ac444edde930750b7c38f1f5a69eb
Author: lakshmi-manasa-g <mg...@linkedin.com>
AuthorDate: Fri Feb 4 11:55:03 2022 -0800
SAMZA-2688 [Elasticity] introduce elasticity factor config and key bucket within SSP (#1576)
Feature: Elasticity (SAMZA-2687) for a Samza job allows the job to have more tasks than the number of input SystemStreamPartitions (SSPs). Thus, a job can scale up beyond its input partition count without needing to repartition the input stream.
This is achieved by having elastic tasks, each of which is the same as a regular task for all practical purposes, except that an elastic task consumes only a subset of the messages of an SSP.
With an elasticity factor F (integer), the number of elastic tasks will be F times N, where N is the original task count.
The F elastic tasks per original task all consume subsets of the same SSP as the original task. There will be F subsets (aka key buckets) per SSP, and a message falls into SSP bucket 'i' if abs(message.key.hash() % F) == i (the message offset is hashed instead when the message has no key).
Changes:
introduce the config for enabling elasticity as job.elasticity.factor. If the job without elasticity has N tasks then with factor = F > 1, there will be F times N (elastic) tasks
Add "key bucket" (an integer in the range 0 to F-1, or -1 when no bucket is assigned) to SSP, which identifies a subset of the messages within the SSP
Compute the key bucket the IncomingMessageEnvelope falls into given elasticity factor F.
SamzaObjectMapper changes to serde keyBucket component of SSP.
Tests: updated unit tests and added new ones.
API Changes: no public API changes
Upgrade Instructions: N/A
Usage Instructions: set the config job.elasticity.factor > 1 to enable elasticity for the job.
---
.../samza/system/IncomingMessageEnvelope.java | 7 +++
.../apache/samza/system/SystemStreamPartition.java | 33 ++++++++++
.../java/org/apache/samza/config/JobConfig.java | 14 +++++
.../samza/serializers/model/SamzaObjectMapper.java | 7 ++-
.../org/apache/samza/config/TestJobConfig.java | 49 +++++++++++++++
.../serializers/model/TestSamzaObjectMapper.java | 72 ++++++++++++++++++++++
6 files changed, 181 insertions(+), 1 deletion(-)
diff --git a/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java b/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
index 7392253..d13d783 100644
--- a/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
+++ b/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
@@ -111,6 +111,13 @@ public class IncomingMessageEnvelope {
return systemStreamPartition;
}
+ // Elasticity: resolves the key-bucketed SSP of this envelope, which determines the elastic task that should handle it.
+ public SystemStreamPartition getSystemStreamPartition(int elasticityFactor) {
+ Object bucketingValue = (key == null) ? offset : key; // envelopes without a key are bucketed by their offset
+ int bucket = Math.abs(bucketingValue.hashCode() % elasticityFactor);
+ return new SystemStreamPartition(systemStreamPartition, bucket);
+ }
+
/**
* Offset associated with this message, provided by the system consumer that consumed the message.
*/
diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartition.java b/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartition.java
index e9ca9f7..4b32891 100644
--- a/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartition.java
+++ b/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartition.java
@@ -27,6 +27,7 @@ import org.apache.samza.Partition;
public class SystemStreamPartition extends SystemStream implements Comparable<SystemStreamPartition> {
protected final Partition partition;
protected final int hash; // precomputed as instances are immutable and often stored in hash-addressed data structures
+ protected final int keyBucket; // used for elasticity to determine which elastic task should handle this ssp
/**
* Constructs a Samza stream partition object from specified components.
@@ -38,6 +39,7 @@ public class SystemStreamPartition extends SystemStream implements Comparable<Sy
super(system, stream);
this.partition = partition;
this.hash = computeHashCode();
+ this.keyBucket = -1;
}
/**
@@ -57,6 +59,24 @@ public class SystemStreamPartition extends SystemStream implements Comparable<Sy
this(other.getSystem(), other.getStream(), partition);
}
+ public SystemStreamPartition(String system, String stream, Partition partition, int keyBucket) { // SSP tagged with an elasticity key bucket
+ super(system, stream);
+ this.partition = partition;
+ this.keyBucket = keyBucket; // must be assigned before hashing: the hash computation reads keyBucket, which is still 0 until this line runs
+ this.hash = computeHashCode(); // precomputed once since instances are immutable
+ }
+
+ public SystemStreamPartition(SystemStreamPartition other, int keyBucket) { // copy of other's system/stream/partition with the given key bucket
+ super(other.getSystem(), other.getStream());
+ this.partition = other.getPartition();
+ this.keyBucket = keyBucket; // must be assigned before hashing: the hash computation reads keyBucket, which is still 0 until this line runs
+ this.hash = computeHashCode(); // precomputed once since instances are immutable
+ }
+
+ public int getKeyBucket() { // key bucket of this SSP used for elasticity
+ return keyBucket; // -1 when the SSP was built without a key bucket
+ }
+
public Partition getPartition() {
return partition;
}
@@ -74,6 +94,7 @@ public class SystemStreamPartition extends SystemStream implements Comparable<Sy
final int prime = 31;
int result = super.hashCode();
result = prime * result + ((partition == null) ? 0 : partition.hashCode());
+ result = prime * result + ((keyBucket == -1) ? 0 : keyBucket);
return result;
}
@@ -91,11 +112,17 @@ public class SystemStreamPartition extends SystemStream implements Comparable<Sy
return false;
} else if (!partition.equals(other.partition))
return false;
+ if (keyBucket != other.keyBucket) {
+ return false;
+ }
return true;
}
@Override
public String toString() {
+ if (keyBucket != -1) {
+ return "SystemStreamPartition [" + system + ", " + stream + ", " + partition.getPartitionId() + ", " + keyBucket + "]";
+ }
return "SystemStreamPartition [" + system + ", " + stream + ", " + partition.getPartitionId() + "]";
}
@@ -118,6 +145,12 @@ public class SystemStreamPartition extends SystemStream implements Comparable<Sy
} else if (this.partition.compareTo(that.partition) > 0) {
return 1;
}
+
+ if (this.keyBucket < that.keyBucket) {
+ return -1;
+ } else if (this.keyBucket > that.keyBucket) {
+ return 1;
+ }
return 0;
}
}
diff --git a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
index 0b8f249..8a892dc 100644
--- a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
@@ -175,6 +175,12 @@ public class JobConfig extends MapConfig {
public static final String CONTAINER_HEARTBEAT_MONITOR_ENABLED = "job.container.heartbeat.monitor.enabled";
private static final boolean CONTAINER_HEARTBEAT_MONITOR_ENABLED_DEFAULT = true;
+
+ // Enables elasticity for the job when set to a value greater than 1.
+ // The number of (elastic) tasks in the job will be the old task count times the elasticity factor.
+ public static final String JOB_ELASTICITY_FACTOR = "job.elasticity.factor";
+ public static final int DEFAULT_JOB_ELASTICITY_FACTOR = 1;
+
public JobConfig(Config config) {
super(config);
}
@@ -466,4 +472,12 @@ public class JobConfig extends MapConfig {
public boolean getContainerHeartbeatMonitorEnabled() {
return getBoolean(CONTAINER_HEARTBEAT_MONITOR_ENABLED, CONTAINER_HEARTBEAT_MONITOR_ENABLED_DEFAULT);
}
+
+ public boolean getElasticityEnabled() { // whether elasticity is turned on for the job
+ return getElasticityFactor() > 1; // a factor of 1 or less (including the default) means disabled
+ }
+
+ public int getElasticityFactor() { // configured value of job.elasticity.factor, returned as-is without range validation
+ return getInt(JOB_ELASTICITY_FACTOR, DEFAULT_JOB_ELASTICITY_FACTOR); // falls back to 1 when the config is not set
+ }
}
\ No newline at end of file
diff --git a/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java b/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
index 7888f1f..b2f16d0 100644
--- a/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
+++ b/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
@@ -246,6 +246,7 @@ public class SamzaObjectMapper {
systemStreamPartitionMap.put("system", systemStreamPartition.getSystem());
systemStreamPartitionMap.put("stream", systemStreamPartition.getStream());
systemStreamPartitionMap.put("partition", systemStreamPartition.getPartition());
+ systemStreamPartitionMap.put("keyBucket", systemStreamPartition.getKeyBucket());
jsonGenerator.writeObject(systemStreamPartitionMap);
}
}
@@ -258,7 +259,11 @@ public class SamzaObjectMapper {
String system = node.get("system").textValue();
String stream = node.get("stream").textValue();
Partition partition = new Partition(node.get("partition").intValue());
- return new SystemStreamPartition(system, stream, partition);
+ int keyBucket = -1;
+ if (node.get("keyBucket") != null) {
+ keyBucket = node.get("keyBucket").intValue();
+ }
+ return new SystemStreamPartition(system, stream, partition, keyBucket);
}
}
diff --git a/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
index 2b9d630..6c6dcb1 100644
--- a/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
+++ b/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
@@ -599,4 +599,53 @@ public class TestJobConfig {
assertFalse(new JobConfig(new MapConfig(ImmutableMap.of(JobConfig.CONTAINER_HEARTBEAT_MONITOR_ENABLED,
"false"))).getContainerHeartbeatMonitorEnabled());
}
+
+ @Test
+ public void testGetElastictyEnabled() {
+ // greater than 1 means enabled
+ JobConfig jobConfig = new JobConfig(
+ new MapConfig(ImmutableMap.of(JobConfig.JOB_ELASTICITY_FACTOR, Integer.toString(2))));
+ assertTrue(jobConfig.getElasticityEnabled());
+
+ // one means disabled
+ jobConfig =
+ new JobConfig(new MapConfig(ImmutableMap.of(JobConfig.JOB_ELASTICITY_FACTOR, Integer.toString(1))));
+ assertFalse(jobConfig.getElasticityEnabled());
+
+ // zero means disabled
+ jobConfig =
+ new JobConfig(new MapConfig(ImmutableMap.of(JobConfig.JOB_ELASTICITY_FACTOR, Integer.toString(0))));
+ assertFalse(jobConfig.getElasticityEnabled());
+
+ // negative means disabled
+ jobConfig =
+ new JobConfig(new MapConfig(ImmutableMap.of(JobConfig.JOB_ELASTICITY_FACTOR, Integer.toString(-1))));
+ assertFalse(jobConfig.getElasticityEnabled());
+
+ // not specified uses the default elasticity factor, which means disabled
+ jobConfig = new JobConfig(new MapConfig());
+ assertFalse(jobConfig.getElasticityEnabled());
+ }
+
+ @Test
+ public void testGetElasticityFactor() {
+ JobConfig jobConfig = new JobConfig(
+ new MapConfig(ImmutableMap.of(JobConfig.JOB_ELASTICITY_FACTOR, Integer.toString(2))));
+ assertEquals(2, jobConfig.getElasticityFactor());
+
+ jobConfig = new JobConfig(
+ new MapConfig(ImmutableMap.of(JobConfig.JOB_ELASTICITY_FACTOR, Integer.toString(1))));
+ assertEquals(1, jobConfig.getElasticityFactor());
+
+ jobConfig =
+ new JobConfig(new MapConfig(ImmutableMap.of(JobConfig.JOB_ELASTICITY_FACTOR, Integer.toString(0))));
+ assertEquals(0, jobConfig.getElasticityFactor());
+
+ jobConfig =
+ new JobConfig(new MapConfig(ImmutableMap.of(JobConfig.JOB_ELASTICITY_FACTOR, Integer.toString(-1))));
+ assertEquals(-1, jobConfig.getElasticityFactor());
+
+ jobConfig = new JobConfig(new MapConfig());
+ assertEquals(JobConfig.DEFAULT_JOB_ELASTICITY_FACTOR, jobConfig.getElasticityFactor());
+ }
}
\ No newline at end of file
diff --git a/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java b/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
index 532b674..a02788a 100644
--- a/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
+++ b/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
@@ -187,6 +187,77 @@ public class TestSamzaObjectMapper {
deserializeFromObjectNode(jobModelJson);
}
+ @Test
+ public void testSerializeSystemStreamPartition() throws IOException {
+ // case 1: keyBucket not explicitly mentioned
+ SystemStreamPartition ssp = new SystemStreamPartition("foo", "bar", new Partition(1));
+ String serializedString = this.samzaObjectMapper.writeValueAsString(ssp);
+
+ ObjectMapper objectMapper = new ObjectMapper();
+
+ ObjectNode sspJson = objectMapper.createObjectNode();
+ sspJson.put("system", "foo");
+ sspJson.put("stream", "bar");
+ sspJson.put("partition", 1);
+
+ // use a plain ObjectMapper to read JSON to make comparison easier
+ ObjectNode serializedAsJson = (ObjectNode) new ObjectMapper().readTree(serializedString);
+ ObjectNode expectedJson = sspJson;
+
+ assertEquals(expectedJson.get("system"), serializedAsJson.get("system"));
+ assertEquals(expectedJson.get("stream"), serializedAsJson.get("stream"));
+ assertEquals(expectedJson.get("partition"), serializedAsJson.get("partition"));
+
+ // case 2: keyBucket explicitly set
+ ssp = new SystemStreamPartition("foo", "bar", new Partition(1), 1);
+ serializedString = this.samzaObjectMapper.writeValueAsString(ssp);
+
+ sspJson = objectMapper.createObjectNode();
+ sspJson.put("system", "foo");
+ sspJson.put("stream", "bar");
+ sspJson.put("partition", 1);
+ sspJson.put("keyBucket", 1);
+
+ // use a plain ObjectMapper to read JSON to make comparison easier
+ serializedAsJson = (ObjectNode) new ObjectMapper().readTree(serializedString);
+ expectedJson = sspJson;
+
+ assertEquals(expectedJson.get("system"), serializedAsJson.get("system"));
+ assertEquals(expectedJson.get("stream"), serializedAsJson.get("stream"));
+ assertEquals(expectedJson.get("partition"), serializedAsJson.get("partition"));
+ assertEquals(expectedJson.get("keyBucket"), serializedAsJson.get("keyBucket"));
+ }
+
+ @Test
+ public void testDeserializeSystemStreamPartition() throws IOException {
+ ObjectMapper objectMapper = new ObjectMapper();
+
+ // case 1: keyBucket not explicitly mentioned
+ ObjectNode sspJson = objectMapper.createObjectNode();
+ sspJson.put("system", "foo");
+ sspJson.put("stream", "bar");
+ sspJson.put("partition", 1);
+
+ SystemStreamPartition ssp = new SystemStreamPartition("foo", "bar", new Partition(1));
+ String jsonString = new ObjectMapper().writeValueAsString(sspJson);
+ SystemStreamPartition deserSSP = this.samzaObjectMapper.readValue(jsonString, SystemStreamPartition.class);
+
+ assertEquals(ssp, deserSSP);
+
+ // case 2: explicitly set key bucket
+ sspJson = objectMapper.createObjectNode();
+ sspJson.put("system", "foo");
+ sspJson.put("stream", "bar");
+ sspJson.put("partition", 1);
+ sspJson.put("keyBucket", 1);
+
+ ssp = new SystemStreamPartition("foo", "bar", new Partition(1), 1);
+ jsonString = new ObjectMapper().writeValueAsString(sspJson);
+ deserSSP = this.samzaObjectMapper.readValue(jsonString, SystemStreamPartition.class);
+
+ assertEquals(ssp, deserSSP);
+ }
+
private JobModel deserializeFromObjectNode(ObjectNode jobModelJson) throws IOException {
// use plain ObjectMapper to get JSON string
String jsonString = new ObjectMapper().writeValueAsString(jobModelJson);
@@ -206,6 +277,7 @@ public class TestSamzaObjectMapper {
containerModel1TaskTestSSPJson.put("system", "foo");
containerModel1TaskTestSSPJson.put("stream", "bar");
containerModel1TaskTestSSPJson.put("partition", 1);
+ containerModel1TaskTestSSPJson.put("keyBucket", -1);
ArrayNode containerModel1TaskTestSSPsJson = objectMapper.createArrayNode();
containerModel1TaskTestSSPsJson.add(containerModel1TaskTestSSPJson);