You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2015/04/15 00:03:54 UTC
[2/5] samza git commit: SAMZA-608;
don't hange on serde errors in system consumers
SAMZA-608; don't hange on serde errors in system consumers
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/f44f8dac
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/f44f8dac
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/f44f8dac
Branch: refs/heads/samza-sql
Commit: f44f8dac253eb34348140b5449a2f6a5b29b20db
Parents: 233837e
Author: Yi Pan <ni...@gmail.com>
Authored: Mon Apr 13 10:46:31 2015 -0700
Committer: Yi Pan (Data Infrastructure) <yi...@linkedin.com>
Committed: Tue Apr 14 14:40:44 2015 -0700
----------------------------------------------------------------------
.../apache/samza/system/SystemConsumers.scala | 18 ++++++++++----
.../samza/system/TestSystemConsumers.scala | 25 ++++++++++++++++++--
2 files changed, 37 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/f44f8dac/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
index 76fa4ad..1ec5e32 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
@@ -204,9 +204,7 @@ class SystemConsumers(
metrics.choseObject.inc
metrics.systemStreamMessagesChosen(envelopeFromChooser.getSystemStreamPartition.getSystemStream).inc
- if (!update(systemStreamPartition)) {
- emptySystemStreamPartitionsBySystem.get(systemStreamPartition.getSystem).add(systemStreamPartition)
- }
+ tryUpdate(systemStreamPartition)
}
if (envelopeFromChooser == null || lastPollMs < clock() - pollIntervalMs) {
@@ -257,7 +255,7 @@ class SystemConsumers(
// Update the chooser if it needs a message for this SSP.
if (emptySystemStreamPartitionsBySystem.get(systemStreamPartition.getSystem).remove(systemStreamPartition)) {
- update(systemStreamPartition)
+ tryUpdate(systemStreamPartition)
}
}
}
@@ -266,6 +264,18 @@ class SystemConsumers(
}
}
+ private def tryUpdate(ssp: SystemStreamPartition) {
+ var updated = false
+ try {
+ updated = update(ssp)
+ } finally {
+ if (!updated) {
+ // if failed to update the chooser, add the ssp back into the emptySystemStreamPartitionBySystem map to ensure that we will poll for the next message
+ emptySystemStreamPartitionsBySystem.get(ssp.getSystem).add(ssp)
+ }
+ }
+ }
+
private def refresh {
trace("Refreshing chooser with new messages.")
http://git-wip-us.apache.org/repos/asf/samza/blob/f44f8dac/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala b/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
index 3fdc781..fbaa8ee 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
@@ -241,9 +241,10 @@ class TestSystemConsumers {
// it should not throw exceptions when deserializaion fails if dropDeserializationError is set to true
val consumers2 = new SystemConsumers(msgChooser, consumer, serdeManager, dropDeserializationError = true)
consumers2.register(systemStreamPartition, "0")
- consumers2.start
consumer(system).putBytesMessage
consumer(system).putStringMessage
+ consumer(system).putBytesMessage
+ consumers2.start
var notThrowException = true;
try {
@@ -251,9 +252,29 @@ class TestSystemConsumers {
} catch {
case e: Throwable => notThrowException = false
}
-
assertTrue("it should not throw any exception", notThrowException)
+
+ var msgEnvelope = Some(consumers2.choose)
+ assertTrue("Consumer did not succeed in receiving the second message after Serde exception in choose", msgEnvelope.get != null)
+ consumers2.stop
+
+ // ensure that the system consumer will continue after poll() method ignored a Serde exception
+ consumer(system).putStringMessage
+ consumer(system).putBytesMessage
+
+ notThrowException = true;
+ try {
+ consumers2.start
+ } catch {
+ case e: Throwable => notThrowException = false
+ }
+ assertTrue("SystemConsumer start should not throw any Serde exception", notThrowException)
+
+ msgEnvelope = null
+ msgEnvelope = Some(consumers2.choose)
+ assertTrue("Consumer did not succeed in receiving the second message after Serde exception in poll", msgEnvelope.get != null)
consumers2.stop
+
}
/**