You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by jg...@apache.org on 2014/05/01 23:51:15 UTC

git commit: SAMZA-151: Fail early when a consumer is misconfigured

Repository: incubator-samza
Updated Branches:
  refs/heads/master f3cb10924 -> 354bcdb77


SAMZA-151: Fail early when a consumer is misconfigured


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/354bcdb7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/354bcdb7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/354bcdb7

Branch: refs/heads/master
Commit: 354bcdb77bc8d4d1b0efdf9ca405916c7399075a
Parents: f3cb109
Author: Yan Fang <yanfang724 at gmail dot com>
Authored: Thu May 1 14:50:51 2014 -0700
Committer: Jakob Homan <jg...@apache.org>
Committed: Thu May 1 14:50:51 2014 -0700

----------------------------------------------------------------------
 .../apache/samza/system/SystemConsumers.scala   | 17 +++++++++-
 .../samza/system/TestSystemConsumers.scala      | 35 ++++++++++++++++++++
 2 files changed, 51 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/354bcdb7/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
index 7624aef..b537046 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
@@ -26,6 +26,7 @@ import grizzled.slf4j.Logging
 import org.apache.samza.system.chooser.MessageChooser
 import org.apache.samza.util.DoublingBackOff
 import org.apache.samza.system.chooser.BufferingMessageChooser
+import org.apache.samza.SamzaException
 
 /**
  * The SystemConsumers class coordinates between all SystemConsumers, the
@@ -159,7 +160,12 @@ class SystemConsumers(
     metrics.registerSystemStream(systemStreamPartition.getSystemStream)
     buffer.register(systemStreamPartition, offset)
     updateFetchMap(systemStreamPartition, maxMsgsPerStreamPartition)
-    consumers(systemStreamPartition.getSystem).register(systemStreamPartition, offset)
+
+    try {
+      consumers(systemStreamPartition.getSystem).register(systemStreamPartition, offset)
+    } catch {
+      case e: NoSuchElementException => throw new SystemConsumersException("can't register " + systemStreamPartition.getSystem + "'s consumer.", e)
+    }
   }
 
   /**
@@ -280,3 +286,12 @@ class SystemConsumers(
       .foreach(updateFetchMap(_))
   }
 }
+
+/**
+ * When SystemConsumer registers consumers, there are situations where system can not recover
+ * from. Such as a failed consumer is used in task.input and changelogs.
+ * SystemConsumersException is thrown to indicate a hard failure when the system can not recover from.
+ */
+class SystemConsumersException(s: String, t: Throwable) extends SamzaException(s, t) {
+  def this(s: String) = this(s, null)
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/354bcdb7/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala b/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
index e1b211d..e1a4c15 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
@@ -58,4 +58,39 @@ class TestSystemConsumers {
     assertEquals(1, registered.size)
     assertEquals("0", registered(systemStreamPartition))
   }
+
+  @Test
+  def testThrowSystemConsumersExceptionWhenTheSystemDoesNotHaveConsumer() {
+    val system = "test-system"
+    val system2 = "test-system2"
+    val systemStreamPartition = new SystemStreamPartition(system, "some-stream", new Partition(1))
+    val systemStreamPartition2 = new SystemStreamPartition(system2, "some-stream", new Partition(1))
+    var started = 0
+    var stopped = 0
+    var registered = Map[SystemStreamPartition, String]()
+    
+    val consumer = Map(system -> new SystemConsumer {
+      def start {}
+      def stop {}
+      def register(systemStreamPartition: SystemStreamPartition, offset: String) {}
+      def poll(systemStreamPartitions: java.util.Map[SystemStreamPartition, java.lang.Integer], timeout: Long) = List()
+    })
+    val consumers = new SystemConsumers(new MessageChooser {
+      def update(envelope: IncomingMessageEnvelope) = Unit
+      def choose = null
+      def start = started += 1
+      def stop = stopped += 1
+      def register(systemStreamPartition: SystemStreamPartition, offset: String) = registered += systemStreamPartition -> offset
+    }, consumer, null)
+     
+    // it should throw a SystemConsumersException because system2 does not have a consumer
+    var caughtRightException = false
+    try {
+    	consumers.register(systemStreamPartition2, "0")
+    } catch {
+    	case e: SystemConsumersException => caughtRightException = true
+    	case _: Throwable => caughtRightException = false
+    }
+    assertTrue("suppose to throw SystemConsumersException, but apparently it did not", caughtRightException)
+  }
 }
\ No newline at end of file