You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@gobblin.apache.org by GitBox <gi...@apache.org> on 2023/01/06 23:47:41 UTC

[GitHub] [gobblin] Will-Lo commented on a diff in pull request #3614: [GOBBLIN-1759] Add error reporting when attempting to resolve flow configs

Will-Lo commented on code in PR #3614:
URL: https://github.com/apache/gobblin/pull/3614#discussion_r1063865584


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetPartitionConfig.java:
##########
@@ -128,13 +134,19 @@ private void validatePartitionConfig(String partitionType, String partitionPatte
     }
   }
 
-  public boolean contains(FSDatasetPartitionConfig other) {
-    if (other == null) {
-      return false;
+  public ArrayList<String> contains(FSDatasetPartitionConfig userFlowConfig) {
+    String datasetDescriptorPrefix = userFlowConfig.getIsInputDataset() ? DatasetDescriptorConfigKeys.FLOW_INPUT_DATASET_DESCRIPTOR_PREFIX : DatasetDescriptorConfigKeys.FLOW_OUTPUT_DATASET_DESCRIPTOR_PREFIX;
+    ArrayList<String> errors = new ArrayList<>();
+
+    if (!DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equalsIgnoreCase(this.getPartitionType())
+        && !this.getPartitionType().equalsIgnoreCase(userFlowConfig.getPartitionType())) {
+      errors.add(String.format(DatasetDescriptorErrorStrings.DATASET_DESCRIPTOR_KEY_MISMATCH_ERROR_TEMPLATE_PARTITION, datasetDescriptorPrefix, DatasetDescriptorConfigKeys.PARTITION_PREFIX, DatasetDescriptorConfigKeys.PARTITION_TYPE_KEY , userFlowConfig.getPartitionType(), this.getPartitionType()));
+    }
+
+    if (!DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equalsIgnoreCase(getPartitionPattern())
+        && !this.getPartitionPattern().equalsIgnoreCase(userFlowConfig.getPartitionPattern())) {
+      errors.add(String.format(DatasetDescriptorErrorStrings.DATASET_DESCRIPTOR_KEY_MISMATCH_ERROR_TEMPLATE_PARTITION, datasetDescriptorPrefix, DatasetDescriptorConfigKeys.PARTITION_PREFIX, DatasetDescriptorConfigKeys.PARTITION_PATTERN_KEY , userFlowConfig.getPartitionPattern(), this.getPartitionPattern()));

Review Comment:
   This check can also reuse the helper function I mentioned in my other comment.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptor.java:
##########
@@ -113,41 +120,57 @@ protected boolean isPathContaining(DatasetDescriptor other) {
    * accepted by the other {@link DatasetDescriptor}. If the path description of the other {@link DatasetDescriptor}
    * is a glob pattern, we return false.
    *
-   * @param otherPath a glob pattern that describes a set of paths.
+   * @param userFlowConfigPath a glob pattern that describes a set of paths.
    * @return true if the glob pattern described by the otherPath matches the path in this {@link DatasetDescriptor}.
    */
-  private boolean isPathContaining(String otherPath) {
-    if (otherPath == null) {
-      return false;
+  private ArrayList<String> isPathContaining(String userFlowConfigPath, Boolean inputDataset) {
+    String datasetDescriptorPrefix = inputDataset ? DatasetDescriptorConfigKeys.FLOW_INPUT_DATASET_DESCRIPTOR_PREFIX : DatasetDescriptorConfigKeys.FLOW_OUTPUT_DATASET_DESCRIPTOR_PREFIX;
+    ArrayList<String> errors = new ArrayList<>();
+    if (userFlowConfigPath == null) {
+      errors.add(String.format(DatasetDescriptorErrorStrings.DATASET_DESCRIPTOR_KEY_MISSING_ERROR_TEMPLATE, datasetDescriptorPrefix, DatasetDescriptorConfigKeys.PATH_KEY, this.getPath()));
+      return errors;
     }
     if (DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equals(this.getPath())) {
-      return true;
+      return errors;
     }
 
-    if (PathUtils.isGlob(new Path(otherPath))) {
-      return false;
+    if (PathUtils.isGlob(new Path(userFlowConfigPath))) {
+      errors.add(String.format(DatasetDescriptorErrorStrings.DATASET_DESCRIPTOR_KEY_MISMATCH_ERROR_TEMPLATE_IS_GLOB_PATTERN, datasetDescriptorPrefix, DatasetDescriptorConfigKeys.PATH_KEY, userFlowConfigPath));
+      return errors;
     }
 
     GlobPattern globPattern = new GlobPattern(this.getPath());
-    return globPattern.matches(otherPath);
+
+    if (!globPattern.matches(userFlowConfigPath)) {
+      errors.add(String.format(DatasetDescriptorErrorStrings.DATASET_DESCRIPTOR_KEY_MISMATCH_ERROR_TEMPLATE_GLOB_PATTERN, datasetDescriptorPrefix, DatasetDescriptorConfigKeys.PATH_KEY, userFlowConfigPath, this.getPath()));
+    }
+    return errors;
   }
 
   /**
    * {@inheritDoc}
    */
   @Override
-  public boolean contains(DatasetDescriptor o) {
-    if (!super.contains(o)) {
-      return false;
+  public ArrayList<String> contains(DatasetDescriptor userFlowConfigDatasetDescriptor) {
+    String datasetDescriptorPrefix = userFlowConfigDatasetDescriptor.getIsInputDataset() ? DatasetDescriptorConfigKeys.FLOW_INPUT_DATASET_DESCRIPTOR_PREFIX : DatasetDescriptorConfigKeys.FLOW_OUTPUT_DATASET_DESCRIPTOR_PREFIX;
+    ArrayList<String> errors = new ArrayList<>();
+    if (super.contains(userFlowConfigDatasetDescriptor).size() != 0) {
+      return super.contains(userFlowConfigDatasetDescriptor);
     }
 
-    FSDatasetDescriptor other = (FSDatasetDescriptor) o;
+    FSDatasetDescriptor userFlowConfig = (FSDatasetDescriptor) userFlowConfigDatasetDescriptor;
 
-    if ((this.isCompacted() != other.isCompacted()) ||
-        (this.isCompactedAndDeduped() != other.isCompactedAndDeduped())) {
-      return false;
+    if ((this.isCompacted() != userFlowConfig.isCompacted()) ||
+        (this.isCompactedAndDeduped() != userFlowConfig.isCompactedAndDeduped())) {
+      if (this.isCompacted() != userFlowConfig.isCompacted()) {
+        errors.add(String.format(DatasetDescriptorErrorStrings.DATASET_DESCRIPTOR_KEY_MISMATCH_ERROR_TEMPLATE, datasetDescriptorPrefix, DatasetDescriptorConfigKeys.IS_COMPACTED_KEY, userFlowConfig.isCompacted(), this.isCompacted()));
+      }
+      else {
+        errors.add(String.format(DatasetDescriptorErrorStrings.DATASET_DESCRIPTOR_KEY_MISMATCH_ERROR_TEMPLATE, datasetDescriptorPrefix, DatasetDescriptorConfigKeys.IS_COMPACTED_AND_DEDUPED_KEY, userFlowConfig.isCompactedAndDeduped(), this.isCompactedAndDeduped()));
+      }

Review Comment:
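   You should probably split this into two separate `if` statements, since each check adds a different error. With the current nested `if`/`else`, only the `isCompacted` mismatch is reported when both flags differ.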



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java:
##########
@@ -165,19 +164,42 @@ private Config getResolvedFlowConfig(Config userConfig) {
    * is resolvable only if each of the {@link JobTemplate}s in the flow is resolvable. Throws an exception if the flow is
    * not resolvable.
    * @param userConfig User supplied Config
+   * @return
    */
   @Override
-  public void tryResolving(Config userConfig, DatasetDescriptor inputDescriptor, DatasetDescriptor outputDescriptor)
-      throws SpecNotFoundException, JobTemplate.TemplateException {
+  public HashMap<String, ArrayList<String>> tryResolving(Config userConfig, DatasetDescriptor inputDescriptor, DatasetDescriptor outputDescriptor) {
     Config inputDescriptorConfig = inputDescriptor.getRawConfig().atPath(DatasetDescriptorConfigKeys.FLOW_EDGE_INPUT_DATASET_DESCRIPTOR_PREFIX);
     Config outputDescriptorConfig = outputDescriptor.getRawConfig().atPath(DatasetDescriptorConfigKeys.FLOW_EDGE_OUTPUT_DATASET_DESCRIPTOR_PREFIX);
+    String variableSubstitutionErrorPattern = ":[\\s]*Reader:[a-zA-Z\\d\\s]*:[\\s]";
+    String jobTemplatePattern = "/jobs/";
+
     userConfig = userConfig.withFallback(inputDescriptorConfig).withFallback(outputDescriptorConfig);
 
     JobSpec.Builder jobSpecBuilder = JobSpec.builder().withConfig(userConfig);
 
+    HashMap<String, ArrayList<String>> resolutionErrors = new HashMap<>();
+
     for (JobTemplate template: this.jobTemplates) {
-      this.jobSpecResolver.resolveJobSpec(jobSpecBuilder.withTemplate(template).build());
+      ArrayList<String> errors = new ArrayList<>();
+      try {
+        this.jobSpecResolver.resolveJobSpec(jobSpecBuilder.withTemplate(template).build());
+      }
+      catch (ConfigException e) {
+        errors.add(e.toString().split(variableSubstitutionErrorPattern)[1]);
+      }
+      catch (JobTemplate.TemplateException e) {
+        System.out.println("TEMPLATE " + e.toString());

Review Comment:
   Use proper logging here instead of `System.out.println`.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSVolumeDatasetDescriptor.java:
##########
@@ -19,38 +19,46 @@
 
 import com.typesafe.config.Config;
 import java.io.IOException;
+import java.util.ArrayList;
 import lombok.EqualsAndHashCode;
 import lombok.Getter;
 import lombok.ToString;
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorErrorStrings;
 import org.apache.gobblin.util.ConfigUtils;
 
 /**
  * An implementation of {@link FSVolumeDatasetDescriptor} with fs.uri specified.
  */
 @Alpha
-@ToString(callSuper = true, exclude = {"rawConfig"})
-@EqualsAndHashCode(callSuper = true, exclude = {"rawConfig"})
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
 public class FSVolumeDatasetDescriptor extends FSDatasetDescriptor{
   @Getter
   private final String fsUri;
 
   public FSVolumeDatasetDescriptor(Config config) throws IOException {
     super(config);
     this.fsUri = ConfigUtils.getString(config, DatasetDescriptorConfigKeys.FS_URI_KEY, DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
+    this.isInputDataset = ConfigUtils.getBoolean(config, DatasetDescriptorConfigKeys.IS_INPUT_DATASET, false);
   }
 
   @Override
-  public boolean contains(DatasetDescriptor o) {
-    if (!super.contains(o)) {
-      return false;
+  public ArrayList<String> contains(DatasetDescriptor userFlowConfig) {
+    String datasetDescriptorPrefix = userFlowConfig.getIsInputDataset() ? DatasetDescriptorConfigKeys.FLOW_INPUT_DATASET_DESCRIPTOR_PREFIX : DatasetDescriptorConfigKeys.FLOW_OUTPUT_DATASET_DESCRIPTOR_PREFIX;
+    ArrayList<String> errors = new ArrayList<>();
+    if (super.contains(userFlowConfig).size() != 0) {
+      return super.contains(userFlowConfig);
     }
 
-    FSVolumeDatasetDescriptor other = (FSVolumeDatasetDescriptor) o;
+    FSVolumeDatasetDescriptor other = (FSVolumeDatasetDescriptor) userFlowConfig;
 
-    return DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equals(this.getFsUri()) || this.getFsUri()
-        .equals(other.getFsUri());
+    if (!(DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equals(this.getFsUri()) || this.getFsUri()
+        .equals(other.getFsUri()))) {
+      errors.add(String.format(DatasetDescriptorErrorStrings.DATASET_DESCRIPTOR_KEY_MISMATCH_ERROR_TEMPLATE, datasetDescriptorPrefix, DatasetDescriptorConfigKeys.FS_URI_KEY, ((FSVolumeDatasetDescriptor) userFlowConfig).getFsUri(), this.getFsUri()));

Review Comment:
   Nit: You should be using `other.getFsUri()` instead of `((FSVolumeDatasetDescriptor) userFlowConfig).getFsUri()`. Also, this can use the helper function I mentioned above.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java:
##########
@@ -165,19 +164,42 @@ private Config getResolvedFlowConfig(Config userConfig) {
    * is resolvable only if each of the {@link JobTemplate}s in the flow is resolvable. Throws an exception if the flow is
    * not resolvable.
    * @param userConfig User supplied Config
+   * @return
    */
   @Override
-  public void tryResolving(Config userConfig, DatasetDescriptor inputDescriptor, DatasetDescriptor outputDescriptor)
-      throws SpecNotFoundException, JobTemplate.TemplateException {
+  public HashMap<String, ArrayList<String>> tryResolving(Config userConfig, DatasetDescriptor inputDescriptor, DatasetDescriptor outputDescriptor) {
     Config inputDescriptorConfig = inputDescriptor.getRawConfig().atPath(DatasetDescriptorConfigKeys.FLOW_EDGE_INPUT_DATASET_DESCRIPTOR_PREFIX);
     Config outputDescriptorConfig = outputDescriptor.getRawConfig().atPath(DatasetDescriptorConfigKeys.FLOW_EDGE_OUTPUT_DATASET_DESCRIPTOR_PREFIX);
+    String variableSubstitutionErrorPattern = ":[\\s]*Reader:[a-zA-Z\\d\\s]*:[\\s]";
+    String jobTemplatePattern = "/jobs/";
+
     userConfig = userConfig.withFallback(inputDescriptorConfig).withFallback(outputDescriptorConfig);
 
     JobSpec.Builder jobSpecBuilder = JobSpec.builder().withConfig(userConfig);
 
+    HashMap<String, ArrayList<String>> resolutionErrors = new HashMap<>();
+
     for (JobTemplate template: this.jobTemplates) {
-      this.jobSpecResolver.resolveJobSpec(jobSpecBuilder.withTemplate(template).build());
+      ArrayList<String> errors = new ArrayList<>();
+      try {
+        this.jobSpecResolver.resolveJobSpec(jobSpecBuilder.withTemplate(template).build());
+      }
+      catch (ConfigException e) {
+        errors.add(e.toString().split(variableSubstitutionErrorPattern)[1]);
+      }
+      catch (JobTemplate.TemplateException e) {
+        System.out.println("TEMPLATE " + e.toString());
+      }
+      catch (SpecNotFoundException e) {
+        System.out.println("SPEC NOT FOUND");

Review Comment:
   Use a log4j logger instead of `System.out.println`, and format the message properly. You can also handle these exceptions with a single multi-catch.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FormatConfig.java:
##########
@@ -67,24 +71,38 @@ public FormatConfig(Config config) throws IOException {
         .empty()));
     this.rawConfig = config.withFallback(this.encryptionConfig.getRawConfig().atPath(DatasetDescriptorConfigKeys.ENCYPTION_PREFIX)).
         withFallback(DEFAULT_FALLBACK);
+    this.isInputDataset = ConfigUtils.getBoolean(config, DatasetDescriptorConfigKeys.IS_INPUT_DATASET, false);
   }
 
-  public boolean contains(FormatConfig other) {
-    return containsFormat(other.getFormat()) && containsCodec(other.getCodecType())
-        && containsEncryptionConfig(other.getEncryptionConfig());
+  public ArrayList<String> contains(FormatConfig userFlowConfig) {
+    ArrayList<String> errors = new ArrayList<>();
+    errors.addAll(containsFormat(userFlowConfig.getFormat(), userFlowConfig.getIsInputDataset()));
+    errors.addAll(containsCodec(userFlowConfig.getCodecType(), userFlowConfig.getIsInputDataset()));
+    errors.addAll(containsEncryptionConfig(userFlowConfig.getEncryptionConfig()));
+    return errors;
   }
 
-  private boolean containsFormat(String otherFormat) {
-    return DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equalsIgnoreCase(this.getFormat())
-        || (this.getFormat().equalsIgnoreCase(otherFormat));
+  private ArrayList<String> containsFormat(String userFlowConfigFormat, Boolean inputDataset) {
+    ArrayList<String> errors = new ArrayList<>();
+    String datasetDescriptorPrefix = inputDataset ? DatasetDescriptorConfigKeys.FLOW_INPUT_DATASET_DESCRIPTOR_PREFIX : DatasetDescriptorConfigKeys.FLOW_OUTPUT_DATASET_DESCRIPTOR_PREFIX;
+    if (!DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equalsIgnoreCase(this.getFormat())
+        && (!this.getFormat().equalsIgnoreCase(userFlowConfigFormat))) {
+      errors.add(String.format(DatasetDescriptorErrorStrings.DATASET_DESCRIPTOR_KEY_MISMATCH_ERROR_TEMPLATE, datasetDescriptorPrefix, DatasetDescriptorConfigKeys.FORMAT_KEY, userFlowConfigFormat, this.getFormat()));
+    }
+    return errors;
   }
 
-  private boolean containsCodec(String otherCodecType) {
-    return DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equalsIgnoreCase(this.getCodecType())
-        || (this.getCodecType().equalsIgnoreCase(otherCodecType));
+  private ArrayList<String> containsCodec(String userFlowConfigCodecType, Boolean inputDataset) {
+    ArrayList<String> errors = new ArrayList<>();
+    String datasetDescriptorPrefix = inputDataset ? DatasetDescriptorConfigKeys.FLOW_INPUT_DATASET_DESCRIPTOR_PREFIX : DatasetDescriptorConfigKeys.FLOW_OUTPUT_DATASET_DESCRIPTOR_PREFIX;
+    if (!DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equalsIgnoreCase(this.getCodecType())
+        && (!this.getCodecType().equalsIgnoreCase(userFlowConfigCodecType))) {
+      errors.add(String.format(DatasetDescriptorErrorStrings.DATASET_DESCRIPTOR_KEY_MISMATCH_ERROR_TEMPLATE, datasetDescriptorPrefix, DatasetDescriptorConfigKeys.CODEC_KEY, userFlowConfigCodecType, this.getCodecType()));

Review Comment:
   same here



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java:
##########
@@ -641,7 +641,8 @@ public void testUnresolvedFlow() throws Exception {
     Dag<JobExecutionPlan> dag = specCompiler.compileFlow(spec);
 
     Assert.assertNull(dag);
-    Assert.assertEquals(spec.getCompilationErrors().stream().map(c -> c.errorMessage).collect(Collectors.toSet()).size(), 1);
+    // 6 expected errors as now returns a list of errors (including flowTemplateErrors and variable substitution errors) instead of a single error
+    Assert.assertEquals(spec.getCompilationErrors().stream().map(c -> c.errorMessage).collect(Collectors.toSet()).size(), 6);

Review Comment:
   Can you test the errors in more detail, e.g. by asserting on the error messages themselves rather than only on how many there are?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org