Posted to commits@samza.apache.org by dc...@apache.org on 2022/04/20 18:49:31 UTC

[samza] branch master updated: SAMZA-2735: Support configurable command to run jc in Yarn (#1600)

This is an automated email from the ASF dual-hosted git repository.

dchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new cb63b275d SAMZA-2735: Support configurable command to run jc in Yarn (#1600)
cb63b275d is described below

commit cb63b275dfe209f6b2a59509d90d32fa8eb1e826
Author: Xinyu Liu <xi...@gmail.com>
AuthorDate: Wed Apr 20 11:49:25 2022 -0700

    SAMZA-2735: Support configurable command to run jc in Yarn (#1600)
    
    Co-authored-by: Xinyu Liu <xi...@xiliu-mn1.linkedin.biz>
---
 .../documentation/versioned/jobs/samza-configurations.md      |  7 ++++---
 .../src/main/java/org/apache/samza/config/JobConfig.java      |  7 +++++++
 .../src/test/java/org/apache/samza/config/TestJobConfig.java  | 11 +++++++++++
 .../src/main/scala/org/apache/samza/job/yarn/YarnJob.scala    |  2 +-
 4 files changed, 23 insertions(+), 4 deletions(-)
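
Note on usage: the change below reads `job.coordinator.execute` (default `bin/run-jc.sh`) and prefixes it with `./__package/` when the YARN job is submitted. A job that ships its own launcher script could presumably set something like `job.coordinator.execute=bin/run-my-jc.sh`, the value used in the new unit test. The following is a minimal, self-contained sketch of that lookup using a plain `Map` instead of Samza's `JobConfig`; the class name `CoordinatorCommandSketch` and the helper `resolveCommand` are hypothetical and not part of the commit.

```java
import java.util.Collections;
import java.util.Map;

public class CoordinatorCommandSketch {
  // Key and default value copied from the JobConfig change in this commit.
  static final String COORDINATOR_EXECUTE_COMMAND = "job.coordinator.execute";
  static final String DEFAULT_COORDINATOR_EXECUTE_COMMAND = "bin/run-jc.sh";

  // Hypothetical helper: mirrors getCoordinatorExecuteCommand() followed by the
  // "./__package/" prefix that YarnJob.submit applies. A plain Map stands in
  // for Samza's config classes so the sketch runs without Samza on the classpath.
  static String resolveCommand(Map<String, String> config) {
    String relative =
        config.getOrDefault(COORDINATOR_EXECUTE_COMMAND, DEFAULT_COORDINATOR_EXECUTE_COMMAND);
    return "./__package/" + relative;
  }

  public static void main(String[] args) {
    // No override: falls back to the default script.
    System.out.println(resolveCommand(Collections.emptyMap()));
    // Override: uses the script named in the job configuration.
    System.out.println(resolveCommand(
        Collections.singletonMap(COORDINATOR_EXECUTE_COMMAND, "bin/run-my-jc.sh")));
  }
}
```

As the documentation change notes, the configured script must be included in the job package, since the resulting path is resolved relative to `./__package/`.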

diff --git a/docs/learn/documentation/versioned/jobs/samza-configurations.md b/docs/learn/documentation/versioned/jobs/samza-configurations.md
index 1ef74cdf5..790e289ec 100644
--- a/docs/learn/documentation/versioned/jobs/samza-configurations.md
+++ b/docs/learn/documentation/versioned/jobs/samza-configurations.md
@@ -67,15 +67,16 @@ These are the basic properties for setting up a Samza application.
 |--- |--- |--- |
 |job.changelog.system|inherited from job.default.system|This property is required if you would like to override the system defined in `job.default.system` for the changelog. The changelog will be used with the stream specified in `stores.store-name.changelog` config. You can override this system by specifying both the system and the stream in `stores.store-name.changelog`.|
 |job.coordinator.system|inherited from job.default.system|This property is required if you would like to override the system defined in `job.default.system` for coordination. The **_system-name_** to use for creating and maintaining the Coordinator Stream.|
+|job.coordinator.segment.<br>bytes|26214400|If you are using a Kafka system for coordinator stream, this is the segment size to be used for the coordinator topic's log segments. Keeping this number small is useful because it increases the frequency that Kafka will garbage collect old messages.|
+|job.coordinator.replication.<br>factor|2|If you are using a Kafka system for coordinator stream, this is the replication factor to be used for the coordinator topic.|
+|job.coordinator.<br>monitor-partition-change.<br>frequency.ms|300000|The frequency at which the input streams' partition count change should be detected. When the input partition count change is detected, Samza will automatically restart a stateless job or fail a stateful job. A longer time interval is recommended for jobs w/ large number of input system stream partitions, since gathering partition count may incur measurable overhead to the job. You can completely disable partition coun [...]
+|job.coordinator.execute|bin/run-jc.sh|The command that starts a Samza job coordinator. The script must be included in the job package. There is usually no need to customize this.|
 |job.config.rewriter.<br>**_rewriter-name_**.class|(none)|You can optionally define configuration rewriters, which have the opportunity to dynamically modify the job configuration before the job is started. For example, this can be useful for pulling configuration from an external configuration management system, or for determining the set of input streams dynamically at runtime. The value of this property is a fully-qualified Java classname which must implement [ConfigRewriter](../api/j [...]
 |job.config.rewriters|(none)|If you have defined configuration rewriters, you need to list them here, in the order in which they should be applied. The value of this property is a comma-separated list of **_rewriter-name_** tokens.|
 |job.config.rewriter.<br>**_rewriter-name_**.system|(none)|Set this property to the `system-name` of the Kafka system from which you want to consume all matching topics.|
 |job.config.rewriter.<br>**_rewriter-name_**.regex|(none)|A regular expression specifying which topics you want to consume within the Kafka system `job.config.rewriter.*.system`. Any topics matched by this regular expression will be consumed in addition to any topics you specify in your application.|
 |job.config.rewriter.<br>**_rewriter-name_**.config.*| |Any properties specified within this namespace are applied to the configuration of streams that match the regex in `job.config.rewriter.*.regex`. For example, you can set `job.config.rewriter.*.config.samza.msg.serde` to configure the deserializer for messages in the matching streams, which is equivalent to setting `systems.*.streams.*.samza.msg.serde` for each topic that matches the regex.|
 |job.container.thread.<br>pool.size|0|If configured, the container thread pool will be used to run synchronous operations of each task [in parallel](#../container/event-loop.html). The operations include StreamTask.process(), WindowableTask.window(), and internally Task.commit(). If not configured and the default value of 0 is used, all task operations will run in a single thread.|
-|job.coordinator.<br>monitor-partition-change.<br>frequency.ms|300000|The frequency at which the input streams' partition count change should be detected. When the input partition count change is detected, Samza will automatically restart a stateless job or fail a stateful job. A longer time interval is recommended for jobs w/ large number of input system stream partitions, since gathering partition count may incur measurable overhead to the job. You can completely disable partition coun [...]
-|job.coordinator.segment.<br>bytes|26214400|	If you are using a Kafka system for coordinator stream, this is the segment size to be used for the coordinator topic's log segments. Keeping this number small is useful because it increases the frequency that Kafka will garbage collect old messages.|
-|job.coordinator.replication.<br>factor|300000|The frequency at which the input streams' partition count change should be detected. When the input partition count change is detected, Samza will automatically restart a stateless job or fail a stateful job. A longer time interval is recommended for jobs w/ large number of input system stream partitions, since gathering partition count may incur measurable overhead to the job. You can completely disable partition count monitoring by setting [...]
 |job.systemstreampartition.<br>grouper.factory|`org.apache.samza.`<br>`container.grouper.stream.`<br>`GroupByPartitionFactory`|A factory class that is used to determine how input SystemStreamPartitions are grouped together for processing in individual StreamTask instances. The factory must implement the SystemStreamPartitionGrouperFactory interface. Once this configuration is set, it can't be changed, since doing so could violate state semantics, and lead to a loss of data.<br><br>`org.a [...]
 |job.systemstreampartition.<br>matcher.class| |If you want to enable static partition assignment, then this is a required configuration. The value of this property is a fully-qualified Java class name that implements the interface org.apache.samza.system.SystemStreamPartitionMatcher. Samza ships with two matcher classes:<br><br>`org.apache.samza.system.RangeSystemStreamPartitionMatcher`<br>This classes uses a comma separated list of range(s) to determine which partition matches, and thus [...]
 |job.systemstreampartition.<br>matcher.config.<br>range| |If `job.systemstreampartition.matcher.class` is specified, and the value of this property is `org.apache.samza.system.RangeSystemStreamPartitionMatcher`, then this property is a required configuration. Specify a comma separated list of range(s) to determine which partition matches, and thus statically assigned to the Job. For example "2,3,11-20", statically assigns partition 2, 3, and 11 to 20 for all the specified system and stre [...]
diff --git a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
index 638474715..e3658ce03 100644
--- a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
@@ -111,6 +111,9 @@ public class JobConfig extends MapConfig {
   public static final String MONITOR_INPUT_REGEX_FREQUENCY_MS = "job.coordinator.monitor-input-regex.frequency.ms";
   static final int DEFAULT_MONITOR_INPUT_REGEX_FREQUENCY_MS = 300000;
 
+  public static final String COORDINATOR_EXECUTE_COMMAND = "job.coordinator.execute";
+  static final String DEFAULT_COORDINATOR_EXECUTE_COMMAND = "bin/run-jc.sh";
+
   public static final String REGEX_RESOLVED_STREAMS = "job.config.rewriter.%s.regex";
   public static final String REGEX_RESOLVED_SYSTEM = "job.config.rewriter.%s.system";
   public static final String REGEX_INHERITED_CONFIG = "job.config.rewriter.%s.config";
@@ -490,4 +493,8 @@ public class JobConfig extends MapConfig {
     }
     return elasticityFactor;
   }
+
+  public String getCoordinatorExecuteCommand() {
+    return get(COORDINATOR_EXECUTE_COMMAND, DEFAULT_COORDINATOR_EXECUTE_COMMAND);
+  }
 }
\ No newline at end of file
diff --git a/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
index 4d171662c..a037da41b 100644
--- a/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
+++ b/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
@@ -682,4 +682,15 @@ public class TestJobConfig {
     jobConfig = new JobConfig(new MapConfig());
     assertEquals(JobConfig.DEFAULT_JOB_ELASTICITY_FACTOR, jobConfig.getElasticityFactor());
   }
+
+  @Test
+  public void testGetCoordinatorExecuteCommand() {
+    JobConfig jobConfig = new JobConfig(new MapConfig());
+    assertEquals(JobConfig.DEFAULT_COORDINATOR_EXECUTE_COMMAND, jobConfig.getCoordinatorExecuteCommand());
+
+    String myJcCmd = "bin/run-my-jc.sh";
+    jobConfig = new JobConfig(new MapConfig(
+        Collections.singletonMap(JobConfig.COORDINATOR_EXECUTE_COMMAND, myJcCmd)));
+    assertEquals(myJcCmd, jobConfig.getCoordinatorExecuteCommand());
+  }
 }
\ No newline at end of file
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
index 237667d07..16ff8a465 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
@@ -43,7 +43,7 @@ class YarnJob(config: Config, hadoopConfig: Configuration) extends StreamJob wit
   def submit: YarnJob = {
     try {
       val jobConfig = new JobConfig(config)
-      val cmdExec = "./__package/bin/run-jc.sh"
+      val cmdExec = "./__package/" + jobConfig.getCoordinatorExecuteCommand
       val environment = YarnJob.buildEnvironment(config, this.yarnConfig, jobConfig)
 
       appId = client.submitApplication(