You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2019/04/18 23:51:47 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-742] Implement
a FileSystem based JobConfigurationManager.
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new aa45670 [GOBBLIN-742] Implement a FileSystem based JobConfigurationManager.
aa45670 is described below
commit aa45670a59188184b401c1177fbf8ce8f6d344dd
Author: sv2000 <su...@gmail.com>
AuthorDate: Thu Apr 18 16:51:36 2019 -0700
[GOBBLIN-742] Implement a FileSystem based JobConfigurationManager.
Dear Gobblin maintainers,
Please accept this PR. I understand that it will
not be reviewed until I have checked off all the
steps below!
### JIRA
- [x] My PR addresses the following [Gobblin JIRA]
(https://issues.apache.org/jira/browse/GOBBLIN/)
issues and references them in the PR title. For
example, "[GOBBLIN-XXX] My Gobblin PR"
-
https://issues.apache.org/jira/browse/GOBBLIN-742
### Description
- [x] Here are some details about my PR, including
screenshots (if applicable):
This task aims to implement a FileSystem based
JobConfigurationManager. The manager has an
underlying SpecConsumer which reads specs from a
config-specified path on HDFS and persists them to
a JobCatalog. On successful consumption of a
spec by the manager, it is deleted from the
original path, so on the next fetch only new
specs will be picked up by the
JobConfigurationManager.
### Tests
- [x] My PR adds the following unit tests __OR__
does not need testing for this extremely good
reason:
FsScheduledJobConfigurationManagerTest
### Commits
- [x] My commits all reference JIRA issues in
their subject lines, and I have squashed multiple
commits if they address the same issue. In
addition, my commits follow the guidelines from
"[How to write a good git commit
message](http://chris.beams.io/posts/git-
commit/)":
1. Subject is separated from body by a blank line
2. Subject is limited to 50 characters
3. Subject does not end with a period
4. Subject uses the imperative mood ("add", not
"adding")
5. Body wraps at 72 characters
6. Body explains "what" and "why", not "how"
GOBBLIN-742: Implement a FileSystem based
JobConfigurationManager.
GOBBLIN-742: Address reviewer feedback.
Closes #2608 from sv2000/fsSpecConsumer
---
.../apache/gobblin/runtime/api/SpecConsumer.java | 7 +
.../FsScheduledJobConfigurationManager.java | 82 +++++++++++
.../cluster/ScheduledJobConfigurationManager.java | 4 +-
.../FsScheduledJobConfigurationManagerTest.java | 155 +++++++++++++++++++++
.../apache/gobblin/runtime/api/FsSpecConsumer.java | 127 +++++++++++++++++
5 files changed, 373 insertions(+), 2 deletions(-)
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecConsumer.java b/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecConsumer.java
index 6dc16f4..6e5cff5 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecConsumer.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecConsumer.java
@@ -17,6 +17,7 @@
package org.apache.gobblin.runtime.api;
+import java.io.IOException;
import java.util.List;
import java.util.concurrent.Future;
import org.apache.commons.lang3.tuple.Pair;
@@ -32,4 +33,10 @@ public interface SpecConsumer<V> {
/** List of newly changed {@link Spec}s for execution on {@link SpecExecutor}. */
Future<? extends List<Pair<SpecExecutor.Verb, V>>> changedSpecs();
+ /**
+ * A commit method to allow the consumer to checkpoint state on successful consumption of a {@link Spec}.
+ */
+ default void commit(Spec spec) throws IOException {
+ return;
+ }
}
\ No newline at end of file
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/FsScheduledJobConfigurationManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/FsScheduledJobConfigurationManager.java
new file mode 100644
index 0000000..21db2ff
--- /dev/null
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/FsScheduledJobConfigurationManager.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.cluster;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import com.google.common.eventbus.EventBus;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.runtime.api.FsSpecConsumer;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.MutableJobCatalog;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+
+
+/**
+ * A {@link ScheduledJobConfigurationManager} that reads {@link JobSpec}s from a source path on a
+ * {@link org.apache.hadoop.fs.FileSystem} and adds them to a {@link org.apache.gobblin.runtime.api.JobCatalog}.
+ * The {@link FsScheduledJobConfigurationManager} has an underlying {@link FsSpecConsumer} that reads the {@link JobSpec}s
+ * from the filesystem and once the JobSpecs have been added to the {@link org.apache.gobblin.runtime.api.JobCatalog},
+ * the consumer deletes the specs from the source path.
+ */
+@Slf4j
+public class FsScheduledJobConfigurationManager extends ScheduledJobConfigurationManager {
+ private final MutableJobCatalog _jobCatalog;
+
+ public FsScheduledJobConfigurationManager(EventBus eventBus, Config config, MutableJobCatalog jobCatalog) {
+ super(eventBus, config);
+ this._jobCatalog = jobCatalog;
+ }
+
+ @Override
+ protected void fetchJobSpecs() throws ExecutionException, InterruptedException {
+ List<Pair<SpecExecutor.Verb, Spec>> jobSpecs =
+ (List<Pair<SpecExecutor.Verb, Spec>>) this._specConsumer.changedSpecs().get();
+
+ for (Pair<SpecExecutor.Verb, Spec> entry : jobSpecs) {
+ Spec spec = entry.getValue();
+ SpecExecutor.Verb verb = entry.getKey();
+ if (verb.equals(SpecExecutor.Verb.ADD) || verb.equals(SpecExecutor.Verb.UPDATE)) {
+ // Handle addition
+ JobSpec jobSpec = (JobSpec) spec;
+ this._jobCatalog.put(jobSpec);
+ postNewJobConfigArrival(jobSpec.getUri().toString(), jobSpec.getConfigAsProperties());
+ } else if (verb.equals(SpecExecutor.Verb.DELETE)) {
+ // Handle delete
+ this._jobCatalog.remove(spec.getUri());
+ postDeleteJobConfigArrival(spec.getUri().toString(), new Properties());
+ }
+
+ try {
+ //Acknowledge the successful consumption of the JobSpec back to the SpecConsumer, so that the
+ //SpecConsumer can delete the JobSpec.
+ this._specConsumer.commit(spec);
+ } catch (IOException e) {
+ log.error("Error when committing to FsSpecConsumer: ", e);
+ }
+ }
+ }
+}
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ScheduledJobConfigurationManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ScheduledJobConfigurationManager.java
index 00c7b0d..e311507 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ScheduledJobConfigurationManager.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ScheduledJobConfigurationManager.java
@@ -59,7 +59,7 @@ public class ScheduledJobConfigurationManager extends JobConfigurationManager {
private final ScheduledExecutorService fetchJobSpecExecutor;
- private final SpecConsumer _specConsumer;
+ protected final SpecConsumer _specConsumer;
private final ClassAliasResolver<SpecConsumer> aliasResolver;
@@ -115,7 +115,7 @@ public class ScheduledJobConfigurationManager extends JobConfigurationManager {
* @throws ExecutionException
* @throws InterruptedException
*/
- private void fetchJobSpecs() throws ExecutionException, InterruptedException {
+ protected void fetchJobSpecs() throws ExecutionException, InterruptedException {
List<Pair<SpecExecutor.Verb, Spec>> changesSpecs =
(List<Pair<SpecExecutor.Verb, Spec>>) this._specConsumer.changedSpecs().get();
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/FsScheduledJobConfigurationManagerTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/FsScheduledJobConfigurationManagerTest.java
new file mode 100644
index 0000000..09c0c86
--- /dev/null
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/FsScheduledJobConfigurationManagerTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.cluster;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.eventbus.EventBus;
+import com.google.common.io.Files;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.FsSpecConsumer;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.JobSpecNotFoundException;
+import org.apache.gobblin.runtime.api.MutableJobCatalog;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.job_catalog.NonObservingFSJobCatalog;
+import org.apache.gobblin.runtime.job_spec.AvroJobSpec;
+
+@Slf4j
+public class FsScheduledJobConfigurationManagerTest {
+ private MutableJobCatalog _jobCatalog;
+ private FsScheduledJobConfigurationManager jobConfigurationManager;
+
+ private String jobConfDir = "/tmp/" + this.getClass().getSimpleName() + "/jobCatalog";
+ private String fsSpecConsumerPathString = "/tmp/fsJobConfigManagerTest";
+ private String jobSpecUriString = "testJobSpec";
+
+ private FileSystem fs;
+
+ @BeforeClass
+ public void setUp() throws IOException {
+ this.fs = FileSystem.getLocal(new Configuration(false));
+ Path jobConfDirPath = new Path(jobConfDir);
+ if (!this.fs.exists(jobConfDirPath)) {
+ this.fs.mkdirs(jobConfDirPath);
+ }
+
+ EventBus eventBus = new EventBus(FsScheduledJobConfigurationManagerTest.class.getSimpleName());
+ Config config = ConfigFactory.empty()
+ .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY, ConfigValueFactory.fromAnyRef(jobConfDir));
+ this._jobCatalog = new NonObservingFSJobCatalog(config);
+ ((NonObservingFSJobCatalog) this._jobCatalog).startAsync().awaitRunning();
+
+ Config jobConfigurationManagerConfig = ConfigFactory.empty()
+ .withValue(GobblinClusterConfigurationKeys.SPEC_CONSUMER_CLASS_KEY, ConfigValueFactory.fromAnyRef(FsSpecConsumer.class.getName()))
+ .withValue(FsSpecConsumer.SPEC_PATH_KEY, ConfigValueFactory.fromAnyRef(fsSpecConsumerPathString));
+ jobConfigurationManager = new FsScheduledJobConfigurationManager(eventBus, jobConfigurationManagerConfig, this._jobCatalog);
+ }
+
+ private void addJobSpec(String jobSpecName, String version, String verb) throws IOException {
+ Map<String, String> metadataMap = new HashMap<>();
+ metadataMap.put(FsSpecConsumer.VERB_KEY, verb);
+
+ AvroJobSpec jobSpec = AvroJobSpec.newBuilder().
+ setUri(Files.getNameWithoutExtension(jobSpecName)).
+ setProperties(new HashMap<>()).
+ setTemplateUri("FS:///").
+ setDescription("test").
+ setVersion(version).
+ setMetadata(metadataMap).build();
+
+ DatumWriter<AvroJobSpec> datumWriter = new SpecificDatumWriter<>(AvroJobSpec.SCHEMA$);
+ DataFileWriter<AvroJobSpec> dataFileWriter = new DataFileWriter<>(datumWriter);
+
+ Path fsSpecConsumerPath = new Path(fsSpecConsumerPathString, jobSpecName);
+ FileSystem fs = fsSpecConsumerPath.getFileSystem(new Configuration());
+ OutputStream out = fs.create(fsSpecConsumerPath);
+
+ dataFileWriter.create(AvroJobSpec.SCHEMA$, out);
+ dataFileWriter.append(jobSpec);
+ dataFileWriter.close();
+ }
+
+ @Test (expectedExceptions = {JobSpecNotFoundException.class})
+ public void testFetchJobSpecs() throws ExecutionException, InterruptedException, URISyntaxException, JobSpecNotFoundException, IOException {
+ //Test adding a JobSpec
+ String verb1 = SpecExecutor.Verb.ADD.name();
+ String version1 = "1";
+ addJobSpec(jobSpecUriString, version1, verb1);
+ this.jobConfigurationManager.fetchJobSpecs();
+ JobSpec jobSpec = this._jobCatalog.getJobSpec(new URI(jobSpecUriString));
+ Assert.assertTrue(jobSpec != null);
+ Assert.assertTrue(jobSpec.getVersion().equals(version1));
+ Assert.assertTrue(jobSpec.getUri().getPath().equals(jobSpecUriString));
+ //Ensure the JobSpec is deleted from the FsSpecConsumer path.
+ Path fsSpecConsumerPath = new Path(fsSpecConsumerPathString);
+ Assert.assertEquals(this.fs.listStatus(fsSpecConsumerPath).length, 0);
+
+ //Test that the updated JobSpec has been added to the JobCatalog.
+ String verb2 = SpecExecutor.Verb.UPDATE.name();
+ String version2 = "2";
+ addJobSpec(jobSpecUriString, version2, verb2);
+ this.jobConfigurationManager.fetchJobSpecs();
+ jobSpec = this._jobCatalog.getJobSpec(new URI(jobSpecUriString));
+ Assert.assertTrue(jobSpec != null);
+ Assert.assertTrue(jobSpec.getVersion().equals(version2));
+ //Ensure the JobSpec is deleted from the FsSpecConsumer path.
+ Assert.assertEquals(this.fs.listStatus(fsSpecConsumerPath).length, 0);
+
+ //Test that the JobSpec has been deleted from the JobCatalog.
+ String verb3 = SpecExecutor.Verb.DELETE.name();
+ addJobSpec(jobSpecUriString, version2, verb3);
+ this.jobConfigurationManager.fetchJobSpecs();
+ Assert.assertEquals(this.fs.listStatus(fsSpecConsumerPath).length, 0);
+ this._jobCatalog.getJobSpec(new URI(jobSpecUriString));
+ }
+
+ @AfterClass
+ public void tearDown() throws IOException {
+ Path fsSpecConsumerPath = new Path(fsSpecConsumerPathString);
+ if (fs.exists(fsSpecConsumerPath)) {
+ fs.delete(fsSpecConsumerPath, true);
+ }
+ Path jobCatalogPath = new Path(jobConfDir);
+ if (fs.exists(jobCatalogPath)) {
+ fs.delete(jobCatalogPath, true);
+ }
+ }
+}
\ No newline at end of file
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FsSpecConsumer.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FsSpecConsumer.java
new file mode 100644
index 0000000..58a2f99
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FsSpecConsumer.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.runtime.api;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Future;
+
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.mapred.FsInput;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.runtime.job_spec.AvroJobSpec;
+import org.apache.gobblin.util.CompletedFuture;
+
+@Slf4j
+public class FsSpecConsumer implements SpecConsumer<Spec> {
+ public static final String SPEC_PATH_KEY = "gobblin.cluster.specConsumer.path";
+ public static final String VERB_KEY = "Verb";
+
+ private final Path specDirPath;
+ private final FileSystem fs;
+ private Map<URI, Path> specToPathMap = new HashMap<>();
+
+
+ public FsSpecConsumer(Config config) {
+ this.specDirPath = new Path(config.getString(SPEC_PATH_KEY));
+ try {
+ this.fs = this.specDirPath.getFileSystem(new Configuration());
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to detect spec directory file system: " + e, e);
+ }
+ }
+
+ /** List of newly changed {@link Spec}s for execution on {@link SpecExecutor}. */
+ @Override
+ public Future<? extends List<Pair<SpecExecutor.Verb, Spec>>> changedSpecs() {
+ List<Pair<SpecExecutor.Verb, Spec>> specList = new ArrayList<>();
+ FileStatus[] fileStatuses;
+ try {
+ fileStatuses = this.fs.listStatus(this.specDirPath);
+ } catch (IOException e) {
+ log.error("Error when listing files at path: {}", this.specDirPath.toString(), e);
+ return null;
+ }
+
+ for (FileStatus fileStatus : fileStatuses) {
+ DataFileReader<AvroJobSpec> dataFileReader;
+ try {
+ dataFileReader = new DataFileReader<>(new FsInput(fileStatus.getPath(), this.fs.getConf()), new SpecificDatumReader<>());
+ } catch (IOException e) {
+ log.error("Error creating DataFileReader for: {}", fileStatus.getPath().toString(), e);
+ continue;
+ }
+
+ AvroJobSpec avroJobSpec = null;
+ while (dataFileReader.hasNext()) {
+ avroJobSpec = dataFileReader.next();
+ break;
+ }
+
+ if (avroJobSpec != null) {
+ JobSpec.Builder jobSpecBuilder = new JobSpec.Builder(avroJobSpec.getUri());
+ Properties props = new Properties();
+ props.putAll(avroJobSpec.getProperties());
+ jobSpecBuilder.withJobCatalogURI(avroJobSpec.getUri()).withVersion(avroJobSpec.getVersion())
+ .withDescription(avroJobSpec.getDescription()).withConfigAsProperties(props);
+
+ try {
+ if (!avroJobSpec.getTemplateUri().isEmpty()) {
+ jobSpecBuilder.withTemplate(new URI(avroJobSpec.getTemplateUri()));
+ }
+ } catch (URISyntaxException u) {
+ log.error("Error building a job spec: ", u);
+ continue;
+ }
+
+ String verbName = avroJobSpec.getMetadata().get(VERB_KEY);
+ SpecExecutor.Verb verb = SpecExecutor.Verb.valueOf(verbName);
+
+ JobSpec jobSpec = jobSpecBuilder.build();
+ specList.add(new ImmutablePair<SpecExecutor.Verb, Spec>(verb, jobSpec));
+ this.specToPathMap.put(jobSpec.getUri(), fileStatus.getPath());
+ }
+ }
+ return new CompletedFuture<>(specList, null);
+ }
+
+
+ @Override
+ public void commit(Spec spec) throws IOException {
+ Path path = this.specToPathMap.get(spec.getUri());
+ if (path != null) {
+ this.fs.delete(path, false);
+ }
+ }
+}