You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/06/18 17:50:44 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-1198] status cleaner
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new e32b4e9 [GOBBLIN-1198] status cleaner
e32b4e9 is described below
commit e32b4e9653da1ef329c562d3c848ccbd8981da5a
Author: Arjun <ab...@linkedin.com>
AuthorDate: Thu Jun 18 10:50:34 2020 -0700
[GOBBLIN-1198] status cleaner
Closes #3044 from arjun4084346/cleanJobStatus
---
.../main/java/org/apache/gobblin/configuration/State.java | 2 --
.../gobblin/metastore/util/StateStoreCleanerRunnable.java | 1 +
.../gobblin/service/monitoring/KafkaJobStatusMonitor.java | 13 +++++++++++--
3 files changed, 12 insertions(+), 4 deletions(-)
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/State.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/State.java
index 63ced0a..eab5665 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/State.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/State.java
@@ -20,9 +20,7 @@ package org.apache.gobblin.configuration;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import java.util.Properties;
import java.util.Set;
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/util/StateStoreCleanerRunnable.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/util/StateStoreCleanerRunnable.java
index cbc7f2e..b410622 100644
--- a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/util/StateStoreCleanerRunnable.java
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/util/StateStoreCleanerRunnable.java
@@ -32,6 +32,7 @@ import org.apache.gobblin.util.ConfigUtils;
* A utility class that wraps the {@link StateStoreCleaner} implementation as a {@link Runnable}.
*/
@Slf4j
+@Deprecated
public class StateStoreCleanerRunnable implements Runnable {
private Properties properties;
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index 30766d5..ed879df 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -39,15 +39,18 @@ import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
import org.apache.gobblin.metastore.FileContextBasedFsStateStore;
import org.apache.gobblin.metastore.FileContextBasedFsStateStoreFactory;
import org.apache.gobblin.metastore.StateStore;
-import org.apache.gobblin.metastore.util.StateStoreCleanerRunnable;
import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.TaskContext;
import org.apache.gobblin.runtime.kafka.HighLevelConsumer;
+import org.apache.gobblin.runtime.retention.DatasetCleanerTask;
import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.util.ConfigUtils;
@@ -99,7 +102,13 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
protected void startUp() {
super.startUp();
log.info("Scheduling state store cleaner..");
- scheduledExecutorService.scheduleAtFixedRate(new StateStoreCleanerRunnable(this.config), 300, 86400L, TimeUnit.SECONDS);
+ org.apache.gobblin.configuration.State state = new org.apache.gobblin.configuration.State(ConfigUtils.configToProperties(this.config));
+ state.setProp(ConfigurationKeys.JOB_ID_KEY, "GobblinServiceJobStatusCleanerJob");
+ state.setProp(ConfigurationKeys.TASK_ID_KEY, "GobblinServiceJobStatusCleanerTask");
+
+ TaskContext taskContext = new TaskContext(new WorkUnitState(WorkUnit.createEmpty(), state));
+ DatasetCleanerTask cleanerTask = new DatasetCleanerTask(taskContext);
+ scheduledExecutorService.scheduleAtFixedRate(cleanerTask, 300L, 86400L, TimeUnit.SECONDS);
}
@Override