You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/03/27 21:03:27 UTC

[kafka] branch trunk updated: KAFKA-6473: Add MockProcessorContext to public test-utils (#4736)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new adbf31a  KAFKA-6473: Add MockProcessorContext to public test-utils (#4736)
adbf31a is described below

commit adbf31ab1d27cda0b62d611e6141d830736a8e2e
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Tue Mar 27 16:03:24 2018 -0500

    KAFKA-6473: Add MockProcessorContext to public test-utils (#4736)
    
    Reviewers: Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <gu...@confluent.io>, Bill Bejeck <bi...@confluent.io>
---
 build.gradle                                       |   4 +
 docs/streams/developer-guide/dsl-api.html          |   5 +
 docs/streams/developer-guide/processor-api.html    |  30 +-
 docs/streams/developer-guide/testing.html          | 328 ++++++++++----
 docs/streams/developer-guide/write-streams.html    |   6 +
 .../examples/wordcount/WordCountProcessorDemo.java |   2 +-
 .../examples/wordcount/WordCountProcessorTest.java |  70 +++
 .../kstream/internals/KStreamKStreamJoinTest.java  |   4 +-
 .../internals/KStreamKStreamLeftJoinTest.java      |   4 +-
 ...KStreamSessionWindowAggregateProcessorTest.java |   6 +-
 .../internals/KStreamWindowAggregateTest.java      |   4 +-
 .../processor/internals/AbstractTaskTest.java      |   4 +-
 .../internals/GlobalStateManagerImplTest.java      |  10 +-
 .../processor/internals/ProcessorNodeTest.java     |   4 +-
 .../processor/internals/RecordQueueTest.java       |   4 +-
 .../streams/processor/internals/SinkNodeTest.java  |   4 +-
 .../streams/state/KeyValueStoreTestDriver.java     |   6 +-
 .../state/internals/AbstractKeyValueStoreTest.java |   6 +-
 .../state/internals/CachingKeyValueStoreTest.java  |   6 +-
 .../state/internals/CachingSessionStoreTest.java   |   6 +-
 .../state/internals/CachingWindowStoreTest.java    |   6 +-
 .../ChangeLoggingKeyValueBytesStoreTest.java       |   6 +-
 .../state/internals/MeteredWindowStoreTest.java    |   6 +-
 .../RocksDBKeyValueStoreSupplierTest.java          |   8 +-
 .../internals/RocksDBSegmentedBytesStoreTest.java  |   6 +-
 .../internals/RocksDBSessionStoreSupplierTest.java |   4 +-
 .../state/internals/RocksDBSessionStoreTest.java   |   6 +-
 .../streams/state/internals/RocksDBStoreTest.java  |  10 +-
 .../internals/RocksDBWindowStoreSupplierTest.java  |   8 +-
 .../state/internals/RocksDBWindowStoreTest.java    |   8 +-
 .../state/internals/SegmentIteratorTest.java       |   6 +-
 .../streams/state/internals/SegmentsTest.java      |   6 +-
 .../state/internals/StateStoreTestUtils.java       |   4 +-
 .../state/internals/StoreChangeLoggerTest.java     |   4 +-
 ...text.java => InternalMockProcessorContext.java} |  40 +-
 .../org/apache/kafka/test/KStreamTestDriver.java   |   6 +-
 .../streams/processor/MockProcessorContext.java    | 478 +++++++++++++++++++++
 .../kafka/streams/MockProcessorContextTest.java    | 406 +++++++++++++++++
 38 files changed, 1328 insertions(+), 203 deletions(-)

diff --git a/build.gradle b/build.gradle
index 8de03ef..d5fd7d5 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1002,6 +1002,10 @@ project(':streams:examples') {
     compile project(':streams')
     compile project(':connect:json')  // this dependency should be removed after we unify data API
     compile libs.slf4jlog4j
+
+    testCompile project(':clients').sourceSets.test.output // for org.apache.kafka.test.IntegrationTest
+    testCompile project(':streams:test-utils')
+    testCompile libs.junit
   }
 
   javadoc {
diff --git a/docs/streams/developer-guide/dsl-api.html b/docs/streams/developer-guide/dsl-api.html
index 34ac89f..8552bcc 100644
--- a/docs/streams/developer-guide/dsl-api.html
+++ b/docs/streams/developer-guide/dsl-api.html
@@ -66,6 +66,7 @@
                 </ul>
                 </li>
                 <li><a class="reference internal" href="#writing-streams-back-to-kafka" id="id25">Writing streams back to Kafka</a></li>
+                <li><a class="reference internal" href="#testing-a-streams-app" id="id26">Testing a Streams application</a></li>
             </ul>
         </div>
         <div class="section" id="overview">
@@ -3154,6 +3155,10 @@ t=5 (blue), which lead to a merge of sessions and an extension of a session, res
                     retry on delivery failure or to prevent message duplication).</p>
 </div>
 </div>
+        <div class="section" id="testing-a-streams-app">
+            <a class="headerlink" href="#testing-a-streams-app" title="Permalink to this headline"><h2>Testing a Streams application</a></h2>
+            Kafka Streams comes with a <code>test-utils</code> module to help you test your application <a href="testing.html">here</a>.
+        </div>
 </div>
 
 
diff --git a/docs/streams/developer-guide/processor-api.html b/docs/streams/developer-guide/processor-api.html
index b51bc22..e3432b7 100644
--- a/docs/streams/developer-guide/processor-api.html
+++ b/docs/streams/developer-guide/processor-api.html
@@ -41,13 +41,16 @@
             <p class="topic-title first"><b>Table of Contents</b></p>
             <ul class="simple">
                 <li><a class="reference internal" href="#overview" id="id1">Overview</a></li>
-                <li><a class="reference internal" href="#defining-a-stream-processor" id="id2">Defining a Stream Processor</a></li>
-                <li><a class="reference internal" href="#state-stores" id="id3">State Stores</a><ul>
-                    <li><a class="reference internal" href="#defining-and-creating-a-state-store" id="id4">Defining and creating a State Store</a></li>
-                    <li><a class="reference internal" href="#fault-tolerant-state-stores" id="id5">Fault-tolerant State Stores</a></li>
-                    <li><a class="reference internal" href="#enable-or-disable-fault-tolerance-of-state-stores-store-changelogs" id="id6">Enable or Disable Fault Tolerance of State Stores (Store Changelogs)</a></li>
-                    <li><a class="reference internal" href="#implementing-custom-state-stores" id="id7">Implementing Custom State Stores</a></li>
-                </ul>
+                <li><a class="reference internal" href="#defining-a-stream-processor" id="id2">Defining a Stream
+                    Processor</a></li>
+                <li><a class="reference internal" href="#unit-testing-processors" id="id9">Unit Testing Processors</a></li>
+                <li><a class="reference internal" href="#state-stores" id="id3">State Stores</a>
+                    <ul>
+                        <li><a class="reference internal" href="#defining-and-creating-a-state-store" id="id4">Defining and creating a State Store</a></li>
+                        <li><a class="reference internal" href="#fault-tolerant-state-stores" id="id5">Fault-tolerant State Stores</a></li>
+                        <li><a class="reference internal" href="#enable-or-disable-fault-tolerance-of-state-stores-store-changelogs" id="id6">Enable or Disable Fault Tolerance of State Stores (Store Changelogs)</a></li>
+                        <li><a class="reference internal" href="#implementing-custom-state-stores" id="id7">Implementing Custom State Stores</a></li>
+                    </ul>
                 </li>
                 <li><a class="reference internal" href="#connecting-processors-and-state-stores" id="id8">Connecting Processors and State Stores</a></li>
             </ul>
@@ -98,11 +101,12 @@
                 callbacks with different <code class="docutils literal"><span class="pre">PunctuationType</span></code> types within the same processor by calling <code class="docutils literal"><span class="pre">ProcessorContext#schedule()</span></code> multiple
                 times inside <code class="docutils literal"><span class="pre">init()</span></code> method.</p>
             <div class="admonition attention">
-                <p class="first admonition-title">Attention</p>
+                <p class="first admonition-title"><b>Attention</b></p>
                 <p class="last">Stream-time is only advanced if all input partitions over all input topics have new data (with newer timestamps) available.
                     If at least one partition does not have any new data available, stream-time will not be advanced and thus <code class="docutils literal"><span class="pre">punctuate()</span></code> will not be triggered if <code class="docutils literal"><span class="pre">PunctuationType.STREAM_TIME</span></code> was specified.
                     This behavior is independent of the configured timestamp extractor, i.e., using <code class="docutils literal"><span class="pre">WallclockTimestampExtractor</span></code> does not enable wall-clock triggering of <code class="docutils literal"><span class="pre">punctuate()</span></code>.</p>
             </div>
+            <p><b>Example</b></p>
             <p>The following example <code class="docutils literal"><span class="pre">Processor</span></code> defines a simple word-count algorithm and the following actions are performed:</p>
             <ul class="simple">
                 <li>In the <code class="docutils literal"><span class="pre">init()</span></code> method, schedule the punctuation every 1000 time units (the time unit is normally milliseconds, which in this example would translate to punctuation every 1 second) and retrieve the local state store by its name &#8220;Counts&#8221;.</li>
@@ -159,6 +163,16 @@
                     arrived records for stateful processing needs like aggregations and joins. For more information, see the <a class="reference internal" href="#streams-developer-guide-state-store"><span class="std std-ref">state stores</span></a> documentation.</p>
             </div>
         </div>
+        <div class="section" id="unit-testing-processors">
+            <h2>
+                <a class="toc-backref" href="#id9">Unit Testing Processors</a>
+                <a class="headerlink" href="#unit-testing-processors" title="Permalink to this headline"></a>
+            </h2>
+            <p>
+                Kafka Streams comes with a <code>test-utils</code> module to help you write unit tests for your
+                processors <a href="testing.html#unit-testing-processors">here</a>.
+            </p>
+        </div>
         <div class="section" id="state-stores">
             <span id="streams-developer-guide-state-store"></span><h2><a class="toc-backref" href="#id3">State Stores</a><a class="headerlink" href="#state-stores" title="Permalink to this headline"></a></h2>
             <p>To implement a <strong>stateful</strong> <code class="docutils literal"><span class="pre">Processor</span></code> or <code class="docutils literal"><span class="pre">Transformer</span></code>, you must provide one or more state stores to the processor
diff --git a/docs/streams/developer-guide/testing.html b/docs/streams/developer-guide/testing.html
index e6886a1..ea2ae98 100644
--- a/docs/streams/developer-guide/testing.html
+++ b/docs/streams/developer-guide/testing.html
@@ -18,26 +18,40 @@
 <script><!--#include virtual="../../js/templateData.js" --></script>
 
 <script id="content-template" type="text/x-handlebars-template">
-  <!-- h1>Developer Guide for Kafka Streams</h1 -->
-  <div class="sub-nav-sticky">
-    <div class="sticky-top">
-      <!-- div style="height:35px">
-        <a href="/{{version}}/documentation/streams/">Introduction</a>
-        <a class="active-menu-item" href="/{{version}}/documentation/streams/developer-guide">Developer Guide</a>
-        <a href="/{{version}}/documentation/streams/core-concepts">Concepts</a>
-        <a href="/{{version}}/documentation/streams/quickstart">Run Demo App</a>
-        <a href="/{{version}}/documentation/streams/tutorial">Tutorial: Write App</a>
-      </div -->
+    <!-- h1>Developer Guide for Kafka Streams</h1 -->
+    <div class="sub-nav-sticky">
+        <div class="sticky-top">
+            <!-- div style="height:35px">
+              <a href="/{{version}}/documentation/streams/">Introduction</a>
+              <a class="active-menu-item" href="/{{version}}/documentation/streams/developer-guide">Developer Guide</a>
+              <a href="/{{version}}/documentation/streams/core-concepts">Concepts</a>
+              <a href="/{{version}}/documentation/streams/quickstart">Run Demo App</a>
+              <a href="/{{version}}/documentation/streams/tutorial">Tutorial: Write App</a>
+            </div -->
+        </div>
     </div>
-  </div>
-
-  <div class="section" id="testing">
-    <span id="streams-developer-guide-testing"></span><h1>Testing a Streams Application<a class="headerlink" href="#testing" title="Permalink to this headline"></a></h1>
-    <p>
-      To test a Kafka Streams application, Kafka provides a test-utils artifact that can be added as regular dependency to your test code base.
-      Example <code>pom.xml</code> snippet when using Maven:
-    </p>
-    <pre>
+
+    <div class="section" id="testing">
+        <span id="streams-developer-guide-testing"></span>
+        <h1>Testing Kafka Streams<a class="headerlink" href="#testing" title="Permalink to this headline"></a></h1>
+        <div class="contents local topic" id="table-of-contents">
+            <p class="topic-title first"><b>Table of Contents</b></p>
+            <ul class="simple">
+                <li><a class="reference internal" href="#test-utils-artifact">Importing the test utilities</a></li>
+                <li><a class="reference internal" href="#testing-topologytestdriver">Testing Streams applications</a>
+                </li>
+                <li><a class="reference internal" href="#unit-testing-processors">Unit testing Processors</a>
+                </li>
+            </ul>
+        </div>
+        <div class="section" id="test-utils-artifact">
+            <h2><a class="toc-backref" href="#test-utils-artifact" title="Permalink to this headline">Importing the test
+                utilities</a></h2>
+            <p>
+                To test a Kafka Streams application, Kafka provides a test-utils artifact that can be added as regular
+                dependency to your test code base. Example <code>pom.xml</code> snippet when using Maven:
+            </p>
+            <pre>
 &lt;dependency&gt;
     &lt;groupId&gt;org.apache.kafka&lt;/groupId&gt;
     &lt;artifactId&gt;kafka-streams-test-utils&lt;/artifactId&gt;
@@ -45,13 +59,21 @@
     &lt;scope&gt;test&lt;/scope&gt;
 &lt;/dependency&gt;
     </pre>
-    <p>
-    The test-utils package provides a <code>TopologyTestDriver</code> that can be used pipe data through a <code>Topology</code> that is either assembled manually
-    using Processor API or via the DSL using <code>StreamsBuilder</code>.
-    The test driver simulates the library runtime that continuously fetches records from input topics and processes them by traversing the topology.
-    You can use the test driver to verify that your specified processor topology computes the correct result with the manually piped in data records.
-    The test driver captures the results records and allows to query its embedded state stores.
-    <pre>
+        </div>
+        <div class="section" id="testing-topologytestdriver">
+            <h2><a class="toc-backref" href="#testing-topologytestdriver" title="Permalink to this headline">Testing a
+                Streams application</a></h2>
+
+            <p>
+                The test-utils package provides a <code>TopologyTestDriver</code> that can be used pipe data through a
+                <code>Topology</code> that is either assembled manually
+                using Processor API or via the DSL using <code>StreamsBuilder</code>.
+                The test driver simulates the library runtime that continuously fetches records from input topics and
+                processes them by traversing the topology.
+                You can use the test driver to verify that your specified processor topology computes the correct result
+                with the manually piped in data records.
+                The test driver captures the results records and allows to query its embedded state stores.
+            <pre>
 // Processor API
 Topology topology = new Topology();
 topology.addSource("sourceProcessor", "input-topic");
@@ -68,62 +90,66 @@ Properties config = new Properties();
 config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
 config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
 TopologyTestDriver testDriver = new TopologyTestDriver(topology, config);
-    </pre>
-    <p>
-    The test driver accepts <code>ConsumerRecord</code>s with key and value type <code>byte[]</code>.
-    Because <code>byte[]</code> types can be problematic, you can use the <code>ConsumerRecordFactory</code> to generate those records
-    by providing regular Java types for key and values and the corresponding serializers.
-    </p>
-    <pre>
+        </pre>
+            <p>
+                The test driver accepts <code>ConsumerRecord</code>s with key and value type <code>byte[]</code>.
+                Because <code>byte[]</code> types can be problematic, you can use the <code>ConsumerRecordFactory</code>
+                to generate those records
+                by providing regular Java types for key and values and the corresponding serializers.
+            </p>
+            <pre>
 ConsumerRecordFactory&lt;String, Integer&gt; factory = new ConsumerRecordFactory&lt;&gt;("input-topic", new StringSerializer(), new IntegerSerializer());
 testDriver.pipe(factory.create("key", 42L));
-    </pre>
-    <p>
-    To verify the output, the test driver produces <code>ProducerRecord</code>s with key and value type <code>byte[]</code>.
-    For result verification, you can specify corresponding deserializers when reading the output record from the driver.
-    <pre>
+        </pre>
+            <p>
+                To verify the output, the test driver produces <code>ProducerRecord</code>s with key and value type
+                <code>byte[]</code>.
+                For result verification, you can specify corresponding deserializers when reading the output record from
+                the driver.
+            <pre>
 ProducerRecord&lt;String, Integer&gt; outputRecord = testDriver.readOutput("output-topic", new StringDeserializer(), new LongDeserializer());
-    </pre>
-    <p>
-    For result verification, you can use <code>OutputVerifier</code>.
-    It offers helper methods to compare only certain parts of the result record:
-    for example, you might only care about the key and value, but not the timestamp of the result record.
-    </p>
-    <pre>
+        </pre>
+            <p>
+                For result verification, you can use <code>OutputVerifier</code>.
+                It offers helper methods to compare only certain parts of the result record:
+                for example, you might only care about the key and value, but not the timestamp of the result record.
+            </p>
+            <pre>
 OutputVerifier.compareKeyValue(outputRecord, "key", 42L); // throws AssertionError if key or value does not match
-    </pre>
-    <p>
-    <code>TopologyTestDriver</code> supports punctuations, too.
-    Event-time punctuations are triggered automatically based on the processed records' timestamps.
-    Wall-clock-time punctuations can also be triggered by advancing the test driver's wall-clock-time (the driver mocks wall-clock-time internally to give users control over it).
-    </p>
-    <pre>
+        </pre>
+            <p>
+                <code>TopologyTestDriver</code> supports punctuations, too.
+                Event-time punctuations are triggered automatically based on the processed records' timestamps.
+                Wall-clock-time punctuations can also be triggered by advancing the test driver's wall-clock-time (the
+                driver mocks wall-clock-time internally to give users control over it).
+            </p>
+            <pre>
 testDriver.advanceWallClockTime(20L);
-    </pre>
-    </div>
-    <p>
-    Additionally, you can access state stores via the test driver before or after a test.
-    Accessing stores before a test is useful to pre-populate a store with some initial values.
-    After data was processed, expected updates to the store can be verified.
-    </p>
-    <pre>
+        </pre>
+            <p>
+                Additionally, you can access state stores via the test driver before or after a test.
+                Accessing stores before a test is useful to pre-populate a store with some initial values.
+                After data was processed, expected updates to the store can be verified.
+            </p>
+            <pre>
 KeyValueStore store = testDriver.getKeyValueStore("store-name");
-    </pre>
-    <p>
-    Note, that you should always close the test driver at the end to make sure all resources are release properly.
-    </p>
-    <pre>
+        </pre>
+            <p>
+                Note, that you should always close the test driver at the end to make sure all resources are release
+                properly.
+            </p>
+            <pre>
 testDriver.close();
-    </pre>
-
-    <h2>Example</h2>
-    <p>
-    The following example demonstrates how to use the test driver and helper classes.
-    The example creates a topology that computes the maximum value per key using a key-value-store.
-    While processing, no output is generated, but only the store is updated.
-    Output is only sent downstream based on event-time and wall-clock punctuations.
-    </p>
-    <pre>
+        </pre>
+
+            <h3>Example</h3>
+            <p>
+                The following example demonstrates how to use the test driver and helper classes.
+                The example creates a topology that computes the maximum value per key using a key-value-store.
+                While processing, no output is generated, but only the store is updated.
+                Output is only sent downstream based on event-time and wall-clock punctuations.
+            </p>
+            <pre>
 private TopologyTestDriver testDriver;
 private KeyValueStore&lt;String, Long&gt; store;
 
@@ -266,31 +292,147 @@ public class CustomMaxAggregator implements Processor&lt;String, Long&gt; {
     @Override
     public void close() {}
 }
-    </pre>
-  <div class="pagination">
-  <div class="pagination">
-    <a href="/{{version}}/documentation/streams/developer-guide/datatypes" class="pagination__btn pagination__btn__prev">Previous</a>
-    <a href="/{{version}}/documentation/streams/developer-guide/interactive-queries" class="pagination__btn pagination__btn__next">Next</a>
-  </div>
+        </pre>
+        </div>
+        <div class="section" id="unit-testing-processors">
+            <h2>
+                <a class="headerlink" href="#unit-testing-processors"
+                   title="Permalink to this headline">Unit Testing Processors</a>
+            </h2>
+            <p>
+                If you <a href="processor-api.html">write a Processor</a>, you will want to test it.
+            </p>
+            <p>
+                Because the <code>Processor</code> forwards its results to the context rather than returning them,
+                Unit testing requires a mocked context capable of capturing forwarded data for inspection.
+                For this reason, we provide a <code>MockProcessorContext</code> in <a href="#test-utils-artifact"><code>test-utils</code></a>.
+            </p>
+            <b>Construction</b>
+            <p>
+                To begin with, instantiate your processor and initialize it with the mock context:
+            <pre>
+final Processor processorUnderTest = ...;
+final MockProcessorContext context = new MockProcessorContext();
+processorUnderTest.init(context);
+                </pre>
+            If you need to pass configuration to your processor or set the default serdes, you can create the mock with
+            config:
+            <pre>
+final Properties config = new Properties();
+config.put(StreamsConfig.APPLICATION_ID_CONFIG, "unit-test");
+config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "");
+config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
+config.put("some.other.config", "some config value");
+final MockProcessorContext context = new MockProcessorContext(config);
+                </pre>
+            </p>
+            <b>Captured data</b>
+            <p>
+                The mock will capture any values that your processor forwards. You can make assertions on them:
+            <pre>
+processorUnderTest.process("key", "value");
+
+final Iterator&lt;CapturedForward&gt; forwarded = context.forwarded().iterator();
+assertEquals(forwarded.next().keyValue(), new KeyValue&lt;&gt;(..., ...));
+assertFalse(forwarded.hasNext());
+
+// you can reset forwards to clear the captured data. This may be helpful in constructing longer scenarios.
+context.resetForwards();
+
+assertEquals(context.forwarded().size(), 0);
+            </pre>
+            If your processor forwards to specific child processors, you can query the context for captured data by
+            child name:
+            <pre>
+final List&lt;CapturedForward&gt; captures = context.forwarded("childProcessorName");
+            </pre>
+            The mock also captures whether your processor has called <code>commit()</code> on the context:
+            <pre>
+assertTrue(context.committed());
+
+// commit captures can also be reset.
+context.resetCommit();
+
+assertFalse(context.committed());
+            </pre>
+            </p>
+            <b>Setting record metadata</b>
+            <p>
+                In case your processor logic depends on the record metadata (topic, partition, offset, or timestamp),
+                you can set them on the context, either all together or individually:
+            <pre>
+context.setRecordMetadata("topicName", /*partition*/ 0, /*offset*/ 0L, /*timestamp*/ 0L);
+context.setTopic("topicName");
+context.setPartition(0);
+context.setOffset(0L);
+context.setTimestamp(0L);
+                </pre>
+            Once these are set, the context will continue returning the same values, until you set new ones.
+            </p>
+            <b>State stores</b>
+            <p>
+                In case your punctuator is stateful, the mock context allows you to register state stores.
+                You're encouraged to use a simple in-memory store of the appropriate type (KeyValue, Windowed, or
+                Session), since the mock context does <i>not</i> manage changelogs, state directories, etc.
+            </p>
+            <pre>
+final KeyValueStore&lt;String, Integer&gt; store =
+    Stores.keyValueStoreBuilder(
+            Stores.inMemoryKeyValueStore("myStore"),
+            Serdes.String(),
+            Serdes.Integer()
+        )
+        .withLoggingDisabled() // Changelog is not supported by MockProcessorContext.
+        .build();
+store.init(context, store);
+context.register(store, /*deprecated parameter*/ false, /*parameter unused in mock*/ null);
+            </pre>
+            <b>Verifying punctuators</b>
+            <p>
+                Processors can schedule punctuators to handle periodic tasks.
+                The mock context does <i>not</i> automatically execute punctuators, but it does capture them to
+                allow you to unit test them as well:
+            <pre>
+final MockProcessorContext.CapturedPunctuator capturedPunctuator = context.scheduledPunctuators().get(0);
+final long interval = capturedPunctuator.getIntervalMs();
+final PunctuationType type = capturedPunctuator.getType();
+final boolean cancelled = capturedPunctuator.cancelled();
+final Punctuator punctuator = capturedPunctuator.getPunctuator();
+punctuator.punctuate(/*timestamp*/ 0L);
+                </pre>
+            If you need to write tests involving automatic firing of scheduled punctuators, we recommend creating a
+            simple topology with your processor and using the <a href="testing.html#testing-topologytestdriver"><code>TopologyTestDriver</code></a>.
+            </p>
+        </div>
+    </div>
+    <div class="pagination">
+        <div class="pagination">
+            <a href="/{{version}}/documentation/streams/developer-guide/datatypes"
+               class="pagination__btn pagination__btn__prev">Previous</a>
+            <a href="/{{version}}/documentation/streams/developer-guide/interactive-queries"
+               class="pagination__btn pagination__btn__next">Next</a>
+        </div>
+    </div>
 </script>
 
 <!--#include virtual="../../../includes/_header.htm" -->
 <!--#include virtual="../../../includes/_top.htm" -->
 <div class="content documentation documentation--current">
-  <!--#include virtual="../../../includes/_nav.htm" -->
-  <div class="right">
-    <!--#include virtual="../../../includes/_docs_banner.htm" -->
-    <ul class="breadcrumbs">
-      <li><a href="/documentation">Documentation</a></li>
-      <li><a href="/documentation/streams">Kafka Streams</a></li>
-      <li><a href="/documentation/streams/developer-guide/">Developer Guide</a></li>
-    </ul>
-    <div class="p-content"></div>
-  </div>
+    <!--#include virtual="../../../includes/_nav.htm" -->
+    <div class="right">
+        <!--#include virtual="../../../includes/_docs_banner.htm" -->
+        <ul class="breadcrumbs">
+            <li><a href="/documentation">Documentation</a></li>
+            <li><a href="/documentation/streams">Kafka Streams</a></li>
+            <li><a href="/documentation/streams/developer-guide/">Developer Guide</a></li>
+        </ul>
+        <div class="p-content"></div>
+    </div>
 </div>
 <!--#include virtual="../../../includes/_footer.htm" -->
 <script>
-    $(function() {
+    $(function () {
         // Show selected style on nav item
         $('.b-nav__streams').addClass('selected');
 
@@ -299,7 +441,7 @@ public class CustomMaxAggregator implements Processor&lt;String, Long&gt; {
             y_pos = $navbar.offset().top,
             height = $navbar.height();
 
-        $(window).scroll(function() {
+        $(window).scroll(function () {
             var scrollTop = $(window).scrollTop();
 
             if (scrollTop > y_pos - height) {
diff --git a/docs/streams/developer-guide/write-streams.html b/docs/streams/developer-guide/write-streams.html
index 1e4213d..44cdb3f 100644
--- a/docs/streams/developer-guide/write-streams.html
+++ b/docs/streams/developer-guide/write-streams.html
@@ -37,6 +37,7 @@
       <ul class="simple">
           <li><a class="reference internal" href="#libraries-and-maven-artifacts" id="id1">Libraries and Maven artifacts</a></li>
           <li><a class="reference internal" href="#using-kafka-streams-within-your-application-code" id="id2">Using Kafka Streams within your application code</a></li>
+          <li><a class="reference internal" href="#testing-a-streams-app" id="id3">Testing a Streams application</a></li>
       </ul>
     <p>Any Java application that makes use of the Kafka Streams library is considered a Kafka Streams application.
       The computational logic of a Kafka Streams application is defined as a <a class="reference internal" href="../concepts.html#streams-concepts"><span class="std std-ref">processor topology</span></a>,
@@ -196,6 +197,11 @@
       <p>After an application is stopped, Kafka Streams will migrate any tasks that had been running in this instance to available remaining
         instances.</p>
 </div>
+
+      <div class="section" id="testing-a-streams-app">
+          <a class="headerlink" href="#testing-a-streams-app" title="Permalink to this headline"><h2>Testing a Streams application</a></h2>
+          Kafka Streams comes with a <code>test-utils</code> module to help you test your application <a href="testing.html">here</a>.
+      </div>
 </div>
 
 
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
index cfa2137..dbf2b70 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
@@ -49,7 +49,7 @@ import java.util.concurrent.CountDownLatch;
  */
 public class WordCountProcessorDemo {
 
-    private static class MyProcessorSupplier implements ProcessorSupplier<String, String> {
+    static class MyProcessorSupplier implements ProcessorSupplier<String, String> {
 
         @Override
         public Processor<String, String> get() {
diff --git a/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorTest.java b/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorTest.java
new file mode 100644
index 0000000..566b7d4
--- /dev/null
+++ b/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.examples.wordcount;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.MockProcessorContext;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Stores;
+import org.junit.Test;
+
+import java.util.Iterator;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Demonstrate the use of {@link MockProcessorContext} for testing the {@link Processor} in the {@link WordCountProcessorDemo}.
+ */
+public class WordCountProcessorTest {
+    @Test
+    public void test() {
+        final MockProcessorContext context = new MockProcessorContext();
+
+        // Create, initialize, and register the state store.
+        final KeyValueStore<String, Integer> store =
+            Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("Counts"), Serdes.String(), Serdes.Integer())
+                .withLoggingDisabled() // Changelog is not supported by MockProcessorContext.
+                .build();
+        store.init(context, store);
+        context.register(store, false, null);
+
+        // Create and initialize the processor under test
+        final Processor<String, String> processor = new WordCountProcessorDemo.MyProcessorSupplier().get();
+        processor.init(context);
+
+        // send a record to the processor
+        processor.process("key", "alpha beta gamma alpha");
+
+        // note that the processor commits, but does not forward, during process()
+        assertTrue(context.committed());
+        assertTrue(context.forwarded().isEmpty());
+
+        // now, we trigger the punctuator, which iterates over the state store and forwards the contents.
+        context.scheduledPunctuators().get(0).getPunctuator().punctuate(0L);
+
+        // finally, we can verify the output.
+        final Iterator<MockProcessorContext.CapturedForward> capturedForwards = context.forwarded().iterator();
+        assertEquals(new KeyValue<>("alpha", "2"), capturedForwards.next().keyValue());
+        assertEquals(new KeyValue<>("beta", "1"), capturedForwards.next().keyValue());
+        assertEquals(new KeyValue<>("gamma", "1"), capturedForwards.next().keyValue());
+        assertFalse(capturedForwards.hasNext());
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
index 572c0b0..5600023 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
@@ -26,7 +26,7 @@ import org.apache.kafka.streams.kstream.Joined;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.test.KStreamTestDriver;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
 import org.apache.kafka.test.TestUtils;
@@ -719,6 +719,6 @@ public class KStreamKStreamJoinTest {
     }
 
     private void setRecordContext(final long time, final String topic) {
-        ((MockProcessorContext) driver.context()).setRecordContext(new ProcessorRecordContext(time, 0, 0, topic));
+        ((InternalMockProcessorContext) driver.context()).setRecordContext(new ProcessorRecordContext(time, 0, 0, topic));
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
index 39b318f..465082b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
@@ -26,7 +26,7 @@ import org.apache.kafka.streams.kstream.Joined;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.test.KStreamTestDriver;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
 import org.apache.kafka.test.TestUtils;
@@ -304,6 +304,6 @@ public class KStreamKStreamLeftJoinTest {
     }
 
     private void setRecordContext(final long time, final String topic) {
-        ((MockProcessorContext) driver.context()).setRecordContext(new ProcessorRecordContext(time, 0, 0, topic));
+        ((InternalMockProcessorContext) driver.context()).setRecordContext(new ProcessorRecordContext(time, 0, 0, topic));
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index 21dc4f0..212c48d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -31,7 +31,7 @@ import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.internals.RocksDBSessionStoreSupplier;
 import org.apache.kafka.streams.state.SessionStore;
 import org.apache.kafka.streams.state.internals.ThreadCache;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.NoOpRecordCollector;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -81,13 +81,13 @@ public class KStreamSessionWindowAggregateProcessorTest {
     private final List<KeyValue> results = new ArrayList<>();
     private Processor<String, String> processor = sessionAggregator.get();
     private SessionStore<String, Long> sessionStore;
-    private MockProcessorContext context;
+    private InternalMockProcessorContext context;
 
 
     @Before
     public void initializeStore() {
         final File stateDir = TestUtils.tempDirectory();
-        context = new MockProcessorContext(stateDir,
+        context = new InternalMockProcessorContext(stateDir,
             Serdes.String(), Serdes.String(), new NoOpRecordCollector(), new ThreadCache(new LogContext("testCache "), 100000, new MockStreamsMetrics(new Metrics()))) {
             @Override
             public <K, V> void forward(final K key, final V value) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
index aa660e0..d3a74e3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
@@ -31,7 +31,7 @@ import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockAggregator;
 import org.apache.kafka.test.MockInitializer;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Before;
@@ -143,7 +143,7 @@ public class KStreamWindowAggregateTest {
     }
 
     private void setRecordContext(final long time, final String topic) {
-        ((MockProcessorContext) driver.context()).setRecordContext(new ProcessorRecordContext(time, 0, 0, topic));
+        ((InternalMockProcessorContext) driver.context()).setRecordContext(new ProcessorRecordContext(time, 0, 0, topic));
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
index 4569858..347e9c4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
@@ -32,7 +32,7 @@ import org.apache.kafka.streams.errors.LockException;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.MockRestoreCallback;
 import org.apache.kafka.test.MockStateRestoreListener;
 import org.apache.kafka.test.TestUtils;
@@ -194,7 +194,7 @@ public class AbstractTaskTest {
         testFile4.createNewFile();
         assertTrue(testFile4.exists());
 
-        task.processorContext = new MockProcessorContext(stateDirectory.directoryForTask(task.id), streamsConfig);
+        task.processorContext = new InternalMockProcessorContext(stateDirectory.directoryForTask(task.id), streamsConfig);
 
         task.stateMgr.register(store1, new MockRestoreCallback());
         task.stateMgr.register(store2, new MockRestoreCallback());
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
index df8d201..d19e63e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
@@ -34,7 +34,7 @@ import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.MockStateRestoreListener;
 import org.apache.kafka.test.NoOpReadOnlyStore;
 import org.apache.kafka.test.TestUtils;
@@ -86,7 +86,7 @@ public class GlobalStateManagerImplTest {
     private MockConsumer<byte[], byte[]> consumer;
     private File checkpointFile;
     private ProcessorTopology topology;
-    private MockProcessorContext mockProcessorContext;
+    private InternalMockProcessorContext processorContext;
 
     @Before
     public void before() throws IOException {
@@ -120,8 +120,8 @@ public class GlobalStateManagerImplTest {
             stateDirectory,
             stateRestoreListener,
             streamsConfig);
-        mockProcessorContext = new MockProcessorContext(stateDirectory.globalStateDir(), streamsConfig);
-        stateManager.setGlobalProcessorContext(mockProcessorContext);
+        processorContext = new InternalMockProcessorContext(stateDirectory.globalStateDir(), streamsConfig);
+        stateManager.setGlobalProcessorContext(processorContext);
         checkpointFile = new File(stateManager.baseDir(), ProcessorStateManager.CHECKPOINT_FILE_NAME);
     }
 
@@ -631,7 +631,7 @@ public class GlobalStateManagerImplTest {
         assertTrue(testFile4.exists());
 
         // only delete and recreate store 1 and 3 -- 2 and 4 must be untouched
-        stateManager.reinitializeStateStoresForPartitions(Utils.mkList(t1, t3), mockProcessorContext);
+        stateManager.reinitializeStateStoresForPartitions(Utils.mkList(t1, t3), processorContext);
 
         assertFalse(testFile1.exists());
         assertTrue(testFile2.exists());
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
index 90ef771..0dea193 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
@@ -24,7 +24,7 @@ import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.state.StateSerdes;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.junit.Test;
 
 import java.util.Collections;
@@ -110,7 +110,7 @@ public class ProcessorNodeTest {
         final StateSerdes anyStateSerde = StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class);
 
         final Metrics metrics = new Metrics();
-        final MockProcessorContext context = new MockProcessorContext(anyStateSerde,  new RecordCollectorImpl(null, null, new LogContext("processnode-test "), new DefaultProductionExceptionHandler()), metrics);
+        final InternalMockProcessorContext context = new InternalMockProcessorContext(anyStateSerde,  new RecordCollectorImpl(null, null, new LogContext("processnode-test "), new DefaultProductionExceptionHandler()), metrics);
         final ProcessorNode node = new ProcessorNode("name", new NoOpProcessor(), Collections.emptySet());
         node.init(context);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
index 2fa1b59..faf72e9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
@@ -34,7 +34,7 @@ import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
 import org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.apache.kafka.streams.state.StateSerdes;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.MockSourceNode;
 import org.apache.kafka.test.MockTimestampExtractor;
 import org.junit.After;
@@ -54,7 +54,7 @@ public class RecordQueueTest {
     private final TimestampExtractor timestampExtractor = new MockTimestampExtractor();
     private final String[] topics = {"topic"};
 
-    final MockProcessorContext context = new MockProcessorContext(StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class),
+    final InternalMockProcessorContext context = new InternalMockProcessorContext(StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class),
             new RecordCollectorImpl(null, null,  new LogContext("record-queue-test "), new DefaultProductionExceptionHandler()));
     private final MockSourceNode mockSourceNodeWithMetrics = new MockSourceNode<>(topics, intDeserializer, intDeserializer);
     private final RecordQueue queue = new RecordQueue(
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
index 6792740..4b48a17 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
@@ -24,7 +24,7 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.state.StateSerdes;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -37,7 +37,7 @@ import static org.junit.Assert.fail;
 public class SinkNodeTest {
     private final Serializer anySerializer = Serdes.Bytes().serializer();
     private final StateSerdes anyStateSerde = StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class);
-    private final MockProcessorContext context = new MockProcessorContext(anyStateSerde,
+    private final InternalMockProcessorContext context = new InternalMockProcessorContext(anyStateSerde,
         new RecordCollectorImpl(new MockProducer<byte[], byte[]>(true, anySerializer, anySerializer), null, new LogContext("sinknode-test "), new DefaultProductionExceptionHandler()));
     private final SinkNode sink = new SinkNode<>("anyNodeName", "any-output-topic", anySerializer, anySerializer, null);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index a342585..fc810e3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -34,7 +34,7 @@ import org.apache.kafka.streams.processor.internals.RecordCollector;
 import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
 import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreTest;
 import org.apache.kafka.streams.state.internals.ThreadCache;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.MockTimestampExtractor;
 import org.apache.kafka.test.TestUtils;
 
@@ -179,7 +179,7 @@ public class KeyValueStoreTestDriver<K, V> {
     private final Set<K> flushedRemovals = new HashSet<>();
     private final List<KeyValue<byte[], byte[]>> restorableEntries = new LinkedList<>();
 
-    private final MockProcessorContext context;
+    private final InternalMockProcessorContext context;
     private final StateSerdes<K, V> stateSerdes;
 
     private KeyValueStoreTestDriver(final StateSerdes<K, V> serdes) {
@@ -227,7 +227,7 @@ public class KeyValueStoreTestDriver<K, V> {
         props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, serdes.valueSerde().getClass());
         props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, RocksDBKeyValueStoreTest.TheRocksDbConfigSetter.class);
 
-        context = new MockProcessorContext(stateDir, serdes.keySerde(), serdes.valueSerde(), recordCollector, null) {
+        context = new InternalMockProcessorContext(stateDir, serdes.keySerde(), serdes.valueSerde(), recordCollector, null) {
             ThreadCache cache = new ThreadCache(new LogContext("testCache "), 1024 * 1024L, metrics());
 
             @Override
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
index 937b1d0..51c782a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
@@ -25,7 +25,7 @@ import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.KeyValueStoreTestDriver;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -48,14 +48,14 @@ public abstract class AbstractKeyValueStoreTest {
 
     protected abstract <K, V> KeyValueStore<K, V> createKeyValueStore(final ProcessorContext context);
 
-    protected MockProcessorContext context;
+    protected InternalMockProcessorContext context;
     protected KeyValueStore<Integer, String> store;
     protected KeyValueStoreTestDriver<Integer, String> driver;
 
     @Before
     public void before() {
         driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
-        context = (MockProcessorContext) driver.context();
+        context = (InternalMockProcessorContext) driver.context();
         context.setTime(10);
         store = createKeyValueStore(context);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
index 0e3b4e8..8705326 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
@@ -33,7 +33,7 @@ import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -58,7 +58,7 @@ import static org.junit.Assert.fail;
 public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
 
     private final int maxCacheSizeBytes = 150;
-    private MockProcessorContext context;
+    private InternalMockProcessorContext context;
     private CachingKeyValueStore<String, String> store;
     private InMemoryKeyValueStore<Bytes, byte[]> underlyingStore;
     private ThreadCache cache;
@@ -73,7 +73,7 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
         store = new CachingKeyValueStore<>(underlyingStore, Serdes.String(), Serdes.String());
         store.setFlushListener(cacheFlushListener, false);
         cache = new ThreadCache(new LogContext("testCache "), maxCacheSizeBytes, new MockStreamsMetrics(new Metrics()));
-        context = new MockProcessorContext(null, null, null, (RecordCollector) null, cache);
+        context = new InternalMockProcessorContext(null, null, null, (RecordCollector) null, cache);
         topic = "topic";
         context.setRecordContext(new ProcessorRecordContext(10, 0, 0, topic));
         store.init(context, null);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
index 16ef47c..a9a66e9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
@@ -30,7 +30,7 @@ import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.StateSerdes;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
@@ -52,7 +52,7 @@ import static org.junit.Assert.assertFalse;
 public class CachingSessionStoreTest {
 
     private static final int MAX_CACHE_SIZE_BYTES = 600;
-    private MockProcessorContext context;
+    private InternalMockProcessorContext context;
     private RocksDBSegmentedBytesStore underlying;
     private CachingSessionStore<String, String> cachingStore;
     private ThreadCache cache;
@@ -75,7 +75,7 @@ public class CachingSessionStoreTest {
                                                  Segments.segmentInterval(retention, numSegments)
                                                  );
         cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
-        context = new MockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache);
+        context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache);
         context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, "topic"));
         cachingStore.init(context, cachingStore);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
index bbf9bef..c25655b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
@@ -30,7 +30,7 @@ import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.WindowStoreIterator;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
@@ -56,7 +56,7 @@ public class CachingWindowStoreTest {
     private static final int MAX_CACHE_SIZE_BYTES = 150;
     private static final long DEFAULT_TIMESTAMP = 10L;
     private static final Long WINDOW_SIZE = 10000L;
-    private MockProcessorContext context;
+    private InternalMockProcessorContext context;
     private RocksDBSegmentedBytesStore underlying;
     private CachingWindowStore<String, String> cachingStore;
     private CachingKeyValueStoreTest.CacheFlushListenerStub<Windowed<String>, String> cacheListener;
@@ -80,7 +80,7 @@ public class CachingWindowStoreTest {
         cachingStore.setFlushListener(cacheListener, false);
         cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
         topic = "topic";
-        context = new MockProcessorContext(TestUtils.tempDirectory(), null, null, (RecordCollector) null, cache);
+        context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, (RecordCollector) null, cache);
         context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, topic));
         cachingStore.init(context, cachingStore);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
index 9360dae..7342c93 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
@@ -23,7 +23,7 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.NoOpRecordCollector;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -41,7 +41,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 
 public class ChangeLoggingKeyValueBytesStoreTest {
 
-    private MockProcessorContext context;
+    private InternalMockProcessorContext context;
     private final InMemoryKeyValueStore<Bytes, byte[]> inner = new InMemoryKeyValueStore<>("kv", Serdes.Bytes(), Serdes.ByteArray());
     private final ChangeLoggingKeyValueBytesStore store = new ChangeLoggingKeyValueBytesStore(inner);
     private final Map sent = new HashMap<>();
@@ -64,7 +64,7 @@ public class ChangeLoggingKeyValueBytesStoreTest {
                 sent.put(key, value);
             }
         };
-        context = new MockProcessorContext(
+        context = new InternalMockProcessorContext(
             TestUtils.tempDirectory(),
             Serdes.String(),
             Serdes.Long(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
index 59c7ade..4fd7f30 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
@@ -27,7 +27,7 @@ import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.NoOpRecordCollector;
 import org.apache.kafka.test.TestUtils;
 import org.easymock.EasyMock;
@@ -44,7 +44,7 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 public class MeteredWindowStoreTest {
-    private MockProcessorContext context;
+    private InternalMockProcessorContext context;
     @SuppressWarnings("unchecked")
     private final WindowStore<Bytes, byte[]> innerStoreMock = EasyMock.createNiceMock(WindowStore.class);
     private final MeteredWindowStore<String, String> store = new MeteredWindowStore<>(innerStoreMock, "scope", new MockTime(), Serdes.String(), new SerdeThatDoesntHandleNull());
@@ -98,7 +98,7 @@ public class MeteredWindowStoreTest {
 
         };
 
-        context = new MockProcessorContext(
+        context = new InternalMockProcessorContext(
             TestUtils.tempDirectory(),
             Serdes.String(),
             Serdes.Long(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java
index 66cc9ad..098c326 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java
@@ -24,7 +24,7 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.NoOpRecordCollector;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -44,7 +44,7 @@ public class RocksDBKeyValueStoreSupplierTest {
 
     private static final String STORE_NAME = "name";
     private final ThreadCache cache = new ThreadCache(new LogContext("test "), 1024, new MockStreamsMetrics(new Metrics()));
-    private final MockProcessorContext context = new MockProcessorContext(TestUtils.tempDirectory(),
+    private final InternalMockProcessorContext context = new InternalMockProcessorContext(TestUtils.tempDirectory(),
                                                                           Serdes.String(),
                                                                           Serdes.String(),
                                                                           new NoOpRecordCollector(),
@@ -73,7 +73,7 @@ public class RocksDBKeyValueStoreSupplierTest {
                 logged.add(new ProducerRecord<K, V>(topic, partition, timestamp, key, value));
             }
         };
-        final MockProcessorContext context = new MockProcessorContext(TestUtils.tempDirectory(),
+        final InternalMockProcessorContext context = new InternalMockProcessorContext(TestUtils.tempDirectory(),
                                                                       Serdes.String(),
                                                                       Serdes.String(),
                                                                       collector,
@@ -100,7 +100,7 @@ public class RocksDBKeyValueStoreSupplierTest {
                 logged.add(new ProducerRecord<>(topic, partition, timestamp, key, value));
             }
         };
-        final MockProcessorContext context = new MockProcessorContext(TestUtils.tempDirectory(),
+        final InternalMockProcessorContext context = new InternalMockProcessorContext(TestUtils.tempDirectory(),
                                                                       Serdes.String(),
                                                                       Serdes.String(),
                                                                       collector,
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
index e34d3cc..388a2fc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
@@ -26,7 +26,7 @@ import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.NoOpRecordCollector;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -56,7 +56,7 @@ public class RocksDBSegmentedBytesStoreTest {
 
     private final long retention = 60000L;
     private final int numSegments = 3;
-    private MockProcessorContext context;
+    private InternalMockProcessorContext context;
     private final String storeName = "bytes-store";
     private RocksDBSegmentedBytesStore bytesStore;
     private File stateDir;
@@ -71,7 +71,7 @@ public class RocksDBSegmentedBytesStoreTest {
                                                     schema);
 
         stateDir = TestUtils.tempDirectory();
-        context = new MockProcessorContext(
+        context = new InternalMockProcessorContext(
             stateDir,
             Serdes.String(),
             Serdes.Long(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java
index 19bfded..272e0b0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java
@@ -26,7 +26,7 @@ import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.state.SessionStore;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.NoOpRecordCollector;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -47,7 +47,7 @@ public class RocksDBSessionStoreSupplierTest {
     private static final String STORE_NAME = "name";
     private final List<ProducerRecord> logged = new ArrayList<>();
     private final ThreadCache cache = new ThreadCache(new LogContext("test "), 1024, new MockStreamsMetrics(new Metrics()));
-    private final MockProcessorContext context = new MockProcessorContext(TestUtils.tempDirectory(),
+    private final InternalMockProcessorContext context = new InternalMockProcessorContext(TestUtils.tempDirectory(),
         Serdes.String(),
         Serdes.String(),
         new NoOpRecordCollector() {
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
index b25d725..6495315 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
@@ -25,7 +25,7 @@ import org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.SessionStore;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.NoOpRecordCollector;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -45,7 +45,7 @@ import static org.junit.Assert.assertTrue;
 public class RocksDBSessionStoreTest {
 
     private SessionStore<String, Long> sessionStore;
-    private MockProcessorContext context;
+    private InternalMockProcessorContext context;
 
     @Before
     public void before() {
@@ -59,7 +59,7 @@ public class RocksDBSessionStoreTest {
                                                  Serdes.String(),
                                                  Serdes.Long());
 
-        context = new MockProcessorContext(TestUtils.tempDirectory(),
+        context = new InternalMockProcessorContext(TestUtils.tempDirectory(),
                                            Serdes.String(),
                                            Serdes.Long(),
                                            new NoOpRecordCollector(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index a89dc60..a09d87d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -31,7 +31,7 @@ import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.RocksDBConfigSetter;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.NoOpRecordCollector;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -62,14 +62,14 @@ public class RocksDBStoreTest {
     private Serializer<String> stringSerializer = new StringSerializer();
     private Deserializer<String> stringDeserializer = new StringDeserializer();
     private RocksDBStore rocksDBStore;
-    private MockProcessorContext context;
+    private InternalMockProcessorContext context;
     private File dir;
 
     @Before
     public void setUp() {
         rocksDBStore = new RocksDBStore("test");
         dir = TestUtils.tempDirectory();
-        context = new MockProcessorContext(dir,
+        context = new InternalMockProcessorContext(dir,
             Serdes.String(),
             Serdes.String(),
             new NoOpRecordCollector(),
@@ -115,7 +115,7 @@ public class RocksDBStoreTest {
         configs.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "test-server:9092");
         configs.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, MockRocksDbConfigSetter.class);
         MockRocksDbConfigSetter.called = false;
-        rocksDBStore.openDB(new MockProcessorContext(tempDir, new StreamsConfig(configs)));
+        rocksDBStore.openDB(new InternalMockProcessorContext(tempDir, new StreamsConfig(configs)));
 
         assertTrue(MockRocksDbConfigSetter.called);
     }
@@ -123,7 +123,7 @@ public class RocksDBStoreTest {
     @Test(expected = ProcessorStateException.class)
     public void shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir() throws IOException {
         final File tmpDir = TestUtils.tempDirectory();
-        MockProcessorContext tmpContext = new MockProcessorContext(tmpDir,
+        InternalMockProcessorContext tmpContext = new InternalMockProcessorContext(tmpDir,
             Serdes.String(),
             Serdes.Long(),
             new NoOpRecordCollector(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java
index bca6949..a6ccfdf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java
@@ -24,7 +24,7 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.NoOpRecordCollector;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -45,7 +45,7 @@ public class RocksDBWindowStoreSupplierTest {
     private static final String STORE_NAME = "name";
     private WindowStore<String, String> store;
     private final ThreadCache cache = new ThreadCache(new LogContext("test "), 1024, new MockStreamsMetrics(new Metrics()));
-    private final MockProcessorContext context = new MockProcessorContext(TestUtils.tempDirectory(),
+    private final InternalMockProcessorContext context = new InternalMockProcessorContext(TestUtils.tempDirectory(),
                                                                           Serdes.String(),
                                                                           Serdes.String(),
                                                                           new NoOpRecordCollector(),
@@ -75,7 +75,7 @@ public class RocksDBWindowStoreSupplierTest {
                 logged.add(new ProducerRecord<K, V>(topic, partition, timestamp, key, value));
             }
         };
-        final MockProcessorContext context = new MockProcessorContext(TestUtils.tempDirectory(),
+        final InternalMockProcessorContext context = new InternalMockProcessorContext(TestUtils.tempDirectory(),
                                                                       Serdes.String(),
                                                                       Serdes.String(),
                                                                       collector,
@@ -102,7 +102,7 @@ public class RocksDBWindowStoreSupplierTest {
                 logged.add(new ProducerRecord<K, V>(topic, partition, timestamp, key, value));
             }
         };
-        final MockProcessorContext context = new MockProcessorContext(TestUtils.tempDirectory(),
+        final InternalMockProcessorContext context = new InternalMockProcessorContext(TestUtils.tempDirectory(),
                                                                       Serdes.String(),
                                                                       Serdes.String(),
                                                                       collector,
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index c745e70..b3a60a9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -37,7 +37,7 @@ import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -94,7 +94,7 @@ public class RocksDBWindowStoreTest {
     };
 
     private final File baseDir = TestUtils.tempDirectory("test");
-    private final MockProcessorContext context = new MockProcessorContext(baseDir, Serdes.ByteArray(), Serdes.ByteArray(), recordCollector, cache);
+    private final InternalMockProcessorContext context = new InternalMockProcessorContext(baseDir, Serdes.ByteArray(), Serdes.ByteArray(), recordCollector, cache);
     private WindowStore<Integer, String> windowStore;
 
     private WindowStore<Integer, String> createWindowStore(final ProcessorContext context, final boolean retainDuplicates) {
@@ -842,7 +842,7 @@ public class RocksDBWindowStoreTest {
         assertThat(toList(windowStore.fetch(key3, 0, Long.MAX_VALUE)), equalTo(expectedKey3));
     }
 
-    private void putFirstBatch(final WindowStore<Integer, String> store, final long startTime, final MockProcessorContext context) {
+    private void putFirstBatch(final WindowStore<Integer, String> store, final long startTime, final InternalMockProcessorContext context) {
         context.setRecordContext(createRecordContext(startTime));
         store.put(0, "zero");
         context.setRecordContext(createRecordContext(startTime + 1L));
@@ -855,7 +855,7 @@ public class RocksDBWindowStoreTest {
         store.put(5, "five");
     }
 
-    private void putSecondBatch(final WindowStore<Integer, String> store, final long startTime, MockProcessorContext context) {
+    private void putSecondBatch(final WindowStore<Integer, String> store, final long startTime, InternalMockProcessorContext context) {
         context.setRecordContext(createRecordContext(startTime + 3L));
         store.put(2, "two+1");
         context.setRecordContext(createRecordContext(startTime + 4L));
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
index 9c150c5..d61218e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
@@ -23,7 +23,7 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.NoOpRecordCollector;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -48,12 +48,12 @@ public class SegmentIteratorTest {
         }
     };
 
-    private MockProcessorContext context;
+    private InternalMockProcessorContext context;
     private SegmentIterator iterator = null;
 
     @Before
     public void before() {
-        context = new MockProcessorContext(
+        context = new InternalMockProcessorContext(
                 TestUtils.tempDirectory(),
                 Serdes.String(),
                 Serdes.String(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
index deb26f7..ec59a00 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
@@ -20,7 +20,7 @@ import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.NoOpRecordCollector;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -44,7 +44,7 @@ import static org.junit.Assert.assertTrue;
 public class SegmentsTest {
 
     private static final int NUM_SEGMENTS = 5;
-    private MockProcessorContext context;
+    private InternalMockProcessorContext context;
     private Segments segments;
     private long segmentInterval;
     private File stateDirectory;
@@ -54,7 +54,7 @@ public class SegmentsTest {
     @Before
     public void createContext() {
         stateDirectory = TestUtils.tempDirectory();
-        context = new MockProcessorContext(stateDirectory,
+        context = new InternalMockProcessorContext(stateDirectory,
                                            Serdes.String(),
                                            Serdes.Long(),
                                            new NoOpRecordCollector(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java
index d30372f..b1818c2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java
@@ -21,7 +21,7 @@ import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StateSerdes;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.NoOpRecordCollector;
 
 import java.util.Collections;
@@ -42,7 +42,7 @@ public class StateStoreTestUtils {
 
         final StateStore stateStore = supplier.get();
         stateStore.init(
-            new MockProcessorContext(
+            new InternalMockProcessorContext(
                 StateSerdes.withBuiltinTypes(
                     ProcessorStateManager.storeChangelogTopic(applicationId, name),
                     keyType,
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
index 32b56bb..c62b09b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
@@ -23,7 +23,7 @@ import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
 import org.apache.kafka.streams.state.StateSerdes;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.junit.After;
 import org.junit.Test;
 
@@ -39,7 +39,7 @@ public class StoreChangeLoggerTest {
 
     private final Map<Integer, String> logged = new HashMap<>();
 
-    private final MockProcessorContext context = new MockProcessorContext(StateSerdes.withBuiltinTypes(topic, Integer.class, String.class),
+    private final InternalMockProcessorContext context = new InternalMockProcessorContext(StateSerdes.withBuiltinTypes(topic, Integer.class, String.class),
             new RecordCollectorImpl(null, "StoreChangeLoggerTest", new LogContext("StoreChangeLoggerTest "), new DefaultProductionExceptionHandler()) {
                 @Override
                 public <K1, V1> void send(final String topic,
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
similarity index 87%
rename from streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
rename to streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
index 6b0cb66..74bb5d1 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
@@ -48,7 +48,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
-public class MockProcessorContext extends AbstractProcessorContext implements RecordCollector.Supplier {
+public class InternalMockProcessorContext extends AbstractProcessorContext implements RecordCollector.Supplier {
 
     private final File stateDir;
     private final Metrics metrics;
@@ -61,19 +61,19 @@ public class MockProcessorContext extends AbstractProcessorContext implements Re
     private Serde<?> valSerde;
     private long timestamp = -1L;
 
-    public MockProcessorContext(final File stateDir,
-                                final StreamsConfig config) {
+    public InternalMockProcessorContext(final File stateDir,
+                                        final StreamsConfig config) {
         this(stateDir, null, null, new Metrics(), config, null, null);
     }
 
-    public MockProcessorContext(final StateSerdes<?, ?> serdes,
-                                final RecordCollector collector) {
+    public InternalMockProcessorContext(final StateSerdes<?, ?> serdes,
+                                        final RecordCollector collector) {
         this(null, serdes.keySerde(), serdes.valueSerde(), collector, null);
     }
 
-    public MockProcessorContext(final StateSerdes<?, ?> serdes,
-                                final RecordCollector collector,
-                                final Metrics metrics) {
+    public InternalMockProcessorContext(final StateSerdes<?, ?> serdes,
+                                        final RecordCollector collector,
+                                        final Metrics metrics) {
         this(null, serdes.keySerde(), serdes.valueSerde(), metrics, new StreamsConfig(StreamsTestUtils.minimalStreamsConfig()), new RecordCollector.Supplier() {
             @Override
             public RecordCollector recordCollector() {
@@ -82,11 +82,11 @@ public class MockProcessorContext extends AbstractProcessorContext implements Re
         }, null);
     }
 
-    public MockProcessorContext(final File stateDir,
-                                final Serde<?> keySerde,
-                                final Serde<?> valSerde,
-                                final RecordCollector collector,
-                                final ThreadCache cache) {
+    public InternalMockProcessorContext(final File stateDir,
+                                        final Serde<?> keySerde,
+                                        final Serde<?> valSerde,
+                                        final RecordCollector collector,
+                                        final ThreadCache cache) {
         this(stateDir, keySerde, valSerde, new Metrics(), new StreamsConfig(StreamsTestUtils.minimalStreamsConfig()), new RecordCollector.Supplier() {
             @Override
             public RecordCollector recordCollector() {
@@ -95,13 +95,13 @@ public class MockProcessorContext extends AbstractProcessorContext implements Re
         }, cache);
     }
 
-    private MockProcessorContext(final File stateDir,
-                                final Serde<?> keySerde,
-                                final Serde<?> valSerde,
-                                final Metrics metrics,
-                                final StreamsConfig config,
-                                final RecordCollector.Supplier collectorSupplier,
-                                final ThreadCache cache) {
+    private InternalMockProcessorContext(final File stateDir,
+                                         final Serde<?> keySerde,
+                                         final Serde<?> valSerde,
+                                         final Metrics metrics,
+                                         final StreamsConfig config,
+                                         final RecordCollector.Supplier collectorSupplier,
+                                         final ThreadCache cache) {
         super(new TaskId(0, 0),
               config,
               new MockStreamsMetrics(metrics),
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index 3a9ed75..39183d9 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -48,7 +48,7 @@ public class KStreamTestDriver extends ExternalResource {
     private static final long DEFAULT_CACHE_SIZE_BYTES = 1 * 1024 * 1024L;
 
     private ProcessorTopology topology;
-    private MockProcessorContext context;
+    private InternalMockProcessorContext context;
     private ProcessorTopology globalTopology;
     private final LogContext logContext = new LogContext("testCache ");
 
@@ -85,7 +85,7 @@ public class KStreamTestDriver extends ExternalResource {
         topology = builder.build(null);
         globalTopology = builder.buildGlobalStateTopology();
         final ThreadCache cache = new ThreadCache(logContext, cacheSize, new MockStreamsMetrics(new Metrics()));
-        context = new MockProcessorContext(stateDir, keySerde, valSerde, new MockRecordCollector(), cache);
+        context = new InternalMockProcessorContext(stateDir, keySerde, valSerde, new MockRecordCollector(), cache);
         context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "topic"));
         // init global topology first as it will add stores to the
         // store map that are required for joins etc.
@@ -126,7 +126,7 @@ public class KStreamTestDriver extends ExternalResource {
         globalTopology = internalTopologyBuilder.buildGlobalStateTopology();
 
         final ThreadCache cache = new ThreadCache(logContext, cacheSize, new MockStreamsMetrics(new Metrics()));
-        context = new MockProcessorContext(stateDir, keySerde, valSerde, new MockRecordCollector(), cache);
+        context = new InternalMockProcessorContext(stateDir, keySerde, valSerde, new MockRecordCollector(), cache);
         context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "topic"));
 
         // init global topology first as it will add stores to the
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
new file mode 100644
index 0000000..03f871a
--- /dev/null
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
@@ -0,0 +1,478 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.kstream.ValueTransformer;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.processor.internals.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * {@link MockProcessorContext} is a mock of {@link ProcessorContext} for users to test their {@link Processor},
+ * {@link Transformer}, and {@link ValueTransformer} implementations.
+ * <p>
+ * The tests for this class (org.apache.kafka.streams.MockProcessorContextTest) include several behavioral
+ * tests that serve as example usage.
+ * <p>
+ * Note that this class does not take any automated actions (such as firing scheduled punctuators).
+ * It simply captures any data it witnessess.
+ * If you require more automated tests, we recommend wrapping your {@link Processor} in a minimal source-processor-sink
+ * {@link Topology} and using the {@link TopologyTestDriver}.
+ */
+@InterfaceStability.Evolving
+public class MockProcessorContext implements ProcessorContext, RecordCollector.Supplier {
+    // Immutable fields ================================================
+    private final StreamsMetricsImpl metrics;
+    private final TaskId taskId;
+    private final StreamsConfig config;
+    private final File stateDir;
+
+    // settable record metadata ================================================
+    private String topic;
+    private Integer partition;
+    private Long offset;
+    private Long timestamp;
+
+    // mocks ================================================
+    private final Map<String, StateStore> stateStores = new HashMap<>();
+    private final List<CapturedPunctuator> punctuators = new LinkedList<>();
+    private final List<CapturedForward> capturedForwards = new LinkedList<>();
+    private boolean committed = false;
+
+    /**
+     * {@link CapturedPunctuator} holds captured punctuators, along with their scheduling information.
+     */
+    public static class CapturedPunctuator {
+        private final long intervalMs;
+        private final PunctuationType type;
+        private final Punctuator punctuator;
+        private boolean cancelled = false;
+
+        private CapturedPunctuator(final long intervalMs, final PunctuationType type, final Punctuator punctuator) {
+            this.intervalMs = intervalMs;
+            this.type = type;
+            this.punctuator = punctuator;
+        }
+
+        public long getIntervalMs() {
+            return intervalMs;
+        }
+
+        public PunctuationType getType() {
+            return type;
+        }
+
+        public Punctuator getPunctuator() {
+            return punctuator;
+        }
+
+        public void cancel() {
+            this.cancelled = true;
+        }
+
+        public boolean cancelled() {
+            return cancelled;
+        }
+    }
+
+
+    public static class CapturedForward {
+        private final String childName;
+        private final long timestamp;
+        private final KeyValue keyValue;
+
+        private CapturedForward(final To to, final KeyValue keyValue) {
+            if (keyValue == null) throw new IllegalArgumentException();
+
+            this.childName = to.childName;
+            this.timestamp = to.timestamp;
+            this.keyValue = keyValue;
+        }
+
+        /**
+         * The child this data was forwarded to.
+         *
+         * @return The child name, or {@code null} if it was broadcasted.
+         */
+        public String childName() {
+            return childName;
+        }
+
+        /**
+         * The timestamp attached to the forwarded record.
+         *
+         * @return A timestamp, or {@code -1} if none was forwarded.
+         */
+        public long timestamp() {
+            return timestamp;
+        }
+
+        /**
+         * The data forwarded.
+         *
+         * @return A key/value pair. Not null.
+         */
+        public KeyValue keyValue() {
+            return keyValue;
+        }
+    }
+
+    // contructors ================================================
+
+    /**
+     * Create a {@link MockProcessorContext} with dummy {@code config} and {@code taskId} and {@code null} {@code stateDir}.
+     * Most unit tests using this mock won't need to know the taskId,
+     * and most unit tests should be able to get by with the
+     * {@link InMemoryKeyValueStore}, so the stateDir won't matter.
+     */
+    public MockProcessorContext() {
+        //noinspection DoubleBraceInitialization
+        this(
+            new Properties() {
+                {
+                    put(StreamsConfig.APPLICATION_ID_CONFIG, "");
+                    put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "");
+                }
+            },
+            new TaskId(0, 0),
+            null);
+    }
+
+    /**
+     * Create a {@link MockProcessorContext} with dummy {@code taskId} and {@code null} {@code stateDir}.
+     * Most unit tests using this mock won't need to know the taskId,
+     * and most unit tests should be able to get by with the
+     * {@link InMemoryKeyValueStore}, so the stateDir won't matter.
+     *
+     * @param config a Properties object, used to configure the context and the processor.
+     */
+    public MockProcessorContext(final Properties config) {
+        this(config, new TaskId(0, 0), null);
+    }
+
+    /**
+     * Create a {@link MockProcessorContext} with a specified taskId and null stateDir.
+     *
+     * @param config   a {@link Properties} object, used to configure the context and the processor.
+     * @param taskId   a {@link TaskId}, which the context makes available via {@link MockProcessorContext#taskId()}.
+     * @param stateDir a {@link File}, which the context makes available viw {@link MockProcessorContext#stateDir()}.
+     */
+    public MockProcessorContext(final Properties config, final TaskId taskId, final File stateDir) {
+        final StreamsConfig streamsConfig = new StreamsConfig(config);
+        this.taskId = taskId;
+        this.config = streamsConfig;
+        this.stateDir = stateDir;
+        this.metrics = new StreamsMetricsImpl(new Metrics(), "mock-processor-context", new HashMap<String, String>());
+    }
+
+    @Override
+    public String applicationId() {
+        return config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
+    }
+
+    @Override
+    public TaskId taskId() {
+        return taskId;
+    }
+
+    @Override
+    public Map<String, Object> appConfigs() {
+        final Map<String, Object> combined = new HashMap<>();
+        combined.putAll(config.originals());
+        combined.putAll(config.values());
+        return combined;
+    }
+
+    @Override
+    public Map<String, Object> appConfigsWithPrefix(final String prefix) {
+        return config.originalsWithPrefix(prefix);
+    }
+
+    @Override
+    public Serde<?> keySerde() {
+        return config.defaultKeySerde();
+    }
+
+    @Override
+    public Serde<?> valueSerde() {
+        return config.defaultValueSerde();
+    }
+
+    @Override
+    public File stateDir() {
+        return stateDir;
+    }
+
+    @Override
+    public StreamsMetrics metrics() {
+        return metrics;
+    }
+
+    // settable record metadata ================================================
+
+    /**
+     * The context exposes these metadata for use in the processor. Normally, they are set by the Kafka Streams framework,
+     * but for the purpose of driving unit tests, you can set them directly.
+     *
+     * @param topic     A topic name
+     * @param partition A partition number
+     * @param offset    A record offset
+     * @param timestamp A record timestamp
+     */
+    public void setRecordMetadata(final String topic, final int partition, final long offset, final long timestamp) {
+        this.topic = topic;
+        this.partition = partition;
+        this.offset = offset;
+        this.timestamp = timestamp;
+    }
+
+
+    /**
+     * The context exposes this metadata for use in the processor. Normally, they are set by the Kafka Streams framework,
+     * but for the purpose of driving unit tests, you can set it directly. Setting this attribute doesn't affect the others.
+     *
+     * @param topic A topic name
+     */
+    public void setTopic(final String topic) {
+        this.topic = topic;
+    }
+
+    /**
+     * The context exposes this metadata for use in the processor. Normally, they are set by the Kafka Streams framework,
+     * but for the purpose of driving unit tests, you can set it directly. Setting this attribute doesn't affect the others.
+     *
+     * @param partition A partition number
+     */
+    public void setPartition(final int partition) {
+        this.partition = partition;
+    }
+
+
+    /**
+     * The context exposes this metadata for use in the processor. Normally, they are set by the Kafka Streams framework,
+     * but for the purpose of driving unit tests, you can set it directly. Setting this attribute doesn't affect the others.
+     *
+     * @param offset A record offset
+     */
+    public void setOffset(final long offset) {
+        this.offset = offset;
+    }
+
+
+    /**
+     * The context exposes this metadata for use in the processor. Normally, they are set by the Kafka Streams framework,
+     * but for the purpose of driving unit tests, you can set it directly. Setting this attribute doesn't affect the others.
+     *
+     * @param timestamp A record timestamp
+     */
+    public void setTimestamp(final long timestamp) {
+        this.timestamp = timestamp;
+    }
+
+    @Override
+    public String topic() {
+        if (topic == null) {
+            throw new IllegalStateException("Topic must be set before use via setRecordMetadata() or setTopic().");
+        }
+        return topic;
+    }
+
+    @Override
+    public int partition() {
+        if (partition == null) {
+            throw new IllegalStateException("Partition must be set before use via setRecordMetadata() or setPartition().");
+        }
+        return partition;
+    }
+
+    @Override
+    public long offset() {
+        if (offset == null) {
+            throw new IllegalStateException("Offset must be set before use via setRecordMetadata() or setOffset().");
+        }
+        return offset;
+    }
+
+    @Override
+    public long timestamp() {
+        if (timestamp == null) {
+            throw new IllegalStateException("Timestamp must be set before use via setRecordMetadata() or setTimestamp().");
+        }
+        return timestamp;
+    }
+
+    // mocks ================================================
+
+
+    @Override
+    public void register(final StateStore store,
+                         final boolean loggingEnabledIsDeprecatedAndIgnored,
+                         final StateRestoreCallback stateRestoreCallbackIsIgnoredInMock) {
+        stateStores.put(store.name(), store);
+    }
+
+    @Override
+    public StateStore getStateStore(final String name) {
+        return stateStores.get(name);
+    }
+
+    @Override
+    public Cancellable schedule(final long intervalMs, final PunctuationType type, final Punctuator callback) {
+        final CapturedPunctuator capturedPunctuator = new CapturedPunctuator(intervalMs, type, callback);
+
+        punctuators.add(capturedPunctuator);
+
+        return new Cancellable() {
+            @Override
+            public void cancel() {
+                capturedPunctuator.cancel();
+            }
+        };
+    }
+
+    @Override
+    public void schedule(final long interval) {
+        throw new UnsupportedOperationException(
+            "schedule() is deprecated and not supported in Mock. " +
+                "Use schedule(final long intervalMs, final PunctuationType type, final Punctuator callback) instead."
+        );
+    }
+
+    /**
+     * Get the punctuators scheduled so far. The returned list is not affected by subsequent calls to {@code schedule(...)}.
+     *
+     * @return A list of captured punctuators.
+     */
+    public List<CapturedPunctuator> scheduledPunctuators() {
+        final LinkedList<CapturedPunctuator> capturedPunctuators = new LinkedList<>();
+        capturedPunctuators.addAll(punctuators);
+        return capturedPunctuators;
+    }
+
+    @Override
+    public <K, V> void forward(final K key, final V value) {
+        //noinspection unchecked
+        capturedForwards.add(new CapturedForward(To.all(), new KeyValue(key, value)));
+    }
+
+    @Override
+    public <K, V> void forward(final K key, final V value, final To to) {
+        //noinspection unchecked
+        capturedForwards.add(new CapturedForward(to, new KeyValue(key, value)));
+    }
+
+    @Override
+    public <K, V> void forward(final K key, final V value, final int childIndex) {
+        throw new UnsupportedOperationException(
+            "Forwarding to a child by index is deprecated. " +
+                "Please transition processors to forward using a 'To' object instead."
+        );
+    }
+
+    @Override
+    public <K, V> void forward(final K key, final V value, final String childName) {
+        throw new UnsupportedOperationException(
+            "Forwarding to a child by name is deprecated. " +
+                "Please transition processors to forward using 'To.child(childName)' instead."
+        );
+    }
+
+    /**
+     * Get all the forwarded data this context has observed. The returned list will not be
+     * affected by subsequent interactions with the context. The data in the list is in the same order as the calls to
+     * {@code forward(...)}.
+     *
+     * @return A list of key/value pairs that were previously passed to the context.
+     */
+    public List<CapturedForward> forwarded() {
+        final LinkedList<CapturedForward> result = new LinkedList<>();
+        result.addAll(capturedForwards);
+        return result;
+    }
+
+    /**
+     * Get all the forwarded data this context has observed for a specific child by name.
+     * The returned list will not be affected by subsequent interactions with the context.
+     * The data in the list is in the same order as the calls to {@code forward(...)}.
+     *
+     * @param childName The child name to retrieve forwards for
+     * @return A list of key/value pairs that were previously passed to the context.
+     */
+    public List<CapturedForward> forwarded(final String childName) {
+        final LinkedList<CapturedForward> result = new LinkedList<>();
+        for (final CapturedForward capture : capturedForwards) {
+            if (capture.childName() == null || capture.childName().equals(childName)) {
+                result.add(capture);
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Clear the captured forwarded data.
+     */
+    public void resetForwards() {
+        capturedForwards.clear();
+    }
+
+    @Override
+    public void commit() {
+        committed = true;
+    }
+
+    /**
+     * Whether {@link ProcessorContext#commit()} has been called in this context.
+     *
+     * @return {@code true} iff {@link ProcessorContext#commit()} has been called in this context since construction or reset.
+     */
+    public boolean committed() {
+        return committed;
+    }
+
+    /**
+     * Reset the commit capture to {@code false} (whether or not it was previously {@code true}).
+     */
+    public void resetCommit() {
+        committed = false;
+    }
+
+    @Override
+    public RecordCollector recordCollector() {
+        // This interface is assumed by state stores that add change-logging.
+        // Rather than risk a mysterious ClassCastException during unit tests, throw an explanatory exception.
+
+        throw new UnsupportedOperationException("MockProcessorContext does not provide record collection. " +
+            "For processor unit tests, use an in-memory state store with change-logging disabled. " +
+            "Alternatively, use the TopologyTestDriver for testing processor/store/topology integration.");
+    }
+}
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
new file mode 100644
index 0000000..8c5ec46
--- /dev/null
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.MockProcessorContext;
+import org.apache.kafka.streams.processor.MockProcessorContext.CapturedForward;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.Iterator;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class MockProcessorContextTest {
+    @Test
+    public void shouldCaptureOutputRecords() {
+        final AbstractProcessor<String, Long> processor = new AbstractProcessor<String, Long>() {
+            @Override
+            public void process(final String key, final Long value) {
+                context().forward(key + value, key.length() + value);
+            }
+        };
+
+        final MockProcessorContext context = new MockProcessorContext();
+        processor.init(context);
+
+        processor.process("foo", 5L);
+        processor.process("barbaz", 50L);
+
+        final Iterator<CapturedForward> forwarded = context.forwarded().iterator();
+        assertEquals(new KeyValue<>("foo5", 8L), forwarded.next().keyValue());
+        assertEquals(new KeyValue<>("barbaz50", 56L), forwarded.next().keyValue());
+        assertFalse(forwarded.hasNext());
+
+        context.resetForwards();
+
+        assertEquals(0, context.forwarded().size());
+    }
+
+    @Test
+    public void shouldCaptureOutputRecordsUsingTo() {
+        final AbstractProcessor<String, Long> processor = new AbstractProcessor<String, Long>() {
+            @Override
+            public void process(final String key, final Long value) {
+                context().forward(key + value, key.length() + value, To.all());
+            }
+        };
+
+        final MockProcessorContext context = new MockProcessorContext();
+
+        processor.init(context);
+
+        processor.process("foo", 5L);
+        processor.process("barbaz", 50L);
+
+        final Iterator<CapturedForward> forwarded = context.forwarded().iterator();
+        assertEquals(new KeyValue<>("foo5", 8L), forwarded.next().keyValue());
+        assertEquals(new KeyValue<>("barbaz50", 56L), forwarded.next().keyValue());
+        assertFalse(forwarded.hasNext());
+
+        context.resetForwards();
+
+        assertEquals(0, context.forwarded().size());
+    }
+
+    @Test
+    public void shouldCaptureRecordsOutputToChildByName() {
+        final AbstractProcessor<String, Long> processor = new AbstractProcessor<String, Long>() {
+            private int count = 0;
+
+            @Override
+            public void process(final String key, final Long value) {
+                if (count == 0) {
+                    context().forward("start", -1L, To.all()); // broadcast
+                }
+                final To toChild = count % 2 == 0 ? To.child("george") : To.child("pete");
+                context().forward(key + value, key.length() + value, toChild);
+                count++;
+            }
+        };
+
+        final MockProcessorContext context = new MockProcessorContext();
+
+        processor.init(context);
+
+        processor.process("foo", 5L);
+        processor.process("barbaz", 50L);
+
+        {
+            final Iterator<CapturedForward> forwarded = context.forwarded().iterator();
+
+            final CapturedForward forward1 = forwarded.next();
+            assertEquals(new KeyValue<>("start", -1L), forward1.keyValue());
+            assertEquals(null, forward1.childName());
+
+            final CapturedForward forward2 = forwarded.next();
+            assertEquals(new KeyValue<>("foo5", 8L), forward2.keyValue());
+            assertEquals("george", forward2.childName());
+
+            final CapturedForward forward3 = forwarded.next();
+            assertEquals(new KeyValue<>("barbaz50", 56L), forward3.keyValue());
+            assertEquals("pete", forward3.childName());
+
+            assertFalse(forwarded.hasNext());
+        }
+
+        {
+            final Iterator<CapturedForward> forwarded = context.forwarded("george").iterator();
+            assertEquals(new KeyValue<>("start", -1L), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("foo5", 8L), forwarded.next().keyValue());
+            assertFalse(forwarded.hasNext());
+        }
+
+        {
+            final Iterator<CapturedForward> forwarded = context.forwarded("pete").iterator();
+            assertEquals(new KeyValue<>("start", -1L), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("barbaz50", 56L), forwarded.next().keyValue());
+            assertFalse(forwarded.hasNext());
+        }
+
+        {
+            final Iterator<CapturedForward> forwarded = context.forwarded("steve").iterator();
+            assertEquals(new KeyValue<>("start", -1L), forwarded.next().keyValue());
+            assertFalse(forwarded.hasNext());
+        }
+    }
+
+    @Test
+    public void shouldThrowIfForwardedWithDeprecatedChildIndex() {
+        final AbstractProcessor<String, Long> processor = new AbstractProcessor<String, Long>() {
+            @Override
+            public void process(final String key, final Long value) {
+                //noinspection deprecation
+                context().forward(key, value, 0);
+            }
+        };
+
+        final MockProcessorContext context = new MockProcessorContext();
+
+        processor.init(context);
+
+        try {
+            processor.process("foo", 5L);
+            fail("Should have thrown an UnsupportedOperationException.");
+        } catch (final UnsupportedOperationException expected) {
+            // expected
+        }
+    }
+
+    @Test
+    public void shouldThrowIfForwardedWithDeprecatedChildName() {
+        final AbstractProcessor<String, Long> processor = new AbstractProcessor<String, Long>() {
+            @Override
+            public void process(final String key, final Long value) {
+                //noinspection deprecation
+                context().forward(key, value, "child1");
+            }
+        };
+
+        final MockProcessorContext context = new MockProcessorContext();
+
+        processor.init(context);
+
+        try {
+            processor.process("foo", 5L);
+            fail("Should have thrown an UnsupportedOperationException.");
+        } catch (final UnsupportedOperationException expected) {
+            // expected
+        }
+    }
+
+    @Test
+    public void shouldCaptureCommitsAndAllowReset() {
+        final AbstractProcessor<String, Long> processor = new AbstractProcessor<String, Long>() {
+            private int count = 0;
+
+            @Override
+            public void process(final String key, final Long value) {
+                if (++count > 2) context().commit();
+            }
+        };
+
+        final MockProcessorContext context = new MockProcessorContext();
+
+        processor.init(context);
+
+        processor.process("foo", 5L);
+        processor.process("barbaz", 50L);
+
+        assertFalse(context.committed());
+
+        processor.process("foobar", 500L);
+
+        assertTrue(context.committed());
+
+        context.resetCommit();
+
+        assertFalse(context.committed());
+    }
+
+    @Test
+    public void shouldStoreAndReturnStateStores() {
+        final AbstractProcessor<String, Long> processor = new AbstractProcessor<String, Long>() {
+            @Override
+            public void process(final String key, final Long value) {
+                //noinspection unchecked
+                final KeyValueStore<String, Long> stateStore = (KeyValueStore<String, Long>) context().getStateStore("my-state");
+                stateStore.put(key, (stateStore.get(key) == null ? 0 : stateStore.get(key)) + value);
+                stateStore.put("all", (stateStore.get("all") == null ? 0 : stateStore.get("all")) + value);
+            }
+        };
+
+        final MockProcessorContext context = new MockProcessorContext();
+        final KeyValueStore<String, Long> store = new InMemoryKeyValueStore<>("my-state", Serdes.String(), Serdes.Long());
+        context.register(store, false, null);
+
+        store.init(context, store);
+        processor.init(context);
+
+        processor.process("foo", 5L);
+        processor.process("bar", 50L);
+
+        assertEquals(5L, (long) store.get("foo"));
+        assertEquals(50L, (long) store.get("bar"));
+        assertEquals(55L, (long) store.get("all"));
+    }
+
+    @Test
+    public void shouldCaptureApplicationAndRecordMetadata() {
+        final Properties config = new Properties();
+        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "testMetadata");
+        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "");
+
+        final AbstractProcessor<String, Object> processor = new AbstractProcessor<String, Object>() {
+            @Override
+            public void process(final String key, final Object value) {
+                context().forward("appId", context().applicationId());
+                context().forward("taskId", context().taskId());
+
+                context().forward("topic", context().topic());
+                context().forward("partition", context().partition());
+                context().forward("offset", context().offset());
+                context().forward("timestamp", context().timestamp());
+
+                context().forward("key", key);
+                context().forward("value", value);
+            }
+        };
+
+        final MockProcessorContext context = new MockProcessorContext(config);
+        processor.init(context);
+
+        try {
+            processor.process("foo", 5L);
+            fail("Should have thrown an exception.");
+        } catch (final IllegalStateException expected) {
+            // expected, since the record metadata isn't initialized
+        }
+
+        context.resetForwards();
+        context.setRecordMetadata("t1", 0, 0L, 0L);
+
+        {
+            processor.process("foo", 5L);
+            final Iterator<CapturedForward> forwarded = context.forwarded().iterator();
+            assertEquals(new KeyValue<>("appId", "testMetadata"), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("taskId", new TaskId(0, 0)), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("topic", "t1"), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("partition", 0), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("offset", 0L), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("timestamp", 0L), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("key", "foo"), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("value", 5L), forwarded.next().keyValue());
+        }
+
+        context.resetForwards();
+
+        // record metadata should be "sticky"
+        context.setOffset(1L);
+        context.setTimestamp(10L);
+
+        {
+            processor.process("bar", 50L);
+            final Iterator<CapturedForward> forwarded = context.forwarded().iterator();
+            assertEquals(new KeyValue<>("appId", "testMetadata"), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("taskId", new TaskId(0, 0)), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("topic", "t1"), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("partition", 0), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("offset", 1L), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("timestamp", 10L), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("key", "bar"), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("value", 50L), forwarded.next().keyValue());
+        }
+
+        context.resetForwards();
+        // record metadata should be "sticky"
+        context.setTopic("t2");
+        context.setPartition(30);
+
+        {
+            processor.process("baz", 500L);
+            final Iterator<CapturedForward> forwarded = context.forwarded().iterator();
+            assertEquals(new KeyValue<>("appId", "testMetadata"), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("taskId", new TaskId(0, 0)), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("topic", "t2"), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("partition", 30), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("offset", 1L), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("timestamp", 10L), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("key", "baz"), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("value", 500L), forwarded.next().keyValue());
+        }
+    }
+
+    @Test
+    public void shouldCapturePunctuator() {
+        final Processor<String, Long> processor = new Processor<String, Long>() {
+            @Override
+            public void init(final ProcessorContext context) {
+                context.schedule(
+                    1000L,
+                    PunctuationType.WALL_CLOCK_TIME,
+                    new Punctuator() {
+                        @Override
+                        public void punctuate(final long timestamp) {
+                            context.commit();
+                        }
+                    }
+                );
+            }
+
+            @Override
+            public void process(final String key, final Long value) {
+            }
+
+            @Override
+            public void punctuate(final long timestamp) {
+            }
+
+            @Override
+            public void close() {
+            }
+        };
+
+        final MockProcessorContext context = new MockProcessorContext();
+
+        processor.init(context);
+
+        final MockProcessorContext.CapturedPunctuator capturedPunctuator = context.scheduledPunctuators().get(0);
+        assertEquals(1000L, capturedPunctuator.getIntervalMs());
+        assertEquals(PunctuationType.WALL_CLOCK_TIME, capturedPunctuator.getType());
+        assertFalse(capturedPunctuator.cancelled());
+
+        final Punctuator punctuator = capturedPunctuator.getPunctuator();
+        assertFalse(context.committed());
+        punctuator.punctuate(1234L);
+        assertTrue(context.committed());
+    }
+
+    @Test
+    public void fullConstructorShouldSetAllExpectedAttributes() {
+        final Properties config = new Properties();
+        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "testFullConstructor");
+        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "");
+        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
+
+        final File dummyFile = new File("");
+        final MockProcessorContext context = new MockProcessorContext(config, new TaskId(1, 1), dummyFile);
+
+        assertEquals("testFullConstructor", context.applicationId());
+        assertEquals(new TaskId(1, 1), context.taskId());
+        assertEquals("testFullConstructor", context.appConfigs().get(StreamsConfig.APPLICATION_ID_CONFIG));
+        assertEquals("testFullConstructor", context.appConfigsWithPrefix("application.").get("id"));
+        assertEquals(Serdes.String().getClass(), context.keySerde().getClass());
+        assertEquals(Serdes.Long().getClass(), context.valueSerde().getClass());
+        assertEquals(dummyFile, context.stateDir());
+    }
+}

-- 
To stop receiving notification emails like this one, please contact
mjsax@apache.org.