You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2019/08/05 16:34:39 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-838] Fix
Ivy-based ConfigStoreUtils and add Unit Test
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 8903ebf [GOBBLIN-838] Fix Ivy-based ConfigStoreUtils and add Unit Test
8903ebf is described below
commit 8903ebf3807af3369839069e2082afa70c7fe77e
Author: autumnust <le...@linkedin.com>
AuthorDate: Mon Aug 5 09:34:31 2019 -0700
[GOBBLIN-838] Fix Ivy-based ConfigStoreUtils and add Unit Test
Closes #2697 from
autumnust/removeConfigStoreInKafkaSource
---
.../gobblin/config/client/ConfigClientUtils.java | 20 ++-
.../config/store/zip/IvyConfigStoreFactory.java | 40 ++----
.../zip/SimpleLocalIvyConfigStoreFactory.java | 120 ++++++++++++++++
.../config/store/zip/ZipFileConfigStore.java | 11 +-
...che.gobblin.config.store.api.ConfigStoreFactory | 1 +
.../config/store/zip/ZipFileConfigStoreTest.java | 12 ++
.../src/test/resources/zipStoreTest.zip | Bin 1821 -> 2990 bytes
.../extractor/extract/kafka/ConfigStoreUtils.java | 120 ++++++++++------
.../extractor/extract/kafka/KafkaSource.java | 14 +-
.../extract/kafka/ConfigStoreUtilsTest.java | 160 +++++++++++++++++++++
.../extract/kafka/ZipConfigStoreUtilsTest.java | 139 ++++++++++++++++++
.../src/test/resources/IvyConfigStoreTest.zip | Bin 0 -> 8389 bytes
.../resources/_CONFIG_STORE/store-metadata.conf | 1 +
.../v1.0/data/tracking/Topic1/includes.conf | 5 +
.../v1.0/data/tracking/Topic1/main.conf | 1 +
.../v1.0/data/tracking/Topic2/includes.conf | 2 +
.../_CONFIG_STORE/v1.0/tags/blacklist/main.conf | 1 +
.../_CONFIG_STORE/v1.0/tags/whitelist/main.conf | 1 +
18 files changed, 548 insertions(+), 100 deletions(-)
diff --git a/gobblin-config-management/gobblin-config-client/src/main/java/org/apache/gobblin/config/client/ConfigClientUtils.java b/gobblin-config-management/gobblin-config-client/src/main/java/org/apache/gobblin/config/client/ConfigClientUtils.java
index 4eb3231..e5d8e1f 100644
--- a/gobblin-config-management/gobblin-config-client/src/main/java/org/apache/gobblin/config/client/ConfigClientUtils.java
+++ b/gobblin-config-management/gobblin-config-client/src/main/java/org/apache/gobblin/config/client/ConfigClientUtils.java
@@ -25,17 +25,16 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
-import org.apache.hadoop.fs.Path;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.typesafe.config.Config;
-
import org.apache.gobblin.config.common.impl.SingleLinkedListConfigKeyPath;
import org.apache.gobblin.config.store.api.ConfigKeyPath;
import org.apache.gobblin.config.store.api.ConfigStore;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.util.ConfigUtils;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
/**
@@ -69,17 +68,16 @@ public class ConfigClientUtils {
* @return : return the URI of the same format with the input configKeyURI
*
* for example, configKeyPath is /tags/retention,
- * with returnURIWithAuthority as true, return "etl-hdfs:///tags/retention
- * with returnURIWithAuthority as false , then return
+ * with returnURIWithAuthority as false, return "etl-hdfs:///tags/retention
+ * with returnURIWithAuthority as true , then return
* etl-hdfs://eat1-nertznn01.grid.linkedin.com:9000/user/mitu/HdfsBasedConfigTest/tags/retention
*/
- public static URI buildUriInClientFormat(ConfigKeyPath configKeyPath, ConfigStore cs,
- boolean returnURIWithAuthority) {
-
+ public static URI buildUriInClientFormat(ConfigKeyPath configKeyPath, ConfigStore cs, boolean returnURIWithAuthority) {
try {
if (!returnURIWithAuthority) {
return new URI(cs.getStoreURI().getScheme(), null, configKeyPath.getAbsolutePathString(), null, null);
}
+
URI storeRoot = cs.getStoreURI();
// if configKeyPath is root, the configKeyPath.getAbsolutePathString().substring(1) will return "" and
// will cause the Path creation failure if not handled here
diff --git a/gobblin-config-management/gobblin-config-core/src/main/java/org/apache/gobblin/config/store/zip/IvyConfigStoreFactory.java b/gobblin-config-management/gobblin-config-core/src/main/java/org/apache/gobblin/config/store/zip/IvyConfigStoreFactory.java
index 4b7286a..2c99c59 100644
--- a/gobblin-config-management/gobblin-config-core/src/main/java/org/apache/gobblin/config/store/zip/IvyConfigStoreFactory.java
+++ b/gobblin-config-management/gobblin-config-core/src/main/java/org/apache/gobblin/config/store/zip/IvyConfigStoreFactory.java
@@ -25,19 +25,16 @@ import java.nio.file.FileSystems;
import java.nio.file.Paths;
import java.util.Properties;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.http.NameValuePair;
-import org.apache.http.client.utils.URLEncodedUtils;
-
-import com.sun.nio.zipfs.ZipFileSystem;
-
import org.apache.gobblin.config.store.api.ConfigStoreCreationException;
import org.apache.gobblin.config.store.api.ConfigStoreFactory;
import org.apache.gobblin.config.store.hdfs.SimpleHDFSConfigStoreFactory;
import org.apache.gobblin.config.store.hdfs.SimpleHDFSStoreMetadata;
import org.apache.gobblin.config.store.hdfs.SimpleHadoopFilesystemConfigStore;
import org.apache.gobblin.util.DownloadUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+import com.sun.nio.zipfs.ZipFileSystem;
/**
@@ -46,23 +43,20 @@ import org.apache.gobblin.util.DownloadUtils;
*
* An ivy settings file must be present on the classpath named {@link DownloadUtils#IVY_SETTINGS_FILE_NAME}
*/
-public class IvyConfigStoreFactory implements ConfigStoreFactory<ZipFileConfigStore> {
+public class IvyConfigStoreFactory extends SimpleLocalIvyConfigStoreFactory {
- private static final String IVY_SCHEME_PREFIX = "ivy-";
- private static final String ORG_KEY = "org";
- private static final String MODULE_KEY = "module";
- private static final String STORE_PATH_KEY = "storePath";
- private static final String STORE_PREFIX_KEY = "storePrefix";
+ /**
+ * Ivy coordinates required for downloading jar file.
+ */
+ protected static final String ORG_KEY = "org";
+ protected static final String MODULE_KEY = "module";
+ protected static final String STORE_PATH_KEY = "storePath";
@Override
public String getScheme() {
return getSchemePrefix() + SimpleHDFSConfigStoreFactory.HDFS_SCHEME_NAME;
}
- public String getSchemePrefix() {
- return IVY_SCHEME_PREFIX;
- }
-
/**
* Example configKey URI (configuration is passed as part of the query)
*
@@ -80,10 +74,7 @@ public class IvyConfigStoreFactory implements ConfigStoreFactory<ZipFileConfigSt
throw new ConfigStoreCreationException(configKey, "Config key URI must have scheme " + getScheme());
}
- Properties factoryProps = new Properties();
- for (NameValuePair param : URLEncodedUtils.parse(configKey, "UTF-8")) {
- factoryProps.setProperty(param.getName(), param.getValue());
- }
+ Properties factoryProps = parseUriIntoParameterSet(configKey);
String jarOrg = factoryProps.getProperty(ORG_KEY);
String jarModule = factoryProps.getProperty(MODULE_KEY);
@@ -116,12 +107,5 @@ public class IvyConfigStoreFactory implements ConfigStoreFactory<ZipFileConfigSt
throw new ConfigStoreCreationException(configKey, e);
}
}
-
- /**
- * Base URI for a config store should be root of the zip file, so change path part of URI to be null
- */
- private URI getBaseURI(URI configKey) throws URISyntaxException {
- return new URI(configKey.getScheme(), configKey.getAuthority(), null, configKey.getQuery(), configKey.getFragment());
- }
}
diff --git a/gobblin-config-management/gobblin-config-core/src/main/java/org/apache/gobblin/config/store/zip/SimpleLocalIvyConfigStoreFactory.java b/gobblin-config-management/gobblin-config-core/src/main/java/org/apache/gobblin/config/store/zip/SimpleLocalIvyConfigStoreFactory.java
new file mode 100644
index 0000000..a733dd2
--- /dev/null
+++ b/gobblin-config-management/gobblin-config-core/src/main/java/org/apache/gobblin/config/store/zip/SimpleLocalIvyConfigStoreFactory.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.config.store.zip;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.FileSystem;
+import java.nio.file.FileSystems;
+import java.util.Properties;
+
+import org.apache.gobblin.config.store.api.ConfigStoreCreationException;
+import org.apache.gobblin.config.store.api.ConfigStoreFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.http.NameValuePair;
+import org.apache.http.client.utils.URLEncodedUtils;
+
+import com.google.common.collect.ImmutableMap;
+import com.sun.nio.zipfs.ZipFileSystem;
+
+
+/**
+ * An implementation of {@link ConfigStoreFactory} that takes a locally existing zip file as the backend of Config-store
+ * and creates a {@link ZipFileConfigStore} with it.
+ *
+ * {@link ZipFileConfigStore} encapsulates the Config-store itself better than
+ * {@link org.apache.gobblin.config.store.hdfs.SimpleHadoopFilesystemConfigStore}; the latter could, for example,
+ * cause a small-file problem on HDFS as the size of the Config-Store grows.
+ */
+public class SimpleLocalIvyConfigStoreFactory implements ConfigStoreFactory<ZipFileConfigStore> {
+
+ private String currentVersion;
+
+ private static ThreadLocal<FileSystem> THREADLOCAL_FS = new ThreadLocal<>();
+
+ static final String STORE_PREFIX_KEY = "storePrefix";
+ static final String IVY_SCHEME_PREFIX = "ivy-";
+
+ /**
+ * If no version is specified, a default version is assigned, since the primary usage of this class
+ * is for testing.
+ */
+ public SimpleLocalIvyConfigStoreFactory() {
+ this.currentVersion = "v1.0";
+ }
+
+ public SimpleLocalIvyConfigStoreFactory(String configStoreVersion) {
+ this.currentVersion = configStoreVersion;
+ }
+
+ @Override
+ public String getScheme() {
+ return getSchemePrefix() + "file";
+ }
+
+ protected String getSchemePrefix() {
+ return IVY_SCHEME_PREFIX;
+ }
+
+ /**
+ *
+ * @param configKey whose path contains the physical path to the zip file.
+ * @return
+ * @throws ConfigStoreCreationException
+ */
+ @Override
+ public ZipFileConfigStore createConfigStore(URI configKey)
+ throws ConfigStoreCreationException {
+ Properties factoryProps = parseUriIntoParameterSet(configKey);
+
+ try {
+ // Construct URI as jar for zip file, as "jar" is the scheme for ZipFs.
+ URI uri = new URI("jar:file", null, new Path(factoryProps.getProperty("storePath")).toString(), null);
+
+ /** Using threadLocal to avoid {@link java.nio.file.FileSystemAlreadyExistsException} */
+ if (THREADLOCAL_FS.get() == null) {
+ FileSystem zipFs = FileSystems.newFileSystem(uri, ImmutableMap.of());
+ THREADLOCAL_FS.set(zipFs);
+ }
+
+ return new ZipFileConfigStore((ZipFileSystem) THREADLOCAL_FS.get(), getBaseURI(configKey), currentVersion,
+ factoryProps.getProperty(STORE_PREFIX_KEY, ""));
+ } catch (URISyntaxException | IOException e) {
+ throw new RuntimeException("Unable to load zip from classpath. ", e);
+ }
+ }
+
+ /**
+ * Parse the query part of the configKey URI into the set of parameters it carries (e.g. the ivy coordinates).
+ */
+ Properties parseUriIntoParameterSet(URI configKey) {
+ Properties factoryProps = new Properties();
+ for (NameValuePair param : URLEncodedUtils.parse(configKey, "UTF-8")) {
+ factoryProps.setProperty(param.getName(), param.getValue());
+ }
+ return factoryProps;
+ }
+
+ /**
+ * Base URI for a config store should be root of the zip file, so change path part of URI to be "/"
+ */
+ URI getBaseURI(URI configKey) throws URISyntaxException {
+ return new URI(configKey.getScheme(), configKey.getAuthority(), "/", configKey.getQuery(), configKey.getFragment());
+ }
+}
diff --git a/gobblin-config-management/gobblin-config-core/src/main/java/org/apache/gobblin/config/store/zip/ZipFileConfigStore.java b/gobblin-config-management/gobblin-config-core/src/main/java/org/apache/gobblin/config/store/zip/ZipFileConfigStore.java
index 5475b49..711a836 100644
--- a/gobblin-config-management/gobblin-config-core/src/main/java/org/apache/gobblin/config/store/zip/ZipFileConfigStore.java
+++ b/gobblin-config-management/gobblin-config-core/src/main/java/org/apache/gobblin/config/store/zip/ZipFileConfigStore.java
@@ -27,6 +27,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.ExecutionException;
import java.util.stream.Stream;
import org.apache.commons.lang.StringUtils;
@@ -102,7 +103,7 @@ public class ZipFileConfigStore implements ConfigStore {
Preconditions.checkArgument(version.equals(getCurrentVersion()));
List<ConfigKeyPath> children = new ArrayList<>();
- Path datasetDir = getDatasetDirForKey(configKey);
+ Path datasetDir = getDatasetDirForKey(configKey, version);
try {
@@ -142,7 +143,7 @@ public class ZipFileConfigStore implements ConfigStore {
Preconditions.checkArgument(version.equals(getCurrentVersion()));
List<ConfigKeyPath> configKeyPaths = new ArrayList<>();
- Path datasetDir = getDatasetDirForKey(configKey);
+ Path datasetDir = getDatasetDirForKey(configKey, version);
Path includesFile = this.fs.getPath(datasetDir.toString(), SimpleHadoopFilesystemConfigStore.INCLUDES_CONF_FILE_NAME);
try {
@@ -171,7 +172,7 @@ public class ZipFileConfigStore implements ConfigStore {
Preconditions.checkNotNull(configKey, "configKey cannot be null!");
Preconditions.checkArgument(version.equals(getCurrentVersion()));
- Path datasetDir = getDatasetDirForKey(configKey);
+ Path datasetDir = getDatasetDirForKey(configKey, version);
Path mainConfFile = this.fs.getPath(datasetDir.toString(), SimpleHadoopFilesystemConfigStore.MAIN_CONF_FILE_NAME);
try {
@@ -193,7 +194,7 @@ public class ZipFileConfigStore implements ConfigStore {
/**
* Get path object using zipped file system and relative path
*/
- private Path getDatasetDirForKey(ConfigKeyPath configKey) throws VersionDoesNotExistException {
- return this.fs.getPath(this.storePrefix, configKey.getAbsolutePathString());
+ private Path getDatasetDirForKey(ConfigKeyPath configKey, String version) throws VersionDoesNotExistException {
+ return this.fs.getPath(this.storePrefix, version, configKey.getAbsolutePathString());
}
}
diff --git a/gobblin-config-management/gobblin-config-core/src/main/resources/META-INF/services/org.apache.gobblin.config.store.api.ConfigStoreFactory b/gobblin-config-management/gobblin-config-core/src/main/resources/META-INF/services/org.apache.gobblin.config.store.api.ConfigStoreFactory
index 69865c1..21d3e81 100644
--- a/gobblin-config-management/gobblin-config-core/src/main/resources/META-INF/services/org.apache.gobblin.config.store.api.ConfigStoreFactory
+++ b/gobblin-config-management/gobblin-config-core/src/main/resources/META-INF/services/org.apache.gobblin.config.store.api.ConfigStoreFactory
@@ -19,3 +19,4 @@ org.apache.gobblin.config.store.hdfs.SimpleHDFSConfigStoreFactory
org.apache.gobblin.config.store.hdfs.SimpleLocalHDFSConfigStoreFactory
org.apache.gobblin.config.store.hdfs.DefaultCapableLocalConfigStoreFactory
org.apache.gobblin.config.store.zip.IvyConfigStoreFactory
+org.apache.gobblin.config.store.zip.SimpleLocalIvyConfigStoreFactory
diff --git a/gobblin-config-management/gobblin-config-core/src/test/java/org/apache/gobblin/config/store/zip/ZipFileConfigStoreTest.java b/gobblin-config-management/gobblin-config-core/src/test/java/org/apache/gobblin/config/store/zip/ZipFileConfigStoreTest.java
index 70ff132..12f69ca 100644
--- a/gobblin-config-management/gobblin-config-core/src/test/java/org/apache/gobblin/config/store/zip/ZipFileConfigStoreTest.java
+++ b/gobblin-config-management/gobblin-config-core/src/test/java/org/apache/gobblin/config/store/zip/ZipFileConfigStoreTest.java
@@ -49,6 +49,18 @@ public class ZipFileConfigStoreTest {
private ConfigKeyPath child1Path = testPath.createChild("child1");
private ConfigKeyPath child2Path = testPath.createChild("child2");
+ /**
+ * Layout of testing config store:
+ * /_CONFIG_STORE
+ * /testVersion
+ * /test
+ * /child1
+ * main.conf (gobblin.test.property = "string2")
+ * includes.conf (test/child1)
+ * /child2
+ * main.conf (gobblin.test.property = "string3")
+ * main.conf
+ */
@BeforeClass
public void setUp() throws URISyntaxException, ConfigStoreCreationException, IOException {
Path path = Paths.get(this.getClass().getClassLoader().getResource("zipStoreTest.zip").getPath());
diff --git a/gobblin-config-management/gobblin-config-core/src/test/resources/zipStoreTest.zip b/gobblin-config-management/gobblin-config-core/src/test/resources/zipStoreTest.zip
index ffe9603..6f8896d 100644
Binary files a/gobblin-config-management/gobblin-config-core/src/test/resources/zipStoreTest.zip and b/gobblin-config-management/gobblin-config-core/src/test/resources/zipStoreTest.zip differ
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/ConfigStoreUtils.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/ConfigStoreUtils.java
index a05438c..6346d64 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/ConfigStoreUtils.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/ConfigStoreUtils.java
@@ -16,9 +16,16 @@
*/
package org.apache.gobblin.source.extractor.extract.kafka;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.typesafe.config.Config;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang.StringUtils;
import org.apache.gobblin.config.client.ConfigClient;
import org.apache.gobblin.config.client.ConfigClientUtils;
import org.apache.gobblin.config.client.api.ConfigStoreFactoryDoesNotExistsException;
@@ -30,23 +37,23 @@ import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.DatasetFilterUtils;
import org.apache.gobblin.util.PathUtils;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Properties;
-import java.util.Set;
-import java.util.stream.Collectors;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.Path;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
@Slf4j
public class ConfigStoreUtils {
+ /**
+ * Used as the grouping functionality in config-store to filter certain data nodes.
+ */
public static final String GOBBLIN_CONFIG_TAGS_WHITELIST = "gobblin.config.tags.whitelist";
public static final String GOBBLIN_CONFIG_TAGS_BLACKLIST = "gobblin.config.tags.blacklist";
+
public static final String GOBBLIN_CONFIG_FILTER = "gobblin.config.filter";
public static final String GOBBLIN_CONFIG_COMMONPATH = "gobblin.config.commonPath";
@@ -56,9 +63,9 @@ public class ConfigStoreUtils {
public static Collection<URI> getTopicsURIFromConfigStore(ConfigClient configClient, Path tagUri, String filterString,
Optional<Config> runtimeConfig) {
try {
- Collection<URI> importedBy = configClient.getImportedBy(new URI(tagUri.toString()), true, runtimeConfig);
+ Collection<URI> importedBy = configClient.getImportedBy(tagUri.toUri(), true, runtimeConfig);
return importedBy.stream().filter((URI u) -> u.toString().contains(filterString)).collect(Collectors.toList());
- } catch (URISyntaxException | ConfigStoreFactoryDoesNotExistsException | ConfigStoreCreationException e) {
+ } catch (ConfigStoreFactoryDoesNotExistsException | ConfigStoreCreationException e) {
throw new Error(e);
}
}
@@ -81,9 +88,8 @@ public class ConfigStoreUtils {
public static URI getUriStringForTopic(String topicName, String commonPath, String configStoreUri)
throws URISyntaxException {
- URI storeUri = new URI(configStoreUri);
- Path path = PathUtils.mergePaths(new Path(storeUri.getPath()), PathUtils.mergePaths(new Path(commonPath), new Path(topicName)));
- URI topicUri = new URI(storeUri.getScheme(), storeUri.getAuthority(), path.toString(), storeUri.getQuery(), storeUri.getFragment());
+ Path fullTopicPathInConfigStore = PathUtils.mergePaths(new Path(commonPath), new Path(topicName));
+ URI topicUri = getUriFromPath(fullTopicPathInConfigStore, configStoreUri);
log.info("URI for topic is : " + topicUri.toString());
return topicUri;
}
@@ -124,10 +130,8 @@ public class ConfigStoreUtils {
/**
* Get topics from config store.
* Topics will either be whitelisted or blacklisted using tag.
- * After filtering out topics via tag, their config property is checked.
- * For each shortlisted topic, config must contain either property topic.blacklist or topic.whitelist
*
- * If tags are not provided, it will return all topics
+ * If tags are not provided, it will return all topics.
*/
public static List<KafkaTopic> getTopicsFromConfigStore(Properties properties, String configStoreUri,
GobblinKafkaConsumerClient kafkaConsumerClient) {
@@ -141,33 +145,14 @@ public class ConfigStoreUtils {
Optional<Config> runtimeConfig = ConfigClientUtils.getOptionalRuntimeConfig(properties);
if (properties.containsKey(GOBBLIN_CONFIG_TAGS_WHITELIST)) {
- Preconditions.checkArgument(properties.containsKey(GOBBLIN_CONFIG_FILTER),
- "Missing required property " + GOBBLIN_CONFIG_FILTER);
- String filterString = properties.getProperty(GOBBLIN_CONFIG_FILTER);
- Path whiteListTagUri = PathUtils.mergePaths(new Path(configStoreUri),
- new Path(properties.getProperty(GOBBLIN_CONFIG_TAGS_WHITELIST)));
- List<String> whitelistedTopics = new ArrayList<>();
- ConfigStoreUtils.getTopicsURIFromConfigStore(configClient, whiteListTagUri, filterString, runtimeConfig)
- .stream()
- .filter((URI u) -> ConfigUtils.getBoolean(ConfigStoreUtils.getConfig(configClient, u, runtimeConfig),
- KafkaSource.TOPIC_WHITELIST, false))
- .forEach(((URI u) -> whitelistedTopics.add(ConfigStoreUtils.getTopicNameFromURI(u))));
-
+ List<String> whitelistedTopics = getListOfTopicNamesByFilteringTag(properties, configClient, runtimeConfig,
+ configStoreUri, GOBBLIN_CONFIG_TAGS_WHITELIST);
return allTopics.stream()
.filter((KafkaTopic p) -> whitelistedTopics.contains(p.getName()))
.collect(Collectors.toList());
} else if (properties.containsKey(GOBBLIN_CONFIG_TAGS_BLACKLIST)) {
- Preconditions.checkArgument(properties.containsKey(GOBBLIN_CONFIG_FILTER),
- "Missing required property " + GOBBLIN_CONFIG_FILTER);
- String filterString = properties.getProperty(GOBBLIN_CONFIG_FILTER);
- Path blackListTagUri = PathUtils.mergePaths(new Path(configStoreUri),
- new Path(properties.getProperty(GOBBLIN_CONFIG_TAGS_BLACKLIST)));
- List<String> blacklistedTopics = new ArrayList<>();
- ConfigStoreUtils.getTopicsURIFromConfigStore(configClient, blackListTagUri, filterString, runtimeConfig)
- .stream()
- .filter((URI u) -> ConfigUtils.getBoolean(ConfigStoreUtils.getConfig(configClient, u, runtimeConfig),
- KafkaSource.TOPIC_BLACKLIST, false))
- .forEach(((URI u) -> blacklistedTopics.add(ConfigStoreUtils.getTopicNameFromURI(u))));
+ List<String> blacklistedTopics = getListOfTopicNamesByFilteringTag(properties, configClient, runtimeConfig,
+ configStoreUri, GOBBLIN_CONFIG_TAGS_BLACKLIST);
return allTopics.stream()
.filter((KafkaTopic p) -> !blacklistedTopics.contains(p.getName()))
.collect(Collectors.toList());
@@ -178,11 +163,58 @@ public class ConfigStoreUtils {
}
/**
+ * Uses the tag feature provided by Config-Store for grouping to get the list of topic names (case-sensitive;
+ * they need to match what would be returned from the Kafka broker) tagged with the tag value specified
+ * in the job configuration.
+ */
+ public static List<String> getListOfTopicNamesByFilteringTag(Properties properties, ConfigClient configClient,
+ Optional<Config> runtimeConfig, String configStoreUri, String tagConfName) {
+ Preconditions.checkArgument(properties.containsKey(GOBBLIN_CONFIG_FILTER),
+ "Missing required property " + GOBBLIN_CONFIG_FILTER);
+ String filterString = properties.getProperty(GOBBLIN_CONFIG_FILTER);
+
+ Path tagUri = new Path("/");
+ try {
+ tagUri = new Path(getUriFromPath(new Path(properties.getProperty(tagConfName)), configStoreUri));
+ } catch (URISyntaxException ue) {
+ log.error("Cannot construct a Tag URI due to the exception:", ue);
+ }
+
+ List<String> taggedTopics = new ArrayList<>();
+ ConfigStoreUtils.getTopicsURIFromConfigStore(configClient, tagUri, filterString, runtimeConfig)
+ .forEach(((URI u) -> taggedTopics.add(ConfigStoreUtils.getTopicNameFromURI(u))));
+
+ return taggedTopics;
+ }
+
+ /**
+ * Construct the URI for a Config-Store node given a path.
+ * The implementation will be based on scheme, while the signature of this method will not be subject to
+ * different implementation.
+ *
+ * The implementation will be different since an Fs-based config-store simply appends the dataNode's path at the end,
+ * while an ivy-based config-store requires the query to store that information.
+ *
+ * @param path The relative path of a node inside Config-Store.
+ * @param configStoreUri The config store URI.
+ * @return The URI to inspect a data node represented by path inside Config Store.
+ * @throws URISyntaxException
+ */
+ private static URI getUriFromPath(Path path, String configStoreUri) throws URISyntaxException {
+ URI storeUri = new URI(configStoreUri);
+ return new URI(storeUri.getScheme(), storeUri.getAuthority(),
+ PathUtils.mergePaths(new Path(storeUri.getPath()), path).toString(), storeUri.getQuery(), storeUri.getFragment());
+
+ }
+
+ /**
* Shortlist topics from config store based on whitelist/blacklist tags and
* add it to {@param whitelist}/{@param blacklist}
*
* If tags are not provided, blacklist and whitelist won't be modified
+ * @deprecated Since this method contains an implementation-specific way to construct the TagURI inside Config-Store.
*/
+ @Deprecated
public static void setTopicsFromConfigStore(Properties properties, Set<String> blacklist, Set<String> whitelist,
final String _blacklistTopicKey, final String _whitelistTopicKey) {
Optional<String> configStoreUri = getConfigStoreUri(properties);
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
index cb0e2b5..f8c6f4a 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
@@ -698,22 +698,12 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
}
/**
- * If config store is enabled, then intersection of topics from blacklisting/whitelisting will be taken against
- * the topics from config-store
+ * Return topics to be processed filtered by job-level whitelist and blacklist.
*/
private List<KafkaTopic> getFilteredTopics(SourceState state) {
List<Pattern> blacklist = DatasetFilterUtils.getPatternList(state, TOPIC_BLACKLIST);
List<Pattern> whitelist = DatasetFilterUtils.getPatternList(state, TOPIC_WHITELIST);
- List<KafkaTopic> topics = this.kafkaConsumerClient.get().getFilteredTopics(blacklist, whitelist);
- Optional<String> configStoreUri = ConfigStoreUtils.getConfigStoreUri(state.getProperties());
- if (configStoreUri.isPresent()) {
- List<KafkaTopic> topicsFromConfigStore = ConfigStoreUtils
- .getTopicsFromConfigStore(state.getProperties(), configStoreUri.get(), this.kafkaConsumerClient.get());
-
- return topics.stream().filter((KafkaTopic p) -> (topicsFromConfigStore.stream()
- .anyMatch((KafkaTopic q) -> q.getName().equalsIgnoreCase(p.getName())))).collect(toList());
- }
- return topics;
+ return this.kafkaConsumerClient.get().getFilteredTopics(blacklist, whitelist);
}
@Override
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/ConfigStoreUtilsTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/ConfigStoreUtilsTest.java
new file mode 100644
index 0000000..d296937
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/ConfigStoreUtilsTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.source.extractor.extract.kafka;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import org.apache.gobblin.config.client.ConfigClient;
+import org.apache.gobblin.config.client.api.VersionStabilityPolicy;
+import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient;
+import org.apache.hadoop.fs.Path;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+
+import static org.apache.gobblin.configuration.ConfigurationKeys.CONFIG_MANAGEMENT_STORE_ENABLED;
+import static org.apache.gobblin.configuration.ConfigurationKeys.CONFIG_MANAGEMENT_STORE_URI;
+import static org.apache.gobblin.source.extractor.extract.kafka.ConfigStoreUtils.GOBBLIN_CONFIG_COMMONPATH;
+import static org.apache.gobblin.source.extractor.extract.kafka.ConfigStoreUtils.GOBBLIN_CONFIG_FILTER;
+import static org.apache.gobblin.source.extractor.extract.kafka.ConfigStoreUtils.GOBBLIN_CONFIG_TAGS_BLACKLIST;
+import static org.apache.gobblin.source.extractor.extract.kafka.ConfigStoreUtils.GOBBLIN_CONFIG_TAGS_WHITELIST;
+import static org.mockito.Matchers.anyList;
+
+
+/**
+ * Regression tests that guard against behavior changes in {@link ConfigStoreUtils} after refactoring.
+ */
+public class ConfigStoreUtilsTest {
+
+ // Declared as a String for convenience in testing.
+ private String configStoreUri;
+
+ private GobblinKafkaConsumerClient mockClient;
+
+ private ConfigClient configClient = ConfigClient.createConfigClient(VersionStabilityPolicy.WEAK_LOCAL_STABILITY);
+
+ /**
+ * Loads a file-system-based config store for ease of unit testing.
+ * @throws Exception
+ */
+ @BeforeClass
+ public void setup()
+ throws Exception {
+ URL url = this.getClass().getClassLoader().getResource("_CONFIG_STORE");
+ configStoreUri = getStoreURI(new Path(url.getPath()).getParent().toString()).toString();
+ mockClient = Mockito.mock(GobblinKafkaConsumerClient.class);
+ }
+
+ @Test
+ public void testGetUriStringForTopic() throws Exception {
+ String commonPath = "/data/tracking";
+ URI topic1URI = ConfigStoreUtils.getUriStringForTopic("Topic1", commonPath, configStoreUri);
+ URI expectedTopic1URI = new URI("simple-file", "", new URI(configStoreUri).getPath() + "/data/tracking/Topic1", null, null);
+ Assert.assertEquals(topic1URI, expectedTopic1URI);
+
+ URI topic2URI = ConfigStoreUtils.getUriStringForTopic("Topic2", commonPath, configStoreUri);
+ URI expectedTopic2URI = new URI("simple-file", "", new URI(configStoreUri).getPath() + "/data/tracking/Topic2", null, null);
+ Assert.assertEquals(topic2URI, expectedTopic2URI);
+ }
+
+ @Test
+ public void testGetConfigForTopic() throws Exception {
+ Properties properties = new Properties();
+ String commonPath = "/data/tracking";
+ properties.setProperty(GOBBLIN_CONFIG_COMMONPATH, commonPath);
+ properties.setProperty(CONFIG_MANAGEMENT_STORE_URI, configStoreUri);
+ properties.setProperty(CONFIG_MANAGEMENT_STORE_ENABLED, "true");
+ properties.setProperty("topic.name", "Topic1");
+
+ Config topic1Config = ConfigStoreUtils.getConfigForTopic(properties, "topic.name", configClient).get();
+ Assert.assertEquals(topic1Config.getString("aaaaa"), "bbbb");
+ }
+
+
+
+ @Test
+ public void testGetTopicsFromConfigStore()
+ throws Exception {
+ KafkaTopic topic1 = new KafkaTopic("Topic1", Lists.newArrayList());
+ KafkaTopic topic2 = new KafkaTopic("Topic2", Lists.newArrayList());
+ KafkaTopic topic3 = new KafkaTopic("Topic3", Lists.newArrayList());
+
+ Mockito.when(mockClient.getFilteredTopics(anyList(), anyList()))
+ .thenReturn(ImmutableList.of(topic1, topic2, topic3));
+ Properties properties = new Properties();
+
+ // Empty properties return everything: Topic1, Topic2 and Topic3.
+ List<KafkaTopic> result = ConfigStoreUtils.getTopicsFromConfigStore(properties, configStoreUri, mockClient);
+ Assert.assertEquals(result.size(), 3);
+
+ properties.setProperty(GOBBLIN_CONFIG_TAGS_WHITELIST, "/tags/whitelist");
+ properties.setProperty(GOBBLIN_CONFIG_FILTER, "/data/tracking");
+ properties.setProperty(GOBBLIN_CONFIG_COMMONPATH, "/data/tracking");
+
+ // Whitelist only two topics. Should only return whitelisted topics.
+ result = ConfigStoreUtils.getTopicsFromConfigStore(properties, configStoreUri, mockClient);
+ Assert.assertEquals(result.size(), 2);
+ List<String> resultInString = result.stream().map(KafkaTopic::getName).collect(Collectors.toList());
+ Assert.assertTrue(resultInString.contains("Topic1"));
+ Assert.assertTrue(resultInString.contains("Topic2"));
+
+ // Blacklist two topics. Should only return non-blacklisted topics.
+ properties.remove(GOBBLIN_CONFIG_TAGS_WHITELIST);
+ properties.setProperty(GOBBLIN_CONFIG_TAGS_BLACKLIST, "/tags/blacklist");
+ result = ConfigStoreUtils.getTopicsFromConfigStore(properties, configStoreUri, mockClient);
+ Assert.assertEquals(result.size(), 1);
+ Assert.assertEquals(result.get(0).getName(), "Topic3");
+ }
+
+ @Test
+ public void testGetListOfTopicNamesByFilteringTag()
+ throws Exception {
+ Properties properties = new Properties();
+ properties.setProperty(GOBBLIN_CONFIG_TAGS_WHITELIST, "/tags/whitelist");
+ properties.setProperty(GOBBLIN_CONFIG_FILTER, "/data/tracking");
+ properties.setProperty(GOBBLIN_CONFIG_COMMONPATH, "/data/tracking");
+
+ List<String> result = ConfigStoreUtils
+ .getListOfTopicNamesByFilteringTag(properties, configClient, Optional.absent(), configStoreUri,
+ GOBBLIN_CONFIG_TAGS_WHITELIST);
+ Assert.assertEquals(result.size(), 2);
+ Assert.assertTrue(result.contains("Topic1"));
+ Assert.assertTrue(result.contains("Topic2"));
+ }
+
+ /**
+ * Returns a local-FS-based config-store URI.
+ * Note that for local FS, fs.getUri returns a URI without an authority, so we shouldn't add an authority when
+ * we construct a URI for a local-file-backed config store.
+ */
+ private URI getStoreURI(String configDir)
+ throws URISyntaxException {
+ return new URI("simple-file", "", configDir, null, null);
+ }
+}
\ No newline at end of file
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/ZipConfigStoreUtilsTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/ZipConfigStoreUtilsTest.java
new file mode 100644
index 0000000..ebdd27b
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/ZipConfigStoreUtilsTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.source.extractor.extract.kafka;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import org.apache.gobblin.config.client.ConfigClient;
+import org.apache.gobblin.config.client.api.VersionStabilityPolicy;
+import org.apache.gobblin.config.store.api.ConfigStoreCreationException;
+import org.apache.gobblin.config.store.zip.SimpleLocalIvyConfigStoreFactory;
+import org.apache.gobblin.config.store.zip.ZipFileConfigStore;
+import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+
+import static org.apache.gobblin.configuration.ConfigurationKeys.CONFIG_MANAGEMENT_STORE_ENABLED;
+import static org.apache.gobblin.configuration.ConfigurationKeys.CONFIG_MANAGEMENT_STORE_URI;
+import static org.apache.gobblin.source.extractor.extract.kafka.ConfigStoreUtils.GOBBLIN_CONFIG_COMMONPATH;
+import static org.apache.gobblin.source.extractor.extract.kafka.ConfigStoreUtils.GOBBLIN_CONFIG_FILTER;
+import static org.apache.gobblin.source.extractor.extract.kafka.ConfigStoreUtils.GOBBLIN_CONFIG_TAGS_BLACKLIST;
+import static org.apache.gobblin.source.extractor.extract.kafka.ConfigStoreUtils.GOBBLIN_CONFIG_TAGS_WHITELIST;
+import static org.mockito.Matchers.anyList;
+
+
+/**
+ * The same testing routine, applied to the ivy-based config store (ZipFileConfigStore).
+ * Makes sure everything inside {@link ConfigStoreUtils} works with the {@link ZipFileConfigStore} implementation.
+ */
+public class ZipConfigStoreUtilsTest {
+ private String configStoreUri;
+ private ConfigClient configClient = ConfigClient.createConfigClient(VersionStabilityPolicy.WEAK_LOCAL_STABILITY);
+ private GobblinKafkaConsumerClient mockClient;
+
+ @BeforeClass
+ public void setUp()
+ throws URISyntaxException, ConfigStoreCreationException, IOException {
+ Path path =
+ Paths.get(ZipConfigStoreUtilsTest.class.getClassLoader().getResource("IvyConfigStoreTest.zip").getPath());
+ URI zipInClassPathURI = new URI(
+ "ivy-file:/?org=org&module=module&storePath=" + path
+ + "&storePrefix=_CONFIG_STORE");
+
+ ZipFileConfigStore store = new SimpleLocalIvyConfigStoreFactory().createConfigStore(zipInClassPathURI);
+ configStoreUri = store.getStoreURI().toString();
+ mockClient = Mockito.mock(GobblinKafkaConsumerClient.class);
+ }
+
+ @Test
+ public void testGetListOfTopicNamesByFilteringTag()
+ throws Exception {
+ Properties properties = new Properties();
+ properties.setProperty(GOBBLIN_CONFIG_TAGS_WHITELIST, "/tags/whitelist");
+ properties.setProperty(GOBBLIN_CONFIG_FILTER, "/data/tracking");
+ properties.setProperty(GOBBLIN_CONFIG_COMMONPATH, "/data/tracking");
+
+ List<String> result = ConfigStoreUtils
+ .getListOfTopicNamesByFilteringTag(properties, configClient, Optional.absent(), configStoreUri,
+ GOBBLIN_CONFIG_TAGS_WHITELIST);
+ Assert.assertEquals(result.size(), 2);
+ Assert.assertTrue(result.contains("Topic1"));
+ Assert.assertTrue(result.contains("Topic2"));
+ }
+
+ @Test
+ public void testGetTopicsFromConfigStore()
+ throws Exception {
+ KafkaTopic topic1 = new KafkaTopic("Topic1", Lists.newArrayList());
+ KafkaTopic topic2 = new KafkaTopic("Topic2", Lists.newArrayList());
+ KafkaTopic topic3 = new KafkaTopic("Topic3", Lists.newArrayList());
+
+ Mockito.when(mockClient.getFilteredTopics(anyList(), anyList()))
+ .thenReturn(ImmutableList.of(topic1, topic2, topic3));
+ Properties properties = new Properties();
+
+ // Empty properties return everything: Topic1, Topic2 and Topic3.
+ List<KafkaTopic> result = ConfigStoreUtils.getTopicsFromConfigStore(properties, configStoreUri, mockClient);
+ Assert.assertEquals(result.size(), 3);
+
+ properties.setProperty(GOBBLIN_CONFIG_TAGS_WHITELIST, "/tags/whitelist");
+ properties.setProperty(GOBBLIN_CONFIG_FILTER, "/data/tracking");
+ properties.setProperty(GOBBLIN_CONFIG_COMMONPATH, "/data/tracking");
+
+ // Whitelist only two topics. Should only return whitelisted topics.
+ result = ConfigStoreUtils.getTopicsFromConfigStore(properties, configStoreUri, mockClient);
+ Assert.assertEquals(result.size(), 2);
+ List<String> resultInString = result.stream().map(KafkaTopic::getName).collect(Collectors.toList());
+ Assert.assertTrue(resultInString.contains("Topic1"));
+ Assert.assertTrue(resultInString.contains("Topic2"));
+
+ // Blacklist two topics. Should only return non-blacklisted topics.
+ properties.remove(GOBBLIN_CONFIG_TAGS_WHITELIST);
+ properties.setProperty(GOBBLIN_CONFIG_TAGS_BLACKLIST, "/tags/blacklist");
+ result = ConfigStoreUtils.getTopicsFromConfigStore(properties, configStoreUri, mockClient);
+ Assert.assertEquals(result.size(), 1);
+ Assert.assertEquals(result.get(0).getName(), "Topic3");
+ }
+
+ @Test
+ public void testGetConfigForTopic() throws Exception {
+ Properties properties = new Properties();
+ String commonPath = "/data/tracking";
+ properties.setProperty(GOBBLIN_CONFIG_COMMONPATH, commonPath);
+ properties.setProperty(CONFIG_MANAGEMENT_STORE_URI, configStoreUri);
+ properties.setProperty(CONFIG_MANAGEMENT_STORE_ENABLED, "true");
+ properties.setProperty("topic.name", "Topic1");
+
+ Config topic1Config = ConfigStoreUtils.getConfigForTopic(properties, "topic.name", configClient).get();
+ Assert.assertEquals(topic1Config.getString("aaaaa"), "bbbb");
+ }
+}
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/resources/IvyConfigStoreTest.zip b/gobblin-modules/gobblin-kafka-common/src/test/resources/IvyConfigStoreTest.zip
new file mode 100644
index 0000000..41fcac2
Binary files /dev/null and b/gobblin-modules/gobblin-kafka-common/src/test/resources/IvyConfigStoreTest.zip differ
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/resources/_CONFIG_STORE/store-metadata.conf b/gobblin-modules/gobblin-kafka-common/src/test/resources/_CONFIG_STORE/store-metadata.conf
new file mode 100644
index 0000000..1ba8c7d
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/test/resources/_CONFIG_STORE/store-metadata.conf
@@ -0,0 +1 @@
+{"config":{"hdfs":{"store":{"version":{"current":"v1.0"}}}}}
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/resources/_CONFIG_STORE/v1.0/data/tracking/Topic1/includes.conf b/gobblin-modules/gobblin-kafka-common/src/test/resources/_CONFIG_STORE/v1.0/data/tracking/Topic1/includes.conf
new file mode 100644
index 0000000..f91bbd1
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/test/resources/_CONFIG_STORE/v1.0/data/tracking/Topic1/includes.conf
@@ -0,0 +1,5 @@
+# Tag this dataset with "whitelist"
+/tags/whitelist
+
+# Tag this dataset with "blacklist"
+/tags/blacklist
\ No newline at end of file
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/resources/_CONFIG_STORE/v1.0/data/tracking/Topic1/main.conf b/gobblin-modules/gobblin-kafka-common/src/test/resources/_CONFIG_STORE/v1.0/data/tracking/Topic1/main.conf
new file mode 100644
index 0000000..985e7e2
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/test/resources/_CONFIG_STORE/v1.0/data/tracking/Topic1/main.conf
@@ -0,0 +1 @@
+aaaaa=bbbb
\ No newline at end of file
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/resources/_CONFIG_STORE/v1.0/data/tracking/Topic2/includes.conf b/gobblin-modules/gobblin-kafka-common/src/test/resources/_CONFIG_STORE/v1.0/data/tracking/Topic2/includes.conf
new file mode 100644
index 0000000..9fad384
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/test/resources/_CONFIG_STORE/v1.0/data/tracking/Topic2/includes.conf
@@ -0,0 +1,2 @@
+/tags/whitelist
+/tags/blacklist
\ No newline at end of file
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/resources/_CONFIG_STORE/v1.0/tags/blacklist/main.conf b/gobblin-modules/gobblin-kafka-common/src/test/resources/_CONFIG_STORE/v1.0/tags/blacklist/main.conf
new file mode 100644
index 0000000..c946b57
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/test/resources/_CONFIG_STORE/v1.0/tags/blacklist/main.conf
@@ -0,0 +1 @@
+topic.whitelist=true
\ No newline at end of file
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/resources/_CONFIG_STORE/v1.0/tags/whitelist/main.conf b/gobblin-modules/gobblin-kafka-common/src/test/resources/_CONFIG_STORE/v1.0/tags/whitelist/main.conf
new file mode 100644
index 0000000..84adf18
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/test/resources/_CONFIG_STORE/v1.0/tags/whitelist/main.conf
@@ -0,0 +1 @@
+topic.blacklist=true
\ No newline at end of file