You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2015/04/13 19:46:37 UTC

samza git commit: SAMZA-608; don't hange on serde errors in system consumers

Repository: samza
Updated Branches:
  refs/heads/master 3eb15a053 -> ffa84c0b7


SAMZA-608; don't hange on serde errors in system consumers


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/ffa84c0b
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/ffa84c0b
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/ffa84c0b

Branch: refs/heads/master
Commit: ffa84c0b724fd2251190a4402a122ac27fded560
Parents: 3eb15a0
Author: Yi Pan <ni...@gmail.com>
Authored: Mon Apr 13 10:46:31 2015 -0700
Committer: Chris Riccomini <cr...@apache.org>
Committed: Mon Apr 13 10:46:31 2015 -0700

----------------------------------------------------------------------
 .../apache/samza/system/SystemConsumers.scala   | 18 ++++++++++----
 .../samza/system/TestSystemConsumers.scala      | 25 ++++++++++++++++++--
 2 files changed, 37 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/ffa84c0b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
index 76fa4ad..1ec5e32 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
@@ -204,9 +204,7 @@ class SystemConsumers(
       metrics.choseObject.inc
       metrics.systemStreamMessagesChosen(envelopeFromChooser.getSystemStreamPartition.getSystemStream).inc
 
-      if (!update(systemStreamPartition)) {
-        emptySystemStreamPartitionsBySystem.get(systemStreamPartition.getSystem).add(systemStreamPartition)
-      }
+      tryUpdate(systemStreamPartition)
     }
 
     if (envelopeFromChooser == null || lastPollMs < clock() - pollIntervalMs) {
@@ -257,7 +255,7 @@ class SystemConsumers(
 
           // Update the chooser if it needs a message for this SSP.
           if (emptySystemStreamPartitionsBySystem.get(systemStreamPartition.getSystem).remove(systemStreamPartition)) {
-            update(systemStreamPartition)
+            tryUpdate(systemStreamPartition)
           }
         }
       }
@@ -266,6 +264,18 @@ class SystemConsumers(
     }
   }
 
+  private def tryUpdate(ssp: SystemStreamPartition) {
+    var updated = false
+    try {
+      updated = update(ssp)
+    } finally {
+      if (!updated) {
+        // if failed to update the chooser, add the ssp back into the emptySystemStreamPartitionBySystem map to ensure that we will poll for the next message
+        emptySystemStreamPartitionsBySystem.get(ssp.getSystem).add(ssp)
+      }
+    }
+  }
+
   private def refresh {
     trace("Refreshing chooser with new messages.")
 

http://git-wip-us.apache.org/repos/asf/samza/blob/ffa84c0b/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala b/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
index 3fdc781..fbaa8ee 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
@@ -241,9 +241,10 @@ class TestSystemConsumers {
     // it should not throw exceptions when deserializaion fails if dropDeserializationError is set to true
     val consumers2 = new SystemConsumers(msgChooser, consumer, serdeManager, dropDeserializationError = true)
     consumers2.register(systemStreamPartition, "0")
-    consumers2.start
     consumer(system).putBytesMessage
     consumer(system).putStringMessage
+    consumer(system).putBytesMessage
+    consumers2.start
 
     var notThrowException = true;
     try {
@@ -251,9 +252,29 @@ class TestSystemConsumers {
     } catch {
       case e: Throwable => notThrowException = false
     }
-
     assertTrue("it should not throw any exception", notThrowException)
+
+    var msgEnvelope = Some(consumers2.choose)
+    assertTrue("Consumer did not succeed in receiving the second message after Serde exception in choose", msgEnvelope.get != null)
+    consumers2.stop
+
+    // ensure that the system consumer will continue after poll() method ignored a Serde exception
+    consumer(system).putStringMessage
+    consumer(system).putBytesMessage
+
+    notThrowException = true;
+    try {
+      consumers2.start
+    } catch {
+      case e: Throwable => notThrowException = false
+    }
+    assertTrue("SystemConsumer start should not throw any Serde exception", notThrowException)
+
+    msgEnvelope = null
+    msgEnvelope = Some(consumers2.choose)
+    assertTrue("Consumer did not succeed in receiving the second message after Serde exception in poll", msgEnvelope.get != null)
     consumers2.stop
+
   }
 
   /**