You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2015/10/20 00:21:16 UTC

samza git commit: SAMZA-791 - KafkaSystemFactory narrows return types

Repository: samza
Updated Branches:
  refs/heads/master f7f237e93 -> f70a11c4a


SAMZA-791 - KafkaSystemFactory narrows return types


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/f70a11c4
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/f70a11c4
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/f70a11c4

Branch: refs/heads/master
Commit: f70a11c4a747b77035dddfba2296ba3b667928e2
Parents: f7f237e
Author: Aleksandar Bircakovic <a....@levi9.com>
Authored: Mon Oct 19 15:20:55 2015 -0700
Committer: Navina <na...@gmail.com>
Committed: Mon Oct 19 15:20:55 2015 -0700

----------------------------------------------------------------------
 .../org/apache/samza/system/kafka/KafkaSystemFactory.scala  | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/f70a11c4/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
index d84bf06..a60cda2 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
@@ -31,6 +31,9 @@ import org.apache.samza.system.SystemFactory
 import org.apache.samza.config.StorageConfig._
 import org.I0Itec.zkclient.ZkClient
 import kafka.utils.ZKStringSerializer
+import org.apache.samza.system.SystemProducer
+import org.apache.samza.system.SystemAdmin
+import org.apache.samza.system.SystemConsumer
 
 object KafkaSystemFactory extends Logging {
   def getInjectedProducerProperties(systemName: String, config: Config) = if (config.isChangelogSystem(systemName)) {
@@ -42,7 +45,7 @@ object KafkaSystemFactory extends Logging {
 }
 
 class KafkaSystemFactory extends SystemFactory with Logging {
-  def getConsumer(systemName: String, config: Config, registry: MetricsRegistry) = {
+  def getConsumer(systemName: String, config: Config, registry: MetricsRegistry): SystemConsumer = {
     val clientId = KafkaUtil.getClientId("samza-consumer", config)
     val metrics = new KafkaSystemConsumerMetrics(systemName, registry)
 
@@ -77,7 +80,7 @@ class KafkaSystemFactory extends SystemFactory with Logging {
       offsetGetter = offsetGetter)
   }
 
-  def getProducer(systemName: String, config: Config, registry: MetricsRegistry) = {
+  def getProducer(systemName: String, config: Config, registry: MetricsRegistry): SystemProducer = {
     val clientId = KafkaUtil.getClientId("samza-producer", config)
     val injectedProps = KafkaSystemFactory.getInjectedProducerProperties(systemName, config)
     val producerConfig = config.getKafkaSystemProducerConfig(systemName, clientId, injectedProps)
@@ -95,7 +98,7 @@ class KafkaSystemFactory extends SystemFactory with Logging {
       metrics)
   }
 
-  def getAdmin(systemName: String, config: Config) = {
+  def getAdmin(systemName: String, config: Config): SystemAdmin = {
     val clientId = KafkaUtil.getClientId("samza-admin", config)
     val producerConfig = config.getKafkaSystemProducerConfig(systemName, clientId)
     val bootstrapServers = producerConfig.bootsrapServers