Posted to commits@gobblin.apache.org by le...@apache.org on 2020/06/12 00:55:26 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-1185] Enable datasetCleaner to emit kafka event and add config field in Ver…

This is an automated email from the ASF dual-hosted git repository.

lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 156a3af  [GOBBLIN-1185] Enable datasetCleaner to emit kafka event and add config field in Ver…
156a3af is described below

commit 156a3af18aad163f42ceb69f33fa35145d7d5405
Author: Zihan Li <zi...@zihli-mn1.linkedin.biz>
AuthorDate: Thu Jun 11 17:55:07 2020 -0700

    [GOBBLIN-1185] Enable datasetCleaner to emit kafka event and add config field in Ver…
    
    Closes #3033 from ZihanLi58/GOBBLIN-1185
---
 .../data/management/retention/DatasetCleaner.java  | 30 ++++++++++++++++------
 .../retention/dataset/CleanableDatasetBase.java    |  4 ++-
 .../dataset/ConfigurableCleanableDataset.java      |  3 ++-
 .../dataset/MultiVersionCleanableDatasetBase.java  | 11 ++++----
 .../kafka/writer/Kafka09DataWriterTest.java        | 10 ++++----
 5 files changed, 38 insertions(+), 20 deletions(-)

diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/DatasetCleaner.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/DatasetCleaner.java
index 848b0d2..8c5dd61 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/DatasetCleaner.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/DatasetCleaner.java
@@ -17,9 +17,12 @@
 
 package org.apache.gobblin.data.management.retention;
 
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValue;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
@@ -27,6 +30,9 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 
 import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.gobblin.configuration.DynamicConfigGenerator;
+import org.apache.gobblin.runtime.DynamicConfigGeneratorFactory;
+import org.apache.gobblin.util.ConfigUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -88,33 +94,41 @@ public class DatasetCleaner implements Instrumentable, Closeable {
 
   public DatasetCleaner(FileSystem fs, Properties props) throws IOException {
 
-    State state = new State(props);
+    Properties properties = new Properties();
+    properties.putAll(props);
+    // load dynamic configuration and add them to the job properties
+    Config propsAsConfig = ConfigUtils.propertiesToConfig(props);
+    DynamicConfigGenerator dynamicConfigGenerator =
+        DynamicConfigGeneratorFactory.createDynamicConfigGenerator(propsAsConfig);
+    Config dynamicConfig = dynamicConfigGenerator.generateDynamicConfig(propsAsConfig);
+    properties.putAll(ConfigUtils.configToProperties(dynamicConfig));
+    State state = new State(properties);
     FileSystem targetFs =
-        props.containsKey(ConfigurationKeys.WRITER_FILE_SYSTEM_URI) ? WriterUtils.getWriterFs(state) : fs;
+        properties.containsKey(ConfigurationKeys.WRITER_FILE_SYSTEM_URI) ? WriterUtils.getWriterFs(state) : fs;
     this.closer = Closer.create();
     // TODO -- Remove the dependency on gobblin-core after new Gobblin Metrics does not depend on gobblin-core.
     List<Tag<?>> tags = Lists.newArrayList();
     tags.addAll(Tag.fromMap(AzkabanTags.getAzkabanTags()));
     this.metricContext =
-        this.closer.register(Instrumented.getMetricContext(new State(props), DatasetCleaner.class, tags));
-    this.isMetricEnabled = GobblinMetrics.isEnabled(props);
+        this.closer.register(Instrumented.getMetricContext(state, DatasetCleaner.class, tags));
+    this.isMetricEnabled = GobblinMetrics.isEnabled(properties);
     this.eventSubmitter = new EventSubmitter.Builder(this.metricContext, RetentionEvents.NAMESPACE).build();
     try {
       FileSystem optionalRateControlledFs = targetFs;
-      if (props.contains(DATASET_CLEAN_HDFS_CALLS_PER_SECOND_LIMIT)) {
+      if (properties.contains(DATASET_CLEAN_HDFS_CALLS_PER_SECOND_LIMIT)) {
         optionalRateControlledFs = this.closer.register(new RateControlledFileSystem(targetFs,
-            Long.parseLong(props.getProperty(DATASET_CLEAN_HDFS_CALLS_PER_SECOND_LIMIT))));
+            Long.parseLong(properties.getProperty(DATASET_CLEAN_HDFS_CALLS_PER_SECOND_LIMIT))));
         ((RateControlledFileSystem) optionalRateControlledFs).startRateControl();
       }
 
-      this.datasetFinder = new MultiCleanableDatasetFinder(optionalRateControlledFs, props, eventSubmitter);
+      this.datasetFinder = new MultiCleanableDatasetFinder(optionalRateControlledFs, properties, eventSubmitter);
     } catch (NumberFormatException exception) {
       throw new IOException(exception);
     } catch (ExecutionException exception) {
       throw new IOException(exception);
     }
     ExecutorService executor = ScalingThreadPoolExecutor.newScalingThreadPool(0,
-        Integer.parseInt(props.getProperty(MAX_CONCURRENT_DATASETS_CLEANED, DEFAULT_MAX_CONCURRENT_DATASETS_CLEANED)),
+        Integer.parseInt(properties.getProperty(MAX_CONCURRENT_DATASETS_CLEANED, DEFAULT_MAX_CONCURRENT_DATASETS_CLEANED)),
         100, ExecutorsUtils.newThreadFactory(Optional.of(LOG), Optional.of("Dataset-cleaner-pool-%d")));
     this.service = ExecutorsUtils.loggingDecorator(executor);
 
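    Note on the change above: the constructor now resolves dynamic configuration before building its State, so generated settings reach the metric context and the dataset finder. A minimal sketch of that pattern, pulled out into a standalone helper for clarity (the DynamicConfigMergeExample / mergeDynamicConfig names are illustrative and not part of this commit; the Gobblin calls are the ones visible in the diff):

    import java.util.Properties;

    import com.typesafe.config.Config;

    import org.apache.gobblin.configuration.DynamicConfigGenerator;
    import org.apache.gobblin.runtime.DynamicConfigGeneratorFactory;
    import org.apache.gobblin.util.ConfigUtils;

    public class DynamicConfigMergeExample {
      /** Returns a copy of jobProps with any dynamically generated config layered on top. */
      static Properties mergeDynamicConfig(Properties jobProps) {
        Properties merged = new Properties();
        merged.putAll(jobProps);

        // View the job properties as a typesafe Config so a generator can inspect them.
        Config propsAsConfig = ConfigUtils.propertiesToConfig(jobProps);

        // The factory instantiates whichever DynamicConfigGenerator the job config names
        // (falling back to a no-op generator when none is configured).
        DynamicConfigGenerator generator =
            DynamicConfigGeneratorFactory.createDynamicConfigGenerator(propsAsConfig);

        // Fold the generated config back into the flat Properties view.
        merged.putAll(ConfigUtils.configToProperties(generator.generateDynamicConfig(propsAsConfig)));
        return merged;
      }
    }
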
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/CleanableDatasetBase.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/CleanableDatasetBase.java
index be3d0c0..5f618d6 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/CleanableDatasetBase.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/CleanableDatasetBase.java
@@ -17,6 +17,7 @@
 
 package org.apache.gobblin.data.management.retention.dataset;
 
+import com.typesafe.config.ConfigFactory;
 import java.io.IOException;
 import java.util.List;
 import java.util.Properties;
@@ -122,6 +123,7 @@ public abstract class CleanableDatasetBase<T extends FileSystemDatasetVersion>
   @Override
   public List<VersionFinderAndPolicy<T>> getVersionFindersAndPolicies() {
     return ImmutableList
-        .<VersionFinderAndPolicy<T>> of(new VersionFinderAndPolicy<>(getRetentionPolicy(), getVersionFinder()));
+        .<VersionFinderAndPolicy<T>> of(new VersionFinderAndPolicy<>(getRetentionPolicy(), getVersionFinder(),
+            ConfigFactory.empty()));
   }
 }
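    The change above keeps the legacy path compiling by handing the widened VersionFinderAndPolicy constructor an empty Config. A minimal sketch of that call from a caller with no dataset-level config (retentionPolicy and versionFinder are assumed to be in scope, and MyVersion stands in for the concrete dataset version type):

    // Datasets without their own config can pass ConfigFactory.empty(), as the base class now does.
    VersionFinderAndPolicy<MyVersion> vfap =
        new VersionFinderAndPolicy<>(retentionPolicy, versionFinder, ConfigFactory.empty());
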
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/ConfigurableCleanableDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/ConfigurableCleanableDataset.java
index 1c47727..80f0d00 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/ConfigurableCleanableDataset.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/ConfigurableCleanableDataset.java
@@ -176,7 +176,7 @@ public class ConfigurableCleanableDataset<T extends FileSystemDatasetVersion>
       String versionFinderKey) {
     this.versionFindersAndPolicies
         .add(new VersionFinderAndPolicy<>(createRetentionPolicy(config.getString(retentionPolicyKey), config, jobProps),
-            createVersionFinder(config.getString(versionFinderKey), config, jobProps)));
+            createVersionFinder(config.getString(versionFinderKey), config, jobProps), config));
   }
 
   private void initWithSelectionPolicy(Config config, Properties jobProps) {
@@ -189,6 +189,7 @@ public class ConfigurableCleanableDataset<T extends FileSystemDatasetVersion>
             config.root().render(ConfigRenderOptions.concise())));
 
     VersionFinderAndPolicyBuilder<T> builder = VersionFinderAndPolicy.builder();
+    builder.config(config);
     builder.versionFinder(createVersionFinder(config.getString(versionFinderKey), config, jobProps));
     if (config.hasPath(selectionPolicyKey)) {
       builder.versionSelectionPolicy(createSelectionPolicy(
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/MultiVersionCleanableDatasetBase.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/MultiVersionCleanableDatasetBase.java
index 03e56fe..8620822 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/MultiVersionCleanableDatasetBase.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/MultiVersionCleanableDatasetBase.java
@@ -332,24 +332,25 @@ public abstract class MultiVersionCleanableDatasetBase<T extends FileSystemDatas
   @Builder
   @AllArgsConstructor
   public static class VersionFinderAndPolicy<T extends FileSystemDatasetVersion> {
-
     private final VersionSelectionPolicy<T> versionSelectionPolicy;
     private final VersionFinder<? extends T> versionFinder;
     @Singular
     private final List<RetentionAction> retentionActions;
+    private final Config config;
 
     /**
      * Constructor for backward compatibility
      * @deprecated use {@link VersionFinderAndPolicyBuilder}
      */
     @Deprecated
-    public VersionFinderAndPolicy(VersionSelectionPolicy<T> versionSelectionPolicy, VersionFinder<? extends T> versionFinder) {
+    public VersionFinderAndPolicy(VersionSelectionPolicy<T> versionSelectionPolicy, VersionFinder<? extends T> versionFinder, Config config) {
       this.versionSelectionPolicy = versionSelectionPolicy;
       this.versionFinder = versionFinder;
       this.retentionActions = Lists.newArrayList();
+      this.config = config;
     }
-    public VersionFinderAndPolicy(RetentionPolicy<T> retentionPolicy, VersionFinder<? extends T> versionFinder) {
-      this(new EmbeddedRetentionSelectionPolicy<>(retentionPolicy), versionFinder);
+    public VersionFinderAndPolicy(RetentionPolicy<T> retentionPolicy, VersionFinder<? extends T> versionFinder, Config config) {
+      this(new EmbeddedRetentionSelectionPolicy<>(retentionPolicy), versionFinder, config);
     }
 
     public static class VersionFinderAndPolicyBuilder<T extends FileSystemDatasetVersion> {
@@ -371,7 +372,7 @@ public abstract class MultiVersionCleanableDatasetBase<T extends FileSystemDatas
           localRetentionActions = Lists.newArrayList(this.retentionActions);
         }
         return new VersionFinderAndPolicy<T>(localVersionSelectionPolicy, this.versionFinder,
-            localRetentionActions);
+            localRetentionActions, this.config);
       }
     }
   }
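    For builder users, a minimal sketch of constructing a VersionFinderAndPolicy with the new config field, mirroring how ConfigurableCleanableDataset wires it above (datasetConfig, versionFinder, and selectionPolicy are assumed to be in scope; imports omitted):

    VersionFinderAndPolicy<FileSystemDatasetVersion> vfap =
        VersionFinderAndPolicy.<FileSystemDatasetVersion>builder()
            .config(datasetConfig)  // dataset-level Config now travels with the finder/policy pair
            .versionFinder(versionFinder)
            .versionSelectionPolicy(selectionPolicy)
            .build();
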
diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/writer/Kafka09DataWriterTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/writer/Kafka09DataWriterTest.java
index 58cfcb9..e535b63 100644
--- a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/writer/Kafka09DataWriterTest.java
+++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/writer/Kafka09DataWriterTest.java
@@ -84,7 +84,7 @@ public class Kafka09DataWriterTest {
     _kafkaTestHelper.provisionTopic(topic);
     Properties props = new Properties();
     props.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, topic);
-    props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX+"bootstrap.servers", "localhost:" + _kafkaTestHelper.getKafkaServerPort());
+    props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX+"bootstrap.servers", "127.0.0.1:" + _kafkaTestHelper.getKafkaServerPort());
     props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX+"value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
     Kafka09DataWriter<String, String> kafka09DataWriter = new Kafka09DataWriter<>(props);
     String messageString = "foobar";
@@ -115,7 +115,7 @@ public class Kafka09DataWriterTest {
     _kafkaTestHelper.provisionTopic(topic);
     Properties props = new Properties();
     props.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, topic);
-    props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX+"bootstrap.servers", "localhost:" + _kafkaTestHelper.getKafkaServerPort());
+    props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX+"bootstrap.servers", "127.0.0.1:" + _kafkaTestHelper.getKafkaServerPort());
     props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX+"value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
     Kafka09DataWriter<String, byte[]> kafka09DataWriter = new Kafka09DataWriter<>(props);
     WriteCallback callback = mock(WriteCallback.class);
@@ -141,7 +141,7 @@ public class Kafka09DataWriterTest {
     Properties props = new Properties();
     props.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, topic);
     props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + "bootstrap.servers",
-        "localhost:" + _kafkaTestHelper.getKafkaServerPort());
+        "127.0.0.1:" + _kafkaTestHelper.getKafkaServerPort());
     props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + "value.serializer",
         LiAvroSerializer.class.getName());
 
@@ -180,7 +180,7 @@ public class Kafka09DataWriterTest {
     Properties props = new Properties();
     props.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, topic);
     props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + "bootstrap.servers",
-        "localhost:" + _kafkaTestHelper.getKafkaServerPort());
+        "127.0.0.1:" + _kafkaTestHelper.getKafkaServerPort());
     props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + "value.serializer",
         LiAvroSerializer.class.getName());
     props.setProperty(KafkaWriterConfigurationKeys.WRITER_KAFKA_KEYED_CONFIG, "true");
@@ -224,7 +224,7 @@ public class Kafka09DataWriterTest {
     Properties props = new Properties();
     props.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, topic);
     props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + "bootstrap.servers",
-        "localhost:" + _kafkaTestHelper.getKafkaServerPort());
+        "127.0.0.1:" + _kafkaTestHelper.getKafkaServerPort());
     props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + "value.serializer",
     "org.apache.kafka.common.serialization.StringSerializer");
     props.setProperty(KafkaWriterConfigurationKeys.WRITER_KAFKA_KEYED_CONFIG, "true");
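
    The test changes above only swap localhost for 127.0.0.1; the configuration convention they exercise is unchanged: properties under KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX are handed through to the underlying Kafka producer. A minimal sketch of that setup outside the test harness (topic name and port are illustrative; imports omitted):

    Properties props = new Properties();
    props.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, "myTopic");
    // Prefixed keys have the prefix stripped and are passed on as Kafka producer config.
    props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + "bootstrap.servers",
        "127.0.0.1:9092");
    props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + "value.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");
    Kafka09DataWriter<String, String> writer = new Kafka09DataWriter<>(props);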