You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2018/12/01 01:06:38 UTC
samza git commit: SAMZA-570: Enabling auto-discovery of regex input topics
Repository: samza
Updated Branches:
refs/heads/master bee30f5df -> 4dbc2c6de
SAMZA-570: Enabling auto-discovery of regex input topics
This PR makes the following changes:
* Adds a StreamRegexMonitor (alongside the existing StreamPartitionCountMonitor) that periodically matches the monitored input regexes against the streams actually available, and stops the job when a new matching input stream is discovered.
* Adds a new API to SystemAdmin (getAllSystemStreams) to list all available streams, e.g., Kafka topics. The KafkaSystemAdmin implementation uses KafkaConsumer's listTopics API. (Even if listTopics returned 1 million topics at roughly 100 bytes per topic, the temporary memory overhead would be only about 100 MB.)
* Added config job.coordinator.monitor-input-regex.frequency.ms for the monitoring frequency (default 300000 ms; a value <= 0 disables the monitor), and job.coordinator.monitor-input-regex.%s for each input system. Users can then choose the desired regex for each input system, e.g., job.coordinator.monitor-input-regex.kafka=test-.*.
* We can later enrich the RegExTopicGenerator rewriter to add a monitor-input-regex config, allowing periodic monitoring.
* Tested: unit tests for StreamPartitionCountMonitor (SPCM) and the new input-regex monitor (TestInputRegexMonitor), plus test jobs on a local grid.
Author: Ray Matharu <rm...@linkedin.com>
Reviewers: Jagadish<ja...@apache.org>
Closes #796 from rmatharu/newtopic-test
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/4dbc2c6d
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/4dbc2c6d
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/4dbc2c6d
Branch: refs/heads/master
Commit: 4dbc2c6de5e9048654eed0195364fdc09cefa3bf
Parents: bee30f5
Author: Ray Matharu <rm...@linkedin.com>
Authored: Fri Nov 30 17:06:30 2018 -0800
Committer: Jagadish <jv...@linkedin.com>
Committed: Fri Nov 30 17:06:30 2018 -0800
----------------------------------------------------------------------
.../org/apache/samza/system/SystemAdmin.java | 8 +
.../apache/samza/PartitionChangeException.java | 31 ---
.../ClusterBasedJobCoordinator.java | 93 ++++++--
.../InputStreamsDiscoveredException.java | 32 +++
.../coordinator/PartitionChangeException.java | 33 +++
.../samza/coordinator/StreamRegexMonitor.java | 223 +++++++++++++++++++
.../org/apache/samza/config/JobConfig.scala | 49 +++-
.../samza/system/StreamMetadataCache.scala | 14 +-
.../coordinator/TestInputRegexMonitor.java | 136 +++++++++++
.../TestStreamPartitionCountMonitor.scala | 6 +-
.../samza/system/kafka/KafkaSystemAdmin.java | 8 +
.../org/apache/samza/config/KafkaConfig.scala | 9 +-
.../samza/config/RegExTopicGenerator.scala | 3 +-
.../samza/config/TestRegExTopicGenerator.scala | 2 +-
14 files changed, 585 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/4dbc2c6d/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
index 6ee7df2..8201b3d 100644
--- a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
+++ b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
@@ -156,4 +156,12 @@ public interface SystemAdmin {
return getSystemStreamMetadata(streamNames);
}
+ /**
+ * Fetch the set of all available streams
+ * @return The set of all available SystemStreams.
+ */
+ default Set<SystemStream> getAllSystemStreams() {
+ throw new UnsupportedOperationException();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/samza/blob/4dbc2c6d/samza-core/src/main/java/org/apache/samza/PartitionChangeException.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/PartitionChangeException.java b/samza-core/src/main/java/org/apache/samza/PartitionChangeException.java
deleted file mode 100644
index 4619dfa..0000000
--- a/samza-core/src/main/java/org/apache/samza/PartitionChangeException.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza;
-
-
-/**
- * Exception to indicate that the input {@link org.apache.samza.system.SystemStreamPartition} changed
- */
-public class PartitionChangeException extends SamzaException {
-
- public PartitionChangeException(String s) {
- super(s);
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/4dbc2c6d/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
index ff70df0..4c5a34b 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
@@ -19,11 +19,14 @@
package org.apache.samza.clustermanager;
import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.regex.Pattern;
import org.apache.samza.SamzaException;
-import org.apache.samza.PartitionChangeException;
import org.apache.samza.checkpoint.CheckpointManager;
import org.apache.samza.config.ClusterManagerConfig;
import org.apache.samza.config.Config;
@@ -33,8 +36,11 @@ import org.apache.samza.config.ShellCommandConfig;
import org.apache.samza.config.StorageConfig;
import org.apache.samza.config.TaskConfigJava;
import org.apache.samza.container.TaskName;
+import org.apache.samza.coordinator.InputStreamsDiscoveredException;
import org.apache.samza.coordinator.JobModelManager;
+import org.apache.samza.coordinator.PartitionChangeException;
import org.apache.samza.coordinator.StreamPartitionCountMonitor;
+import org.apache.samza.coordinator.StreamRegexMonitor;
import org.apache.samza.coordinator.stream.CoordinatorStreamManager;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.JobModel;
@@ -49,9 +55,9 @@ import org.apache.samza.system.SystemStream;
import org.apache.samza.util.SystemClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.collection.JavaConverters;
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicBoolean;
/**
* Implements a JobCoordinator that is completely independent of the underlying cluster
@@ -134,7 +140,12 @@ public class ClusterBasedJobCoordinator {
/**
* The input topic partition count monitor
*/
- private final StreamPartitionCountMonitor partitionMonitor;
+ private final Optional<StreamPartitionCountMonitor> partitionMonitor;
+
+ /**
+ * The input stream regex monitor
+ */
+ private final Optional<StreamRegexMonitor> inputStreamRegexMonitor;
/**
* Metrics to track stats around container failures, needed containers etc.
@@ -174,7 +185,8 @@ public class ClusterBasedJobCoordinator {
// build a JobModelManager and ChangelogStreamManager and perform partition assignments.
changelogStreamManager = new ChangelogStreamManager(coordinatorStreamManager);
- jobModelManager = JobModelManager.apply(coordinatorStreamManager.getConfig(), changelogStreamManager.readPartitionMapping());
+ jobModelManager =
+ JobModelManager.apply(coordinatorStreamManager.getConfig(), changelogStreamManager.readPartitionMapping());
config = jobModelManager.jobModel().getConfig();
hasDurableStores = new StorageConfig(config).hasDurableStores();
@@ -182,6 +194,7 @@ public class ClusterBasedJobCoordinator {
// The systemAdmins should be started before partitionMonitor can be used. And it should be stopped when this coordinator is stopped.
systemAdmins = new SystemAdmins(config);
partitionMonitor = getPartitionCountMonitor(config, systemAdmins);
+ inputStreamRegexMonitor = getInputRegexMonitor(config, systemAdmins);
clusterManagerConfig = new ClusterManagerConfig(config);
isJmxEnabled = clusterManagerConfig.getJmxEnabledOnJobCoordinator();
@@ -226,7 +239,7 @@ public class ClusterBasedJobCoordinator {
Map<TaskName, Integer> taskPartitionMappings = new HashMap<>();
Map<String, ContainerModel> containers = jobModel.getContainers();
- for (ContainerModel containerModel: containers.values()) {
+ for (ContainerModel containerModel : containers.values()) {
for (TaskModel taskModel : containerModel.getTasks().values()) {
taskPartitionMappings.put(taskModel.getTaskName(), taskModel.getChangelogPartition().getPartitionId());
}
@@ -236,7 +249,8 @@ public class ClusterBasedJobCoordinator {
containerProcessManager.start();
systemAdmins.start();
- partitionMonitor.start();
+ partitionMonitor.ifPresent(monitor -> monitor.start());
+ inputStreamRegexMonitor.ifPresent(monitor -> monitor.start());
boolean isInterrupted = false;
@@ -270,7 +284,8 @@ public class ClusterBasedJobCoordinator {
private void onShutDown() {
try {
- partitionMonitor.stop();
+ partitionMonitor.ifPresent(monitor -> monitor.stop());
+ inputStreamRegexMonitor.ifPresent(monitor -> monitor.stop());
systemAdmins.stop();
containerProcessManager.stop();
coordinatorStreamManager.stop();
@@ -289,26 +304,63 @@ public class ClusterBasedJobCoordinator {
}
}
- private StreamPartitionCountMonitor getPartitionCountMonitor(Config config, SystemAdmins systemAdmins) {
+ private Optional<StreamPartitionCountMonitor> getPartitionCountMonitor(Config config, SystemAdmins systemAdmins) {
StreamMetadataCache streamMetadata = new StreamMetadataCache(systemAdmins, 0, SystemClock.instance());
Set<SystemStream> inputStreamsToMonitor = new TaskConfigJava(config).getAllInputStreams();
if (inputStreamsToMonitor.isEmpty()) {
throw new SamzaException("Input streams to a job can not be empty.");
}
- return new StreamPartitionCountMonitor(
- inputStreamsToMonitor,
- streamMetadata,
- metrics,
- new JobConfig(config).getMonitorPartitionChangeFrequency(),
- streamsChanged -> {
- // Fail the jobs with durable state store. Otherwise, application state.status remains UNDEFINED s.t. YARN job will be restarted
+ return Optional.of(new StreamPartitionCountMonitor(inputStreamsToMonitor, streamMetadata, metrics,
+ new JobConfig(config).getMonitorPartitionChangeFrequency(), streamsChanged -> {
+ // Fail the jobs with durable state store. Otherwise, application state.status remains UNDEFINED s.t. YARN job will be restarted
if (hasDurableStores) {
log.error("Input topic partition count changed in a job with durable state. Failing the job.");
state.status = SamzaApplicationState.SamzaAppStatus.FAILED;
}
coordinatorException = new PartitionChangeException("Input topic partition count changes detected.");
- });
+ }));
+ }
+
+ private Optional<StreamRegexMonitor> getInputRegexMonitor(Config config, SystemAdmins systemAdmins) {
+
+ // if input regex monitor is not enabled return empty
+ if (new JobConfig(config).getMonitorRegexEnabled()) {
+ log.info("StreamRegexMonitor is disabled.");
+ return Optional.empty();
+ }
+
+ StreamMetadataCache streamMetadata = new StreamMetadataCache(systemAdmins, 0, SystemClock.instance());
+ Set<SystemStream> inputStreamsToMonitor = new TaskConfigJava(config).getAllInputStreams();
+ if (inputStreamsToMonitor.isEmpty()) {
+ throw new SamzaException("Input streams to a job can not be empty.");
+ }
+
+ // First list all rewriters
+ Option<String> rewritersList = new JobConfig(config).getConfigRewriters();
+
+ // if no rewriter is defined, there is nothing to monitor
+ if (!rewritersList.isDefined()) {
+ log.warn("No config rewriters are defined. No StreamRegexMonitor created.");
+ return Optional.empty();
+ }
+
+ // Compile a map of each input-system to its corresponding input-monitor-regex patterns
+ Map<String, Pattern> inputRegexesToMonitor =
+ JavaConverters.mapAsJavaMapConverter(new JobConfig(config).getMonitorRegexPatternMap(rewritersList.get()))
+ .asJava();
+
+ return Optional.of(new StreamRegexMonitor(inputStreamsToMonitor, inputRegexesToMonitor, streamMetadata, metrics,
+ new JobConfig(config).getMonitorRegexFrequency(), new StreamRegexMonitor.Callback() {
+ @Override
+ public void onInputStreamsChanged(Set<SystemStream> initialInputSet, Set<SystemStream> newInputStreams,
+ Map<String, Pattern> regexesMonitored) {
+ log.error("New input system-streams discovered. Failing the job. New input streams: {}", newInputStreams,
+ " Existing input streams:", inputStreamsToMonitor);
+ state.status = SamzaApplicationState.SamzaAppStatus.FAILED;
+ coordinatorException = new InputStreamsDiscoveredException("New input streams added: " + newInputStreams);
+ }
+ }));
}
// The following two methods are package-private and for testing only
@@ -321,10 +373,9 @@ public class ClusterBasedJobCoordinator {
@VisibleForTesting
StreamPartitionCountMonitor getPartitionMonitor() {
- return partitionMonitor;
+ return partitionMonitor.get();
}
-
/**
* The entry point for the {@link ClusterBasedJobCoordinator}
* @param args args
@@ -335,12 +386,14 @@ public class ClusterBasedJobCoordinator {
try {
//Read and parse the coordinator system config.
log.info("Parsing coordinator system config {}", coordinatorSystemEnv);
- coordinatorSystemConfig = new MapConfig(SamzaObjectMapper.getObjectMapper().readValue(coordinatorSystemEnv, Config.class));
+ coordinatorSystemConfig =
+ new MapConfig(SamzaObjectMapper.getObjectMapper().readValue(coordinatorSystemEnv, Config.class));
} catch (IOException e) {
log.error("Exception while reading coordinator stream config {}", e);
throw new SamzaException(e);
}
ClusterBasedJobCoordinator jc = new ClusterBasedJobCoordinator(coordinatorSystemConfig);
jc.run();
+ log.info("Finished ClusterBasedJobCoordinator run");
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/4dbc2c6d/samza-core/src/main/java/org/apache/samza/coordinator/InputStreamsDiscoveredException.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/InputStreamsDiscoveredException.java b/samza-core/src/main/java/org/apache/samza/coordinator/InputStreamsDiscoveredException.java
new file mode 100644
index 0000000..6e85f43
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/InputStreamsDiscoveredException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import org.apache.samza.SamzaException;
+
+
+/**
+ * Exception to indicate that the new input streams have been added.
+ */
+public class InputStreamsDiscoveredException extends SamzaException {
+
+ public InputStreamsDiscoveredException(String message) {
+ super(message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/4dbc2c6d/samza-core/src/main/java/org/apache/samza/coordinator/PartitionChangeException.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/PartitionChangeException.java b/samza-core/src/main/java/org/apache/samza/coordinator/PartitionChangeException.java
new file mode 100644
index 0000000..4594307
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/PartitionChangeException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator;
+
+import org.apache.samza.SamzaException;
+
+
+/**
+ * Exception to indicate that the input {@link org.apache.samza.system.SystemStreamPartition} changed
+ */
+public class PartitionChangeException extends SamzaException {
+
+ public PartitionChangeException(String s) {
+ super(s);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/4dbc2c6d/samza-core/src/main/java/org/apache/samza/coordinator/StreamRegexMonitor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/StreamRegexMonitor.java b/samza-core/src/main/java/org/apache/samza/coordinator/StreamRegexMonitor.java
new file mode 100644
index 0000000..3c86bfb
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/StreamRegexMonitor.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.apache.samza.metrics.Gauge;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+
+
+/**
+ * A single-thread based monitor that periodically monitors the given set of stream regexes, and matches them to
+ * the given set of streams. If a stream matching a given regex that is not in the corresponding stream set is detected,
+ * it invokes a {@link StreamRegexMonitor.Callback} with the initial input set, the new input stream set, and the regexes
+ * being monitored.
+ */
+public class StreamRegexMonitor {
+ private static final Logger log = LoggerFactory.getLogger(StreamRegexMonitor.class);
+
+ // Factory of daemon-threads to create the single threaded executor pool
+ private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryBuilder().setDaemon(true)
+ .setNameFormat("Samza-" + StreamRegexMonitor.class.getSimpleName())
+ .build();
+
+ // Enum to describe the state of the regexMonitor
+ private enum State {
+ INIT, RUNNING, STOPPED
+ }
+
+ private final Set<SystemStream> streamsToMonitor;
+ private final Map<String, Pattern> systemRegexesToMonitor;
+ private final StreamMetadataCache metadataCache;
+ private final int inputRegexMonitorPeriodMs;
+
+ // Map of gauges (one per system), emitted when new input stream for that system is detected
+ private final Map<String, Gauge<Integer>> gauges;
+
+ private final Callback callbackMethod;
+
+ // Used to guard write access to state.
+ private final Object lock = new Object();
+
+ private final ScheduledExecutorService schedulerService = Executors.newSingleThreadScheduledExecutor(THREAD_FACTORY);
+
+ private volatile State state = State.INIT;
+
+ /**
+ * A callback that is invoked when the {@link StreamRegexMonitor} detects a new input stream matching given regex.
+ */
+ public interface Callback {
+ /**
+ * Method to be called when new input streams are detected.
+ * @param initialInputSet The initial set of input streams
+ * @param newInputStreams The set of new input streams discovered
+ * @param regexesMonitored The set of regexes being monitored
+ */
+ void onInputStreamsChanged(Set<SystemStream> initialInputSet, Set<SystemStream> newInputStreams,
+ Map<String, Pattern> regexesMonitored);
+ }
+
+ /**
+ * Default constructor.
+ *
+ * @param streamsToMonitor a set of SystemStreams to monitor
+ * @param systemRegexesToMonitor map of regexes for each input system
+ * @param metadataCache the metadata cache which will be used to fetch metadata for partition counts.
+ * @param metrics the metrics registry to which the metrics should be added.
+ * @param inputRegexMonitorPeriodMs the period at which the monitor will check each input-regex
+ * @param monitorCallback the callback method to be invoked when new input stream matching regex is detected
+ */
+ public StreamRegexMonitor(Set<SystemStream> streamsToMonitor, Map<String, Pattern> systemRegexesToMonitor,
+ StreamMetadataCache metadataCache, MetricsRegistry metrics, int inputRegexMonitorPeriodMs,
+ Callback monitorCallback) {
+ this.streamsToMonitor = streamsToMonitor;
+ this.systemRegexesToMonitor = systemRegexesToMonitor;
+ this.metadataCache = metadataCache;
+ this.callbackMethod = monitorCallback;
+ this.inputRegexMonitorPeriodMs = inputRegexMonitorPeriodMs;
+
+ // Pre-populate the gauges
+ Map<String, Gauge<Integer>> mutableGauges = new HashMap<>();
+ for (String systemToMonitor : systemRegexesToMonitor.keySet()) {
+ Gauge gauge = metrics.newGauge("job-coordinator", String.format("%s-new-input-streams", systemToMonitor), 0);
+ mutableGauges.put(systemToMonitor, gauge);
+ }
+ gauges = Collections.unmodifiableMap(mutableGauges);
+
+ log.info("Created {} with inputRegexMonitorPeriodMs: {} and systemRegexesToMonitor: {}", this.getClass().getName(),
+ this.inputRegexMonitorPeriodMs, this.systemRegexesToMonitor);
+ }
+
+ /**
+ * Starts the monitor.
+ */
+ public void start() {
+ synchronized (lock) {
+ switch (state) {
+ case INIT:
+ if (inputRegexMonitorPeriodMs > 0) {
+ schedulerService.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ monitorInputRegexes();
+ }
+ }, 0, inputRegexMonitorPeriodMs, TimeUnit.MILLISECONDS);
+ }
+ state = State.RUNNING;
+ break;
+
+ case RUNNING:
+ // start is idempotent
+ return;
+
+ case STOPPED:
+ throw new IllegalStateException("StreamRegexMonitor was stopped and cannot be restarted.");
+ }
+ }
+ }
+
+ /**
+ * Stops the monitor. Once it stops, it cannot be restarted.
+ */
+ public void stop() {
+ synchronized (lock) {
+ // We could also wait for full termination of the scheduler service, but it is overkill for
+ // our use case.
+ schedulerService.shutdownNow();
+
+ state = State.STOPPED;
+ }
+ }
+
+ private void monitorInputRegexes() {
+ log.debug("Running monitorInputRegexes");
+
+ try {
+ // obtain the list of SysStreams that match given patterns for all systems
+ Set<SystemStream> inputStreamsMatchingPattern = new HashSet<>();
+
+ // For each input system, for which we have a regex to monitor
+ for (String systemName : this.systemRegexesToMonitor.keySet()) {
+
+ try {
+ // obtain the list of SysStreams that match the regex for this system
+ // using the systemAdmin in the metadataCache
+ inputStreamsMatchingPattern.addAll(
+ JavaConverters.setAsJavaSetConverter(this.metadataCache.getAllSystemStreams(systemName))
+ .asJava()
+ .stream()
+ .filter(x -> x.getStream().matches(this.systemRegexesToMonitor.get(systemName).pattern()))
+ .collect(Collectors.toSet()));
+ } catch (UnsupportedOperationException e) {
+ log.error("UnsupportedOperationException while monitoring input regexes for system {}", systemName, e);
+ }
+ }
+
+ // if there is a stream that is in the input-Set but not in the streamsToMonitor
+ // since streamsToMonitor = task.inputs
+ if (!streamsToMonitor.containsAll(inputStreamsMatchingPattern)) {
+ log.info("New input system-streams discovered. InputStreamsMatchingPattern: {} but streamsToMonitor: {} ",
+ inputStreamsMatchingPattern, streamsToMonitor);
+
+ // invoke notify callback with new input streams
+ this.callbackMethod.onInputStreamsChanged(streamsToMonitor,
+ Sets.difference(inputStreamsMatchingPattern, streamsToMonitor), systemRegexesToMonitor);
+ } else {
+ log.info("No new input system-Streams discovered streamsToMonitor {} inputStreamsMatchingPattern {}",
+ streamsToMonitor, inputStreamsMatchingPattern);
+ }
+ } catch (Exception e) {
+ log.error("Exception while monitoring input regexes.", e);
+ }
+ }
+
+ @VisibleForTesting
+ boolean isRunning() {
+ return state == State.RUNNING;
+ }
+
+ /**
+ * Wait until this service has shutdown. Returns true if shutdown occurred within the timeout
+ * and false otherwise.
+ * <p>
+ * This is currently exposed at the package private level for tests only.
+ */
+ @VisibleForTesting
+ boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+ return schedulerService.awaitTermination(timeout, unit);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/4dbc2c6d/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
index 2bc6420..5363e72 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
@@ -21,12 +21,16 @@ package org.apache.samza.config
import java.io.File
+import java.util.regex.Pattern
import org.apache.samza.container.grouper.stream.GroupByPartitionFactory
import org.apache.samza.coordinator.metadatastore.CoordinatorStreamMetadataStoreFactory
import org.apache.samza.runtime.DefaultLocationIdProviderFactory
import org.apache.samza.util.Logging
+import scala.collection.mutable
+
+
object JobConfig {
// job config constants
val STREAM_JOB_FACTORY_CLASS = "job.factory.class" // streaming.job_factory_class
@@ -77,6 +81,15 @@ object JobConfig {
val JOB_FAIL_CHECKPOINT_VALIDATION = "job.checkpoint.validation.enabled"
val MONITOR_PARTITION_CHANGE = "job.coordinator.monitor-partition-change"
val MONITOR_PARTITION_CHANGE_FREQUENCY_MS = "job.coordinator.monitor-partition-change.frequency.ms"
+
+ val MONITOR_INPUT_REGEX_FREQUENCY_MS = "job.coordinator.monitor-input-regex.frequency.ms"
+ val DEFAULT_MONITOR_INPUT_REGEX_FREQUENCY_MS = 300000
+
+ val REGEX_RESOLVED_STREAMS = "job.config.rewriter.%s.regex"
+ val REGEX_RESOLVED_SYSTEM = "job.config.rewriter.%s.system"
+ val REGEX_INHERITED_CONFIG = "job.config.rewriter.%s.config"
+
+
val DEFAULT_MONITOR_PARTITION_CHANGE_FREQUENCY_MS = 300000
val JOB_SECURITY_MANAGER_FACTORY = "job.security.manager.factory"
@@ -127,7 +140,7 @@ class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging {
def getCoordinatorSystemName = {
val system = getCoordinatorSystemNameOrNull
if (system == null) {
- throw new ConfigException("Missing job.coordinator.system configuration. Cannot proceed with job execution.")
+ throw new ConfigException("Missing job.coordinator.system configuration. Cannot proceed with job execution." + config)
}
system
}
@@ -158,10 +171,44 @@ class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging {
}
}
+ // StreamRegexMonitor is disabled if the MonitorRegexFRequency is <= 0
+ def getMonitorRegexEnabled = (getMonitorRegexFrequency <= 0)
+
def getMonitorPartitionChangeFrequency = getInt(
JobConfig.MONITOR_PARTITION_CHANGE_FREQUENCY_MS,
JobConfig.DEFAULT_MONITOR_PARTITION_CHANGE_FREQUENCY_MS)
+ def getMonitorRegexFrequency = getInt(
+ JobConfig.MONITOR_INPUT_REGEX_FREQUENCY_MS,
+ JobConfig.DEFAULT_MONITOR_INPUT_REGEX_FREQUENCY_MS)
+
+ def getMonitorRegexPatternMap(rewritersList : String) : mutable.HashMap[String, Pattern] = {
+ // Compile a map of each input-system to its corresponding input-monitor-regex patterns
+ val inputRegexesToMonitor: mutable.HashMap[String, Pattern] = mutable.HashMap[String, Pattern]()
+ val rewriters: Array[String] = rewritersList.split(",")
+ // iterate over each rewriter and obtain the system and regex for it
+ for (rewriterName <- rewriters) {
+ val rewriterSystem: Option[String] = new JobConfig(config).getRegexResolvedSystem(rewriterName)
+ val rewriterRegex: Option[String] = new JobConfig(config).getRegexResolvedStreams(rewriterName)
+ if (rewriterSystem.isDefined && rewriterRegex.isDefined) {
+ var patternForSystem: Option[Pattern] = inputRegexesToMonitor.get(rewriterSystem.get)
+ patternForSystem =
+ if (patternForSystem == None) Some(Pattern.compile(rewriterRegex.get))
+ else
+ Some(Pattern.compile(String.join("|", patternForSystem.get.pattern(), rewriterRegex.get)))
+ inputRegexesToMonitor.put(rewriterSystem.get, patternForSystem.get)
+ }
+ }
+ inputRegexesToMonitor
+ }
+
+ // regex-related config methods duplicated from KafkaConfig to avoid module dependency
+ def getRegexResolvedStreams(rewriterName: String) = getOption(JobConfig.REGEX_RESOLVED_STREAMS format rewriterName)
+
+ def getRegexResolvedSystem(rewriterName: String) = getOption(JobConfig.REGEX_RESOLVED_SYSTEM format rewriterName)
+
+
+
def getStreamJobFactoryClass = getOption(JobConfig.STREAM_JOB_FACTORY_CLASS)
def getJobId = getOption(JobConfig.JOB_ID).getOrElse("1")
http://git-wip-us.apache.org/repos/asf/samza/blob/4dbc2c6d/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala b/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala
index edffac7..abd6942 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala
@@ -19,9 +19,11 @@
package org.apache.samza.system
-import org.apache.samza.util.{Logging, Clock, SystemClock}
+import org.apache.samza.util.{Clock, Logging, SystemClock}
import org.apache.samza.SamzaException
+
import scala.collection.JavaConverters._
+import scala.collection.mutable
/**
* Caches requests to SystemAdmin.getSystemStreamMetadata for a short while (by default
@@ -84,6 +86,16 @@ class StreamMetadataCache (
}
/**
+   * Returns every SystemStream reported by the SystemAdmin of the given system.
+   * Unlike the cached metadata lookups in this class, this result is not cached: the SystemAdmin is
+   * queried on every call.
+   *
+   * @param systemName name of the system whose SystemAdmin is queried
+   * @return the set from SystemAdmin.getAllSystemStreams, wrapped as a mutable Scala set
+   */
+  def getAllSystemStreams(systemName: String): mutable.Set[SystemStream] =
+    systemAdmins.getSystemAdmin(systemName).getAllSystemStreams().asScala
+
+ /**
* Returns metadata about the given streams. If the metadata isn't in the cache, it is retrieved from the systems
* using the given SystemAdmins.
*
http://git-wip-us.apache.org/repos/asf/samza/blob/4dbc2c6d/samza-core/src/test/scala/org/apache/samza/coordinator/TestInputRegexMonitor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/coordinator/TestInputRegexMonitor.java b/samza-core/src/test/scala/org/apache/samza/coordinator/TestInputRegexMonitor.java
new file mode 100644
index 0000000..84b026b
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/coordinator/TestInputRegexMonitor.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.util.Clock;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import scala.collection.JavaConversions$;
+
+
+/**
+ * Tests for {@link StreamRegexMonitor}: its start/stop lifecycle, and that a stream matching a monitored
+ * regex, but missing from the initial input set, is reported through the callback.
+ */
+public class TestInputRegexMonitor {
+
+  private final int inputRegexMs = 10;
+  private final String systemName = "kafka";
+  private final int expectedNumberOfCallbacks = 1;
+  // The only stream reported by MockStreamMetadataCache; it matches the "test-.*" regex monitored in setUp().
+  private final SystemStream sampleStream = new SystemStream(systemName, "test-1");
+
+  private StreamRegexMonitor streamRegexMonitor;
+  private CountDownLatch callbackCount;
+  // Written from the monitor's callback and read by the test thread, so it must be thread-safe.
+  private Set<SystemStream> inputStreamsDiscovered;
+
+  @Before
+  public void setUp() {
+    inputStreamsDiscovered = ConcurrentHashMap.newKeySet();
+    Map<String, Pattern> patternMap = new HashMap<>();
+    patternMap.put(systemName, Pattern.compile("test-.*"));
+
+    StreamMetadataCache mockStreamMetadataCache = new MockStreamMetadataCache(null, 1, null);
+    MetricsRegistry metrics = Mockito.mock(MetricsRegistry.class);
+    this.callbackCount = new CountDownLatch(expectedNumberOfCallbacks);
+
+    // Create a streamRegexMonitor with an empty initial input set and a "test-.*" regex for the kafka system.
+    this.streamRegexMonitor =
+        new StreamRegexMonitor(new HashSet<>(), patternMap, mockStreamMetadataCache, metrics, inputRegexMs,
+            new StreamRegexMonitor.Callback() {
+              @Override
+              public void onInputStreamsChanged(Set<SystemStream> initialInputSet, Set<SystemStream> newInputStreams,
+                  Map<String, Pattern> regexesMonitored) {
+                // This callback runs asynchronously (the test awaits it via a latch), so an AssertionError thrown
+                // here would not reach JUnit; just record the streams and assert on the test thread instead.
+                // Recording before countDown() makes the streams visible to the test once await() returns.
+                inputStreamsDiscovered.addAll(newInputStreams);
+                callbackCount.countDown();
+              }
+            });
+  }
+
+  @Test
+  public void testStartStop() throws InterruptedException {
+    Assert.assertFalse(streamRegexMonitor.isRunning());
+
+    // Normal start
+    streamRegexMonitor.start();
+    Assert.assertTrue(streamRegexMonitor.isRunning());
+
+    // Start ought to be idempotent
+    streamRegexMonitor.start();
+    Assert.assertTrue(streamRegexMonitor.isRunning());
+
+    // Normal stop
+    streamRegexMonitor.stop();
+    Assert.assertTrue(streamRegexMonitor.awaitTermination(1, TimeUnit.SECONDS));
+    Assert.assertFalse(streamRegexMonitor.isRunning());
+
+    // Restarting after stop: if start() throws, it must be an IllegalStateException
+    try {
+      streamRegexMonitor.start();
+    } catch (Exception e) {
+      Assert.assertEquals(IllegalStateException.class, e.getClass());
+    }
+
+    // Stop ought to be idempotent
+    Assert.assertFalse(streamRegexMonitor.isRunning());
+    streamRegexMonitor.stop();
+    Assert.assertFalse(streamRegexMonitor.isRunning());
+  }
+
+  @Test
+  public void testSchedulingAndInputAddition() throws Exception {
+    this.streamRegexMonitor.start();
+    try {
+      Assert.assertTrue("Did not see " + expectedNumberOfCallbacks + " callbacks after waiting. " + callbackCount,
+          callbackCount.await(1, TimeUnit.SECONDS));
+    } finally {
+      this.streamRegexMonitor.stop();
+    }
+
+    // The only new input stream discovered should be "kafka" / "test-1"
+    Assert.assertEquals(new HashSet<>(Arrays.asList(sampleStream)), inputStreamsDiscovered);
+  }
+
+  /** StreamMetadataCache stub whose getAllSystemStreams always returns just {@code sampleStream}. */
+  private class MockStreamMetadataCache extends StreamMetadataCache {
+
+    public MockStreamMetadataCache(SystemAdmins systemAdmins, int cacheTTLms, Clock clock) {
+      super(systemAdmins, cacheTTLms, clock);
+    }
+
+    @Override
+    public scala.collection.mutable.Set<SystemStream> getAllSystemStreams(String systemName) {
+      return JavaConversions$.MODULE$.asScalaSet(new HashSet<SystemStream>(Arrays.asList(sampleStream)));
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/4dbc2c6d/samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala b/samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala
index 2aafab1..0cc7a90 100644
--- a/samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala
+++ b/samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala
@@ -20,7 +20,8 @@
package org.apache.samza.coordinator
import java.util.concurrent.{CountDownLatch, TimeUnit}
-import java.util.HashMap
+import java.util.regex.Pattern
+
import org.apache.samza.Partition
import org.apache.samza.metrics.{Gauge, MetricsRegistryMap}
import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
@@ -34,6 +35,7 @@ import org.scalatest.junit.AssertionsForJUnit
import org.scalatest.mockito.MockitoSugar
import scala.collection.JavaConverters._
+import scala.collection.immutable.HashMap
class TestStreamPartitionCountMonitor extends AssertionsForJUnit with MockitoSugar {
@@ -43,6 +45,7 @@ class TestStreamPartitionCountMonitor extends AssertionsForJUnit with MockitoSug
val mockMetadataCache = mock[StreamMetadataCache]
val inputSystemStream = new SystemStream("test-system", "test-stream")
val inputSystemStreamSet = Set[SystemStream](inputSystemStream)
+ val inputRegexMap : java.util.Map[String, Pattern] = HashMap("test-system"-> Pattern.compile(".*")).asJava
val initialPartitionMetadata = new java.util.HashMap[Partition, SystemStreamPartitionMetadata]() {
{
@@ -209,6 +212,7 @@ class TestStreamPartitionCountMonitor extends AssertionsForJUnit with MockitoSug
def testScheduler(): Unit = {
val mockMetadataCache = new MockStreamMetadataCache
val inputSystemStream = new SystemStream("test-system", "test-stream")
+ val inputRegexMap : java.util.Map[String, Pattern] = HashMap("test-system"-> Pattern.compile(".*")).asJava
val inputSystemStreamSet = Set[SystemStream](inputSystemStream)
val sampleCount = new CountDownLatch(2); // Verify 2 invocations
http://git-wip-us.apache.org/repos/asf/samza/blob/4dbc2c6d/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
index 596b07a..c3c66c7 100644
--- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
+++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
@@ -54,6 +54,7 @@ import org.apache.samza.config.SystemConfig;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.system.StreamValidationException;
import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamMetadata;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.util.ExponentialSleepStrategy;
@@ -679,6 +680,13 @@ public class KafkaSystemAdmin implements SystemAdmin {
return () -> ZkUtils.apply(zkConnect, 6000, 6000, false);
}
+  /** Maps each topic name returned by metadataConsumer.listTopics() to a SystemStream of this system. */
+  @Override
+  public Set<SystemStream> getAllSystemStreams() {
+    Set<String> topicNames = (Set<String>) this.metadataConsumer.listTopics().keySet();
+    return topicNames.stream().map(topicName -> new SystemStream(systemName, topicName)).collect(Collectors.toSet());
+  }
+
/**
* Container for metadata about offsets.
*/
http://git-wip-us.apache.org/repos/asf/samza/blob/4dbc2c6d/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index 1954ac7..607feb0 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -40,9 +40,6 @@ object KafkaConfig {
val TOPIC_REPLICATION_FACTOR = "replication.factor"
val TOPIC_DEFAULT_REPLICATION_FACTOR = "2"
- val REGEX_RESOLVED_STREAMS = "job.config.rewriter.%s.regex"
- val REGEX_RESOLVED_SYSTEM = "job.config.rewriter.%s.system"
- val REGEX_INHERITED_CONFIG = "job.config.rewriter.%s.config"
val SEGMENT_BYTES = "segment.bytes"
@@ -206,11 +203,11 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
}
// regex resolver
- def getRegexResolvedStreams(rewriterName: String) = getOption(KafkaConfig.REGEX_RESOLVED_STREAMS format rewriterName)
+  /** Returns the regex configured for the given config rewriter, if set. */
+  def getRegexResolvedStreams(rewriterName: String): Option[String] =
+    getOption(JobConfig.REGEX_RESOLVED_STREAMS.format(rewriterName))
- def getRegexResolvedSystem(rewriterName: String) = getOption(KafkaConfig.REGEX_RESOLVED_SYSTEM format rewriterName)
+  /** Returns the system configured for the given config rewriter, if set. */
+  def getRegexResolvedSystem(rewriterName: String): Option[String] =
+    getOption(JobConfig.REGEX_RESOLVED_SYSTEM.format(rewriterName))
- def getRegexResolvedInheritedConfig(rewriterName: String) = config.subset((KafkaConfig.REGEX_INHERITED_CONFIG format rewriterName) + ".", true)
+  /** Returns the sub-config under the given rewriter's REGEX_INHERITED_CONFIG prefix, with the prefix stripped. */
+  def getRegexResolvedInheritedConfig(rewriterName: String): Config =
+    config.subset(JobConfig.REGEX_INHERITED_CONFIG.format(rewriterName) + ".", true)
/**
* Gets the replication factor for the changelog topics. Uses the following precedence.
http://git-wip-us.apache.org/repos/asf/samza/blob/4dbc2c6d/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala b/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
index e6068b0..654354b 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
@@ -21,7 +21,8 @@ package org.apache.samza.config
import org.I0Itec.zkclient.ZkClient
import kafka.utils.ZkUtils
-import org.apache.samza.config.KafkaConfig.{Config2Kafka, REGEX_RESOLVED_STREAMS}
+import org.apache.samza.config.KafkaConfig.{Config2Kafka}
+import org.apache.samza.config.JobConfig.{REGEX_RESOLVED_STREAMS}
import org.apache.samza.SamzaException
import org.apache.samza.util.{Logging, StreamUtil}
http://git-wip-us.apache.org/repos/asf/samza/blob/4dbc2c6d/samza-kafka/src/test/scala/org/apache/samza/config/TestRegExTopicGenerator.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestRegExTopicGenerator.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestRegExTopicGenerator.scala
index 69d7da6..8b19292 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/config/TestRegExTopicGenerator.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestRegExTopicGenerator.scala
@@ -24,7 +24,7 @@ import collection.JavaConverters._
import org.junit.Assert._
import org.junit.Test
-import KafkaConfig._
+import JobConfig._
class TestRegExTopicGenerator {