You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@gobblin.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2023/01/06 23:48:00 UTC
[jira] [Work logged] (GOBBLIN-1759) More robust logging when attempting to resolve a flow config
[ https://issues.apache.org/jira/browse/GOBBLIN-1759?focusedWorklogId=837645&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-837645 ]
ASF GitHub Bot logged work on GOBBLIN-1759:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 06/Jan/23 23:47
Start Date: 06/Jan/23 23:47
Worklog Time Spent: 10m
Work Description: Will-Lo commented on code in PR #3614:
URL: https://github.com/apache/gobblin/pull/3614#discussion_r1063865584
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetPartitionConfig.java:
##########
@@ -128,13 +134,19 @@ private void validatePartitionConfig(String partitionType, String partitionPatte
}
}
- public boolean contains(FSDatasetPartitionConfig other) {
- if (other == null) {
- return false;
+ public ArrayList<String> contains(FSDatasetPartitionConfig userFlowConfig) {
+ String datasetDescriptorPrefix = userFlowConfig.getIsInputDataset() ? DatasetDescriptorConfigKeys.FLOW_INPUT_DATASET_DESCRIPTOR_PREFIX : DatasetDescriptorConfigKeys.FLOW_OUTPUT_DATASET_DESCRIPTOR_PREFIX;
+ ArrayList<String> errors = new ArrayList<>();
+
+ if (!DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equalsIgnoreCase(this.getPartitionType())
+ && !this.getPartitionType().equalsIgnoreCase(userFlowConfig.getPartitionType())) {
+ errors.add(String.format(DatasetDescriptorErrorStrings.DATASET_DESCRIPTOR_KEY_MISMATCH_ERROR_TEMPLATE_PARTITION, datasetDescriptorPrefix, DatasetDescriptorConfigKeys.PARTITION_PREFIX, DatasetDescriptorConfigKeys.PARTITION_TYPE_KEY , userFlowConfig.getPartitionType(), this.getPartitionType()));
+ }
+
+ if (!DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equalsIgnoreCase(getPartitionPattern())
+ && !this.getPartitionPattern().equalsIgnoreCase(userFlowConfig.getPartitionPattern())) {
+ errors.add(String.format(DatasetDescriptorErrorStrings.DATASET_DESCRIPTOR_KEY_MISMATCH_ERROR_TEMPLATE_PARTITION, datasetDescriptorPrefix, DatasetDescriptorConfigKeys.PARTITION_PREFIX, DatasetDescriptorConfigKeys.PARTITION_PATTERN_KEY , userFlowConfig.getPartitionPattern(), this.getPartitionPattern()));
Review Comment:
This check could also reuse the helper function I suggested in my other comment.
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptor.java:
##########
@@ -113,41 +120,57 @@ protected boolean isPathContaining(DatasetDescriptor other) {
* accepted by the other {@link DatasetDescriptor}. If the path description of the other {@link DatasetDescriptor}
* is a glob pattern, we return false.
*
- * @param otherPath a glob pattern that describes a set of paths.
+ * @param userFlowConfigPath a glob pattern that describes a set of paths.
* @return true if the glob pattern described by the otherPath matches the path in this {@link DatasetDescriptor}.
*/
- private boolean isPathContaining(String otherPath) {
- if (otherPath == null) {
- return false;
+ private ArrayList<String> isPathContaining(String userFlowConfigPath, Boolean inputDataset) {
+ String datasetDescriptorPrefix = inputDataset ? DatasetDescriptorConfigKeys.FLOW_INPUT_DATASET_DESCRIPTOR_PREFIX : DatasetDescriptorConfigKeys.FLOW_OUTPUT_DATASET_DESCRIPTOR_PREFIX;
+ ArrayList<String> errors = new ArrayList<>();
+ if (userFlowConfigPath == null) {
+ errors.add(String.format(DatasetDescriptorErrorStrings.DATASET_DESCRIPTOR_KEY_MISSING_ERROR_TEMPLATE, datasetDescriptorPrefix, DatasetDescriptorConfigKeys.PATH_KEY, this.getPath()));
+ return errors;
}
if (DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equals(this.getPath())) {
- return true;
+ return errors;
}
- if (PathUtils.isGlob(new Path(otherPath))) {
- return false;
+ if (PathUtils.isGlob(new Path(userFlowConfigPath))) {
+ errors.add(String.format(DatasetDescriptorErrorStrings.DATASET_DESCRIPTOR_KEY_MISMATCH_ERROR_TEMPLATE_IS_GLOB_PATTERN, datasetDescriptorPrefix, DatasetDescriptorConfigKeys.PATH_KEY, userFlowConfigPath));
+ return errors;
}
GlobPattern globPattern = new GlobPattern(this.getPath());
- return globPattern.matches(otherPath);
+
+ if (!globPattern.matches(userFlowConfigPath)) {
+ errors.add(String.format(DatasetDescriptorErrorStrings.DATASET_DESCRIPTOR_KEY_MISMATCH_ERROR_TEMPLATE_GLOB_PATTERN, datasetDescriptorPrefix, DatasetDescriptorConfigKeys.PATH_KEY, userFlowConfigPath, this.getPath()));
+ }
+ return errors;
}
/**
* {@inheritDoc}
*/
@Override
- public boolean contains(DatasetDescriptor o) {
- if (!super.contains(o)) {
- return false;
+ public ArrayList<String> contains(DatasetDescriptor userFlowConfigDatasetDescriptor) {
+ String datasetDescriptorPrefix = userFlowConfigDatasetDescriptor.getIsInputDataset() ? DatasetDescriptorConfigKeys.FLOW_INPUT_DATASET_DESCRIPTOR_PREFIX : DatasetDescriptorConfigKeys.FLOW_OUTPUT_DATASET_DESCRIPTOR_PREFIX;
+ ArrayList<String> errors = new ArrayList<>();
+ if (super.contains(userFlowConfigDatasetDescriptor).size() != 0) {
+ return super.contains(userFlowConfigDatasetDescriptor);
}
- FSDatasetDescriptor other = (FSDatasetDescriptor) o;
+ FSDatasetDescriptor userFlowConfig = (FSDatasetDescriptor) userFlowConfigDatasetDescriptor;
- if ((this.isCompacted() != other.isCompacted()) ||
- (this.isCompactedAndDeduped() != other.isCompactedAndDeduped())) {
- return false;
+ if ((this.isCompacted() != userFlowConfig.isCompacted()) ||
+ (this.isCompactedAndDeduped() != userFlowConfig.isCompactedAndDeduped())) {
+ if (this.isCompacted() != userFlowConfig.isCompacted()) {
+ errors.add(String.format(DatasetDescriptorErrorStrings.DATASET_DESCRIPTOR_KEY_MISMATCH_ERROR_TEMPLATE, datasetDescriptorPrefix, DatasetDescriptorConfigKeys.IS_COMPACTED_KEY, userFlowConfig.isCompacted(), this.isCompacted()));
+ }
+ else {
+ errors.add(String.format(DatasetDescriptorErrorStrings.DATASET_DESCRIPTOR_KEY_MISMATCH_ERROR_TEMPLATE, datasetDescriptorPrefix, DatasetDescriptorConfigKeys.IS_COMPACTED_AND_DEDUPED_KEY, userFlowConfig.isCompactedAndDeduped(), this.isCompactedAndDeduped()));
+ }
Review Comment:
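You should probably split this into two separate if statements to clean it up, since each condition adds a different error. As written, the else branch means only one mismatch is reported when both `isCompacted` and `isCompactedAndDeduped` differ.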
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java:
##########
@@ -165,19 +164,42 @@ private Config getResolvedFlowConfig(Config userConfig) {
* is resolvable only if each of the {@link JobTemplate}s in the flow is resolvable. Throws an exception if the flow is
* not resolvable.
* @param userConfig User supplied Config
+ * @return
*/
@Override
- public void tryResolving(Config userConfig, DatasetDescriptor inputDescriptor, DatasetDescriptor outputDescriptor)
- throws SpecNotFoundException, JobTemplate.TemplateException {
+ public HashMap<String, ArrayList<String>> tryResolving(Config userConfig, DatasetDescriptor inputDescriptor, DatasetDescriptor outputDescriptor) {
Config inputDescriptorConfig = inputDescriptor.getRawConfig().atPath(DatasetDescriptorConfigKeys.FLOW_EDGE_INPUT_DATASET_DESCRIPTOR_PREFIX);
Config outputDescriptorConfig = outputDescriptor.getRawConfig().atPath(DatasetDescriptorConfigKeys.FLOW_EDGE_OUTPUT_DATASET_DESCRIPTOR_PREFIX);
+ String variableSubstitutionErrorPattern = ":[\\s]*Reader:[a-zA-Z\\d\\s]*:[\\s]";
+ String jobTemplatePattern = "/jobs/";
+
userConfig = userConfig.withFallback(inputDescriptorConfig).withFallback(outputDescriptorConfig);
JobSpec.Builder jobSpecBuilder = JobSpec.builder().withConfig(userConfig);
+ HashMap<String, ArrayList<String>> resolutionErrors = new HashMap<>();
+
for (JobTemplate template: this.jobTemplates) {
- this.jobSpecResolver.resolveJobSpec(jobSpecBuilder.withTemplate(template).build());
+ ArrayList<String> errors = new ArrayList<>();
+ try {
+ this.jobSpecResolver.resolveJobSpec(jobSpecBuilder.withTemplate(template).build());
+ }
+ catch (ConfigException e) {
+ errors.add(e.toString().split(variableSubstitutionErrorPattern)[1]);
+ }
+ catch (JobTemplate.TemplateException e) {
+ System.out.println("TEMPLATE " + e.toString());
Review Comment:
Use proper logging here instead of `System.out.println`.
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSVolumeDatasetDescriptor.java:
##########
@@ -19,38 +19,46 @@
import com.typesafe.config.Config;
import java.io.IOException;
+import java.util.ArrayList;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorErrorStrings;
import org.apache.gobblin.util.ConfigUtils;
/**
* An implementation of {@link FSVolumeDatasetDescriptor} with fs.uri specified.
*/
@Alpha
-@ToString(callSuper = true, exclude = {"rawConfig"})
-@EqualsAndHashCode(callSuper = true, exclude = {"rawConfig"})
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
public class FSVolumeDatasetDescriptor extends FSDatasetDescriptor{
@Getter
private final String fsUri;
public FSVolumeDatasetDescriptor(Config config) throws IOException {
super(config);
this.fsUri = ConfigUtils.getString(config, DatasetDescriptorConfigKeys.FS_URI_KEY, DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
+ this.isInputDataset = ConfigUtils.getBoolean(config, DatasetDescriptorConfigKeys.IS_INPUT_DATASET, false);
}
@Override
- public boolean contains(DatasetDescriptor o) {
- if (!super.contains(o)) {
- return false;
+ public ArrayList<String> contains(DatasetDescriptor userFlowConfig) {
+ String datasetDescriptorPrefix = userFlowConfig.getIsInputDataset() ? DatasetDescriptorConfigKeys.FLOW_INPUT_DATASET_DESCRIPTOR_PREFIX : DatasetDescriptorConfigKeys.FLOW_OUTPUT_DATASET_DESCRIPTOR_PREFIX;
+ ArrayList<String> errors = new ArrayList<>();
+ if (super.contains(userFlowConfig).size() != 0) {
+ return super.contains(userFlowConfig);
}
- FSVolumeDatasetDescriptor other = (FSVolumeDatasetDescriptor) o;
+ FSVolumeDatasetDescriptor other = (FSVolumeDatasetDescriptor) userFlowConfig;
- return DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equals(this.getFsUri()) || this.getFsUri()
- .equals(other.getFsUri());
+ if (!(DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equals(this.getFsUri()) || this.getFsUri()
+ .equals(other.getFsUri()))) {
+ errors.add(String.format(DatasetDescriptorErrorStrings.DATASET_DESCRIPTOR_KEY_MISMATCH_ERROR_TEMPLATE, datasetDescriptorPrefix, DatasetDescriptorConfigKeys.FS_URI_KEY, ((FSVolumeDatasetDescriptor) userFlowConfig).getFsUri(), this.getFsUri()));
Review Comment:
Nit: you should use `other.getFsUri()` instead of `((FSVolumeDatasetDescriptor) userFlowConfig).getFsUri()`. Also, this can use the helper function I mentioned above.
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java:
##########
@@ -165,19 +164,42 @@ private Config getResolvedFlowConfig(Config userConfig) {
* is resolvable only if each of the {@link JobTemplate}s in the flow is resolvable. Throws an exception if the flow is
* not resolvable.
* @param userConfig User supplied Config
+ * @return
*/
@Override
- public void tryResolving(Config userConfig, DatasetDescriptor inputDescriptor, DatasetDescriptor outputDescriptor)
- throws SpecNotFoundException, JobTemplate.TemplateException {
+ public HashMap<String, ArrayList<String>> tryResolving(Config userConfig, DatasetDescriptor inputDescriptor, DatasetDescriptor outputDescriptor) {
Config inputDescriptorConfig = inputDescriptor.getRawConfig().atPath(DatasetDescriptorConfigKeys.FLOW_EDGE_INPUT_DATASET_DESCRIPTOR_PREFIX);
Config outputDescriptorConfig = outputDescriptor.getRawConfig().atPath(DatasetDescriptorConfigKeys.FLOW_EDGE_OUTPUT_DATASET_DESCRIPTOR_PREFIX);
+ String variableSubstitutionErrorPattern = ":[\\s]*Reader:[a-zA-Z\\d\\s]*:[\\s]";
+ String jobTemplatePattern = "/jobs/";
+
userConfig = userConfig.withFallback(inputDescriptorConfig).withFallback(outputDescriptorConfig);
JobSpec.Builder jobSpecBuilder = JobSpec.builder().withConfig(userConfig);
+ HashMap<String, ArrayList<String>> resolutionErrors = new HashMap<>();
+
for (JobTemplate template: this.jobTemplates) {
- this.jobSpecResolver.resolveJobSpec(jobSpecBuilder.withTemplate(template).build());
+ ArrayList<String> errors = new ArrayList<>();
+ try {
+ this.jobSpecResolver.resolveJobSpec(jobSpecBuilder.withTemplate(template).build());
+ }
+ catch (ConfigException e) {
+ errors.add(e.toString().split(variableSubstitutionErrorPattern)[1]);
+ }
+ catch (JobTemplate.TemplateException e) {
+ System.out.println("TEMPLATE " + e.toString());
+ }
+ catch (SpecNotFoundException e) {
+ System.out.println("SPEC NOT FOUND");
Review Comment:
Use log4j instead of `System.out.println`, and format the message properly. You can also handle these exceptions with a single multi-catch.
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FormatConfig.java:
##########
@@ -67,24 +71,38 @@ public FormatConfig(Config config) throws IOException {
.empty()));
this.rawConfig = config.withFallback(this.encryptionConfig.getRawConfig().atPath(DatasetDescriptorConfigKeys.ENCYPTION_PREFIX)).
withFallback(DEFAULT_FALLBACK);
+ this.isInputDataset = ConfigUtils.getBoolean(config, DatasetDescriptorConfigKeys.IS_INPUT_DATASET, false);
}
- public boolean contains(FormatConfig other) {
- return containsFormat(other.getFormat()) && containsCodec(other.getCodecType())
- && containsEncryptionConfig(other.getEncryptionConfig());
+ public ArrayList<String> contains(FormatConfig userFlowConfig) {
+ ArrayList<String> errors = new ArrayList<>();
+ errors.addAll(containsFormat(userFlowConfig.getFormat(), userFlowConfig.getIsInputDataset()));
+ errors.addAll(containsCodec(userFlowConfig.getCodecType(), userFlowConfig.getIsInputDataset()));
+ errors.addAll(containsEncryptionConfig(userFlowConfig.getEncryptionConfig()));
+ return errors;
}
- private boolean containsFormat(String otherFormat) {
- return DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equalsIgnoreCase(this.getFormat())
- || (this.getFormat().equalsIgnoreCase(otherFormat));
+ private ArrayList<String> containsFormat(String userFlowConfigFormat, Boolean inputDataset) {
+ ArrayList<String> errors = new ArrayList<>();
+ String datasetDescriptorPrefix = inputDataset ? DatasetDescriptorConfigKeys.FLOW_INPUT_DATASET_DESCRIPTOR_PREFIX : DatasetDescriptorConfigKeys.FLOW_OUTPUT_DATASET_DESCRIPTOR_PREFIX;
+ if (!DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equalsIgnoreCase(this.getFormat())
+ && (!this.getFormat().equalsIgnoreCase(userFlowConfigFormat))) {
+ errors.add(String.format(DatasetDescriptorErrorStrings.DATASET_DESCRIPTOR_KEY_MISMATCH_ERROR_TEMPLATE, datasetDescriptorPrefix, DatasetDescriptorConfigKeys.FORMAT_KEY, userFlowConfigFormat, this.getFormat()));
+ }
+ return errors;
}
- private boolean containsCodec(String otherCodecType) {
- return DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equalsIgnoreCase(this.getCodecType())
- || (this.getCodecType().equalsIgnoreCase(otherCodecType));
+ private ArrayList<String> containsCodec(String userFlowConfigCodecType, Boolean inputDataset) {
+ ArrayList<String> errors = new ArrayList<>();
+ String datasetDescriptorPrefix = inputDataset ? DatasetDescriptorConfigKeys.FLOW_INPUT_DATASET_DESCRIPTOR_PREFIX : DatasetDescriptorConfigKeys.FLOW_OUTPUT_DATASET_DESCRIPTOR_PREFIX;
+ if (!DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equalsIgnoreCase(this.getCodecType())
+ && (!this.getCodecType().equalsIgnoreCase(userFlowConfigCodecType))) {
+ errors.add(String.format(DatasetDescriptorErrorStrings.DATASET_DESCRIPTOR_KEY_MISMATCH_ERROR_TEMPLATE, datasetDescriptorPrefix, DatasetDescriptorConfigKeys.CODEC_KEY, userFlowConfigCodecType, this.getCodecType()));
Review Comment:
same here
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java:
##########
@@ -641,7 +641,8 @@ public void testUnresolvedFlow() throws Exception {
Dag<JobExecutionPlan> dag = specCompiler.compileFlow(spec);
Assert.assertNull(dag);
- Assert.assertEquals(spec.getCompilationErrors().stream().map(c -> c.errorMessage).collect(Collectors.toSet()).size(), 1);
+ // 6 expected errors as now returns a list of errors (including flowTemplateErrors and variable substitution errors) instead of a single error
+ Assert.assertEquals(spec.getCompilationErrors().stream().map(c -> c.errorMessage).collect(Collectors.toSet()).size(), 6);
Review Comment:
Can you test the errors in more detail, e.g. assert on the actual error messages rather than only on their count?
Issue Time Tracking
-------------------
Worklog Id: (was: 837645)
Remaining Estimate: 0h
Time Spent: 10m
> More robust logging when attempting to resolve a flow config
> ------------------------------------------------------------
>
> Key: GOBBLIN-1759
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1759
> Project: Apache Gobblin
> Issue Type: Improvement
> Components: gobblin-service
> Reporter: Andy Jiang
> Priority: Major
> Time Spent: 10m
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)