You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2022/03/28 17:43:04 UTC
[gobblin] branch master updated: [GOBBLIN-1627] provide option to convert datanodes names (#3484)
This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new db27e51 [GOBBLIN-1627] provide option to convert datanodes names (#3484)
db27e51 is described below
commit db27e516e20973e8033338b32e2b2f9ca88ecc3e
Author: Arjun Singh Bora <ab...@linkedin.com>
AuthorDate: Mon Mar 28 10:43:00 2022 -0700
[GOBBLIN-1627] provide option to convert datanodes names (#3484)
* provide option to convert datanodes names
address review comments
* address review comments
* address review comments
* address review comment
---
.../service/modules/flow/MultiHopFlowCompiler.java | 19 +++++++-
.../service/modules/flowgraph/BaseFlowGraph.java | 11 ++++-
.../flowgraph/pathfinder/AbstractPathFinder.java | 21 +++++++--
.../flowgraph/pathfinder/BFSPathFinder.java | 7 ++-
.../pathfinder/AbstractPathFinderTest.java | 51 ++++++++++++++++++++++
5 files changed, 103 insertions(+), 6 deletions(-)
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
index 99580e9..5528c27 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
@@ -21,7 +21,9 @@ import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.URISyntaxException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -38,6 +40,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ServiceManager;
@@ -89,6 +92,9 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
private DataMovementAuthorizer dataMovementAuthorizer;
+ // a map to hold aliases of data nodes, e.g. gobblin.service.datanode.aliases.map=node1-dev:node1,node1-stg:node1,node1-prod:node1
+ public static final String DATA_NODE_ID_TO_ALIAS_MAP = ServiceConfigKeys.GOBBLIN_SERVICE_PREFIX + "datanode.aliases.map";
+
public MultiHopFlowCompiler(Config config) {
this(config, true);
}
@@ -103,7 +109,18 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
public MultiHopFlowCompiler(Config config, Optional<Logger> log, boolean instrumentationEnabled) {
super(config, log, instrumentationEnabled);
- this.flowGraph = new BaseFlowGraph();
+ Map<String, String> dataNodeAliasMap = new HashMap<>();
+
+ try {
+ dataNodeAliasMap = config.hasPath(DATA_NODE_ID_TO_ALIAS_MAP)
+ ? Splitter.on(",").withKeyValueSeparator(":").split(config.getString(DATA_NODE_ID_TO_ALIAS_MAP))
+ : new HashMap<>();
+ } catch (RuntimeException e) {
+ MultiHopFlowCompiler.log.warn("Exception reading data node alias map, ignoring it.", e);
+ }
+
+ this.flowGraph = new BaseFlowGraph(dataNodeAliasMap);
+
Optional<ObservingFSFlowEdgeTemplateCatalog> flowTemplateCatalog = Optional.absent();
if (config.hasPath(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY)
&& StringUtils.isNotBlank(config.getString(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY))) {
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java
index 83a9a58..9f5ef02 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java
@@ -50,6 +50,15 @@ public class BaseFlowGraph implements FlowGraph {
private Map<DataNode, Set<FlowEdge>> nodesToEdges = new HashMap<>();
private Map<String, DataNode> dataNodeMap = new HashMap<>();
private Map<String, FlowEdge> flowEdgeMap = new HashMap<>();
+ private Map<String, String> dataNodeAliasMap;
+
+ /** Creates a flow graph with an empty data node alias map, i.e. no aliasing is applied. */
+ public BaseFlowGraph() {
+ this(new HashMap<>());
+ }
+
+ /**
+ * Creates a flow graph whose path finders receive {@code dataNodeAliasMap} (alias -> data node id)
+ * when {@code findPath} constructs them. The map is stored by reference, not copied.
+ *
+ * @param dataNodeAliasMap alias-to-node-id map; must be non-null, since path finders call getOrDefault on it
+ */
+ public BaseFlowGraph(Map<String, String> dataNodeAliasMap) {
+ this.dataNodeAliasMap = dataNodeAliasMap;
+ }
/**
* Lookup a node by its identifier.
@@ -238,7 +247,7 @@ public class BaseFlowGraph implements FlowGraph {
.getString(flowSpec.getConfig(), FlowGraphConfigurationKeys.FLOW_GRAPH_PATH_FINDER_CLASS,
FlowGraphConfigurationKeys.DEFAULT_FLOW_GRAPH_PATH_FINDER_CLASS));
PathFinder pathFinder =
- (PathFinder) GobblinConstructorUtils.invokeLongestConstructor(pathFinderClass, this, flowSpec);
+ (PathFinder) GobblinConstructorUtils.invokeLongestConstructor(pathFinderClass, this, flowSpec, dataNodeAliasMap);
return pathFinder.findPath();
} finally {
rwLock.readLock().unlock();
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
index 4dc71f4..2e72618 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
@@ -26,6 +26,7 @@ import com.typesafe.config.ConfigValue;
import com.typesafe.config.ConfigValueFactory;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -78,7 +79,11 @@ public abstract class AbstractPathFinder implements PathFinder {
protected FlowSpec flowSpec;
protected Config flowConfig;
- AbstractPathFinder(FlowGraph flowGraph, FlowSpec flowSpec)
+ /** Creates a path finder that applies no data node aliasing (delegates with an empty alias map). */
+ AbstractPathFinder(FlowGraph flowGraph, FlowSpec flowSpec) throws ReflectiveOperationException {
+ this(flowGraph, flowSpec, new HashMap<>());
+ }
+
+ AbstractPathFinder(FlowGraph flowGraph, FlowSpec flowSpec, Map<String, String> dataNodeAliasMap)
throws ReflectiveOperationException {
this.flowGraph = flowGraph;
this.flowSpec = flowSpec;
@@ -86,9 +91,9 @@ public abstract class AbstractPathFinder implements PathFinder {
this.flowConfig = flowSpec.getConfig().withValue(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, ConfigValueFactory.fromAnyRef(flowExecutionId));
//Get src/dest DataNodes from the flow config
- String srcNodeId = ConfigUtils.getString(flowConfig, ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, "");
+ String srcNodeId = getDataNode(flowConfig, ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, dataNodeAliasMap);
+ List<String> destNodeIds = getDataNodes(flowConfig, ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, dataNodeAliasMap);
- List<String> destNodeIds = ConfigUtils.getStringList(flowConfig, ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY);
this.srcNode = this.flowGraph.getNode(srcNodeId);
Preconditions.checkArgument(srcNode != null, "Flowgraph does not have a node with id " + srcNodeId);
for (String destNodeId : destNodeIds) {
@@ -155,6 +160,16 @@ public abstract class AbstractPathFinder implements PathFinder {
this.destDatasetDescriptor = DatasetDescriptorUtils.constructDatasetDescriptor(destDatasetDescriptorConfig);
}
+ /**
+ * Reads the list of data node ids stored under {@code identifierKey} and replaces each id that has an
+ * entry in {@code dataNodeAliasMap} with the mapped value; ids without an entry are returned unchanged.
+ * Input order is preserved. Aliases are resolved a single step only (a mapped value is not looked up again).
+ * Package-private and static; AbstractPathFinderTest calls it directly.
+ *
+ * @param flowConfig flow configuration to read from
+ * @param identifierKey config key holding the data node id list
+ * @param dataNodeAliasMap alias -> data node id; must be non-null
+ * @return the resolved data node ids
+ */
+ static List<String> getDataNodes(Config flowConfig, String identifierKey, Map<String, String> dataNodeAliasMap) {
+ List<String> dataNodes = ConfigUtils.getStringList(flowConfig, identifierKey);
+ return dataNodes.stream().map(dataNode -> dataNodeAliasMap.getOrDefault(dataNode, dataNode)).collect(Collectors.toList());
+ }
+
+ /**
+ * Reads a single data node id stored under {@code identifierKey} (falling back to "", the default passed to
+ * ConfigUtils.getString) and returns its mapped value from {@code dataNodeAliasMap}, or the id itself when
+ * no alias entry exists.
+ *
+ * @param dataNodeAliasMap alias -> data node id; must be non-null
+ */
+ private static String getDataNode(Config flowConfig, String identifierKey, Map<String, String> dataNodeAliasMap) {
+ String dataNode = ConfigUtils.getString(flowConfig, identifierKey, "");
+ return dataNodeAliasMap.getOrDefault(dataNode, dataNode);
+ }
+
public static Config getDefaultConfig(DataNode dataNode) {
Config defaultConfig = ConfigFactory.empty();
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/BFSPathFinder.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/BFSPathFinder.java
index fa46efd..5435884 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/BFSPathFinder.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/BFSPathFinder.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import lombok.extern.slf4j.Slf4j;
@@ -58,7 +59,11 @@ public class BFSPathFinder extends AbstractPathFinder {
*/
public BFSPathFinder(FlowGraph flowGraph, FlowSpec flowSpec)
throws ReflectiveOperationException {
- super(flowGraph, flowSpec);
+ this(flowGraph, flowSpec, new HashMap<>());
+ }
+
+ /**
+ * Creates a BFS path finder that hands {@code dataNodeAliasMap} (alias -> data node id) to
+ * AbstractPathFinder, which uses it to resolve the flow's source and destination node ids.
+ */
+ public BFSPathFinder(FlowGraph flowGraph, FlowSpec flowSpec, Map<String, String> dataNodeAliasMap) throws ReflectiveOperationException {
+ super(flowGraph, flowSpec, dataNodeAliasMap);
}
/**
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinderTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinderTest.java
new file mode 100644
index 0000000..c746372
--- /dev/null
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinderTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flowgraph.pathfinder;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.service.ServiceConfigKeys;
+
+
+public class AbstractPathFinderTest {
+
+ /**
+ * Verifies that {@link AbstractPathFinder#getDataNodes} replaces an aliased destination id with its data
+ * node id and passes through an id that has no alias entry.
+ */
+ @Test
+ public void convertDataNodesTest() {
+ Config flowConfig = ConfigFactory.empty()
+ .withValue(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, ConfigValueFactory.fromAnyRef("node1-alpha,node2"));
+ // Every alias key must be distinct: a repeated put() silently replaces the earlier mapping.
+ // The node1-beta and node3-* entries are not referenced by the flow config and must not affect the result.
+ Map<String, String> dataNodeAliasMap = new HashMap<>();
+ dataNodeAliasMap.put("node1-alpha", "node1");
+ dataNodeAliasMap.put("node1-beta", "node1");
+ dataNodeAliasMap.put("node3-alpha", "node3");
+ dataNodeAliasMap.put("node3-beta", "node3");
+
+ List<String> dataNodes = AbstractPathFinder.getDataNodes(flowConfig, ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, dataNodeAliasMap);
+ Assert.assertEquals(dataNodes.size(), 2);
+ Assert.assertTrue(dataNodes.contains("node1"));
+ // node2 has no alias entry, so it is expected unchanged.
+ Assert.assertTrue(dataNodes.contains("node2"));
+ }
+}