You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/01/26 18:19:54 UTC
kafka git commit: KAFKA-3125: Add Kafka Streams Exceptions
Repository: kafka
Updated Branches:
refs/heads/trunk 942074b77 -> 5ae97196a
KAFKA-3125: Add Kafka Streams Exceptions
Author: Guozhang Wang <wa...@gmail.com>
Reviewers: Ewen Cheslack-Postava <ew...@confluent.io>
Closes #809 from guozhangwang/K3125
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5ae97196
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5ae97196
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5ae97196
Branch: refs/heads/trunk
Commit: 5ae97196ae149e58f6cfa3c5b6d968cbd7cb6787
Parents: 942074b
Author: Guozhang Wang <wa...@gmail.com>
Authored: Tue Jan 26 09:19:28 2016 -0800
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Tue Jan 26 09:19:28 2016 -0800
----------------------------------------------------------------------
.../streams/errors/ProcessorStateException.java | 35 ++++++
.../kafka/streams/errors/StreamsException.java | 40 ++++++
.../streams/errors/TaskAssignmentException.java | 37 ++++++
.../streams/errors/TaskIdFormatException.java | 27 ++++
.../errors/TopologyBuilderException.java | 27 ++++
.../kstream/internals/AbstractStream.java | 4 +-
.../kstream/internals/KStreamJoinWindow.java | 14 +--
.../kstream/internals/KTableRepartitionMap.java | 3 +-
.../processor/DefaultPartitionGrouper.java | 4 +-
.../apache/kafka/streams/processor/TaskId.java | 9 +-
.../streams/processor/TopologyBuilder.java | 78 ++++++------
.../streams/processor/TopologyException.java | 38 ------
.../processor/internals/AbstractTask.java | 6 +-
.../processor/internals/PartitionGroup.java | 3 +-
.../internals/ProcessorContextImpl.java | 16 +--
.../internals/ProcessorStateManager.java | 4 +-
.../processor/internals/StandbyContextImpl.java | 3 +-
.../internals/StreamPartitionAssignor.java | 9 +-
.../processor/internals/StreamThread.java | 125 ++++++++++---------
.../internals/assignment/AssignmentInfo.java | 12 +-
.../internals/assignment/SubscriptionInfo.java | 1 +
.../assignment/TaskAssignmentException.java | 32 -----
.../internals/assignment/TaskAssignor.java | 1 +
.../streams/state/internals/RocksDBStore.java | 12 +-
.../streams/kstream/KStreamBuilderTest.java | 4 +-
.../streams/processor/TopologyBuilderTest.java | 25 ++--
.../apache/kafka/streams/state/StateUtils.java | 2 +-
27 files changed, 333 insertions(+), 238 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/main/java/org/apache/kafka/streams/errors/ProcessorStateException.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/ProcessorStateException.java b/streams/src/main/java/org/apache/kafka/streams/errors/ProcessorStateException.java
new file mode 100644
index 0000000..6434d04
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/errors/ProcessorStateException.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.errors;
+
+public class ProcessorStateException extends StreamsException {
+
+ private static final long serialVersionUID = 1L;
+
+ public ProcessorStateException(String s) {
+ super(s);
+ }
+
+ public ProcessorStateException(String s, Throwable throwable) {
+ super(s, throwable);
+ }
+
+ public ProcessorStateException(Throwable throwable) {
+ super(throwable);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/main/java/org/apache/kafka/streams/errors/StreamsException.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/StreamsException.java b/streams/src/main/java/org/apache/kafka/streams/errors/StreamsException.java
new file mode 100644
index 0000000..6247886
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/errors/StreamsException.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.streams.errors;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * StreamsException is the top-level exception type generated by Kafka Streams.
+ */
+@InterfaceStability.Unstable
+public class StreamsException extends KafkaException {
+
+ public StreamsException(String s) {
+ super(s);
+ }
+
+ public StreamsException(String s, Throwable throwable) {
+ super(s, throwable);
+ }
+
+ public StreamsException(Throwable throwable) {
+ super(throwable);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/main/java/org/apache/kafka/streams/errors/TaskAssignmentException.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/TaskAssignmentException.java b/streams/src/main/java/org/apache/kafka/streams/errors/TaskAssignmentException.java
new file mode 100644
index 0000000..3ae8503
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/errors/TaskAssignmentException.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.errors;
+
+/**
+ * The run time exception class for stream task assignments
+ */
+public class TaskAssignmentException extends StreamsException {
+
+ private final static long serialVersionUID = 1L;
+
+ public TaskAssignmentException(String s) {
+ super(s);
+ }
+
+ public TaskAssignmentException(String s, Throwable throwable) {
+ super(s, throwable);
+ }
+
+ public TaskAssignmentException(Throwable throwable) {
+ super(throwable);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/main/java/org/apache/kafka/streams/errors/TaskIdFormatException.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/TaskIdFormatException.java b/streams/src/main/java/org/apache/kafka/streams/errors/TaskIdFormatException.java
new file mode 100644
index 0000000..bf0ebf5
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/errors/TaskIdFormatException.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.errors;
+
+public class TaskIdFormatException extends StreamsException {
+
+ private static final long serialVersionUID = 1L;
+
+ public TaskIdFormatException(String taskString) {
+ super("Task id cannot be parsed correctly" + (taskString == null ? "" : " from " + taskString));
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/main/java/org/apache/kafka/streams/errors/TopologyBuilderException.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/TopologyBuilderException.java b/streams/src/main/java/org/apache/kafka/streams/errors/TopologyBuilderException.java
new file mode 100644
index 0000000..9dd740b
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/errors/TopologyBuilderException.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.errors;
+
+public class TopologyBuilderException extends StreamsException {
+
+ private static final long serialVersionUID = 1L;
+
+ public TopologyBuilderException(String message) {
+ super("Invalid topology building" + (message == null ? "" : ": " + message));
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
index fa34ba1..c537465 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
@@ -17,7 +17,7 @@
package org.apache.kafka.streams.kstream.internals;
-import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.streams.errors.TopologyBuilderException;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.ValueJoiner;
@@ -41,7 +41,7 @@ public abstract class AbstractStream<K> {
Set<String> otherSourceNodes = other.sourceNodes;
if (thisSourceNodes == null || otherSourceNodes == null)
- throw new KafkaException("not joinable");
+ throw new TopologyBuilderException(this.name + " and " + other.name + " are not joinable");
Set<String> allSourceNodes = new HashSet<>();
allSourceNodes.addAll(thisSourceNodes);
http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
index 4f427d4..5b83b28 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
@@ -17,7 +17,7 @@
package org.apache.kafka.streams.kstream.internals;
-import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.streams.errors.TopologyBuilderException;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
@@ -27,14 +27,13 @@ import org.apache.kafka.streams.state.WindowStore;
class KStreamJoinWindow<K, V> implements ProcessorSupplier<K, V> {
private final String windowName;
- private final long windowSizeMs;
- private final long retentionPeriodMs;
-
KStreamJoinWindow(String windowName, long windowSizeMs, long retentionPeriodMs) {
this.windowName = windowName;
- this.windowSizeMs = windowSizeMs;
- this.retentionPeriodMs = retentionPeriodMs;
+
+ if (windowSizeMs * 2 > retentionPeriodMs)
+ throw new TopologyBuilderException("The retention period of the join window "
+ + windowName + " must at least two times its window size.");
}
@Override
@@ -52,9 +51,6 @@ class KStreamJoinWindow<K, V> implements ProcessorSupplier<K, V> {
super.init(context);
window = (WindowStore<K, V>) context.getStateStore(windowName);
-
- if (windowSizeMs * 2 > retentionPeriodMs)
- throw new KafkaException("The retention period must be at least two times the join window size.");
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
index 499f721..ff69c37 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
@@ -17,7 +17,6 @@
package org.apache.kafka.streams.kstream.internals;
-import org.apache.kafka.common.KafkaException;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.processor.AbstractProcessor;
@@ -60,7 +59,7 @@ public class KTableRepartitionMap<K, V, K1, V1> implements KTableProcessorSuppli
@Override
public void enableSendingOldValues() {
// this should never be called
- throw new KafkaException("KTableRepartitionMap should always require sending old values.");
+ throw new IllegalStateException("KTableRepartitionMap should always require sending old values.");
}
private KeyValue<K1, V1> computeValue(K key, V value) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
index 923a217..47c5e58 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
@@ -18,9 +18,9 @@
package org.apache.kafka.streams.processor;
import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.errors.StreamsException;
import java.util.Collections;
import java.util.HashMap;
@@ -61,7 +61,7 @@ public class DefaultPartitionGrouper implements PartitionGrouper {
List<PartitionInfo> infos = metadata.partitionsForTopic(topic);
if (infos == null)
- throw new KafkaException("topic not found :" + topic);
+ throw new StreamsException("Topic not found during partition assignment: " + topic);
int numPartitions = infos.size();
if (numPartitions > maxNumPartitions)
http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java b/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
index 023bbbb..6e7150e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
@@ -17,6 +17,8 @@
package org.apache.kafka.streams.processor;
+import org.apache.kafka.streams.errors.TaskIdFormatException;
+
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
@@ -38,7 +40,7 @@ public class TaskId implements Comparable<TaskId> {
public static TaskId parse(String string) {
int index = string.indexOf('_');
- if (index <= 0 || index + 1 >= string.length()) throw new TaskIdFormatException();
+ if (index <= 0 || index + 1 >= string.length()) throw new TaskIdFormatException(string);
try {
int topicGroupId = Integer.parseInt(string.substring(0, index));
@@ -46,7 +48,7 @@ public class TaskId implements Comparable<TaskId> {
return new TaskId(topicGroupId, partition);
} catch (Exception e) {
- throw new TaskIdFormatException();
+ throw new TaskIdFormatException(string);
}
}
@@ -93,7 +95,4 @@ public class TaskId implements Comparable<TaskId> {
(this.partition > other.partition ? 1 :
0)));
}
-
- public static class TaskIdFormatException extends RuntimeException {
- }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index a6b54b7..7af377f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -17,10 +17,10 @@
package org.apache.kafka.streams.processor;
-import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.errors.TopologyBuilderException;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
@@ -218,11 +218,11 @@ public class TopologyBuilder {
*/
public final TopologyBuilder addSource(String name, Deserializer keyDeserializer, Deserializer valDeserializer, String... topics) {
if (nodeFactories.containsKey(name))
- throw new TopologyException("Processor " + name + " is already added.");
+ throw new TopologyBuilderException("Processor " + name + " is already added.");
for (String topic : topics) {
if (sourceTopicNames.contains(topic))
- throw new TopologyException("Topic " + topic + " has already been registered by another source.");
+ throw new TopologyBuilderException("Topic " + topic + " has already been registered by another source.");
sourceTopicNames.add(topic);
}
@@ -331,15 +331,15 @@ public class TopologyBuilder {
*/
public final <K, V> TopologyBuilder addSink(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamPartitioner<K, V> partitioner, String... parentNames) {
if (nodeFactories.containsKey(name))
- throw new TopologyException("Processor " + name + " is already added.");
+ throw new TopologyBuilderException("Processor " + name + " is already added.");
if (parentNames != null) {
for (String parent : parentNames) {
if (parent.equals(name)) {
- throw new TopologyException("Processor " + name + " cannot be a parent of itself.");
+ throw new TopologyBuilderException("Processor " + name + " cannot be a parent of itself.");
}
if (!nodeFactories.containsKey(parent)) {
- throw new TopologyException("Parent processor " + parent + " is not added yet.");
+ throw new TopologyBuilderException("Parent processor " + parent + " is not added yet.");
}
}
}
@@ -361,15 +361,15 @@ public class TopologyBuilder {
*/
public final TopologyBuilder addProcessor(String name, ProcessorSupplier supplier, String... parentNames) {
if (nodeFactories.containsKey(name))
- throw new TopologyException("Processor " + name + " is already added.");
+ throw new TopologyBuilderException("Processor " + name + " is already added.");
if (parentNames != null) {
for (String parent : parentNames) {
if (parent.equals(name)) {
- throw new TopologyException("Processor " + name + " cannot be a parent of itself.");
+ throw new TopologyBuilderException("Processor " + name + " cannot be a parent of itself.");
}
if (!nodeFactories.containsKey(parent)) {
- throw new TopologyException("Parent processor " + parent + " is not added yet.");
+ throw new TopologyBuilderException("Parent processor " + parent + " is not added yet.");
}
}
}
@@ -388,7 +388,7 @@ public class TopologyBuilder {
*/
public final TopologyBuilder addStateStore(StateStoreSupplier supplier, boolean isInternal, String... processorNames) {
if (stateFactories.containsKey(supplier.name())) {
- throw new TopologyException("StateStore " + supplier.name() + " is already added.");
+ throw new TopologyBuilderException("StateStore " + supplier.name() + " is already added.");
}
stateFactories.put(supplier.name(), new StateStoreFactory(isInternal, supplier));
@@ -443,9 +443,9 @@ public class TopologyBuilder {
private void connectProcessorAndStateStore(String processorName, String stateStoreName) {
if (!stateFactories.containsKey(stateStoreName))
- throw new TopologyException("StateStore " + stateStoreName + " is not added yet.");
+ throw new TopologyBuilderException("StateStore " + stateStoreName + " is not added yet.");
if (!nodeFactories.containsKey(processorName))
- throw new TopologyException("Processor " + processorName + " is not added yet.");
+ throw new TopologyBuilderException("Processor " + processorName + " is not added yet.");
StateStoreFactory stateStoreFactory = stateFactories.get(stateStoreName);
Iterator<String> iter = stateStoreFactory.users.iterator();
@@ -459,7 +459,7 @@ public class TopologyBuilder {
if (nodeFactory instanceof ProcessorNodeFactory) {
((ProcessorNodeFactory) nodeFactory).addStateStore(stateStoreName);
} else {
- throw new TopologyException("cannot connect a state store " + stateStoreName + " to a source node or a sink node.");
+ throw new TopologyBuilderException("cannot connect a state store " + stateStoreName + " to a source node or a sink node.");
}
}
@@ -611,38 +611,34 @@ public class TopologyBuilder {
Map<String, SourceNode> topicSourceMap = new HashMap<>();
Map<String, StateStoreSupplier> stateStoreMap = new HashMap<>();
- try {
- // create processor nodes in a topological order ("nodeFactories" is already topologically sorted)
- for (NodeFactory factory : nodeFactories.values()) {
- if (nodeGroup == null || nodeGroup.contains(factory.name)) {
- ProcessorNode node = factory.build();
- processorNodes.add(node);
- processorMap.put(node.name(), node);
-
- if (factory instanceof ProcessorNodeFactory) {
- for (String parent : ((ProcessorNodeFactory) factory).parents) {
- processorMap.get(parent).addChild(node);
- }
- for (String stateStoreName : ((ProcessorNodeFactory) factory).stateStoreNames) {
- if (!stateStoreMap.containsKey(stateStoreName)) {
- stateStoreMap.put(stateStoreName, stateFactories.get(stateStoreName).supplier);
- }
- }
- } else if (factory instanceof SourceNodeFactory) {
- for (String topic : ((SourceNodeFactory) factory).topics) {
- topicSourceMap.put(topic, (SourceNode) node);
- }
- } else if (factory instanceof SinkNodeFactory) {
- for (String parent : ((SinkNodeFactory) factory).parents) {
- processorMap.get(parent).addChild(node);
+ // create processor nodes in a topological order ("nodeFactories" is already topologically sorted)
+ for (NodeFactory factory : nodeFactories.values()) {
+ if (nodeGroup == null || nodeGroup.contains(factory.name)) {
+ ProcessorNode node = factory.build();
+ processorNodes.add(node);
+ processorMap.put(node.name(), node);
+
+ if (factory instanceof ProcessorNodeFactory) {
+ for (String parent : ((ProcessorNodeFactory) factory).parents) {
+ processorMap.get(parent).addChild(node);
+ }
+ for (String stateStoreName : ((ProcessorNodeFactory) factory).stateStoreNames) {
+ if (!stateStoreMap.containsKey(stateStoreName)) {
+ stateStoreMap.put(stateStoreName, stateFactories.get(stateStoreName).supplier);
}
- } else {
- throw new TopologyException("Unknown definition class: " + factory.getClass().getName());
}
+ } else if (factory instanceof SourceNodeFactory) {
+ for (String topic : ((SourceNodeFactory) factory).topics) {
+ topicSourceMap.put(topic, (SourceNode) node);
+ }
+ } else if (factory instanceof SinkNodeFactory) {
+ for (String parent : ((SinkNodeFactory) factory).parents) {
+ processorMap.get(parent).addChild(node);
+ }
+ } else {
+ throw new TopologyBuilderException("Unknown definition class: " + factory.getClass().getName());
}
}
- } catch (Exception e) {
- throw new KafkaException("ProcessorNode construction failed: this should not happen.");
}
return new ProcessorTopology(processorNodes, topicSourceMap, new ArrayList<>(stateStoreMap.values()));
http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/main/java/org/apache/kafka/streams/processor/TopologyException.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyException.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyException.java
deleted file mode 100644
index 99d1405..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyException.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor;
-
-import org.apache.kafka.common.KafkaException;
-
-public class TopologyException extends KafkaException {
-
- private static final long serialVersionUID = 1L;
-
- public TopologyException(String message) {
- super(message);
- }
-
- public TopologyException(String name, Object value) {
- this(name, value, null);
- }
-
- public TopologyException(String name, Object value, String message) {
- super("Invalid topology building" + (message == null ? "" : ": " + message));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 68680ab..46dd738 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -18,9 +18,9 @@
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreSupplier;
@@ -62,7 +62,7 @@ public abstract class AbstractTask {
// if partitions is null, this is a standby task
this.stateMgr = new ProcessorStateManager(jobId, id.partition, partitions, stateFile, restoreConsumer, isStandby);
} catch (IOException e) {
- throw new KafkaException("Error while creating the state manager", e);
+ throw new ProcessorStateException("Error while creating the state manager", e);
}
}
@@ -95,7 +95,7 @@ public abstract class AbstractTask {
try {
stateMgr.close(recordCollectorOffsets());
} catch (IOException e) {
- throw new KafkaException("Error while closing the state manager in processor context", e);
+ throw new ProcessorStateException("Error while closing the state manager", e);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
index d888085..b487ff5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
@@ -18,7 +18,6 @@
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.TimestampExtractor;
@@ -149,7 +148,7 @@ public class PartitionGroup {
RecordQueue recordQueue = partitionQueues.get(partition);
if (recordQueue == null)
- throw new KafkaException("Record's partition does not belong to this partition-group.");
+ throw new IllegalStateException("Record's partition does not belong to this partition-group.");
return recordQueue.size();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 102c534..7931a6f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -17,11 +17,11 @@
package org.apache.kafka.streams.processor.internals;
-import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.errors.TopologyBuilderException;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateRestoreCallback;
@@ -120,7 +120,7 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S
@Override
public void register(StateStore store, boolean loggingEnabled, StateRestoreCallback stateRestoreCallback) {
if (initialized)
- throw new KafkaException("Can only create state stores during initialization.");
+ throw new IllegalStateException("Can only create state stores during initialization.");
stateMgr.register(store, loggingEnabled, stateRestoreCallback);
}
@@ -130,10 +130,10 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S
ProcessorNode node = task.node();
if (node == null)
- throw new KafkaException("accessing from an unknown node");
+ throw new TopologyBuilderException("Accessing from an unknown node");
if (!node.stateStores.contains(name))
- throw new KafkaException("Processor " + node.name() + " has no access to StateStore " + name);
+ throw new TopologyBuilderException("Processor " + node.name() + " has no access to StateStore " + name);
return stateMgr.getStore(name);
}
@@ -141,7 +141,7 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S
@Override
public String topic() {
if (task.record() == null)
- throw new IllegalStateException("this should not happen as topic() should only be called while a record is processed");
+ throw new IllegalStateException("This should not happen as topic() should only be called while a record is processed");
return task.record().topic();
}
@@ -149,7 +149,7 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S
@Override
public int partition() {
if (task.record() == null)
- throw new IllegalStateException("this should not happen as partition() should only be called while a record is processed");
+ throw new IllegalStateException("This should not happen as partition() should only be called while a record is processed");
return task.record().partition();
}
@@ -157,7 +157,7 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S
@Override
public long offset() {
if (this.task.record() == null)
- throw new IllegalStateException("this should not happen as offset() should only be called while a record is processed");
+ throw new IllegalStateException("This should not happen as offset() should only be called while a record is processed");
return this.task.record().offset();
}
@@ -165,7 +165,7 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S
@Override
public long timestamp() {
if (task.record() == null)
- throw new IllegalStateException("this should not happen as timestamp() should only be called while a record is processed");
+ throw new IllegalStateException("This should not happen as timestamp() should only be called while a record is processed");
return task.record().timestamp;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 547bb15..bc7f4b5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -19,9 +19,9 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
@@ -160,7 +160,7 @@ public class ProcessorStateManager {
} while (partitionNotFound && System.currentTimeMillis() < startTime + waitTime);
if (partitionNotFound)
- throw new KafkaException("Store " + store.name() + "'s change log (" + topic + ") does not contain partition " + partition);
+ throw new StreamsException("Store " + store.name() + "'s change log (" + topic + ") does not contain partition " + partition);
this.stores.put(store.name(), store);
http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
index 89d185c..133d597 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
@@ -17,7 +17,6 @@
package org.apache.kafka.streams.processor.internals;
-import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.StreamsConfig;
@@ -112,7 +111,7 @@ public class StandbyContextImpl implements ProcessorContext, RecordCollector.Sup
@Override
public void register(StateStore store, boolean loggingEnabled, StateRestoreCallback stateRestoreCallback) {
if (initialized)
- throw new KafkaException("Can only create state stores during initialization.");
+ throw new IllegalStateException("Can only create state stores during initialization.");
stateMgr.register(store, loggingEnabled, stateRestoreCallback);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 5d87e5a..d499534 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -24,12 +24,13 @@ import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
import org.apache.kafka.streams.processor.internals.assignment.ClientState;
import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
-import org.apache.kafka.streams.processor.internals.assignment.TaskAssignmentException;
+import org.apache.kafka.streams.errors.TaskAssignmentException;
import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -132,7 +133,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
return partitions;
} catch (IOException e) {
- throw new KafkaException(e);
+ throw new StreamsException("Error while reading topic metadata from ZK for internal topic " + topic, e);
}
}
@@ -158,7 +159,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
zkClient.createPersistent(ZK_TOPIC_PATH + "/" + topic, data, ZooDefs.Ids.OPEN_ACL_UNSAFE);
} catch (JsonProcessingException e) {
- throw new KafkaException(e);
+ throw new StreamsException("Error while creating topic metadata in ZK for internal topic " + topic, e);
}
}
@@ -193,7 +194,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
zkClient.writeData(ZK_TOPIC_PATH + "/" + topic, data);
} catch (JsonProcessingException e) {
- throw new KafkaException(e);
+ throw new StreamsException("Error while updating topic metadata in ZK for internal topic " + topic, e);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index f118f60..eccd02c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -41,6 +41,9 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.errors.TaskIdFormatException;
+import org.apache.kafka.streams.errors.TopologyBuilderException;
import org.apache.kafka.streams.processor.PartitionGrouper;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TopologyBuilder;
@@ -127,6 +130,8 @@ public class StreamThread extends Thread {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> assignment) {
commitAll();
+ // TODO: right now upon partition revocation, we always remove all the tasks;
+ // this behavior can be optimized to only remove affected tasks in the future
removeStreamTasks();
removeStandbyTasks();
lastClean = Long.MAX_VALUE; // stop the cleaning cycle until partitions are assigned
@@ -229,8 +234,13 @@ public class StreamThread extends Thread {
try {
runLoop();
- } catch (RuntimeException e) {
- log.error("Uncaught error during processing in thread [" + this.getName() + "]: ", e);
+ } catch (KafkaException e) {
+ // just re-throw the exception as it should be logged already
+ throw e;
+ } catch (Exception e) {
+ // we have caught all Kafka related exceptions, and other runtime exceptions
+ // should be due to user application errors
+ log.error("Streams application error during processing in thread [" + this.getName() + "]: ", e);
throw e;
} finally {
shutdown();
@@ -251,13 +261,21 @@ public class StreamThread extends Thread {
private void shutdown() {
log.info("Shutting down stream thread [" + this.getName() + "]");
- // Exceptions should not prevent this call from going through all shutdown steps.
+ // We need to first remove the tasks before shutting down the underlying clients
+ // as they may be required in the previous steps; and exceptions should not
+ // prevent this call from going through all shutdown steps.
try {
commitAll();
} catch (Throwable e) {
// already logged in commitAll()
}
try {
+ removeStreamTasks();
+ removeStandbyTasks();
+ } catch (Throwable e) {
+ // already logged in removeStreamTasks() and removeStandbyTasks()
+ }
+ try {
producer.close();
} catch (Throwable e) {
log.error("Failed to close producer in thread [" + this.getName() + "]: ", e);
@@ -272,70 +290,60 @@ public class StreamThread extends Thread {
} catch (Throwable e) {
log.error("Failed to close restore consumer in thread [" + this.getName() + "]: ", e);
}
- try {
- removeStreamTasks();
- removeStandbyTasks();
- } catch (Throwable e) {
- // already logged in removeStreamTasks() and removeStandbyTasks()
- }
log.info("Stream thread shutdown complete [" + this.getName() + "]");
}
private void runLoop() {
- try {
- int totalNumBuffered = 0;
- boolean requiresPoll = true;
+ int totalNumBuffered = 0;
+ boolean requiresPoll = true;
- ensureCopartitioning(builder.copartitionGroups());
+ ensureCopartitioning(builder.copartitionGroups());
- consumer.subscribe(new ArrayList<>(sourceTopics), rebalanceListener);
+ consumer.subscribe(new ArrayList<>(sourceTopics), rebalanceListener);
- while (stillRunning()) {
- // try to fetch some records if necessary
- if (requiresPoll) {
- requiresPoll = false;
+ while (stillRunning()) {
+ // try to fetch some records if necessary
+ if (requiresPoll) {
+ requiresPoll = false;
- long startPoll = time.milliseconds();
+ long startPoll = time.milliseconds();
- ConsumerRecords<byte[], byte[]> records = consumer.poll(totalNumBuffered == 0 ? this.pollTimeMs : 0);
+ ConsumerRecords<byte[], byte[]> records = consumer.poll(totalNumBuffered == 0 ? this.pollTimeMs : 0);
- if (!records.isEmpty()) {
- for (TopicPartition partition : records.partitions()) {
- StreamTask task = activeTasksByPartition.get(partition);
- task.addRecords(partition, records.records(partition));
- }
+ if (!records.isEmpty()) {
+ for (TopicPartition partition : records.partitions()) {
+ StreamTask task = activeTasksByPartition.get(partition);
+ task.addRecords(partition, records.records(partition));
}
-
- long endPoll = time.milliseconds();
- sensors.pollTimeSensor.record(endPoll - startPoll);
}
- totalNumBuffered = 0;
+ long endPoll = time.milliseconds();
+ sensors.pollTimeSensor.record(endPoll - startPoll);
+ }
- if (!activeTasks.isEmpty()) {
- // try to process one record from each task
- for (StreamTask task : activeTasks.values()) {
- long startProcess = time.milliseconds();
+ totalNumBuffered = 0;
- totalNumBuffered += task.process();
- requiresPoll = requiresPoll || task.requiresPoll();
+ if (!activeTasks.isEmpty()) {
+ // try to process one record from each task
+ for (StreamTask task : activeTasks.values()) {
+ long startProcess = time.milliseconds();
- sensors.processTimeSensor.record(time.milliseconds() - startProcess);
- }
+ totalNumBuffered += task.process();
+ requiresPoll = requiresPoll || task.requiresPoll();
- maybePunctuate();
- } else {
- // even when no task is assigned, we must poll to get a task.
- requiresPoll = true;
+ sensors.processTimeSensor.record(time.milliseconds() - startProcess);
}
- maybeCommit();
- maybeUpdateStandbyTasks();
- maybeClean();
+ maybePunctuate();
+ } else {
+ // even when no task is assigned, we must poll to get a task.
+ requiresPoll = true;
}
- } catch (Exception e) {
- throw new KafkaException(e);
+ maybeCommit();
+ maybeUpdateStandbyTasks();
+
+ maybeClean();
}
}
@@ -396,8 +404,8 @@ public class StreamThread extends Thread {
if (task.maybePunctuate(now))
sensors.punctuateTimeSensor.record(time.milliseconds() - now);
- } catch (Exception e) {
- log.error("Failed to commit active task #" + task.id() + " in thread [" + this.getName() + "]: ", e);
+ } catch (KafkaException e) {
+ log.error("Failed to punctuate active task #" + task.id() + " in thread [" + this.getName() + "]: ", e);
throw e;
}
}
@@ -418,7 +426,7 @@ public class StreamThread extends Thread {
try {
if (task.commitNeeded())
commitOne(task, time.milliseconds());
- } catch (Exception e) {
+ } catch (KafkaException e) {
log.error("Failed to commit active task #" + task.id() + " in thread [" + this.getName() + "]: ", e);
throw e;
}
@@ -444,7 +452,7 @@ public class StreamThread extends Thread {
private void commitOne(AbstractTask task, long now) {
try {
task.commit();
- } catch (Exception e) {
+ } catch (KafkaException e) {
log.error("Failed to commit " + task.getClass().getSimpleName() + " #" + task.id() + " in thread [" + this.getName() + "]: ", e);
throw e;
}
@@ -485,7 +493,7 @@ public class StreamThread extends Thread {
}
}
}
- } catch (TaskId.TaskIdFormatException e) {
+ } catch (TaskIdFormatException e) {
// there may be some unknown files that sits in the same directory,
// we should ignore these files instead trying to delete them as well
}
@@ -523,7 +531,7 @@ public class StreamThread extends Thread {
if (new File(dir, ProcessorStateManager.CHECKPOINT_FILE_NAME).exists())
tasks.add(id);
- } catch (TaskId.TaskIdFormatException e) {
+ } catch (TaskIdFormatException e) {
// there may be some unknown files that sits in the same directory,
// we should ignore these files instead trying to delete them as well
}
@@ -543,7 +551,7 @@ public class StreamThread extends Thread {
private void addStreamTasks(Collection<TopicPartition> assignment) {
if (partitionAssignor == null)
- throw new KafkaException("Partition assignor has not been initialized while adding stream tasks: this should not happen.");
+ throw new IllegalStateException("Partition assignor has not been initialized while adding stream tasks: this should not happen.");
HashMap<TaskId, Set<TopicPartition>> partitionsForTask = new HashMap<>();
@@ -570,7 +578,7 @@ public class StreamThread extends Thread {
for (TopicPartition partition : partitions)
activeTasksByPartition.put(partition, task);
- } catch (Exception e) {
+ } catch (StreamsException e) {
log.error("Failed to create an active task #" + taskId + " in thread [" + this.getName() + "]: ", e);
throw e;
}
@@ -578,7 +586,6 @@ public class StreamThread extends Thread {
}
private void removeStreamTasks() {
- // TODO: change this clearing tasks behavior
for (StreamTask task : activeTasks.values()) {
closeOne(task);
}
@@ -594,7 +601,7 @@ public class StreamThread extends Thread {
log.info("Removing a task {}", task.id());
try {
task.close();
- } catch (Exception e) {
+ } catch (StreamsException e) {
log.error("Failed to close a " + task.getClass().getSimpleName() + " #" + task.id() + " in thread [" + this.getName() + "]: ", e);
throw e;
}
@@ -615,7 +622,7 @@ public class StreamThread extends Thread {
private void addStandbyTasks() {
if (partitionAssignor == null)
- throw new KafkaException("Partition assignor has not been initialized while adding standby tasks: this should not happen.");
+ throw new IllegalStateException("Partition assignor has not been initialized while adding standby tasks: this should not happen.");
Map<TopicPartition, Long> checkpointedOffsets = new HashMap<>();
@@ -673,14 +680,14 @@ public class StreamThread extends Thread {
List<PartitionInfo> infos = consumer.partitionsFor(topic);
if (infos == null)
- throw new KafkaException("topic not found: " + topic);
+ throw new TopologyBuilderException("Topic not found: " + topic);
if (numPartitions == -1) {
numPartitions = infos.size();
} else if (numPartitions != infos.size()) {
String[] topics = copartitionGroup.toArray(new String[copartitionGroup.size()]);
Arrays.sort(topics);
- throw new KafkaException("topics not copartitioned: [" + Utils.mkString(Arrays.asList(topics), ",") + "]");
+ throw new TopologyBuilderException("Topics not copartitioned: [" + Utils.mkString(Arrays.asList(topics), ",") + "]");
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
index 2bd4457..c2175bb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
@@ -17,9 +17,9 @@
package org.apache.kafka.streams.processor.internals.assignment;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.ByteBufferInputStream;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.errors.TaskAssignmentException;
import org.apache.kafka.streams.processor.TaskId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -87,12 +87,12 @@ public class AssignmentInfo {
return ByteBuffer.wrap(baos.toByteArray());
} else {
- TaskAssignmentException ex = new TaskAssignmentException("unable to encode assignment data: version=" + version);
+ TaskAssignmentException ex = new TaskAssignmentException("Unable to encode assignment data: version=" + version);
log.error(ex.getMessage(), ex);
throw ex;
}
} catch (IOException ex) {
- throw new KafkaException("failed to encode AssignmentInfo", ex);
+ throw new TaskAssignmentException("Failed to encode AssignmentInfo", ex);
}
}
@@ -128,12 +128,12 @@ public class AssignmentInfo {
return new AssignmentInfo(activeTasks, standbyTasks);
} else {
- TaskAssignmentException ex = new TaskAssignmentException("unknown assignment data version: " + version);
+ TaskAssignmentException ex = new TaskAssignmentException("Unknown assignment data version: " + version);
log.error(ex.getMessage(), ex);
throw ex;
}
} catch (IOException ex) {
- throw new KafkaException("failed to decode AssignmentInfo", ex);
+ throw new TaskAssignmentException("Failed to decode AssignmentInfo", ex);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
index 43009a1..ccd2f73 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.processor.internals.assignment;
+import org.apache.kafka.streams.errors.TaskAssignmentException;
import org.apache.kafka.streams.processor.TaskId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignmentException.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignmentException.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignmentException.java
deleted file mode 100644
index 839a6c2..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignmentException.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.processor.internals.assignment;
-
-import org.apache.kafka.common.KafkaException;
-
-/**
- * The run time exception class for stream task assignments
- */
-public class TaskAssignmentException extends KafkaException {
-
- private final static long serialVersionUID = 1L;
-
- public TaskAssignmentException(String message) {
- super(message);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java
index d1e0782..2501677 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.processor.internals.assignment;
+import org.apache.kafka.streams.errors.TaskAssignmentException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 6c77ab2..dea7e0b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -17,8 +17,8 @@
package org.apache.kafka.streams.state.internals;
-import org.apache.kafka.common.KafkaException;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
@@ -102,13 +102,13 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
dir.getParentFile().mkdirs();
return RocksDB.open(options, dir.toString());
} else {
- throw new KafkaException("Change log is not supported for store " + this.name + " since it is TTL based.");
+ throw new ProcessorStateException("Change log is not supported for store " + this.name + " since it is TTL based.");
// TODO: support TTL with change log?
// return TtlDB.open(options, dir.toString(), ttl, false);
}
} catch (RocksDBException e) {
// TODO: this needs to be handled more accurately
- throw new KafkaException("Error opening store " + this.name + " at location " + dir.toString(), e);
+ throw new ProcessorStateException("Error opening store " + this.name + " at location " + dir.toString(), e);
}
}
@@ -128,7 +128,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
return serdes.valueFrom(this.db.get(serdes.rawKey(key)));
} catch (RocksDBException e) {
// TODO: this needs to be handled more accurately
- throw new KafkaException("Error while executing get " + key.toString() + " from store " + this.name, e);
+ throw new ProcessorStateException("Error while executing get " + key.toString() + " from store " + this.name, e);
}
}
@@ -142,7 +142,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
}
} catch (RocksDBException e) {
// TODO: this needs to be handled more accurately
- throw new KafkaException("Error while executing put " + key.toString() + " from store " + this.name, e);
+ throw new ProcessorStateException("Error while executing put " + key.toString() + " from store " + this.name, e);
}
}
@@ -177,7 +177,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
db.flush(fOptions);
} catch (RocksDBException e) {
// TODO: this needs to be handled more accurately
- throw new KafkaException("Error while executing flush from store " + this.name, e);
+ throw new ProcessorStateException("Error while executing flush from store " + this.name, e);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
index f79063f..e75b595 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
@@ -19,7 +19,7 @@ package org.apache.kafka.streams.kstream;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.kstream.internals.KStreamImpl;
-import org.apache.kafka.streams.processor.TopologyException;
+import org.apache.kafka.streams.errors.TopologyBuilderException;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockProcessorSupplier;
import org.junit.Test;
@@ -28,7 +28,7 @@ import static org.junit.Assert.assertEquals;
public class KStreamBuilderTest {
- @Test(expected = TopologyException.class)
+ @Test(expected = TopologyBuilderException.class)
public void testFrom() {
final KStreamBuilder builder = new KStreamBuilder();
http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index a2f6ec0..a93b8ab 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.processor;
+import org.apache.kafka.streams.errors.TopologyBuilderException;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
@@ -40,7 +41,7 @@ import static org.junit.Assert.assertTrue;
public class TopologyBuilderTest {
- @Test(expected = TopologyException.class)
+ @Test(expected = TopologyBuilderException.class)
public void testAddSourceWithSameName() {
final TopologyBuilder builder = new TopologyBuilder();
@@ -48,7 +49,7 @@ public class TopologyBuilderTest {
builder.addSource("source", "topic-2");
}
- @Test(expected = TopologyException.class)
+ @Test(expected = TopologyBuilderException.class)
public void testAddSourceWithSameTopic() {
final TopologyBuilder builder = new TopologyBuilder();
@@ -56,7 +57,7 @@ public class TopologyBuilderTest {
builder.addSource("source-2", "topic-1");
}
- @Test(expected = TopologyException.class)
+ @Test(expected = TopologyBuilderException.class)
public void testAddProcessorWithSameName() {
final TopologyBuilder builder = new TopologyBuilder();
@@ -65,21 +66,21 @@ public class TopologyBuilderTest {
builder.addProcessor("processor", new MockProcessorSupplier(), "source");
}
- @Test(expected = TopologyException.class)
+ @Test(expected = TopologyBuilderException.class)
public void testAddProcessorWithWrongParent() {
final TopologyBuilder builder = new TopologyBuilder();
builder.addProcessor("processor", new MockProcessorSupplier(), "source");
}
- @Test(expected = TopologyException.class)
+ @Test(expected = TopologyBuilderException.class)
public void testAddProcessorWithSelfParent() {
final TopologyBuilder builder = new TopologyBuilder();
builder.addProcessor("processor", new MockProcessorSupplier(), "processor");
}
- @Test(expected = TopologyException.class)
+ @Test(expected = TopologyBuilderException.class)
public void testAddSinkWithSameName() {
final TopologyBuilder builder = new TopologyBuilder();
@@ -88,14 +89,14 @@ public class TopologyBuilderTest {
builder.addSink("sink", "topic-3", "source");
}
- @Test(expected = TopologyException.class)
+ @Test(expected = TopologyBuilderException.class)
public void testAddSinkWithWrongParent() {
final TopologyBuilder builder = new TopologyBuilder();
builder.addSink("sink", "topic-2", "source");
}
- @Test(expected = TopologyException.class)
+ @Test(expected = TopologyBuilderException.class)
public void testAddSinkWithSelfParent() {
final TopologyBuilder builder = new TopologyBuilder();
@@ -145,14 +146,14 @@ public class TopologyBuilderTest {
assertEquals(3, builder.sourceTopics().size());
}
- @Test(expected = TopologyException.class)
+ @Test(expected = TopologyBuilderException.class)
public void testAddStateStoreWithNonExistingProcessor() {
final TopologyBuilder builder = new TopologyBuilder();
builder.addStateStore(new MockStateStoreSupplier("store", false), "no-such-processsor");
}
- @Test(expected = TopologyException.class)
+ @Test(expected = TopologyBuilderException.class)
public void testAddStateStoreWithSource() {
final TopologyBuilder builder = new TopologyBuilder();
@@ -160,7 +161,7 @@ public class TopologyBuilderTest {
builder.addStateStore(new MockStateStoreSupplier("store", false), "source-1");
}
- @Test(expected = TopologyException.class)
+ @Test(expected = TopologyBuilderException.class)
public void testAddStateStoreWithSink() {
final TopologyBuilder builder = new TopologyBuilder();
@@ -168,7 +169,7 @@ public class TopologyBuilderTest {
builder.addStateStore(new MockStateStoreSupplier("store", false), "sink-1");
}
- @Test(expected = TopologyException.class)
+ @Test(expected = TopologyBuilderException.class)
public void testAddStateStoreWithDuplicates() {
final TopologyBuilder builder = new TopologyBuilder();
http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/test/java/org/apache/kafka/streams/state/StateUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/StateUtils.java b/streams/src/test/java/org/apache/kafka/streams/state/StateUtils.java
index f342dcd..c014ae5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/StateUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/StateUtils.java
@@ -50,7 +50,7 @@ public class StateUtils {
});
return dir;
} catch (IOException ex) {
- throw new RuntimeException("failed to create a temp dir", ex);
+ throw new RuntimeException("Failed to create a temp dir", ex);
}
}