You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by wl...@apache.org on 2023/01/19 20:44:57 UTC
[gobblin] branch master updated: [GOBBLIN-1759] Add error reporting when attempting to resolve flow configs (#3614)
This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 34c20ff70 [GOBBLIN-1759] Add error reporting when attempting to resolve flow configs (#3614)
34c20ff70 is described below
commit 34c20ff70879ff1a8baf997932199abef82b3571
Author: Andy Jiang <an...@outlook.com>
AuthorDate: Thu Jan 19 12:44:52 2023 -0800
[GOBBLIN-1759] Add error reporting when attempting to resolve flow configs (#3614)
* Add in changes to fetch flow config errors
* Add files
* Fix FSDatasetPartitionConfig not taking in the isInputDataset configuration
Change to take in the input for FSDatasetPartitionConfig
* Fix most of the error messages
* Remove commented out print statements
* Update error priority to +1 for flows that are multi-hop with the first edge being a self edge
* Do error handling for the new FSVolumeDatasetDescriptor
* Fix tests related to contains
* Removed null dereferences
* Removed comments
* Remove shadowing of variables
* Fix test for new flow compilation error reporting
* Fix naming for right side contains
* Use string format template
* Add description for param
* Refactor out comparison of ANY to util function
* Test error in more detail
* Remove unused import
* Further refactor
* Remove unnecessary commenting out
* remove unused import
* Additional refactoring
* Refactor string split for DBs
* Fix issue with string format for glob pattern inputs
* Address comments
* Fix FSDatasetDescriptor userFlowConfig naming
* Refactor database comparisons
* Add javadocs for util functions
* Remove unused slf4j
* Change internal terminology to single-step vs multi-step data movement
---
.../service/FlowConfigV2ResourceLocalHandler.java | 39 +++++-
.../org/apache/gobblin/runtime/api/FlowSpec.java | 13 ++
.../modules/dataset/BaseDatasetDescriptor.java | 38 +++---
.../service/modules/dataset/DatasetDescriptor.java | 12 +-
.../modules/dataset/DatasetDescriptorUtils.java | 3 +-
.../service/modules/dataset/EncryptionConfig.java | 38 +++---
.../modules/dataset/FSDatasetDescriptor.java | 69 ++++++----
.../modules/dataset/FSDatasetPartitionConfig.java | 29 +++--
.../modules/dataset/FSVolumeDatasetDescriptor.java | 21 +--
.../service/modules/dataset/FormatConfig.java | 36 ++++--
.../modules/dataset/HiveDatasetDescriptor.java | 36 +++---
.../modules/dataset/HttpDatasetDescriptor.java | 15 ++-
.../modules/dataset/IcebergDatasetDescriptor.java | 25 ++--
.../modules/dataset/SqlDatasetDescriptor.java | 30 +++--
.../flowgraph/DatasetDescriptorConfigKeys.java | 1 +
.../flowgraph/DatasetDescriptorErrorUtils.java | 144 +++++++++++++++++++++
.../flowgraph/pathfinder/AbstractPathFinder.java | 40 ++++--
.../flowgraph/pathfinder/BFSPathFinder.java | 48 ++++---
.../service/modules/template/FlowTemplate.java | 6 +-
.../modules/template/StaticFlowTemplate.java | 31 ++++-
.../modules/dataset/FSDatasetDescriptorTest.java | 45 ++++---
.../modules/dataset/HttpDatasetDescriptorTest.java | 4 +-
.../dataset/IcebergDatasetDescriptorTest.java | 4 +-
.../modules/dataset/SqlDatasetDescriptorTest.java | 6 +-
.../modules/flow/MultiHopFlowCompilerTest.java | 4 +-
25 files changed, 511 insertions(+), 226 deletions(-)
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
index 6370192ca..b20b83346 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
@@ -16,8 +16,13 @@
*/
package org.apache.gobblin.service;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.Comparator;
+import java.util.Hashtable;
import java.util.Map;
import org.apache.commons.lang3.StringEscapeUtils;
@@ -61,6 +66,7 @@ public class FlowConfigV2ResourceLocalHandler extends FlowConfigResourceLocalHan
if (flowConfig.hasExplain()) {
createLog += " explain " + flowConfig.isExplain();
}
+
log.info(createLog);
FlowSpec flowSpec = createFlowSpecForConfig(flowConfig);
FlowStatusId flowStatusId =
@@ -111,17 +117,42 @@ public class FlowConfigV2ResourceLocalHandler extends FlowConfigResourceLocalHan
private String getErrorMessage(FlowSpec flowSpec) {
StringBuilder message = new StringBuilder("Flow was not compiled successfully.");
+ Hashtable<String, ArrayList<String>> allErrors = new Hashtable<>();
+
if (!flowSpec.getCompilationErrors().isEmpty()) {
message.append(" Compilation errors encountered (Sorted by relevance): ");
FlowSpec.CompilationError[] errors = flowSpec.getCompilationErrors().stream().distinct().toArray(FlowSpec.CompilationError[]::new);
Arrays.sort(errors, Comparator.comparingInt(c -> ((FlowSpec.CompilationError)c).errorPriority));
- int errorId = 0;
+ int errorIdSingleHop = 1;
+ int errorIdMultiHop = 1;
+
+ ArrayList<String> singleHopErrors = new ArrayList<>();
+ ArrayList<String> multiHopErrors = new ArrayList<>();
+
for (FlowSpec.CompilationError error: errors) {
- message.append("\n").append(String.format("ERROR[%s]", errorId)).append(error.errorMessage);
- errorId++;
+ if (error.errorPriority == 0) {
+ singleHopErrors.add(String.format("ERROR %s of single-step data movement: ", errorIdSingleHop) + error.errorMessage.replace("\n", " ").replace("\t", ""));
+ errorIdSingleHop++;
+ } else {
+ multiHopErrors.add(String.format("ERROR %s of multi-step data movement: ", errorIdMultiHop) + error.errorMessage.replace("\n", " ").replace("\t", ""));
+ errorIdMultiHop++;
+ }
}
+
+ allErrors.put("singleHopErrors", singleHopErrors);
+ allErrors.put("multiHopErrors", multiHopErrors);
+ }
+
+ allErrors.put("message", new ArrayList<>(Collections.singletonList(message.toString())));
+ ObjectMapper mapper = new ObjectMapper();
+
+ try {
+ return mapper.writeValueAsString(allErrors);
+ } catch (JsonProcessingException e) {
+ log.error("Flow Spec {} errored on Json processing", flowSpec.toString(), e);
+ e.printStackTrace();
}
- return message.toString();
+ return "Could not form JSON in FlowConfigV2ResourceLocalHandler";
}
/**
* Note: this method is only implemented for testing, normally partial update would be called in
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
index edeea4744..1e007c7c6 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
@@ -125,6 +125,10 @@ public class FlowSpec implements Configurable, Spec {
throw new RuntimeException("Unable to create a FlowSpec URI: " + e, e);
}
}
+ public void addCompilationError(String src, String dst, String errorMessage, int numberOfHops) {
+ this.compilationErrors.add(new CompilationError(getConfig(), src, dst, errorMessage, numberOfHops));
+ }
+
public void addCompilationError(String src, String dst, String errorMessage) {
this.compilationErrors.add(new CompilationError(getConfig(), src, dst, errorMessage));
}
@@ -134,6 +138,15 @@ public class FlowSpec implements Configurable, Spec {
public int errorPriority;
public String errorMessage;
+ // Increment the error priority by 1 to eliminate flows with a self edge from having the same priority as a single hop flow edge.
+ // E.g. Cluster1 -> Cluster2 is the desired edge. Cluster1 -> Cluster2 would have error priority of 0, Cluster1 -> Cluster1 -> Cluster2 would have error priority of 1
+ public CompilationError(Config config, String src, String dst, String errorMessage, int numberOfHops) {
+ this(config, src, dst, errorMessage);
+ if (numberOfHops > 1) {
+ errorPriority++;
+ }
+ }
+
public CompilationError(Config config, String src, String dst, String errorMessage) {
errorPriority = 0;
if (!src.equals(ConfigUtils.getString(config, ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, ""))){
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/BaseDatasetDescriptor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/BaseDatasetDescriptor.java
index cd98c8b01..8c4878c5d 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/BaseDatasetDescriptor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/BaseDatasetDescriptor.java
@@ -23,15 +23,17 @@ import com.google.common.collect.ImmutableMap;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import java.util.ArrayList;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorErrorUtils;
import org.apache.gobblin.util.ConfigUtils;
-@EqualsAndHashCode (exclude = {"description", "rawConfig"})
-@ToString (exclude = {"description", "rawConfig"})
+@EqualsAndHashCode (exclude = {"description", "rawConfig", "isInputDataset"})
+@ToString (exclude = {"description", "rawConfig", "isInputDataset"})
public abstract class BaseDatasetDescriptor implements DatasetDescriptor {
@Getter
private final String platform;
@@ -43,6 +45,8 @@ public abstract class BaseDatasetDescriptor implements DatasetDescriptor {
private final String description;
@Getter
private final Config rawConfig;
+ @Getter
+ protected Boolean isInputDataset;
private static final Config DEFAULT_FALLBACK =
ConfigFactory.parseMap(ImmutableMap.<String, Object>builder()
@@ -57,37 +61,33 @@ public abstract class BaseDatasetDescriptor implements DatasetDescriptor {
this.isRetentionApplied = ConfigUtils.getBoolean(config, DatasetDescriptorConfigKeys.IS_RETENTION_APPLIED_KEY, false);
this.description = ConfigUtils.getString(config, DatasetDescriptorConfigKeys.DESCRIPTION_KEY, "");
this.rawConfig = config.withFallback(this.formatConfig.getRawConfig()).withFallback(DEFAULT_FALLBACK);
+ this.isInputDataset = ConfigUtils.getBoolean(config, DatasetDescriptorConfigKeys.IS_INPUT_DATASET, false);
}
/**
* {@inheritDoc}
*/
- protected abstract boolean isPathContaining(DatasetDescriptor other);
+ protected abstract ArrayList<String> isPathContaining(DatasetDescriptor other);
/**
* @return true if this {@link DatasetDescriptor} contains the other {@link DatasetDescriptor} i.e. the
* datasets described by this {@link DatasetDescriptor} is a subset of the datasets described by the other
* {@link DatasetDescriptor}. This operation is non-commutative.
- * @param other
+ * @param inputDatasetDescriptorConfig This is the flow configuration that is sent in from user side and is compared against the flowgraph edges.
*/
@Override
- public boolean contains(DatasetDescriptor other) {
- if (this == other) {
- return true;
- }
-
- if (other == null || !getClass().equals(other.getClass())) {
- return false;
+ public ArrayList<String> contains(DatasetDescriptor inputDatasetDescriptorConfig) {
+ ArrayList<String> errors = new ArrayList<>();
+ if (this == inputDatasetDescriptorConfig) {
+ return errors;
}
- if (this.getPlatform() == null || !this.getPlatform().equalsIgnoreCase(other.getPlatform())) {
- return false;
- }
-
- if ((this.isRetentionApplied() != other.isRetentionApplied())) {
- return false;
- }
+ DatasetDescriptorErrorUtils.populateErrorForDatasetDescriptorKey(errors, inputDatasetDescriptorConfig.getIsInputDataset(), DatasetDescriptorConfigKeys.CLASS_KEY, this.getClass().toString(), inputDatasetDescriptorConfig.getClass().toString(), false);
+ DatasetDescriptorErrorUtils.populateErrorForDatasetDescriptorKey(errors, inputDatasetDescriptorConfig.getIsInputDataset(), DatasetDescriptorConfigKeys.PLATFORM_KEY, this.getPlatform(), inputDatasetDescriptorConfig.getPlatform(), false);
+ DatasetDescriptorErrorUtils.populateErrorForDatasetDescriptorKey(errors, inputDatasetDescriptorConfig.getIsInputDataset(), DatasetDescriptorConfigKeys.IS_RETENTION_APPLIED_KEY, String.valueOf(this.isRetentionApplied()), String.valueOf(inputDatasetDescriptorConfig.isRetentionApplied()), false);
- return isPathContaining(other) && getFormatConfig().contains(other.getFormatConfig());
+ errors.addAll(isPathContaining(inputDatasetDescriptorConfig));
+ errors.addAll(getFormatConfig().contains(inputDatasetDescriptorConfig.getFormatConfig()));
+ return errors;
}
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/DatasetDescriptor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/DatasetDescriptor.java
index 9c6abfe14..e51df520a 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/DatasetDescriptor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/DatasetDescriptor.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.service.modules.dataset;
import com.typesafe.config.Config;
+import java.util.ArrayList;
import org.apache.gobblin.annotation.Alpha;
@@ -62,14 +63,19 @@ public interface DatasetDescriptor {
public String getDescription();
/**
- * @return true if this {@link DatasetDescriptor} contains the other {@link DatasetDescriptor} i.e. the
- * datasets described by the other {@link DatasetDescriptor} is a subset of this {@link DatasetDescriptor}.
+ * @return an arraylist of errors when comparing whether the provided {@link DatasetDescriptor} contains the input {@link DatasetDescriptor}
+ * i.e. the datasets described by the other {@link DatasetDescriptor} is a subset of this {@link DatasetDescriptor}.
* This operation is non-commutative.
*/
- public boolean contains(DatasetDescriptor other);
+ public ArrayList<String> contains(DatasetDescriptor other);
/**
* @return the raw config.
*/
public Config getRawConfig();
+
+ /**
+ * @return true if is input dataset.
+ */
+ public Boolean getIsInputDataset();
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/DatasetDescriptorUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/DatasetDescriptorUtils.java
index e21a31df1..c3e617994 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/DatasetDescriptorUtils.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/DatasetDescriptorUtils.java
@@ -19,10 +19,11 @@ package org.apache.gobblin.service.modules.dataset;
import com.typesafe.config.Config;
+import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
-
+@Slf4j
public class DatasetDescriptorUtils {
/**
* Given dataset descriptor config, construct a {@link DatasetDescriptor} object
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/EncryptionConfig.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/EncryptionConfig.java
index f9e4e3598..a55d20b3a 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/EncryptionConfig.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/EncryptionConfig.java
@@ -24,17 +24,19 @@ import com.google.common.collect.ImmutableMap;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import java.util.ArrayList;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorErrorUtils;
import org.apache.gobblin.util.ConfigUtils;
@Slf4j
-@ToString(exclude = {"rawConfig"})
-@EqualsAndHashCode (exclude = {"rawConfig"})
+@ToString(exclude = {"rawConfig", "isInputDataset"})
+@EqualsAndHashCode (exclude = {"rawConfig", "isInputDataset"})
public class EncryptionConfig {
@Getter
private final String encryptionAlgorithm;
@@ -48,6 +50,8 @@ public class EncryptionConfig {
private final String keystoreEncoding;
@Getter
private final Config rawConfig;
+ @Getter
+ protected Boolean isInputDataset;
public enum EncryptionLevel {
FILE("file"),
@@ -98,6 +102,7 @@ public class EncryptionConfig {
validate(this.encryptionLevel, this.encryptedFields);
}
this.rawConfig = encryptionConfig.withFallback(DEFAULT_FALLBACK);
+ this.isInputDataset = ConfigUtils.getBoolean(encryptionConfig, DatasetDescriptorConfigKeys.IS_INPUT_DATASET, false);
}
private void validate(String encryptionLevel, String encryptedFields) throws IOException {
@@ -124,26 +129,15 @@ public class EncryptionConfig {
return;
}
- public boolean contains(EncryptionConfig other) {
- if (other == null) {
- return false;
- }
+ public ArrayList<String> contains(EncryptionConfig inputDatasetDescriptorConfig) {
+ ArrayList<String> errors = new ArrayList<>();
+
+ DatasetDescriptorErrorUtils.populateErrorForDatasetDescriptorKey(errors, inputDatasetDescriptorConfig.getIsInputDataset(), DatasetDescriptorConfigKeys.ENCRYPTION_ALGORITHM_KEY, this.getEncryptionAlgorithm(), inputDatasetDescriptorConfig.getEncryptionAlgorithm(), false);
+ DatasetDescriptorErrorUtils.populateErrorForDatasetDescriptorKey(errors, inputDatasetDescriptorConfig.getIsInputDataset(), DatasetDescriptorConfigKeys.ENCRYPTION_KEYSTORE_TYPE_KEY, this.getKeystoreType(), inputDatasetDescriptorConfig.getKeystoreType(), false);
+ DatasetDescriptorErrorUtils.populateErrorForDatasetDescriptorKey(errors, inputDatasetDescriptorConfig.getIsInputDataset(), DatasetDescriptorConfigKeys.ENCRYPTION_KEYSTORE_ENCODING_KEY, this.getKeystoreEncoding(), inputDatasetDescriptorConfig.getKeystoreEncoding(), false);
+ DatasetDescriptorErrorUtils.populateErrorForDatasetDescriptorKey(errors, inputDatasetDescriptorConfig.getIsInputDataset(), DatasetDescriptorConfigKeys.ENCRYPTION_LEVEL_KEY, this.getEncryptionLevel(), inputDatasetDescriptorConfig.getEncryptionLevel(), false);
+ DatasetDescriptorErrorUtils.populateErrorForDatasetDescriptorKey(errors, inputDatasetDescriptorConfig.getIsInputDataset(), DatasetDescriptorConfigKeys.ENCRYPTED_FIELDS, this.getEncryptedFields(), inputDatasetDescriptorConfig.getEncryptedFields(), false);
- String otherEncryptionAlgorithm = other.getEncryptionAlgorithm();
- String otherKeystoreType = other.getKeystoreType();
- String otherKeystoreEncoding = other.getKeystoreEncoding();
- String otherEncryptionLevel = other.getEncryptionLevel();
- String otherEncryptedFields = other.getEncryptedFields();
-
- return (DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equalsIgnoreCase(this.getEncryptionAlgorithm())
- || this.encryptionAlgorithm.equalsIgnoreCase(otherEncryptionAlgorithm))
- && (DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equalsIgnoreCase(this.getKeystoreType())
- || this.keystoreType.equalsIgnoreCase(otherKeystoreType))
- && (DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equalsIgnoreCase(this.getKeystoreEncoding())
- || this.keystoreEncoding.equalsIgnoreCase(otherKeystoreEncoding))
- && (DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equalsIgnoreCase(this.getEncryptionLevel())
- || this.encryptionLevel.equalsIgnoreCase(otherEncryptionLevel))
- && (DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equalsIgnoreCase(this.getEncryptedFields())
- || this.encryptedFields.equalsIgnoreCase(otherEncryptedFields));
+ return errors;
}
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptor.java
index 996fa9674..53eaa4f54 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptor.java
@@ -18,6 +18,7 @@
package org.apache.gobblin.service.modules.dataset;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
@@ -35,6 +36,7 @@ import lombok.ToString;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorErrorUtils;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.PathUtils;
@@ -73,38 +75,41 @@ public class FSDatasetDescriptor extends BaseDatasetDescriptor implements Datase
this.subPaths = ConfigUtils.getString(config, DatasetDescriptorConfigKeys.SUBPATHS_KEY, null);
this.isCompacted = ConfigUtils.getBoolean(config, DatasetDescriptorConfigKeys.IS_COMPACTED_KEY, false);
this.isCompactedAndDeduped = ConfigUtils.getBoolean(config, DatasetDescriptorConfigKeys.IS_COMPACTED_AND_DEDUPED_KEY, false);
- this.partitionConfig = new FSDatasetPartitionConfig(ConfigUtils.getConfigOrEmpty(config, DatasetDescriptorConfigKeys.PARTITION_PREFIX));
+ this.partitionConfig = new FSDatasetPartitionConfig(config);
this.rawConfig = config.withFallback(getPartitionConfig().getRawConfig()).withFallback(DEFAULT_FALLBACK).withFallback(super.getRawConfig());
+ this.isInputDataset = ConfigUtils.getBoolean(config, DatasetDescriptorConfigKeys.IS_INPUT_DATASET, false);
}
/**
* If other descriptor has subpaths, this method checks that each concatenation of path + subpath is matched by this
* path. Otherwise, it just checks the path.
*
- * @param other descriptor whose path/subpaths to check
+ * @param inputDatasetDescriptorConfig descriptor whose path/subpaths to check
* @return true if all subpaths are matched by this {@link DatasetDescriptor}'s path, or if subpaths is null and
* the other's path matches this path.
*/
@Override
- protected boolean isPathContaining(DatasetDescriptor other) {
- String otherPath = other.getPath();
- String otherSubPaths = ((FSDatasetDescriptor) other).getSubPaths();
+ protected ArrayList<String> isPathContaining(DatasetDescriptor inputDatasetDescriptorConfig) {
+ ArrayList<String> errors = new ArrayList<>();
+ String otherPath = inputDatasetDescriptorConfig.getPath();
+ String otherSubPaths = ((FSDatasetDescriptor) inputDatasetDescriptorConfig).getSubPaths();
// This allows the special case where "other" is a glob, but is also an exact match with "this" path.
if (getPath().equals(otherPath)) {
- return true;
+ return errors;
}
if (otherSubPaths != null) {
List<String> subPaths = Splitter.on(",").splitToList(StringUtils.stripEnd(StringUtils.stripStart(otherSubPaths, "{"), "}"));
for (String subPath : subPaths) {
- if (!isPathContaining(new Path(otherPath, subPath).toString())) {
- return false;
+ ArrayList<String> pathErrors = isPathContaining(new Path(otherPath, subPath).toString(), inputDatasetDescriptorConfig.getIsInputDataset());
+ if (pathErrors.size() != 0) {
+ return pathErrors;
}
}
- return true;
+ return errors;
} else {
- return isPathContaining(otherPath);
+ return isPathContaining(otherPath, inputDatasetDescriptorConfig.getIsInputDataset());
}
}
@@ -113,41 +118,49 @@ public class FSDatasetDescriptor extends BaseDatasetDescriptor implements Datase
* accepted by the other {@link DatasetDescriptor}. If the path description of the other {@link DatasetDescriptor}
* is a glob pattern, we return false.
*
- * @param otherPath a glob pattern that describes a set of paths.
+ * @param inputDatasetDescriptorConfigPath a glob pattern that describes a set of paths.
* @return true if the glob pattern described by the otherPath matches the path in this {@link DatasetDescriptor}.
*/
- private boolean isPathContaining(String otherPath) {
- if (otherPath == null) {
- return false;
+ private ArrayList<String> isPathContaining(String inputDatasetDescriptorConfigPath, Boolean inputDataset) {
+ String datasetDescriptorPrefix = inputDataset ? DatasetDescriptorConfigKeys.FLOW_INPUT_DATASET_DESCRIPTOR_PREFIX : DatasetDescriptorConfigKeys.FLOW_OUTPUT_DATASET_DESCRIPTOR_PREFIX;
+ ArrayList<String> errors = new ArrayList<>();
+ DatasetDescriptorErrorUtils.populateErrorForDatasetDescriptorKey(errors, inputDataset, DatasetDescriptorConfigKeys.PATH_KEY, this.getPath(), inputDatasetDescriptorConfigPath, true);
+ if (errors.size() != 0) {
+ return errors;
}
+
if (DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equals(this.getPath())) {
- return true;
+ return errors;
}
- if (PathUtils.isGlob(new Path(otherPath))) {
- return false;
+ if (PathUtils.isGlob(new Path(inputDatasetDescriptorConfigPath))) {
+ errors.add(String.format(DatasetDescriptorErrorUtils.DATASET_DESCRIPTOR_KEY_MISMATCH_ERROR_TEMPLATE_IS_GLOB_PATTERN, datasetDescriptorPrefix, DatasetDescriptorConfigKeys.PATH_KEY, inputDatasetDescriptorConfigPath));
+ return errors;
}
GlobPattern globPattern = new GlobPattern(this.getPath());
- return globPattern.matches(otherPath);
+
+ if (!globPattern.matches(inputDatasetDescriptorConfigPath)) {
+ errors.add(String.format(DatasetDescriptorErrorUtils.DATASET_DESCRIPTOR_KEY_MISMATCH_ERROR_TEMPLATE_GLOB_PATTERN, datasetDescriptorPrefix, DatasetDescriptorConfigKeys.PATH_KEY, inputDatasetDescriptorConfigPath, this.getPath()));
+ }
+ return errors;
}
/**
* {@inheritDoc}
*/
@Override
- public boolean contains(DatasetDescriptor o) {
- if (!super.contains(o)) {
- return false;
+ public ArrayList<String> contains(DatasetDescriptor inputDatasetDescriptorConfig) {
+ ArrayList<String> errors = new ArrayList<>();
+ if (super.contains(inputDatasetDescriptorConfig).size() != 0) {
+ return super.contains(inputDatasetDescriptorConfig);
}
- FSDatasetDescriptor other = (FSDatasetDescriptor) o;
-
- if ((this.isCompacted() != other.isCompacted()) ||
- (this.isCompactedAndDeduped() != other.isCompactedAndDeduped())) {
- return false;
- }
+ FSDatasetDescriptor inputFSDatasetDescriptor = (FSDatasetDescriptor) inputDatasetDescriptorConfig;
- return this.getPartitionConfig().contains(other.getPartitionConfig());
+ DatasetDescriptorErrorUtils.populateErrorForDatasetDescriptorKey(errors, inputDatasetDescriptorConfig.getIsInputDataset(), DatasetDescriptorConfigKeys.IS_COMPACTED_KEY, String.valueOf(this.isCompacted()), String.valueOf(inputFSDatasetDescriptor.isCompacted()), false);
+ DatasetDescriptorErrorUtils.populateErrorForDatasetDescriptorKey(errors, inputDatasetDescriptorConfig.getIsInputDataset(), DatasetDescriptorConfigKeys.IS_COMPACTED_AND_DEDUPED_KEY, String.valueOf(this.isCompactedAndDeduped()), String.valueOf(inputFSDatasetDescriptor.isCompactedAndDeduped()), false);
+ errors.addAll(this.getPartitionConfig().contains(inputFSDatasetDescriptor.getPartitionConfig()));
+ return errors;
}
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetPartitionConfig.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetPartitionConfig.java
index c3eaee47c..563edc772 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetPartitionConfig.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetPartitionConfig.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.service.modules.dataset;
import java.io.IOException;
import java.text.SimpleDateFormat;
+import java.util.ArrayList;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
@@ -33,6 +34,7 @@ import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorErrorUtils;
import org.apache.gobblin.util.ConfigUtils;
@@ -42,8 +44,8 @@ import org.apache.gobblin.util.ConfigUtils;
* the regex pattern) is validated.
*/
@Slf4j
-@ToString (exclude = {"rawConfig"})
-@EqualsAndHashCode (exclude = {"rawConfig"})
+@ToString (exclude = {"rawConfig", "isInputDataset"})
+@EqualsAndHashCode (exclude = {"rawConfig", "isInputDataset"})
public class FSDatasetPartitionConfig {
@Getter
private final String partitionType;
@@ -51,6 +53,8 @@ public class FSDatasetPartitionConfig {
private final String partitionPattern;
@Getter
private final Config rawConfig;
+ @Getter
+ protected Boolean isInputDataset;
public enum PartitionType {
DATETIME("datetime"),
@@ -77,8 +81,9 @@ public class FSDatasetPartitionConfig {
.build());
public FSDatasetPartitionConfig(Config config) throws IOException {
- String partitionType = ConfigUtils.getString(config, DatasetDescriptorConfigKeys.PARTITION_TYPE_KEY, DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY).toLowerCase();
- String partitionPattern = ConfigUtils.getString(config, DatasetDescriptorConfigKeys.PARTITION_PATTERN_KEY, DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
+ String partitionType = ConfigUtils.getString(config, DatasetDescriptorConfigKeys.PARTITION_PREFIX + "." + DatasetDescriptorConfigKeys.PARTITION_TYPE_KEY, DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY).toLowerCase();
+ String partitionPattern = ConfigUtils.getString(config, DatasetDescriptorConfigKeys.PARTITION_PREFIX + "." + DatasetDescriptorConfigKeys.PARTITION_PATTERN_KEY, DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
+
if (partitionType.equalsIgnoreCase(PartitionType.NONE.name())) {
partitionPattern = DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_NONE;
} else if(partitionType.equalsIgnoreCase(PartitionType.ANY.name())) {
@@ -87,7 +92,8 @@ public class FSDatasetPartitionConfig {
validatePartitionConfig(partitionType, partitionPattern);
this.partitionType = partitionType;
this.partitionPattern = partitionPattern;
- this.rawConfig = config.withFallback(DEFAULT_FALLBACK);
+ this.rawConfig = ConfigUtils.getConfig(config, DatasetDescriptorConfigKeys.PARTITION_PREFIX, DEFAULT_FALLBACK);
+ this.isInputDataset = ConfigUtils.getBoolean(config, DatasetDescriptorConfigKeys.IS_INPUT_DATASET, false);
}
private void validatePartitionConfig(String partitionType, String partitionPattern)
@@ -128,13 +134,10 @@ public class FSDatasetPartitionConfig {
}
}
- public boolean contains(FSDatasetPartitionConfig other) {
- if (other == null) {
- return false;
- }
- return ((DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equalsIgnoreCase(getPartitionType())
- || this.getPartitionType().equalsIgnoreCase(other.getPartitionType())))
- && ((DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equalsIgnoreCase(getPartitionPattern())
- || this.getPartitionPattern().equalsIgnoreCase(other.getPartitionPattern())));
+ public ArrayList<String> contains(FSDatasetPartitionConfig inputDatasetDescriptorConfig) {
+ ArrayList<String> errors = new ArrayList<>();
+ DatasetDescriptorErrorUtils.populateErrorForDatasetDescriptorKeyPartition(errors, inputDatasetDescriptorConfig.getIsInputDataset(), DatasetDescriptorConfigKeys.PARTITION_PREFIX, DatasetDescriptorConfigKeys.PARTITION_TYPE_KEY, this.getPartitionType(), inputDatasetDescriptorConfig.getPartitionType(), false);
+ DatasetDescriptorErrorUtils.populateErrorForDatasetDescriptorKeyPartition(errors, inputDatasetDescriptorConfig.getIsInputDataset(), DatasetDescriptorConfigKeys.PARTITION_PREFIX, DatasetDescriptorConfigKeys.PARTITION_PATTERN_KEY, this.getPartitionPattern(), inputDatasetDescriptorConfig.getPartitionPattern(), false);
+ return errors;
}
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSVolumeDatasetDescriptor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSVolumeDatasetDescriptor.java
index 525acd429..2ba562138 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSVolumeDatasetDescriptor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSVolumeDatasetDescriptor.java
@@ -19,19 +19,21 @@ package org.apache.gobblin.service.modules.dataset;
import com.typesafe.config.Config;
import java.io.IOException;
+import java.util.ArrayList;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorErrorUtils;
import org.apache.gobblin.util.ConfigUtils;
/**
* An implementation of {@link FSVolumeDatasetDescriptor} with fs.uri specified.
*/
@Alpha
-@ToString(callSuper = true, exclude = {"rawConfig"})
-@EqualsAndHashCode(callSuper = true, exclude = {"rawConfig"})
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
public class FSVolumeDatasetDescriptor extends FSDatasetDescriptor{
@Getter
private final String fsUri;
@@ -39,18 +41,19 @@ public class FSVolumeDatasetDescriptor extends FSDatasetDescriptor{
public FSVolumeDatasetDescriptor(Config config) throws IOException {
super(config);
this.fsUri = ConfigUtils.getString(config, DatasetDescriptorConfigKeys.FS_URI_KEY, DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
+ this.isInputDataset = ConfigUtils.getBoolean(config, DatasetDescriptorConfigKeys.IS_INPUT_DATASET, false);
}
@Override
- public boolean contains(DatasetDescriptor o) {
- if (!super.contains(o)) {
- return false;
+ public ArrayList<String> contains(DatasetDescriptor inputDatasetDescriptorConfig) {
+ ArrayList<String> errors = new ArrayList<>();
+ if (super.contains(inputDatasetDescriptorConfig).size() != 0) {
+ return super.contains(inputDatasetDescriptorConfig);
}
- FSVolumeDatasetDescriptor other = (FSVolumeDatasetDescriptor) o;
-
- return DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equals(this.getFsUri()) || this.getFsUri()
- .equals(other.getFsUri());
+ FSVolumeDatasetDescriptor other = (FSVolumeDatasetDescriptor) inputDatasetDescriptorConfig;
+ DatasetDescriptorErrorUtils.populateErrorForDatasetDescriptorKey(errors, inputDatasetDescriptorConfig.getIsInputDataset(), DatasetDescriptorConfigKeys.FS_URI_KEY, this.getFsUri(), other.getFsUri(), false);
+ return errors;
}
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FormatConfig.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FormatConfig.java
index e02940b5d..aff1a89bb 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FormatConfig.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FormatConfig.java
@@ -23,12 +23,14 @@ import com.google.common.collect.ImmutableMap;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import java.util.ArrayList;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorErrorUtils;
import org.apache.gobblin.util.ConfigUtils;
@@ -42,8 +44,8 @@ import org.apache.gobblin.util.ConfigUtils;
* </ul>
*/
@Alpha
-@ToString (exclude = {"rawConfig"})
-@EqualsAndHashCode (exclude = {"rawConfig"})
+@ToString (exclude = {"rawConfig", "isInputDataset"})
+@EqualsAndHashCode (exclude = {"rawConfig", "isInputDataset"})
public class FormatConfig {
@Getter
private final String format;
@@ -53,6 +55,8 @@ public class FormatConfig {
private final EncryptionConfig encryptionConfig;
@Getter
private final Config rawConfig;
+ @Getter
+ protected Boolean isInputDataset;
private static final Config DEFAULT_FALLBACK =
ConfigFactory.parseMap(ImmutableMap.<String, Object>builder()
@@ -67,24 +71,30 @@ public class FormatConfig {
.empty()));
this.rawConfig = config.withFallback(this.encryptionConfig.getRawConfig().atPath(DatasetDescriptorConfigKeys.ENCYPTION_PREFIX)).
withFallback(DEFAULT_FALLBACK);
+ this.isInputDataset = ConfigUtils.getBoolean(config, DatasetDescriptorConfigKeys.IS_INPUT_DATASET, false);
}
- public boolean contains(FormatConfig other) {
- return containsFormat(other.getFormat()) && containsCodec(other.getCodecType())
- && containsEncryptionConfig(other.getEncryptionConfig());
+ public ArrayList<String> contains(FormatConfig inputDatasetDescriptorConfig) {
+ ArrayList<String> errors = new ArrayList<>();
+ errors.addAll(containsFormat(inputDatasetDescriptorConfig.getFormat(), inputDatasetDescriptorConfig.getIsInputDataset()));
+ errors.addAll(containsCodec(inputDatasetDescriptorConfig.getCodecType(), inputDatasetDescriptorConfig.getIsInputDataset()));
+ errors.addAll(containsEncryptionConfig(inputDatasetDescriptorConfig.getEncryptionConfig()));
+ return errors;
}
- private boolean containsFormat(String otherFormat) {
- return DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equalsIgnoreCase(this.getFormat())
- || (this.getFormat().equalsIgnoreCase(otherFormat));
+ private ArrayList<String> containsFormat(String inputDatasetDescriptorConfigFormat, Boolean inputDataset) {
+ ArrayList<String> errors = new ArrayList<>();
+ DatasetDescriptorErrorUtils.populateErrorForDatasetDescriptorKey(errors, inputDataset, DatasetDescriptorConfigKeys.FORMAT_KEY, this.getFormat(), inputDatasetDescriptorConfigFormat, false);
+ return errors;
}
- private boolean containsCodec(String otherCodecType) {
- return DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equalsIgnoreCase(this.getCodecType())
- || (this.getCodecType().equalsIgnoreCase(otherCodecType));
+ private ArrayList<String> containsCodec(String inputDatasetDescriptorConfigCodecType, Boolean inputDataset) {
+ ArrayList<String> errors = new ArrayList<>();
+ DatasetDescriptorErrorUtils.populateErrorForDatasetDescriptorKey(errors, inputDataset, DatasetDescriptorConfigKeys.CODEC_KEY, this.getCodecType(), inputDatasetDescriptorConfigCodecType, false);
+ return errors;
}
- private boolean containsEncryptionConfig(EncryptionConfig otherEncryptionConfig) {
- return this.getEncryptionConfig().contains(otherEncryptionConfig);
+ private ArrayList<String> containsEncryptionConfig(EncryptionConfig inputDatasetDescriptorConfigEncryptionConfig) {
+ return this.getEncryptionConfig().contains(inputDatasetDescriptorConfigEncryptionConfig);
}
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/HiveDatasetDescriptor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/HiveDatasetDescriptor.java
index fb1bcd4cc..164d7b126 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/HiveDatasetDescriptor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/HiveDatasetDescriptor.java
@@ -18,8 +18,11 @@
package org.apache.gobblin.service.modules.dataset;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
+import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorErrorUtils;
import org.apache.hadoop.fs.GlobPattern;
import com.google.common.base.Splitter;
@@ -27,7 +30,6 @@ import com.typesafe.config.Config;
import com.typesafe.config.ConfigValueFactory;
import lombok.EqualsAndHashCode;
-import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.data.management.copy.hive.HiveCopyEntityHelper;
import org.apache.gobblin.data.management.copy.hive.HiveDatasetFinder;
@@ -36,7 +38,6 @@ import org.apache.gobblin.data.management.version.finder.DatePartitionHiveVersio
import org.apache.gobblin.util.ConfigUtils;
-@Slf4j
/**
* As of now, {@link HiveDatasetDescriptor} has same implementation as that of {@link SqlDatasetDescriptor}.
* Fields {@link HiveDatasetDescriptor#isPartitioned}, {@link HiveDatasetDescriptor#partitionColumn} and
@@ -77,6 +78,7 @@ public class HiveDatasetDescriptor extends SqlDatasetDescriptor {
.withValue(HiveDatasetFinder.HIVE_DATASET_PREFIX + "." + WhitelistBlacklist.WHITELIST,
ConfigValueFactory.fromAnyRef(createHiveDatasetWhitelist())
));
+ this.isInputDataset = ConfigUtils.getBoolean(config, DatasetDescriptorConfigKeys.IS_INPUT_DATASET, false);
}
// Using Hadoop's GlobPattern instead of java.util.regex, because could not find any API in java.util.regex
@@ -99,35 +101,33 @@ public class HiveDatasetDescriptor extends SqlDatasetDescriptor {
}
@Override
- protected boolean isPathContaining(DatasetDescriptor other) {
- String otherPath = other.getPath();
- if (otherPath == null) {
- return false;
- }
+ protected ArrayList<String> isPathContaining(DatasetDescriptor inputDatasetDescriptorConfig) {
+ ArrayList<String> errors = new ArrayList<>();
+ String otherPath = inputDatasetDescriptorConfig.getPath();
- if (this.isPartitioned != ((HiveDatasetDescriptor) other).isPartitioned) {
- return false;
+ DatasetDescriptorErrorUtils.populateErrorForDatasetDescriptorKey(errors, inputDatasetDescriptorConfig.getIsInputDataset(), DatasetDescriptorConfigKeys.PATH_KEY, this.getPath(), otherPath, true);
+ if (errors.size() != 0){
+ return errors;
}
+ DatasetDescriptorErrorUtils.populateErrorForDatasetDescriptorKeyPartition(errors, inputDatasetDescriptorConfig.getIsInputDataset(), DatasetDescriptorConfigKeys.PARTITION_PREFIX, DatasetDescriptorConfigKeys.PARTITION_TYPE_KEY, String.valueOf(this.isPartitioned), String.valueOf(((HiveDatasetDescriptor) inputDatasetDescriptorConfig).isPartitioned), false);
+
//Extract the dbName and tableName from otherPath
List<String> parts = Splitter.on(SEPARATION_CHAR).splitToList(otherPath);
- if (parts.size() != 2) {
- return false;
+ DatasetDescriptorErrorUtils.populateErrorForDatasetDescriptorKeySize(errors, inputDatasetDescriptorConfig.getIsInputDataset(), DatasetDescriptorConfigKeys.PATH_KEY, parts, otherPath, SEPARATION_CHAR, 2);
+ if (errors.size() != 0) {
+ return errors;
}
String otherDbName = parts.get(0);
String otherTableNames = parts.get(1);
- if (!this.whitelistBlacklist.acceptDb(otherDbName)) {
- return false;
- }
+ DatasetDescriptorErrorUtils.populateErrorForDatasetDescriptorKeyBlacklist(errors, inputDatasetDescriptorConfig.getIsInputDataset(), "database", DatasetDescriptorConfigKeys.DATABASE_KEY, whitelistBlacklist, otherDbName, null);
List<String> otherTables = Splitter.on(",").splitToList(otherTableNames);
for (String otherTable : otherTables) {
- if (!this.whitelistBlacklist.acceptTable(otherDbName, otherTable)) {
- return false;
- }
+ DatasetDescriptorErrorUtils.populateErrorForDatasetDescriptorKeyBlacklist(errors, inputDatasetDescriptorConfig.getIsInputDataset(), "table", DatasetDescriptorConfigKeys.TABLE_KEY, whitelistBlacklist, otherDbName, otherTable);
}
- return true;
+ return errors;
}
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/HttpDatasetDescriptor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/HttpDatasetDescriptor.java
index 0d4503695..96fdf8a6c 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/HttpDatasetDescriptor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/HttpDatasetDescriptor.java
@@ -22,12 +22,13 @@ import com.typesafe.config.Config;
import com.typesafe.config.ConfigValueFactory;
import java.io.IOException;
+import java.util.ArrayList;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
-import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorErrorUtils;
import org.apache.gobblin.util.ConfigUtils;
@@ -37,7 +38,6 @@ import org.apache.gobblin.util.ConfigUtils;
* e.g, https://some-api:443/user/123/names, where /user/123/names is the path
* query string is not supported
*/
-@Slf4j
@ToString (exclude = {"rawConfig"})
@EqualsAndHashCode (exclude = {"rawConfig"}, callSuper = true)
public class HttpDatasetDescriptor extends BaseDatasetDescriptor implements DatasetDescriptor {
@@ -71,6 +71,7 @@ public class HttpDatasetDescriptor extends BaseDatasetDescriptor implements Data
// refers to the full HTTP url
this.path = ConfigUtils.getString(config, DatasetDescriptorConfigKeys.PATH_KEY, "");
this.rawConfig = config.withValue(DatasetDescriptorConfigKeys.PATH_KEY, ConfigValueFactory.fromAnyRef(this.path)).withFallback(super.getRawConfig());
+ this.isInputDataset = ConfigUtils.getBoolean(config, DatasetDescriptorConfigKeys.IS_INPUT_DATASET, false);
}
/**
@@ -83,12 +84,14 @@ public class HttpDatasetDescriptor extends BaseDatasetDescriptor implements Data
/**
* Check if this HTTP path equals the other HTTP path
*
- * @param other whose path should be in the format of a HTTP path
+ * @param inputDatasetDescriptorConfig whose path should be in the format of a HTTP path
*/
@Override
- protected boolean isPathContaining(DatasetDescriptor other) {
+ protected ArrayList<String> isPathContaining(DatasetDescriptor inputDatasetDescriptorConfig) {
// Might be null
- String otherPath = other.getPath();
- return this.path.equals(otherPath);
+ ArrayList<String> errors = new ArrayList<>();
+ String otherPath = inputDatasetDescriptorConfig.getPath();
+ DatasetDescriptorErrorUtils.populateErrorForDatasetDescriptorKey(errors, inputDatasetDescriptorConfig.getIsInputDataset(), DatasetDescriptorConfigKeys.PATH_KEY, this.getPath(), otherPath, false);
+ return errors;
}
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/IcebergDatasetDescriptor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/IcebergDatasetDescriptor.java
index 4ac46a0f9..be866606e 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/IcebergDatasetDescriptor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/IcebergDatasetDescriptor.java
@@ -18,6 +18,7 @@
package org.apache.gobblin.service.modules.dataset;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import com.google.common.base.Joiner;
@@ -28,6 +29,7 @@ import lombok.EqualsAndHashCode;
import lombok.Getter;
import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorErrorUtils;
import org.apache.gobblin.util.ConfigUtils;
/**
@@ -35,7 +37,7 @@ import org.apache.gobblin.util.ConfigUtils;
* Fields {@link IcebergDatasetDescriptor#databaseName} and {@link IcebergDatasetDescriptor#tableName} are used to
* identify an iceberg.
*/
-@EqualsAndHashCode(callSuper = true)
+@EqualsAndHashCode (callSuper = true)
public class IcebergDatasetDescriptor extends BaseDatasetDescriptor {
protected static final String SEPARATION_CHAR = ";";
protected final String databaseName;
@@ -60,6 +62,7 @@ public class IcebergDatasetDescriptor extends BaseDatasetDescriptor {
throw new IOException("Invalid iceberg database or table name: " + this.databaseName + ":" + this.tableName);
}
this.path = fullyQualifiedTableName(this.databaseName, this.tableName);
+ this.isInputDataset = ConfigUtils.getBoolean(config, DatasetDescriptorConfigKeys.IS_INPUT_DATASET, false);
}
protected boolean isPlatformValid() {
@@ -71,21 +74,27 @@ public class IcebergDatasetDescriptor extends BaseDatasetDescriptor {
}
@Override
- protected boolean isPathContaining(DatasetDescriptor other) {
- String otherPath = other.getPath();
- if (otherPath == null) {
- return false;
+ protected ArrayList<String> isPathContaining(DatasetDescriptor inputDatasetDescriptorConfig) {
+ ArrayList<String> errors = new ArrayList<>();
+ String otherPath = inputDatasetDescriptorConfig.getPath();
+
+ DatasetDescriptorErrorUtils.populateErrorForDatasetDescriptorKey(errors, inputDatasetDescriptorConfig.getIsInputDataset(), DatasetDescriptorConfigKeys.PATH_KEY, this.getPath(), otherPath, true);
+ if (errors.size() != 0) {
+ return errors;
}
//Extract the dbName and tableName from otherPath
List<String> parts = Splitter.on(SEPARATION_CHAR).splitToList(otherPath);
- if (parts.size() != 2) {
- return false;
+ DatasetDescriptorErrorUtils.populateErrorForDatasetDescriptorKeySize(errors, inputDatasetDescriptorConfig.getIsInputDataset(), DatasetDescriptorConfigKeys.PATH_KEY, parts, otherPath, SEPARATION_CHAR, 2);
+ if (errors.size() != 0) {
+ return errors;
}
String otherDbName = parts.get(0);
String otherTableName = parts.get(1);
- return this.databaseName.equals(otherDbName) && this.tableName.equals(otherTableName);
+ DatasetDescriptorErrorUtils.populateErrorForDatasetDescriptorKey(errors, inputDatasetDescriptorConfig.getIsInputDataset(), DatasetDescriptorConfigKeys.DATABASE_KEY, this.databaseName, otherDbName, false);
+ DatasetDescriptorErrorUtils.populateErrorForDatasetDescriptorKey(errors, inputDatasetDescriptorConfig.getIsInputDataset(), DatasetDescriptorConfigKeys.TABLE_KEY, this.tableName, otherTableName, false);
+ return errors;
}
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/SqlDatasetDescriptor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/SqlDatasetDescriptor.java
index 42b68d984..27807b44b 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/SqlDatasetDescriptor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/SqlDatasetDescriptor.java
@@ -18,8 +18,8 @@
package org.apache.gobblin.service.modules.dataset;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
-import java.util.regex.Pattern;
import com.google.common.base.Enums;
import com.google.common.base.Joiner;
@@ -31,14 +31,13 @@ import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
-import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorErrorUtils;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.PathUtils;
-@Slf4j
@ToString (exclude = {"rawConfig"})
@EqualsAndHashCode (exclude = {"rawConfig"}, callSuper = true)
public class SqlDatasetDescriptor extends BaseDatasetDescriptor implements DatasetDescriptor {
@@ -81,6 +80,7 @@ public class SqlDatasetDescriptor extends BaseDatasetDescriptor implements Datas
this.tableName = ConfigUtils.getString(config, DatasetDescriptorConfigKeys.TABLE_KEY, ".*");
this.path = fullyQualifiedTableName(this.databaseName, this.tableName);
this.rawConfig = config.withValue(DatasetDescriptorConfigKeys.PATH_KEY, ConfigValueFactory.fromAnyRef(this.path)).withFallback(super.getRawConfig());
+ this.isInputDataset = ConfigUtils.getBoolean(config, DatasetDescriptorConfigKeys.IS_INPUT_DATASET, false);
}
private String fullyQualifiedTableName(String databaseName, String tableName) {
@@ -101,28 +101,34 @@ public class SqlDatasetDescriptor extends BaseDatasetDescriptor implements Datas
* NOTE: otherPath cannot be a globPattern. So:
* isPathContaining("test_db.*;test_table_*") = false
*
- * @param other whose path should be in the format of dbName.tableName
+ * @param inputDatasetDescriptorConfig whose path should be in the format of dbName.tableName
*/
@Override
- protected boolean isPathContaining(DatasetDescriptor other) {
- String otherPath = other.getPath();
- if (otherPath == null) {
- return false;
+ protected ArrayList<String> isPathContaining(DatasetDescriptor inputDatasetDescriptorConfig) {
+ ArrayList<String> errors = new ArrayList<>();
+ String otherPath = inputDatasetDescriptorConfig.getPath();
+ DatasetDescriptorErrorUtils.populateErrorForDatasetDescriptorKey(errors, inputDatasetDescriptorConfig.getIsInputDataset(), DatasetDescriptorConfigKeys.PATH_KEY, this.getPath(), otherPath, true);
+ if (errors.size() != 0) {
+ return errors;
}
if (PathUtils.GLOB_TOKENS.matcher(otherPath).find()) {
- return false;
+ return errors;
}
//Extract the dbName and tableName from otherPath
List<String> parts = Splitter.on(SEPARATION_CHAR).splitToList(otherPath);
- if (parts.size() != 2) {
- return false;
+ DatasetDescriptorErrorUtils.populateErrorForDatasetDescriptorKeySize(errors, inputDatasetDescriptorConfig.getIsInputDataset(), DatasetDescriptorConfigKeys.PATH_KEY, parts, otherPath, SEPARATION_CHAR, 2);
+ if (errors.size() != 0) {
+ return errors;
}
String otherDbName = parts.get(0);
String otherTableName = parts.get(1);
- return Pattern.compile(this.databaseName).matcher(otherDbName).matches() && Pattern.compile(this.tableName).matcher(otherTableName).matches();
+ DatasetDescriptorErrorUtils.populateErrorForDatasetDescriptorKeyRegex(errors, inputDatasetDescriptorConfig.getIsInputDataset(), DatasetDescriptorConfigKeys.DATABASE_KEY, this.databaseName, otherDbName);
+ DatasetDescriptorErrorUtils.populateErrorForDatasetDescriptorKeyRegex(errors, inputDatasetDescriptorConfig.getIsInputDataset(), DatasetDescriptorConfigKeys.TABLE_KEY, this.tableName, otherTableName);
+
+ return errors;
}
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java
index cc9a315cd..36d473b05 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java
@@ -41,6 +41,7 @@ public class DatasetDescriptorConfigKeys {
public static final String IS_RETENTION_APPLIED_KEY = "isRetentionApplied";
public static final String IS_COMPACTED_KEY = "isCompacted";
public static final String IS_COMPACTED_AND_DEDUPED_KEY = "isCompactedAndDeduped";
+ public static final String IS_INPUT_DATASET = "isInputDataset";
//Dataset encryption related keys
public static final String ENCYPTION_PREFIX = "encrypt";
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorErrorUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorErrorUtils.java
new file mode 100644
index 000000000..1cfd70b15
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorErrorUtils.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.gobblin.service.modules.flowgraph;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Pattern;
+import org.apache.gobblin.data.management.copy.hive.WhitelistBlacklist;
+
+
+/**
+ * Config keys related to {@link org.apache.gobblin.service.modules.dataset.DatasetDescriptor}.
+ */
+public class DatasetDescriptorErrorUtils {
+ public static final String DATASET_DESCRIPTOR_KEY_MISMATCH_ERROR_TEMPLATE = "%s.%s is mismatched. User input: %s. Expected value '%s'.";
+ public static final String DATASET_DESCRIPTOR_KEY_MISSING_ERROR_TEMPLATE = "%s.%s is missing. Expected value '%s'.";
+
+ public static final String DATASET_DESCRIPTOR_KEY_MISMATCH_ERROR_TEMPLATE_PARTITION = "%s.%s.%s is mismatched. User input: %s. Expected value '%s'.";
+ public static final String DATASET_DESCRIPTOR_KEY_MISSING_ERROR_TEMPLATE_PARTITION = "%s.%s.%s is missing. Expected value '%s'.";
+
+ public static final String DATASET_DESCRIPTOR_KEY_MISMATCH_ERROR_TEMPLATE_STRING_SPLIT = "%s.%s is mismatched. User input: %s is not splittable. Expected separation character: '%s' and total of %d parts.";
+
+ public static final String DATASET_DESCRIPTOR_KEY_MISMATCH_ERROR_TEMPLATE_IS_GLOB_PATTERN = "%s.%s is mismatched. User input: %s is of a glob pattern. Expected input is not of a glob pattern.";
+ public static final String DATASET_DESCRIPTOR_KEY_MISMATCH_ERROR_TEMPLATE_GLOB_PATTERN = "%s.%s is mismatched. User input: %s is not contained within the glob of %s.";
+
+ public static final String DATASET_DESCRIPTOR_KEY_MISMATCH_ERROR_TEMPLATE_BLACKLIST = "%s.%s is mismatched. User input for %s: '%s' is in the blacklist. Please check the provided blacklist configuration.";
+
+ /**
+ * The populateErrorForDatasetDescriptorKey function will compare the submitted variables and add associated errors to the error array.
+ * @param errors list of errors
+ * @param inputDataset whether it's the input or output
+ * @param configKey DatasetDescriptorConfigKeys key of the field fed into the function
+ * @param inputDatasetDescriptorValue the property from the flow.conf
+ * @param providedDatasetDescriptorValue the property from the submitted flow configuration
+ * @param testNullOnly flag that is true if we only want to test if a property is null or not
+ */
+ public static void populateErrorForDatasetDescriptorKey(ArrayList<String> errors, Boolean inputDataset,
+ String configKey, String inputDatasetDescriptorValue, String providedDatasetDescriptorValue, Boolean testNullOnly) {
+ String datasetDescriptorPrefix = inputDataset ? DatasetDescriptorConfigKeys.FLOW_INPUT_DATASET_DESCRIPTOR_PREFIX : DatasetDescriptorConfigKeys.FLOW_OUTPUT_DATASET_DESCRIPTOR_PREFIX;
+ if (providedDatasetDescriptorValue == null) {
+ errors.add(String.format(DATASET_DESCRIPTOR_KEY_MISSING_ERROR_TEMPLATE, datasetDescriptorPrefix, configKey, inputDatasetDescriptorValue));
+ }
+
+ if (!testNullOnly && !(DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equalsIgnoreCase(inputDatasetDescriptorValue)
+ || inputDatasetDescriptorValue.equalsIgnoreCase(providedDatasetDescriptorValue))) {
+ errors.add(String.format(DATASET_DESCRIPTOR_KEY_MISMATCH_ERROR_TEMPLATE, datasetDescriptorPrefix, configKey, providedDatasetDescriptorValue, inputDatasetDescriptorValue));
+ }
+ }
+
+ /**
+ * The populateErrorForDatasetDescriptorKeyPartition function will compare the submitted variables and add associated errors to the error array.
+ * @param errors list of errors
+ * @param inputDataset whether it's the input or output
+ * @param configKey DatasetDescriptorConfigKeys key of the field fed into the function
+ * @param partitionConfigKey the subkey for the partition (e.g. partition.pattern)
+ * @param inputDatasetDescriptorValue the property from the flow.conf
+ * @param providedDatasetDescriptorValue the property from the submitted flow configuration
+ * @param testNullOnly flag that is true if we only want to test if a property is null or not
+ */
+ public static void populateErrorForDatasetDescriptorKeyPartition(ArrayList<String> errors, Boolean inputDataset,
+ String configKey, String partitionConfigKey, String inputDatasetDescriptorValue, String providedDatasetDescriptorValue, Boolean testNullOnly) {
+ String datasetDescriptorPrefix = inputDataset ? DatasetDescriptorConfigKeys.FLOW_INPUT_DATASET_DESCRIPTOR_PREFIX : DatasetDescriptorConfigKeys.FLOW_OUTPUT_DATASET_DESCRIPTOR_PREFIX;
+ if (providedDatasetDescriptorValue == null) {
+ errors.add(String.format(DATASET_DESCRIPTOR_KEY_MISSING_ERROR_TEMPLATE_PARTITION, datasetDescriptorPrefix, configKey, partitionConfigKey, inputDatasetDescriptorValue));
+ }
+
+ if (!testNullOnly && !(DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equalsIgnoreCase(inputDatasetDescriptorValue)
+ || inputDatasetDescriptorValue.equalsIgnoreCase(providedDatasetDescriptorValue))) {
+ errors.add(String.format(DATASET_DESCRIPTOR_KEY_MISMATCH_ERROR_TEMPLATE_PARTITION, datasetDescriptorPrefix, configKey, partitionConfigKey, providedDatasetDescriptorValue, inputDatasetDescriptorValue));
+ }
+ }
+
+ /**
+ * The populateErrorForDatasetDescriptorKeyRegex function will compare the submitted variables using the regex matching method and add associated errors to the error array.
+ * @param errors list of errors
+ * @param inputDataset whether it's the input or output
+ * @param configKey DatasetDescriptorConfigKeys key of the field fed into the function
+ * @param inputDatasetDescriptorValue the property from the flow.conf
+ * @param providedDatasetDescriptorValue the property from the submitted flow configuration
+ */
+ public static void populateErrorForDatasetDescriptorKeyRegex(ArrayList<String> errors, Boolean inputDataset,
+ String configKey, String inputDatasetDescriptorValue, String providedDatasetDescriptorValue) {
+ String datasetDescriptorPrefix = inputDataset ? DatasetDescriptorConfigKeys.FLOW_INPUT_DATASET_DESCRIPTOR_PREFIX : DatasetDescriptorConfigKeys.FLOW_OUTPUT_DATASET_DESCRIPTOR_PREFIX;
+ if (!Pattern.compile(inputDatasetDescriptorValue).matcher(providedDatasetDescriptorValue).matches()) {
+ errors.add(String.format(DatasetDescriptorErrorUtils.DATASET_DESCRIPTOR_KEY_MISMATCH_ERROR_TEMPLATE, datasetDescriptorPrefix, configKey, providedDatasetDescriptorValue, inputDatasetDescriptorValue));
+ }
+ }
+
+ /**
+ * The populateErrorForDatasetDescriptorKeyBlacklist function will check whether the database and/or table is in the blacklist config.
+ * @param errors list of errors
+ * @param inputDataset whether it's the input or output
+ * @param type whether it's the database or the table within a database that the function is checking
+ * @param configKey DatasetDescriptorConfigKeys key of the field fed into the function
+ * @param whitelistBlacklist whitelistblacklist object for filtering hive based tables
+ * @param inputDbName the database name from the submitted flow configuration
+ * @param inputTableName the table name from the submitted flow configuration
+ */
+ public static void populateErrorForDatasetDescriptorKeyBlacklist(ArrayList<String> errors, Boolean inputDataset,
+ String type, String configKey, WhitelistBlacklist whitelistBlacklist, String inputDbName, String inputTableName) {
+ String datasetDescriptorPrefix = inputDataset ? DatasetDescriptorConfigKeys.FLOW_INPUT_DATASET_DESCRIPTOR_PREFIX : DatasetDescriptorConfigKeys.FLOW_OUTPUT_DATASET_DESCRIPTOR_PREFIX;
+ if (type.equals("database") && !whitelistBlacklist.acceptDb(inputDbName)) {
+ errors.add(String.format(DatasetDescriptorErrorUtils.DATASET_DESCRIPTOR_KEY_MISMATCH_ERROR_TEMPLATE_BLACKLIST,
+ datasetDescriptorPrefix, "database", configKey, inputDbName));
+ } else if (type.equals("table") && !whitelistBlacklist.acceptTable(inputDbName, inputTableName)) {
+ errors.add(String.format(DatasetDescriptorErrorUtils.DATASET_DESCRIPTOR_KEY_MISMATCH_ERROR_TEMPLATE_BLACKLIST,
+ datasetDescriptorPrefix, "table", configKey, String.join(".", inputDbName, inputTableName)));
+ }
+ }
+
+ /**
+ *
+ * @param errors list of errors
+ * @param inputDataset whether it's the input or output
+ * @param configKey DatasetDescriptorConfigKeys key of the field fed into the function
+ * @param parts the list of parts after splitting using the separation character
+ * @param inputPath the path from the submitted flow configuration
+ * @param sepChar the delimiter/separation character
+ * @param size the expected size of the list of parts
+ */
+ public static void populateErrorForDatasetDescriptorKeySize(ArrayList<String> errors, Boolean inputDataset,
+ String configKey, List<String> parts, String inputPath, String sepChar, int size) {
+ String datasetDescriptorPrefix = inputDataset ? DatasetDescriptorConfigKeys.FLOW_INPUT_DATASET_DESCRIPTOR_PREFIX : DatasetDescriptorConfigKeys.FLOW_OUTPUT_DATASET_DESCRIPTOR_PREFIX;
+ if (parts.size() != size) {
+ errors.add(String.format(DatasetDescriptorErrorUtils.DATASET_DESCRIPTOR_KEY_MISMATCH_ERROR_TEMPLATE_STRING_SPLIT, datasetDescriptorPrefix, configKey, inputPath, sepChar, size));
+ }
+ }
+}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
index 0bd19b538..743bbc237 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
@@ -17,10 +17,11 @@
package org.apache.gobblin.service.modules.flowgraph.pathfinder;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.typesafe.config.Config;
-import com.typesafe.config.ConfigException;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValue;
import com.typesafe.config.ConfigValueFactory;
@@ -125,8 +126,10 @@ public abstract class AbstractPathFinder implements PathFinder {
//Get src/dest dataset descriptors from the flow config
Config srcDatasetDescriptorConfig =
flowConfig.getConfig(DatasetDescriptorConfigKeys.FLOW_INPUT_DATASET_DESCRIPTOR_PREFIX);
+ srcDatasetDescriptorConfig = srcDatasetDescriptorConfig.withValue(DatasetDescriptorConfigKeys.IS_INPUT_DATASET, ConfigValueFactory.fromAnyRef(true));
Config destDatasetDescriptorConfig =
flowConfig.getConfig(DatasetDescriptorConfigKeys.FLOW_OUTPUT_DATASET_DESCRIPTOR_PREFIX);
+ destDatasetDescriptorConfig = destDatasetDescriptorConfig.withValue(DatasetDescriptorConfigKeys.IS_INPUT_DATASET, ConfigValueFactory.fromAnyRef(false));
//Add retention config for source and destination dataset descriptors.
if (shouldApplyRetentionOnInput) {
@@ -189,7 +192,7 @@ public abstract class AbstractPathFinder implements PathFinder {
* @return prioritized list of {@link FlowEdge}s to be added to the edge queue for expansion.
*/
List<FlowEdgeContext> getNextEdges(DataNode dataNode, DatasetDescriptor currentDatasetDescriptor,
- DatasetDescriptor destDatasetDescriptor) {
+ DatasetDescriptor destDatasetDescriptor, int numberOfHops) {
List<FlowEdgeContext> prioritizedEdgeList = new LinkedList<>();
List<String> edgeIds = ConfigUtils.getStringList(this.flowConfig, ConfigurationKeys.WHITELISTED_EDGE_IDS);
for (FlowEdge flowEdge : this.flowGraph.getEdges(dataNode)) {
@@ -213,19 +216,29 @@ public abstract class AbstractPathFinder implements PathFinder {
DatasetDescriptor inputDatasetDescriptor = datasetDescriptorPair.getLeft();
DatasetDescriptor outputDatasetDescriptor = datasetDescriptorPair.getRight();
- try {
- flowEdge.getFlowTemplate().tryResolving(mergedConfig, datasetDescriptorPair.getLeft(), datasetDescriptorPair.getRight());
- } catch (JobTemplate.TemplateException | ConfigException | SpecNotFoundException e) {
- flowSpec.addCompilationError(flowEdge.getSrc(), flowEdge.getDest(), "Error compiling edge " + flowEdge.toString() + ": " + e.toString());
+ HashMap<String, ArrayList<String>> errors = flowEdge.getFlowTemplate().tryResolving(mergedConfig, datasetDescriptorPair.getLeft(), datasetDescriptorPair.getRight());
+ HashMap<String, HashMap<String, ArrayList<String>>> edgeErrors = new HashMap<>();
+ HashMap<String, HashMap<String, ArrayList<String>>> templateErrors = new HashMap<>();
+ ObjectMapper mapper = new ObjectMapper();
+ edgeErrors.put(flowEdge.getId(), errors);
+
+ if (errors.size() != 0) {
+ try {
+ flowSpec.addCompilationError(flowEdge.getSrc(), flowEdge.getDest(), mapper.writeValueAsString(edgeErrors));
+ }
+ catch (JsonProcessingException e) {
+ e.printStackTrace();
+ }
continue;
}
- if (inputDatasetDescriptor.contains(currentDatasetDescriptor)) {
+ ArrayList<String> datasetDescriptorErrors = inputDatasetDescriptor.contains(currentDatasetDescriptor);
+ if (datasetDescriptorErrors.size() == 0) {
DatasetDescriptor edgeOutputDescriptor = makeOutputDescriptorSpecific(currentDatasetDescriptor, outputDatasetDescriptor);
FlowEdgeContext flowEdgeContext = new FlowEdgeContext(flowEdge, currentDatasetDescriptor, edgeOutputDescriptor, mergedConfig,
specExecutor);
- if (destDatasetDescriptor.getFormatConfig().contains(outputDatasetDescriptor.getFormatConfig())) {
+ if (destDatasetDescriptor.getFormatConfig().contains(outputDatasetDescriptor.getFormatConfig()).size() == 0) {
/*
Add to the front of the edge list if platform-independent properties of the output descriptor is compatible
with those of destination dataset descriptor.
@@ -237,6 +250,17 @@ public abstract class AbstractPathFinder implements PathFinder {
}
foundExecutor = true;
}
+ else {
+ HashMap<String, ArrayList<String>> templateError = new HashMap<>();
+ templateError.put("flowTemplateErrors", datasetDescriptorErrors);
+ templateErrors.put(flowEdge.getId(), templateError);
+ try {
+ flowSpec.addCompilationError(flowEdge.getSrc(), flowEdge.getDest(), mapper.writeValueAsString(templateErrors), numberOfHops);
+ }
+ catch (JsonProcessingException e) {
+ e.printStackTrace();
+ }
+ }
}
// Found a SpecExecutor. Proceed to the next FlowEdge.
// TODO: Choose the min-cost executor for the FlowEdge as opposed to the first one that resolves.
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/BFSPathFinder.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/BFSPathFinder.java
index 5435884da..c39e769c5 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/BFSPathFinder.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/BFSPathFinder.java
@@ -75,6 +75,8 @@ public class BFSPathFinder extends AbstractPathFinder {
public List<FlowEdgeContext> findPathUnicast(DataNode destNode) {
//Initialization of auxiliary data structures used for path computation
this.pathMap = new HashMap<>();
+ int numberOfHops = 1;
+ LinkedList<FlowEdgeContext> childQueue = new LinkedList<>();
//Path computation must be thread-safe to guarantee read consistency. In other words, we prevent concurrent read/write access to the
// flow graph.
@@ -86,12 +88,12 @@ public class BFSPathFinder extends AbstractPathFinder {
}
//Base condition 2: Check if we are already at the target. If so, return an empty path.
- if ((srcNode.equals(destNode)) && destDatasetDescriptor.contains(srcDatasetDescriptor)) {
+ if ((srcNode.equals(destNode)) && destDatasetDescriptor.contains(srcDatasetDescriptor).size() == 0) {
return new ArrayList<>(0);
}
LinkedList<FlowEdgeContext> edgeQueue =
- new LinkedList<>(getNextEdges(srcNode, srcDatasetDescriptor, destDatasetDescriptor));
+ new LinkedList<>(getNextEdges(srcNode, srcDatasetDescriptor, destDatasetDescriptor, numberOfHops));
for (FlowEdgeContext flowEdgeContext : edgeQueue) {
this.pathMap.put(flowEdgeContext, flowEdgeContext);
}
@@ -102,29 +104,35 @@ public class BFSPathFinder extends AbstractPathFinder {
// 2. check if the output dataset descriptor of edge E is compatible with the input dataset descriptor of the
// edge E'. If yes, add the edge E' to the edge queue.
// If the edge E' satisfies 1 and 2, add it to the edge queue for further consideration.
- while (!edgeQueue.isEmpty()) {
- FlowEdgeContext flowEdgeContext = edgeQueue.pop();
+ do {
+ numberOfHops++;
+ while (!edgeQueue.isEmpty()) {
+ FlowEdgeContext flowEdgeContext = edgeQueue.pop();
- DataNode currentNode = this.flowGraph.getNode(flowEdgeContext.getEdge().getDest());
- DatasetDescriptor currentOutputDatasetDescriptor = flowEdgeContext.getOutputDatasetDescriptor();
+ DataNode currentNode = this.flowGraph.getNode(flowEdgeContext.getEdge().getDest());
+ DatasetDescriptor currentOutputDatasetDescriptor = flowEdgeContext.getOutputDatasetDescriptor();
- //Are we done?
- if (isPathFound(currentNode, destNode, currentOutputDatasetDescriptor, destDatasetDescriptor)) {
- return constructPath(flowEdgeContext);
- }
+ //Are we done?
+ if (isPathFound(currentNode, destNode, currentOutputDatasetDescriptor, destDatasetDescriptor)) {
+ return constructPath(flowEdgeContext);
+ }
- //Expand the currentNode to its adjacent edges and add them to the queue.
- List<FlowEdgeContext> nextEdges =
- getNextEdges(currentNode, currentOutputDatasetDescriptor, destDatasetDescriptor);
- for (FlowEdgeContext childFlowEdgeContext : nextEdges) {
- //Add a pointer from the child edge to the parent edge, if the child edge is not already in the
- // queue.
- if (!this.pathMap.containsKey(childFlowEdgeContext)) {
- edgeQueue.add(childFlowEdgeContext);
- this.pathMap.put(childFlowEdgeContext, flowEdgeContext);
+ //Expand the currentNode to its adjacent edges and add them to the queue.
+ List<FlowEdgeContext> nextEdges = getNextEdges(currentNode, currentOutputDatasetDescriptor, destDatasetDescriptor, numberOfHops);
+ for (FlowEdgeContext childFlowEdgeContext : nextEdges) {
+ //Add a pointer from the child edge to the parent edge, if the child edge is not already in the
+ // queue.
+ if (!this.pathMap.containsKey(childFlowEdgeContext)) {
+ childQueue.add(childFlowEdgeContext);
+ this.pathMap.put(childFlowEdgeContext, flowEdgeContext);
+ }
}
}
- }
+ if (!childQueue.isEmpty()) {
+ edgeQueue.addAll(childQueue);
+ childQueue.clear();
+ }
+ } while (!edgeQueue.isEmpty());
//No path found. Return null.
return null;
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/FlowTemplate.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/FlowTemplate.java
index bd78e4880..e1aa9eb5e 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/FlowTemplate.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/FlowTemplate.java
@@ -18,7 +18,9 @@
package org.apache.gobblin.service.modules.template;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
import org.apache.commons.lang3.tuple.Pair;
@@ -64,9 +66,9 @@ public interface FlowTemplate extends Spec {
* @param userConfig User supplied Config
* @param inputDescriptor input {@link DatasetDescriptor}
* @param outputDescriptor output {@link DatasetDescriptor}
+ * @return map from job template name to the list of configuration resolution errors encountered
*/
- void tryResolving(Config userConfig, DatasetDescriptor inputDescriptor, DatasetDescriptor outputDescriptor)
- throws SpecNotFoundException, JobTemplate.TemplateException;
+ HashMap<String, ArrayList<String>> tryResolving(Config userConfig, DatasetDescriptor inputDescriptor, DatasetDescriptor outputDescriptor);
/**
* Resolves the {@link JobTemplate}s underlying this {@link FlowTemplate} and returns a {@link List} of resolved
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java
index d8f3bf9b4..0d0d91e28 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import org.apache.commons.lang3.tuple.ImmutablePair;
@@ -56,6 +57,8 @@ import org.apache.gobblin.service.modules.template_catalog.FlowCatalogWithTempla
@Slf4j
public class StaticFlowTemplate implements FlowTemplate {
private static final long serialVersionUID = 84641624233978L;
+ private static final String VARIABLE_SUBSTITUTION_PATTERN = ":[\\s]*Reader:[a-zA-Z\\d\\s]*:[\\s]";
+ private static final String JOB_TEMPLATE_PATTERN = "/jobs/";
@Getter
private URI uri;
@@ -129,11 +132,9 @@ public class StaticFlowTemplate implements FlowTemplate {
DatasetDescriptor outputDescriptor = DatasetDescriptorUtils.constructDatasetDescriptor(outputDescriptorConfig);
if (resolvable) {
- try {
- tryResolving(userConfig, inputDescriptor, outputDescriptor);
+ HashMap<String, ArrayList<String>> errors = tryResolving(userConfig, inputDescriptor, outputDescriptor);
+ if (errors.size() == 0) {
result.add(ImmutablePair.of(inputDescriptor, outputDescriptor));
- } catch (JobTemplate.TemplateException | ConfigException | SpecNotFoundException e) {
- // Dataset descriptor cannot be resolved so don't add it to result
}
} else {
result.add(ImmutablePair.of(inputDescriptor, outputDescriptor));
@@ -165,19 +166,35 @@ public class StaticFlowTemplate implements FlowTemplate {
* is resolvable only if each of the {@link JobTemplate}s in the flow is resolvable. Throws an exception if the flow is
* not resolvable.
* @param userConfig User supplied Config
+ * @return errors through attempting to resolve job templates
*/
@Override
- public void tryResolving(Config userConfig, DatasetDescriptor inputDescriptor, DatasetDescriptor outputDescriptor)
- throws SpecNotFoundException, JobTemplate.TemplateException {
+ public HashMap<String, ArrayList<String>> tryResolving(Config userConfig, DatasetDescriptor inputDescriptor, DatasetDescriptor outputDescriptor) {
Config inputDescriptorConfig = inputDescriptor.getRawConfig().atPath(DatasetDescriptorConfigKeys.FLOW_EDGE_INPUT_DATASET_DESCRIPTOR_PREFIX);
Config outputDescriptorConfig = outputDescriptor.getRawConfig().atPath(DatasetDescriptorConfigKeys.FLOW_EDGE_OUTPUT_DATASET_DESCRIPTOR_PREFIX);
+
userConfig = userConfig.withFallback(inputDescriptorConfig).withFallback(outputDescriptorConfig);
JobSpec.Builder jobSpecBuilder = JobSpec.builder().withConfig(userConfig);
+ HashMap<String, ArrayList<String>> resolutionErrors = new HashMap<>();
+
for (JobTemplate template: this.jobTemplates) {
- this.jobSpecResolver.resolveJobSpec(jobSpecBuilder.withTemplate(template).build());
+ ArrayList<String> errors = new ArrayList<>();
+ try {
+ this.jobSpecResolver.resolveJobSpec(jobSpecBuilder.withTemplate(template).build());
+ } catch (ConfigException e) {
+ errors.add(e.toString().split(VARIABLE_SUBSTITUTION_PATTERN)[1]);
+ } catch (Exception e) {
+ log.error("Encountered exception during resolving job templates", e);
+ }
+ // Only record the template in the result map if resolution errors exist
+ if (errors.size() != 0) {
+ String jobName = template.getUri().toString().split(JOB_TEMPLATE_PATTERN)[1];
+ resolutionErrors.put(jobName, errors);
+ }
}
+ return resolutionErrors;
}
@Override
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptorTest.java
index 38f23d58d..d06110d1e 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptorTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptorTest.java
@@ -42,7 +42,7 @@ public class FSDatasetDescriptorTest {
.withValue(DatasetDescriptorConfigKeys.CODEC_KEY, ConfigValueFactory.fromAnyRef("gzip"));
FSDatasetDescriptor descriptor2 = new FSDatasetDescriptor(config2);
- Assert.assertTrue(descriptor1.contains(descriptor2));
+ Assert.assertEquals(descriptor1.contains(descriptor2).size(), 0);
//Add encryption config
Config encConfig = ConfigFactory.empty().withValue(DatasetDescriptorConfigKeys.ENCRYPTION_LEVEL_KEY, ConfigValueFactory.fromAnyRef("file"))
@@ -50,8 +50,8 @@ public class FSDatasetDescriptorTest {
.atPath(DatasetDescriptorConfigKeys.ENCYPTION_PREFIX);
Config config3 = config2.withFallback(encConfig);
FSDatasetDescriptor descriptor3 = new FSDatasetDescriptor(config3);
- Assert.assertTrue(descriptor2.contains(descriptor3));
- Assert.assertTrue(descriptor1.contains(descriptor3));
+ Assert.assertEquals(descriptor2.contains(descriptor3).size(), 0);
+ Assert.assertEquals(descriptor1.contains(descriptor3).size(), 0);
//Add partition config
Config partitionConfig = ConfigFactory.empty().withValue(DatasetDescriptorConfigKeys.PARTITION_TYPE_KEY, ConfigValueFactory.fromAnyRef("datetime"))
@@ -59,27 +59,27 @@ public class FSDatasetDescriptorTest {
.atPath(DatasetDescriptorConfigKeys.PARTITION_PREFIX);
Config config4 = config3.withFallback(partitionConfig);
FSDatasetDescriptor descriptor4 = new FSDatasetDescriptor(config4);
- Assert.assertTrue(descriptor3.contains(descriptor4));
- Assert.assertTrue(descriptor2.contains(descriptor4));
- Assert.assertTrue(descriptor1.contains(descriptor4));
+ Assert.assertEquals(descriptor3.contains(descriptor4).size(), 0);
+ Assert.assertEquals(descriptor2.contains(descriptor4).size(), 0);
+ Assert.assertEquals(descriptor1.contains(descriptor4).size(), 0);
//Add compaction/retention config
Config miscConfig = ConfigFactory.empty().withValue(DatasetDescriptorConfigKeys.IS_COMPACTED_AND_DEDUPED_KEY, ConfigValueFactory.fromAnyRef("true"))
.withValue(DatasetDescriptorConfigKeys.IS_RETENTION_APPLIED_KEY, ConfigValueFactory.fromAnyRef("true"));
Config config5 = config4.withFallback(miscConfig);
FSDatasetDescriptor descriptor5 = new FSDatasetDescriptor(config5);
- Assert.assertFalse(descriptor4.contains(descriptor5));
- Assert.assertFalse(descriptor3.contains(descriptor5));
- Assert.assertFalse(descriptor2.contains(descriptor5));
- Assert.assertFalse(descriptor1.contains(descriptor5));
+ Assert.assertNotEquals(descriptor4.contains(descriptor5).size(), 0);
+ Assert.assertNotEquals(descriptor3.contains(descriptor5).size(), 0);
+ Assert.assertNotEquals(descriptor2.contains(descriptor5).size(), 0);
+ Assert.assertNotEquals(descriptor1.contains(descriptor5).size(), 0);
// Test subpaths
Config subPathConfig = ConfigFactory.empty().withValue(DatasetDescriptorConfigKeys.PATH_KEY, ConfigValueFactory.fromAnyRef("/a/b/c"))
.withValue(DatasetDescriptorConfigKeys.SUBPATHS_KEY, ConfigValueFactory.fromAnyRef("{e,f,g}"))
.withValue(DatasetDescriptorConfigKeys.PLATFORM_KEY, ConfigValueFactory.fromAnyRef("hdfs"));
FSDatasetDescriptor descriptor6 = new FSDatasetDescriptor(subPathConfig);
- Assert.assertTrue(descriptor1.contains(descriptor6));
- Assert.assertFalse(descriptor2.contains(descriptor6));
+ Assert.assertEquals(descriptor1.contains(descriptor6).size(), 0);
+ Assert.assertNotEquals(descriptor2.contains(descriptor6).size(), 0);
//Test fs.uri
Config config7 = ConfigFactory.empty().withValue(DatasetDescriptorConfigKeys.PATH_KEY, ConfigValueFactory.fromAnyRef("/a/b/c/d"))
@@ -95,9 +95,11 @@ public class FSDatasetDescriptorTest {
FSVolumeDatasetDescriptor descriptor7 = new FSVolumeDatasetDescriptor(config7);
FSVolumeDatasetDescriptor volumeDescriptor = new FSVolumeDatasetDescriptor(config1);
FSVolumeDatasetDescriptor descriptor8 = new FSVolumeDatasetDescriptor(config8);
- Assert.assertTrue(volumeDescriptor.contains(descriptor7));
- Assert.assertFalse(descriptor7.contains(volumeDescriptor));
- Assert.assertFalse(descriptor8.contains(descriptor7));
+ Assert.assertEquals(descriptor1.contains(descriptor6).size(), 0);
+ Assert.assertNotEquals(descriptor2.contains(descriptor6).size(), 0);
+ Assert.assertEquals(volumeDescriptor.contains(descriptor7).size(), 0);
+ Assert.assertNotEquals(descriptor7.contains(volumeDescriptor).size(), 0);
+ Assert.assertNotEquals(descriptor8.contains(descriptor7).size(), 0);
}
@Test
@@ -111,7 +113,7 @@ public class FSDatasetDescriptorTest {
.withValue(DatasetDescriptorConfigKeys.PLATFORM_KEY, ConfigValueFactory.fromAnyRef("hdfs"));
FSDatasetDescriptor descriptor2 = new FSDatasetDescriptor(config2);
- Assert.assertTrue(descriptor1.contains(descriptor2));
+ Assert.assertEquals(descriptor1.contains(descriptor2).size(), 0);
}
@Test
@@ -124,8 +126,8 @@ public class FSDatasetDescriptorTest {
.withValue(DatasetDescriptorConfigKeys.PLATFORM_KEY, ConfigValueFactory.fromAnyRef("hdfs"));
FSDatasetDescriptor descriptor2 = new FSDatasetDescriptor(config2);
- Assert.assertTrue(descriptor1.equals(descriptor2));
- Assert.assertTrue(descriptor2.equals(descriptor1));
+ Assert.assertEquals(descriptor2, descriptor1);
+ Assert.assertEquals(descriptor1, descriptor2);
Assert.assertEquals(descriptor1.hashCode(), descriptor2.hashCode());
Config config3 = ConfigFactory.empty().withValue(DatasetDescriptorConfigKeys.PATH_KEY, ConfigValueFactory.fromAnyRef("/a/b/c/*"))
@@ -133,7 +135,7 @@ public class FSDatasetDescriptorTest {
.withValue(DatasetDescriptorConfigKeys.FORMAT_KEY, ConfigValueFactory.fromAnyRef("any"))
.withValue(DatasetDescriptorConfigKeys.IS_RETENTION_APPLIED_KEY, ConfigValueFactory.fromAnyRef("false"));
FSDatasetDescriptor descriptor3 = new FSDatasetDescriptor(config3);
- Assert.assertTrue(descriptor1.equals(descriptor3));
+ Assert.assertEquals(descriptor3, descriptor1);
Assert.assertEquals(descriptor1.hashCode(), descriptor3.hashCode());
//Ensure switching booleans between 2 boolean member variables does not produce the same hashcode.
@@ -151,7 +153,7 @@ public class FSDatasetDescriptorTest {
.withValue(DatasetDescriptorConfigKeys.IS_COMPACTED_KEY, ConfigValueFactory.fromAnyRef("false"));
FSDatasetDescriptor descriptor5 = new FSDatasetDescriptor(config5);
- Assert.assertFalse(descriptor4.equals(descriptor5));
+ Assert.assertNotEquals(descriptor5, descriptor4);
Assert.assertNotEquals(descriptor4.hashCode(), descriptor5.hashCode());
}
@@ -196,8 +198,5 @@ public class FSDatasetDescriptorTest {
.withValue(DatasetDescriptorConfigKeys.ENCRYPTED_FIELDS, ConfigValueFactory.fromAnyRef("field1")).atPath(DatasetDescriptorConfigKeys.ENCYPTION_PREFIX);
Config config6 = config.withFallback(encryptionConfig);
Assert.assertThrows(IOException.class, () -> new FSDatasetDescriptor(config6));
-
-
-
}
}
\ No newline at end of file
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/dataset/HttpDatasetDescriptorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/dataset/HttpDatasetDescriptorTest.java
index a90b2a693..48c527d9c 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/dataset/HttpDatasetDescriptorTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/dataset/HttpDatasetDescriptorTest.java
@@ -42,14 +42,14 @@ public class HttpDatasetDescriptorTest {
.withValue(DatasetDescriptorConfigKeys.PLATFORM_KEY, ConfigValueFactory.fromAnyRef("https"))
.withValue(DatasetDescriptorConfigKeys.PATH_KEY, ConfigValueFactory.fromAnyRef("https://a.com/b"));
HttpDatasetDescriptor descriptor2 = new HttpDatasetDescriptor(config2);
- Assert.assertTrue(descriptor2.contains(descriptor1));
+ Assert.assertEquals(descriptor2.contains(descriptor1).size(), 0);
// Verify that same path but different platform points to different dataset
Config config3 = ConfigFactory.empty()
.withValue(DatasetDescriptorConfigKeys.PLATFORM_KEY, ConfigValueFactory.fromAnyRef("http"))
.withValue(DatasetDescriptorConfigKeys.PATH_KEY, ConfigValueFactory.fromAnyRef("https://a.com/b"));
HttpDatasetDescriptor descriptor3 = new HttpDatasetDescriptor(config3);
- Assert.assertFalse(descriptor3.contains(descriptor1));
+ Assert.assertNotEquals(descriptor3.contains(descriptor1).size(), 0);
}
}
\ No newline at end of file
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/dataset/IcebergDatasetDescriptorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/dataset/IcebergDatasetDescriptorTest.java
index 98986d224..71ff4c5a4 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/dataset/IcebergDatasetDescriptorTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/dataset/IcebergDatasetDescriptorTest.java
@@ -41,8 +41,8 @@ public class IcebergDatasetDescriptorTest {
IcebergDatasetDescriptor other = new IcebergDatasetDescriptor(config1);
IcebergDatasetDescriptor yetAnother = new IcebergDatasetDescriptor(config2);
- Assert.assertTrue(current.isPathContaining(other));
- Assert.assertFalse(current.isPathContaining(yetAnother));
+ Assert.assertEquals(current.isPathContaining(other).size(), 0);
+ Assert.assertNotEquals(current.isPathContaining(yetAnother).size(), 0);
}
private Config createDatasetDescriptorConfig(String platform, String dbName, String tableName) {
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/dataset/SqlDatasetDescriptorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/dataset/SqlDatasetDescriptorTest.java
index 75a142a73..6a317438b 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/dataset/SqlDatasetDescriptorTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/dataset/SqlDatasetDescriptorTest.java
@@ -43,19 +43,19 @@ public class SqlDatasetDescriptorTest {
Config config2 = ConfigFactory.empty().withValue(DatasetDescriptorConfigKeys.PLATFORM_KEY, ConfigValueFactory.fromAnyRef("sqlserver"));
SqlDatasetDescriptor descriptor2 = new SqlDatasetDescriptor(config2);
- Assert.assertTrue(descriptor2.contains(descriptor1));
+ Assert.assertEquals(descriptor2.contains(descriptor1).size(), 0);
Config config3 = ConfigFactory.empty().withValue(DatasetDescriptorConfigKeys.PLATFORM_KEY, ConfigValueFactory.fromAnyRef("sqlserver"))
.withValue(DatasetDescriptorConfigKeys.DATABASE_KEY, ConfigValueFactory.fromAnyRef("testDb_.*"))
.withValue(DatasetDescriptorConfigKeys.TABLE_KEY, ConfigValueFactory.fromAnyRef("testTable_.*"));
SqlDatasetDescriptor descriptor3 = new SqlDatasetDescriptor(config3);
- Assert.assertTrue(descriptor3.contains(descriptor1));
+ Assert.assertEquals(descriptor3.contains(descriptor1).size(), 0);
Config config4 = ConfigFactory.empty().withValue(DatasetDescriptorConfigKeys.PLATFORM_KEY, ConfigValueFactory.fromAnyRef("sqlserver"))
.withValue(DatasetDescriptorConfigKeys.DATABASE_KEY, ConfigValueFactory.fromAnyRef("Db_.*"))
.withValue(DatasetDescriptorConfigKeys.TABLE_KEY, ConfigValueFactory.fromAnyRef("Table_.*"));
SqlDatasetDescriptor descriptor4 = new SqlDatasetDescriptor(config4);
- Assert.assertFalse(descriptor4.contains(descriptor1));
+ Assert.assertNotEquals(descriptor4.contains(descriptor1).size(), 0);
}
}
\ No newline at end of file
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
index 4c3d4f137..ab82ec937 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
@@ -34,7 +34,6 @@ import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.SystemUtils;
@@ -641,8 +640,7 @@ public class MultiHopFlowCompilerTest {
Dag<JobExecutionPlan> dag = specCompiler.compileFlow(spec);
Assert.assertNull(dag);
- Assert.assertEquals(spec.getCompilationErrors().stream().map(c -> c.errorMessage).collect(Collectors.toSet()).size(), 1);
- spec.getCompilationErrors().stream().anyMatch(s -> s.errorMessage.contains(AzkabanProjectConfig.USER_TO_PROXY));
+ Assert.assertTrue(spec.getCompilationErrors().stream().anyMatch(s -> s.errorMessage.contains(AzkabanProjectConfig.USER_TO_PROXY)));
}
@Test (dependsOnMethods = "testUnresolvedFlow")