You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ra...@apache.org on 2019/10/14 23:13:53 UTC

[samza] branch master updated: SAMZA-2348 : Notifying BEM about kafkaConsumerProxy failure to prevent BEM poll-blocking in case of proxy failures (#1187)

This is an automated email from the ASF dual-hosted git repository.

rayman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new cb78a61  SAMZA-2348 : Notifying BEM about kafkaConsumerProxy failure to prevent BEM poll-blocking in case of proxy failures (#1187)
cb78a61 is described below

commit cb78a614716bc39281edc973435626c2032f27ca
Author: rmatharu <40...@users.noreply.github.com>
AuthorDate: Mon Oct 14 16:13:44 2019 -0700

    SAMZA-2348 : Notifying BEM about kafkaConsumerProxy failure to prevent BEM poll-blocking in case of proxy failures (#1187)
    
    * Notifying BEM about kafkaConsumerProxy failure via kafkaConsumer to prevent the BEM poll from blocking in case of proxy failures
---
 .../java/org/apache/samza/util/BlockingEnvelopeMap.java   | 13 +++++++++++++
 .../apache/samza/system/kafka/KafkaSystemConsumer.java    | 15 ++++++++++++++-
 .../org/apache/samza/system/kafka/KafkaConsumerProxy.java |  9 ++++++---
 .../samza/system/kafka/KafkaConsumerProxyFactory.java     |  2 +-
 4 files changed, 34 insertions(+), 5 deletions(-)

diff --git a/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java b/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
index 8b792b4..d80b2a4 100644
--- a/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
+++ b/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
@@ -30,6 +30,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.samza.SamzaException;
 import org.apache.samza.metrics.Counter;
 import org.apache.samza.metrics.Gauge;
 import org.apache.samza.metrics.MetricsRegistry;
@@ -68,6 +69,7 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
   private final ConcurrentHashMap<SystemStreamPartition, AtomicLong> bufferedMessagesSize;  // size in bytes per SystemStreamPartition
   private final Map<SystemStreamPartition, Boolean> noMoreMessage;
   private final Clock clock;
+  private volatile Throwable failureCause = null;
 
   public BlockingEnvelopeMap() {
     this(new NoOpMetricsRegistry());
@@ -142,6 +144,13 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
           // Block until we get at least one message, or until we catch up to
           // the head of the stream.
           while (envelope == null && !isAtHead(systemStreamPartition)) {
+
+            // Check for consumerFailure and throw exception
+            if (this.failureCause != null) {
+              String message = String.format("%s: Consumer has stopped.", this);
+              throw new SamzaException(message, this.failureCause);
+            }
+
             metrics.incBlockingPoll(systemStreamPartition);
             envelope = queue.poll(1000, TimeUnit.MILLISECONDS);
           }
@@ -241,6 +250,10 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
     return getNumMessagesInQueue(systemStreamPartition) == 0 && isAtHead != null && isAtHead.equals(true);
   }
 
+  protected void setFailureCause(Throwable throwable) {
+    this.failureCause = throwable;
+  }
+
   public class BlockingEnvelopeMapMetrics {
     private final String group;
     private final MetricsRegistry metricsRegistry;
diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
index 015b76a..33e2520 100644
--- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
+++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
@@ -101,7 +101,7 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
     messageSink = new KafkaConsumerMessageSink();
 
     // Create the proxy to do the actual message reading.
-    proxy = kafkaConsumerProxyFactory.create(this.messageSink);
+    proxy = kafkaConsumerProxyFactory.create(this);
     LOG.info("{}: Created proxy {} ", this, proxy);
   }
 
@@ -219,6 +219,15 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
         this, fetchThresholdBytes, fetchThreshold, numPartitions, perPartitionFetchThreshold, perPartitionFetchThresholdBytes);
   }
 
+  /**
+   * Invoked by {@link KafkaConsumerProxy} to notify the consumer of failure; relays the cause to the BEM so its blocking poll throws instead of waiting.
+   * @param throwable the cause of the failure of the proxy
+   */
+  @Override
+  public void setFailureCause(Throwable throwable) {
+    super.setFailureCause(throwable); // notify the BEM; must be super, since this.setFailureCause would recurse into this override forever
+  }
+
   @Override
   public void stop() {
     if (!stopped.compareAndSet(false, true)) {
@@ -320,6 +329,10 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
     return systemName;
   }
 
+  public KafkaConsumerMessageSink getMessageSink() {
+    return this.messageSink;
+  }
+
   public class KafkaConsumerMessageSink {
 
     public void setIsAtHighWatermark(SystemStreamPartition ssp, boolean isAtHighWatermark) {
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
index aedf3f1..4ecfc6a 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
@@ -59,6 +59,7 @@ public class KafkaConsumerProxy<K, V> {
 
   private final Thread consumerPollThread;
   private final Consumer<K, V> kafkaConsumer;
+  private final KafkaSystemConsumer kafkaSystemConsumer;
   private final KafkaSystemConsumer.KafkaConsumerMessageSink sink;
   private final KafkaSystemConsumerMetrics kafkaConsumerMetrics;
   private final String metricName;
@@ -75,10 +76,11 @@ public class KafkaConsumerProxy<K, V> {
   private volatile Throwable failureCause = null;
   private final CountDownLatch consumerPollThreadStartLatch = new CountDownLatch(1);
 
-  public KafkaConsumerProxy(Consumer<K, V> kafkaConsumer, String systemName, String clientId,
+  public KafkaConsumerProxy(KafkaSystemConsumer kafkaSystemConsumer, Consumer<K, V> kafkaConsumer, String systemName, String clientId,
       KafkaSystemConsumer<K, V>.KafkaConsumerMessageSink messageSink, KafkaSystemConsumerMetrics samzaConsumerMetrics,
       String metricName) {
 
+    this.kafkaSystemConsumer = kafkaSystemConsumer;
     this.kafkaConsumer = kafkaConsumer;
     this.systemName = systemName;
     this.sink = messageSink;
@@ -204,6 +206,7 @@ public class KafkaConsumerProxy<K, V> {
         // KafkaSystemConsumer uses the failureCause to propagate the throwable to the container
         failureCause = throwable;
         isRunning = false;
+        kafkaSystemConsumer.setFailureCause(this.failureCause);
       }
 
       if (!isRunning) {
@@ -466,9 +469,9 @@ public class KafkaConsumerProxy<K, V> {
       this.kafkaSystemConsumerMetrics = kafkaSystemConsumerMetrics;
     }
 
-    public KafkaConsumerProxy<K, V> create(KafkaSystemConsumer<K, V>.KafkaConsumerMessageSink messageSink) {
+    public KafkaConsumerProxy<K, V> create(KafkaSystemConsumer<K, V> kafkaSystemConsumer) {
       String metricName = String.format("%s-%s", systemName, clientId);
-      return new KafkaConsumerProxy<>(this.kafkaConsumer, this.systemName, this.clientId, messageSink,
+      return new KafkaConsumerProxy<>(kafkaSystemConsumer, this.kafkaConsumer, this.systemName, this.clientId, kafkaSystemConsumer.getMessageSink(),
           this.kafkaSystemConsumerMetrics, metricName);
     }
   }
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxyFactory.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxyFactory.java
index a566d2a..cc4bddc 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxyFactory.java
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxyFactory.java
@@ -25,5 +25,5 @@ package org.apache.samza.system.kafka;
  * {@link KafkaConsumerProxy} needs to be used within kafka system components like {@link KafkaSystemConsumer}.
  */
 public interface KafkaConsumerProxyFactory<K, V> {
-  KafkaConsumerProxy<K, V> create(KafkaSystemConsumer<K, V>.KafkaConsumerMessageSink messageSink);
+  KafkaConsumerProxy<K, V> create(KafkaSystemConsumer<K, V> kafkaSystemConsumer);
 }