You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/04/23 23:17:26 UTC
incubator-gobblin git commit: [GOBBLIN-471] Skip null work units in
DatasetFinderSource and LoopingDatasetFinderSource.
Repository: incubator-gobblin
Updated Branches:
refs/heads/master b39bf8cab -> d6f0112a9
[GOBBLIN-471] Skip null work units in DatasetFinderSource and LoopingDatasetFinderSource.
Closes #2344 from ibuenros/datasetfindersource-skipnull
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/d6f0112a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/d6f0112a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/d6f0112a
Branch: refs/heads/master
Commit: d6f0112a99dd9fb6e973f1999dd196c39a9d3ee4
Parents: b39bf8c
Author: ibuenros <is...@gmail.com>
Authored: Mon Apr 23 16:17:15 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Mon Apr 23 16:17:15 2018 -0700
----------------------------------------------------------------------
.../gobblin/data/management/source/DatasetFinderSource.java | 5 +++--
.../data/management/source/LoopingDatasetFinderSource.java | 6 ++++++
2 files changed, 9 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d6f0112a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/source/DatasetFinderSource.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/source/DatasetFinderSource.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/source/DatasetFinderSource.java
index 38fc7e2..77f1d15 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/source/DatasetFinderSource.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/source/DatasetFinderSource.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.data.management.source;
import java.io.IOException;
import java.util.List;
+import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -110,9 +111,9 @@ public abstract class DatasetFinderSource<S, D> implements WorkUnitStreamSource<
} else {
return Stream.of(new DatasetWrapper(dataset));
}
- }).map(this::workUnitForPartitionInternal);
+ }).map(this::workUnitForPartitionInternal).filter(Objects::nonNull);
} else {
- return datasetStream.map(this::workUnitForDataset);
+ return datasetStream.map(this::workUnitForDataset).filter(Objects::nonNull);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d6f0112a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/source/LoopingDatasetFinderSource.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/source/LoopingDatasetFinderSource.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/source/LoopingDatasetFinderSource.java
index 355f463..50b24de 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/source/LoopingDatasetFinderSource.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/source/LoopingDatasetFinderSource.java
@@ -205,6 +205,9 @@ public abstract class LoopingDatasetFinderSource<S, D> extends DatasetFinderSour
if (this.currentPartitionIterator != null && this.currentPartitionIterator.hasNext()) {
PartitionableDataset.DatasetPartition partition = this.currentPartitionIterator.next();
WorkUnit workUnit = workUnitForDatasetPartition(partition);
+ if (workUnit == null) {
+ continue;
+ }
addDatasetInfoToWorkUnit(workUnit, partition.getDataset());
addPartitionInfoToWorkUnit(workUnit, partition);
this.previousDataset = partition.getDataset();
@@ -218,6 +221,9 @@ public abstract class LoopingDatasetFinderSource<S, D> extends DatasetFinderSour
this.currentPartitionIterator = getPartitionIterator((PartitionableDataset) dataset);
} else {
WorkUnit workUnit = workUnitForDataset(dataset);
+ if (workUnit == null) {
+ continue;
+ }
addDatasetInfoToWorkUnit(workUnit, dataset);
this.previousDataset = dataset;
this.generatedWorkUnits++;