You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@beam.apache.org by sc...@apache.org on 2019/01/24 23:50:48 UTC

[beam] branch master updated: UnboundedReadFromBoundedSource should invoke split for small sources

This is an automated email from the ASF dual-hosted git repository.

scott pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new f22f443  UnboundedReadFromBoundedSource should invoke split for small sources
     new 0aae7e5  Merge pull request #7555: [BEAM-4620] UnboundedReadFromBoundedSource invokes split for small bounded sources
f22f443 is described below

commit f22f443c51ed40a923aa81a73a48492b578c8583
Author: Chamikara Jayalath <ch...@apache.org>
AuthorDate: Thu Jan 17 18:19:40 2019 -0800

    UnboundedReadFromBoundedSource should invoke split for small sources
---
 .../construction/UnboundedReadFromBoundedSource.java | 20 +++++++++++++++-----
 .../UnboundedReadFromBoundedSourceTest.java          | 11 +++++++++++
 2 files changed, 26 insertions(+), 5 deletions(-)

diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
index e42487c..47bceeb 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
@@ -74,6 +74,9 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PColle
 
   private static final Logger LOG = LoggerFactory.getLogger(UnboundedReadFromBoundedSource.class);
 
+  // Using 64MB in cases where we cannot compute a valid estimated size for a source.
+  private static final long DEFAULT_ESTIMATED_SIZE = 64 * 1024 * 1024;
+
   private final BoundedSource<T> source;
 
   /**
@@ -124,13 +127,20 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PColle
     public List<BoundedToUnboundedSourceAdapter<T>> split(
         int desiredNumSplits, PipelineOptions options) throws Exception {
       try {
-        long desiredBundleSize = boundedSource.getEstimatedSizeBytes(options) / desiredNumSplits;
-        if (desiredBundleSize <= 0) {
+        long estimatedSize = boundedSource.getEstimatedSizeBytes(options);
+        if (estimatedSize <= 0) {
+          // Source is unable to provide a valid estimated size. So using default size.
           LOG.warn(
-              "BoundedSource {} cannot estimate its size, skips the initial splits.",
-              boundedSource);
-          return ImmutableList.of(this);
+              "Cannot determine a valid estimated size for BoundedSource {}. Using default "
+                  + "size of {} bytes",
+              boundedSource,
+              DEFAULT_ESTIMATED_SIZE);
+          estimatedSize = DEFAULT_ESTIMATED_SIZE;
         }
+
+        // Each split should at least be of size 1 byte.
+        long desiredBundleSize = Math.max(estimatedSize / desiredNumSplits, 1);
+
         List<? extends BoundedSource<T>> splits = boundedSource.split(desiredBundleSize, options);
         return splits.stream()
             .map(input -> new BoundedToUnboundedSourceAdapter<>(input))
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSourceTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSourceTest.java
index 42e036b..3f8671d 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSourceTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSourceTest.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.core.construction;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
@@ -245,6 +246,16 @@ public class UnboundedReadFromBoundedSourceTest {
   }
 
   @Test
+  public void testInvokesSplitWithDefaultNumSplitsTooLarge() throws Exception {
+    UnboundedSource<Long, ?> unboundedCountingSource =
+        new BoundedToUnboundedSourceAdapter<Long>(CountingSource.upTo(1));
+    PipelineOptions options = PipelineOptionsFactory.create();
+    List<?> splits = unboundedCountingSource.split(100, options);
+    assertEquals(1, splits.size());
+    assertNotEquals(splits.get(0), unboundedCountingSource);
+  }
+
+  @Test
   public void testReadFromCheckpointBeforeStart() throws Exception {
     thrown.expect(NoSuchElementException.class);