You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2015/03/19 17:25:39 UTC
samza git commit: SAMZA-586; disable compressed messages when sending to a store's changelog topic from a kafka system
Repository: samza
Updated Branches:
refs/heads/master 05a02eed1 -> 8b52e8aec
SAMZA-586; disable compressed messages when sending to a store's changelog topic from a kafka system
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/8b52e8ae
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/8b52e8ae
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/8b52e8ae
Branch: refs/heads/master
Commit: 8b52e8aecbca2915d50f0ae3600ceec799fac60f
Parents: 05a02ee
Author: Chris Riccomini <cr...@apache.org>
Authored: Thu Mar 19 09:25:29 2015 -0700
Committer: Chris Riccomini <cr...@apache.org>
Committed: Thu Mar 19 09:25:29 2015 -0700
----------------------------------------------------------------------
.../org/apache/samza/config/StorageConfig.scala | 16 +++++++-
.../apache/samza/config/TestStorageConfig.scala | 40 ++++++++++++++++++++
.../kafka/KafkaCheckpointManagerFactory.scala | 4 +-
.../samza/system/kafka/KafkaSystemFactory.scala | 12 +++++-
.../system/kafka/TestKafkaSystemFactory.scala | 13 +++++++
5 files changed, 81 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/8b52e8ae/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala
index f977b8b..be3f106 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala
@@ -20,8 +20,8 @@
package org.apache.samza.config
import scala.collection.JavaConversions._
-
import org.apache.samza.util.Logging
+import org.apache.samza.util.Util
object StorageConfig {
// stream config constants
@@ -43,4 +43,18 @@ class StorageConfig(config: Config) extends ScalaMapConfig(config) with Logging
val conf = config.subset("stores.", true)
conf.keys.filter(k => k.endsWith(".factory")).map(k => k.substring(0, k.length - ".factory".length)).toSeq
}
+
+  /**
+   * Helper method to check if a system has a changelog attached to it.
+   *
+   * @param systemName the system name to look for
+   * @return true if at least one store's changelog stream lives on systemName
+   */
+  def isChangelogSystem(systemName: String): Boolean = {
+    config
+      .getStoreNames
+      // Changelogs are "system.stream" names; stores without one yield None and are dropped.
+      .flatMap(getChangelogStream(_))
+      .exists(changelog => Util.getSystemStreamFromNames(changelog).getSystem == systemName)
+  }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/8b52e8ae/samza-core/src/test/scala/org/apache/samza/config/TestStorageConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/config/TestStorageConfig.scala b/samza-core/src/test/scala/org/apache/samza/config/TestStorageConfig.scala
new file mode 100644
index 0000000..81a35ec
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/config/TestStorageConfig.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config
+
+import scala.collection.JavaConversions._
+import org.apache.samza.config.StorageConfig._
+import org.junit.Assert.assertFalse
+import org.junit.Assert.assertTrue
+import org.junit.Test
+
+class TestStorageConfig {
+  @Test
+  def testIsChangelogSystem: Unit = {
+    // Stores "system1" and "system2" are configured; only "system1" declares a changelog ("system1.stream1").
+    val config = new MapConfig(Map[String, String](
+      FACTORY.format("system1") -> "some.factory.Class",
+      CHANGELOG_STREAM.format("system1") -> "system1.stream1",
+      FACTORY.format("system2") -> "some.factory.Class"))
+    assertFalse(config.isChangelogSystem("system3")) // no store refers to this system
+    assertFalse(config.isChangelogSystem("system2")) // a store exists but has no changelog
+    assertTrue(config.isChangelogSystem("system1"))  // the changelog stream is on this system
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/8b52e8ae/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
index 7fc6d89..3dfa26a 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
@@ -43,8 +43,8 @@ object KafkaCheckpointManagerFactory {
val INJECTED_PRODUCER_PROPERTIES = Map(
"acks" -> "all",
// Forcibly disable compression because Kafka doesn't support compression
- // on log compacted topics. Details in SAMZA-393.
- "compression.codec" -> "none")
+ // on log compacted topics. Details in SAMZA-586 (originally SAMZA-393).
+ "compression.type" -> "none")
// Set the checkpoint topic configs to have a very small segment size and
// enable log compaction. This keeps job startup time small since there
http://git-wip-us.apache.org/repos/asf/samza/blob/8b52e8ae/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
index 4f15002..c84ceb7 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
@@ -25,9 +25,18 @@ import org.apache.samza.metrics.MetricsRegistry
import org.apache.samza.config.KafkaConfig.Config2Kafka
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.samza.system.SystemFactory
+import org.apache.samza.config.StorageConfig._
import org.I0Itec.zkclient.ZkClient
import kafka.utils.ZKStringSerializer
+object KafkaSystemFactory extends Logging {
+  /**
+   * Extra producer properties to pass into the Kafka producer config for a system.
+   *
+   * If any store uses `systemName` for its changelog, compression is disabled
+   * ("compression.type" -> "none"), because Kafka does not support compression
+   * for log compacted topics. A warning is logged in that case.
+   *
+   * @param systemName the Kafka system whose producer is being created
+   * @param config the job config, used to look up store changelogs
+   * @return the properties to inject; empty if the system hosts no store changelog
+   */
+  def getInjectedProducerProperties(systemName: String, config: Config): Map[String, String] =
+    if (config.isChangelogSystem(systemName)) {
+      warn(s"System name '$systemName' is being used as a changelog. Disabling compression since Kafka does not support compression for log compacted topics.")
+      Map("compression.type" -> "none")
+    } else {
+      Map.empty
+    }
+}
class KafkaSystemFactory extends SystemFactory with Logging {
def getConsumer(systemName: String, config: Config, registry: MetricsRegistry) = {
@@ -66,7 +75,8 @@ class KafkaSystemFactory extends SystemFactory with Logging {
def getProducer(systemName: String, config: Config, registry: MetricsRegistry) = {
val clientId = KafkaUtil.getClientId("samza-producer", config)
- val producerConfig = config.getKafkaSystemProducerConfig(systemName, clientId)
+ val injectedProps = KafkaSystemFactory.getInjectedProducerProperties(systemName, config)
+ val producerConfig = config.getKafkaSystemProducerConfig(systemName, clientId, injectedProps)
val getProducer = () => { new KafkaProducer[Array[Byte], Array[Byte]](producerConfig.getProducerProperties) }
val metrics = new KafkaSystemProducerMetrics(systemName, registry)
http://git-wip-us.apache.org/repos/asf/samza/blob/8b52e8ae/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala
index 5f65144..ce84b6d 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala
@@ -21,6 +21,7 @@ package org.apache.samza.system.kafka
import org.apache.samza.SamzaException
import org.apache.samza.config.MapConfig
+import org.apache.samza.config.StorageConfig
import org.apache.samza.metrics.MetricsRegistryMap
import org.apache.samza.system.SystemStream
import org.junit.Assert._
@@ -83,4 +84,16 @@ class TestKafkaSystemFactory {
assertNotNull(producer)
assertTrue(producer.isInstanceOf[KafkaSystemProducer])
}
+
+  @Test
+  def testInjectedProducerProps: Unit = {
+    // Only the store "system1" has a changelog, and it lives on system "system1".
+    val config = new MapConfig(Map[String, String](
+      StorageConfig.FACTORY.format("system1") -> "some.factory.Class",
+      StorageConfig.CHANGELOG_STREAM.format("system1") -> "system1.stream1",
+      StorageConfig.FACTORY.format("system2") -> "some.factory.Class"))
+    val nothingInjected = Map[String, String]()
+    assertEquals(nothingInjected, KafkaSystemFactory.getInjectedProducerProperties("system3", config))
+    assertEquals(nothingInjected, KafkaSystemFactory.getInjectedProducerProperties("system2", config))
+    assertEquals(Map[String, String]("compression.type" -> "none"), KafkaSystemFactory.getInjectedProducerProperties("system1", config))
+  }
}