You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sh...@apache.org on 2019/03/26 18:23:34 UTC

[samza] branch master updated: SAMZA-2131: [Scala cleanup] Convert FileSystemCheckpointManagerConfig.scala and SystemConfig.scala to Java (#955)

This is an automated email from the ASF dual-hosted git repository.

shanthoosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new f319db5  SAMZA-2131: [Scala cleanup] Convert FileSystemCheckpointManagerConfig.scala and SystemConfig.scala to Java (#955)
f319db5 is described below

commit f319db519168cf06de14232de99ef0cf38fdb9f7
Author: cameronlee314 <37...@users.noreply.github.com>
AuthorDate: Tue Mar 26 11:23:29 2019 -0700

    SAMZA-2131: [Scala cleanup] Convert FileSystemCheckpointManagerConfig.scala and SystemConfig.scala to Java (#955)
    
    * moving FileSystemCheckpointManagerConfig.scala and SystemConfig.scala to java
    
    * renaming JavaSystemConfig to SystemConfig now that the scala version is gone
---
 .../config/FileSystemCheckpointManagerConfig.java} |  22 +-
 .../{JavaSystemConfig.java => SystemConfig.java}   |  97 ++++++--
 .../samza/storage/ChangelogStreamManager.java      |   4 +-
 .../org/apache/samza/storage/StorageRecovery.java  |   4 +-
 .../java/org/apache/samza/system/SystemAdmins.java |   4 +-
 .../apache/samza/checkpoint/OffsetManager.scala    |   4 +-
 .../file/FileSystemCheckpointManager.scala         |   9 +-
 .../org/apache/samza/config/StreamConfig.scala     |   2 +-
 .../org/apache/samza/config/SystemConfig.scala     |  68 -----
 .../apache/samza/container/SamzaContainer.scala    |  17 +-
 .../apache/samza/coordinator/JobModelManager.scala |   8 -
 .../reporter/MetricsSnapshotReporterFactory.scala  |  10 +-
 .../apache/samza/util/CoordinatorStreamUtil.scala  |  12 +-
 .../TestFileSystemCheckpointManagerConfig.java     |  43 ++++
 .../apache/samza/config/TestJavaSystemConfig.java  |  68 -----
 .../org/apache/samza/config/TestSystemConfig.java  | 273 +++++++++++++++++++++
 .../samza/checkpoint/TestCheckpointTool.scala      |   8 +-
 .../org/apache/samza/config/TestSystemConfig.scala |  67 -----
 .../samza/coordinator/TestJobCoordinator.scala     |  16 +-
 .../TestRangeSystemStreamPartitionMatcher.scala    |   4 +-
 .../TestRegexSystemStreamPartitionMatcher.scala    |   4 +-
 .../apache/samza/config/KafkaConsumerConfig.java   |   6 +-
 .../samza/system/kafka/KafkaSystemAdmin.java       |   2 +-
 .../kafka/KafkaCheckpointManagerFactory.scala      |   8 +-
 .../org/apache/samza/config/KafkaConfig.scala      |  17 +-
 .../samza/config_deprecated/KafkaConfig.scala      |  14 +-
 .../kafka_deprecated/KafkaSystemFactory.scala      |   4 +-
 .../kafka/TestKafkaCheckpointManager.scala         |   9 +-
 .../org/apache/samza/config/Log4jSystemConfig.java |   2 +-
 .../apache/samza/logging/log4j/StreamAppender.java |  11 +-
 .../org/apache/samza/config/Log4jSystemConfig.java |   2 +-
 .../samza/logging/log4j2/StreamAppender.java       |  11 +-
 .../descriptors/InMemorySystemDescriptor.java      |   4 +-
 .../benchmark/SystemConsumerWithSamzaBench.java    |   2 +-
 34 files changed, 493 insertions(+), 343 deletions(-)

diff --git a/samza-core/src/main/scala/org/apache/samza/config/FileSystemCheckpointManagerConfig.scala b/samza-core/src/main/java/org/apache/samza/config/FileSystemCheckpointManagerConfig.java
similarity index 60%
rename from samza-core/src/main/scala/org/apache/samza/config/FileSystemCheckpointManagerConfig.scala
rename to samza-core/src/main/java/org/apache/samza/config/FileSystemCheckpointManagerConfig.java
index 707ea59..e131895 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/FileSystemCheckpointManagerConfig.scala
+++ b/samza-core/src/main/java/org/apache/samza/config/FileSystemCheckpointManagerConfig.java
@@ -16,16 +16,22 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+package org.apache.samza.config;
 
-package org.apache.samza.config
+import java.util.Optional;
 
-object FileSystemCheckpointManagerConfig {
-  // file system checkpoint manager config constants
-  val CHECKPOINT_MANAGER_ROOT = "task.checkpoint.path" // system name to use when sending offset checkpoints
 
-  implicit def Config2FSCP(config: Config) = new FileSystemCheckpointManagerConfig(config)
-}
+public class FileSystemCheckpointManagerConfig extends MapConfig {
+  /**
+   * Path on local file system where checkpoints should be stored.
+   */
+  private static final String CHECKPOINT_MANAGER_ROOT = "task.checkpoint.path";
+
+  public FileSystemCheckpointManagerConfig(Config config) {
+    super(config);
+  }
 
-class FileSystemCheckpointManagerConfig(config: Config) extends ScalaMapConfig(config) {
-  def getFileSystemCheckpointRoot = getOption(FileSystemCheckpointManagerConfig.CHECKPOINT_MANAGER_ROOT)
+  public Optional<String> getFileSystemCheckpointRoot() {
+    return Optional.ofNullable(get(CHECKPOINT_MANAGER_ROOT));
+  }
 }
diff --git a/samza-core/src/main/java/org/apache/samza/config/JavaSystemConfig.java b/samza-core/src/main/java/org/apache/samza/config/SystemConfig.java
similarity index 55%
rename from samza-core/src/main/java/org/apache/samza/config/JavaSystemConfig.java
rename to samza-core/src/main/java/org/apache/samza/config/SystemConfig.java
index fde98c6..93a0c32 100644
--- a/samza-core/src/main/java/org/apache/samza/config/JavaSystemConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/SystemConfig.java
@@ -22,38 +22,49 @@ package org.apache.samza.config;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.stream.Collectors;
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.SamzaException;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemFactory;
 import org.apache.samza.util.Util;
 
-
 /**
- * a java version of the system config
+ * Config helper methods related to systems.
  */
-public class JavaSystemConfig extends MapConfig {
-  public static final String SYSTEM_PREFIX = "systems.";
-  public static final String SYSTEM_FACTORY_SUFFIX = ".samza.factory";
-  public static final String SYSTEM_FACTORY_FORMAT = SYSTEM_PREFIX + "%s" + SYSTEM_FACTORY_SUFFIX;
-  private static final String SYSTEM_DEFAULT_STREAMS_PREFIX_FORMAT = SYSTEM_PREFIX + "%s" + ".default.stream.";
+public class SystemConfig extends MapConfig {
+  private static final String SYSTEMS_PREFIX = "systems.";
+  public static final String SYSTEM_ID_PREFIX = SYSTEMS_PREFIX + "%s.";
+
+  private static final String SYSTEM_FACTORY_SUFFIX = ".samza.factory";
+  public static final String SYSTEM_FACTORY_FORMAT = SYSTEMS_PREFIX + "%s" + SYSTEM_FACTORY_SUFFIX;
+  @VisibleForTesting
+  static final String SYSTEM_DEFAULT_STREAMS_PREFIX_FORMAT = SYSTEM_ID_PREFIX + "default.stream.";
+
+  // If true, automatically delete committed messages from streams whose committed messages can be deleted.
+  // A stream's committed messages can be deleted if it is a intermediate stream, or if user has manually
+  // set streams.{streamId}.samza.delete.committed.messages to true in the configuration.
+  @VisibleForTesting
+  static final String DELETE_COMMITTED_MESSAGES = SYSTEM_ID_PREFIX + "samza.delete.committed.messages";
+
   private static final String EMPTY = "";
 
-  public static final String SAMZA_SYSTEM_OFFSET_UPCOMING = "upcoming";
-  public static final String SAMZA_SYSTEM_OFFSET_OLDEST = "oldest";
+  static final String SAMZA_SYSTEM_OFFSET_UPCOMING = "upcoming";
+  static final String SAMZA_SYSTEM_OFFSET_OLDEST = "oldest";
 
-  public JavaSystemConfig(Config config) {
+  public SystemConfig(Config config) {
     super(config);
   }
 
-  public String getSystemFactory(String name) {
-    if (name == null) {
-      return null;
+  public Optional<String> getSystemFactory(String systemName) {
+    if (systemName == null) {
+      return Optional.empty();
     }
-    String systemFactory = String.format(SYSTEM_FACTORY_FORMAT, name);
+    String systemFactory = String.format(SYSTEM_FACTORY_FORMAT, systemName);
     String value = get(systemFactory, null);
-    return (StringUtils.isBlank(value)) ? null : value;
+    return (StringUtils.isBlank(value)) ? Optional.empty() : Optional.of(value);
   }
 
   /**
@@ -62,8 +73,8 @@ public class JavaSystemConfig extends MapConfig {
    * @return A list system names
    */
   public List<String> getSystemNames() {
-    Config subConf = subset(SYSTEM_PREFIX, true);
-    ArrayList<String> systemNames = new ArrayList<String>();
+    Config subConf = subset(SYSTEMS_PREFIX, true);
+    ArrayList<String> systemNames = new ArrayList<>();
     for (Map.Entry<String, String> entry : subConf.entrySet()) {
       String key = entry.getKey();
       if (key.endsWith(SYSTEM_FACTORY_SUFFIX)) {
@@ -81,7 +92,7 @@ public class JavaSystemConfig extends MapConfig {
   public Map<String, SystemAdmin> getSystemAdmins() {
     return getSystemFactories().entrySet()
         .stream()
-        .collect(Collectors.toMap(systemNameToFactoryEntry -> systemNameToFactoryEntry.getKey(),
+        .collect(Collectors.toMap(Entry::getKey,
             systemNameToFactoryEntry -> systemNameToFactoryEntry.getValue()
                 .getAdmin(systemNameToFactoryEntry.getKey(), this)));
   }
@@ -105,11 +116,8 @@ public class JavaSystemConfig extends MapConfig {
     Map<String, SystemFactory> systemFactories = getSystemNames().stream().collect(Collectors.toMap(
       systemName -> systemName,
       systemName -> {
-        String systemFactoryClassName = getSystemFactory(systemName);
-        if (systemFactoryClassName == null) {
-          throw new SamzaException(
-              String.format("A stream uses system %s, which is missing from the configuration.", systemName));
-        }
+        String systemFactoryClassName = getSystemFactory(systemName).orElseThrow(() -> new SamzaException(
+            String.format("A stream uses system %s, which is missing from the configuration.", systemName)));
         return Util.getObj(systemFactoryClassName, SystemFactory.class);
       }));
 
@@ -147,4 +155,47 @@ public class JavaSystemConfig extends MapConfig {
 
     return systemOffsetDefault;
   }
+
+  /**
+   * @param systemName name of the system
+   * @return the key serde for the {@code systemName}, or empty if it was not found
+   */
+  public Optional<String> getSystemKeySerde(String systemName) {
+    return getSystemDefaultStreamProperty(systemName, StreamConfig.KEY_SERDE());
+  }
+
+  /**
+   * @param systemName name of the system
+   * @return the message serde for the {@code systemName}, or empty if it was not found
+   */
+  public Optional<String> getSystemMsgSerde(String systemName) {
+    return getSystemDefaultStreamProperty(systemName, StreamConfig.MSG_SERDE());
+  }
+
+  /**
+   * @param systemName name of the system
+   * @return if messages committed to this system should automatically be deleted
+   */
+  public boolean deleteCommittedMessages(String systemName) {
+    return getBoolean(String.format(DELETE_COMMITTED_MESSAGES, systemName), false);
+  }
+
+  /**
+   * Gets the system-wide default for the {@code propertyName} for the {@code systemName}.
+   * This will check in a couple of different config locations for the value.
+   */
+  private Optional<String> getSystemDefaultStreamProperty(String systemName, String propertyName) {
+    Map<String, String> defaultStreamProperties = getDefaultStreamProperties(systemName);
+    String defaultStreamProperty = defaultStreamProperties.get(propertyName);
+    if (StringUtils.isNotEmpty(defaultStreamProperty)) {
+      return Optional.of(defaultStreamProperty);
+    } else {
+      String fallbackStreamProperty = get(String.format(SYSTEM_ID_PREFIX, systemName) + propertyName);
+      if (StringUtils.isNotEmpty(fallbackStreamProperty)) {
+        return Optional.of(fallbackStreamProperty);
+      } else {
+        return Optional.empty();
+      }
+    }
+  }
 }
diff --git a/samza-core/src/main/java/org/apache/samza/storage/ChangelogStreamManager.java b/samza-core/src/main/java/org/apache/samza/storage/ChangelogStreamManager.java
index ea55fe5..71635aa 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/ChangelogStreamManager.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/ChangelogStreamManager.java
@@ -26,7 +26,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JavaStorageConfig;
-import org.apache.samza.config.JavaSystemConfig;
+import org.apache.samza.config.SystemConfig;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
 import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
@@ -116,7 +116,7 @@ public class ChangelogStreamManager {
             name -> StreamUtil.getSystemStreamFromNames(storageConfig.getChangelogStream(name))));
 
     // Get SystemAdmin for changelog store's system and attempt to create the stream
-    JavaSystemConfig systemConfig = new JavaSystemConfig(config);
+    SystemConfig systemConfig = new SystemConfig(config);
     storeNameSystemStreamMapping.forEach((storeName, systemStream) -> {
         // Load system admin for this system.
         SystemAdmin systemAdmin = systemConfig.getSystemAdmin(systemStream.getSystem());
diff --git a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
index 4d01159..79491d3 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
@@ -26,7 +26,7 @@ import java.util.Map;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JavaStorageConfig;
-import org.apache.samza.config.JavaSystemConfig;
+import org.apache.samza.config.SystemConfig;
 import org.apache.samza.config.SerializerConfig;
 import org.apache.samza.container.SamzaContainerMetrics;
 import org.apache.samza.context.ContainerContext;
@@ -207,7 +207,7 @@ public class StorageRecovery extends CommandLine {
     StreamMetadataCache streamMetadataCache = new StreamMetadataCache(systemAdmins, 5000, clock);
     // don't worry about prefetching for this; looks like the tool doesn't flush to offset files anyways
 
-    Map<String, SystemFactory> systemFactories = new JavaSystemConfig(jobConfig).getSystemFactories();
+    Map<String, SystemFactory> systemFactories = new SystemConfig(jobConfig).getSystemFactories();
 
     for (ContainerModel containerModel : containers.values()) {
       ContainerContext containerContext = new ContainerContextImpl(containerModel, new MetricsRegistryMap());
diff --git a/samza-core/src/main/java/org/apache/samza/system/SystemAdmins.java b/samza-core/src/main/java/org/apache/samza/system/SystemAdmins.java
index 242ac67..be15869 100644
--- a/samza-core/src/main/java/org/apache/samza/system/SystemAdmins.java
+++ b/samza-core/src/main/java/org/apache/samza/system/SystemAdmins.java
@@ -23,7 +23,7 @@ import java.util.Map;
 import java.util.Set;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
-import org.apache.samza.config.JavaSystemConfig;
+import org.apache.samza.config.SystemConfig;
 import org.apache.samza.config.MapConfig;
 
 
@@ -34,7 +34,7 @@ public class SystemAdmins {
   private final Map<String, SystemAdmin> systemAdminMap;
 
   public SystemAdmins(Config config) {
-    JavaSystemConfig systemConfig = new JavaSystemConfig(config);
+    SystemConfig systemConfig = new SystemConfig(config);
     this.systemAdminMap = systemConfig.getSystemAdmins();
   }
 
diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
index f5cc6fd..09f778a 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
@@ -25,7 +25,7 @@ import java.util.concurrent.ConcurrentHashMap
 import org.apache.samza.SamzaException
 import org.apache.samza.annotation.InterfaceStability
 import org.apache.samza.config.StreamConfig.Config2Stream
-import org.apache.samza.config.{Config, JavaSystemConfig}
+import org.apache.samza.config.{Config, SystemConfig}
 import org.apache.samza.container.TaskName
 import org.apache.samza.startpoint.{Startpoint, StartpointManager}
 import org.apache.samza.system.SystemStreamMetadata.OffsetType
@@ -84,7 +84,7 @@ object OffsetManager extends Logging {
         case (systemStream, systemStreamMetadata) =>
           // Get default offset.
           val streamDefaultOffset = config.getDefaultStreamOffset(systemStream)
-          val systemDefaultOffset = new JavaSystemConfig(config).getSystemOffsetDefault(systemStream.getSystem)
+          val systemDefaultOffset = new SystemConfig(config).getSystemOffsetDefault(systemStream.getSystem)
           val defaultOffsetType = if (streamDefaultOffset.isDefined) {
             OffsetType.valueOf(streamDefaultOffset.get.toUpperCase)
           } else if (systemDefaultOffset != null) {
diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala
index edd0ace..68afda1 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala
@@ -22,17 +22,16 @@ package org.apache.samza.checkpoint.file
 import java.io.File
 import java.io.FileNotFoundException
 import java.io.FileOutputStream
-import java.util
 import org.apache.samza.SamzaException
 import org.apache.samza.checkpoint.Checkpoint
 import org.apache.samza.checkpoint.CheckpointManager
 import org.apache.samza.checkpoint.CheckpointManagerFactory
-import org.apache.samza.config.Config
-import org.apache.samza.config.FileSystemCheckpointManagerConfig.Config2FSCP
+import org.apache.samza.config.{Config, FileSystemCheckpointManagerConfig}
 import org.apache.samza.config.JobConfig.Config2Job
 import org.apache.samza.container.TaskName
 import org.apache.samza.metrics.MetricsRegistry
 import org.apache.samza.serializers.CheckpointSerde
+import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
 import scala.io.Source
 
 class FileSystemCheckpointManager(
@@ -79,8 +78,8 @@ class FileSystemCheckpointManagerFactory extends CheckpointManagerFactory {
     val name = config
       .getName
       .getOrElse(throw new SamzaException("Missing job name in configs"))
-    val root = config
-      .getFileSystemCheckpointRoot
+    val fileSystemCheckpointManagerConfig = new FileSystemCheckpointManagerConfig(config)
+    val root = JavaOptionals.toRichOptional(fileSystemCheckpointManagerConfig.getFileSystemCheckpointRoot).toOption
       .getOrElse(throw new SamzaException("Missing checkpoint root in configs"))
     new FileSystemCheckpointManager(name, new File(root))
   }
diff --git a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
index 252e38f..5890c07 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
@@ -273,7 +273,7 @@ class StreamConfig(config: Config) extends ScalaMapConfig(config) with Logging {
     if (systemName == null) {
       Map()
     }
-    val systemConfig = new JavaSystemConfig(config)
+    val systemConfig = new SystemConfig(config)
     val defaults = systemConfig.getDefaultStreamProperties(systemName)
     val explicitConfigs = config.subset(StreamConfig.STREAM_PREFIX format(systemName, streamName), true)
     new MapConfig(defaults, explicitConfigs)
diff --git a/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
deleted file mode 100644
index fd508c2..0000000
--- a/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.config
-
-import scala.collection.JavaConverters._
-import org.apache.samza.util.Logging
-
-/**
-  * Note: All new methods are being added to [[org.apache.samza.config.JavaSystemConfig]]
-  */
-object SystemConfig {
-  // system config constants
-  val SYSTEM_PREFIX = JavaSystemConfig.SYSTEM_PREFIX + "%s."
-  val SYSTEM_FACTORY = JavaSystemConfig.SYSTEM_FACTORY_FORMAT
-  val CONSUMER_OFFSET_DEFAULT = SYSTEM_PREFIX + "samza.offset.default"
-
-  // If true, automatically delete committed messages from streams whose committed messages can be deleted.
-  // A stream's committed messages can be deleted if it is a intermediate stream, or if user has manually
-  // set streams.{streamId}.samza.delete.committed.messages to true in the configuration.
-  val DELETE_COMMITTED_MESSAGES = SYSTEM_PREFIX + "samza.delete.committed.messages"
-
-  implicit def Config2System(config: Config) = new SystemConfig(config)
-}
-
-class SystemConfig(config: Config) extends ScalaMapConfig(config) with Logging {
-  val javaSystemConfig = new JavaSystemConfig(config)
-
-  def getSystemFactory(name: String) = Option(javaSystemConfig.getSystemFactory(name))
-
-  def getSystemKeySerde(name: String) = getSystemDefaultStreamProperty(name, StreamConfig.KEY_SERDE)
-
-  def getSystemMsgSerde(name: String) = getSystemDefaultStreamProperty(name, StreamConfig.MSG_SERDE)
-
-  def deleteCommittedMessages(systemName: String) = getBoolean(SystemConfig.DELETE_COMMITTED_MESSAGES format (systemName), false)
-
-  /**
-   * Returns a list of all system names from the config file. Useful for
-   * getting individual systems.
-   */
-  def getSystemNames() = javaSystemConfig.getSystemNames().asScala
-
-  private def getSystemDefaultStreamProperty(name: String, property: String) = {
-    val defaultStreamProperties = javaSystemConfig.getDefaultStreamProperties(name)
-    val streamDefault = defaultStreamProperties.get(property)
-    if (!(streamDefault == null || streamDefault.isEmpty)) {
-      Option(streamDefault)
-    } else {
-      getNonEmptyOption((SystemConfig.SYSTEM_PREFIX + property) format name)
-    }
-  }
-}
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 70ff87d..d2b8f8f 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -37,7 +37,6 @@ import org.apache.samza.config.MetricsConfig.Config2Metrics
 import org.apache.samza.config.SerializerConfig.Config2Serializer
 import org.apache.samza.config.StorageConfig.Config2Storage
 import org.apache.samza.config.StreamConfig.Config2Stream
-import org.apache.samza.config.SystemConfig.Config2System
 import org.apache.samza.config.TaskConfig.Config2Task
 import org.apache.samza.config._
 import org.apache.samza.container.disk.DiskSpaceMonitor.Listener
@@ -50,13 +49,12 @@ import org.apache.samza.metrics.{JmxServer, JvmMetrics, MetricsRegistryMap, Metr
 import org.apache.samza.serializers._
 import org.apache.samza.serializers.model.SamzaObjectMapper
 import org.apache.samza.startpoint.StartpointManager
-import org.apache.samza.storage.StorageEngineFactory.StoreMode
 import org.apache.samza.storage._
 import org.apache.samza.system._
 import org.apache.samza.system.chooser.{DefaultChooser, MessageChooserFactory, RoundRobinChooserFactory}
 import org.apache.samza.table.TableManager
-import org.apache.samza.table.utils.SerdeUtils
 import org.apache.samza.task._
+import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
 import org.apache.samza.util.{Util, _}
 import org.apache.samza.{SamzaContainerStatus, SamzaException}
 
@@ -139,6 +137,7 @@ object SamzaContainer extends Logging {
     externalContextOption: Option[ExternalContext],
     localityManager: LocalityManager = null) = {
     val config = jobContext.getConfig
+    val systemConfig = new SystemConfig(config)
     val containerModel = jobModel.getContainers.get(containerId)
     val containerName = "samza-container-%s" format containerId
     val maxChangeLogStreamPartitions = jobModel.maxChangeLogStreamPartitions
@@ -192,8 +191,7 @@ object SamzaContainer extends Logging {
       .map(_.getSystem)
       .toSet
 
-
-    val systemNames = config.getSystemNames
+    val systemNames = systemConfig.getSystemNames.asScala
 
     info("Got system names: %s" format systemNames)
 
@@ -202,8 +200,7 @@ object SamzaContainer extends Logging {
     info("Got serde streams: %s" format serdeStreams)
 
     val systemFactories = systemNames.map(systemName => {
-      val systemFactoryClassName = config
-        .getSystemFactory(systemName)
+      val systemFactoryClassName = JavaOptionals.toRichOptional(systemConfig.getSystemFactory(systemName)).toOption
         .getOrElse(throw new SamzaException("A stream uses system %s, which is missing from the configuration." format systemName))
       (systemName, Util.getObj(systemFactoryClassName, classOf[SystemFactory]))
     }).toMap
@@ -321,11 +318,13 @@ object SamzaContainer extends Logging {
         }).toMap
     }
 
-    val systemKeySerdes = buildSystemSerdeMap(systemName => config.getSystemKeySerde(systemName))
+    val systemKeySerdes = buildSystemSerdeMap(systemName =>
+      JavaOptionals.toRichOptional(systemConfig.getSystemKeySerde(systemName)).toOption)
 
     debug("Got system key serdes: %s" format systemKeySerdes)
 
-    val systemMessageSerdes = buildSystemSerdeMap(systemName => config.getSystemMsgSerde(systemName))
+    val systemMessageSerdes = buildSystemSerdeMap(systemName =>
+      JavaOptionals.toRichOptional(systemConfig.getSystemMsgSerde(systemName)).toOption)
 
     debug("Got system message serdes: %s" format systemMessageSerdes)
 
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
index c59e35e..fa9d2a7 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
@@ -23,14 +23,8 @@ import java.util
 import java.util.concurrent.atomic.AtomicReference
 
 import org.apache.samza.Partition
-import org.apache.samza.config.JobConfig.Config2Job
-import org.apache.samza.config.SystemConfig.Config2System
-import org.apache.samza.config.TaskConfig.Config2Task
-import org.apache.samza.config.{Config, _}
-import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory
 import org.apache.samza.config._
 import org.apache.samza.config.JobConfig.Config2Job
-import org.apache.samza.config.SystemConfig.Config2System
 import org.apache.samza.config.TaskConfig.Config2Task
 import org.apache.samza.config.Config
 import org.apache.samza.container.grouper.stream.{SSPGrouperProxy, SystemStreamPartitionGrouperFactory}
@@ -381,8 +375,6 @@ object JobModelManager extends Logging {
     var containerMap = containerModels.asScala.map(containerModel => containerModel.getId -> containerModel).toMap
     new JobModel(config, containerMap.asJava)
   }
-
-  private def getSystemNames(config: Config) = config.getSystemNames().toSet
 }
 
 /**
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
index 8a9c021..132a903 100644
--- a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
@@ -21,10 +21,9 @@ package org.apache.samza.metrics.reporter
 
 import org.apache.samza.util.{Logging, StreamUtil, Util}
 import org.apache.samza.SamzaException
-import org.apache.samza.config.{ApplicationConfig, Config}
+import org.apache.samza.config.{ApplicationConfig, Config, SystemConfig}
 import org.apache.samza.config.JobConfig.Config2Job
 import org.apache.samza.config.MetricsConfig.Config2Metrics
-import org.apache.samza.config.SystemConfig.Config2System
 import org.apache.samza.config.StreamConfig.Config2Stream
 import org.apache.samza.config.SerializerConfig.Config2Serializer
 import org.apache.samza.config.TaskConfig.Config2Task
@@ -33,6 +32,7 @@ import org.apache.samza.metrics.MetricsReporterFactory
 import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.serializers.{MetricsSnapshotSerdeV2, SerdeFactory}
 import org.apache.samza.system.SystemFactory
+import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
 
 class MetricsSnapshotReporterFactory extends MetricsReporterFactory with Logging {
   def getMetricsReporter(name: String, containerName: String, config: Config): MetricsReporter = {
@@ -72,8 +72,8 @@ class MetricsSnapshotReporterFactory extends MetricsReporterFactory with Logging
 
     val systemName = systemStream.getSystem
 
-    val systemFactoryClassName = config
-      .getSystemFactory(systemName)
+    val systemConfig = new SystemConfig(config)
+    val systemFactoryClassName = JavaOptionals.toRichOptional(systemConfig.getSystemFactory(systemName)).toOption
       .getOrElse(throw new SamzaException("Trying to fetch system factory for system %s, which isn't defined in config." format systemName))
 
     val systemFactory = Util.getObj(systemFactoryClassName, classOf[SystemFactory])
@@ -87,7 +87,7 @@ class MetricsSnapshotReporterFactory extends MetricsReporterFactory with Logging
     info("Got producer %s." format producer)
 
     val streamSerdeName = config.getStreamMsgSerde(systemStream)
-    val systemSerdeName = config.getSystemMsgSerde(systemName)
+    val systemSerdeName = JavaOptionals.toRichOptional(systemConfig.getSystemMsgSerde(systemName)).toOption
     val serdeName = streamSerdeName.getOrElse(systemSerdeName.getOrElse(null))
     val serde = if (serdeName != null) {
       config.getSerdeClass(serdeName) match {
diff --git a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
index bfb2271..0d767c6 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
@@ -22,10 +22,10 @@
 package org.apache.samza.util
 
 import org.apache.samza.SamzaException
-import org.apache.samza.config.{Config, ConfigException, JobConfig, MapConfig, SystemConfig}
+import org.apache.samza.config._
 import org.apache.samza.system.{SystemFactory, SystemStream}
 import org.apache.samza.config.JobConfig.Config2Job
-import org.apache.samza.config.SystemConfig.Config2System
+import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
 
 import scala.collection.immutable.Map
 import scala.collection.JavaConverters._
@@ -38,7 +38,7 @@ object CoordinatorStreamUtil {
   def buildCoordinatorStreamConfig(config: Config) = {
     val (jobName, jobId) = getJobNameAndId(config)
     // Build a map with just the system config and job.name/job.id. This is what's required to start the JobCoordinator.
-    val map = config.subset(SystemConfig.SYSTEM_PREFIX format config.getCoordinatorSystemName, false).asScala ++
+    val map = config.subset(SystemConfig.SYSTEM_ID_PREFIX format config.getCoordinatorSystemName, false).asScala ++
       Map[String, String](
         JobConfig.JOB_NAME -> jobName,
         JobConfig.JOB_ID -> jobId,
@@ -66,9 +66,9 @@ object CoordinatorStreamUtil {
     */
   def getCoordinatorSystemFactory(config: Config) = {
     val systemName = config.getCoordinatorSystemName
-    val systemFactoryClassName = config
-      .getSystemFactory(systemName)
-      .getOrElse(throw new SamzaException("Missing configuration: " + SystemConfig.SYSTEM_FACTORY format systemName))
+    val systemConfig = new SystemConfig(config)
+    val systemFactoryClassName = JavaOptionals.toRichOptional(systemConfig.getSystemFactory(systemName)).toOption
+      .getOrElse(throw new SamzaException("Missing configuration: " + SystemConfig.SYSTEM_FACTORY_FORMAT format systemName))
     Util.getObj(systemFactoryClassName, classOf[SystemFactory])
   }
 
diff --git a/samza-core/src/test/java/org/apache/samza/config/TestFileSystemCheckpointManagerConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestFileSystemCheckpointManagerConfig.java
new file mode 100644
index 0000000..35ab060
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/config/TestFileSystemCheckpointManagerConfig.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.config;
+
+import com.google.common.collect.ImmutableMap;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+
+public class TestFileSystemCheckpointManagerConfig {
+  @Test
+  public void testGetFileSystemCheckpointRoot() {
+    String checkpointManagerRoot = "checkpointManagerRoot";
+
+    // checkpoint path exists
+    Config config = new MapConfig(ImmutableMap.of("task.checkpoint.path", checkpointManagerRoot));
+    FileSystemCheckpointManagerConfig fileSystemCheckpointManagerConfig = new FileSystemCheckpointManagerConfig(config);
+    assertEquals(checkpointManagerRoot, fileSystemCheckpointManagerConfig.getFileSystemCheckpointRoot().get());
+
+    // checkpoint path does not exist
+    config = new MapConfig();
+    fileSystemCheckpointManagerConfig = new FileSystemCheckpointManagerConfig(config);
+    assertFalse(fileSystemCheckpointManagerConfig.getFileSystemCheckpointRoot().isPresent());
+  }
+}
diff --git a/samza-core/src/test/java/org/apache/samza/config/TestJavaSystemConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestJavaSystemConfig.java
deleted file mode 100644
index 94ba374..0000000
--- a/samza-core/src/test/java/org/apache/samza/config/TestJavaSystemConfig.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.config;
-
-import static org.junit.Assert.*;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.junit.Test;
-
-public class TestJavaSystemConfig {
-  private static final String MOCK_SYSTEM_NAME1 = "mocksystem1";
-  private static final String MOCK_SYSTEM_NAME2 = "mocksystem2";
-  private static final String MOCK_SYSTEM_FACTORY_NAME1 = String.format(JavaSystemConfig.SYSTEM_FACTORY_FORMAT, MOCK_SYSTEM_NAME1);
-  private static final String MOCK_SYSTEM_FACTORY_NAME2 = String.format(JavaSystemConfig.SYSTEM_FACTORY_FORMAT, MOCK_SYSTEM_NAME2);
-  private static final String MOCK_SYSTEM_FACTORY_CLASSNAME1 = "some.factory.Class1";
-  private static final String MOCK_SYSTEM_FACTORY_CLASSNAME2 = "some.factory.Class2";
-
-  @Test
-  public void testClassName() {
-    Map<String, String> map = new HashMap<String, String>();
-    map.put(MOCK_SYSTEM_FACTORY_NAME1, MOCK_SYSTEM_FACTORY_CLASSNAME1);
-    JavaSystemConfig systemConfig = new JavaSystemConfig(new MapConfig(map));
-
-    assertEquals(MOCK_SYSTEM_FACTORY_CLASSNAME1, systemConfig.getSystemFactory(MOCK_SYSTEM_NAME1));
-  }
-
-  @Test
-  public void testGetEmptyClassNameAsNull() {
-    Map<String, String> map = new HashMap<String, String>();
-    map.put(MOCK_SYSTEM_FACTORY_NAME1, "");
-    map.put(MOCK_SYSTEM_FACTORY_NAME2, " ");
-    JavaSystemConfig systemConfig = new JavaSystemConfig(new MapConfig(map));
-
-    assertNull(systemConfig.getSystemFactory(MOCK_SYSTEM_NAME1));
-    assertNull(systemConfig.getSystemFactory(MOCK_SYSTEM_NAME2));
-  }
-
-  @Test
-  public void testGetSystemNames() {
-    Map<String, String> map = new HashMap<String, String>();
-    map.put(MOCK_SYSTEM_FACTORY_NAME1, MOCK_SYSTEM_FACTORY_CLASSNAME1);
-    map.put(MOCK_SYSTEM_FACTORY_NAME2, MOCK_SYSTEM_FACTORY_CLASSNAME2);
-    JavaSystemConfig systemConfig = new JavaSystemConfig(new MapConfig(map));
-
-    assertEquals(2, systemConfig.getSystemNames().size());
-    assertTrue(systemConfig.getSystemNames().contains(MOCK_SYSTEM_NAME1));
-    assertTrue(systemConfig.getSystemNames().contains(MOCK_SYSTEM_NAME2));
-  }
-}
diff --git a/samza-core/src/test/java/org/apache/samza/config/TestSystemConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestSystemConfig.java
new file mode 100644
index 0000000..5512bbc
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/config/TestSystemConfig.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemFactory;
+import org.apache.samza.system.SystemProducer;
+import org.junit.Test;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+public class TestSystemConfig {
+  private static final String MOCK_SYSTEM_NAME1 = "mocksystem1";
+  private static final String MOCK_SYSTEM_NAME2 = "mocksystem2";
+  private static final String MOCK_SYSTEM_FACTORY_NAME1 =
+      String.format(SystemConfig.SYSTEM_FACTORY_FORMAT, MOCK_SYSTEM_NAME1);
+  private static final String MOCK_SYSTEM_FACTORY_NAME2 =
+      String.format(SystemConfig.SYSTEM_FACTORY_FORMAT, MOCK_SYSTEM_NAME2);
+  private static final String MOCK_SYSTEM_FACTORY_CLASSNAME1 = "some.factory.Class1";
+  private static final String MOCK_SYSTEM_FACTORY_CLASSNAME2 = "some.factory.Class2";
+
+  private static final String SAMZA_OFFSET_DEFAULT = "samza.offset.default";
+
+  /**
+   * Placeholder to help make sure the correct {@link SystemAdmin} is returned by {@link MockSystemFactory}. Do not mock
+   * any methods of this mock.
+   */
+  private static final SystemAdmin SYSTEM_ADMIN1 = mock(SystemAdmin.class);
+  /**
+   * Placeholder to help make sure the correct {@link SystemAdmin} is returned by {@link MockSystemFactory}. Do not mock
+   * any methods of this mock.
+   */
+  private static final SystemAdmin SYSTEM_ADMIN2 = mock(SystemAdmin.class);
+
+  @Test
+  public void testGetSystemFactory() {
+    Map<String, String> map = new HashMap<>();
+    map.put(MOCK_SYSTEM_FACTORY_NAME1, MOCK_SYSTEM_FACTORY_CLASSNAME1);
+    SystemConfig systemConfig = new SystemConfig(new MapConfig(map));
+
+    assertEquals(MOCK_SYSTEM_FACTORY_CLASSNAME1, systemConfig.getSystemFactory(MOCK_SYSTEM_NAME1).get());
+  }
+
+  @Test
+  public void testGetSystemFactoryEmptyClassName() {
+    Map<String, String> map = new HashMap<>();
+    map.put(MOCK_SYSTEM_FACTORY_NAME1, "");
+    map.put(MOCK_SYSTEM_FACTORY_NAME2, " ");
+    SystemConfig systemConfig = new SystemConfig(new MapConfig(map));
+
+    assertFalse(systemConfig.getSystemFactory(MOCK_SYSTEM_NAME1).isPresent());
+    assertFalse(systemConfig.getSystemFactory(MOCK_SYSTEM_NAME2).isPresent());
+  }
+
+  @Test
+  public void testGetSystemNames() {
+    Map<String, String> map = new HashMap<>();
+    map.put(MOCK_SYSTEM_FACTORY_NAME1, MOCK_SYSTEM_FACTORY_CLASSNAME1);
+    map.put(MOCK_SYSTEM_FACTORY_NAME2, MOCK_SYSTEM_FACTORY_CLASSNAME2);
+    SystemConfig systemConfig = new SystemConfig(new MapConfig(map));
+
+    assertEquals(2, systemConfig.getSystemNames().size());
+    assertTrue(systemConfig.getSystemNames().contains(MOCK_SYSTEM_NAME1));
+    assertTrue(systemConfig.getSystemNames().contains(MOCK_SYSTEM_NAME2));
+  }
+
+  @Test
+  public void testGetSystemAdmins() {
+    Map<String, String> map = ImmutableMap.of(MOCK_SYSTEM_FACTORY_NAME1, MockSystemFactory.class.getName());
+    SystemConfig systemConfig = new SystemConfig(new MapConfig(map));
+    Map<String, SystemAdmin> expected = ImmutableMap.of(MOCK_SYSTEM_NAME1, SYSTEM_ADMIN1);
+    assertEquals(expected, systemConfig.getSystemAdmins());
+  }
+
+  @Test
+  public void testGetSystemAdmin() {
+    Map<String, String> map = ImmutableMap.of(MOCK_SYSTEM_FACTORY_NAME1, MockSystemFactory.class.getName());
+    SystemConfig systemConfig = new SystemConfig(new MapConfig(map));
+    assertEquals(SYSTEM_ADMIN1, systemConfig.getSystemAdmin(MOCK_SYSTEM_NAME1));
+    assertNull(systemConfig.getSystemAdmin(MOCK_SYSTEM_NAME2));
+  }
+
+  @Test
+  public void testGetSystemFactories() {
+    Map<String, String> map = ImmutableMap.of(MOCK_SYSTEM_FACTORY_NAME1, MockSystemFactory.class.getName());
+    SystemConfig systemConfig = new SystemConfig(new MapConfig(map));
+    Map<String, SystemFactory> actual = systemConfig.getSystemFactories();
+    assertEquals(actual.size(), 1);
+    assertTrue(actual.get(MOCK_SYSTEM_NAME1) instanceof MockSystemFactory);
+  }
+
+  @Test
+  public void testGetDefaultStreamProperties() {
+    String defaultStreamPrefix = buildDefaultStreamPropertiesPrefix(MOCK_SYSTEM_NAME1);
+    String system1ConfigKey = "config1-key";
+    String system1ConfigValue = "config1-value";
+    String system2ConfigKey = "config2-key";
+    String system2ConfigValue = "config2-value";
+
+    Config config = new MapConfig(ImmutableMap.of(defaultStreamPrefix + system1ConfigKey, system1ConfigValue,
+        defaultStreamPrefix + system2ConfigKey, system2ConfigValue));
+    Config expected =
+        new MapConfig(ImmutableMap.of(system1ConfigKey, system1ConfigValue, system2ConfigKey, system2ConfigValue));
+    SystemConfig systemConfig = new SystemConfig(config);
+    assertEquals(expected, systemConfig.getDefaultStreamProperties(MOCK_SYSTEM_NAME1));
+    assertEquals(new MapConfig(), systemConfig.getDefaultStreamProperties(MOCK_SYSTEM_NAME2));
+  }
+
+  @Test
+  public void testGetSystemOffsetDefault() {
+    String system1OffsetDefault = "offset-system-1";
+    String system2OffsetDefault = "offset-system-2";
+    Config config = new MapConfig(ImmutableMap.of(
+        // default.stream.samza.offset.default set
+        String.format(SystemConfig.SYSTEM_DEFAULT_STREAMS_PREFIX_FORMAT, MOCK_SYSTEM_NAME1) + SAMZA_OFFSET_DEFAULT,
+        system1OffsetDefault,
+        // should not use this value since default.stream.samza.offset.default is set
+        String.format(SystemConfig.SYSTEM_ID_PREFIX, MOCK_SYSTEM_NAME1) + SAMZA_OFFSET_DEFAULT, "wrong-value",
+        // only samza.offset.default set
+        String.format(SystemConfig.SYSTEM_ID_PREFIX, MOCK_SYSTEM_NAME2) + SAMZA_OFFSET_DEFAULT, system2OffsetDefault));
+    SystemConfig systemConfig = new SystemConfig(config);
+    assertEquals(system1OffsetDefault, systemConfig.getSystemOffsetDefault(MOCK_SYSTEM_NAME1));
+    assertEquals(system2OffsetDefault, systemConfig.getSystemOffsetDefault(MOCK_SYSTEM_NAME2));
+    assertEquals(SystemConfig.SAMZA_SYSTEM_OFFSET_UPCOMING, systemConfig.getSystemOffsetDefault("other-system"));
+  }
+
+  @Test
+  public void testGetSystemKeySerde() {
+    String system1KeySerde = "system1-key-serde";
+    String defaultStreamPrefixSystem1 = buildDefaultStreamPropertiesPrefix(MOCK_SYSTEM_NAME1);
+
+    // value specified explicitly
+    Config config =
+        new MapConfig(ImmutableMap.of(defaultStreamPrefixSystem1 + StreamConfig.KEY_SERDE(), system1KeySerde));
+    SystemConfig systemConfig = new SystemConfig(config);
+    assertEquals(system1KeySerde, systemConfig.getSystemKeySerde(MOCK_SYSTEM_NAME1).get());
+
+    // default stream property is unspecified, try fall back config key
+    config = new MapConfig(
+        ImmutableMap.of(String.format(SystemConfig.SYSTEM_ID_PREFIX, MOCK_SYSTEM_NAME1) + StreamConfig.KEY_SERDE(),
+            system1KeySerde));
+    systemConfig = new SystemConfig(config);
+    assertEquals(system1KeySerde, systemConfig.getSystemKeySerde(MOCK_SYSTEM_NAME1).get());
+
+    // default stream property is empty string, try fall back config key
+    config = new MapConfig(ImmutableMap.of(
+        // default stream property is empty
+        defaultStreamPrefixSystem1 + StreamConfig.KEY_SERDE(), "",
+        // fall back entry
+        String.format(SystemConfig.SYSTEM_ID_PREFIX, MOCK_SYSTEM_NAME1) + StreamConfig.KEY_SERDE(), system1KeySerde));
+    systemConfig = new SystemConfig(config);
+    assertEquals(system1KeySerde, systemConfig.getSystemKeySerde(MOCK_SYSTEM_NAME1).get());
+
+    // default stream property is unspecified, fall back is also empty
+    config = new MapConfig(
+        ImmutableMap.of(String.format(SystemConfig.SYSTEM_ID_PREFIX, MOCK_SYSTEM_NAME1) + StreamConfig.KEY_SERDE(),
+            ""));
+    systemConfig = new SystemConfig(config);
+    assertFalse(systemConfig.getSystemKeySerde(MOCK_SYSTEM_NAME1).isPresent());
+
+    // default stream property is unspecified, fall back is also unspecified
+    config = new MapConfig();
+    systemConfig = new SystemConfig(config);
+    assertFalse(systemConfig.getSystemKeySerde(MOCK_SYSTEM_NAME1).isPresent());
+  }
+
+  @Test
+  public void testGetSystemMsgSerde() {
+    String system1MsgSerde = "system1-msg-serde";
+    String defaultStreamPrefixSystem1 = buildDefaultStreamPropertiesPrefix(MOCK_SYSTEM_NAME1);
+
+    // value specified explicitly
+    Config config =
+        new MapConfig(ImmutableMap.of(defaultStreamPrefixSystem1 + StreamConfig.MSG_SERDE(), system1MsgSerde));
+    SystemConfig systemConfig = new SystemConfig(config);
+    assertEquals(system1MsgSerde, systemConfig.getSystemMsgSerde(MOCK_SYSTEM_NAME1).get());
+
+    // default stream property is unspecified, try fall back config msg
+    config = new MapConfig(
+        ImmutableMap.of(String.format(SystemConfig.SYSTEM_ID_PREFIX, MOCK_SYSTEM_NAME1) + StreamConfig.MSG_SERDE(),
+            system1MsgSerde));
+    systemConfig = new SystemConfig(config);
+    assertEquals(system1MsgSerde, systemConfig.getSystemMsgSerde(MOCK_SYSTEM_NAME1).get());
+
+    // default stream property is empty string, try fall back config msg
+    config = new MapConfig(ImmutableMap.of(
+        // default stream property is empty
+        defaultStreamPrefixSystem1 + StreamConfig.MSG_SERDE(), "",
+        // fall back entry
+        String.format(SystemConfig.SYSTEM_ID_PREFIX, MOCK_SYSTEM_NAME1) + StreamConfig.MSG_SERDE(), system1MsgSerde));
+    systemConfig = new SystemConfig(config);
+    assertEquals(system1MsgSerde, systemConfig.getSystemMsgSerde(MOCK_SYSTEM_NAME1).get());
+
+    // default stream property is unspecified, fall back is also empty
+    config = new MapConfig(
+        ImmutableMap.of(String.format(SystemConfig.SYSTEM_ID_PREFIX, MOCK_SYSTEM_NAME1) + StreamConfig.MSG_SERDE(),
+            ""));
+    systemConfig = new SystemConfig(config);
+    assertFalse(systemConfig.getSystemMsgSerde(MOCK_SYSTEM_NAME1).isPresent());
+
+    // default stream property is unspecified, fall back is also unspecified
+    config = new MapConfig();
+    systemConfig = new SystemConfig(config);
+    assertFalse(systemConfig.getSystemMsgSerde(MOCK_SYSTEM_NAME1).isPresent());
+  }
+
+  @Test
+  public void testDeleteCommittedMessages() {
+    Config config = new MapConfig(ImmutableMap.of(
+        // value is "true"
+        String.format(SystemConfig.DELETE_COMMITTED_MESSAGES, MOCK_SYSTEM_NAME1), "true",
+        // value is explicitly "false"
+        String.format(SystemConfig.DELETE_COMMITTED_MESSAGES, MOCK_SYSTEM_NAME2), "false"));
+    SystemConfig systemConfig = new SystemConfig(config);
+    assertTrue(systemConfig.deleteCommittedMessages(MOCK_SYSTEM_NAME1));
+    assertFalse(systemConfig.deleteCommittedMessages(MOCK_SYSTEM_NAME2));
+    assertFalse(systemConfig.deleteCommittedMessages("other-system")); // value is not specified
+  }
+
+  public static class MockSystemFactory implements SystemFactory {
+    @Override
+    public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) {
+      throw new UnsupportedOperationException("Unnecessary for test");
+    }
+
+    @Override
+    public SystemProducer getProducer(String systemName, Config config, MetricsRegistry registry) {
+      throw new UnsupportedOperationException("Unnecessary for test");
+    }
+
+    @Override
+    public SystemAdmin getAdmin(String systemName, Config config) {
+      switch (systemName) {
+        case MOCK_SYSTEM_NAME1:
+          return SYSTEM_ADMIN1;
+        case MOCK_SYSTEM_NAME2:
+          return SYSTEM_ADMIN2;
+        default:
+          throw new UnsupportedOperationException("System name unsupported: " + systemName);
+      }
+    }
+  }
+
+  private static String buildDefaultStreamPropertiesPrefix(String systemName) {
+    return String.format(SystemConfig.SYSTEM_DEFAULT_STREAMS_PREFIX_FORMAT, systemName);
+  }
+}
diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
index 244fd8f..93c7096 100644
--- a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
+++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
@@ -84,8 +84,8 @@ class TestCheckpointTool extends AssertionsForJUnit with MockitoSugar {
       JobConfig.JOB_COORDINATOR_SYSTEM -> "coordinator",
       TaskConfig.INPUT_STREAMS -> "test.foo",
       TaskConfig.CHECKPOINT_MANAGER_FACTORY -> classOf[MockCheckpointManagerFactory].getName,
-      SystemConfig.SYSTEM_FACTORY.format("test") -> classOf[MockSystemFactory].getName,
-      SystemConfig.SYSTEM_FACTORY.format("coordinator") -> classOf[MockCoordinatorStreamSystemFactory].getName,
+      SystemConfig.SYSTEM_FACTORY_FORMAT.format("test") -> classOf[MockSystemFactory].getName,
+      SystemConfig.SYSTEM_FACTORY_FORMAT.format("coordinator") -> classOf[MockCoordinatorStreamSystemFactory].getName,
       TaskConfig.GROUPER_FACTORY -> "org.apache.samza.container.grouper.task.GroupByContainerCountFactory"
     ).asJava)
     config = JobPlanner.generateSingleJobConfig(userDefinedConfig)
@@ -151,8 +151,8 @@ class TestCheckpointTool extends AssertionsForJUnit with MockitoSugar {
       JobConfig.JOB_COORDINATOR_SYSTEM -> "coordinator",
       TaskConfig.INPUT_STREAMS -> "test.foo",
       TaskConfig.CHECKPOINT_MANAGER_FACTORY -> classOf[MockCheckpointManagerFactory].getName,
-      SystemConfig.SYSTEM_FACTORY.format("test") -> classOf[MockSystemFactory].getName,
-      SystemConfig.SYSTEM_FACTORY.format("coordinator") -> classOf[MockCoordinatorStreamSystemFactory].getName,
+      SystemConfig.SYSTEM_FACTORY_FORMAT.format("test") -> classOf[MockSystemFactory].getName,
+      SystemConfig.SYSTEM_FACTORY_FORMAT.format("coordinator") -> classOf[MockCoordinatorStreamSystemFactory].getName,
       TaskConfig.GROUPER_FACTORY -> "org.apache.samza.container.grouper.task.GroupByContainerCountFactory"
     ).asJava)
     val generatedConfigs: MapConfig = JobPlanner.generateSingleJobConfig(userDefinedConfig)
diff --git a/samza-core/src/test/scala/org/apache/samza/config/TestSystemConfig.scala b/samza-core/src/test/scala/org/apache/samza/config/TestSystemConfig.scala
deleted file mode 100644
index cc54d00..0000000
--- a/samza-core/src/test/scala/org/apache/samza/config/TestSystemConfig.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.config
-
-import scala.collection.JavaConverters._
-import org.apache.samza.config.SystemConfig.{Config2System, SYSTEM_FACTORY}
-import org.junit.Assert._
-import org.junit.Test
-
-class TestSystemConfig {
-  val MOCK_SYSTEM_NAME1 = "mocksystem1"
-  val MOCK_SYSTEM_NAME2 = "mocksystem2"
-  val MOCK_SYSTEM_FACTORY_NAME1 = SYSTEM_FACTORY.format(MOCK_SYSTEM_NAME1)
-  val MOCK_SYSTEM_FACTORY_NAME2 = SYSTEM_FACTORY.format(MOCK_SYSTEM_NAME2)
-  val MOCK_SYSTEM_FACTORY_CLASSNAME1 = "some.factory.Class1"
-  val MOCK_SYSTEM_FACTORY_CLASSNAME2 = "some.factory.Class2"
-
-  def testClassName {
-    val configMap = Map[String, String](
-      MOCK_SYSTEM_FACTORY_NAME1 -> MOCK_SYSTEM_FACTORY_CLASSNAME1
-    )
-    val config = new MapConfig(configMap.asJava)
-
-    assertEquals(MOCK_SYSTEM_FACTORY_CLASSNAME1, config.getSystemFactory(MOCK_SYSTEM_NAME1).getOrElse(""))
-  }
-
-  @Test
-  def testGetEmptyClassNameAsNull {
-    val configMap = Map[String, String](
-      MOCK_SYSTEM_FACTORY_NAME1 -> "",
-      MOCK_SYSTEM_FACTORY_NAME1 -> " "
-    )
-    val config = new MapConfig(configMap.asJava)
-
-    assertEquals(config.getSystemFactory(MOCK_SYSTEM_NAME1), None)
-    assertEquals(config.getSystemFactory(MOCK_SYSTEM_NAME2), None)
-  }
-
-  def testGetSystemNames {
-    val configMap = Map[String, String](
-      MOCK_SYSTEM_FACTORY_NAME1 -> MOCK_SYSTEM_FACTORY_CLASSNAME1,
-      MOCK_SYSTEM_FACTORY_NAME2 -> MOCK_SYSTEM_FACTORY_CLASSNAME2
-    )
-    val config = new MapConfig(configMap.asJava)
-    val systemNames = config.getSystemNames()
-
-    assertTrue(systemNames.contains(MOCK_SYSTEM_NAME1))
-    assertTrue(systemNames.contains(MOCK_SYSTEM_NAME2))
-  }
-}
diff --git a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
index b7a9bec..5b43840 100644
--- a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
+++ b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
@@ -23,7 +23,7 @@ import java.util
 
 import org.apache.samza.Partition
 import org.apache.samza.checkpoint.TestCheckpointTool.MockCheckpointManagerFactory
-import org.apache.samza.config.{JobConfig, MapConfig, SystemConfig, TaskConfig}
+import org.apache.samza.config._
 import org.apache.samza.container.{SamzaContainer, TaskName}
 import org.apache.samza.coordinator.stream.{CoordinatorStreamManager, MockCoordinatorStreamSystemFactory, MockCoordinatorStreamWrappedConsumer}
 import org.apache.samza.job.MockJobFactory
@@ -91,8 +91,8 @@ class TestJobCoordinator extends FlatSpec with PrivateMethodTester {
       JobConfig.JOB_COORDINATOR_SYSTEM -> "coordinator",
       JobConfig.JOB_CONTAINER_COUNT -> "2",
       TaskConfig.INPUT_STREAMS -> "test.stream1",
-      SystemConfig.SYSTEM_FACTORY.format("test") -> classOf[MockSystemFactory].getCanonicalName,
-      SystemConfig.SYSTEM_FACTORY.format("coordinator") -> classOf[MockCoordinatorStreamSystemFactory].getName,
+      SystemConfig.SYSTEM_FACTORY_FORMAT.format("test") -> classOf[MockSystemFactory].getCanonicalName,
+      SystemConfig.SYSTEM_FACTORY_FORMAT.format("coordinator") -> classOf[MockCoordinatorStreamSystemFactory].getName,
       TaskConfig.GROUPER_FACTORY -> "org.apache.samza.container.grouper.task.GroupByContainerCountFactory",
       JobConfig.MONITOR_PARTITION_CHANGE -> "true",
       JobConfig.MONITOR_PARTITION_CHANGE_FREQUENCY_MS -> "100"
@@ -153,8 +153,8 @@ class TestJobCoordinator extends FlatSpec with PrivateMethodTester {
       JobConfig.JOB_COORDINATOR_SYSTEM -> "coordinator",
       JobConfig.JOB_CONTAINER_COUNT -> "2",
       TaskConfig.INPUT_STREAMS -> "test.stream1",
-      SystemConfig.SYSTEM_FACTORY.format("test") -> classOf[MockSystemFactory].getCanonicalName,
-      SystemConfig.SYSTEM_FACTORY.format("coordinator") -> classOf[MockCoordinatorStreamSystemFactory].getName,
+      SystemConfig.SYSTEM_FACTORY_FORMAT.format("test") -> classOf[MockSystemFactory].getCanonicalName,
+      SystemConfig.SYSTEM_FACTORY_FORMAT.format("coordinator") -> classOf[MockCoordinatorStreamSystemFactory].getName,
       TaskConfig.GROUPER_FACTORY -> "org.apache.samza.container.grouper.task.GroupByContainerCountFactory"
       )
 
@@ -230,7 +230,7 @@ class TestJobCoordinator extends FlatSpec with PrivateMethodTester {
     */
   @Test
   def testWithPartitionAssignmentWithMockJobFactory {
-    val config = new SystemConfig(getTestConfig(classOf[MockJobFactory]))
+    val config = getTestConfig(classOf[MockJobFactory])
     val systemStream = new SystemStream("test", "stream1")
     val streamMetadataCache = mock(classOf[StreamMetadataCache])
     when(streamMetadataCache.getStreamMetadata(Set(systemStream), true)).thenReturn(
@@ -256,8 +256,8 @@ class TestJobCoordinator extends FlatSpec with PrivateMethodTester {
       JobConfig.STREAM_JOB_FACTORY_CLASS -> clazz.getCanonicalName,
       JobConfig.SSP_MATCHER_CLASS -> JobConfig.SSP_MATCHER_CLASS_REGEX,
       JobConfig.SSP_MATCHER_CONFIG_REGEX -> "[1]",
-      SystemConfig.SYSTEM_FACTORY.format("test") -> classOf[MockSystemFactory].getCanonicalName,
-      SystemConfig.SYSTEM_FACTORY.format("coordinator") -> classOf[MockCoordinatorStreamSystemFactory].getName).asJava)
+      SystemConfig.SYSTEM_FACTORY_FORMAT.format("test") -> classOf[MockSystemFactory].getCanonicalName,
+      SystemConfig.SYSTEM_FACTORY_FORMAT.format("coordinator") -> classOf[MockCoordinatorStreamSystemFactory].getName).asJava)
     config
   }
 
diff --git a/samza-core/src/test/scala/org/apache/samza/system/TestRangeSystemStreamPartitionMatcher.scala b/samza-core/src/test/scala/org/apache/samza/system/TestRangeSystemStreamPartitionMatcher.scala
index 3d385d6..e9e372d 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/TestRangeSystemStreamPartitionMatcher.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/TestRangeSystemStreamPartitionMatcher.scala
@@ -52,7 +52,7 @@ class TestRangeSystemStreamPartitionMatcher {
       JobConfig.STREAM_JOB_FACTORY_CLASS -> classOf[ThreadJobFactory].getCanonicalName,
       JobConfig.SSP_MATCHER_CLASS -> JobConfig.SSP_MATCHER_CLASS_RANGE,
       JobConfig.SSP_MATCHER_CONFIG_RANGES -> range,
-      (SystemConfig.SYSTEM_FACTORY format "test") -> classOf[MockSystemFactory].getCanonicalName).asJava)
+      (SystemConfig.SYSTEM_FACTORY_FORMAT format "test") -> classOf[MockSystemFactory].getCanonicalName).asJava)
   }
 
   @Test
@@ -99,7 +99,7 @@ class TestRangeSystemStreamPartitionMatcher {
       TaskConfig.INPUT_STREAMS -> "test.stream1",
       JobConfig.STREAM_JOB_FACTORY_CLASS -> classOf[ThreadJobFactory].getCanonicalName,
       JobConfig.SSP_MATCHER_CLASS -> JobConfig.SSP_MATCHER_CLASS_RANGE,
-      (SystemConfig.SYSTEM_FACTORY format "test") -> classOf[MockSystemFactory].getCanonicalName).asJava)
+      (SystemConfig.SYSTEM_FACTORY_FORMAT format "test") -> classOf[MockSystemFactory].getCanonicalName).asJava)
 
     new RangeSystemStreamPartitionMatcher().filter(sspSet.asJava, config)
   }
diff --git a/samza-core/src/test/scala/org/apache/samza/system/TestRegexSystemStreamPartitionMatcher.scala b/samza-core/src/test/scala/org/apache/samza/system/TestRegexSystemStreamPartitionMatcher.scala
index 255c85e..cb3b1e0 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/TestRegexSystemStreamPartitionMatcher.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/TestRegexSystemStreamPartitionMatcher.scala
@@ -52,7 +52,7 @@ class TestRegexSystemStreamPartitionMatcher {
       JobConfig.STREAM_JOB_FACTORY_CLASS -> classOf[ThreadJobFactory].getCanonicalName,
       JobConfig.SSP_MATCHER_CLASS -> JobConfig.SSP_MATCHER_CLASS_REGEX,
       JobConfig.SSP_MATCHER_CONFIG_REGEX -> regex,
-      (SystemConfig.SYSTEM_FACTORY format "test") -> classOf[MockSystemFactory].getCanonicalName).asJava)
+      (SystemConfig.SYSTEM_FACTORY_FORMAT format "test") -> classOf[MockSystemFactory].getCanonicalName).asJava)
   }
 
   @Test
@@ -70,7 +70,7 @@ class TestRegexSystemStreamPartitionMatcher {
       TaskConfig.INPUT_STREAMS -> "test.stream1",
       JobConfig.STREAM_JOB_FACTORY_CLASS -> classOf[ThreadJobFactory].getCanonicalName,
       JobConfig.SSP_MATCHER_CLASS -> JobConfig.SSP_MATCHER_CONFIG_REGEX,
-      (SystemConfig.SYSTEM_FACTORY format "test") -> classOf[MockSystemFactory].getCanonicalName).asJava)
+      (SystemConfig.SYSTEM_FACTORY_FORMAT format "test") -> classOf[MockSystemFactory].getCanonicalName).asJava)
 
     new RegexSystemStreamPartitionMatcher().filter(sspSet.asJava, config)
   }
diff --git a/samza-kafka/src/main/java/org/apache/samza/config/KafkaConsumerConfig.java b/samza-kafka/src/main/java/org/apache/samza/config/KafkaConsumerConfig.java
index dea60b3..0b59871 100644
--- a/samza-kafka/src/main/java/org/apache/samza/config/KafkaConsumerConfig.java
+++ b/samza-kafka/src/main/java/org/apache/samza/config/KafkaConsumerConfig.java
@@ -82,7 +82,7 @@ public class KafkaConsumerConfig extends HashMap<String, Object> {
     consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
 
     // check if samza default offset value is defined
-    String systemOffsetDefault = new JavaSystemConfig(config).getSystemOffsetDefault(systemName);
+    String systemOffsetDefault = new SystemConfig(config).getSystemOffsetDefault(systemName);
 
     // Translate samza config value to kafka config value
     String autoOffsetReset = getAutoOffsetResetValue((String) consumerProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), systemOffsetDefault);
@@ -214,10 +214,10 @@ public class KafkaConsumerConfig extends HashMap<String, Object> {
     String newAutoOffsetReset = KAFKA_OFFSET_LATEST;
     if (!StringUtils.isBlank(samzaOffsetDefault)) {
       switch (samzaOffsetDefault) {
-        case JavaSystemConfig.SAMZA_SYSTEM_OFFSET_UPCOMING:
+        case SystemConfig.SAMZA_SYSTEM_OFFSET_UPCOMING:
           newAutoOffsetReset = KAFKA_OFFSET_LATEST;
           break;
-        case JavaSystemConfig.SAMZA_SYSTEM_OFFSET_OLDEST:
+        case SystemConfig.SAMZA_SYSTEM_OFFSET_OLDEST:
           newAutoOffsetReset = KAFKA_OFFSET_EARLIEST;
           break;
         default:
diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
index f4db408..0ffd1a7 100644
--- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
+++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
@@ -48,9 +48,9 @@ import org.apache.kafka.common.errors.TopicExistsException;
 import org.apache.samza.Partition;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
+import org.apache.samza.config.SystemConfig;
 import org.apache.samza.config.KafkaConfig;
 import org.apache.samza.config.MapConfig;
-import org.apache.samza.config.SystemConfig;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.StreamValidationException;
 import org.apache.samza.system.SystemAdmin;
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
index 2999800..efbe780 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
@@ -26,6 +26,7 @@ import org.apache.samza.config._
 import org.apache.samza.metrics.MetricsRegistry
 import org.apache.samza.system.{StreamSpec, SystemFactory}
 import org.apache.samza.system.kafka.KafkaStreamSpec
+import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
 import org.apache.samza.util.{KafkaUtil, Logging, Util, _}
 
 class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Logging {
@@ -38,9 +39,10 @@ class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Loggin
     val checkpointSystemName = kafkaConfig.getCheckpointSystem.getOrElse(
       throw new SamzaException("No system defined for Kafka's checkpoint manager."))
 
-    val checkpointSystemFactoryName = new SystemConfig(config)
-      .getSystemFactory(checkpointSystemName)
-      .getOrElse(throw new SamzaException("Missing configuration: " + SystemConfig.SYSTEM_FACTORY format checkpointSystemName))
+    val systemConfig = new SystemConfig(config)
+    val checkpointSystemFactoryName = JavaOptionals.toRichOptional(systemConfig.getSystemFactory(checkpointSystemName))
+      .toOption
+      .getOrElse(throw new SamzaException("Missing configuration: " + SystemConfig.SYSTEM_FACTORY_FORMAT format checkpointSystemName))
 
     val checkpointSystemFactory = Util.getObj(checkpointSystemFactoryName, classOf[SystemFactory])
     val checkpointTopic = KafkaUtil.getCheckpointTopic(jobName, jobId, config)
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index 607feb0..10f879c 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -23,15 +23,13 @@ package org.apache.samza.config
 import java.util
 import java.util.concurrent.TimeUnit
 import java.util.regex.Pattern
-import java.util.{Properties, UUID}
+import java.util.Properties
 
 import com.google.common.collect.ImmutableMap
-import kafka.consumer.ConsumerConfig
 import org.apache.kafka.clients.producer.ProducerConfig
 import org.apache.kafka.common.serialization.ByteArraySerializer
 import org.apache.samza.SamzaException
 import org.apache.samza.config.ApplicationConfig.ApplicationMode
-import org.apache.samza.config.SystemConfig.Config2System
 import org.apache.samza.util.{Logging, StreamUtil}
 
 import scala.collection.JavaConverters._
@@ -68,7 +66,7 @@ object KafkaConfig {
     * Defines how low a queue can get for a single system/stream/partition
     * combination before trying to fetch more messages for it.
     */
-  val CONSUMER_FETCH_THRESHOLD = SystemConfig.SYSTEM_PREFIX + "samza.fetch.threshold"
+  val CONSUMER_FETCH_THRESHOLD = SystemConfig.SYSTEM_ID_PREFIX + "samza.fetch.threshold"
 
   val DEFAULT_CHECKPOINT_SEGMENT_BYTES = 26214400
 
@@ -79,7 +77,7 @@ object KafkaConfig {
     * the bytes limit + size of max message in the partition for a given stream.
     * If the value of this property is > 0 then this takes precedence over CONSUMER_FETCH_THRESHOLD config.
     */
-  val CONSUMER_FETCH_THRESHOLD_BYTES = SystemConfig.SYSTEM_PREFIX + "samza.fetch.threshold.bytes"
+  val CONSUMER_FETCH_THRESHOLD_BYTES = SystemConfig.SYSTEM_ID_PREFIX + "samza.fetch.threshold.bytes"
 
   val DEFAULT_RETENTION_MS_FOR_BATCH = TimeUnit.DAYS.toMillis(1)
 
@@ -113,7 +111,7 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
   }
 
   private def getSystemDefaultReplicationFactor(systemName: String, defaultValue: String) = {
-    val defaultReplicationFactor = new JavaSystemConfig(config).getDefaultStreamProperties(systemName).getOrDefault(KafkaConfig.TOPIC_REPLICATION_FACTOR, defaultValue)
+    val defaultReplicationFactor = new SystemConfig(config).getDefaultStreamProperties(systemName).getOrDefault(KafkaConfig.TOPIC_REPLICATION_FACTOR, defaultValue)
     defaultReplicationFactor
   }
 
@@ -127,7 +125,7 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
     * Note that the checkpoint-system has a similar precedence. See [[getCheckpointSystem]]
     */
   def getCheckpointSegmentBytes() = {
-    val defaultsegBytes = new JavaSystemConfig(config).getDefaultStreamProperties(getCheckpointSystem.orNull).getInt(KafkaConfig.SEGMENT_BYTES, KafkaConfig.DEFAULT_CHECKPOINT_SEGMENT_BYTES)
+    val defaultsegBytes = new SystemConfig(config).getDefaultStreamProperties(getCheckpointSystem.orNull).getInt(KafkaConfig.SEGMENT_BYTES, KafkaConfig.DEFAULT_CHECKPOINT_SEGMENT_BYTES)
     getInt(KafkaConfig.CHECKPOINT_SEGMENT_BYTES, defaultsegBytes)
   }
 
@@ -144,7 +142,7 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
     case Some(rplFactor) => rplFactor
     case _ =>
       val coordinatorSystem = new JobConfig(config).getCoordinatorSystemNameOrNull
-      val systemReplicationFactor = new JavaSystemConfig(config).getDefaultStreamProperties(coordinatorSystem).getOrDefault(KafkaConfig.TOPIC_REPLICATION_FACTOR, "3")
+      val systemReplicationFactor = new SystemConfig(config).getDefaultStreamProperties(coordinatorSystem).getOrDefault(KafkaConfig.TOPIC_REPLICATION_FACTOR, "3")
       systemReplicationFactor
   }
 
@@ -161,7 +159,7 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
     case Some(segBytes) => segBytes
     case _ =>
       val coordinatorSystem = new JobConfig(config).getCoordinatorSystemNameOrNull
-      val segBytes = new JavaSystemConfig(config).getDefaultStreamProperties(coordinatorSystem).getOrDefault(KafkaConfig.SEGMENT_BYTES, "26214400")
+      val segBytes = new SystemConfig(config).getDefaultStreamProperties(coordinatorSystem).getOrDefault(KafkaConfig.SEGMENT_BYTES, "26214400")
       segBytes
   }
 
@@ -240,7 +238,6 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
 
       storageConfig.getChangelogStream(storeName).foreach(changelogName => {
         val systemStream = StreamUtil.getSystemStreamFromNames(changelogName)
-        val factoryName = config.getSystemFactory(systemStream.getSystem).getOrElse(new SamzaException("Unable to determine factory for system: " + systemStream.getSystem))
         storeToChangelog += storeName -> systemStream.getStream
       })
     }
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config_deprecated/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config_deprecated/KafkaConfig.scala
index 02a6275..9b5cb31 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config_deprecated/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config_deprecated/KafkaConfig.scala
@@ -32,7 +32,6 @@ import org.apache.kafka.common.serialization.ByteArraySerializer
 import org.apache.samza.SamzaException
 import org.apache.samza.config.ApplicationConfig.ApplicationMode
 import org.apache.samza.config._
-import org.apache.samza.config.SystemConfig.Config2System
 import org.apache.samza.util.{Logging, StreamUtil}
 
 import scala.collection.JavaConverters._
@@ -72,7 +71,7 @@ object KafkaConfig {
     * Defines how low a queue can get for a single system/stream/partition
     * combination before trying to fetch more messages for it.
     */
-  val CONSUMER_FETCH_THRESHOLD = SystemConfig.SYSTEM_PREFIX + "samza.fetch.threshold"
+  val CONSUMER_FETCH_THRESHOLD = SystemConfig.SYSTEM_ID_PREFIX + "samza.fetch.threshold"
 
   val DEFAULT_CHECKPOINT_SEGMENT_BYTES = 26214400
 
@@ -83,7 +82,7 @@ object KafkaConfig {
     * the bytes limit + size of max message in the partition for a given stream.
     * If the value of this property is > 0 then this takes precedence over CONSUMER_FETCH_THRESHOLD config.
     */
-  val CONSUMER_FETCH_THRESHOLD_BYTES = SystemConfig.SYSTEM_PREFIX + "samza.fetch.threshold.bytes"
+  val CONSUMER_FETCH_THRESHOLD_BYTES = SystemConfig.SYSTEM_ID_PREFIX + "samza.fetch.threshold.bytes"
 
   val DEFAULT_RETENTION_MS_FOR_BATCH = TimeUnit.DAYS.toMillis(1)
 
@@ -117,7 +116,7 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
   }
 
   private def getSystemDefaultReplicationFactor(systemName: String, defaultValue: String) = {
-    val defaultReplicationFactor = new JavaSystemConfig(config).getDefaultStreamProperties(systemName).getOrDefault(KafkaConfig.TOPIC_REPLICATION_FACTOR, defaultValue)
+    val defaultReplicationFactor = new SystemConfig(config).getDefaultStreamProperties(systemName).getOrDefault(KafkaConfig.TOPIC_REPLICATION_FACTOR, defaultValue)
     defaultReplicationFactor
   }
 
@@ -131,7 +130,7 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
     * Note that the checkpoint-system has a similar precedence. See [[getCheckpointSystem]]
     */
   def getCheckpointSegmentBytes() = {
-    val defaultsegBytes = new JavaSystemConfig(config).getDefaultStreamProperties(getCheckpointSystem.orNull).getInt(KafkaConfig.SEGMENT_BYTES, KafkaConfig.DEFAULT_CHECKPOINT_SEGMENT_BYTES)
+    val defaultsegBytes = new SystemConfig(config).getDefaultStreamProperties(getCheckpointSystem.orNull).getInt(KafkaConfig.SEGMENT_BYTES, KafkaConfig.DEFAULT_CHECKPOINT_SEGMENT_BYTES)
     getInt(KafkaConfig.CHECKPOINT_SEGMENT_BYTES, defaultsegBytes)
   }
 
@@ -148,7 +147,7 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
     case Some(rplFactor) => rplFactor
     case _ =>
       val coordinatorSystem = new JobConfig(config).getCoordinatorSystemNameOrNull
-      val systemReplicationFactor = new JavaSystemConfig(config).getDefaultStreamProperties(coordinatorSystem).getOrDefault(KafkaConfig.TOPIC_REPLICATION_FACTOR, "3")
+      val systemReplicationFactor = new SystemConfig(config).getDefaultStreamProperties(coordinatorSystem).getOrDefault(KafkaConfig.TOPIC_REPLICATION_FACTOR, "3")
       systemReplicationFactor
   }
 
@@ -165,7 +164,7 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
     case Some(segBytes) => segBytes
     case _ =>
       val coordinatorSystem = new JobConfig(config).getCoordinatorSystemNameOrNull
-      val segBytes = new JavaSystemConfig(config).getDefaultStreamProperties(coordinatorSystem).getOrDefault(KafkaConfig.SEGMENT_BYTES, "26214400")
+      val segBytes = new SystemConfig(config).getDefaultStreamProperties(coordinatorSystem).getOrDefault(KafkaConfig.SEGMENT_BYTES, "26214400")
       segBytes
   }
 
@@ -244,7 +243,6 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
 
       storageConfig.getChangelogStream(storeName).foreach(changelogName => {
         val systemStream = StreamUtil.getSystemStreamFromNames(changelogName)
-        val factoryName = config.getSystemFactory(systemStream.getSystem).getOrElse(new SamzaException("Unable to determine factory for system: " + systemStream.getSystem))
         storeToChangelog += storeName -> systemStream.getStream
       })
     }
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemFactory.scala
index 7e8509d..b4003d8 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemFactory.scala
@@ -33,7 +33,6 @@ import org.apache.samza.system.SystemFactory
 import org.apache.samza.config.StorageConfig._
 import org.apache.samza.system.SystemProducer
 import org.apache.samza.system.SystemAdmin
-import org.apache.samza.config.SystemConfig.Config2System
 import org.apache.samza.system.SystemConsumer
 
 object KafkaSystemFactory extends Logging {
@@ -129,7 +128,8 @@ class KafkaSystemFactory extends SystemFactory with Logging {
       (topicName, changelogInfo)
     }}
 
-    val deleteCommittedMessages = config.deleteCommittedMessages(systemName)
+    val systemConfig = new SystemConfig(config)
+    val deleteCommittedMessages = systemConfig.deleteCommittedMessages(systemName)
     val intermediateStreamProperties: Map[String, Properties] = getIntermediateStreamProperties(config)
     new KafkaSystemAdmin(
       systemName,
diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
index 0a0aae8..dbe82fc 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
@@ -26,9 +26,7 @@ import kafka.integration.KafkaServerTestHarness
 import kafka.server.ConfigType
 import kafka.utils.{CoreUtils, TestUtils, ZkUtils}
 import com.google.common.collect.ImmutableMap
-import org.apache.kafka.clients.producer.ProducerConfig
 import org.apache.kafka.common.security.JaasUtils
-import org.apache.kafka.common.serialization.ByteArraySerializer
 import org.apache.samza.checkpoint.Checkpoint
 import org.apache.samza.config._
 import org.apache.samza.container.TaskName
@@ -37,6 +35,7 @@ import org.apache.samza.metrics.MetricsRegistry
 import org.apache.samza.serializers.CheckpointSerde
 import org.apache.samza.system._
 import org.apache.samza.system.kafka.{KafkaStreamSpec, KafkaSystemFactory}
+import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
 import org.apache.samza.util.{KafkaUtilException, NoOpMetricsRegistry, Util}
 import org.apache.samza.{Partition, SamzaException}
 import org.junit.Assert._
@@ -209,9 +208,9 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
     val systemName = kafkaConfig.getCheckpointSystem.getOrElse(
       throw new SamzaException("No system defined for Kafka's checkpoint manager."))
 
-    val systemFactoryClassName = new SystemConfig(config)
-      .getSystemFactory(systemName)
-      .getOrElse(throw new SamzaException("Missing configuration: " + SystemConfig.SYSTEM_FACTORY format systemName))
+    val systemConfig = new SystemConfig(config)
+    val systemFactoryClassName = JavaOptionals.toRichOptional(systemConfig.getSystemFactory(systemName)).toOption
+      .getOrElse(throw new SamzaException("Missing configuration: " + SystemConfig.SYSTEM_FACTORY_FORMAT format systemName))
 
     val systemFactory = Util.getObj(systemFactoryClassName, classOf[SystemFactory])
 
diff --git a/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java b/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
index 5824489..12c03f6 100644
--- a/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
+++ b/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
@@ -26,7 +26,7 @@ import org.apache.samza.system.SystemStream;
  * This class contains the methods for getting properties that are needed by the
  * StreamAppender.
  */
-public class Log4jSystemConfig extends JavaSystemConfig {
+public class Log4jSystemConfig extends SystemConfig {
 
   private static final String LOCATION_ENABLED = "task.log4j.location.info.enabled";
   private static final String TASK_LOG4J_SYSTEM = "task.log4j.system";
diff --git a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
index b4a97f7..19e6ea6 100644
--- a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
+++ b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
@@ -286,7 +286,6 @@ public class StreamAppender extends AppenderSkeleton {
 
   protected void setupSystem() {
     config = getConfig();
-    SystemFactory systemFactory = null;
     Log4jSystemConfig log4jSystemConfig = new Log4jSystemConfig(config);
 
     if (streamName == null) {
@@ -298,12 +297,10 @@ public class StreamAppender extends AppenderSkeleton {
     metrics = new StreamAppenderMetrics("stream-appender", metricsRegistry);
 
     String systemName = log4jSystemConfig.getSystemName();
-    String systemFactoryName = log4jSystemConfig.getSystemFactory(systemName);
-    if (systemFactoryName != null) {
-      systemFactory = Util.getObj(systemFactoryName, SystemFactory.class);
-    } else {
-      throw new SamzaException("Could not figure out \"" + systemName + "\" system factory for log4j StreamAppender to use");
-    }
+    String systemFactoryName = log4jSystemConfig.getSystemFactory(systemName)
+        .orElseThrow(() -> new SamzaException(
+            "Could not figure out \"" + systemName + "\" system factory for log4j StreamAppender to use"));
+    SystemFactory systemFactory = Util.getObj(systemFactoryName, SystemFactory.class);
 
     setSerde(log4jSystemConfig, systemName, streamName);
 
diff --git a/samza-log4j2/src/main/java/org/apache/samza/config/Log4jSystemConfig.java b/samza-log4j2/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
index 5824489..12c03f6 100644
--- a/samza-log4j2/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
+++ b/samza-log4j2/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
@@ -26,7 +26,7 @@ import org.apache.samza.system.SystemStream;
  * This class contains the methods for getting properties that are needed by the
  * StreamAppender.
  */
-public class Log4jSystemConfig extends JavaSystemConfig {
+public class Log4jSystemConfig extends SystemConfig {
 
   private static final String LOCATION_ENABLED = "task.log4j.location.info.enabled";
   private static final String TASK_LOG4J_SYSTEM = "task.log4j.system";
diff --git a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
index 28f759e..a224762 100644
--- a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
+++ b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
@@ -307,7 +307,6 @@ public class StreamAppender extends AbstractAppender {
 
   protected void setupSystem() {
     config = getConfig();
-    SystemFactory systemFactory = null;
     Log4jSystemConfig log4jSystemConfig = new Log4jSystemConfig(config);
 
     if (streamName == null) {
@@ -319,12 +318,10 @@ public class StreamAppender extends AbstractAppender {
     metrics = new StreamAppenderMetrics("stream-appender", metricsRegistry);
 
     String systemName = log4jSystemConfig.getSystemName();
-    String systemFactoryName = log4jSystemConfig.getSystemFactory(systemName);
-    if (systemFactoryName != null) {
-      systemFactory = Util.getObj(systemFactoryName, SystemFactory.class);
-    } else {
-      throw new SamzaException("Could not figure out \"" + systemName + "\" system factory for log4j StreamAppender to use");
-    }
+    String systemFactoryName = log4jSystemConfig.getSystemFactory(systemName)
+        .orElseThrow(() -> new SamzaException(
+            "Could not figure out \"" + systemName + "\" system factory for log4j StreamAppender to use"));
+    SystemFactory systemFactory = Util.getObj(systemFactoryName, SystemFactory.class);
 
     setSerde(log4jSystemConfig, systemName, streamName);
 
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/system/descriptors/InMemorySystemDescriptor.java b/samza-test/src/main/java/org/apache/samza/test/framework/system/descriptors/InMemorySystemDescriptor.java
index a74d0ba..da61f74 100644
--- a/samza-test/src/main/java/org/apache/samza/test/framework/system/descriptors/InMemorySystemDescriptor.java
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/system/descriptors/InMemorySystemDescriptor.java
@@ -28,7 +28,7 @@ import org.apache.samza.system.descriptors.SystemDescriptor;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.system.SystemStreamMetadata;
 import org.apache.samza.system.inmemory.InMemorySystemFactory;
-import org.apache.samza.config.JavaSystemConfig;
+import org.apache.samza.config.SystemConfig;
 
 /**
  * Descriptor for an InMemorySystem.
@@ -87,7 +87,7 @@ public class InMemorySystemDescriptor extends SystemDescriptor<InMemorySystemDes
   public Map<String, String> toConfig() {
     HashMap<String, String> configs = new HashMap<>(super.toConfig());
     configs.put(InMemorySystemConfig.INMEMORY_SCOPE, this.inMemoryScope);
-    configs.put(String.format(JavaSystemConfig.SYSTEM_FACTORY_FORMAT, getSystemName()), FACTORY_CLASS_NAME);
+    configs.put(String.format(SystemConfig.SYSTEM_FACTORY_FORMAT, getSystemName()), FACTORY_CLASS_NAME);
     return configs;
   }
 
diff --git a/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemConsumerWithSamzaBench.java b/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemConsumerWithSamzaBench.java
index cc8cceb..683ced4 100644
--- a/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemConsumerWithSamzaBench.java
+++ b/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemConsumerWithSamzaBench.java
@@ -32,10 +32,10 @@ import java.util.stream.IntStream;
 import org.apache.commons.cli.ParseException;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.ApplicationConfig;
+import org.apache.samza.config.SystemConfig;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.JobCoordinatorConfig;
 import org.apache.samza.config.MapConfig;
-import org.apache.samza.config.SystemConfig;
 import org.apache.samza.config.TaskConfig;
 import org.apache.samza.system.descriptors.GenericInputDescriptor;
 import org.apache.samza.system.descriptors.GenericSystemDescriptor;