You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/04/30 17:30:02 UTC

incubator-gobblin git commit: [GOBBLIN-469] Add Task for running the DatasetCleaner

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master be92ad18b -> 0cbaf5996


[GOBBLIN-469] Add Task for running the DatasetCleaner

Closes #2342 from htran1/cleaner_task


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/0cbaf599
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/0cbaf599
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/0cbaf599

Branch: refs/heads/master
Commit: 0cbaf599604a7bf30b3ba5666dd6c8368cf13e37
Parents: be92ad1
Author: Hung Tran <hu...@linkedin.com>
Authored: Mon Apr 30 10:29:54 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Mon Apr 30 10:29:54 2018 -0700

----------------------------------------------------------------------
 .../retention/source/DatasetCleanerSource.java  | 106 +++++++++++++++++++
 .../runtime/retention/DatasetCleanerTask.java   |  54 ++++++++++
 .../retention/DatasetCleanerTaskFactory.java    |  43 ++++++++
 .../source/DatasetCleanerSourceTest.java        |  98 +++++++++++++++++
 .../resources/mysql-state-store-retention.pull  |  41 +++++++
 5 files changed, 342 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0cbaf599/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/source/DatasetCleanerSource.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/source/DatasetCleanerSource.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/source/DatasetCleanerSource.java
new file mode 100644
index 0000000..038978c
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/source/DatasetCleanerSource.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.retention.source;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.runtime.retention.DatasetCleanerTask;
+import org.apache.gobblin.runtime.retention.DatasetCleanerTaskFactory;
+import org.apache.gobblin.runtime.task.TaskUtils;
+import org.apache.gobblin.source.Source;
+import org.apache.gobblin.source.extractor.Extractor;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * {@link Source} that produces one work unit per configured dataset-cleaning job. Every work unit is bound to
+ * {@link DatasetCleanerTaskFactory}, so it is executed by a {@link DatasetCleanerTask}.
+ */
+@Slf4j
+public class DatasetCleanerSource implements Source<Object, Object> {
+  public static final String DATASET_CLEANER_SOURCE_PREFIX = "datasetCleanerSource";
+
+  /**
+   * Key listing the names of the cleaning configurations; one work unit is generated per listed name.
+   *
+   * <p>For example, {@code datasetCleanerSource.configurations=config1, config2} defines two jobs, and each of them
+   * can carry its own scoped settings:
+   * <pre>
+   * datasetCleanerSource.config1.state.store.db.table=state_table1
+   * datasetCleanerSource.config2.state.store.db.table=state_table2
+   * </pre>
+   *
+   * <p>Settings for a job are resolved in this order: keys scoped under its configuration name, then keys directly
+   * under {@code datasetCleanerSource}, then unscoped keys. Choose configuration names that do not collide with
+   * real configuration prefixes.
+   */
+  public static final String DATASET_CLEANER_CONFIGURATIONS = DATASET_CLEANER_SOURCE_PREFIX + ".configurations";
+
+  /**
+   * Generates the work units for this job.
+   *
+   * @param state source state whose properties hold the general, source-scoped and configuration-scoped settings
+   * @return one work unit per name listed under {@link #DATASET_CLEANER_CONFIGURATIONS}, or a single work unit
+   *     when that key yields no names
+   */
+  @Override
+  public List<WorkUnit> getWorkunits(SourceState state) {
+    Config baseConfig = ConfigUtils.propertiesToConfig(state.getProperties());
+    Config sourceScopedConfig = ConfigUtils.getConfigOrEmpty(baseConfig, DATASET_CLEANER_SOURCE_PREFIX);
+
+    List<String> names = ConfigUtils.getStringList(baseConfig, DATASET_CLEANER_CONFIGURATIONS);
+    if (names.isEmpty()) {
+      // No named configurations: use a placeholder name so that exactly one work unit is still produced.
+      names = ImmutableList.of("DummyConfig");
+    }
+
+    List<WorkUnit> result = Lists.newArrayListWithCapacity(names.size());
+    for (String name : names) {
+      result.add(createWorkUnit(baseConfig, sourceScopedConfig, name));
+    }
+    return result;
+  }
+
+  /**
+   * Builds the work unit for a single named configuration.
+   *
+   * <p>Keys scoped under {@code configurationName} win over source-scoped keys, which in turn win over the
+   * general job configuration.
+   */
+  private static WorkUnit createWorkUnit(Config baseConfig, Config sourceScopedConfig, String configurationName) {
+    Config resolved = ConfigUtils.getConfigOrEmpty(sourceScopedConfig, configurationName)
+        .withFallback(sourceScopedConfig)
+        .withFallback(baseConfig);
+
+    WorkUnit workUnit = WorkUnit.createEmpty();
+    workUnit.setProps(ConfigUtils.configToProperties(resolved), new Properties());
+    TaskUtils.setTaskFactoryClass(workUnit, DatasetCleanerTaskFactory.class);
+    return workUnit;
+  }
+
+  /**
+   * Always returns {@code null}. The work units from {@link #getWorkunits} name a task factory, so no extractor is
+   * provided here. NOTE(review): assumes the runtime never asks for an extractor for task-factory work units; confirm.
+   */
+  @Override
+  public Extractor<Object, Object> getExtractor(WorkUnitState state) throws IOException {
+    return null;
+  }
+
+  /** No-op: this source holds no resources that need releasing. */
+  @Override
+  public void shutdown(SourceState state) {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0cbaf599/gobblin-data-management/src/main/java/org/apache/gobblin/runtime/retention/DatasetCleanerTask.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/runtime/retention/DatasetCleanerTask.java b/gobblin-data-management/src/main/java/org/apache/gobblin/runtime/retention/DatasetCleanerTask.java
new file mode 100644
index 0000000..b46e17c
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/runtime/retention/DatasetCleanerTask.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.retention;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.data.management.retention.DatasetCleaner;
+import org.apache.gobblin.runtime.TaskContext;
+import org.apache.gobblin.runtime.task.BaseAbstractTask;
+
+
+/**
+ * A task that runs a {@link DatasetCleaner} job configured from the properties of its task state.
+ */
+public class DatasetCleanerTask extends BaseAbstractTask {
+
+  private final TaskContext taskContext;
+
+  public DatasetCleanerTask(TaskContext taskContext) {
+    super(taskContext);
+    this.taskContext = taskContext;
+  }
+
+  /**
+   * Runs a {@link DatasetCleaner} built from this task's properties.
+   *
+   * <p>The cleaner is always closed (try-with-resources) before this method returns. The outcome is recorded in
+   * {@code workingState}: {@code SUCCESSFUL} only if creating, cleaning and closing all complete without throwing,
+   * {@code FAILED} otherwise.
+   *
+   * @throws RuntimeException wrapping any {@link IOException} raised while creating, running or closing the cleaner;
+   *     other runtime exceptions are rethrown unchanged
+   */
+  @Override
+  public void run() {
+    Properties jobProps = this.taskContext.getTaskState().getProperties();
+    // Closing the cleaner is required to release what it holds. NOTE(review): DatasetCleaner#close presumably also
+    // waits for and reports asynchronously submitted cleanups; confirm.
+    try (DatasetCleaner datasetCleaner =
+        new DatasetCleaner(FileSystem.get(toHadoopConfiguration(jobProps)), jobProps)) {
+      datasetCleaner.clean();
+    } catch (IOException e) {
+      this.workingState = WorkUnitState.WorkingState.FAILED;
+      throw new RuntimeException(e);
+    } catch (RuntimeException e) {
+      this.workingState = WorkUnitState.WorkingState.FAILED;
+      throw e;
+    }
+    // This override does not delegate to BaseAbstractTask#run(), so success must be recorded explicitly.
+    // NOTE(review): assumes workingState is the field reported by getWorkingState(); confirm.
+    this.workingState = WorkUnitState.WorkingState.SUCCESSFUL;
+  }
+
+  /**
+   * Creates a Hadoop {@link Configuration} seeded with every job property. This lets filesystem settings supplied
+   * in the job configuration take effect when the {@link FileSystem} is resolved, instead of only classpath defaults.
+   */
+  private static Configuration toHadoopConfiguration(Properties props) {
+    Configuration conf = new Configuration();
+    for (String key : props.stringPropertyNames()) {
+      conf.set(key, props.getProperty(key));
+    }
+    return conf;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0cbaf599/gobblin-data-management/src/main/java/org/apache/gobblin/runtime/retention/DatasetCleanerTaskFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/runtime/retention/DatasetCleanerTaskFactory.java b/gobblin-data-management/src/main/java/org/apache/gobblin/runtime/retention/DatasetCleanerTaskFactory.java
new file mode 100644
index 0000000..f1e36e9
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/runtime/retention/DatasetCleanerTaskFactory.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.retention;
+
+import org.apache.gobblin.publisher.DataPublisher;
+import org.apache.gobblin.publisher.NoopPublisher;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.TaskContext;
+import org.apache.gobblin.runtime.task.TaskFactory;
+import org.apache.gobblin.runtime.task.TaskIFace;
+
+
+/**
+ * A {@link TaskFactory} that creates a {@link DatasetCleanerTask}. There is no data publish phase for this task, so this
+ * factory uses a {@link NoopPublisher}.
+ */
+public class DatasetCleanerTaskFactory implements TaskFactory {
+
+  @Override
+  public TaskIFace createTask(TaskContext taskContext) {
+    return new DatasetCleanerTask(taskContext);
+  }
+
+  @Override
+  public DataPublisher createDataPublisher(JobState.DatasetState datasetState) {
+    return new NoopPublisher(datasetState);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0cbaf599/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/source/DatasetCleanerSourceTest.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/source/DatasetCleanerSourceTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/source/DatasetCleanerSourceTest.java
new file mode 100644
index 0000000..fb7316e
--- /dev/null
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/source/DatasetCleanerSourceTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.retention.source;
+
+import java.util.List;
+import java.util.Properties;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.runtime.retention.DatasetCleanerTaskFactory;
+import org.apache.gobblin.source.workunit.WorkUnit;
+
+
+public class DatasetCleanerSourceTest {
+  private static final String KEY1 = "key1";
+  private static final String KEY2 = "key2";
+  private static final String VALUE1 = "value1";
+  private static final String VALUE2 = "value2";
+  private static final String BAD_VALUE = "bad_value";
+  private static final String TASK_FACTORY_KEY = "org.apache.gobblin.runtime.taskFactoryClass";
+
+  /** Prefixes {@code key} with the source-level namespace. */
+  private String getSourcePrefixed(String key) {
+    return DatasetCleanerSource.DATASET_CLEANER_SOURCE_PREFIX + "." + key;
+  }
+
+  /** Prefixes {@code key} with the namespace of the named configuration. */
+  private String getConfigPrefixed(String configName, String key) {
+    return getSourcePrefixed(configName + "." + key);
+  }
+
+  /** Runs a fresh {@link DatasetCleanerSource} over {@code props} and returns the generated work units. */
+  private static List<WorkUnit> generateWorkUnits(Properties props) {
+    SourceState sourceState = new SourceState();
+    sourceState.setProps(props, new Properties());
+    return new DatasetCleanerSource().getWorkunits(sourceState);
+  }
+
+  /** Asserts that {@code workUnit} is routed to {@link DatasetCleanerTaskFactory}. */
+  private static void assertCleanerTaskFactory(WorkUnit workUnit) {
+    Assert.assertEquals(workUnit.getProp(TASK_FACTORY_KEY), DatasetCleanerTaskFactory.class.getName());
+  }
+
+  @Test
+  public void testSingleConfig() {
+    Properties props = new Properties();
+    props.put(KEY1, VALUE1);
+    props.put(KEY2, VALUE2);
+
+    List<WorkUnit> workUnits = generateWorkUnits(props);
+
+    Assert.assertEquals(workUnits.size(), 1);
+    WorkUnit only = workUnits.get(0);
+    assertCleanerTaskFactory(only);
+    Assert.assertEquals(only.getProp(KEY1), VALUE1);
+    Assert.assertEquals(only.getProp(KEY2), VALUE2);
+  }
+
+  @Test
+  public void testMultipleConfig() {
+    Properties props = new Properties();
+    props.put(DatasetCleanerSource.DATASET_CLEANER_CONFIGURATIONS, "config1, config2");
+
+    // KEY1: the configuration-scoped value must beat both the source-scoped and the unscoped value
+    props.put(KEY1, BAD_VALUE);
+    props.put(getSourcePrefixed(KEY1), BAD_VALUE);
+    props.put(getConfigPrefixed("config1", KEY1), VALUE1);
+
+    // KEY2: the source-scoped value must beat the unscoped value
+    props.put(KEY2, BAD_VALUE);
+    props.put(getSourcePrefixed(KEY2), VALUE2);
+
+    List<WorkUnit> workUnits = generateWorkUnits(props);
+
+    Assert.assertEquals(workUnits.size(), 2);
+    WorkUnit first = workUnits.get(0);
+    WorkUnit second = workUnits.get(1);
+    assertCleanerTaskFactory(first);
+    assertCleanerTaskFactory(second);
+
+    Assert.assertEquals(first.getProp(KEY1), VALUE1);
+    Assert.assertEquals(first.getProp(KEY2), VALUE2);
+
+    Assert.assertEquals(second.getProp(KEY1), BAD_VALUE);
+    Assert.assertEquals(second.getProp(KEY2), VALUE2);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0cbaf599/gobblin-example/src/main/resources/mysql-state-store-retention.pull
----------------------------------------------------------------------
diff --git a/gobblin-example/src/main/resources/mysql-state-store-retention.pull b/gobblin-example/src/main/resources/mysql-state-store-retention.pull
new file mode 100644
index 0000000..eb24848
--- /dev/null
+++ b/gobblin-example/src/main/resources/mysql-state-store-retention.pull
@@ -0,0 +1,41 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+job.name=state_cleanup
+job.group=state
+
+gobblin.retention.dataset.finder.class=org.apache.gobblin.data.management.retention.dataset.finder.TimeBasedDatasetStoreDatasetFinder
+
+gobblin.retention.selection.policy.class=org.apache.gobblin.data.management.policy.CombineSelectionPolicy
+gobblin.retention.selection.combine.operation=intersect
+gobblin.retention.selection.combine.policy.classes=org.apache.gobblin.data.management.policy.SelectBeforeTimeBasedPolicy,org.apache.gobblin.data.management.policy.NewestKSelectionPolicy
+gobblin.retention.selection.timeBased.lookbackTime=18h
+gobblin.retention.selection.newestK.versionsNotSelected=4
+
+encrypt.key.loc=/export/home/app/clusterMasterKey
+state.store.type=mysql
+state.store.db.url=jdbc:mysql://localhost:3306/gobblin
+state.store.db.user=gobblin
+state.store.db.password=ENC(56Dk1V2N5gZh07BVHMzHag==)
+
+# The example configuration above does not use a config namespace prefix.
+# To clean up multiple state stores in a single job, set the following key:
+# datasetCleanerSource.configurations=config1,config2
+# and then prefix per-namespace overrides with datasetCleanerSource.<config namespace>.
+# Settings without a namespace prefix are used as defaults for every namespace.
+# Ex: datasetCleanerSource.config1.state.store.db.user=gobblin2
+