You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2018/04/18 22:31:10 UTC

samza git commit: SAMZA-1478: Delete unneeded data from intermediate Kafka topic on offset commit

Repository: samza
Updated Branches:
  refs/heads/master 5d73ecdad -> 3cc2a05f7


SAMZA-1478: Delete unneeded data from intermediate Kafka topic on offset commit

Author: Dong Lin <li...@gmail.com>
Author: Dong Lin <do...@linkedin.com>

Reviewers: Prateek Maheshwari <pm...@apache.org>, Jacob Maes <jm...@apache.org>, Yi Pan <ni...@gmail.com>

Closes #347 from lindong28/SAMZA-1478


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/3cc2a05f
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/3cc2a05f
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/3cc2a05f

Branch: refs/heads/master
Commit: 3cc2a05f7009c34759baf1533f0a0e58f75839de
Parents: 5d73ecd
Author: Dong Lin <li...@gmail.com>
Authored: Wed Apr 18 15:31:06 2018 -0700
Committer: Prateek Maheshwari <pm...@linkedin.com>
Committed: Wed Apr 18 15:31:06 2018 -0700

----------------------------------------------------------------------
 .../versioned/jobs/configuration-table.html     |  20 ++++
 .../org/apache/samza/system/SystemAdmin.java    |  12 +-
 .../org/apache/samza/execution/StreamEdge.java  |   1 +
 .../apache/samza/execution/StreamManager.java   |   2 +-
 .../runtime/AbstractApplicationRunner.java      |   4 +-
 .../org/apache/samza/task/AsyncRunLoop.java     |   2 +-
 .../org/apache/samza/config/StreamConfig.scala  |  13 ++-
 .../org/apache/samza/config/SystemConfig.scala  |   7 ++
 .../apache/samza/container/SamzaContainer.scala |   5 +-
 .../apache/samza/container/TaskInstance.scala   |  14 ++-
 .../apache/samza/execution/TestStreamEdge.java  |   4 +-
 .../samza/system/kafka/KafkaSystemAdmin.scala   |  52 ++++++++-
 .../samza/system/kafka/KafkaSystemFactory.scala |  10 +-
 .../system/kafka/TestKafkaSystemAdminJava.java  |  36 +++---
 .../system/kafka/TestKafkaSystemAdmin.scala     |  32 +++--
 .../samza/test/operator/BroadcastAssertApp.java |   9 +-
 .../test/operator/RepartitionJoinWindowApp.java |  26 +++--
 ...StreamApplicationIntegrationTestHarness.java |  18 ++-
 .../operator/TestRepartitionJoinWindowApp.java  | 117 +++++++++++++++----
 .../AbstractIntegrationTestHarness.scala        |   8 +-
 20 files changed, 290 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/3cc2a05f/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html
index 4495cb1..86ac427 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -951,6 +951,16 @@
                 </tr>
 
                 <tr>
+                    <td class="property" id="systems-samza-delete-committed-messages">systems.<span class="system">system-name</span>.<br>samza.delete.committed.messages</td>
+                    <td class="default">false</td>
+                    <td class="description">
+                        If set to true, automatically delete committed messages from streams whose committed messages can be deleted.
+                        A stream's committed messages can be deleted if it is an intermediate stream, or if the user has manually
+                        set <a href="#streams-samza-delete-committed-messages">streams.<span class="stream">stream-id</span>.<br>samza.delete.committed.messages</a> to true in the configuration.
+                    </td>
+                </tr>
+
+                <tr>
                     <td class="property" id="systems-samza-key-serde-legacy">systems.<span class="system">system-name</span>.<br>samza.key.serde</td>
                     <td class="default" rowspan="2"></td>
                     <td class="description">
@@ -1105,6 +1115,16 @@
                 </tr>
 
                 <tr>
+                    <td class="property" id="streams-samza-delete-committed-messages">streams.<span class="stream">stream-id</span>.<br>samza.delete.committed.messages</td>
+                    <td class="default">false</td>
+                    <td class="description">
+                        If set to true, committed messages of this stream can be deleted. Committed messages of this stream will be deleted
+                        if <a href="#systems-samza-delete-committed-messages">systems.<span class="system">system-name</span>.samza.delete.committed.messages</a> is also
+                        set to true.
+                    </td>
+                </tr>
+
+                <tr>
                     <td class="property" id="streams-samza-reset-offset">streams.<span class="stream">stream-id</span>.<br>samza.reset.offset</td>
                     <td class="default">false</td>
                     <td class="description">

http://git-wip-us.apache.org/repos/asf/samza/blob/3cc2a05f/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
index dce7030..13566a6 100644
--- a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
+++ b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
@@ -95,7 +95,7 @@ public interface SystemAdmin {
   }
 
   /**
-   * Clear the stream described by the spec.
+   * Clear the entire stream described by the spec.
    * @param streamSpec  The spec for the physical stream on the system.
    * @return {@code true} if the stream was successfully cleared.
    *         {@code false} if clearing stream failed.
@@ -103,4 +103,14 @@ public interface SystemAdmin {
   default boolean clearStream(StreamSpec streamSpec) {
     throw new UnsupportedOperationException();
   }
+
+  /**
+   * Delete records up to (and including) the provided offset for every system stream partition in the map. The default implementation is a no-op.
+   *
+   * @param offsets a map from system stream partition to offset
+   */
+  default void deleteMessages(Map<SystemStreamPartition, String> offsets) {
+
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/3cc2a05f/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java b/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
index bc08e70..b4c93d9 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
@@ -124,6 +124,7 @@ public class StreamEdge {
     config.put(String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), spec.getId()), spec.getPhysicalName());
     if (isIntermediate()) {
       config.put(String.format(StreamConfig.IS_INTERMEDIATE_FOR_STREAM_ID(), spec.getId()), "true");
+      config.put(String.format(StreamConfig.DELETE_COMMITTED_MESSAGES_FOR_STREAM_ID(), spec.getId()), "true");
       config.put(String.format(StreamConfig.CONSUMER_OFFSET_DEFAULT_FOR_STREAM_ID(), spec.getId()), "oldest");
       config.put(String.format(StreamConfig.PRIORITY_FOR_STREAM_ID(), spec.getId()), String.valueOf(Integer.MAX_VALUE));
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/3cc2a05f/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java b/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java
index 3c1818f..b0473c1 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java
@@ -102,7 +102,7 @@ public class StreamManager {
 
       //Find all intermediate streams and clean up
       Set<StreamSpec> intStreams = JavaConversions.asJavaCollection(streamConfig.getStreamIds()).stream()
-          .filter(streamConfig::getIsIntermediate)
+          .filter(streamConfig::getIsIntermediateStream)
           .map(id -> new StreamSpec(id, streamConfig.getPhysicalName(id), streamConfig.getSystem(id)))
           .collect(Collectors.toSet());
       intStreams.forEach(stream -> {

http://git-wip-us.apache.org/repos/asf/samza/blob/3cc2a05f/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java
index 609b0ec..68962ce 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java
@@ -119,8 +119,7 @@ public abstract class AbstractApplicationRunner extends ApplicationRunner {
     return new StreamSpec(streamId, physicalName, system, isBounded, properties);
   }
 
-  /* package private */
-  ExecutionPlan getExecutionPlan(StreamApplication app) throws Exception {
+  public ExecutionPlan getExecutionPlan(StreamApplication app) throws Exception {
     return getExecutionPlan(app, null);
   }
 
@@ -171,4 +170,5 @@ public abstract class AbstractApplicationRunner extends ApplicationRunner {
       log.warn("Failed to write execution plan json to file", e);
     }
   }
+
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/3cc2a05f/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
index f3c4655..3b3e008 100644
--- a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
+++ b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
@@ -347,7 +347,7 @@ public class AsyncRunLoop implements Runnable, Throttleable {
       this.task = task;
       this.callbackManager = new TaskCallbackManager(this, callbackTimer, callbackTimeoutMs, maxConcurrency, clock);
       Set<SystemStreamPartition> sspSet = getWorkingSSPSet(task);
-      this.state = new AsyncTaskState(task.taskName(), task.metrics(), sspSet, task.hasIntermediateStreams());
+      this.state = new AsyncTaskState(task.taskName(), task.metrics(), sspSet, task.intermediateStreams().nonEmpty());
     }
 
     private void init() {

http://git-wip-us.apache.org/repos/asf/samza/blob/3cc2a05f/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
index db86969..0a4623e 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
@@ -37,6 +37,7 @@ object StreamConfig {
   val BOOTSTRAP =               SAMZA_PROPERTY + "bootstrap"
   val PRIORITY =                SAMZA_PROPERTY + "priority"
   val IS_INTERMEDIATE =         SAMZA_PROPERTY + "intermediate"
+  val DELETE_COMMITTED_MESSAGES = SAMZA_PROPERTY + "delete.committed.messages"
   val IS_BOUNDED =              SAMZA_PROPERTY + "bounded"
 
   // We don't want any external dependencies on these patterns while both exist. Use getProperty to ensure proper values.
@@ -47,6 +48,7 @@ object StreamConfig {
   val SYSTEM_FOR_STREAM_ID = STREAM_ID_PREFIX + SYSTEM
   val PHYSICAL_NAME_FOR_STREAM_ID = STREAM_ID_PREFIX + PHYSICAL_NAME
   val IS_INTERMEDIATE_FOR_STREAM_ID = STREAM_ID_PREFIX + IS_INTERMEDIATE
+  val DELETE_COMMITTED_MESSAGES_FOR_STREAM_ID = STREAM_ID_PREFIX + DELETE_COMMITTED_MESSAGES
   val IS_BOUNDED_FOR_STREAM_ID = STREAM_ID_PREFIX + IS_BOUNDED
   val PRIORITY_FOR_STREAM_ID = STREAM_ID_PREFIX + PRIORITY
   val CONSUMER_OFFSET_DEFAULT_FOR_STREAM_ID = STREAM_ID_PREFIX + CONSUMER_OFFSET_DEFAULT
@@ -166,10 +168,19 @@ class StreamConfig(config: Config) extends ScalaMapConfig(config) with Logging {
    * @param streamId  the identifier for the stream in the config.
    * @return          true if the stream is intermediate
    */
-  def getIsIntermediate(streamId: String) = {
+  def getIsIntermediateStream(streamId: String) = {
     getBoolean(StreamConfig.IS_INTERMEDIATE_FOR_STREAM_ID format streamId, false)
   }
 
+  /**
+    * Gets the boolean flag of whether the committed messages of the specified streamId can be deleted
+    * @param streamId  the identifier for the stream in the config.
+    * @return          true if the committed messages of the stream can be deleted
+    */
+  def getDeleteCommittedMessages(streamId: String) = {
+    getBoolean(StreamConfig.DELETE_COMMITTED_MESSAGES_FOR_STREAM_ID format streamId, false)
+  }
+
   def getIsBounded(streamId: String) = {
     getBoolean(StreamConfig.IS_BOUNDED_FOR_STREAM_ID format streamId, false)
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/3cc2a05f/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
index 91fb261..bebdbd8 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
@@ -31,6 +31,11 @@ object SystemConfig {
   val SYSTEM_FACTORY = JavaSystemConfig.SYSTEM_FACTORY_FORMAT
   val CONSUMER_OFFSET_DEFAULT = SYSTEM_PREFIX + "samza.offset.default"
 
+  // If true, automatically delete committed messages from streams whose committed messages can be deleted.
+  // A stream's committed messages can be deleted if it is an intermediate stream, or if the user has manually
+  // set streams.{streamId}.samza.delete.committed.messages to true in the configuration.
+  val DELETE_COMMITTED_MESSAGES = SYSTEM_PREFIX + "samza.delete.committed.messages"
+
   implicit def Config2System(config: Config) = new SystemConfig(config)
 }
 
@@ -45,6 +50,8 @@ class SystemConfig(config: Config) extends ScalaMapConfig(config) with Logging {
 
   def getDefaultSystemOffset(systemName: String) = getOption(SystemConfig.CONSUMER_OFFSET_DEFAULT format (systemName))
 
+  def deleteCommittedMessages(systemName: String) = getOption(SystemConfig.DELETE_COMMITTED_MESSAGES format (systemName))
+
   /**
    * Returns a list of all system names from the config file. Useful for
    * getting individual systems.

http://git-wip-us.apache.org/repos/asf/samza/blob/3cc2a05f/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 1641511..7aec8e1 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -288,7 +288,7 @@ object SamzaContainer extends Logging {
 
     val intermediateStreams = config
       .getStreamIds
-      .filter(config.getIsIntermediate(_))
+      .filter(config.getIsIntermediateStream(_))
       .toList
 
     info("Got intermediate streams: %s" format intermediateStreams)
@@ -623,8 +623,6 @@ object SamzaContainer extends Logging {
     } else {
       info(s"Disk quotas disabled because polling interval is not set ($DISK_POLL_INTERVAL_KEY)")
     }
-
-
     info("Samza container setup complete.")
 
     new SamzaContainer(
@@ -1000,7 +998,6 @@ class SamzaContainer(
   def shutdownTask {
     info("Shutting down task instance stream tasks.")
 
-
     if (taskThreadPool != null) {
       info("Shutting down task thread pool")
       try {

http://git-wip-us.apache.org/repos/asf/samza/blob/3cc2a05f/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index 3ac37c6..61e8c77 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -37,6 +37,7 @@ import org.apache.samza.util.Logging
 
 import scala.collection.JavaConverters._
 import scala.collection.JavaConversions._
+import scala.collection.Map
 
 class TaskInstance(
   val task: Any,
@@ -72,7 +73,9 @@ class TaskInstance(
     scala.collection.mutable.Map[SystemStreamPartition, Boolean]()
   systemStreamPartitions.foreach(ssp2CaughtupMapping += _ -> false)
 
-  val hasIntermediateStreams = config.getStreamIds.exists(config.getIsIntermediate(_))
+  val intermediateStreams: Set[String] = config.getStreamIds.filter(config.getIsIntermediateStream).toSet
+
+  val streamsToDeleteCommittedMessages: Set[String] = config.getStreamIds.filter(config.getDeleteCommittedMessages).map(config.getPhysicalName).toSet
 
   def registerMetrics {
     debug("Registering metrics for taskName: %s" format taskName)
@@ -218,6 +221,15 @@ class TaskInstance(
     trace("Checkpointing offsets for taskName: %s" format taskName)
 
     offsetManager.writeCheckpoint(taskName, checkpoint)
+
+    if (checkpoint != null) {
+      checkpoint.getOffsets.asScala
+        .filter { case (ssp, _) => streamsToDeleteCommittedMessages.contains(ssp.getStream) } // Only delete data of intermediate streams
+        .groupBy { case (ssp, _) => ssp.getSystem }
+        .foreach { case (systemName: String, offsets: Map[SystemStreamPartition, String]) =>
+          systemAdmins.getSystemAdmin(systemName).deleteMessages(offsets.asJava)
+        }
+    }
   }
 
   def shutdownTask {

http://git-wip-us.apache.org/repos/asf/samza/blob/3cc2a05f/samza-core/src/test/java/org/apache/samza/execution/TestStreamEdge.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestStreamEdge.java b/samza-core/src/test/java/org/apache/samza/execution/TestStreamEdge.java
index 424f102..8ad5b7e 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestStreamEdge.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestStreamEdge.java
@@ -63,7 +63,7 @@ public class TestStreamEdge {
     StreamConfig streamConfig = new StreamConfig(config);
     assertEquals(streamConfig.getSystem(spec.getId()), "system-1");
     assertEquals(streamConfig.getPhysicalName(spec.getId()), "physical-stream-1");
-    assertEquals(streamConfig.getIsIntermediate(spec.getId()), false);
+    assertEquals(streamConfig.getIsIntermediateStream(spec.getId()), false);
     assertEquals(streamConfig.getIsBounded(spec.getId()), false);
     assertEquals(streamConfig.getStreamProperties(spec.getId()).get("property1"), "haha");
 
@@ -78,7 +78,7 @@ public class TestStreamEdge {
     edge = new StreamEdge(spec, true, new MapConfig());
     config = edge.generateConfig();
     streamConfig = new StreamConfig(config);
-    assertEquals(streamConfig.getIsIntermediate(spec.getId()), true);
+    assertEquals(streamConfig.getIsIntermediateStream(spec.getId()), true);
     assertEquals(streamConfig.getDefaultStreamOffset(spec.toSystemStream()).get(), "oldest");
     assertEquals(streamConfig.getPriority(spec.toSystemStream()), Integer.MAX_VALUE);
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/3cc2a05f/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
index 1f4672d..c76f6e5 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
@@ -22,12 +22,15 @@ package org.apache.samza.system.kafka
 import java.util
 import java.util.{Properties, UUID}
 
-import kafka.admin.AdminUtils
+import com.google.common.annotations.VisibleForTesting
+import kafka.admin.{AdminClient, AdminUtils}
 import kafka.api._
 import kafka.common.TopicAndPartition
 import kafka.consumer.{ConsumerConfig, SimpleConsumer}
 import kafka.utils.ZkUtils
+import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.common.errors.TopicExistsException
+import org.apache.kafka.common.TopicPartition
 import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
 import org.apache.samza.system._
 import org.apache.samza.util.{ClientUtilTopicMetadataStore, ExponentialSleepStrategy, KafkaUtil, Logging}
@@ -38,6 +41,7 @@ import scala.collection.JavaConverters._
 
 object KafkaSystemAdmin extends Logging {
 
+  @VisibleForTesting @volatile var deleteMessagesCalled = false
   val CLEAR_STREAM_RETRIES = 3
 
   /**
@@ -144,20 +148,38 @@ class KafkaSystemAdmin(
   /**
    * Kafka properties to be used during the intermediate topic creation
    */
-  intermediateStreamProperties: Map[String, Properties] = Map()) extends ExtendedSystemAdmin with Logging {
+  intermediateStreamProperties: Map[String, Properties] = Map(),
+
+  /**
+   * Whether deleteMessages() API can be used
+   */
+  deleteCommittedMessages: Boolean = false) extends ExtendedSystemAdmin with Logging {
 
   import KafkaSystemAdmin._
 
   @volatile var running = false
+  @volatile var adminClient: AdminClient = null
 
   override def start() = {
-    running = true
+    if (!running) {
+      running = true
+      adminClient = createAdminClient()
+    }
   }
 
   override def stop() = {
-    running = false
+    if (running) {
+      running = false
+      adminClient.close()
+      adminClient = null
+    }
   }
 
+  private def createAdminClient(): AdminClient = {
+    val props = new Properties()
+    props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerListString)
+    AdminClient.create(props)
+  }
 
   override def getSystemStreamPartitionCounts(streams: util.Set[String], cacheTTL: Long): util.Map[String, SystemStreamMetadata] = {
     getSystemStreamPartitionCounts(streams, new ExponentialSleepStrategy(initialDelayMs = 500), cacheTTL)
@@ -555,13 +577,33 @@ class KafkaSystemAdmin(
   }
 
   /**
+    * @inheritdoc
+    *
+    * Delete records up to (and including) the provided ssp offsets for all system stream partitions specified in the map.
+    * This only works with Kafka cluster 0.11 or later. Otherwise it's a no-op.
+    */
+  override def deleteMessages(offsets: util.Map[SystemStreamPartition, String]) {
+    deleteMessagesCalled = true
+
+    if (!running) {
+      throw new SamzaException(s"KafkaSystemAdmin has not started yet for system $systemName")
+    }
+    if (deleteCommittedMessages) {
+      val nextOffsets = offsets.asScala.toSeq.map { case (systemStreamPartition, offset) =>
+        (new TopicPartition(systemStreamPartition.getStream, systemStreamPartition.getPartition.getPartitionId), offset.toLong + 1)
+      }.toMap
+      adminClient.deleteRecordsBefore(nextOffsets)
+    }
+  }
+
+  /**
    * Compare the two offsets. Returns x where x < 0 if offset1 < offset2;
    * x == 0 if offset1 == offset2; x > 0 if offset1 > offset2.
    *
    * Currently it's used in the context of the broadcast streams to detect
    * the mismatch between two streams when consuming the broadcast streams.
    */
-  override def offsetComparator(offset1: String, offset2: String) = {
+  override def offsetComparator(offset1: String, offset2: String): Integer = {
     offset1.toLong compare offset2.toLong
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/3cc2a05f/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
index a480042..9f0b5f2 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
@@ -33,6 +33,7 @@ import org.apache.samza.system.SystemFactory
 import org.apache.samza.config.StorageConfig._
 import org.apache.samza.system.SystemProducer
 import org.apache.samza.system.SystemAdmin
+import org.apache.samza.config.SystemConfig.Config2System
 import org.apache.samza.system.SystemConsumer
 
 object KafkaSystemFactory extends Logging {
@@ -124,9 +125,9 @@ class KafkaSystemFactory extends SystemFactory with Logging {
        val changelogInfo = ChangelogInfo(replicationFactor, config.getChangelogKafkaProperties(storeName))
        info("Creating topic meta information for topic: %s with replication factor: %s" format (topicName, replicationFactor))
        (topicName, changelogInfo)
-    }}.toMap
-
+    }}
 
+    val deleteCommittedMessages = config.deleteCommittedMessages(systemName).exists(isEnabled => isEnabled.toBoolean)
     val intermediateStreamProperties: Map[String, Properties] = getIntermediateStreamProperties(config)
     new KafkaSystemAdmin(
       systemName,
@@ -138,7 +139,8 @@ class KafkaSystemFactory extends SystemFactory with Logging {
       bufferSize,
       clientId,
       topicMetaInformation,
-      intermediateStreamProperties)
+      intermediateStreamProperties,
+      deleteCommittedMessages)
   }
 
   def getCoordinatorTopicProperties(config: Config) = {
@@ -152,7 +154,7 @@ class KafkaSystemFactory extends SystemFactory with Logging {
     val appConfig = new ApplicationConfig(config)
     if (appConfig.getAppMode == ApplicationMode.BATCH) {
       val streamConfig = new StreamConfig(config)
-      streamConfig.getStreamIds().filter(streamConfig.getIsIntermediate(_)).map(streamId => {
+      streamConfig.getStreamIds().filter(streamConfig.getIsIntermediateStream(_)).map(streamId => {
         val properties = new Properties()
         properties.putAll(streamConfig.getStreamProperties(streamId))
         properties.putIfAbsent("retention.ms", String.valueOf(KafkaConfig.DEFAULT_RETENTION_MS_FOR_BATCH))

http://git-wip-us.apache.org/repos/asf/samza/blob/3cc2a05f/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
index 125cf61..77f47f9 100644
--- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
@@ -28,6 +28,8 @@ import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.StreamValidationException;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.util.ScalaJavaUtil;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
 
@@ -36,18 +38,13 @@ import static org.junit.Assert.*;
 
 public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
 
-  KafkaSystemAdmin basicSystemAdmin = createSystemAdmin();
-
-
   @Test
   public void testCreateCoordinatorStream() {
-    KafkaSystemAdmin systemAdmin = createSystemAdmin();//coordProps, 3, new scala.collection.immutable.HashMap<>(), 1000);
-    SystemAdmin admin = Mockito.spy(systemAdmin);
+    SystemAdmin admin = Mockito.spy(systemAdmin());
     StreamSpec spec = StreamSpec.createCoordinatorStreamSpec("testCoordinatorStream", "testSystem");
 
     admin.createStream(spec);
     admin.validateStream(spec);
-
     Mockito.verify(admin).createStream(Mockito.any());
   }
 
@@ -144,56 +141,51 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
 
   @Test
   public void testCreateStream() {
-    SystemAdmin admin = this.basicSystemAdmin;
     StreamSpec spec = new StreamSpec("testId", "testStream", "testSystem", 8);
 
-    assertTrue("createStream should return true if the stream does not exist and then is created.", admin.createStream(spec));
-    admin.validateStream(spec);
+    assertTrue("createStream should return true if the stream does not exist and then is created.", systemAdmin().createStream(spec));
+    systemAdmin().validateStream(spec);
 
-    assertFalse("createStream should return false if the stream already exists.", admin.createStream(spec));
+    assertFalse("createStream should return false if the stream already exists.", systemAdmin().createStream(spec));
   }
 
   @Test(expected = StreamValidationException.class)
   public void testValidateStreamDoesNotExist() {
-    SystemAdmin admin = this.basicSystemAdmin;
 
     StreamSpec spec = new StreamSpec("testId", "testStreamNameExist", "testSystem", 8);
 
-    admin.validateStream(spec);
+    systemAdmin().validateStream(spec);
   }
 
   @Test(expected = StreamValidationException.class)
   public void testValidateStreamWrongPartitionCount() {
-    SystemAdmin admin = this.basicSystemAdmin;
     StreamSpec spec1 = new StreamSpec("testId", "testStreamPartition", "testSystem", 8);
     StreamSpec spec2 = new StreamSpec("testId", "testStreamPartition", "testSystem", 4);
 
-    assertTrue("createStream should return true if the stream does not exist and then is created.", admin.createStream(spec1));
+    assertTrue("createStream should return true if the stream does not exist and then is created.", systemAdmin().createStream(spec1));
 
-    admin.validateStream(spec2);
+    systemAdmin().validateStream(spec2);
   }
 
   @Test(expected = StreamValidationException.class)
   public void testValidateStreamWrongName() {
-    SystemAdmin admin = this.basicSystemAdmin;
     StreamSpec spec1 = new StreamSpec("testId", "testStreamName1", "testSystem", 8);
     StreamSpec spec2 = new StreamSpec("testId", "testStreamName2", "testSystem", 8);
 
-    assertTrue("createStream should return true if the stream does not exist and then is created.", admin.createStream(spec1));
+    assertTrue("createStream should return true if the stream does not exist and then is created.", systemAdmin().createStream(spec1));
 
-    admin.validateStream(spec2);
+    systemAdmin().validateStream(spec2);
   }
 
   @Test
   public void testClearStream() {
-    KafkaSystemAdmin admin = this.basicSystemAdmin;
     StreamSpec spec = new StreamSpec("testId", "testStreamClear", "testSystem", 8);
 
-    assertTrue("createStream should return true if the stream does not exist and then is created.", admin.createStream(spec));
-    assertTrue(admin.clearStream(spec));
+    assertTrue("createStream should return true if the stream does not exist and then is created.", systemAdmin().createStream(spec));
+    assertTrue(systemAdmin().clearStream(spec));
 
     scala.collection.immutable.Set<String> topic = new scala.collection.immutable.Set.Set1<>(spec.getPhysicalName());
-    scala.collection.immutable.Map<String, TopicMetadata> metadata = admin.getTopicMetadata(topic);
+    scala.collection.immutable.Map<String, TopicMetadata> metadata = systemAdmin().getTopicMetadata(topic);
     assertTrue(metadata.get(spec.getPhysicalName()).get().partitionsMetadata().isEmpty());
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/3cc2a05f/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
index 2039447..a533acc 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
@@ -26,7 +26,7 @@ import java.util.{Properties, UUID}
 import kafka.admin.AdminUtils
 import org.apache.kafka.common.errors.LeaderNotAvailableException
 import org.apache.kafka.common.protocol.Errors
-import kafka.consumer.{Consumer, ConsumerConfig}
+import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector}
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
 import kafka.utils.{TestUtils, ZkUtils}
@@ -59,16 +59,16 @@ object TestKafkaSystemAdmin extends KafkaServerTestHarness {
   var producer: KafkaProducer[Array[Byte], Array[Byte]] = null
   var metadataStore: TopicMetadataStore = null
   var producerConfig: KafkaProducerConfig = null
-  var brokers: String = null
+  var systemAdmin: KafkaSystemAdmin = null
 
-  def generateConfigs() = {
+  override def generateConfigs(): Seq[KafkaConfig] = {
     val props = TestUtils.createBrokerConfigs(numBrokers, zkConnect, true)
     props.map(KafkaConfig.fromProps)
   }
 
   @BeforeClass
-  override def setUp {
-    super.setUp
+  override def setUp() {
+    super.setUp()
     val config = new java.util.HashMap[String, String]()
     config.put("bootstrap.servers", brokerList)
     config.put("acks", "all")
@@ -76,15 +76,17 @@ object TestKafkaSystemAdmin extends KafkaServerTestHarness {
     producerConfig = new KafkaProducerConfig("kafka", "i001", config)
     producer = new KafkaProducer[Array[Byte], Array[Byte]](producerConfig.getProducerProperties)
     metadataStore = new ClientUtilTopicMetadataStore(brokerList, "some-job-name")
+    systemAdmin = new KafkaSystemAdmin(SYSTEM, brokerList, connectZk = () => ZkUtils(zkConnect, 6000, 6000, zkSecure))
+    systemAdmin.start()
   }
 
-
   @AfterClass
-  override def tearDown {
-    super.tearDown
+  override def tearDown() {
+    systemAdmin.stop()
+    producer.close()
+    super.tearDown()
   }
 
-
   def createTopic(topicName: String, partitionCount: Int) {
     AdminUtils.createTopic(
       zkUtils,
@@ -119,7 +121,7 @@ object TestKafkaSystemAdmin extends KafkaServerTestHarness {
     }
   }
 
-  def getConsumerConnector = {
+  def getConsumerConnector(): ConsumerConnector = {
     val props = new Properties
 
     props.put("zookeeper.connect", zkConnect)
@@ -130,12 +132,9 @@ object TestKafkaSystemAdmin extends KafkaServerTestHarness {
     Consumer.create(consumerConfig)
   }
 
-  def createSystemAdmin: KafkaSystemAdmin = {
-    new KafkaSystemAdmin(SYSTEM, brokerList, connectZk = () => ZkUtils(zkConnect, 6000, 6000, zkSecure))
-  }
-
   def createSystemAdmin(coordinatorStreamProperties: Properties, coordinatorStreamReplicationFactor: Int, topicMetaInformation: Map[String, ChangelogInfo]): KafkaSystemAdmin = {
-    new KafkaSystemAdmin(SYSTEM, brokerList, connectZk = () => ZkUtils(zkConnect, 6000, 6000, zkSecure), coordinatorStreamProperties, coordinatorStreamReplicationFactor, 10000, ConsumerConfig.SocketBufferSize, UUID.randomUUID.toString, topicMetaInformation, Map())
+    new KafkaSystemAdmin(SYSTEM, brokerList, connectZk = () => ZkUtils(zkConnect, 6000, 6000, zkSecure), coordinatorStreamProperties,
+      coordinatorStreamReplicationFactor, 10000, ConsumerConfig.SocketBufferSize, UUID.randomUUID.toString, topicMetaInformation, Map())
   }
 
 }
@@ -147,9 +146,6 @@ object TestKafkaSystemAdmin extends KafkaServerTestHarness {
 class TestKafkaSystemAdmin {
   import TestKafkaSystemAdmin._
 
-  // Provide a random zkAddress, the system admin tries to connect only when a topic is created/validated
-  val systemAdmin = createSystemAdmin
-
   @Test
   def testShouldAssembleMetadata {
     val oldestOffsets = Map(

http://git-wip-us.apache.org/repos/asf/samza/blob/3cc2a05f/samza-test/src/test/java/org/apache/samza/test/operator/BroadcastAssertApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/BroadcastAssertApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/BroadcastAssertApp.java
index 426a53b..88e2765 100644
--- a/samza-test/src/test/java/org/apache/samza/test/operator/BroadcastAssertApp.java
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/BroadcastAssertApp.java
@@ -29,15 +29,18 @@ import org.apache.samza.test.framework.StreamAssert;
 
 import java.util.Arrays;
 
-import static org.apache.samza.test.operator.RepartitionJoinWindowApp.PAGE_VIEWS;
-
 public class BroadcastAssertApp implements StreamApplication {
 
+  public static final String INPUT_TOPIC_NAME_PROP = "inputTopicName";
+
+
   @Override
   public void init(StreamGraph graph, Config config) {
+    String inputTopic = config.get(INPUT_TOPIC_NAME_PROP);
+
     final JsonSerdeV2<PageView> serde = new JsonSerdeV2<>(PageView.class);
     final MessageStream<PageView> broadcastPageViews = graph
-        .getInputStream(PAGE_VIEWS, serde)
+        .getInputStream(inputTopic, serde)
         .broadcast(serde, "pv");
 
     /**

http://git-wip-us.apache.org/repos/asf/samza/blob/3cc2a05f/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java
index 346e958..120f902 100644
--- a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java
@@ -23,13 +23,15 @@ import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.OutputStream;
 import org.apache.samza.operators.StreamGraph;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.windows.Windows;
 import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.task.TaskCoordinator;
 import org.apache.samza.test.operator.data.AdClick;
 import org.apache.samza.test.operator.data.PageView;
 import org.apache.samza.test.operator.data.UserPageAdClick;
@@ -40,16 +42,19 @@ import java.time.Duration;
  * A {@link StreamApplication} that demonstrates a partitionBy, stream-stream join and a windowed count.
  */
 public class RepartitionJoinWindowApp implements StreamApplication {
-  static final String PAGE_VIEWS = "page-views";
-  static final String AD_CLICKS = "ad-clicks";
-  static final String OUTPUT_TOPIC = "user-ad-click-counts";
+
+  public static final String INPUT_TOPIC_NAME_1_PROP = "inputTopicName1";
+  public static final String INPUT_TOPIC_NAME_2_PROP = "inputTopicName2";
+  public static final String OUTPUT_TOPIC_NAME_PROP = "outputTopicName";
 
   @Override
   public void init(StreamGraph graph, Config config) {
-    MessageStream<PageView> pageViews = graph.getInputStream(PAGE_VIEWS, new JsonSerdeV2<>(PageView.class));
-    MessageStream<AdClick> adClicks = graph.getInputStream(AD_CLICKS, new JsonSerdeV2<>(AdClick.class));
-    OutputStream<KV<String, String>> outputStream =
-        graph.getOutputStream(OUTPUT_TOPIC, new KVSerde<>(new StringSerde(), new StringSerde()));
+    String inputTopicName1 = config.get(INPUT_TOPIC_NAME_1_PROP);
+    String inputTopicName2 = config.get(INPUT_TOPIC_NAME_2_PROP);
+    String outputTopic = config.get(OUTPUT_TOPIC_NAME_PROP);
+
+    MessageStream<PageView> pageViews = graph.getInputStream(inputTopicName1, new JsonSerdeV2<>(PageView.class));
+    MessageStream<AdClick> adClicks = graph.getInputStream(inputTopicName2, new JsonSerdeV2<>(AdClick.class));
 
     MessageStream<PageView> pageViewsRepartitionedByViewId = pageViews
         .partitionBy(PageView::getViewId, pv -> pv,
@@ -73,7 +78,10 @@ public class RepartitionJoinWindowApp implements StreamApplication {
         .window(Windows.keyedSessionWindow(UserPageAdClick::getUserId, Duration.ofSeconds(3),
             new StringSerde(), new JsonSerdeV2<>(UserPageAdClick.class)), "userAdClickWindow")
         .map(windowPane -> KV.of(windowPane.getKey().getKey(), String.valueOf(windowPane.getMessage().size())))
-        .sendTo(outputStream);
+        .sink((message, messageCollector, taskCoordinator) -> {
+            taskCoordinator.commit(TaskCoordinator.RequestScope.ALL_TASKS_IN_CONTAINER);
+            messageCollector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", outputTopic), null, message.getKey(), message.getValue()));
+          });
   }
 
   private static class UserPageViewAdClicksJoiner implements JoinFunction<String, PageView, AdClick, UserPageAdClick> {

http://git-wip-us.apache.org/repos/asf/samza/blob/3cc2a05f/samza-test/src/test/java/org/apache/samza/test/operator/StreamApplicationIntegrationTestHarness.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/StreamApplicationIntegrationTestHarness.java b/samza-test/src/test/java/org/apache/samza/test/operator/StreamApplicationIntegrationTestHarness.java
index fae3db4..1595347 100644
--- a/samza-test/src/test/java/org/apache/samza/test/operator/StreamApplicationIntegrationTestHarness.java
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/StreamApplicationIntegrationTestHarness.java
@@ -27,10 +27,12 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.samza.application.StreamApplication;
-import org.apache.samza.config.Config;
 import org.apache.samza.config.KafkaConfig;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.execution.TestStreamManager;
+import org.apache.samza.runtime.AbstractApplicationRunner;
 import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.system.kafka.KafkaSystemAdmin;
 import org.apache.samza.test.harness.AbstractIntegrationTestHarness;
 import org.apache.samza.test.framework.StreamAssert;
 import scala.Option;
@@ -98,8 +100,9 @@ import java.util.Properties;
 public class StreamApplicationIntegrationTestHarness extends AbstractIntegrationTestHarness {
   private KafkaProducer producer;
   private KafkaConsumer consumer;
+  protected KafkaSystemAdmin systemAdmin;
   private StreamApplication app;
-  private ApplicationRunner runner;
+  protected AbstractApplicationRunner runner;
 
   private int numEmptyPolls = 3;
   private static final Duration POLL_TIMEOUT_MS = Duration.ofSeconds(20);
@@ -144,6 +147,9 @@ public class StreamApplicationIntegrationTestHarness extends AbstractIntegration
         Option$.MODULE$.<File>empty(),
         Option$.MODULE$.<Properties>empty(),
         Option$.MODULE$.<Properties>apply(consumerDeserializerProperties));
+
+    systemAdmin = createSystemAdmin("kafka");
+    systemAdmin.start();
   }
 
   /**
@@ -213,7 +219,7 @@ public class StreamApplicationIntegrationTestHarness extends AbstractIntegration
    * @param appName the name of the application
    * @param overriddenConfigs configs to override
    */
-  public void runApplication(StreamApplication streamApplication, String appName, Config overriddenConfigs) {
+  public void runApplication(StreamApplication streamApplication, String appName, Map<String, String> overriddenConfigs) {
     Map<String, String> configs = new HashMap<>();
     configs.put("job.factory.class", "org.apache.samza.job.local.ThreadJobFactory");
     configs.put("job.name", appName);
@@ -230,6 +236,7 @@ public class StreamApplicationIntegrationTestHarness extends AbstractIntegration
     configs.put("job.default.system", "kafka");
     configs.put("job.coordinator.replication.factor", "1");
     configs.put("task.window.ms", "1000");
+    configs.put("task.checkpoint.factory", TestStreamManager.MockCheckpointManagerFactory.class.getName());
 
     // This is to prevent tests from taking a long time to stop after they're done. The issue is that
     // tearDown currently doesn't call runner.kill(app), and shuts down the Kafka and ZK servers immediately.
@@ -247,7 +254,7 @@ public class StreamApplicationIntegrationTestHarness extends AbstractIntegration
     }
 
     app = streamApplication;
-    runner = ApplicationRunner.fromConfig(new MapConfig(configs));
+    runner = (AbstractApplicationRunner) ApplicationRunner.fromConfig(new MapConfig(configs));
     runner.run(streamApplication);
 
     StreamAssert.waitForComplete();
@@ -262,6 +269,9 @@ public class StreamApplicationIntegrationTestHarness extends AbstractIntegration
    */
   @Override
   public void tearDown() {
+    systemAdmin.stop();
+    producer.close();
+    consumer.close();
     super.tearDown();
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/3cc2a05f/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
index a83f9cf..77cd19a 100644
--- a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
@@ -18,56 +18,99 @@
  */
 package org.apache.samza.test.operator;
 
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.samza.Partition;
+import org.apache.samza.system.StreamSpec;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata;
+import org.apache.samza.system.kafka.KafkaSystemAdmin;
+import org.apache.samza.util.ExponentialSleepStrategy;
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Test;
 
 import java.util.Collections;
 import java.util.List;
 
-import static org.apache.samza.test.operator.RepartitionJoinWindowApp.AD_CLICKS;
-import static org.apache.samza.test.operator.RepartitionJoinWindowApp.OUTPUT_TOPIC;
-import static org.apache.samza.test.operator.RepartitionJoinWindowApp.PAGE_VIEWS;
-
 
 /**
  * Test driver for {@link RepartitionJoinWindowApp}.
  */
 public class TestRepartitionJoinWindowApp extends StreamApplicationIntegrationTestHarness {
 
-  @Before
-  public void setup() {
+  void initializeTopics(String input1, String input2, String output) {
     // create topics
-    createTopic(PAGE_VIEWS, 2);
-    createTopic(AD_CLICKS, 2);
-    createTopic(OUTPUT_TOPIC, 1);
+    createTopic(input1, 2);
+    createTopic(input2, 2);
+    createTopic(output, 1);
 
     // create events for the following user activity.
     // userId: (viewId, pageId, (adIds))
     // u1: (v1, p1, (a1)), (v2, p2, (a3))
     // u2: (v3, p1, (a1)), (v4, p3, (a5))
-    produceMessage(PAGE_VIEWS, 0, "p1", "{\"viewId\":\"v1\",\"pageId\":\"p1\",\"userId\":\"u1\"}");
-    produceMessage(PAGE_VIEWS, 1, "p2", "{\"viewId\":\"v2\",\"pageId\":\"p2\",\"userId\":\"u1\"}");
-    produceMessage(PAGE_VIEWS, 0, "p1", "{\"viewId\":\"v3\",\"pageId\":\"p1\",\"userId\":\"u2\"}");
-    produceMessage(PAGE_VIEWS, 1, "p3", "{\"viewId\":\"v4\",\"pageId\":\"p3\",\"userId\":\"u2\"}");
+    produceMessage(input1, 0, "p1", "{\"viewId\":\"v1\",\"pageId\":\"p1\",\"userId\":\"u1\"}");
+    produceMessage(input1, 1, "p2", "{\"viewId\":\"v2\",\"pageId\":\"p2\",\"userId\":\"u1\"}");
+    produceMessage(input1, 0, "p1", "{\"viewId\":\"v3\",\"pageId\":\"p1\",\"userId\":\"u2\"}");
+    produceMessage(input1, 1, "p3", "{\"viewId\":\"v4\",\"pageId\":\"p3\",\"userId\":\"u2\"}");
+
+    produceMessage(input2, 0, "a1", "{\"viewId\":\"v1\",\"adId\":\"a1\"}");
+    produceMessage(input2, 0, "a3", "{\"viewId\":\"v2\",\"adId\":\"a3\"}");
+    produceMessage(input2, 0, "a1", "{\"viewId\":\"v3\",\"adId\":\"a1\"}");
+    produceMessage(input2, 0, "a5", "{\"viewId\":\"v4\",\"adId\":\"a5\"}");
+  }
+
+  @Test
+  public void testRepartitionJoinWindowAppWithoutDeletionOnCommit() throws Exception {
+    String inputTopicName1 = "page-views";
+    String inputTopicName2 = "ad-clicks";
+    String outputTopicName = "user-ad-click-counts";
+
+    KafkaSystemAdmin.deleteMessagesCalled_$eq(false);
 
-    produceMessage(AD_CLICKS, 0, "a1", "{\"viewId\":\"v1\",\"adId\":\"a1\"}");
-    produceMessage(AD_CLICKS, 0, "a3", "{\"viewId\":\"v2\",\"adId\":\"a3\"}");
-    produceMessage(AD_CLICKS, 0, "a1", "{\"viewId\":\"v3\",\"adId\":\"a1\"}");
-    produceMessage(AD_CLICKS, 0, "a5", "{\"viewId\":\"v4\",\"adId\":\"a5\"}");
+    initializeTopics(inputTopicName1, inputTopicName2, outputTopicName);
 
+    // run the application
+    RepartitionJoinWindowApp app = new RepartitionJoinWindowApp();
+    String appName = "UserPageAdClickCounter";
+    Map<String, String> configs = new HashMap<>();
+    configs.put("systems.kafka.samza.delete.committed.messages", "false");
+    configs.put(RepartitionJoinWindowApp.INPUT_TOPIC_NAME_1_PROP, inputTopicName1);
+    configs.put(RepartitionJoinWindowApp.INPUT_TOPIC_NAME_2_PROP, inputTopicName2);
+    configs.put(RepartitionJoinWindowApp.OUTPUT_TOPIC_NAME_PROP, outputTopicName);
+
+    runApplication(app, appName, configs);
+
+    // consume and validate result
+    List<ConsumerRecord<String, String>> messages = consumeMessages(Collections.singletonList(outputTopicName), 2);
+    Assert.assertEquals(2, messages.size());
+
+    Assert.assertFalse(KafkaSystemAdmin.deleteMessagesCalled());
   }
 
   @Test
-  public void testRepartitionJoinWindowApp() throws Exception {
+  public void testRepartitionJoinWindowAppAndDeleteMessagesOnCommit() throws Exception {
+    String inputTopicName1 = "page-views2";
+    String inputTopicName2 = "ad-clicks2";
+    String outputTopicName = "user-ad-click-counts2";
+
+    initializeTopics(inputTopicName1, inputTopicName2, outputTopicName);
+
     // run the application
     RepartitionJoinWindowApp app = new RepartitionJoinWindowApp();
-    final String appName = "UserPageAdClickCounter";
-    runApplication(app, appName, null);
+    final String appName = "UserPageAdClickCounter2";
+    Map<String, String> configs = new HashMap<>();
+    configs.put("systems.kafka.samza.delete.committed.messages", "true");
+    configs.put(RepartitionJoinWindowApp.INPUT_TOPIC_NAME_1_PROP, inputTopicName1);
+    configs.put(RepartitionJoinWindowApp.INPUT_TOPIC_NAME_2_PROP, inputTopicName2);
+    configs.put(RepartitionJoinWindowApp.OUTPUT_TOPIC_NAME_PROP, outputTopicName);
+
+    runApplication(app, appName, configs);
 
     // consume and validate result
-    List<ConsumerRecord<String, String>> messages = consumeMessages(Collections.singletonList(OUTPUT_TOPIC), 2);
+    List<ConsumerRecord<String, String>> messages = consumeMessages(Collections.singletonList(outputTopicName), 2);
     Assert.assertEquals(2, messages.size());
 
     for (ConsumerRecord<String, String> message : messages) {
@@ -76,10 +119,38 @@ public class TestRepartitionJoinWindowApp extends StreamApplicationIntegrationTe
       Assert.assertTrue(key.equals("u1") || key.equals("u2"));
       Assert.assertEquals("2", value);
     }
+
+    // Verify that messages in the intermediate stream will be deleted in 10 seconds
+    long startTimeMs = System.currentTimeMillis();
+    for (StreamSpec spec: runner.getExecutionPlan(app).getIntermediateStreams()) {
+      long remainingMessageNum = -1;
+
+      while (remainingMessageNum != 0 && System.currentTimeMillis() - startTimeMs < 10000) {
+        remainingMessageNum = 0;
+        SystemStreamMetadata metadatas = systemAdmin.getSystemStreamMetadata(
+            new HashSet<>(Arrays.asList(spec.getPhysicalName())), new ExponentialSleepStrategy.Mock(3)
+        ).get(spec.getPhysicalName()).get();
+
+        for (Map.Entry<Partition, SystemStreamPartitionMetadata> entry : metadatas.getSystemStreamPartitionMetadata().entrySet()) {
+          SystemStreamPartitionMetadata metadata = entry.getValue();
+          remainingMessageNum += Long.parseLong(metadata.getUpcomingOffset()) - Long.parseLong(metadata.getOldestOffset());
+        }
+      }
+      Assert.assertEquals(0, remainingMessageNum);
+    }
+
+
   }
 
   @Test
   public void testBroadcastApp() {
-    runApplication(new BroadcastAssertApp(), "BroadcastTest", null);
+    String inputTopicName1 = "page-views";
+    String inputTopicName2 = "ad-clicks";
+    String outputTopicName = "user-ad-click-counts";
+    Map<String, String> configs = new HashMap<>();
+    configs.put(BroadcastAssertApp.INPUT_TOPIC_NAME_PROP, inputTopicName1);
+
+    initializeTopics(inputTopicName1, inputTopicName2, outputTopicName);
+    runApplication(new BroadcastAssertApp(), "BroadcastTest", configs);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/3cc2a05f/samza-test/src/test/scala/org/apache/samza/test/harness/AbstractIntegrationTestHarness.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/test/harness/AbstractIntegrationTestHarness.scala b/samza-test/src/test/scala/org/apache/samza/test/harness/AbstractIntegrationTestHarness.scala
index b01f944..bc00305 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/harness/AbstractIntegrationTestHarness.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/harness/AbstractIntegrationTestHarness.scala
@@ -20,7 +20,9 @@ package org.apache.samza.test.harness
 import java.util.Properties
 
 import kafka.server.KafkaConfig
-import kafka.utils.TestUtils
+import kafka.utils.{TestUtils, ZkUtils}
+import org.apache.kafka.common.security.JaasUtils
+import org.apache.samza.system.kafka.KafkaSystemAdmin
 
 /**
  * LinkedIn integration test harness for Kafka
@@ -57,4 +59,8 @@ abstract class AbstractIntegrationTestHarness extends AbstractKafkaServerTestHar
    */
   def bootstrapServers(): String = super.bootstrapUrl
 
+  def createSystemAdmin(system: String): KafkaSystemAdmin = {
+    new KafkaSystemAdmin(system, bootstrapServers, connectZk = () => ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, JaasUtils.isZkSecurityEnabled))
+  }
+
 }
\ No newline at end of file