You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2015/03/19 17:25:39 UTC

samza git commit: SAMZA-586; disable compressed messages when sending to a store's changelog topic from a kafka system

Repository: samza
Updated Branches:
  refs/heads/master 05a02eed1 -> 8b52e8aec


SAMZA-586; disable compressed messages when sending to a store's changelog topic from a kafka system


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/8b52e8ae
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/8b52e8ae
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/8b52e8ae

Branch: refs/heads/master
Commit: 8b52e8aecbca2915d50f0ae3600ceec799fac60f
Parents: 05a02ee
Author: Chris Riccomini <cr...@apache.org>
Authored: Thu Mar 19 09:25:29 2015 -0700
Committer: Chris Riccomini <cr...@apache.org>
Committed: Thu Mar 19 09:25:29 2015 -0700

----------------------------------------------------------------------
 .../org/apache/samza/config/StorageConfig.scala | 16 +++++++-
 .../apache/samza/config/TestStorageConfig.scala | 40 ++++++++++++++++++++
 .../kafka/KafkaCheckpointManagerFactory.scala   |  4 +-
 .../samza/system/kafka/KafkaSystemFactory.scala | 12 +++++-
 .../system/kafka/TestKafkaSystemFactory.scala   | 13 +++++++
 5 files changed, 81 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/8b52e8ae/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala
index f977b8b..be3f106 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala
@@ -20,8 +20,8 @@
 package org.apache.samza.config
 
 import scala.collection.JavaConversions._
-
 import org.apache.samza.util.Logging
+import org.apache.samza.util.Util
 
 object StorageConfig {
   // stream config constants
@@ -43,4 +43,18 @@ class StorageConfig(config: Config) extends ScalaMapConfig(config) with Logging
     val conf = config.subset("stores.", true)
     conf.keys.filter(k => k.endsWith(".factory")).map(k => k.substring(0, k.length - ".factory".length)).toSeq
   }
+
+  /**
+   * Helper method to check whether any store's changelog stream lives on the given system.
+   */
+  def isChangelogSystem(systemName: String) = {
+    config
+      .getStoreNames
+      // Get changelogs for all stores in the format of "system.stream"
+      .map(getChangelogStream(_))
+      .filter(_.isDefined)
+      // Convert "system.stream" to systemName
+      .map(systemStreamName => Util.getSystemStreamFromNames(systemStreamName.get).getSystem)
+      .contains(systemName)
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/8b52e8ae/samza-core/src/test/scala/org/apache/samza/config/TestStorageConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/config/TestStorageConfig.scala b/samza-core/src/test/scala/org/apache/samza/config/TestStorageConfig.scala
new file mode 100644
index 0000000..81a35ec
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/config/TestStorageConfig.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config
+
+import scala.collection.JavaConversions._
+import org.apache.samza.config.StorageConfig._
+import org.junit.Assert.assertFalse
+import org.junit.Assert.assertTrue
+import org.junit.Test
+
+class TestStorageConfig {
+  @Test
+  def testIsChangelogSystem {
+    val configMap = Map[String, String](
+      FACTORY.format("system1") -> "some.factory.Class",
+      CHANGELOG_STREAM.format("system1") -> "system1.stream1",
+      FACTORY.format("system2") -> "some.factory.Class")
+    val config = new MapConfig(configMap)
+    assertFalse(config.isChangelogSystem("system3"))
+    assertFalse(config.isChangelogSystem("system2"))
+    assertTrue(config.isChangelogSystem("system1"))
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/8b52e8ae/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
index 7fc6d89..3dfa26a 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
@@ -43,8 +43,8 @@ object KafkaCheckpointManagerFactory {
   val INJECTED_PRODUCER_PROPERTIES = Map(
     "acks" -> "all",
     // Forcibly disable compression because Kafka doesn't support compression
-    // on log compacted topics. Details in SAMZA-393.
-    "compression.codec" -> "none")
+    // on log compacted topics. Details in SAMZA-586.
+    "compression.type" -> "none")
 
   // Set the checkpoint topic configs to have a very small segment size and
   // enable log compaction. This keeps job startup time small since there 

http://git-wip-us.apache.org/repos/asf/samza/blob/8b52e8ae/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
index 4f15002..c84ceb7 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
@@ -25,9 +25,18 @@ import org.apache.samza.metrics.MetricsRegistry
 import org.apache.samza.config.KafkaConfig.Config2Kafka
 import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.samza.system.SystemFactory
+import org.apache.samza.config.StorageConfig._
 import org.I0Itec.zkclient.ZkClient
 import kafka.utils.ZKStringSerializer
 
+object KafkaSystemFactory extends Logging {
+  def getInjectedProducerProperties(systemName: String, config: Config) = if (config.isChangelogSystem(systemName)) {
+    warn("System name '%s' is being used as a changelog. Disabling compression since Kafka does not support compression for log compacted topics." format systemName)
+    Map[String, String]("compression.type" -> "none")
+  } else {
+    Map[String, String]()
+  }
+}
 
 class KafkaSystemFactory extends SystemFactory with Logging {
   def getConsumer(systemName: String, config: Config, registry: MetricsRegistry) = {
@@ -66,7 +75,8 @@ class KafkaSystemFactory extends SystemFactory with Logging {
 
   def getProducer(systemName: String, config: Config, registry: MetricsRegistry) = {
     val clientId = KafkaUtil.getClientId("samza-producer", config)
-    val producerConfig = config.getKafkaSystemProducerConfig(systemName, clientId)
+    val injectedProps = KafkaSystemFactory.getInjectedProducerProperties(systemName, config)
+    val producerConfig = config.getKafkaSystemProducerConfig(systemName, clientId, injectedProps)
     val getProducer = () => { new KafkaProducer[Array[Byte], Array[Byte]](producerConfig.getProducerProperties) }
     val metrics = new KafkaSystemProducerMetrics(systemName, registry)
 

http://git-wip-us.apache.org/repos/asf/samza/blob/8b52e8ae/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala
index 5f65144..ce84b6d 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala
@@ -21,6 +21,7 @@ package org.apache.samza.system.kafka
 
 import org.apache.samza.SamzaException
 import org.apache.samza.config.MapConfig
+import org.apache.samza.config.StorageConfig
 import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.system.SystemStream
 import org.junit.Assert._
@@ -83,4 +84,16 @@ class TestKafkaSystemFactory {
     assertNotNull(producer)
     assertTrue(producer.isInstanceOf[KafkaSystemProducer])
   }
+
+  @Test
+  def testInjectedProducerProps {
+    val configMap = Map[String, String](
+      StorageConfig.FACTORY.format("system1") -> "some.factory.Class",
+      StorageConfig.CHANGELOG_STREAM.format("system1") -> "system1.stream1",
+      StorageConfig.FACTORY.format("system2") -> "some.factory.Class")
+    val config = new MapConfig(configMap)
+    assertEquals(Map[String, String](), KafkaSystemFactory.getInjectedProducerProperties("system3", config))
+    assertEquals(Map[String, String](), KafkaSystemFactory.getInjectedProducerProperties("system2", config))
+    assertEquals(Map[String, String]("compression.type" -> "none"), KafkaSystemFactory.getInjectedProducerProperties("system1", config))
+  }
 }