You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2019/04/18 23:51:47 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-742] Implement a FileSystem based JobConfigurationManager.

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new aa45670  [GOBBLIN-742] Implement a FileSystem based JobConfigurationManager.
aa45670 is described below

commit aa45670a59188184b401c1177fbf8ce8f6d344dd
Author: sv2000 <su...@gmail.com>
AuthorDate: Thu Apr 18 16:51:36 2019 -0700

    [GOBBLIN-742] Implement a FileSystem based JobConfigurationManager.
    
    Dear Gobblin maintainers,
    
    Please accept this PR. I understand that it will
    not be reviewed until I have checked off all the
    steps below!
    
    ### JIRA
    - [x] My PR addresses the following [Gobblin JIRA]
    (https://issues.apache.org/jira/browse/GOBBLIN/)
    issues and references them in the PR title. For
    example, "[GOBBLIN-XXX] My Gobblin PR"
        -
    https://issues.apache.org/jira/browse/GOBBLIN-742
    
    ### Description
    - [x] Here are some details about my PR, including
    screenshots (if applicable):
    This task aims to implement a FileSystem based
    JobConfigurationManager. The manager has an
    underlying SpecConsumer which reads specs from a
    config-specified path on HDFS and persists them to
    a JobCatalog. On successful consumption of the
    spec by the manager, it is deleted from the
    original path. So on the next fetch, only new
    specs will be picked up by the
    JobConfigurationManager.
    
    ### Tests
    - [x] My PR adds the following unit tests __OR__
    does not need testing for this extremely good
    reason:
    FsScheduledJobConfigurationManagerTest
    
    ### Commits
    - [x] My commits all reference JIRA issues in
    their subject lines, and I have squashed multiple
    commits if they address the same issue. In
    addition, my commits follow the guidelines from
    "[How to write a good git commit
    message](http://chris.beams.io/posts/git-
    commit/)":
        1. Subject is separated from body by a blank line
        2. Subject is limited to 50 characters
        3. Subject does not end with a period
        4. Subject uses the imperative mood ("add", not
    "adding")
        5. Body wraps at 72 characters
        6. Body explains "what" and "why", not "how"
    
    GOBBLIN-742: Implement a FileSystem based
    JobConfigurationManager.
    
    GOBBLIN-742: Address reviewer feedback.
    
    Closes #2608 from sv2000/fsSpecConsumer
---
 .../apache/gobblin/runtime/api/SpecConsumer.java   |   7 +
 .../FsScheduledJobConfigurationManager.java        |  82 +++++++++++
 .../cluster/ScheduledJobConfigurationManager.java  |   4 +-
 .../FsScheduledJobConfigurationManagerTest.java    | 155 +++++++++++++++++++++
 .../apache/gobblin/runtime/api/FsSpecConsumer.java | 127 +++++++++++++++++
 5 files changed, 373 insertions(+), 2 deletions(-)

diff --git a/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecConsumer.java b/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecConsumer.java
index 6dc16f4..6e5cff5 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecConsumer.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecConsumer.java
@@ -17,6 +17,7 @@
 
 package org.apache.gobblin.runtime.api;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.Future;
 import org.apache.commons.lang3.tuple.Pair;
@@ -32,4 +33,10 @@ public interface SpecConsumer<V>  {
   /** List of newly changed {@link Spec}s for execution on {@link SpecExecutor}. */
   Future<? extends List<Pair<SpecExecutor.Verb, V>>> changedSpecs();
 
+  /**
+   * A commit method to allow the consumer to checkpoint state on successful consumption of a {@link Spec}.
+   */
+  default void commit(Spec spec) throws IOException {
+    return;
+  }
 }
\ No newline at end of file
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/FsScheduledJobConfigurationManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/FsScheduledJobConfigurationManager.java
new file mode 100644
index 0000000..21db2ff
--- /dev/null
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/FsScheduledJobConfigurationManager.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.cluster;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import com.google.common.eventbus.EventBus;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.runtime.api.FsSpecConsumer;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.MutableJobCatalog;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+
+
+/**
+ * A {@link ScheduledJobConfigurationManager} that reads {@link JobSpec}s from a source path on a
+ * {@link org.apache.hadoop.fs.FileSystem} and adds them to a {@link org.apache.gobblin.runtime.api.JobCatalog}.
+ * The {@link FsScheduledJobConfigurationManager} has an underlying {@link FsSpecConsumer} that reads the {@link JobSpec}s
+ * from the filesystem and once the JobSpecs have been added to the {@link org.apache.gobblin.runtime.api.JobCatalog},
+ * the consumer deletes the specs from the source path.
+ */
+@Slf4j
+public class FsScheduledJobConfigurationManager extends ScheduledJobConfigurationManager {
+  private final MutableJobCatalog _jobCatalog;
+
+  public FsScheduledJobConfigurationManager(EventBus eventBus, Config config, MutableJobCatalog jobCatalog) {
+    super(eventBus, config);
+    this._jobCatalog = jobCatalog;
+  }
+
+  @Override
+  protected void fetchJobSpecs() throws ExecutionException, InterruptedException {
+    List<Pair<SpecExecutor.Verb, Spec>> jobSpecs =
+        (List<Pair<SpecExecutor.Verb, Spec>>) this._specConsumer.changedSpecs().get();
+
+    for (Pair<SpecExecutor.Verb, Spec> entry : jobSpecs) {
+      Spec spec = entry.getValue();
+      SpecExecutor.Verb verb = entry.getKey();
+      if (verb.equals(SpecExecutor.Verb.ADD) || verb.equals(SpecExecutor.Verb.UPDATE)) {
+        // Handle addition
+        JobSpec jobSpec = (JobSpec) spec;
+        this._jobCatalog.put(jobSpec);
+        postNewJobConfigArrival(jobSpec.getUri().toString(), jobSpec.getConfigAsProperties());
+      } else if (verb.equals(SpecExecutor.Verb.DELETE)) {
+        // Handle delete
+        this._jobCatalog.remove(spec.getUri());
+        postDeleteJobConfigArrival(spec.getUri().toString(), new Properties());
+      }
+
+      try {
+        //Acknowledge the successful consumption of the JobSpec back to the SpecConsumer, so that the
+        //SpecConsumer can delete the JobSpec.
+        this._specConsumer.commit(spec);
+      } catch (IOException e) {
+        log.error("Error when committing to FsSpecConsumer: ", e);
+      }
+    }
+  }
+}
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ScheduledJobConfigurationManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ScheduledJobConfigurationManager.java
index 00c7b0d..e311507 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ScheduledJobConfigurationManager.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ScheduledJobConfigurationManager.java
@@ -59,7 +59,7 @@ public class ScheduledJobConfigurationManager extends JobConfigurationManager {
 
   private final ScheduledExecutorService fetchJobSpecExecutor;
 
-  private final SpecConsumer _specConsumer;
+  protected final SpecConsumer _specConsumer;
 
   private final ClassAliasResolver<SpecConsumer> aliasResolver;
 
@@ -115,7 +115,7 @@ public class ScheduledJobConfigurationManager extends JobConfigurationManager {
    * @throws ExecutionException
    * @throws InterruptedException
    */
-  private void fetchJobSpecs() throws ExecutionException, InterruptedException {
+  protected void fetchJobSpecs() throws ExecutionException, InterruptedException {
     List<Pair<SpecExecutor.Verb, Spec>> changesSpecs =
         (List<Pair<SpecExecutor.Verb, Spec>>) this._specConsumer.changedSpecs().get();
 
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/FsScheduledJobConfigurationManagerTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/FsScheduledJobConfigurationManagerTest.java
new file mode 100644
index 0000000..09c0c86
--- /dev/null
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/FsScheduledJobConfigurationManagerTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.cluster;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.eventbus.EventBus;
+import com.google.common.io.Files;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.FsSpecConsumer;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.JobSpecNotFoundException;
+import org.apache.gobblin.runtime.api.MutableJobCatalog;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.job_catalog.NonObservingFSJobCatalog;
+import org.apache.gobblin.runtime.job_spec.AvroJobSpec;
+
+@Slf4j
+public class FsScheduledJobConfigurationManagerTest {
+  private MutableJobCatalog _jobCatalog;
+  private FsScheduledJobConfigurationManager jobConfigurationManager;
+
+  private String jobConfDir = "/tmp/" + this.getClass().getSimpleName() + "/jobCatalog";
+  private String fsSpecConsumerPathString = "/tmp/fsJobConfigManagerTest";
+  private String jobSpecUriString = "testJobSpec";
+
+  private FileSystem fs;
+
+  @BeforeClass
+  public void setUp() throws IOException {
+    this.fs = FileSystem.getLocal(new Configuration(false));
+    Path jobConfDirPath = new Path(jobConfDir);
+    if (!this.fs.exists(jobConfDirPath)) {
+      this.fs.mkdirs(jobConfDirPath);
+    }
+
+    EventBus eventBus = new EventBus(FsScheduledJobConfigurationManagerTest.class.getSimpleName());
+    Config config = ConfigFactory.empty()
+        .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY, ConfigValueFactory.fromAnyRef(jobConfDir));
+    this._jobCatalog = new NonObservingFSJobCatalog(config);
+    ((NonObservingFSJobCatalog) this._jobCatalog).startAsync().awaitRunning();
+
+    Config jobConfigurationManagerConfig = ConfigFactory.empty()
+        .withValue(GobblinClusterConfigurationKeys.SPEC_CONSUMER_CLASS_KEY, ConfigValueFactory.fromAnyRef(FsSpecConsumer.class.getName()))
+        .withValue(FsSpecConsumer.SPEC_PATH_KEY, ConfigValueFactory.fromAnyRef(fsSpecConsumerPathString));
+    jobConfigurationManager = new FsScheduledJobConfigurationManager(eventBus, jobConfigurationManagerConfig, this._jobCatalog);
+  }
+
+  private void addJobSpec(String jobSpecName, String version, String verb) throws IOException {
+    Map<String, String> metadataMap = new HashMap<>();
+    metadataMap.put(FsSpecConsumer.VERB_KEY, verb);
+    AvroJobSpec jobSpec = AvroJobSpec.newBuilder().
+        setUri(Files.getNameWithoutExtension(jobSpecName)).
+        setProperties(new HashMap<>()).
+        setTemplateUri("FS:///").
+        setDescription("test").
+        setVersion(version).
+        setMetadata(metadataMap).build();
+
+    Path fsSpecConsumerPath = new Path(fsSpecConsumerPathString, jobSpecName);
+    FileSystem specFs = fsSpecConsumerPath.getFileSystem(new Configuration());
+    DatumWriter<AvroJobSpec> datumWriter = new SpecificDatumWriter<>(AvroJobSpec.SCHEMA$);
+    OutputStream out = specFs.create(fsSpecConsumerPath);
+    // try-with-resources closes the writer, and with it the underlying stream, even when the append fails.
+    try (DataFileWriter<AvroJobSpec> dataFileWriter = new DataFileWriter<>(datumWriter)) {
+      dataFileWriter.create(AvroJobSpec.SCHEMA$, out);
+      dataFileWriter.append(jobSpec);
+    }
+  }
+
+  @Test
+  public void testFetchJobSpecs() throws ExecutionException, InterruptedException, URISyntaxException, JobSpecNotFoundException, IOException {
+    //Test adding a JobSpec
+    String version1 = "1";
+    addJobSpec(jobSpecUriString, version1, SpecExecutor.Verb.ADD.name());
+    this.jobConfigurationManager.fetchJobSpecs();
+    JobSpec jobSpec = this._jobCatalog.getJobSpec(new URI(jobSpecUriString));
+    Assert.assertNotNull(jobSpec);
+    Assert.assertEquals(jobSpec.getVersion(), version1);
+    Assert.assertEquals(jobSpec.getUri().getPath(), jobSpecUriString);
+    //Ensure the JobSpec is deleted from the FsSpecConsumer path.
+    Path fsSpecConsumerPath = new Path(fsSpecConsumerPathString);
+    Assert.assertEquals(this.fs.listStatus(fsSpecConsumerPath).length, 0);
+
+    //Test that the updated JobSpec has been added to the JobCatalog.
+    String version2 = "2";
+    addJobSpec(jobSpecUriString, version2, SpecExecutor.Verb.UPDATE.name());
+    this.jobConfigurationManager.fetchJobSpecs();
+    jobSpec = this._jobCatalog.getJobSpec(new URI(jobSpecUriString));
+    Assert.assertNotNull(jobSpec);
+    Assert.assertEquals(jobSpec.getVersion(), version2);
+    //Ensure the JobSpec is deleted from the FsSpecConsumer path.
+    Assert.assertEquals(this.fs.listStatus(fsSpecConsumerPath).length, 0);
+
+    //Test that the JobSpec has been deleted from the JobCatalog.
+    addJobSpec(jobSpecUriString, version2, SpecExecutor.Verb.DELETE.name());
+    this.jobConfigurationManager.fetchJobSpecs();
+    Assert.assertEquals(this.fs.listStatus(fsSpecConsumerPath).length, 0);
+    try {
+      this._jobCatalog.getJobSpec(new URI(jobSpecUriString));
+      Assert.fail("Expected the deleted JobSpec to be absent from the JobCatalog");
+    } catch (JobSpecNotFoundException expected) {
+      // Only this lookup may fail: a failure after ADD or UPDATE must not be mistaken for success.
+    }
+  }
+
+  @AfterClass
+  public void tearDown() throws IOException {
+    Path fsSpecConsumerPath = new Path(fsSpecConsumerPathString);
+    if (fs.exists(fsSpecConsumerPath)) {
+      fs.delete(fsSpecConsumerPath, true);
+    }
+    Path jobCatalogPath = new Path(jobConfDir);
+    if (fs.exists(jobCatalogPath)) {
+      fs.delete(jobCatalogPath, true);
+    }
+  }
+}
\ No newline at end of file
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FsSpecConsumer.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FsSpecConsumer.java
new file mode 100644
index 0000000..58a2f99
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FsSpecConsumer.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.runtime.api;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Future;
+
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.mapred.FsInput;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.runtime.job_spec.AvroJobSpec;
+import org.apache.gobblin.util.CompletedFuture;
+
+@Slf4j
+public class FsSpecConsumer implements SpecConsumer<Spec> {
+  public static final String SPEC_PATH_KEY = "gobblin.cluster.specConsumer.path";
+  public static final String VERB_KEY = "Verb";
+
+  private final Path specDirPath;
+  private final FileSystem fs;
+  private Map<URI, Path> specToPathMap = new HashMap<>();
+
+
+  public FsSpecConsumer(Config config) {
+    this.specDirPath = new Path(config.getString(SPEC_PATH_KEY));
+    try {
+      this.fs = this.specDirPath.getFileSystem(new Configuration());
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to detect spec directory file system: " + e, e);
+    }
+  }
+
+  /** List of newly changed {@link Spec}s for execution on {@link SpecExecutor}. */
+  @Override
+  public Future<? extends List<Pair<SpecExecutor.Verb, Spec>>> changedSpecs() {
+    List<Pair<SpecExecutor.Verb, Spec>> specList = new ArrayList<>();
+    FileStatus[] fileStatuses;
+    try {
+      fileStatuses = this.fs.listStatus(this.specDirPath);
+    } catch (IOException e) {
+      log.error("Error when listing files at path: {}", this.specDirPath.toString(), e);
+      return null;
+    }
+
+    for (FileStatus fileStatus : fileStatuses) {
+      DataFileReader<AvroJobSpec> dataFileReader;
+      try {
+        dataFileReader = new DataFileReader<>(new FsInput(fileStatus.getPath(), this.fs.getConf()), new SpecificDatumReader<>());
+      } catch (IOException e) {
+        log.error("Error creating DataFileReader for: {}", fileStatus.getPath().toString(), e);
+        continue;
+      }
+
+      AvroJobSpec avroJobSpec = null;
+      while (dataFileReader.hasNext()) {
+        avroJobSpec = dataFileReader.next();
+        break;
+      }
+
+      if (avroJobSpec != null) {
+        JobSpec.Builder jobSpecBuilder = new JobSpec.Builder(avroJobSpec.getUri());
+        Properties props = new Properties();
+        props.putAll(avroJobSpec.getProperties());
+        jobSpecBuilder.withJobCatalogURI(avroJobSpec.getUri()).withVersion(avroJobSpec.getVersion())
+            .withDescription(avroJobSpec.getDescription()).withConfigAsProperties(props);
+
+        try {
+          if (!avroJobSpec.getTemplateUri().isEmpty()) {
+            jobSpecBuilder.withTemplate(new URI(avroJobSpec.getTemplateUri()));
+          }
+        } catch (URISyntaxException u) {
+          log.error("Error building a job spec: ", u);
+          continue;
+        }
+
+        String verbName = avroJobSpec.getMetadata().get(VERB_KEY);
+        SpecExecutor.Verb verb = SpecExecutor.Verb.valueOf(verbName);
+
+        JobSpec jobSpec = jobSpecBuilder.build();
+        specList.add(new ImmutablePair<SpecExecutor.Verb, Spec>(verb, jobSpec));
+        this.specToPathMap.put(jobSpec.getUri(), fileStatus.getPath());
+      }
+    }
+    return new CompletedFuture<>(specList, null);
+  }
+
+
+  @Override
+  public void commit(Spec spec) throws IOException {
+    Path path = this.specToPathMap.get(spec.getUri());
+    if (path != null) {
+      this.fs.delete(path, false);
+    }
+  }
+}