You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/06/18 17:50:44 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-1198] status cleaner

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new e32b4e9  [GOBBLIN-1198] status cleaner
e32b4e9 is described below

commit e32b4e9653da1ef329c562d3c848ccbd8981da5a
Author: Arjun <ab...@linkedin.com>
AuthorDate: Thu Jun 18 10:50:34 2020 -0700

    [GOBBLIN-1198] status cleaner
    
    Closes #3044 from arjun4084346/cleanJobStatus
---
 .../main/java/org/apache/gobblin/configuration/State.java   |  2 --
 .../gobblin/metastore/util/StateStoreCleanerRunnable.java   |  1 +
 .../gobblin/service/monitoring/KafkaJobStatusMonitor.java   | 13 +++++++++++--
 3 files changed, 12 insertions(+), 4 deletions(-)

diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/State.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/State.java
index 63ced0a..eab5665 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/State.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/State.java
@@ -20,9 +20,7 @@ package org.apache.gobblin.configuration;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/util/StateStoreCleanerRunnable.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/util/StateStoreCleanerRunnable.java
index cbc7f2e..b410622 100644
--- a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/util/StateStoreCleanerRunnable.java
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/util/StateStoreCleanerRunnable.java
@@ -32,6 +32,7 @@ import org.apache.gobblin.util.ConfigUtils;
  * A utility class that wraps the {@link StateStoreCleaner} implementation as a {@link Runnable}.
  */
 @Slf4j
+@Deprecated
 public class StateStoreCleanerRunnable implements Runnable {
   private Properties properties;
 
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index 30766d5..ed879df 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -39,15 +39,18 @@ import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
 import org.apache.gobblin.metastore.FileContextBasedFsStateStore;
 import org.apache.gobblin.metastore.FileContextBasedFsStateStoreFactory;
 import org.apache.gobblin.metastore.StateStore;
-import org.apache.gobblin.metastore.util.StateStoreCleanerRunnable;
 import org.apache.gobblin.metrics.ServiceMetricNames;
 import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.TaskContext;
 import org.apache.gobblin.runtime.kafka.HighLevelConsumer;
+import org.apache.gobblin.runtime.retention.DatasetCleanerTask;
 import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.source.workunit.WorkUnit;
 import org.apache.gobblin.util.ConfigUtils;
 
 
@@ -99,7 +102,13 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
   protected void startUp() {
     super.startUp();
     log.info("Scheduling state store cleaner..");
-    scheduledExecutorService.scheduleAtFixedRate(new StateStoreCleanerRunnable(this.config), 300, 86400L, TimeUnit.SECONDS);
+    org.apache.gobblin.configuration.State state = new org.apache.gobblin.configuration.State(ConfigUtils.configToProperties(this.config));
+    state.setProp(ConfigurationKeys.JOB_ID_KEY, "GobblinServiceJobStatusCleanerJob");
+    state.setProp(ConfigurationKeys.TASK_ID_KEY, "GobblinServiceJobStatusCleanerTask");
+
+    TaskContext taskContext = new TaskContext(new WorkUnitState(WorkUnit.createEmpty(), state));
+    DatasetCleanerTask cleanerTask = new DatasetCleanerTask(taskContext);
+    scheduledExecutorService.scheduleAtFixedRate(cleanerTask, 300L, 86400L, TimeUnit.SECONDS);
   }
 
   @Override