You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/03/17 18:41:53 UTC

[1/2] kafka git commit: KAFKA-3411: Streams: stop using "job" terminology, rename job.id to application.id

Repository: kafka
Updated Branches:
  refs/heads/trunk 9a836d015 -> 958e10c87


http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 8ff72bc..b3b6537 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -37,7 +37,7 @@ import java.util.Set;
 
 public abstract class AbstractTask {
     protected final TaskId id;
-    protected final String jobId;
+    protected final String applicationId;
     protected final ProcessorTopology topology;
     protected final Consumer consumer;
     protected final ProcessorStateManager stateMgr;
@@ -45,7 +45,7 @@ public abstract class AbstractTask {
     protected ProcessorContext processorContext;
 
     protected AbstractTask(TaskId id,
-                           String jobId,
+                           String applicationId,
                            Collection<TopicPartition> partitions,
                            ProcessorTopology topology,
                            Consumer<byte[], byte[]> consumer,
@@ -53,17 +53,17 @@ public abstract class AbstractTask {
                            StreamsConfig config,
                            boolean isStandby) {
         this.id = id;
-        this.jobId = jobId;
+        this.applicationId = applicationId;
         this.partitions = new HashSet<>(partitions);
         this.topology = topology;
         this.consumer = consumer;
 
         // create the processor state manager
         try {
-            File jobStateDir = StreamThread.makeStateDir(jobId, config.getString(StreamsConfig.STATE_DIR_CONFIG));
-            File stateFile = new File(jobStateDir.getCanonicalPath(), id.toString());
+            File applicationStateDir = StreamThread.makeStateDir(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG));
+            File stateFile = new File(applicationStateDir.getCanonicalPath(), id.toString());
             // if partitions is null, this is a standby task
-            this.stateMgr = new ProcessorStateManager(jobId, id.partition, partitions, stateFile, restoreConsumer, isStandby);
+            this.stateMgr = new ProcessorStateManager(applicationId, id.partition, partitions, stateFile, restoreConsumer, isStandby);
         } catch (IOException e) {
             throw new ProcessorStateException("Error while creating the state manager", e);
         }
@@ -83,8 +83,8 @@ public abstract class AbstractTask {
         return id;
     }
 
-    public final String jobId() {
-        return jobId;
+    public final String applicationId() {
+        return applicationId;
     }
 
     public final Set<TopicPartition> partitions() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index c4acc01..f6e43d0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -27,15 +27,10 @@ import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.TaskId;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.File;
 
 public class ProcessorContextImpl implements ProcessorContext, RecordCollector.Supplier {
 
-    private static final Logger log = LoggerFactory.getLogger(ProcessorContextImpl.class);
-
     private final TaskId id;
     private final StreamTask task;
     private final StreamsMetrics metrics;
@@ -84,8 +79,8 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S
     }
 
     @Override
-    public String jobId() {
-        return task.jobId();
+    public String applicationId() {
+        return task.applicationId();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index c8f289e..df8516c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -51,7 +51,7 @@ public class ProcessorStateManager {
     public static final String CHECKPOINT_FILE_NAME = ".checkpoint";
     public static final String LOCK_FILE_NAME = ".lock";
 
-    private final String jobId;
+    private final String applicationId;
     private final int defaultPartition;
     private final Map<String, TopicPartition> partitionForTopic;
     private final File baseDir;
@@ -65,8 +65,8 @@ public class ProcessorStateManager {
     private final boolean isStandby;
     private final Map<String, StateRestoreCallback> restoreCallbacks; // used for standby tasks, keyed by state topic name
 
-    public ProcessorStateManager(String jobId, int defaultPartition, Collection<TopicPartition> sources, File baseDir, Consumer<byte[], byte[]> restoreConsumer, boolean isStandby) throws IOException {
-        this.jobId = jobId;
+    public ProcessorStateManager(String applicationId, int defaultPartition, Collection<TopicPartition> sources, File baseDir, Consumer<byte[], byte[]> restoreConsumer, boolean isStandby) throws IOException {
+        this.applicationId = applicationId;
         this.defaultPartition = defaultPartition;
         this.partitionForTopic = new HashMap<>();
         for (TopicPartition source : sources) {
@@ -104,8 +104,8 @@ public class ProcessorStateManager {
         }
     }
 
-    public static String storeChangelogTopic(String jobId, String storeName) {
-        return jobId + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX;
+    public static String storeChangelogTopic(String applicationId, String storeName) {
+        return applicationId + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX;
     }
 
     public static FileLock lockStateDirectory(File stateDir) throws IOException {
@@ -154,7 +154,7 @@ public class ProcessorStateManager {
         // check that the underlying change log topic exist or not
         String topic;
         if (loggingEnabled)
-            topic = storeChangelogTopic(this.jobId, store.name());
+            topic = storeChangelogTopic(this.applicationId, store.name());
         else topic = store.name();
 
         // block until the partition is ready for this state changelog topic or time has elapsed
@@ -325,7 +325,7 @@ public class ProcessorStateManager {
                 for (String storeName : stores.keySet()) {
                     TopicPartition part;
                     if (loggingEnabled.contains(storeName))
-                        part = new TopicPartition(storeChangelogTopic(jobId, storeName), getPartition(storeName));
+                        part = new TopicPartition(storeChangelogTopic(applicationId, storeName), getPartition(storeName));
                     else
                         part = new TopicPartition(storeName, getPartition(storeName));
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
index 82633b4..0bcae18 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
@@ -25,17 +25,13 @@ import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.File;
 
 public class StandbyContextImpl implements ProcessorContext, RecordCollector.Supplier {
 
-    private static final Logger log = LoggerFactory.getLogger(StandbyContextImpl.class);
-
     private final TaskId id;
-    private final String jobId;
+    private final String applicationId;
     private final StreamsMetrics metrics;
     private final ProcessorStateManager stateMgr;
 
@@ -47,12 +43,12 @@ public class StandbyContextImpl implements ProcessorContext, RecordCollector.Sup
     private boolean initialized;
 
     public StandbyContextImpl(TaskId id,
-                              String jobId,
+                              String applicationId,
                               StreamsConfig config,
                               ProcessorStateManager stateMgr,
                               StreamsMetrics metrics) {
         this.id = id;
-        this.jobId = jobId;
+        this.applicationId = applicationId;
         this.metrics = metrics;
         this.stateMgr = stateMgr;
 
@@ -78,8 +74,8 @@ public class StandbyContextImpl implements ProcessorContext, RecordCollector.Sup
     }
 
     @Override
-    public String jobId() {
-        return jobId;
+    public String applicationId() {
+        return applicationId;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index da454cb..f19d5a3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -23,8 +23,6 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.TaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.Collection;
 import java.util.Collections;
@@ -36,15 +34,13 @@ import java.util.Map;
  */
 public class StandbyTask extends AbstractTask {
 
-    private static final Logger log = LoggerFactory.getLogger(StandbyTask.class);
-
     private final Map<TopicPartition, Long> checkpointedOffsets;
 
     /**
      * Create {@link StandbyTask} with its assigned partitions
      *
      * @param id                    the ID of this task
-     * @param jobId                 the ID of the job
+     * @param applicationId         the ID of the stream processing application
      * @param partitions            the collection of assigned {@link TopicPartition}
      * @param topology              the instance of {@link ProcessorTopology}
      * @param consumer              the instance of {@link Consumer}
@@ -53,17 +49,17 @@ public class StandbyTask extends AbstractTask {
      * @param metrics               the {@link StreamsMetrics} created by the thread
      */
     public StandbyTask(TaskId id,
-                       String jobId,
+                       String applicationId,
                        Collection<TopicPartition> partitions,
                        ProcessorTopology topology,
                        Consumer<byte[], byte[]> consumer,
                        Consumer<byte[], byte[]> restoreConsumer,
                        StreamsConfig config,
                        StreamsMetrics metrics) {
-        super(id, jobId, partitions, topology, consumer, restoreConsumer, config, true);
+        super(id, applicationId, partitions, topology, consumer, restoreConsumer, config, true);
 
         // initialize the topology with its own context
-        this.processorContext = new StandbyContextImpl(id, jobId, config, stateMgr, metrics);
+        this.processorContext = new StandbyContextImpl(id, applicationId, config, stateMgr, metrics);
 
         initializeStateStores();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 266df3e..a6b82af 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -117,7 +117,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         streamThread = (StreamThread) o;
         streamThread.partitionAssignor(this);
 
-        this.topicGroups = streamThread.builder.topicGroups(streamThread.jobId);
+        this.topicGroups = streamThread.builder.topicGroups(streamThread.applicationId);
 
         if (configs.containsKey(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG)) {
             internalTopicManager = new InternalTopicManager(
@@ -445,7 +445,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
 
     /* For Test Only */
     public Set<TaskId> tasksForState(String stateName) {
-        return stateChangelogTopicToTaskIds.get(ProcessorStateManager.storeChangelogTopic(streamThread.jobId, stateName));
+        return stateChangelogTopicToTaskIds.get(ProcessorStateManager.storeChangelogTopic(streamThread.applicationId, stateName));
     }
 
     public Set<TaskId> tasksForPartition(TopicPartition partition) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 4d66324..54a25c1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -61,7 +61,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
      * Create {@link StreamTask} with its assigned partitions
      *
      * @param id                    the ID of this task
-     * @param jobId                 the ID of the job
+     * @param applicationId         the ID of the stream processing application
      * @param partitions            the collection of assigned {@link TopicPartition}
      * @param topology              the instance of {@link ProcessorTopology}
      * @param consumer              the instance of {@link Consumer}
@@ -71,7 +71,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
      * @param metrics               the {@link StreamsMetrics} created by the thread
      */
     public StreamTask(TaskId id,
-                      String jobId,
+                      String applicationId,
                       Collection<TopicPartition> partitions,
                       ProcessorTopology topology,
                       Consumer<byte[], byte[]> consumer,
@@ -79,7 +79,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
                       Consumer<byte[], byte[]> restoreConsumer,
                       StreamsConfig config,
                       StreamsMetrics metrics) {
-        super(id, jobId, partitions, topology, consumer, restoreConsumer, config, false);
+        super(id, applicationId, partitions, topology, consumer, restoreConsumer, config, false);
         this.punctuationQueue = new PunctuationQueue();
         this.maxBufferedSize = config.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index e9343e0..491c812 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -72,7 +72,7 @@ public class StreamThread extends Thread {
     private static final AtomicInteger STREAM_THREAD_ID_SEQUENCE = new AtomicInteger(1);
 
     public final PartitionGrouper partitionGrouper;
-    public final String jobId;
+    public final String applicationId;
     public final String clientId;
     public final UUID processId;
 
@@ -106,12 +106,12 @@ public class StreamThread extends Thread {
     private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> standbyRecords;
     private boolean processStandbyRecords = false;
 
-    static File makeStateDir(String jobId, String baseDirName) {
+    static File makeStateDir(String applicationId, String baseDirName) {
         File baseDir = new File(baseDirName);
         if (!baseDir.exists())
             baseDir.mkdir();
 
-        File stateDir = new File(baseDir, jobId);
+        File stateDir = new File(baseDir, applicationId);
         if (!stateDir.exists())
             stateDir.mkdir();
 
@@ -150,12 +150,12 @@ public class StreamThread extends Thread {
 
     public StreamThread(TopologyBuilder builder,
                         StreamsConfig config,
-                        String jobId,
+                        String applicationId,
                         String clientId,
                         UUID processId,
                         Metrics metrics,
                         Time time) {
-        this(builder, config, null , null, null, jobId, clientId, processId, metrics, time);
+        this(builder, config, null , null, null, applicationId, clientId, processId, metrics, time);
     }
 
     StreamThread(TopologyBuilder builder,
@@ -163,17 +163,17 @@ public class StreamThread extends Thread {
                  Producer<byte[], byte[]> producer,
                  Consumer<byte[], byte[]> consumer,
                  Consumer<byte[], byte[]> restoreConsumer,
-                 String jobId,
+                 String applicationId,
                  String clientId,
                  UUID processId,
                  Metrics metrics,
                  Time time) {
         super("StreamThread-" + STREAM_THREAD_ID_SEQUENCE.getAndIncrement());
 
-        this.jobId = jobId;
+        this.applicationId = applicationId;
         this.config = config;
         this.builder = builder;
-        this.sourceTopics = builder.sourceTopics(jobId);
+        this.sourceTopics = builder.sourceTopics(applicationId);
         this.clientId = clientId;
         this.processId = processId;
         this.partitionGrouper = config.getConfiguredInstance(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, PartitionGrouper.class);
@@ -194,7 +194,7 @@ public class StreamThread extends Thread {
         this.standbyRecords = new HashMap<>();
 
         // read in task specific config values
-        this.stateDir = makeStateDir(this.jobId, this.config.getString(StreamsConfig.STATE_DIR_CONFIG));
+        this.stateDir = makeStateDir(this.applicationId, this.config.getString(StreamsConfig.STATE_DIR_CONFIG));
         this.pollTimeMs = config.getLong(StreamsConfig.POLL_MS_CONFIG);
         this.commitTimeMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG);
         this.cleanTimeMs = config.getLong(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG);
@@ -224,7 +224,7 @@ public class StreamThread extends Thread {
     private Consumer<byte[], byte[]> createConsumer() {
         String threadName = this.getName();
         log.info("Creating consumer client for stream thread [" + threadName + "]");
-        return new KafkaConsumer<>(config.getConsumerConfigs(this, this.jobId, this.clientId + "-" + threadName),
+        return new KafkaConsumer<>(config.getConsumerConfigs(this, this.applicationId, this.clientId + "-" + threadName),
                 new ByteArrayDeserializer(),
                 new ByteArrayDeserializer());
     }
@@ -580,9 +580,9 @@ public class StreamThread extends Thread {
     protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitions) {
         sensors.taskCreationSensor.record();
 
-        ProcessorTopology topology = builder.build(jobId, id.topicGroupId);
+        ProcessorTopology topology = builder.build(applicationId, id.topicGroupId);
 
-        return new StreamTask(id, jobId, partitions, topology, consumer, producer, restoreConsumer, config, sensors);
+        return new StreamTask(id, applicationId, partitions, topology, consumer, producer, restoreConsumer, config, sensors);
     }
 
     private void addStreamTasks(Collection<TopicPartition> assignment) {
@@ -650,10 +650,10 @@ public class StreamThread extends Thread {
     protected StandbyTask createStandbyTask(TaskId id, Collection<TopicPartition> partitions) {
         sensors.taskCreationSensor.record();
 
-        ProcessorTopology topology = builder.build(jobId, id.topicGroupId);
+        ProcessorTopology topology = builder.build(applicationId, id.topicGroupId);
 
         if (!topology.stateStoreSuppliers().isEmpty()) {
-            return new StandbyTask(id, jobId, partitions, topology, consumer, restoreConsumer, config, sensors);
+            return new StandbyTask(id, applicationId, partitions, topology, consumer, restoreConsumer, config, sensors);
         } else {
             return null;
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
index aac4d85..4229f94 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
@@ -57,7 +57,7 @@ public class StoreChangeLogger<K, V> {
     }
 
     protected StoreChangeLogger(String storeName, ProcessorContext context, int partition, Serdes<K, V> serialization, int maxDirty, int maxRemoved) {
-        this.topic = ProcessorStateManager.storeChangelogTopic(context.jobId(), storeName);
+        this.topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName);
         this.context = context;
         this.partition = partition;
         this.serialization = serialization;

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index f0276ab..83ebe48 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -41,7 +41,7 @@ public class StreamsConfigTest {
 
     @Before
     public void setUp() {
-        props.put(StreamsConfig.JOB_ID_CONFIG, "streams-config-test");
+        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-config-test");
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
         props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
         props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
@@ -59,9 +59,10 @@ public class StreamsConfigTest {
 
     @Test
     public void testGetConsumerConfigs() throws Exception {
-        Map<String, Object> returnedProps = streamsConfig.getConsumerConfigs(streamThreadPlaceHolder, "example-job", "client");
+        Map<String, Object> returnedProps =
+            streamsConfig.getConsumerConfigs(streamThreadPlaceHolder, "example-application", "client");
         assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), "client-consumer");
-        assertEquals(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG), "example-job");
+        assertEquals(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG), "example-application");
 
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index 14cb493..1d0a969 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -183,12 +183,12 @@ public class ProcessorStateManagerTest {
     }
 
     private final Set<TopicPartition> noPartitions = Collections.emptySet();
-    private final String jobId = "test-job";
+    private final String applicationId = "test-application";
     private final String stateDir = "test";
     private final String persistentStoreName = "persistentStore";
     private final String nonPersistentStoreName = "nonPersistentStore";
-    private final String persistentStoreTopicName = ProcessorStateManager.storeChangelogTopic(jobId, persistentStoreName);
-    private final String nonPersistentStoreTopicName = ProcessorStateManager.storeChangelogTopic(jobId, nonPersistentStoreName);
+    private final String persistentStoreTopicName = ProcessorStateManager.storeChangelogTopic(applicationId, persistentStoreName);
+    private final String nonPersistentStoreTopicName = ProcessorStateManager.storeChangelogTopic(applicationId, nonPersistentStoreName);
 
     @Test
     public void testLockStateDirectory() throws IOException {
@@ -197,7 +197,7 @@ public class ProcessorStateManagerTest {
             FileLock lock;
 
             // the state manager locks the directory
-            ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 1, noPartitions, baseDir, new MockRestoreConsumer(), false);
+            ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, 1, noPartitions, baseDir, new MockRestoreConsumer(), false);
 
             try {
                 // this should not get the lock
@@ -226,7 +226,7 @@ public class ProcessorStateManagerTest {
         try {
             MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false);
 
-            ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 1, noPartitions, baseDir, new MockRestoreConsumer(), false);
+            ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, 1, noPartitions, baseDir, new MockRestoreConsumer(), false);
             try {
                 stateMgr.register(mockStateStore, true, mockStateStore.stateRestoreCallback);
             } finally {
@@ -258,7 +258,7 @@ public class ProcessorStateManagerTest {
 
             MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore("persistentStore", true); // persistent store
 
-            ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 2, noPartitions, baseDir, restoreConsumer, false);
+            ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, 2, noPartitions, baseDir, restoreConsumer, false);
             try {
                 restoreConsumer.reset();
 
@@ -311,7 +311,7 @@ public class ProcessorStateManagerTest {
 
             MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); // non persistent store
 
-            ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 2, noPartitions, baseDir, restoreConsumer, false);
+            ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, 2, noPartitions, baseDir, restoreConsumer, false);
             try {
                 restoreConsumer.reset();
 
@@ -351,9 +351,9 @@ public class ProcessorStateManagerTest {
             String storeName2 = "store2";
             String storeName3 = "store3";
 
-            String storeTopicName1 = ProcessorStateManager.storeChangelogTopic(jobId, storeName1);
-            String storeTopicName2 = ProcessorStateManager.storeChangelogTopic(jobId, storeName2);
-            String storeTopicName3 = ProcessorStateManager.storeChangelogTopic(jobId, storeName3);
+            String storeTopicName1 = ProcessorStateManager.storeChangelogTopic(applicationId, storeName1);
+            String storeTopicName2 = ProcessorStateManager.storeChangelogTopic(applicationId, storeName2);
+            String storeTopicName3 = ProcessorStateManager.storeChangelogTopic(applicationId, storeName3);
 
             OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(baseDir, ProcessorStateManager.CHECKPOINT_FILE_NAME));
             checkpoint.write(Collections.singletonMap(new TopicPartition(storeTopicName1, 0), lastCheckpointedOffset));
@@ -386,7 +386,7 @@ public class ProcessorStateManagerTest {
 
             // if there is an source partition, inherit the partition id
             Set<TopicPartition> sourcePartitions = Utils.mkSet(new TopicPartition(storeTopicName3, 1));
-            ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 0, sourcePartitions, baseDir, restoreConsumer, true); // standby
+            ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, 0, sourcePartitions, baseDir, restoreConsumer, true); // standby
             try {
                 restoreConsumer.reset();
 
@@ -425,7 +425,7 @@ public class ProcessorStateManagerTest {
 
             MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false);
 
-            ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 1, noPartitions, baseDir, restoreConsumer, false);
+            ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, 1, noPartitions, baseDir, restoreConsumer, false);
             try {
                 stateMgr.register(mockStateStore, true, mockStateStore.stateRestoreCallback);
 
@@ -462,12 +462,12 @@ public class ProcessorStateManagerTest {
             HashMap<TopicPartition, Long> ackedOffsets = new HashMap<>();
             ackedOffsets.put(new TopicPartition(persistentStoreTopicName, 1), 123L);
             ackedOffsets.put(new TopicPartition(nonPersistentStoreTopicName, 1), 456L);
-            ackedOffsets.put(new TopicPartition(ProcessorStateManager.storeChangelogTopic(jobId, "otherTopic"), 1), 789L);
+            ackedOffsets.put(new TopicPartition(ProcessorStateManager.storeChangelogTopic(applicationId, "otherTopic"), 1), 789L);
 
             MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore(persistentStoreName, true);
             MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false);
 
-            ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 1, noPartitions, baseDir, restoreConsumer, false);
+            ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, 1, noPartitions, baseDir, restoreConsumer, false);
             try {
                 // make sure the checkpoint file is deleted
                 assertFalse(checkpointFile.exists());

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index c8115b8..12210cc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -67,7 +67,7 @@ public class ProcessorTopologyTest {
         // Create a new directory in which we'll put all of the state for this test, enabling running tests in parallel ...
         File localState = StateTestUtils.tempDir();
         Properties props = new Properties();
-        props.setProperty(StreamsConfig.JOB_ID_CONFIG, "processor-topology-test");
+        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "processor-topology-test");
         props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
         props.setProperty(StreamsConfig.STATE_DIR_CONFIG, localState.getAbsolutePath());
         props.setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class.getName());

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 295f0dd..21bdaff 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -57,11 +57,11 @@ public class StandbyTaskTest {
 
     private final Serializer<Integer> intSerializer = new IntegerSerializer();
 
-    private final String jobId = "test-job";
+    private final String applicationId = "test-application";
     private final String storeName1 = "store1";
     private final String storeName2 = "store2";
-    private final String storeChangelogTopicName1 = ProcessorStateManager.storeChangelogTopic(jobId, storeName1);
-    private final String storeChangelogTopicName2 = ProcessorStateManager.storeChangelogTopic(jobId, storeName2);
+    private final String storeChangelogTopicName1 = ProcessorStateManager.storeChangelogTopic(applicationId, storeName1);
+    private final String storeChangelogTopicName2 = ProcessorStateManager.storeChangelogTopic(applicationId, storeName2);
 
     private final TopicPartition partition1 = new TopicPartition(storeChangelogTopicName1, 1);
     private final TopicPartition partition2 = new TopicPartition(storeChangelogTopicName2, 1);
@@ -94,7 +94,7 @@ public class StandbyTaskTest {
                 setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
                 setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
                 setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor");
-                setProperty(StreamsConfig.JOB_ID_CONFIG, jobId);
+                setProperty(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
                 setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
                 setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
                 setProperty(StreamsConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath());
@@ -133,7 +133,7 @@ public class StandbyTaskTest {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
             StreamsConfig config = createConfig(baseDir);
-            StandbyTask task = new StandbyTask(taskId, jobId, topicPartitions, topology, consumer, restoreStateConsumer, config, null);
+            StandbyTask task = new StandbyTask(taskId, applicationId, topicPartitions, topology, consumer, restoreStateConsumer, config, null);
 
             assertEquals(Utils.mkSet(partition2), new HashSet<>(task.changeLogPartitions()));
 
@@ -148,7 +148,7 @@ public class StandbyTaskTest {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
             StreamsConfig config = createConfig(baseDir);
-            StandbyTask task = new StandbyTask(taskId, jobId, topicPartitions, topology, consumer, restoreStateConsumer, config, null);
+            StandbyTask task = new StandbyTask(taskId, applicationId, topicPartitions, topology, consumer, restoreStateConsumer, config, null);
 
             restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions()));
 
@@ -167,7 +167,7 @@ public class StandbyTaskTest {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
             StreamsConfig config = createConfig(baseDir);
-            StandbyTask task = new StandbyTask(taskId, jobId, topicPartitions, topology, consumer, restoreStateConsumer, config, null);
+            StandbyTask task = new StandbyTask(taskId, applicationId, topicPartitions, topology, consumer, restoreStateConsumer, config, null);
 
             restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions()));
 
@@ -201,7 +201,7 @@ public class StandbyTaskTest {
 
             task.close();
 
-            File taskDir = new File(StreamThread.makeStateDir(jobId, baseDir.getCanonicalPath()), taskId.toString());
+            File taskDir = new File(StreamThread.makeStateDir(applicationId, baseDir.getCanonicalPath()), taskId.toString());
             OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(taskDir, ProcessorStateManager.CHECKPOINT_FILE_NAME));
             Map<TopicPartition, Long> offsets = checkpoint.read();
 
@@ -230,7 +230,7 @@ public class StandbyTaskTest {
             ));
 
             StreamsConfig config = createConfig(baseDir);
-            StandbyTask task = new StandbyTask(taskId, jobId, ktablePartitions, ktableTopology, consumer, restoreStateConsumer, config, null);
+            StandbyTask task = new StandbyTask(taskId, applicationId, ktablePartitions, ktableTopology, consumer, restoreStateConsumer, config, null);
 
             restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions()));
 
@@ -299,7 +299,7 @@ public class StandbyTaskTest {
 
             task.close();
 
-            File taskDir = new File(StreamThread.makeStateDir(jobId, baseDir.getCanonicalPath()), taskId.toString());
+            File taskDir = new File(StreamThread.makeStateDir(applicationId, baseDir.getCanonicalPath()), taskId.toString());
             OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(taskDir, ProcessorStateManager.CHECKPOINT_FILE_NAME));
             Map<TopicPartition, Long> offsets = checkpoint.read();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index 7f37bda..a5990bd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -94,7 +94,7 @@ public class StreamPartitionAssignorTest {
                 setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
                 setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
                 setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor");
-                setProperty(StreamsConfig.JOB_ID_CONFIG, "stream-partition-assignor-test");
+                setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "stream-partition-assignor-test");
                 setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
                 setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 1f401db..f2ade6b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -78,7 +78,7 @@ public class StreamTaskTest {
                 setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
                 setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
                 setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor");
-                setProperty(StreamsConfig.JOB_ID_CONFIG, "stream-task-test");
+                setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "stream-task-test");
                 setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
                 setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
                 setProperty(StreamsConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath());
@@ -105,7 +105,7 @@ public class StreamTaskTest {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
             StreamsConfig config = createConfig(baseDir);
-            StreamTask task = new StreamTask(new TaskId(0, 0), "jobId", partitions, topology, consumer, producer, restoreStateConsumer, config, null);
+            StreamTask task = new StreamTask(new TaskId(0, 0), "applicationId", partitions, topology, consumer, producer, restoreStateConsumer, config, null);
 
             task.addRecords(partition1, records(
                     new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
@@ -156,7 +156,7 @@ public class StreamTaskTest {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
             StreamsConfig config = createConfig(baseDir);
-            StreamTask task = new StreamTask(new TaskId(1, 1), "jobId", partitions, topology, consumer, producer, restoreStateConsumer, config, null);
+            StreamTask task = new StreamTask(new TaskId(1, 1), "applicationId", partitions, topology, consumer, producer, restoreStateConsumer, config, null);
 
             task.addRecords(partition1, records(
                     new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index eaaf842..b201c07 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -60,7 +60,7 @@ import java.util.UUID;
 public class StreamThreadTest {
 
     private final String clientId = "clientId";
-    private final String jobId = "stream-thread-test";
+    private final String applicationId = "stream-thread-test";
     private final UUID processId = UUID.randomUUID();
 
     private TopicPartition t1p1 = new TopicPartition("topic1", 1);
@@ -118,7 +118,7 @@ public class StreamThreadTest {
                 setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
                 setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
                 setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor");
-                setProperty(StreamsConfig.JOB_ID_CONFIG, jobId);
+                setProperty(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
                 setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
                 setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
             }
@@ -129,14 +129,14 @@ public class StreamThreadTest {
         public boolean committed = false;
 
         public TestStreamTask(TaskId id,
-                              String jobId,
+                              String applicationId,
                               Collection<TopicPartition> partitions,
                               ProcessorTopology topology,
                               Consumer<byte[], byte[]> consumer,
                               Producer<byte[], byte[]> producer,
                               Consumer<byte[], byte[]> restoreConsumer,
                               StreamsConfig config) {
-            super(id, jobId, partitions, topology, consumer, producer, restoreConsumer, config, null);
+            super(id, applicationId, partitions, topology, consumer, producer, restoreConsumer, config, null);
         }
 
         @Override
@@ -168,11 +168,11 @@ public class StreamThreadTest {
         builder.addSource("source3", "topic3");
         builder.addProcessor("processor", new MockProcessorSupplier(), "source2", "source3");
 
-        StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, jobId, clientId, processId, new Metrics(), new SystemTime()) {
+        StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, applicationId, clientId, processId, new Metrics(), new SystemTime()) {
             @Override
             protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
                 ProcessorTopology topology = builder.build("X", id.topicGroupId);
-                return new TestStreamTask(id, jobId, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config);
+                return new TestStreamTask(id, applicationId, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config);
             }
         };
 
@@ -271,12 +271,12 @@ public class StreamThreadTest {
 
             StreamsConfig config = new StreamsConfig(props);
 
-            File jobDir = new File(baseDir, jobId);
-            jobDir.mkdir();
-            File stateDir1 = new File(jobDir, task1.toString());
-            File stateDir2 = new File(jobDir, task2.toString());
-            File stateDir3 = new File(jobDir, task3.toString());
-            File extraDir = new File(jobDir, "X");
+            File applicationDir = new File(baseDir, applicationId);
+            applicationDir.mkdir();
+            File stateDir1 = new File(applicationDir, task1.toString());
+            File stateDir2 = new File(applicationDir, task2.toString());
+            File stateDir3 = new File(applicationDir, task3.toString());
+            File extraDir = new File(applicationDir, "X");
             stateDir1.mkdir();
             stateDir2.mkdir();
             stateDir3.mkdir();
@@ -290,7 +290,7 @@ public class StreamThreadTest {
             TopologyBuilder builder = new TopologyBuilder();
             builder.addSource("source1", "topic1");
 
-            StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, jobId, clientId,  processId, new Metrics(), mockTime) {
+            StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, applicationId, clientId,  processId, new Metrics(), mockTime) {
                 @Override
                 public void maybeClean() {
                     super.maybeClean();
@@ -299,7 +299,7 @@ public class StreamThreadTest {
                 @Override
                 protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
                     ProcessorTopology topology = builder.build("X", id.topicGroupId);
-                    return new TestStreamTask(id, jobId, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config);
+                    return new TestStreamTask(id, applicationId, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config);
                 }
             };
 
@@ -412,7 +412,7 @@ public class StreamThreadTest {
             TopologyBuilder builder = new TopologyBuilder();
             builder.addSource("source1", "topic1");
 
-            StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, jobId, clientId,  processId, new Metrics(), mockTime) {
+            StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, applicationId, clientId,  processId, new Metrics(), mockTime) {
                 @Override
                 public void maybeCommit() {
                     super.maybeCommit();
@@ -421,7 +421,7 @@ public class StreamThreadTest {
                 @Override
                 protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
                     ProcessorTopology topology = builder.build("X", id.topicGroupId);
-                    return new TestStreamTask(id, jobId, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config);
+                    return new TestStreamTask(id, applicationId, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config);
                 }
             };
 
@@ -482,7 +482,7 @@ public class StreamThreadTest {
 
         StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
 
-        partitionAssignor.configure(config.getConsumerConfigs(thread, thread.jobId, thread.clientId));
+        partitionAssignor.configure(config.getConsumerConfigs(thread, thread.applicationId, thread.clientId));
 
         Map<String, PartitionAssignor.Assignment> assignments =
                 partitionAssignor.assign(metadata, Collections.singletonMap("client", subscription));

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
index 6cb45f3..063eafe 100644
--- a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
@@ -81,7 +81,7 @@ public class SmokeTestClient extends SmokeTestUtil {
 
     private static KafkaStreams createKafkaStreams(File stateDir, String kafka, String zookeeper) {
         Properties props = new Properties();
-        props.put(StreamsConfig.JOB_ID_CONFIG, "SmokeTest");
+        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "SmokeTest");
         props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
         props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper);

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
index d597fd2..b463669 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
@@ -106,8 +106,8 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
     }
 
     @Override
-    public String jobId() {
-        return "mockJob";
+    public String applicationId() {
+        return "mockApplication";
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index cf17dbe..a2948a2 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -126,7 +126,7 @@ public class ProcessorTopologyTestDriver {
 
     private final Serializer<byte[]> bytesSerializer = new ByteArraySerializer();
 
-    private final String jobId = "test-driver-job";
+    private final String applicationId = "test-driver-application";
 
     private final TaskId id;
     private final ProcessorTopology topology;
@@ -167,7 +167,7 @@ public class ProcessorTopologyTestDriver {
         }
 
         task = new StreamTask(id,
-            jobId,
+            applicationId,
             partitionsByTopic.values(),
             topology,
             consumer,
@@ -334,7 +334,7 @@ public class ProcessorTopologyTestDriver {
         };
         // For each store name ...
         for (String storeName : storeNames) {
-            String topicName = ProcessorStateManager.storeChangelogTopic(jobId, storeName);
+            String topicName = ProcessorStateManager.storeChangelogTopic(applicationId, storeName);
             // Set up the restore-state topic ...
             // consumer.subscribe(new TopicPartition(topicName, 1));
             // Set up the partition that matches the ID (which is what ProcessorStateManager expects) ...


[2/2] kafka git commit: KAFKA-3411: Streams: stop using "job" terminology, rename job.id to application.id

Posted by gu...@apache.org.
KAFKA-3411: Streams: stop using "job" terminology, rename job.id to application.id

guozhangwang, ymatsuda: please review.

Author: Michael G. Noll <mi...@confluent.io>

Reviewers: Guozhang Wang <wa...@gmail.com>

Closes #1081 from miguno/KAFKA-3411


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/958e10c8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/958e10c8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/958e10c8

Branch: refs/heads/trunk
Commit: 958e10c87ce293c3bf59bb9840eaaae915eff25e
Parents: 9a836d0
Author: Michael G. Noll <mi...@confluent.io>
Authored: Thu Mar 17 10:41:48 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Mar 17 10:41:48 2016 -0700

----------------------------------------------------------------------
 .../pageview/JsonTimestampExtractor.java        |   8 +-
 .../examples/pageview/PageViewTypedDemo.java    | 180 ++++++++++++++++++
 .../examples/pageview/PageViewTypedJob.java     | 184 -------------------
 .../examples/pageview/PageViewUntypedDemo.java  | 136 ++++++++++++++
 .../examples/pageview/PageViewUntypedJob.java   | 140 --------------
 .../kafka/streams/examples/pipe/PipeDemo.java   |  65 +++++++
 .../kafka/streams/examples/pipe/PipeJob.java    |  65 -------
 .../examples/wordcount/WordCountDemo.java       |  96 ++++++++++
 .../examples/wordcount/WordCountJob.java        |  96 ----------
 .../wordcount/WordCountProcessorDemo.java       | 137 ++++++++++++++
 .../wordcount/WordCountProcessorJob.java        | 137 --------------
 .../org/apache/kafka/streams/KafkaStreams.java  |  12 +-
 .../org/apache/kafka/streams/StreamsConfig.java |  14 +-
 .../streams/processor/PartitionGrouper.java     |   8 +-
 .../streams/processor/ProcessorContext.java     |   6 +-
 .../streams/processor/TopologyBuilder.java      |  42 ++---
 .../processor/internals/AbstractTask.java       |  16 +-
 .../internals/ProcessorContextImpl.java         |   9 +-
 .../internals/ProcessorStateManager.java        |  14 +-
 .../processor/internals/StandbyContextImpl.java |  14 +-
 .../processor/internals/StandbyTask.java        |  12 +-
 .../internals/StreamPartitionAssignor.java      |   4 +-
 .../streams/processor/internals/StreamTask.java |   6 +-
 .../processor/internals/StreamThread.java       |  28 +--
 .../state/internals/StoreChangeLogger.java      |   2 +-
 .../apache/kafka/streams/StreamsConfigTest.java |   7 +-
 .../internals/ProcessorStateManagerTest.java    |  28 +--
 .../internals/ProcessorTopologyTest.java        |   2 +-
 .../processor/internals/StandbyTaskTest.java    |  20 +-
 .../internals/StreamPartitionAssignorTest.java  |   2 +-
 .../processor/internals/StreamTaskTest.java     |   6 +-
 .../processor/internals/StreamThreadTest.java   |  34 ++--
 .../streams/smoketest/SmokeTestClient.java      |   2 +-
 .../apache/kafka/test/MockProcessorContext.java |   4 +-
 .../kafka/test/ProcessorTopologyTestDriver.java |   6 +-
 35 files changed, 762 insertions(+), 780 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java
index 6443193..63e8377 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java
@@ -29,12 +29,12 @@ public class JsonTimestampExtractor implements TimestampExtractor {
 
     @Override
     public long extract(ConsumerRecord<Object, Object> record) {
-        if (record.value() instanceof PageViewTypedJob.PageView) {
-            return ((PageViewTypedJob.PageView) record.value()).timestamp;
+        if (record.value() instanceof PageViewTypedDemo.PageView) {
+            return ((PageViewTypedDemo.PageView) record.value()).timestamp;
         }
 
-        if (record.value() instanceof PageViewTypedJob.UserProfile) {
-            return ((PageViewTypedJob.UserProfile) record.value()).timestamp;
+        if (record.value() instanceof PageViewTypedDemo.UserProfile) {
+            return ((PageViewTypedDemo.UserProfile) record.value()).timestamp;
         }
 
         if (record.value() instanceof JsonNode) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
new file mode 100644
index 0000000..4f9de29
--- /dev/null
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.examples.pageview;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.HoppingWindows;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.StreamsConfig;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Demonstrates how to perform a join between a KStream and a KTable, i.e. an example of a stateful computation,
+ * using specific data types (here: JSON POJO; but can also be Avro specific bindings, etc.) for serdes
+ * in Kafka Streams.
+ *
+ * In this example, we join a stream of pageviews (aka clickstreams) that reads from a topic named "streams-pageview-input"
+ * with a user profile table that reads from a topic named "streams-userprofile-input", where the data format
+ * is a JSON string representing a record in the stream or table, to compute the number of pageviews per user region.
+ *
+ * Before running this example you must create the source topics (e.g. via bin/kafka-topics.sh --create ...)
+ * and write some data to them (e.g. via bin/kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic.
+ */
+public class PageViewTypedDemo {
+
+    // POJO classes
+    static public class PageView {
+        public String user;
+        public String page;
+        public Long timestamp;
+    }
+
+    static public class UserProfile {
+        public String region;
+        public Long timestamp;
+    }
+
+    static public class PageViewByRegion {
+        public String user;
+        public String page;
+        public String region;
+    }
+
+    static public class WindowedPageViewByRegion {
+        public long windowStart;
+        public String region;
+    }
+
+    static public class RegionCount {
+        public long count;
+        public String region;
+    }
+
+    public static void main(String[] args) throws Exception {
+        Properties props = new Properties();
+        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pageview-typed");
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
+        props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonPOJOSerializer.class);
+        props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonPOJODeserializer.class);
+        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonTimestampExtractor.class);
+
+        // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
+        props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+        KStreamBuilder builder = new KStreamBuilder();
+
+        final Serializer<String> stringSerializer = new StringSerializer();
+        final Deserializer<String> stringDeserializer = new StringDeserializer();
+
+        // TODO: the following can be removed with a serialization factory
+        Map<String, Object> serdeProps = new HashMap<>();
+
+        final Deserializer<PageView> pageViewDeserializer = new JsonPOJODeserializer<>();
+        serdeProps.put("JsonPOJOClass", PageView.class);
+        pageViewDeserializer.configure(serdeProps, false);
+
+        final Deserializer<UserProfile> userProfileDeserializer = new JsonPOJODeserializer<>();
+        serdeProps.put("JsonPOJOClass", UserProfile.class);
+        userProfileDeserializer.configure(serdeProps, false);
+
+        final Serializer<UserProfile> userProfileSerializer = new JsonPOJOSerializer<>();
+        serdeProps.put("JsonPOJOClass", UserProfile.class);
+        userProfileSerializer.configure(serdeProps, false);
+
+        final Serializer<WindowedPageViewByRegion> wPageViewByRegionSerializer = new JsonPOJOSerializer<>();
+        serdeProps.put("JsonPOJOClass", WindowedPageViewByRegion.class);
+        wPageViewByRegionSerializer.configure(serdeProps, false);
+
+        final Serializer<RegionCount> regionCountSerializer = new JsonPOJOSerializer<>();
+        serdeProps.put("JsonPOJOClass", RegionCount.class);
+        regionCountSerializer.configure(serdeProps, false);
+
+        KStream<String, PageView> views = builder.stream(stringDeserializer, pageViewDeserializer, "streams-pageview-input");
+
+        KTable<String, UserProfile> users = builder.table(stringSerializer, userProfileSerializer, stringDeserializer, userProfileDeserializer, "streams-userprofile-input");
+
+        KStream<WindowedPageViewByRegion, RegionCount> regionCount = views
+                .leftJoin(users, new ValueJoiner<PageView, UserProfile, PageViewByRegion>() {
+                    @Override
+                    public PageViewByRegion apply(PageView view, UserProfile profile) {
+                        PageViewByRegion viewByRegion = new PageViewByRegion();
+                        viewByRegion.user = view.user;
+                        viewByRegion.page = view.page;
+
+                        if (profile != null) {
+                            viewByRegion.region = profile.region;
+                        } else {
+                            viewByRegion.region = "UNKNOWN";
+                        }
+                        return viewByRegion;
+                    }
+                })
+                .map(new KeyValueMapper<String, PageViewByRegion, KeyValue<String, PageViewByRegion>>() {
+                    @Override
+                    public KeyValue<String, PageViewByRegion> apply(String user, PageViewByRegion viewRegion) {
+                        return new KeyValue<>(viewRegion.region, viewRegion);
+                    }
+                })
+                .countByKey(HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000),
+                        stringSerializer, stringDeserializer)
+                // TODO: we can merge this toStream().map(...) into a single toStream(...)
+                .toStream()
+                .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<WindowedPageViewByRegion, RegionCount>>() {
+                    @Override
+                    public KeyValue<WindowedPageViewByRegion, RegionCount> apply(Windowed<String> key, Long value) {
+                        WindowedPageViewByRegion wViewByRegion = new WindowedPageViewByRegion();
+                        wViewByRegion.windowStart = key.window().start();
+                        wViewByRegion.region = key.value();
+
+                        RegionCount rCount = new RegionCount();
+                        rCount.region = key.value();
+                        rCount.count = value;
+
+                        return new KeyValue<>(wViewByRegion, rCount);
+                    }
+                });
+
+        // write to the result topic
+        regionCount.to("streams-pageviewstats-typed-output", wPageViewByRegionSerializer, regionCountSerializer);
+
+        KafkaStreams streams = new KafkaStreams(builder, props);
+        streams.start();
+
+        // usually the stream application would be running forever,
+        // in this example we just let it run for some time and stop since the input data is finite.
+        Thread.sleep(5000L);
+
+        streams.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java
deleted file mode 100644
index 1fcb403..0000000
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.examples.pageview;
-
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.LongDeserializer;
-import org.apache.kafka.common.serialization.LongSerializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.kstream.HoppingWindows;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.streams.kstream.ValueJoiner;
-import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.StreamsConfig;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
-/**
- * Demonstrates how to perform a join between a KStream and a KTable, i.e. an example of a stateful computation,
- * using specific data types (here: JSON POJO; but can also be Avro specific bindings, etc.) for serdes
- * in Kafka Streams.
- *
- * In this example, we join a stream of pageviews (aka clickstreams) that reads from  a topic named "streams-pageview-input"
- * with a user profile table that reads from a topic named "streams-userprofile-input", where the data format
- * is JSON string representing a record in the stream or table, to compute the number of pageviews per user region.
- *
- * Before running this example you must create the source topic (e.g. via bin/kafka-topics.sh --create ...)
- * and write some data to it (e.g. via bin-kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic.
- */
-public class PageViewTypedJob {
-
-    // POJO classes
-    static public class PageView {
-        public String user;
-        public String page;
-        public Long timestamp;
-    }
-
-    static public class UserProfile {
-        public String region;
-        public Long timestamp;
-    }
-
-    static public class PageViewByRegion {
-        public String user;
-        public String page;
-        public String region;
-    }
-
-    static public class WindowedPageViewByRegion {
-        public long windowStart;
-        public String region;
-    }
-
-    static public class RegionCount {
-        public long count;
-        public String region;
-    }
-
-    public static void main(String[] args) throws Exception {
-        Properties props = new Properties();
-        props.put(StreamsConfig.JOB_ID_CONFIG, "streams-pageview-typed");
-        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
-        props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonPOJOSerializer.class);
-        props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonPOJODeserializer.class);
-        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonTimestampExtractor.class);
-
-        // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
-        props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-
-        KStreamBuilder builder = new KStreamBuilder();
-
-        final Serializer<String> stringSerializer = new StringSerializer();
-        final Deserializer<String> stringDeserializer = new StringDeserializer();
-        final Serializer<Long> longSerializer = new LongSerializer();
-        final Deserializer<Long> longDeserializer = new LongDeserializer();
-
-        // TODO: the following can be removed with a serialization factory
-        Map<String, Object> serdeProps = new HashMap<>();
-
-        final Deserializer<PageView> pageViewDeserializer = new JsonPOJODeserializer<>();
-        serdeProps.put("JsonPOJOClass", PageView.class);
-        pageViewDeserializer.configure(serdeProps, false);
-
-        final Deserializer<UserProfile> userProfileDeserializer = new JsonPOJODeserializer<>();
-        serdeProps.put("JsonPOJOClass", UserProfile.class);
-        userProfileDeserializer.configure(serdeProps, false);
-
-        final Serializer<UserProfile> userProfileSerializer = new JsonPOJOSerializer<>();
-        serdeProps.put("JsonPOJOClass", UserProfile.class);
-        userProfileSerializer.configure(serdeProps, false);
-
-        final Serializer<WindowedPageViewByRegion> wPageViewByRegionSerializer = new JsonPOJOSerializer<>();
-        serdeProps.put("JsonPOJOClass", WindowedPageViewByRegion.class);
-        wPageViewByRegionSerializer.configure(serdeProps, false);
-
-        final Serializer<RegionCount> regionCountSerializer = new JsonPOJOSerializer<>();
-        serdeProps.put("JsonPOJOClass", RegionCount.class);
-        regionCountSerializer.configure(serdeProps, false);
-
-        KStream<String, PageView> views = builder.stream(stringDeserializer, pageViewDeserializer, "streams-pageview-input");
-
-        KTable<String, UserProfile> users = builder.table(stringSerializer, userProfileSerializer, stringDeserializer, userProfileDeserializer, "streams-userprofile-input");
-
-        KStream<WindowedPageViewByRegion, RegionCount> regionCount = views
-                .leftJoin(users, new ValueJoiner<PageView, UserProfile, PageViewByRegion>() {
-                    @Override
-                    public PageViewByRegion apply(PageView view, UserProfile profile) {
-                        PageViewByRegion viewByRegion = new PageViewByRegion();
-                        viewByRegion.user = view.user;
-                        viewByRegion.page = view.page;
-
-                        if (profile != null) {
-                            viewByRegion.region = profile.region;
-                        } else {
-                            viewByRegion.region = "UNKNOWN";
-                        }
-                        return viewByRegion;
-                    }
-                })
-                .map(new KeyValueMapper<String, PageViewByRegion, KeyValue<String, PageViewByRegion>>() {
-                    @Override
-                    public KeyValue<String, PageViewByRegion> apply(String user, PageViewByRegion viewRegion) {
-                        return new KeyValue<>(viewRegion.region, viewRegion);
-                    }
-                })
-                .countByKey(HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000),
-                        stringSerializer, stringDeserializer)
-                // TODO: we can merge ths toStream().map(...) with a single toStream(...)
-                .toStream()
-                .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<WindowedPageViewByRegion, RegionCount>>() {
-                    @Override
-                    public KeyValue<WindowedPageViewByRegion, RegionCount> apply(Windowed<String> key, Long value) {
-                        WindowedPageViewByRegion wViewByRegion = new WindowedPageViewByRegion();
-                        wViewByRegion.windowStart = key.window().start();
-                        wViewByRegion.region = key.value();
-
-                        RegionCount rCount = new RegionCount();
-                        rCount.region = key.value();
-                        rCount.count = value;
-
-                        return new KeyValue<>(wViewByRegion, rCount);
-                    }
-                });
-
-        // write to the result topic
-        regionCount.to("streams-pageviewstats-typed-output", wPageViewByRegionSerializer, regionCountSerializer);
-
-        KafkaStreams streams = new KafkaStreams(builder, props);
-        streams.start();
-
-        // usually the streaming job would be ever running,
-        // in this example we just let it run for some time and stop since the input data is finite.
-        Thread.sleep(5000L);
-
-        streams.close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
new file mode 100644
index 0000000..9377095
--- /dev/null
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.examples.pageview;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.connect.json.JsonSerializer;
+import org.apache.kafka.connect.json.JsonDeserializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.HoppingWindows;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.kstream.Windowed;
+
+import java.util.Properties;
+
+/**
+ * Demonstrates how to perform a join between a KStream and a KTable, i.e. an example of a stateful computation,
+ * using general data types (here: JSON; but can also be Avro generic bindings, etc.) for serdes
+ * in Kafka Streams.
+ *
+ * In this example, we join a stream of pageviews (aka clickstreams) that reads from a topic named "streams-pageview-input"
+ * with a user profile table that reads from a topic named "streams-userprofile-input", where the data format
+ * is a JSON string representing a record in the stream or table, to compute the number of pageviews per user region.
+ *
+ * Before running this example you must create the source topics (e.g. via bin/kafka-topics.sh --create ...)
+ * and write some data to them (e.g. via bin/kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic.
+ */
+public class PageViewUntypedDemo {
+
+    public static void main(String[] args) throws Exception {
+        Properties props = new Properties();
+        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pageview-untyped");
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
+        props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
+        props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
+        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonTimestampExtractor.class);
+
+        // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
+        props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+        KStreamBuilder builder = new KStreamBuilder();
+
+        final Serializer<String> stringSerializer = new StringSerializer();
+        final Deserializer<String> stringDeserializer = new StringDeserializer();
+        final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
+        final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
+
+        KStream<String, JsonNode> views = builder.stream(stringDeserializer, jsonDeserializer, "streams-pageview-input");
+
+        KTable<String, JsonNode> users = builder.table(stringSerializer, jsonSerializer, stringDeserializer, jsonDeserializer, "streams-userprofile-input");
+
+        KTable<String, String> userRegions = users.mapValues(new ValueMapper<JsonNode, String>() {
+            @Override
+            public String apply(JsonNode record) {
+                return record.get("region").textValue();
+            }
+        });
+
+        KStream<JsonNode, JsonNode> regionCount = views
+                .leftJoin(userRegions, new ValueJoiner<JsonNode, String, JsonNode>() {
+                    @Override
+                    public JsonNode apply(JsonNode view, String region) {
+                        ObjectNode jNode = JsonNodeFactory.instance.objectNode();
+
+                        return jNode.put("user", view.get("user").textValue())
+                                .put("page", view.get("page").textValue())
+                                .put("region", region == null ? "UNKNOWN" : region);
+                    }
+                })
+                .map(new KeyValueMapper<String, JsonNode, KeyValue<String, JsonNode>>() {
+                    @Override
+                    public KeyValue<String, JsonNode> apply(String user, JsonNode viewRegion) {
+                        return new KeyValue<>(viewRegion.get("region").textValue(), viewRegion);
+                    }
+                })
+                .countByKey(HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000),
+                        stringSerializer, stringDeserializer)
+                // TODO: we can merge this toStream().map(...) into a single toStream(...)
+                .toStream()
+                .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<JsonNode, JsonNode>>() {
+                    @Override
+                    public KeyValue<JsonNode, JsonNode> apply(Windowed<String> key, Long value) {
+                        ObjectNode keyNode = JsonNodeFactory.instance.objectNode();
+                        keyNode.put("window-start", key.window().start())
+                                .put("region", key.value());
+
+                        ObjectNode valueNode = JsonNodeFactory.instance.objectNode();
+                        valueNode.put("count", value);
+
+                        return new KeyValue<>((JsonNode) keyNode, (JsonNode) valueNode);
+                    }
+                });
+
+        // write to the result topic
+        regionCount.to("streams-pageviewstats-untyped-output", jsonSerializer, jsonSerializer);
+
+        KafkaStreams streams = new KafkaStreams(builder, props);
+        streams.start();
+
+        // usually the stream application would be running forever,
+        // in this example we just let it run for some time and stop since the input data is finite.
+        Thread.sleep(5000L);
+
+        streams.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedJob.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedJob.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedJob.java
deleted file mode 100644
index fb1a55d..0000000
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedJob.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.examples.pageview;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.JsonNodeFactory;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.LongDeserializer;
-import org.apache.kafka.common.serialization.LongSerializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.connect.json.JsonSerializer;
-import org.apache.kafka.connect.json.JsonDeserializer;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.HoppingWindows;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.streams.kstream.ValueJoiner;
-import org.apache.kafka.streams.kstream.ValueMapper;
-import org.apache.kafka.streams.kstream.Windowed;
-
-import java.util.Properties;
-
-/**
- * Demonstrates how to perform a join between a KStream and a KTable, i.e. an example of a stateful computation,
- * using general data types (here: JSON; but can also be Avro generic bindings, etc.) for serdes
- * in Kafka Streams.
- *
- * In this example, we join a stream of pageviews (aka clickstreams) that reads from  a topic named "streams-pageview-input"
- * with a user profile table that reads from a topic named "streams-userprofile-input", where the data format
- * is JSON string representing a record in the stream or table, to compute the number of pageviews per user region.
- *
- * Before running this example you must create the source topic (e.g. via bin/kafka-topics.sh --create ...)
- * and write some data to it (e.g. via bin-kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic.
- */
-public class PageViewUntypedJob {
-
-    public static void main(String[] args) throws Exception {
-        Properties props = new Properties();
-        props.put(StreamsConfig.JOB_ID_CONFIG, "streams-pageview-untyped");
-        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
-        props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
-        props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
-        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonTimestampExtractor.class);
-
-        // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
-        props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-
-        KStreamBuilder builder = new KStreamBuilder();
-
-        final Serializer<String> stringSerializer = new StringSerializer();
-        final Deserializer<String> stringDeserializer = new StringDeserializer();
-        final Serializer<Long> longSerializer = new LongSerializer();
-        final Deserializer<Long> longDeserializer = new LongDeserializer();
-        final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
-        final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
-
-        KStream<String, JsonNode> views = builder.stream(stringDeserializer, jsonDeserializer, "streams-pageview-input");
-
-        KTable<String, JsonNode> users = builder.table(stringSerializer, jsonSerializer, stringDeserializer, jsonDeserializer, "streams-userprofile-input");
-
-        KTable<String, String> userRegions = users.mapValues(new ValueMapper<JsonNode, String>() {
-            @Override
-            public String apply(JsonNode record) {
-                return record.get("region").textValue();
-            }
-        });
-
-        KStream<JsonNode, JsonNode> regionCount = views
-                .leftJoin(userRegions, new ValueJoiner<JsonNode, String, JsonNode>() {
-                    @Override
-                    public JsonNode apply(JsonNode view, String region) {
-                        ObjectNode jNode = JsonNodeFactory.instance.objectNode();
-
-                        return jNode.put("user", view.get("user").textValue())
-                                .put("page", view.get("page").textValue())
-                                .put("region", region == null ? "UNKNOWN" : region);
-                    }
-                })
-                .map(new KeyValueMapper<String, JsonNode, KeyValue<String, JsonNode>>() {
-                    @Override
-                    public KeyValue<String, JsonNode> apply(String user, JsonNode viewRegion) {
-                        return new KeyValue<>(viewRegion.get("region").textValue(), viewRegion);
-                    }
-                })
-                .countByKey(HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000),
-                        stringSerializer, stringDeserializer)
-                // TODO: we can merge ths toStream().map(...) with a single toStream(...)
-                .toStream()
-                .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<JsonNode, JsonNode>>() {
-                    @Override
-                    public KeyValue<JsonNode, JsonNode> apply(Windowed<String> key, Long value) {
-                        ObjectNode keyNode = JsonNodeFactory.instance.objectNode();
-                        keyNode.put("window-start", key.window().start())
-                                .put("region", key.value());
-
-                        ObjectNode valueNode = JsonNodeFactory.instance.objectNode();
-                        valueNode.put("count", value);
-
-                        return new KeyValue<>((JsonNode) keyNode, (JsonNode) valueNode);
-                    }
-                });
-
-        // write to the result topic
-        regionCount.to("streams-pageviewstats-untyped-output", jsonSerializer, jsonSerializer);
-
-        KafkaStreams streams = new KafkaStreams(builder, props);
-        streams.start();
-
-        // usually the streaming job would be ever running,
-        // in this example we just let it run for some time and stop since the input data is finite.
-        Thread.sleep(5000L);
-
-        streams.close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java
new file mode 100644
index 0000000..c37c68a
--- /dev/null
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.examples.pipe;
+
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+
+import java.util.Properties;
+
+/**
+ * Demonstrates, using the high-level KStream DSL, how to read data from a source (input) topic and how to
+ * write data to a sink (output) topic.
+ *
+ * In this example, we implement a simple "pipe" program that reads from a source topic "streams-file-input"
+ * and writes the data as-is (i.e. unmodified) into a sink topic "streams-pipe-output".
+ *
+ * Before running this example you must create the source topic (e.g. via bin/kafka-topics.sh --create ...)
+ * and write some data to it (e.g. via bin/kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic.
+ */
+public class PipeDemo {
+
+    public static void main(String[] args) throws Exception {
+        Properties props = new Properties();
+        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+
+        // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
+        props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+        KStreamBuilder builder = new KStreamBuilder();
+
+        builder.stream("streams-file-input").to("streams-pipe-output");
+
+        KafkaStreams streams = new KafkaStreams(builder, props);
+        streams.start();
+
+        // usually the stream application would be running forever,
+        // in this example we just let it run for some time and stop since the input data is finite.
+        Thread.sleep(5000L);
+
+        streams.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java
deleted file mode 100644
index 8885ca2..0000000
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.examples.pipe;
-
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.StreamsConfig;
-
-import java.util.Properties;
-
-/**
- * Demonstrates, using the high-level KStream DSL, how to read data from a source (input) topic and how to
- * write data to a sink (output) topic.
- *
- * In this example, we implement a simple "pipe" program that reads from a source topic "streams-file-input"
- * and writes the data as-is (i.e. unmodified) into a sink topic "streams-pipe-output".
- *
- * Before running this example you must create the source topic (e.g. via bin/kafka-topics.sh --create ...)
- * and write some data to it (e.g. via bin-kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic.
- */
-public class PipeJob {
-
-    public static void main(String[] args) throws Exception {
-        Properties props = new Properties();
-        props.put(StreamsConfig.JOB_ID_CONFIG, "streams-pipe");
-        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-
-        // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
-        props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-
-        KStreamBuilder builder = new KStreamBuilder();
-
-        builder.stream("streams-file-input").to("streams-pipe-output");
-
-        KafkaStreams streams = new KafkaStreams(builder, props);
-        streams.start();
-
-        // usually the streaming job would be ever running,
-        // in this example we just let it run for some time and stop since the input data is finite.
-        Thread.sleep(5000L);
-
-        streams.close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
new file mode 100644
index 0000000..03d5142
--- /dev/null
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.examples.wordcount;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.ValueMapper;
+
+import java.util.Arrays;
+import java.util.Properties;
+
+/**
+ * Demonstrates, using the high-level KStream DSL, how to implement the WordCount program
+ * that computes a simple word occurrence histogram from an input text.
+ *
+ * In this example, the input stream reads from a topic named "streams-file-input", where the values of messages
+ * represent lines of text; and the histogram output is written to topic "streams-wordcount-output" where each record
+ * is an updated count of a single word.
+ *
+ * Before running this example you must create the source topic (e.g. via bin/kafka-topics.sh --create ...)
+ * and write some data to it (e.g. via bin/kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic.
+ */
+public class WordCountDemo {
+
+    public static void main(String[] args) throws Exception {
+        Properties props = new Properties();
+        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
+        props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+
+        // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
+        props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+        KStreamBuilder builder = new KStreamBuilder();
+
+        final Serializer<String> stringSerializer = new StringSerializer();
+        final Deserializer<String> stringDeserializer = new StringDeserializer();
+        final Serializer<Long> longSerializer = new LongSerializer();
+
+        KStream<String, String> source = builder.stream("streams-file-input");
+
+        KTable<String, Long> counts = source
+                .flatMapValues(new ValueMapper<String, Iterable<String>>() {
+                    @Override
+                    public Iterable<String> apply(String value) {
+                        return Arrays.asList(value.toLowerCase().split(" "));
+                    }
+                }).map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
+                    @Override
+                    public KeyValue<String, String> apply(String key, String value) {
+                        return new KeyValue<String, String>(value, value);
+                    }
+                })
+                .countByKey(stringSerializer, stringDeserializer, "Counts");
+
+        counts.to("streams-wordcount-output", stringSerializer, longSerializer);
+
+        KafkaStreams streams = new KafkaStreams(builder, props);
+        streams.start();
+
+        // usually the stream application would be running forever,
+        // in this example we just let it run for some time and stop since the input data is finite.
+        Thread.sleep(5000L);
+
+        streams.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java
deleted file mode 100644
index 2b51a44..0000000
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.examples.wordcount;
-
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.LongSerializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.streams.kstream.ValueMapper;
-
-import java.util.Arrays;
-import java.util.Properties;
-
-/**
- * Demonstrates, using the high-level KStream DSL, how to implement the WordCount program
- * that computes a simple word occurrence histogram from an input text.
- *
- * In this example, the input stream reads from a topic named "streams-file-input", where the values of messages
- * represent lines of text; and the histogram output is written to topic "streams-wordcount-output" where each record
- * is an updated count of a single word.
- *
- * Before running this example you must create the source topic (e.g. via bin/kafka-topics.sh --create ...)
- * and write some data to it (e.g. via bin-kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic.
- */
-public class WordCountJob {
-
-    public static void main(String[] args) throws Exception {
-        Properties props = new Properties();
-        props.put(StreamsConfig.JOB_ID_CONFIG, "streams-wordcount");
-        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
-        props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-
-        // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
-        props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-
-        KStreamBuilder builder = new KStreamBuilder();
-
-        final Serializer<String> stringSerializer = new StringSerializer();
-        final Deserializer<String> stringDeserializer = new StringDeserializer();
-        final Serializer<Long> longSerializer = new LongSerializer();
-
-        KStream<String, String> source = builder.stream("streams-file-input");
-
-        KTable<String, Long> counts = source
-                .flatMapValues(new ValueMapper<String, Iterable<String>>() {
-                    @Override
-                    public Iterable<String> apply(String value) {
-                        return Arrays.asList(value.toLowerCase().split(" "));
-                    }
-                }).map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
-                    @Override
-                    public KeyValue<String, String> apply(String key, String value) {
-                        return new KeyValue<String, String>(value, value);
-                    }
-                })
-                .countByKey(stringSerializer, stringDeserializer, "Counts");
-
-        counts.to("streams-wordcount-output", stringSerializer, longSerializer);
-
-        KafkaStreams streams = new KafkaStreams(builder, props);
-        streams.start();
-
-        // usually the streaming job would be ever running,
-        // in this example we just let it run for some time and stop since the input data is finite.
-        Thread.sleep(5000L);
-
-        streams.close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
new file mode 100644
index 0000000..b651b3a
--- /dev/null
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.examples.wordcount;
+
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Stores;
+
+import java.util.Properties;
+
+/**
+ * Demonstrates, using the low-level Processor APIs, how to implement the WordCount program
+ * that computes a simple word occurrence histogram from an input text.
+ *
+ * In this example, the input stream reads from a topic named "streams-file-input", where the values of messages
+ * represent lines of text; and the histogram output is written to topic "streams-wordcount-processor-output" where each record
+ * is an updated count of a single word.
+ *
+ * Before running this example you must create the source topic (e.g. via bin/kafka-topics.sh --create ...)
+ * and write some data to it (e.g. via bin/kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic.
+ */
+public class WordCountProcessorDemo {
+
+    private static class MyProcessorSupplier implements ProcessorSupplier<String, String> {
+
+        @Override
+        public Processor<String, String> get() {
+            return new Processor<String, String>() {
+                private ProcessorContext context;
+                private KeyValueStore<String, Integer> kvStore;
+
+                @Override
+                @SuppressWarnings("unchecked")
+                public void init(ProcessorContext context) {
+                    this.context = context;
+                    this.context.schedule(1000);
+                    this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("Counts");
+                }
+
+                @Override
+                public void process(String dummy, String line) {
+                    String[] words = line.toLowerCase().split(" ");
+
+                    for (String word : words) {
+                        Integer oldValue = this.kvStore.get(word);
+
+                        if (oldValue == null) {
+                            this.kvStore.put(word, 1);
+                        } else {
+                            this.kvStore.put(word, oldValue + 1);
+                        }
+                    }
+
+                    context.commit();
+                }
+
+                @Override
+                public void punctuate(long timestamp) {
+                    KeyValueIterator<String, Integer> iter = this.kvStore.all();
+
+                    System.out.println("----------- " + timestamp + " ----------- ");
+
+                    while (iter.hasNext()) {
+                        KeyValue<String, Integer> entry = iter.next();
+
+                        System.out.println("[" + entry.key + ", " + entry.value + "]");
+
+                        context.forward(entry.key, entry.value.toString());
+                    }
+
+                    iter.close();
+                }
+
+                @Override
+                public void close() {
+                    this.kvStore.close();
+                }
+            };
+        }
+    }
+
+    public static void main(String[] args) throws Exception {
+        Properties props = new Properties();
+        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount-processor");
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
+        props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+
+        // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
+        props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+        TopologyBuilder builder = new TopologyBuilder();
+
+        builder.addSource("Source", "streams-file-input");
+
+        builder.addProcessor("Process", new MyProcessorSupplier(), "Source");
+        builder.addStateStore(Stores.create("Counts").withStringKeys().withIntegerValues().inMemory().build(), "Process");
+
+        builder.addSink("Sink", "streams-wordcount-processor-output", "Process");
+
+        KafkaStreams streams = new KafkaStreams(builder, props);
+        streams.start();
+
+        // usually the stream application would be running forever,
+        // in this example we just let it run for some time and stop since the input data is finite.
+        Thread.sleep(5000L);
+
+        streams.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorJob.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorJob.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorJob.java
deleted file mode 100644
index cb82656..0000000
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorJob.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.examples.wordcount;
-
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.processor.TopologyBuilder;
-import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.Stores;
-
-import java.util.Properties;
-
-/**
- * Demonstrates, using the low-level Processor APIs, how to implement the WordCount program
- * that computes a simple word occurrence histogram from an input text.
- *
- * In this example, the input stream reads from a topic named "streams-file-input", where the values of messages
- * represent lines of text; and the histogram output is written to topic "streams-wordcount-processor-output" where each record
- * is an updated count of a single word.
- *
- * Before running this example you must create the source topic (e.g. via bin/kafka-topics.sh --create ...)
- * and write some data to it (e.g. via bin-kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic.
- */
-public class WordCountProcessorJob {
-
-    private static class MyProcessorSupplier implements ProcessorSupplier<String, String> {
-
-        @Override
-        public Processor<String, String> get() {
-            return new Processor<String, String>() {
-                private ProcessorContext context;
-                private KeyValueStore<String, Integer> kvStore;
-
-                @Override
-                @SuppressWarnings("unchecked")
-                public void init(ProcessorContext context) {
-                    this.context = context;
-                    this.context.schedule(1000);
-                    this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("Counts");
-                }
-
-                @Override
-                public void process(String dummy, String line) {
-                    String[] words = line.toLowerCase().split(" ");
-
-                    for (String word : words) {
-                        Integer oldValue = this.kvStore.get(word);
-
-                        if (oldValue == null) {
-                            this.kvStore.put(word, 1);
-                        } else {
-                            this.kvStore.put(word, oldValue + 1);
-                        }
-                    }
-
-                    context.commit();
-                }
-
-                @Override
-                public void punctuate(long timestamp) {
-                    KeyValueIterator<String, Integer> iter = this.kvStore.all();
-
-                    System.out.println("----------- " + timestamp + " ----------- ");
-
-                    while (iter.hasNext()) {
-                        KeyValue<String, Integer> entry = iter.next();
-
-                        System.out.println("[" + entry.key + ", " + entry.value + "]");
-
-                        context.forward(entry.key, entry.value.toString());
-                    }
-
-                    iter.close();
-                }
-
-                @Override
-                public void close() {
-                    this.kvStore.close();
-                }
-            };
-        }
-    }
-
-    public static void main(String[] args) throws Exception {
-        Properties props = new Properties();
-        props.put(StreamsConfig.JOB_ID_CONFIG, "streams-wordcount-processor");
-        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
-        props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-
-        // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
-        props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-
-        TopologyBuilder builder = new TopologyBuilder();
-
-        builder.addSource("Source", "streams-file-input");
-
-        builder.addProcessor("Process", new MyProcessorSupplier(), "Source");
-        builder.addStateStore(Stores.create("Counts").withStringKeys().withIntegerValues().inMemory().build(), "Process");
-
-        builder.addSink("Sink", "streams-wordcount-processor-output", "Process");
-
-        KafkaStreams streams = new KafkaStreams(builder, props);
-        streams.start();
-
-        // usually the streaming job would be ever running,
-        // in this example we just let it run for some time and stop since the input data is finite.
-        Thread.sleep(5000L);
-
-        streams.close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 15d6d8b..20958e4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -46,7 +46,7 @@ import java.util.concurrent.atomic.AtomicInteger;
  * The {@link KafkaStreams} class manages the lifecycle of a Kafka Streams instance. One stream instance can contain one or
  * more threads specified in the configs for the processing work.
  * <p>
- * A {@link KafkaStreams} instance can co-ordinate with any other instances with the same job ID (whether in this same process, on other processes
+ * A {@link KafkaStreams} instance can co-ordinate with any other instances with the same application ID (whether in this same process, on other processes
  * on this machine, or on remote machines) as a single (possibly distributed) stream processing client. These instances will divide up the work
  * based on the assignment of the input topic partitions so that all partitions are being
  * consumed. If instances are added or failed, all instances will rebelance the partition assignment among themselves
@@ -59,7 +59,7 @@ import java.util.concurrent.atomic.AtomicInteger;
  * A simple example might look like this:
  * <pre>
  *    Map&lt;String, Object&gt; props = new HashMap&lt;&gt;();
- *    props.put(StreamsConfig.JOB_ID_CONFIG, "my-job");
+ *    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");
  *    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  *    props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  *    props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
@@ -113,12 +113,12 @@ public class KafkaStreams {
 
         this.processId = UUID.randomUUID();
 
-        // JobId is a required config and hence should always have value
-        String jobId = config.getString(StreamsConfig.JOB_ID_CONFIG);
+        // The application ID is a required config and hence should always have a value
+        String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
 
         String clientId = config.getString(StreamsConfig.CLIENT_ID_CONFIG);
         if (clientId.length() <= 0)
-            clientId = jobId + "-" + STREAM_CLIENT_ID_SEQUENCE.getAndIncrement();
+            clientId = applicationId + "-" + STREAM_CLIENT_ID_SEQUENCE.getAndIncrement();
 
         List<MetricsReporter> reporters = config.getConfiguredInstances(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG,
                 MetricsReporter.class);
@@ -132,7 +132,7 @@ public class KafkaStreams {
 
         this.threads = new StreamThread[config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)];
         for (int i = 0; i < this.threads.length; i++) {
-            this.threads[i] = new StreamThread(builder, config, jobId, clientId, processId, metrics, time);
+            this.threads[i] = new StreamThread(builder, config, applicationId, clientId, processId, metrics, time);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index c4b8ffe..52fdbd4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -83,13 +83,13 @@ public class StreamsConfig extends AbstractConfig {
     public static final String PARTITION_GROUPER_CLASS_CONFIG = "partition.grouper";
     private static final String PARTITION_GROUPER_CLASS_DOC = "Partition grouper class that implements the <code>PartitionGrouper</code> interface.";
 
-    /** <code>job.id</code> */
-    public static final String JOB_ID_CONFIG = "job.id";
-    public static final String JOB_ID_DOC = "An id string to identify for the stream job. It is used as 1) the default client-id prefix, 2) the group-id for membership management, 3) the changelog topic prefix.";
+    /** <code>application.id</code> */
+    public static final String APPLICATION_ID_CONFIG = "application.id";
+    public static final String APPLICATION_ID_DOC = "An identifier for the stream processing application. Must be unique within the Kafka cluster. It is used as 1) the default client-id prefix, 2) the group-id for membership management, 3) the changelog topic prefix.";
 
     /** <code>replication.factor</code> */
     public static final String REPLICATION_FACTOR_CONFIG = "replication.factor";
-    public static final String REPLICATION_FACTOR_DOC = "The replication factor for change log topics and repartition topics created by the job.";
+    public static final String REPLICATION_FACTOR_DOC = "The replication factor for change log topics and repartition topics created by the stream processing application.";
 
     /** <code>key.serializer</code> */
     public static final String KEY_SERIALIZER_CLASS_CONFIG = ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
@@ -124,10 +124,10 @@ public class StreamsConfig extends AbstractConfig {
     private static final String WALLCLOCK_TIMESTAMP_EXTRACTOR = "org.apache.kafka.streams.processor.internals.WallclockTimestampExtractor";
 
     static {
-        CONFIG = new ConfigDef().define(JOB_ID_CONFIG,      // required with no default value
+        CONFIG = new ConfigDef().define(APPLICATION_ID_CONFIG,      // required with no default value
                                         Type.STRING,
                                         Importance.HIGH,
-                                        StreamsConfig.JOB_ID_DOC)
+                                        StreamsConfig.APPLICATION_ID_DOC)
                                 .define(BOOTSTRAP_SERVERS_CONFIG,       // required with no default value
                                         Type.STRING,
                                         Importance.HIGH,
@@ -297,7 +297,7 @@ public class StreamsConfig extends AbstractConfig {
     }
 
     private void removeStreamsSpecificConfigs(Map<String, Object> props) {
-        props.remove(StreamsConfig.JOB_ID_CONFIG);
+        props.remove(StreamsConfig.APPLICATION_ID_CONFIG);
         props.remove(StreamsConfig.STATE_DIR_CONFIG);
         props.remove(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG);
         props.remove(StreamsConfig.NUM_STREAM_THREADS_CONFIG);

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java b/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java
index ae9844d..0c94084 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java
@@ -28,7 +28,8 @@ import java.util.Set;
  *
  * This grouper also acts as the stream task creation function along with partition distribution
  * such that each generated partition group is assigned with a distinct {@link TaskId};
- * the created task ids will then be assigned to Kafka Streams instances that host the stream job.
+ * the created task ids will then be assigned to Kafka Streams instances that host the stream
+ * processing application.
  */
 public interface PartitionGrouper {
 
@@ -37,9 +38,10 @@ public interface PartitionGrouper {
      * expected to be processed together must be in the same group. DefaultPartitionGrouper implements this
      * interface. See {@link DefaultPartitionGrouper} for more information.
      *
-     * @param topicGroups The map from the {@link TopologyBuilder#topicGroups() topic group} id to topics
+     * @param topicGroups The map from the {@link TopologyBuilder#topicGroups(String) topic group} id to topics
      * @param metadata Metadata of the consuming cluster
      * @return a map of task ids to groups of partitions
      */
     Map<TaskId, Set<TopicPartition>> partitionGroups(Map<Integer, Set<String>> topicGroups, Cluster metadata);
-}
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
index 79376ba..e9d5252 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
@@ -29,11 +29,11 @@ import java.io.File;
 public interface ProcessorContext {
 
     /**
-     * Returns the job id
+     * Returns the application id
      *
-     * @return the job id
+     * @return the application id
      */
-    String jobId();
+    String applicationId();
 
     /**
      * Returns the task id

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index 6e5aec5..ab7122b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -85,7 +85,7 @@ public class TopologyBuilder {
             this.name = name;
         }
 
-        public abstract ProcessorNode build(String jobId);
+        public abstract ProcessorNode build(String applicationId);
     }
 
     private static class ProcessorNodeFactory extends NodeFactory {
@@ -105,7 +105,7 @@ public class TopologyBuilder {
 
         @SuppressWarnings("unchecked")
         @Override
-        public ProcessorNode build(String jobId) {
+        public ProcessorNode build(String applicationId) {
             return new ProcessorNode(name, supplier.get(), stateStoreNames);
         }
     }
@@ -124,7 +124,7 @@ public class TopologyBuilder {
 
         @SuppressWarnings("unchecked")
         @Override
-        public ProcessorNode build(String jobId) {
+        public ProcessorNode build(String applicationId) {
             return new SourceNode(name, keyDeserializer, valDeserializer);
         }
     }
@@ -147,10 +147,10 @@ public class TopologyBuilder {
 
         @SuppressWarnings("unchecked")
         @Override
-        public ProcessorNode build(String jobId) {
+        public ProcessorNode build(String applicationId) {
             if (internalTopicNames.contains(topic)) {
-                // prefix the job id to the internal topic name
-                return new SinkNode(name, jobId + "-" + topic, keySerializer, valSerializer, partitioner);
+                // prefix the internal topic name with the application id
+                return new SinkNode(name, applicationId + "-" + topic, keySerializer, valSerializer, partitioner);
             } else {
                 return new SinkNode(name, topic, keySerializer, valSerializer, partitioner);
             }
@@ -496,7 +496,7 @@ public class TopologyBuilder {
      *
      * @return groups of topic names
      */
-    public Map<Integer, TopicsInfo> topicGroups(String jobId) {
+    public Map<Integer, TopicsInfo> topicGroups(String applicationId) {
         Map<Integer, TopicsInfo> topicGroups = new HashMap<>();
 
         if (nodeGroups == null)
@@ -514,8 +514,8 @@ public class TopologyBuilder {
                     // if some of the topics are internal, add them to the internal topics
                     for (String topic : topics) {
                         if (this.internalTopicNames.contains(topic)) {
-                            // prefix the job id to the internal topic name
-                            String internalTopic = jobId + "-" + topic;
+                            // prefix the internal topic name with the application id
+                            String internalTopic = applicationId + "-" + topic;
                             internalSourceTopics.add(internalTopic);
                             sourceTopics.add(internalTopic);
                         } else {
@@ -528,8 +528,8 @@ public class TopologyBuilder {
                 String topic = nodeToSinkTopic.get(node);
                 if (topic != null) {
                     if (internalTopicNames.contains(topic)) {
-                        // prefix the job id to the change log topic name
-                        sinkTopics.add(jobId + "-" + topic);
+                        // prefix the change log topic name with the application id
+                        sinkTopics.add(applicationId + "-" + topic);
                     } else {
                         sinkTopics.add(topic);
                     }
@@ -538,8 +538,8 @@ public class TopologyBuilder {
                 // if the node is connected to a state, add to the state topics
                 for (StateStoreFactory stateFactory : stateFactories.values()) {
                     if (stateFactory.isInternal && stateFactory.users.contains(node)) {
-                        // prefix the job id to the change log topic name
-                        stateChangelogTopics.add(jobId + "-" + stateFactory.supplier.name() + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX);
+                        // prefix the change log topic name with the application id
+                        stateChangelogTopics.add(applicationId + "-" + stateFactory.supplier.name() + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX);
                     }
                 }
             }
@@ -637,7 +637,7 @@ public class TopologyBuilder {
      *
      * @see org.apache.kafka.streams.KafkaStreams#KafkaStreams(TopologyBuilder, org.apache.kafka.streams.StreamsConfig)
      */
-    public ProcessorTopology build(String jobId, Integer topicGroupId) {
+    public ProcessorTopology build(String applicationId, Integer topicGroupId) {
         Set<String> nodeGroup;
         if (topicGroupId != null) {
             nodeGroup = nodeGroups().get(topicGroupId);
@@ -645,11 +645,11 @@ public class TopologyBuilder {
             // when nodeGroup is null, we build the full topology. this is used in some tests.
             nodeGroup = null;
         }
-        return build(jobId, nodeGroup);
+        return build(applicationId, nodeGroup);
     }
 
     @SuppressWarnings("unchecked")
-    private ProcessorTopology build(String jobId, Set<String> nodeGroup) {
+    private ProcessorTopology build(String applicationId, Set<String> nodeGroup) {
         List<ProcessorNode> processorNodes = new ArrayList<>(nodeFactories.size());
         Map<String, ProcessorNode> processorMap = new HashMap<>();
         Map<String, SourceNode> topicSourceMap = new HashMap<>();
@@ -658,7 +658,7 @@ public class TopologyBuilder {
         // create processor nodes in a topological order ("nodeFactories" is already topologically sorted)
         for (NodeFactory factory : nodeFactories.values()) {
             if (nodeGroup == null || nodeGroup.contains(factory.name)) {
-                ProcessorNode node = factory.build(jobId);
+                ProcessorNode node = factory.build(applicationId);
                 processorNodes.add(node);
                 processorMap.put(node.name(), node);
 
@@ -674,8 +674,8 @@ public class TopologyBuilder {
                 } else if (factory instanceof SourceNodeFactory) {
                     for (String topic : ((SourceNodeFactory) factory).topics) {
                         if (internalTopicNames.contains(topic)) {
-                            // prefix the job id to the internal topic name
-                            topicSourceMap.put(jobId + "-" + topic, (SourceNode) node);
+                            // prefix the internal topic name with the application id
+                            topicSourceMap.put(applicationId + "-" + topic, (SourceNode) node);
                         } else {
                             topicSourceMap.put(topic, (SourceNode) node);
                         }
@@ -697,11 +697,11 @@ public class TopologyBuilder {
      * Get the names of topics that are to be consumed by the source nodes created by this builder.
      * @return the unmodifiable set of topic names used by source nodes, which changes as new sources are added; never null
      */
-    public Set<String> sourceTopics(String jobId) {
+    public Set<String> sourceTopics(String applicationId) {
         Set<String> topics = new HashSet<>();
         for (String topic : sourceTopicNames) {
             if (internalTopicNames.contains(topic)) {
-                topics.add(jobId + "-" + topic);
+                topics.add(applicationId + "-" + topic);
             } else {
                 topics.add(topic);
             }