You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/04/23 16:15:35 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-1115] Add flow level data movement authorization in gaas

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new eba00a3  [GOBBLIN-1115] Add flow level data movement authorization in gaas
eba00a3 is described below

commit eba00a30bb02d06831d4374887b457a37acece40
Author: Jack Moseley <jm...@linkedin.com>
AuthorDate: Thu Apr 23 09:15:27 2020 -0700

    [GOBBLIN-1115] Add flow level data movement authorization in gaas
    
    Closes #2955 from jack-moseley/data-authorization
---
 .../apache/gobblin/service/ServiceConfigKeys.java  |  1 +
 .../apache/gobblin/metrics/ServiceMetricNames.java |  1 +
 .../modules/flow/BaseFlowToJobSpecCompiler.java    |  4 +++
 .../modules/flow/DataMovementAuthorizer.java       | 32 +++++++++++++++++++
 .../service/modules/flow/MultiHopFlowCompiler.java | 36 ++++++++++++++++++++--
 .../modules/flow/NoopDataMovementAuthorizer.java   | 35 +++++++++++++++++++++
 6 files changed, 107 insertions(+), 2 deletions(-)

diff --git a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
index e589b4a..9b458ee 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
@@ -83,6 +83,7 @@ public class ServiceConfigKeys {
   public static final String SPEC_EXECUTOR_KEY = "specExecutorInstance.class";
   public static final String EDGE_SECURITY_KEY = "edge.secured";
 
+  public static final String DATA_MOVEMENT_AUTHORIZER_CLASS = "dataMovementAuthorizer.class";
 
   // Template Catalog Keys
   public static final String TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY = GOBBLIN_SERVICE_PREFIX + "templateCatalogs.fullyQualifiedPath";
diff --git a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
index bf69c8b..9fe3515 100644
--- a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
+++ b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
@@ -23,6 +23,7 @@ public class ServiceMetricNames {
   public static final String FLOW_COMPILATION_SUCCESSFUL_METER = GOBBLIN_SERVICE_PREFIX + "flowCompilation.successful";
   public static final String FLOW_COMPILATION_FAILED_METER = GOBBLIN_SERVICE_PREFIX + "flowCompilation.failed";
   public static final String FLOW_COMPILATION_TIMER = GOBBLIN_SERVICE_PREFIX + "flowCompilation.time";
+  public static final String DATA_AUTHORIZATION_TIMER = GOBBLIN_SERVICE_PREFIX + "flowCompilation.dataAuthorization.time";
 
   // Flow Orchestration Meters and Timer
   public static final String FLOW_ORCHESTRATION_SUCCESSFUL_METER = GOBBLIN_SERVICE_PREFIX + "flowOrchestration.successful";
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
index 257ad3d..b42de01 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
@@ -82,6 +82,8 @@ public abstract class BaseFlowToJobSpecCompiler implements SpecCompiler {
   @Getter
   protected Optional<Timer> flowCompilationTimer;
   @Getter
+  protected Optional<Timer> dataAuthorizationTimer;
+  @Getter
   @Setter
   protected boolean active;
 
@@ -104,12 +106,14 @@ public abstract class BaseFlowToJobSpecCompiler implements SpecCompiler {
       this.flowCompilationSuccessFulMeter = Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_COMPILATION_SUCCESSFUL_METER));
       this.flowCompilationFailedMeter = Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_COMPILATION_FAILED_METER));
       this.flowCompilationTimer = Optional.<Timer>of(this.metricContext.timer(ServiceMetricNames.FLOW_COMPILATION_TIMER));
+      this.dataAuthorizationTimer = Optional.<Timer>of(this.metricContext.timer(ServiceMetricNames.DATA_AUTHORIZATION_TIMER));
     }
     else {
       this.metricContext = null;
       this.flowCompilationSuccessFulMeter = Optional.absent();
       this.flowCompilationFailedMeter = Optional.absent();
       this.flowCompilationTimer = Optional.absent();
+      this.dataAuthorizationTimer = Optional.absent();
     }
 
     this.topologySpecMap = Maps.newConcurrentMap();
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/DataMovementAuthorizer.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/DataMovementAuthorizer.java
new file mode 100644
index 0000000..96a8121
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/DataMovementAuthorizer.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flow;
+
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.service.modules.flowgraph.DataNode;
+
+
+/**
+ * Decides, whenever a flow is compiled, whether the data movement that the flow describes is authorized.
+ */
+public interface DataMovementAuthorizer {
+  /**
+   * Returns true if moving data from {@code sourceNode} to {@code destNode} is authorized for {@code flowSpec}.
+   */
+  boolean isMovementAuthorized(FlowSpec flowSpec, DataNode sourceNode, DataNode destNode);
+}
\ No newline at end of file
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
index dbc01ba..9b1ec16 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
@@ -18,6 +18,7 @@
 package org.apache.gobblin.service.modules.flow;
 
 import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.List;
@@ -26,8 +27,10 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.reflect.ConstructorUtils;
 import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 
@@ -55,11 +58,13 @@ import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.modules.core.GitFlowGraphMonitor;
 import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraph;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.DataNode;
 import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
 import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
 import org.apache.gobblin.service.modules.flowgraph.pathfinder.PathFinder;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 import org.apache.gobblin.service.modules.template_catalog.ObservingFSFlowEdgeTemplateCatalog;
+import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.gobblin.util.ConfigUtils;
 
 
@@ -81,6 +86,8 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
 
   private ReadWriteLock rwLock = new ReentrantReadWriteLock(true);
 
+  private DataMovementAuthorizer dataMovementAuthorizer;
+
   public MultiHopFlowCompiler(Config config) {
     this(config, true);
   }
@@ -113,6 +120,15 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
       gitFlowGraphConfig = this.config
           .withValue(GitFlowGraphMonitor.GIT_FLOWGRAPH_MONITOR_PREFIX + "." + ConfigurationKeys.ENCRYPT_KEY_LOC, config.getValue(ConfigurationKeys.ENCRYPT_KEY_LOC));
     }
+
+    try {
+      String dataMovementAuthorizerClassName = ConfigUtils.getString(this.config, ServiceConfigKeys.DATA_MOVEMENT_AUTHORIZER_CLASS,
+          NoopDataMovementAuthorizer.class.getCanonicalName());
+      this.dataMovementAuthorizer = (DataMovementAuthorizer) ConstructorUtils.invokeConstructor(Class.forName(new ClassAliasResolver<>(DataMovementAuthorizer.class).resolve(dataMovementAuthorizerClassName)), this.config);
+    } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException | ClassNotFoundException e) {
+      throw new RuntimeException(e);
+    }
+
     this.gitFlowGraphMonitor = new GitFlowGraphMonitor(gitFlowGraphConfig, flowTemplateCatalog, this.flowGraph, this.topologySpecMap, this.getInitComplete());
     this.serviceManager = new ServiceManager(Lists.newArrayList(this.gitFlowGraphMonitor, flowTemplateCatalog.get()));
     addShutdownHook();
@@ -129,6 +145,7 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
   MultiHopFlowCompiler(Config config, FlowGraph flowGraph) {
     super(config, Optional.absent(), true);
     this.flowGraph = flowGraph;
+    this.dataMovementAuthorizer = new NoopDataMovementAuthorizer(config);
   }
 
   /**
@@ -169,8 +186,12 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
 
     FlowSpec flowSpec = (FlowSpec) spec;
     String source = ConfigUtils.getString(flowSpec.getConfig(), ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, "");
-    String destination =
-        ConfigUtils.getString(flowSpec.getConfig(), ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, "");
+    String destination = ConfigUtils.getString(flowSpec.getConfig(), ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, "");
+
+    DataNode sourceNode = this.flowGraph.getNode(source);
+    List<String> destNodeIds = ConfigUtils.getStringList(flowSpec.getConfig(), ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY);
+    List<DataNode> destNodes = destNodeIds.stream().map(this.flowGraph::getNode).collect(Collectors.toList());
+
     log.info(String.format("Compiling flow for source: %s and destination: %s", source, destination));
 
     List<FlowSpec> flowSpecs = splitFlowSpec(flowSpec);
@@ -178,6 +199,17 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
     try {
       this.rwLock.readLock().lock();
       for (FlowSpec datasetFlowSpec : flowSpecs) {
+        for (DataNode destNode : destNodes) {
+          long authStartTime = System.nanoTime();
+          boolean authorized = this.dataMovementAuthorizer.isMovementAuthorized(flowSpec, sourceNode, destNode);
+          Instrumented.updateTimer(dataAuthorizationTimer, System.nanoTime() - authStartTime, TimeUnit.NANOSECONDS);
+          if (!authorized) {
+            log.error(String.format("Data movement is not authorized for flow: %s, source: %s, destination: %s",
+                flowSpec.getUri().toString(), source, destination));
+            return null;
+          }
+        }
+
         //Compute the path from source to destination.
         FlowGraphPath flowGraphPath = flowGraph.findPath(datasetFlowSpec);
         if (flowGraphPath != null) {
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/NoopDataMovementAuthorizer.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/NoopDataMovementAuthorizer.java
new file mode 100644
index 0000000..a233f02
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/NoopDataMovementAuthorizer.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flow;
+
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.service.modules.flowgraph.DataNode;
+
+
+/** {@link DataMovementAuthorizer} that authorizes every data movement by always returning true. */
+public class NoopDataMovementAuthorizer implements DataMovementAuthorizer {
+  /** Takes a {@link Config} because MultiHopFlowCompiler constructs authorizers with its config; it is unused here. */
+  public NoopDataMovementAuthorizer(Config config) {}
+
+  @Override
+  public boolean isMovementAuthorized(FlowSpec flowSpec, DataNode sourceNode, DataNode destNode) {
+    return true;
+  }
+}
\ No newline at end of file