You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/10/16 23:32:55 UTC

[1/2] beam git commit: [BEAM-1487, BEAM-3016] Address termination correctness issues in BufferingStreamObserver & BeamFnLoggingClient

Repository: beam
Updated Branches:
  refs/heads/master c8f3a9183 -> 3633c40b9


[BEAM-1487, BEAM-3016] Address termination correctness issues in BufferingStreamObserver & BeamFnLoggingClient

The issue with BeamFnLoggingClient is that we can't arriveAndDeregister during termination since
the onReadyHandler may also arrive at the same time, which is why we swap to using forced termination.
Also, I added code to guarantee that log messages produced by the thread which is shutting
down are sent before termination (this was being caught occasionally by the testLogging test).

The BufferingStreamObserver was incorrectly shutting down since it may attempt to enqueue something
into a full queue with a reading thread that has already exited for some reason, so it would loop
forever attempting to insert the poison pill.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6cdea08e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6cdea08e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6cdea08e

Branch: refs/heads/master
Commit: 6cdea08e06b52bb6a34f41dc6521bcfe8f6f83cb
Parents: c8f3a91
Author: Luke Cwik <lc...@google.com>
Authored: Thu Oct 12 16:26:29 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Mon Oct 16 16:31:59 2017 -0700

----------------------------------------------------------------------
 sdks/java/harness/pom.xml                       |  7 -------
 .../fn/harness/logging/BeamFnLoggingClient.java | 22 +++++++++++++-------
 .../harness/stream/BufferingStreamObserver.java | 16 +++++++-------
 .../stream/BufferingStreamObserverTest.java     |  4 ++--
 4 files changed, 25 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/6cdea08e/sdks/java/harness/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/harness/pom.xml b/sdks/java/harness/pom.xml
index de24f7f..e965923 100644
--- a/sdks/java/harness/pom.xml
+++ b/sdks/java/harness/pom.xml
@@ -112,13 +112,6 @@
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-surefire-plugin</artifactId>
-        <configuration>
-          <excludes>
-            <!-- Flaky in Precommit. See BEAM-1487 https://issues.apache.org/jira/browse/BEAM-1487 -->
-            <exclude>org.apache.beam.fn.harness.logging.BeamFnLoggingClientTest</exclude>
-            <exclude>org.apache.beam.fn.harness.stream.BufferingStreamObserverTest</exclude>
-          </excludes>
-        </configuration>
       </plugin>
     </plugins>
   </build>

http://git-wip-us.apache.org/repos/asf/beam/blob/6cdea08e/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
index d43ab25..b19277a 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
@@ -145,12 +145,6 @@ public class BeamFnLoggingClient implements AutoCloseable {
   @Override
   public void close() throws Exception {
     try {
-      // Hang up with the server
-      logRecordHandler.close();
-
-      // Wait for the server to hang up
-      inboundObserverCompletion.get();
-    } finally {
       // Reset the logging configuration to what it is at startup
       for (Logger logger : configuredLoggers) {
         logger.setLevel(null);
@@ -158,6 +152,12 @@ public class BeamFnLoggingClient implements AutoCloseable {
       configuredLoggers.clear();
       LogManager.getLogManager().readConfiguration();
 
+      // Hang up with the server
+      logRecordHandler.close();
+
+      // Wait for the server to hang up
+      inboundObserverCompletion.get();
+    } finally {
       // Shut the channel down
       channel.shutdown();
       if (!channel.awaitTermination(10, TimeUnit.SECONDS)) {
@@ -255,6 +255,14 @@ public class BeamFnLoggingClient implements AutoCloseable {
           outboundObserver.onNext(builder.build());
           additionalLogEntries.clear();
         }
+
+        // Perform one more final check to see if there are any log entries to guarantee that
+        // if a log entry was added on the thread performing termination that we will send it.
+        bufferedLogEntries.drainTo(additionalLogEntries);
+        if (!additionalLogEntries.isEmpty()) {
+          outboundObserver.onNext(
+              BeamFnApi.LogEntry.List.newBuilder().addAllLogEntries(additionalLogEntries).build());
+        }
       } catch (Throwable t) {
         thrown = t;
       }
@@ -281,7 +289,7 @@ public class BeamFnLoggingClient implements AutoCloseable {
 
       // Terminate the phaser that we block on when attempting to honor flow control on the
       // outbound observer.
-      phaser.arriveAndDeregister();
+      phaser.forceTermination();
 
       try {
         bufferedLogWriter.get();

http://git-wip-us.apache.org/repos/asf/beam/blob/6cdea08e/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/BufferingStreamObserver.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/BufferingStreamObserver.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/BufferingStreamObserver.java
index cda3a4b..cd96440 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/BufferingStreamObserver.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/BufferingStreamObserver.java
@@ -105,10 +105,10 @@ public final class BufferingStreamObserver<T> implements StreamObserver<T> {
         // We check to see if we were able to successfully insert the poison pill at the front of
         // the queue to cancel the processing thread eagerly or if the processing thread is done.
         try {
-          // The order of these checks is important because short circuiting will cause us to
-          // insert into the queue first and only if it fails do we check that the thread is done.
-          while (!queue.offerFirst((T) POISON_PILL, 60, TimeUnit.SECONDS)
-              || !queueDrainer.isDone()) {
+          // We shouldn't attempt to insert into the queue if the queue drainer thread is done
+          // since the queue may be full and nothing will be emptying it.
+          while (!queueDrainer.isDone()
+              && !queue.offerFirst((T) POISON_PILL, 60, TimeUnit.SECONDS)) {
           }
         } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
@@ -130,10 +130,10 @@ public final class BufferingStreamObserver<T> implements StreamObserver<T> {
         // the queue forcing the remainder of the elements to be processed or if the processing
         // thread is done.
         try {
-          // The order of these checks is important because short circuiting will cause us to
-          // insert into the queue first and only if it fails do we check that the thread is done.
-          while (!queue.offer((T) POISON_PILL, 60, TimeUnit.SECONDS)
-              || !queueDrainer.isDone()) {
+          // We shouldn't attempt to insert into the queue if the queue drainer thread is done
+          // since the queue may be full and nothing will be emptying it.
+          while (!queueDrainer.isDone()
+              && !queue.offerLast((T) POISON_PILL, 60, TimeUnit.SECONDS)) {
           }
         } catch (InterruptedException e) {
           Thread.currentThread().interrupt();

http://git-wip-us.apache.org/repos/asf/beam/blob/6cdea08e/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/BufferingStreamObserverTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/BufferingStreamObserverTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/BufferingStreamObserverTest.java
index 76b7ef0..b26e8e1 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/BufferingStreamObserverTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/BufferingStreamObserverTest.java
@@ -61,7 +61,7 @@ public class BufferingStreamObserverTest {
                     // critical section. Any thread that enters purposefully blocks by sleeping
                     // to increase the contention between threads artificially.
                     assertFalse(isCriticalSectionShared.getAndSet(true));
-                    Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+                    Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MILLISECONDS);
                     onNextValues.add(t);
                     assertTrue(isCriticalSectionShared.getAndSet(false));
                   }
@@ -134,7 +134,7 @@ public class BufferingStreamObserverTest {
     }
 
     // Have them wait and then flip that we do allow elements and wake up those awaiting
-    Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+    Uninterruptibles.sleepUninterruptibly(10, TimeUnit.MILLISECONDS);
     elementsAllowed.set(true);
     phaser.arrive();
 


[2/2] beam git commit: [BEAM-1487, BEAM-3016] Address termination correctness issues in BufferingStreamObserver & BeamFnLoggingClient

Posted by lc...@apache.org.
[BEAM-1487, BEAM-3016] Address termination correctness issues in BufferingStreamObserver & BeamFnLoggingClient

This closes #3992


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3633c40b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3633c40b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3633c40b

Branch: refs/heads/master
Commit: 3633c40b9f75e69734484feb139c2e755ac36cd2
Parents: c8f3a91 6cdea08
Author: Luke Cwik <lc...@google.com>
Authored: Mon Oct 16 16:32:46 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Mon Oct 16 16:32:46 2017 -0700

----------------------------------------------------------------------
 sdks/java/harness/pom.xml                       |  7 -------
 .../fn/harness/logging/BeamFnLoggingClient.java | 22 +++++++++++++-------
 .../harness/stream/BufferingStreamObserver.java | 16 +++++++-------
 .../stream/BufferingStreamObserverTest.java     |  4 ++--
 4 files changed, 25 insertions(+), 24 deletions(-)
----------------------------------------------------------------------