You are viewing a plain-text version of this content. The canonical link for it was a hyperlink ("here") on the original archive page and was not preserved in this plain-text copy.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2022/04/19 22:02:18 UTC
[gobblin] branch master updated: use data node aliases to figure out data node names before using DMAS (#3493)
This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 6c2f8d43f use data node aliases to figure out data node names before using DMAS (#3493)
6c2f8d43f is described below
commit 6c2f8d43f514723f036be2b63b55fcb8394aa4d6
Author: Arjun Singh Bora <ab...@linkedin.com>
AuthorDate: Tue Apr 19 15:02:11 2022 -0700
use data node aliases to figure out data node names before using DMAS (#3493)
---
.../service/modules/flow/MultiHopFlowCompiler.java | 13 +++++++------
.../flowgraph/pathfinder/AbstractPathFinder.java | 15 +++------------
.../gobblin/service/modules/restli/FlowConfigUtils.java | 17 +++++++++++++++++
.../flowgraph/pathfinder/AbstractPathFinderTest.java | 3 ++-
4 files changed, 29 insertions(+), 19 deletions(-)
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
index 5528c2761..d76b9123a 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
@@ -66,6 +66,7 @@ import org.apache.gobblin.service.modules.flowgraph.DataNode;
import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
import org.apache.gobblin.service.modules.flowgraph.pathfinder.PathFinder;
+import org.apache.gobblin.service.modules.restli.FlowConfigUtils;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.service.modules.template_catalog.ObservingFSFlowEdgeTemplateCatalog;
import org.apache.gobblin.util.ClassAliasResolver;
@@ -92,6 +93,8 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
private DataMovementAuthorizer dataMovementAuthorizer;
+ private Map<String, String> dataNodeAliasMap = new HashMap<>();
+
// a map to hold aliases of data nodes, e.g. gobblin.service.datanode.aliases.map=node1-dev:node1,node1-stg:node1,node1-prod:node1
public static final String DATA_NODE_ID_TO_ALIAS_MAP = ServiceConfigKeys.GOBBLIN_SERVICE_PREFIX + "datanode.aliases.map";
@@ -109,10 +112,8 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
public MultiHopFlowCompiler(Config config, Optional<Logger> log, boolean instrumentationEnabled) {
super(config, log, instrumentationEnabled);
- Map<String, String> dataNodeAliasMap = new HashMap<>();
-
try {
- dataNodeAliasMap = config.hasPath(DATA_NODE_ID_TO_ALIAS_MAP)
+ this.dataNodeAliasMap = config.hasPath(DATA_NODE_ID_TO_ALIAS_MAP)
? Splitter.on(",").withKeyValueSeparator(":").split(config.getString(DATA_NODE_ID_TO_ALIAS_MAP))
: new HashMap<>();
} catch (RuntimeException e) {
@@ -203,15 +204,15 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
long startTime = System.nanoTime();
FlowSpec flowSpec = (FlowSpec) spec;
- String source = ConfigUtils.getString(flowSpec.getConfig(), ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, "");
- String destination = ConfigUtils.getString(flowSpec.getConfig(), ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, "");
+ String source = FlowConfigUtils.getDataNode(flowSpec.getConfig(), ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, this.dataNodeAliasMap);
+ String destination = FlowConfigUtils.getDataNode(flowSpec.getConfig(), ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, this.dataNodeAliasMap);
DataNode sourceNode = this.flowGraph.getNode(source);
if (sourceNode == null) {
flowSpec.addCompilationError(source, destination, String.format("Flowgraph does not have a node with id %s", source));
return null;
}
- List<String> destNodeIds = ConfigUtils.getStringList(flowSpec.getConfig(), ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY);
+ List<String> destNodeIds = FlowConfigUtils.getDataNodes(flowSpec.getConfig(), ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, this.dataNodeAliasMap);
List<DataNode> destNodes = destNodeIds.stream().map(this.flowGraph::getNode).collect(Collectors.toList());
if (destNodes.contains(null)) {
flowSpec.addCompilationError(source, destination, String.format("Flowgraph does not have a node with id %s", destNodeIds.get(destNodes.indexOf(null))));
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
index 2e7261828..0bd19b538 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
@@ -51,6 +51,7 @@ import org.apache.gobblin.service.modules.flowgraph.DataNode;
import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
+import org.apache.gobblin.service.modules.restli.FlowConfigUtils;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
@@ -91,8 +92,8 @@ public abstract class AbstractPathFinder implements PathFinder {
this.flowConfig = flowSpec.getConfig().withValue(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, ConfigValueFactory.fromAnyRef(flowExecutionId));
//Get src/dest DataNodes from the flow config
- String srcNodeId = getDataNode(flowConfig, ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, dataNodeAliasMap);
- List<String> destNodeIds = getDataNodes(flowConfig, ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, dataNodeAliasMap);
+ String srcNodeId = FlowConfigUtils.getDataNode(flowConfig, ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, dataNodeAliasMap);
+ List<String> destNodeIds = FlowConfigUtils.getDataNodes(flowConfig, ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, dataNodeAliasMap);
this.srcNode = this.flowGraph.getNode(srcNodeId);
Preconditions.checkArgument(srcNode != null, "Flowgraph does not have a node with id " + srcNodeId);
@@ -160,16 +161,6 @@ public abstract class AbstractPathFinder implements PathFinder {
this.destDatasetDescriptor = DatasetDescriptorUtils.constructDatasetDescriptor(destDatasetDescriptorConfig);
}
- static List<String> getDataNodes(Config flowConfig, String identifierKey, Map<String, String> dataNodeAliasMap) {
- List<String> dataNodes = ConfigUtils.getStringList(flowConfig, identifierKey);
- return dataNodes.stream().map(dataNode -> dataNodeAliasMap.getOrDefault(dataNode, dataNode)).collect(Collectors.toList());
- }
-
- private static String getDataNode(Config flowConfig, String identifierKey, Map<String, String> dataNodeAliasMap) {
- String dataNode = ConfigUtils.getString(flowConfig, identifierKey, "");
- return dataNodeAliasMap.getOrDefault(dataNode, dataNode);
- }
-
public static Config getDefaultConfig(DataNode dataNode) {
Config defaultConfig = ConfigFactory.empty();
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigUtils.java
index 3af1cf25e..a179ae475 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigUtils.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigUtils.java
@@ -18,14 +18,19 @@
package org.apache.gobblin.service.modules.restli;
import java.io.IOException;
+import java.util.List;
+import java.util.Map;
import java.util.Properties;
+import java.util.stream.Collectors;
import com.google.common.collect.Maps;
import com.linkedin.data.template.StringMap;
+import com.typesafe.config.Config;
import org.apache.gobblin.service.FlowConfig;
import org.apache.gobblin.service.FlowId;
import org.apache.gobblin.service.Schedule;
+import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.PropertiesUtils;
@@ -101,4 +106,16 @@ public class FlowConfigUtils {
return flowConfig;
}
+
+ public static List<String> getDataNodes(Config flowConfig, String identifierKey, Map<String, String> dataNodeAliasMap) {
+ List<String> dataNodes = ConfigUtils.getStringList(flowConfig, identifierKey);
+ return dataNodes.stream().map(dataNode -> dataNodeAliasMap.getOrDefault(dataNode, dataNode)).collect(Collectors.toList());
+ }
+
+ public static String getDataNode(Config flowConfig, String identifierKey, Map<String, String> dataNodeAliasMap) {
+ String dataNode = ConfigUtils.getString(flowConfig, identifierKey, "");
+ return dataNodeAliasMap.getOrDefault(dataNode, dataNode);
+ }
+
+
}
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinderTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinderTest.java
index c746372ba..964bdc103 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinderTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinderTest.java
@@ -29,6 +29,7 @@ import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;
import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.restli.FlowConfigUtils;
public class AbstractPathFinderTest {
@@ -43,7 +44,7 @@ public class AbstractPathFinderTest {
dataNodeAliasMap.put("node3-alpha", "node3");
dataNodeAliasMap.put("node1-beta", "node3");
- List<String> dataNodes = AbstractPathFinder.getDataNodes(flowConfig, ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, dataNodeAliasMap);
+ List<String> dataNodes = FlowConfigUtils.getDataNodes(flowConfig, ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, dataNodeAliasMap);
Assert.assertEquals(dataNodes.size(), 2);
Assert.assertTrue(dataNodes.contains("node1"));
Assert.assertTrue(dataNodes.contains("node2"));