You are viewing a plain-text version of this content. The link to the canonical version (shown as "here" in the original page) is not preserved in this plain-text copy.
Posted to commits@gobblin.apache.org by wl...@apache.org on 2022/10/22 00:00:59 UTC

[gobblin] branch master updated: Support multiple node types in shared flowgraph, fix logs (#3590)

This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new edd6149a5 Support multiple node types in shared flowgraph, fix logs (#3590)
edd6149a5 is described below

commit edd6149a5483484fdd417984a51a64d682b4cf8b
Author: William Lo <lo...@gmail.com>
AuthorDate: Fri Oct 21 17:00:55 2022 -0700

    Support multiple node types in shared flowgraph, fix logs (#3590)
---
 .../modules/flowgraph/BaseFlowGraphHelper.java     |  6 ++--
 .../modules/flowgraph/SharedFlowGraphHelper.java   | 39 ++++++++++++++--------
 .../service/monitoring/FsFlowGraphMonitorTest.java |  2 +-
 3 files changed, 30 insertions(+), 17 deletions(-)

diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphHelper.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphHelper.java
index 3e705a30b..3d33b2f1c 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphHelper.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphHelper.java
@@ -82,8 +82,8 @@ public class BaseFlowGraphHelper {
   protected MetricContext metricContext;
   final String flowGraphFolderName;
   final PullFileLoader pullFileLoader;
-  final Set<String> javaPropsExtensions;
-  final Set<String> hoconFileExtensions;
+  protected final Set<String> javaPropsExtensions;
+  protected final Set<String> hoconFileExtensions;
   protected final Optional<ContextAwareMeter> flowGraphUpdateFailedMeter;
 
   public BaseFlowGraphHelper(Optional<? extends FSFlowTemplateCatalog> flowTemplateCatalog,
@@ -133,7 +133,7 @@ public class BaseFlowGraphHelper {
         if (this.flowGraphUpdateFailedMeter.isPresent()) {
           this.flowGraphUpdateFailedMeter.get().mark();
         }
-        log.warn("Could not add DataNode defined in {} due to exception {}", path, e);
+        log.warn(String.format("Could not add DataNode defined in %s due to exception: ", path), e);
       }
     }
   }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/SharedFlowGraphHelper.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/SharedFlowGraphHelper.java
index ce0c98b3d..65043eb24 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/SharedFlowGraphHelper.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/SharedFlowGraphHelper.java
@@ -20,6 +20,8 @@ package org.apache.gobblin.service.modules.flowgraph;
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.io.FilenameUtils;
@@ -61,7 +63,6 @@ import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 public class SharedFlowGraphHelper extends BaseFlowGraphHelper {
 
   protected String sharedNodeFolder;
-  private static String NODE_FILE_SUFFIX = ".properties";
   private static String SHARED_NODE_FOLDER_NAME = "nodes";
   private static int NODE_FOLDER_DEPTH = 2;
 
@@ -83,18 +84,30 @@ public class SharedFlowGraphHelper extends BaseFlowGraphHelper {
     try {
       // Load node from shared folder first if it exists
       Config sharedNodeConfig = ConfigFactory.empty();
-      String nodePropertyFile = path.getFileName().toString() + NODE_FILE_SUFFIX;
-      File sharedNodeFile = new File(this.sharedNodeFolder, nodePropertyFile);
-      if (sharedNodeFile.exists()) {
-        sharedNodeConfig = loadNodeFileWithOverrides(new Path(sharedNodeFile.getPath()));
-      }
-      File nodeFilePath = new File(path.toString(), nodePropertyFile);
-      Config nodeConfig = sharedNodeConfig;
-      if (nodeFilePath.exists()) {
-        nodeConfig = loadNodeFileWithOverrides(new Path(nodeFilePath.getPath())).withFallback(sharedNodeConfig);
+      List<String> nodeFileSuffixes = new ArrayList<>(this.javaPropsExtensions);
+      nodeFileSuffixes.addAll(this.hoconFileExtensions);
+      // Since there can be multiple file types supported, check if there is a shared node definition that matches any of the file types
+      // If multiple definitions in the same folder, only load one of them
+      // Assume that configuration overrides in subfolders use the same file type for the same node
+      Config nodeConfig = ConfigFactory.empty();
+      for (String fileSuffix: nodeFileSuffixes) {
+        String nodePropertyFile = path.getFileName().toString() + "." + fileSuffix;
+        File sharedNodeFile = new File(this.sharedNodeFolder, nodePropertyFile);
+        if (sharedNodeFile.exists()) {
+          nodeConfig = loadNodeFileWithOverrides(new Path(sharedNodeFile.getPath()));
+        }
+        File nodeFilePath = new File(path.toString(), nodePropertyFile);
+        if (nodeFilePath.exists()) {
+          nodeConfig = loadNodeFileWithOverrides(new Path(nodeFilePath.getPath())).withFallback(nodeConfig);
+        }
+        if (!nodeConfig.isEmpty()) {
+          break;
+        }
       }
       if (nodeConfig.isEmpty()) {
-        throw new IOException(String.format("Cannot find expected property file %s in %s or %s", nodePropertyFile, sharedNodeFolder, path));
+        throw new IOException(
+            String.format("Cannot find expected node file starting with %s in %s or %s", path.getFileName().toString(), sharedNodeFolder,
+                path));
       }
       Class dataNodeClass = Class.forName(ConfigUtils.getString(nodeConfig, FlowGraphConfigurationKeys.DATA_NODE_CLASS,
           FlowGraphConfigurationKeys.DEFAULT_DATA_NODE_CLASS));
@@ -104,11 +117,11 @@ public class SharedFlowGraphHelper extends BaseFlowGraphHelper {
       } else {
         log.info("Added Datanode {} to FlowGraph", dataNode.getId());
       }
-    } catch (Exception e) {
+    } catch (IOException | ReflectiveOperationException e) {
       if (this.flowGraphUpdateFailedMeter.isPresent()) {
         this.flowGraphUpdateFailedMeter.get().mark();
       }
-      log.warn("Could not add DataNode {} defined in {} due to exception {}", path, e);
+      log.warn(String.format("Could not add DataNode defined in %s due to exception: ", path), e);
     }
   }
 
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsFlowGraphMonitorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsFlowGraphMonitorTest.java
index 1a4a36eff..7e1a17414 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsFlowGraphMonitorTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsFlowGraphMonitorTest.java
@@ -205,7 +205,7 @@ public class FsFlowGraphMonitorTest {
     // Set up node 3
     File node3Folder = new File(this.flowGraphDir, "node3");
     node3Folder.mkdirs();
-    File node3File = new File(this.sharedNodeFolder, "node3.properties");
+    File node3File = new File(this.sharedNodeFolder, "node3.conf");
     String file3Contents = FlowGraphConfigurationKeys.DATA_NODE_IS_ACTIVE_KEY + "=true\nparam3=value3\n";
 
     // Have different default values for node 1