You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2019/08/05 16:34:39 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-838] Fix Ivy-based ConfigStoreUtils and add Unit Test

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 8903ebf  [GOBBLIN-838] Fix Ivy-based ConfigStoreUtils and add Unit Test
8903ebf is described below

commit 8903ebf3807af3369839069e2082afa70c7fe77e
Author: autumnust <le...@linkedin.com>
AuthorDate: Mon Aug 5 09:34:31 2019 -0700

    [GOBBLIN-838] Fix Ivy-based ConfigStoreUtils and add Unit Test
    
    Closes #2697 from
    autumnust/removeConfigStoreInKafkaSource
---
 .../gobblin/config/client/ConfigClientUtils.java   |  20 ++-
 .../config/store/zip/IvyConfigStoreFactory.java    |  40 ++----
 .../zip/SimpleLocalIvyConfigStoreFactory.java      | 120 ++++++++++++++++
 .../config/store/zip/ZipFileConfigStore.java       |  11 +-
 ...che.gobblin.config.store.api.ConfigStoreFactory |   1 +
 .../config/store/zip/ZipFileConfigStoreTest.java   |  12 ++
 .../src/test/resources/zipStoreTest.zip            | Bin 1821 -> 2990 bytes
 .../extractor/extract/kafka/ConfigStoreUtils.java  | 120 ++++++++++------
 .../extractor/extract/kafka/KafkaSource.java       |  14 +-
 .../extract/kafka/ConfigStoreUtilsTest.java        | 160 +++++++++++++++++++++
 .../extract/kafka/ZipConfigStoreUtilsTest.java     | 139 ++++++++++++++++++
 .../src/test/resources/IvyConfigStoreTest.zip      | Bin 0 -> 8389 bytes
 .../resources/_CONFIG_STORE/store-metadata.conf    |   1 +
 .../v1.0/data/tracking/Topic1/includes.conf        |   5 +
 .../v1.0/data/tracking/Topic1/main.conf            |   1 +
 .../v1.0/data/tracking/Topic2/includes.conf        |   2 +
 .../_CONFIG_STORE/v1.0/tags/blacklist/main.conf    |   1 +
 .../_CONFIG_STORE/v1.0/tags/whitelist/main.conf    |   1 +
 18 files changed, 548 insertions(+), 100 deletions(-)

diff --git a/gobblin-config-management/gobblin-config-client/src/main/java/org/apache/gobblin/config/client/ConfigClientUtils.java b/gobblin-config-management/gobblin-config-client/src/main/java/org/apache/gobblin/config/client/ConfigClientUtils.java
index 4eb3231..e5d8e1f 100644
--- a/gobblin-config-management/gobblin-config-client/src/main/java/org/apache/gobblin/config/client/ConfigClientUtils.java
+++ b/gobblin-config-management/gobblin-config-client/src/main/java/org/apache/gobblin/config/client/ConfigClientUtils.java
@@ -25,17 +25,16 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
-import org.apache.hadoop.fs.Path;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.typesafe.config.Config;
-
 import org.apache.gobblin.config.common.impl.SingleLinkedListConfigKeyPath;
 import org.apache.gobblin.config.store.api.ConfigKeyPath;
 import org.apache.gobblin.config.store.api.ConfigStore;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.util.ConfigUtils;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
 
 
 /**
@@ -69,17 +68,16 @@ public class ConfigClientUtils {
    * @return              : return the URI of the same format with the input configKeyURI
    *
    * for example, configKeyPath is /tags/retention,
-   * with returnURIWithAuthority as true, return "etl-hdfs:///tags/retention
-   * with returnURIWithAuthority as false , then return
+   * with returnURIWithAuthority as false, return "etl-hdfs:///tags/retention";
+   * with returnURIWithAuthority as true, then return
    * etl-hdfs://eat1-nertznn01.grid.linkedin.com:9000/user/mitu/HdfsBasedConfigTest/tags/retention
    */
-  public static URI buildUriInClientFormat(ConfigKeyPath configKeyPath, ConfigStore cs,
-      boolean returnURIWithAuthority) {
-
+  public static URI buildUriInClientFormat(ConfigKeyPath configKeyPath, ConfigStore cs, boolean returnURIWithAuthority) {
     try {
       if (!returnURIWithAuthority) {
         return new URI(cs.getStoreURI().getScheme(), null, configKeyPath.getAbsolutePathString(), null, null);
       }
+
       URI storeRoot = cs.getStoreURI();
       // if configKeyPath is root, the configKeyPath.getAbsolutePathString().substring(1) will return "" and
       // will cause the Path creation failure if not handled here
diff --git a/gobblin-config-management/gobblin-config-core/src/main/java/org/apache/gobblin/config/store/zip/IvyConfigStoreFactory.java b/gobblin-config-management/gobblin-config-core/src/main/java/org/apache/gobblin/config/store/zip/IvyConfigStoreFactory.java
index 4b7286a..2c99c59 100644
--- a/gobblin-config-management/gobblin-config-core/src/main/java/org/apache/gobblin/config/store/zip/IvyConfigStoreFactory.java
+++ b/gobblin-config-management/gobblin-config-core/src/main/java/org/apache/gobblin/config/store/zip/IvyConfigStoreFactory.java
@@ -25,19 +25,16 @@ import java.nio.file.FileSystems;
 import java.nio.file.Paths;
 import java.util.Properties;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.http.NameValuePair;
-import org.apache.http.client.utils.URLEncodedUtils;
-
-import com.sun.nio.zipfs.ZipFileSystem;
-
 import org.apache.gobblin.config.store.api.ConfigStoreCreationException;
 import org.apache.gobblin.config.store.api.ConfigStoreFactory;
 import org.apache.gobblin.config.store.hdfs.SimpleHDFSConfigStoreFactory;
 import org.apache.gobblin.config.store.hdfs.SimpleHDFSStoreMetadata;
 import org.apache.gobblin.config.store.hdfs.SimpleHadoopFilesystemConfigStore;
 import org.apache.gobblin.util.DownloadUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+import com.sun.nio.zipfs.ZipFileSystem;
 
 
 /**
@@ -46,23 +43,20 @@ import org.apache.gobblin.util.DownloadUtils;
  *
  * An ivy settings file must be present on the classpath named {@link DownloadUtils#IVY_SETTINGS_FILE_NAME}
  */
-public class IvyConfigStoreFactory implements ConfigStoreFactory<ZipFileConfigStore> {
+public class IvyConfigStoreFactory extends SimpleLocalIvyConfigStoreFactory {
 
-  private static final String IVY_SCHEME_PREFIX = "ivy-";
-  private static final String ORG_KEY = "org";
-  private static final String MODULE_KEY = "module";
-  private static final String STORE_PATH_KEY = "storePath";
-  private static final String STORE_PREFIX_KEY = "storePrefix";
+  /**
+   * Ivy coordinates required for downloading jar file.
+   */
+  protected static final String ORG_KEY = "org";
+  protected static final String MODULE_KEY = "module";
+  protected static final String STORE_PATH_KEY = "storePath";
 
   @Override
   public String getScheme() {
     return getSchemePrefix() + SimpleHDFSConfigStoreFactory.HDFS_SCHEME_NAME;
   }
 
-  public String getSchemePrefix() {
-    return IVY_SCHEME_PREFIX;
-  }
-
   /**
    * Example configKey URI (configuration is passed as part of the query)
    *
@@ -80,10 +74,7 @@ public class IvyConfigStoreFactory implements ConfigStoreFactory<ZipFileConfigSt
       throw new ConfigStoreCreationException(configKey, "Config key URI must have scheme " + getScheme());
     }
 
-    Properties factoryProps = new Properties();
-    for (NameValuePair param : URLEncodedUtils.parse(configKey, "UTF-8")) {
-      factoryProps.setProperty(param.getName(), param.getValue());
-    }
+    Properties factoryProps = parseUriIntoParameterSet(configKey);
 
     String jarOrg = factoryProps.getProperty(ORG_KEY);
     String jarModule = factoryProps.getProperty(MODULE_KEY);
@@ -116,12 +107,5 @@ public class IvyConfigStoreFactory implements ConfigStoreFactory<ZipFileConfigSt
       throw new ConfigStoreCreationException(configKey, e);
     }
   }
-
-  /**
-   * Base URI for a config store should be root of the zip file, so change path part of URI to be null
-   */
-  private URI getBaseURI(URI configKey) throws URISyntaxException {
-    return new URI(configKey.getScheme(), configKey.getAuthority(), null, configKey.getQuery(), configKey.getFragment());
-  }
 }
 
diff --git a/gobblin-config-management/gobblin-config-core/src/main/java/org/apache/gobblin/config/store/zip/SimpleLocalIvyConfigStoreFactory.java b/gobblin-config-management/gobblin-config-core/src/main/java/org/apache/gobblin/config/store/zip/SimpleLocalIvyConfigStoreFactory.java
new file mode 100644
index 0000000..a733dd2
--- /dev/null
+++ b/gobblin-config-management/gobblin-config-core/src/main/java/org/apache/gobblin/config/store/zip/SimpleLocalIvyConfigStoreFactory.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.config.store.zip;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.FileSystem;
+import java.nio.file.FileSystems;
+import java.util.Properties;
+
+import org.apache.gobblin.config.store.api.ConfigStoreCreationException;
+import org.apache.gobblin.config.store.api.ConfigStoreFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.http.NameValuePair;
+import org.apache.http.client.utils.URLEncodedUtils;
+
+import com.google.common.collect.ImmutableMap;
+import com.sun.nio.zipfs.ZipFileSystem;
+
+
+/**
+ * An implementation of {@link ConfigStoreFactory} that takes a locally existing zip file as the backend of the
+ * config store and creates a {@link ZipFileConfigStore} with it.
+ *
+ * {@link ZipFileConfigStore} encapsulates the config store better than
+ * {@link org.apache.gobblin.config.store.hdfs.SimpleHadoopFilesystemConfigStore}, which could, for example,
+ * cause a small-file problem on HDFS as the size of the config store grows.
+ */
+public class SimpleLocalIvyConfigStoreFactory implements ConfigStoreFactory<ZipFileConfigStore> {
+
+  private String currentVersion;
+
+  private static ThreadLocal<FileSystem> THREADLOCAL_FS = new ThreadLocal<>();
+
+  static final String STORE_PREFIX_KEY = "storePrefix";
+  static final String IVY_SCHEME_PREFIX = "ivy-";
+
+  /**
+   * If no version is specified, a default version is assigned, since the primary usage of this class
+   * is for testing.
+   */
+  public SimpleLocalIvyConfigStoreFactory() {
+    this.currentVersion = "v1.0";
+  }
+
+  public SimpleLocalIvyConfigStoreFactory(String configStoreVersion) {
+    this.currentVersion = configStoreVersion;
+  }
+
+  @Override
+  public String getScheme() {
+    return getSchemePrefix() + "file";
+  }
+
+  protected String getSchemePrefix() {
+    return IVY_SCHEME_PREFIX;
+  }
+
+  /**
+   *
+   * @param configKey URI whose {@code storePath} query parameter contains the physical path to the zip file.
+   * @return
+   * @throws ConfigStoreCreationException
+   */
+  @Override
+  public ZipFileConfigStore createConfigStore(URI configKey)
+      throws ConfigStoreCreationException {
+    Properties factoryProps = parseUriIntoParameterSet(configKey);
+
+    try {
+      // Construct URI as jar for zip file, as "jar" is the scheme for ZipFs.
+      URI uri = new URI("jar:file", null, new Path(factoryProps.getProperty("storePath")).toString(), null);
+
+      /** Using threadLocal to avoid {@link java.nio.file.FileSystemAlreadyExistsException} */
+      if (THREADLOCAL_FS.get() == null) {
+        FileSystem zipFs = FileSystems.newFileSystem(uri, ImmutableMap.of());
+        THREADLOCAL_FS.set(zipFs);
+      }
+
+      return new ZipFileConfigStore((ZipFileSystem) THREADLOCAL_FS.get(), getBaseURI(configKey), currentVersion,
+          factoryProps.getProperty(STORE_PREFIX_KEY, ""));
+    } catch (URISyntaxException | IOException e) {
+      throw new RuntimeException("Unable to load zip from classpath. ", e);
+    }
+  }
+
+  /**
+   * Parse the query parameters of the configKey into a {@link Properties} object (e.g. the ivy coordinates).
+   */
+  Properties parseUriIntoParameterSet(URI configKey) {
+    Properties factoryProps = new Properties();
+    for (NameValuePair param : URLEncodedUtils.parse(configKey, "UTF-8")) {
+      factoryProps.setProperty(param.getName(), param.getValue());
+    }
+    return factoryProps;
+  }
+
+  /**
+   * Base URI for a config store should be the root of the zip file, so change the path part of the URI to "/".
+   */
+  URI getBaseURI(URI configKey) throws URISyntaxException {
+    return new URI(configKey.getScheme(), configKey.getAuthority(), "/", configKey.getQuery(), configKey.getFragment());
+  }
+}
diff --git a/gobblin-config-management/gobblin-config-core/src/main/java/org/apache/gobblin/config/store/zip/ZipFileConfigStore.java b/gobblin-config-management/gobblin-config-core/src/main/java/org/apache/gobblin/config/store/zip/ZipFileConfigStore.java
index 5475b49..711a836 100644
--- a/gobblin-config-management/gobblin-config-core/src/main/java/org/apache/gobblin/config/store/zip/ZipFileConfigStore.java
+++ b/gobblin-config-management/gobblin-config-core/src/main/java/org/apache/gobblin/config/store/zip/ZipFileConfigStore.java
@@ -27,6 +27,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.ExecutionException;
 import java.util.stream.Stream;
 
 import org.apache.commons.lang.StringUtils;
@@ -102,7 +103,7 @@ public class ZipFileConfigStore implements ConfigStore {
     Preconditions.checkArgument(version.equals(getCurrentVersion()));
 
     List<ConfigKeyPath> children = new ArrayList<>();
-    Path datasetDir = getDatasetDirForKey(configKey);
+    Path datasetDir = getDatasetDirForKey(configKey, version);
 
     try {
 
@@ -142,7 +143,7 @@ public class ZipFileConfigStore implements ConfigStore {
     Preconditions.checkArgument(version.equals(getCurrentVersion()));
 
     List<ConfigKeyPath> configKeyPaths = new ArrayList<>();
-    Path datasetDir = getDatasetDirForKey(configKey);
+    Path datasetDir = getDatasetDirForKey(configKey, version);
     Path includesFile = this.fs.getPath(datasetDir.toString(), SimpleHadoopFilesystemConfigStore.INCLUDES_CONF_FILE_NAME);
 
     try {
@@ -171,7 +172,7 @@ public class ZipFileConfigStore implements ConfigStore {
     Preconditions.checkNotNull(configKey, "configKey cannot be null!");
     Preconditions.checkArgument(version.equals(getCurrentVersion()));
 
-    Path datasetDir = getDatasetDirForKey(configKey);
+    Path datasetDir = getDatasetDirForKey(configKey, version);
     Path mainConfFile = this.fs.getPath(datasetDir.toString(), SimpleHadoopFilesystemConfigStore.MAIN_CONF_FILE_NAME);
 
     try {
@@ -193,7 +194,7 @@ public class ZipFileConfigStore implements ConfigStore {
   /**
    * Get path object using zipped file system and relative path
    */
-  private Path getDatasetDirForKey(ConfigKeyPath configKey) throws VersionDoesNotExistException {
-    return this.fs.getPath(this.storePrefix, configKey.getAbsolutePathString());
+  private Path getDatasetDirForKey(ConfigKeyPath configKey, String version) throws VersionDoesNotExistException {
+    return this.fs.getPath(this.storePrefix, version, configKey.getAbsolutePathString());
   }
 }
diff --git a/gobblin-config-management/gobblin-config-core/src/main/resources/META-INF/services/org.apache.gobblin.config.store.api.ConfigStoreFactory b/gobblin-config-management/gobblin-config-core/src/main/resources/META-INF/services/org.apache.gobblin.config.store.api.ConfigStoreFactory
index 69865c1..21d3e81 100644
--- a/gobblin-config-management/gobblin-config-core/src/main/resources/META-INF/services/org.apache.gobblin.config.store.api.ConfigStoreFactory
+++ b/gobblin-config-management/gobblin-config-core/src/main/resources/META-INF/services/org.apache.gobblin.config.store.api.ConfigStoreFactory
@@ -19,3 +19,4 @@ org.apache.gobblin.config.store.hdfs.SimpleHDFSConfigStoreFactory
 org.apache.gobblin.config.store.hdfs.SimpleLocalHDFSConfigStoreFactory
 org.apache.gobblin.config.store.hdfs.DefaultCapableLocalConfigStoreFactory
 org.apache.gobblin.config.store.zip.IvyConfigStoreFactory
+org.apache.gobblin.config.store.zip.SimpleLocalIvyConfigStoreFactory
diff --git a/gobblin-config-management/gobblin-config-core/src/test/java/org/apache/gobblin/config/store/zip/ZipFileConfigStoreTest.java b/gobblin-config-management/gobblin-config-core/src/test/java/org/apache/gobblin/config/store/zip/ZipFileConfigStoreTest.java
index 70ff132..12f69ca 100644
--- a/gobblin-config-management/gobblin-config-core/src/test/java/org/apache/gobblin/config/store/zip/ZipFileConfigStoreTest.java
+++ b/gobblin-config-management/gobblin-config-core/src/test/java/org/apache/gobblin/config/store/zip/ZipFileConfigStoreTest.java
@@ -49,6 +49,18 @@ public class ZipFileConfigStoreTest {
   private ConfigKeyPath child1Path = testPath.createChild("child1");
   private ConfigKeyPath child2Path = testPath.createChild("child2");
 
+  /**
+   * Layout of testing config store:
+   * /_CONFIG_STORE
+   *    /testVersion
+   *        /test
+   *            /child1
+   *                main.conf (gobblin.test.property = "string2")
+   *                includes.conf (test/child1)
+   *            /child2
+   *                main.conf (gobblin.test.property = "string3")
+   *        main.conf
+   */
   @BeforeClass
   public void setUp() throws URISyntaxException, ConfigStoreCreationException, IOException {
     Path path = Paths.get(this.getClass().getClassLoader().getResource("zipStoreTest.zip").getPath());
diff --git a/gobblin-config-management/gobblin-config-core/src/test/resources/zipStoreTest.zip b/gobblin-config-management/gobblin-config-core/src/test/resources/zipStoreTest.zip
index ffe9603..6f8896d 100644
Binary files a/gobblin-config-management/gobblin-config-core/src/test/resources/zipStoreTest.zip and b/gobblin-config-management/gobblin-config-core/src/test/resources/zipStoreTest.zip differ
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/ConfigStoreUtils.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/ConfigStoreUtils.java
index a05438c..6346d64 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/ConfigStoreUtils.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/ConfigStoreUtils.java
@@ -16,9 +16,16 @@
  */
 package org.apache.gobblin.source.extractor.extract.kafka;
 
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.typesafe.config.Config;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang.StringUtils;
 import org.apache.gobblin.config.client.ConfigClient;
 import org.apache.gobblin.config.client.ConfigClientUtils;
 import org.apache.gobblin.config.client.api.ConfigStoreFactoryDoesNotExistsException;
@@ -30,23 +37,23 @@ import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.DatasetFilterUtils;
 import org.apache.gobblin.util.PathUtils;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Properties;
-import java.util.Set;
-import java.util.stream.Collectors;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.Path;
 
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
 
 @Slf4j
 public class ConfigStoreUtils {
+  /**
+   * Used as the grouping functionality in config-store to filter certain data nodes.
+   */
   public static final String GOBBLIN_CONFIG_TAGS_WHITELIST = "gobblin.config.tags.whitelist";
   public static final String GOBBLIN_CONFIG_TAGS_BLACKLIST = "gobblin.config.tags.blacklist";
+
   public static final String GOBBLIN_CONFIG_FILTER = "gobblin.config.filter";
   public static final String GOBBLIN_CONFIG_COMMONPATH = "gobblin.config.commonPath";
 
@@ -56,9 +63,9 @@ public class ConfigStoreUtils {
   public static Collection<URI> getTopicsURIFromConfigStore(ConfigClient configClient, Path tagUri, String filterString,
       Optional<Config> runtimeConfig) {
     try {
-      Collection<URI> importedBy = configClient.getImportedBy(new URI(tagUri.toString()), true, runtimeConfig);
+      Collection<URI> importedBy = configClient.getImportedBy(tagUri.toUri(), true, runtimeConfig);
       return importedBy.stream().filter((URI u) -> u.toString().contains(filterString)).collect(Collectors.toList());
-    } catch (URISyntaxException | ConfigStoreFactoryDoesNotExistsException | ConfigStoreCreationException e) {
+    } catch (ConfigStoreFactoryDoesNotExistsException | ConfigStoreCreationException e) {
       throw new Error(e);
     }
   }
@@ -81,9 +88,8 @@ public class ConfigStoreUtils {
 
   public static URI getUriStringForTopic(String topicName, String commonPath, String configStoreUri)
       throws URISyntaxException {
-    URI storeUri = new URI(configStoreUri);
-    Path path = PathUtils.mergePaths(new Path(storeUri.getPath()), PathUtils.mergePaths(new Path(commonPath), new Path(topicName)));
-    URI topicUri = new URI(storeUri.getScheme(), storeUri.getAuthority(), path.toString(), storeUri.getQuery(), storeUri.getFragment());
+    Path fullTopicPathInConfigStore = PathUtils.mergePaths(new Path(commonPath), new Path(topicName));
+    URI topicUri = getUriFromPath(fullTopicPathInConfigStore, configStoreUri);
     log.info("URI for topic is : " + topicUri.toString());
     return topicUri;
   }
@@ -124,10 +130,8 @@ public class ConfigStoreUtils {
   /**
    * Get topics from config store.
    * Topics will either be whitelisted or blacklisted using tag.
-   * After filtering out topics via tag, their config property is checked.
-   * For each shortlisted topic, config must contain either property topic.blacklist or topic.whitelist
    *
-   * If tags are not provided, it will return all topics
+   * If tags are not provided, it will return all topics.
    */
   public static List<KafkaTopic> getTopicsFromConfigStore(Properties properties, String configStoreUri,
       GobblinKafkaConsumerClient kafkaConsumerClient) {
@@ -141,33 +145,14 @@ public class ConfigStoreUtils {
     Optional<Config> runtimeConfig = ConfigClientUtils.getOptionalRuntimeConfig(properties);
 
     if (properties.containsKey(GOBBLIN_CONFIG_TAGS_WHITELIST)) {
-      Preconditions.checkArgument(properties.containsKey(GOBBLIN_CONFIG_FILTER),
-          "Missing required property " + GOBBLIN_CONFIG_FILTER);
-      String filterString = properties.getProperty(GOBBLIN_CONFIG_FILTER);
-      Path whiteListTagUri = PathUtils.mergePaths(new Path(configStoreUri),
-          new Path(properties.getProperty(GOBBLIN_CONFIG_TAGS_WHITELIST)));
-      List<String> whitelistedTopics = new ArrayList<>();
-      ConfigStoreUtils.getTopicsURIFromConfigStore(configClient, whiteListTagUri, filterString, runtimeConfig)
-          .stream()
-          .filter((URI u) -> ConfigUtils.getBoolean(ConfigStoreUtils.getConfig(configClient, u, runtimeConfig),
-              KafkaSource.TOPIC_WHITELIST, false))
-          .forEach(((URI u) -> whitelistedTopics.add(ConfigStoreUtils.getTopicNameFromURI(u))));
-
+      List<String> whitelistedTopics = getListOfTopicNamesByFilteringTag(properties, configClient, runtimeConfig,
+          configStoreUri, GOBBLIN_CONFIG_TAGS_WHITELIST);
       return allTopics.stream()
           .filter((KafkaTopic p) -> whitelistedTopics.contains(p.getName()))
           .collect(Collectors.toList());
     } else if (properties.containsKey(GOBBLIN_CONFIG_TAGS_BLACKLIST)) {
-      Preconditions.checkArgument(properties.containsKey(GOBBLIN_CONFIG_FILTER),
-          "Missing required property " + GOBBLIN_CONFIG_FILTER);
-      String filterString = properties.getProperty(GOBBLIN_CONFIG_FILTER);
-      Path blackListTagUri = PathUtils.mergePaths(new Path(configStoreUri),
-          new Path(properties.getProperty(GOBBLIN_CONFIG_TAGS_BLACKLIST)));
-      List<String> blacklistedTopics = new ArrayList<>();
-      ConfigStoreUtils.getTopicsURIFromConfigStore(configClient, blackListTagUri, filterString, runtimeConfig)
-          .stream()
-          .filter((URI u) -> ConfigUtils.getBoolean(ConfigStoreUtils.getConfig(configClient, u, runtimeConfig),
-              KafkaSource.TOPIC_BLACKLIST, false))
-          .forEach(((URI u) -> blacklistedTopics.add(ConfigStoreUtils.getTopicNameFromURI(u))));
+      List<String> blacklistedTopics = getListOfTopicNamesByFilteringTag(properties, configClient, runtimeConfig,
+          configStoreUri, GOBBLIN_CONFIG_TAGS_BLACKLIST);
       return allTopics.stream()
           .filter((KafkaTopic p) -> !blacklistedTopics.contains(p.getName()))
           .collect(Collectors.toList());
@@ -178,11 +163,58 @@ public class ConfigStoreUtils {
   }
 
   /**
+   * Uses the tag feature provided by Config-Store for grouping to get a list of topic names (case-sensitive;
+   * they need to match what would be returned from the kafka broker) tagged by the tag value specified
+   * in the job configuration.
+   */
+  public static List<String> getListOfTopicNamesByFilteringTag(Properties properties, ConfigClient configClient,
+      Optional<Config> runtimeConfig, String configStoreUri, String tagConfName) {
+    Preconditions.checkArgument(properties.containsKey(GOBBLIN_CONFIG_FILTER),
+        "Missing required property " + GOBBLIN_CONFIG_FILTER);
+    String filterString = properties.getProperty(GOBBLIN_CONFIG_FILTER);
+
+    Path tagUri = new Path("/");
+    try {
+      tagUri = new Path(getUriFromPath(new Path(properties.getProperty(tagConfName)), configStoreUri));
+    } catch (URISyntaxException ue) {
+      log.error("Cannot construct a Tag URI due to the exception:", ue);
+    }
+
+    List<String> taggedTopics = new ArrayList<>();
+    ConfigStoreUtils.getTopicsURIFromConfigStore(configClient, tagUri, filterString, runtimeConfig)
+        .forEach(((URI u) -> taggedTopics.add(ConfigStoreUtils.getTopicNameFromURI(u))));
+
+    return taggedTopics;
+  }
+
+  /**
+   * Construct the URI for a Config-Store node given a path.
+   * The implementation may depend on the scheme, while the signature of this method stays the same across
+   * implementations.
+   *
+   * Implementations differ since an Fs-based config-store simply appends the data node's path at the end,
+   * while an ivy-based config-store requires the query to store that information.
+   *
+   * @param path The relative path of a node inside Config-Store.
+   * @param configStoreUri The config store URI.
+   * @return The URI to inspect a data node represented by path inside Config Store.
+   * @throws URISyntaxException
+   */
+  private static URI getUriFromPath(Path path, String configStoreUri) throws URISyntaxException {
+    URI storeUri = new URI(configStoreUri);
+    return new URI(storeUri.getScheme(), storeUri.getAuthority(),
+        PathUtils.mergePaths(new Path(storeUri.getPath()), path).toString(), storeUri.getQuery(), storeUri.getFragment());
+
+  }
+
+  /**
    * Shortlist topics from config store based on whitelist/blacklist tags and
    * add it to {@param whitelist}/{@param blacklist}
    *
    * If tags are not provided, blacklist and whitelist won't be modified
+   * @deprecated Since this method contains an implementation-specific way to construct the tag URI inside Config-Store.
    */
+  @Deprecated
   public static void setTopicsFromConfigStore(Properties properties, Set<String> blacklist, Set<String> whitelist,
       final String _blacklistTopicKey, final String _whitelistTopicKey) {
     Optional<String> configStoreUri = getConfigStoreUri(properties);
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
index cb0e2b5..f8c6f4a 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
@@ -698,22 +698,12 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
   }
 
   /**
-   * If config store is enabled, then intersection of topics from blacklisting/whitelisting will be taken against
-   * the topics from config-store
+   * Return topics to be processed filtered by job-level whitelist and blacklist.
    */
   private List<KafkaTopic> getFilteredTopics(SourceState state) {
     List<Pattern> blacklist = DatasetFilterUtils.getPatternList(state, TOPIC_BLACKLIST);
     List<Pattern> whitelist = DatasetFilterUtils.getPatternList(state, TOPIC_WHITELIST);
-    List<KafkaTopic> topics = this.kafkaConsumerClient.get().getFilteredTopics(blacklist, whitelist);
-    Optional<String> configStoreUri = ConfigStoreUtils.getConfigStoreUri(state.getProperties());
-    if (configStoreUri.isPresent()) {
-      List<KafkaTopic> topicsFromConfigStore = ConfigStoreUtils
-          .getTopicsFromConfigStore(state.getProperties(), configStoreUri.get(), this.kafkaConsumerClient.get());
-
-      return topics.stream().filter((KafkaTopic p) -> (topicsFromConfigStore.stream()
-          .anyMatch((KafkaTopic q) -> q.getName().equalsIgnoreCase(p.getName())))).collect(toList());
-    }
-    return topics;
+    return this.kafkaConsumerClient.get().getFilteredTopics(blacklist, whitelist);
   }
 
   @Override
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/ConfigStoreUtilsTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/ConfigStoreUtilsTest.java
new file mode 100644
index 0000000..d296937
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/ConfigStoreUtilsTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.source.extractor.extract.kafka;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import org.apache.gobblin.config.client.ConfigClient;
+import org.apache.gobblin.config.client.api.VersionStabilityPolicy;
+import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient;
+import org.apache.hadoop.fs.Path;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+
+import static org.apache.gobblin.configuration.ConfigurationKeys.CONFIG_MANAGEMENT_STORE_ENABLED;
+import static org.apache.gobblin.configuration.ConfigurationKeys.CONFIG_MANAGEMENT_STORE_URI;
+import static org.apache.gobblin.source.extractor.extract.kafka.ConfigStoreUtils.GOBBLIN_CONFIG_COMMONPATH;
+import static org.apache.gobblin.source.extractor.extract.kafka.ConfigStoreUtils.GOBBLIN_CONFIG_FILTER;
+import static org.apache.gobblin.source.extractor.extract.kafka.ConfigStoreUtils.GOBBLIN_CONFIG_TAGS_BLACKLIST;
+import static org.apache.gobblin.source.extractor.extract.kafka.ConfigStoreUtils.GOBBLIN_CONFIG_TAGS_WHITELIST;
+import static org.mockito.Matchers.anyList;
+
+
+/**
+ * Regression tests guarding against behavior changes in {@link ConfigStoreUtils} after refactoring.
+ */
+public class ConfigStoreUtilsTest {
+
+  // Declared as a String for convenience in testing.
+  private String configStoreUri;
+
+  private GobblinKafkaConsumerClient mockClient;
+
+  private ConfigClient configClient = ConfigClient.createConfigClient(VersionStabilityPolicy.WEAK_LOCAL_STABILITY);
+
+  /**
+   * Builds a local-FS store URI from the parent of the "_CONFIG_STORE" resource and mocks the Kafka consumer client.
+   * @throws Exception if the store URI cannot be constructed
+   */
+  @BeforeClass
+  public void setup()
+      throws Exception {
+    URL url = this.getClass().getClassLoader().getResource("_CONFIG_STORE");
+    configStoreUri = getStoreURI(new Path(url.getPath()).getParent().toString()).toString();
+    mockClient = Mockito.mock(GobblinKafkaConsumerClient.class);
+  }
+
+  @Test
+  public void testGetUriStringForTopic() throws Exception {
+    String commonPath = "/data/tracking";  // store-relative directory holding the per-topic nodes
+    URI topic1URI = ConfigStoreUtils.getUriStringForTopic("Topic1", commonPath, configStoreUri);
+    URI expectedTopic1URI = new URI("simple-file", "", new URI(configStoreUri).getPath() + "/data/tracking/Topic1", null, null);
+    Assert.assertEquals(topic1URI, expectedTopic1URI);  // expected: store path + common path + topic name
+
+    URI topic2URI = ConfigStoreUtils.getUriStringForTopic("Topic2", commonPath, configStoreUri);
+    URI expectedTopic2URI = new URI("simple-file", "", new URI(configStoreUri).getPath() + "/data/tracking/Topic2", null, null);
+    Assert.assertEquals(topic2URI, expectedTopic2URI);
+  }
+
+  @Test
+  public void testGetConfigForTopic() throws Exception {
+    Properties properties = new Properties();
+    String commonPath = "/data/tracking";
+    properties.setProperty(GOBBLIN_CONFIG_COMMONPATH, commonPath);
+    properties.setProperty(CONFIG_MANAGEMENT_STORE_URI, configStoreUri);
+    properties.setProperty(CONFIG_MANAGEMENT_STORE_ENABLED, "true");
+    properties.setProperty("topic.name", "Topic1");  // the topic is read back below through the "topic.name" key
+
+    Config topic1Config = ConfigStoreUtils.getConfigForTopic(properties, "topic.name", configClient).get();
+    Assert.assertEquals(topic1Config.getString("aaaaa"), "bbbb");  // "aaaaa=bbbb" is defined in Topic1/main.conf
+  }
+
+
+
+  @Test
+  public void testGetTopicsFromConfigStore()
+      throws Exception {
+    KafkaTopic topic1 = new KafkaTopic("Topic1", Lists.newArrayList());
+    KafkaTopic topic2 = new KafkaTopic("Topic2", Lists.newArrayList());
+    KafkaTopic topic3 = new KafkaTopic("Topic3", Lists.newArrayList());
+
+    Mockito.when(mockClient.getFilteredTopics(anyList(), anyList()))
+        .thenReturn(ImmutableList.of(topic1, topic2, topic3));  // the mock reports all three topics for any lists
+    Properties properties = new Properties();
+
+    // With no filter properties set, every topic is returned: Topic1, Topic2 and Topic3.
+    List<KafkaTopic> result = ConfigStoreUtils.getTopicsFromConfigStore(properties, configStoreUri, mockClient);
+    Assert.assertEquals(result.size(), 3);
+
+    properties.setProperty(GOBBLIN_CONFIG_TAGS_WHITELIST, "/tags/whitelist");
+    properties.setProperty(GOBBLIN_CONFIG_FILTER, "/data/tracking");
+    properties.setProperty(GOBBLIN_CONFIG_COMMONPATH, "/data/tracking");
+
+    // Topic1 and Topic2 include /tags/whitelist in the test store, so only those two should be returned.
+    result = ConfigStoreUtils.getTopicsFromConfigStore(properties, configStoreUri, mockClient);
+    Assert.assertEquals(result.size(), 2);
+    List<String> resultInString = result.stream().map(KafkaTopic::getName).collect(Collectors.toList());
+    Assert.assertTrue(resultInString.contains("Topic1"));
+    Assert.assertTrue(resultInString.contains("Topic2"));
+
+    // Topic1 and Topic2 also include /tags/blacklist, so with the blacklist tag only Topic3 should remain.
+    properties.remove(GOBBLIN_CONFIG_TAGS_WHITELIST);
+    properties.setProperty(GOBBLIN_CONFIG_TAGS_BLACKLIST, "/tags/blacklist");
+    result = ConfigStoreUtils.getTopicsFromConfigStore(properties, configStoreUri, mockClient);
+    Assert.assertEquals(result.size(), 1);
+    Assert.assertEquals(result.get(0).getName(), "Topic3");
+  }
+
+  @Test
+  public void testGetListOfTopicNamesByFilteringTag()
+      throws Exception {
+    Properties properties = new Properties();
+    properties.setProperty(GOBBLIN_CONFIG_TAGS_WHITELIST, "/tags/whitelist");
+    properties.setProperty(GOBBLIN_CONFIG_FILTER, "/data/tracking");
+    properties.setProperty(GOBBLIN_CONFIG_COMMONPATH, "/data/tracking");
+
+    List<String> result = ConfigStoreUtils
+        .getListOfTopicNamesByFilteringTag(properties, configClient, Optional.absent(), configStoreUri,
+            GOBBLIN_CONFIG_TAGS_WHITELIST);
+    Assert.assertEquals(result.size(), 2);  // Topic1 and Topic2 both include /tags/whitelist in the test store
+    Assert.assertTrue(result.contains("Topic1"));
+    Assert.assertTrue(result.contains("Topic2"));
+  }
+
+  /**
+   * Returns a "simple-file" config-store URI for the given local directory.
+   * Note that for local FS, fs.getUri returns a URI without an authority, so no authority should be added when
+   * constructing the URI of a local-file-backed config store.
+   */
+  private URI getStoreURI(String configDir)
+      throws URISyntaxException {
+    return new URI("simple-file", "", configDir, null, null);
+  }
+}
\ No newline at end of file
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/ZipConfigStoreUtilsTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/ZipConfigStoreUtilsTest.java
new file mode 100644
index 0000000..ebdd27b
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/ZipConfigStoreUtilsTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.source.extractor.extract.kafka;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import org.apache.gobblin.config.client.ConfigClient;
+import org.apache.gobblin.config.client.api.VersionStabilityPolicy;
+import org.apache.gobblin.config.store.api.ConfigStoreCreationException;
+import org.apache.gobblin.config.store.zip.SimpleLocalIvyConfigStoreFactory;
+import org.apache.gobblin.config.store.zip.ZipFileConfigStore;
+import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+
+import static org.apache.gobblin.configuration.ConfigurationKeys.CONFIG_MANAGEMENT_STORE_ENABLED;
+import static org.apache.gobblin.configuration.ConfigurationKeys.CONFIG_MANAGEMENT_STORE_URI;
+import static org.apache.gobblin.source.extractor.extract.kafka.ConfigStoreUtils.GOBBLIN_CONFIG_COMMONPATH;
+import static org.apache.gobblin.source.extractor.extract.kafka.ConfigStoreUtils.GOBBLIN_CONFIG_FILTER;
+import static org.apache.gobblin.source.extractor.extract.kafka.ConfigStoreUtils.GOBBLIN_CONFIG_TAGS_BLACKLIST;
+import static org.apache.gobblin.source.extractor.extract.kafka.ConfigStoreUtils.GOBBLIN_CONFIG_TAGS_WHITELIST;
+import static org.mockito.Matchers.anyList;
+
+
+/**
+ * Runs the same test routine against the Ivy-based config store (ZipConfigStore).
+ * Makes sure everything inside {@link ConfigStoreUtils} works with the {@link ZipFileConfigStore} implementation.
+ */
+public class ZipConfigStoreUtilsTest {
+  private String configStoreUri;
+  private ConfigClient configClient = ConfigClient.createConfigClient(VersionStabilityPolicy.WEAK_LOCAL_STABILITY);
+  private GobblinKafkaConsumerClient mockClient;
+
+  @BeforeClass
+  public void setUp()
+      throws URISyntaxException, ConfigStoreCreationException, IOException {
+    Path path =  // location of the zipped config store found on the test classpath
+        Paths.get(ZipConfigStoreUtilsTest.class.getClassLoader().getResource("IvyConfigStoreTest.zip").getPath());
+    URI zipInClassPathURI = new URI(
+        "ivy-file:/?org=org&module=module&storePath=" + path  // the zip location is passed as the storePath parameter
+            + "&storePrefix=_CONFIG_STORE");
+
+    ZipFileConfigStore store = new SimpleLocalIvyConfigStoreFactory().createConfigStore(zipInClassPathURI);
+    configStoreUri = store.getStoreURI().toString();  // the tests address the store through the URI it reports
+    mockClient = Mockito.mock(GobblinKafkaConsumerClient.class);
+  }
+
+  @Test
+  public void testGetListOfTopicNamesByFilteringTag()
+      throws Exception {
+    Properties properties = new Properties();
+    properties.setProperty(GOBBLIN_CONFIG_TAGS_WHITELIST, "/tags/whitelist");
+    properties.setProperty(GOBBLIN_CONFIG_FILTER, "/data/tracking");
+    properties.setProperty(GOBBLIN_CONFIG_COMMONPATH, "/data/tracking");
+
+    List<String> result = ConfigStoreUtils
+        .getListOfTopicNamesByFilteringTag(properties, configClient, Optional.absent(), configStoreUri,
+            GOBBLIN_CONFIG_TAGS_WHITELIST);
+    Assert.assertEquals(result.size(), 2);  // assumes the zip tags Topic1 and Topic2 with /tags/whitelist - TODO confirm
+    Assert.assertTrue(result.contains("Topic1"));
+    Assert.assertTrue(result.contains("Topic2"));
+  }
+
+  @Test
+  public void testGetTopicsFromConfigStore()
+      throws Exception {
+    KafkaTopic topic1 = new KafkaTopic("Topic1", Lists.newArrayList());
+    KafkaTopic topic2 = new KafkaTopic("Topic2", Lists.newArrayList());
+    KafkaTopic topic3 = new KafkaTopic("Topic3", Lists.newArrayList());
+
+    Mockito.when(mockClient.getFilteredTopics(anyList(), anyList()))
+        .thenReturn(ImmutableList.of(topic1, topic2, topic3));  // the mock reports all three topics for any lists
+    Properties properties = new Properties();
+
+    // With no filter properties set, every topic is returned: Topic1, Topic2 and Topic3.
+    List<KafkaTopic> result = ConfigStoreUtils.getTopicsFromConfigStore(properties, configStoreUri, mockClient);
+    Assert.assertEquals(result.size(), 3);
+
+    properties.setProperty(GOBBLIN_CONFIG_TAGS_WHITELIST, "/tags/whitelist");
+    properties.setProperty(GOBBLIN_CONFIG_FILTER, "/data/tracking");
+    properties.setProperty(GOBBLIN_CONFIG_COMMONPATH, "/data/tracking");
+
+    // With the whitelist tag set, only whitelisted topics should be returned (expected: Topic1 and Topic2).
+    result = ConfigStoreUtils.getTopicsFromConfigStore(properties, configStoreUri, mockClient);
+    Assert.assertEquals(result.size(), 2);
+    List<String> resultInString = result.stream().map(KafkaTopic::getName).collect(Collectors.toList());
+    Assert.assertTrue(resultInString.contains("Topic1"));
+    Assert.assertTrue(resultInString.contains("Topic2"));
+
+    // With the blacklist tag set instead, blacklisted topics are dropped (expected: only Topic3 remains).
+    properties.remove(GOBBLIN_CONFIG_TAGS_WHITELIST);
+    properties.setProperty(GOBBLIN_CONFIG_TAGS_BLACKLIST, "/tags/blacklist");
+    result = ConfigStoreUtils.getTopicsFromConfigStore(properties, configStoreUri, mockClient);
+    Assert.assertEquals(result.size(), 1);
+    Assert.assertEquals(result.get(0).getName(), "Topic3");
+  }
+
+  @Test
+  public void testGetConfigForTopic() throws Exception {
+    Properties properties = new Properties();
+    String commonPath = "/data/tracking";
+    properties.setProperty(GOBBLIN_CONFIG_COMMONPATH, commonPath);
+    properties.setProperty(CONFIG_MANAGEMENT_STORE_URI, configStoreUri);
+    properties.setProperty(CONFIG_MANAGEMENT_STORE_ENABLED, "true");
+    properties.setProperty("topic.name", "Topic1");  // the topic is read back below through the "topic.name" key
+
+    Config topic1Config = ConfigStoreUtils.getConfigForTopic(properties, "topic.name", configClient).get();
+    Assert.assertEquals(topic1Config.getString("aaaaa"), "bbbb");  // assumes the zip's Topic1 defines aaaaa=bbbb - TODO confirm
+  }
+}
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/resources/IvyConfigStoreTest.zip b/gobblin-modules/gobblin-kafka-common/src/test/resources/IvyConfigStoreTest.zip
new file mode 100644
index 0000000..41fcac2
Binary files /dev/null and b/gobblin-modules/gobblin-kafka-common/src/test/resources/IvyConfigStoreTest.zip differ
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/resources/_CONFIG_STORE/store-metadata.conf b/gobblin-modules/gobblin-kafka-common/src/test/resources/_CONFIG_STORE/store-metadata.conf
new file mode 100644
index 0000000..1ba8c7d
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/test/resources/_CONFIG_STORE/store-metadata.conf
@@ -0,0 +1 @@
+{"config":{"hdfs":{"store":{"version":{"current":"v1.0"}}}}}
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/resources/_CONFIG_STORE/v1.0/data/tracking/Topic1/includes.conf b/gobblin-modules/gobblin-kafka-common/src/test/resources/_CONFIG_STORE/v1.0/data/tracking/Topic1/includes.conf
new file mode 100644
index 0000000..f91bbd1
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/test/resources/_CONFIG_STORE/v1.0/data/tracking/Topic1/includes.conf
@@ -0,0 +1,5 @@
+# Tag this dataset with "whitelist"
+/tags/whitelist
+
+# Tag this dataset with "blacklist"
+/tags/blacklist
\ No newline at end of file
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/resources/_CONFIG_STORE/v1.0/data/tracking/Topic1/main.conf b/gobblin-modules/gobblin-kafka-common/src/test/resources/_CONFIG_STORE/v1.0/data/tracking/Topic1/main.conf
new file mode 100644
index 0000000..985e7e2
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/test/resources/_CONFIG_STORE/v1.0/data/tracking/Topic1/main.conf
@@ -0,0 +1 @@
+aaaaa=bbbb
\ No newline at end of file
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/resources/_CONFIG_STORE/v1.0/data/tracking/Topic2/includes.conf b/gobblin-modules/gobblin-kafka-common/src/test/resources/_CONFIG_STORE/v1.0/data/tracking/Topic2/includes.conf
new file mode 100644
index 0000000..9fad384
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/test/resources/_CONFIG_STORE/v1.0/data/tracking/Topic2/includes.conf
@@ -0,0 +1,2 @@
+/tags/whitelist
+/tags/blacklist
\ No newline at end of file
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/resources/_CONFIG_STORE/v1.0/tags/blacklist/main.conf b/gobblin-modules/gobblin-kafka-common/src/test/resources/_CONFIG_STORE/v1.0/tags/blacklist/main.conf
new file mode 100644
index 0000000..c946b57
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/test/resources/_CONFIG_STORE/v1.0/tags/blacklist/main.conf
@@ -0,0 +1 @@
+topic.whitelist=true
\ No newline at end of file
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/resources/_CONFIG_STORE/v1.0/tags/whitelist/main.conf b/gobblin-modules/gobblin-kafka-common/src/test/resources/_CONFIG_STORE/v1.0/tags/whitelist/main.conf
new file mode 100644
index 0000000..84adf18
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/test/resources/_CONFIG_STORE/v1.0/tags/whitelist/main.conf
@@ -0,0 +1 @@
+topic.blacklist=true
\ No newline at end of file