You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by jg...@apache.org on 2014/05/01 23:51:15 UTC
git commit: SAMZA-151: Fail early when a consumer is misconfigured
Repository: incubator-samza
Updated Branches:
refs/heads/master f3cb10924 -> 354bcdb77
SAMZA-151: Fail early when a consumer is misconfigured
Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/354bcdb7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/354bcdb7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/354bcdb7
Branch: refs/heads/master
Commit: 354bcdb77bc8d4d1b0efdf9ca405916c7399075a
Parents: f3cb109
Author: Yan Fang <yanfang724 at gmail dot com>
Authored: Thu May 1 14:50:51 2014 -0700
Committer: Jakob Homan <jg...@apache.org>
Committed: Thu May 1 14:50:51 2014 -0700
----------------------------------------------------------------------
.../apache/samza/system/SystemConsumers.scala | 17 +++++++++-
.../samza/system/TestSystemConsumers.scala | 35 ++++++++++++++++++++
2 files changed, 51 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/354bcdb7/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
index 7624aef..b537046 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
@@ -26,6 +26,7 @@ import grizzled.slf4j.Logging
import org.apache.samza.system.chooser.MessageChooser
import org.apache.samza.util.DoublingBackOff
import org.apache.samza.system.chooser.BufferingMessageChooser
+import org.apache.samza.SamzaException
/**
* The SystemConsumers class coordinates between all SystemConsumers, the
@@ -159,7 +160,12 @@ class SystemConsumers(
metrics.registerSystemStream(systemStreamPartition.getSystemStream)
buffer.register(systemStreamPartition, offset)
updateFetchMap(systemStreamPartition, maxMsgsPerStreamPartition)
- consumers(systemStreamPartition.getSystem).register(systemStreamPartition, offset)
+
+ try {
+ consumers(systemStreamPartition.getSystem).register(systemStreamPartition, offset)
+ } catch {
+ case e: NoSuchElementException => throw new SystemConsumersException("can't register " + systemStreamPartition.getSystem + "'s consumer.", e)
+ }
}
/**
@@ -280,3 +286,12 @@ class SystemConsumers(
.foreach(updateFetchMap(_))
}
}
+
+/**
+ * When SystemConsumers registers a consumer, there are situations from which the system
+ * cannot recover, such as when a failed consumer is used in task.input or changelogs.
+ * SystemConsumersException is thrown to indicate such a hard, unrecoverable failure.
+ */
+class SystemConsumersException(s: String, t: Throwable) extends SamzaException(s, t) {
+ def this(s: String) = this(s, null)
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/354bcdb7/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala b/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
index e1b211d..e1a4c15 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
@@ -58,4 +58,39 @@ class TestSystemConsumers {
assertEquals(1, registered.size)
assertEquals("0", registered(systemStreamPartition))
}
+
+ @Test
+ def testThrowSystemConsumersExceptionWhenTheSystemDoesNotHaveConsumer() {
+ val system = "test-system"
+ val system2 = "test-system2"
+ val systemStreamPartition = new SystemStreamPartition(system, "some-stream", new Partition(1))
+ val systemStreamPartition2 = new SystemStreamPartition(system2, "some-stream", new Partition(1))
+ var started = 0
+ var stopped = 0
+ var registered = Map[SystemStreamPartition, String]()
+
+ val consumer = Map(system -> new SystemConsumer {
+ def start {}
+ def stop {}
+ def register(systemStreamPartition: SystemStreamPartition, offset: String) {}
+ def poll(systemStreamPartitions: java.util.Map[SystemStreamPartition, java.lang.Integer], timeout: Long) = List()
+ })
+ val consumers = new SystemConsumers(new MessageChooser {
+ def update(envelope: IncomingMessageEnvelope) = Unit
+ def choose = null
+ def start = started += 1
+ def stop = stopped += 1
+ def register(systemStreamPartition: SystemStreamPartition, offset: String) = registered += systemStreamPartition -> offset
+ }, consumer, null)
+
+ // it should throw a SystemConsumersException because system2 does not have a consumer
+ var caughtRightException = false
+ try {
+ consumers.register(systemStreamPartition2, "0")
+ } catch {
+ case e: SystemConsumersException => caughtRightException = true
+ case _: Throwable => caughtRightException = false
+ }
+ assertTrue("suppose to throw SystemConsumersException, but apparently it did not", caughtRightException)
+ }
}
\ No newline at end of file