You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by le...@apache.org on 2020/06/12 00:55:26 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-1185] Enable datasetCleaner to emit kafka event and add config field in VersionFinderAndPolicy
This is an automated email from the ASF dual-hosted git repository.
lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 156a3af [GOBBLIN-1185] Enable datasetCleaner to emit kafka event and add config field in VersionFinderAndPolicy
156a3af is described below
commit 156a3af18aad163f42ceb69f33fa35145d7d5405
Author: Zihan Li <zi...@zihli-mn1.linkedin.biz>
AuthorDate: Thu Jun 11 17:55:07 2020 -0700
[GOBBLIN-1185] Enable datasetCleaner to emit kafka event and add config field in VersionFinderAndPolicy
Closes #3033 from ZihanLi58/GOBBLIN-1185
---
.../data/management/retention/DatasetCleaner.java | 30 ++++++++++++++++------
.../retention/dataset/CleanableDatasetBase.java | 4 ++-
.../dataset/ConfigurableCleanableDataset.java | 3 ++-
.../dataset/MultiVersionCleanableDatasetBase.java | 11 ++++----
.../kafka/writer/Kafka09DataWriterTest.java | 10 ++++----
5 files changed, 38 insertions(+), 20 deletions(-)
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/DatasetCleaner.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/DatasetCleaner.java
index 848b0d2..8c5dd61 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/DatasetCleaner.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/DatasetCleaner.java
@@ -17,9 +17,12 @@
package org.apache.gobblin.data.management.retention;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValue;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
@@ -27,6 +30,9 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.gobblin.configuration.DynamicConfigGenerator;
+import org.apache.gobblin.runtime.DynamicConfigGeneratorFactory;
+import org.apache.gobblin.util.ConfigUtils;
import org.apache.hadoop.fs.FileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -88,33 +94,41 @@ public class DatasetCleaner implements Instrumentable, Closeable {
public DatasetCleaner(FileSystem fs, Properties props) throws IOException {
- State state = new State(props);
+ Properties properties = new Properties();
+ properties.putAll(props);
+ // load dynamic configuration and add them to the job properties
+ Config propsAsConfig = ConfigUtils.propertiesToConfig(props);
+ DynamicConfigGenerator dynamicConfigGenerator =
+ DynamicConfigGeneratorFactory.createDynamicConfigGenerator(propsAsConfig);
+ Config dynamicConfig = dynamicConfigGenerator.generateDynamicConfig(propsAsConfig);
+ properties.putAll(ConfigUtils.configToProperties(dynamicConfig));
+ State state = new State(properties);
FileSystem targetFs =
- props.containsKey(ConfigurationKeys.WRITER_FILE_SYSTEM_URI) ? WriterUtils.getWriterFs(state) : fs;
+ properties.containsKey(ConfigurationKeys.WRITER_FILE_SYSTEM_URI) ? WriterUtils.getWriterFs(state) : fs;
this.closer = Closer.create();
// TODO -- Remove the dependency on gobblin-core after new Gobblin Metrics does not depend on gobblin-core.
List<Tag<?>> tags = Lists.newArrayList();
tags.addAll(Tag.fromMap(AzkabanTags.getAzkabanTags()));
this.metricContext =
- this.closer.register(Instrumented.getMetricContext(new State(props), DatasetCleaner.class, tags));
- this.isMetricEnabled = GobblinMetrics.isEnabled(props);
+ this.closer.register(Instrumented.getMetricContext(state, DatasetCleaner.class, tags));
+ this.isMetricEnabled = GobblinMetrics.isEnabled(properties);
this.eventSubmitter = new EventSubmitter.Builder(this.metricContext, RetentionEvents.NAMESPACE).build();
try {
FileSystem optionalRateControlledFs = targetFs;
- if (props.contains(DATASET_CLEAN_HDFS_CALLS_PER_SECOND_LIMIT)) {
+ if (properties.contains(DATASET_CLEAN_HDFS_CALLS_PER_SECOND_LIMIT)) {
optionalRateControlledFs = this.closer.register(new RateControlledFileSystem(targetFs,
- Long.parseLong(props.getProperty(DATASET_CLEAN_HDFS_CALLS_PER_SECOND_LIMIT))));
+ Long.parseLong(properties.getProperty(DATASET_CLEAN_HDFS_CALLS_PER_SECOND_LIMIT))));
((RateControlledFileSystem) optionalRateControlledFs).startRateControl();
}
- this.datasetFinder = new MultiCleanableDatasetFinder(optionalRateControlledFs, props, eventSubmitter);
+ this.datasetFinder = new MultiCleanableDatasetFinder(optionalRateControlledFs, properties, eventSubmitter);
} catch (NumberFormatException exception) {
throw new IOException(exception);
} catch (ExecutionException exception) {
throw new IOException(exception);
}
ExecutorService executor = ScalingThreadPoolExecutor.newScalingThreadPool(0,
- Integer.parseInt(props.getProperty(MAX_CONCURRENT_DATASETS_CLEANED, DEFAULT_MAX_CONCURRENT_DATASETS_CLEANED)),
+ Integer.parseInt(properties.getProperty(MAX_CONCURRENT_DATASETS_CLEANED, DEFAULT_MAX_CONCURRENT_DATASETS_CLEANED)),
100, ExecutorsUtils.newThreadFactory(Optional.of(LOG), Optional.of("Dataset-cleaner-pool-%d")));
this.service = ExecutorsUtils.loggingDecorator(executor);
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/CleanableDatasetBase.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/CleanableDatasetBase.java
index be3d0c0..5f618d6 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/CleanableDatasetBase.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/CleanableDatasetBase.java
@@ -17,6 +17,7 @@
package org.apache.gobblin.data.management.retention.dataset;
+import com.typesafe.config.ConfigFactory;
import java.io.IOException;
import java.util.List;
import java.util.Properties;
@@ -122,6 +123,7 @@ public abstract class CleanableDatasetBase<T extends FileSystemDatasetVersion>
@Override
public List<VersionFinderAndPolicy<T>> getVersionFindersAndPolicies() {
return ImmutableList
- .<VersionFinderAndPolicy<T>> of(new VersionFinderAndPolicy<>(getRetentionPolicy(), getVersionFinder()));
+ .<VersionFinderAndPolicy<T>> of(new VersionFinderAndPolicy<>(getRetentionPolicy(), getVersionFinder(),
+ ConfigFactory.empty()));
}
}
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/ConfigurableCleanableDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/ConfigurableCleanableDataset.java
index 1c47727..80f0d00 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/ConfigurableCleanableDataset.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/ConfigurableCleanableDataset.java
@@ -176,7 +176,7 @@ public class ConfigurableCleanableDataset<T extends FileSystemDatasetVersion>
String versionFinderKey) {
this.versionFindersAndPolicies
.add(new VersionFinderAndPolicy<>(createRetentionPolicy(config.getString(retentionPolicyKey), config, jobProps),
- createVersionFinder(config.getString(versionFinderKey), config, jobProps)));
+ createVersionFinder(config.getString(versionFinderKey), config, jobProps), config));
}
private void initWithSelectionPolicy(Config config, Properties jobProps) {
@@ -189,6 +189,7 @@ public class ConfigurableCleanableDataset<T extends FileSystemDatasetVersion>
config.root().render(ConfigRenderOptions.concise())));
VersionFinderAndPolicyBuilder<T> builder = VersionFinderAndPolicy.builder();
+ builder.config(config);
builder.versionFinder(createVersionFinder(config.getString(versionFinderKey), config, jobProps));
if (config.hasPath(selectionPolicyKey)) {
builder.versionSelectionPolicy(createSelectionPolicy(
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/MultiVersionCleanableDatasetBase.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/MultiVersionCleanableDatasetBase.java
index 03e56fe..8620822 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/MultiVersionCleanableDatasetBase.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/MultiVersionCleanableDatasetBase.java
@@ -332,24 +332,25 @@ public abstract class MultiVersionCleanableDatasetBase<T extends FileSystemDatas
@Builder
@AllArgsConstructor
public static class VersionFinderAndPolicy<T extends FileSystemDatasetVersion> {
-
private final VersionSelectionPolicy<T> versionSelectionPolicy;
private final VersionFinder<? extends T> versionFinder;
@Singular
private final List<RetentionAction> retentionActions;
+ private final Config config;
/**
* Constructor for backward compatibility
* @deprecated use {@link VersionFinderAndPolicyBuilder}
*/
@Deprecated
- public VersionFinderAndPolicy(VersionSelectionPolicy<T> versionSelectionPolicy, VersionFinder<? extends T> versionFinder) {
+ public VersionFinderAndPolicy(VersionSelectionPolicy<T> versionSelectionPolicy, VersionFinder<? extends T> versionFinder, Config config) {
this.versionSelectionPolicy = versionSelectionPolicy;
this.versionFinder = versionFinder;
this.retentionActions = Lists.newArrayList();
+ this.config = config;
}
- public VersionFinderAndPolicy(RetentionPolicy<T> retentionPolicy, VersionFinder<? extends T> versionFinder) {
- this(new EmbeddedRetentionSelectionPolicy<>(retentionPolicy), versionFinder);
+ public VersionFinderAndPolicy(RetentionPolicy<T> retentionPolicy, VersionFinder<? extends T> versionFinder, Config config) {
+ this(new EmbeddedRetentionSelectionPolicy<>(retentionPolicy), versionFinder, config);
}
public static class VersionFinderAndPolicyBuilder<T extends FileSystemDatasetVersion> {
@@ -371,7 +372,7 @@ public abstract class MultiVersionCleanableDatasetBase<T extends FileSystemDatas
localRetentionActions = Lists.newArrayList(this.retentionActions);
}
return new VersionFinderAndPolicy<T>(localVersionSelectionPolicy, this.versionFinder,
- localRetentionActions);
+ localRetentionActions, this.config);
}
}
}
diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/writer/Kafka09DataWriterTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/writer/Kafka09DataWriterTest.java
index 58cfcb9..e535b63 100644
--- a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/writer/Kafka09DataWriterTest.java
+++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/writer/Kafka09DataWriterTest.java
@@ -84,7 +84,7 @@ public class Kafka09DataWriterTest {
_kafkaTestHelper.provisionTopic(topic);
Properties props = new Properties();
props.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, topic);
- props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX+"bootstrap.servers", "localhost:" + _kafkaTestHelper.getKafkaServerPort());
+ props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX+"bootstrap.servers", "127.0.0.1:" + _kafkaTestHelper.getKafkaServerPort());
props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX+"value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Kafka09DataWriter<String, String> kafka09DataWriter = new Kafka09DataWriter<>(props);
String messageString = "foobar";
@@ -115,7 +115,7 @@ public class Kafka09DataWriterTest {
_kafkaTestHelper.provisionTopic(topic);
Properties props = new Properties();
props.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, topic);
- props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX+"bootstrap.servers", "localhost:" + _kafkaTestHelper.getKafkaServerPort());
+ props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX+"bootstrap.servers", "127.0.0.1:" + _kafkaTestHelper.getKafkaServerPort());
props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX+"value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
Kafka09DataWriter<String, byte[]> kafka09DataWriter = new Kafka09DataWriter<>(props);
WriteCallback callback = mock(WriteCallback.class);
@@ -141,7 +141,7 @@ public class Kafka09DataWriterTest {
Properties props = new Properties();
props.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, topic);
props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + "bootstrap.servers",
- "localhost:" + _kafkaTestHelper.getKafkaServerPort());
+ "127.0.0.1:" + _kafkaTestHelper.getKafkaServerPort());
props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + "value.serializer",
LiAvroSerializer.class.getName());
@@ -180,7 +180,7 @@ public class Kafka09DataWriterTest {
Properties props = new Properties();
props.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, topic);
props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + "bootstrap.servers",
- "localhost:" + _kafkaTestHelper.getKafkaServerPort());
+ "127.0.0.1:" + _kafkaTestHelper.getKafkaServerPort());
props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + "value.serializer",
LiAvroSerializer.class.getName());
props.setProperty(KafkaWriterConfigurationKeys.WRITER_KAFKA_KEYED_CONFIG, "true");
@@ -224,7 +224,7 @@ public class Kafka09DataWriterTest {
Properties props = new Properties();
props.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, topic);
props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + "bootstrap.servers",
- "localhost:" + _kafkaTestHelper.getKafkaServerPort());
+ "127.0.0.1:" + _kafkaTestHelper.getKafkaServerPort());
props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + "value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.setProperty(KafkaWriterConfigurationKeys.WRITER_KAFKA_KEYED_CONFIG, "true");