You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this rendering.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2022/04/19 22:02:18 UTC

[gobblin] branch master updated: use data node aliases to figure out data node names before using DMAS (#3493)

This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 6c2f8d43f use data node aliases to figure out data node names before using DMAS (#3493)
6c2f8d43f is described below

commit 6c2f8d43f514723f036be2b63b55fcb8394aa4d6
Author: Arjun Singh Bora <ab...@linkedin.com>
AuthorDate: Tue Apr 19 15:02:11 2022 -0700

    use data node aliases to figure out data node names before using DMAS (#3493)
---
 .../service/modules/flow/MultiHopFlowCompiler.java      | 13 +++++++------
 .../flowgraph/pathfinder/AbstractPathFinder.java        | 15 +++------------
 .../gobblin/service/modules/restli/FlowConfigUtils.java | 17 +++++++++++++++++
 .../flowgraph/pathfinder/AbstractPathFinderTest.java    |  3 ++-
 4 files changed, 29 insertions(+), 19 deletions(-)

diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
index 5528c2761..d76b9123a 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
@@ -66,6 +66,7 @@ import org.apache.gobblin.service.modules.flowgraph.DataNode;
 import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
 import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
 import org.apache.gobblin.service.modules.flowgraph.pathfinder.PathFinder;
+import org.apache.gobblin.service.modules.restli.FlowConfigUtils;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 import org.apache.gobblin.service.modules.template_catalog.ObservingFSFlowEdgeTemplateCatalog;
 import org.apache.gobblin.util.ClassAliasResolver;
@@ -92,6 +93,8 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
 
   private DataMovementAuthorizer dataMovementAuthorizer;
 
+  private Map<String, String> dataNodeAliasMap = new HashMap<>();
+
   // a map to hold aliases of data nodes, e.g. gobblin.service.datanode.aliases.map=node1-dev:node1,node1-stg:node1,node1-prod:node1
   public static final String DATA_NODE_ID_TO_ALIAS_MAP = ServiceConfigKeys.GOBBLIN_SERVICE_PREFIX + "datanode.aliases.map";
 
@@ -109,10 +112,8 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
 
   public MultiHopFlowCompiler(Config config, Optional<Logger> log, boolean instrumentationEnabled) {
     super(config, log, instrumentationEnabled);
-    Map<String, String> dataNodeAliasMap = new HashMap<>();
-
     try {
-      dataNodeAliasMap = config.hasPath(DATA_NODE_ID_TO_ALIAS_MAP)
+      this.dataNodeAliasMap = config.hasPath(DATA_NODE_ID_TO_ALIAS_MAP)
           ? Splitter.on(",").withKeyValueSeparator(":").split(config.getString(DATA_NODE_ID_TO_ALIAS_MAP))
           : new HashMap<>();
     } catch (RuntimeException e) {
@@ -203,15 +204,15 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
     long startTime = System.nanoTime();
 
     FlowSpec flowSpec = (FlowSpec) spec;
-    String source = ConfigUtils.getString(flowSpec.getConfig(), ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, "");
-    String destination = ConfigUtils.getString(flowSpec.getConfig(), ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, "");
+    String source = FlowConfigUtils.getDataNode(flowSpec.getConfig(), ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, this.dataNodeAliasMap);
+    String destination = FlowConfigUtils.getDataNode(flowSpec.getConfig(), ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, this.dataNodeAliasMap);
 
     DataNode sourceNode = this.flowGraph.getNode(source);
     if (sourceNode == null) {
       flowSpec.addCompilationError(source, destination, String.format("Flowgraph does not have a node with id %s", source));
       return null;
     }
-    List<String> destNodeIds = ConfigUtils.getStringList(flowSpec.getConfig(), ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY);
+    List<String> destNodeIds = FlowConfigUtils.getDataNodes(flowSpec.getConfig(), ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, this.dataNodeAliasMap);
     List<DataNode> destNodes = destNodeIds.stream().map(this.flowGraph::getNode).collect(Collectors.toList());
     if (destNodes.contains(null)) {
       flowSpec.addCompilationError(source, destination, String.format("Flowgraph does not have a node with id %s", destNodeIds.get(destNodes.indexOf(null))));
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
index 2e7261828..0bd19b538 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
@@ -51,6 +51,7 @@ import org.apache.gobblin.service.modules.flowgraph.DataNode;
 import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
 import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
 import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
+import org.apache.gobblin.service.modules.restli.FlowConfigUtils;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
@@ -91,8 +92,8 @@ public abstract class AbstractPathFinder implements PathFinder {
     this.flowConfig = flowSpec.getConfig().withValue(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, ConfigValueFactory.fromAnyRef(flowExecutionId));
 
     //Get src/dest DataNodes from the flow config
-    String srcNodeId = getDataNode(flowConfig, ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, dataNodeAliasMap);
-    List<String> destNodeIds = getDataNodes(flowConfig, ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, dataNodeAliasMap);
+    String srcNodeId = FlowConfigUtils.getDataNode(flowConfig, ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, dataNodeAliasMap);
+    List<String> destNodeIds = FlowConfigUtils.getDataNodes(flowConfig, ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, dataNodeAliasMap);
 
     this.srcNode = this.flowGraph.getNode(srcNodeId);
     Preconditions.checkArgument(srcNode != null, "Flowgraph does not have a node with id " + srcNodeId);
@@ -160,16 +161,6 @@ public abstract class AbstractPathFinder implements PathFinder {
     this.destDatasetDescriptor = DatasetDescriptorUtils.constructDatasetDescriptor(destDatasetDescriptorConfig);
   }
 
-  static List<String> getDataNodes(Config flowConfig, String identifierKey, Map<String, String> dataNodeAliasMap) {
-    List<String> dataNodes = ConfigUtils.getStringList(flowConfig, identifierKey);
-    return dataNodes.stream().map(dataNode -> dataNodeAliasMap.getOrDefault(dataNode, dataNode)).collect(Collectors.toList());
-  }
-
-  private static String getDataNode(Config flowConfig, String identifierKey, Map<String, String> dataNodeAliasMap) {
-    String dataNode = ConfigUtils.getString(flowConfig, identifierKey, "");
-    return dataNodeAliasMap.getOrDefault(dataNode, dataNode);
-  }
-
   public static Config getDefaultConfig(DataNode dataNode) {
     Config defaultConfig = ConfigFactory.empty();
 
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigUtils.java
index 3af1cf25e..a179ae475 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigUtils.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigUtils.java
@@ -18,14 +18,19 @@
 package org.apache.gobblin.service.modules.restli;
 
 import java.io.IOException;
+import java.util.List;
+import java.util.Map;
 import java.util.Properties;
+import java.util.stream.Collectors;
 
 import com.google.common.collect.Maps;
 import com.linkedin.data.template.StringMap;
+import com.typesafe.config.Config;
 
 import org.apache.gobblin.service.FlowConfig;
 import org.apache.gobblin.service.FlowId;
 import org.apache.gobblin.service.Schedule;
+import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.PropertiesUtils;
 
 
@@ -101,4 +106,16 @@ public class FlowConfigUtils {
 
     return flowConfig;
   }
+
+  public static List<String> getDataNodes(Config flowConfig, String identifierKey, Map<String, String> dataNodeAliasMap) {
+    List<String> dataNodes = ConfigUtils.getStringList(flowConfig, identifierKey);
+    return dataNodes.stream().map(dataNode -> dataNodeAliasMap.getOrDefault(dataNode, dataNode)).collect(Collectors.toList());
+  }
+
+  public static String getDataNode(Config flowConfig, String identifierKey, Map<String, String> dataNodeAliasMap) {
+    String dataNode = ConfigUtils.getString(flowConfig, identifierKey, "");
+    return dataNodeAliasMap.getOrDefault(dataNode, dataNode);
+  }
+
+
 }
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinderTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinderTest.java
index c746372ba..964bdc103 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinderTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinderTest.java
@@ -29,6 +29,7 @@ import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigValueFactory;
 
 import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.restli.FlowConfigUtils;
 
 
 public class AbstractPathFinderTest {
@@ -43,7 +44,7 @@ public class AbstractPathFinderTest {
     dataNodeAliasMap.put("node3-alpha", "node3");
     dataNodeAliasMap.put("node1-beta", "node3");
 
-    List<String> dataNodes = AbstractPathFinder.getDataNodes(flowConfig, ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, dataNodeAliasMap);
+    List<String> dataNodes = FlowConfigUtils.getDataNodes(flowConfig, ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, dataNodeAliasMap);
     Assert.assertEquals(dataNodes.size(), 2);
     Assert.assertTrue(dataNodes.contains("node1"));
     Assert.assertTrue(dataNodes.contains("node2"));