You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/10/28 01:34:05 UTC
kafka git commit: KAFKA-2683: ensure wakeup exceptions raised to user
Repository: kafka
Updated Branches:
refs/heads/trunk 13c3e049f -> 1ac2640f8
KAFKA-2683: ensure wakeup exceptions raised to user
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Ewen Cheslack-Postava, Guozhang Wang
Closes #366 from hachikuji/KAFKA-2683
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1ac2640f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1ac2640f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1ac2640f
Branch: refs/heads/trunk
Commit: 1ac2640f8095262f423c770060b737f81652e211
Parents: 13c3e04
Author: Jason Gustafson <ja...@confluent.io>
Authored: Tue Oct 27 17:39:19 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Oct 27 17:39:19 2015 -0700
----------------------------------------------------------------------
.../consumer/ConsumerWakeupException.java | 20 ---------------
.../kafka/clients/consumer/KafkaConsumer.java | 26 ++++++++++----------
.../kafka/clients/consumer/MockConsumer.java | 3 ++-
.../consumer/internals/AbstractCoordinator.java | 26 +++++++++++---------
.../consumer/internals/ConsumerCoordinator.java | 10 +++++---
.../internals/ConsumerNetworkClient.java | 10 ++++----
.../kafka/common/errors/WakeupException.java | 26 ++++++++++++++++++++
.../internals/ConsumerNetworkClientTest.java | 4 +--
.../kafka/copycat/runtime/WorkerSinkTask.java | 3 ++-
.../runtime/distributed/DistributedHerder.java | 6 ++---
.../runtime/distributed/WorkerGroupMember.java | 2 +-
.../kafka/copycat/util/KafkaBasedLog.java | 10 ++++----
12 files changed, 81 insertions(+), 65 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/1ac2640f/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java
deleted file mode 100644
index 35f1ec9..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java
+++ /dev/null
@@ -1,20 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.clients.consumer;
-
-import org.apache.kafka.common.KafkaException;
-
-public class ConsumerWakeupException extends KafkaException {
- private static final long serialVersionUID = 1L;
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1ac2640f/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 06a9239..7aef8a3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -313,9 +313,9 @@ import java.util.regex.Pattern;
*
* <p>
* The only exception to this rule is {@link #wakeup()}, which can safely be used from an external thread to
- * interrupt an active operation. In this case, a {@link ConsumerWakeupException} will be thrown from the thread
- * blocking on the operation. This can be used to shutdown the consumer from another thread. The following
- * snippet shows the typical pattern:
+ * interrupt an active operation. In this case, a {@link org.apache.kafka.common.errors.WakeupException} will be
+ * thrown from the thread blocking on the operation. This can be used to shutdown the consumer from another thread.
+ * The following snippet shows the typical pattern:
*
* <pre>
* public class KafkaConsumerRunner implements Runnable {
@@ -329,7 +329,7 @@ import java.util.regex.Pattern;
* ConsumerRecords records = consumer.poll(10000);
* // Handle new records
* }
- * } catch (ConsumerWakeupException e) {
+ * } catch (WakeupException e) {
* // Ignore exception if closing
* if (!closed.get()) throw e;
* } finally {
@@ -778,7 +778,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* offset reset policy has been configured.
* @throws org.apache.kafka.common.errors.OffsetOutOfRangeException If there is OffsetOutOfRange error in fetchResponse and
* the defaultResetPolicy is NONE
- * @throws org.apache.kafka.clients.consumer.ConsumerWakeupException if {@link #wakeup()} is called before or while this function is called
+ * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this function is called
*/
@Override
public ConsumerRecords<K, V> poll(long timeout) {
@@ -818,7 +818,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* @return The fetched records (may be empty)
* @throws org.apache.kafka.common.errors.OffsetOutOfRangeException If there is OffsetOutOfRange error in fetchResponse and
* the defaultResetPolicy is NONE
- * @throws org.apache.kafka.clients.consumer.ConsumerWakeupException if {@link #wakeup()} is called before or while this function is called
+ * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this function is called
*/
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
// TODO: Sub-requests should take into account the poll timeout (KAFKA-1894)
@@ -858,7 +858,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* This is a synchronous commit and will block until either the commit succeeds or an unrecoverable error is
* encountered (in which case it is thrown to the caller).
*
- * @throws org.apache.kafka.clients.consumer.ConsumerWakeupException if {@link #wakeup()} is called before or while this function is called
+ * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this function is called
*/
@Override
public void commitSync() {
@@ -881,7 +881,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* encountered (in which case it is thrown to the caller).
*
* @param offsets A map of offsets by partition with associated metadata
- * @throws org.apache.kafka.clients.consumer.ConsumerWakeupException if {@link #wakeup()} is called before or while this function is called
+ * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this function is called
*/
@Override
public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets) {
@@ -1006,7 +1006,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* @return The offset
* @throws NoOffsetForPartitionException If a position hasn't been set for a given partition, and no reset policy is
* available.
- * @throws org.apache.kafka.clients.consumer.ConsumerWakeupException if {@link #wakeup()} is called before or while this function is called
+ * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this function is called
*/
public long position(TopicPartition partition) {
acquire();
@@ -1033,7 +1033,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*
* @param partition The partition to check
* @return The last committed offset and metadata or null if there was no prior commit
- * @throws org.apache.kafka.clients.consumer.ConsumerWakeupException if {@link #wakeup()} is called before or while this function is called
+ * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this function is called
*/
@Override
public OffsetAndMetadata committed(TopicPartition partition) {
@@ -1071,7 +1071,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*
* @param topic The topic to get partition metadata for
* @return The list of partitions
- * @throws org.apache.kafka.clients.consumer.ConsumerWakeupException if {@link #wakeup()} is called before or while this function is called
+ * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this function is called
*/
@Override
public List<PartitionInfo> partitionsFor(String topic) {
@@ -1094,7 +1094,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* server.
*
* @return The map of topics and its partitions
- * @throws org.apache.kafka.clients.consumer.ConsumerWakeupException if {@link #wakeup()} is called before or while this function is called
+ * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this function is called
*/
@Override
public Map<String, List<PartitionInfo>> listTopics() {
@@ -1158,7 +1158,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
/**
* Wakeup the consumer. This method is thread-safe and is useful in particular to abort a long poll.
- * The thread which is blocking in an operation will throw {@link ConsumerWakeupException}.
+ * The thread which is blocking in an operation will throw {@link WakeupException}.
*/
@Override
public void wakeup() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/1ac2640f/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index ed1c1e2..25c0c2c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -19,6 +19,7 @@ import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
import java.util.ArrayList;
import java.util.Collection;
@@ -135,7 +136,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
if (wakeup.get()) {
wakeup.set(false);
- throw new ConsumerWakeupException();
+ throw new WakeupException();
}
if (exception != null) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/1ac2640f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 549c8de..8d5ee16 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -30,7 +30,6 @@ import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.SchemaException;
import org.apache.kafka.common.requests.GroupMetadataRequest;
import org.apache.kafka.common.requests.GroupMetadataResponse;
import org.apache.kafka.common.requests.HeartbeatRequest;
@@ -181,8 +180,12 @@ public abstract class AbstractCoordinator {
RequestFuture<Void> future = sendGroupMetadataRequest();
client.poll(future, requestTimeoutMs);
- if (future.failed())
- client.awaitMetadataUpdate();
+ if (future.failed()) {
+ if (future.isRetriable())
+ client.awaitMetadataUpdate();
+ else
+ throw future.exception();
+ }
}
}
@@ -417,12 +420,8 @@ public abstract class AbstractCoordinator {
RequestFuture<ByteBuffer> future) {
short errorCode = syncResponse.errorCode();
if (errorCode == Errors.NONE.code()) {
- try {
- future.complete(syncResponse.memberAssignment());
- sensors.syncLatency.record(response.requestLatencyMs());
- } catch (SchemaException e) {
- future.raise(e);
- }
+ future.complete(syncResponse.memberAssignment());
+ sensors.syncLatency.record(response.requestLatencyMs());
} else {
AbstractCoordinator.this.rejoinNeeded = true;
future.raise(Errors.forCode(errorCode));
@@ -588,8 +587,13 @@ public abstract class AbstractCoordinator {
return;
}
- R response = parse(clientResponse);
- handle(response, future);
+ try {
+ R response = parse(clientResponse);
+ handle(response, future);
+ } catch (RuntimeException e) {
+ if (!future.isDone())
+ future.raise(e);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1ac2640f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 20d1564..641939a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -15,7 +15,7 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.ConsumerWakeupException;
+import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Assignment;
@@ -180,6 +180,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator implements Cl
try {
Set<TopicPartition> assigned = new HashSet<>(subscriptions.assignedPartitions());
listener.onPartitionsAssigned(assigned);
+ } catch (WakeupException e) {
+ throw e;
} catch (Exception e) {
log.error("User provided listener " + listener.getClass().getName()
+ " failed on partition assignment: ", e);
@@ -234,6 +236,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator implements Cl
try {
Set<TopicPartition> revoked = new HashSet<>(subscriptions.assignedPartitions());
listener.onPartitionsRevoked(revoked);
+ } catch (WakeupException e) {
+ throw e;
} catch (Exception e) {
log.error("User provided listener " + listener.getClass().getName()
+ " failed on partition revocation: ", e);
@@ -302,7 +306,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator implements Cl
try {
maybeAutoCommitOffsetsSync();
return;
- } catch (ConsumerWakeupException e) {
+ } catch (WakeupException e) {
// ignore wakeups while closing to ensure we have a chance to commit
continue;
}
@@ -368,7 +372,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator implements Cl
if (autoCommitEnabled) {
try {
commitOffsetsSync(subscriptions.allConsumed());
- } catch (ConsumerWakeupException e) {
+ } catch (WakeupException e) {
// rethrow wakeups since they are triggered by the user
throw e;
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/1ac2640f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index e3a2514..4757fc4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -17,7 +17,7 @@ import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.RequestCompletionHandler;
-import org.apache.kafka.clients.consumer.ConsumerWakeupException;
+import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.AbstractRequest;
@@ -147,7 +147,7 @@ public class ConsumerNetworkClient implements Closeable {
/**
* Block indefinitely until the given request future has finished.
* @param future The request future to await.
- * @throws ConsumerWakeupException if {@link #wakeup()} is called from another thread
+ * @throws WakeupException if {@link #wakeup()} is called from another thread
*/
public void poll(RequestFuture<?> future) {
while (!future.isDone())
@@ -159,7 +159,7 @@ public class ConsumerNetworkClient implements Closeable {
* @param future The request future to wait for
* @param timeout The maximum duration (in ms) to wait for the request
* @return true if the future is done, false otherwise
- * @throws ConsumerWakeupException if {@link #wakeup()} is called from another thread
+ * @throws WakeupException if {@link #wakeup()} is called from another thread
*/
public boolean poll(RequestFuture<?> future, long timeout) {
long now = time.milliseconds();
@@ -175,7 +175,7 @@ public class ConsumerNetworkClient implements Closeable {
* Poll for any network IO. All send requests will either be transmitted on the network
* or failed when this call completes.
* @param timeout The maximum time to wait for an IO event.
- * @throws ConsumerWakeupException if {@link #wakeup()} is called from another thread
+ * @throws WakeupException if {@link #wakeup()} is called from another thread
*/
public void poll(long timeout) {
poll(timeout, time.milliseconds());
@@ -298,7 +298,7 @@ public class ConsumerNetworkClient implements Closeable {
if (wakeup.get()) {
failUnsentRequests();
wakeup.set(false);
- throw new ConsumerWakeupException();
+ throw new WakeupException();
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1ac2640f/clients/src/main/java/org/apache/kafka/common/errors/WakeupException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/WakeupException.java b/clients/src/main/java/org/apache/kafka/common/errors/WakeupException.java
new file mode 100644
index 0000000..a2e718d
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/WakeupException.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+import org.apache.kafka.common.KafkaException;
+
+/**
+ * Exception used to indicate preemption of a blocking operation by an external thread.
+ * For example, {@link org.apache.kafka.clients.consumer.KafkaConsumer#wakeup}
+ * can be used to break out of an active {@link org.apache.kafka.clients.consumer.KafkaConsumer#poll(long)},
+ * which would raise an instance of this exception.
+ */
+public class WakeupException extends KafkaException {
+ private static final long serialVersionUID = 1L;
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1ac2640f/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
index 6a42058..1692010 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
@@ -15,7 +15,7 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
-import org.apache.kafka.clients.consumer.ConsumerWakeupException;
+import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.protocol.ApiKeys;
@@ -96,7 +96,7 @@ public class ConsumerNetworkClientTest {
try {
consumerClient.poll(0);
fail();
- } catch (ConsumerWakeupException e) {
+ } catch (WakeupException e) {
}
client.respond(heartbeatResponse(Errors.NONE.code()));
http://git-wip-us.apache.org/repos/asf/kafka/blob/1ac2640f/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
index edb415a..3c5cd13 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
@@ -19,6 +19,7 @@ package org.apache.kafka.copycat.runtime;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.utils.Utils;
@@ -112,7 +113,7 @@ class WorkerSinkTask implements WorkerTask {
ConsumerRecords<byte[], byte[]> msgs = consumer.poll(timeoutMs);
log.trace("{} polling returned {} messages", id, msgs.count());
deliverMessages(msgs);
- } catch (ConsumerWakeupException we) {
+ } catch (WakeupException we) {
log.trace("{} consumer woken up", id);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1ac2640f/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java
index bf3229d..17bf7b7 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java
@@ -17,7 +17,7 @@
package org.apache.kafka.copycat.runtime.distributed;
-import org.apache.kafka.clients.consumer.ConsumerWakeupException;
+import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.copycat.connector.ConnectorContext;
@@ -159,7 +159,7 @@ public class DistributedHerder implements Herder, Runnable {
member.ensureActive();
// Ensure we're in a good state in our group. If not restart and everything should be setup to rejoin
if (!handleRebalanceCompleted()) return;
- } catch (ConsumerWakeupException e) {
+ } catch (WakeupException e) {
// May be due to a request from another thread, or might be stopping. If the latter, we need to check the
// flag immediately. If the former, we need to re-run the ensureActive call since we can't handle requests
// unless we're in the group.
@@ -217,7 +217,7 @@ public class DistributedHerder implements Herder, Runnable {
member.poll(Long.MAX_VALUE);
// Ensure we're in a good state in our group. If not restart and everything should be setup to rejoin
if (!handleRebalanceCompleted()) return;
- } catch (ConsumerWakeupException e) { // FIXME should not be ConsumerWakeupException
+ } catch (WakeupException e) { // FIXME should not be WakeupException
// Ignore. Just indicates we need to check the exit flag, for requested actions, etc.
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1ac2640f/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerGroupMember.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerGroupMember.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerGroupMember.java
index f8cabaa..03960cf 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerGroupMember.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerGroupMember.java
@@ -147,7 +147,7 @@ public class WorkerGroupMember {
}
/**
- * Interrupt any running poll() calls, causing a ConsumerWakeupException to be thrown in the thread invoking that method.
+ * Interrupt any running poll() calls, causing a WakeupException to be thrown in the thread invoking that method.
*/
public void wakeup() {
this.client.wakeup();
http://git-wip-us.apache.org/repos/asf/kafka/blob/1ac2640f/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/KafkaBasedLog.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/KafkaBasedLog.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/KafkaBasedLog.java
index 5e860d9..f5e72d3 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/KafkaBasedLog.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/KafkaBasedLog.java
@@ -21,7 +21,7 @@ import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.ConsumerWakeupException;
+import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
@@ -232,7 +232,7 @@ public class KafkaBasedLog<K, V> {
ConsumerRecords<K, V> records = consumer.poll(timeoutMs);
for (ConsumerRecord<K, V> record : records)
consumedCallback.onCompletion(null, record);
- } catch (ConsumerWakeupException e) {
+ } catch (WakeupException e) {
// Expected on get() or stop(). The calling code should handle this
throw e;
} catch (KafkaException e) {
@@ -257,7 +257,7 @@ public class KafkaBasedLog<K, V> {
try {
poll(0);
} finally {
- // If there is an exception, even a possibly expected one like ConsumerWakeupException, we need to make sure
+ // If there is an exception, even a possibly expected one like WakeupException, we need to make sure
// the consumers position is reset or it'll get into an inconsistent state.
for (TopicPartition tp : assignment) {
long startOffset = offsets.get(tp);
@@ -300,7 +300,7 @@ public class KafkaBasedLog<K, V> {
if (numCallbacks > 0) {
try {
readToLogEnd();
- } catch (ConsumerWakeupException e) {
+ } catch (WakeupException e) {
// Either received another get() call and need to retry reading to end of log or stop() was
// called. Both are handled by restarting this loop.
continue;
@@ -318,7 +318,7 @@ public class KafkaBasedLog<K, V> {
try {
poll(Integer.MAX_VALUE);
- } catch (ConsumerWakeupException e) {
+ } catch (WakeupException e) {
// See previous comment, both possible causes of this wakeup are handled by starting this loop again
continue;
}