You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2014/10/15 22:13:55 UTC

git commit: SAMZA-432; throw better exception when sending messages to an undefined system

Repository: incubator-samza
Updated Branches:
  refs/heads/master 90d5a00fc -> a66a66c53


SAMZA-432; throw better exception when sending messages to an undefined system


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/a66a66c5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/a66a66c5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/a66a66c5

Branch: refs/heads/master
Commit: a66a66c534c587f0092d1d87328b4d56810bd315
Parents: 90d5a00
Author: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Authored: Wed Oct 15 13:13:47 2014 -0700
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Wed Oct 15 13:13:47 2014 -0700

----------------------------------------------------------------------
 .../src/main/scala/org/apache/samza/system/SystemProducers.scala | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/a66a66c5/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala
index a022b2e..a4b3ffb 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala
@@ -82,7 +82,9 @@ class SystemProducers(
     }
 
     if (!bytesEnvelope.isEmpty) {
-      producers(envelope.getSystemStream.getSystem).send(source, bytesEnvelope.get)
+      val system = envelope.getSystemStream.getSystem
+      val producer = producers.getOrElse(system, throw new SamzaException("Attempting to produce to unknown system: %s. Available systems: %s. Please add the system to your configuration, or update outgoing message envelope to send to a defined system." format (system, producers.keySet)))
+      producer.send(source, bytesEnvelope.get)
     }
   }
 }