You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/04/23 23:17:26 UTC

incubator-gobblin git commit: [GOBBLIN-471] Skip null work units in DatasetFinderSource and LoopingDatasetFinderSource.

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master b39bf8cab -> d6f0112a9


[GOBBLIN-471] Skip null work units in DatasetFinderSource and LoopingDatasetFinderSource.

Closes #2344 from ibuenros/datasetfindersource-skipnull


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/d6f0112a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/d6f0112a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/d6f0112a

Branch: refs/heads/master
Commit: d6f0112a99dd9fb6e973f1999dd196c39a9d3ee4
Parents: b39bf8c
Author: ibuenros <is...@gmail.com>
Authored: Mon Apr 23 16:17:15 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Mon Apr 23 16:17:15 2018 -0700

----------------------------------------------------------------------
 .../gobblin/data/management/source/DatasetFinderSource.java    | 5 +++--
 .../data/management/source/LoopingDatasetFinderSource.java     | 6 ++++++
 2 files changed, 9 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d6f0112a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/source/DatasetFinderSource.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/source/DatasetFinderSource.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/source/DatasetFinderSource.java
index 38fc7e2..77f1d15 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/source/DatasetFinderSource.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/source/DatasetFinderSource.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.data.management.source;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Objects;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -110,9 +111,9 @@ public abstract class DatasetFinderSource<S, D> implements WorkUnitStreamSource<
         } else {
           return Stream.of(new DatasetWrapper(dataset));
         }
-      }).map(this::workUnitForPartitionInternal);
+      }).map(this::workUnitForPartitionInternal).filter(Objects::nonNull);
     } else {
-      return datasetStream.map(this::workUnitForDataset);
+      return datasetStream.map(this::workUnitForDataset).filter(Objects::nonNull);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d6f0112a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/source/LoopingDatasetFinderSource.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/source/LoopingDatasetFinderSource.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/source/LoopingDatasetFinderSource.java
index 355f463..50b24de 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/source/LoopingDatasetFinderSource.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/source/LoopingDatasetFinderSource.java
@@ -205,6 +205,9 @@ public abstract class LoopingDatasetFinderSource<S, D> extends DatasetFinderSour
         if (this.currentPartitionIterator != null && this.currentPartitionIterator.hasNext()) {
           PartitionableDataset.DatasetPartition partition = this.currentPartitionIterator.next();
           WorkUnit workUnit = workUnitForDatasetPartition(partition);
+          if (workUnit == null) {
+            continue;
+          }
           addDatasetInfoToWorkUnit(workUnit, partition.getDataset());
           addPartitionInfoToWorkUnit(workUnit, partition);
           this.previousDataset = partition.getDataset();
@@ -218,6 +221,9 @@ public abstract class LoopingDatasetFinderSource<S, D> extends DatasetFinderSour
           this.currentPartitionIterator = getPartitionIterator((PartitionableDataset) dataset);
         } else {
           WorkUnit workUnit = workUnitForDataset(dataset);
+          if (workUnit == null) {
+            continue;
+          }
           addDatasetInfoToWorkUnit(workUnit, dataset);
           this.previousDataset = dataset;
           this.generatedWorkUnits++;