Posted to commits@gobblin.apache.org by wl...@apache.org on 2023/01/19 20:44:57 UTC

[gobblin] branch master updated: [GOBBLIN-1759] Add error reporting when attempting to resolve flow configs (#3614)

This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 34c20ff70 [GOBBLIN-1759] Add error reporting when attempting to resolve flow configs (#3614)
34c20ff70 is described below

commit 34c20ff70879ff1a8baf997932199abef82b3571
Author: Andy Jiang <an...@outlook.com>
AuthorDate: Thu Jan 19 12:44:52 2023 -0800

    [GOBBLIN-1759] Add error reporting when attempting to resolve flow configs (#3614)
    
    * Add in changes to fetch flow config errors
    
    * Add files
    
    * Fix FSDatasetPartitionConfig not taking in the isInputDataset configuration
    
    Change to take in the input for FSDatasetPartitionConfig
    
    * Fix most of the error messages
    
    * Remove commented out print statements
    
    * Update error priority to +1 for flows that are multi-hop with the first edge being a self edge
    
    * Do error handling for the new FSVolumeDatasetDescriptor
    
    * Fix tests related to contains
    
    * Removed null dereferences
    
    * Removed comments
    
    * Remove shadowing of variables
    
    * Fix test for new flow compilation error reporting
    
    * Fix naming for right side contains
    
    * Use string format template
    
    * Add description for param
    
    * Refactor out comparison of ANY to util function
    
    * Test error in more detail
    
    * Remove unused import
    
    * Further refactor
    
    * Remove unnecessary commenting out
    
    * Remove unused import
    
    * Additional refactoring
    
    * Refactor string split for DBs
    
    * Fix issue with string format for glob pattern inputs
    
    * Address comments
    
    * Fix FSDatasetDescriptor userFlowConfig naming
    
    * Refactor database comparisons
    
    * Add javadocs for util functions
    
    * Remove unused slf4j
    
    * Change internal terminology to single-step vs multi-step data movement
---
 .../service/FlowConfigV2ResourceLocalHandler.java  |  39 +++++-
 .../org/apache/gobblin/runtime/api/FlowSpec.java   |  13 ++
 .../modules/dataset/BaseDatasetDescriptor.java     |  38 +++---
 .../service/modules/dataset/DatasetDescriptor.java |  12 +-
 .../modules/dataset/DatasetDescriptorUtils.java    |   3 +-
 .../service/modules/dataset/EncryptionConfig.java  |  38 +++---
 .../modules/dataset/FSDatasetDescriptor.java       |  69 ++++++----
 .../modules/dataset/FSDatasetPartitionConfig.java  |  29 +++--
 .../modules/dataset/FSVolumeDatasetDescriptor.java |  21 +--
 .../service/modules/dataset/FormatConfig.java      |  36 ++++--
 .../modules/dataset/HiveDatasetDescriptor.java     |  36 +++---
 .../modules/dataset/HttpDatasetDescriptor.java     |  15 ++-
 .../modules/dataset/IcebergDatasetDescriptor.java  |  25 ++--
 .../modules/dataset/SqlDatasetDescriptor.java      |  30 +++--
 .../flowgraph/DatasetDescriptorConfigKeys.java     |   1 +
 .../flowgraph/DatasetDescriptorErrorUtils.java     | 144 +++++++++++++++++++++
 .../flowgraph/pathfinder/AbstractPathFinder.java   |  40 ++++--
 .../flowgraph/pathfinder/BFSPathFinder.java        |  48 ++++---
 .../service/modules/template/FlowTemplate.java     |   6 +-
 .../modules/template/StaticFlowTemplate.java       |  31 ++++-
 .../modules/dataset/FSDatasetDescriptorTest.java   |  45 ++++---
 .../modules/dataset/HttpDatasetDescriptorTest.java |   4 +-
 .../dataset/IcebergDatasetDescriptorTest.java      |   4 +-
 .../modules/dataset/SqlDatasetDescriptorTest.java  |   6 +-
 .../modules/flow/MultiHopFlowCompilerTest.java     |   4 +-
 25 files changed, 511 insertions(+), 226 deletions(-)

diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
index 6370192ca..b20b83346 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
@@ -16,8 +16,13 @@
  */
 package org.apache.gobblin.service;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Comparator;
+import java.util.Hashtable;
 import java.util.Map;
 
 import org.apache.commons.lang3.StringEscapeUtils;
@@ -61,6 +66,7 @@ public class FlowConfigV2ResourceLocalHandler extends FlowConfigResourceLocalHan
     if (flowConfig.hasExplain()) {
       createLog += " explain " + flowConfig.isExplain();
     }
+
     log.info(createLog);
     FlowSpec flowSpec = createFlowSpecForConfig(flowConfig);
     FlowStatusId flowStatusId =
@@ -111,17 +117,42 @@ public class FlowConfigV2ResourceLocalHandler extends FlowConfigResourceLocalHan
 
   private String getErrorMessage(FlowSpec flowSpec) {
     StringBuilder message = new StringBuilder("Flow was not compiled successfully.");
+    Hashtable<String, ArrayList<String>> allErrors = new Hashtable<>();
+
     if (!flowSpec.getCompilationErrors().isEmpty()) {
       message.append(" Compilation errors encountered (Sorted by relevance): ");
       FlowSpec.CompilationError[] errors = flowSpec.getCompilationErrors().stream().distinct().toArray(FlowSpec.CompilationError[]::new);
       Arrays.sort(errors, Comparator.comparingInt(c -> ((FlowSpec.CompilationError)c).errorPriority));
-      int errorId = 0;
+      int errorIdSingleHop = 1;
+      int errorIdMultiHop = 1;
+
+      ArrayList<String> singleHopErrors = new ArrayList<>();
+      ArrayList<String> multiHopErrors = new ArrayList<>();
+
       for (FlowSpec.CompilationError error: errors) {
-        message.append("\n").append(String.format("ERROR[%s]", errorId)).append(error.errorMessage);
-        errorId++;
+        if (error.errorPriority == 0) {
+          singleHopErrors.add(String.format("ERROR %s of single-step data movement: ", errorIdSingleHop) + error.errorMessage.replace("\n", " ").replace("\t", ""));
+          errorIdSingleHop++;
+        } else {
+          multiHopErrors.add(String.format("ERROR %s of multi-step data movement: ", errorIdMultiHop) + error.errorMessage.replace("\n", " ").replace("\t", ""));
+          errorIdMultiHop++;
+        }
       }
+
+      allErrors.put("singleHopErrors", singleHopErrors);
+      allErrors.put("multiHopErrors", multiHopErrors);
+    }
+
+    allErrors.put("message", new ArrayList<>(Collections.singletonList(message.toString())));
+    ObjectMapper mapper = new ObjectMapper();
+
+    try {
+      return mapper.writeValueAsString(allErrors);
+    } catch (JsonProcessingException e) {
+      log.error("Flow Spec {} errored on Json processing", flowSpec.toString(), e);
+      e.printStackTrace();
     }
-    return message.toString();
+    return "Could not form JSON in FlowConfigV2ResourceLocalHandler";
   }
   /**
    * Note: this method is only implemented for testing, normally partial update would be called in
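
For reference, a minimal sketch of the JSON payload getErrorMessage() now produces for a failed compilation, assuming one error at each priority level (the error texts are illustrative, not from a real run):

    {
      "message": ["Flow was not compiled successfully. Compilation errors encountered (Sorted by relevance): "],
      "singleHopErrors": ["ERROR 1 of single-step data movement: <error message>"],
      "multiHopErrors": ["ERROR 1 of multi-step data movement: <error message>"]
    }
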
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
index edeea4744..1e007c7c6 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
@@ -125,6 +125,10 @@ public class FlowSpec implements Configurable, Spec {
       throw new RuntimeException("Unable to create a FlowSpec URI: " + e, e);
     }
   }
+  public void addCompilationError(String src, String dst, String errorMessage, int numberOfHops) {
+    this.compilationErrors.add(new CompilationError(getConfig(), src, dst, errorMessage, numberOfHops));
+  }
+
   public void addCompilationError(String src, String dst, String errorMessage) {
     this.compilationErrors.add(new CompilationError(getConfig(), src, dst, errorMessage));
   }
@@ -134,6 +138,15 @@ public class FlowSpec implements Configurable, Spec {
     public int errorPriority;
     public String errorMessage;
 
+    // Increment the error priority by 1 to prevent flows with a self edge from having the same priority as a single-hop flow edge.
+    // E.g. Cluster1 -> Cluster2 is the desired edge. Cluster1 -> Cluster2 would have error priority of 0, Cluster1 -> Cluster1 -> Cluster2 would have error priority of 1
+    public CompilationError(Config config, String src, String dst, String errorMessage, int numberOfHops) {
+      this(config, src, dst, errorMessage);
+      if (numberOfHops > 1) {
+        errorPriority++;
+      }
+    }
+
     public CompilationError(Config config, String src, String dst, String errorMessage) {
       errorPriority = 0;
       if (!src.equals(ConfigUtils.getString(config, ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, ""))){
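
A hedged sketch of how the new overload affects ordering: assuming src and dst match the flow's configured source and destination (so the base priority is 0), a path with a leading self edge reports more than one hop and therefore sorts after the direct candidate:

    // Hypothetical calls; flowSpec, cluster names and messages are placeholders.
    flowSpec.addCompilationError("Cluster1", "Cluster2", "direct edge rejected", 1);    // errorPriority stays 0
    flowSpec.addCompilationError("Cluster1", "Cluster2", "self-edge path rejected", 2); // numberOfHops > 1, errorPriority becomes 1
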
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/BaseDatasetDescriptor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/BaseDatasetDescriptor.java
index cd98c8b01..8c4878c5d 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/BaseDatasetDescriptor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/BaseDatasetDescriptor.java
@@ -23,15 +23,17 @@ import com.google.common.collect.ImmutableMap;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
+import java.util.ArrayList;
 import lombok.EqualsAndHashCode;
 import lombok.Getter;
 import lombok.ToString;
 
 import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorErrorUtils;
 import org.apache.gobblin.util.ConfigUtils;
 
-@EqualsAndHashCode (exclude = {"description", "rawConfig"})
-@ToString (exclude = {"description", "rawConfig"})
+@EqualsAndHashCode (exclude = {"description", "rawConfig", "isInputDataset"})
+@ToString (exclude = {"description", "rawConfig", "isInputDataset"})
 public abstract class BaseDatasetDescriptor implements DatasetDescriptor {
   @Getter
   private final String platform;
@@ -43,6 +45,8 @@ public abstract class BaseDatasetDescriptor implements DatasetDescriptor {
   private final String description;
   @Getter
   private final Config rawConfig;
+  @Getter
+  protected Boolean isInputDataset;
 
   private static final Config DEFAULT_FALLBACK =
       ConfigFactory.parseMap(ImmutableMap.<String, Object>builder()
@@ -57,37 +61,33 @@ public abstract class BaseDatasetDescriptor implements DatasetDescriptor {
     this.isRetentionApplied = ConfigUtils.getBoolean(config, DatasetDescriptorConfigKeys.IS_RETENTION_APPLIED_KEY, false);
     this.description = ConfigUtils.getString(config, DatasetDescriptorConfigKeys.DESCRIPTION_KEY, "");
     this.rawConfig = config.withFallback(this.formatConfig.getRawConfig()).withFallback(DEFAULT_FALLBACK);
+    this.isInputDataset = ConfigUtils.getBoolean(config, DatasetDescriptorConfigKeys.IS_INPUT_DATASET, false);
   }
 
   /**
    * {@inheritDoc}
    */
-  protected abstract boolean isPathContaining(DatasetDescriptor other);
+  protected abstract ArrayList<String> isPathContaining(DatasetDescriptor other);
 
   /**
    * @return true if this {@link DatasetDescriptor} contains the other {@link DatasetDescriptor} i.e. the
    * datasets described by this {@link DatasetDescriptor} is a subset of the datasets described by the other
    * {@link DatasetDescriptor}. This operation is non-commutative.
-   * @param other
+   * @param inputDatasetDescriptorConfig This is the flow configuration that is sent in from the user side and is compared against the flowgraph edges.
    */
   @Override
-  public boolean contains(DatasetDescriptor other) {
-    if (this == other) {
-      return true;
-    }
-
-    if (other == null || !getClass().equals(other.getClass())) {
-      return false;
+  public ArrayList<String> contains(DatasetDescriptor inputDatasetDescriptorConfig) {
+    ArrayList<String> errors = new ArrayList<>();
+    if (this == inputDatasetDescriptorConfig) {
+      return errors;
     }
 
-    if (this.getPlatform() == null || !this.getPlatform().equalsIgnoreCase(other.getPlatform())) {
-      return false;
-    }
-
-    if ((this.isRetentionApplied() != other.isRetentionApplied())) {
-      return false;
-    }
+    DatasetDescriptorErrorUtils.populateErrorForDatasetDescriptorKey(errors, inputDatasetDescriptorConfig.getIsInputDataset(), DatasetDescriptorConfigKeys.CLASS_KEY, this.getClass().toString(), inputDatasetDescriptorConfig.getClass().toString(), false);
+    DatasetDescriptorErrorUtils.populateErrorForDatasetDescriptorKey(errors, inputDatasetDescriptorConfig.getIsInputDataset(), DatasetDescriptorConfigKeys.PLATFORM_KEY, this.getPlatform(), inputDatasetDescriptorConfig.getPlatform(), false);
+    DatasetDescriptorErrorUtils.populateErrorForDatasetDescriptorKey(errors, inputDatasetDescriptorConfig.getIsInputDataset(), DatasetDescriptorConfigKeys.IS_RETENTION_APPLIED_KEY, String.valueOf(this.isRetentionApplied()), String.valueOf(inputDatasetDescriptorConfig.isRetentionApplied()), false);
 
-    return isPathContaining(other) && getFormatConfig().contains(other.getFormatConfig());
+    errors.addAll(isPathContaining(inputDatasetDescriptorConfig));
+    errors.addAll(getFormatConfig().contains(inputDatasetDescriptorConfig.getFormatConfig()));
+    return errors;
   }
 }
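
Callers that previously branched on the boolean result would now test for an empty error list; a sketch of the new contract, with the edge and user descriptors as placeholders:

    // Hypothetical usage: an empty list now signals containment.
    ArrayList<String> errors = edgeDescriptor.contains(userDescriptor);
    if (!errors.isEmpty()) {
      errors.forEach(log::error); // or surface them via FlowSpec.addCompilationError
    }
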
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/DatasetDescriptor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/DatasetDescriptor.java
index 9c6abfe14..e51df520a 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/DatasetDescriptor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/DatasetDescriptor.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.service.modules.dataset;
 
 import com.typesafe.config.Config;
 
+import java.util.ArrayList;
 import org.apache.gobblin.annotation.Alpha;
 
 
@@ -62,14 +63,19 @@ public interface DatasetDescriptor {
   public String getDescription();
 
   /**
-   * @return true if this {@link DatasetDescriptor} contains the other {@link DatasetDescriptor} i.e. the
-   * datasets described by the other {@link DatasetDescriptor} is a subset of this {@link DatasetDescriptor}.
+   * @return an arraylist of errors when comparing whether the provided {@link DatasetDescriptor} contains the input {@link DatasetDescriptor}
+   * i.e. the datasets described by the other {@link DatasetDescriptor} is a subset of this {@link DatasetDescriptor}.
    * This operation is non-commutative.
    */
-  public boolean contains(DatasetDescriptor other);
+  public ArrayList<String> contains(DatasetDescriptor other);
 
   /**
    * @return the raw config.
    */
   public Config getRawConfig();
+
+  /**
+   * @return true if this is an input dataset.
+   */
+  public Boolean getIsInputDataset();
 }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/DatasetDescriptorUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/DatasetDescriptorUtils.java
index e21a31df1..c3e617994 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/DatasetDescriptorUtils.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/DatasetDescriptorUtils.java
@@ -19,10 +19,11 @@ package org.apache.gobblin.service.modules.dataset;
 
 import com.typesafe.config.Config;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
 import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 
-
+@Slf4j
 public class DatasetDescriptorUtils {
   /**
    * Given dataset descriptor config, construct a {@link DatasetDescriptor} object
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/EncryptionConfig.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/EncryptionConfig.java
index f9e4e3598..a55d20b3a 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/EncryptionConfig.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/EncryptionConfig.java
@@ -24,17 +24,19 @@ import com.google.common.collect.ImmutableMap;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
+import java.util.ArrayList;
 import lombok.EqualsAndHashCode;
 import lombok.Getter;
 import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorErrorUtils;
 import org.apache.gobblin.util.ConfigUtils;
 
 @Slf4j
-@ToString(exclude = {"rawConfig"})
-@EqualsAndHashCode (exclude = {"rawConfig"})
+@ToString(exclude = {"rawConfig", "isInputDataset"})
+@EqualsAndHashCode (exclude = {"rawConfig", "isInputDataset"})
 public class EncryptionConfig {
   @Getter
   private final String encryptionAlgorithm;
@@ -48,6 +50,8 @@ public class EncryptionConfig {
   private final String keystoreEncoding;
   @Getter
   private final Config rawConfig;
+  @Getter
+  protected Boolean isInputDataset;
 
   public enum  EncryptionLevel {
     FILE("file"),
@@ -98,6 +102,7 @@ public class EncryptionConfig {
       validate(this.encryptionLevel, this.encryptedFields);
     }
     this.rawConfig = encryptionConfig.withFallback(DEFAULT_FALLBACK);
+    this.isInputDataset = ConfigUtils.getBoolean(encryptionConfig, DatasetDescriptorConfigKeys.IS_INPUT_DATASET, false);
   }
 
   private void validate(String encryptionLevel, String encryptedFields) throws IOException {
@@ -124,26 +129,15 @@ public class EncryptionConfig {
     return;
   }
 
-  public boolean contains(EncryptionConfig other) {
-    if (other == null) {
-      return false;
-    }
+  public ArrayList<String> contains(EncryptionConfig inputDatasetDescriptorConfig) {
+    ArrayList<String> errors = new ArrayList<>();
+
+    DatasetDescriptorErrorUtils.populateErrorForDatasetDescriptorKey(errors, inputDatasetDescriptorConfig.getIsInputDataset(), DatasetDescriptorConfigKeys.ENCRYPTION_ALGORITHM_KEY, this.getEncryptionAlgorithm(), inputDatasetDescriptorConfig.getEncryptionAlgorithm(), false);
+    DatasetDescriptorErrorUtils.populateErrorForDatasetDescriptorKey(errors, inputDatasetDescriptorConfig.getIsInputDataset(), DatasetDescriptorConfigKeys.ENCRYPTION_KEYSTORE_TYPE_KEY, this.getKeystoreType(), inputDatasetDescriptorConfig.getKeystoreType(), false);
+    DatasetDescriptorErrorUtils.populateErrorForDatasetDescriptorKey(errors, inputDatasetDescriptorConfig.getIsInputDataset(), DatasetDescriptorConfigKeys.ENCRYPTION_KEYSTORE_ENCODING_KEY, this.getKeystoreEncoding(), inputDatasetDescriptorConfig.getKeystoreEncoding(), false);
+    DatasetDescriptorErrorUtils.populateErrorForDatasetDescriptorKey(errors, inputDatasetDescriptorConfig.getIsInputDataset(), DatasetDescriptorConfigKeys.ENCRYPTION_LEVEL_KEY, this.getEncryptionLevel(), inputDatasetDescriptorConfig.getEncryptionLevel(), false);
+    DatasetDescriptorErrorUtils.populateErrorForDatasetDescriptorKey(errors, inputDatasetDescriptorConfig.getIsInputDataset(), DatasetDescriptorConfigKeys.ENCRYPTED_FIELDS, this.getEncryptedFields(), inputDatasetDescriptorConfig.getEncryptedFields(), false);
 
-    String otherEncryptionAlgorithm = other.getEncryptionAlgorithm();
-    String otherKeystoreType = other.getKeystoreType();
-    String otherKeystoreEncoding = other.getKeystoreEncoding();
-    String otherEncryptionLevel = other.getEncryptionLevel();
-    String otherEncryptedFields = other.getEncryptedFields();
-
-    return (DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equalsIgnoreCase(this.getEncryptionAlgorithm())
-        || this.encryptionAlgorithm.equalsIgnoreCase(otherEncryptionAlgorithm))
-        && (DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equalsIgnoreCase(this.getKeystoreType())
-        || this.keystoreType.equalsIgnoreCase(otherKeystoreType))
-        && (DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equalsIgnoreCase(this.getKeystoreEncoding())
-        || this.keystoreEncoding.equalsIgnoreCase(otherKeystoreEncoding))
-        && (DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equalsIgnoreCase(this.getEncryptionLevel())
-        || this.encryptionLevel.equalsIgnoreCase(otherEncryptionLevel))
-        && (DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equalsIgnoreCase(this.getEncryptedFields())
-        || this.encryptedFields.equalsIgnoreCase(otherEncryptedFields));
+    return errors;
   }
 }
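
With the mismatch template defined in DatasetDescriptorErrorUtils (added below), a keystore type mismatch would render roughly as follows; the prefix and key tokens stand in for the actual constant values, which are not shown in this diff:

    <descriptor prefix>.<keystoreType key> is mismatched. User input: jks. Expected value 'pkcs12'.
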
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptor.java
index 996fa9674..53eaa4f54 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptor.java
@@ -18,6 +18,7 @@
 package org.apache.gobblin.service.modules.dataset;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.commons.lang3.StringUtils;
@@ -35,6 +36,7 @@ import lombok.ToString;
 
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorErrorUtils;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.PathUtils;
 
@@ -73,38 +75,41 @@ public class FSDatasetDescriptor extends BaseDatasetDescriptor implements Datase
     this.subPaths = ConfigUtils.getString(config, DatasetDescriptorConfigKeys.SUBPATHS_KEY, null);
     this.isCompacted = ConfigUtils.getBoolean(config, DatasetDescriptorConfigKeys.IS_COMPACTED_KEY, false);
     this.isCompactedAndDeduped = ConfigUtils.getBoolean(config, DatasetDescriptorConfigKeys.IS_COMPACTED_AND_DEDUPED_KEY, false);
-    this.partitionConfig = new FSDatasetPartitionConfig(ConfigUtils.getConfigOrEmpty(config, DatasetDescriptorConfigKeys.PARTITION_PREFIX));
+    this.partitionConfig = new FSDatasetPartitionConfig(config);
     this.rawConfig = config.withFallback(getPartitionConfig().getRawConfig()).withFallback(DEFAULT_FALLBACK).withFallback(super.getRawConfig());
+    this.isInputDataset = ConfigUtils.getBoolean(config, DatasetDescriptorConfigKeys.IS_INPUT_DATASET, false);
   }
 
   /**
    * If other descriptor has subpaths, this method checks that each concatenation of path + subpath is matched by this
    * path. Otherwise, it just checks the path.
    *
-   * @param other descriptor whose path/subpaths to check
+   * @param inputDatasetDescriptorConfig descriptor whose path/subpaths to check
    * @return true if all subpaths are matched by this {@link DatasetDescriptor}'s path, or if subpaths is null and
    * the other's path matches this path.
    */
   @Override
-  protected boolean isPathContaining(DatasetDescriptor other) {
-    String otherPath = other.getPath();
-    String otherSubPaths = ((FSDatasetDescriptor) other).getSubPaths();
+  protected ArrayList<String> isPathContaining(DatasetDescriptor inputDatasetDescriptorConfig) {
+    ArrayList<String> errors = new ArrayList<>();
+    String otherPath = inputDatasetDescriptorConfig.getPath();
+    String otherSubPaths = ((FSDatasetDescriptor) inputDatasetDescriptorConfig).getSubPaths();
 
     // This allows the special case where "other" is a glob, but is also an exact match with "this" path.
     if (getPath().equals(otherPath)) {
-      return true;
+      return errors;
     }
 
     if (otherSubPaths != null) {
       List<String> subPaths = Splitter.on(",").splitToList(StringUtils.stripEnd(StringUtils.stripStart(otherSubPaths, "{"), "}"));
       for (String subPath : subPaths) {
-        if (!isPathContaining(new Path(otherPath, subPath).toString())) {
-          return false;
+        ArrayList<String> pathErrors = isPathContaining(new Path(otherPath, subPath).toString(), inputDatasetDescriptorConfig.getIsInputDataset());
+        if (pathErrors.size() != 0) {
+          return pathErrors;
         }
       }
-      return true;
+      return errors;
     } else {
-      return isPathContaining(otherPath);
+      return isPathContaining(otherPath, inputDatasetDescriptorConfig.getIsInputDataset());
     }
   }
 
@@ -113,41 +118,49 @@ public class FSDatasetDescriptor extends BaseDatasetDescriptor implements Datase
    * accepted by the other {@link DatasetDescriptor}. If the path description of the other {@link DatasetDescriptor}
    * is a glob pattern, we return false.
    *
-   * @param otherPath a glob pattern that describes a set of paths.
+   * @param inputDatasetDescriptorConfigPath a glob pattern that describes a set of paths.
    * @return true if the glob pattern described by the otherPath matches the path in this {@link DatasetDescriptor}.
    */
-  private boolean isPathContaining(String otherPath) {
-    if (otherPath == null) {
-      return false;
+  private ArrayList<String> isPathContaining(String inputDatasetDescriptorConfigPath, Boolean inputDataset) {
+    String datasetDescriptorPrefix = inputDataset ? DatasetDescriptorConfigKeys.FLOW_INPUT_DATASET_DESCRIPTOR_PREFIX : DatasetDescriptorConfigKeys.FLOW_OUTPUT_DATASET_DESCRIPTOR_PREFIX;
+    ArrayList<String> errors = new ArrayList<>();
+    DatasetDescriptorErrorUtils.populateErrorForDatasetDescriptorKey(errors, inputDataset, DatasetDescriptorConfigKeys.PATH_KEY, this.getPath(), inputDatasetDescriptorConfigPath, true);
+    if (errors.size() != 0) {
+      return errors;
     }
+
     if (DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equals(this.getPath())) {
-      return true;
+      return errors;
     }
 
-    if (PathUtils.isGlob(new Path(otherPath))) {
-      return false;
+    if (PathUtils.isGlob(new Path(inputDatasetDescriptorConfigPath))) {
+      errors.add(String.format(DatasetDescriptorErrorUtils.DATASET_DESCRIPTOR_KEY_MISMATCH_ERROR_TEMPLATE_IS_GLOB_PATTERN, datasetDescriptorPrefix, DatasetDescriptorConfigKeys.PATH_KEY, inputDatasetDescriptorConfigPath));
+      return errors;
     }
 
     GlobPattern globPattern = new GlobPattern(this.getPath());
-    return globPattern.matches(otherPath);
+
+    if (!globPattern.matches(inputDatasetDescriptorConfigPath)) {
+      errors.add(String.format(DatasetDescriptorErrorUtils.DATASET_DESCRIPTOR_KEY_MISMATCH_ERROR_TEMPLATE_GLOB_PATTERN, datasetDescriptorPrefix, DatasetDescriptorConfigKeys.PATH_KEY, inputDatasetDescriptorConfigPath, this.getPath()));
+    }
+    return errors;
   }
 
   /**
    * {@inheritDoc}
    */
   @Override
-  public boolean contains(DatasetDescriptor o) {
-    if (!super.contains(o)) {
-      return false;
+  public ArrayList<String> contains(DatasetDescriptor inputDatasetDescriptorConfig) {
+    ArrayList<String> errors = new ArrayList<>();
+    if (super.contains(inputDatasetDescriptorConfig).size() != 0) {
+      return super.contains(inputDatasetDescriptorConfig);
     }
 
-    FSDatasetDescriptor other = (FSDatasetDescriptor) o;
-
-    if ((this.isCompacted() != other.isCompacted()) ||
-        (this.isCompactedAndDeduped() != other.isCompactedAndDeduped())) {
-      return false;
-    }
+    FSDatasetDescriptor inputFSDatasetDescriptor = (FSDatasetDescriptor) inputDatasetDescriptorConfig;
 
-    return this.getPartitionConfig().contains(other.getPartitionConfig());
+    DatasetDescriptorErrorUtils.populateErrorForDatasetDescriptorKey(errors, inputDatasetDescriptorConfig.getIsInputDataset(), DatasetDescriptorConfigKeys.IS_COMPACTED_KEY, String.valueOf(this.isCompacted()), String.valueOf(inputFSDatasetDescriptor.isCompacted()), false);
+    DatasetDescriptorErrorUtils.populateErrorForDatasetDescriptorKey(errors, inputDatasetDescriptorConfig.getIsInputDataset(), DatasetDescriptorConfigKeys.IS_COMPACTED_AND_DEDUPED_KEY, String.valueOf(this.isCompactedAndDeduped()), String.valueOf(inputFSDatasetDescriptor.isCompactedAndDeduped()), false);
+    errors.addAll(this.getPartitionConfig().contains(inputFSDatasetDescriptor.getPartitionConfig()));
+    return errors;
   }
 }
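
A hedged example of the two new glob errors, with illustrative paths: if this descriptor's path is /data/out/* and the user supplies another glob such as /data/out/part-*, the first template fires; a literal path like /other/dir that the glob does not match triggers the second (the descriptor prefix token is a placeholder):

    <descriptor prefix>.path is mismatched. User input: /data/out/part-* is of a glob pattern. Expected input is not of a glob pattern.
    <descriptor prefix>.path is mismatched. User input: /other/dir is not contained within the glob of /data/out/*.
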
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetPartitionConfig.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetPartitionConfig.java
index c3eaee47c..563edc772 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetPartitionConfig.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetPartitionConfig.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.service.modules.dataset;
 
 import java.io.IOException;
 import java.text.SimpleDateFormat;
+import java.util.ArrayList;
 import java.util.regex.Pattern;
 import java.util.regex.PatternSyntaxException;
 
@@ -33,6 +34,7 @@ import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorErrorUtils;
 import org.apache.gobblin.util.ConfigUtils;
 
 
@@ -42,8 +44,8 @@ import org.apache.gobblin.util.ConfigUtils;
  * the regex pattern) is validated.
  */
 @Slf4j
-@ToString (exclude = {"rawConfig"})
-@EqualsAndHashCode (exclude = {"rawConfig"})
+@ToString (exclude = {"rawConfig", "isInputDataset"})
+@EqualsAndHashCode (exclude = {"rawConfig", "isInputDataset"})
 public class FSDatasetPartitionConfig {
   @Getter
   private final String partitionType;
@@ -51,6 +53,8 @@ public class FSDatasetPartitionConfig {
   private final String partitionPattern;
   @Getter
   private final Config rawConfig;
+  @Getter
+  protected Boolean isInputDataset;
 
   public enum PartitionType {
     DATETIME("datetime"),
@@ -77,8 +81,9 @@ public class FSDatasetPartitionConfig {
           .build());
 
   public FSDatasetPartitionConfig(Config config) throws IOException {
-    String partitionType = ConfigUtils.getString(config, DatasetDescriptorConfigKeys.PARTITION_TYPE_KEY, DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY).toLowerCase();
-    String partitionPattern = ConfigUtils.getString(config, DatasetDescriptorConfigKeys.PARTITION_PATTERN_KEY, DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
+    String partitionType = ConfigUtils.getString(config, DatasetDescriptorConfigKeys.PARTITION_PREFIX + "." + DatasetDescriptorConfigKeys.PARTITION_TYPE_KEY, DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY).toLowerCase();
+    String partitionPattern = ConfigUtils.getString(config, DatasetDescriptorConfigKeys.PARTITION_PREFIX + "." + DatasetDescriptorConfigKeys.PARTITION_PATTERN_KEY, DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
+
     if (partitionType.equalsIgnoreCase(PartitionType.NONE.name())) {
       partitionPattern = DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_NONE;
     } else if(partitionType.equalsIgnoreCase(PartitionType.ANY.name())) {
@@ -87,7 +92,8 @@ public class FSDatasetPartitionConfig {
     validatePartitionConfig(partitionType, partitionPattern);
     this.partitionType = partitionType;
     this.partitionPattern = partitionPattern;
-    this.rawConfig = config.withFallback(DEFAULT_FALLBACK);
+    this.rawConfig = ConfigUtils.getConfig(config, DatasetDescriptorConfigKeys.PARTITION_PREFIX, DEFAULT_FALLBACK);
+    this.isInputDataset = ConfigUtils.getBoolean(config, DatasetDescriptorConfigKeys.IS_INPUT_DATASET, false);
   }
 
   private void validatePartitionConfig(String partitionType, String partitionPattern)
@@ -128,13 +134,10 @@ public class FSDatasetPartitionConfig {
     }
   }
 
-  public boolean contains(FSDatasetPartitionConfig other) {
-    if (other == null) {
-      return false;
-    }
-    return ((DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equalsIgnoreCase(getPartitionType())
-        || this.getPartitionType().equalsIgnoreCase(other.getPartitionType())))
-        && ((DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equalsIgnoreCase(getPartitionPattern())
-        || this.getPartitionPattern().equalsIgnoreCase(other.getPartitionPattern())));
+  public ArrayList<String> contains(FSDatasetPartitionConfig inputDatasetDescriptorConfig) {
+    ArrayList<String> errors = new ArrayList<>();
+    DatasetDescriptorErrorUtils.populateErrorForDatasetDescriptorKeyPartition(errors, inputDatasetDescriptorConfig.getIsInputDataset(), DatasetDescriptorConfigKeys.PARTITION_PREFIX, DatasetDescriptorConfigKeys.PARTITION_TYPE_KEY, this.getPartitionType(), inputDatasetDescriptorConfig.getPartitionType(), false);
+    DatasetDescriptorErrorUtils.populateErrorForDatasetDescriptorKeyPartition(errors, inputDatasetDescriptorConfig.getIsInputDataset(), DatasetDescriptorConfigKeys.PARTITION_PREFIX, DatasetDescriptorConfigKeys.PARTITION_PATTERN_KEY, this.getPartitionPattern(), inputDatasetDescriptorConfig.getPartitionPattern(), false);
+    return errors;
   }
 }
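
Since the constructor now reads the partition keys from the full descriptor config (note the PARTITION_PREFIX prepended to each key), a minimal construction sketch looks like this; the literal key names under the prefix are assumptions about the constant values:

    // Hypothetical config; only the nesting under the partition prefix is the point.
    Config descriptorConfig = ConfigFactory.parseString(
        "partition.type = datetime\n"
        + "partition.pattern = yyyy/MM/dd");
    FSDatasetPartitionConfig partitionConfig = new FSDatasetPartitionConfig(descriptorConfig);
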
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSVolumeDatasetDescriptor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSVolumeDatasetDescriptor.java
index 525acd429..2ba562138 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSVolumeDatasetDescriptor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSVolumeDatasetDescriptor.java
@@ -19,19 +19,21 @@ package org.apache.gobblin.service.modules.dataset;
 
 import com.typesafe.config.Config;
 import java.io.IOException;
+import java.util.ArrayList;
 import lombok.EqualsAndHashCode;
 import lombok.Getter;
 import lombok.ToString;
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorErrorUtils;
 import org.apache.gobblin.util.ConfigUtils;
 
 /**
  * An implementation of {@link FSVolumeDatasetDescriptor} with fs.uri specified.
  */
 @Alpha
-@ToString(callSuper = true, exclude = {"rawConfig"})
-@EqualsAndHashCode(callSuper = true, exclude = {"rawConfig"})
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
 public class FSVolumeDatasetDescriptor extends FSDatasetDescriptor{
   @Getter
   private final String fsUri;
@@ -39,18 +41,19 @@ public class FSVolumeDatasetDescriptor extends FSDatasetDescriptor{
   public FSVolumeDatasetDescriptor(Config config) throws IOException {
     super(config);
     this.fsUri = ConfigUtils.getString(config, DatasetDescriptorConfigKeys.FS_URI_KEY, DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
+    this.isInputDataset = ConfigUtils.getBoolean(config, DatasetDescriptorConfigKeys.IS_INPUT_DATASET, false);
   }
 
   @Override
-  public boolean contains(DatasetDescriptor o) {
-    if (!super.contains(o)) {
-      return false;
+  public ArrayList<String> contains(DatasetDescriptor inputDatasetDescriptorConfig) {
+    ArrayList<String> errors = new ArrayList<>();
+    if (super.contains(inputDatasetDescriptorConfig).size() != 0) {
+      return super.contains(inputDatasetDescriptorConfig);
     }
 
-    FSVolumeDatasetDescriptor other = (FSVolumeDatasetDescriptor) o;
-
-    return DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equals(this.getFsUri()) || this.getFsUri()
-        .equals(other.getFsUri());
+    FSVolumeDatasetDescriptor other = (FSVolumeDatasetDescriptor) inputDatasetDescriptorConfig;
+    DatasetDescriptorErrorUtils.populateErrorForDatasetDescriptorKey(errors, inputDatasetDescriptorConfig.getIsInputDataset(), DatasetDescriptorConfigKeys.FS_URI_KEY, this.getFsUri(), other.getFsUri(), false);
+    return errors;
   }
 
 }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FormatConfig.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FormatConfig.java
index e02940b5d..aff1a89bb 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FormatConfig.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FormatConfig.java
@@ -23,12 +23,14 @@ import com.google.common.collect.ImmutableMap;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
+import java.util.ArrayList;
 import lombok.EqualsAndHashCode;
 import lombok.Getter;
 import lombok.ToString;
 
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorErrorUtils;
 import org.apache.gobblin.util.ConfigUtils;
 
 
@@ -42,8 +44,8 @@ import org.apache.gobblin.util.ConfigUtils;
  *  </ul>
  */
 @Alpha
-@ToString (exclude = {"rawConfig"})
-@EqualsAndHashCode (exclude = {"rawConfig"})
+@ToString (exclude = {"rawConfig", "isInputDataset"})
+@EqualsAndHashCode (exclude = {"rawConfig", "isInputDataset"})
 public class FormatConfig {
   @Getter
   private final String format;
@@ -53,6 +55,8 @@ public class FormatConfig {
   private final EncryptionConfig encryptionConfig;
   @Getter
   private final Config rawConfig;
+  @Getter
+  protected Boolean isInputDataset;
 
   private static final Config DEFAULT_FALLBACK =
       ConfigFactory.parseMap(ImmutableMap.<String, Object>builder()
@@ -67,24 +71,30 @@ public class FormatConfig {
         .empty()));
     this.rawConfig = config.withFallback(this.encryptionConfig.getRawConfig().atPath(DatasetDescriptorConfigKeys.ENCYPTION_PREFIX)).
         withFallback(DEFAULT_FALLBACK);
+    this.isInputDataset = ConfigUtils.getBoolean(config, DatasetDescriptorConfigKeys.IS_INPUT_DATASET, false);
   }
 
-  public boolean contains(FormatConfig other) {
-    return containsFormat(other.getFormat()) && containsCodec(other.getCodecType())
-        && containsEncryptionConfig(other.getEncryptionConfig());
+  public ArrayList<String> contains(FormatConfig inputDatasetDescriptorConfig) {
+    ArrayList<String> errors = new ArrayList<>();
+    errors.addAll(containsFormat(inputDatasetDescriptorConfig.getFormat(), inputDatasetDescriptorConfig.getIsInputDataset()));
+    errors.addAll(containsCodec(inputDatasetDescriptorConfig.getCodecType(), inputDatasetDescriptorConfig.getIsInputDataset()));
+    errors.addAll(containsEncryptionConfig(inputDatasetDescriptorConfig.getEncryptionConfig()));
+    return errors;
   }
 
-  private boolean containsFormat(String otherFormat) {
-    return DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equalsIgnoreCase(this.getFormat())
-        || (this.getFormat().equalsIgnoreCase(otherFormat));
+  private ArrayList<String> containsFormat(String inputDatasetDescriptorConfigFormat, Boolean inputDataset) {
+    ArrayList<String> errors = new ArrayList<>();
+    DatasetDescriptorErrorUtils.populateErrorForDatasetDescriptorKey(errors, inputDataset, DatasetDescriptorConfigKeys.FORMAT_KEY, this.getFormat(), inputDatasetDescriptorConfigFormat, false);
+    return errors;
   }
 
-  private boolean containsCodec(String otherCodecType) {
-    return DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equalsIgnoreCase(this.getCodecType())
-        || (this.getCodecType().equalsIgnoreCase(otherCodecType));
+  private ArrayList<String> containsCodec(String inputDatasetDescriptorConfigCodecType, Boolean inputDataset) {
+    ArrayList<String> errors = new ArrayList<>();
+    DatasetDescriptorErrorUtils.populateErrorForDatasetDescriptorKey(errors, inputDataset, DatasetDescriptorConfigKeys.CODEC_KEY, this.getCodecType(), inputDatasetDescriptorConfigCodecType, false);
+    return errors;
   }
 
-  private boolean containsEncryptionConfig(EncryptionConfig otherEncryptionConfig) {
-    return this.getEncryptionConfig().contains(otherEncryptionConfig);
+  private ArrayList<String> containsEncryptionConfig(EncryptionConfig inputDatasetDescriptorConfigEncryptionConfig) {
+    return this.getEncryptionConfig().contains(inputDatasetDescriptorConfigEncryptionConfig);
   }
 }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/HiveDatasetDescriptor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/HiveDatasetDescriptor.java
index fb1bcd4cc..164d7b126 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/HiveDatasetDescriptor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/HiveDatasetDescriptor.java
@@ -18,8 +18,11 @@
 package org.apache.gobblin.service.modules.dataset;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorErrorUtils;
 import org.apache.hadoop.fs.GlobPattern;
 
 import com.google.common.base.Splitter;
@@ -27,7 +30,6 @@ import com.typesafe.config.Config;
 import com.typesafe.config.ConfigValueFactory;
 
 import lombok.EqualsAndHashCode;
-import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.data.management.copy.hive.HiveCopyEntityHelper;
 import org.apache.gobblin.data.management.copy.hive.HiveDatasetFinder;
@@ -36,7 +38,6 @@ import org.apache.gobblin.data.management.version.finder.DatePartitionHiveVersio
 import org.apache.gobblin.util.ConfigUtils;
 
 
-@Slf4j
 /**
  * As of now, {@link HiveDatasetDescriptor} has same implementation as that of {@link SqlDatasetDescriptor}.
  * Fields {@link HiveDatasetDescriptor#isPartitioned}, {@link HiveDatasetDescriptor#partitionColumn} and
@@ -77,6 +78,7 @@ public class HiveDatasetDescriptor extends SqlDatasetDescriptor {
         .withValue(HiveDatasetFinder.HIVE_DATASET_PREFIX + "." + WhitelistBlacklist.WHITELIST,
             ConfigValueFactory.fromAnyRef(createHiveDatasetWhitelist())
         ));
+    this.isInputDataset = ConfigUtils.getBoolean(config, DatasetDescriptorConfigKeys.IS_INPUT_DATASET, false);
   }
 
   // Using Hadoop's GlobPattern instead of java.util.regex, because could not find any API in java.util.regex
@@ -99,35 +101,33 @@ public class HiveDatasetDescriptor extends SqlDatasetDescriptor {
   }
 
   @Override
-  protected boolean isPathContaining(DatasetDescriptor other) {
-    String otherPath = other.getPath();
-    if (otherPath == null) {
-      return false;
-    }
+  protected ArrayList<String> isPathContaining(DatasetDescriptor inputDatasetDescriptorConfig) {
+    ArrayList<String> errors = new ArrayList<>();
+    String otherPath = inputDatasetDescriptorConfig.getPath();
 
-    if (this.isPartitioned != ((HiveDatasetDescriptor) other).isPartitioned) {
-      return false;
+    DatasetDescriptorErrorUtils.populateErrorForDatasetDescriptorKey(errors, inputDatasetDescriptorConfig.getIsInputDataset(), DatasetDescriptorConfigKeys.PATH_KEY, this.getPath(), otherPath, true);
+    if (errors.size() != 0){
+      return errors;
     }
 
+    DatasetDescriptorErrorUtils.populateErrorForDatasetDescriptorKeyPartition(errors, inputDatasetDescriptorConfig.getIsInputDataset(), DatasetDescriptorConfigKeys.PARTITION_PREFIX, DatasetDescriptorConfigKeys.PARTITION_TYPE_KEY, String.valueOf(this.isPartitioned), String.valueOf(((HiveDatasetDescriptor) inputDatasetDescriptorConfig).isPartitioned), false);
+
     //Extract the dbName and tableName from otherPath
     List<String> parts = Splitter.on(SEPARATION_CHAR).splitToList(otherPath);
-    if (parts.size() != 2) {
-      return false;
+    DatasetDescriptorErrorUtils.populateErrorForDatasetDescriptorKeySize(errors, inputDatasetDescriptorConfig.getIsInputDataset(), DatasetDescriptorConfigKeys.PATH_KEY, parts, otherPath, SEPARATION_CHAR, 2);
+    if (errors.size() != 0) {
+      return errors;
     }
 
     String otherDbName = parts.get(0);
     String otherTableNames = parts.get(1);
 
-    if (!this.whitelistBlacklist.acceptDb(otherDbName)) {
-      return false;
-    }
+    DatasetDescriptorErrorUtils.populateErrorForDatasetDescriptorKeyBlacklist(errors, inputDatasetDescriptorConfig.getIsInputDataset(), "database", DatasetDescriptorConfigKeys.DATABASE_KEY, whitelistBlacklist, otherDbName, null);
 
     List<String> otherTables = Splitter.on(",").splitToList(otherTableNames);
     for (String otherTable : otherTables) {
-      if (!this.whitelistBlacklist.acceptTable(otherDbName, otherTable)) {
-        return false;
-      }
+      DatasetDescriptorErrorUtils.populateErrorForDatasetDescriptorKeyBlacklist(errors, inputDatasetDescriptorConfig.getIsInputDataset(), "table", DatasetDescriptorConfigKeys.TABLE_KEY, whitelistBlacklist, otherDbName, otherTable);
     }
-    return true;
+    return errors;
   }
 }
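
When a database or table is rejected by the whitelist/blacklist, the blacklist template (added below in DatasetDescriptorErrorUtils) would yield something like the following; the prefix and key tokens are placeholders for the actual constants, and 'testdb' is a made-up value:

    <descriptor prefix>.<database key> is mismatched. User input for database: 'testdb' is in the blacklist. Please check the provided blacklist configuration.
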
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/HttpDatasetDescriptor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/HttpDatasetDescriptor.java
index 0d4503695..96fdf8a6c 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/HttpDatasetDescriptor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/HttpDatasetDescriptor.java
@@ -22,12 +22,13 @@ import com.typesafe.config.Config;
 import com.typesafe.config.ConfigValueFactory;
 import java.io.IOException;
 
+import java.util.ArrayList;
 import lombok.EqualsAndHashCode;
 import lombok.Getter;
 import lombok.ToString;
-import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorErrorUtils;
 import org.apache.gobblin.util.ConfigUtils;
 
 
@@ -37,7 +38,6 @@ import org.apache.gobblin.util.ConfigUtils;
  * e.g, https://some-api:443/user/123/names, where /user/123/names is the path
  * query string is not supported
  */
-@Slf4j
 @ToString (exclude = {"rawConfig"})
 @EqualsAndHashCode (exclude = {"rawConfig"}, callSuper = true)
 public class HttpDatasetDescriptor extends BaseDatasetDescriptor implements DatasetDescriptor {
@@ -71,6 +71,7 @@ public class HttpDatasetDescriptor extends BaseDatasetDescriptor implements Data
     // refers to the full HTTP url
     this.path = ConfigUtils.getString(config, DatasetDescriptorConfigKeys.PATH_KEY, "");
     this.rawConfig = config.withValue(DatasetDescriptorConfigKeys.PATH_KEY, ConfigValueFactory.fromAnyRef(this.path)).withFallback(super.getRawConfig());
+    this.isInputDataset = ConfigUtils.getBoolean(config, DatasetDescriptorConfigKeys.IS_INPUT_DATASET, false);
   }
 
   /**
@@ -83,12 +84,14 @@ public class HttpDatasetDescriptor extends BaseDatasetDescriptor implements Data
   /**
    * Check if this HTTP path equals the other HTTP path
    *
-   * @param other whose path should be in the format of a HTTP path
+   * @param inputDatasetDescriptorConfig whose path should be in the format of a HTTP path
    */
   @Override
-  protected boolean isPathContaining(DatasetDescriptor other) {
+  protected ArrayList<String> isPathContaining(DatasetDescriptor inputDatasetDescriptorConfig) {
     // Might be null
-    String otherPath = other.getPath();
-    return this.path.equals(otherPath);
+    ArrayList<String> errors = new ArrayList<>();
+    String otherPath = inputDatasetDescriptorConfig.getPath();
+    DatasetDescriptorErrorUtils.populateErrorForDatasetDescriptorKey(errors, inputDatasetDescriptorConfig.getIsInputDataset(), DatasetDescriptorConfigKeys.PATH_KEY, this.getPath(), otherPath, false);
+    return errors;
   }
 }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/IcebergDatasetDescriptor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/IcebergDatasetDescriptor.java
index 4ac46a0f9..be866606e 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/IcebergDatasetDescriptor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/IcebergDatasetDescriptor.java
@@ -18,6 +18,7 @@
 package org.apache.gobblin.service.modules.dataset;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 
 import com.google.common.base.Joiner;
@@ -28,6 +29,7 @@ import lombok.EqualsAndHashCode;
 import lombok.Getter;
 
 import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorErrorUtils;
 import org.apache.gobblin.util.ConfigUtils;
 
 /**
@@ -35,7 +37,7 @@ import org.apache.gobblin.util.ConfigUtils;
  * Fields {@link IcebergDatasetDescriptor#databaseName} and {@link IcebergDatasetDescriptor#tableName} are used to
  * identify an iceberg.
  */
-@EqualsAndHashCode(callSuper = true)
+@EqualsAndHashCode (callSuper = true)
 public class IcebergDatasetDescriptor extends BaseDatasetDescriptor {
   protected static final String SEPARATION_CHAR = ";";
   protected final String databaseName;
@@ -60,6 +62,7 @@ public class IcebergDatasetDescriptor extends BaseDatasetDescriptor {
       throw new IOException("Invalid iceberg database or table name: " + this.databaseName + ":" + this.tableName);
     }
     this.path = fullyQualifiedTableName(this.databaseName, this.tableName);
+    this.isInputDataset = ConfigUtils.getBoolean(config, DatasetDescriptorConfigKeys.IS_INPUT_DATASET, false);
   }
 
   protected boolean isPlatformValid() {
@@ -71,21 +74,27 @@ public class IcebergDatasetDescriptor extends BaseDatasetDescriptor {
   }
 
   @Override
-  protected boolean isPathContaining(DatasetDescriptor other) {
-    String otherPath = other.getPath();
-    if (otherPath == null) {
-      return false;
+  protected ArrayList<String> isPathContaining(DatasetDescriptor inputDatasetDescriptorConfig) {
+    ArrayList<String> errors = new ArrayList<>();
+    String otherPath = inputDatasetDescriptorConfig.getPath();
+
+    DatasetDescriptorErrorUtils.populateErrorForDatasetDescriptorKey(errors, inputDatasetDescriptorConfig.getIsInputDataset(), DatasetDescriptorConfigKeys.PATH_KEY, this.getPath(), otherPath, true);
+    if (errors.size() != 0) {
+      return errors;
     }
 
     //Extract the dbName and tableName from otherPath
     List<String> parts = Splitter.on(SEPARATION_CHAR).splitToList(otherPath);
-    if (parts.size() != 2) {
-      return false;
+    DatasetDescriptorErrorUtils.populateErrorForDatasetDescriptorKeySize(errors, inputDatasetDescriptorConfig.getIsInputDataset(), DatasetDescriptorConfigKeys.PATH_KEY, parts, otherPath, SEPARATION_CHAR, 2);
+    if (errors.size() != 0) {
+      return errors;
     }
 
     String otherDbName = parts.get(0);
     String otherTableName = parts.get(1);
 
-    return this.databaseName.equals(otherDbName) && this.tableName.equals(otherTableName);
+    DatasetDescriptorErrorUtils.populateErrorForDatasetDescriptorKey(errors, inputDatasetDescriptorConfig.getIsInputDataset(), DatasetDescriptorConfigKeys.DATABASE_KEY, this.databaseName, otherDbName, false);
+    DatasetDescriptorErrorUtils.populateErrorForDatasetDescriptorKey(errors, inputDatasetDescriptorConfig.getIsInputDataset(), DatasetDescriptorConfigKeys.TABLE_KEY, this.tableName, otherTableName, false);
+    return errors;
   }
 }
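
If the user-supplied path cannot be split into the expected dbName;tableName form (SEPARATION_CHAR is ";" here), populateErrorForDatasetDescriptorKeySize would add an error rendered from the string-split template, roughly:

    <descriptor prefix>.path is mismatched. User input: testdb.testtable is not splittable. Expected separation character: ';' and total of 2 parts.
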
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/SqlDatasetDescriptor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/SqlDatasetDescriptor.java
index 42b68d984..27807b44b 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/SqlDatasetDescriptor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/SqlDatasetDescriptor.java
@@ -18,8 +18,8 @@
 package org.apache.gobblin.service.modules.dataset;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
-import java.util.regex.Pattern;
 
 import com.google.common.base.Enums;
 import com.google.common.base.Joiner;
@@ -31,14 +31,13 @@ import lombok.EqualsAndHashCode;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.ToString;
-import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorErrorUtils;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.PathUtils;
 
 
-@Slf4j
 @ToString (exclude = {"rawConfig"})
 @EqualsAndHashCode (exclude = {"rawConfig"}, callSuper = true)
 public class SqlDatasetDescriptor extends BaseDatasetDescriptor implements DatasetDescriptor {
@@ -81,6 +80,7 @@ public class SqlDatasetDescriptor extends BaseDatasetDescriptor implements Datas
     this.tableName = ConfigUtils.getString(config, DatasetDescriptorConfigKeys.TABLE_KEY, ".*");
     this.path = fullyQualifiedTableName(this.databaseName, this.tableName);
     this.rawConfig = config.withValue(DatasetDescriptorConfigKeys.PATH_KEY, ConfigValueFactory.fromAnyRef(this.path)).withFallback(super.getRawConfig());
+    this.isInputDataset = ConfigUtils.getBoolean(config, DatasetDescriptorConfigKeys.IS_INPUT_DATASET, false);
   }
 
   private String fullyQualifiedTableName(String databaseName, String tableName) {
@@ -101,28 +101,34 @@ public class SqlDatasetDescriptor extends BaseDatasetDescriptor implements Datas
    * NOTE: otherPath cannot be a globPattern. So:
    * isPathContaining("test_db.*;test_table_*") = false
    *
-   * @param other whose path should be in the format of dbName.tableName
+   * @param inputDatasetDescriptorConfig whose path should be in the format of dbName.tableName
    */
   @Override
-  protected boolean isPathContaining(DatasetDescriptor other) {
-    String otherPath = other.getPath();
-    if (otherPath == null) {
-      return false;
+  protected ArrayList<String> isPathContaining(DatasetDescriptor inputDatasetDescriptorConfig) {
+    ArrayList<String> errors = new ArrayList<>();
+    String otherPath = inputDatasetDescriptorConfig.getPath();
+    DatasetDescriptorErrorUtils.populateErrorForDatasetDescriptorKey(errors, inputDatasetDescriptorConfig.getIsInputDataset(), DatasetDescriptorConfigKeys.PATH_KEY, this.getPath(), otherPath, true);
+    if (errors.size() != 0) {
+      return errors;
     }
 
     if (PathUtils.GLOB_TOKENS.matcher(otherPath).find()) {
-      return false;
+      return errors;
     }
 
     //Extract the dbName and tableName from otherPath
     List<String> parts = Splitter.on(SEPARATION_CHAR).splitToList(otherPath);
-    if (parts.size() != 2) {
-      return false;
+    DatasetDescriptorErrorUtils.populateErrorForDatasetDescriptorKeySize(errors, inputDatasetDescriptorConfig.getIsInputDataset(), DatasetDescriptorConfigKeys.PATH_KEY, parts, otherPath, SEPARATION_CHAR, 2);
+    if (errors.size() != 0) {
+      return errors;
     }
 
     String otherDbName = parts.get(0);
     String otherTableName = parts.get(1);
 
-    return Pattern.compile(this.databaseName).matcher(otherDbName).matches() && Pattern.compile(this.tableName).matcher(otherTableName).matches();
+    DatasetDescriptorErrorUtils.populateErrorForDatasetDescriptorKeyRegex(errors, inputDatasetDescriptorConfig.getIsInputDataset(), DatasetDescriptorConfigKeys.DATABASE_KEY, this.databaseName, otherDbName);
+    DatasetDescriptorErrorUtils.populateErrorForDatasetDescriptorKeyRegex(errors, inputDatasetDescriptorConfig.getIsInputDataset(), DatasetDescriptorConfigKeys.TABLE_KEY, this.tableName, otherTableName);
+
+    return errors;
   }
 }
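
A brief illustration of the regex comparison now delegated to populateErrorForDatasetDescriptorKeyRegex, using made-up values:

    // edge descriptor: databaseName = "test_db.*", tableName = ".*"
    // user path "test_db_prod.orders" splits into ["test_db_prod", "orders"];
    // both parts match their patterns, so no errors are added.
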
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java
index cc9a315cd..36d473b05 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java
@@ -41,6 +41,7 @@ public class DatasetDescriptorConfigKeys {
   public static final String IS_RETENTION_APPLIED_KEY = "isRetentionApplied";
   public static final String IS_COMPACTED_KEY = "isCompacted";
   public static final String IS_COMPACTED_AND_DEDUPED_KEY = "isCompactedAndDeduped";
+  public static final String IS_INPUT_DATASET = "isInputDataset";
 
   //Dataset encryption related keys
   public static final String ENCYPTION_PREFIX = "encrypt";
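
For reference, these keys are addressed under the flow-level descriptor prefixes when a user submits a flow config. A submitted config might carry a descriptor block like the following sketch (prefix strings and values are illustrative; isInputDataset is injected by the path finder rather than supplied by the user):

    gobblin.flow.input.dataset.descriptor.platform=hdfs
    gobblin.flow.input.dataset.descriptor.path=/data/tracking/events
    gobblin.flow.output.dataset.descriptor.platform=adls
    gobblin.flow.output.dataset.descriptor.path=/mirror/tracking/events
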
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorErrorUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorErrorUtils.java
new file mode 100644
index 000000000..1cfd70b15
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorErrorUtils.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.gobblin.service.modules.flowgraph;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Pattern;
+import org.apache.gobblin.data.management.copy.hive.WhitelistBlacklist;
+
+
+/**
+ * Utility methods for populating the errors reported when {@link org.apache.gobblin.service.modules.dataset.DatasetDescriptor} comparisons fail.
+ */
+public class DatasetDescriptorErrorUtils {
+  public static final String DATASET_DESCRIPTOR_KEY_MISMATCH_ERROR_TEMPLATE = "%s.%s is mismatched. User input: %s. Expected value '%s'.";
+  public static final String DATASET_DESCRIPTOR_KEY_MISSING_ERROR_TEMPLATE = "%s.%s is missing. Expected value '%s'.";
+
+  public static final String DATASET_DESCRIPTOR_KEY_MISMATCH_ERROR_TEMPLATE_PARTITION = "%s.%s.%s is mismatched. User input: %s. Expected value '%s'.";
+  public static final String DATASET_DESCRIPTOR_KEY_MISSING_ERROR_TEMPLATE_PARTITION = "%s.%s.%s is missing. Expected value '%s'.";
+
+  public static final String DATASET_DESCRIPTOR_KEY_MISMATCH_ERROR_TEMPLATE_STRING_SPLIT = "%s.%s is mismatched. User input: %s cannot be split. Expected separation character: '%s' and a total of %d parts.";
+
+  public static final String DATASET_DESCRIPTOR_KEY_MISMATCH_ERROR_TEMPLATE_IS_GLOB_PATTERN = "%s.%s is mismatched. User input: %s is a glob pattern. Expected a non-glob value.";
+  public static final String DATASET_DESCRIPTOR_KEY_MISMATCH_ERROR_TEMPLATE_GLOB_PATTERN = "%s.%s is mismatched. User input: %s is not contained within the glob pattern %s.";
+
+  public static final String DATASET_DESCRIPTOR_KEY_MISMATCH_ERROR_TEMPLATE_BLACKLIST = "%s.%s is mismatched. User input for %s: '%s' is in the blacklist. Please check the provided blacklist configuration.";
+
+  /**
+   * Compares the expected and user-provided values for a dataset descriptor key and appends any resulting error to the list.
+   * @param errors list of errors to append to
+   * @param inputDataset true if the descriptor being checked is the flow input, false if it is the flow output
+   * @param configKey the {@link DatasetDescriptorConfigKeys} key of the field being compared
+   * @param inputDatasetDescriptorValue the expected value, taken from the flow edge configuration (flow.conf)
+   * @param providedDatasetDescriptorValue the value supplied in the submitted flow configuration
+   * @param testNullOnly if true, only check that the provided value is present
+   */
+  public static void populateErrorForDatasetDescriptorKey(ArrayList<String> errors, Boolean inputDataset,
+      String configKey, String inputDatasetDescriptorValue, String providedDatasetDescriptorValue, Boolean testNullOnly) {
+    String datasetDescriptorPrefix = inputDataset ? DatasetDescriptorConfigKeys.FLOW_INPUT_DATASET_DESCRIPTOR_PREFIX : DatasetDescriptorConfigKeys.FLOW_OUTPUT_DATASET_DESCRIPTOR_PREFIX;
+    if (providedDatasetDescriptorValue == null) {
+      errors.add(String.format(DATASET_DESCRIPTOR_KEY_MISSING_ERROR_TEMPLATE, datasetDescriptorPrefix, configKey, inputDatasetDescriptorValue));
+      return;
+    }
+
+    if (!testNullOnly && !(DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equalsIgnoreCase(inputDatasetDescriptorValue)
+        || inputDatasetDescriptorValue.equalsIgnoreCase(providedDatasetDescriptorValue))) {
+      errors.add(String.format(DATASET_DESCRIPTOR_KEY_MISMATCH_ERROR_TEMPLATE, datasetDescriptorPrefix, configKey, providedDatasetDescriptorValue, inputDatasetDescriptorValue));
+    }
+  }
+
+  /**
+   * Compares the expected and user-provided values for a partition sub-key and appends any resulting error to the list.
+   * @param errors list of errors to append to
+   * @param inputDataset true if the descriptor being checked is the flow input, false if it is the flow output
+   * @param configKey the {@link DatasetDescriptorConfigKeys} key of the field being compared
+   * @param partitionConfigKey the sub-key for the partition (e.g. partition.pattern)
+   * @param inputDatasetDescriptorValue the expected value, taken from the flow edge configuration (flow.conf)
+   * @param providedDatasetDescriptorValue the value supplied in the submitted flow configuration
+   * @param testNullOnly if true, only check that the provided value is present
+   */
+  public static void populateErrorForDatasetDescriptorKeyPartition(ArrayList<String> errors, Boolean inputDataset,
+      String configKey, String partitionConfigKey, String inputDatasetDescriptorValue, String providedDatasetDescriptorValue, Boolean testNullOnly) {
+    String datasetDescriptorPrefix = inputDataset ? DatasetDescriptorConfigKeys.FLOW_INPUT_DATASET_DESCRIPTOR_PREFIX : DatasetDescriptorConfigKeys.FLOW_OUTPUT_DATASET_DESCRIPTOR_PREFIX;
+    if (providedDatasetDescriptorValue == null) {
+      errors.add(String.format(DATASET_DESCRIPTOR_KEY_MISSING_ERROR_TEMPLATE_PARTITION, datasetDescriptorPrefix, configKey, partitionConfigKey, inputDatasetDescriptorValue));
+      return;
+    }
+
+    if (!testNullOnly && !(DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equalsIgnoreCase(inputDatasetDescriptorValue)
+        || inputDatasetDescriptorValue.equalsIgnoreCase(providedDatasetDescriptorValue))) {
+      errors.add(String.format(DATASET_DESCRIPTOR_KEY_MISMATCH_ERROR_TEMPLATE_PARTITION, datasetDescriptorPrefix, configKey, partitionConfigKey, providedDatasetDescriptorValue, inputDatasetDescriptorValue));
+    }
+  }
+
+  /**
+   * Matches the user-provided value against the expected regex pattern and appends any resulting error to the list.
+   * @param errors list of errors to append to
+   * @param inputDataset true if the descriptor being checked is the flow input, false if it is the flow output
+   * @param configKey the {@link DatasetDescriptorConfigKeys} key of the field being compared
+   * @param inputDatasetDescriptorValue the expected regex pattern, taken from the flow edge configuration (flow.conf)
+   * @param providedDatasetDescriptorValue the value supplied in the submitted flow configuration
+   */
+  public static void populateErrorForDatasetDescriptorKeyRegex(ArrayList<String> errors, Boolean inputDataset,
+      String configKey, String inputDatasetDescriptorValue, String providedDatasetDescriptorValue) {
+    String datasetDescriptorPrefix = inputDataset ? DatasetDescriptorConfigKeys.FLOW_INPUT_DATASET_DESCRIPTOR_PREFIX : DatasetDescriptorConfigKeys.FLOW_OUTPUT_DATASET_DESCRIPTOR_PREFIX;
+    if (!Pattern.compile(inputDatasetDescriptorValue).matcher(providedDatasetDescriptorValue).matches()) {
+      errors.add(String.format(DatasetDescriptorErrorUtils.DATASET_DESCRIPTOR_KEY_MISMATCH_ERROR_TEMPLATE, datasetDescriptorPrefix, configKey, providedDatasetDescriptorValue, inputDatasetDescriptorValue));
+    }
+  }
+
+  /**
+   * Checks whether the given database or table is rejected by the blacklist config and appends an error if so.
+   * @param errors list of errors to append to
+   * @param inputDataset true if the descriptor being checked is the flow input, false if it is the flow output
+   * @param type whether the check applies to the "database" or to a "table" within a database
+   * @param configKey the {@link DatasetDescriptorConfigKeys} key of the field being compared
+   * @param whitelistBlacklist the {@link WhitelistBlacklist} used to filter Hive databases and tables
+   * @param inputDbName the database name from the submitted flow configuration
+   * @param inputTableName the table name from the submitted flow configuration
+   */
+  public static void populateErrorForDatasetDescriptorKeyBlacklist(ArrayList<String> errors, Boolean inputDataset,
+      String type, String configKey, WhitelistBlacklist whitelistBlacklist, String inputDbName, String inputTableName) {
+    String datasetDescriptorPrefix = inputDataset ? DatasetDescriptorConfigKeys.FLOW_INPUT_DATASET_DESCRIPTOR_PREFIX : DatasetDescriptorConfigKeys.FLOW_OUTPUT_DATASET_DESCRIPTOR_PREFIX;
+    if (type.equals("database") && !whitelistBlacklist.acceptDb(inputDbName)) {
+      errors.add(String.format(DatasetDescriptorErrorUtils.DATASET_DESCRIPTOR_KEY_MISMATCH_ERROR_TEMPLATE_BLACKLIST,
+          datasetDescriptorPrefix, "database", configKey, inputDbName));
+    } else if (type.equals("table") && !whitelistBlacklist.acceptTable(inputDbName, inputTableName)) {
+      errors.add(String.format(DatasetDescriptorErrorUtils.DATASET_DESCRIPTOR_KEY_MISMATCH_ERROR_TEMPLATE_BLACKLIST,
+          datasetDescriptorPrefix, "table", configKey, String.join(".", inputDbName, inputTableName)));
+    }
+  }
+
+  /**
+   * Checks that the given path splits into the expected number of parts and appends an error otherwise.
+   * @param errors list of errors to append to
+   * @param inputDataset true if the descriptor being checked is the flow input, false if it is the flow output
+   * @param configKey the {@link DatasetDescriptorConfigKeys} key of the field being compared
+   * @param parts the list of parts after splitting on the separation character
+   * @param inputPath the path from the submitted flow configuration
+   * @param sepChar the delimiter/separation character
+   * @param size the expected number of parts
+   */
+  public static void populateErrorForDatasetDescriptorKeySize(ArrayList<String> errors, Boolean inputDataset,
+      String configKey, List<String> parts, String inputPath, String sepChar, int size) {
+    String datasetDescriptorPrefix = inputDataset ? DatasetDescriptorConfigKeys.FLOW_INPUT_DATASET_DESCRIPTOR_PREFIX : DatasetDescriptorConfigKeys.FLOW_OUTPUT_DATASET_DESCRIPTOR_PREFIX;
+    if (parts.size() != size) {
+      errors.add(String.format(DatasetDescriptorErrorUtils.DATASET_DESCRIPTOR_KEY_MISMATCH_ERROR_TEMPLATE_STRING_SPLIT, datasetDescriptorPrefix, configKey, inputPath, sepChar, size));
+    }
+  }
+}
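
The helpers above append to a caller-owned list rather than short-circuiting, so one pass over a descriptor can surface every mismatch at once. A minimal usage sketch with hypothetical expected/provided values:

    import java.util.ArrayList;
    import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
    import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorErrorUtils;

    ArrayList<String> errors = new ArrayList<>();

    // Exact-match check: the edge expects platform "hdfs", the user submitted "adls".
    DatasetDescriptorErrorUtils.populateErrorForDatasetDescriptorKey(errors, true,
        DatasetDescriptorConfigKeys.PLATFORM_KEY, "hdfs", "adls", false);

    // Regex check: the edge expects a database matching "testDb_.*", the user submitted "prodDb".
    DatasetDescriptorErrorUtils.populateErrorForDatasetDescriptorKeyRegex(errors, true,
        DatasetDescriptorConfigKeys.DATABASE_KEY, "testDb_.*", "prodDb");

    // errors now holds two formatted messages, one per failed check.
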
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
index 0bd19b538..743bbc237 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
@@ -17,10 +17,11 @@
 
 package org.apache.gobblin.service.modules.flowgraph.pathfinder;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.typesafe.config.Config;
-import com.typesafe.config.ConfigException;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigValue;
 import com.typesafe.config.ConfigValueFactory;
@@ -125,8 +126,10 @@ public abstract class AbstractPathFinder implements PathFinder {
     //Get src/dest dataset descriptors from the flow config
     Config srcDatasetDescriptorConfig =
         flowConfig.getConfig(DatasetDescriptorConfigKeys.FLOW_INPUT_DATASET_DESCRIPTOR_PREFIX);
+    srcDatasetDescriptorConfig = srcDatasetDescriptorConfig.withValue(DatasetDescriptorConfigKeys.IS_INPUT_DATASET, ConfigValueFactory.fromAnyRef(true));
     Config destDatasetDescriptorConfig =
         flowConfig.getConfig(DatasetDescriptorConfigKeys.FLOW_OUTPUT_DATASET_DESCRIPTOR_PREFIX);
+    destDatasetDescriptorConfig = destDatasetDescriptorConfig.withValue(DatasetDescriptorConfigKeys.IS_INPUT_DATASET, ConfigValueFactory.fromAnyRef(false));
 
     //Add retention config for source and destination dataset descriptors.
     if (shouldApplyRetentionOnInput) {
@@ -189,7 +192,7 @@ public abstract class AbstractPathFinder implements PathFinder {
    * @return prioritized list of {@link FlowEdge}s to be added to the edge queue for expansion.
    */
   List<FlowEdgeContext> getNextEdges(DataNode dataNode, DatasetDescriptor currentDatasetDescriptor,
-      DatasetDescriptor destDatasetDescriptor) {
+      DatasetDescriptor destDatasetDescriptor, int numberOfHops) {
     List<FlowEdgeContext> prioritizedEdgeList = new LinkedList<>();
     List<String> edgeIds = ConfigUtils.getStringList(this.flowConfig, ConfigurationKeys.WHITELISTED_EDGE_IDS);
     for (FlowEdge flowEdge : this.flowGraph.getEdges(dataNode)) {
@@ -213,19 +216,29 @@ public abstract class AbstractPathFinder implements PathFinder {
             DatasetDescriptor inputDatasetDescriptor = datasetDescriptorPair.getLeft();
             DatasetDescriptor outputDatasetDescriptor = datasetDescriptorPair.getRight();
 
-            try {
-              flowEdge.getFlowTemplate().tryResolving(mergedConfig, datasetDescriptorPair.getLeft(), datasetDescriptorPair.getRight());
-            } catch (JobTemplate.TemplateException | ConfigException | SpecNotFoundException e) {
-              flowSpec.addCompilationError(flowEdge.getSrc(), flowEdge.getDest(), "Error compiling edge " + flowEdge.toString() + ": " + e.toString());
+            HashMap<String, ArrayList<String>> errors = flowEdge.getFlowTemplate().tryResolving(mergedConfig, datasetDescriptorPair.getLeft(), datasetDescriptorPair.getRight());
+            HashMap<String, HashMap<String, ArrayList<String>>> edgeErrors = new HashMap<>();
+            HashMap<String, HashMap<String, ArrayList<String>>> templateErrors = new HashMap<>();
+            ObjectMapper mapper = new ObjectMapper();
+            edgeErrors.put(flowEdge.getId(), errors);
+
+            if (errors.size() != 0) {
+              try {
+                flowSpec.addCompilationError(flowEdge.getSrc(), flowEdge.getDest(), mapper.writeValueAsString(edgeErrors));
+              } catch (JsonProcessingException e) {
+                e.printStackTrace();
+              }
               continue;
             }
 
-            if (inputDatasetDescriptor.contains(currentDatasetDescriptor)) {
+            ArrayList<String> datasetDescriptorErrors = inputDatasetDescriptor.contains(currentDatasetDescriptor);
+            if (datasetDescriptorErrors.size() == 0) {
               DatasetDescriptor edgeOutputDescriptor = makeOutputDescriptorSpecific(currentDatasetDescriptor, outputDatasetDescriptor);
               FlowEdgeContext flowEdgeContext = new FlowEdgeContext(flowEdge, currentDatasetDescriptor, edgeOutputDescriptor, mergedConfig,
                   specExecutor);
 
-              if (destDatasetDescriptor.getFormatConfig().contains(outputDatasetDescriptor.getFormatConfig())) {
+              if (destDatasetDescriptor.getFormatConfig().contains(outputDatasetDescriptor.getFormatConfig()).size() == 0) {
                 /*
                 Add to the front of the edge list if platform-independent properties of the output descriptor is compatible
                 with those of destination dataset descriptor.
@@ -237,6 +250,17 @@ public abstract class AbstractPathFinder implements PathFinder {
               }
               foundExecutor = true;
             }
+            else {
+              HashMap<String, ArrayList<String>> templateError = new HashMap<>();
+              templateError.put("flowTemplateErrors", datasetDescriptorErrors);
+              templateErrors.put(flowEdge.getId(), templateError);
+              try {
+                flowSpec.addCompilationError(flowEdge.getSrc(), flowEdge.getDest(), mapper.writeValueAsString(templateErrors), numberOfHops);
+              } catch (JsonProcessingException e) {
+                e.printStackTrace();
+              }
+            }
           }
           // Found a SpecExecutor. Proceed to the next FlowEdge.
           // TODO: Choose the min-cost executor for the FlowEdge as opposed to the first one that resolves.
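
Because the accumulated errors are serialized with Jackson before being attached to the FlowSpec, each compilation error surfaces as nested JSON keyed first by flow edge id and then by job template name. An illustrative payload (edge id, job name, and message text are hypothetical):

    {
      "node1:node2:edge1": {
        "hdfs-to-adls.job": [
          "gobblin.flow.input.dataset.descriptor.platform is mismatched. User input: adls. Expected value 'hdfs'."
        ]
      }
    }
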
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/BFSPathFinder.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/BFSPathFinder.java
index 5435884da..c39e769c5 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/BFSPathFinder.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/BFSPathFinder.java
@@ -75,6 +75,8 @@ public class BFSPathFinder extends AbstractPathFinder {
   public List<FlowEdgeContext> findPathUnicast(DataNode destNode) {
     //Initialization of auxiliary data structures used for path computation
     this.pathMap = new HashMap<>();
+    int numberOfHops = 1;
+    LinkedList<FlowEdgeContext> childQueue = new LinkedList<>();
 
     //Path computation must be thread-safe to guarantee read consistency. In other words, we prevent concurrent read/write access to the
     // flow graph.
@@ -86,12 +88,12 @@ public class BFSPathFinder extends AbstractPathFinder {
     }
 
     //Base condition 2: Check if we are already at the target. If so, return an empty path.
-    if ((srcNode.equals(destNode)) && destDatasetDescriptor.contains(srcDatasetDescriptor)) {
+    if ((srcNode.equals(destNode)) && destDatasetDescriptor.contains(srcDatasetDescriptor).size() == 0) {
       return new ArrayList<>(0);
     }
 
     LinkedList<FlowEdgeContext> edgeQueue =
-        new LinkedList<>(getNextEdges(srcNode, srcDatasetDescriptor, destDatasetDescriptor));
+        new LinkedList<>(getNextEdges(srcNode, srcDatasetDescriptor, destDatasetDescriptor, numberOfHops));
     for (FlowEdgeContext flowEdgeContext : edgeQueue) {
       this.pathMap.put(flowEdgeContext, flowEdgeContext);
     }
@@ -102,29 +104,35 @@ public class BFSPathFinder extends AbstractPathFinder {
     //    2. check if the output dataset descriptor of edge E is compatible with the input dataset descriptor of the
     //       edge E'. If yes, add the edge E' to the edge queue.
     // If the edge E' satisfies 1 and 2, add it to the edge queue for further consideration.
-    while (!edgeQueue.isEmpty()) {
-      FlowEdgeContext flowEdgeContext = edgeQueue.pop();
+    do {
+      numberOfHops++;
+      while (!edgeQueue.isEmpty()) {
+        FlowEdgeContext flowEdgeContext = edgeQueue.pop();
 
-      DataNode currentNode = this.flowGraph.getNode(flowEdgeContext.getEdge().getDest());
-      DatasetDescriptor currentOutputDatasetDescriptor = flowEdgeContext.getOutputDatasetDescriptor();
+        DataNode currentNode = this.flowGraph.getNode(flowEdgeContext.getEdge().getDest());
+        DatasetDescriptor currentOutputDatasetDescriptor = flowEdgeContext.getOutputDatasetDescriptor();
 
-      //Are we done?
-      if (isPathFound(currentNode, destNode, currentOutputDatasetDescriptor, destDatasetDescriptor)) {
-        return constructPath(flowEdgeContext);
-      }
+        //Are we done?
+        if (isPathFound(currentNode, destNode, currentOutputDatasetDescriptor, destDatasetDescriptor)) {
+          return constructPath(flowEdgeContext);
+        }
 
-      //Expand the currentNode to its adjacent edges and add them to the queue.
-      List<FlowEdgeContext> nextEdges =
-          getNextEdges(currentNode, currentOutputDatasetDescriptor, destDatasetDescriptor);
-      for (FlowEdgeContext childFlowEdgeContext : nextEdges) {
-        //Add a pointer from the child edge to the parent edge, if the child edge is not already in the
-        // queue.
-        if (!this.pathMap.containsKey(childFlowEdgeContext)) {
-          edgeQueue.add(childFlowEdgeContext);
-          this.pathMap.put(childFlowEdgeContext, flowEdgeContext);
+        //Expand the currentNode to its adjacent edges and add them to the queue.
+        List<FlowEdgeContext> nextEdges = getNextEdges(currentNode, currentOutputDatasetDescriptor, destDatasetDescriptor, numberOfHops);
+        for (FlowEdgeContext childFlowEdgeContext : nextEdges) {
+          //Add a pointer from the child edge to the parent edge, if the child edge is not already in the
+          // queue.
+          if (!this.pathMap.containsKey(childFlowEdgeContext)) {
+            childQueue.add(childFlowEdgeContext);
+            this.pathMap.put(childFlowEdgeContext, flowEdgeContext);
+          }
         }
       }
-    }
+      if (!childQueue.isEmpty()) {
+        edgeQueue.addAll(childQueue);
+        childQueue.clear();
+      }
+    } while (!edgeQueue.isEmpty());
     //No path found. Return null.
     return null;
   }
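
The reworked traversal is a standard level-order BFS: the inner loop drains the current frontier while newly discovered edges collect in childQueue, and numberOfHops advances once per level so compilation errors can be prioritized by distance from the source. The same pattern in isolation (a generic sketch, not Gobblin code):

    import java.util.LinkedList;
    import java.util.List;
    import java.util.function.Function;

    // Level-order BFS skeleton tracking the hop count of each frontier.
    static <T> void levelOrderTraversal(List<T> firstLevel, Function<T, List<T>> expand) {
      LinkedList<T> frontier = new LinkedList<>(firstLevel);
      LinkedList<T> children = new LinkedList<>();
      int numberOfHops = 1;
      do {
        numberOfHops++;  // all edges expanded below are numberOfHops away from the source
        while (!frontier.isEmpty()) {
          T current = frontier.pop();
          // A visited check would go here, mirroring pathMap in BFSPathFinder.
          children.addAll(expand.apply(current));
        }
        frontier.addAll(children);
        children.clear();
      } while (!frontier.isEmpty());
    }
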
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/FlowTemplate.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/FlowTemplate.java
index bd78e4880..e1aa9eb5e 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/FlowTemplate.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/FlowTemplate.java
@@ -18,7 +18,9 @@
 package org.apache.gobblin.service.modules.template;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 
 import org.apache.commons.lang3.tuple.Pair;
@@ -64,9 +66,9 @@ public interface FlowTemplate extends Spec {
    * @param userConfig User supplied Config
    * @param inputDescriptor input {@link DatasetDescriptor}
    * @param outputDescriptor output {@link DatasetDescriptor}
+   * @return a map from job template name to the list of errors encountered while attempting resolution; empty if fully resolvable
    */
-  void tryResolving(Config userConfig, DatasetDescriptor inputDescriptor, DatasetDescriptor outputDescriptor)
-      throws SpecNotFoundException, JobTemplate.TemplateException;
+  HashMap<String, ArrayList<String>> tryResolving(Config userConfig, DatasetDescriptor inputDescriptor, DatasetDescriptor outputDescriptor);
 
   /**
    * Resolves the {@link JobTemplate}s underlying this {@link FlowTemplate} and returns a {@link List} of resolved
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java
index d8f3bf9b4..0d0d91e28 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 
 import org.apache.commons.lang3.tuple.ImmutablePair;
@@ -56,6 +57,8 @@ import org.apache.gobblin.service.modules.template_catalog.FlowCatalogWithTempla
 @Slf4j
 public class StaticFlowTemplate implements FlowTemplate {
   private static final long serialVersionUID = 84641624233978L;
+  private static final String VARIABLE_SUBSTITUTION_PATTERN = ":[\\s]*Reader:[a-zA-Z\\d\\s]*:[\\s]";
+  private static final String JOB_TEMPLATE_PATTERN = "/jobs/";
 
   @Getter
   private URI uri;
@@ -129,11 +132,9 @@ public class StaticFlowTemplate implements FlowTemplate {
         DatasetDescriptor outputDescriptor = DatasetDescriptorUtils.constructDatasetDescriptor(outputDescriptorConfig);
 
         if (resolvable) {
-          try {
-            tryResolving(userConfig, inputDescriptor, outputDescriptor);
+          HashMap<String, ArrayList<String>> errors = tryResolving(userConfig, inputDescriptor, outputDescriptor);
+          if (errors.size() == 0) {
             result.add(ImmutablePair.of(inputDescriptor, outputDescriptor));
-          } catch (JobTemplate.TemplateException | ConfigException | SpecNotFoundException e) {
-            // Dataset descriptor cannot be resolved so don't add it to result
           }
         } else {
           result.add(ImmutablePair.of(inputDescriptor, outputDescriptor));
@@ -165,19 +166,35 @@ public class StaticFlowTemplate implements FlowTemplate {
    * is resolvable only if each of the {@link JobTemplate}s in the flow is resolvable. Throws an exception if the flow is
    * not resolvable.
    * @param userConfig User supplied Config
+   * @return a map from job template name to the list of errors encountered while resolving that template
    */
   @Override
-  public void tryResolving(Config userConfig, DatasetDescriptor inputDescriptor, DatasetDescriptor outputDescriptor)
-      throws SpecNotFoundException, JobTemplate.TemplateException {
+  public HashMap<String, ArrayList<String>> tryResolving(Config userConfig, DatasetDescriptor inputDescriptor, DatasetDescriptor outputDescriptor) {
     Config inputDescriptorConfig = inputDescriptor.getRawConfig().atPath(DatasetDescriptorConfigKeys.FLOW_EDGE_INPUT_DATASET_DESCRIPTOR_PREFIX);
     Config outputDescriptorConfig = outputDescriptor.getRawConfig().atPath(DatasetDescriptorConfigKeys.FLOW_EDGE_OUTPUT_DATASET_DESCRIPTOR_PREFIX);
+
     userConfig = userConfig.withFallback(inputDescriptorConfig).withFallback(outputDescriptorConfig);
 
     JobSpec.Builder jobSpecBuilder = JobSpec.builder().withConfig(userConfig);
 
+    HashMap<String, ArrayList<String>> resolutionErrors = new HashMap<>();
+
     for (JobTemplate template: this.jobTemplates) {
-      this.jobSpecResolver.resolveJobSpec(jobSpecBuilder.withTemplate(template).build());
+      ArrayList<String> errors = new ArrayList<>();
+      try {
+        this.jobSpecResolver.resolveJobSpec(jobSpecBuilder.withTemplate(template).build());
+      } catch (ConfigException e) {
+        // Guard against messages that do not match the expected pattern, to avoid an ArrayIndexOutOfBoundsException.
+        String[] segments = e.toString().split(VARIABLE_SUBSTITUTION_PATTERN);
+        errors.add(segments.length > 1 ? segments[1] : e.toString());
+      } catch (Exception e) {
+        log.error("Encountered exception during resolving job templates", e);
+      }
+      // Only insert into dictionary if errors exist
+      if (errors.size() != 0) {
+        String templateUri = template.getUri().toString();
+        String jobName = templateUri.contains(JOB_TEMPLATE_PATTERN) ? templateUri.split(JOB_TEMPLATE_PATTERN)[1] : templateUri;
+        resolutionErrors.put(jobName, errors);
+      }
     }
+    return resolutionErrors;
   }
 
   @Override
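
Callers of tryResolving now branch on whether the returned map is empty instead of catching template exceptions. A sketch of the new calling convention (the template, descriptors, config, and logger are assumed to be in scope):

    HashMap<String, ArrayList<String>> errors =
        flowTemplate.tryResolving(userConfig, inputDescriptor, outputDescriptor);
    if (errors.isEmpty()) {
      // Every JobTemplate resolved; this (input, output) descriptor pair is usable.
    } else {
      errors.forEach((jobName, jobErrors) ->
          log.warn("Job template {} failed to resolve: {}", jobName, jobErrors));
    }
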
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptorTest.java
index 38f23d58d..d06110d1e 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptorTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptorTest.java
@@ -42,7 +42,7 @@ public class FSDatasetDescriptorTest {
         .withValue(DatasetDescriptorConfigKeys.CODEC_KEY, ConfigValueFactory.fromAnyRef("gzip"));
 
     FSDatasetDescriptor descriptor2 = new FSDatasetDescriptor(config2);
-    Assert.assertTrue(descriptor1.contains(descriptor2));
+    Assert.assertEquals(descriptor1.contains(descriptor2).size(), 0);
 
     //Add encryption config
     Config encConfig = ConfigFactory.empty().withValue(DatasetDescriptorConfigKeys.ENCRYPTION_LEVEL_KEY, ConfigValueFactory.fromAnyRef("file"))
@@ -50,8 +50,8 @@ public class FSDatasetDescriptorTest {
         .atPath(DatasetDescriptorConfigKeys.ENCYPTION_PREFIX);
     Config config3 = config2.withFallback(encConfig);
     FSDatasetDescriptor descriptor3 = new FSDatasetDescriptor(config3);
-    Assert.assertTrue(descriptor2.contains(descriptor3));
-    Assert.assertTrue(descriptor1.contains(descriptor3));
+    Assert.assertEquals(descriptor2.contains(descriptor3).size(), 0);
+    Assert.assertEquals(descriptor1.contains(descriptor3).size(), 0);
 
     //Add partition config
     Config partitionConfig = ConfigFactory.empty().withValue(DatasetDescriptorConfigKeys.PARTITION_TYPE_KEY, ConfigValueFactory.fromAnyRef("datetime"))
@@ -59,27 +59,27 @@ public class FSDatasetDescriptorTest {
         .atPath(DatasetDescriptorConfigKeys.PARTITION_PREFIX);
     Config config4 = config3.withFallback(partitionConfig);
     FSDatasetDescriptor descriptor4 = new FSDatasetDescriptor(config4);
-    Assert.assertTrue(descriptor3.contains(descriptor4));
-    Assert.assertTrue(descriptor2.contains(descriptor4));
-    Assert.assertTrue(descriptor1.contains(descriptor4));
+    Assert.assertEquals(descriptor3.contains(descriptor4).size(), 0);
+    Assert.assertEquals(descriptor2.contains(descriptor4).size(), 0);
+    Assert.assertEquals(descriptor1.contains(descriptor4).size(), 0);
 
     //Add compaction/retention config
     Config miscConfig = ConfigFactory.empty().withValue(DatasetDescriptorConfigKeys.IS_COMPACTED_AND_DEDUPED_KEY, ConfigValueFactory.fromAnyRef("true"))
         .withValue(DatasetDescriptorConfigKeys.IS_RETENTION_APPLIED_KEY, ConfigValueFactory.fromAnyRef("true"));
     Config config5 = config4.withFallback(miscConfig);
     FSDatasetDescriptor descriptor5 = new FSDatasetDescriptor(config5);
-    Assert.assertFalse(descriptor4.contains(descriptor5));
-    Assert.assertFalse(descriptor3.contains(descriptor5));
-    Assert.assertFalse(descriptor2.contains(descriptor5));
-    Assert.assertFalse(descriptor1.contains(descriptor5));
+    Assert.assertNotEquals(descriptor4.contains(descriptor5).size(), 0);
+    Assert.assertNotEquals(descriptor3.contains(descriptor5).size(), 0);
+    Assert.assertNotEquals(descriptor2.contains(descriptor5).size(), 0);
+    Assert.assertNotEquals(descriptor1.contains(descriptor5).size(), 0);
 
     // Test subpaths
     Config subPathConfig = ConfigFactory.empty().withValue(DatasetDescriptorConfigKeys.PATH_KEY, ConfigValueFactory.fromAnyRef("/a/b/c"))
         .withValue(DatasetDescriptorConfigKeys.SUBPATHS_KEY, ConfigValueFactory.fromAnyRef("{e,f,g}"))
         .withValue(DatasetDescriptorConfigKeys.PLATFORM_KEY, ConfigValueFactory.fromAnyRef("hdfs"));
     FSDatasetDescriptor descriptor6 = new FSDatasetDescriptor(subPathConfig);
-    Assert.assertTrue(descriptor1.contains(descriptor6));
-    Assert.assertFalse(descriptor2.contains(descriptor6));
+    Assert.assertEquals(descriptor1.contains(descriptor6).size(), 0);
+    Assert.assertNotEquals(descriptor2.contains(descriptor6).size(), 0);
 
     //Test fs.uri
     Config config7 = ConfigFactory.empty().withValue(DatasetDescriptorConfigKeys.PATH_KEY, ConfigValueFactory.fromAnyRef("/a/b/c/d"))
@@ -95,9 +95,11 @@ public class FSDatasetDescriptorTest {
     FSVolumeDatasetDescriptor descriptor7 = new FSVolumeDatasetDescriptor(config7);
     FSVolumeDatasetDescriptor volumeDescriptor = new FSVolumeDatasetDescriptor(config1);
     FSVolumeDatasetDescriptor descriptor8 = new FSVolumeDatasetDescriptor(config8);
-    Assert.assertTrue(volumeDescriptor.contains(descriptor7));
-    Assert.assertFalse(descriptor7.contains(volumeDescriptor));
-    Assert.assertFalse(descriptor8.contains(descriptor7));
+    Assert.assertEquals(volumeDescriptor.contains(descriptor7).size(), 0);
+    Assert.assertNotEquals(descriptor7.contains(volumeDescriptor).size(), 0);
+    Assert.assertNotEquals(descriptor8.contains(descriptor7).size(), 0);
   }
 
   @Test
@@ -111,7 +113,7 @@ public class FSDatasetDescriptorTest {
         .withValue(DatasetDescriptorConfigKeys.PLATFORM_KEY, ConfigValueFactory.fromAnyRef("hdfs"));
 
     FSDatasetDescriptor descriptor2 = new FSDatasetDescriptor(config2);
-    Assert.assertTrue(descriptor1.contains(descriptor2));
+    Assert.assertEquals(descriptor1.contains(descriptor2).size(), 0);
   }
 
   @Test
@@ -124,8 +126,8 @@ public class FSDatasetDescriptorTest {
         .withValue(DatasetDescriptorConfigKeys.PLATFORM_KEY, ConfigValueFactory.fromAnyRef("hdfs"));
     FSDatasetDescriptor descriptor2 = new FSDatasetDescriptor(config2);
 
-    Assert.assertTrue(descriptor1.equals(descriptor2));
-    Assert.assertTrue(descriptor2.equals(descriptor1));
+    Assert.assertEquals(descriptor2, descriptor1);
+    Assert.assertEquals(descriptor1, descriptor2);
     Assert.assertEquals(descriptor1.hashCode(), descriptor2.hashCode());
 
     Config config3 = ConfigFactory.empty().withValue(DatasetDescriptorConfigKeys.PATH_KEY, ConfigValueFactory.fromAnyRef("/a/b/c/*"))
@@ -133,7 +135,7 @@ public class FSDatasetDescriptorTest {
         .withValue(DatasetDescriptorConfigKeys.FORMAT_KEY, ConfigValueFactory.fromAnyRef("any"))
         .withValue(DatasetDescriptorConfigKeys.IS_RETENTION_APPLIED_KEY, ConfigValueFactory.fromAnyRef("false"));
     FSDatasetDescriptor descriptor3 = new FSDatasetDescriptor(config3);
-    Assert.assertTrue(descriptor1.equals(descriptor3));
+    Assert.assertEquals(descriptor3, descriptor1);
     Assert.assertEquals(descriptor1.hashCode(), descriptor3.hashCode());
 
     //Ensure switching booleans between 2 boolean member variables does not produce the same hashcode.
@@ -151,7 +153,7 @@ public class FSDatasetDescriptorTest {
         .withValue(DatasetDescriptorConfigKeys.IS_COMPACTED_KEY, ConfigValueFactory.fromAnyRef("false"));
     FSDatasetDescriptor descriptor5 = new FSDatasetDescriptor(config5);
 
-    Assert.assertFalse(descriptor4.equals(descriptor5));
+    Assert.assertNotEquals(descriptor5, descriptor4);
     Assert.assertNotEquals(descriptor4.hashCode(), descriptor5.hashCode());
   }
 
@@ -196,8 +198,5 @@ public class FSDatasetDescriptorTest {
         .withValue(DatasetDescriptorConfigKeys.ENCRYPTED_FIELDS, ConfigValueFactory.fromAnyRef("field1")).atPath(DatasetDescriptorConfigKeys.ENCYPTION_PREFIX);
     Config config6 = config.withFallback(encryptionConfig);
     Assert.assertThrows(IOException.class, () -> new FSDatasetDescriptor(config6));
-
-
-
   }
 }
\ No newline at end of file
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/dataset/HttpDatasetDescriptorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/dataset/HttpDatasetDescriptorTest.java
index a90b2a693..48c527d9c 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/dataset/HttpDatasetDescriptorTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/dataset/HttpDatasetDescriptorTest.java
@@ -42,14 +42,14 @@ public class HttpDatasetDescriptorTest {
         .withValue(DatasetDescriptorConfigKeys.PLATFORM_KEY, ConfigValueFactory.fromAnyRef("https"))
         .withValue(DatasetDescriptorConfigKeys.PATH_KEY, ConfigValueFactory.fromAnyRef("https://a.com/b"));
     HttpDatasetDescriptor descriptor2 = new HttpDatasetDescriptor(config2);
-    Assert.assertTrue(descriptor2.contains(descriptor1));
+    Assert.assertEquals(descriptor2.contains(descriptor1).size(), 0);
 
     // Verify that same path but different platform points to different dataset
     Config config3 = ConfigFactory.empty()
         .withValue(DatasetDescriptorConfigKeys.PLATFORM_KEY, ConfigValueFactory.fromAnyRef("http"))
         .withValue(DatasetDescriptorConfigKeys.PATH_KEY, ConfigValueFactory.fromAnyRef("https://a.com/b"));
     HttpDatasetDescriptor descriptor3 = new HttpDatasetDescriptor(config3);
-    Assert.assertFalse(descriptor3.contains(descriptor1));
+    Assert.assertNotEquals(descriptor3.contains(descriptor1).size(), 0);
 
   }
 }
\ No newline at end of file
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/dataset/IcebergDatasetDescriptorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/dataset/IcebergDatasetDescriptorTest.java
index 98986d224..71ff4c5a4 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/dataset/IcebergDatasetDescriptorTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/dataset/IcebergDatasetDescriptorTest.java
@@ -41,8 +41,8 @@ public class IcebergDatasetDescriptorTest {
     IcebergDatasetDescriptor other = new IcebergDatasetDescriptor(config1);
     IcebergDatasetDescriptor yetAnother = new IcebergDatasetDescriptor(config2);
 
-    Assert.assertTrue(current.isPathContaining(other));
-    Assert.assertFalse(current.isPathContaining(yetAnother));
+    Assert.assertEquals(current.isPathContaining(other).size(), 0);
+    Assert.assertNotEquals(current.isPathContaining(yetAnother).size(), 0);
 
   }
   private Config createDatasetDescriptorConfig(String platform, String dbName, String tableName) {
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/dataset/SqlDatasetDescriptorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/dataset/SqlDatasetDescriptorTest.java
index 75a142a73..6a317438b 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/dataset/SqlDatasetDescriptorTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/dataset/SqlDatasetDescriptorTest.java
@@ -43,19 +43,19 @@ public class SqlDatasetDescriptorTest {
 
     Config config2 = ConfigFactory.empty().withValue(DatasetDescriptorConfigKeys.PLATFORM_KEY, ConfigValueFactory.fromAnyRef("sqlserver"));
     SqlDatasetDescriptor descriptor2 = new SqlDatasetDescriptor(config2);
-    Assert.assertTrue(descriptor2.contains(descriptor1));
+    Assert.assertEquals(descriptor2.contains(descriptor1).size(), 0);
 
     Config config3 = ConfigFactory.empty().withValue(DatasetDescriptorConfigKeys.PLATFORM_KEY, ConfigValueFactory.fromAnyRef("sqlserver"))
         .withValue(DatasetDescriptorConfigKeys.DATABASE_KEY, ConfigValueFactory.fromAnyRef("testDb_.*"))
         .withValue(DatasetDescriptorConfigKeys.TABLE_KEY, ConfigValueFactory.fromAnyRef("testTable_.*"));
 
     SqlDatasetDescriptor descriptor3 = new SqlDatasetDescriptor(config3);
-    Assert.assertTrue(descriptor3.contains(descriptor1));
+    Assert.assertEquals(descriptor3.contains(descriptor1).size(), 0);
 
     Config config4 = ConfigFactory.empty().withValue(DatasetDescriptorConfigKeys.PLATFORM_KEY, ConfigValueFactory.fromAnyRef("sqlserver"))
         .withValue(DatasetDescriptorConfigKeys.DATABASE_KEY, ConfigValueFactory.fromAnyRef("Db_.*"))
         .withValue(DatasetDescriptorConfigKeys.TABLE_KEY, ConfigValueFactory.fromAnyRef("Table_.*"));
     SqlDatasetDescriptor descriptor4 = new SqlDatasetDescriptor(config4);
-    Assert.assertFalse(descriptor4.contains(descriptor1));
+    Assert.assertNotEquals(descriptor4.contains(descriptor1).size(), 0);
   }
 }
\ No newline at end of file
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
index 4c3d4f137..ab82ec937 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
@@ -34,7 +34,6 @@ import java.util.Set;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.SystemUtils;
@@ -641,8 +640,7 @@ public class MultiHopFlowCompilerTest {
     Dag<JobExecutionPlan> dag = specCompiler.compileFlow(spec);
 
     Assert.assertNull(dag);
-    Assert.assertEquals(spec.getCompilationErrors().stream().map(c -> c.errorMessage).collect(Collectors.toSet()).size(), 1);
-    spec.getCompilationErrors().stream().anyMatch(s -> s.errorMessage.contains(AzkabanProjectConfig.USER_TO_PROXY));
+    Assert.assertTrue(spec.getCompilationErrors().stream().anyMatch(s -> s.errorMessage.contains(AzkabanProjectConfig.USER_TO_PROXY)));
   }
 
   @Test (dependsOnMethods = "testUnresolvedFlow")