You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/04/08 18:00:05 UTC

[2/2] kafka git commit: KAFKA-3505: Fix punctuate generated record metadata

KAFKA-3505: Fix punctuate generated record metadata

Author: Guozhang Wang <wa...@gmail.com>

Reviewers: Anna Povzner <an...@confluent.io>, Ewen Cheslack-Postava <ew...@confluent.io>

Closes #1190 from guozhangwang/K3505


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3a58407e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3a58407e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3a58407e

Branch: refs/heads/trunk
Commit: 3a58407e2e0aee0bb2c5e343fbe98c7f28fc6f3e
Parents: 8b9b07e
Author: Guozhang Wang <wa...@gmail.com>
Authored: Fri Apr 8 08:59:50 2016 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Fri Apr 8 08:59:50 2016 -0700

----------------------------------------------------------------------
 .../clients/consumer/ConsumerRecordTest.java    |  2 +-
 .../streams/processor/ProcessorContext.java     | 17 +++-
 .../processor/internals/PartitionGroup.java     | 17 ++--
 .../internals/ProcessorContextImpl.java         | 11 ++-
 .../processor/internals/ProcessorNode.java      |  2 +-
 .../internals/PunctuationSchedule.java          |  2 +-
 .../processor/internals/StandbyContextImpl.java | 18 ++---
 .../streams/processor/internals/StreamTask.java | 13 +++-
 .../processor/internals/StreamThread.java       |  7 +-
 .../internals/KStreamKStreamJoinTest.java       | 72 ++++++++---------
 .../internals/KStreamKStreamLeftJoinTest.java   | 36 ++++-----
 .../internals/KStreamKTableLeftJoinTest.java    | 14 ++--
 .../internals/KStreamWindowAggregateTest.java   | 24 +++---
 .../kstream/internals/KTableFilterTest.java     | 36 ++++-----
 .../kstream/internals/KTableKTableJoinTest.java | 42 +++++-----
 .../internals/KTableKTableLeftJoinTest.java     | 42 +++++-----
 .../internals/KTableKTableOuterJoinTest.java    | 48 ++++++------
 .../kstream/internals/KTableMapValuesTest.java  | 16 ++--
 .../kstream/internals/KTableSourceTest.java     | 16 ++--
 .../processor/internals/PartitionGroupTest.java | 82 ++++++++++++++++++--
 .../processor/internals/StreamTaskTest.java     | 79 ++++++++++++++++++-
 .../apache/kafka/test/MockProcessorContext.java |  2 +-
 .../apache/kafka/test/MockProcessorNode.java    | 49 ++++++++++++
 .../kafka/test/MockProcessorSupplier.java       | 38 +++++++--
 24 files changed, 458 insertions(+), 227 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3a58407e/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordTest.java
index d1d3b24..41e9160 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordTest.java
@@ -31,7 +31,7 @@ public class ConsumerRecordTest {
         String key = "key";
         String value = "value";
 
-        ConsumerRecord record = new ConsumerRecord(topic, partition, offset, key, value);
+        ConsumerRecord<String, String> record = new ConsumerRecord<>(topic, partition, offset, key, value);
         assertEquals(topic, record.topic());
         assertEquals(partition, record.partition());
         assertEquals(offset, record.offset());

http://git-wip-us.apache.org/repos/asf/kafka/blob/3a58407e/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
index 8bac3e3..815b5b4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
@@ -124,30 +124,39 @@ public interface ProcessorContext {
     void commit();
 
     /**
-     * Returns the topic name of the current input record
+     * Returns the topic name of the current input record; could be null if it is not
+     * available (for example, if this method is invoked from the punctuate call)
      *
      * @return the topic name
      */
     String topic();
 
     /**
-     * Returns the partition id of the current input record
+     * Returns the partition id of the current input record; could be -1 if it is not
+     * available (for example, if this method is invoked from the punctuate call)
      *
      * @return the partition id
      */
     int partition();
 
     /**
-     * Returns the offset of the current input record
+     * Returns the offset of the current input record; could be -1 if it is not
+     * available (for example, if this method is invoked from the punctuate call)
      *
      * @return the offset
      */
     long offset();
 
     /**
-     * Returns the timestamp of the current input record. The timestamp is extracted from
+     * Returns the current timestamp.
+     *
+     * If it is triggered while processing a record streamed from the source processor, timestamp is defined as the timestamp of the current input record; the timestamp is extracted from
      * {@link org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord} by {@link TimestampExtractor}.
      *
+     * If it is triggered while processing a record not generated from the source processor (for example,
+     * if this method is invoked from the punctuate call), timestamp is defined as the current
+     * task's stream time, which is defined as the smallest among all its input stream partition timestamps.
+     *
      * @return the timestamp
      */
     long timestamp();

http://git-wip-us.apache.org/repos/asf/kafka/blob/3a58407e/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
index 3d8f792..ec89d47 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
@@ -135,17 +135,14 @@ public class PartitionGroup {
      * partition timestamp among all its partitions
      */
     public long timestamp() {
-        if (queuesByTime.isEmpty()) {
-            // if there is no data in all partitions, return the smallest of their last known times
-            long timestamp = Long.MAX_VALUE;
-            for (RecordQueue queue : partitionQueues.values()) {
-                if (timestamp > queue.timestamp())
-                    timestamp = queue.timestamp();
-            }
-            return timestamp;
-        } else {
-            return queuesByTime.peek().timestamp();
+        // we should always return the smallest timestamp of all partitions
+        // to avoid the group partition time going backward
+        long timestamp = Long.MAX_VALUE;
+        for (RecordQueue queue : partitionQueues.values()) {
+            if (timestamp > queue.timestamp())
+                timestamp = queue.timestamp();
         }
+        return timestamp;
     }
 
     public int numBuffered(TopicPartition partition) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/3a58407e/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 5bda856..1c398ac 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -30,6 +30,8 @@ import java.io.File;
 
 public class ProcessorContextImpl implements ProcessorContext, RecordCollector.Supplier {
 
+    public static final String NONEXIST_TOPIC = "__null_topic__";
+
     private final TaskId id;
     private final StreamTask task;
     private final StreamsMetrics metrics;
@@ -118,7 +120,7 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S
         if (node == null)
             throw new TopologyBuilderException("Accessing from an unknown node");
 
-        // TODO: restore this once we fix the ValueGetter initialiation issue
+        // TODO: restore this once we fix the ValueGetter initialization issue
         //if (!node.stateStores.contains(name))
         //    throw new TopologyBuilderException("Processor " + node.name() + " has no access to StateStore " + name);
 
@@ -130,7 +132,12 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S
         if (task.record() == null)
             throw new IllegalStateException("This should not happen as topic() should only be called while a record is processed");
 
-        return task.record().topic();
+        String topic = task.record().topic();
+
+        if (topic.equals(NONEXIST_TOPIC))
+            return null;
+        else
+            return topic;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/3a58407e/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index 6db83a1..50e3a0b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -48,7 +48,7 @@ public class ProcessorNode<K, V> {
         return name;
     }
 
-    public final Processor processor() {
+    public final Processor<K, V> processor() {
         return processor;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/3a58407e/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java
index dc9a50d..758cfb0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java
@@ -22,7 +22,7 @@ public class PunctuationSchedule extends Stamped<ProcessorNode> {
     final long interval;
 
     public PunctuationSchedule(ProcessorNode node, long interval) {
-        this(node, System.currentTimeMillis(), interval);
+        this(node, 0, interval);
     }
 
     public PunctuationSchedule(ProcessorNode node, long time, long interval) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/3a58407e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
index d5a9683..ea008b8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
@@ -108,37 +108,37 @@ public class StandbyContextImpl implements ProcessorContext, RecordCollector.Sup
 
     @Override
     public StateStore getStateStore(String name) {
-        throw new UnsupportedOperationException();
+        throw new UnsupportedOperationException("getStateStore() not supported.");
     }
 
     @Override
     public String topic() {
-        throw new UnsupportedOperationException();
+        throw new UnsupportedOperationException("topic() not supported.");
     }
 
     @Override
     public int partition() {
-        throw new UnsupportedOperationException();
+        throw new UnsupportedOperationException("partition() not supported.");
     }
 
     @Override
     public long offset() {
-        throw new UnsupportedOperationException();
+        throw new UnsupportedOperationException("offset() not supported.");
     }
 
     @Override
     public long timestamp() {
-        throw new UnsupportedOperationException();
+        throw new UnsupportedOperationException("timestamp() not supported.");
     }
 
     @Override
     public <K, V> void forward(K key, V value) {
-        throw new UnsupportedOperationException();
+        throw new UnsupportedOperationException("forward() not supported.");
     }
 
     @Override
     public <K, V> void forward(K key, V value, int childIndex) {
-        throw new UnsupportedOperationException();
+        throw new UnsupportedOperationException("forward() not supported.");
     }
 
     @Override
@@ -148,11 +148,11 @@ public class StandbyContextImpl implements ProcessorContext, RecordCollector.Sup
 
     @Override
     public void commit() {
-        throw new UnsupportedOperationException();
+        throw new UnsupportedOperationException("commit() not supported.");
     }
 
     @Override
     public void schedule(long interval) {
-        throw new UnsupportedOperationException();
+        throw new UnsupportedOperationException("schedule() not supported.");
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3a58407e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index a484980..53d0a8d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -43,6 +43,8 @@ public class StreamTask extends AbstractTask implements Punctuator {
 
     private static final Logger log = LoggerFactory.getLogger(StreamTask.class);
 
+    private static final ConsumerRecord<Object, Object> DUMMY_RECORD = new ConsumerRecord<>(ProcessorContextImpl.NONEXIST_TOPIC, -1, -1L, null, null);
+
     private final int maxBufferedSize;
 
     private final PartitionGroup partitionGroup;
@@ -202,11 +204,11 @@ public class StreamTask extends AbstractTask implements Punctuator {
 
     /**
      * Possibly trigger registered punctuation functions if
-     * current time has reached the defined stamp
-     *
-     * @param timestamp
+     * current partition group timestamp has reached the defined stamp
      */
-    public boolean maybePunctuate(long timestamp) {
+    public boolean maybePunctuate() {
+        long timestamp = partitionGroup.timestamp();
+
         return punctuationQueue.mayPunctuate(timestamp, this);
     }
 
@@ -216,10 +218,13 @@ public class StreamTask extends AbstractTask implements Punctuator {
             throw new IllegalStateException("Current node is not null");
 
         currNode = node;
+        currRecord = new StampedRecord(DUMMY_RECORD, timestamp);
+
         try {
             node.processor().punctuate(timestamp);
         } finally {
             currNode = null;
+            currRecord = null;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/3a58407e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index c2a8e06..38dc356 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -341,8 +341,9 @@ public class StreamThread extends Thread {
 
             totalNumBuffered = 0;
 
+            // try to process one fetched record from each task via the topology, and also trigger punctuate
+            // functions if necessary, which may result in more records going through the topology in this loop
             if (!activeTasks.isEmpty()) {
-                // try to process one record from each task
                 for (StreamTask task : activeTasks.values()) {
                     long startProcess = time.milliseconds();
 
@@ -431,7 +432,9 @@ public class StreamThread extends Thread {
         try {
             long now = time.milliseconds();
 
-            if (task.maybePunctuate(now))
+            // check whether we should punctuate based on the task's partition group timestamp,
+            // which is essentially based on record timestamps.
+            if (task.maybePunctuate())
                 sensors.punctuateTimeSensor.record(time.milliseconds() - now);
 
         } catch (KafkaException e) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/3a58407e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
index d24ab15..19a9411 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
@@ -90,7 +90,7 @@ public class KStreamKStreamJoinTest {
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult();
+            processor.checkAndClearProcessResult();
 
             // push two items to the other stream. this should produce two items.
             // w1 = { 0:X0, 1:X1 }
@@ -102,7 +102,7 @@ public class KStreamKStreamJoinTest {
                 driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1");
+            processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
 
             // push all four items to the primary stream. this should produce two items.
             // w1 = { 0:X0, 1:X1 }
@@ -114,7 +114,7 @@ public class KStreamKStreamJoinTest {
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1");
+            processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
 
             // push all items to the other stream. this should produce six items.
             // w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
@@ -126,7 +126,7 @@ public class KStreamKStreamJoinTest {
                 driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:X0+YY0", "0:X0+YY0", "1:X1+YY1", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+            processor.checkAndClearProcessResult("0:X0+YY0", "0:X0+YY0", "1:X1+YY1", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
 
             // push all four items to the primary stream. this should produce six items.
             // w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
@@ -138,7 +138,7 @@ public class KStreamKStreamJoinTest {
                 driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3");
+            processor.checkAndClearProcessResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3");
 
             // push two items to the other stream. this should produce six item.
             // w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3,  0:XX0, 1:XX1, 2:XX2, 3:XX3 }
@@ -150,7 +150,7 @@ public class KStreamKStreamJoinTest {
                 driver.process(topic2, expectedKeys[i], "YYY" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:X0+YYY0", "0:X0+YYY0", "0:XX0+YYY0", "1:X1+YYY1", "1:X1+YYY1", "1:XX1+YYY1");
+            processor.checkAndClearProcessResult("0:X0+YYY0", "0:X0+YYY0", "0:XX0+YYY0", "1:X1+YYY1", "1:X1+YYY1", "1:XX1+YYY1");
 
         } finally {
             Utils.delete(baseDir);
@@ -195,7 +195,7 @@ public class KStreamKStreamJoinTest {
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:X0+null", "1:X1+null");
+            processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
 
             // push two items to the other stream. this should produce two items.
             // w1 = { 0:X0, 1:X1 }
@@ -207,7 +207,7 @@ public class KStreamKStreamJoinTest {
                 driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1");
+            processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
 
             // push all four items to the primary stream. this should produce four items.
             // w1 = { 0:X0, 1:X1 }
@@ -219,7 +219,7 @@ public class KStreamKStreamJoinTest {
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
+            processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
 
             // push all items to the other stream. this should produce six items.
             // w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
@@ -231,7 +231,7 @@ public class KStreamKStreamJoinTest {
                 driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:X0+YY0", "0:X0+YY0", "1:X1+YY1", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+            processor.checkAndClearProcessResult("0:X0+YY0", "0:X0+YY0", "1:X1+YY1", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
 
             // push all four items to the primary stream. this should produce six items.
             // w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
@@ -243,7 +243,7 @@ public class KStreamKStreamJoinTest {
                 driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3");
+            processor.checkAndClearProcessResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3");
 
             // push two items to the other stream. this should produce six item.
             // w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3,  0:XX0, 1:XX1, 2:XX2, 3:XX3 }
@@ -255,7 +255,7 @@ public class KStreamKStreamJoinTest {
                 driver.process(topic2, expectedKeys[i], "YYY" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:X0+YYY0", "0:X0+YYY0", "0:XX0+YYY0", "1:X1+YYY1", "1:X1+YYY1", "1:XX1+YYY1");
+            processor.checkAndClearProcessResult("0:X0+YYY0", "0:X0+YYY0", "0:XX0+YYY0", "1:X1+YYY1", "1:X1+YYY1", "1:XX1+YYY1");
 
         } finally {
             Utils.delete(baseDir);
@@ -302,7 +302,7 @@ public class KStreamKStreamJoinTest {
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult();
+            processor.checkAndClearProcessResult();
 
             // push two items to the other stream. this should produce two items.
             // w1 = { 0:X0, 1:X1 }
@@ -314,7 +314,7 @@ public class KStreamKStreamJoinTest {
                 driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1");
+            processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
 
             // clear logically
             time = 1000L;
@@ -323,7 +323,7 @@ public class KStreamKStreamJoinTest {
                 driver.setTime(time + i);
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
-            processor.checkAndClearResult();
+            processor.checkAndClearProcessResult();
 
             // gradually expires items in w1
             // w1 = { 0:X0, 1:X1, 2:X2, 3:X3 }
@@ -335,35 +335,35 @@ public class KStreamKStreamJoinTest {
                 driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
 
             driver.setTime(++time);
             for (int i = 0; i < expectedKeys.length; i++) {
                 driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+            processor.checkAndClearProcessResult("1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
 
             driver.setTime(++time);
             for (int i = 0; i < expectedKeys.length; i++) {
                 driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("2:X2+YY2", "3:X3+YY3");
+            processor.checkAndClearProcessResult("2:X2+YY2", "3:X3+YY3");
 
             driver.setTime(++time);
             for (int i = 0; i < expectedKeys.length; i++) {
                 driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("3:X3+YY3");
+            processor.checkAndClearProcessResult("3:X3+YY3");
 
             driver.setTime(++time);
             for (int i = 0; i < expectedKeys.length; i++) {
                 driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult();
+            processor.checkAndClearProcessResult();
 
             // go back to the time before expiration
 
@@ -373,35 +373,35 @@ public class KStreamKStreamJoinTest {
                 driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult();
+            processor.checkAndClearProcessResult();
 
             driver.setTime(++time);
             for (int i = 0; i < expectedKeys.length; i++) {
                 driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:X0+YY0");
+            processor.checkAndClearProcessResult("0:X0+YY0");
 
             driver.setTime(++time);
             for (int i = 0; i < expectedKeys.length; i++) {
                 driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:X0+YY0", "1:X1+YY1");
+            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1");
 
             driver.setTime(++time);
             for (int i = 0; i < expectedKeys.length; i++) {
                 driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2");
+            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2");
 
             driver.setTime(++time);
             for (int i = 0; i < expectedKeys.length; i++) {
                 driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
 
             // clear (logically)
             time = 2000L;
@@ -411,7 +411,7 @@ public class KStreamKStreamJoinTest {
                 driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult();
+            processor.checkAndClearProcessResult();
 
             // gradually expires items in w2
             // w2 = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 }
@@ -422,35 +422,35 @@ public class KStreamKStreamJoinTest {
                 driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
+            processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
 
             driver.setTime(++time);
             for (int i = 0; i < expectedKeys.length; i++) {
                 driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
+            processor.checkAndClearProcessResult("1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
 
             driver.setTime(++time);
             for (int i = 0; i < expectedKeys.length; i++) {
                 driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("2:XX2+Y2", "3:XX3+Y3");
+            processor.checkAndClearProcessResult("2:XX2+Y2", "3:XX3+Y3");
 
             driver.setTime(++time);
             for (int i = 0; i < expectedKeys.length; i++) {
                 driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("3:XX3+Y3");
+            processor.checkAndClearProcessResult("3:XX3+Y3");
 
             driver.setTime(++time);
             for (int i = 0; i < expectedKeys.length; i++) {
                 driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult();
+            processor.checkAndClearProcessResult();
 
             // go back to the time before expiration
 
@@ -460,35 +460,35 @@ public class KStreamKStreamJoinTest {
                 driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult();
+            processor.checkAndClearProcessResult();
 
             driver.setTime(++time);
             for (int i = 0; i < expectedKeys.length; i++) {
                 driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:XX0+Y0");
+            processor.checkAndClearProcessResult("0:XX0+Y0");
 
             driver.setTime(++time);
             for (int i = 0; i < expectedKeys.length; i++) {
                 driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:XX0+Y0", "1:XX1+Y1");
+            processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1");
 
             driver.setTime(++time);
             for (int i = 0; i < expectedKeys.length; i++) {
                 driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2");
+            processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2");
 
             driver.setTime(++time);
             for (int i = 0; i < expectedKeys.length; i++) {
                 driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
+            processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
 
         } finally {
             Utils.delete(baseDir);

http://git-wip-us.apache.org/repos/asf/kafka/blob/3a58407e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
index 166e8ba..65226d3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
@@ -88,7 +88,7 @@ public class KStreamKStreamLeftJoinTest {
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:X0+null", "1:X1+null");
+            processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
 
             // push two items to the other stream. this should produce no items.
             // w {}
@@ -98,7 +98,7 @@ public class KStreamKStreamLeftJoinTest {
                 driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult();
+            processor.checkAndClearProcessResult();
 
             // push all four items to the primary stream. this should produce four items.
             // w = { 0:Y0, 1:Y1 }
@@ -108,7 +108,7 @@ public class KStreamKStreamLeftJoinTest {
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
+            processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
 
             // push all items to the other stream. this should produce no items.
             // w = { 0:Y0, 1:Y1 }
@@ -118,7 +118,7 @@ public class KStreamKStreamLeftJoinTest {
                 driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult();
+            processor.checkAndClearProcessResult();
 
             // push all four items to the primary stream. this should produce six items.
             // w = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
@@ -128,7 +128,7 @@ public class KStreamKStreamLeftJoinTest {
                 driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3");
+            processor.checkAndClearProcessResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3");
 
         } finally {
             Utils.delete(baseDir);
@@ -173,7 +173,7 @@ public class KStreamKStreamLeftJoinTest {
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:X0+null", "1:X1+null");
+            processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
 
             // push two items to the other stream. this should produce no items.
             // w = {}
@@ -183,7 +183,7 @@ public class KStreamKStreamLeftJoinTest {
                 driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult();
+            processor.checkAndClearProcessResult();
 
             // clear logically
             time = 1000L;
@@ -196,7 +196,7 @@ public class KStreamKStreamLeftJoinTest {
                 driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult();
+            processor.checkAndClearProcessResult();
 
             // gradually expire items in window.
             // w = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 }
@@ -207,35 +207,35 @@ public class KStreamKStreamLeftJoinTest {
                 driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
+            processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
 
             driver.setTime(++time);
             for (int i = 0; i < expectedKeys.length; i++) {
                 driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:XX0+null", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
+            processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
 
             driver.setTime(++time);
             for (int i = 0; i < expectedKeys.length; i++) {
                 driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:XX0+null", "1:XX1+null", "2:XX2+Y2", "3:XX3+Y3");
+            processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+Y2", "3:XX3+Y3");
 
             driver.setTime(++time);
             for (int i = 0; i < expectedKeys.length; i++) {
                 driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+Y3");
+            processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+Y3");
 
             driver.setTime(++time);
             for (int i = 0; i < expectedKeys.length; i++) {
                 driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+null");
+            processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+null");
 
             // go back to the time before expiration
 
@@ -245,35 +245,35 @@ public class KStreamKStreamLeftJoinTest {
                 driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+null");
+            processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+null");
 
             driver.setTime(++time);
             for (int i = 0; i < expectedKeys.length; i++) {
                 driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:XX0+Y0", "1:XX1+null", "2:XX2+null", "3:XX3+null");
+            processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+null", "2:XX2+null", "3:XX3+null");
 
             driver.setTime(++time);
             for (int i = 0; i < expectedKeys.length; i++) {
                 driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+null", "3:XX3+null");
+            processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+null", "3:XX3+null");
 
             driver.setTime(++time);
             for (int i = 0; i < expectedKeys.length; i++) {
                 driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+null");
+            processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+null");
 
             driver.setTime(++time);
             for (int i = 0; i < expectedKeys.length; i++) {
                 driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
+            processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
 
         } finally {
             Utils.delete(baseDir);

http://git-wip-us.apache.org/repos/asf/kafka/blob/3a58407e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
index 8e672a2..3acb59a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
@@ -95,7 +95,7 @@ public class KStreamKTableLeftJoinTest {
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:X0+null", "1:X1+null");
+            processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
 
             // push two items to the other stream. this should not produce any item.
 
@@ -103,7 +103,7 @@ public class KStreamKTableLeftJoinTest {
                 driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult();
+            processor.checkAndClearProcessResult();
 
             // push all four items to the primary stream. this should produce four items.
 
@@ -111,14 +111,14 @@ public class KStreamKTableLeftJoinTest {
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
+            processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
 
             // push all items to the other stream. this should not produce any item
             for (int i = 0; i < expectedKeys.length; i++) {
                 driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult();
+            processor.checkAndClearProcessResult();
 
             // push all four items to the primary stream. this should produce four items.
 
@@ -126,7 +126,7 @@ public class KStreamKTableLeftJoinTest {
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
 
             // push two items with null to the other stream as deletes. this should not produce any item.
 
@@ -134,7 +134,7 @@ public class KStreamKTableLeftJoinTest {
                 driver.process(topic2, expectedKeys[i], null);
             }
 
-            processor.checkAndClearResult();
+            processor.checkAndClearProcessResult();
 
             // push all four items to the primary stream. this should produce four items.
 
@@ -142,7 +142,7 @@ public class KStreamKTableLeftJoinTest {
                 driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:XX0+null", "1:XX1+null", "2:XX2+YY2", "3:XX3+YY3");
+            processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+YY2", "3:XX3+YY3");
 
         } finally {
             Utils.delete(baseDir);

http://git-wip-us.apache.org/repos/asf/kafka/blob/3a58407e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
index e19510f..3c7a1bd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
@@ -182,15 +182,15 @@ public class KStreamWindowAggregateTest {
             driver.setTime(4L);
             driver.process(topic1, "A", "1");
 
-            proc1.checkAndClearResult(
+            proc1.checkAndClearProcessResult(
                     "[A@0]:0+1",
                     "[B@0]:0+2",
                     "[C@0]:0+3",
                     "[D@0]:0+4",
                     "[A@0]:0+1+1"
             );
-            proc2.checkAndClearResult();
-            proc3.checkAndClearResult(
+            proc2.checkAndClearProcessResult();
+            proc3.checkAndClearProcessResult(
                     "[A@0]:null",
                     "[B@0]:null",
                     "[C@0]:null",
@@ -209,15 +209,15 @@ public class KStreamWindowAggregateTest {
             driver.setTime(9L);
             driver.process(topic1, "C", "3");
 
-            proc1.checkAndClearResult(
+            proc1.checkAndClearProcessResult(
                     "[A@0]:0+1+1+1", "[A@5]:0+1",
                     "[B@0]:0+2+2", "[B@5]:0+2",
                     "[D@0]:0+4+4", "[D@5]:0+4",
                     "[B@0]:0+2+2+2", "[B@5]:0+2+2",
                     "[C@0]:0+3+3", "[C@5]:0+3"
             );
-            proc2.checkAndClearResult();
-            proc3.checkAndClearResult(
+            proc2.checkAndClearProcessResult();
+            proc3.checkAndClearProcessResult(
                     "[A@0]:null", "[A@5]:null",
                     "[B@0]:null", "[B@5]:null",
                     "[D@0]:null", "[D@5]:null",
@@ -236,15 +236,15 @@ public class KStreamWindowAggregateTest {
             driver.setTime(4L);
             driver.process(topic2, "A", "a");
 
-            proc1.checkAndClearResult();
-            proc2.checkAndClearResult(
+            proc1.checkAndClearProcessResult();
+            proc2.checkAndClearProcessResult(
                     "[A@0]:0+a",
                     "[B@0]:0+b",
                     "[C@0]:0+c",
                     "[D@0]:0+d",
                     "[A@0]:0+a+a"
             );
-            proc3.checkAndClearResult(
+            proc3.checkAndClearProcessResult(
                     "[A@0]:0+1+1+1%0+a",
                     "[B@0]:0+2+2+2%0+b",
                     "[C@0]:0+3+3%0+c",
@@ -262,15 +262,15 @@ public class KStreamWindowAggregateTest {
             driver.setTime(9L);
             driver.process(topic2, "C", "c");
 
-            proc1.checkAndClearResult();
-            proc2.checkAndClearResult(
+            proc1.checkAndClearProcessResult();
+            proc2.checkAndClearProcessResult(
                     "[A@0]:0+a+a+a", "[A@5]:0+a",
                     "[B@0]:0+b+b", "[B@5]:0+b",
                     "[D@0]:0+d+d", "[D@5]:0+d",
                     "[B@0]:0+b+b+b", "[B@5]:0+b+b",
                     "[C@0]:0+c+c", "[C@5]:0+c"
             );
-            proc3.checkAndClearResult(
+            proc3.checkAndClearProcessResult(
                     "[A@0]:0+1+1+1%0+a+a+a", "[A@5]:0+1%0+a",
                     "[B@0]:0+2+2+2%0+b+b", "[B@5]:0+2+2%0+b",
                     "[D@0]:0+4+4%0+d+d", "[D@5]:0+4%0+d",

http://git-wip-us.apache.org/repos/asf/kafka/blob/3a58407e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
index 78d274e..ee26058 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
@@ -74,8 +74,8 @@ public class KTableFilterTest {
         driver.process(topic1, "A", null);
         driver.process(topic1, "B", null);
 
-        proc2.checkAndClearResult("A:null", "B:2", "C:null", "D:4", "A:null", "B:null");
-        proc3.checkAndClearResult("A:1", "B:null", "C:3", "D:null", "A:null", "B:null");
+        proc2.checkAndClearProcessResult("A:null", "B:2", "C:null", "D:4", "A:null", "B:null");
+        proc3.checkAndClearProcessResult("A:1", "B:null", "C:3", "D:null", "A:null", "B:null");
     }
 
     @Test
@@ -193,25 +193,25 @@ public class KTableFilterTest {
             driver.process(topic1, "B", 1);
             driver.process(topic1, "C", 1);
 
-            proc1.checkAndClearResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
-            proc2.checkAndClearResult("A:(null<-null)", "B:(null<-null)", "C:(null<-null)");
+            proc1.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
+            proc2.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)", "C:(null<-null)");
 
             driver.process(topic1, "A", 2);
             driver.process(topic1, "B", 2);
 
-            proc1.checkAndClearResult("A:(2<-null)", "B:(2<-null)");
-            proc2.checkAndClearResult("A:(2<-null)", "B:(2<-null)");
+            proc1.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
+            proc2.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
 
             driver.process(topic1, "A", 3);
 
-            proc1.checkAndClearResult("A:(3<-null)");
-            proc2.checkAndClearResult("A:(null<-null)");
+            proc1.checkAndClearProcessResult("A:(3<-null)");
+            proc2.checkAndClearProcessResult("A:(null<-null)");
 
             driver.process(topic1, "A", null);
             driver.process(topic1, "B", null);
 
-            proc1.checkAndClearResult("A:(null<-null)", "B:(null<-null)");
-            proc2.checkAndClearResult("A:(null<-null)", "B:(null<-null)");
+            proc1.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)");
+            proc2.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)");
 
         } finally {
             Utils.delete(stateDir);
@@ -250,25 +250,25 @@ public class KTableFilterTest {
             driver.process(topic1, "B", 1);
             driver.process(topic1, "C", 1);
 
-            proc1.checkAndClearResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
-            proc2.checkAndClearResult("A:(null<-null)", "B:(null<-null)", "C:(null<-null)");
+            proc1.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
+            proc2.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)", "C:(null<-null)");
 
             driver.process(topic1, "A", 2);
             driver.process(topic1, "B", 2);
 
-            proc1.checkAndClearResult("A:(2<-1)", "B:(2<-1)");
-            proc2.checkAndClearResult("A:(2<-null)", "B:(2<-null)");
+            proc1.checkAndClearProcessResult("A:(2<-1)", "B:(2<-1)");
+            proc2.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
 
             driver.process(topic1, "A", 3);
 
-            proc1.checkAndClearResult("A:(3<-2)");
-            proc2.checkAndClearResult("A:(null<-2)");
+            proc1.checkAndClearProcessResult("A:(3<-2)");
+            proc2.checkAndClearProcessResult("A:(null<-2)");
 
             driver.process(topic1, "A", null);
             driver.process(topic1, "B", null);
 
-            proc1.checkAndClearResult("A:(null<-3)", "B:(null<-2)");
-            proc2.checkAndClearResult("A:(null<-null)", "B:(null<-2)");
+            proc1.checkAndClearProcessResult("A:(null<-3)", "B:(null<-2)");
+            proc2.checkAndClearProcessResult("A:(null<-null)", "B:(null<-2)");
 
         } finally {
             Utils.delete(stateDir);

http://git-wip-us.apache.org/repos/asf/kafka/blob/3a58407e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
index 5f30574..f6ebbe1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
@@ -100,7 +100,7 @@ public class KTableKTableJoinTest {
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:null", "1:null");
+            processor.checkAndClearProcessResult("0:null", "1:null");
             checkJoinedValues(getter, kv(0, null), kv(1, null), kv(2, null), kv(3, null));
 
             // push two items to the other stream. this should produce two items.
@@ -109,7 +109,7 @@ public class KTableKTableJoinTest {
                 driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1");
+            processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
             checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, null), kv(3, null));
 
             // push all four items to the primary stream. this should produce four items.
@@ -118,7 +118,7 @@ public class KTableKTableJoinTest {
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1", "2:null", "3:null");
+            processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:null", "3:null");
             checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, null), kv(3, null));
 
             // push all items to the other stream. this should produce four items.
@@ -126,7 +126,7 @@ public class KTableKTableJoinTest {
                 driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
             checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
 
             // push all four items to the primary stream. this should produce four items.
@@ -135,7 +135,7 @@ public class KTableKTableJoinTest {
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
             checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
 
             // push two items with null to the other stream as deletes. this should produce two items.
@@ -144,7 +144,7 @@ public class KTableKTableJoinTest {
                 driver.process(topic2, expectedKeys[i], null);
             }
 
-            processor.checkAndClearResult("0:null", "1:null");
+            processor.checkAndClearProcessResult("0:null", "1:null");
             checkJoinedValues(getter, kv(0, null), kv(1, null), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
 
             // push all four items to the primary stream. this should produce four items.
@@ -153,7 +153,7 @@ public class KTableKTableJoinTest {
                 driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:null", "1:null", "2:XX2+YY2", "3:XX3+YY3");
+            processor.checkAndClearProcessResult("0:null", "1:null", "2:XX2+YY2", "3:XX3+YY3");
             checkJoinedValues(getter, kv(0, null), kv(1, null), kv(2, "XX2+YY2"), kv(3, "XX3+YY3"));
 
         } finally {
@@ -195,7 +195,7 @@ public class KTableKTableJoinTest {
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            proc.checkAndClearResult("0:(null<-null)", "1:(null<-null)");
+            proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)");
 
             // push two items to the other stream. this should produce two items.
 
@@ -203,7 +203,7 @@ public class KTableKTableJoinTest {
                 driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
             }
 
-            proc.checkAndClearResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)");
+            proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)");
 
             // push all four items to the primary stream. this should produce four items.
 
@@ -211,14 +211,14 @@ public class KTableKTableJoinTest {
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            proc.checkAndClearResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)", "2:(null<-null)", "3:(null<-null)");
+            proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)", "2:(null<-null)", "3:(null<-null)");
 
             // push all items to the other stream. this should produce four items.
             for (int i = 0; i < expectedKeys.length; i++) {
                 driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
             }
 
-            proc.checkAndClearResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
+            proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
 
             // push all four items to the primary stream. this should produce four items.
 
@@ -226,7 +226,7 @@ public class KTableKTableJoinTest {
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            proc.checkAndClearResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
+            proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
 
             // push two items with null to the other stream as deletes. this should produce two items.
 
@@ -234,7 +234,7 @@ public class KTableKTableJoinTest {
                 driver.process(topic2, expectedKeys[i], null);
             }
 
-            proc.checkAndClearResult("0:(null<-null)", "1:(null<-null)");
+            proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)");
 
             // push all four items to the primary stream. this should produce four items.
 
@@ -242,7 +242,7 @@ public class KTableKTableJoinTest {
                 driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
             }
 
-            proc.checkAndClearResult("0:(null<-null)", "1:(null<-null)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)");
+            proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)");
 
         } finally {
             Utils.delete(baseDir);
@@ -285,7 +285,7 @@ public class KTableKTableJoinTest {
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            proc.checkAndClearResult("0:(null<-null)", "1:(null<-null)");
+            proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)");
 
             // push two items to the other stream. this should produce two items.
 
@@ -293,7 +293,7 @@ public class KTableKTableJoinTest {
                 driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
             }
 
-            proc.checkAndClearResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)");
+            proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)");
 
             // push all four items to the primary stream. this should produce four items.
 
@@ -301,14 +301,14 @@ public class KTableKTableJoinTest {
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            proc.checkAndClearResult("0:(X0+Y0<-X0+Y0)", "1:(X1+Y1<-X1+Y1)", "2:(null<-null)", "3:(null<-null)");
+            proc.checkAndClearProcessResult("0:(X0+Y0<-X0+Y0)", "1:(X1+Y1<-X1+Y1)", "2:(null<-null)", "3:(null<-null)");
 
             // push all items to the other stream. this should produce four items.
             for (int i = 0; i < expectedKeys.length; i++) {
                 driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
             }
 
-            proc.checkAndClearResult("0:(X0+YY0<-X0+Y0)", "1:(X1+YY1<-X1+Y1)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
+            proc.checkAndClearProcessResult("0:(X0+YY0<-X0+Y0)", "1:(X1+YY1<-X1+Y1)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
 
             // push all four items to the primary stream. this should produce four items.
 
@@ -316,7 +316,7 @@ public class KTableKTableJoinTest {
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            proc.checkAndClearResult("0:(X0+YY0<-X0+YY0)", "1:(X1+YY1<-X1+YY1)", "2:(X2+YY2<-X2+YY2)", "3:(X3+YY3<-X3+YY3)");
+            proc.checkAndClearProcessResult("0:(X0+YY0<-X0+YY0)", "1:(X1+YY1<-X1+YY1)", "2:(X2+YY2<-X2+YY2)", "3:(X3+YY3<-X3+YY3)");
 
             // push two items with null to the other stream as deletes. this should produce two items.
 
@@ -324,7 +324,7 @@ public class KTableKTableJoinTest {
                 driver.process(topic2, expectedKeys[i], null);
             }
 
-            proc.checkAndClearResult("0:(null<-X0+YY0)", "1:(null<-X1+YY1)");
+            proc.checkAndClearProcessResult("0:(null<-X0+YY0)", "1:(null<-X1+YY1)");
 
             // push all four items to the primary stream. this should produce four items.
 
@@ -332,7 +332,7 @@ public class KTableKTableJoinTest {
                 driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
             }
 
-            proc.checkAndClearResult("0:(null<-null)", "1:(null<-null)", "2:(XX2+YY2<-X2+YY2)", "3:(XX3+YY3<-X3+YY3)");
+            proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)", "2:(XX2+YY2<-X2+YY2)", "3:(XX3+YY3<-X3+YY3)");
 
         } finally {
             Utils.delete(baseDir);

http://git-wip-us.apache.org/repos/asf/kafka/blob/3a58407e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
index f92c5ca..449ea05 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
@@ -105,7 +105,7 @@ public class KTableKTableLeftJoinTest {
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:X0+null", "1:X1+null");
+            processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
             checkJoinedValues(getter, kv(0, "X0+null"), kv(1, "X1+null"), kv(2, null), kv(3, null));
 
             // push two items to the other stream. this should produce two items.
@@ -114,7 +114,7 @@ public class KTableKTableLeftJoinTest {
                 driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1");
+            processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
             checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, null), kv(3, null));
 
             // push all four items to the primary stream. this should produce four items.
@@ -123,7 +123,7 @@ public class KTableKTableLeftJoinTest {
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
+            processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
             checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, "X2+null"), kv(3, "X3+null"));
 
             // push all items to the other stream. this should produce four items.
@@ -131,7 +131,7 @@ public class KTableKTableLeftJoinTest {
                 driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
             checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
 
             // push all four items to the primary stream. this should produce four items.
@@ -140,7 +140,7 @@ public class KTableKTableLeftJoinTest {
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
             checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
 
             // push two items with null to the other stream as deletes. this should produce two item.
@@ -149,7 +149,7 @@ public class KTableKTableLeftJoinTest {
                 driver.process(topic2, expectedKeys[i], null);
             }
 
-            processor.checkAndClearResult("0:X0+null", "1:X1+null");
+            processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
             checkJoinedValues(getter, kv(0, "X0+null"), kv(1, "X1+null"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
 
             // push all four items to the primary stream. this should produce four items.
@@ -158,7 +158,7 @@ public class KTableKTableLeftJoinTest {
                 driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:XX0+null", "1:XX1+null", "2:XX2+YY2", "3:XX3+YY3");
+            processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+YY2", "3:XX3+YY3");
             checkJoinedValues(getter, kv(0, "XX0+null"), kv(1, "XX1+null"), kv(2, "XX2+YY2"), kv(3, "XX3+YY3"));
 
         } finally {
@@ -200,7 +200,7 @@ public class KTableKTableLeftJoinTest {
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            proc.checkAndClearResult("0:(X0+null<-null)", "1:(X1+null<-null)");
+            proc.checkAndClearProcessResult("0:(X0+null<-null)", "1:(X1+null<-null)");
 
             // push two items to the other stream. this should produce two items.
 
@@ -208,7 +208,7 @@ public class KTableKTableLeftJoinTest {
                 driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
             }
 
-            proc.checkAndClearResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)");
+            proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)");
 
             // push all four items to the primary stream. this should produce four items.
 
@@ -216,14 +216,14 @@ public class KTableKTableLeftJoinTest {
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            proc.checkAndClearResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)", "2:(X2+null<-null)", "3:(X3+null<-null)");
+            proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)", "2:(X2+null<-null)", "3:(X3+null<-null)");
 
             // push all items to the other stream. this should produce four items.
             for (int i = 0; i < expectedKeys.length; i++) {
                 driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
             }
 
-            proc.checkAndClearResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
+            proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
 
             // push all four items to the primary stream. this should produce four items.
 
@@ -231,7 +231,7 @@ public class KTableKTableLeftJoinTest {
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            proc.checkAndClearResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
+            proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
 
             // push two items with null to the other stream as deletes. this should produce two item.
 
@@ -239,7 +239,7 @@ public class KTableKTableLeftJoinTest {
                 driver.process(topic2, expectedKeys[i], null);
             }
 
-            proc.checkAndClearResult("0:(X0+null<-null)", "1:(X1+null<-null)");
+            proc.checkAndClearProcessResult("0:(X0+null<-null)", "1:(X1+null<-null)");
 
             // push all four items to the primary stream. this should produce four items.
 
@@ -247,7 +247,7 @@ public class KTableKTableLeftJoinTest {
                 driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
             }
 
-            proc.checkAndClearResult("0:(XX0+null<-null)", "1:(XX1+null<-null)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)");
+            proc.checkAndClearProcessResult("0:(XX0+null<-null)", "1:(XX1+null<-null)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)");
 
         } finally {
             Utils.delete(baseDir);
@@ -290,7 +290,7 @@ public class KTableKTableLeftJoinTest {
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            proc.checkAndClearResult("0:(X0+null<-null)", "1:(X1+null<-null)");
+            proc.checkAndClearProcessResult("0:(X0+null<-null)", "1:(X1+null<-null)");
 
             // push two items to the other stream. this should produce two items.
 
@@ -298,7 +298,7 @@ public class KTableKTableLeftJoinTest {
                 driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
             }
 
-            proc.checkAndClearResult("0:(X0+Y0<-X0+null)", "1:(X1+Y1<-X1+null)");
+            proc.checkAndClearProcessResult("0:(X0+Y0<-X0+null)", "1:(X1+Y1<-X1+null)");
 
             // push all four items to the primary stream. this should produce four items.
 
@@ -306,14 +306,14 @@ public class KTableKTableLeftJoinTest {
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            proc.checkAndClearResult("0:(X0+Y0<-X0+Y0)", "1:(X1+Y1<-X1+Y1)", "2:(X2+null<-null)", "3:(X3+null<-null)");
+            proc.checkAndClearProcessResult("0:(X0+Y0<-X0+Y0)", "1:(X1+Y1<-X1+Y1)", "2:(X2+null<-null)", "3:(X3+null<-null)");
 
             // push all items to the other stream. this should produce four items.
             for (int i = 0; i < expectedKeys.length; i++) {
                 driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
             }
 
-            proc.checkAndClearResult("0:(X0+YY0<-X0+Y0)", "1:(X1+YY1<-X1+Y1)", "2:(X2+YY2<-X2+null)", "3:(X3+YY3<-X3+null)");
+            proc.checkAndClearProcessResult("0:(X0+YY0<-X0+Y0)", "1:(X1+YY1<-X1+Y1)", "2:(X2+YY2<-X2+null)", "3:(X3+YY3<-X3+null)");
 
             // push all four items to the primary stream. this should produce four items.
 
@@ -321,7 +321,7 @@ public class KTableKTableLeftJoinTest {
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            proc.checkAndClearResult("0:(X0+YY0<-X0+YY0)", "1:(X1+YY1<-X1+YY1)", "2:(X2+YY2<-X2+YY2)", "3:(X3+YY3<-X3+YY3)");
+            proc.checkAndClearProcessResult("0:(X0+YY0<-X0+YY0)", "1:(X1+YY1<-X1+YY1)", "2:(X2+YY2<-X2+YY2)", "3:(X3+YY3<-X3+YY3)");
 
             // push two items with null to the other stream as deletes. this should produce two item.
 
@@ -329,7 +329,7 @@ public class KTableKTableLeftJoinTest {
                 driver.process(topic2, expectedKeys[i], null);
             }
 
-            proc.checkAndClearResult("0:(X0+null<-X0+YY0)", "1:(X1+null<-X1+YY1)");
+            proc.checkAndClearProcessResult("0:(X0+null<-X0+YY0)", "1:(X1+null<-X1+YY1)");
 
             // push all four items to the primary stream. this should produce four items.
 
@@ -337,7 +337,7 @@ public class KTableKTableLeftJoinTest {
                 driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
             }
 
-            proc.checkAndClearResult("0:(XX0+null<-X0+null)", "1:(XX1+null<-X1+null)", "2:(XX2+YY2<-X2+YY2)", "3:(XX3+YY3<-X3+YY3)");
+            proc.checkAndClearProcessResult("0:(XX0+null<-X0+null)", "1:(XX1+null<-X1+null)", "2:(XX2+YY2<-X2+YY2)", "3:(XX3+YY3<-X3+YY3)");
 
         } finally {
             Utils.delete(baseDir);

http://git-wip-us.apache.org/repos/asf/kafka/blob/3a58407e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
index 6cc77e0..ea7476a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
@@ -100,7 +100,7 @@ public class KTableKTableOuterJoinTest {
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:X0+null", "1:X1+null");
+            processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
             checkJoinedValues(getter, kv(0, "X0+null"), kv(1, "X1+null"), kv(2, null), kv(3, null));
 
             // push two items to the other stream. this should produce two items.
@@ -109,7 +109,7 @@ public class KTableKTableOuterJoinTest {
                 driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1");
+            processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
             checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, null), kv(3, null));
 
             // push all four items to the primary stream. this should produce four items.
@@ -118,7 +118,7 @@ public class KTableKTableOuterJoinTest {
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
+            processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
             checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, "X2+null"), kv(3, "X3+null"));
 
             // push all items to the other stream. this should produce four items.
@@ -126,7 +126,7 @@ public class KTableKTableOuterJoinTest {
                 driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
             checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
 
             // push all four items to the primary stream. this should produce four items.
@@ -135,7 +135,7 @@ public class KTableKTableOuterJoinTest {
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
             checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
 
             // push two items with null to the other stream as deletes. this should produce two item.
@@ -144,7 +144,7 @@ public class KTableKTableOuterJoinTest {
                 driver.process(topic2, expectedKeys[i], null);
             }
 
-            processor.checkAndClearResult("0:X0+null", "1:X1+null");
+            processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
             checkJoinedValues(getter, kv(0, "X0+null"), kv(1, "X1+null"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
 
             // push all four items to the primary stream. this should produce four items.
@@ -153,7 +153,7 @@ public class KTableKTableOuterJoinTest {
                 driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:XX0+null", "1:XX1+null", "2:XX2+YY2", "3:XX3+YY3");
+            processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+YY2", "3:XX3+YY3");
             checkJoinedValues(getter, kv(0, "XX0+null"), kv(1, "XX1+null"), kv(2, "XX2+YY2"), kv(3, "XX3+YY3"));
 
             // push middle two items to the primary stream with null. this should produce two items.
@@ -162,7 +162,7 @@ public class KTableKTableOuterJoinTest {
                 driver.process(topic1, expectedKeys[i], null);
             }
 
-            processor.checkAndClearResult("1:null", "2:null+YY2");
+            processor.checkAndClearProcessResult("1:null", "2:null+YY2");
             checkJoinedValues(getter, kv(0, "XX0+null"), kv(1, null), kv(2, "null+YY2"), kv(3, "XX3+YY3"));
 
         } finally {
@@ -204,7 +204,7 @@ public class KTableKTableOuterJoinTest {
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            proc.checkAndClearResult("0:(X0+null<-null)", "1:(X1+null<-null)");
+            proc.checkAndClearProcessResult("0:(X0+null<-null)", "1:(X1+null<-null)");
 
             // push two items to the other stream. this should produce two items.
 
@@ -212,7 +212,7 @@ public class KTableKTableOuterJoinTest {
                 driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
             }
 
-            proc.checkAndClearResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)");
+            proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)");
 
             // push all four items to the primary stream. this should produce four items.
 
@@ -220,14 +220,14 @@ public class KTableKTableOuterJoinTest {
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            proc.checkAndClearResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)", "2:(X2+null<-null)", "3:(X3+null<-null)");
+            proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)", "2:(X2+null<-null)", "3:(X3+null<-null)");
 
             // push all items to the other stream. this should produce four items.
             for (int i = 0; i < expectedKeys.length; i++) {
                 driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
             }
 
-            proc.checkAndClearResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
+            proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
 
             // push all four items to the primary stream. this should produce four items.
 
@@ -235,7 +235,7 @@ public class KTableKTableOuterJoinTest {
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            proc.checkAndClearResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
+            proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
 
             // push two items with null to the other stream as deletes. this should produce two item.
 
@@ -243,7 +243,7 @@ public class KTableKTableOuterJoinTest {
                 driver.process(topic2, expectedKeys[i], null);
             }
 
-            proc.checkAndClearResult("0:(X0+null<-null)", "1:(X1+null<-null)");
+            proc.checkAndClearProcessResult("0:(X0+null<-null)", "1:(X1+null<-null)");
 
             // push all four items to the primary stream. this should produce four items.
 
@@ -251,7 +251,7 @@ public class KTableKTableOuterJoinTest {
                 driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
             }
 
-            proc.checkAndClearResult("0:(XX0+null<-null)", "1:(XX1+null<-null)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)");
+            proc.checkAndClearProcessResult("0:(XX0+null<-null)", "1:(XX1+null<-null)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)");
 
             // push middle two items to the primary stream with null. this should produce two items.
 
@@ -259,7 +259,7 @@ public class KTableKTableOuterJoinTest {
                 driver.process(topic1, expectedKeys[i], null);
             }
 
-            proc.checkAndClearResult("1:(null<-null)", "2:(null+YY2<-null)");
+            proc.checkAndClearProcessResult("1:(null<-null)", "2:(null+YY2<-null)");
 
         } finally {
             Utils.delete(baseDir);
@@ -302,7 +302,7 @@ public class KTableKTableOuterJoinTest {
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            proc.checkAndClearResult("0:(X0+null<-null)", "1:(X1+null<-null)");
+            proc.checkAndClearProcessResult("0:(X0+null<-null)", "1:(X1+null<-null)");
 
             // push two items to the other stream. this should produce two items.
 
@@ -310,7 +310,7 @@ public class KTableKTableOuterJoinTest {
                 driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
             }
 
-            proc.checkAndClearResult("0:(X0+Y0<-X0+null)", "1:(X1+Y1<-X1+null)");
+            proc.checkAndClearProcessResult("0:(X0+Y0<-X0+null)", "1:(X1+Y1<-X1+null)");
 
             // push all four items to the primary stream. this should produce four items.
 
@@ -318,14 +318,14 @@ public class KTableKTableOuterJoinTest {
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            proc.checkAndClearResult("0:(X0+Y0<-X0+Y0)", "1:(X1+Y1<-X1+Y1)", "2:(X2+null<-null)", "3:(X3+null<-null)");
+            proc.checkAndClearProcessResult("0:(X0+Y0<-X0+Y0)", "1:(X1+Y1<-X1+Y1)", "2:(X2+null<-null)", "3:(X3+null<-null)");
 
             // push all items to the other stream. this should produce four items.
             for (int i = 0; i < expectedKeys.length; i++) {
                 driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
             }
 
-            proc.checkAndClearResult("0:(X0+YY0<-X0+Y0)", "1:(X1+YY1<-X1+Y1)", "2:(X2+YY2<-X2+null)", "3:(X3+YY3<-X3+null)");
+            proc.checkAndClearProcessResult("0:(X0+YY0<-X0+Y0)", "1:(X1+YY1<-X1+Y1)", "2:(X2+YY2<-X2+null)", "3:(X3+YY3<-X3+null)");
 
             // push all four items to the primary stream. this should produce four items.
 
@@ -333,7 +333,7 @@ public class KTableKTableOuterJoinTest {
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            proc.checkAndClearResult("0:(X0+YY0<-X0+YY0)", "1:(X1+YY1<-X1+YY1)", "2:(X2+YY2<-X2+YY2)", "3:(X3+YY3<-X3+YY3)");
+            proc.checkAndClearProcessResult("0:(X0+YY0<-X0+YY0)", "1:(X1+YY1<-X1+YY1)", "2:(X2+YY2<-X2+YY2)", "3:(X3+YY3<-X3+YY3)");
 
             // push two items with null to the other stream as deletes. this should produce two item.
 
@@ -341,7 +341,7 @@ public class KTableKTableOuterJoinTest {
                 driver.process(topic2, expectedKeys[i], null);
             }
 
-            proc.checkAndClearResult("0:(X0+null<-X0+YY0)", "1:(X1+null<-X1+YY1)");
+            proc.checkAndClearProcessResult("0:(X0+null<-X0+YY0)", "1:(X1+null<-X1+YY1)");
 
             // push all four items to the primary stream. this should produce four items.
 
@@ -349,7 +349,7 @@ public class KTableKTableOuterJoinTest {
                 driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
             }
 
-            proc.checkAndClearResult("0:(XX0+null<-X0+null)", "1:(XX1+null<-X1+null)", "2:(XX2+YY2<-X2+YY2)", "3:(XX3+YY3<-X3+YY3)");
+            proc.checkAndClearProcessResult("0:(XX0+null<-X0+null)", "1:(XX1+null<-X1+null)", "2:(XX2+YY2<-X2+YY2)", "3:(XX3+YY3<-X3+YY3)");
 
             // push middle two items to the primary stream with null. this should produce two items.
 
@@ -357,7 +357,7 @@ public class KTableKTableOuterJoinTest {
                 driver.process(topic1, expectedKeys[i], null);
             }
 
-            proc.checkAndClearResult("1:(null<-XX1+null)", "2:(null+YY2<-XX2+YY2)");
+            proc.checkAndClearProcessResult("1:(null<-XX1+null)", "2:(null+YY2<-XX2+YY2)");
 
         } finally {
             Utils.delete(baseDir);

http://git-wip-us.apache.org/repos/asf/kafka/blob/3a58407e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
index 9ec1258..9cafe8b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
@@ -223,20 +223,20 @@ public class KTableMapValuesTest {
             driver.process(topic1, "B", "01");
             driver.process(topic1, "C", "01");
 
-            proc.checkAndClearResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
+            proc.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
 
             driver.process(topic1, "A", "02");
             driver.process(topic1, "B", "02");
 
-            proc.checkAndClearResult("A:(2<-null)", "B:(2<-null)");
+            proc.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
 
             driver.process(topic1, "A", "03");
 
-            proc.checkAndClearResult("A:(3<-null)");
+            proc.checkAndClearProcessResult("A:(3<-null)");
 
             driver.process(topic1, "A", null);
 
-            proc.checkAndClearResult("A:(null<-null)");
+            proc.checkAndClearProcessResult("A:(null<-null)");
 
         } finally {
             Utils.delete(stateDir);
@@ -276,20 +276,20 @@ public class KTableMapValuesTest {
             driver.process(topic1, "B", "01");
             driver.process(topic1, "C", "01");
 
-            proc.checkAndClearResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
+            proc.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
 
             driver.process(topic1, "A", "02");
             driver.process(topic1, "B", "02");
 
-            proc.checkAndClearResult("A:(2<-1)", "B:(2<-1)");
+            proc.checkAndClearProcessResult("A:(2<-1)", "B:(2<-1)");
 
             driver.process(topic1, "A", "03");
 
-            proc.checkAndClearResult("A:(3<-2)");
+            proc.checkAndClearProcessResult("A:(3<-2)");
 
             driver.process(topic1, "A", null);
 
-            proc.checkAndClearResult("A:(null<-3)");
+            proc.checkAndClearProcessResult("A:(null<-3)");
 
         } finally {
             Utils.delete(stateDir);