You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ra...@apache.org on 2019/10/14 23:13:53 UTC
[samza] branch master updated: SAMZA-2348 : Notifying BEM about
kafkaConsumerProxy failure to prevent BEM poll-blockin in case of proxy
failures (#1187)
This is an automated email from the ASF dual-hosted git repository.
rayman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new cb78a61 SAMZA-2348 : Notifying BEM about kafkaConsumerProxy failure to prevent BEM poll-blockin in case of proxy failures (#1187)
cb78a61 is described below
commit cb78a614716bc39281edc973435626c2032f27ca
Author: rmatharu <40...@users.noreply.github.com>
AuthorDate: Mon Oct 14 16:13:44 2019 -0700
SAMZA-2348 : Notifying BEM about kafkaConsumerProxy failure to prevent BEM poll-blockin in case of proxy failures (#1187)
* Notifying BEM about kafkaConsumerProxy failure via kafkaConsumer to prevent BEM poll blocked in case of proxy failures
---
.../java/org/apache/samza/util/BlockingEnvelopeMap.java | 13 +++++++++++++
.../apache/samza/system/kafka/KafkaSystemConsumer.java | 15 ++++++++++++++-
.../org/apache/samza/system/kafka/KafkaConsumerProxy.java | 9 ++++++---
.../samza/system/kafka/KafkaConsumerProxyFactory.java | 2 +-
4 files changed, 34 insertions(+), 5 deletions(-)
diff --git a/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java b/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
index 8b792b4..d80b2a4 100644
--- a/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
+++ b/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
@@ -30,6 +30,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import org.apache.samza.SamzaException;
import org.apache.samza.metrics.Counter;
import org.apache.samza.metrics.Gauge;
import org.apache.samza.metrics.MetricsRegistry;
@@ -68,6 +69,7 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
private final ConcurrentHashMap<SystemStreamPartition, AtomicLong> bufferedMessagesSize; // size in bytes per SystemStreamPartition
private final Map<SystemStreamPartition, Boolean> noMoreMessage;
private final Clock clock;
+ private volatile Throwable failureCause = null;
public BlockingEnvelopeMap() {
this(new NoOpMetricsRegistry());
@@ -142,6 +144,13 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
// Block until we get at least one message, or until we catch up to
// the head of the stream.
while (envelope == null && !isAtHead(systemStreamPartition)) {
+
+ // Check for consumerFailure and throw exception
+ if (this.failureCause != null) {
+ String message = String.format("%s: Consumer has stopped.", this);
+ throw new SamzaException(message, this.failureCause);
+ }
+
metrics.incBlockingPoll(systemStreamPartition);
envelope = queue.poll(1000, TimeUnit.MILLISECONDS);
}
@@ -241,6 +250,10 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
return getNumMessagesInQueue(systemStreamPartition) == 0 && isAtHead != null && isAtHead.equals(true);
}
+ protected void setFailureCause(Throwable throwable) {
+ this.failureCause = throwable;
+ }
+
public class BlockingEnvelopeMapMetrics {
private final String group;
private final MetricsRegistry metricsRegistry;
diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
index 015b76a..33e2520 100644
--- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
+++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
@@ -101,7 +101,7 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
messageSink = new KafkaConsumerMessageSink();
// Create the proxy to do the actual message reading.
- proxy = kafkaConsumerProxyFactory.create(this.messageSink);
+ proxy = kafkaConsumerProxyFactory.create(this);
LOG.info("{}: Created proxy {} ", this, proxy);
}
@@ -219,6 +219,15 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
this, fetchThresholdBytes, fetchThreshold, numPartitions, perPartitionFetchThreshold, perPartitionFetchThresholdBytes);
}
+ /**
+ * Invoked by {@link KafkaConsumerProxy} to notify the consumer of failure, so it can relay and stop the BEM polling.
+ * @param throwable the cause of the failure of the proxy
+ */
+ @Override
+ public void setFailureCause(Throwable throwable) {
+ this.setFailureCause(throwable); // notify the BEM
+ }
+
@Override
public void stop() {
if (!stopped.compareAndSet(false, true)) {
@@ -320,6 +329,10 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
return systemName;
}
+ public KafkaConsumerMessageSink getMessageSink() {
+ return this.messageSink;
+ }
+
public class KafkaConsumerMessageSink {
public void setIsAtHighWatermark(SystemStreamPartition ssp, boolean isAtHighWatermark) {
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
index aedf3f1..4ecfc6a 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
@@ -59,6 +59,7 @@ public class KafkaConsumerProxy<K, V> {
private final Thread consumerPollThread;
private final Consumer<K, V> kafkaConsumer;
+ private final KafkaSystemConsumer kafkaSystemConsumer;
private final KafkaSystemConsumer.KafkaConsumerMessageSink sink;
private final KafkaSystemConsumerMetrics kafkaConsumerMetrics;
private final String metricName;
@@ -75,10 +76,11 @@ public class KafkaConsumerProxy<K, V> {
private volatile Throwable failureCause = null;
private final CountDownLatch consumerPollThreadStartLatch = new CountDownLatch(1);
- public KafkaConsumerProxy(Consumer<K, V> kafkaConsumer, String systemName, String clientId,
+ public KafkaConsumerProxy(KafkaSystemConsumer kafkaSystemConsumer, Consumer<K, V> kafkaConsumer, String systemName, String clientId,
KafkaSystemConsumer<K, V>.KafkaConsumerMessageSink messageSink, KafkaSystemConsumerMetrics samzaConsumerMetrics,
String metricName) {
+ this.kafkaSystemConsumer = kafkaSystemConsumer;
this.kafkaConsumer = kafkaConsumer;
this.systemName = systemName;
this.sink = messageSink;
@@ -204,6 +206,7 @@ public class KafkaConsumerProxy<K, V> {
// KafkaSystemConsumer uses the failureCause to propagate the throwable to the container
failureCause = throwable;
isRunning = false;
+ kafkaSystemConsumer.setFailureCause(this.failureCause);
}
if (!isRunning) {
@@ -466,9 +469,9 @@ public class KafkaConsumerProxy<K, V> {
this.kafkaSystemConsumerMetrics = kafkaSystemConsumerMetrics;
}
- public KafkaConsumerProxy<K, V> create(KafkaSystemConsumer<K, V>.KafkaConsumerMessageSink messageSink) {
+ public KafkaConsumerProxy<K, V> create(KafkaSystemConsumer<K, V> kafkaSystemConsumer) {
String metricName = String.format("%s-%s", systemName, clientId);
- return new KafkaConsumerProxy<>(this.kafkaConsumer, this.systemName, this.clientId, messageSink,
+ return new KafkaConsumerProxy<>(kafkaSystemConsumer, this.kafkaConsumer, this.systemName, this.clientId, kafkaSystemConsumer.getMessageSink(),
this.kafkaSystemConsumerMetrics, metricName);
}
}
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxyFactory.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxyFactory.java
index a566d2a..cc4bddc 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxyFactory.java
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxyFactory.java
@@ -25,5 +25,5 @@ package org.apache.samza.system.kafka;
* {@link KafkaConsumerProxy} needs to be used within kafka system components like {@link KafkaSystemConsumer}.
*/
public interface KafkaConsumerProxyFactory<K, V> {
- KafkaConsumerProxy<K, V> create(KafkaSystemConsumer<K, V>.KafkaConsumerMessageSink messageSink);
+ KafkaConsumerProxy<K, V> create(KafkaSystemConsumer<K, V> kafkaSystemConsumer);
}