You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sh...@apache.org on 2019/03/26 18:23:34 UTC
[samza] branch master updated: SAMZA-2131: [Scala cleanup] Convert
FileSystemCheckpointManagerConfig.scala and SystemConfig.scala to Java
(#955)
This is an automated email from the ASF dual-hosted git repository.
shanthoosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new f319db5 SAMZA-2131: [Scala cleanup] Convert FileSystemCheckpointManagerConfig.scala and SystemConfig.scala to Java (#955)
f319db5 is described below
commit f319db519168cf06de14232de99ef0cf38fdb9f7
Author: cameronlee314 <37...@users.noreply.github.com>
AuthorDate: Tue Mar 26 11:23:29 2019 -0700
SAMZA-2131: [Scala cleanup] Convert FileSystemCheckpointManagerConfig.scala and SystemConfig.scala to Java (#955)
* moving FileSystemCheckpointManagerConfig.scala and SystemConfig.scala to java
* renaming JavaSystemConfig to SystemConfig now that the scala version is gone
---
.../config/FileSystemCheckpointManagerConfig.java} | 22 +-
.../{JavaSystemConfig.java => SystemConfig.java} | 97 ++++++--
.../samza/storage/ChangelogStreamManager.java | 4 +-
.../org/apache/samza/storage/StorageRecovery.java | 4 +-
.../java/org/apache/samza/system/SystemAdmins.java | 4 +-
.../apache/samza/checkpoint/OffsetManager.scala | 4 +-
.../file/FileSystemCheckpointManager.scala | 9 +-
.../org/apache/samza/config/StreamConfig.scala | 2 +-
.../org/apache/samza/config/SystemConfig.scala | 68 -----
.../apache/samza/container/SamzaContainer.scala | 17 +-
.../apache/samza/coordinator/JobModelManager.scala | 8 -
.../reporter/MetricsSnapshotReporterFactory.scala | 10 +-
.../apache/samza/util/CoordinatorStreamUtil.scala | 12 +-
.../TestFileSystemCheckpointManagerConfig.java | 43 ++++
.../apache/samza/config/TestJavaSystemConfig.java | 68 -----
.../org/apache/samza/config/TestSystemConfig.java | 273 +++++++++++++++++++++
.../samza/checkpoint/TestCheckpointTool.scala | 8 +-
.../org/apache/samza/config/TestSystemConfig.scala | 67 -----
.../samza/coordinator/TestJobCoordinator.scala | 16 +-
.../TestRangeSystemStreamPartitionMatcher.scala | 4 +-
.../TestRegexSystemStreamPartitionMatcher.scala | 4 +-
.../apache/samza/config/KafkaConsumerConfig.java | 6 +-
.../samza/system/kafka/KafkaSystemAdmin.java | 2 +-
.../kafka/KafkaCheckpointManagerFactory.scala | 8 +-
.../org/apache/samza/config/KafkaConfig.scala | 17 +-
.../samza/config_deprecated/KafkaConfig.scala | 14 +-
.../kafka_deprecated/KafkaSystemFactory.scala | 4 +-
.../kafka/TestKafkaCheckpointManager.scala | 9 +-
.../org/apache/samza/config/Log4jSystemConfig.java | 2 +-
.../apache/samza/logging/log4j/StreamAppender.java | 11 +-
.../org/apache/samza/config/Log4jSystemConfig.java | 2 +-
.../samza/logging/log4j2/StreamAppender.java | 11 +-
.../descriptors/InMemorySystemDescriptor.java | 4 +-
.../benchmark/SystemConsumerWithSamzaBench.java | 2 +-
34 files changed, 493 insertions(+), 343 deletions(-)
diff --git a/samza-core/src/main/scala/org/apache/samza/config/FileSystemCheckpointManagerConfig.scala b/samza-core/src/main/java/org/apache/samza/config/FileSystemCheckpointManagerConfig.java
similarity index 60%
rename from samza-core/src/main/scala/org/apache/samza/config/FileSystemCheckpointManagerConfig.scala
rename to samza-core/src/main/java/org/apache/samza/config/FileSystemCheckpointManagerConfig.java
index 707ea59..e131895 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/FileSystemCheckpointManagerConfig.scala
+++ b/samza-core/src/main/java/org/apache/samza/config/FileSystemCheckpointManagerConfig.java
@@ -16,16 +16,22 @@
* specific language governing permissions and limitations
* under the License.
*/
+package org.apache.samza.config;
-package org.apache.samza.config
+import java.util.Optional;
-object FileSystemCheckpointManagerConfig {
- // file system checkpoint manager config constants
- val CHECKPOINT_MANAGER_ROOT = "task.checkpoint.path" // system name to use when sending offset checkpoints
- implicit def Config2FSCP(config: Config) = new FileSystemCheckpointManagerConfig(config)
-}
+public class FileSystemCheckpointManagerConfig extends MapConfig {
+ /**
+ * Path on local file system where checkpoints should be stored.
+ */
+ private static final String CHECKPOINT_MANAGER_ROOT = "task.checkpoint.path";
+
+ public FileSystemCheckpointManagerConfig(Config config) {
+ super(config);
+ }
-class FileSystemCheckpointManagerConfig(config: Config) extends ScalaMapConfig(config) {
- def getFileSystemCheckpointRoot = getOption(FileSystemCheckpointManagerConfig.CHECKPOINT_MANAGER_ROOT)
+ public Optional<String> getFileSystemCheckpointRoot() {
+ return Optional.ofNullable(get(CHECKPOINT_MANAGER_ROOT));
+ }
}
diff --git a/samza-core/src/main/java/org/apache/samza/config/JavaSystemConfig.java b/samza-core/src/main/java/org/apache/samza/config/SystemConfig.java
similarity index 55%
rename from samza-core/src/main/java/org/apache/samza/config/JavaSystemConfig.java
rename to samza-core/src/main/java/org/apache/samza/config/SystemConfig.java
index fde98c6..93a0c32 100644
--- a/samza-core/src/main/java/org/apache/samza/config/JavaSystemConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/SystemConfig.java
@@ -22,38 +22,49 @@ package org.apache.samza.config;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.stream.Collectors;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.StringUtils;
import org.apache.samza.SamzaException;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemFactory;
import org.apache.samza.util.Util;
-
/**
- * a java version of the system config
+ * Config helper methods related to systems.
*/
-public class JavaSystemConfig extends MapConfig {
- public static final String SYSTEM_PREFIX = "systems.";
- public static final String SYSTEM_FACTORY_SUFFIX = ".samza.factory";
- public static final String SYSTEM_FACTORY_FORMAT = SYSTEM_PREFIX + "%s" + SYSTEM_FACTORY_SUFFIX;
- private static final String SYSTEM_DEFAULT_STREAMS_PREFIX_FORMAT = SYSTEM_PREFIX + "%s" + ".default.stream.";
+public class SystemConfig extends MapConfig {
+ private static final String SYSTEMS_PREFIX = "systems.";
+ public static final String SYSTEM_ID_PREFIX = SYSTEMS_PREFIX + "%s.";
+
+ private static final String SYSTEM_FACTORY_SUFFIX = ".samza.factory";
+ public static final String SYSTEM_FACTORY_FORMAT = SYSTEMS_PREFIX + "%s" + SYSTEM_FACTORY_SUFFIX;
+ @VisibleForTesting
+ static final String SYSTEM_DEFAULT_STREAMS_PREFIX_FORMAT = SYSTEM_ID_PREFIX + "default.stream.";
+
+ // If true, automatically delete committed messages from streams whose committed messages can be deleted.
+ // A stream's committed messages can be deleted if it is a intermediate stream, or if user has manually
+ // set streams.{streamId}.samza.delete.committed.messages to true in the configuration.
+ @VisibleForTesting
+ static final String DELETE_COMMITTED_MESSAGES = SYSTEM_ID_PREFIX + "samza.delete.committed.messages";
+
private static final String EMPTY = "";
- public static final String SAMZA_SYSTEM_OFFSET_UPCOMING = "upcoming";
- public static final String SAMZA_SYSTEM_OFFSET_OLDEST = "oldest";
+ static final String SAMZA_SYSTEM_OFFSET_UPCOMING = "upcoming";
+ static final String SAMZA_SYSTEM_OFFSET_OLDEST = "oldest";
- public JavaSystemConfig(Config config) {
+ public SystemConfig(Config config) {
super(config);
}
- public String getSystemFactory(String name) {
- if (name == null) {
- return null;
+ public Optional<String> getSystemFactory(String systemName) {
+ if (systemName == null) {
+ return Optional.empty();
}
- String systemFactory = String.format(SYSTEM_FACTORY_FORMAT, name);
+ String systemFactory = String.format(SYSTEM_FACTORY_FORMAT, systemName);
String value = get(systemFactory, null);
- return (StringUtils.isBlank(value)) ? null : value;
+ return (StringUtils.isBlank(value)) ? Optional.empty() : Optional.of(value);
}
/**
@@ -62,8 +73,8 @@ public class JavaSystemConfig extends MapConfig {
* @return A list system names
*/
public List<String> getSystemNames() {
- Config subConf = subset(SYSTEM_PREFIX, true);
- ArrayList<String> systemNames = new ArrayList<String>();
+ Config subConf = subset(SYSTEMS_PREFIX, true);
+ ArrayList<String> systemNames = new ArrayList<>();
for (Map.Entry<String, String> entry : subConf.entrySet()) {
String key = entry.getKey();
if (key.endsWith(SYSTEM_FACTORY_SUFFIX)) {
@@ -81,7 +92,7 @@ public class JavaSystemConfig extends MapConfig {
public Map<String, SystemAdmin> getSystemAdmins() {
return getSystemFactories().entrySet()
.stream()
- .collect(Collectors.toMap(systemNameToFactoryEntry -> systemNameToFactoryEntry.getKey(),
+ .collect(Collectors.toMap(Entry::getKey,
systemNameToFactoryEntry -> systemNameToFactoryEntry.getValue()
.getAdmin(systemNameToFactoryEntry.getKey(), this)));
}
@@ -105,11 +116,8 @@ public class JavaSystemConfig extends MapConfig {
Map<String, SystemFactory> systemFactories = getSystemNames().stream().collect(Collectors.toMap(
systemName -> systemName,
systemName -> {
- String systemFactoryClassName = getSystemFactory(systemName);
- if (systemFactoryClassName == null) {
- throw new SamzaException(
- String.format("A stream uses system %s, which is missing from the configuration.", systemName));
- }
+ String systemFactoryClassName = getSystemFactory(systemName).orElseThrow(() -> new SamzaException(
+ String.format("A stream uses system %s, which is missing from the configuration.", systemName)));
return Util.getObj(systemFactoryClassName, SystemFactory.class);
}));
@@ -147,4 +155,47 @@ public class JavaSystemConfig extends MapConfig {
return systemOffsetDefault;
}
+
+ /**
+ * @param systemName name of the system
+ * @return the key serde for the {@code systemName}, or empty if it was not found
+ */
+ public Optional<String> getSystemKeySerde(String systemName) {
+ return getSystemDefaultStreamProperty(systemName, StreamConfig.KEY_SERDE());
+ }
+
+ /**
+ * @param systemName name of the system
+ * @return the message serde for the {@code systemName}, or empty if it was not found
+ */
+ public Optional<String> getSystemMsgSerde(String systemName) {
+ return getSystemDefaultStreamProperty(systemName, StreamConfig.MSG_SERDE());
+ }
+
+ /**
+ * @param systemName name of the system
+ * @return if messages committed to this system should automatically be deleted
+ */
+ public boolean deleteCommittedMessages(String systemName) {
+ return getBoolean(String.format(DELETE_COMMITTED_MESSAGES, systemName), false);
+ }
+
+ /**
+ * Gets the system-wide default for the {@code propertyName} for the {@code systemName}.
+ * This will check in a couple of different config locations for the value.
+ */
+ private Optional<String> getSystemDefaultStreamProperty(String systemName, String propertyName) {
+ Map<String, String> defaultStreamProperties = getDefaultStreamProperties(systemName);
+ String defaultStreamProperty = defaultStreamProperties.get(propertyName);
+ if (StringUtils.isNotEmpty(defaultStreamProperty)) {
+ return Optional.of(defaultStreamProperty);
+ } else {
+ String fallbackStreamProperty = get(String.format(SYSTEM_ID_PREFIX, systemName) + propertyName);
+ if (StringUtils.isNotEmpty(fallbackStreamProperty)) {
+ return Optional.of(fallbackStreamProperty);
+ } else {
+ return Optional.empty();
+ }
+ }
+ }
}
diff --git a/samza-core/src/main/java/org/apache/samza/storage/ChangelogStreamManager.java b/samza-core/src/main/java/org/apache/samza/storage/ChangelogStreamManager.java
index ea55fe5..71635aa 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/ChangelogStreamManager.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/ChangelogStreamManager.java
@@ -26,7 +26,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.JavaStorageConfig;
-import org.apache.samza.config.JavaSystemConfig;
+import org.apache.samza.config.SystemConfig;
import org.apache.samza.container.TaskName;
import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
@@ -116,7 +116,7 @@ public class ChangelogStreamManager {
name -> StreamUtil.getSystemStreamFromNames(storageConfig.getChangelogStream(name))));
// Get SystemAdmin for changelog store's system and attempt to create the stream
- JavaSystemConfig systemConfig = new JavaSystemConfig(config);
+ SystemConfig systemConfig = new SystemConfig(config);
storeNameSystemStreamMapping.forEach((storeName, systemStream) -> {
// Load system admin for this system.
SystemAdmin systemAdmin = systemConfig.getSystemAdmin(systemStream.getSystem());
diff --git a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
index 4d01159..79491d3 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
@@ -26,7 +26,7 @@ import java.util.Map;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.JavaStorageConfig;
-import org.apache.samza.config.JavaSystemConfig;
+import org.apache.samza.config.SystemConfig;
import org.apache.samza.config.SerializerConfig;
import org.apache.samza.container.SamzaContainerMetrics;
import org.apache.samza.context.ContainerContext;
@@ -207,7 +207,7 @@ public class StorageRecovery extends CommandLine {
StreamMetadataCache streamMetadataCache = new StreamMetadataCache(systemAdmins, 5000, clock);
// don't worry about prefetching for this; looks like the tool doesn't flush to offset files anyways
- Map<String, SystemFactory> systemFactories = new JavaSystemConfig(jobConfig).getSystemFactories();
+ Map<String, SystemFactory> systemFactories = new SystemConfig(jobConfig).getSystemFactories();
for (ContainerModel containerModel : containers.values()) {
ContainerContext containerContext = new ContainerContextImpl(containerModel, new MetricsRegistryMap());
diff --git a/samza-core/src/main/java/org/apache/samza/system/SystemAdmins.java b/samza-core/src/main/java/org/apache/samza/system/SystemAdmins.java
index 242ac67..be15869 100644
--- a/samza-core/src/main/java/org/apache/samza/system/SystemAdmins.java
+++ b/samza-core/src/main/java/org/apache/samza/system/SystemAdmins.java
@@ -23,7 +23,7 @@ import java.util.Map;
import java.util.Set;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
-import org.apache.samza.config.JavaSystemConfig;
+import org.apache.samza.config.SystemConfig;
import org.apache.samza.config.MapConfig;
@@ -34,7 +34,7 @@ public class SystemAdmins {
private final Map<String, SystemAdmin> systemAdminMap;
public SystemAdmins(Config config) {
- JavaSystemConfig systemConfig = new JavaSystemConfig(config);
+ SystemConfig systemConfig = new SystemConfig(config);
this.systemAdminMap = systemConfig.getSystemAdmins();
}
diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
index f5cc6fd..09f778a 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
@@ -25,7 +25,7 @@ import java.util.concurrent.ConcurrentHashMap
import org.apache.samza.SamzaException
import org.apache.samza.annotation.InterfaceStability
import org.apache.samza.config.StreamConfig.Config2Stream
-import org.apache.samza.config.{Config, JavaSystemConfig}
+import org.apache.samza.config.{Config, SystemConfig}
import org.apache.samza.container.TaskName
import org.apache.samza.startpoint.{Startpoint, StartpointManager}
import org.apache.samza.system.SystemStreamMetadata.OffsetType
@@ -84,7 +84,7 @@ object OffsetManager extends Logging {
case (systemStream, systemStreamMetadata) =>
// Get default offset.
val streamDefaultOffset = config.getDefaultStreamOffset(systemStream)
- val systemDefaultOffset = new JavaSystemConfig(config).getSystemOffsetDefault(systemStream.getSystem)
+ val systemDefaultOffset = new SystemConfig(config).getSystemOffsetDefault(systemStream.getSystem)
val defaultOffsetType = if (streamDefaultOffset.isDefined) {
OffsetType.valueOf(streamDefaultOffset.get.toUpperCase)
} else if (systemDefaultOffset != null) {
diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala
index edd0ace..68afda1 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala
@@ -22,17 +22,16 @@ package org.apache.samza.checkpoint.file
import java.io.File
import java.io.FileNotFoundException
import java.io.FileOutputStream
-import java.util
import org.apache.samza.SamzaException
import org.apache.samza.checkpoint.Checkpoint
import org.apache.samza.checkpoint.CheckpointManager
import org.apache.samza.checkpoint.CheckpointManagerFactory
-import org.apache.samza.config.Config
-import org.apache.samza.config.FileSystemCheckpointManagerConfig.Config2FSCP
+import org.apache.samza.config.{Config, FileSystemCheckpointManagerConfig}
import org.apache.samza.config.JobConfig.Config2Job
import org.apache.samza.container.TaskName
import org.apache.samza.metrics.MetricsRegistry
import org.apache.samza.serializers.CheckpointSerde
+import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
import scala.io.Source
class FileSystemCheckpointManager(
@@ -79,8 +78,8 @@ class FileSystemCheckpointManagerFactory extends CheckpointManagerFactory {
val name = config
.getName
.getOrElse(throw new SamzaException("Missing job name in configs"))
- val root = config
- .getFileSystemCheckpointRoot
+ val fileSystemCheckpointManagerConfig = new FileSystemCheckpointManagerConfig(config)
+ val root = JavaOptionals.toRichOptional(fileSystemCheckpointManagerConfig.getFileSystemCheckpointRoot).toOption
.getOrElse(throw new SamzaException("Missing checkpoint root in configs"))
new FileSystemCheckpointManager(name, new File(root))
}
diff --git a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
index 252e38f..5890c07 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
@@ -273,7 +273,7 @@ class StreamConfig(config: Config) extends ScalaMapConfig(config) with Logging {
if (systemName == null) {
Map()
}
- val systemConfig = new JavaSystemConfig(config)
+ val systemConfig = new SystemConfig(config)
val defaults = systemConfig.getDefaultStreamProperties(systemName)
val explicitConfigs = config.subset(StreamConfig.STREAM_PREFIX format(systemName, streamName), true)
new MapConfig(defaults, explicitConfigs)
diff --git a/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
deleted file mode 100644
index fd508c2..0000000
--- a/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.config
-
-import scala.collection.JavaConverters._
-import org.apache.samza.util.Logging
-
-/**
- * Note: All new methods are being added to [[org.apache.samza.config.JavaSystemConfig]]
- */
-object SystemConfig {
- // system config constants
- val SYSTEM_PREFIX = JavaSystemConfig.SYSTEM_PREFIX + "%s."
- val SYSTEM_FACTORY = JavaSystemConfig.SYSTEM_FACTORY_FORMAT
- val CONSUMER_OFFSET_DEFAULT = SYSTEM_PREFIX + "samza.offset.default"
-
- // If true, automatically delete committed messages from streams whose committed messages can be deleted.
- // A stream's committed messages can be deleted if it is a intermediate stream, or if user has manually
- // set streams.{streamId}.samza.delete.committed.messages to true in the configuration.
- val DELETE_COMMITTED_MESSAGES = SYSTEM_PREFIX + "samza.delete.committed.messages"
-
- implicit def Config2System(config: Config) = new SystemConfig(config)
-}
-
-class SystemConfig(config: Config) extends ScalaMapConfig(config) with Logging {
- val javaSystemConfig = new JavaSystemConfig(config)
-
- def getSystemFactory(name: String) = Option(javaSystemConfig.getSystemFactory(name))
-
- def getSystemKeySerde(name: String) = getSystemDefaultStreamProperty(name, StreamConfig.KEY_SERDE)
-
- def getSystemMsgSerde(name: String) = getSystemDefaultStreamProperty(name, StreamConfig.MSG_SERDE)
-
- def deleteCommittedMessages(systemName: String) = getBoolean(SystemConfig.DELETE_COMMITTED_MESSAGES format (systemName), false)
-
- /**
- * Returns a list of all system names from the config file. Useful for
- * getting individual systems.
- */
- def getSystemNames() = javaSystemConfig.getSystemNames().asScala
-
- private def getSystemDefaultStreamProperty(name: String, property: String) = {
- val defaultStreamProperties = javaSystemConfig.getDefaultStreamProperties(name)
- val streamDefault = defaultStreamProperties.get(property)
- if (!(streamDefault == null || streamDefault.isEmpty)) {
- Option(streamDefault)
- } else {
- getNonEmptyOption((SystemConfig.SYSTEM_PREFIX + property) format name)
- }
- }
-}
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 70ff87d..d2b8f8f 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -37,7 +37,6 @@ import org.apache.samza.config.MetricsConfig.Config2Metrics
import org.apache.samza.config.SerializerConfig.Config2Serializer
import org.apache.samza.config.StorageConfig.Config2Storage
import org.apache.samza.config.StreamConfig.Config2Stream
-import org.apache.samza.config.SystemConfig.Config2System
import org.apache.samza.config.TaskConfig.Config2Task
import org.apache.samza.config._
import org.apache.samza.container.disk.DiskSpaceMonitor.Listener
@@ -50,13 +49,12 @@ import org.apache.samza.metrics.{JmxServer, JvmMetrics, MetricsRegistryMap, Metr
import org.apache.samza.serializers._
import org.apache.samza.serializers.model.SamzaObjectMapper
import org.apache.samza.startpoint.StartpointManager
-import org.apache.samza.storage.StorageEngineFactory.StoreMode
import org.apache.samza.storage._
import org.apache.samza.system._
import org.apache.samza.system.chooser.{DefaultChooser, MessageChooserFactory, RoundRobinChooserFactory}
import org.apache.samza.table.TableManager
-import org.apache.samza.table.utils.SerdeUtils
import org.apache.samza.task._
+import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
import org.apache.samza.util.{Util, _}
import org.apache.samza.{SamzaContainerStatus, SamzaException}
@@ -139,6 +137,7 @@ object SamzaContainer extends Logging {
externalContextOption: Option[ExternalContext],
localityManager: LocalityManager = null) = {
val config = jobContext.getConfig
+ val systemConfig = new SystemConfig(config)
val containerModel = jobModel.getContainers.get(containerId)
val containerName = "samza-container-%s" format containerId
val maxChangeLogStreamPartitions = jobModel.maxChangeLogStreamPartitions
@@ -192,8 +191,7 @@ object SamzaContainer extends Logging {
.map(_.getSystem)
.toSet
-
- val systemNames = config.getSystemNames
+ val systemNames = systemConfig.getSystemNames.asScala
info("Got system names: %s" format systemNames)
@@ -202,8 +200,7 @@ object SamzaContainer extends Logging {
info("Got serde streams: %s" format serdeStreams)
val systemFactories = systemNames.map(systemName => {
- val systemFactoryClassName = config
- .getSystemFactory(systemName)
+ val systemFactoryClassName = JavaOptionals.toRichOptional(systemConfig.getSystemFactory(systemName)).toOption
.getOrElse(throw new SamzaException("A stream uses system %s, which is missing from the configuration." format systemName))
(systemName, Util.getObj(systemFactoryClassName, classOf[SystemFactory]))
}).toMap
@@ -321,11 +318,13 @@ object SamzaContainer extends Logging {
}).toMap
}
- val systemKeySerdes = buildSystemSerdeMap(systemName => config.getSystemKeySerde(systemName))
+ val systemKeySerdes = buildSystemSerdeMap(systemName =>
+ JavaOptionals.toRichOptional(systemConfig.getSystemKeySerde(systemName)).toOption)
debug("Got system key serdes: %s" format systemKeySerdes)
- val systemMessageSerdes = buildSystemSerdeMap(systemName => config.getSystemMsgSerde(systemName))
+ val systemMessageSerdes = buildSystemSerdeMap(systemName =>
+ JavaOptionals.toRichOptional(systemConfig.getSystemMsgSerde(systemName)).toOption)
debug("Got system message serdes: %s" format systemMessageSerdes)
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
index c59e35e..fa9d2a7 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
@@ -23,14 +23,8 @@ import java.util
import java.util.concurrent.atomic.AtomicReference
import org.apache.samza.Partition
-import org.apache.samza.config.JobConfig.Config2Job
-import org.apache.samza.config.SystemConfig.Config2System
-import org.apache.samza.config.TaskConfig.Config2Task
-import org.apache.samza.config.{Config, _}
-import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory
import org.apache.samza.config._
import org.apache.samza.config.JobConfig.Config2Job
-import org.apache.samza.config.SystemConfig.Config2System
import org.apache.samza.config.TaskConfig.Config2Task
import org.apache.samza.config.Config
import org.apache.samza.container.grouper.stream.{SSPGrouperProxy, SystemStreamPartitionGrouperFactory}
@@ -381,8 +375,6 @@ object JobModelManager extends Logging {
var containerMap = containerModels.asScala.map(containerModel => containerModel.getId -> containerModel).toMap
new JobModel(config, containerMap.asJava)
}
-
- private def getSystemNames(config: Config) = config.getSystemNames().toSet
}
/**
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
index 8a9c021..132a903 100644
--- a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
@@ -21,10 +21,9 @@ package org.apache.samza.metrics.reporter
import org.apache.samza.util.{Logging, StreamUtil, Util}
import org.apache.samza.SamzaException
-import org.apache.samza.config.{ApplicationConfig, Config}
+import org.apache.samza.config.{ApplicationConfig, Config, SystemConfig}
import org.apache.samza.config.JobConfig.Config2Job
import org.apache.samza.config.MetricsConfig.Config2Metrics
-import org.apache.samza.config.SystemConfig.Config2System
import org.apache.samza.config.StreamConfig.Config2Stream
import org.apache.samza.config.SerializerConfig.Config2Serializer
import org.apache.samza.config.TaskConfig.Config2Task
@@ -33,6 +32,7 @@ import org.apache.samza.metrics.MetricsReporterFactory
import org.apache.samza.metrics.MetricsRegistryMap
import org.apache.samza.serializers.{MetricsSnapshotSerdeV2, SerdeFactory}
import org.apache.samza.system.SystemFactory
+import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
class MetricsSnapshotReporterFactory extends MetricsReporterFactory with Logging {
def getMetricsReporter(name: String, containerName: String, config: Config): MetricsReporter = {
@@ -72,8 +72,8 @@ class MetricsSnapshotReporterFactory extends MetricsReporterFactory with Logging
val systemName = systemStream.getSystem
- val systemFactoryClassName = config
- .getSystemFactory(systemName)
+ val systemConfig = new SystemConfig(config)
+ val systemFactoryClassName = JavaOptionals.toRichOptional(systemConfig.getSystemFactory(systemName)).toOption
.getOrElse(throw new SamzaException("Trying to fetch system factory for system %s, which isn't defined in config." format systemName))
val systemFactory = Util.getObj(systemFactoryClassName, classOf[SystemFactory])
@@ -87,7 +87,7 @@ class MetricsSnapshotReporterFactory extends MetricsReporterFactory with Logging
info("Got producer %s." format producer)
val streamSerdeName = config.getStreamMsgSerde(systemStream)
- val systemSerdeName = config.getSystemMsgSerde(systemName)
+ val systemSerdeName = JavaOptionals.toRichOptional(systemConfig.getSystemMsgSerde(systemName)).toOption
val serdeName = streamSerdeName.getOrElse(systemSerdeName.getOrElse(null))
val serde = if (serdeName != null) {
config.getSerdeClass(serdeName) match {
diff --git a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
index bfb2271..0d767c6 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
@@ -22,10 +22,10 @@
package org.apache.samza.util
import org.apache.samza.SamzaException
-import org.apache.samza.config.{Config, ConfigException, JobConfig, MapConfig, SystemConfig}
+import org.apache.samza.config._
import org.apache.samza.system.{SystemFactory, SystemStream}
import org.apache.samza.config.JobConfig.Config2Job
-import org.apache.samza.config.SystemConfig.Config2System
+import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
import scala.collection.immutable.Map
import scala.collection.JavaConverters._
@@ -38,7 +38,7 @@ object CoordinatorStreamUtil {
def buildCoordinatorStreamConfig(config: Config) = {
val (jobName, jobId) = getJobNameAndId(config)
// Build a map with just the system config and job.name/job.id. This is what's required to start the JobCoordinator.
- val map = config.subset(SystemConfig.SYSTEM_PREFIX format config.getCoordinatorSystemName, false).asScala ++
+ val map = config.subset(SystemConfig.SYSTEM_ID_PREFIX format config.getCoordinatorSystemName, false).asScala ++
Map[String, String](
JobConfig.JOB_NAME -> jobName,
JobConfig.JOB_ID -> jobId,
@@ -66,9 +66,9 @@ object CoordinatorStreamUtil {
*/
def getCoordinatorSystemFactory(config: Config) = {
val systemName = config.getCoordinatorSystemName
- val systemFactoryClassName = config
- .getSystemFactory(systemName)
- .getOrElse(throw new SamzaException("Missing configuration: " + SystemConfig.SYSTEM_FACTORY format systemName))
+ val systemConfig = new SystemConfig(config)
+ val systemFactoryClassName = JavaOptionals.toRichOptional(systemConfig.getSystemFactory(systemName)).toOption
+ .getOrElse(throw new SamzaException("Missing configuration: " + SystemConfig.SYSTEM_FACTORY_FORMAT format systemName))
Util.getObj(systemFactoryClassName, classOf[SystemFactory])
}
diff --git a/samza-core/src/test/java/org/apache/samza/config/TestFileSystemCheckpointManagerConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestFileSystemCheckpointManagerConfig.java
new file mode 100644
index 0000000..35ab060
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/config/TestFileSystemCheckpointManagerConfig.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.config;
+
+import com.google.common.collect.ImmutableMap;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+
+public class TestFileSystemCheckpointManagerConfig {
+ @Test
+ public void testGetFileSystemCheckpointRoot() {
+ String checkpointManagerRoot = "checkpointManagerRoot";
+
+ // checkpoint path exists
+ Config config = new MapConfig(ImmutableMap.of("task.checkpoint.path", checkpointManagerRoot));
+ FileSystemCheckpointManagerConfig fileSystemCheckpointManagerConfig = new FileSystemCheckpointManagerConfig(config);
+ assertEquals(checkpointManagerRoot, fileSystemCheckpointManagerConfig.getFileSystemCheckpointRoot().get());
+
+ // checkpoint path does not exist
+ config = new MapConfig();
+ fileSystemCheckpointManagerConfig = new FileSystemCheckpointManagerConfig(config);
+ assertFalse(fileSystemCheckpointManagerConfig.getFileSystemCheckpointRoot().isPresent());
+ }
+}
diff --git a/samza-core/src/test/java/org/apache/samza/config/TestJavaSystemConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestJavaSystemConfig.java
deleted file mode 100644
index 94ba374..0000000
--- a/samza-core/src/test/java/org/apache/samza/config/TestJavaSystemConfig.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.config;
-
-import static org.junit.Assert.*;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.junit.Test;
-
-public class TestJavaSystemConfig {
- private static final String MOCK_SYSTEM_NAME1 = "mocksystem1";
- private static final String MOCK_SYSTEM_NAME2 = "mocksystem2";
- private static final String MOCK_SYSTEM_FACTORY_NAME1 = String.format(JavaSystemConfig.SYSTEM_FACTORY_FORMAT, MOCK_SYSTEM_NAME1);
- private static final String MOCK_SYSTEM_FACTORY_NAME2 = String.format(JavaSystemConfig.SYSTEM_FACTORY_FORMAT, MOCK_SYSTEM_NAME2);
- private static final String MOCK_SYSTEM_FACTORY_CLASSNAME1 = "some.factory.Class1";
- private static final String MOCK_SYSTEM_FACTORY_CLASSNAME2 = "some.factory.Class2";
-
- @Test
- public void testClassName() {
- Map<String, String> map = new HashMap<String, String>();
- map.put(MOCK_SYSTEM_FACTORY_NAME1, MOCK_SYSTEM_FACTORY_CLASSNAME1);
- JavaSystemConfig systemConfig = new JavaSystemConfig(new MapConfig(map));
-
- assertEquals(MOCK_SYSTEM_FACTORY_CLASSNAME1, systemConfig.getSystemFactory(MOCK_SYSTEM_NAME1));
- }
-
- @Test
- public void testGetEmptyClassNameAsNull() {
- Map<String, String> map = new HashMap<String, String>();
- map.put(MOCK_SYSTEM_FACTORY_NAME1, "");
- map.put(MOCK_SYSTEM_FACTORY_NAME2, " ");
- JavaSystemConfig systemConfig = new JavaSystemConfig(new MapConfig(map));
-
- assertNull(systemConfig.getSystemFactory(MOCK_SYSTEM_NAME1));
- assertNull(systemConfig.getSystemFactory(MOCK_SYSTEM_NAME2));
- }
-
- @Test
- public void testGetSystemNames() {
- Map<String, String> map = new HashMap<String, String>();
- map.put(MOCK_SYSTEM_FACTORY_NAME1, MOCK_SYSTEM_FACTORY_CLASSNAME1);
- map.put(MOCK_SYSTEM_FACTORY_NAME2, MOCK_SYSTEM_FACTORY_CLASSNAME2);
- JavaSystemConfig systemConfig = new JavaSystemConfig(new MapConfig(map));
-
- assertEquals(2, systemConfig.getSystemNames().size());
- assertTrue(systemConfig.getSystemNames().contains(MOCK_SYSTEM_NAME1));
- assertTrue(systemConfig.getSystemNames().contains(MOCK_SYSTEM_NAME2));
- }
-}
diff --git a/samza-core/src/test/java/org/apache/samza/config/TestSystemConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestSystemConfig.java
new file mode 100644
index 0000000..5512bbc
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/config/TestSystemConfig.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemFactory;
+import org.apache.samza.system.SystemProducer;
+import org.junit.Test;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+public class TestSystemConfig {
+ private static final String MOCK_SYSTEM_NAME1 = "mocksystem1";
+ private static final String MOCK_SYSTEM_NAME2 = "mocksystem2";
+ private static final String MOCK_SYSTEM_FACTORY_NAME1 =
+ String.format(SystemConfig.SYSTEM_FACTORY_FORMAT, MOCK_SYSTEM_NAME1);
+ private static final String MOCK_SYSTEM_FACTORY_NAME2 =
+ String.format(SystemConfig.SYSTEM_FACTORY_FORMAT, MOCK_SYSTEM_NAME2);
+ private static final String MOCK_SYSTEM_FACTORY_CLASSNAME1 = "some.factory.Class1";
+ private static final String MOCK_SYSTEM_FACTORY_CLASSNAME2 = "some.factory.Class2";
+
+ private static final String SAMZA_OFFSET_DEFAULT = "samza.offset.default";
+
+ /**
+ * Placeholder to help make sure the correct {@link SystemAdmin} is returned by {@link MockSystemFactory}. Do not mock
+ * any methods of this mock.
+ */
+ private static final SystemAdmin SYSTEM_ADMIN1 = mock(SystemAdmin.class);
+ /**
+ * Placeholder to help make sure the correct {@link SystemAdmin} is returned by {@link MockSystemFactory}. Do not mock
+ * any methods of this mock.
+ */
+ private static final SystemAdmin SYSTEM_ADMIN2 = mock(SystemAdmin.class);
+
+ @Test
+ public void testGetSystemFactory() {
+ Map<String, String> map = new HashMap<>();
+ map.put(MOCK_SYSTEM_FACTORY_NAME1, MOCK_SYSTEM_FACTORY_CLASSNAME1);
+ SystemConfig systemConfig = new SystemConfig(new MapConfig(map));
+
+ assertEquals(MOCK_SYSTEM_FACTORY_CLASSNAME1, systemConfig.getSystemFactory(MOCK_SYSTEM_NAME1).get());
+ }
+
+ @Test
+ public void testGetSystemFactoryEmptyClassName() {
+ Map<String, String> map = new HashMap<>();
+ map.put(MOCK_SYSTEM_FACTORY_NAME1, "");
+ map.put(MOCK_SYSTEM_FACTORY_NAME2, " ");
+ SystemConfig systemConfig = new SystemConfig(new MapConfig(map));
+
+ assertFalse(systemConfig.getSystemFactory(MOCK_SYSTEM_NAME1).isPresent());
+ assertFalse(systemConfig.getSystemFactory(MOCK_SYSTEM_NAME2).isPresent());
+ }
+
+ @Test
+ public void testGetSystemNames() {
+ Map<String, String> map = new HashMap<>();
+ map.put(MOCK_SYSTEM_FACTORY_NAME1, MOCK_SYSTEM_FACTORY_CLASSNAME1);
+ map.put(MOCK_SYSTEM_FACTORY_NAME2, MOCK_SYSTEM_FACTORY_CLASSNAME2);
+ SystemConfig systemConfig = new SystemConfig(new MapConfig(map));
+
+ assertEquals(2, systemConfig.getSystemNames().size());
+ assertTrue(systemConfig.getSystemNames().contains(MOCK_SYSTEM_NAME1));
+ assertTrue(systemConfig.getSystemNames().contains(MOCK_SYSTEM_NAME2));
+ }
+
+ @Test
+ public void testGetSystemAdmins() {
+ Map<String, String> map = ImmutableMap.of(MOCK_SYSTEM_FACTORY_NAME1, MockSystemFactory.class.getName());
+ SystemConfig systemConfig = new SystemConfig(new MapConfig(map));
+ Map<String, SystemAdmin> expected = ImmutableMap.of(MOCK_SYSTEM_NAME1, SYSTEM_ADMIN1);
+ assertEquals(expected, systemConfig.getSystemAdmins());
+ }
+
+ @Test
+ public void testGetSystemAdmin() {
+ Map<String, String> map = ImmutableMap.of(MOCK_SYSTEM_FACTORY_NAME1, MockSystemFactory.class.getName());
+ SystemConfig systemConfig = new SystemConfig(new MapConfig(map));
+ assertEquals(SYSTEM_ADMIN1, systemConfig.getSystemAdmin(MOCK_SYSTEM_NAME1));
+ assertNull(systemConfig.getSystemAdmin(MOCK_SYSTEM_NAME2));
+ }
+
+ @Test
+ public void testGetSystemFactories() {
+ Map<String, String> map = ImmutableMap.of(MOCK_SYSTEM_FACTORY_NAME1, MockSystemFactory.class.getName());
+ SystemConfig systemConfig = new SystemConfig(new MapConfig(map));
+ Map<String, SystemFactory> actual = systemConfig.getSystemFactories();
+ assertEquals(actual.size(), 1);
+ assertTrue(actual.get(MOCK_SYSTEM_NAME1) instanceof MockSystemFactory);
+ }
+
+ @Test
+ public void testGetDefaultStreamProperties() {
+ String defaultStreamPrefix = buildDefaultStreamPropertiesPrefix(MOCK_SYSTEM_NAME1);
+ String system1ConfigKey = "config1-key";
+ String system1ConfigValue = "config1-value";
+ String system2ConfigKey = "config2-key";
+ String system2ConfigValue = "config2-value";
+
+ Config config = new MapConfig(ImmutableMap.of(defaultStreamPrefix + system1ConfigKey, system1ConfigValue,
+ defaultStreamPrefix + system2ConfigKey, system2ConfigValue));
+ Config expected =
+ new MapConfig(ImmutableMap.of(system1ConfigKey, system1ConfigValue, system2ConfigKey, system2ConfigValue));
+ SystemConfig systemConfig = new SystemConfig(config);
+ assertEquals(expected, systemConfig.getDefaultStreamProperties(MOCK_SYSTEM_NAME1));
+ assertEquals(new MapConfig(), systemConfig.getDefaultStreamProperties(MOCK_SYSTEM_NAME2));
+ }
+
+ @Test
+ public void testGetSystemOffsetDefault() {
+ String system1OffsetDefault = "offset-system-1";
+ String system2OffsetDefault = "offset-system-2";
+ Config config = new MapConfig(ImmutableMap.of(
+ // default.stream.samza.offset.default set
+ String.format(SystemConfig.SYSTEM_DEFAULT_STREAMS_PREFIX_FORMAT, MOCK_SYSTEM_NAME1) + SAMZA_OFFSET_DEFAULT,
+ system1OffsetDefault,
+ // should not use this value since default.stream.samza.offset.default is set
+ String.format(SystemConfig.SYSTEM_ID_PREFIX, MOCK_SYSTEM_NAME1) + SAMZA_OFFSET_DEFAULT, "wrong-value",
+ // only samza.offset.default set
+ String.format(SystemConfig.SYSTEM_ID_PREFIX, MOCK_SYSTEM_NAME2) + SAMZA_OFFSET_DEFAULT, system2OffsetDefault));
+ SystemConfig systemConfig = new SystemConfig(config);
+ assertEquals(system1OffsetDefault, systemConfig.getSystemOffsetDefault(MOCK_SYSTEM_NAME1));
+ assertEquals(system2OffsetDefault, systemConfig.getSystemOffsetDefault(MOCK_SYSTEM_NAME2));
+ assertEquals(SystemConfig.SAMZA_SYSTEM_OFFSET_UPCOMING, systemConfig.getSystemOffsetDefault("other-system"));
+ }
+
+ @Test
+ public void testGetSystemKeySerde() {
+ String system1KeySerde = "system1-key-serde";
+ String defaultStreamPrefixSystem1 = buildDefaultStreamPropertiesPrefix(MOCK_SYSTEM_NAME1);
+
+ // value specified explicitly
+ Config config =
+ new MapConfig(ImmutableMap.of(defaultStreamPrefixSystem1 + StreamConfig.KEY_SERDE(), system1KeySerde));
+ SystemConfig systemConfig = new SystemConfig(config);
+ assertEquals(system1KeySerde, systemConfig.getSystemKeySerde(MOCK_SYSTEM_NAME1).get());
+
+ // default stream property is unspecified, try fall back config key
+ config = new MapConfig(
+ ImmutableMap.of(String.format(SystemConfig.SYSTEM_ID_PREFIX, MOCK_SYSTEM_NAME1) + StreamConfig.KEY_SERDE(),
+ system1KeySerde));
+ systemConfig = new SystemConfig(config);
+ assertEquals(system1KeySerde, systemConfig.getSystemKeySerde(MOCK_SYSTEM_NAME1).get());
+
+ // default stream property is empty string, try fall back config key
+ config = new MapConfig(ImmutableMap.of(
+ // default stream property is empty
+ defaultStreamPrefixSystem1 + StreamConfig.KEY_SERDE(), "",
+ // fall back entry
+ String.format(SystemConfig.SYSTEM_ID_PREFIX, MOCK_SYSTEM_NAME1) + StreamConfig.KEY_SERDE(), system1KeySerde));
+ systemConfig = new SystemConfig(config);
+ assertEquals(system1KeySerde, systemConfig.getSystemKeySerde(MOCK_SYSTEM_NAME1).get());
+
+ // default stream property is unspecified, fall back is also empty
+ config = new MapConfig(
+ ImmutableMap.of(String.format(SystemConfig.SYSTEM_ID_PREFIX, MOCK_SYSTEM_NAME1) + StreamConfig.KEY_SERDE(),
+ ""));
+ systemConfig = new SystemConfig(config);
+ assertFalse(systemConfig.getSystemKeySerde(MOCK_SYSTEM_NAME1).isPresent());
+
+ // default stream property is unspecified, fall back is also unspecified
+ config = new MapConfig();
+ systemConfig = new SystemConfig(config);
+ assertFalse(systemConfig.getSystemKeySerde(MOCK_SYSTEM_NAME1).isPresent());
+ }
+
+ @Test
+ public void testGetSystemMsgSerde() {
+ String system1MsgSerde = "system1-msg-serde";
+ String defaultStreamPrefixSystem1 = buildDefaultStreamPropertiesPrefix(MOCK_SYSTEM_NAME1);
+
+ // value specified explicitly
+ Config config =
+ new MapConfig(ImmutableMap.of(defaultStreamPrefixSystem1 + StreamConfig.MSG_SERDE(), system1MsgSerde));
+ SystemConfig systemConfig = new SystemConfig(config);
+ assertEquals(system1MsgSerde, systemConfig.getSystemMsgSerde(MOCK_SYSTEM_NAME1).get());
+
+ // default stream property is unspecified, try fall back config msg
+ config = new MapConfig(
+ ImmutableMap.of(String.format(SystemConfig.SYSTEM_ID_PREFIX, MOCK_SYSTEM_NAME1) + StreamConfig.MSG_SERDE(),
+ system1MsgSerde));
+ systemConfig = new SystemConfig(config);
+ assertEquals(system1MsgSerde, systemConfig.getSystemMsgSerde(MOCK_SYSTEM_NAME1).get());
+
+ // default stream property is empty string, try fall back config msg
+ config = new MapConfig(ImmutableMap.of(
+ // default stream property is empty
+ defaultStreamPrefixSystem1 + StreamConfig.MSG_SERDE(), "",
+ // fall back entry
+ String.format(SystemConfig.SYSTEM_ID_PREFIX, MOCK_SYSTEM_NAME1) + StreamConfig.MSG_SERDE(), system1MsgSerde));
+ systemConfig = new SystemConfig(config);
+ assertEquals(system1MsgSerde, systemConfig.getSystemMsgSerde(MOCK_SYSTEM_NAME1).get());
+
+ // default stream property is unspecified, fall back is also empty
+ config = new MapConfig(
+ ImmutableMap.of(String.format(SystemConfig.SYSTEM_ID_PREFIX, MOCK_SYSTEM_NAME1) + StreamConfig.MSG_SERDE(),
+ ""));
+ systemConfig = new SystemConfig(config);
+ assertFalse(systemConfig.getSystemMsgSerde(MOCK_SYSTEM_NAME1).isPresent());
+
+ // default stream property is unspecified, fall back is also unspecified
+ config = new MapConfig();
+ systemConfig = new SystemConfig(config);
+ assertFalse(systemConfig.getSystemMsgSerde(MOCK_SYSTEM_NAME1).isPresent());
+ }
+
+ @Test
+ public void testDeleteCommittedMessages() {
+ Config config = new MapConfig(ImmutableMap.of(
+ // value is "true"
+ String.format(SystemConfig.DELETE_COMMITTED_MESSAGES, MOCK_SYSTEM_NAME1), "true",
+ // value is explicitly "false"
+ String.format(SystemConfig.DELETE_COMMITTED_MESSAGES, MOCK_SYSTEM_NAME2), "false"));
+ SystemConfig systemConfig = new SystemConfig(config);
+ assertTrue(systemConfig.deleteCommittedMessages(MOCK_SYSTEM_NAME1));
+ assertFalse(systemConfig.deleteCommittedMessages(MOCK_SYSTEM_NAME2));
+ assertFalse(systemConfig.deleteCommittedMessages("other-system")); // value is not specified
+ }
+
+ public static class MockSystemFactory implements SystemFactory {
+ @Override
+ public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) {
+ throw new UnsupportedOperationException("Unnecessary for test");
+ }
+
+ @Override
+ public SystemProducer getProducer(String systemName, Config config, MetricsRegistry registry) {
+ throw new UnsupportedOperationException("Unnecessary for test");
+ }
+
+ @Override
+ public SystemAdmin getAdmin(String systemName, Config config) {
+ switch (systemName) {
+ case MOCK_SYSTEM_NAME1:
+ return SYSTEM_ADMIN1;
+ case MOCK_SYSTEM_NAME2:
+ return SYSTEM_ADMIN2;
+ default:
+ throw new UnsupportedOperationException("System name unsupported: " + systemName);
+ }
+ }
+ }
+
+ private static String buildDefaultStreamPropertiesPrefix(String systemName) {
+ return String.format(SystemConfig.SYSTEM_DEFAULT_STREAMS_PREFIX_FORMAT, systemName);
+ }
+}
diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
index 244fd8f..93c7096 100644
--- a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
+++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
@@ -84,8 +84,8 @@ class TestCheckpointTool extends AssertionsForJUnit with MockitoSugar {
JobConfig.JOB_COORDINATOR_SYSTEM -> "coordinator",
TaskConfig.INPUT_STREAMS -> "test.foo",
TaskConfig.CHECKPOINT_MANAGER_FACTORY -> classOf[MockCheckpointManagerFactory].getName,
- SystemConfig.SYSTEM_FACTORY.format("test") -> classOf[MockSystemFactory].getName,
- SystemConfig.SYSTEM_FACTORY.format("coordinator") -> classOf[MockCoordinatorStreamSystemFactory].getName,
+ SystemConfig.SYSTEM_FACTORY_FORMAT.format("test") -> classOf[MockSystemFactory].getName,
+ SystemConfig.SYSTEM_FACTORY_FORMAT.format("coordinator") -> classOf[MockCoordinatorStreamSystemFactory].getName,
TaskConfig.GROUPER_FACTORY -> "org.apache.samza.container.grouper.task.GroupByContainerCountFactory"
).asJava)
config = JobPlanner.generateSingleJobConfig(userDefinedConfig)
@@ -151,8 +151,8 @@ class TestCheckpointTool extends AssertionsForJUnit with MockitoSugar {
JobConfig.JOB_COORDINATOR_SYSTEM -> "coordinator",
TaskConfig.INPUT_STREAMS -> "test.foo",
TaskConfig.CHECKPOINT_MANAGER_FACTORY -> classOf[MockCheckpointManagerFactory].getName,
- SystemConfig.SYSTEM_FACTORY.format("test") -> classOf[MockSystemFactory].getName,
- SystemConfig.SYSTEM_FACTORY.format("coordinator") -> classOf[MockCoordinatorStreamSystemFactory].getName,
+ SystemConfig.SYSTEM_FACTORY_FORMAT.format("test") -> classOf[MockSystemFactory].getName,
+ SystemConfig.SYSTEM_FACTORY_FORMAT.format("coordinator") -> classOf[MockCoordinatorStreamSystemFactory].getName,
TaskConfig.GROUPER_FACTORY -> "org.apache.samza.container.grouper.task.GroupByContainerCountFactory"
).asJava)
val generatedConfigs: MapConfig = JobPlanner.generateSingleJobConfig(userDefinedConfig)
diff --git a/samza-core/src/test/scala/org/apache/samza/config/TestSystemConfig.scala b/samza-core/src/test/scala/org/apache/samza/config/TestSystemConfig.scala
deleted file mode 100644
index cc54d00..0000000
--- a/samza-core/src/test/scala/org/apache/samza/config/TestSystemConfig.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.config
-
-import scala.collection.JavaConverters._
-import org.apache.samza.config.SystemConfig.{Config2System, SYSTEM_FACTORY}
-import org.junit.Assert._
-import org.junit.Test
-
-class TestSystemConfig {
- val MOCK_SYSTEM_NAME1 = "mocksystem1"
- val MOCK_SYSTEM_NAME2 = "mocksystem2"
- val MOCK_SYSTEM_FACTORY_NAME1 = SYSTEM_FACTORY.format(MOCK_SYSTEM_NAME1)
- val MOCK_SYSTEM_FACTORY_NAME2 = SYSTEM_FACTORY.format(MOCK_SYSTEM_NAME2)
- val MOCK_SYSTEM_FACTORY_CLASSNAME1 = "some.factory.Class1"
- val MOCK_SYSTEM_FACTORY_CLASSNAME2 = "some.factory.Class2"
-
- def testClassName {
- val configMap = Map[String, String](
- MOCK_SYSTEM_FACTORY_NAME1 -> MOCK_SYSTEM_FACTORY_CLASSNAME1
- )
- val config = new MapConfig(configMap.asJava)
-
- assertEquals(MOCK_SYSTEM_FACTORY_CLASSNAME1, config.getSystemFactory(MOCK_SYSTEM_NAME1).getOrElse(""))
- }
-
- @Test
- def testGetEmptyClassNameAsNull {
- val configMap = Map[String, String](
- MOCK_SYSTEM_FACTORY_NAME1 -> "",
- MOCK_SYSTEM_FACTORY_NAME1 -> " "
- )
- val config = new MapConfig(configMap.asJava)
-
- assertEquals(config.getSystemFactory(MOCK_SYSTEM_NAME1), None)
- assertEquals(config.getSystemFactory(MOCK_SYSTEM_NAME2), None)
- }
-
- def testGetSystemNames {
- val configMap = Map[String, String](
- MOCK_SYSTEM_FACTORY_NAME1 -> MOCK_SYSTEM_FACTORY_CLASSNAME1,
- MOCK_SYSTEM_FACTORY_NAME2 -> MOCK_SYSTEM_FACTORY_CLASSNAME2
- )
- val config = new MapConfig(configMap.asJava)
- val systemNames = config.getSystemNames()
-
- assertTrue(systemNames.contains(MOCK_SYSTEM_NAME1))
- assertTrue(systemNames.contains(MOCK_SYSTEM_NAME2))
- }
-}
diff --git a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
index b7a9bec..5b43840 100644
--- a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
+++ b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
@@ -23,7 +23,7 @@ import java.util
import org.apache.samza.Partition
import org.apache.samza.checkpoint.TestCheckpointTool.MockCheckpointManagerFactory
-import org.apache.samza.config.{JobConfig, MapConfig, SystemConfig, TaskConfig}
+import org.apache.samza.config._
import org.apache.samza.container.{SamzaContainer, TaskName}
import org.apache.samza.coordinator.stream.{CoordinatorStreamManager, MockCoordinatorStreamSystemFactory, MockCoordinatorStreamWrappedConsumer}
import org.apache.samza.job.MockJobFactory
@@ -91,8 +91,8 @@ class TestJobCoordinator extends FlatSpec with PrivateMethodTester {
JobConfig.JOB_COORDINATOR_SYSTEM -> "coordinator",
JobConfig.JOB_CONTAINER_COUNT -> "2",
TaskConfig.INPUT_STREAMS -> "test.stream1",
- SystemConfig.SYSTEM_FACTORY.format("test") -> classOf[MockSystemFactory].getCanonicalName,
- SystemConfig.SYSTEM_FACTORY.format("coordinator") -> classOf[MockCoordinatorStreamSystemFactory].getName,
+ SystemConfig.SYSTEM_FACTORY_FORMAT.format("test") -> classOf[MockSystemFactory].getCanonicalName,
+ SystemConfig.SYSTEM_FACTORY_FORMAT.format("coordinator") -> classOf[MockCoordinatorStreamSystemFactory].getName,
TaskConfig.GROUPER_FACTORY -> "org.apache.samza.container.grouper.task.GroupByContainerCountFactory",
JobConfig.MONITOR_PARTITION_CHANGE -> "true",
JobConfig.MONITOR_PARTITION_CHANGE_FREQUENCY_MS -> "100"
@@ -153,8 +153,8 @@ class TestJobCoordinator extends FlatSpec with PrivateMethodTester {
JobConfig.JOB_COORDINATOR_SYSTEM -> "coordinator",
JobConfig.JOB_CONTAINER_COUNT -> "2",
TaskConfig.INPUT_STREAMS -> "test.stream1",
- SystemConfig.SYSTEM_FACTORY.format("test") -> classOf[MockSystemFactory].getCanonicalName,
- SystemConfig.SYSTEM_FACTORY.format("coordinator") -> classOf[MockCoordinatorStreamSystemFactory].getName,
+ SystemConfig.SYSTEM_FACTORY_FORMAT.format("test") -> classOf[MockSystemFactory].getCanonicalName,
+ SystemConfig.SYSTEM_FACTORY_FORMAT.format("coordinator") -> classOf[MockCoordinatorStreamSystemFactory].getName,
TaskConfig.GROUPER_FACTORY -> "org.apache.samza.container.grouper.task.GroupByContainerCountFactory"
)
@@ -230,7 +230,7 @@ class TestJobCoordinator extends FlatSpec with PrivateMethodTester {
*/
@Test
def testWithPartitionAssignmentWithMockJobFactory {
- val config = new SystemConfig(getTestConfig(classOf[MockJobFactory]))
+ val config = getTestConfig(classOf[MockJobFactory])
val systemStream = new SystemStream("test", "stream1")
val streamMetadataCache = mock(classOf[StreamMetadataCache])
when(streamMetadataCache.getStreamMetadata(Set(systemStream), true)).thenReturn(
@@ -256,8 +256,8 @@ class TestJobCoordinator extends FlatSpec with PrivateMethodTester {
JobConfig.STREAM_JOB_FACTORY_CLASS -> clazz.getCanonicalName,
JobConfig.SSP_MATCHER_CLASS -> JobConfig.SSP_MATCHER_CLASS_REGEX,
JobConfig.SSP_MATCHER_CONFIG_REGEX -> "[1]",
- SystemConfig.SYSTEM_FACTORY.format("test") -> classOf[MockSystemFactory].getCanonicalName,
- SystemConfig.SYSTEM_FACTORY.format("coordinator") -> classOf[MockCoordinatorStreamSystemFactory].getName).asJava)
+ SystemConfig.SYSTEM_FACTORY_FORMAT.format("test") -> classOf[MockSystemFactory].getCanonicalName,
+ SystemConfig.SYSTEM_FACTORY_FORMAT.format("coordinator") -> classOf[MockCoordinatorStreamSystemFactory].getName).asJava)
config
}
diff --git a/samza-core/src/test/scala/org/apache/samza/system/TestRangeSystemStreamPartitionMatcher.scala b/samza-core/src/test/scala/org/apache/samza/system/TestRangeSystemStreamPartitionMatcher.scala
index 3d385d6..e9e372d 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/TestRangeSystemStreamPartitionMatcher.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/TestRangeSystemStreamPartitionMatcher.scala
@@ -52,7 +52,7 @@ class TestRangeSystemStreamPartitionMatcher {
JobConfig.STREAM_JOB_FACTORY_CLASS -> classOf[ThreadJobFactory].getCanonicalName,
JobConfig.SSP_MATCHER_CLASS -> JobConfig.SSP_MATCHER_CLASS_RANGE,
JobConfig.SSP_MATCHER_CONFIG_RANGES -> range,
- (SystemConfig.SYSTEM_FACTORY format "test") -> classOf[MockSystemFactory].getCanonicalName).asJava)
+ (SystemConfig.SYSTEM_FACTORY_FORMAT format "test") -> classOf[MockSystemFactory].getCanonicalName).asJava)
}
@Test
@@ -99,7 +99,7 @@ class TestRangeSystemStreamPartitionMatcher {
TaskConfig.INPUT_STREAMS -> "test.stream1",
JobConfig.STREAM_JOB_FACTORY_CLASS -> classOf[ThreadJobFactory].getCanonicalName,
JobConfig.SSP_MATCHER_CLASS -> JobConfig.SSP_MATCHER_CLASS_RANGE,
- (SystemConfig.SYSTEM_FACTORY format "test") -> classOf[MockSystemFactory].getCanonicalName).asJava)
+ (SystemConfig.SYSTEM_FACTORY_FORMAT format "test") -> classOf[MockSystemFactory].getCanonicalName).asJava)
new RangeSystemStreamPartitionMatcher().filter(sspSet.asJava, config)
}
diff --git a/samza-core/src/test/scala/org/apache/samza/system/TestRegexSystemStreamPartitionMatcher.scala b/samza-core/src/test/scala/org/apache/samza/system/TestRegexSystemStreamPartitionMatcher.scala
index 255c85e..cb3b1e0 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/TestRegexSystemStreamPartitionMatcher.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/TestRegexSystemStreamPartitionMatcher.scala
@@ -52,7 +52,7 @@ class TestRegexSystemStreamPartitionMatcher {
JobConfig.STREAM_JOB_FACTORY_CLASS -> classOf[ThreadJobFactory].getCanonicalName,
JobConfig.SSP_MATCHER_CLASS -> JobConfig.SSP_MATCHER_CLASS_REGEX,
JobConfig.SSP_MATCHER_CONFIG_REGEX -> regex,
- (SystemConfig.SYSTEM_FACTORY format "test") -> classOf[MockSystemFactory].getCanonicalName).asJava)
+ (SystemConfig.SYSTEM_FACTORY_FORMAT format "test") -> classOf[MockSystemFactory].getCanonicalName).asJava)
}
@Test
@@ -70,7 +70,7 @@ class TestRegexSystemStreamPartitionMatcher {
TaskConfig.INPUT_STREAMS -> "test.stream1",
JobConfig.STREAM_JOB_FACTORY_CLASS -> classOf[ThreadJobFactory].getCanonicalName,
JobConfig.SSP_MATCHER_CLASS -> JobConfig.SSP_MATCHER_CONFIG_REGEX,
- (SystemConfig.SYSTEM_FACTORY format "test") -> classOf[MockSystemFactory].getCanonicalName).asJava)
+ (SystemConfig.SYSTEM_FACTORY_FORMAT format "test") -> classOf[MockSystemFactory].getCanonicalName).asJava)
new RegexSystemStreamPartitionMatcher().filter(sspSet.asJava, config)
}
diff --git a/samza-kafka/src/main/java/org/apache/samza/config/KafkaConsumerConfig.java b/samza-kafka/src/main/java/org/apache/samza/config/KafkaConsumerConfig.java
index dea60b3..0b59871 100644
--- a/samza-kafka/src/main/java/org/apache/samza/config/KafkaConsumerConfig.java
+++ b/samza-kafka/src/main/java/org/apache/samza/config/KafkaConsumerConfig.java
@@ -82,7 +82,7 @@ public class KafkaConsumerConfig extends HashMap<String, Object> {
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
// check if samza default offset value is defined
- String systemOffsetDefault = new JavaSystemConfig(config).getSystemOffsetDefault(systemName);
+ String systemOffsetDefault = new SystemConfig(config).getSystemOffsetDefault(systemName);
// Translate samza config value to kafka config value
String autoOffsetReset = getAutoOffsetResetValue((String) consumerProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), systemOffsetDefault);
@@ -214,10 +214,10 @@ public class KafkaConsumerConfig extends HashMap<String, Object> {
String newAutoOffsetReset = KAFKA_OFFSET_LATEST;
if (!StringUtils.isBlank(samzaOffsetDefault)) {
switch (samzaOffsetDefault) {
- case JavaSystemConfig.SAMZA_SYSTEM_OFFSET_UPCOMING:
+ case SystemConfig.SAMZA_SYSTEM_OFFSET_UPCOMING:
newAutoOffsetReset = KAFKA_OFFSET_LATEST;
break;
- case JavaSystemConfig.SAMZA_SYSTEM_OFFSET_OLDEST:
+ case SystemConfig.SAMZA_SYSTEM_OFFSET_OLDEST:
newAutoOffsetReset = KAFKA_OFFSET_EARLIEST;
break;
default:
diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
index f4db408..0ffd1a7 100644
--- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
+++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
@@ -48,9 +48,9 @@ import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
+import org.apache.samza.config.SystemConfig;
import org.apache.samza.config.KafkaConfig;
import org.apache.samza.config.MapConfig;
-import org.apache.samza.config.SystemConfig;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.system.StreamValidationException;
import org.apache.samza.system.SystemAdmin;
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
index 2999800..efbe780 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
@@ -26,6 +26,7 @@ import org.apache.samza.config._
import org.apache.samza.metrics.MetricsRegistry
import org.apache.samza.system.{StreamSpec, SystemFactory}
import org.apache.samza.system.kafka.KafkaStreamSpec
+import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
import org.apache.samza.util.{KafkaUtil, Logging, Util, _}
class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Logging {
@@ -38,9 +39,10 @@ class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Loggin
val checkpointSystemName = kafkaConfig.getCheckpointSystem.getOrElse(
throw new SamzaException("No system defined for Kafka's checkpoint manager."))
- val checkpointSystemFactoryName = new SystemConfig(config)
- .getSystemFactory(checkpointSystemName)
- .getOrElse(throw new SamzaException("Missing configuration: " + SystemConfig.SYSTEM_FACTORY format checkpointSystemName))
+ val systemConfig = new SystemConfig(config)
+ val checkpointSystemFactoryName = JavaOptionals.toRichOptional(systemConfig.getSystemFactory(checkpointSystemName))
+ .toOption
+ .getOrElse(throw new SamzaException("Missing configuration: " + SystemConfig.SYSTEM_FACTORY_FORMAT format checkpointSystemName))
val checkpointSystemFactory = Util.getObj(checkpointSystemFactoryName, classOf[SystemFactory])
val checkpointTopic = KafkaUtil.getCheckpointTopic(jobName, jobId, config)
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index 607feb0..10f879c 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -23,15 +23,13 @@ package org.apache.samza.config
import java.util
import java.util.concurrent.TimeUnit
import java.util.regex.Pattern
-import java.util.{Properties, UUID}
+import java.util.Properties
import com.google.common.collect.ImmutableMap
-import kafka.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.apache.samza.SamzaException
import org.apache.samza.config.ApplicationConfig.ApplicationMode
-import org.apache.samza.config.SystemConfig.Config2System
import org.apache.samza.util.{Logging, StreamUtil}
import scala.collection.JavaConverters._
@@ -68,7 +66,7 @@ object KafkaConfig {
* Defines how low a queue can get for a single system/stream/partition
* combination before trying to fetch more messages for it.
*/
- val CONSUMER_FETCH_THRESHOLD = SystemConfig.SYSTEM_PREFIX + "samza.fetch.threshold"
+ val CONSUMER_FETCH_THRESHOLD = SystemConfig.SYSTEM_ID_PREFIX + "samza.fetch.threshold"
val DEFAULT_CHECKPOINT_SEGMENT_BYTES = 26214400
@@ -79,7 +77,7 @@ object KafkaConfig {
* the bytes limit + size of max message in the partition for a given stream.
* If the value of this property is > 0 then this takes precedence over CONSUMER_FETCH_THRESHOLD config.
*/
- val CONSUMER_FETCH_THRESHOLD_BYTES = SystemConfig.SYSTEM_PREFIX + "samza.fetch.threshold.bytes"
+ val CONSUMER_FETCH_THRESHOLD_BYTES = SystemConfig.SYSTEM_ID_PREFIX + "samza.fetch.threshold.bytes"
val DEFAULT_RETENTION_MS_FOR_BATCH = TimeUnit.DAYS.toMillis(1)
@@ -113,7 +111,7 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
}
private def getSystemDefaultReplicationFactor(systemName: String, defaultValue: String) = {
- val defaultReplicationFactor = new JavaSystemConfig(config).getDefaultStreamProperties(systemName).getOrDefault(KafkaConfig.TOPIC_REPLICATION_FACTOR, defaultValue)
+ val defaultReplicationFactor = new SystemConfig(config).getDefaultStreamProperties(systemName).getOrDefault(KafkaConfig.TOPIC_REPLICATION_FACTOR, defaultValue)
defaultReplicationFactor
}
@@ -127,7 +125,7 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
* Note that the checkpoint-system has a similar precedence. See [[getCheckpointSystem]]
*/
def getCheckpointSegmentBytes() = {
- val defaultsegBytes = new JavaSystemConfig(config).getDefaultStreamProperties(getCheckpointSystem.orNull).getInt(KafkaConfig.SEGMENT_BYTES, KafkaConfig.DEFAULT_CHECKPOINT_SEGMENT_BYTES)
+ val defaultsegBytes = new SystemConfig(config).getDefaultStreamProperties(getCheckpointSystem.orNull).getInt(KafkaConfig.SEGMENT_BYTES, KafkaConfig.DEFAULT_CHECKPOINT_SEGMENT_BYTES)
getInt(KafkaConfig.CHECKPOINT_SEGMENT_BYTES, defaultsegBytes)
}
@@ -144,7 +142,7 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
case Some(rplFactor) => rplFactor
case _ =>
val coordinatorSystem = new JobConfig(config).getCoordinatorSystemNameOrNull
- val systemReplicationFactor = new JavaSystemConfig(config).getDefaultStreamProperties(coordinatorSystem).getOrDefault(KafkaConfig.TOPIC_REPLICATION_FACTOR, "3")
+ val systemReplicationFactor = new SystemConfig(config).getDefaultStreamProperties(coordinatorSystem).getOrDefault(KafkaConfig.TOPIC_REPLICATION_FACTOR, "3")
systemReplicationFactor
}
@@ -161,7 +159,7 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
case Some(segBytes) => segBytes
case _ =>
val coordinatorSystem = new JobConfig(config).getCoordinatorSystemNameOrNull
- val segBytes = new JavaSystemConfig(config).getDefaultStreamProperties(coordinatorSystem).getOrDefault(KafkaConfig.SEGMENT_BYTES, "26214400")
+ val segBytes = new SystemConfig(config).getDefaultStreamProperties(coordinatorSystem).getOrDefault(KafkaConfig.SEGMENT_BYTES, "26214400")
segBytes
}
@@ -240,7 +238,6 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
storageConfig.getChangelogStream(storeName).foreach(changelogName => {
val systemStream = StreamUtil.getSystemStreamFromNames(changelogName)
- val factoryName = config.getSystemFactory(systemStream.getSystem).getOrElse(new SamzaException("Unable to determine factory for system: " + systemStream.getSystem))
storeToChangelog += storeName -> systemStream.getStream
})
}
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config_deprecated/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config_deprecated/KafkaConfig.scala
index 02a6275..9b5cb31 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config_deprecated/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config_deprecated/KafkaConfig.scala
@@ -32,7 +32,6 @@ import org.apache.kafka.common.serialization.ByteArraySerializer
import org.apache.samza.SamzaException
import org.apache.samza.config.ApplicationConfig.ApplicationMode
import org.apache.samza.config._
-import org.apache.samza.config.SystemConfig.Config2System
import org.apache.samza.util.{Logging, StreamUtil}
import scala.collection.JavaConverters._
@@ -72,7 +71,7 @@ object KafkaConfig {
* Defines how low a queue can get for a single system/stream/partition
* combination before trying to fetch more messages for it.
*/
- val CONSUMER_FETCH_THRESHOLD = SystemConfig.SYSTEM_PREFIX + "samza.fetch.threshold"
+ val CONSUMER_FETCH_THRESHOLD = SystemConfig.SYSTEM_ID_PREFIX + "samza.fetch.threshold"
val DEFAULT_CHECKPOINT_SEGMENT_BYTES = 26214400
@@ -83,7 +82,7 @@ object KafkaConfig {
* the bytes limit + size of max message in the partition for a given stream.
* If the value of this property is > 0 then this takes precedence over CONSUMER_FETCH_THRESHOLD config.
*/
- val CONSUMER_FETCH_THRESHOLD_BYTES = SystemConfig.SYSTEM_PREFIX + "samza.fetch.threshold.bytes"
+ val CONSUMER_FETCH_THRESHOLD_BYTES = SystemConfig.SYSTEM_ID_PREFIX + "samza.fetch.threshold.bytes"
val DEFAULT_RETENTION_MS_FOR_BATCH = TimeUnit.DAYS.toMillis(1)
@@ -117,7 +116,7 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
}
private def getSystemDefaultReplicationFactor(systemName: String, defaultValue: String) = {
- val defaultReplicationFactor = new JavaSystemConfig(config).getDefaultStreamProperties(systemName).getOrDefault(KafkaConfig.TOPIC_REPLICATION_FACTOR, defaultValue)
+ val defaultReplicationFactor = new SystemConfig(config).getDefaultStreamProperties(systemName).getOrDefault(KafkaConfig.TOPIC_REPLICATION_FACTOR, defaultValue)
defaultReplicationFactor
}
@@ -131,7 +130,7 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
* Note that the checkpoint-system has a similar precedence. See [[getCheckpointSystem]]
*/
def getCheckpointSegmentBytes() = {
- val defaultsegBytes = new JavaSystemConfig(config).getDefaultStreamProperties(getCheckpointSystem.orNull).getInt(KafkaConfig.SEGMENT_BYTES, KafkaConfig.DEFAULT_CHECKPOINT_SEGMENT_BYTES)
+ val defaultsegBytes = new SystemConfig(config).getDefaultStreamProperties(getCheckpointSystem.orNull).getInt(KafkaConfig.SEGMENT_BYTES, KafkaConfig.DEFAULT_CHECKPOINT_SEGMENT_BYTES)
getInt(KafkaConfig.CHECKPOINT_SEGMENT_BYTES, defaultsegBytes)
}
@@ -148,7 +147,7 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
case Some(rplFactor) => rplFactor
case _ =>
val coordinatorSystem = new JobConfig(config).getCoordinatorSystemNameOrNull
- val systemReplicationFactor = new JavaSystemConfig(config).getDefaultStreamProperties(coordinatorSystem).getOrDefault(KafkaConfig.TOPIC_REPLICATION_FACTOR, "3")
+ val systemReplicationFactor = new SystemConfig(config).getDefaultStreamProperties(coordinatorSystem).getOrDefault(KafkaConfig.TOPIC_REPLICATION_FACTOR, "3")
systemReplicationFactor
}
@@ -165,7 +164,7 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
case Some(segBytes) => segBytes
case _ =>
val coordinatorSystem = new JobConfig(config).getCoordinatorSystemNameOrNull
- val segBytes = new JavaSystemConfig(config).getDefaultStreamProperties(coordinatorSystem).getOrDefault(KafkaConfig.SEGMENT_BYTES, "26214400")
+ val segBytes = new SystemConfig(config).getDefaultStreamProperties(coordinatorSystem).getOrDefault(KafkaConfig.SEGMENT_BYTES, "26214400")
segBytes
}
@@ -244,7 +243,6 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
storageConfig.getChangelogStream(storeName).foreach(changelogName => {
val systemStream = StreamUtil.getSystemStreamFromNames(changelogName)
- val factoryName = config.getSystemFactory(systemStream.getSystem).getOrElse(new SamzaException("Unable to determine factory for system: " + systemStream.getSystem))
storeToChangelog += storeName -> systemStream.getStream
})
}
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemFactory.scala
index 7e8509d..b4003d8 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemFactory.scala
@@ -33,7 +33,6 @@ import org.apache.samza.system.SystemFactory
import org.apache.samza.config.StorageConfig._
import org.apache.samza.system.SystemProducer
import org.apache.samza.system.SystemAdmin
-import org.apache.samza.config.SystemConfig.Config2System
import org.apache.samza.system.SystemConsumer
object KafkaSystemFactory extends Logging {
@@ -129,7 +128,8 @@ class KafkaSystemFactory extends SystemFactory with Logging {
(topicName, changelogInfo)
}}
- val deleteCommittedMessages = config.deleteCommittedMessages(systemName)
+ val systemConfig = new SystemConfig(config)
+ val deleteCommittedMessages = systemConfig.deleteCommittedMessages(systemName)
val intermediateStreamProperties: Map[String, Properties] = getIntermediateStreamProperties(config)
new KafkaSystemAdmin(
systemName,
diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
index 0a0aae8..dbe82fc 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
@@ -26,9 +26,7 @@ import kafka.integration.KafkaServerTestHarness
import kafka.server.ConfigType
import kafka.utils.{CoreUtils, TestUtils, ZkUtils}
import com.google.common.collect.ImmutableMap
-import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.security.JaasUtils
-import org.apache.kafka.common.serialization.ByteArraySerializer
import org.apache.samza.checkpoint.Checkpoint
import org.apache.samza.config._
import org.apache.samza.container.TaskName
@@ -37,6 +35,7 @@ import org.apache.samza.metrics.MetricsRegistry
import org.apache.samza.serializers.CheckpointSerde
import org.apache.samza.system._
import org.apache.samza.system.kafka.{KafkaStreamSpec, KafkaSystemFactory}
+import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
import org.apache.samza.util.{KafkaUtilException, NoOpMetricsRegistry, Util}
import org.apache.samza.{Partition, SamzaException}
import org.junit.Assert._
@@ -209,9 +208,9 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
val systemName = kafkaConfig.getCheckpointSystem.getOrElse(
throw new SamzaException("No system defined for Kafka's checkpoint manager."))
- val systemFactoryClassName = new SystemConfig(config)
- .getSystemFactory(systemName)
- .getOrElse(throw new SamzaException("Missing configuration: " + SystemConfig.SYSTEM_FACTORY format systemName))
+ val systemConfig = new SystemConfig(config)
+ val systemFactoryClassName = JavaOptionals.toRichOptional(systemConfig.getSystemFactory(systemName)).toOption
+ .getOrElse(throw new SamzaException("Missing configuration: " + SystemConfig.SYSTEM_FACTORY_FORMAT format systemName))
val systemFactory = Util.getObj(systemFactoryClassName, classOf[SystemFactory])
diff --git a/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java b/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
index 5824489..12c03f6 100644
--- a/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
+++ b/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
@@ -26,7 +26,7 @@ import org.apache.samza.system.SystemStream;
* This class contains the methods for getting properties that are needed by the
* StreamAppender.
*/
-public class Log4jSystemConfig extends JavaSystemConfig {
+public class Log4jSystemConfig extends SystemConfig {
private static final String LOCATION_ENABLED = "task.log4j.location.info.enabled";
private static final String TASK_LOG4J_SYSTEM = "task.log4j.system";
diff --git a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
index b4a97f7..19e6ea6 100644
--- a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
+++ b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
@@ -286,7 +286,6 @@ public class StreamAppender extends AppenderSkeleton {
protected void setupSystem() {
config = getConfig();
- SystemFactory systemFactory = null;
Log4jSystemConfig log4jSystemConfig = new Log4jSystemConfig(config);
if (streamName == null) {
@@ -298,12 +297,10 @@ public class StreamAppender extends AppenderSkeleton {
metrics = new StreamAppenderMetrics("stream-appender", metricsRegistry);
String systemName = log4jSystemConfig.getSystemName();
- String systemFactoryName = log4jSystemConfig.getSystemFactory(systemName);
- if (systemFactoryName != null) {
- systemFactory = Util.getObj(systemFactoryName, SystemFactory.class);
- } else {
- throw new SamzaException("Could not figure out \"" + systemName + "\" system factory for log4j StreamAppender to use");
- }
+ String systemFactoryName = log4jSystemConfig.getSystemFactory(systemName)
+ .orElseThrow(() -> new SamzaException(
+ "Could not figure out \"" + systemName + "\" system factory for log4j StreamAppender to use"));
+ SystemFactory systemFactory = Util.getObj(systemFactoryName, SystemFactory.class);
setSerde(log4jSystemConfig, systemName, streamName);
diff --git a/samza-log4j2/src/main/java/org/apache/samza/config/Log4jSystemConfig.java b/samza-log4j2/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
index 5824489..12c03f6 100644
--- a/samza-log4j2/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
+++ b/samza-log4j2/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
@@ -26,7 +26,7 @@ import org.apache.samza.system.SystemStream;
* This class contains the methods for getting properties that are needed by the
* StreamAppender.
*/
-public class Log4jSystemConfig extends JavaSystemConfig {
+public class Log4jSystemConfig extends SystemConfig {
private static final String LOCATION_ENABLED = "task.log4j.location.info.enabled";
private static final String TASK_LOG4J_SYSTEM = "task.log4j.system";
diff --git a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
index 28f759e..a224762 100644
--- a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
+++ b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
@@ -307,7 +307,6 @@ public class StreamAppender extends AbstractAppender {
protected void setupSystem() {
config = getConfig();
- SystemFactory systemFactory = null;
Log4jSystemConfig log4jSystemConfig = new Log4jSystemConfig(config);
if (streamName == null) {
@@ -319,12 +318,10 @@ public class StreamAppender extends AbstractAppender {
metrics = new StreamAppenderMetrics("stream-appender", metricsRegistry);
String systemName = log4jSystemConfig.getSystemName();
- String systemFactoryName = log4jSystemConfig.getSystemFactory(systemName);
- if (systemFactoryName != null) {
- systemFactory = Util.getObj(systemFactoryName, SystemFactory.class);
- } else {
- throw new SamzaException("Could not figure out \"" + systemName + "\" system factory for log4j StreamAppender to use");
- }
+ String systemFactoryName = log4jSystemConfig.getSystemFactory(systemName)
+ .orElseThrow(() -> new SamzaException(
+ "Could not figure out \"" + systemName + "\" system factory for log4j StreamAppender to use"));
+ SystemFactory systemFactory = Util.getObj(systemFactoryName, SystemFactory.class);
setSerde(log4jSystemConfig, systemName, streamName);
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/system/descriptors/InMemorySystemDescriptor.java b/samza-test/src/main/java/org/apache/samza/test/framework/system/descriptors/InMemorySystemDescriptor.java
index a74d0ba..da61f74 100644
--- a/samza-test/src/main/java/org/apache/samza/test/framework/system/descriptors/InMemorySystemDescriptor.java
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/system/descriptors/InMemorySystemDescriptor.java
@@ -28,7 +28,7 @@ import org.apache.samza.system.descriptors.SystemDescriptor;
import org.apache.samza.serializers.Serde;
import org.apache.samza.system.SystemStreamMetadata;
import org.apache.samza.system.inmemory.InMemorySystemFactory;
-import org.apache.samza.config.JavaSystemConfig;
+import org.apache.samza.config.SystemConfig;
/**
* Descriptor for an InMemorySystem.
@@ -87,7 +87,7 @@ public class InMemorySystemDescriptor extends SystemDescriptor<InMemorySystemDes
public Map<String, String> toConfig() {
HashMap<String, String> configs = new HashMap<>(super.toConfig());
configs.put(InMemorySystemConfig.INMEMORY_SCOPE, this.inMemoryScope);
- configs.put(String.format(JavaSystemConfig.SYSTEM_FACTORY_FORMAT, getSystemName()), FACTORY_CLASS_NAME);
+ configs.put(String.format(SystemConfig.SYSTEM_FACTORY_FORMAT, getSystemName()), FACTORY_CLASS_NAME);
return configs;
}
diff --git a/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemConsumerWithSamzaBench.java b/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemConsumerWithSamzaBench.java
index cc8cceb..683ced4 100644
--- a/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemConsumerWithSamzaBench.java
+++ b/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemConsumerWithSamzaBench.java
@@ -32,10 +32,10 @@ import java.util.stream.IntStream;
import org.apache.commons.cli.ParseException;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.ApplicationConfig;
+import org.apache.samza.config.SystemConfig;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.JobCoordinatorConfig;
import org.apache.samza.config.MapConfig;
-import org.apache.samza.config.SystemConfig;
import org.apache.samza.config.TaskConfig;
import org.apache.samza.system.descriptors.GenericInputDescriptor;
import org.apache.samza.system.descriptors.GenericSystemDescriptor;