Posted to commits@samza.apache.org by ja...@apache.org on 2018/12/01 01:06:38 UTC

samza git commit: SAMZA-570: Enabling auto-discovery of regex input topics

Repository: samza
Updated Branches:
  refs/heads/master bee30f5df -> 4dbc2c6de


SAMZA-570: Enabling auto-discovery of regex input topics

This PR makes the following changes:

* Adds a StreamRegexMonitor (alongside the existing StreamPartitionCountMonitor) that periodically matches the configured input regexes against the job's actual input streams and stops the job when a new matching input stream is discovered.

* Adds a new API to SystemAdmin to allow listing all available streams, e.g., Kafka topics. The KafkaSystemAdmin implementation uses KafkaConsumer's listTopics API. (Even if listTopics returned 1 million topics at 100 bytes per topic, the temporary memory overhead would only be about 100 MB.) A short usage sketch follows the commit message below.

* Adds the config job.coordinator.monitor-input-regex.frequency.ms to control the monitoring frequency. The per-system regexes to monitor are read from the config-rewriter settings job.config.rewriter.%s.system and job.config.rewriter.%s.regex (moved from KafkaConfig to JobConfig in this change), so users can choose the desired regex for each input system, e.g., test-.* on a Kafka system; see the example configuration after this list.

* We can later enrich the RegExTopicGenerator rewriter to add the monitor-input-regex config automatically, to allow periodic monitoring out of the box.

* Tested: unit tests for the stream monitors (TestInputRegexMonitor, TestStreamPartitionCountMonitor) and test jobs on a local grid.
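
Example configuration (illustrative sketch only, not part of this commit): a job that already uses the RegExTopicGenerator rewriter supplies the system and regex that StreamRegexMonitor reuses. The rewriter name "regex-gen" and the topic regex are hypothetical; the key names follow the JobConfig/KafkaConfig changes in this diff.

    # A config-rewriter of type RegExTopicGenerator; "regex-gen" is a made-up name.
    job.config.rewriters=regex-gen
    job.config.rewriter.regex-gen.class=org.apache.samza.config.RegExTopicGenerator
    job.config.rewriter.regex-gen.system=kafka
    job.config.rewriter.regex-gen.regex=test-.*
    # How often the coordinator re-checks the regex against the live topic list,
    # in milliseconds. 300000 (5 minutes) is the default; a value <= 0 disables
    # the StreamRegexMonitor.
    job.coordinator.monitor-input-regex.frequency.ms=300000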

Author: Ray Matharu <rm...@linkedin.com>

Reviewers: Jagadish <ja...@apache.org>

Closes #796 from rmatharu/newtopic-test
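
As a rough illustration of the new SystemAdmin API (not part of this commit), the sketch below lists the streams of a system whose names match a regex, which is essentially the check StreamRegexMonitor performs on every poll. The helper class name is hypothetical; only SystemAdmin, SystemStream, and the getAllSystemStreams() default method come from this change.

    import java.util.Set;
    import java.util.regex.Pattern;
    import java.util.stream.Collectors;
    import org.apache.samza.system.SystemAdmin;
    import org.apache.samza.system.SystemStream;

    // Hedged sketch: filter a system's full stream listing through a regex using
    // the getAllSystemStreams() default method introduced by this commit.
    public final class RegexStreamLister {
      private RegexStreamLister() { }

      public static Set<SystemStream> streamsMatching(SystemAdmin admin, Pattern regex) {
        // Admins that do not override the default throw UnsupportedOperationException,
        // which StreamRegexMonitor also has to handle.
        return admin.getAllSystemStreams().stream()
            .filter(ss -> regex.matcher(ss.getStream()).matches())
            .collect(Collectors.toSet());
      }
    }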


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/4dbc2c6d
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/4dbc2c6d
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/4dbc2c6d

Branch: refs/heads/master
Commit: 4dbc2c6de5e9048654eed0195364fdc09cefa3bf
Parents: bee30f5
Author: Ray Matharu <rm...@linkedin.com>
Authored: Fri Nov 30 17:06:30 2018 -0800
Committer: Jagadish <jv...@linkedin.com>
Committed: Fri Nov 30 17:06:30 2018 -0800

----------------------------------------------------------------------
 .../org/apache/samza/system/SystemAdmin.java    |   8 +
 .../apache/samza/PartitionChangeException.java  |  31 ---
 .../ClusterBasedJobCoordinator.java             |  93 ++++++--
 .../InputStreamsDiscoveredException.java        |  32 +++
 .../coordinator/PartitionChangeException.java   |  33 +++
 .../samza/coordinator/StreamRegexMonitor.java   | 223 +++++++++++++++++++
 .../org/apache/samza/config/JobConfig.scala     |  49 +++-
 .../samza/system/StreamMetadataCache.scala      |  14 +-
 .../coordinator/TestInputRegexMonitor.java      | 136 +++++++++++
 .../TestStreamPartitionCountMonitor.scala       |   6 +-
 .../samza/system/kafka/KafkaSystemAdmin.java    |   8 +
 .../org/apache/samza/config/KafkaConfig.scala   |   9 +-
 .../samza/config/RegExTopicGenerator.scala      |   3 +-
 .../samza/config/TestRegExTopicGenerator.scala  |   2 +-
 14 files changed, 585 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/4dbc2c6d/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
index 6ee7df2..8201b3d 100644
--- a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
+++ b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
@@ -156,4 +156,12 @@ public interface SystemAdmin {
     return getSystemStreamMetadata(streamNames);
   }
 
+  /**
+   * Fetch the set of all available streams
+   * @return The set of all available SystemStreams.
+   */
+  default Set<SystemStream> getAllSystemStreams() {
+    throw new UnsupportedOperationException();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/4dbc2c6d/samza-core/src/main/java/org/apache/samza/PartitionChangeException.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/PartitionChangeException.java b/samza-core/src/main/java/org/apache/samza/PartitionChangeException.java
deleted file mode 100644
index 4619dfa..0000000
--- a/samza-core/src/main/java/org/apache/samza/PartitionChangeException.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza;
-
-
-/**
-  * Exception to indicate that the input {@link org.apache.samza.system.SystemStreamPartition} changed
-  */
-public class PartitionChangeException extends SamzaException {
-
-  public PartitionChangeException(String s) {
-    super(s);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/4dbc2c6d/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
index ff70df0..4c5a34b 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
@@ -19,11 +19,14 @@
 package org.apache.samza.clustermanager;
 
 import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.regex.Pattern;
 import org.apache.samza.SamzaException;
-import org.apache.samza.PartitionChangeException;
 import org.apache.samza.checkpoint.CheckpointManager;
 import org.apache.samza.config.ClusterManagerConfig;
 import org.apache.samza.config.Config;
@@ -33,8 +36,11 @@ import org.apache.samza.config.ShellCommandConfig;
 import org.apache.samza.config.StorageConfig;
 import org.apache.samza.config.TaskConfigJava;
 import org.apache.samza.container.TaskName;
+import org.apache.samza.coordinator.InputStreamsDiscoveredException;
 import org.apache.samza.coordinator.JobModelManager;
+import org.apache.samza.coordinator.PartitionChangeException;
 import org.apache.samza.coordinator.StreamPartitionCountMonitor;
+import org.apache.samza.coordinator.StreamRegexMonitor;
 import org.apache.samza.coordinator.stream.CoordinatorStreamManager;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.JobModel;
@@ -49,9 +55,9 @@ import org.apache.samza.system.SystemStream;
 import org.apache.samza.util.SystemClock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.collection.JavaConverters;
 
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Implements a JobCoordinator that is completely independent of the underlying cluster
@@ -134,7 +140,12 @@ public class ClusterBasedJobCoordinator {
   /**
    * The input topic partition count monitor
    */
-  private final StreamPartitionCountMonitor partitionMonitor;
+  private final Optional<StreamPartitionCountMonitor> partitionMonitor;
+
+  /**
+   * The input stream regex monitor
+   */
+  private final Optional<StreamRegexMonitor> inputStreamRegexMonitor;
 
   /**
    * Metrics to track stats around container failures, needed containers etc.
@@ -174,7 +185,8 @@ public class ClusterBasedJobCoordinator {
 
     // build a JobModelManager and ChangelogStreamManager and perform partition assignments.
     changelogStreamManager = new ChangelogStreamManager(coordinatorStreamManager);
-    jobModelManager = JobModelManager.apply(coordinatorStreamManager.getConfig(), changelogStreamManager.readPartitionMapping());
+    jobModelManager =
+        JobModelManager.apply(coordinatorStreamManager.getConfig(), changelogStreamManager.readPartitionMapping());
 
     config = jobModelManager.jobModel().getConfig();
     hasDurableStores = new StorageConfig(config).hasDurableStores();
@@ -182,6 +194,7 @@ public class ClusterBasedJobCoordinator {
     // The systemAdmins should be started before partitionMonitor can be used. And it should be stopped when this coordinator is stopped.
     systemAdmins = new SystemAdmins(config);
     partitionMonitor = getPartitionCountMonitor(config, systemAdmins);
+    inputStreamRegexMonitor = getInputRegexMonitor(config, systemAdmins);
     clusterManagerConfig = new ClusterManagerConfig(config);
     isJmxEnabled = clusterManagerConfig.getJmxEnabledOnJobCoordinator();
 
@@ -226,7 +239,7 @@ public class ClusterBasedJobCoordinator {
 
       Map<TaskName, Integer> taskPartitionMappings = new HashMap<>();
       Map<String, ContainerModel> containers = jobModel.getContainers();
-      for (ContainerModel containerModel: containers.values()) {
+      for (ContainerModel containerModel : containers.values()) {
         for (TaskModel taskModel : containerModel.getTasks().values()) {
           taskPartitionMappings.put(taskModel.getTaskName(), taskModel.getChangelogPartition().getPartitionId());
         }
@@ -236,7 +249,8 @@ public class ClusterBasedJobCoordinator {
 
       containerProcessManager.start();
       systemAdmins.start();
-      partitionMonitor.start();
+      partitionMonitor.ifPresent(monitor -> monitor.start());
+      inputStreamRegexMonitor.ifPresent(monitor -> monitor.start());
 
       boolean isInterrupted = false;
 
@@ -270,7 +284,8 @@ public class ClusterBasedJobCoordinator {
   private void onShutDown() {
 
     try {
-      partitionMonitor.stop();
+      partitionMonitor.ifPresent(monitor -> monitor.stop());
+      inputStreamRegexMonitor.ifPresent(monitor -> monitor.stop());
       systemAdmins.stop();
       containerProcessManager.stop();
       coordinatorStreamManager.stop();
@@ -289,26 +304,63 @@ public class ClusterBasedJobCoordinator {
     }
   }
 
-  private StreamPartitionCountMonitor getPartitionCountMonitor(Config config, SystemAdmins systemAdmins) {
+  private Optional<StreamPartitionCountMonitor> getPartitionCountMonitor(Config config, SystemAdmins systemAdmins) {
     StreamMetadataCache streamMetadata = new StreamMetadataCache(systemAdmins, 0, SystemClock.instance());
     Set<SystemStream> inputStreamsToMonitor = new TaskConfigJava(config).getAllInputStreams();
     if (inputStreamsToMonitor.isEmpty()) {
       throw new SamzaException("Input streams to a job can not be empty.");
     }
 
-    return new StreamPartitionCountMonitor(
-        inputStreamsToMonitor,
-        streamMetadata,
-        metrics,
-        new JobConfig(config).getMonitorPartitionChangeFrequency(),
-        streamsChanged -> {
-        // Fail the jobs with durable state store. Otherwise, application state.status remains UNDEFINED s.t. YARN job will be restarted
+    return Optional.of(new StreamPartitionCountMonitor(inputStreamsToMonitor, streamMetadata, metrics,
+        new JobConfig(config).getMonitorPartitionChangeFrequency(), streamsChanged -> {
+      // Fail the jobs with durable state store. Otherwise, application state.status remains UNDEFINED s.t. YARN job will be restarted
         if (hasDurableStores) {
           log.error("Input topic partition count changed in a job with durable state. Failing the job.");
           state.status = SamzaApplicationState.SamzaAppStatus.FAILED;
         }
         coordinatorException = new PartitionChangeException("Input topic partition count changes detected.");
-      });
+      }));
+  }
+
+  private Optional<StreamRegexMonitor> getInputRegexMonitor(Config config, SystemAdmins systemAdmins) {
+
+    // if input regex monitor is not enabled return empty
+    if (new JobConfig(config).getMonitorRegexEnabled()) {
+      log.info("StreamRegexMonitor is disabled.");
+      return Optional.empty();
+    }
+
+    StreamMetadataCache streamMetadata = new StreamMetadataCache(systemAdmins, 0, SystemClock.instance());
+    Set<SystemStream> inputStreamsToMonitor = new TaskConfigJava(config).getAllInputStreams();
+    if (inputStreamsToMonitor.isEmpty()) {
+      throw new SamzaException("Input streams to a job can not be empty.");
+    }
+
+    // First list all rewriters
+    Option<String> rewritersList = new JobConfig(config).getConfigRewriters();
+
+    // if no rewriter is defined, there is nothing to monitor
+    if (!rewritersList.isDefined()) {
+      log.warn("No config rewriters are defined. No StreamRegexMonitor created.");
+      return Optional.empty();
+    }
+
+    // Compile a map of each input-system to its corresponding input-monitor-regex patterns
+    Map<String, Pattern> inputRegexesToMonitor =
+        JavaConverters.mapAsJavaMapConverter(new JobConfig(config).getMonitorRegexPatternMap(rewritersList.get()))
+            .asJava();
+
+    return Optional.of(new StreamRegexMonitor(inputStreamsToMonitor, inputRegexesToMonitor, streamMetadata, metrics,
+        new JobConfig(config).getMonitorRegexFrequency(), new StreamRegexMonitor.Callback() {
+          @Override
+          public void onInputStreamsChanged(Set<SystemStream> initialInputSet, Set<SystemStream> newInputStreams,
+              Map<String, Pattern> regexesMonitored) {
+            log.error("New input system-streams discovered. Failing the job. New input streams: {}", newInputStreams,
+                " Existing input streams:", inputStreamsToMonitor);
+            state.status = SamzaApplicationState.SamzaAppStatus.FAILED;
+            coordinatorException = new InputStreamsDiscoveredException("New input streams added: " + newInputStreams);
+          }
+        }));
   }
 
   // The following two methods are package-private and for testing only
@@ -321,10 +373,9 @@ public class ClusterBasedJobCoordinator {
 
   @VisibleForTesting
   StreamPartitionCountMonitor getPartitionMonitor() {
-    return partitionMonitor;
+    return partitionMonitor.get();
   }
 
-
   /**
    * The entry point for the {@link ClusterBasedJobCoordinator}
    * @param args args
@@ -335,12 +386,14 @@ public class ClusterBasedJobCoordinator {
     try {
       //Read and parse the coordinator system config.
       log.info("Parsing coordinator system config {}", coordinatorSystemEnv);
-      coordinatorSystemConfig = new MapConfig(SamzaObjectMapper.getObjectMapper().readValue(coordinatorSystemEnv, Config.class));
+      coordinatorSystemConfig =
+          new MapConfig(SamzaObjectMapper.getObjectMapper().readValue(coordinatorSystemEnv, Config.class));
     } catch (IOException e) {
       log.error("Exception while reading coordinator stream config {}", e);
       throw new SamzaException(e);
     }
     ClusterBasedJobCoordinator jc = new ClusterBasedJobCoordinator(coordinatorSystemConfig);
     jc.run();
+    log.info("Finished ClusterBasedJobCoordinator run");
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/4dbc2c6d/samza-core/src/main/java/org/apache/samza/coordinator/InputStreamsDiscoveredException.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/InputStreamsDiscoveredException.java b/samza-core/src/main/java/org/apache/samza/coordinator/InputStreamsDiscoveredException.java
new file mode 100644
index 0000000..6e85f43
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/InputStreamsDiscoveredException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import org.apache.samza.SamzaException;
+
+
+/**
+ * Exception to indicate that new input streams have been added.
+ */
+public class InputStreamsDiscoveredException extends SamzaException {
+
+  public InputStreamsDiscoveredException(String message) {
+    super(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/4dbc2c6d/samza-core/src/main/java/org/apache/samza/coordinator/PartitionChangeException.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/PartitionChangeException.java b/samza-core/src/main/java/org/apache/samza/coordinator/PartitionChangeException.java
new file mode 100644
index 0000000..4594307
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/PartitionChangeException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator;
+
+import org.apache.samza.SamzaException;
+
+
+/**
+  * Exception to indicate that the input {@link org.apache.samza.system.SystemStreamPartition} changed
+  */
+public class PartitionChangeException extends SamzaException {
+
+  public PartitionChangeException(String s) {
+    super(s);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/4dbc2c6d/samza-core/src/main/java/org/apache/samza/coordinator/StreamRegexMonitor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/StreamRegexMonitor.java b/samza-core/src/main/java/org/apache/samza/coordinator/StreamRegexMonitor.java
new file mode 100644
index 0000000..3c86bfb
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/StreamRegexMonitor.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.apache.samza.metrics.Gauge;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+
+
+/**
+ * A single-thread based monitor that periodically monitors the given set of stream regexes, and matches them to
+ * the given set of streams. If a stream matching a given regex that is not in the corresponding stream set is detected,
+ * it invokes a {@link StreamRegexMonitor.Callback} with the initial input set, the new input stream set, and the regexes
+ * being monitored.
+ */
+public class StreamRegexMonitor {
+  private static final Logger log = LoggerFactory.getLogger(StreamRegexMonitor.class);
+
+  // Factory of daemon-threads to create the single threaded executor pool
+  private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryBuilder().setDaemon(true)
+      .setNameFormat("Samza-" + StreamRegexMonitor.class.getSimpleName())
+      .build();
+
+  // Enum to describe the state of the regexMonitor
+  private enum State {
+    INIT, RUNNING, STOPPED
+  }
+
+  private final Set<SystemStream> streamsToMonitor;
+  private final Map<String, Pattern> systemRegexesToMonitor;
+  private final StreamMetadataCache metadataCache;
+  private final int inputRegexMonitorPeriodMs;
+
+  // Map of gauges (one per system), emitted when new input stream for that system is detected
+  private final Map<String, Gauge<Integer>> gauges;
+
+  private final Callback callbackMethod;
+
+  // Used to guard write access to state.
+  private final Object lock = new Object();
+
+  private final ScheduledExecutorService schedulerService = Executors.newSingleThreadScheduledExecutor(THREAD_FACTORY);
+
+  private volatile State state = State.INIT;
+
+  /**
+   * A callback that is invoked when the {@link StreamRegexMonitor} detects a new input stream matching given regex.
+   */
+  public interface Callback {
+    /**
+     * Method to be called when new input streams are detected.
+     * @param initialInputSet The initial set of input streams
+     * @param newInputStreams The set of new input streams discovered
+     * @param regexesMonitored The set of regexes being monitored
+     */
+    void onInputStreamsChanged(Set<SystemStream> initialInputSet, Set<SystemStream> newInputStreams,
+        Map<String, Pattern> regexesMonitored);
+  }
+
+  /**
+   * Default constructor.
+   *
+   *  @param streamsToMonitor  a set of SystemStreams to monitor
+   * @param systemRegexesToMonitor  map of regexes for each input system
+   * @param metadataCache     the metadata cache which will be used to fetch metadata for partition counts.
+   * @param metrics           the metrics registry to which the metrics should be added.
+   * @param inputRegexMonitorPeriodMs the period at which the monitor will check each input-regex
+   * @param monitorCallback   the callback method to be invoked when new input stream matching regex is detected
+   */
+  public StreamRegexMonitor(Set<SystemStream> streamsToMonitor, Map<String, Pattern> systemRegexesToMonitor,
+      StreamMetadataCache metadataCache, MetricsRegistry metrics, int inputRegexMonitorPeriodMs,
+      Callback monitorCallback) {
+    this.streamsToMonitor = streamsToMonitor;
+    this.systemRegexesToMonitor = systemRegexesToMonitor;
+    this.metadataCache = metadataCache;
+    this.callbackMethod = monitorCallback;
+    this.inputRegexMonitorPeriodMs = inputRegexMonitorPeriodMs;
+
+    // Pre-populate the gauges
+    Map<String, Gauge<Integer>> mutableGauges = new HashMap<>();
+    for (String systemToMonitor : systemRegexesToMonitor.keySet()) {
+      Gauge gauge = metrics.newGauge("job-coordinator", String.format("%s-new-input-streams", systemToMonitor), 0);
+      mutableGauges.put(systemToMonitor, gauge);
+    }
+    gauges = Collections.unmodifiableMap(mutableGauges);
+
+    log.info("Created {} with inputRegexMonitorPeriodMs: {} and systemRegexesToMonitor: {}", this.getClass().getName(),
+        this.inputRegexMonitorPeriodMs, this.systemRegexesToMonitor);
+  }
+
+  /**
+   * Starts the monitor.
+   */
+  public void start() {
+    synchronized (lock) {
+      switch (state) {
+        case INIT:
+          if (inputRegexMonitorPeriodMs > 0) {
+            schedulerService.scheduleAtFixedRate(new Runnable() {
+              @Override
+              public void run() {
+                monitorInputRegexes();
+              }
+            }, 0, inputRegexMonitorPeriodMs, TimeUnit.MILLISECONDS);
+          }
+          state = State.RUNNING;
+          break;
+
+        case RUNNING:
+          // start is idempotent
+          return;
+
+        case STOPPED:
+          throw new IllegalStateException("StreamRegexMonitor was stopped and cannot be restarted.");
+      }
+    }
+  }
+
+  /**
+   * Stops the monitor. Once it stops, it cannot be restarted.
+   */
+  public void stop() {
+    synchronized (lock) {
+      // We could also wait for full termination of the scheduler service, but it is overkill for
+      // our use case.
+      schedulerService.shutdownNow();
+
+      state = State.STOPPED;
+    }
+  }
+
+  private void monitorInputRegexes() {
+    log.debug("Running monitorInputRegexes");
+
+    try {
+      // obtain the list of SysStreams that match given patterns for all systems
+      Set<SystemStream> inputStreamsMatchingPattern = new HashSet<>();
+
+      // For each input system, for which we have a regex to monitor
+      for (String systemName : this.systemRegexesToMonitor.keySet()) {
+
+        try {
+          // obtain the list of SysStreams that match the regex for this system
+          // using the systemAdmin in the metadataCache
+          inputStreamsMatchingPattern.addAll(
+              JavaConverters.setAsJavaSetConverter(this.metadataCache.getAllSystemStreams(systemName))
+                  .asJava()
+                  .stream()
+                  .filter(x -> x.getStream().matches(this.systemRegexesToMonitor.get(systemName).pattern()))
+                  .collect(Collectors.toSet()));
+        } catch (UnsupportedOperationException e) {
+          log.error("UnsupportedOperationException while monitoring input regexes for system {}", systemName, e);
+        }
+      }
+
+      // if there is a stream that is in the input-Set but not in the streamsToMonitor
+      // since streamsToMonitor = task.inputs
+      if (!streamsToMonitor.containsAll(inputStreamsMatchingPattern)) {
+        log.info("New input system-streams discovered. InputStreamsMatchingPattern: {} but streamsToMonitor: {} ",
+            inputStreamsMatchingPattern, streamsToMonitor);
+
+        // invoke notify callback with new input streams
+        this.callbackMethod.onInputStreamsChanged(streamsToMonitor,
+            Sets.difference(inputStreamsMatchingPattern, streamsToMonitor), systemRegexesToMonitor);
+      } else {
+        log.info("No new input system-Streams discovered streamsToMonitor {} inputStreamsMatchingPattern {}",
+            streamsToMonitor, inputStreamsMatchingPattern);
+      }
+    } catch (Exception e) {
+      log.error("Exception while monitoring input regexes.", e);
+    }
+  }
+
+  @VisibleForTesting
+  boolean isRunning() {
+    return state == State.RUNNING;
+  }
+
+  /**
+   * Wait until this service has shutdown. Returns true if shutdown occurred within the timeout
+   * and false otherwise.
+   * <p>
+   * This is currently exposed at the package private level for tests only.
+   */
+  @VisibleForTesting
+  boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+    return schedulerService.awaitTermination(timeout, unit);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/4dbc2c6d/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
index 2bc6420..5363e72 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
@@ -21,12 +21,16 @@ package org.apache.samza.config
 
 
 import java.io.File
+import java.util.regex.Pattern
 
 import org.apache.samza.container.grouper.stream.GroupByPartitionFactory
 import org.apache.samza.coordinator.metadatastore.CoordinatorStreamMetadataStoreFactory
 import org.apache.samza.runtime.DefaultLocationIdProviderFactory
 import org.apache.samza.util.Logging
 
+import scala.collection.mutable
+
+
 object JobConfig {
   // job config constants
   val STREAM_JOB_FACTORY_CLASS = "job.factory.class" // streaming.job_factory_class
@@ -77,6 +81,15 @@ object JobConfig {
   val JOB_FAIL_CHECKPOINT_VALIDATION = "job.checkpoint.validation.enabled"
   val MONITOR_PARTITION_CHANGE = "job.coordinator.monitor-partition-change"
   val MONITOR_PARTITION_CHANGE_FREQUENCY_MS = "job.coordinator.monitor-partition-change.frequency.ms"
+
+  val MONITOR_INPUT_REGEX_FREQUENCY_MS = "job.coordinator.monitor-input-regex.frequency.ms"
+  val DEFAULT_MONITOR_INPUT_REGEX_FREQUENCY_MS = 300000
+
+  val REGEX_RESOLVED_STREAMS = "job.config.rewriter.%s.regex"
+  val REGEX_RESOLVED_SYSTEM = "job.config.rewriter.%s.system"
+  val REGEX_INHERITED_CONFIG = "job.config.rewriter.%s.config"
+
+
   val DEFAULT_MONITOR_PARTITION_CHANGE_FREQUENCY_MS = 300000
   val JOB_SECURITY_MANAGER_FACTORY = "job.security.manager.factory"
 
@@ -127,7 +140,7 @@ class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging {
   def getCoordinatorSystemName = {
     val system = getCoordinatorSystemNameOrNull
     if (system == null) {
-      throw new ConfigException("Missing job.coordinator.system configuration. Cannot proceed with job execution.")
+      throw new ConfigException("Missing job.coordinator.system configuration. Cannot proceed with job execution." + config)
     }
     system
   }
@@ -158,10 +171,44 @@ class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging {
     }
   }
 
+  // StreamRegexMonitor is disabled if the MonitorRegexFrequency is <= 0
+  def getMonitorRegexEnabled = (getMonitorRegexFrequency <= 0)
+
   def getMonitorPartitionChangeFrequency = getInt(
     JobConfig.MONITOR_PARTITION_CHANGE_FREQUENCY_MS,
     JobConfig.DEFAULT_MONITOR_PARTITION_CHANGE_FREQUENCY_MS)
 
+  def getMonitorRegexFrequency = getInt(
+    JobConfig.MONITOR_INPUT_REGEX_FREQUENCY_MS,
+    JobConfig.DEFAULT_MONITOR_INPUT_REGEX_FREQUENCY_MS)
+
+  def getMonitorRegexPatternMap(rewritersList : String) : mutable.HashMap[String, Pattern] = {
+    // Compile a map of each input-system to its corresponding input-monitor-regex patterns
+    val inputRegexesToMonitor: mutable.HashMap[String, Pattern] = mutable.HashMap[String, Pattern]()
+    val rewriters: Array[String] = rewritersList.split(",")
+    // iterate over each rewriter and obtain the system and regex for it
+    for (rewriterName <- rewriters) {
+      val rewriterSystem: Option[String] = new JobConfig(config).getRegexResolvedSystem(rewriterName)
+      val rewriterRegex: Option[String] = new JobConfig(config).getRegexResolvedStreams(rewriterName)
+      if (rewriterSystem.isDefined && rewriterRegex.isDefined) {
+        var patternForSystem: Option[Pattern] = inputRegexesToMonitor.get(rewriterSystem.get)
+        patternForSystem =
+          if (patternForSystem == None) Some(Pattern.compile(rewriterRegex.get))
+          else
+            Some(Pattern.compile(String.join("|", patternForSystem.get.pattern(), rewriterRegex.get)))
+        inputRegexesToMonitor.put(rewriterSystem.get, patternForSystem.get)
+      }
+    }
+    inputRegexesToMonitor
+  }
+
+  // regex-related config methods duplicated from KafkaConfig to avoid module dependency
+  def getRegexResolvedStreams(rewriterName: String) = getOption(JobConfig.REGEX_RESOLVED_STREAMS format rewriterName)
+
+  def getRegexResolvedSystem(rewriterName: String) = getOption(JobConfig.REGEX_RESOLVED_SYSTEM format rewriterName)
+
+
+
   def getStreamJobFactoryClass = getOption(JobConfig.STREAM_JOB_FACTORY_CLASS)
 
   def getJobId = getOption(JobConfig.JOB_ID).getOrElse("1")

http://git-wip-us.apache.org/repos/asf/samza/blob/4dbc2c6d/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala b/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala
index edffac7..abd6942 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala
@@ -19,9 +19,11 @@
 
 package org.apache.samza.system
 
-import org.apache.samza.util.{Logging, Clock, SystemClock}
+import org.apache.samza.util.{Clock, Logging, SystemClock}
 import org.apache.samza.SamzaException
+
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 
 /**
  * Caches requests to SystemAdmin.getSystemStreamMetadata for a short while (by default
@@ -84,6 +86,16 @@ class StreamMetadataCache (
   }
 
   /**
+    * Returns the set of SystemStreams for the given system.
+    * @param systemName the system whose streams should be listed
+    * @return the set of streams currently available in that system
+    */
+  def getAllSystemStreams(systemName: String): mutable.Set[SystemStream] = {
+    val systemAdmin = systemAdmins.getSystemAdmin(systemName)
+    systemAdmin.getAllSystemStreams().asScala
+  }
+
+  /**
    * Returns metadata about the given streams. If the metadata isn't in the cache, it is retrieved from the systems
    * using the given SystemAdmins.
    *

http://git-wip-us.apache.org/repos/asf/samza/blob/4dbc2c6d/samza-core/src/test/scala/org/apache/samza/coordinator/TestInputRegexMonitor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/coordinator/TestInputRegexMonitor.java b/samza-core/src/test/scala/org/apache/samza/coordinator/TestInputRegexMonitor.java
new file mode 100644
index 0000000..84b026b
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/coordinator/TestInputRegexMonitor.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.util.Clock;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import scala.collection.JavaConversions$;
+
+
+public class TestInputRegexMonitor {
+
+  private StreamRegexMonitor streamRegexMonitor;
+  private CountDownLatch callbackCount;
+
+  private int inputRegexMs = 10;
+  private String systemName = "kafka";
+  private int expectedNumberOfCallbacks = 1;
+  private Set<SystemStream> inputStreamsDiscovered;
+  private final SystemStream sampleStream = new SystemStream(systemName, "test-1");
+
+  @Before
+  public void setUp() {
+
+    inputStreamsDiscovered = new HashSet<>();
+    Map<String, Pattern> patternMap = new HashMap<>();
+    patternMap.put(systemName, Pattern.compile("test-.*"));
+
+    StreamMetadataCache mockStreamMetadataCache = new MockStreamMetadataCache(null, 1, null);
+
+    MetricsRegistry metrics = Mockito.mock(MetricsRegistry.class);
+    this.callbackCount = new CountDownLatch(expectedNumberOfCallbacks);
+
+    // Creating a streamRegexMonitor with an empty input set and a test-.* input regex
+    this.streamRegexMonitor =
+        new StreamRegexMonitor(new HashSet<>(), patternMap, mockStreamMetadataCache, metrics, inputRegexMs,
+            new StreamRegexMonitor.Callback() {
+        @Override
+        public void onInputStreamsChanged(Set<SystemStream> initialInputSet, Set<SystemStream> newInputStreams,
+            Map<String, Pattern> regexesMonitored) {
+          callbackCount.countDown();
+          inputStreamsDiscovered.addAll(newInputStreams);
+
+          // Check that the newInputStream discovered is "kafka" "Test-1"
+          Assert.assertTrue(inputStreamsDiscovered.size() == 1);
+          Assert.assertTrue(inputStreamsDiscovered.contains(sampleStream));
+        }
+      });
+  }
+
+  @Test
+  public void testStartStop() throws InterruptedException {
+    Assert.assertFalse(streamRegexMonitor.isRunning());
+
+    // Normal start
+    streamRegexMonitor.start();
+    Assert.assertTrue(streamRegexMonitor.isRunning());
+
+    // Start ought to be idempotent
+    streamRegexMonitor.start();
+    Assert.assertTrue(streamRegexMonitor.isRunning());
+
+    // Normal stop
+    streamRegexMonitor.stop();
+    Assert.assertTrue(streamRegexMonitor.awaitTermination(1, TimeUnit.SECONDS));
+    Assert.assertFalse(streamRegexMonitor.isRunning());
+
+    try {
+      streamRegexMonitor.start();
+    } catch (Exception e) {
+      Assert.assertTrue(e.getClass().equals(IllegalStateException.class));
+    }
+
+    // Stop ought to be idempotent
+    Assert.assertFalse(streamRegexMonitor.isRunning());
+    streamRegexMonitor.stop();
+    Assert.assertFalse(streamRegexMonitor.isRunning());
+  }
+
+  @Test
+  public void testSchedulingAndInputAddition() throws Exception {
+    this.streamRegexMonitor.start();
+    try {
+      if (!callbackCount.await(1, TimeUnit.SECONDS)) {
+        throw new Exception(
+            "Did not see " + expectedNumberOfCallbacks + " callbacks after waiting. " + callbackCount.toString());
+      }
+    } finally {
+      System.out.println("CallbackCount is " + callbackCount.getCount());
+      this.streamRegexMonitor.stop();
+    }
+  }
+
+  private class MockStreamMetadataCache extends StreamMetadataCache {
+
+    public MockStreamMetadataCache(SystemAdmins systemAdmins, int cacheTTLms, Clock clock) {
+      super(systemAdmins, cacheTTLms, clock);
+    }
+
+    @Override
+    public scala.collection.mutable.Set getAllSystemStreams(String systemName) {
+      Set<SystemStream> s = new HashSet<>();
+      return JavaConversions$.MODULE$.asScalaSet(new HashSet<SystemStream>(Arrays.asList(sampleStream)));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/4dbc2c6d/samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala b/samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala
index 2aafab1..0cc7a90 100644
--- a/samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala
+++ b/samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala
@@ -20,7 +20,8 @@
 package org.apache.samza.coordinator
 
 import java.util.concurrent.{CountDownLatch, TimeUnit}
-import java.util.HashMap
+import java.util.regex.Pattern
+
 import org.apache.samza.Partition
 import org.apache.samza.metrics.{Gauge, MetricsRegistryMap}
 import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
@@ -34,6 +35,7 @@ import org.scalatest.junit.AssertionsForJUnit
 import org.scalatest.mockito.MockitoSugar
 
 import scala.collection.JavaConverters._
+import scala.collection.immutable.HashMap
 
 
 class TestStreamPartitionCountMonitor extends AssertionsForJUnit with MockitoSugar {
@@ -43,6 +45,7 @@ class TestStreamPartitionCountMonitor extends AssertionsForJUnit with MockitoSug
     val mockMetadataCache = mock[StreamMetadataCache]
     val inputSystemStream = new SystemStream("test-system", "test-stream")
     val inputSystemStreamSet = Set[SystemStream](inputSystemStream)
+    val inputRegexMap : java.util.Map[String, Pattern] = HashMap("test-system"-> Pattern.compile(".*")).asJava
 
     val initialPartitionMetadata = new java.util.HashMap[Partition, SystemStreamPartitionMetadata]() {
       {
@@ -209,6 +212,7 @@ class TestStreamPartitionCountMonitor extends AssertionsForJUnit with MockitoSug
   def testScheduler(): Unit = {
     val mockMetadataCache = new MockStreamMetadataCache
     val inputSystemStream = new SystemStream("test-system", "test-stream")
+    val inputRegexMap : java.util.Map[String, Pattern] = HashMap("test-system"-> Pattern.compile(".*")).asJava
     val inputSystemStreamSet = Set[SystemStream](inputSystemStream)
     val sampleCount = new CountDownLatch(2); // Verify 2 invocations
 

http://git-wip-us.apache.org/repos/asf/samza/blob/4dbc2c6d/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
index 596b07a..c3c66c7 100644
--- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
+++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
@@ -54,6 +54,7 @@ import org.apache.samza.config.SystemConfig;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.StreamValidationException;
 import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamMetadata;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.util.ExponentialSleepStrategy;
@@ -679,6 +680,13 @@ public class KafkaSystemAdmin implements SystemAdmin {
     return () -> ZkUtils.apply(zkConnect, 6000, 6000, false);
   }
 
+  @Override
+  public Set<SystemStream> getAllSystemStreams() {
+    return ((Set<String>) this.metadataConsumer.listTopics().keySet()).stream()
+        .map(x -> new SystemStream(systemName, x))
+        .collect(Collectors.toSet());
+  }
+
   /**
    * Container for metadata about offsets.
    */

http://git-wip-us.apache.org/repos/asf/samza/blob/4dbc2c6d/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index 1954ac7..607feb0 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -40,9 +40,6 @@ object KafkaConfig {
   val TOPIC_REPLICATION_FACTOR = "replication.factor"
   val TOPIC_DEFAULT_REPLICATION_FACTOR = "2"
 
-  val REGEX_RESOLVED_STREAMS = "job.config.rewriter.%s.regex"
-  val REGEX_RESOLVED_SYSTEM = "job.config.rewriter.%s.system"
-  val REGEX_INHERITED_CONFIG = "job.config.rewriter.%s.config"
 
   val SEGMENT_BYTES = "segment.bytes"
 
@@ -206,11 +203,11 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
   }
 
   // regex resolver
-  def getRegexResolvedStreams(rewriterName: String) = getOption(KafkaConfig.REGEX_RESOLVED_STREAMS format rewriterName)
+  def getRegexResolvedStreams(rewriterName: String) = getOption(JobConfig.REGEX_RESOLVED_STREAMS format rewriterName)
 
-  def getRegexResolvedSystem(rewriterName: String) = getOption(KafkaConfig.REGEX_RESOLVED_SYSTEM format rewriterName)
+  def getRegexResolvedSystem(rewriterName: String) = getOption(JobConfig.REGEX_RESOLVED_SYSTEM format rewriterName)
 
-  def getRegexResolvedInheritedConfig(rewriterName: String) = config.subset((KafkaConfig.REGEX_INHERITED_CONFIG format rewriterName) + ".", true)
+  def getRegexResolvedInheritedConfig(rewriterName: String) = config.subset((JobConfig.REGEX_INHERITED_CONFIG format rewriterName) + ".", true)
 
   /**
     * Gets the replication factor for the changelog topics. Uses the following precedence.

http://git-wip-us.apache.org/repos/asf/samza/blob/4dbc2c6d/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala b/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
index e6068b0..654354b 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
@@ -21,7 +21,8 @@ package org.apache.samza.config
 
 import org.I0Itec.zkclient.ZkClient
 import kafka.utils.ZkUtils
-import org.apache.samza.config.KafkaConfig.{Config2Kafka, REGEX_RESOLVED_STREAMS}
+import org.apache.samza.config.KafkaConfig.{Config2Kafka}
+import org.apache.samza.config.JobConfig.{REGEX_RESOLVED_STREAMS}
 import org.apache.samza.SamzaException
 import org.apache.samza.util.{Logging, StreamUtil}
 

http://git-wip-us.apache.org/repos/asf/samza/blob/4dbc2c6d/samza-kafka/src/test/scala/org/apache/samza/config/TestRegExTopicGenerator.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestRegExTopicGenerator.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestRegExTopicGenerator.scala
index 69d7da6..8b19292 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/config/TestRegExTopicGenerator.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestRegExTopicGenerator.scala
@@ -24,7 +24,7 @@ import collection.JavaConverters._
 import org.junit.Assert._
 import org.junit.Test
 
-import KafkaConfig._
+import JobConfig._
 
 class TestRegExTopicGenerator {