You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bo...@apache.org on 2017/10/13 22:04:27 UTC

samza git commit: ChangeLog should not require KafkaSystemAdmin

Repository: samza
Updated Branches:
  refs/heads/master e94abca72 -> 1b9391b9e


ChangeLog should not require KafkaSystemAdmin

Author: Boris S <bo...@apache.org>

Reviewers: Xinyu Liu <xi...@gmail.com>

Closes #325 from sborya/ChangeLogRequireKafkaSystemAdmin


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/1b9391b9
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/1b9391b9
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/1b9391b9

Branch: refs/heads/master
Commit: 1b9391b9e23a38b898590148dfb3b1e503bdd447
Parents: e94abca
Author: Boris S <bo...@apache.org>
Authored: Fri Oct 13 15:04:22 2017 -0700
Committer: Boris S <bo...@apache.org>
Committed: Fri Oct 13 15:04:22 2017 -0700

----------------------------------------------------------------------
 .../scala/org/apache/samza/storage/TaskStorageManager.scala  | 3 ++-
 .../src/main/scala/org/apache/samza/config/KafkaConfig.scala | 5 +----
 .../test/scala/org/apache/samza/config/TestKafkaConfig.scala | 8 ++++++++
 3 files changed, 11 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/1b9391b9/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
index 0879e9a..c8c935a 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
@@ -205,13 +205,14 @@ class TaskStorageManager(
   }
 
   private def validateChangelogStreams() = {
-    info("Validating change log streams")
+    info("Validating change log streams: " + changeLogSystemStreams)
 
     for ((storeName, systemStream) <- changeLogSystemStreams) {
       val systemAdmin = systemAdmins
         .getOrElse(systemStream.getSystem,
                    throw new SamzaException("Unable to get systemAdmin for store " + storeName + " and systemStream" + systemStream))
       val changelogSpec = StreamSpec.createChangeLogStreamSpec(systemStream.getStream, systemStream.getSystem, changeLogStreamPartitions)
+
       systemAdmin.validateStream(changelogSpec)
     }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/1b9391b9/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index d926dcb..9c33b16 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -29,7 +29,6 @@ import org.apache.kafka.clients.producer.ProducerConfig
 import org.apache.kafka.common.serialization.ByteArraySerializer
 import org.apache.samza.SamzaException
 import org.apache.samza.config.SystemConfig.Config2System
-import org.apache.samza.system.kafka.KafkaSystemFactory
 import org.apache.samza.util.{Logging, Util}
 
 import scala.collection.JavaConverters._
@@ -235,9 +234,7 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
       val changelogName = storageConfig.getChangelogStream(storeName).getOrElse(throw new SamzaException("unable to get SystemStream for store:" + changelogConfig));
       val systemStream = Util.getSystemStreamFromNames(changelogName)
       val factoryName = config.getSystemFactory(systemStream.getSystem).getOrElse(new SamzaException("Unable to determine factory for system: " + systemStream.getSystem))
-      if (classOf[KafkaSystemFactory].getCanonicalName == factoryName) {
-        storeToChangelog += storeName -> systemStream.getStream
-      }
+      storeToChangelog += storeName -> systemStream.getStream
     }
     storeToChangelog
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/1b9391b9/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
index 9d1e99b..0474cbe 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
@@ -130,6 +130,14 @@ class TestKafkaConfig {
     assertEquals("mychangelog1", storeToChangelog.get("test1").getOrElse(""))
     assertEquals("mychangelog2", storeToChangelog.get("test2").getOrElse(""))
     assertEquals("otherstream", storeToChangelog.get("test3").getOrElse(""))
+
+    props.setProperty("systems." + SYSTEM_NAME + ".samza.factory", "org.apache.samza.system.kafka.SomeOtherFactory")
+    val mapConfig1 = new MapConfig(props.asScala.asJava)
+    val kafkaConfig1 = new KafkaConfig(mapConfig)
+    val storeToChangelog1 = kafkaConfig.getKafkaChangelogEnabledStores()
+    assertEquals("mychangelog1", storeToChangelog1.get("test1").getOrElse(""))
+    assertEquals("mychangelog2", storeToChangelog1.get("test2").getOrElse(""))
+    assertEquals("otherstream", storeToChangelog1.get("test3").getOrElse(""))
   }
 
   @Test