You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2020/09/11 00:24:34 UTC

[samza] branch master updated: SAMZA-2558: Refactor app.runner.class

This is an automated email from the ASF dual-hosted git repository.

nickpan47 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 3fcced3  SAMZA-2558: Refactor app.runner.class
3fcced3 is described below

commit 3fcced3fc2c7b9b344958fe6a1a59ba65aa88d69
Author: Ke Wu <kw...@linkedin.com>
AuthorDate: Thu Sep 10 17:24:14 2020 -0700

    SAMZA-2558: Refactor app.runner.class
    
    Issues: app.runner.class config is hard coded across multple places in Samza.
    
    Changes:
    1. Move ApplicationRunners.java from samza-api to samza-core module while the package name stays.
    2. Add app.runner.class and getAppRunnerClass method to ApplicationConfig.
    3. Update occurrence of app.runner.class to ApplicationConfig.APP_RUNNER_CLASS.
    
    API Changes:
    Usage of ApplicationRunners need to depend on samza-core module instead of samza-api.
    Upgrade Instructions:
    Depend on samza-core instead of samza-api module.
    Usage Instructions:
    N/A
    
    Author: Ke Wu <kw...@linkedin.com>
    
    Reviewers: Yi Pan <ni...@gmail.com>
    
    Closes #1400 from kw2542/SAMZA-2558
---
 .../org/apache/samza/config/ApplicationConfig.java     |  6 ++++++
 .../org/apache/samza/runtime/ApplicationRunners.java   | 18 +++++-------------
 .../samza/runtime/TestApplicationRunnerMain.java       | 10 +++++-----
 .../apache/samza/runtime/TestApplicationRunners.java   |  3 ++-
 .../samza/sql/runner/SamzaSqlApplicationRunner.java    |  6 +++---
 .../sql/runner/TestSamzaSqlApplicationRunner.java      |  7 ++++---
 .../controlmessages/EndOfStreamIntegrationTest.java    |  3 ++-
 .../test/controlmessages/WatermarkIntegrationTest.java |  3 ++-
 .../StreamApplicationIntegrationTestHarness.java       |  7 ++++---
 .../test/processor/TestZkLocalApplicationRunner.java   |  2 +-
 .../apache/samza/test/startpoint/TestStartpoint.java   |  2 +-
 .../tools/benchmark/SystemConsumerWithSamzaBench.java  |  2 +-
 12 files changed, 36 insertions(+), 33 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java b/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java
index 7fda7e1..ea2e943 100644
--- a/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java
@@ -59,6 +59,9 @@ public class ApplicationConfig extends MapConfig {
   public static final String APP_RUN_ID = "app.run.id";
   public static final String APP_MAIN_CLASS = "app.main.class";
   public static final String APP_MAIN_ARGS = "app.main.args";
+  public static final String APP_RUNNER_CLASS = "app.runner.class";
+
+  private static final String DEFAULT_APP_RUNNER = "org.apache.samza.runtime.RemoteApplicationRunner";
 
   public ApplicationConfig(Config config) {
     super(config);
@@ -108,4 +111,7 @@ public class ApplicationConfig extends MapConfig {
     return Optional.ofNullable(get(APP_MAIN_CLASS));
   }
 
+  public String getAppRunnerClass() {
+    return get(APP_RUNNER_CLASS, DEFAULT_APP_RUNNER);
+  }
 }
diff --git a/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunners.java b/samza-core/src/main/java/org/apache/samza/runtime/ApplicationRunners.java
similarity index 84%
rename from samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunners.java
rename to samza-core/src/main/java/org/apache/samza/runtime/ApplicationRunners.java
index cd1d06b..974c494 100644
--- a/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunners.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/ApplicationRunners.java
@@ -20,6 +20,7 @@ package org.apache.samza.runtime;
 
 import java.lang.reflect.Constructor;
 import org.apache.samza.application.SamzaApplication;
+import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.ConfigException;
 
@@ -40,14 +41,7 @@ import org.apache.samza.config.ConfigException;
  *   }
  * }</pre>
  */
-public class ApplicationRunners {
-
-  private static final String APP_RUNNER_CFG = "app.runner.class";
-  private static final String DEFAULT_APP_RUNNER = "org.apache.samza.runtime.RemoteApplicationRunner";
-
-  private ApplicationRunners() {
-
-  }
+public final class ApplicationRunners {
 
   /**
    * Get the {@link ApplicationRunner} that runs the {@code userApp}
@@ -56,8 +50,8 @@ public class ApplicationRunners {
    * @param config the configuration for this application
    * @return the {@link ApplicationRunner} object that will run the {@code userApp}
    */
-  public static final ApplicationRunner getApplicationRunner(SamzaApplication userApp, Config config) {
-    String appRunnerClassName = getAppRunnerClass(config);
+  public static ApplicationRunner getApplicationRunner(SamzaApplication<?> userApp, Config config) {
+    String appRunnerClassName = new ApplicationConfig(config).getAppRunnerClass();
     try {
       Class<?> runnerClass = Class.forName(appRunnerClassName);
       if (!ApplicationRunner.class.isAssignableFrom(runnerClass)) {
@@ -75,8 +69,6 @@ public class ApplicationRunners {
     }
   }
 
-  private static String getAppRunnerClass(Config config) {
-    return config.getOrDefault(APP_RUNNER_CFG, DEFAULT_APP_RUNNER);
+  private ApplicationRunners() {
   }
-
 }
diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java b/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java
index 6176c5f..28656db 100644
--- a/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java
+++ b/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java
@@ -44,7 +44,7 @@ public class TestApplicationRunnerMain {
         "-config", "job.config.loader.factory=org.apache.samza.config.loaders.PropertiesConfigLoaderFactory",
         "-config", "job.config.loader.properties.path=" + getClass().getResource("/test.properties").getPath(),
         "-config", String.format("%s=%s", ApplicationConfig.APP_CLASS, MockStreamApplication.class.getName()),
-        "-config", String.format("app.runner.class=%s", TestApplicationRunnerInvocationCounts.class.getName()),
+        "-config", String.format("%s=%s", ApplicationConfig.APP_RUNNER_CLASS, TestApplicationRunnerInvocationCounts.class.getName()),
     });
 
     assertEquals(1, TestApplicationRunnerInvocationCounts.runCount);
@@ -57,7 +57,7 @@ public class TestApplicationRunnerMain {
         "-config", "job.config.loader.factory=org.apache.samza.config.loaders.PropertiesConfigLoaderFactory",
         "-config", "job.config.loader.properties.path=" + getClass().getResource("/test.properties").getPath(),
         "-config", String.format("%s=%s", ApplicationConfig.APP_CLASS, MockStreamApplication.class.getName()),
-        "-config", String.format("app.runner.class=%s", TestApplicationRunnerInvocationCounts.class.getName()),
+        "-config", String.format("%s=%s", ApplicationConfig.APP_RUNNER_CLASS, TestApplicationRunnerInvocationCounts.class.getName()),
         "--operation=kill"
     });
 
@@ -71,7 +71,7 @@ public class TestApplicationRunnerMain {
         "-config", "job.config.loader.factory=org.apache.samza.config.loaders.PropertiesConfigLoaderFactory",
         "-config", "job.config.loader.properties.path=" + getClass().getResource("/test.properties").getPath(),
         "-config", String.format("%s=%s", ApplicationConfig.APP_CLASS, MockStreamApplication.class.getName()),
-        "-config", String.format("app.runner.class=%s", TestApplicationRunnerInvocationCounts.class.getName()),
+        "-config", String.format("%s=%s", ApplicationConfig.APP_RUNNER_CLASS, TestApplicationRunnerInvocationCounts.class.getName()),
         "--operation=status"
     });
 
@@ -85,7 +85,7 @@ public class TestApplicationRunnerMain {
         "-config", "job.config.loader.factory=org.apache.samza.config.loaders.PropertiesConfigLoaderFactory",
         "-config", "job.config.loader.properties.path=" + getClass().getResource("/test.properties").getPath(),
         "-config", String.format("%s=%s", ApplicationConfig.APP_CLASS, MockStreamApplication.class.getName()),
-        "-config", String.format("app.runner.class=%s", TestApplicationRunnerInvocationCounts.class.getName()));
+        "-config", String.format("%s=%s", ApplicationConfig.APP_RUNNER_CLASS, TestApplicationRunnerInvocationCounts.class.getName()));
 
     Config actual = cmdLine.loadConfig(options);
 
@@ -93,7 +93,7 @@ public class TestApplicationRunnerMain {
         JobConfig.CONFIG_LOADER_FACTORY, "org.apache.samza.config.loaders.PropertiesConfigLoaderFactory",
         ConfigLoaderFactory.CONFIG_LOADER_PROPERTIES_PREFIX + "path", getClass().getResource("/test.properties").getPath(),
         ApplicationConfig.APP_CLASS, MockStreamApplication.class.getName(),
-        "app.runner.class", TestApplicationRunnerInvocationCounts.class.getName()));
+        ApplicationConfig.APP_RUNNER_CLASS, TestApplicationRunnerInvocationCounts.class.getName()));
 
     assertEquals(expected, actual);
   }
diff --git a/samza-api/src/test/java/org/apache/samza/runtime/TestApplicationRunners.java b/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunners.java
similarity index 94%
rename from samza-api/src/test/java/org/apache/samza/runtime/TestApplicationRunners.java
rename to samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunners.java
index 139ed69..1f0f284 100644
--- a/samza-api/src/test/java/org/apache/samza/runtime/TestApplicationRunners.java
+++ b/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunners.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
 import java.util.Map;
 import org.apache.samza.application.SamzaApplication;
 import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.context.ExternalContext;
@@ -41,7 +42,7 @@ public class TestApplicationRunners {
   @Test
   public void testGetAppRunner() {
     Map<String, String> configMap = new HashMap<>();
-    configMap.put("app.runner.class", MockApplicationRunner.class.getName());
+    configMap.put(ApplicationConfig.APP_RUNNER_CLASS, MockApplicationRunner.class.getName());
     Config config = new MapConfig(configMap);
     StreamApplication app = mock(StreamApplication.class);
     ApplicationRunner appRunner = ApplicationRunners.getApplicationRunner(app, config);
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
index 23d7be5..b1b9001 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Map;
 import org.apache.commons.lang3.Validate;
 import org.apache.samza.application.SamzaApplication;
+import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.context.ExternalContext;
@@ -54,7 +55,6 @@ public class SamzaSqlApplicationRunner implements ApplicationRunner {
 
   private final ApplicationRunner runner;
 
-  public static final String RUNNER_CONFIG = "app.runner.class";
   public static final String CFG_FMT_SAMZA_STREAM_SYSTEM = "streams.%s.samza.system";
 
   /**
@@ -110,9 +110,9 @@ public class SamzaSqlApplicationRunner implements ApplicationRunner {
     newConfig.putAll(config);
 
     if (localRunner) {
-      newConfig.put(RUNNER_CONFIG, LocalApplicationRunner.class.getName());
+      newConfig.put(ApplicationConfig.APP_RUNNER_CLASS, LocalApplicationRunner.class.getName());
     } else {
-      newConfig.put(RUNNER_CONFIG, RemoteApplicationRunner.class.getName());
+      newConfig.put(ApplicationConfig.APP_RUNNER_CLASS, RemoteApplicationRunner.class.getName());
     }
 
     LOG.info("New Samza configs: " + newConfig);
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java
index 272ca4a..2dda4fe 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java
@@ -20,6 +20,7 @@
 package org.apache.samza.sql.runner;
 
 import java.util.Map;
+import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.runtime.LocalApplicationRunner;
@@ -36,15 +37,15 @@ public class TestSamzaSqlApplicationRunner {
     Map<String, String> configs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10);
     String sql1 = "Insert into testavro.outputTopic(id,long_value) select id, MyTest(id) as long_value from testavro.SIMPLE1";
     configs.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql1);
-    configs.put(SamzaSqlApplicationRunner.RUNNER_CONFIG, SamzaSqlApplicationRunner.class.getName());
+    configs.put(ApplicationConfig.APP_RUNNER_CLASS, SamzaSqlApplicationRunner.class.getName());
     MapConfig samzaConfig = new MapConfig(configs);
     Config newConfigs = SamzaSqlApplicationRunner.computeSamzaConfigs(true, samzaConfig);
-    Assert.assertEquals(newConfigs.get(SamzaSqlApplicationRunner.RUNNER_CONFIG), LocalApplicationRunner.class.getName());
+    Assert.assertEquals(newConfigs.get(ApplicationConfig.APP_RUNNER_CLASS), LocalApplicationRunner.class.getName());
     // Check whether five new configs added.
     Assert.assertEquals(newConfigs.size(), configs.size() + 5);
 
     newConfigs = SamzaSqlApplicationRunner.computeSamzaConfigs(false, samzaConfig);
-    Assert.assertEquals(newConfigs.get(SamzaSqlApplicationRunner.RUNNER_CONFIG), RemoteApplicationRunner.class.getName());
+    Assert.assertEquals(newConfigs.get(ApplicationConfig.APP_RUNNER_CLASS), RemoteApplicationRunner.class.getName());
 
     // Check whether five new configs added.
     Assert.assertEquals(newConfigs.size(), configs.size() + 5);
diff --git a/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
index b0630b2..e53f701 100644
--- a/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import java.util.Random;
 import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
 import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.JobCoordinatorConfig;
@@ -72,7 +73,7 @@ public class EndOfStreamIntegrationTest extends IntegrationTestHarness {
 
     int partitionCount = 4;
     Map<String, String> configs = new HashMap<>();
-    configs.put("app.runner.class", "org.apache.samza.runtime.LocalApplicationRunner");
+    configs.put(ApplicationConfig.APP_RUNNER_CLASS, "org.apache.samza.runtime.LocalApplicationRunner");
     configs.put("systems.test.samza.factory", ArraySystemFactory.class.getName());
     configs.put("streams.PageView.samza.system", "test");
     configs.put("streams.PageView.source", Base64Serializer.serialize(pageviews));
diff --git a/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
index 25d31ea..849f64c 100644
--- a/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.samza.test.controlmessages;
 
+import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.TaskConfig;
 import scala.collection.JavaConverters;
 
@@ -123,7 +124,7 @@ public class WatermarkIntegrationTest extends IntegrationTestHarness {
   @Test
   public void testWatermark() throws Exception {
     Map<String, String> configs = new HashMap<>();
-    configs.put("app.runner.class", MockLocalApplicationRunner.class.getName());
+    configs.put(ApplicationConfig.APP_RUNNER_CLASS, MockLocalApplicationRunner.class.getName());
     configs.put("systems.test.samza.factory", TestSystemFactory.class.getName());
     configs.put("streams.PageView.samza.system", "test");
     configs.put("streams.PageView.partitionCount", String.valueOf(PARTITION_COUNT));
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java
index 2bcc661..0d1e49a 100644
--- a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java
@@ -31,6 +31,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.samza.application.SamzaApplication;
+import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.KafkaConfig;
 import org.apache.samza.config.MapConfig;
@@ -176,9 +177,9 @@ public class StreamApplicationIntegrationTestHarness extends IntegrationTestHarn
       String appName,
       Map<String, String> overriddenConfigs) {
     Map<String, String> configMap = new HashMap<>();
-    configMap.put("app.runner.class", "org.apache.samza.runtime.LocalApplicationRunner");
-    configMap.put("app.name", appName);
-    configMap.put("app.class", streamApplication.getClass().getCanonicalName());
+    configMap.put(ApplicationConfig.APP_RUNNER_CLASS, "org.apache.samza.runtime.LocalApplicationRunner");
+    configMap.put(ApplicationConfig.APP_NAME, appName);
+    configMap.put(ApplicationConfig.APP_CLASS, streamApplication.getClass().getCanonicalName());
     configMap.put("serializers.registry.json.class", "org.apache.samza.serializers.JsonSerdeFactory");
     configMap.put("serializers.registry.string.class", "org.apache.samza.serializers.StringSerdeFactory");
     configMap.put("systems.kafka.samza.factory", "org.apache.samza.system.kafka.KafkaSystemFactory");
diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
index fef4836..4e69372 100644
--- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
+++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
@@ -220,7 +220,7 @@ public class TestZkLocalApplicationRunner extends IntegrationTestHarness {
     config.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, TEST_JOB_COORDINATOR_FACTORY);
     config.put(ApplicationConfig.APP_NAME, appName);
     config.put(ApplicationConfig.APP_ID, appId);
-    config.put("app.runner.class", "org.apache.samza.runtime.LocalApplicationRunner");
+    config.put(ApplicationConfig.APP_RUNNER_CLASS, "org.apache.samza.runtime.LocalApplicationRunner");
     config.put(String.format("systems.%s.samza.factory", TestZkLocalApplicationRunner.TEST_SYSTEM), TEST_SYSTEM_FACTORY);
     config.put(JobConfig.JOB_NAME, appName);
     config.put(JobConfig.JOB_ID, appId);
diff --git a/samza-test/src/test/java/org/apache/samza/test/startpoint/TestStartpoint.java b/samza-test/src/test/java/org/apache/samza/test/startpoint/TestStartpoint.java
index d71028d..876690a 100644
--- a/samza-test/src/test/java/org/apache/samza/test/startpoint/TestStartpoint.java
+++ b/samza-test/src/test/java/org/apache/samza/test/startpoint/TestStartpoint.java
@@ -449,7 +449,7 @@ public class TestStartpoint extends IntegrationTestHarness {
     config.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, TEST_JOB_COORDINATOR_FACTORY);
     config.put(ApplicationConfig.APP_NAME, appName);
     config.put(ApplicationConfig.APP_ID, appId);
-    config.put("app.runner.class", "org.apache.samza.runtime.LocalApplicationRunner");
+    config.put(ApplicationConfig.APP_RUNNER_CLASS, "org.apache.samza.runtime.LocalApplicationRunner");
     config.put(String.format("systems.%s.samza.factory", TEST_SYSTEM), TEST_SYSTEM_FACTORY);
     config.put(JobConfig.JOB_NAME, appName);
     config.put(JobConfig.JOB_ID, appId);
diff --git a/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemConsumerWithSamzaBench.java b/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemConsumerWithSamzaBench.java
index bdb8fce..5b6a274 100644
--- a/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemConsumerWithSamzaBench.java
+++ b/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemConsumerWithSamzaBench.java
@@ -63,7 +63,7 @@ public class SystemConsumerWithSamzaBench extends AbstractSamzaBench {
 
   @Override
   public void addMoreSystemConfigs(Properties props) {
-    props.put("app.runner.class", LocalApplicationRunner.class.getName());
+    props.put(ApplicationConfig.APP_RUNNER_CLASS, LocalApplicationRunner.class.getName());
     List<Integer> partitions = IntStream.range(startPartition, endPartition).boxed().collect(Collectors.toList());
     props.put(ApplicationConfig.APP_NAME, "SamzaBench");
     props.put(JobConfig.PROCESSOR_ID, "1");