You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bh...@apache.org on 2022/02/04 19:55:09 UTC

[samza] branch master updated: SAMZA-2688 [Elasticity] introduce elasticity factor config and key bucket within SSP (#1576)

This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 3eaa697  SAMZA-2688 [Elasticity] introduce elasticity factor config and key bucket within SSP (#1576)
3eaa697 is described below

commit 3eaa697d791ac444edde930750b7c38f1f5a69eb
Author: lakshmi-manasa-g <mg...@linkedin.com>
AuthorDate: Fri Feb 4 11:55:03 2022 -0800

    SAMZA-2688 [Elasticity] introduce elasticity factor config and key bucket within SSP (#1576)
    
    Feature: Elasticity (SAMZA-2687) allows a Samza job to have more tasks than the number of input SystemStreamPartitions (SSPs). Thus, a job can scale up beyond its input partition count without needing to repartition the input stream.
    
    This is achieved with elastic tasks, which behave like regular tasks for all practical purposes, except that an elastic task consumes only a subset of the messages of an SSP.
    With an integer elasticity factor F, the number of elastic tasks will be F times N, where N is the original task count.
    The F elastic tasks derived from each original task consume disjoint subsets of the same SSP as the original task. Each SSP is split into F subsets (key buckets), and a message falls into bucket i if abs(hash(message key) % F) == i; when the key is null, the message offset is hashed instead.
    Changes:
    
    Introduce the config job.elasticity.factor for enabling elasticity. If the job has N tasks without elasticity, then with factor F > 1 there will be F times N (elastic) tasks.
    Add a "key bucket" to SSP (an integer from 0 to F-1, or -1 when no bucket is assigned) that identifies a subset of the messages within the SSP.
    Compute the key bucket that an IncomingMessageEnvelope falls into, given elasticity factor F.
    Update SamzaObjectMapper to serialize and deserialize the keyBucket component of an SSP.
    
    Tests: updated existing unit tests and added new ones.
    

    
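    API Changes: no breaking public API changes; additive public API in samza-api: new SystemStreamPartition constructors and SystemStreamPartition#getKeyBucket(), and IncomingMessageEnvelope#getSystemStreamPartition(int).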
    
    Upgrade Instructions: N/A
    
    Usage Instructions: set the config job.elasticity.factor > 1 to enable elasticity for the job.
---
 .../samza/system/IncomingMessageEnvelope.java      |  7 +++
 .../apache/samza/system/SystemStreamPartition.java | 33 ++++++++++
 .../java/org/apache/samza/config/JobConfig.java    | 14 +++++
 .../samza/serializers/model/SamzaObjectMapper.java |  7 ++-
 .../org/apache/samza/config/TestJobConfig.java     | 49 +++++++++++++++
 .../serializers/model/TestSamzaObjectMapper.java   | 72 ++++++++++++++++++++++
 6 files changed, 181 insertions(+), 1 deletion(-)

diff --git a/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java b/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
index 7392253..d13d783 100644
--- a/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
+++ b/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
@@ -111,6 +111,13 @@ public class IncomingMessageEnvelope {
     return systemStreamPartition;
   }
 
+  // Elasticity: returns this envelope's SSP tagged with a key bucket in 0..F-1 (F = elasticityFactor > 0), used to pick the elastic task that should handle this envelope.
+  public SystemStreamPartition getSystemStreamPartition(int elasticityFactor) { // NOTE(review): elasticityFactor == 0 makes the % below throw ArithmeticException; callers presumably pass F > 1 -- confirm
+    Object envelopeKeyorOffset = key != null ? key : offset; // fall back to the offset when the message has no key; NOTE(review): NPE below if key and offset are both null
+    int keyBucket = Math.abs(envelopeKeyorOffset.hashCode() % elasticityFactor); // |h % F| < |F|, so Math.abs cannot overflow here; NOTE(review): if the key is a byte[] its hashCode() is identity-based, so equal keys may land in different buckets
+    return new SystemStreamPartition(systemStreamPartition, keyBucket);
+  }
+
   /**
    * Offset associated with this message, provided by the system consumer that consumed the message.
    */
diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartition.java b/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartition.java
index e9ca9f7..4b32891 100644
--- a/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartition.java
+++ b/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartition.java
@@ -27,6 +27,7 @@ import org.apache.samza.Partition;
 public class SystemStreamPartition extends SystemStream implements Comparable<SystemStreamPartition> {
   protected final Partition partition;
   protected final int hash;  // precomputed as instances are immutable and often stored in hash-addressed data structures
+  protected final int keyBucket; // elasticity key bucket identifying which elastic task handles this SSP; -1 means no bucket (non-elastic SSP)
 
   /**
    * Constructs a Samza stream partition object from specified components.
@@ -38,6 +39,7 @@ public class SystemStreamPartition extends SystemStream implements Comparable<Sy
     super(system, stream);
     this.partition = partition;
     this.hash = computeHashCode();
+    this.keyBucket = -1;
   }
 
   /**
@@ -57,6 +59,24 @@ public class SystemStreamPartition extends SystemStream implements Comparable<Sy
     this(other.getSystem(), other.getStream(), partition);
   }
 
+  public SystemStreamPartition(String system, String stream, Partition partition, int keyBucket) { // same as the 3-arg constructor but with an explicit key bucket (-1 = none)
+    super(system, stream);
+    this.partition = partition;
+    this.hash = computeHashCode(); // NOTE(review): runs before keyBucket is assigned, so computeHashCode() reads keyBucket as 0 and the bucket term never contributes; consider assigning keyBucket first
+    this.keyBucket = keyBucket;
+  }
+
+  public SystemStreamPartition(SystemStreamPartition other, int keyBucket) { // copies system, stream and partition from other, replacing its key bucket with the given one
+    super(other.getSystem(), other.getStream());
+    this.partition = other.getPartition();
+    this.hash = computeHashCode(); // NOTE(review): same ordering issue as in the constructor above -- keyBucket is still 0 at this point
+    this.keyBucket = keyBucket;
+  }
+
+  public int getKeyBucket() { // returns -1 when this SSP carries no key bucket
+    return keyBucket;
+  }
+
   public Partition getPartition() {
     return partition;
   }
@@ -74,6 +94,7 @@ public class SystemStreamPartition extends SystemStream implements Comparable<Sy
     final int prime = 31;
     int result = super.hashCode();
     result = prime * result + ((partition == null) ? 0 : partition.hashCode());
+    result = prime * result + ((keyBucket == -1) ? 0 : keyBucket);
     return result;
   }
 
@@ -91,11 +112,17 @@ public class SystemStreamPartition extends SystemStream implements Comparable<Sy
         return false;
     } else if (!partition.equals(other.partition))
       return false;
+    if (keyBucket != other.keyBucket) {
+      return false;
+    }
     return true;
   }
 
   @Override
   public String toString() {
+    if (keyBucket != -1) { // only SSPs with an assigned key bucket print it; otherwise the pre-existing format below is kept
+      return "SystemStreamPartition [" + system + ", " + stream + ", " + partition.getPartitionId() + ", " + keyBucket + "]";
+    }
     return "SystemStreamPartition [" + system + ", " + stream + ", " + partition.getPartitionId() + "]";
   }
 
@@ -118,6 +145,12 @@ public class SystemStreamPartition extends SystemStream implements Comparable<Sy
     } else if (this.partition.compareTo(that.partition) > 0) {
       return 1;
     }
+
+    if (this.keyBucket < that.keyBucket) {
+      return -1;
+    } else if (this.keyBucket > that.keyBucket) {
+      return 1;
+    }
     return 0;
   }
 }
diff --git a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
index 0b8f249..8a892dc 100644
--- a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
@@ -175,6 +175,12 @@ public class JobConfig extends MapConfig {
   public static final String CONTAINER_HEARTBEAT_MONITOR_ENABLED = "job.container.heartbeat.monitor.enabled";
   private static final boolean CONTAINER_HEARTBEAT_MONITOR_ENABLED_DEFAULT = true;
 
+
+  // Elasticity factor F for the job; elasticity is enabled only when F > 1 (see getElasticityEnabled()).
+  // With F > 1, the number of (elastic) tasks in the job is the original task count times F.
+  public static final String JOB_ELASTICITY_FACTOR = "job.elasticity.factor";
+  public static final int DEFAULT_JOB_ELASTICITY_FACTOR = 1; // default of 1 means elasticity is disabled
+
   public JobConfig(Config config) {
     super(config);
   }
@@ -466,4 +472,12 @@ public class JobConfig extends MapConfig {
   public boolean getContainerHeartbeatMonitorEnabled() {
     return getBoolean(CONTAINER_HEARTBEAT_MONITOR_ENABLED, CONTAINER_HEARTBEAT_MONITOR_ENABLED_DEFAULT);
   }
+
+  public boolean getElasticityEnabled() { // true only for a factor > 1; a factor of 0, a negative factor, or an unset factor (default 1) all mean disabled
+    return getElasticityFactor() > 1;
+  }
+
+  public int getElasticityFactor() { // returns the configured factor verbatim, or the default of 1 when unset; NOTE(review): 0 and negative values are not rejected here
+    return getInt(JOB_ELASTICITY_FACTOR, DEFAULT_JOB_ELASTICITY_FACTOR);
+  }
 }
\ No newline at end of file
diff --git a/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java b/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
index 7888f1f..b2f16d0 100644
--- a/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
+++ b/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
@@ -246,6 +246,7 @@ public class SamzaObjectMapper {
       systemStreamPartitionMap.put("system", systemStreamPartition.getSystem());
       systemStreamPartitionMap.put("stream", systemStreamPartition.getStream());
       systemStreamPartitionMap.put("partition", systemStreamPartition.getPartition());
+      systemStreamPartitionMap.put("keyBucket", systemStreamPartition.getKeyBucket());
       jsonGenerator.writeObject(systemStreamPartitionMap);
     }
   }
@@ -258,7 +259,11 @@ public class SamzaObjectMapper {
       String system = node.get("system").textValue();
       String stream = node.get("stream").textValue();
       Partition partition = new Partition(node.get("partition").intValue());
-      return new SystemStreamPartition(system, stream, partition);
+      int keyBucket = -1;
+      if (node.get("keyBucket") != null) {
+        keyBucket = node.get("keyBucket").intValue();
+      }
+      return new SystemStreamPartition(system, stream, partition, keyBucket);
     }
   }
 
diff --git a/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
index 2b9d630..6c6dcb1 100644
--- a/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
+++ b/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
@@ -599,4 +599,53 @@ public class TestJobConfig {
     assertFalse(new JobConfig(new MapConfig(ImmutableMap.of(JobConfig.CONTAINER_HEARTBEAT_MONITOR_ENABLED,
         "false"))).getContainerHeartbeatMonitorEnabled());
   }
+
+  @Test
+  public void testGetElastictyEnabled() { // NOTE(review): "Elasticty" is a typo in the test name (testGetElasticityEnabled)
+    // a factor greater than 1 means enabled
+    JobConfig jobConfig = new JobConfig(
+        new MapConfig(ImmutableMap.of(JobConfig.JOB_ELASTICITY_FACTOR, Integer.toString(2))));
+    assertTrue(jobConfig.getElasticityEnabled());
+
+    // a factor of one means disabled
+    jobConfig =
+        new JobConfig(new MapConfig(ImmutableMap.of(JobConfig.JOB_ELASTICITY_FACTOR, Integer.toString(1))));
+    assertFalse(jobConfig.getElasticityEnabled());
+
+    // a factor of zero means disabled
+    jobConfig =
+        new JobConfig(new MapConfig(ImmutableMap.of(JobConfig.JOB_ELASTICITY_FACTOR, Integer.toString(0))));
+    assertFalse(jobConfig.getElasticityEnabled());
+
+    // a negative factor means disabled
+    jobConfig =
+        new JobConfig(new MapConfig(ImmutableMap.of(JobConfig.JOB_ELASTICITY_FACTOR, Integer.toString(-1))));
+    assertFalse(jobConfig.getElasticityEnabled());
+
+    // when not specified, the default elasticity factor (DEFAULT_JOB_ELASTICITY_FACTOR = 1) applies, which means disabled
+    jobConfig = new JobConfig(new MapConfig());
+    assertFalse(jobConfig.getElasticityEnabled());
+  }
+
+  @Test
+  public void testGetElasticityFactor() { // getElasticityFactor() returns the configured value verbatim, including 0 and negative values
+    JobConfig jobConfig = new JobConfig(
+        new MapConfig(ImmutableMap.of(JobConfig.JOB_ELASTICITY_FACTOR, Integer.toString(2))));
+    assertEquals(2, jobConfig.getElasticityFactor());
+
+    jobConfig = new JobConfig(
+        new MapConfig(ImmutableMap.of(JobConfig.JOB_ELASTICITY_FACTOR, Integer.toString(1))));
+    assertEquals(1, jobConfig.getElasticityFactor());
+
+    jobConfig =
+        new JobConfig(new MapConfig(ImmutableMap.of(JobConfig.JOB_ELASTICITY_FACTOR, Integer.toString(0))));
+    assertEquals(0, jobConfig.getElasticityFactor()); // not validated: 0 is passed through as-is
+
+    jobConfig =
+        new JobConfig(new MapConfig(ImmutableMap.of(JobConfig.JOB_ELASTICITY_FACTOR, Integer.toString(-1))));
+    assertEquals(-1, jobConfig.getElasticityFactor()); // not validated: negative values are passed through as-is
+
+    jobConfig = new JobConfig(new MapConfig()); // factor unset
+    assertEquals(JobConfig.DEFAULT_JOB_ELASTICITY_FACTOR, jobConfig.getElasticityFactor());
+  }
 }
\ No newline at end of file
diff --git a/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java b/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
index 532b674..a02788a 100644
--- a/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
+++ b/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
@@ -187,6 +187,77 @@ public class TestSamzaObjectMapper {
     deserializeFromObjectNode(jobModelJson);
   }
 
+  @Test
+  public void testSerializeSystemStreamPartition() throws IOException {
+    // case 1: keyBucket not set (3-arg constructor, so keyBucket == -1); only system/stream/partition are asserted below
+    SystemStreamPartition ssp = new SystemStreamPartition("foo", "bar", new Partition(1));
+    String serializedString = this.samzaObjectMapper.writeValueAsString(ssp);
+
+    ObjectMapper objectMapper = new ObjectMapper();
+
+    ObjectNode sspJson = objectMapper.createObjectNode();
+    sspJson.put("system", "foo");
+    sspJson.put("stream", "bar");
+    sspJson.put("partition", 1);
+
+    // use a plain ObjectMapper to read JSON to make comparison easier
+    ObjectNode serializedAsJson = (ObjectNode) new ObjectMapper().readTree(serializedString);
+    ObjectNode expectedJson = sspJson;
+
+    assertEquals(expectedJson.get("system"), serializedAsJson.get("system"));
+    assertEquals(expectedJson.get("stream"), serializedAsJson.get("stream"));
+    assertEquals(expectedJson.get("partition"), serializedAsJson.get("partition"));
+
+    // case 2: explicit keyBucket (4-arg constructor) must appear in the serialized JSON
+    ssp = new SystemStreamPartition("foo", "bar", new Partition(1), 1);
+    serializedString = this.samzaObjectMapper.writeValueAsString(ssp);
+
+    sspJson = objectMapper.createObjectNode();
+    sspJson.put("system", "foo");
+    sspJson.put("stream", "bar");
+    sspJson.put("partition", 1);
+    sspJson.put("keyBucket", 1);
+
+    // use a plain ObjectMapper to read JSON to make comparison easier
+    serializedAsJson = (ObjectNode) new ObjectMapper().readTree(serializedString);
+    expectedJson = sspJson;
+
+    assertEquals(expectedJson.get("system"), serializedAsJson.get("system"));
+    assertEquals(expectedJson.get("stream"), serializedAsJson.get("stream"));
+    assertEquals(expectedJson.get("partition"), serializedAsJson.get("partition"));
+    assertEquals(expectedJson.get("keyBucket"), serializedAsJson.get("keyBucket"));
+  }
+
+  @Test
+  public void testDeserializeSystemStreamPartition() throws IOException {
+    ObjectMapper objectMapper = new ObjectMapper();
+
+    // case 1: JSON without a keyBucket field (pre-elasticity format) deserializes to an SSP with keyBucket == -1
+    ObjectNode sspJson = objectMapper.createObjectNode();
+    sspJson.put("system", "foo");
+    sspJson.put("stream", "bar");
+    sspJson.put("partition", 1);
+
+    SystemStreamPartition ssp = new SystemStreamPartition("foo", "bar", new Partition(1));
+    String jsonString = new ObjectMapper().writeValueAsString(sspJson);
+    SystemStreamPartition deserSSP = this.samzaObjectMapper.readValue(jsonString, SystemStreamPartition.class);
+
+    assertEquals(ssp, deserSSP);
+
+    // case 2: an explicit keyBucket field is carried into the deserialized SSP
+    sspJson = objectMapper.createObjectNode();
+    sspJson.put("system", "foo");
+    sspJson.put("stream", "bar");
+    sspJson.put("partition", 1);
+    sspJson.put("keyBucket", 1);
+
+    ssp = new SystemStreamPartition("foo", "bar", new Partition(1), 1);
+    jsonString = new ObjectMapper().writeValueAsString(sspJson);
+    deserSSP = this.samzaObjectMapper.readValue(jsonString, SystemStreamPartition.class);
+
+    assertEquals(ssp, deserSSP);
+  }
+
   private JobModel deserializeFromObjectNode(ObjectNode jobModelJson) throws IOException {
     // use plain ObjectMapper to get JSON string
     String jsonString = new ObjectMapper().writeValueAsString(jobModelJson);
@@ -206,6 +277,7 @@ public class TestSamzaObjectMapper {
     containerModel1TaskTestSSPJson.put("system", "foo");
     containerModel1TaskTestSSPJson.put("stream", "bar");
     containerModel1TaskTestSSPJson.put("partition", 1);
+    containerModel1TaskTestSSPJson.put("keyBucket", -1);
 
     ArrayNode containerModel1TaskTestSSPsJson = objectMapper.createArrayNode();
     containerModel1TaskTestSSPsJson.add(containerModel1TaskTestSSPJson);