You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2015/10/20 00:21:16 UTC
samza git commit: SAMZA-791 - KafkaSystemFactory narrows return types
Repository: samza
Updated Branches:
refs/heads/master f7f237e93 -> f70a11c4a
SAMZA-791 - KafkaSystemFactory narrows return types
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/f70a11c4
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/f70a11c4
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/f70a11c4
Branch: refs/heads/master
Commit: f70a11c4a747b77035dddfba2296ba3b667928e2
Parents: f7f237e
Author: Aleksandar Bircakovic <a....@levi9.com>
Authored: Mon Oct 19 15:20:55 2015 -0700
Committer: Navina <na...@gmail.com>
Committed: Mon Oct 19 15:20:55 2015 -0700
----------------------------------------------------------------------
.../org/apache/samza/system/kafka/KafkaSystemFactory.scala | 9 ++++++---
1 file changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/f70a11c4/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
index d84bf06..a60cda2 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
@@ -31,6 +31,9 @@ import org.apache.samza.system.SystemFactory
import org.apache.samza.config.StorageConfig._
import org.I0Itec.zkclient.ZkClient
import kafka.utils.ZKStringSerializer
+import org.apache.samza.system.SystemProducer
+import org.apache.samza.system.SystemAdmin
+import org.apache.samza.system.SystemConsumer
object KafkaSystemFactory extends Logging {
def getInjectedProducerProperties(systemName: String, config: Config) = if (config.isChangelogSystem(systemName)) {
@@ -42,7 +45,7 @@ object KafkaSystemFactory extends Logging {
}
class KafkaSystemFactory extends SystemFactory with Logging {
- def getConsumer(systemName: String, config: Config, registry: MetricsRegistry) = {
+ def getConsumer(systemName: String, config: Config, registry: MetricsRegistry): SystemConsumer = {
val clientId = KafkaUtil.getClientId("samza-consumer", config)
val metrics = new KafkaSystemConsumerMetrics(systemName, registry)
@@ -77,7 +80,7 @@ class KafkaSystemFactory extends SystemFactory with Logging {
offsetGetter = offsetGetter)
}
- def getProducer(systemName: String, config: Config, registry: MetricsRegistry) = {
+ def getProducer(systemName: String, config: Config, registry: MetricsRegistry): SystemProducer = {
val clientId = KafkaUtil.getClientId("samza-producer", config)
val injectedProps = KafkaSystemFactory.getInjectedProducerProperties(systemName, config)
val producerConfig = config.getKafkaSystemProducerConfig(systemName, clientId, injectedProps)
@@ -95,7 +98,7 @@ class KafkaSystemFactory extends SystemFactory with Logging {
metrics)
}
- def getAdmin(systemName: String, config: Config) = {
+ def getAdmin(systemName: String, config: Config): SystemAdmin = {
val clientId = KafkaUtil.getClientId("samza-admin", config)
val producerConfig = config.getKafkaSystemProducerConfig(systemName, clientId)
val bootstrapServers = producerConfig.bootsrapServers