You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2019/11/21 19:55:20 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-897] adds local
FS spec executor to write jobs to a local dir
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 27e318c [GOBBLIN-897] adds local FS spec executor to write jobs to a local dir
27e318c is described below
commit 27e318cb7f39198decb5665742984936bf3e0647
Author: William Lo <wl...@linkedin.com>
AuthorDate: Thu Nov 21 11:55:14 2019 -0800
[GOBBLIN-897] adds local FS spec executor to write jobs to a local dir
Closes #2755 from Will-Lo/add-local-fs-spec-
executor
---
conf/gobblin-as-service/application.conf | 18 ++--
.../spec_executorInstance/LocalFsSpecExecutor.java | 69 ++++++++++++
.../spec_executorInstance/LocalFsSpecProducer.java | 118 +++++++++++++++++++++
3 files changed, 198 insertions(+), 7 deletions(-)
diff --git a/conf/gobblin-as-service/application.conf b/conf/gobblin-as-service/application.conf
index 1bf7fea..8d5f694 100644
--- a/conf/gobblin-as-service/application.conf
+++ b/conf/gobblin-as-service/application.conf
@@ -21,24 +21,21 @@ fs.uri="file:///"
# Topology Catalog and Store
topologySpec.store.dir=${gobblin.service.work.dir}/topologySpecStore
-specStore.fs.dir=${gobblin.service.work.dir}/spec-store
# TopologySpec Factory
topologySpecFactory.topologyNames=localGobblinCluster
topologySpecFactory.localGobblinCluster.description="StandaloneClusterTopology"
topologySpecFactory.localGobblinCluster.version="1"
topologySpecFactory.localGobblinCluster.uri="gobblinCluster"
-topologySpecFactory.localGobblinCluster.specExecutorInstance.class="org.apache.gobblin.service.SimpleKafkaSpecProducer"
-topologySpecFactory.localGobblinCluster.specExecInstance.capabilities="externalSource:InternalSink"
-topologySpecFactory.localGobblinCluster.writer.kafka.topics="SimpleKafkaSpecExecutorInstanceTest"
-topologySpecFactory.localGobblinCluster.writer.kafka.producerConfig.bootstrap.servers="localhost:9092"
-topologySpecFactory.localGobblinCluster.writer.kafka.producerConfig.value.serializer="org.apache.kafka.common.serialization.ByteArraySerializer"
+topologySpecFactory.localGobblinCluster.specExecutorInstance.class="org.apache.gobblin.runtime.spec_executorInstance.LocalFsSpecExecutor"
+topologySpecFactory.localGobblinCluster.specExecInstance.capabilities="source:dest"
+topologySpecFactory.localGobblinCluster.gobblin.cluster.localSpecProducer.dir=${gobblin.service.work.dir}/jobs
# Flow Catalog and Store
flowSpec.store.dir=${gobblin.service.work.dir}/flowSpecStore
# Template Catalog
-gobblin.service.templateCatalogs.fullyQualifiedPath="file:///tmp/templateCatalog"
+gobblin.service.templateCatalogs.fullyQualifiedPath="file://"
# JobStatusMonitor
gobblin.service.jobStatusMonitor.enabled=false
@@ -46,5 +43,12 @@ gobblin.service.jobStatusMonitor.enabled=false
# FsJobStatusRetriever
fsJobStatusRetriever.state.store.dir=${gobblin.service.work.dir}/state-store
+# DagManager
+gobblin.service.dagManager.enabled=true
+gobblin.service.dagManager.jobStatusRetriever.class="org.apache.gobblin.service.monitoring.FsJobStatusRetriever"
+gobblin.service.dagManager.dagStateStoreClass="org.apache.gobblin.service.modules.orchestration.FSDagStateStore"
+gobblin.service.dagManager.dagStateStoreDir=${gobblin.service.work.dir}/dagStateStoreDir
+
# RestLI
gobblin.service.port=6956
+
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/LocalFsSpecExecutor.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/LocalFsSpecExecutor.java
new file mode 100644
index 0000000..4d49ed0
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/LocalFsSpecExecutor.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.spec_executorInstance;
+
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+import java.util.concurrent.Future;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.SpecConsumer;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.util.CompletedFuture;
+import org.slf4j.Logger;
+
+/**
+ * A {@link SpecExecutor} implementation that keeps each {@link JobSpec} as a file to be picked up by Gobblin
+ * Standalone. Therefore there is no need to install a {@link SpecConsumer} in this case.
+ */
+public class LocalFsSpecExecutor extends AbstractSpecExecutor {
+  // Communication mechanism component; deliberately not final to allow further extension of this implementation.
+  private SpecProducer<Spec> specProducer;
+
+  public LocalFsSpecExecutor(Config config) {
+    this(config, Optional.absent());
+  }
+
+  /** Creates an executor whose producer is a {@link LocalFsSpecProducer} built from the same {@code config}. */
+  public LocalFsSpecExecutor(Config config, Optional<Logger> log) {
+    super(config, log);
+    specProducer = new LocalFsSpecProducer(config);
+  }
+
+  @Override
+  public Future<String> getDescription() {
+    return new CompletedFuture<>("Local File System SpecExecutor", null);
+  }
+
+  /** Returns this executor's {@link SpecProducer} wrapped in a {@link CompletedFuture} that carries no error. */
+  @Override
+  public Future<? extends SpecProducer> getProducer() {
+    return new CompletedFuture<>(this.specProducer, null);
+  }
+
+  @Override
+  protected void startUp() throws Exception {
+    // No start-up work is performed by this implementation.
+  }
+
+  @Override
+  protected void shutDown() throws Exception {
+    // No shut-down work is performed by this implementation.
+  }
+}
\ No newline at end of file
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/LocalFsSpecProducer.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/LocalFsSpecProducer.java
new file mode 100644
index 0000000..0f96cd9
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/LocalFsSpecProducer.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.spec_executorInstance;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.typesafe.config.Config;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.Future;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.util.CompletedFuture;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * An implementation of {@link SpecProducer} that writes each {@link JobSpec} as a {@code .job} properties file into
+ * the directory read from the {@value #LOCAL_FS_PRODUCER_PATH_KEY} configuration key.
+ */
+@Slf4j
+public class LocalFsSpecProducer implements SpecProducer<Spec> {
+  public static final String LOCAL_FS_PRODUCER_PATH_KEY = "gobblin.cluster.localSpecProducer.dir";
+  private final String specProducerPath;
+
+  /** Reads the target directory from {@code config} and creates it, including missing parents, if it is absent.
+   * @throws RuntimeException if the path cannot be created or exists but is not a directory */
+  public LocalFsSpecProducer(Config config) {
+    this.specProducerPath = config.getString(LOCAL_FS_PRODUCER_PATH_KEY);
+    File parentDir = new File(this.specProducerPath);
+    // mkdirs() also returns false when the directory already exists, so only fail if no directory is there afterwards.
+    if (parentDir.mkdirs()) {
+      log.info("Creating directory path at {}", this.specProducerPath);
+    } else if (!parentDir.isDirectory()) {
+      throw new RuntimeException(String.format("Unable to create folder to write specs to at %s", this.specProducerPath));
+    }
+  }
+
+  /** Add a {@link Spec} for execution on {@link org.apache.gobblin.runtime.api.SpecExecutor}.
+   * @param addedSpec the spec to write out; only {@link JobSpec}s are supported */
+  @Override
+  public Future<?> addSpec(Spec addedSpec) {
+    return writeSpec(addedSpec, SpecExecutor.Verb.ADD);
+  }
+
+  /** Update a {@link Spec} being executed on {@link org.apache.gobblin.runtime.api.SpecExecutor}.
+   * @param updatedSpec the spec to write out again; only {@link JobSpec}s are supported */
+  @Override
+  public Future<?> updateSpec(Spec updatedSpec) {
+    return writeSpec(updatedSpec, SpecExecutor.Verb.UPDATE);
+  }
+
+  /** Stores the properties of a {@link JobSpec} in its job file. An {@link IOException} is logged and handed to the
+   * returned future instead of being thrown; {@code verb} is currently unused.
+   * @throws RuntimeException if {@code spec} is not a {@link JobSpec} */
+  private Future<?> writeSpec(Spec spec, SpecExecutor.Verb verb) {
+    if (!(spec instanceof JobSpec)) {
+      throw new RuntimeException("Unsupported spec type " + spec.getClass());
+    }
+    String jobFileName = getJobFileName(spec.getUri());
+    try (FileOutputStream fStream = new FileOutputStream(new File(this.specProducerPath, jobFileName))) {
+      ((JobSpec) spec).getConfigAsProperties().store(fStream, null);
+      log.info("Writing job {} to {}", jobFileName, this.specProducerPath);
+      return new CompletedFuture<>(Boolean.TRUE, null);
+    } catch (IOException e) {
+      log.error("Exception encountered when adding Spec {}", spec, e);
+      return new CompletedFuture<>(Boolean.TRUE, e);
+    }
+  }
+
+  /** Deletes the job file of a {@link Spec}; throws {@link RuntimeException} if the file cannot be deleted.
+   * @param deletedSpecURI URI of the spec whose job file is deleted
+   * @param headers not used by this implementation */
+  @Override
+  public Future<?> deleteSpec(URI deletedSpecURI, Properties headers) {
+    String jobFileName = getJobFileName(deletedSpecURI);
+    // Resolve against the producer directory, where the job file was written, not the process working directory.
+    File file = new File(this.specProducerPath, jobFileName);
+    if (file.delete()) {
+      log.info("Deleted spec: {}", jobFileName);
+      return new CompletedFuture<>(Boolean.TRUE, null);
+    }
+    throw new RuntimeException(String.format("Failed to delete file with uri %s", deletedSpecURI));
+  }
+
+  /** Not supported by this producer: always throws {@link UnsupportedOperationException}. */
+  @Override
+  public Future<? extends List<Spec>> listSpecs() {
+    throw new UnsupportedOperationException();
+  }
+
+  /** Builds the job file name by joining the '/'-separated segments of the URI path with '_' and appending ".job". */
+  private String getJobFileName(URI specUri) {
+    return String.join("_", specUri.getPath().split("/")) + ".job";
+  }
+}
\ No newline at end of file