You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by sc...@apache.org on 2019/01/24 23:50:48 UTC
[beam] branch master updated: UnboundedReadFromBoundedSource should invoke split for small sources
This is an automated email from the ASF dual-hosted git repository.
scott pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new f22f443 UnboundedReadFromBoundedSource should invoke split for small sources
new 0aae7e5 Merge pull request #7555: [BEAM-4620] UnboundedReadFromBoundedSource invokes split for small bounded sources
f22f443 is described below
commit f22f443c51ed40a923aa81a73a48492b578c8583
Author: Chamikara Jayalath <ch...@apache.org>
AuthorDate: Thu Jan 17 18:19:40 2019 -0800
UnboundedReadFromBoundedSource should invoke split for small sources
---
.../construction/UnboundedReadFromBoundedSource.java | 20 +++++++++++++++-----
.../UnboundedReadFromBoundedSourceTest.java | 11 +++++++++++
2 files changed, 26 insertions(+), 5 deletions(-)
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
index e42487c..47bceeb 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
@@ -74,6 +74,9 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PColle
private static final Logger LOG = LoggerFactory.getLogger(UnboundedReadFromBoundedSource.class);
+ // Fallback size (64 MiB) used when a source cannot report a valid (positive) estimated size.
+ private static final long DEFAULT_ESTIMATED_SIZE = 64 * 1024 * 1024;
+
private final BoundedSource<T> source;
/**
@@ -124,13 +127,20 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PColle
public List<BoundedToUnboundedSourceAdapter<T>> split(
int desiredNumSplits, PipelineOptions options) throws Exception {
try {
- long desiredBundleSize = boundedSource.getEstimatedSizeBytes(options) / desiredNumSplits;
- if (desiredBundleSize <= 0) {
+ long estimatedSize = boundedSource.getEstimatedSizeBytes(options);
+ if (estimatedSize <= 0) {
+ // Source is unable to provide a valid estimated size. So using default size.
LOG.warn(
- "BoundedSource {} cannot estimate its size, skips the initial splits.",
- boundedSource);
- return ImmutableList.of(this);
+ "Cannot determine a valid estimated size for BoundedSource {}. Using default "
+ + "size of {} bytes",
+ boundedSource,
+ DEFAULT_ESTIMATED_SIZE);
+ estimatedSize = DEFAULT_ESTIMATED_SIZE;
}
+
+ // Each split should at least be of size 1 byte.
+ long desiredBundleSize = Math.max(estimatedSize / desiredNumSplits, 1);
+
List<? extends BoundedSource<T>> splits = boundedSource.split(desiredBundleSize, options);
return splits.stream()
.map(input -> new BoundedToUnboundedSourceAdapter<>(input))
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSourceTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSourceTest.java
index 42e036b..3f8671d 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSourceTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSourceTest.java
@@ -18,6 +18,7 @@
package org.apache.beam.runners.core.construction;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@@ -245,6 +246,16 @@ public class UnboundedReadFromBoundedSourceTest {
}
@Test
+ public void testInvokesSplitWithDefaultNumSplitsTooLarge() throws Exception {
+ UnboundedSource<Long, ?> unboundedCountingSource =
+ new BoundedToUnboundedSourceAdapter<Long>(CountingSource.upTo(1));
+ PipelineOptions options = PipelineOptionsFactory.create();
+ List<?> splits = unboundedCountingSource.split(100, options); // desiredNumSplits deliberately large
+ assertEquals(1, splits.size()); // expects exactly one split for CountingSource.upTo(1)
+ assertNotEquals(splits.get(0), unboundedCountingSource); // must not be the adapter itself (old code returned `this`)
+ }
+
+ @Test
public void testReadFromCheckpointBeforeStart() throws Exception {
thrown.expect(NoSuchElementException.class);