You are viewing a plain text version of this content. The link to its canonical (HTML) version was not preserved in this plain-text copy.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2022/03/28 17:43:04 UTC

[gobblin] branch master updated: [GOBBLIN-1627] provide option to convert datanodes names (#3484)

This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new db27e51  [GOBBLIN-1627] provide option to convert datanodes names (#3484)
db27e51 is described below

commit db27e516e20973e8033338b32e2b2f9ca88ecc3e
Author: Arjun Singh Bora <ab...@linkedin.com>
AuthorDate: Mon Mar 28 10:43:00 2022 -0700

    [GOBBLIN-1627] provide option to convert datanodes names (#3484)
    
    * provide option to convert datanodes names
    address review comments
    
    * address review comments
    
    * address review comments
    
    * address review comment
---
 .../service/modules/flow/MultiHopFlowCompiler.java | 19 +++++++-
 .../service/modules/flowgraph/BaseFlowGraph.java   | 11 ++++-
 .../flowgraph/pathfinder/AbstractPathFinder.java   | 21 +++++++--
 .../flowgraph/pathfinder/BFSPathFinder.java        |  7 ++-
 .../pathfinder/AbstractPathFinderTest.java         | 51 ++++++++++++++++++++++
 5 files changed, 103 insertions(+), 6 deletions(-)

diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
index 99580e9..5528c27 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
@@ -21,7 +21,9 @@ import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -38,6 +40,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ServiceManager;
@@ -89,6 +92,9 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
 
   private DataMovementAuthorizer dataMovementAuthorizer;
 
+  // Optional config key: comma-separated alias:dataNodeId pairs (key = alias, value = real node id, despite the constant's name), e.g. gobblin.service.datanode.aliases.map=node1-dev:node1,node1-stg:node1,node1-prod:node1
+  public static final String DATA_NODE_ID_TO_ALIAS_MAP = ServiceConfigKeys.GOBBLIN_SERVICE_PREFIX + "datanode.aliases.map";
+
   public MultiHopFlowCompiler(Config config) {
     this(config, true);
   }
@@ -103,7 +109,18 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
 
   public MultiHopFlowCompiler(Config config, Optional<Logger> log, boolean instrumentationEnabled) {
     super(config, log, instrumentationEnabled);
-    this.flowGraph = new BaseFlowGraph();
+    // alias:dataNodeId pairs; whitespace is trimmed, empty entries skipped, malformed values (no ':', repeated alias) ignored.
+    Map<String, String> dataNodeAliasMap = new HashMap<>();
+    try {
+      if (config.hasPath(DATA_NODE_ID_TO_ALIAS_MAP)) {
+        dataNodeAliasMap = Splitter.on(",").omitEmptyStrings().trimResults()
+            .withKeyValueSeparator(Splitter.on(":").trimResults()).split(config.getString(DATA_NODE_ID_TO_ALIAS_MAP));
+      }
+    } catch (RuntimeException e) {
+      MultiHopFlowCompiler.log.warn("Exception reading data node alias map, ignoring it.", e);
+    }
+    this.flowGraph = new BaseFlowGraph(dataNodeAliasMap);
+
     Optional<ObservingFSFlowEdgeTemplateCatalog> flowTemplateCatalog = Optional.absent();
     if (config.hasPath(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY)
         && StringUtils.isNotBlank(config.getString(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY))) {
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java
index 83a9a58..9f5ef02 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java
@@ -50,6 +50,15 @@ public class BaseFlowGraph implements FlowGraph {
   private Map<DataNode, Set<FlowEdge>> nodesToEdges = new HashMap<>();
   private Map<String, DataNode> dataNodeMap = new HashMap<>();
   private Map<String, FlowEdge> flowEdgeMap = new HashMap<>();
+  private final Map<String, String> dataNodeAliasMap; // alias -> data node id, passed to every PathFinder this graph creates
+
+  public BaseFlowGraph() {
+    this(new HashMap<>());
+  }
+  /** @param dataNodeAliasMap alias -> data node id used by PathFinders; copied defensively, {@code null} means no aliases */
+  public BaseFlowGraph(Map<String, String> dataNodeAliasMap) {
+    this.dataNodeAliasMap = dataNodeAliasMap == null ? new HashMap<>() : new HashMap<>(dataNodeAliasMap);
+  }
 
   /**
    * Lookup a node by its identifier.
@@ -238,7 +247,7 @@ public class BaseFlowGraph implements FlowGraph {
           .getString(flowSpec.getConfig(), FlowGraphConfigurationKeys.FLOW_GRAPH_PATH_FINDER_CLASS,
               FlowGraphConfigurationKeys.DEFAULT_FLOW_GRAPH_PATH_FINDER_CLASS));
       PathFinder pathFinder =
-          (PathFinder) GobblinConstructorUtils.invokeLongestConstructor(pathFinderClass, this, flowSpec);
+          (PathFinder) GobblinConstructorUtils.invokeLongestConstructor(pathFinderClass, this, flowSpec, dataNodeAliasMap); // NOTE(review): assumes PathFinders lacking a (FlowGraph, FlowSpec, Map) ctor still get (FlowGraph, FlowSpec) - confirm
       return pathFinder.findPath();
     } finally {
       rwLock.readLock().unlock();
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
index 4dc71f4..2e72618 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
@@ -26,6 +26,7 @@ import com.typesafe.config.ConfigValue;
 import com.typesafe.config.ConfigValueFactory;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -78,7 +79,11 @@ public abstract class AbstractPathFinder implements PathFinder {
   protected FlowSpec flowSpec;
   protected Config flowConfig;
 
-  AbstractPathFinder(FlowGraph flowGraph, FlowSpec flowSpec)
+  AbstractPathFinder(FlowGraph flowGraph, FlowSpec flowSpec) throws ReflectiveOperationException {
+    this(flowGraph, flowSpec, new HashMap<>()); // no aliases: node ids are used exactly as configured
+  }
+  // dataNodeAliasMap maps alias -> data node id; the flow's source/destination ids are resolved through it before lookup.
+  AbstractPathFinder(FlowGraph flowGraph, FlowSpec flowSpec, Map<String, String> dataNodeAliasMap)
       throws ReflectiveOperationException {
     this.flowGraph = flowGraph;
     this.flowSpec = flowSpec;
@@ -86,9 +91,9 @@ public abstract class AbstractPathFinder implements PathFinder {
     this.flowConfig = flowSpec.getConfig().withValue(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, ConfigValueFactory.fromAnyRef(flowExecutionId));
 
     //Get src/dest DataNodes from the flow config
-    String srcNodeId = ConfigUtils.getString(flowConfig, ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, "");
+    String srcNodeId = getDataNode(flowConfig, ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, dataNodeAliasMap); // alias-resolved; errors below report this id, not the alias
+    List<String> destNodeIds = getDataNodes(flowConfig, ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, dataNodeAliasMap); // alias-resolved, order preserved
 
-    List<String> destNodeIds = ConfigUtils.getStringList(flowConfig, ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY);
     this.srcNode = this.flowGraph.getNode(srcNodeId);
     Preconditions.checkArgument(srcNode != null, "Flowgraph does not have a node with id " + srcNodeId);
     for (String destNodeId : destNodeIds) {
@@ -155,6 +160,16 @@ public abstract class AbstractPathFinder implements PathFinder {
     this.destDatasetDescriptor = DatasetDescriptorUtils.constructDatasetDescriptor(destDatasetDescriptorConfig);
   }
 
+  static List<String> getDataNodes(Config flowConfig, String identifierKey, Map<String, String> dataNodeAliasMap) {
+    return ConfigUtils.getStringList(flowConfig, identifierKey).stream()
+        .map(id -> dataNodeAliasMap.getOrDefault(id, id)).collect(Collectors.toList()); // ids without an alias pass through
+  }
+
+  private static String getDataNode(Config flowConfig, String identifierKey, Map<String, String> dataNodeAliasMap) {
+    String configuredId = ConfigUtils.getString(flowConfig, identifierKey, ""); // "" when the key is absent
+    return dataNodeAliasMap.containsKey(configuredId) ? dataNodeAliasMap.get(configuredId) : configuredId;
+  }
+
   public static Config getDefaultConfig(DataNode dataNode) {
     Config defaultConfig = ConfigFactory.empty();
 
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/BFSPathFinder.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/BFSPathFinder.java
index fa46efd..5435884 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/BFSPathFinder.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/BFSPathFinder.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -58,7 +59,11 @@ public class BFSPathFinder extends AbstractPathFinder {
    */
   public BFSPathFinder(FlowGraph flowGraph, FlowSpec flowSpec)
       throws ReflectiveOperationException {
-    super(flowGraph, flowSpec);
+    this(flowGraph, flowSpec, new HashMap<>()); // no aliases: node ids are used exactly as configured
+  }
+  /** Same as the two-argument constructor, but flow source/destination ids are resolved through dataNodeAliasMap (alias -> node id). */
+  public BFSPathFinder(FlowGraph flowGraph, FlowSpec flowSpec, Map<String, String> dataNodeAliasMap) throws ReflectiveOperationException {
+    super(flowGraph, flowSpec, dataNodeAliasMap);
   }
 
   /**
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinderTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinderTest.java
new file mode 100644
index 0000000..c746372
--- /dev/null
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinderTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flowgraph.pathfinder;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.service.ServiceConfigKeys;
+
+/** Checks that getDataNodes maps aliases (node1-alpha, node3-beta) to node ids, keeps unaliased ids (node2), and keeps order. */
+public class AbstractPathFinderTest {
+
+  @Test
+  public void convertDataNodesTest() {
+    Config flowConfig = ConfigFactory.empty()
+        .withValue(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, ConfigValueFactory.fromAnyRef("node1-alpha,node2,node3-beta"));
+    Map<String, String> dataNodeAliasMap = new HashMap<>();
+    dataNodeAliasMap.put("node1-alpha", "node1");
+    dataNodeAliasMap.put("node1-beta", "node1");
+    dataNodeAliasMap.put("node3-alpha", "node3");
+    dataNodeAliasMap.put("node3-beta", "node3");
+    List<String> dataNodes = AbstractPathFinder.getDataNodes(flowConfig, ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, dataNodeAliasMap);
+    Assert.assertEquals(dataNodes.size(), 3);
+    Assert.assertEquals(dataNodes.get(0), "node1");
+    Assert.assertEquals(dataNodes.get(1), "node2");
+    Assert.assertEquals(dataNodes.get(2), "node3");
+  }
+}