You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2019/11/21 19:55:20 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-897] adds local FS spec executor to write jobs to a local dir

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 27e318c  [GOBBLIN-897] adds local FS spec executor to write jobs to a local dir
27e318c is described below

commit 27e318cb7f39198decb5665742984936bf3e0647
Author: William Lo <wl...@linkedin.com>
AuthorDate: Thu Nov 21 11:55:14 2019 -0800

    [GOBBLIN-897] adds local FS spec executor to write jobs to a local dir
    
    Closes #2755 from Will-Lo/add-local-fs-spec-executor
---
 conf/gobblin-as-service/application.conf           |  18 ++--
 .../spec_executorInstance/LocalFsSpecExecutor.java |  69 ++++++++++++
 .../spec_executorInstance/LocalFsSpecProducer.java | 118 +++++++++++++++++++++
 3 files changed, 198 insertions(+), 7 deletions(-)

diff --git a/conf/gobblin-as-service/application.conf b/conf/gobblin-as-service/application.conf
index 1bf7fea..8d5f694 100644
--- a/conf/gobblin-as-service/application.conf
+++ b/conf/gobblin-as-service/application.conf
@@ -21,24 +21,21 @@ fs.uri="file:///"
 
 # Topology Catalog and Store
 topologySpec.store.dir=${gobblin.service.work.dir}/topologySpecStore
-specStore.fs.dir=${gobblin.service.work.dir}/spec-store
 
 # TopologySpec Factory
 topologySpecFactory.topologyNames=localGobblinCluster
 topologySpecFactory.localGobblinCluster.description="StandaloneClusterTopology"
 topologySpecFactory.localGobblinCluster.version="1"
 topologySpecFactory.localGobblinCluster.uri="gobblinCluster"
-topologySpecFactory.localGobblinCluster.specExecutorInstance.class="org.apache.gobblin.service.SimpleKafkaSpecProducer"
-topologySpecFactory.localGobblinCluster.specExecInstance.capabilities="externalSource:InternalSink"
-topologySpecFactory.localGobblinCluster.writer.kafka.topics="SimpleKafkaSpecExecutorInstanceTest"
-topologySpecFactory.localGobblinCluster.writer.kafka.producerConfig.bootstrap.servers="localhost:9092"
-topologySpecFactory.localGobblinCluster.writer.kafka.producerConfig.value.serializer="org.apache.kafka.common.serialization.ByteArraySerializer"
+topologySpecFactory.localGobblinCluster.specExecutorInstance.class="org.apache.gobblin.runtime.spec_executorInstance.LocalFsSpecExecutor"
+topologySpecFactory.localGobblinCluster.specExecInstance.capabilities="source:dest"
+topologySpecFactory.localGobblinCluster.gobblin.cluster.localSpecProducer.dir=${gobblin.service.work.dir}/jobs
 
 # Flow Catalog and Store
 flowSpec.store.dir=${gobblin.service.work.dir}/flowSpecStore
 
 # Template Catalog
-gobblin.service.templateCatalogs.fullyQualifiedPath="file:///tmp/templateCatalog"
+gobblin.service.templateCatalogs.fullyQualifiedPath="file://"
 
 # JobStatusMonitor
 gobblin.service.jobStatusMonitor.enabled=false
@@ -46,5 +43,12 @@ gobblin.service.jobStatusMonitor.enabled=false
 # FsJobStatusRetriever
 fsJobStatusRetriever.state.store.dir=${gobblin.service.work.dir}/state-store
 
+# DagManager
+gobblin.service.dagManager.enabled=true
+gobblin.service.dagManager.jobStatusRetriever.class="org.apache.gobblin.service.monitoring.FsJobStatusRetriever"
+gobblin.service.dagManager.dagStateStoreClass="org.apache.gobblin.service.modules.orchestration.FSDagStateStore"
+gobblin.service.dagManager.dagStateStoreDir=${gobblin.service.work.dir}/dagStateStoreDir
+
 # RestLI
 gobblin.service.port=6956
+
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/LocalFsSpecExecutor.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/LocalFsSpecExecutor.java
new file mode 100644
index 0000000..4d49ed0
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/LocalFsSpecExecutor.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.spec_executorInstance;
+
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+import java.util.concurrent.Future;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.SpecConsumer;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.util.CompletedFuture;
+import org.slf4j.Logger;
+
+/**
+ * A {@link SpecExecutor} implementation that keeps each {@link JobSpec} as a file to be consumed by Gobblin
+ * Standalone. Therefore there is no need to install a {@link SpecConsumer} in this case.
+ */
+public class LocalFsSpecExecutor extends AbstractSpecExecutor {
+  // Communication mechanism components.
+  // Not specifying final for further extension based on this implementation.
+  private SpecProducer<Spec> specProducer;
+
+  /**
+   * Creates an executor without an explicit logger.
+   *
+   * @param config configuration handed to the superclass and to the {@link LocalFsSpecProducer}
+   */
+  public LocalFsSpecExecutor(Config config) {
+    this(config, Optional.absent());
+  }
+
+  /**
+   * Creates an executor whose producer is a {@link LocalFsSpecProducer} built from the same {@code config}.
+   *
+   * @param config configuration handed to the superclass and to the {@link LocalFsSpecProducer}
+   * @param log optional logger, passed through to the superclass
+   */
+  public LocalFsSpecExecutor(Config config, Optional<Logger> log) {
+    super(config, log);
+    this.specProducer = new LocalFsSpecProducer(config);
+  }
+
+  /** @return a {@link CompletedFuture} wrapping a fixed, human-readable description of this executor */
+  @Override
+  public Future<String> getDescription() {
+    return new CompletedFuture<>("Local File System SpecExecutor", null);
+  }
+
+  /** @return a {@link CompletedFuture} wrapping the producer that was created in the constructor */
+  @Override
+  public Future<? extends SpecProducer> getProducer() {
+    return new CompletedFuture<>(this.specProducer, null);
+  }
+
+  @Override
+  protected void startUp() throws Exception {
+    // Nothing to start: the producer is already created in the constructor.
+  }
+
+  @Override
+  protected void shutDown() throws Exception {
+    // Nothing to shut down: this class opens no resources of its own.
+  }
+
+}
\ No newline at end of file
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/LocalFsSpecProducer.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/LocalFsSpecProducer.java
new file mode 100644
index 0000000..0f96cd9
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/LocalFsSpecProducer.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.spec_executorInstance;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.typesafe.config.Config;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.Future;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.util.CompletedFuture;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * An implementation of {@link SpecProducer} that writes each {@link JobSpec} as a {@code .job} properties file into
+ * the local directory configured under {@value #LOCAL_FS_PRODUCER_PATH_KEY}.
+ */
+@Slf4j
+public class LocalFsSpecProducer implements SpecProducer<Spec> {
+  public static final String LOCAL_FS_PRODUCER_PATH_KEY = "gobblin.cluster.localSpecProducer.dir";
+
+  // Directory that job files are written to and deleted from.
+  private final String specProducerPath;
+
+  /**
+   * Reads the target directory from {@code config} and creates it (including parents) if it does not exist yet.
+   *
+   * @param config must define the key {@value #LOCAL_FS_PRODUCER_PATH_KEY}
+   * @throws RuntimeException if the directory does not exist and cannot be created
+   */
+  public LocalFsSpecProducer(Config config) {
+    this.specProducerPath = config.getString(LOCAL_FS_PRODUCER_PATH_KEY);
+    File parentDir = new File(this.specProducerPath);
+    if (!parentDir.exists()) {
+      if (parentDir.mkdirs()) {
+        log.info("Creating directory path at {}", this.specProducerPath);
+      } else {
+        throw new RuntimeException(String.format("Unable to create folder to write specs to at %s", this.specProducerPath));
+      }
+    }
+  }
+
+  /**
+   * Add a {@link Spec} for execution on {@link org.apache.gobblin.runtime.api.SpecExecutor}.
+   *
+   * @param addedSpec the spec to write; must be a {@link JobSpec}
+   * @return a {@link CompletedFuture}; it carries the {@link IOException} if writing the file failed
+   * @throws RuntimeException if {@code addedSpec} is not a {@link JobSpec}
+   */
+  @Override
+  public Future<?> addSpec(Spec addedSpec) {
+    return writeSpec(addedSpec, SpecExecutor.Verb.ADD);
+  }
+
+  /**
+   * Update a {@link Spec} being executed on {@link org.apache.gobblin.runtime.api.SpecExecutor}.
+   *
+   * @param updatedSpec the spec to write; must be a {@link JobSpec}
+   * @return a {@link CompletedFuture}; it carries the {@link IOException} if writing the file failed
+   * @throws RuntimeException if {@code updatedSpec} is not a {@link JobSpec}
+   */
+  @Override
+  public Future<?> updateSpec(Spec updatedSpec) {
+    return writeSpec(updatedSpec, SpecExecutor.Verb.UPDATE);
+  }
+
+  /**
+   * Stores the properties of {@code spec} in a file named by {@link #getJobFileName(URI)} inside the producer
+   * directory, replacing the contents of any existing file with that name.
+   *
+   * @param spec the spec to write; must be a {@link JobSpec}
+   * @param verb currently unused: ADD and UPDATE both write the same file
+   */
+  private Future<?> writeSpec(Spec spec, SpecExecutor.Verb verb) {
+    if (!(spec instanceof JobSpec)) {
+      throw new RuntimeException("Unsupported spec type " + spec.getClass());
+    }
+    String jobFileName = getJobFileName(spec.getUri());
+    try (FileOutputStream fStream = new FileOutputStream(new File(this.specProducerPath, jobFileName))) {
+      ((JobSpec) spec).getConfigAsProperties().store(fStream, null);
+      log.info("Writing job {} to {}", jobFileName, this.specProducerPath);
+      return new CompletedFuture<>(Boolean.TRUE, null);
+    } catch (IOException e) {
+      // Pass the exception as the trailing argument so the stack trace is logged too.
+      log.error("Exception encountered when adding Spec {}", spec, e);
+      // NOTE(review): the value stays TRUE even on failure; presumably callers observe the failure through the
+      // exception carried by the future - confirm against CompletedFuture#get().
+      return new CompletedFuture<>(Boolean.TRUE, e);
+    }
+  }
+
+  /**
+   * Delete a {@link Spec} being executed on {@link org.apache.gobblin.runtime.api.SpecExecutor}.
+   *
+   * @param deletedSpecURI URI of the spec whose job file should be removed
+   * @param headers not used by this implementation
+   * @throws RuntimeException if the job file could not be deleted
+   */
+  @Override
+  public Future<?> deleteSpec(URI deletedSpecURI, Properties headers) {
+    String jobFileName = getJobFileName(deletedSpecURI);
+    // Resolve against the producer directory, which is where writeSpec puts the file. A bare file name would be
+    // resolved against the JVM working directory and miss the written file.
+    File file = new File(this.specProducerPath, jobFileName);
+    if (file.delete()) {
+      log.info("Deleted spec: {}", jobFileName);
+      return new CompletedFuture<>(Boolean.TRUE, null);
+    }
+    throw new RuntimeException(String.format("Failed to delete file with uri %s", deletedSpecURI));
+  }
+
+  /**
+   * List all {@link Spec} being executed on {@link org.apache.gobblin.runtime.api.SpecExecutor}.
+   *
+   * @throws UnsupportedOperationException always; listing is not implemented
+   */
+  @Override
+  public Future<? extends List<Spec>> listSpecs() {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Derives the job file name from the path of {@code specUri}: the {@code /}-separated segments are joined with
+   * {@code _} and suffixed with {@code .job}. A leading {@code /} produces an empty first segment, so for example
+   * {@code /group/name} becomes {@code _group_name.job}.
+   */
+  private String getJobFileName(URI specUri) {
+    String[] uriTokens = specUri.getPath().split("/");
+    return String.join("_", uriTokens) + ".job";
+  }
+
+}
\ No newline at end of file