You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by wl...@apache.org on 2022/10/22 00:00:59 UTC
[gobblin] branch master updated: Support multiple node types in shared flowgraph, fix logs (#3590)
This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new edd6149a5 Support multiple node types in shared flowgraph, fix logs (#3590)
edd6149a5 is described below
commit edd6149a5483484fdd417984a51a64d682b4cf8b
Author: William Lo <lo...@gmail.com>
AuthorDate: Fri Oct 21 17:00:55 2022 -0700
Support multiple node types in shared flowgraph, fix logs (#3590)
---
.../modules/flowgraph/BaseFlowGraphHelper.java | 6 ++--
.../modules/flowgraph/SharedFlowGraphHelper.java | 39 ++++++++++++++--------
.../service/monitoring/FsFlowGraphMonitorTest.java | 2 +-
3 files changed, 30 insertions(+), 17 deletions(-)
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphHelper.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphHelper.java
index 3e705a30b..3d33b2f1c 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphHelper.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphHelper.java
@@ -82,8 +82,8 @@ public class BaseFlowGraphHelper {
protected MetricContext metricContext;
final String flowGraphFolderName;
final PullFileLoader pullFileLoader;
- final Set<String> javaPropsExtensions;
- final Set<String> hoconFileExtensions;
+ protected final Set<String> javaPropsExtensions;
+ protected final Set<String> hoconFileExtensions;
protected final Optional<ContextAwareMeter> flowGraphUpdateFailedMeter;
public BaseFlowGraphHelper(Optional<? extends FSFlowTemplateCatalog> flowTemplateCatalog,
@@ -133,7 +133,7 @@ public class BaseFlowGraphHelper {
if (this.flowGraphUpdateFailedMeter.isPresent()) {
this.flowGraphUpdateFailedMeter.get().mark();
}
- log.warn("Could not add DataNode defined in {} due to exception {}", path, e);
+ log.warn(String.format("Could not add DataNode defined in %s due to exception: ", path), e);
}
}
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/SharedFlowGraphHelper.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/SharedFlowGraphHelper.java
index ce0c98b3d..65043eb24 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/SharedFlowGraphHelper.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/SharedFlowGraphHelper.java
@@ -20,6 +20,8 @@ package org.apache.gobblin.service.modules.flowgraph;
import java.io.File;
import java.io.IOException;
import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import org.apache.commons.io.FilenameUtils;
@@ -61,7 +63,6 @@ import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
public class SharedFlowGraphHelper extends BaseFlowGraphHelper {
protected String sharedNodeFolder;
- private static String NODE_FILE_SUFFIX = ".properties";
private static String SHARED_NODE_FOLDER_NAME = "nodes";
private static int NODE_FOLDER_DEPTH = 2;
@@ -83,18 +84,30 @@ public class SharedFlowGraphHelper extends BaseFlowGraphHelper {
try {
// Load node from shared folder first if it exists
Config sharedNodeConfig = ConfigFactory.empty();
- String nodePropertyFile = path.getFileName().toString() + NODE_FILE_SUFFIX;
- File sharedNodeFile = new File(this.sharedNodeFolder, nodePropertyFile);
- if (sharedNodeFile.exists()) {
- sharedNodeConfig = loadNodeFileWithOverrides(new Path(sharedNodeFile.getPath()));
- }
- File nodeFilePath = new File(path.toString(), nodePropertyFile);
- Config nodeConfig = sharedNodeConfig;
- if (nodeFilePath.exists()) {
- nodeConfig = loadNodeFileWithOverrides(new Path(nodeFilePath.getPath())).withFallback(sharedNodeConfig);
+ List<String> nodeFileSuffixes = new ArrayList<>(this.javaPropsExtensions);
+ nodeFileSuffixes.addAll(this.hoconFileExtensions);
+ // Since there can be multiple file types supported, check if there is a shared node definition that matches any of the file types
+ // If there are multiple definitions in the same folder, only load one of them
+ // Assume that configuration overrides in subfolders use the same file type for the same node
+ Config nodeConfig = ConfigFactory.empty();
+ for (String fileSuffix: nodeFileSuffixes) {
+ String nodePropertyFile = path.getFileName().toString() + "." + fileSuffix;
+ File sharedNodeFile = new File(this.sharedNodeFolder, nodePropertyFile);
+ if (sharedNodeFile.exists()) {
+ nodeConfig = loadNodeFileWithOverrides(new Path(sharedNodeFile.getPath()));
+ }
+ File nodeFilePath = new File(path.toString(), nodePropertyFile);
+ if (nodeFilePath.exists()) {
+ nodeConfig = loadNodeFileWithOverrides(new Path(nodeFilePath.getPath())).withFallback(nodeConfig);
+ }
+ if (!nodeConfig.isEmpty()) {
+ break;
+ }
}
if (nodeConfig.isEmpty()) {
- throw new IOException(String.format("Cannot find expected property file %s in %s or %s", nodePropertyFile, sharedNodeFolder, path));
+ throw new IOException(
+ String.format("Cannot find expected node file starting with %s in %s or %s", path.getFileName().toString(), sharedNodeFolder,
+ path));
}
Class dataNodeClass = Class.forName(ConfigUtils.getString(nodeConfig, FlowGraphConfigurationKeys.DATA_NODE_CLASS,
FlowGraphConfigurationKeys.DEFAULT_DATA_NODE_CLASS));
@@ -104,11 +117,11 @@ public class SharedFlowGraphHelper extends BaseFlowGraphHelper {
} else {
log.info("Added Datanode {} to FlowGraph", dataNode.getId());
}
- } catch (Exception e) {
+ } catch (IOException | ReflectiveOperationException e) {
if (this.flowGraphUpdateFailedMeter.isPresent()) {
this.flowGraphUpdateFailedMeter.get().mark();
}
- log.warn("Could not add DataNode {} defined in {} due to exception {}", path, e);
+ log.warn(String.format("Could not add DataNode defined in %s due to exception: ", path), e);
}
}
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsFlowGraphMonitorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsFlowGraphMonitorTest.java
index 1a4a36eff..7e1a17414 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsFlowGraphMonitorTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsFlowGraphMonitorTest.java
@@ -205,7 +205,7 @@ public class FsFlowGraphMonitorTest {
// Set up node 3
File node3Folder = new File(this.flowGraphDir, "node3");
node3Folder.mkdirs();
- File node3File = new File(this.sharedNodeFolder, "node3.properties");
+ File node3File = new File(this.sharedNodeFolder, "node3.conf");
String file3Contents = FlowGraphConfigurationKeys.DATA_NODE_IS_ACTIVE_KEY + "=true\nparam3=value3\n";
// Have different default values for node 1