You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2017/09/25 21:52:55 UTC

samza git commit: SAMZA-1417: Clear and recreate intermediate and metadata streams for batch processing

Repository: samza
Updated Branches:
  refs/heads/0.14.0 c45c7747a -> 475414884


SAMZA-1417: Clear and recreate intermediate and metadata streams for batch processing

For each run of a batch application, we need to clear the internal streams from the previous run and recreate new ones. This patch introduces the following:
1) bounded flag in StreamSpec
2) app.mode (BATCH/STREAM) in the application config. An application is a batch app iff all the input streams are bounded.
3) app.run.id in the application config, which is used to generate the names of the internal topics for each run.

Generation of the run.id is not addressed in this PR; a follow-up patch will resolve it for both YARN and standalone. For now, this patch only works for YARN.

Author: Xinyu Liu <xi...@xiliu-ld1.linkedin.biz>

Reviewers: Jake Maes <jm...@apache.org>

Closes #297 from xinyuiscool/SAMZA-1417


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/47541488
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/47541488
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/47541488

Branch: refs/heads/0.14.0
Commit: 475414884ef386af328ed7a07d59f6986713c283
Parents: c45c774
Author: Xinyu Liu <xi...@gmail.com>
Authored: Mon Sep 25 14:52:42 2017 -0700
Committer: Xinyu Liu <xi...@xiliu-ld1.linkedin.biz>
Committed: Mon Sep 25 14:52:42 2017 -0700

----------------------------------------------------------------------
 .../versioned/jobs/configuration-table.html     |  32 ++++
 .../samza/checkpoint/CheckpointManager.java     |   4 +
 .../org/apache/samza/system/StreamSpec.java     |  37 ++++-
 .../apache/samza/config/ApplicationConfig.java  |  15 ++
 .../apache/samza/config/JavaStorageConfig.java  |   3 +
 .../apache/samza/execution/ExecutionPlan.java   |   7 +
 .../samza/execution/ExecutionPlanner.java       |  15 ++
 .../org/apache/samza/execution/JobGraph.java    |   2 +-
 .../org/apache/samza/execution/JobNode.java     |  16 +-
 .../org/apache/samza/execution/StreamEdge.java  |  47 +++++-
 .../apache/samza/execution/StreamManager.java   |  76 +++++++++
 .../runtime/AbstractApplicationRunner.java      |  42 ++++-
 .../samza/runtime/RemoteApplicationRunner.java  |  27 ++-
 .../org/apache/samza/zk/ZkJobCoordinator.java   |   1 -
 .../org/apache/samza/config/StreamConfig.scala  |  11 +-
 .../samza/coordinator/JobModelManager.scala     |   2 +-
 .../samza/execution/TestExecutionPlanner.java   |   6 +-
 .../apache/samza/execution/TestStreamEdge.java  |  84 ++++++++++
 .../samza/execution/TestStreamManager.java      | 147 +++++++++++++++++
 .../runtime/TestLocalApplicationRunner.java     | 164 +++++--------------
 .../samza/system/kafka/KafkaStreamSpec.java     |   2 +-
 .../kafka/KafkaCheckpointManager.scala          |  19 ++-
 .../kafka/KafkaCheckpointManagerFactory.scala   |  14 +-
 .../samza/system/kafka/KafkaSystemAdmin.scala   |   2 +
 .../scala/org/apache/samza/util/KafkaUtil.scala |  12 +-
 .../samza/system/kafka/TestKafkaStreamSpec.java |   2 +-
 26 files changed, 605 insertions(+), 184 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/47541488/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html
index 9b4e279..f4d87f6 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -116,6 +116,38 @@
         <table>
             <tbody>
                 <tr><th>Name</th><th>Default</th><th>Description</th></tr>
+
+                <tr>
+                    <th colspan="3" class="section" id="application">Samza application configuration</th>
+                </tr>
+                <tr>
+                    <td class="property" id="app-name">app.name</td>
+                    <td class="default"></td>
+                    <td class="description">
+                        <strong>Required:</strong> The name of your application.
+                    </td>
+                </tr>
+
+                <tr>
+                    <td class="property" id="app-id">app.id</td>
+                    <td class="default">1</td>
+                    <td class="description">
+                        If you run several instances of your application at the same time, you need to give each instance a
+                        different <code>app.id</code>. This is important, since otherwise the applications will overwrite each
+                        others' checkpoints, and perhaps interfere with each other in other ways.
+                    </td>
+                </tr>
+
+                <tr>
+                    <td class="property" id="app-class">app.class</td>
+                    <td class="default"></td>
+                    <td class="description">
+                        <strong>Required:</strong> The application to run. The value is a fully-qualified Java classname,
+                        which must implement <a href="../api/javadocs/org/apache/samza/application/StreamApplication.html">StreamApplication</a>.
+                        A StreamApplication describes a series of transformations on the streams.
+                    </td>
+                </tr>
+
                 <tr>
                     <th colspan="3" class="section" id="job"><a href="configuration.html">Samza job configuration</a></th>
                 </tr>

http://git-wip-us.apache.org/repos/asf/samza/blob/47541488/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java b/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
index dc14beb..10f166c 100644
--- a/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
+++ b/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
@@ -50,4 +50,8 @@ public interface CheckpointManager {
 
   void stop();
 
+  /**
+   * Clear the checkpoints in the checkpoint stream.
+   */
+  default void clearCheckpoints() { };
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/47541488/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java b/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
index 384fecc..8d7401a 100644
--- a/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
+++ b/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
@@ -43,6 +43,9 @@ public class StreamSpec {
   // Internal coordinator stream id. It is used for creating coordinator StreamSpec.
   private static final String COORDINATOR_STREAM_ID = "samza-internal-coordinator-stream-id";
 
+  // Internal checkpoint stream id. It is used for creating checkpoint StreamSpec.
+  private static final String CHECKPOINT_STREAM_ID = "samza-internal-checkpoint-stream-id";
+
   /**
    * Unique identifier for the stream in a Samza application.
    * This identifier is used as a key for stream properties in the
@@ -69,6 +72,11 @@ public class StreamSpec {
   private final int partitionCount;
 
   /**
+   * Bounded or unbounded stream
+   */
+  private final boolean isBounded;
+
+  /**
    * A set of all system-specific configurations for the stream.
    */
   private final Map<String, String> config;
@@ -86,7 +94,7 @@ public class StreamSpec {
    *                      Samza System abstraction. See {@link SystemFactory}
    */
   public StreamSpec(String id, String physicalName, String systemName) {
-    this(id, physicalName, systemName, DEFAULT_PARTITION_COUNT, Collections.emptyMap());
+    this(id, physicalName, systemName, DEFAULT_PARTITION_COUNT, false, Collections.emptyMap());
   }
 
   /**
@@ -105,7 +113,7 @@ public class StreamSpec {
    * @param partitionCount  The number of partitionts for the stream. A value of {@code 1} indicates unpartitioned.
    */
   public StreamSpec(String id, String physicalName, String systemName, int partitionCount) {
-    this(id, physicalName, systemName, partitionCount, Collections.emptyMap());
+    this(id, physicalName, systemName, partitionCount, false, Collections.emptyMap());
   }
 
   /**
@@ -120,10 +128,12 @@ public class StreamSpec {
    * @param systemName    The System name on which this stream will exist. Corresponds to a named implementation of the
    *                      Samza System abstraction. See {@link SystemFactory}
    *
+   * @param isBounded     The stream is bounded or not.
+   *
    * @param config        A map of properties for the stream. These may be System-specfic.
    */
-  public StreamSpec(String id, String physicalName, String systemName, Map<String, String> config) {
-    this(id, physicalName, systemName, DEFAULT_PARTITION_COUNT, config);
+  public StreamSpec(String id, String physicalName, String systemName, boolean isBounded, Map<String, String> config) {
+    this(id, physicalName, systemName, DEFAULT_PARTITION_COUNT, isBounded, config);
   }
 
   /**
@@ -140,9 +150,11 @@ public class StreamSpec {
    *
    * @param partitionCount  The number of partitionts for the stream. A value of {@code 1} indicates unpartitioned.
    *
+   * @param isBounded       The stream is bounded or not.
+   *
    * @param config          A map of properties for the stream. These may be System-specfic.
    */
-  public StreamSpec(String id, String physicalName, String systemName, int partitionCount,  Map<String, String> config) {
+  public StreamSpec(String id, String physicalName, String systemName, int partitionCount, boolean isBounded, Map<String, String> config) {
     validateLogicalIdentifier("streamId", id);
     validateLogicalIdentifier("systemName", systemName);
 
@@ -154,6 +166,7 @@ public class StreamSpec {
     this.systemName = systemName;
     this.physicalName = physicalName;
     this.partitionCount = partitionCount;
+    this.isBounded = isBounded;
 
     if (config != null) {
       this.config = Collections.unmodifiableMap(new HashMap<>(config));
@@ -171,7 +184,11 @@ public class StreamSpec {
    * @return                A copy of this StreamSpec with the specified partitionCount.
    */
   public StreamSpec copyWithPartitionCount(int partitionCount) {
-    return new StreamSpec(id, physicalName, systemName, partitionCount, config);
+    return new StreamSpec(id, physicalName, systemName, partitionCount, this.isBounded, config);
+  }
+
+  public StreamSpec copyWithPhysicalName(String physicalName) {
+    return new StreamSpec(id, physicalName, systemName, partitionCount, this.isBounded, config);
   }
 
   public String getId() {
@@ -214,6 +231,10 @@ public class StreamSpec {
     return id.equals(COORDINATOR_STREAM_ID);
   }
 
+  public boolean isBounded() {
+    return isBounded;
+  }
+
   private void validateLogicalIdentifier(String identifierName, String identifierValue) {
     if (identifierValue == null || !identifierValue.matches("[A-Za-z0-9_-]+")) {
       throw new IllegalArgumentException(String.format("Identifier '%s' is '%s'. It must match the expression [A-Za-z0-9_-]+", identifierName, identifierValue));
@@ -242,4 +263,8 @@ public class StreamSpec {
   public static StreamSpec createCoordinatorStreamSpec(String physicalName, String systemName) {
     return new StreamSpec(COORDINATOR_STREAM_ID, physicalName, systemName, 1);
   }
+
+  public static StreamSpec createCheckpointStreamSpec(String physicalName, String systemName) {
+    return new StreamSpec(CHECKPOINT_STREAM_ID, physicalName, systemName, 1);
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/47541488/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java b/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java
index 1b53321..39facb6 100644
--- a/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java
@@ -40,6 +40,11 @@ public class ApplicationConfig extends MapConfig {
   @Deprecated
   public static final String PROCESSOR_ID = "processor.id";
 
+  public enum ApplicationMode {
+    STREAM,
+    BATCH
+  }
+
   /**
    * Class implementing the {@link org.apache.samza.runtime.ProcessorIdGenerator} interface
    * Used to generate a unique identifier for a {@link org.apache.samza.processor.StreamProcessor} based on the runtime
@@ -49,6 +54,8 @@ public class ApplicationConfig extends MapConfig {
   public static final String APP_NAME = "app.name";
   public static final String APP_ID = "app.id";
   public static final String APP_CLASS = "app.class";
+  public static final String APP_MODE = "app.mode";
+  public static final String APP_RUN_ID = "app.run.id";
 
   public ApplicationConfig(Config config) {
     super(config);
@@ -83,4 +90,12 @@ public class ApplicationConfig extends MapConfig {
     return get(PROCESSOR_ID, null);
   }
 
+  public String getRunId() {
+    return get(APP_RUN_ID, null);
+  }
+
+  public ApplicationMode getAppMode() {
+    return ApplicationMode.valueOf(get(APP_MODE, ApplicationMode.STREAM.name()).toUpperCase());
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/47541488/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java b/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java
index c26601c..4e9a58a 100644
--- a/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java
@@ -22,6 +22,7 @@ package org.apache.samza.config;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.samza.SamzaException;
+import org.apache.samza.execution.StreamManager;
 
 
 /**
@@ -71,6 +72,8 @@ public class JavaStorageConfig extends MapConfig {
     } else {
       systemStreamRes = systemStream;
     }
+
+    systemStreamRes = StreamManager.createUniqueNameForBatch(systemStreamRes, this);
     return systemStreamRes;
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/47541488/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlan.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlan.java b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlan.java
index bde9bfb..3eb2e96 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlan.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlan.java
@@ -21,6 +21,7 @@ package org.apache.samza.execution;
 
 import java.util.List;
 import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.system.StreamSpec;
 
@@ -39,6 +40,12 @@ public interface ExecutionPlan {
   List<JobConfig> getJobConfigs();
 
   /**
+   * Returns the config for this application
+   * @return {@link ApplicationConfig}
+   */
+  ApplicationConfig getApplicationConfig();
+
+  /**
    * Returns the intermediate streams that need to be created.
    * @return intermediate {@link StreamSpec}s
    */

http://git-wip-us.apache.org/repos/asf/samza/blob/47541488/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
index 00f4ad4..e258d13 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
@@ -28,7 +28,10 @@ import java.util.LinkedList;
 import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
+
 import org.apache.samza.SamzaException;
+import org.apache.samza.config.ApplicationConfig;
+import org.apache.samza.config.ClusterManagerConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.operators.StreamGraphImpl;
@@ -56,6 +59,8 @@ public class ExecutionPlanner {
   }
 
   public ExecutionPlan plan(StreamGraphImpl streamGraph) throws Exception {
+    validateConfig();
+
     // create physical job graph based on stream graph
     JobGraph jobGraph = createJobGraph(streamGraph);
 
@@ -70,6 +75,16 @@ public class ExecutionPlanner {
     return jobGraph;
   }
 
+  private void validateConfig() {
+    ApplicationConfig appConfig = new ApplicationConfig(config);
+    ClusterManagerConfig clusterConfig = new ClusterManagerConfig(config);
+    // currently we don't support host-affinity in batch mode
+    if (appConfig.getAppMode() == ApplicationConfig.ApplicationMode.BATCH
+        && clusterConfig.getHostAffinityEnabled()) {
+      throw new SamzaException("Host affinity is not supported in batch mode. Please configure job.host-affinity.enabled=false.");
+    }
+  }
+
   /**
    * Create the physical graph from StreamGraph
    */

http://git-wip-us.apache.org/repos/asf/samza/blob/47541488/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java b/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
index 99ee86c..2a09e90 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
@@ -173,7 +173,7 @@ import org.slf4j.LoggerFactory;
     String streamId = streamSpec.getId();
     StreamEdge edge = edges.get(streamId);
     if (edge == null) {
-      edge = new StreamEdge(streamSpec, isIntermediate);
+      edge = new StreamEdge(streamSpec, isIntermediate, config);
       edges.put(streamId, edge);
     }
     return edge;

http://git-wip-us.apache.org/repos/asf/samza/blob/47541488/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
index 88b24ba..a86e019 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
@@ -29,14 +29,12 @@ import java.util.stream.Collectors;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
-import org.apache.samza.config.StreamConfig;
 import org.apache.samza.config.TaskConfig;
 import org.apache.samza.operators.StreamGraphImpl;
 import org.apache.samza.operators.spec.JoinOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.WindowOperatorSpec;
 import org.apache.samza.operators.util.MathUtils;
-import org.apache.samza.system.StreamSpec;
 import org.apache.samza.util.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -125,7 +123,7 @@ public class JobNode {
     configs.put(CONFIG_INTERNAL_EXECUTION_PLAN, executionPlanJson);
 
     // write input/output streams to configs
-    inEdges.stream().filter(StreamEdge::isIntermediate).forEach(edge -> addStreamConfig(edge, configs));
+    inEdges.stream().filter(StreamEdge::isIntermediate).forEach(edge -> configs.putAll(edge.generateConfig()));
 
     log.info("Job {} has generated configs {}", jobName, configs);
 
@@ -190,18 +188,6 @@ public class JobNode {
     return scopedConfig;
   }
 
-  private static void addStreamConfig(StreamEdge edge, Map<String, String> config) {
-    StreamSpec spec = edge.getStreamSpec();
-    config.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), spec.getId()), spec.getSystemName());
-    config.put(String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), spec.getId()), spec.getPhysicalName());
-    if (edge.isIntermediate()) {
-      config.put(String.format(StreamConfig.IS_INTERMEDIATE_FROM_STREAM_ID(), spec.getId()), "true");
-    }
-    spec.getConfig().forEach((property, value) -> {
-        config.put(String.format(StreamConfig.STREAM_ID_PREFIX(), spec.getId()) + property, value);
-      });
-  }
-
   static String createId(String jobName, String jobId) {
     return String.format("%s-%s", jobName, jobId);
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/47541488/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java b/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
index 35fde81..f545490 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
@@ -20,7 +20,13 @@
 package org.apache.samza.execution;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.StreamConfig;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.util.Util;
@@ -37,19 +43,21 @@ public class StreamEdge {
   private final StreamSpec streamSpec;
   private final List<JobNode> sourceNodes = new ArrayList<>();
   private final List<JobNode> targetNodes = new ArrayList<>();
+  private final Config config;
 
   private String name = "";
   private int partitions = PARTITIONS_UNKNOWN;
   private final boolean isIntermediate;
 
-  StreamEdge(StreamSpec streamSpec) {
-    this(streamSpec, false);
+  StreamEdge(StreamSpec streamSpec, Config config) {
+    this(streamSpec, false, config);
   }
 
-  StreamEdge(StreamSpec streamSpec, boolean isIntermediate) {
+  StreamEdge(StreamSpec streamSpec, boolean isIntermediate, Config config) {
     this.streamSpec = streamSpec;
     this.name = Util.getNameFromSystemStream(getSystemStream());
     this.isIntermediate = isIntermediate;
+    this.config = config;
   }
 
   void addSourceNode(JobNode sourceNode) {
@@ -60,12 +68,17 @@ public class StreamEdge {
     targetNodes.add(targetNode);
   }
 
-  public StreamSpec getStreamSpec() {
-    if (partitions == PARTITIONS_UNKNOWN) {
-      return streamSpec;
-    } else {
-      return streamSpec.copyWithPartitionCount(partitions);
+  StreamSpec getStreamSpec() {
+    StreamSpec spec = (partitions == PARTITIONS_UNKNOWN) ?
+        streamSpec : streamSpec.copyWithPartitionCount(partitions);
+
+    if (isIntermediate) {
+      String physicalName = StreamManager.createUniqueNameForBatch(spec.getPhysicalName(), config);
+      if (!physicalName.equals(spec.getPhysicalName())) {
+        spec = spec.copyWithPhysicalName(physicalName);
+      }
     }
+    return spec;
   }
 
   SystemStream getSystemStream() {
@@ -103,4 +116,22 @@ public class StreamEdge {
   boolean isIntermediate() {
     return isIntermediate;
   }
+
+  Config generateConfig() {
+    Map<String, String> config = new HashMap<>();
+    StreamSpec spec = getStreamSpec();
+    config.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), spec.getId()), spec.getSystemName());
+    config.put(String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), spec.getId()), spec.getPhysicalName());
+    if (isIntermediate()) {
+      config.put(String.format(StreamConfig.IS_INTERMEDIATE_FOR_STREAM_ID(), spec.getId()), "true");
+      config.put(String.format(StreamConfig.CONSUMER_OFFSET_DEFAULT_FOR_STREAM_ID(), spec.getId()), "oldest");
+    }
+    if (spec.isBounded()) {
+      config.put(String.format(StreamConfig.IS_BOUNDED_FOR_STREAM_ID(), spec.getId()), "true");
+    }
+    spec.getConfig().forEach((property, value) -> {
+        config.put(String.format(StreamConfig.STREAM_ID_PREFIX(), spec.getId()) + property, value);
+      });
+    return new MapConfig(config);
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/47541488/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java b/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java
index c6ab036..3028e5f 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java
@@ -25,13 +25,23 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
+
 import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.CheckpointManager;
+import org.apache.samza.checkpoint.CheckpointManagerFactory;
+import org.apache.samza.config.*;
+import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.util.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.collection.JavaConversions;
 
+import static org.apache.samza.util.ScalaToJavaUtils.defaultValue;
 
 public class StreamManager {
   private static final Logger LOGGER = LoggerFactory.getLogger(StreamManager.class);
@@ -75,4 +85,70 @@ public class StreamManager {
 
     return streamToPartitionCount;
   }
+
+  /**
+   * This is a best-effort approach to clear the internal streams from previous run, including intermediate streams,
+   * checkpoint stream and changelog streams.
+   * For batch processing, we always clean up the previous internal streams and create a new set for each run.
+   * @param prevConfig config of the previous run
+   */
+  public void clearStreamsFromPreviousRun(Config prevConfig) {
+    try {
+      ApplicationConfig appConfig = new ApplicationConfig(prevConfig);
+      LOGGER.info("run.id from previous run is {}", appConfig.getRunId());
+
+      StreamConfig streamConfig = new StreamConfig(prevConfig);
+
+      //Find all intermediate streams and clean up
+      Set<StreamSpec> intStreams = JavaConversions.asJavaCollection(streamConfig.getStreamIds()).stream()
+          .filter(streamConfig::getIsIntermediate)
+          .map(id -> new StreamSpec(id, streamConfig.getPhysicalName(id), streamConfig.getSystem(id)))
+          .collect(Collectors.toSet());
+      intStreams.forEach(stream -> {
+          LOGGER.info("Clear intermediate stream {} in system {}", stream.getPhysicalName(), stream.getSystemName());
+          sysAdmins.get(stream.getSystemName()).clearStream(stream);
+        });
+
+      //Find checkpoint stream and clean up
+      TaskConfig taskConfig = new TaskConfig(prevConfig);
+      String checkpointManagerFactoryClass = taskConfig.getCheckpointManagerFactory().getOrElse(defaultValue(null));
+      if (checkpointManagerFactoryClass != null) {
+        CheckpointManager checkpointManager = ((CheckpointManagerFactory) Util.getObj(checkpointManagerFactoryClass))
+            .getCheckpointManager(prevConfig, new MetricsRegistryMap());
+        checkpointManager.clearCheckpoints();
+      }
+
+      //Find changelog streams and remove them
+      StorageConfig storageConfig = new StorageConfig(prevConfig);
+      for (String store : JavaConversions.asJavaCollection(storageConfig.getStoreNames())) {
+        String changelog = storageConfig.getChangelogStream(store).getOrElse(defaultValue(null));
+        if (changelog != null) {
+          LOGGER.info("Clear store {} changelog {}", store, changelog);
+          SystemStream systemStream = Util.getSystemStreamFromNames(changelog);
+          StreamSpec spec = StreamSpec.createChangeLogStreamSpec(systemStream.getStream(), systemStream.getSystem(), 1);
+          sysAdmins.get(spec.getSystemName()).clearStream(spec);
+        }
+      }
+    } catch (Exception e) {
+      // For batch, we always create a new set of internal streams (checkpoint, changelog and intermediate) with unique
+      // id. So if clearStream doesn't work, it won't affect the correctness of the results.
+      // We log a warning here and rely on retention to clean up the streams later.
+      LOGGER.warn("Fail to clear internal streams from previous run. Please clean up manually.", e);
+    }
+  }
+
+  /**
+   * Create a unique stream name if it's batch mode and has a valid run.id.
+   * @param stream physical name of the stream
+   * @param config {@link Config} object
+   * @return stream name created
+   */
+  public static String createUniqueNameForBatch(String stream, Config config) {
+    ApplicationConfig appConfig = new ApplicationConfig(config);
+    if (appConfig.getAppMode() == ApplicationConfig.ApplicationMode.BATCH && appConfig.getRunId() != null) {
+      return stream + "-" + appConfig.getRunId();
+    } else {
+      return stream;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/47541488/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java
index 3c7c83d..b8a8ca1 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java
@@ -18,12 +18,13 @@
  */
 package org.apache.samza.runtime;
 
-import java.io.File;
-import java.io.PrintWriter;
-import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.ApplicationConfig;
+import org.apache.samza.config.ApplicationConfig.ApplicationMode;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JavaSystemConfig;
+import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.ShellCommandConfig;
 import org.apache.samza.config.StreamConfig;
 import org.apache.samza.execution.ExecutionPlan;
@@ -34,6 +35,13 @@ import org.apache.samza.system.StreamSpec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.io.PrintWriter;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
 
 /**
  * Defines common, core behavior for implementations of the {@link ApplicationRunner} API
@@ -42,12 +50,10 @@ public abstract class AbstractApplicationRunner extends ApplicationRunner {
   private static final Logger log = LoggerFactory.getLogger(AbstractApplicationRunner.class);
 
   private final StreamManager streamManager;
-  private final ExecutionPlanner planner;
 
   public AbstractApplicationRunner(Config config) {
     super(config);
     this.streamManager = new StreamManager(new JavaSystemConfig(config).getSystemAdmins());
-    this.planner = new ExecutionPlanner(config, streamManager);
   }
 
   @Override
@@ -96,20 +102,40 @@ public abstract class AbstractApplicationRunner extends ApplicationRunner {
   /*package private*/ StreamSpec getStreamSpec(String streamId, String physicalName, String system) {
     StreamConfig streamConfig = new StreamConfig(config);
     Map<String, String> properties = streamConfig.getStreamProperties(streamId);
+    boolean isBounded = streamConfig.getIsBounded(streamId);
+
+    return new StreamSpec(streamId, physicalName, system, isBounded, properties);
+  }
 
-    return new StreamSpec(streamId, physicalName, system, properties);
+  /* package private */
+  ExecutionPlan getExecutionPlan(StreamApplication app) throws Exception {
+    return getExecutionPlan(app, null);
   }
 
-  final ExecutionPlan getExecutionPlan(StreamApplication app) throws Exception {
+  /* package private */
+  ExecutionPlan getExecutionPlan(StreamApplication app, String runId) throws Exception {
     // build stream graph
     StreamGraphImpl streamGraph = new StreamGraphImpl(this, config);
     app.init(streamGraph, config);
 
     // create the physical execution plan
+    Map<String, String> cfg = new HashMap<>(config);
+    if (StringUtils.isNoneEmpty(runId)) {
+      cfg.put(ApplicationConfig.APP_RUN_ID, runId);
+    }
+
+    Set<StreamSpec> inputStreams = new HashSet<>(streamGraph.getInputOperators().keySet());
+    inputStreams.removeAll(streamGraph.getOutputStreams().keySet());
+    ApplicationMode mode = inputStreams.stream().allMatch(StreamSpec::isBounded)
+        ? ApplicationMode.BATCH : ApplicationMode.STREAM;
+    cfg.put(ApplicationConfig.APP_MODE, mode.name());
+
+    ExecutionPlanner planner = new ExecutionPlanner(new MapConfig(cfg), streamManager);
     return planner.plan(streamGraph);
   }
 
-  final StreamManager getStreamManager() {
+  /* package private for testing */
+  StreamManager getStreamManager() {
     return streamManager;
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/47541488/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
index 53cd2f6..3e046af 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
@@ -21,11 +21,15 @@ package org.apache.samza.runtime;
 
 import org.apache.samza.SamzaException;
 import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
+import org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer;
+import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory;
 import org.apache.samza.execution.ExecutionPlan;
 import org.apache.samza.job.ApplicationStatus;
 import org.apache.samza.job.JobRunner;
+import org.apache.samza.metrics.MetricsRegistryMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,11 +57,18 @@ public class RemoteApplicationRunner extends AbstractApplicationRunner {
   @Override
   public void run(StreamApplication app) {
     try {
+      // TODO: this is a tmp solution and the run.id generation will be addressed in another JIRA
+      String runId = String.valueOf(System.currentTimeMillis());
+      LOG.info("The run id for this run is {}", runId);
+
       // 1. initialize and plan
-      ExecutionPlan plan = getExecutionPlan(app);
+      ExecutionPlan plan = getExecutionPlan(app, runId);
       writePlanJsonFile(plan.getPlanAsJson());
 
       // 2. create the necessary streams
+      if (plan.getApplicationConfig().getAppMode() == ApplicationConfig.ApplicationMode.BATCH) {
+        getStreamManager().clearStreamsFromPreviousRun(getConfigFromPrevRun());
+      }
       getStreamManager().createStreams(plan.getIntermediateStreams());
 
       // 3. submit jobs for remote execution
@@ -133,4 +144,18 @@ public class RemoteApplicationRunner extends AbstractApplicationRunner {
       throw new SamzaException("Failed to get status for application", t);
     }
   }
+
+  private Config getConfigFromPrevRun() {
+    CoordinatorStreamSystemFactory coordinatorStreamSystemFactory = new CoordinatorStreamSystemFactory();
+    CoordinatorStreamSystemConsumer consumer = coordinatorStreamSystemFactory.getCoordinatorStreamSystemConsumer(
+        config, new MetricsRegistryMap());
+    consumer.register();
+    consumer.start();
+    consumer.bootstrap();
+    consumer.stop();
+
+    Config cfg = consumer.getConfig();
+    LOG.info("Previous config is: " + cfg.toString());
+    return cfg;
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/47541488/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index 2b8349c..4c4a645 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -373,7 +373,6 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
         throws Exception {
       LOG.info("Got new session created event for processor=" + processorId);
 
-
       LOG.info("register zk controller for the new session");
       zkController.register();
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/47541488/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
index 20192fb..dd4b120 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
@@ -37,6 +37,7 @@ object StreamConfig {
   val BOOTSTRAP =               SAMZA_PROPERTY + "bootstrap"
   val PRIORITY =                SAMZA_PROPERTY + "priority"
   val IS_INTERMEDIATE =         SAMZA_PROPERTY + "intermediate"
+  val IS_BOUNDED =              SAMZA_PROPERTY + "bounded"
 
   // We don't want any external dependencies on these patterns while both exist. Use getProperty to ensure proper values.
   private val STREAMS_PREFIX = "streams."
@@ -45,7 +46,9 @@ object StreamConfig {
   val STREAM_ID_PREFIX = STREAMS_PREFIX + "%s."
   val SYSTEM_FOR_STREAM_ID = STREAM_ID_PREFIX + SYSTEM
   val PHYSICAL_NAME_FOR_STREAM_ID = STREAM_ID_PREFIX + PHYSICAL_NAME
-  val IS_INTERMEDIATE_FROM_STREAM_ID = STREAM_ID_PREFIX + IS_INTERMEDIATE
+  val IS_INTERMEDIATE_FOR_STREAM_ID = STREAM_ID_PREFIX + IS_INTERMEDIATE
+  val IS_BOUNDED_FOR_STREAM_ID = STREAM_ID_PREFIX + IS_BOUNDED
+  val CONSUMER_OFFSET_DEFAULT_FOR_STREAM_ID = STREAM_ID_PREFIX + CONSUMER_OFFSET_DEFAULT
 
   implicit def Config2Stream(config: Config) = new StreamConfig(config)
 }
@@ -159,7 +162,11 @@ class StreamConfig(config: Config) extends ScalaMapConfig(config) with Logging {
    * @return          true if the stream is intermediate
    */
   def getIsIntermediate(streamId: String) = {
-    getBoolean(StreamConfig.IS_INTERMEDIATE_FROM_STREAM_ID format streamId, false)
+    getBoolean(StreamConfig.IS_INTERMEDIATE_FOR_STREAM_ID format streamId, false)
+  }
+
+  def getIsBounded(streamId: String) = {
+    getBoolean(StreamConfig.IS_BOUNDED_FOR_STREAM_ID format streamId, false)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/47541488/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
index 42bedec..92c3663 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
@@ -78,7 +78,7 @@ object JobModelManager extends Logging {
    * and writes it to the coordinator stream.
    * d) Builds JobModelManager using the jobModel read from coordinator stream.
    * @param coordinatorSystemConfig A config object that contains job.name
-   *                                job.id, and all system.&lt;job-coordinator-system-name&gt;.*
+   *                                job.id, and all system.&lt;job-coordinator-system-name&gt;.*
    *                                configuration. The method will use this config to read all configuration
    *                                from the coordinator stream, and instantiate a JobModelManager.
    */

http://git-wip-us.apache.org/repos/asf/samza/blob/47541488/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
index c4fd8f7..bab7159 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
@@ -371,13 +371,13 @@ public class TestExecutionPlanner {
   @Test
   public void testMaxPartition() {
     Collection<StreamEdge> edges = new ArrayList<>();
-    StreamEdge edge = new StreamEdge(input1);
+    StreamEdge edge = new StreamEdge(input1, config);
     edge.setPartitionCount(2);
     edges.add(edge);
-    edge = new StreamEdge(input2);
+    edge = new StreamEdge(input2, config);
     edge.setPartitionCount(32);
     edges.add(edge);
-    edge = new StreamEdge(input3);
+    edge = new StreamEdge(input3, config);
     edge.setPartitionCount(16);
     edges.add(edge);
 

http://git-wip-us.apache.org/repos/asf/samza/blob/47541488/samza-core/src/test/java/org/apache/samza/execution/TestStreamEdge.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestStreamEdge.java b/samza-core/src/test/java/org/apache/samza/execution/TestStreamEdge.java
new file mode 100644
index 0000000..0a225f5
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestStreamEdge.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.execution;
+
+import org.apache.samza.config.ApplicationConfig;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.StreamConfig;
+import org.apache.samza.system.StreamSpec;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestStreamEdge {
+  StreamSpec spec = new StreamSpec("stream-1", "physical-stream-1", "system-1");
+
+  @Test
+  public void testGetStreamSpec() {
+    StreamEdge edge = new StreamEdge(spec, false, new MapConfig());
+    assertEquals(edge.getStreamSpec(), spec);
+    assertEquals(edge.getStreamSpec().getPartitionCount(), 1 /*StreamSpec.DEFAULT_PARTITION_COUNT*/);
+
+    edge.setPartitionCount(10);
+    assertEquals(edge.getStreamSpec().getPartitionCount(), 10);
+  }
+
+  @Test
+  public void testGetStreamSpec_Batch() {
+    Map<String, String> config = new HashMap<>();
+    config.put(ApplicationConfig.APP_MODE, ApplicationConfig.ApplicationMode.BATCH.name());
+    config.put(ApplicationConfig.APP_RUN_ID, "123");
+    StreamEdge edge = new StreamEdge(spec, true, new MapConfig(config));
+    assertEquals(edge.getStreamSpec().getPhysicalName(), spec.getPhysicalName() + "-123");
+  }
+
+  @Test
+  public void testGenerateConfig() {
+    // an example unbounded IO stream
+    StreamSpec spec = new StreamSpec("stream-1", "physical-stream-1", "system-1", false, Collections.singletonMap("property1", "haha"));
+    StreamEdge edge = new StreamEdge(spec, false, new MapConfig());
+    Config config = edge.generateConfig();
+    StreamConfig streamConfig = new StreamConfig(config);
+    assertEquals(streamConfig.getSystem(spec.getId()), "system-1");
+    assertEquals(streamConfig.getPhysicalName(spec.getId()), "physical-stream-1");
+    assertEquals(streamConfig.getIsIntermediate(spec.getId()), false);
+    assertEquals(streamConfig.getIsBounded(spec.getId()), false);
+    assertEquals(streamConfig.getStreamProperties(spec.getId()).get("property1"), "haha");
+
+    // bounded stream
+    spec = new StreamSpec("stream-1", "physical-stream-1", "system-1", true, Collections.singletonMap("property1", "haha"));
+    edge = new StreamEdge(spec, false, new MapConfig());
+    config = edge.generateConfig();
+    streamConfig = new StreamConfig(config);
+    assertEquals(streamConfig.getIsBounded(spec.getId()), true);
+
+    // intermediate stream
+    edge = new StreamEdge(spec, true, new MapConfig());
+    config = edge.generateConfig();
+    streamConfig = new StreamConfig(config);
+    assertEquals(streamConfig.getIsIntermediate(spec.getId()), true);
+    assertEquals(streamConfig.getDefaultStreamOffset(spec.toSystemStream()).get(), "oldest");
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/47541488/samza-core/src/test/java/org/apache/samza/execution/TestStreamManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestStreamManager.java b/samza-core/src/test/java/org/apache/samza/execution/TestStreamManager.java
new file mode 100644
index 0000000..dc36df8
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestStreamManager.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.execution;
+
+import org.apache.samza.Partition;
+import org.apache.samza.checkpoint.CheckpointManager;
+import org.apache.samza.checkpoint.CheckpointManagerFactory;
+import org.apache.samza.config.ApplicationConfig;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.StreamSpec;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.util.*;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.*;
+
+public class TestStreamManager {
+  private static final String SYSTEM1 = "system-1";
+  private static final String SYSTEM2 = "system-2";
+  private static final String STREAM1 = "stream-1";
+  private static final String STREAM2 = "stream-2";
+
+  @Test
+  public void testCreateStreams() {
+    StreamSpec spec1 = new StreamSpec(STREAM1, STREAM1, SYSTEM1);
+    StreamSpec spec2 = new StreamSpec(STREAM2, STREAM2, SYSTEM2);
+    List<StreamSpec> specList = new ArrayList<>();
+    specList.add(spec1);
+    specList.add(spec2);
+
+    SystemAdmin admin1 = mock(SystemAdmin.class);
+    SystemAdmin admin2 = mock(SystemAdmin.class);
+    Map<String, SystemAdmin> sysAdmins = new HashMap<>();
+    sysAdmins.put(SYSTEM1, admin1);
+    sysAdmins.put(SYSTEM2, admin2);
+
+    StreamManager manager = new StreamManager(sysAdmins);
+    manager.createStreams(specList);
+
+    ArgumentCaptor<StreamSpec> captor = ArgumentCaptor.forClass(StreamSpec.class);
+    verify(admin1).createStream(captor.capture());
+    assertEquals(STREAM1, captor.getValue().getPhysicalName());
+
+    captor = ArgumentCaptor.forClass(StreamSpec.class);
+    verify(admin2).createStream(captor.capture());
+    assertEquals(STREAM2, captor.getValue().getPhysicalName());
+  }
+
+  @Test
+  public void testGetStreamPartitionCounts() {
+    SystemAdmin admin1 = mock(SystemAdmin.class);
+    Map<String, SystemAdmin> sysAdmins = new HashMap<>();
+    sysAdmins.put(SYSTEM1, admin1);
+
+    Map<String, SystemStreamMetadata> map = new HashMap<>();
+    SystemStreamMetadata meta1 = mock(SystemStreamMetadata.class);
+    Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> partitions = new HashMap<>();
+    partitions.put(new Partition(0), null);
+    when(meta1.getSystemStreamPartitionMetadata()).thenReturn(partitions);
+    map.put(STREAM1, meta1);
+
+    SystemStreamMetadata meta2 = mock(SystemStreamMetadata.class);
+    Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> partitions2 = new HashMap<>();
+    partitions2.put(new Partition(0), null);
+    partitions2.put(new Partition(1), null);
+    when(meta2.getSystemStreamPartitionMetadata()).thenReturn(partitions2);
+    map.put(STREAM2, meta2);
+
+    when(admin1.getSystemStreamMetadata(anyObject())).thenReturn(map);
+
+    Set<String> streams = new HashSet<>();
+    streams.add(STREAM1);
+    streams.add(STREAM2);
+    StreamManager manager = new StreamManager(sysAdmins);
+    Map<String, Integer> counts = manager.getStreamPartitionCounts(SYSTEM1, streams);
+
+    assertTrue(counts.get(STREAM1).equals(1));
+    assertTrue(counts.get(STREAM2).equals(2));
+  }
+
+  private static CheckpointManager checkpointManager = mock(CheckpointManager.class);
+  public static final class MockCheckpointManagerFactory implements CheckpointManagerFactory {
+    @Override
+    public CheckpointManager getCheckpointManager(Config config, MetricsRegistry registry) {
+      return checkpointManager;
+    }
+  }
+
+  @Test
+  public void testClearStreamsFromPreviousRun() {
+    SystemAdmin admin1 = mock(SystemAdmin.class);
+    SystemAdmin admin2 = mock(SystemAdmin.class);
+    Map<String, SystemAdmin> sysAdmins = new HashMap<>();
+    sysAdmins.put(SYSTEM1, admin1);
+    sysAdmins.put(SYSTEM2, admin2);
+
+    String runId = "123";
+    Map<String, String> config = new HashMap<>();
+    config.put(ApplicationConfig.APP_RUN_ID, "123");
+    config.put(ApplicationConfig.APP_MODE, ApplicationConfig.ApplicationMode.BATCH.name());
+
+    config.put("streams.stream-1.samza.system", SYSTEM1);
+    config.put("streams.stream-1.samza.physical.name", STREAM1 + "-" + runId);
+    config.put("streams.stream-1.samza.intermediate", "true");
+
+    config.put("task.checkpoint.factory", MockCheckpointManagerFactory.class.getName());
+    config.put("stores.test-store.factory", "dummyfactory");
+    config.put("stores.test-store.changelog", SYSTEM2 + "." + STREAM2);
+
+    StreamManager manager = new StreamManager(sysAdmins);
+    manager.clearStreamsFromPreviousRun(new MapConfig(config));
+
+    ArgumentCaptor<StreamSpec> captor = ArgumentCaptor.forClass(StreamSpec.class);
+    verify(admin1).clearStream(captor.capture());
+    assertEquals(captor.getValue().getPhysicalName(), STREAM1 + "-" + runId);
+
+    captor = ArgumentCaptor.forClass(StreamSpec.class);
+    verify(admin2).clearStream(captor.capture());
+    assertEquals(captor.getValue().getPhysicalName(), STREAM2 + "-" + runId);
+
+    verify(checkpointManager, times(1)).clearCheckpoints();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/47541488/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
index 5b2c661..d2094b4 100644
--- a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
+++ b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
@@ -20,13 +20,9 @@
 package org.apache.samza.runtime;
 
 import com.google.common.collect.ImmutableList;
-import java.lang.reflect.Field;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.stream.Collectors;
-import java.util.Set;
+
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.JobConfig;
@@ -36,7 +32,6 @@ import org.apache.samza.coordinator.CoordinationUtils;
 import org.apache.samza.coordinator.CoordinationUtilsFactory;
 import org.apache.samza.coordinator.DistributedLockWithState;
 import org.apache.samza.execution.ExecutionPlan;
-import org.apache.samza.execution.ExecutionPlanner;
 import org.apache.samza.execution.StreamManager;
 import org.apache.samza.job.ApplicationStatus;
 import org.apache.samza.processor.StreamProcessor;
@@ -54,6 +49,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.*;
+import static org.powermock.api.mockito.PowerMockito.doReturn;
 import static org.powermock.api.mockito.PowerMockito.mockStatic;
 
 
@@ -72,51 +68,29 @@ public class TestLocalApplicationRunner {
   public void testStreamCreation()
       throws Exception {
     Map<String, String> config = new HashMap<>();
-    LocalApplicationRunner runner = new LocalApplicationRunner(new MapConfig(config));
+    LocalApplicationRunner runner = spy(new LocalApplicationRunner(new MapConfig(config)));
     StreamApplication app = mock(StreamApplication.class);
     doNothing().when(app).init(anyObject(), anyObject());
 
-    ExecutionPlanner planner = mock(ExecutionPlanner.class);
-    Field plannerField = runner.getClass().getSuperclass().getDeclaredField("planner");
-    plannerField.setAccessible(true);
-    plannerField.set(runner, planner);
-
     StreamManager streamManager = mock(StreamManager.class);
-    Field streamManagerField = runner.getClass().getSuperclass().getDeclaredField("streamManager");
-    streamManagerField.setAccessible(true);
-    streamManagerField.set(runner, streamManager);
-    ArgumentCaptor<List> captor = ArgumentCaptor.forClass(List.class);
+    doReturn(streamManager).when(runner).getStreamManager();
 
-    ExecutionPlan plan = new ExecutionPlan() {
-      @Override
-      public List<JobConfig> getJobConfigs() {
-        return Collections.emptyList();
-      }
-
-      @Override
-      public List<StreamSpec> getIntermediateStreams() {
-        return Collections.singletonList(new StreamSpec("test-stream", "test-stream", "test-system"));
-      }
-
-      @Override
-      public String getPlanAsJson()
-          throws Exception {
-        return "";
-      }
-    };
-    when(planner.plan(anyObject())).thenReturn(plan);
+    ExecutionPlan plan = mock(ExecutionPlan.class);
+    when(plan.getIntermediateStreams()).thenReturn(Collections.singletonList(new StreamSpec("test-stream", "test-stream", "test-system")));
+    when(plan.getPlanAsJson()).thenReturn("");
+    doReturn(plan).when(runner).getExecutionPlan(any(), any());
 
     mockStatic(CoordinationUtilsFactory.class);
     CoordinationUtilsFactory coordinationUtilsFactory = mock(CoordinationUtilsFactory.class);
     when(CoordinationUtilsFactory.getCoordinationUtilsFactory(anyObject())).thenReturn(coordinationUtilsFactory);
 
-    LocalApplicationRunner spy = spy(runner);
     try {
-      spy.run(app);
+      runner.run(app);
     } catch (Throwable t) {
       assertNotNull(t); //no jobs exception
     }
 
+    ArgumentCaptor<List> captor = ArgumentCaptor.forClass(List.class);
     verify(streamManager).createStreams(captor.capture());
     List<StreamSpec> streamSpecs = captor.getValue();
     assertEquals(streamSpecs.size(), 1);
@@ -127,41 +101,19 @@ public class TestLocalApplicationRunner {
   public void testStreamCreationWithCoordination()
       throws Exception {
     Map<String, String> config = new HashMap<>();
-    LocalApplicationRunner runner = new LocalApplicationRunner(new MapConfig(config));
+    LocalApplicationRunner localRunner = new LocalApplicationRunner(new MapConfig(config));
+    LocalApplicationRunner runner = spy(localRunner);
+
     StreamApplication app = mock(StreamApplication.class);
     doNothing().when(app).init(anyObject(), anyObject());
 
-    ExecutionPlanner planner = mock(ExecutionPlanner.class);
-    Field plannerField = runner.getClass().getSuperclass().getDeclaredField("planner");
-    plannerField.setAccessible(true);
-    plannerField.set(runner, planner);
-
     StreamManager streamManager = mock(StreamManager.class);
-    Field streamManagerField = runner.getClass().getSuperclass().getDeclaredField("streamManager");
-    streamManagerField.setAccessible(true);
-    streamManagerField.set(runner, streamManager);
-    ArgumentCaptor<List> captor = ArgumentCaptor.forClass(List.class);
+    doReturn(streamManager).when(runner).getStreamManager();
 
-    ExecutionPlan plan = new ExecutionPlan() {
-      @Override
-      public List<JobConfig> getJobConfigs() {
-        return Collections.emptyList();
-      }
-
-      @Override
-      public List<StreamSpec> getIntermediateStreams() {
-        return Collections.singletonList(new StreamSpec("test-stream", "test-stream", "test-system"));
-      }
-
-      @Override
-      public String getPlanAsJson()
-          throws Exception {
-        return "";
-      }
-    };
-    when(planner.plan(anyObject())).thenReturn(plan);
-
-    LocalApplicationRunner spy = spy(runner);
+    ExecutionPlan plan = mock(ExecutionPlan.class);
+    when(plan.getIntermediateStreams()).thenReturn(Collections.singletonList(new StreamSpec("test-stream", "test-stream", "test-system")));
+    when(plan.getPlanAsJson()).thenReturn("");
+    doReturn(plan).when(runner).getExecutionPlan(any(), any());
 
     CoordinationUtils coordinationUtils = mock(CoordinationUtils.class);
     CoordinationUtilsFactory coordinationUtilsFactory = mock(CoordinationUtilsFactory.class);
@@ -175,12 +127,14 @@ public class TestLocalApplicationRunner {
         .thenReturn(coordinationUtils);
 
     try {
-      spy.run(app);
+      runner.run(app);
     } catch (Throwable t) {
       assertNotNull(t); //no jobs exception
     }
 
+    ArgumentCaptor<List> captor = ArgumentCaptor.forClass(List.class);
     verify(streamManager).createStreams(captor.capture());
+
     List<StreamSpec> streamSpecs = captor.getValue();
     assertEquals(streamSpecs.size(), 1);
     assertEquals(streamSpecs.get(0).getId(), "test-stream");
@@ -220,33 +174,15 @@ public class TestLocalApplicationRunner {
       throws Exception {
     final Map<String, String> config = new HashMap<>();
     config.put(ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS, UUIDGenerator.class.getName());
-    LocalApplicationRunner runner = new LocalApplicationRunner(new MapConfig(config));
+    LocalApplicationRunner runner = spy(new LocalApplicationRunner(new MapConfig(config)));
     StreamApplication app = mock(StreamApplication.class);
     doNothing().when(app).init(anyObject(), anyObject());
 
-    ExecutionPlanner planner = mock(ExecutionPlanner.class);
-    Field plannerField = runner.getClass().getSuperclass().getDeclaredField("planner");
-    plannerField.setAccessible(true);
-    plannerField.set(runner, planner);
-
-    ExecutionPlan plan = new ExecutionPlan() {
-      @Override
-      public List<JobConfig> getJobConfigs() {
-        return Collections.singletonList(new JobConfig(new MapConfig(config)));
-      }
-
-      @Override
-      public List<StreamSpec> getIntermediateStreams() {
-        return Collections.emptyList();
-      }
-
-      @Override
-      public String getPlanAsJson()
-          throws Exception {
-        return "";
-      }
-    };
-    when(planner.plan(anyObject())).thenReturn(plan);
+    ExecutionPlan plan = mock(ExecutionPlan.class);
+    when(plan.getIntermediateStreams()).thenReturn(Collections.emptyList());
+    when(plan.getPlanAsJson()).thenReturn("");
+    when(plan.getJobConfigs()).thenReturn(Collections.singletonList(new JobConfig(new MapConfig(config))));
+    doReturn(plan).when(runner).getExecutionPlan(any(), any());
 
     StreamProcessor sp = mock(StreamProcessor.class);
     ArgumentCaptor<StreamProcessorLifecycleListener> captor =
@@ -260,12 +196,11 @@ public class TestLocalApplicationRunner {
         return null;
       }).when(sp).start();
 
-    LocalApplicationRunner spy = spy(runner);
-    doReturn(sp).when(spy).createStreamProcessor(anyObject(), anyObject(), captor.capture());
+    doReturn(sp).when(runner).createStreamProcessor(anyObject(), anyObject(), captor.capture());
 
-    spy.run(app);
+    runner.run(app);
 
-    assertEquals(spy.status(app), ApplicationStatus.SuccessfulFinish);
+    assertEquals(runner.status(app), ApplicationStatus.SuccessfulFinish);
   }
 
   @Test
@@ -273,33 +208,15 @@ public class TestLocalApplicationRunner {
       throws Exception {
     final Map<String, String> config = new HashMap<>();
     config.put(ApplicationConfig.PROCESSOR_ID, "0");
-    LocalApplicationRunner runner = new LocalApplicationRunner(new MapConfig(config));
+    LocalApplicationRunner runner = spy(new LocalApplicationRunner(new MapConfig(config)));
     StreamApplication app = mock(StreamApplication.class);
     doNothing().when(app).init(anyObject(), anyObject());
 
-    ExecutionPlanner planner = mock(ExecutionPlanner.class);
-    Field plannerField = runner.getClass().getSuperclass().getDeclaredField("planner");
-    plannerField.setAccessible(true);
-    plannerField.set(runner, planner);
-
-    ExecutionPlan plan = new ExecutionPlan() {
-      @Override
-      public List<JobConfig> getJobConfigs() {
-        return Collections.singletonList(new JobConfig(new MapConfig(config)));
-      }
-
-      @Override
-      public List<StreamSpec> getIntermediateStreams() {
-        return Collections.emptyList();
-      }
-
-      @Override
-      public String getPlanAsJson()
-          throws Exception {
-        return "";
-      }
-    };
-    when(planner.plan(anyObject())).thenReturn(plan);
+    ExecutionPlan plan = mock(ExecutionPlan.class);
+    when(plan.getIntermediateStreams()).thenReturn(Collections.emptyList());
+    when(plan.getPlanAsJson()).thenReturn("");
+    when(plan.getJobConfigs()).thenReturn(Collections.singletonList(new JobConfig(new MapConfig(config))));
+    doReturn(plan).when(runner).getExecutionPlan(any(), any());
 
     Throwable t = new Throwable("test failure");
     StreamProcessor sp = mock(StreamProcessor.class);
@@ -313,16 +230,15 @@ public class TestLocalApplicationRunner {
         return null;
       }).when(sp).start();
 
-    LocalApplicationRunner spy = spy(runner);
-    doReturn(sp).when(spy).createStreamProcessor(anyObject(), anyObject(), captor.capture());
+    doReturn(sp).when(runner).createStreamProcessor(anyObject(), anyObject(), captor.capture());
 
     try {
-      spy.run(app);
+      runner.run(app);
     } catch (Throwable th) {
       assertNotNull(th);
     }
 
-    assertEquals(spy.status(app), ApplicationStatus.UnsuccessfulFinish);
+    assertEquals(runner.status(app), ApplicationStatus.UnsuccessfulFinish);
   }
 
   public static Set<StreamProcessor> getProcessors(LocalApplicationRunner runner) {

http://git-wip-us.apache.org/repos/asf/samza/blob/47541488/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java
index c7e82f7..fd53a45 100644
--- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java
+++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java
@@ -149,7 +149,7 @@ public class KafkaStreamSpec extends StreamSpec {
    */
   public KafkaStreamSpec(String id, String topicName, String systemName, int partitionCount, int replicationFactor,
       Properties properties) {
-    super(id, topicName, systemName, partitionCount, propertiesToMap(properties));
+    super(id, topicName, systemName, partitionCount, false, propertiesToMap(properties));
 
     if (replicationFactor <= 0) {
       throw new IllegalArgumentException(

http://git-wip-us.apache.org/repos/asf/samza/blob/47541488/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
index 6461f9d..c8b7a9b 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -33,8 +33,10 @@ import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.clients.producer.{Producer, ProducerRecord}
 import org.apache.samza.SamzaException
 import org.apache.samza.checkpoint.{Checkpoint, CheckpointManager}
+import org.apache.samza.config.JavaSystemConfig
 import org.apache.samza.container.TaskName
 import org.apache.samza.serializers.CheckpointSerde
+import org.apache.samza.system.{StreamSpec, SystemAdmin}
 import org.apache.samza.system.kafka.TopicMetadataCache
 import org.apache.samza.util._
 
@@ -62,7 +64,8 @@ class KafkaCheckpointManager(
                               failOnCheckpointValidation: Boolean,
                               val retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy,
                               serde: CheckpointSerde = new CheckpointSerde,
-                              checkpointTopicProperties: Properties = new Properties) extends CheckpointManager with Logging {
+                              checkpointTopicProperties: Properties = new Properties,
+                              systemAdmin: SystemAdmin = null) extends CheckpointManager with Logging {
   import org.apache.samza.checkpoint.kafka.KafkaCheckpointManager._
 
   var taskNames = Set[TaskName]()
@@ -72,6 +75,8 @@ class KafkaCheckpointManager(
   var startingOffset: Option[Long] = None // Where to start reading for each subsequent call of readCheckpoint
   val kafkaUtil: KafkaUtil = new KafkaUtil(retryBackoff, connectZk)
 
+
+
   KafkaCheckpointLogKey.setSystemStreamPartitionGrouperFactoryString(systemStreamPartitionGrouperFactoryString)
 
   info("Creating KafkaCheckpointManager with: clientId=%s, checkpointTopic=%s, systemName=%s" format(clientId, checkpointTopic, systemName))
@@ -275,22 +280,28 @@ class KafkaCheckpointManager(
 
   }
 
-  def start {
+  override def start {
     kafkaUtil.createTopic(checkpointTopic, 1, replicationFactor, checkpointTopicProperties)
     kafkaUtil.validateTopicPartitionCount(checkpointTopic, systemName, metadataStore, 1, failOnCheckpointValidation)
   }
 
-  def register(taskName: TaskName) {
+  override def register(taskName: TaskName) {
     debug("Adding taskName " + taskName + " to " + this)
     taskNames += taskName
   }
 
-  def stop = {
+  override def stop = {
     if (producer != null) {
       producer.close
     }
   }
 
+  override def clearCheckpoints = {
+    info("Clear checkpoint stream %s in system %s" format (checkpointTopic, systemName))
+    val spec = StreamSpec.createCheckpointStreamSpec(checkpointTopic, systemName)
+    systemAdmin.clearStream(spec)
+  }
+
   override def toString = "KafkaCheckpointManager [systemName=%s, checkpointTopic=%s]" format(systemName, checkpointTopic)
 }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/47541488/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
index c42882e..0df581f 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
@@ -27,9 +27,10 @@ import org.apache.samza.SamzaException
 import org.apache.samza.checkpoint.{CheckpointManager, CheckpointManagerFactory}
 import org.apache.samza.config.JobConfig.Config2Job
 import org.apache.samza.config.KafkaConfig.Config2Kafka
-import org.apache.samza.config.{Config, KafkaConfig}
+import org.apache.samza.config.{SystemConfig, JavaSystemConfig, Config, KafkaConfig}
 import org.apache.samza.metrics.MetricsRegistry
-import org.apache.samza.util.{ClientUtilTopicMetadataStore, KafkaUtil, Logging}
+import org.apache.samza.system.{SystemFactory, SystemAdmin}
+import org.apache.samza.util.{Util, ClientUtilTopicMetadataStore, KafkaUtil, Logging}
 
 object KafkaCheckpointManagerFactory {
   val INJECTED_PRODUCER_PROPERTIES = Map(
@@ -82,10 +83,14 @@ class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Loggin
     }
     val socketTimeout = consumerConfig.socketTimeoutMs
 
+    val systemConfig = new SystemConfig(config)
+    val systemFactoryClassName = systemConfig.getSystemFactory(systemName).get
+    val systemFactory: SystemFactory = Util.getObj(systemFactoryClassName)
+    val systemAdmin = systemFactory.getAdmin(systemName, config)
 
     new KafkaCheckpointManager(
       clientId,
-      KafkaUtil.getCheckpointTopic(jobName, jobId),
+      KafkaUtil.getCheckpointTopic(jobName, jobId, config),
       systemName,
       config.getCheckpointReplicationFactor.getOrElse("3").toInt,
       socketTimeout,
@@ -96,6 +101,7 @@ class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Loggin
       connectZk,
       config.getSystemStreamPartitionGrouperFactory,      // To find out the SSPGrouperFactory class so it can be included/verified in the key
       config.failOnCheckpointValidation,
-      checkpointTopicProperties = getCheckpointTopicProperties(config))
+      checkpointTopicProperties = getCheckpointTopicProperties(config),
+      systemAdmin = systemAdmin)
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/47541488/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
index 6e582e9..a2256c8 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
@@ -402,6 +402,7 @@ class KafkaSystemAdmin(
    * @inheritdoc
    */
   override def createStream(spec: StreamSpec): Boolean = {
+    info("Create topic %s in system %s" format (spec.getPhysicalName, systemName))
     val kSpec = toKafkaSpec(spec)
     var streamCreated = false
 
@@ -501,6 +502,7 @@ class KafkaSystemAdmin(
    * Otherwise it's a no-op.
    */
   override def clearStream(spec: StreamSpec): Boolean = {
+    info("Delete topic %s in system %s" format (spec.getPhysicalName, systemName))
     val kSpec = KafkaStreamSpec.fromSpec(spec)
     var retries = CLEAR_STREAM_RETRIES
     new ExponentialSleepStrategy().run(

http://git-wip-us.apache.org/repos/asf/samza/blob/47541488/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
index 41d380b..39edba7 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
@@ -24,9 +24,10 @@ import java.util.concurrent.atomic.AtomicLong
 import kafka.admin.AdminUtils
 import kafka.utils.ZkUtils
 import org.apache.kafka.common.PartitionInfo
-import org.apache.samza.config.Config
-import org.apache.samza.config.ConfigException
+import org.apache.samza.config.ApplicationConfig.ApplicationMode
+import org.apache.samza.config.{ApplicationConfig, Config, ConfigException}
 import org.apache.samza.config.JobConfig.Config2Job
+import org.apache.samza.execution.StreamManager
 import org.apache.samza.system.OutgoingMessageEnvelope
 import kafka.common.{ErrorMapping, ReplicaNotAvailableException}
 import org.apache.kafka.common.errors.TopicExistsException
@@ -57,8 +58,11 @@ object KafkaUtil extends Logging {
     abs(envelope.getPartitionKey.hashCode()) % numPartitions
   }
 
-  def getCheckpointTopic(jobName: String, jobId: String) =
-    "__samza_checkpoint_ver_%d_for_%s_%s" format (CHECKPOINT_LOG_VERSION_NUMBER, jobName.replaceAll("_", "-"), jobId.replaceAll("_", "-"))
+  def getCheckpointTopic(jobName: String, jobId: String, config: Config) = {
+    val checkpointTopic = "__samza_checkpoint_ver_%d_for_%s_%s" format(CHECKPOINT_LOG_VERSION_NUMBER,
+      jobName.replaceAll("_", "-"), jobId.replaceAll("_", "-"))
+    StreamManager.createUniqueNameForBatch(checkpointTopic, config)
+  }
 
   /**
    * Exactly the same as Kafka's ErrorMapping.maybeThrowException

http://git-wip-us.apache.org/repos/asf/samza/blob/47541488/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaStreamSpec.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaStreamSpec.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaStreamSpec.java
index 69345a3..5612704 100644
--- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaStreamSpec.java
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaStreamSpec.java
@@ -33,7 +33,7 @@ public class TestKafkaStreamSpec {
 
   @Test
   public void testUnsupportedConfigStrippedFromProperties() {
-    StreamSpec original = new StreamSpec("dummyId","dummyPhysicalName", "dummySystemName", ImmutableMap.of("segment.bytes", "4", "replication.factor", "7"));
+    StreamSpec original = new StreamSpec("dummyId","dummyPhysicalName", "dummySystemName", false, ImmutableMap.of("segment.bytes", "4", "replication.factor", "7"));
 
     // First verify the original
     assertEquals("7", original.get("replication.factor"));