You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/03/14 17:46:26 UTC

[1/2] beam git commit: This closes #2242

Repository: beam
Updated Branches:
  refs/heads/master 30033ccba -> 843b663cf


This closes #2242


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/843b663c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/843b663c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/843b663c

Branch: refs/heads/master
Commit: 843b663cfde305bc622f8fe5a587da855417d253
Parents: 30033cc 869002c
Author: Thomas Groh <tg...@google.com>
Authored: Tue Mar 14 10:46:16 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Mar 14 10:46:16 2017 -0700

----------------------------------------------------------------------
 .../direct/UnboundedReadEvaluatorFactory.java   | 30 ++++++----
 .../UnboundedReadEvaluatorFactoryTest.java      | 61 ++++++++++++++++++--
 2 files changed, 77 insertions(+), 14 deletions(-)
----------------------------------------------------------------------



[2/2] beam git commit: Prevent Double-Close in UnboundedReadEvaluatorFactory

Posted by tg...@apache.org.
Prevent Double-Close in UnboundedReadEvaluatorFactory

Move the actual "close-and-resume" to the overall try block, to ensure
that the reader cannot be double-closed if the first call to close()
throws an IOException.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/869002c3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/869002c3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/869002c3

Branch: refs/heads/master
Commit: 869002c397b3a360ab9a9afe0a342a6ac2fe7f9e
Parents: 30033cc
Author: Thomas Groh <tg...@google.com>
Authored: Tue Mar 14 09:18:57 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Mar 14 10:46:16 2017 -0700

----------------------------------------------------------------------
 .../direct/UnboundedReadEvaluatorFactory.java   | 30 ++++++----
 .../UnboundedReadEvaluatorFactoryTest.java      | 61 ++++++++++++++++++--
 2 files changed, 77 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/869002c3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
index 69e6920..7c3d50a 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
@@ -139,7 +139,24 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
             numElements++;
           } while (numElements < ARBITRARY_MAX_ELEMENTS && reader.advance());
           Instant watermark = reader.getWatermark();
-          UnboundedSourceShard<OutputT, CheckpointMarkT> residual = finishRead(reader, shard);
+
+          CheckpointMarkT finishedCheckpoint = finishRead(reader, shard);
+          UnboundedSourceShard<OutputT, CheckpointMarkT> residual;
+          // Sometimes resume from a checkpoint even if it's not required
+          if (ThreadLocalRandom.current().nextDouble(1.0) >= readerReuseChance) {
+            UnboundedReader<OutputT> toClose = reader;
+            // Prevent double-close. UnboundedReader is AutoCloseable, which does not require
+            // idempotency of close. Nulling out the reader here prevents trying to re-close it
+            // if the call to close throws an IOException.
+            reader = null;
+            toClose.close();
+            residual =
+                UnboundedSourceShard.of(
+                    shard.getSource(), shard.getDeduplicator(), null, finishedCheckpoint);
+          } else {
+            residual = shard.withCheckpoint(finishedCheckpoint);
+          }
+
           resultBuilder
               .addOutput(output)
               .addUnprocessedElements(
@@ -192,7 +209,7 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
      * Checkpoint the current reader, finalize the previous checkpoint, and return the residual
      * {@link UnboundedSourceShard}.
      */
-    private UnboundedSourceShard<OutputT, CheckpointMarkT> finishRead(
+    private CheckpointMarkT finishRead(
         UnboundedReader<OutputT> reader, UnboundedSourceShard<OutputT, CheckpointMarkT> shard)
         throws IOException {
       final CheckpointMark oldMark = shard.getCheckpoint();
@@ -223,14 +240,7 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
               }
             });
       }
-
-      // Sometimes resume from a checkpoint even if it's not required
-      if (ThreadLocalRandom.current().nextDouble(1.0) >= readerReuseChance) {
-        reader.close();
-        return UnboundedSourceShard.of(shard.getSource(), shard.getDeduplicator(), null, mark);
-      } else {
-        return shard.withCheckpoint(mark);
-      }
+      return mark;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/869002c3/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
index 7e2d85d..cdb362f 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
@@ -76,6 +76,7 @@ import org.joda.time.ReadableInstant;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 import org.mockito.invocation.InvocationOnMock;
@@ -96,8 +97,8 @@ public class UnboundedReadEvaluatorFactoryTest {
   private UnboundedSource<Long, ?> source;
   private DirectGraph graph;
 
-  @Rule
-  public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+  @Rule public ExpectedException thrown = ExpectedException.none();
+  @Rule public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
 
   @Before
   public void setup() {
@@ -379,6 +380,38 @@ public class UnboundedReadEvaluatorFactoryTest {
         is(true));
   }
 
+  @Test
+  public void evaluatorThrowsInCloseRethrows() throws Exception {
+    ContiguousSet<Long> elems = ContiguousSet.create(Range.closed(0L, 20L), DiscreteDomain.longs());
+    TestUnboundedSource<Long> source =
+        new TestUnboundedSource<>(BigEndianLongCoder.of(), elems.toArray(new Long[0]))
+            .throwsOnClose();
+
+    PCollection<Long> pcollection = p.apply(Read.from(source));
+    AppliedPTransform<?, ?, ?> sourceTransform =
+        DirectGraphs.getGraph(p).getProducer(pcollection);
+
+    when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle());
+    UncommittedBundle<Long> output = bundleFactory.createBundle(pcollection);
+    when(context.createBundle(pcollection)).thenReturn(output);
+
+    WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>> shard =
+        WindowedValue.valueInGlobalWindow(
+            UnboundedSourceShard.unstarted(source, NeverDeduplicator.create()));
+    CommittedBundle<UnboundedSourceShard<Long, TestCheckpointMark>> inputBundle =
+        bundleFactory
+            .<UnboundedSourceShard<Long, TestCheckpointMark>>createRootBundle()
+            .add(shard)
+            .commit(Instant.now());
+    UnboundedReadEvaluatorFactory factory =
+        new UnboundedReadEvaluatorFactory(context, 0.0 /* never reuse */);
+    TransformEvaluator<UnboundedSourceShard<Long, TestCheckpointMark>> evaluator =
+        factory.forApplication(sourceTransform, inputBundle);
+    thrown.expect(IOException.class);
+    thrown.expectMessage("throws on close");
+    evaluator.processElement(shard);
+  }
+
   /**
    * A terse alias for producing timestamped longs in the {@link GlobalWindow}, where
    * the timestamp is the epoch offset by the value of the element.
@@ -402,12 +435,18 @@ public class UnboundedReadEvaluatorFactoryTest {
     private final Coder<T> coder;
     private final List<T> elems;
     private boolean dedupes = false;
+    private boolean throwOnClose;
 
     public TestUnboundedSource(Coder<T> coder, T... elems) {
+      this(coder, false, Arrays.asList(elems));
+    }
+
+   private TestUnboundedSource(Coder<T> coder, boolean throwOnClose, List<T> elems) {
       readerAdvancedCount = 0;
       readerClosedCount = 0;
       this.coder = coder;
-      this.elems = Arrays.asList(elems);
+      this.elems = elems;
+      this.throwOnClose = throwOnClose;
     }
 
     @Override
@@ -441,9 +480,14 @@ public class UnboundedReadEvaluatorFactoryTest {
       return coder;
     }
 
+    public TestUnboundedSource<T> throwsOnClose() {
+      return new TestUnboundedSource<>(coder, true, elems);
+    }
+
     private class TestUnboundedReader extends UnboundedReader<T> {
       private final List<T> elems;
       private int index;
+      private boolean closed = false;
 
       public TestUnboundedReader(List<T> elems, int startIndex) {
         this.elems = elems;
@@ -503,7 +547,16 @@ public class UnboundedReadEvaluatorFactoryTest {
 
       @Override
       public void close() throws IOException {
-        readerClosedCount++;
+        try {
+          readerClosedCount++;
+          // Enforce the AutoCloseable contract. Close is not idempotent.
+          assertThat(closed, is(false));
+          if (throwOnClose) {
+            throw new IOException(String.format("%s throws on close", TestUnboundedSource.this));
+          }
+        } finally {
+          closed = true;
+        }
       }
     }
   }