You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/12/19 01:48:34 UTC
[beam] branch master updated: Guard against closing data channel twice (#4283)
This is an automated email from the ASF dual-hosted git repository.
robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 26560d0 Guard against closing data channel twice (#4283)
26560d0 is described below
commit 26560d0effcfd12bf9bfa40c1040e00f093e88ca
Author: Robert Bradshaw <ro...@gmail.com>
AuthorDate: Mon Dec 18 17:48:30 2017 -0800
Guard against closing data channel twice (#4283)
---
.../fn/data/BeamFnDataBufferingOutboundObserver.java | 9 +++++++++
.../data/BeamFnDataBufferingOutboundObserverTest.java | 19 +++++++++++++++++++
2 files changed, 28 insertions(+)
diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataBufferingOutboundObserver.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataBufferingOutboundObserver.java
index 9c5fd36..d2986c3 100644
--- a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataBufferingOutboundObserver.java
+++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataBufferingOutboundObserver.java
@@ -71,6 +71,7 @@ public class BeamFnDataBufferingOutboundObserver<T>
private long byteCounter;
private long counter;
+ private boolean closed;
private final int bufferLimit;
private final Coder<WindowedValue<T>> coder;
private final LogicalEndpoint outputLocation;
@@ -87,10 +88,15 @@ public class BeamFnDataBufferingOutboundObserver<T>
this.coder = coder;
this.outboundObserver = outboundObserver;
this.bufferedElements = ByteString.newOutput();
+ this.closed = false;
}
@Override
public void close() throws Exception {
+ if (closed) {
+ throw new IllegalStateException("Already closed.");
+ }
+ closed = true;
BeamFnApi.Elements.Builder elements = convertBufferForTransmission();
// This will add an empty data block representing the end of stream.
elements.addDataBuilder()
@@ -108,6 +114,9 @@ public class BeamFnDataBufferingOutboundObserver<T>
@Override
public void accept(WindowedValue<T> t) throws IOException {
+ if (closed) {
+ throw new IllegalStateException("Already closed.");
+ }
coder.encode(t, bufferedElements);
counter += 1;
if (bufferedElements.size() >= bufferLimit) {
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataBufferingOutboundObserverTest.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataBufferingOutboundObserverTest.java
index d2f2cf9..30fae87 100644
--- a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataBufferingOutboundObserverTest.java
+++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataBufferingOutboundObserverTest.java
@@ -21,6 +21,7 @@ import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow;
import static org.hamcrest.Matchers.empty;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
import com.google.common.collect.Iterables;
import com.google.protobuf.ByteString;
@@ -98,6 +99,24 @@ public class BeamFnDataBufferingOutboundObserverTest {
consumer.close();
assertEquals(messageWithData(),
Iterables.get(values, 2));
+
+ // Test that we can't write to a closed stream.
+ try {
+ consumer.accept(
+ valueInGlobalWindow(
+ new byte[BeamFnDataBufferingOutboundObserver.DEFAULT_BUFFER_LIMIT_BYTES - 50]));
+ fail("Writing after close should be prohibited.");
+ } catch (IllegalStateException exn) {
+ // expected
+ }
+
+ // Test that we can't close a stream twice.
+ try {
+ consumer.close();
+ fail("Closing twice should be prohibited.");
+ } catch (IllegalStateException exn) {
+ // expected
+ }
}
@Test
--
To stop receiving notification emails like this one, please contact
"commits@beam.apache.org" <co...@beam.apache.org>.