You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2014/12/22 18:31:18 UTC

incubator-samza git commit: SAMZA-226; auto-create changelogs

Repository: incubator-samza
Updated Branches:
  refs/heads/master 377e5cc3f -> 0ebfcbd0a


SAMZA-226; auto-create changelogs


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/0ebfcbd0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/0ebfcbd0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/0ebfcbd0

Branch: refs/heads/master
Commit: 0ebfcbd0ab39b2e1db4bc31e6cd38ee3113469fe
Parents: 377e5cc
Author: Naveen Somasundaram <na...@gmail.com>
Authored: Mon Dec 22 09:30:38 2014 -0800
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Mon Dec 22 09:30:38 2014 -0800

----------------------------------------------------------------------
 .../versioned/jobs/configuration-table.html     |  18 +++
 .../java/org/apache/samza/config/Config.java    |  17 +++
 .../org/apache/samza/system/SystemAdmin.java    |   7 ++
 ...inglePartitionWithoutOffsetsSystemAdmin.java |   6 +
 .../apache/samza/container/SamzaContainer.scala |  38 ++-----
 .../samza/storage/TaskStorageManager.scala      |  40 +++++--
 .../filereader/FileReaderSystemAdmin.scala      |   4 +
 .../samza/checkpoint/TestOffsetManager.scala    |   4 +
 .../samza/coordinator/TestJobCoordinator.scala  |   2 +
 .../kafka/KafkaCheckpointManagerFactory.scala   |   4 +-
 .../org/apache/samza/config/KafkaConfig.scala   |  46 ++++++++
 .../samza/system/kafka/KafkaSystemAdmin.scala   | 114 +++++++++++++++++--
 .../samza/system/kafka/KafkaSystemFactory.scala |  26 ++++-
 .../scala/org/apache/samza/util/KafkaUtil.scala |   4 +-
 .../apache/samza/config/TestKafkaConfig.scala   |  18 +++
 .../system/kafka/TestKafkaSystemAdmin.scala     |  11 +-
 .../samza/system/mock/MockSystemAdmin.java      |   8 +-
 .../test/integration/TestStatefulTask.scala     |  24 ++--
 .../yarn/TestSamzaAppMasterTaskManager.scala    |   2 +
 19 files changed, 322 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0ebfcbd0/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html
index 6a5d209..4ccc0e7 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -772,6 +772,24 @@
                 </tr>
 
                 <tr>
+                    <td class="property" id="store-changelog-replication-factor">stores.<span class="store">store-name</span>.changelog.<br>replication.factor</td>
+                    <td class="default">2</td>
+                    <td class="description">
+                        The number of replicas to use when creating the Kafka topic for the changelog stream.
+                    </td>
+                </tr>
+
+                <tr>
+                    <td class="property" id="store-changelog-kafka-settings">stores.<span class="store">store-name</span>.changelog.<br>kafka.topic-level-property</td>
+                    <td class="default"></td>
+                    <td class="description">
+                        Any Kafka topic-level setting to apply when the changelog topic is created. By default the topic uses cleanup.policy=compact and segment.bytes=536870912.
+                        For example, to change the cleanup policy, set "stores.mystore.changelog.kafka.cleanup.policy=delete".
+                        See http://kafka.apache.org/documentation.html#configuration for the full list of topic-level configurations.
+                    </td>
+                </tr>
+
+                <tr>
                     <th colspan="3" class="section" id="regex-rewriter">
                         Consuming all Kafka topics matching a regular expression<br>
                         <span class="subtitle">

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0ebfcbd0/samza-api/src/main/java/org/apache/samza/config/Config.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/config/Config.java b/samza-api/src/main/java/org/apache/samza/config/Config.java
index 2048e90..9f7ade0 100644
--- a/samza-api/src/main/java/org/apache/samza/config/Config.java
+++ b/samza-api/src/main/java/org/apache/samza/config/Config.java
@@ -26,6 +26,8 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 /**
  * Store and retrieve named, typed values as configuration for classes implementing this interface.
@@ -52,6 +54,21 @@ public abstract class Config implements Map<String, String> {
     return new MapConfig(out);
   }
 
+  /** Returns a new Config holding only the entries whose key contains a match for {@code regex}. */
+  public Config regexSubset(String regex) {
+    Pattern keyPattern = Pattern.compile(regex);
+    Map<String, String> matching = new HashMap<String, String>();
+    for (Entry<String, String> entry : entrySet()) {
+      Matcher keyMatcher = keyPattern.matcher(entry.getKey());
+      // find() rather than matches(): the regex may match any substring of the key.
+      if (keyMatcher.find()) {
+        matching.put(entry.getKey(), entry.getValue());
+      }
+    }
+
+    return new MapConfig(matching);
+  }
+
   public String get(String k, String defaultString) {
     if (!containsKey(k)) {
       return defaultString;

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0ebfcbd0/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
index 571c606..8995ba3 100644
--- a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
+++ b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
@@ -50,4 +50,11 @@ public interface SystemAdmin {
    *         requested in the parameter set.
    */
   Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> streamNames);
+
+  /**
+   * Creates a changelog stream in the underlying system.
+   * @param streamName The name of the stream to be created in the underlying system
+   * @param numOfPartitions The number of partitions in the changelog stream
+   */
+  void createChangelogStream(String streamName, int numOfPartitions);
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0ebfcbd0/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java b/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
index 38e313f..01997ae 100644
--- a/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
+++ b/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemStreamMetadata;
 import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata;
@@ -54,6 +55,11 @@ public class SinglePartitionWithoutOffsetsSystemAdmin implements SystemAdmin {
   }
 
   @Override
+  public void createChangelogStream(String streamName, int numOfPartitions) {
+    throw new SamzaException("Method not implemented"); // this admin does not support creating streams
+  }
+
+  @Override
   public Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> offsets) {
     Map<SystemStreamPartition, String> offsetsAfter = new HashMap<SystemStreamPartition, String>();
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0ebfcbd0/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 2f1568d..8a6d865 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -20,7 +20,6 @@
 package org.apache.samza.container
 
 import java.io.File
-import org.apache.samza.Partition
 import org.apache.samza.SamzaException
 import org.apache.samza.checkpoint.{ CheckpointManagerFactory, OffsetManager }
 import org.apache.samza.config.Config
@@ -47,7 +46,6 @@ import org.apache.samza.system.SystemFactory
 import org.apache.samza.system.SystemProducers
 import org.apache.samza.system.SystemProducersMetrics
 import org.apache.samza.system.SystemStream
-import org.apache.samza.system.SystemStreamMetadata
 import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.system.chooser.DefaultChooser
 import org.apache.samza.system.chooser.MessageChooserFactory
@@ -58,11 +56,8 @@ import org.apache.samza.util.Logging
 import org.apache.samza.util.Util
 import scala.collection.JavaConversions._
 import java.net.URL
-import org.apache.samza.coordinator.server.JobServlet
-import org.apache.samza.job.model.ContainerModel
-import org.apache.samza.coordinator.JobCoordinator
+import org.apache.samza.job.model.{TaskModel, ContainerModel, JobModel}
 import org.apache.samza.serializers.model.SamzaObjectMapper
-import org.apache.samza.job.model.JobModel
 import org.apache.samza.config.JobConfig.Config2Job
 
 object SamzaContainer extends Logging {
@@ -263,10 +258,6 @@ object SamzaContainer extends Logging {
 
     info("Got change log system streams: %s" format changeLogSystemStreams)
 
-    val changeLogMetadata = streamMetadataCache.getStreamMetadata(changeLogSystemStreams.values.toSet)
-
-    info("Got change log stream metadata: %s" format changeLogMetadata)
-
     val serdeManager = new SerdeManager(
       serdes = serdes,
       systemKeySerdes = systemKeySerdes,
@@ -387,6 +378,13 @@ object SamzaContainer extends Logging {
 
     val containerContext = new SamzaContainerContext(containerId, config, taskNames)
 
+    // compute the number of partitions necessary for the change log stream creation.
+    // Increment by 1 because partition starts from 0, but we need the absolute count,
+    // this value is used for change log topic creation.
+    val maxChangeLogStreamPartitions = containerModel.getTasks.values
+            .max(Ordering.by{task:TaskModel => task.getChangelogPartition.getPartitionId})
+            .getChangelogPartition.getPartitionId + 1
+
     val taskInstances: Map[TaskName, TaskInstance] = containerModel.getTasks.values.map(taskModel => {
       debug("Setting up task instance: %s" format taskModel)
 
@@ -440,18 +438,16 @@ object SamzaContainer extends Logging {
 
       info("Got task stores: %s" format taskStores)
 
-      val changeLogOldestOffsets = getChangeLogOldestOffsetsForPartition(taskModel.getChangelogPartition, changeLogMetadata)
-
-      info("Assigning oldest change log offsets for taskName %s: %s" format (taskName, changeLogOldestOffsets))
-
       val storageManager = new TaskStorageManager(
         taskName = taskName,
         taskStores = taskStores,
         storeConsumers = storeConsumers,
         changeLogSystemStreams = changeLogSystemStreams,
-        changeLogOldestOffsets = changeLogOldestOffsets,
+        maxChangeLogStreamPartitions,
+        streamMetadataCache = streamMetadataCache,
         storeBaseDir = storeBaseDir,
-        partition = taskModel.getChangelogPartition)
+        partition = taskModel.getChangelogPartition,
+        systemAdmins = systemAdmins)
 
       val systemStreamPartitions = taskModel
         .getSystemStreamPartitions
@@ -494,16 +490,6 @@ object SamzaContainer extends Logging {
       reporters = reporters,
       jvm = jvm)
   }
-
-  /**
-   * Builds a map from SystemStreamPartition to oldest offset for changelogs.
-   */
-  def getChangeLogOldestOffsetsForPartition(partition: Partition, inputStreamMetadata: Map[SystemStream, SystemStreamMetadata]): Map[SystemStream, String] = {
-    inputStreamMetadata
-      .mapValues(_.getSystemStreamPartitionMetadata.get(partition))
-      .filter(_._2 != null)
-      .mapValues(_.getOldestOffset)
-  }
 }
 
 class SamzaContainer(

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0ebfcbd0/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
index b8719c3..f68a7fe 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
@@ -23,10 +23,7 @@ import java.io.File
 import scala.collection.Map
 import org.apache.samza.util.Logging
 import org.apache.samza.Partition
-import org.apache.samza.system.SystemConsumer
-import org.apache.samza.system.SystemStream
-import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.system.SystemStreamPartitionIterator
+import org.apache.samza.system._
 import org.apache.samza.util.Util
 import org.apache.samza.SamzaException
 import org.apache.samza.container.TaskName
@@ -50,16 +47,20 @@ class TaskStorageManager(
   taskStores: Map[String, StorageEngine] = Map(),
   storeConsumers: Map[String, SystemConsumer] = Map(),
   changeLogSystemStreams: Map[String, SystemStream] = Map(),
-  changeLogOldestOffsets: Map[SystemStream, String] = Map(),
+  changeLogStreamPartitions: Int,
+  streamMetadataCache: StreamMetadataCache,
   storeBaseDir: File = new File(System.getProperty("user.dir"), "state"),
-  partition: Partition) extends Logging {
+  partition: Partition,
+  systemAdmins: Map[String, SystemAdmin]) extends Logging {
 
   var taskStoresToRestore = taskStores
+  var changeLogOldestOffsets: Map[SystemStream, String] = Map()
 
   def apply(storageEngineName: String) = taskStores(storageEngineName)
 
   def init {
     cleanBaseDirs
+    createStreams
     startConsumers
     restoreStores
     stopConsumers
@@ -67,7 +68,6 @@ class TaskStorageManager(
 
   private def cleanBaseDirs {
     debug("Cleaning base directories for stores.")
-
     taskStores.keys.foreach(storeName => {
       val storagePartitionDir = TaskStorageManager.getStorePartitionDir(storeBaseDir, storeName, taskName)
 
@@ -78,6 +78,21 @@ class TaskStorageManager(
     })
   }
 
+  private def createStreams = {
+    // Ensure each changelog stream exists, then cache the oldest offset of this task's changelog partition.
+    info("Creating streams that are not present for changelog")
+    for ((storeName, systemStream) <- changeLogSystemStreams) {
+      val systemAdmin = systemAdmins.getOrElse(systemStream.getSystem, throw new SamzaException("Unable to get systemAdmin for store " + storeName + " and systemStream " + systemStream))
+      systemAdmin.createChangelogStream(systemStream.getStream, changeLogStreamPartitions)
+    }
+
+    // Metadata is fetched only after the create calls, so newly created streams are included.
+    val changeLogMetadata = streamMetadataCache.getStreamMetadata(changeLogSystemStreams.values.toSet)
+    info("Got change log stream metadata: %s" format changeLogMetadata)
+    changeLogOldestOffsets = getChangeLogOldestOffsetsForPartition(partition, changeLogMetadata)
+    info("Assigning oldest change log offsets for taskName %s: %s" format (taskName, changeLogOldestOffsets))
+  }
+
   private def startConsumers {
     debug("Starting consumers for stores.")
 
@@ -131,4 +146,15 @@ class TaskStorageManager(
 
     taskStores.values.foreach(_.stop)
   }
+
+
+  /**
+   * Builds a map from each changelog SystemStream to the oldest offset of `partition`; streams with no metadata for it are omitted.
+   */
+  private def getChangeLogOldestOffsetsForPartition(partition: Partition, inputStreamMetadata: Map[SystemStream, SystemStreamMetadata]): Map[SystemStream, String] = {
+    inputStreamMetadata.flatMap { case (systemStream, metadata) =>
+      // The Java partition-metadata map may return null for this partition; Option(...) drops such streams.
+      Option(metadata.getSystemStreamPartitionMetadata.get(partition)).map(partitionMetadata => systemStream -> partitionMetadata.getOldestOffset)
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0ebfcbd0/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala b/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala
index 98e92bc..ec1d749 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala
@@ -130,4 +130,8 @@ class FileReaderSystemAdmin extends SystemAdmin with Logging {
       })
     enterPosition
   }
+
+  override def createChangelogStream(topicName: String, numOfChangeLogPartitions: Int) =
+    // Changelog creation is not supported by the file reader system admin.
+    throw new SamzaException("Method not implemented")
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0ebfcbd0/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
index 35e7f6b..d18d4c4 100644
--- a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
+++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
@@ -257,6 +257,10 @@ class TestOffsetManager {
 
       def getSystemStreamMetadata(streamNames: java.util.Set[String]) =
         Map[String, SystemStreamMetadata]()
+
+      override def createChangelogStream(topicName: String, numOfChangeLogPartitions: Int) = {
+        throw new SamzaException("Method not implemented") // throw (not just construct) so unexpected calls fail loudly
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0ebfcbd0/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
index 1eb0eda..a8e5d36 100644
--- a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
+++ b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
@@ -116,4 +116,6 @@ class MockSystemAdmin extends SystemAdmin {
       new Partition(2) -> new SystemStreamPartitionMetadata(null, null, null))
     Map(streamNames.toList.head -> new SystemStreamMetadata("foo", partitionMetadata))
   }
+
+  override def createChangelogStream(streamName: String, numOfPartitions: Int): Unit = ??? // unimplemented: throws NotImplementedError if called
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0ebfcbd0/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
index f7db2a1..f2defbd 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
@@ -50,8 +50,8 @@ object KafkaCheckpointManagerFactory {
     "message.send.max.retries" -> (Integer.MAX_VALUE - 1).toString)
 
   // Set the checkpoint topic configs to have a very small segment size and
-  // enable log compaction. This keeps job startup time small since there 
-  // are fewer useless (overwritten) messages to read from the checkpoint 
+  // enable log compaction. This keeps job startup time small since there
+  // are fewer useless (overwritten) messages to read from the checkpoint
   // topic.
   def getCheckpointTopicProperties(config: Config) = {
     val segmentBytes = config

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0ebfcbd0/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index 9fc1f56..e57b8ba 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -19,11 +19,20 @@
 
 package org.apache.samza.config
 
+
+import java.util.regex.Pattern
+
+import org.apache.samza.SamzaException
+import org.apache.samza.util.Util
+
 import scala.collection.JavaConversions._
 import kafka.consumer.ConsumerConfig
 import java.util.Properties
 import kafka.producer.ProducerConfig
 import java.util.UUID
+import scala.collection.JavaConverters._
+import org.apache.samza.system.kafka.KafkaSystemFactory
+import org.apache.samza.config.SystemConfig.Config2System
 
 object KafkaConfig {
   val REGEX_RESOLVED_STREAMS = "job.config.rewriter.%s.regex"
@@ -34,6 +43,14 @@ object KafkaConfig {
   val CHECKPOINT_REPLICATION_FACTOR = "task.checkpoint.replication.factor"
   val CHECKPOINT_SEGMENT_BYTES = "task.checkpoint.segment.bytes"
 
+  val CHANGELOG_STREAM_REPLICATION_FACTOR = "stores.%s.changelog.replication.factor"
+  val CHANGELOG_STREAM_KAFKA_SETTINGS = "stores.%s.changelog.kafka."
+  // The default segment size to use for changelog topics
+  val CHANGELOG_DEFAULT_SEGMENT_SIZE = "536870912"
+
+  // Helper regular expression definitions to extract/match configurations
+  val CHANGELOG_STREAM_NAMES_REGEX = "stores\\.(.*)\\.changelog$"
+
   /**
    * Defines how low a queue can get for a single system/stream/partition
    * combination before trying to fetch more messages for it.
@@ -84,6 +101,35 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
   def getRegexResolvedStreams(rewriterName: String) = getOption(KafkaConfig.REGEX_RESOLVED_STREAMS format rewriterName)
   def getRegexResolvedSystem(rewriterName: String) = getOption(KafkaConfig.REGEX_RESOLVED_SYSTEM format rewriterName)
   def getRegexResolvedInheritedConfig(rewriterName: String) = config.subset((KafkaConfig.REGEX_INHERITED_CONFIG format rewriterName) + ".", true)
+  def getChangelogStreamReplicationFactor(name: String) = getOption(KafkaConfig.CHANGELOG_STREAM_REPLICATION_FACTOR.format(name))
+
+  // Returns a map of store name -> changelog stream name for stores whose changelog system uses KafkaSystemFactory.
+  def getKafkaChangelogEnabledStores() = {
+    val changelogConfigs = config.regexSubset(KafkaConfig.CHANGELOG_STREAM_NAMES_REGEX).asScala
+    val pattern = Pattern.compile(KafkaConfig.CHANGELOG_STREAM_NAMES_REGEX) // compiled once; group(1) is the store name
+    var storeToChangelog = Map[String, String]()
+    for ((changelogConfig, changelogName) <- changelogConfigs) {
+      // Look up the factory for this stream's system; a system with no configured factory is a configuration error.
+      val systemStream = Util.getSystemStreamFromNames(changelogName)
+      val factoryName = config.getSystemFactory(systemStream.getSystem).getOrElse(throw new SamzaException("Unable to determine factory for system: " + systemStream.getSystem))
+      if (classOf[KafkaSystemFactory].getCanonicalName == factoryName) {
+        val matcher = pattern.matcher(changelogConfig)
+        val storeName = if (matcher.find()) matcher.group(1) else throw new SamzaException("Unable to find store name in the changelog configuration: " + changelogConfig + " with SystemStream: " + systemStream)
+        storeToChangelog += storeName -> systemStream.getStream
+      }
+    }
+    storeToChangelog
+  }
+
+  // Kafka topic properties for store `name`'s changelog: compaction and 512 MB segments by default, overridable via stores.<name>.changelog.kafka.*
+  def getChangelogKafkaProperties(name: String) = {
+    val overrides = config.subset(KafkaConfig.CHANGELOG_STREAM_KAFKA_SETTINGS format name, true)
+    val kafkaChangeLogProperties = new Properties
+    kafkaChangeLogProperties.setProperty("cleanup.policy", "compact")
+    kafkaChangeLogProperties.setProperty("segment.bytes", KafkaConfig.CHANGELOG_DEFAULT_SEGMENT_SIZE)
+    overrides.foreach { case (key, value) => kafkaChangeLogProperties.setProperty(key, value) } // user settings win
+    kafkaChangeLogProperties
+  }
 
   // kafka config
   def getKafkaSystemConsumerConfig(

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0ebfcbd0/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
index 5ac33ea..b790be1 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
@@ -19,25 +19,21 @@
 
 package org.apache.samza.system.kafka
 
+import org.I0Itec.zkclient.ZkClient
 import org.apache.samza.Partition
 import org.apache.samza.SamzaException
 import org.apache.samza.system.SystemAdmin
 import org.apache.samza.system.SystemStreamMetadata
 import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.util.ClientUtilTopicMetadataStore
-import org.apache.samza.util.ExponentialSleepStrategy
+import org.apache.samza.util.{ClientUtilTopicMetadataStore, ExponentialSleepStrategy, Logging}
 import kafka.api._
 import kafka.consumer.SimpleConsumer
-import kafka.utils.Utils
-import kafka.client.ClientUtils
-import kafka.common.TopicAndPartition
-import kafka.common.ErrorMapping
-import kafka.cluster.Broker
-import org.apache.samza.util.Logging
-import java.util.UUID
+import kafka.common.{TopicExistsException, TopicAndPartition, ErrorMapping}
+import java.util.{Properties, UUID}
 import scala.collection.JavaConversions._
 import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
 import kafka.consumer.ConsumerConfig
+import kafka.admin.AdminUtils
 
 object KafkaSystemAdmin extends Logging {
   /**
@@ -73,6 +69,13 @@ object KafkaSystemAdmin extends Logging {
 }
 
 /**
+ * Immutable holder for the settings used when creating a Kafka changelog topic
+ * @param replicationFactor The number of replicas for the changelog stream
+ * @param kafkaProps The kafka specific properties that need to be used for changelog stream creation
+ */
+case class ChangelogInfo(replicationFactor: Int, kafkaProps: Properties)
+
+/**
  * A Kafka-based implementation of SystemAdmin.
  */
 class KafkaSystemAdmin(
@@ -106,7 +109,20 @@ class KafkaSystemAdmin(
    * The client ID to use for the simple consumer when fetching metadata from
    * Kafka. Equivalent to Kafka's client.id configuration.
    */
-  clientId: String = UUID.randomUUID.toString) extends SystemAdmin with Logging {
+  clientId: String = UUID.randomUUID.toString,
+
+  /**
+   * A function that returns a Zookeeper client to connect to fetch the meta
+   * data information
+   */
+  connectZk: () => ZkClient,
+
+  /**
+   * Replication factor for the Changelog topic in kafka
+   * Kafka properties to be used during the Changelog topic creation
+   */
+  topicMetaInformation: Map[String, ChangelogInfo] =  Map[String, ChangelogInfo]()
+  ) extends SystemAdmin with Logging {
 
   import KafkaSystemAdmin._
 
@@ -259,4 +275,82 @@ class KafkaSystemAdmin(
 
     offsets
   }
+
+  // Creates `topicName` with the given partition count using the ChangelogInfo configured for it.
+  private def createTopicInKafka(topicName: String, numKafkaChangelogPartitions: Int) {
+    val retryBackoff = new ExponentialSleepStrategy
+    info("Attempting to create change log topic %s." format topicName)
+    info("Using partition count "+ numKafkaChangelogPartitions + " for creating change log topic")
+
+    // A topic with no ChangelogInfo is a configuration error: fail before touching ZooKeeper.
+    val changelogInfo = topicMetaInformation.getOrElse(topicName,
+      throw new KafkaChangelogException("Unable to find topic information for topic " + topicName))
+
+    retryBackoff.run(
+      // Attempt: create the topic through ZooKeeper, always closing the client afterwards.
+      loop => {
+        val zkClient = connectZk()
+        try {
+          AdminUtils.createTopic(zkClient, topicName, numKafkaChangelogPartitions,
+            changelogInfo.replicationFactor, changelogInfo.kafkaProps)
+        } finally {
+          zkClient.close
+        }
+        info("Created changelog topic %s." format topicName)
+        loop.done
+      },
+
+      // Failure handler: an existing topic counts as success; any other Exception is logged and retried.
+      (exception, loop) => exception match {
+        case e: TopicExistsException =>
+          info("Changelog topic %s already exists." format topicName)
+          loop.done
+        case e: Exception =>
+          warn("Failed to create topic %s: %s. Retrying." format(topicName, e))
+          debug("Exception detail:", e)
+      }
+    )
+  }
+
+  private def validateTopicInKafka(topicName: String, numKafkaChangelogPartitions: Int) {
+    val retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy
+    info("Validating changelog topic %s." format topicName)
+    retryBackoff.run(
+      loop => {
+        val metadataStore = new ClientUtilTopicMetadataStore(brokerListString, clientId, timeout)
+        val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(topicName), systemName, metadataStore.getTopicInfo)
+        val topicMetadata = topicMetadataMap(topicName)
+        ErrorMapping.maybeThrowException(topicMetadata.errorCode)
+        // Having more partitions than required is accepted; only fewer fails validation.
+        val partitionCount = topicMetadata.partitionsMetadata.length
+        if (partitionCount < numKafkaChangelogPartitions) {
+          throw new KafkaChangelogException("Changelog topic validation failed for topic %s because partition count %d is less than the expected partition count of %d" format(topicName, partitionCount, numKafkaChangelogPartitions))
+        }
+        info("Successfully validated changelog topic %s." format topicName)
+        loop.done
+      },
+
+      (exception, loop) => {
+        exception match {
+          // Validation failures are permanent, so rethrow instead of retrying.
+          case e: KafkaChangelogException => throw e
+          case e: Exception =>
+            warn("While trying to validate topic %s: %s. Retrying." format(topicName, e))
+            debug("Exception detail:", e)
+        }
+      }
+    )
+  }
+
+  /**
+   * Exception thrown when changelog stream creation or validation fails; validateTopicInKafka rethrows it rather than retrying.
+   */
+  class KafkaChangelogException(s: String, t: Throwable) extends SamzaException(s, t) {
+    def this(s: String) = this(s, null)
+  }
+
+  override def createChangelogStream(topicName: String, numKafkaChangelogPartitions: Int) = {
+    createTopicInKafka(topicName, numKafkaChangelogPartitions) // an already-existing topic is treated as success
+    validateTopicInKafka(topicName, numKafkaChangelogPartitions) // throws if the topic has too few partitions
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0ebfcbd0/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
index 4ed5e88..4506ea3 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
@@ -19,17 +19,21 @@
 
 package org.apache.samza.system.kafka
 
-import org.apache.samza.util.KafkaUtil
+
+import java.util.Properties
+
+import kafka.utils.ZKStringSerializer
+import org.I0Itec.zkclient.ZkClient
+import org.apache.samza.util.{Logging, KafkaUtil, ExponentialSleepStrategy, ClientUtilTopicMetadataStore}
 import org.apache.samza.config.Config
 import org.apache.samza.metrics.MetricsRegistry
 import org.apache.samza.config.KafkaConfig.Config2Kafka
 import org.apache.samza.SamzaException
 import kafka.producer.Producer
 import org.apache.samza.system.SystemFactory
-import org.apache.samza.util.ExponentialSleepStrategy
-import org.apache.samza.util.ClientUtilTopicMetadataStore
 
-class KafkaSystemFactory extends SystemFactory {
+
+class KafkaSystemFactory extends SystemFactory with Logging {
   def getConsumer(systemName: String, config: Config, registry: MetricsRegistry) = {
     val clientId = KafkaUtil.getClientId("samza-consumer", config)
     val metrics = new KafkaSystemConsumerMetrics(systemName, registry)
@@ -95,12 +99,24 @@ class KafkaSystemFactory extends SystemFactory {
     val consumerConfig = config.getKafkaSystemConsumerConfig(systemName, clientId)
     val timeout = consumerConfig.socketTimeoutMs
     val bufferSize = consumerConfig.socketReceiveBufferBytes
+    val storeToChangelog = config.getKafkaChangelogEnabledStores()
+
+    // Construct the meta information for each topic, if the replication factor is not defined, we use 2 as the number of replicas for the change log stream.
+    val topicMetaInformation = storeToChangelog.map{case (storeName, topicName) =>
+    {
+       val replicationFactor = config.getChangelogStreamReplicationFactor(storeName).getOrElse("2").toInt
+       val changelogInfo = ChangelogInfo(replicationFactor, config.getChangelogKafkaProperties(storeName))
+       info("Creating topic meta information for topic: " + topicName + " with replication factor: " + replicationFactor)
+       (topicName, changelogInfo)
+    }}.toMap
 
     new KafkaSystemAdmin(
       systemName,
       brokerListString,
       timeout,
       bufferSize,
-      clientId)
+      clientId,
+      () => new ZkClient(consumerConfig.zkConnect, 6000, 6000, ZKStringSerializer),
+      topicMetaInformation)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0ebfcbd0/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
index d660b91..f1b7511 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
@@ -19,11 +19,9 @@
 
 package org.apache.samza.util
 
-import org.apache.samza.config.{KafkaConfig, Config, ConfigException}
+import org.apache.samza.config.{Config, ConfigException}
 import org.apache.samza.config.JobConfig.Config2Job
 import java.util.concurrent.atomic.AtomicLong
-import kafka.client.ClientUtils
-import org.apache.samza.SamzaException
 
 object KafkaUtil {
   val counter = new AtomicLong(0)

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0ebfcbd0/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
index 8109f73..0e1c38e 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
@@ -104,4 +104,22 @@ class TestKafkaConfig {
     // topic fetch size
     assertEquals(256*256, consumerConfig2 getOrElse ("topic1", 1024*1024))
   }
+
+  @Test
+  def testChangeLogProperties() { // stores.<name>.changelog.kafka.* keys become per-store Kafka topic properties
+    val props = (new Properties /: Map(
+      "systems.kafka.samza.factory" -> "org.apache.samza.system.kafka.KafkaSystemFactory",
+      "stores.test1.changelog" -> "kafka.mychangelog1",
+      "stores.test2.changelog" -> "kafka.mychangelog2",
+      "stores.test1.changelog.kafka.cleanup.policy" -> "delete" // override for test1 only
+      )) { case (acc, (k, v)) => acc.put(k, v); acc }
+
+    val mapConfig = new MapConfig(props.toMap[String, String])
+    val kafkaConfig = new KafkaConfig(mapConfig)
+    assertEquals("delete", kafkaConfig.getChangelogKafkaProperties("test1").getProperty("cleanup.policy"))
+    assertEquals("compact", kafkaConfig.getChangelogKafkaProperties("test2").getProperty("cleanup.policy")) // no override: expected default
+    val storeToChangelog = kafkaConfig.getKafkaChangelogEnabledStores()
+    assertEquals("mychangelog1", storeToChangelog.getOrElse("test1", "")) // system prefix "kafka." is stripped
+    assertEquals("mychangelog2", storeToChangelog.getOrElse("test2", ""))
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0ebfcbd0/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
index 5ceb109..c759a7b 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
@@ -164,6 +164,10 @@ object TestKafkaSystemAdmin {
 class TestKafkaSystemAdmin {
   import TestKafkaSystemAdmin._
 
+  val systemName = "test"
+  // connectZk is a thunk pointing at the test ZooKeeper (zkConnect; 6000 ms session/connection timeouts); presumably only invoked when a topic is created/validated
+  val systemAdmin = new KafkaSystemAdmin(systemName, brokers, connectZk = () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer))
+
   def testShouldAssembleMetadata {
     val oldestOffsets = Map(
       new SystemStreamPartition("test", "stream1", new Partition(0)) -> "o1",
@@ -205,8 +209,6 @@ class TestKafkaSystemAdmin {
 
   @Test
   def testShouldGetOldestNewestAndNextOffsets {
-    val systemName = "test"
-    val systemAdmin = new KafkaSystemAdmin(systemName, brokers)
 
     // Create an empty topic with 50 partitions, but with no offsets.
     createTopic
@@ -271,7 +273,7 @@ class TestKafkaSystemAdmin {
 
   @Test
   def testNonExistentTopic {
-    val systemAdmin = new KafkaSystemAdmin("test", brokers)
+
     val initialOffsets = systemAdmin.getSystemStreamMetadata(Set("non-existent-topic"))
     val metadata = initialOffsets.getOrElse("non-existent-topic", fail("missing metadata"))
     assertEquals(metadata, new SystemStreamMetadata("non-existent-topic", Map(
@@ -280,7 +282,6 @@ class TestKafkaSystemAdmin {
 
   @Test
   def testOffsetsAfter {
-    val systemAdmin = new KafkaSystemAdmin("test", brokers)
     val ssp1 = new SystemStreamPartition("test-system", "test-stream", new Partition(0))
     val ssp2 = new SystemStreamPartition("test-system", "test-stream", new Partition(1))
     val offsetsAfter = systemAdmin.getOffsetsAfter(Map(
@@ -290,7 +291,7 @@ class TestKafkaSystemAdmin {
     assertEquals("3", offsetsAfter(ssp2))
   }
 
-  class KafkaSystemAdminWithTopicMetadataError extends KafkaSystemAdmin("test", brokers) {
+  class KafkaSystemAdminWithTopicMetadataError extends KafkaSystemAdmin("test", brokers, connectZk = () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer)) {
     import kafka.api.{ TopicMetadata, TopicMetadataResponse }
 
     // Simulate Kafka telling us that the leader for the topic is not available

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0ebfcbd0/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java b/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java
index fa1d51b..c0a20af 100644
--- a/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java
+++ b/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemStreamMetadata;
 import org.apache.samza.system.SystemStreamPartition;
@@ -53,7 +54,12 @@ public class MockSystemAdmin implements SystemAdmin {
     return metadata;
   }
 
+  @Override
+  public void createChangelogStream(String streamName, int numOfPartitions) { // not supported by this mock
+    throw new SamzaException("Method not implemented");
+  }
+
   @Override
   public Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> offsets) {
     Map<SystemStreamPartition, String> offsetsAfter = new HashMap<SystemStreamPartition, String>();
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0ebfcbd0/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
index 118f5ee..ca25258 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
@@ -22,6 +22,7 @@ package org.apache.samza.test.integration
 import java.util.Properties
 import java.util.concurrent.CountDownLatch
 import java.util.concurrent.TimeUnit
+import java.util.logging.{Level, LogManager, Handler, Logger}
 
 import kafka.admin.AdminUtils
 import kafka.common.ErrorMapping
@@ -69,7 +70,8 @@ import scala.collection.mutable.SynchronizedMap
 
 object TestStatefulTask {
   val INPUT_TOPIC = "input"
-  val STATE_TOPIC = "mystore"
+  val STORE_NAME = "mystore"
+  val STATE_TOPIC_STREAM = "mystoreChangelog"
   val TOTAL_TASK_NAMES = 1
   val REPLICATION_FACTOR = 3
 
@@ -87,6 +89,9 @@ object TestStatefulTask {
   val props1 = TestUtils.createBrokerConfig(brokerId1, port1)
   val props2 = TestUtils.createBrokerConfig(brokerId2, port2)
   val props3 = TestUtils.createBrokerConfig(brokerId3, port3)
+  props1.setProperty("auto.create.topics.enable","false")
+  props2.setProperty("auto.create.topics.enable","false")
+  props3.setProperty("auto.create.topics.enable","false")
 
   val config = new java.util.Properties()
   val brokers = "localhost:%d,localhost:%d,localhost:%d" format (port1, port2, port3)
@@ -124,16 +129,10 @@ object TestStatefulTask {
       INPUT_TOPIC,
       TOTAL_TASK_NAMES,
       REPLICATION_FACTOR)
-
-    AdminUtils.createTopic(
-      zkClient,
-      STATE_TOPIC,
-      TOTAL_TASK_NAMES,
-      REPLICATION_FACTOR)
   }
 
   def validateTopics {
-    val topics = Set(STATE_TOPIC, INPUT_TOPIC)
+    val topics = Set(INPUT_TOPIC)
     var done = false
     var retries = 0
 
@@ -207,7 +206,8 @@ class TestStatefulTask {
     "stores.mystore.factory" -> "org.apache.samza.storage.kv.KeyValueStorageEngineFactory",
     "stores.mystore.key.serde" -> "string",
     "stores.mystore.msg.serde" -> "string",
-    "stores.mystore.changelog" -> "kafka.mystore",
+    "stores.mystore.changelog" -> "kafka.mystoreChangelog",
+    "stores.mystore.changelog.replication.factor" -> "1",
 
     "systems.kafka.samza.factory" -> "org.apache.samza.system.kafka.KafkaSystemFactory",
     // Always start consuming at offset 0. This avoids a race condition between
@@ -249,7 +249,7 @@ class TestStatefulTask {
     send(task, "-99")
 
     // Validate that messages appear in store stream.
-    val messages = readAll(STATE_TOPIC, 5, "testShouldStartTaskForFirstTime")
+    val messages = readAll(STATE_TOPIC_STREAM, 5, "testShouldStartTaskForFirstTime")
 
     assertEquals(6, messages.length)
     assertEquals("1", messages(0))
@@ -291,7 +291,7 @@ class TestStatefulTask {
     send(task, "5")
 
     // Validate that messages appear in store stream.
-    val messages = readAll(STATE_TOPIC, 14, "testShouldRestoreStore")
+    val messages = readAll(STATE_TOPIC_STREAM, 14, "testShouldRestoreStore")
 
     assertEquals(15, messages.length)
     // From initial start.
@@ -424,7 +424,7 @@ class TestTask extends StreamTask with InitableTask {
   def init(config: Config, context: TaskContext) {
     TestTask.register(context.getTaskName, this)
     store = context
-      .getStore(TestStatefulTask.STATE_TOPIC)
+      .getStore(TestStatefulTask.STORE_NAME)
       .asInstanceOf[KeyValueStore[String, String]]
     val iter = store.all
     restored ++= iter

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0ebfcbd0/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
index cab5101..58f2464 100644
--- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
+++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
@@ -421,4 +421,6 @@ class MockSystemAdmin(numTasks: Int) extends SystemAdmin {
       streamName -> new SystemStreamMetadata(streamName, partitionMetadata)
     }).toMap[String, SystemStreamMetadata]
   }
+
+  override def createChangelogStream(streamName: String, numOfPartitions: Int): Unit = ??? // unsupported here: ??? throws scala.NotImplementedError
 }