Posted to commits@kafka.apache.org by ew...@apache.org on 2016/01/26 18:19:54 UTC

kafka git commit: KAFKA-3125: Add Kafka Streams Exceptions

Repository: kafka
Updated Branches:
  refs/heads/trunk 942074b77 -> 5ae97196a


KAFKA-3125: Add Kafka Streams Exceptions

Author: Guozhang Wang <wa...@gmail.com>

Reviewers: Ewen Cheslack-Postava <ew...@confluent.io>

Closes #809 from guozhangwang/K3125
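
This patch replaces ad-hoc uses of KafkaException with a dedicated hierarchy
under org.apache.kafka.streams.errors. As a hedged sketch (not part of the
patch itself; the buildAndStart() helper is hypothetical), application code
can now catch the new types instead of the generic KafkaException:

    import org.apache.kafka.streams.errors.StreamsException;
    import org.apache.kafka.streams.errors.TopologyBuilderException;

    public class ExceptionHandlingSketch {
        public static void main(String[] args) {
            try {
                buildAndStart();
            } catch (TopologyBuilderException e) {
                // a mis-wired topology fails fast at build time
                System.err.println("Invalid topology: " + e.getMessage());
            } catch (StreamsException e) {
                // the top-level Streams exception type; catch it last since
                // TopologyBuilderException is one of its subclasses
                System.err.println("Streams error: " + e.getMessage());
            }
        }

        // hypothetical stand-in for application start-up code
        private static void buildAndStart() {
            throw new StreamsException("example failure");
        }
    }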


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5ae97196
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5ae97196
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5ae97196

Branch: refs/heads/trunk
Commit: 5ae97196ae149e58f6cfa3c5b6d968cbd7cb6787
Parents: 942074b
Author: Guozhang Wang <wa...@gmail.com>
Authored: Tue Jan 26 09:19:28 2016 -0800
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Tue Jan 26 09:19:28 2016 -0800

----------------------------------------------------------------------
 .../streams/errors/ProcessorStateException.java |  35 ++++++
 .../kafka/streams/errors/StreamsException.java  |  40 ++++++
 .../streams/errors/TaskAssignmentException.java |  37 ++++++
 .../streams/errors/TaskIdFormatException.java   |  27 ++++
 .../errors/TopologyBuilderException.java        |  27 ++++
 .../kstream/internals/AbstractStream.java       |   4 +-
 .../kstream/internals/KStreamJoinWindow.java    |  14 +--
 .../kstream/internals/KTableRepartitionMap.java |   3 +-
 .../processor/DefaultPartitionGrouper.java      |   4 +-
 .../apache/kafka/streams/processor/TaskId.java  |   9 +-
 .../streams/processor/TopologyBuilder.java      |  78 ++++++------
 .../streams/processor/TopologyException.java    |  38 ------
 .../processor/internals/AbstractTask.java       |   6 +-
 .../processor/internals/PartitionGroup.java     |   3 +-
 .../internals/ProcessorContextImpl.java         |  16 +--
 .../internals/ProcessorStateManager.java        |   4 +-
 .../processor/internals/StandbyContextImpl.java |   3 +-
 .../internals/StreamPartitionAssignor.java      |   9 +-
 .../processor/internals/StreamThread.java       | 125 ++++++++++---------
 .../internals/assignment/AssignmentInfo.java    |  12 +-
 .../internals/assignment/SubscriptionInfo.java  |   1 +
 .../assignment/TaskAssignmentException.java     |  32 -----
 .../internals/assignment/TaskAssignor.java      |   1 +
 .../streams/state/internals/RocksDBStore.java   |  12 +-
 .../streams/kstream/KStreamBuilderTest.java     |   4 +-
 .../streams/processor/TopologyBuilderTest.java  |  25 ++--
 .../apache/kafka/streams/state/StateUtils.java  |   2 +-
 27 files changed, 333 insertions(+), 238 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/main/java/org/apache/kafka/streams/errors/ProcessorStateException.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/ProcessorStateException.java b/streams/src/main/java/org/apache/kafka/streams/errors/ProcessorStateException.java
new file mode 100644
index 0000000..6434d04
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/errors/ProcessorStateException.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.errors;
+
+public class ProcessorStateException extends StreamsException {
+
+    private static final long serialVersionUID = 1L;
+
+    public ProcessorStateException(String s) {
+        super(s);
+    }
+
+    public ProcessorStateException(String s, Throwable throwable) {
+        super(s, throwable);
+    }
+
+    public ProcessorStateException(Throwable throwable) {
+        super(throwable);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/main/java/org/apache/kafka/streams/errors/StreamsException.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/StreamsException.java b/streams/src/main/java/org/apache/kafka/streams/errors/StreamsException.java
new file mode 100644
index 0000000..6247886
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/errors/StreamsException.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.streams.errors;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * StreamsException is the top-level exception type generated by Kafka Streams.
+ */
+@InterfaceStability.Unstable
+public class StreamsException extends KafkaException {
+
+    public StreamsException(String s) {
+        super(s);
+    }
+
+    public StreamsException(String s, Throwable throwable) {
+        super(s, throwable);
+    }
+
+    public StreamsException(Throwable throwable) {
+        super(throwable);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/main/java/org/apache/kafka/streams/errors/TaskAssignmentException.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/TaskAssignmentException.java b/streams/src/main/java/org/apache/kafka/streams/errors/TaskAssignmentException.java
new file mode 100644
index 0000000..3ae8503
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/errors/TaskAssignmentException.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.errors;
+
+/**
+ * The runtime exception class for stream task assignments.
+ */
+public class TaskAssignmentException extends StreamsException {
+
+    private static final long serialVersionUID = 1L;
+
+    public TaskAssignmentException(String s) {
+        super(s);
+    }
+
+    public TaskAssignmentException(String s, Throwable throwable) {
+        super(s, throwable);
+    }
+
+    public TaskAssignmentException(Throwable throwable) {
+        super(throwable);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/main/java/org/apache/kafka/streams/errors/TaskIdFormatException.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/TaskIdFormatException.java b/streams/src/main/java/org/apache/kafka/streams/errors/TaskIdFormatException.java
new file mode 100644
index 0000000..bf0ebf5
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/errors/TaskIdFormatException.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.errors;
+
+public class TaskIdFormatException extends StreamsException {
+
+    private static final long serialVersionUID = 1L;
+
+    public TaskIdFormatException(String taskString) {
+        super("Task id cannot be parsed correctly" + (taskString == null ? "" : " from " + taskString));
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/main/java/org/apache/kafka/streams/errors/TopologyBuilderException.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/TopologyBuilderException.java b/streams/src/main/java/org/apache/kafka/streams/errors/TopologyBuilderException.java
new file mode 100644
index 0000000..9dd740b
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/errors/TopologyBuilderException.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.errors;
+
+public class TopologyBuilderException extends StreamsException {
+
+    private static final long serialVersionUID = 1L;
+
+    public TopologyBuilderException(String message) {
+        super("Invalid topology building" + (message == null ? "" : ": " + message));
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
index fa34ba1..c537465 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
@@ -17,7 +17,7 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.streams.errors.TopologyBuilderException;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 
@@ -41,7 +41,7 @@ public abstract class AbstractStream<K> {
         Set<String> otherSourceNodes = other.sourceNodes;
 
         if (thisSourceNodes == null || otherSourceNodes == null)
-            throw new KafkaException("not joinable");
+            throw new TopologyBuilderException(this.name + " and " + other.name + " are not joinable");
 
         Set<String> allSourceNodes = new HashSet<>();
         allSourceNodes.addAll(thisSourceNodes);

http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
index 4f427d4..5b83b28 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
@@ -17,7 +17,7 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.streams.errors.TopologyBuilderException;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -27,14 +27,13 @@ import org.apache.kafka.streams.state.WindowStore;
 class KStreamJoinWindow<K, V> implements ProcessorSupplier<K, V> {
 
     private final String windowName;
-    private final long windowSizeMs;
-    private final long retentionPeriodMs;
-
 
     KStreamJoinWindow(String windowName, long windowSizeMs, long retentionPeriodMs) {
         this.windowName = windowName;
-        this.windowSizeMs = windowSizeMs;
-        this.retentionPeriodMs = retentionPeriodMs;
+
+        if (windowSizeMs * 2 > retentionPeriodMs)
+            throw new TopologyBuilderException("The retention period of the join window "
+                    + windowName + " must be at least two times its window size.");
     }
 
     @Override
@@ -52,9 +51,6 @@ class KStreamJoinWindow<K, V> implements ProcessorSupplier<K, V> {
             super.init(context);
 
             window = (WindowStore<K, V>) context.getStateStore(windowName);
-
-            if (windowSizeMs * 2 > retentionPeriodMs)
-                throw new KafkaException("The retention period must be at least two times the join window size.");
         }
 
         @Override
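
The window-size check moves from Processor.init() into the KStreamJoinWindow
constructor, so an undersized retention period now fails at topology-build
time. The arithmetic, with illustrative numbers only (a join window of size W
matches records up to W milliseconds on either side of a timestamp, so the
backing store must retain at least 2 * W of data):

    import org.apache.kafka.streams.errors.TopologyBuilderException;

    public class RetentionCheckSketch {
        public static void main(String[] args) {
            long windowSizeMs = 60_000L;       // W = 1 minute
            long retentionPeriodMs = 90_000L;  // 90s < 2 * 60s, so this is too small
            if (windowSizeMs * 2 > retentionPeriodMs)
                // with this patch the failure happens here, during topology
                // construction, rather than later inside init()
                throw new TopologyBuilderException(
                        "The retention period of the join window must be at least two times its window size.");
        }
    }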

http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
index 499f721..ff69c37 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
@@ -17,7 +17,6 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.processor.AbstractProcessor;
@@ -60,7 +59,7 @@ public class KTableRepartitionMap<K, V, K1, V1> implements KTableProcessorSuppli
     @Override
     public void enableSendingOldValues() {
         // this should never be called
-        throw new KafkaException("KTableRepartitionMap should always require sending old values.");
+        throw new IllegalStateException("KTableRepartitionMap should always require sending old values.");
     }
 
     private KeyValue<K1, V1> computeValue(K key, V value) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
index 923a217..47c5e58 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
@@ -18,9 +18,9 @@
 package org.apache.kafka.streams.processor;
 
 import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.errors.StreamsException;
 
 import java.util.Collections;
 import java.util.HashMap;
@@ -61,7 +61,7 @@ public class DefaultPartitionGrouper implements PartitionGrouper {
             List<PartitionInfo> infos = metadata.partitionsForTopic(topic);
 
             if (infos == null)
-                throw new KafkaException("topic not found :" + topic);
+                throw new StreamsException("Topic not found during partition assignment: " + topic);
 
             int numPartitions = infos.size();
             if (numPartitions > maxNumPartitions)

http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java b/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
index 023bbbb..6e7150e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
@@ -17,6 +17,8 @@
 
 package org.apache.kafka.streams.processor;
 
+import org.apache.kafka.streams.errors.TaskIdFormatException;
+
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -38,7 +40,7 @@ public class TaskId implements Comparable<TaskId> {
 
     public static TaskId parse(String string) {
         int index = string.indexOf('_');
-        if (index <= 0 || index + 1 >= string.length()) throw new TaskIdFormatException();
+        if (index <= 0 || index + 1 >= string.length()) throw new TaskIdFormatException(string);
 
         try {
             int topicGroupId = Integer.parseInt(string.substring(0, index));
@@ -46,7 +48,7 @@ public class TaskId implements Comparable<TaskId> {
 
             return new TaskId(topicGroupId, partition);
         } catch (Exception e) {
-            throw new TaskIdFormatException();
+            throw new TaskIdFormatException(string);
         }
     }
 
@@ -93,7 +95,4 @@ public class TaskId implements Comparable<TaskId> {
                         (this.partition > other.partition ? 1 :
                             0)));
     }
-
-    public static class TaskIdFormatException extends RuntimeException {
-    }
 }
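
With the nested TaskId.TaskIdFormatException removed, parse failures surface
as the shared org.apache.kafka.streams.errors.TaskIdFormatException and now
carry the offending input. A small sketch (illustrative inputs only):

    import org.apache.kafka.streams.errors.TaskIdFormatException;
    import org.apache.kafka.streams.processor.TaskId;

    public class TaskIdParseSketch {
        public static void main(String[] args) {
            // well-formed: "<topicGroupId>_<partition>"
            TaskId id = TaskId.parse("2_5");
            System.out.println(id);

            try {
                TaskId.parse("not-a-task-id"); // no '_' separator
            } catch (TaskIdFormatException e) {
                // "Task id cannot be parsed correctly from not-a-task-id"
                System.out.println(e.getMessage());
            }
        }
    }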

http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index a6b54b7..7af377f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -17,10 +17,10 @@
 
 package org.apache.kafka.streams.processor;
 
-import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.errors.TopologyBuilderException;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
@@ -218,11 +218,11 @@ public class TopologyBuilder {
      */
     public final TopologyBuilder addSource(String name, Deserializer keyDeserializer, Deserializer valDeserializer, String... topics) {
         if (nodeFactories.containsKey(name))
-            throw new TopologyException("Processor " + name + " is already added.");
+            throw new TopologyBuilderException("Processor " + name + " is already added.");
 
         for (String topic : topics) {
             if (sourceTopicNames.contains(topic))
-                throw new TopologyException("Topic " + topic + " has already been registered by another source.");
+                throw new TopologyBuilderException("Topic " + topic + " has already been registered by another source.");
 
             sourceTopicNames.add(topic);
         }
@@ -331,15 +331,15 @@ public class TopologyBuilder {
      */
     public final <K, V> TopologyBuilder addSink(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamPartitioner<K, V> partitioner, String... parentNames) {
         if (nodeFactories.containsKey(name))
-            throw new TopologyException("Processor " + name + " is already added.");
+            throw new TopologyBuilderException("Processor " + name + " is already added.");
 
         if (parentNames != null) {
             for (String parent : parentNames) {
                 if (parent.equals(name)) {
-                    throw new TopologyException("Processor " + name + " cannot be a parent of itself.");
+                    throw new TopologyBuilderException("Processor " + name + " cannot be a parent of itself.");
                 }
                 if (!nodeFactories.containsKey(parent)) {
-                    throw new TopologyException("Parent processor " + parent + " is not added yet.");
+                    throw new TopologyBuilderException("Parent processor " + parent + " is not added yet.");
                 }
             }
         }
@@ -361,15 +361,15 @@ public class TopologyBuilder {
      */
     public final TopologyBuilder addProcessor(String name, ProcessorSupplier supplier, String... parentNames) {
         if (nodeFactories.containsKey(name))
-            throw new TopologyException("Processor " + name + " is already added.");
+            throw new TopologyBuilderException("Processor " + name + " is already added.");
 
         if (parentNames != null) {
             for (String parent : parentNames) {
                 if (parent.equals(name)) {
-                    throw new TopologyException("Processor " + name + " cannot be a parent of itself.");
+                    throw new TopologyBuilderException("Processor " + name + " cannot be a parent of itself.");
                 }
                 if (!nodeFactories.containsKey(parent)) {
-                    throw new TopologyException("Parent processor " + parent + " is not added yet.");
+                    throw new TopologyBuilderException("Parent processor " + parent + " is not added yet.");
                 }
             }
         }
@@ -388,7 +388,7 @@ public class TopologyBuilder {
      */
     public final TopologyBuilder addStateStore(StateStoreSupplier supplier, boolean isInternal, String... processorNames) {
         if (stateFactories.containsKey(supplier.name())) {
-            throw new TopologyException("StateStore " + supplier.name() + " is already added.");
+            throw new TopologyBuilderException("StateStore " + supplier.name() + " is already added.");
         }
 
         stateFactories.put(supplier.name(), new StateStoreFactory(isInternal, supplier));
@@ -443,9 +443,9 @@ public class TopologyBuilder {
 
     private void connectProcessorAndStateStore(String processorName, String stateStoreName) {
         if (!stateFactories.containsKey(stateStoreName))
-            throw new TopologyException("StateStore " + stateStoreName + " is not added yet.");
+            throw new TopologyBuilderException("StateStore " + stateStoreName + " is not added yet.");
         if (!nodeFactories.containsKey(processorName))
-            throw new TopologyException("Processor " + processorName + " is not added yet.");
+            throw new TopologyBuilderException("Processor " + processorName + " is not added yet.");
 
         StateStoreFactory stateStoreFactory = stateFactories.get(stateStoreName);
         Iterator<String> iter = stateStoreFactory.users.iterator();
@@ -459,7 +459,7 @@ public class TopologyBuilder {
         if (nodeFactory instanceof ProcessorNodeFactory) {
             ((ProcessorNodeFactory) nodeFactory).addStateStore(stateStoreName);
         } else {
-            throw new TopologyException("cannot connect a state store " + stateStoreName + " to a source node or a sink node.");
+            throw new TopologyBuilderException("cannot connect a state store " + stateStoreName + " to a source node or a sink node.");
         }
     }
 
@@ -611,38 +611,34 @@ public class TopologyBuilder {
         Map<String, SourceNode> topicSourceMap = new HashMap<>();
         Map<String, StateStoreSupplier> stateStoreMap = new HashMap<>();
 
-        try {
-            // create processor nodes in a topological order ("nodeFactories" is already topologically sorted)
-            for (NodeFactory factory : nodeFactories.values()) {
-                if (nodeGroup == null || nodeGroup.contains(factory.name)) {
-                    ProcessorNode node = factory.build();
-                    processorNodes.add(node);
-                    processorMap.put(node.name(), node);
-
-                    if (factory instanceof ProcessorNodeFactory) {
-                        for (String parent : ((ProcessorNodeFactory) factory).parents) {
-                            processorMap.get(parent).addChild(node);
-                        }
-                        for (String stateStoreName : ((ProcessorNodeFactory) factory).stateStoreNames) {
-                            if (!stateStoreMap.containsKey(stateStoreName)) {
-                                stateStoreMap.put(stateStoreName, stateFactories.get(stateStoreName).supplier);
-                            }
-                        }
-                    } else if (factory instanceof SourceNodeFactory) {
-                        for (String topic : ((SourceNodeFactory) factory).topics) {
-                            topicSourceMap.put(topic, (SourceNode) node);
-                        }
-                    } else if (factory instanceof SinkNodeFactory) {
-                        for (String parent : ((SinkNodeFactory) factory).parents) {
-                            processorMap.get(parent).addChild(node);
+        // create processor nodes in a topological order ("nodeFactories" is already topologically sorted)
+        for (NodeFactory factory : nodeFactories.values()) {
+            if (nodeGroup == null || nodeGroup.contains(factory.name)) {
+                ProcessorNode node = factory.build();
+                processorNodes.add(node);
+                processorMap.put(node.name(), node);
+
+                if (factory instanceof ProcessorNodeFactory) {
+                    for (String parent : ((ProcessorNodeFactory) factory).parents) {
+                        processorMap.get(parent).addChild(node);
+                    }
+                    for (String stateStoreName : ((ProcessorNodeFactory) factory).stateStoreNames) {
+                        if (!stateStoreMap.containsKey(stateStoreName)) {
+                            stateStoreMap.put(stateStoreName, stateFactories.get(stateStoreName).supplier);
                         }
-                    } else {
-                        throw new TopologyException("Unknown definition class: " + factory.getClass().getName());
                     }
+                } else if (factory instanceof SourceNodeFactory) {
+                    for (String topic : ((SourceNodeFactory) factory).topics) {
+                        topicSourceMap.put(topic, (SourceNode) node);
+                    }
+                } else if (factory instanceof SinkNodeFactory) {
+                    for (String parent : ((SinkNodeFactory) factory).parents) {
+                        processorMap.get(parent).addChild(node);
+                    }
+                } else {
+                    throw new TopologyBuilderException("Unknown definition class: " + factory.getClass().getName());
                 }
             }
-        } catch (Exception e) {
-            throw new KafkaException("ProcessorNode construction failed: this should not happen.");
         }
 
         return new ProcessorTopology(processorNodes, topicSourceMap, new ArrayList<>(stateStoreMap.values()));
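
All TopologyBuilder validation errors now throw TopologyBuilderException,
whose constructor prefixes "Invalid topology building: " to the message. A
hedged sketch of how a duplicate registration surfaces (this assumes the
convenience overload addSource(name, topics...) that delegates to the
deserializer variant shown above):

    import org.apache.kafka.streams.errors.TopologyBuilderException;
    import org.apache.kafka.streams.processor.TopologyBuilder;

    public class DuplicateSourceSketch {
        public static void main(String[] args) {
            TopologyBuilder builder = new TopologyBuilder();
            builder.addSource("source", "topic-a");
            try {
                builder.addSource("source", "topic-b"); // same processor name
            } catch (TopologyBuilderException e) {
                // "Invalid topology building: Processor source is already added."
                System.out.println(e.getMessage());
            }
        }
    }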

http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/main/java/org/apache/kafka/streams/processor/TopologyException.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyException.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyException.java
deleted file mode 100644
index 99d1405..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyException.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor;
-
-import org.apache.kafka.common.KafkaException;
-
-public class TopologyException extends KafkaException {
-
-    private static final long serialVersionUID = 1L;
-
-    public TopologyException(String message) {
-        super(message);
-    }
-
-    public TopologyException(String name, Object value) {
-        this(name, value, null);
-    }
-
-    public TopologyException(String name, Object value, String message) {
-        super("Invalid topology building" + (message == null ? "" : ": " + message));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 68680ab..46dd738 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -18,9 +18,9 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
@@ -62,7 +62,7 @@ public abstract class AbstractTask {
             // if partitions is null, this is a standby task
             this.stateMgr = new ProcessorStateManager(jobId, id.partition, partitions, stateFile, restoreConsumer, isStandby);
         } catch (IOException e) {
-            throw new KafkaException("Error while creating the state manager", e);
+            throw new ProcessorStateException("Error while creating the state manager", e);
         }
     }
 
@@ -95,7 +95,7 @@ public abstract class AbstractTask {
         try {
             stateMgr.close(recordCollectorOffsets());
         } catch (IOException e) {
-            throw new KafkaException("Error while closing the state manager in processor context", e);
+            throw new ProcessorStateException("Error while closing the state manager", e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
index d888085..b487ff5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
@@ -18,7 +18,6 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 
@@ -149,7 +148,7 @@ public class PartitionGroup {
         RecordQueue recordQueue = partitionQueues.get(partition);
 
         if (recordQueue == null)
-            throw new KafkaException("Record's partition does not belong to this partition-group.");
+            throw new IllegalStateException("Record's partition does not belong to this partition-group.");
 
         return recordQueue.size();
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 102c534..7931a6f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -17,11 +17,11 @@
 
 package org.apache.kafka.streams.processor.internals;
 
-import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.errors.TopologyBuilderException;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
@@ -120,7 +120,7 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S
     @Override
     public void register(StateStore store, boolean loggingEnabled, StateRestoreCallback stateRestoreCallback) {
         if (initialized)
-            throw new KafkaException("Can only create state stores during initialization.");
+            throw new IllegalStateException("Can only create state stores during initialization.");
 
         stateMgr.register(store, loggingEnabled, stateRestoreCallback);
     }
@@ -130,10 +130,10 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S
         ProcessorNode node = task.node();
 
         if (node == null)
-            throw new KafkaException("accessing from an unknown node");
+            throw new TopologyBuilderException("Accessing from an unknown node");
 
         if (!node.stateStores.contains(name))
-            throw new KafkaException("Processor " + node.name() + " has no access to StateStore " + name);
+            throw new TopologyBuilderException("Processor " + node.name() + " has no access to StateStore " + name);
 
         return stateMgr.getStore(name);
     }
@@ -141,7 +141,7 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S
     @Override
     public String topic() {
         if (task.record() == null)
-            throw new IllegalStateException("this should not happen as topic() should only be called while a record is processed");
+            throw new IllegalStateException("This should not happen as topic() should only be called while a record is processed");
 
         return task.record().topic();
     }
@@ -149,7 +149,7 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S
     @Override
     public int partition() {
         if (task.record() == null)
-            throw new IllegalStateException("this should not happen as partition() should only be called while a record is processed");
+            throw new IllegalStateException("This should not happen as partition() should only be called while a record is processed");
 
         return task.record().partition();
     }
@@ -157,7 +157,7 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S
     @Override
     public long offset() {
         if (this.task.record() == null)
-            throw new IllegalStateException("this should not happen as offset() should only be called while a record is processed");
+            throw new IllegalStateException("This should not happen as offset() should only be called while a record is processed");
 
         return this.task.record().offset();
     }
@@ -165,7 +165,7 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S
     @Override
     public long timestamp() {
         if (task.record() == null)
-            throw new IllegalStateException("this should not happen as timestamp() should only be called while a record is processed");
+            throw new IllegalStateException("This should not happen as timestamp() should only be called while a record is processed");
 
         return task.record().timestamp;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 547bb15..bc7f4b5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -19,9 +19,9 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
@@ -160,7 +160,7 @@ public class ProcessorStateManager {
         } while (partitionNotFound && System.currentTimeMillis() < startTime + waitTime);
 
         if (partitionNotFound)
-            throw new KafkaException("Store " + store.name() + "'s change log (" + topic + ") does not contain partition " + partition);
+            throw new StreamsException("Store " + store.name() + "'s change log (" + topic + ") does not contain partition " + partition);
 
         this.stores.put(store.name(), store);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
index 89d185c..133d597 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
@@ -17,7 +17,6 @@
 
 package org.apache.kafka.streams.processor.internals;
 
-import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.StreamsConfig;
@@ -112,7 +111,7 @@ public class StandbyContextImpl implements ProcessorContext, RecordCollector.Sup
     @Override
     public void register(StateStore store, boolean loggingEnabled, StateRestoreCallback stateRestoreCallback) {
         if (initialized)
-            throw new KafkaException("Can only create state stores during initialization.");
+            throw new IllegalStateException("Can only create state stores during initialization.");
 
         stateMgr.register(store, loggingEnabled, stateRestoreCallback);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 5d87e5a..d499534 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -24,12 +24,13 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
 import org.apache.kafka.streams.processor.internals.assignment.ClientState;
 import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
-import org.apache.kafka.streams.processor.internals.assignment.TaskAssignmentException;
+import org.apache.kafka.streams.errors.TaskAssignmentException;
 import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -132,7 +133,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
 
             return partitions;
         } catch (IOException e) {
-            throw new KafkaException(e);
+            throw new StreamsException("Error while reading topic metadata from ZK for internal topic " + topic, e);
         }
     }
 
@@ -158,7 +159,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
 
             zkClient.createPersistent(ZK_TOPIC_PATH + "/" + topic, data, ZooDefs.Ids.OPEN_ACL_UNSAFE);
         } catch (JsonProcessingException e) {
-            throw new KafkaException(e);
+            throw new StreamsException("Error while creating topic metadata in ZK for internal topic " + topic, e);
         }
     }
 
@@ -193,7 +194,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
 
             zkClient.writeData(ZK_TOPIC_PATH + "/" + topic, data);
         } catch (JsonProcessingException e) {
-            throw new KafkaException(e);
+            throw new StreamsException("Error while updating topic metadata in ZK for internal topic " + topic, e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index f118f60..eccd02c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -41,6 +41,9 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.errors.TaskIdFormatException;
+import org.apache.kafka.streams.errors.TopologyBuilderException;
 import org.apache.kafka.streams.processor.PartitionGrouper;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TopologyBuilder;
@@ -127,6 +130,8 @@ public class StreamThread extends Thread {
         @Override
         public void onPartitionsRevoked(Collection<TopicPartition> assignment) {
             commitAll();
+            // TODO: right now upon partition revocation, we always remove all the tasks;
+            // this behavior can be optimized to only remove affected tasks in the future
             removeStreamTasks();
             removeStandbyTasks();
             lastClean = Long.MAX_VALUE; // stop the cleaning cycle until partitions are assigned
@@ -229,8 +234,13 @@ public class StreamThread extends Thread {
 
         try {
             runLoop();
-        } catch (RuntimeException e) {
-            log.error("Uncaught error during processing in thread [" + this.getName() + "]: ", e);
+        } catch (KafkaException e) {
+            // just re-throw the exception as it should be logged already
+            throw e;
+        } catch (Exception e) {
+            // we have caught all Kafka-related exceptions, and other runtime exceptions
+            // should be due to user application errors
+            log.error("Streams application error during processing in thread [" + this.getName() + "]: ", e);
             throw e;
         } finally {
             shutdown();
@@ -251,13 +261,21 @@ public class StreamThread extends Thread {
     private void shutdown() {
         log.info("Shutting down stream thread [" + this.getName() + "]");
 
-        // Exceptions should not prevent this call from going through all shutdown steps.
+        // We need to remove the tasks before shutting down the underlying clients,
+        // as the clients may still be needed during task removal; and exceptions should not
+        // prevent this call from going through all shutdown steps.
         try {
             commitAll();
         } catch (Throwable e) {
             // already logged in commitAll()
         }
         try {
+            removeStreamTasks();
+            removeStandbyTasks();
+        } catch (Throwable e) {
+            // already logged in removeStreamTasks() and removeStandbyTasks()
+        }
+        try {
             producer.close();
         } catch (Throwable e) {
             log.error("Failed to close producer in thread [" + this.getName() + "]: ", e);
@@ -272,70 +290,60 @@ public class StreamThread extends Thread {
         } catch (Throwable e) {
             log.error("Failed to close restore consumer in thread [" + this.getName() + "]: ", e);
         }
-        try {
-            removeStreamTasks();
-            removeStandbyTasks();
-        } catch (Throwable e) {
-            // already logged in removeStreamTasks() and removeStandbyTasks()
-        }
 
         log.info("Stream thread shutdown complete [" + this.getName() + "]");
     }
 
     private void runLoop() {
-        try {
-            int totalNumBuffered = 0;
-            boolean requiresPoll = true;
+        int totalNumBuffered = 0;
+        boolean requiresPoll = true;
 
-            ensureCopartitioning(builder.copartitionGroups());
+        ensureCopartitioning(builder.copartitionGroups());
 
-            consumer.subscribe(new ArrayList<>(sourceTopics), rebalanceListener);
+        consumer.subscribe(new ArrayList<>(sourceTopics), rebalanceListener);
 
-            while (stillRunning()) {
-                // try to fetch some records if necessary
-                if (requiresPoll) {
-                    requiresPoll = false;
+        while (stillRunning()) {
+            // try to fetch some records if necessary
+            if (requiresPoll) {
+                requiresPoll = false;
 
-                    long startPoll = time.milliseconds();
+                long startPoll = time.milliseconds();
 
-                    ConsumerRecords<byte[], byte[]> records = consumer.poll(totalNumBuffered == 0 ? this.pollTimeMs : 0);
+                ConsumerRecords<byte[], byte[]> records = consumer.poll(totalNumBuffered == 0 ? this.pollTimeMs : 0);
 
-                    if (!records.isEmpty()) {
-                        for (TopicPartition partition : records.partitions()) {
-                            StreamTask task = activeTasksByPartition.get(partition);
-                            task.addRecords(partition, records.records(partition));
-                        }
+                if (!records.isEmpty()) {
+                    for (TopicPartition partition : records.partitions()) {
+                        StreamTask task = activeTasksByPartition.get(partition);
+                        task.addRecords(partition, records.records(partition));
                     }
-
-                    long endPoll = time.milliseconds();
-                    sensors.pollTimeSensor.record(endPoll - startPoll);
                 }
 
-                totalNumBuffered = 0;
+                long endPoll = time.milliseconds();
+                sensors.pollTimeSensor.record(endPoll - startPoll);
+            }
 
-                if (!activeTasks.isEmpty()) {
-                    // try to process one record from each task
-                    for (StreamTask task : activeTasks.values()) {
-                        long startProcess = time.milliseconds();
+            totalNumBuffered = 0;
 
-                        totalNumBuffered += task.process();
-                        requiresPoll = requiresPoll || task.requiresPoll();
+            if (!activeTasks.isEmpty()) {
+                // try to process one record from each task
+                for (StreamTask task : activeTasks.values()) {
+                    long startProcess = time.milliseconds();
 
-                        sensors.processTimeSensor.record(time.milliseconds() - startProcess);
-                    }
+                    totalNumBuffered += task.process();
+                    requiresPoll = requiresPoll || task.requiresPoll();
 
-                    maybePunctuate();
-                } else {
-                    // even when no task is assigned, we must poll to get a task.
-                    requiresPoll = true;
+                    sensors.processTimeSensor.record(time.milliseconds() - startProcess);
                 }
-                maybeCommit();
-                maybeUpdateStandbyTasks();
 
-                maybeClean();
+                maybePunctuate();
+            } else {
+                // even when no task is assigned, we must poll to get a task.
+                requiresPoll = true;
             }
-        } catch (Exception e) {
-            throw new KafkaException(e);
+            maybeCommit();
+            maybeUpdateStandbyTasks();
+
+            maybeClean();
         }
     }
 
@@ -396,8 +404,8 @@ public class StreamThread extends Thread {
                 if (task.maybePunctuate(now))
                     sensors.punctuateTimeSensor.record(time.milliseconds() - now);
 
-            } catch (Exception e) {
-                log.error("Failed to commit active task #" + task.id() + " in thread [" + this.getName() + "]: ", e);
+            } catch (KafkaException e) {
+                log.error("Failed to punctuate active task #" + task.id() + " in thread [" + this.getName() + "]: ", e);
                 throw e;
             }
         }
@@ -418,7 +426,7 @@ public class StreamThread extends Thread {
                 try {
                     if (task.commitNeeded())
                         commitOne(task, time.milliseconds());
-                } catch (Exception e) {
+                } catch (KafkaException e) {
                     log.error("Failed to commit active task #" + task.id() + " in thread [" + this.getName() + "]: ", e);
                     throw e;
                 }
@@ -444,7 +452,7 @@ public class StreamThread extends Thread {
     private void commitOne(AbstractTask task, long now) {
         try {
             task.commit();
-        } catch (Exception e) {
+        } catch (KafkaException e) {
             log.error("Failed to commit " + task.getClass().getSimpleName() + " #" + task.id() + " in thread [" + this.getName() + "]: ", e);
             throw e;
         }
@@ -485,7 +493,7 @@ public class StreamThread extends Thread {
                                 }
                             }
                         }
-                    } catch (TaskId.TaskIdFormatException e) {
+                    } catch (TaskIdFormatException e) {
                         // there may be some unknown files that sit in the same directory;
                         // we should ignore these files instead of trying to delete them as well
                     }
@@ -523,7 +531,7 @@ public class StreamThread extends Thread {
                     if (new File(dir, ProcessorStateManager.CHECKPOINT_FILE_NAME).exists())
                         tasks.add(id);
 
-                } catch (TaskId.TaskIdFormatException e) {
+                } catch (TaskIdFormatException e) {
                     // there may be some unknown files that sit in the same directory;
                     // we should ignore these files instead of trying to delete them as well
                 }
@@ -543,7 +551,7 @@ public class StreamThread extends Thread {
 
     private void addStreamTasks(Collection<TopicPartition> assignment) {
         if (partitionAssignor == null)
-            throw new KafkaException("Partition assignor has not been initialized while adding stream tasks: this should not happen.");
+            throw new IllegalStateException("Partition assignor has not been initialized while adding stream tasks: this should not happen.");
 
         HashMap<TaskId, Set<TopicPartition>> partitionsForTask = new HashMap<>();
 
@@ -570,7 +578,7 @@ public class StreamThread extends Thread {
 
                 for (TopicPartition partition : partitions)
                     activeTasksByPartition.put(partition, task);
-            } catch (Exception e) {
+            } catch (StreamsException e) {
                 log.error("Failed to create an active task #" + taskId + " in thread [" + this.getName() + "]: ", e);
                 throw e;
             }
@@ -578,7 +586,6 @@ public class StreamThread extends Thread {
     }
 
     private void removeStreamTasks() {
-        // TODO: change this clearing tasks behavior
         for (StreamTask task : activeTasks.values()) {
             closeOne(task);
         }
@@ -594,7 +601,7 @@ public class StreamThread extends Thread {
         log.info("Removing a task {}", task.id());
         try {
             task.close();
-        } catch (Exception e) {
+        } catch (StreamsException e) {
             log.error("Failed to close a " + task.getClass().getSimpleName() + " #" + task.id() + " in thread [" + this.getName() + "]: ", e);
             throw e;
         }
@@ -615,7 +622,7 @@ public class StreamThread extends Thread {
 
     private void addStandbyTasks() {
         if (partitionAssignor == null)
-            throw new KafkaException("Partition assignor has not been initialized while adding standby tasks: this should not happen.");
+            throw new IllegalStateException("Partition assignor has not been initialized while adding standby tasks: this should not happen.");
 
         Map<TopicPartition, Long> checkpointedOffsets = new HashMap<>();
 
@@ -673,14 +680,14 @@ public class StreamThread extends Thread {
             List<PartitionInfo> infos = consumer.partitionsFor(topic);
 
             if (infos == null)
-                throw new KafkaException("topic not found: " + topic);
+                throw new TopologyBuilderException("Topic not found: " + topic);
 
             if (numPartitions == -1) {
                 numPartitions = infos.size();
             } else if (numPartitions != infos.size()) {
                 String[] topics = copartitionGroup.toArray(new String[copartitionGroup.size()]);
                 Arrays.sort(topics);
-                throw new KafkaException("topics not copartitioned: [" + Utils.mkString(Arrays.asList(topics), ",") + "]");
+                throw new TopologyBuilderException("Topics not copartitioned: [" + Utils.mkString(Arrays.asList(topics), ",") + "]");
             }
         }
     }

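The StreamThread changes above consistently narrow the former catch (Exception e) blocks to the most specific type the log-and-rethrow actually targets: KafkaException for commit and punctuate failures, StreamsException for task creation and closing, IllegalStateException for internal invariant violations, and TopologyBuilderException for user-visible topology problems. For orientation, a hedged sketch of what the new StreamsException base class plausibly looks like (an assumed shape, not the committed source; the real definition lives in the new streams/errors/StreamsException.java):

    package org.apache.kafka.streams.errors;

    import org.apache.kafka.common.KafkaException;

    /**
     * Hedged sketch of the new Streams base exception (assumed shape).
     * Extending KafkaException means existing catch (KafkaException e)
     * sites, such as commitOne() above, also see Streams failures.
     */
    public class StreamsException extends KafkaException {

        private static final long serialVersionUID = 1L;

        public StreamsException(String message) {
            super(message);
        }

        public StreamsException(String message, Throwable throwable) {
            super(message, throwable);
        }

        public StreamsException(Throwable throwable) {
            super(throwable);
        }
    }
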
http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
index 2bd4457..c2175bb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
@@ -17,9 +17,9 @@
 
 package org.apache.kafka.streams.processor.internals.assignment;
 
-import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.record.ByteBufferInputStream;
+import org.apache.kafka.streams.errors.TaskAssignmentException;
 import org.apache.kafka.streams.processor.TaskId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -87,12 +87,12 @@ public class AssignmentInfo {
                 return ByteBuffer.wrap(baos.toByteArray());
 
             } else {
-                TaskAssignmentException ex = new TaskAssignmentException("unable to encode assignment data: version=" + version);
+                TaskAssignmentException ex = new TaskAssignmentException("Unable to encode assignment data: version=" + version);
                 log.error(ex.getMessage(), ex);
                 throw ex;
             }
         } catch (IOException ex) {
-            throw new KafkaException("failed to encode AssignmentInfo", ex);
+            throw new TaskAssignmentException("Failed to encode AssignmentInfo", ex);
         }
     }
 
@@ -128,12 +128,12 @@ public class AssignmentInfo {
                 return new AssignmentInfo(activeTasks, standbyTasks);
 
             } else {
-                TaskAssignmentException ex = new TaskAssignmentException("unknown assignment data version: " + version);
+                TaskAssignmentException ex = new TaskAssignmentException("Unknown assignment data version: " + version);
                 log.error(ex.getMessage(), ex);
                 throw ex;
             }
         } catch (IOException ex) {
-            throw new KafkaException("failed to decode AssignmentInfo", ex);
+            throw new TaskAssignmentException("Failed to decode AssignmentInfo", ex);
         }
     }
 

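With both failure branches now throwing TaskAssignmentException, callers of encode() and decode() deal with a single exception type whether the cause is an unsupported wire version or a wrapped IOException. A minimal round-trip sketch follows; it assumes encode() is an instance method returning ByteBuffer and decode(ByteBuffer) is static, which matches how the hunks above build and return the data:

    import java.nio.ByteBuffer;

    import org.apache.kafka.streams.errors.TaskAssignmentException;
    import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;

    public final class AssignmentInfoRoundTrip {
        // Encodes and immediately decodes an AssignmentInfo. Any version
        // mismatch or I/O failure surfaces uniformly as
        // TaskAssignmentException rather than a mix with KafkaException.
        static AssignmentInfo roundTrip(AssignmentInfo info) {
            try {
                return AssignmentInfo.decode(info.encode());
            } catch (TaskAssignmentException e) {
                // the version-mismatch case is already logged at the throw site
                throw e;
            }
        }
    }
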
http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
index 43009a1..ccd2f73 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.streams.processor.internals.assignment;
 
+import org.apache.kafka.streams.errors.TaskAssignmentException;
 import org.apache.kafka.streams.processor.TaskId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignmentException.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignmentException.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignmentException.java
deleted file mode 100644
index 839a6c2..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignmentException.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.processor.internals.assignment;
-
-import org.apache.kafka.common.KafkaException;
-
-/**
- * The run time exception class for stream task assignments
- */
-public class TaskAssignmentException extends KafkaException {
-
-    private final static long serialVersionUID = 1L;
-
-    public TaskAssignmentException(String message) {
-        super(message);
-    }
-
-}

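The class deleted above is relocated rather than removed: it reappears as org.apache.kafka.streams.errors.TaskAssignmentException, per the imports added to AssignmentInfo, SubscriptionInfo, and TaskAssignor below. Note that the new call sites pass a cause (for example, new TaskAssignmentException("Failed to encode AssignmentInfo", ex)), which the old single-constructor class could not accept, so the moved class must gain at least a (String, Throwable) constructor. A hedged sketch, assuming it now derives from StreamsException:

    package org.apache.kafka.streams.errors;

    /**
     * Hedged sketch of the relocated exception (assumed shape). The
     * (String, Throwable) constructor is required by the new
     * AssignmentInfo call sites; deriving from StreamsException is
     * an assumption.
     */
    public class TaskAssignmentException extends StreamsException {

        private static final long serialVersionUID = 1L;

        public TaskAssignmentException(String message) {
            super(message);
        }

        public TaskAssignmentException(String message, Throwable throwable) {
            super(message, throwable);
        }
    }
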
http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java
index d1e0782..2501677 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.streams.processor.internals.assignment;
 
+import org.apache.kafka.streams.errors.TaskAssignmentException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 6c77ab2..dea7e0b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -17,8 +17,8 @@
 
 package org.apache.kafka.streams.state.internals;
 
-import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -102,13 +102,13 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
                 dir.getParentFile().mkdirs();
                 return RocksDB.open(options, dir.toString());
             } else {
-                throw new KafkaException("Change log is not supported for store " + this.name + " since it is TTL based.");
+                throw new ProcessorStateException("Change log is not supported for store " + this.name + " since it is TTL based.");
                 // TODO: support TTL with change log?
                 // return TtlDB.open(options, dir.toString(), ttl, false);
             }
         } catch (RocksDBException e) {
             // TODO: this needs to be handled more accurately
-            throw new KafkaException("Error opening store " + this.name + " at location " + dir.toString(), e);
+            throw new ProcessorStateException("Error opening store " + this.name + " at location " + dir.toString(), e);
         }
     }
 
@@ -128,7 +128,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
             return serdes.valueFrom(this.db.get(serdes.rawKey(key)));
         } catch (RocksDBException e) {
             // TODO: this needs to be handled more accurately
-            throw new KafkaException("Error while executing get " + key.toString() + " from store " + this.name, e);
+            throw new ProcessorStateException("Error while executing get " + key.toString() + " from store " + this.name, e);
         }
     }
 
@@ -142,7 +142,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
             }
         } catch (RocksDBException e) {
             // TODO: this needs to be handled more accurately
-            throw new KafkaException("Error while executing put " + key.toString() + " from store " + this.name, e);
+                throw new ProcessorStateException("Error while executing put " + key.toString() + " to store " + this.name, e);
         }
     }
 
@@ -177,7 +177,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
             db.flush(fOptions);
         } catch (RocksDBException e) {
             // TODO: this needs to be handled more accurately
-            throw new KafkaException("Error while executing flush from store " + this.name, e);
+            throw new ProcessorStateException("Error while executing flush from store " + this.name, e);
         }
     }
 

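Every RocksDB access in the store now follows the same wrap-and-rethrow shape: the checked RocksDBException is converted into the unchecked, Streams-specific ProcessorStateException, so KeyValueStore callers never see the RocksDB API in a signature. The pattern, condensed into one hedged helper (safeGet and its storeName parameter are illustrative, not part of the class):

    import org.apache.kafka.streams.errors.ProcessorStateException;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;

    final class RocksDBAccess {
        // Wraps RocksDB's checked exception in the unchecked store
        // exception, mirroring the get/put/flush hunks above.
        static byte[] safeGet(RocksDB db, String storeName, byte[] rawKey) {
            try {
                return db.get(rawKey);
            } catch (RocksDBException e) {
                throw new ProcessorStateException(
                        "Error while executing get from store " + storeName, e);
            }
        }
    }
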
http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
index f79063f..e75b595 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
@@ -19,7 +19,7 @@ package org.apache.kafka.streams.kstream;
 
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.kstream.internals.KStreamImpl;
-import org.apache.kafka.streams.processor.TopologyException;
+import org.apache.kafka.streams.errors.TopologyBuilderException;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.junit.Test;
@@ -28,7 +28,7 @@ import static org.junit.Assert.assertEquals;
 
 public class KStreamBuilderTest {
 
-    @Test(expected = TopologyException.class)
+    @Test(expected = TopologyBuilderException.class)
     public void testFrom() {
         final KStreamBuilder builder = new KStreamBuilder();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index a2f6ec0..a93b8ab 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.streams.processor;
 
+import org.apache.kafka.streams.errors.TopologyBuilderException;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
@@ -40,7 +41,7 @@ import static org.junit.Assert.assertTrue;
 
 public class TopologyBuilderTest {
 
-    @Test(expected = TopologyException.class)
+    @Test(expected = TopologyBuilderException.class)
     public void testAddSourceWithSameName() {
         final TopologyBuilder builder = new TopologyBuilder();
 
@@ -48,7 +49,7 @@ public class TopologyBuilderTest {
         builder.addSource("source", "topic-2");
     }
 
-    @Test(expected = TopologyException.class)
+    @Test(expected = TopologyBuilderException.class)
     public void testAddSourceWithSameTopic() {
         final TopologyBuilder builder = new TopologyBuilder();
 
@@ -56,7 +57,7 @@ public class TopologyBuilderTest {
         builder.addSource("source-2", "topic-1");
     }
 
-    @Test(expected = TopologyException.class)
+    @Test(expected = TopologyBuilderException.class)
     public void testAddProcessorWithSameName() {
         final TopologyBuilder builder = new TopologyBuilder();
 
@@ -65,21 +66,21 @@ public class TopologyBuilderTest {
         builder.addProcessor("processor", new MockProcessorSupplier(), "source");
     }
 
-    @Test(expected = TopologyException.class)
+    @Test(expected = TopologyBuilderException.class)
     public void testAddProcessorWithWrongParent() {
         final TopologyBuilder builder = new TopologyBuilder();
 
         builder.addProcessor("processor", new MockProcessorSupplier(), "source");
     }
 
-    @Test(expected = TopologyException.class)
+    @Test(expected = TopologyBuilderException.class)
     public void testAddProcessorWithSelfParent() {
         final TopologyBuilder builder = new TopologyBuilder();
 
         builder.addProcessor("processor", new MockProcessorSupplier(), "processor");
     }
 
-    @Test(expected = TopologyException.class)
+    @Test(expected = TopologyBuilderException.class)
     public void testAddSinkWithSameName() {
         final TopologyBuilder builder = new TopologyBuilder();
 
@@ -88,14 +89,14 @@ public class TopologyBuilderTest {
         builder.addSink("sink", "topic-3", "source");
     }
 
-    @Test(expected = TopologyException.class)
+    @Test(expected = TopologyBuilderException.class)
     public void testAddSinkWithWrongParent() {
         final TopologyBuilder builder = new TopologyBuilder();
 
         builder.addSink("sink", "topic-2", "source");
     }
 
-    @Test(expected = TopologyException.class)
+    @Test(expected = TopologyBuilderException.class)
     public void testAddSinkWithSelfParent() {
         final TopologyBuilder builder = new TopologyBuilder();
 
@@ -145,14 +146,14 @@ public class TopologyBuilderTest {
         assertEquals(3, builder.sourceTopics().size());
     }
 
-    @Test(expected = TopologyException.class)
+    @Test(expected = TopologyBuilderException.class)
     public void testAddStateStoreWithNonExistingProcessor() {
         final TopologyBuilder builder = new TopologyBuilder();
 
         builder.addStateStore(new MockStateStoreSupplier("store", false), "no-such-processor");
     }
 
-    @Test(expected = TopologyException.class)
+    @Test(expected = TopologyBuilderException.class)
     public void testAddStateStoreWithSource() {
         final TopologyBuilder builder = new TopologyBuilder();
 
@@ -160,7 +161,7 @@ public class TopologyBuilderTest {
         builder.addStateStore(new MockStateStoreSupplier("store", false), "source-1");
     }
 
-    @Test(expected = TopologyException.class)
+    @Test(expected = TopologyBuilderException.class)
     public void testAddStateStoreWithSink() {
         final TopologyBuilder builder = new TopologyBuilder();
 
@@ -168,7 +169,7 @@ public class TopologyBuilderTest {
         builder.addStateStore(new MockStateStoreSupplier("store", false), "sink-1");
     }
 
-    @Test(expected = TopologyException.class)
+    @Test(expected = TopologyBuilderException.class)
     public void testAddStateStoreWithDuplicates() {
         final TopologyBuilder builder = new TopologyBuilder();
 

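Spelled out, what these updated tests assert is that invalid topology construction fails fast with the public TopologyBuilderException instead of the old TopologyException. A hedged usage sketch of the duplicate-source case:

    import org.apache.kafka.streams.errors.TopologyBuilderException;
    import org.apache.kafka.streams.processor.TopologyBuilder;

    public final class DuplicateSourceCheck {
        public static void main(String[] args) {
            TopologyBuilder builder = new TopologyBuilder();
            builder.addSource("source", "topic-1");
            try {
                builder.addSource("source", "topic-2");  // duplicate source name
            } catch (TopologyBuilderException e) {
                // expected: source names (and subscribed topics) must be
                // unique within a topology
                System.out.println("Rejected as expected: " + e.getMessage());
            }
        }
    }
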
http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/test/java/org/apache/kafka/streams/state/StateUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/StateUtils.java b/streams/src/test/java/org/apache/kafka/streams/state/StateUtils.java
index f342dcd..c014ae5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/StateUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/StateUtils.java
@@ -50,7 +50,7 @@ public class StateUtils {
             });
             return dir;
         } catch (IOException ex) {
-            throw new RuntimeException("failed to create a temp dir", ex);
+            throw new RuntimeException("Failed to create a temp dir", ex);
         }
     }