You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@gobblin.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2023/01/06 23:48:00 UTC

[jira] [Work logged] (GOBBLIN-1759) More robust logging when attempting to resolve a flow config

     [ https://issues.apache.org/jira/browse/GOBBLIN-1759?focusedWorklogId=837645&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-837645 ]

ASF GitHub Bot logged work on GOBBLIN-1759:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 06/Jan/23 23:47
            Start Date: 06/Jan/23 23:47
    Worklog Time Spent: 10m 
      Work Description: Will-Lo commented on code in PR #3614:
URL: https://github.com/apache/gobblin/pull/3614#discussion_r1063865584


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetPartitionConfig.java:
##########
@@ -128,13 +134,19 @@ private void validatePartitionConfig(String partitionType, String partitionPatte
     }
   }
 
-  public boolean contains(FSDatasetPartitionConfig other) {
-    if (other == null) {
-      return false;
+  public ArrayList<String> contains(FSDatasetPartitionConfig userFlowConfig) {
+    String datasetDescriptorPrefix = userFlowConfig.getIsInputDataset() ? DatasetDescriptorConfigKeys.FLOW_INPUT_DATASET_DESCRIPTOR_PREFIX : DatasetDescriptorConfigKeys.FLOW_OUTPUT_DATASET_DESCRIPTOR_PREFIX;
+    ArrayList<String> errors = new ArrayList<>();
+
+    if (!DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equalsIgnoreCase(this.getPartitionType())
+        && !this.getPartitionType().equalsIgnoreCase(userFlowConfig.getPartitionType())) {
+      errors.add(String.format(DatasetDescriptorErrorStrings.DATASET_DESCRIPTOR_KEY_MISMATCH_ERROR_TEMPLATE_PARTITION, datasetDescriptorPrefix, DatasetDescriptorConfigKeys.PARTITION_PREFIX, DatasetDescriptorConfigKeys.PARTITION_TYPE_KEY , userFlowConfig.getPartitionType(), this.getPartitionType()));
+    }
+
+    if (!DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equalsIgnoreCase(getPartitionPattern())
+        && !this.getPartitionPattern().equalsIgnoreCase(userFlowConfig.getPartitionPattern())) {
+      errors.add(String.format(DatasetDescriptorErrorStrings.DATASET_DESCRIPTOR_KEY_MISMATCH_ERROR_TEMPLATE_PARTITION, datasetDescriptorPrefix, DatasetDescriptorConfigKeys.PARTITION_PREFIX, DatasetDescriptorConfigKeys.PARTITION_PATTERN_KEY , userFlowConfig.getPartitionPattern(), this.getPartitionPattern()));

Review Comment:
   This can leverage the other function I mentioned as well



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptor.java:
##########
@@ -113,41 +120,57 @@ protected boolean isPathContaining(DatasetDescriptor other) {
    * accepted by the other {@link DatasetDescriptor}. If the path description of the other {@link DatasetDescriptor}
    * is a glob pattern, we return false.
    *
-   * @param otherPath a glob pattern that describes a set of paths.
+   * @param userFlowConfigPath a glob pattern that describes a set of paths.
    * @return true if the glob pattern described by the otherPath matches the path in this {@link DatasetDescriptor}.
    */
-  private boolean isPathContaining(String otherPath) {
-    if (otherPath == null) {
-      return false;
+  private ArrayList<String> isPathContaining(String userFlowConfigPath, Boolean inputDataset) {
+    String datasetDescriptorPrefix = inputDataset ? DatasetDescriptorConfigKeys.FLOW_INPUT_DATASET_DESCRIPTOR_PREFIX : DatasetDescriptorConfigKeys.FLOW_OUTPUT_DATASET_DESCRIPTOR_PREFIX;
+    ArrayList<String> errors = new ArrayList<>();
+    if (userFlowConfigPath == null) {
+      errors.add(String.format(DatasetDescriptorErrorStrings.DATASET_DESCRIPTOR_KEY_MISSING_ERROR_TEMPLATE, datasetDescriptorPrefix, DatasetDescriptorConfigKeys.PATH_KEY, this.getPath()));
+      return errors;
     }
     if (DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equals(this.getPath())) {
-      return true;
+      return errors;
     }
 
-    if (PathUtils.isGlob(new Path(otherPath))) {
-      return false;
+    if (PathUtils.isGlob(new Path(userFlowConfigPath))) {
+      errors.add(String.format(DatasetDescriptorErrorStrings.DATASET_DESCRIPTOR_KEY_MISMATCH_ERROR_TEMPLATE_IS_GLOB_PATTERN, datasetDescriptorPrefix, DatasetDescriptorConfigKeys.PATH_KEY, userFlowConfigPath));
+      return errors;
     }
 
     GlobPattern globPattern = new GlobPattern(this.getPath());
-    return globPattern.matches(otherPath);
+
+    if (!globPattern.matches(userFlowConfigPath)) {
+      errors.add(String.format(DatasetDescriptorErrorStrings.DATASET_DESCRIPTOR_KEY_MISMATCH_ERROR_TEMPLATE_GLOB_PATTERN, datasetDescriptorPrefix, DatasetDescriptorConfigKeys.PATH_KEY, userFlowConfigPath, this.getPath()));
+    }
+    return errors;
   }
 
   /**
    * {@inheritDoc}
    */
   @Override
-  public boolean contains(DatasetDescriptor o) {
-    if (!super.contains(o)) {
-      return false;
+  public ArrayList<String> contains(DatasetDescriptor userFlowConfigDatasetDescriptor) {
+    String datasetDescriptorPrefix = userFlowConfigDatasetDescriptor.getIsInputDataset() ? DatasetDescriptorConfigKeys.FLOW_INPUT_DATASET_DESCRIPTOR_PREFIX : DatasetDescriptorConfigKeys.FLOW_OUTPUT_DATASET_DESCRIPTOR_PREFIX;
+    ArrayList<String> errors = new ArrayList<>();
+    if (super.contains(userFlowConfigDatasetDescriptor).size() != 0) {
+      return super.contains(userFlowConfigDatasetDescriptor);
     }
 
-    FSDatasetDescriptor other = (FSDatasetDescriptor) o;
+    FSDatasetDescriptor userFlowConfig = (FSDatasetDescriptor) userFlowConfigDatasetDescriptor;
 
-    if ((this.isCompacted() != other.isCompacted()) ||
-        (this.isCompactedAndDeduped() != other.isCompactedAndDeduped())) {
-      return false;
+    if ((this.isCompacted() != userFlowConfig.isCompacted()) ||
+        (this.isCompactedAndDeduped() != userFlowConfig.isCompactedAndDeduped())) {
+      if (this.isCompacted() != userFlowConfig.isCompacted()) {
+        errors.add(String.format(DatasetDescriptorErrorStrings.DATASET_DESCRIPTOR_KEY_MISMATCH_ERROR_TEMPLATE, datasetDescriptorPrefix, DatasetDescriptorConfigKeys.IS_COMPACTED_KEY, userFlowConfig.isCompacted(), this.isCompacted()));
+      }
+      else {
+        errors.add(String.format(DatasetDescriptorErrorStrings.DATASET_DESCRIPTOR_KEY_MISMATCH_ERROR_TEMPLATE, datasetDescriptorPrefix, DatasetDescriptorConfigKeys.IS_COMPACTED_AND_DEDUPED_KEY, userFlowConfig.isCompactedAndDeduped(), this.isCompactedAndDeduped()));
+      }

Review Comment:
   You should probably separate the if statements here to clean it up, since they have different side effects.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java:
##########
@@ -165,19 +164,42 @@ private Config getResolvedFlowConfig(Config userConfig) {
    * is resolvable only if each of the {@link JobTemplate}s in the flow is resolvable. Throws an exception if the flow is
    * not resolvable.
    * @param userConfig User supplied Config
+   * @return
    */
   @Override
-  public void tryResolving(Config userConfig, DatasetDescriptor inputDescriptor, DatasetDescriptor outputDescriptor)
-      throws SpecNotFoundException, JobTemplate.TemplateException {
+  public HashMap<String, ArrayList<String>> tryResolving(Config userConfig, DatasetDescriptor inputDescriptor, DatasetDescriptor outputDescriptor) {
     Config inputDescriptorConfig = inputDescriptor.getRawConfig().atPath(DatasetDescriptorConfigKeys.FLOW_EDGE_INPUT_DATASET_DESCRIPTOR_PREFIX);
     Config outputDescriptorConfig = outputDescriptor.getRawConfig().atPath(DatasetDescriptorConfigKeys.FLOW_EDGE_OUTPUT_DATASET_DESCRIPTOR_PREFIX);
+    String variableSubstitutionErrorPattern = ":[\\s]*Reader:[a-zA-Z\\d\\s]*:[\\s]";
+    String jobTemplatePattern = "/jobs/";
+
     userConfig = userConfig.withFallback(inputDescriptorConfig).withFallback(outputDescriptorConfig);
 
     JobSpec.Builder jobSpecBuilder = JobSpec.builder().withConfig(userConfig);
 
+    HashMap<String, ArrayList<String>> resolutionErrors = new HashMap<>();
+
     for (JobTemplate template: this.jobTemplates) {
-      this.jobSpecResolver.resolveJobSpec(jobSpecBuilder.withTemplate(template).build());
+      ArrayList<String> errors = new ArrayList<>();
+      try {
+        this.jobSpecResolver.resolveJobSpec(jobSpecBuilder.withTemplate(template).build());
+      }
+      catch (ConfigException e) {
+        errors.add(e.toString().split(variableSubstitutionErrorPattern)[1]);
+      }
+      catch (JobTemplate.TemplateException e) {
+        System.out.println("TEMPLATE " + e.toString());

Review Comment:
   Use proper logging here instead of System.out.println



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSVolumeDatasetDescriptor.java:
##########
@@ -19,38 +19,46 @@
 
 import com.typesafe.config.Config;
 import java.io.IOException;
+import java.util.ArrayList;
 import lombok.EqualsAndHashCode;
 import lombok.Getter;
 import lombok.ToString;
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorErrorStrings;
 import org.apache.gobblin.util.ConfigUtils;
 
 /**
  * An implementation of {@link FSVolumeDatasetDescriptor} with fs.uri specified.
  */
 @Alpha
-@ToString(callSuper = true, exclude = {"rawConfig"})
-@EqualsAndHashCode(callSuper = true, exclude = {"rawConfig"})
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
 public class FSVolumeDatasetDescriptor extends FSDatasetDescriptor{
   @Getter
   private final String fsUri;
 
   public FSVolumeDatasetDescriptor(Config config) throws IOException {
     super(config);
     this.fsUri = ConfigUtils.getString(config, DatasetDescriptorConfigKeys.FS_URI_KEY, DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
+    this.isInputDataset = ConfigUtils.getBoolean(config, DatasetDescriptorConfigKeys.IS_INPUT_DATASET, false);
   }
 
   @Override
-  public boolean contains(DatasetDescriptor o) {
-    if (!super.contains(o)) {
-      return false;
+  public ArrayList<String> contains(DatasetDescriptor userFlowConfig) {
+    String datasetDescriptorPrefix = userFlowConfig.getIsInputDataset() ? DatasetDescriptorConfigKeys.FLOW_INPUT_DATASET_DESCRIPTOR_PREFIX : DatasetDescriptorConfigKeys.FLOW_OUTPUT_DATASET_DESCRIPTOR_PREFIX;
+    ArrayList<String> errors = new ArrayList<>();
+    if (super.contains(userFlowConfig).size() != 0) {
+      return super.contains(userFlowConfig);
     }
 
-    FSVolumeDatasetDescriptor other = (FSVolumeDatasetDescriptor) o;
+    FSVolumeDatasetDescriptor other = (FSVolumeDatasetDescriptor) userFlowConfig;
 
-    return DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equals(this.getFsUri()) || this.getFsUri()
-        .equals(other.getFsUri());
+    if (!(DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equals(this.getFsUri()) || this.getFsUri()
+        .equals(other.getFsUri()))) {
+      errors.add(String.format(DatasetDescriptorErrorStrings.DATASET_DESCRIPTOR_KEY_MISMATCH_ERROR_TEMPLATE, datasetDescriptorPrefix, DatasetDescriptorConfigKeys.FS_URI_KEY, ((FSVolumeDatasetDescriptor) userFlowConfig).getFsUri(), this.getFsUri()));

Review Comment:
   Nit: You should be using `other.getFsUri()` instead of `((FSVolumeDatasetDescriptor) userFlowConfig).getFsUri()`. Also, this can use the function I mentioned above.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java:
##########
@@ -165,19 +164,42 @@ private Config getResolvedFlowConfig(Config userConfig) {
    * is resolvable only if each of the {@link JobTemplate}s in the flow is resolvable. Throws an exception if the flow is
    * not resolvable.
    * @param userConfig User supplied Config
+   * @return
    */
   @Override
-  public void tryResolving(Config userConfig, DatasetDescriptor inputDescriptor, DatasetDescriptor outputDescriptor)
-      throws SpecNotFoundException, JobTemplate.TemplateException {
+  public HashMap<String, ArrayList<String>> tryResolving(Config userConfig, DatasetDescriptor inputDescriptor, DatasetDescriptor outputDescriptor) {
     Config inputDescriptorConfig = inputDescriptor.getRawConfig().atPath(DatasetDescriptorConfigKeys.FLOW_EDGE_INPUT_DATASET_DESCRIPTOR_PREFIX);
     Config outputDescriptorConfig = outputDescriptor.getRawConfig().atPath(DatasetDescriptorConfigKeys.FLOW_EDGE_OUTPUT_DATASET_DESCRIPTOR_PREFIX);
+    String variableSubstitutionErrorPattern = ":[\\s]*Reader:[a-zA-Z\\d\\s]*:[\\s]";
+    String jobTemplatePattern = "/jobs/";
+
     userConfig = userConfig.withFallback(inputDescriptorConfig).withFallback(outputDescriptorConfig);
 
     JobSpec.Builder jobSpecBuilder = JobSpec.builder().withConfig(userConfig);
 
+    HashMap<String, ArrayList<String>> resolutionErrors = new HashMap<>();
+
     for (JobTemplate template: this.jobTemplates) {
-      this.jobSpecResolver.resolveJobSpec(jobSpecBuilder.withTemplate(template).build());
+      ArrayList<String> errors = new ArrayList<>();
+      try {
+        this.jobSpecResolver.resolveJobSpec(jobSpecBuilder.withTemplate(template).build());
+      }
+      catch (ConfigException e) {
+        errors.add(e.toString().split(variableSubstitutionErrorPattern)[1]);
+      }
+      catch (JobTemplate.TemplateException e) {
+        System.out.println("TEMPLATE " + e.toString());
+      }
+      catch (SpecNotFoundException e) {
+        System.out.println("SPEC NOT FOUND");

Review Comment:
   Use log4j rather than `System.out.println`. Also, the message should probably be formatted properly. You can also handle these exceptions with a single multi-catch.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FormatConfig.java:
##########
@@ -67,24 +71,38 @@ public FormatConfig(Config config) throws IOException {
         .empty()));
     this.rawConfig = config.withFallback(this.encryptionConfig.getRawConfig().atPath(DatasetDescriptorConfigKeys.ENCYPTION_PREFIX)).
         withFallback(DEFAULT_FALLBACK);
+    this.isInputDataset = ConfigUtils.getBoolean(config, DatasetDescriptorConfigKeys.IS_INPUT_DATASET, false);
   }
 
-  public boolean contains(FormatConfig other) {
-    return containsFormat(other.getFormat()) && containsCodec(other.getCodecType())
-        && containsEncryptionConfig(other.getEncryptionConfig());
+  public ArrayList<String> contains(FormatConfig userFlowConfig) {
+    ArrayList<String> errors = new ArrayList<>();
+    errors.addAll(containsFormat(userFlowConfig.getFormat(), userFlowConfig.getIsInputDataset()));
+    errors.addAll(containsCodec(userFlowConfig.getCodecType(), userFlowConfig.getIsInputDataset()));
+    errors.addAll(containsEncryptionConfig(userFlowConfig.getEncryptionConfig()));
+    return errors;
   }
 
-  private boolean containsFormat(String otherFormat) {
-    return DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equalsIgnoreCase(this.getFormat())
-        || (this.getFormat().equalsIgnoreCase(otherFormat));
+  private ArrayList<String> containsFormat(String userFlowConfigFormat, Boolean inputDataset) {
+    ArrayList<String> errors = new ArrayList<>();
+    String datasetDescriptorPrefix = inputDataset ? DatasetDescriptorConfigKeys.FLOW_INPUT_DATASET_DESCRIPTOR_PREFIX : DatasetDescriptorConfigKeys.FLOW_OUTPUT_DATASET_DESCRIPTOR_PREFIX;
+    if (!DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equalsIgnoreCase(this.getFormat())
+        && (!this.getFormat().equalsIgnoreCase(userFlowConfigFormat))) {
+      errors.add(String.format(DatasetDescriptorErrorStrings.DATASET_DESCRIPTOR_KEY_MISMATCH_ERROR_TEMPLATE, datasetDescriptorPrefix, DatasetDescriptorConfigKeys.FORMAT_KEY, userFlowConfigFormat, this.getFormat()));
+    }
+    return errors;
   }
 
-  private boolean containsCodec(String otherCodecType) {
-    return DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equalsIgnoreCase(this.getCodecType())
-        || (this.getCodecType().equalsIgnoreCase(otherCodecType));
+  private ArrayList<String> containsCodec(String userFlowConfigCodecType, Boolean inputDataset) {
+    ArrayList<String> errors = new ArrayList<>();
+    String datasetDescriptorPrefix = inputDataset ? DatasetDescriptorConfigKeys.FLOW_INPUT_DATASET_DESCRIPTOR_PREFIX : DatasetDescriptorConfigKeys.FLOW_OUTPUT_DATASET_DESCRIPTOR_PREFIX;
+    if (!DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equalsIgnoreCase(this.getCodecType())
+        && (!this.getCodecType().equalsIgnoreCase(userFlowConfigCodecType))) {
+      errors.add(String.format(DatasetDescriptorErrorStrings.DATASET_DESCRIPTOR_KEY_MISMATCH_ERROR_TEMPLATE, datasetDescriptorPrefix, DatasetDescriptorConfigKeys.CODEC_KEY, userFlowConfigCodecType, this.getCodecType()));

Review Comment:
   same here



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java:
##########
@@ -641,7 +641,8 @@ public void testUnresolvedFlow() throws Exception {
     Dag<JobExecutionPlan> dag = specCompiler.compileFlow(spec);
 
     Assert.assertNull(dag);
-    Assert.assertEquals(spec.getCompilationErrors().stream().map(c -> c.errorMessage).collect(Collectors.toSet()).size(), 1);
+    // 6 expected errors as now returns a list of errors (including flowTemplateErrors and variable substitution errors) instead of a single error
+    Assert.assertEquals(spec.getCompilationErrors().stream().map(c -> c.errorMessage).collect(Collectors.toSet()).size(), 6);

Review Comment:
   Can you test the error in more detail?





Issue Time Tracking
-------------------

            Worklog Id:     (was: 837645)
    Remaining Estimate: 0h
            Time Spent: 10m

> More robust logging when attempting to resolve a flow config
> ------------------------------------------------------------
>
>                 Key: GOBBLIN-1759
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1759
>             Project: Apache Gobblin
>          Issue Type: Improvement
>          Components: gobblin-service
>            Reporter: Andy Jiang
>            Priority: Major
>          Time Spent: 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)