You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/04/23 16:15:35 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-1115] Add flow
level data movement authorization in gaas
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new eba00a3 [GOBBLIN-1115] Add flow level data movement authorization in gaas
eba00a3 is described below
commit eba00a30bb02d06831d4374887b457a37acece40
Author: Jack Moseley <jm...@linkedin.com>
AuthorDate: Thu Apr 23 09:15:27 2020 -0700
[GOBBLIN-1115] Add flow level data movement authorization in gaas
Closes #2955 from jack-moseley/data-authorization
---
.../apache/gobblin/service/ServiceConfigKeys.java | 1 +
.../apache/gobblin/metrics/ServiceMetricNames.java | 1 +
.../modules/flow/BaseFlowToJobSpecCompiler.java | 4 +++
.../modules/flow/DataMovementAuthorizer.java | 32 +++++++++++++++++++
.../service/modules/flow/MultiHopFlowCompiler.java | 36 ++++++++++++++++++++--
.../modules/flow/NoopDataMovementAuthorizer.java | 35 +++++++++++++++++++++
6 files changed, 107 insertions(+), 2 deletions(-)
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
index e589b4a..9b458ee 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
@@ -83,6 +83,7 @@ public class ServiceConfigKeys {
public static final String SPEC_EXECUTOR_KEY = "specExecutorInstance.class";
public static final String EDGE_SECURITY_KEY = "edge.secured";
+ public static final String DATA_MOVEMENT_AUTHORIZER_CLASS = "dataMovementAuthorizer.class";
// Template Catalog Keys
public static final String TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY = GOBBLIN_SERVICE_PREFIX + "templateCatalogs.fullyQualifiedPath";
diff --git a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
index bf69c8b..9fe3515 100644
--- a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
+++ b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
@@ -23,6 +23,7 @@ public class ServiceMetricNames {
public static final String FLOW_COMPILATION_SUCCESSFUL_METER = GOBBLIN_SERVICE_PREFIX + "flowCompilation.successful";
public static final String FLOW_COMPILATION_FAILED_METER = GOBBLIN_SERVICE_PREFIX + "flowCompilation.failed";
public static final String FLOW_COMPILATION_TIMER = GOBBLIN_SERVICE_PREFIX + "flowCompilation.time";
+ public static final String DATA_AUTHORIZATION_TIMER = GOBBLIN_SERVICE_PREFIX + "flowCompilation.dataAuthorization.time";
// Flow Orchestration Meters and Timer
public static final String FLOW_ORCHESTRATION_SUCCESSFUL_METER = GOBBLIN_SERVICE_PREFIX + "flowOrchestration.successful";
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
index 257ad3d..b42de01 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
@@ -82,6 +82,8 @@ public abstract class BaseFlowToJobSpecCompiler implements SpecCompiler {
@Getter
protected Optional<Timer> flowCompilationTimer;
@Getter
+ protected Optional<Timer> dataAuthorizationTimer;
+ @Getter
@Setter
protected boolean active;
@@ -104,12 +106,14 @@ public abstract class BaseFlowToJobSpecCompiler implements SpecCompiler {
this.flowCompilationSuccessFulMeter = Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_COMPILATION_SUCCESSFUL_METER));
this.flowCompilationFailedMeter = Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_COMPILATION_FAILED_METER));
this.flowCompilationTimer = Optional.<Timer>of(this.metricContext.timer(ServiceMetricNames.FLOW_COMPILATION_TIMER));
+ this.dataAuthorizationTimer = Optional.<Timer>of(this.metricContext.timer(ServiceMetricNames.DATA_AUTHORIZATION_TIMER));
}
else {
this.metricContext = null;
this.flowCompilationSuccessFulMeter = Optional.absent();
this.flowCompilationFailedMeter = Optional.absent();
this.flowCompilationTimer = Optional.absent();
+ this.dataAuthorizationTimer = Optional.absent();
}
this.topologySpecMap = Maps.newConcurrentMap();
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/DataMovementAuthorizer.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/DataMovementAuthorizer.java
new file mode 100644
index 0000000..96a8121
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/DataMovementAuthorizer.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flow;
+
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.service.modules.flowgraph.DataNode;
+
+
+/**
+ * Interface that {@link MultiHopFlowCompiler} calls each time a flow is compiled to decide whether the data movement is authorized or not. The implementation is chosen via {@code ServiceConfigKeys.DATA_MOVEMENT_AUTHORIZER_CLASS} and constructed reflectively with the compiler's config as the single constructor argument.
+ */
+public interface DataMovementAuthorizer {
+ /**
+ * Returns {@code true} if the data movement is authorized given the flowspec and source/destination data node, {@code false} otherwise. NOTE(review): the compiler obtains the nodes from the flow graph by identifier without a null check, so either node may be null for an unknown identifier -- confirm.
+ */
+ public boolean isMovementAuthorized(FlowSpec flowSpec, DataNode sourceNode, DataNode destNode);
+}
\ No newline at end of file
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
index dbc01ba..9b1ec16 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
@@ -18,6 +18,7 @@
package org.apache.gobblin.service.modules.flow;
import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
@@ -26,8 +27,10 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.reflect.ConstructorUtils;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
@@ -55,11 +58,13 @@ import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.core.GitFlowGraphMonitor;
import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraph;
import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.DataNode;
import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
import org.apache.gobblin.service.modules.flowgraph.pathfinder.PathFinder;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.service.modules.template_catalog.ObservingFSFlowEdgeTemplateCatalog;
+import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;
@@ -81,6 +86,8 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
private ReadWriteLock rwLock = new ReentrantReadWriteLock(true);
+ private DataMovementAuthorizer dataMovementAuthorizer;
+
public MultiHopFlowCompiler(Config config) {
this(config, true);
}
@@ -113,6 +120,15 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
gitFlowGraphConfig = this.config
.withValue(GitFlowGraphMonitor.GIT_FLOWGRAPH_MONITOR_PREFIX + "." + ConfigurationKeys.ENCRYPT_KEY_LOC, config.getValue(ConfigurationKeys.ENCRYPT_KEY_LOC));
}
+
+ try {
+ String dataMovementAuthorizerClassName = ConfigUtils.getString(this.config, ServiceConfigKeys.DATA_MOVEMENT_AUTHORIZER_CLASS,
+ NoopDataMovementAuthorizer.class.getCanonicalName());
+ this.dataMovementAuthorizer = (DataMovementAuthorizer) ConstructorUtils.invokeConstructor(Class.forName(new ClassAliasResolver<>(DataMovementAuthorizer.class).resolve(dataMovementAuthorizerClassName)), this.config);
+ } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException | ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+
this.gitFlowGraphMonitor = new GitFlowGraphMonitor(gitFlowGraphConfig, flowTemplateCatalog, this.flowGraph, this.topologySpecMap, this.getInitComplete());
this.serviceManager = new ServiceManager(Lists.newArrayList(this.gitFlowGraphMonitor, flowTemplateCatalog.get()));
addShutdownHook();
@@ -129,6 +145,7 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
MultiHopFlowCompiler(Config config, FlowGraph flowGraph) {
super(config, Optional.absent(), true);
this.flowGraph = flowGraph;
+ this.dataMovementAuthorizer = new NoopDataMovementAuthorizer(config); // always-allow authorizer; this constructor does not consult DATA_MOVEMENT_AUTHORIZER_CLASS
}
/**
@@ -169,8 +186,12 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
FlowSpec flowSpec = (FlowSpec) spec;
String source = ConfigUtils.getString(flowSpec.getConfig(), ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, "");
- String destination =
- ConfigUtils.getString(flowSpec.getConfig(), ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, "");
+ String destination = ConfigUtils.getString(flowSpec.getConfig(), ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, "");
+
+ DataNode sourceNode = this.flowGraph.getNode(source);
+ List<String> destNodeIds = ConfigUtils.getStringList(flowSpec.getConfig(), ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY);
+ List<DataNode> destNodes = destNodeIds.stream().map(this.flowGraph::getNode).collect(Collectors.toList());
+
log.info(String.format("Compiling flow for source: %s and destination: %s", source, destination));
List<FlowSpec> flowSpecs = splitFlowSpec(flowSpec);
@@ -178,6 +199,17 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
try {
this.rwLock.readLock().lock();
for (FlowSpec datasetFlowSpec : flowSpecs) {
+ for (DataNode destNode : destNodes) {
+ long authStartTime = System.nanoTime();
+ boolean authorized = this.dataMovementAuthorizer.isMovementAuthorized(flowSpec, sourceNode, destNode);
+ Instrumented.updateTimer(dataAuthorizationTimer, System.nanoTime() - authStartTime, TimeUnit.NANOSECONDS);
+ if (!authorized) {
+ log.error(String.format("Data movement is not authorized for flow: %s, source: %s, destination: %s",
+ flowSpec.getUri().toString(), source, destination));
+ return null;
+ }
+ }
+
//Compute the path from source to destination.
FlowGraphPath flowGraphPath = flowGraph.findPath(datasetFlowSpec);
if (flowGraphPath != null) {
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/NoopDataMovementAuthorizer.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/NoopDataMovementAuthorizer.java
new file mode 100644
index 0000000..a233f02
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/NoopDataMovementAuthorizer.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flow;
+
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.service.modules.flowgraph.DataNode;
+
+
+/**
+ * {@link DataMovementAuthorizer} that always returns true. {@link MultiHopFlowCompiler} falls back to this class when no authorizer class is configured.
+ */
+public class NoopDataMovementAuthorizer implements DataMovementAuthorizer {
+ public NoopDataMovementAuthorizer(Config config) {} // config is accepted but unused; the compiler passes its config when constructing authorizers
+
+ public boolean isMovementAuthorized(FlowSpec flowSpec, DataNode sourceNode, DataNode destNode) {
+ return true; // every source/destination pair is authorized
+ }
+}
\ No newline at end of file