You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bo...@apache.org on 2017/10/13 22:04:27 UTC
samza git commit: ChangeLog should not require KafkaSystemAdmin
Repository: samza
Updated Branches:
refs/heads/master e94abca72 -> 1b9391b9e
ChangeLog should not require KafkaSystemAdmin
Author: Boris S <bo...@apache.org>
Reviewers: Xinyu Liu <xi...@gmail.com>
Closes #325 from sborya/ChangeLogRequireKafkaSystemAdmin
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/1b9391b9
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/1b9391b9
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/1b9391b9
Branch: refs/heads/master
Commit: 1b9391b9e23a38b898590148dfb3b1e503bdd447
Parents: e94abca
Author: Boris S <bo...@apache.org>
Authored: Fri Oct 13 15:04:22 2017 -0700
Committer: Boris S <bo...@apache.org>
Committed: Fri Oct 13 15:04:22 2017 -0700
----------------------------------------------------------------------
.../scala/org/apache/samza/storage/TaskStorageManager.scala | 3 ++-
.../src/main/scala/org/apache/samza/config/KafkaConfig.scala | 5 +----
.../test/scala/org/apache/samza/config/TestKafkaConfig.scala | 8 ++++++++
3 files changed, 11 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/1b9391b9/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
index 0879e9a..c8c935a 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
@@ -205,13 +205,14 @@ class TaskStorageManager(
}
private def validateChangelogStreams() = {
- info("Validating change log streams")
+ info("Validating change log streams: " + changeLogSystemStreams)
for ((storeName, systemStream) <- changeLogSystemStreams) {
val systemAdmin = systemAdmins
.getOrElse(systemStream.getSystem,
throw new SamzaException("Unable to get systemAdmin for store " + storeName + " and systemStream" + systemStream))
val changelogSpec = StreamSpec.createChangeLogStreamSpec(systemStream.getStream, systemStream.getSystem, changeLogStreamPartitions)
+
systemAdmin.validateStream(changelogSpec)
}
http://git-wip-us.apache.org/repos/asf/samza/blob/1b9391b9/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index d926dcb..9c33b16 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -29,7 +29,6 @@ import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.apache.samza.SamzaException
import org.apache.samza.config.SystemConfig.Config2System
-import org.apache.samza.system.kafka.KafkaSystemFactory
import org.apache.samza.util.{Logging, Util}
import scala.collection.JavaConverters._
@@ -235,9 +234,7 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
val changelogName = storageConfig.getChangelogStream(storeName).getOrElse(throw new SamzaException("unable to get SystemStream for store:" + changelogConfig));
val systemStream = Util.getSystemStreamFromNames(changelogName)
val factoryName = config.getSystemFactory(systemStream.getSystem).getOrElse(new SamzaException("Unable to determine factory for system: " + systemStream.getSystem))
- if (classOf[KafkaSystemFactory].getCanonicalName == factoryName) {
- storeToChangelog += storeName -> systemStream.getStream
- }
+ storeToChangelog += storeName -> systemStream.getStream
}
storeToChangelog
}
http://git-wip-us.apache.org/repos/asf/samza/blob/1b9391b9/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
index 9d1e99b..0474cbe 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
@@ -130,6 +130,14 @@ class TestKafkaConfig {
assertEquals("mychangelog1", storeToChangelog.get("test1").getOrElse(""))
assertEquals("mychangelog2", storeToChangelog.get("test2").getOrElse(""))
assertEquals("otherstream", storeToChangelog.get("test3").getOrElse(""))
+
+ props.setProperty("systems." + SYSTEM_NAME + ".samza.factory", "org.apache.samza.system.kafka.SomeOtherFactory")
+ val mapConfig1 = new MapConfig(props.asScala.asJava)
+ val kafkaConfig1 = new KafkaConfig(mapConfig)
+ val storeToChangelog1 = kafkaConfig.getKafkaChangelogEnabledStores()
+ assertEquals("mychangelog1", storeToChangelog1.get("test1").getOrElse(""))
+ assertEquals("mychangelog2", storeToChangelog1.get("test2").getOrElse(""))
+ assertEquals("otherstream", storeToChangelog1.get("test3").getOrElse(""))
}
@Test