You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bh...@apache.org on 2023/08/04 21:44:17 UTC

[samza] branch master updated: SAMZA-2789: Remove cap on intermediate stream partition count for stream mode (#1679)

This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ce67acb0 SAMZA-2789: Remove cap on intermediate stream partition count for stream mode (#1679)
3ce67acb0 is described below

commit 3ce67acb0494a47c167f938323917c0bbca32b7d
Author: Bharath Kumarasubramanian <bh...@apache.org>
AuthorDate: Fri Aug 4 14:44:10 2023 -0700

    SAMZA-2789: Remove cap on intermediate stream partition count for stream mode (#1679)
    
    Problem: The intermediate stream partition count inference logic caps the partition count at 256, resulting in imbalanced work assignment across tasks.
    
    Description: As part of the intermediate stream partition count inference logic, we currently employ the following algorithm:
    
    1. partitionCount = Math.max(maxPartitionSize(inputStreams), maxPartitionSize(outputStreams))
    2. Cap partitionCount at MAX_INFERRED_PARTITIONS, which is defined in IntermediateStreamManager as 256
    3. Apply the inferred partition count to intermediate streams whose partition count is uninitialized
    As a result, the partition count of every auto-created intermediate stream is capped at 256. This prevents the job from scaling up uniformly: intermediate partitions can be assigned to at most 256 tasks, so when a job has more than 256 tasks, the remaining tasks receive no intermediate-stream work and the assignment becomes imbalanced.
    
    Changes:
    
    - Apply the cap only in batch mode, since the 256 limit was introduced for batch jobs, where the number of files (partitions) can be large
    - Add unit tests for IntermediateStreamManager
    - Minor Javadoc fix for DefaultTaskExecutorFactory
    Tests: Added unit tests for the code changes
    
    API Changes: None
    
    Upgrade Instructions:
    
    - Jobs that temporarily worked around this constraint by setting job.intermediate.stream.partitions should remove that configuration so that Samza infers and applies the partition count as described above.
    - Jobs that don't use job.intermediate.stream.partitions need no changes.
    Usage Instructions: Refer to the upgrade instructions.
---
 .../samza/execution/IntermediateStreamManager.java |  15 ++-
 .../samza/task/DefaultTaskExecutorFactory.java     |   5 +-
 .../samza/execution/TestExecutionPlanner.java      |   6 +-
 .../execution/TestIntermediateStreamManager.java   | 108 +++++++++++++++++++++
 4 files changed, 126 insertions(+), 8 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/execution/IntermediateStreamManager.java b/samza-core/src/main/java/org/apache/samza/execution/IntermediateStreamManager.java
index 451d91270..17b5ac5c7 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/IntermediateStreamManager.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/IntermediateStreamManager.java
@@ -27,6 +27,8 @@ import java.util.HashSet;
 import java.util.Optional;
 import java.util.Set;
 import org.apache.samza.SamzaException;
+import org.apache.samza.config.ApplicationConfig;
+import org.apache.samza.config.ApplicationConfig.ApplicationMode;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.slf4j.Logger;
@@ -69,7 +71,8 @@ class IntermediateStreamManager {
   /**
    * Sets partition counts of intermediate streams which have not been assigned partition counts.
    */
-  private void setIntermediateStreamPartitions(JobGraph jobGraph) {
+  @VisibleForTesting
+  void setIntermediateStreamPartitions(JobGraph jobGraph) {
     final String defaultPartitionsConfigProperty = JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS;
     int partitions = config.getInt(defaultPartitionsConfigProperty, StreamEdge.PARTITIONS_UNKNOWN);
     if (partitions == StreamEdge.PARTITIONS_UNKNOWN) {
@@ -81,11 +84,14 @@ class IntermediateStreamManager {
       int maxOutPartitions = maxPartitions(jobGraph.getOutputStreams());
       partitions = Math.max(maxInPartitions, maxOutPartitions);
 
-      if (partitions > MAX_INFERRED_PARTITIONS) {
+      ApplicationMode applicationMode = getAppMode();
+      if (partitions > MAX_INFERRED_PARTITIONS && ApplicationMode.BATCH.equals(applicationMode)) {
         partitions = MAX_INFERRED_PARTITIONS;
         log.warn(String.format("Inferred intermediate stream partition count %d is greater than the max %d. Using the max.",
             partitions, MAX_INFERRED_PARTITIONS));
       }
+
+      log.info("Using {} as the default partition count for intermediate streams", partitions);
     } else {
       // Reject any zero or other negative values explicitly specified in config.
       if (partitions <= 0) {
@@ -105,6 +111,11 @@ class IntermediateStreamManager {
     }
   }
 
+  @VisibleForTesting
+  ApplicationMode getAppMode() {
+    return new ApplicationConfig(config).getAppMode();
+  }
+
   /**
    * Sets partition counts of intermediate streams participating in joins operations.
    */
diff --git a/samza-core/src/main/java/org/apache/samza/task/DefaultTaskExecutorFactory.java b/samza-core/src/main/java/org/apache/samza/task/DefaultTaskExecutorFactory.java
index ec975cf68..5ed5165c1 100644
--- a/samza-core/src/main/java/org/apache/samza/task/DefaultTaskExecutorFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/task/DefaultTaskExecutorFactory.java
@@ -50,9 +50,8 @@ public class DefaultTaskExecutorFactory implements TaskExecutorFactory {
    * {@inheritDoc}
    *
    * The choice of thread pool is determined based on the following logic
-   *    If job.operator.thread.pool.enabled,
-   *     a. Use {@link #getTaskExecutor(Config)} if job.container.thread.pool.size &gt; 1
-   *     b. Use default single threaded pool otherwise
+   *     1. Use {@link #getTaskExecutor(Config)} if job.container.thread.pool.size &gt; 1
+   *     2. Use default single threaded pool otherwise
    * <b>Note:</b> The default single threaded pool used is a substitute for the scenario where container thread pool is null and
    * the messages are dispatched on runloop thread. We can't have the stages schedule on the run loop thread and hence
    * the fallback to use a single threaded executor across all tasks.
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
index c25c265b1..f537b7152 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
@@ -654,8 +654,8 @@ public class TestExecutionPlanner {
   }
 
   @Test
-  public void testMaxPartitionLimit() {
-    int partitionLimit = IntermediateStreamManager.MAX_INFERRED_PARTITIONS;
+  public void testMaxPartitionLimitIsIgnoredForStreamingMode() {
+    int expectedPartitionSize = IntermediateStreamManager.MAX_INFERRED_PARTITIONS * 2;
 
     ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
     StreamApplicationDescriptorImpl graphSpec = new StreamApplicationDescriptorImpl(appDesc -> {
@@ -668,7 +668,7 @@ public class TestExecutionPlanner {
 
     // Partitions should be the same as input1
     jobGraph.getIntermediateStreams().forEach(edge -> {
-      assertEquals(partitionLimit, edge.getPartitionCount()); // max of input1 and output1
+      assertEquals(expectedPartitionSize, edge.getPartitionCount()); // max of input1 and output1
     });
   }
 
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestIntermediateStreamManager.java b/samza-core/src/test/java/org/apache/samza/execution/TestIntermediateStreamManager.java
new file mode 100644
index 000000000..f353beb6e
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestIntermediateStreamManager.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.execution;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.samza.config.ApplicationConfig.ApplicationMode;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+
+import org.mockito.MockitoAnnotations;
+import static org.mockito.Mockito.*;
+
+public class TestIntermediateStreamManager {
+
+  @Mock
+  private Config config;
+  @Mock
+  private JobGraph jobGraph;
+
+  @Mock
+  private StreamEdge intermediateStream;
+
+  private IntermediateStreamManager streamManager;
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+    when(config.getInt(JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS, StreamEdge.PARTITIONS_UNKNOWN))
+        .thenReturn(-1);
+    buildInputStreams();
+    buildOutputStreams();
+    buildIntermediateStream();
+    streamManager = new IntermediateStreamManager(config);
+  }
+
+  @Test
+  public void setIntermediateStreamPartitions() {
+    streamManager.setIntermediateStreamPartitions(jobGraph);
+    verify(intermediateStream).setPartitionCount(1024);
+  }
+
+  @Test
+  public void setIntermediateStreamPartitionsForBatchMode() {
+    streamManager = spy(new IntermediateStreamManager(config));
+    doReturn(ApplicationMode.BATCH).when(streamManager).getAppMode();
+    streamManager.setIntermediateStreamPartitions(jobGraph);
+    verify(intermediateStream).setPartitionCount(256);
+  }
+
+  @Test
+  public void setIntermediateStreamPartitionsWithDefaultConfigPropertySet() {
+    when(config.getInt(JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS, StreamEdge.PARTITIONS_UNKNOWN))
+        .thenReturn(128);
+    streamManager.setIntermediateStreamPartitions(jobGraph);
+    verify(intermediateStream).setPartitionCount(128);
+
+  }
+
+  private void buildInputStreams() {
+    StreamEdge input1 = mock(StreamEdge.class);
+    when(input1.getPartitionCount()).thenReturn(64);
+    when(input1.isIntermediate()).thenReturn(false);
+    StreamEdge input2 = mock(StreamEdge.class);
+    when(input2.getPartitionCount()).thenReturn(1024);
+    when(input2.isIntermediate()).thenReturn(false);
+
+    when(jobGraph.getInputStreams())
+        .thenReturn(ImmutableSet.of(input1, input2));
+  }
+
+  private void buildIntermediateStream() {
+    when(intermediateStream.isIntermediate()).thenReturn(true);
+    when(intermediateStream.getPartitionCount()).thenReturn(0);
+    when(jobGraph.getIntermediateStreamEdges())
+        .thenReturn(ImmutableSet.of(intermediateStream));
+  }
+
+  private void buildOutputStreams() {
+    StreamEdge output1 = mock(StreamEdge.class);
+    when(output1.getPartitionCount()).thenReturn(64);
+    when(output1.isIntermediate()).thenReturn(false);
+    StreamEdge output2 = mock(StreamEdge.class);
+    when(output2.getPartitionCount()).thenReturn(256);
+    when(output2.isIntermediate()).thenReturn(false);
+
+    when(jobGraph.getOutputStreams())
+        .thenReturn(ImmutableSet.of(output1, output2));
+  }
+}