You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/12/19 01:48:34 UTC

[beam] branch master updated: Guard against closing data channel twice (#4283)

This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 26560d0  Guard against closing data channel twice (#4283)
26560d0 is described below

commit 26560d0effcfd12bf9bfa40c1040e00f093e88ca
Author: Robert Bradshaw <ro...@gmail.com>
AuthorDate: Mon Dec 18 17:48:30 2017 -0800

    Guard against closing data channel twice (#4283)
---
 .../fn/data/BeamFnDataBufferingOutboundObserver.java  |  9 +++++++++
 .../data/BeamFnDataBufferingOutboundObserverTest.java | 19 +++++++++++++++++++
 2 files changed, 28 insertions(+)

diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataBufferingOutboundObserver.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataBufferingOutboundObserver.java
index 9c5fd36..d2986c3 100644
--- a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataBufferingOutboundObserver.java
+++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataBufferingOutboundObserver.java
@@ -71,6 +71,7 @@ public class BeamFnDataBufferingOutboundObserver<T>
 
   private long byteCounter;
   private long counter;
+  private boolean closed;
   private final int bufferLimit;
   private final Coder<WindowedValue<T>> coder;
   private final LogicalEndpoint outputLocation;
@@ -87,10 +88,15 @@ public class BeamFnDataBufferingOutboundObserver<T>
     this.coder = coder;
     this.outboundObserver = outboundObserver;
     this.bufferedElements = ByteString.newOutput();
+    this.closed = false;
   }
 
   @Override
   public void close() throws Exception {
+    if (closed) {
+      throw new IllegalStateException("Already closed.");
+    }
+    closed = true;
     BeamFnApi.Elements.Builder elements = convertBufferForTransmission();
     // This will add an empty data block representing the end of stream.
     elements.addDataBuilder()
@@ -108,6 +114,9 @@ public class BeamFnDataBufferingOutboundObserver<T>
 
   @Override
   public void accept(WindowedValue<T> t) throws IOException {
+    if (closed) {
+      throw new IllegalStateException("Already closed.");
+    }
     coder.encode(t, bufferedElements);
     counter += 1;
     if (bufferedElements.size() >= bufferLimit) {
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataBufferingOutboundObserverTest.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataBufferingOutboundObserverTest.java
index d2f2cf9..30fae87 100644
--- a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataBufferingOutboundObserverTest.java
+++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataBufferingOutboundObserverTest.java
@@ -21,6 +21,7 @@ import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow;
 import static org.hamcrest.Matchers.empty;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
 
 import com.google.common.collect.Iterables;
 import com.google.protobuf.ByteString;
@@ -98,6 +99,24 @@ public class BeamFnDataBufferingOutboundObserverTest {
     consumer.close();
     assertEquals(messageWithData(),
         Iterables.get(values, 2));
+
+    // Test that we can't write to a closed stream.
+    try {
+      consumer.accept(
+          valueInGlobalWindow(
+              new byte[BeamFnDataBufferingOutboundObserver.DEFAULT_BUFFER_LIMIT_BYTES - 50]));
+      fail("Writing after close should be prohibited.");
+    } catch (IllegalStateException exn) {
+      // expected
+    }
+
+    // Test that we can't close a stream twice.
+    try {
+      consumer.close();
+      fail("Closing twice should be prohibited.");
+    } catch (IllegalStateException exn) {
+      // expected
+    }
   }
 
   @Test

-- 
To stop receiving notification emails like this one, please contact
commits@beam.apache.org.