You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/10/11 23:12:57 UTC

[1/2] beam git commit: [BEAM-3016] Fix blocking issue within run() when channel terminates while blocking within DirectStreamObserver.

Repository: beam
Updated Branches:
  refs/heads/master dc3e2f756 -> 6dd90d89d


[BEAM-3016] Fix blocking issue within run() when channel terminates while blocking within DirectStreamObserver.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b1a22a89
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b1a22a89
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b1a22a89

Branch: refs/heads/master
Commit: b1a22a89bd0d66db2754ba86f85d418a8122f9ea
Parents: dc3e2f7
Author: Luke Cwik <lc...@google.com>
Authored: Fri Oct 6 09:09:17 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Oct 11 16:11:59 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/fn/harness/FnHarness.java   |  10 +-
 .../fn/harness/logging/BeamFnLoggingClient.java | 139 ++++++++++---------
 .../logging/BeamFnLoggingClientTest.java        | 117 ++++++++++++++--
 3 files changed, 185 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/b1a22a89/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
index d6c461f..7d78856 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
@@ -20,7 +20,6 @@ package org.apache.beam.fn.harness;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.protobuf.TextFormat;
-import java.io.PrintStream;
 import java.util.EnumMap;
 import org.apache.beam.fn.harness.channel.ManagedChannelFactory;
 import org.apache.beam.fn.harness.control.BeamFnControlClient;
@@ -93,13 +92,10 @@ public class FnHarness {
       Endpoints.ApiServiceDescriptor controlApiServiceDescriptor) throws Exception {
     ManagedChannelFactory channelFactory = ManagedChannelFactory.from(options);
     StreamObserverFactory streamObserverFactory = StreamObserverFactory.fromOptions(options);
-    PrintStream originalErrStream = System.err;
-
     try (BeamFnLoggingClient logging = new BeamFnLoggingClient(
         options,
         loggingApiServiceDescriptor,
-        channelFactory::forDescriptor,
-        streamObserverFactory::from)) {
+        channelFactory::forDescriptor)) {
 
       LOG.info("Fn Harness started");
       EnumMap<BeamFnApi.InstructionRequest.RequestCase,
@@ -134,9 +130,9 @@ public class FnHarness {
       LOG.info("Entering instruction processing loop");
       control.processInstructionRequests(options.as(GcsOptions.class).getExecutorService());
     } catch (Throwable t) {
-      t.printStackTrace(originalErrStream);
+      t.printStackTrace();
     } finally {
-      originalErrStream.println("Shutting SDK harness down.");
+      System.out.println("Shutting SDK harness down.");
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/b1a22a89/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
index 240e954..d43ab25 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
@@ -24,7 +24,10 @@ import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableMap;
 import com.google.protobuf.Timestamp;
 import io.grpc.ManagedChannel;
-import io.grpc.stub.StreamObserver;
+import io.grpc.Status;
+import io.grpc.stub.CallStreamObserver;
+import io.grpc.stub.ClientCallStreamObserver;
+import io.grpc.stub.ClientResponseObserver;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -36,8 +39,8 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.Phaser;
 import java.util.concurrent.TimeUnit;
-import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.logging.Formatter;
 import java.util.logging.Handler;
@@ -79,12 +82,6 @@ public class BeamFnLoggingClient implements AutoCloseable {
 
   private static final Formatter FORMATTER = new SimpleFormatter();
 
-  private static final String FAKE_INSTRUCTION_ID = "FAKE_INSTRUCTION_ID";
-
-  /* Used to signal to a thread processing a queue to finish its work gracefully. */
-  private static final BeamFnApi.LogEntry POISON_PILL =
-      BeamFnApi.LogEntry.newBuilder().setInstructionReference(FAKE_INSTRUCTION_ID).build();
-
   /**
    * The number of log messages that will be buffered. Assuming log messages are at most 1 KiB,
    * this represents a buffer of about 10 MiBs.
@@ -97,22 +94,20 @@ public class BeamFnLoggingClient implements AutoCloseable {
   private final Collection<Logger> configuredLoggers;
   private final Endpoints.ApiServiceDescriptor apiServiceDescriptor;
   private final ManagedChannel channel;
-  private final StreamObserver<BeamFnApi.LogEntry.List> outboundObserver;
+  private final CallStreamObserver<BeamFnApi.LogEntry.List> outboundObserver;
   private final LogControlObserver inboundObserver;
   private final LogRecordHandler logRecordHandler;
   private final CompletableFuture<Object> inboundObserverCompletion;
+  private final Phaser phaser;
 
   public BeamFnLoggingClient(
       PipelineOptions options,
       Endpoints.ApiServiceDescriptor apiServiceDescriptor,
-      Function<Endpoints.ApiServiceDescriptor, ManagedChannel> channelFactory,
-      BiFunction<Function<StreamObserver<BeamFnApi.LogControl>,
-                          StreamObserver<BeamFnApi.LogEntry.List>>,
-                 StreamObserver<BeamFnApi.LogControl>,
-                 StreamObserver<BeamFnApi.LogEntry.List>> streamObserverFactory) {
+      Function<Endpoints.ApiServiceDescriptor, ManagedChannel> channelFactory) {
     this.apiServiceDescriptor = apiServiceDescriptor;
     this.inboundObserverCompletion = new CompletableFuture<>();
     this.configuredLoggers = new ArrayList<>();
+    this.phaser = new Phaser(1);
     this.channel = channelFactory.apply(apiServiceDescriptor);
 
     // Reset the global log manager, get the root logger and remove the default log handlers.
@@ -142,29 +137,32 @@ public class BeamFnLoggingClient implements AutoCloseable {
     inboundObserver = new LogControlObserver();
     logRecordHandler = new LogRecordHandler(options.as(GcsOptions.class).getExecutorService());
     logRecordHandler.setLevel(Level.ALL);
-    outboundObserver = streamObserverFactory.apply(stub::logging, inboundObserver);
+    outboundObserver =
+        (CallStreamObserver<BeamFnApi.LogEntry.List>) stub.logging(inboundObserver);
     rootLogger.addHandler(logRecordHandler);
   }
 
   @Override
   public void close() throws Exception {
-    // Hang up with the server
-    logRecordHandler.close();
+    try {
+      // Hang up with the server
+      logRecordHandler.close();
 
-    // Wait for the server to hang up
-    inboundObserverCompletion.get();
-
-    // Reset the logging configuration to what it is at startup
-    for (Logger logger : configuredLoggers) {
-      logger.setLevel(null);
-    }
-    configuredLoggers.clear();
-    LogManager.getLogManager().readConfiguration();
+      // Wait for the server to hang up
+      inboundObserverCompletion.get();
+    } finally {
+      // Reset the logging configuration to what it is at startup
+      for (Logger logger : configuredLoggers) {
+        logger.setLevel(null);
+      }
+      configuredLoggers.clear();
+      LogManager.getLogManager().readConfiguration();
 
-    // Shut the channel down
-    channel.shutdown();
-    if (!channel.awaitTermination(10, TimeUnit.SECONDS)) {
-      channel.shutdownNow();
+      // Shut the channel down
+      channel.shutdown();
+      if (!channel.awaitTermination(10, TimeUnit.SECONDS)) {
+        channel.shutdownNow();
+      }
     }
   }
 
@@ -231,24 +229,41 @@ public class BeamFnLoggingClient implements AutoCloseable {
 
       List<BeamFnApi.LogEntry> additionalLogEntries =
           new ArrayList<>(MAX_BUFFERED_LOG_ENTRY_COUNT);
+      Throwable thrown = null;
       try {
-        BeamFnApi.LogEntry logEntry;
-        while ((logEntry = bufferedLogEntries.take()) != POISON_PILL) {
+        // As long as we haven't yet terminated, then attempt
+        while (!phaser.isTerminated()) {
+          // Try to wait for a message to show up.
+          BeamFnApi.LogEntry logEntry = bufferedLogEntries.poll(1, TimeUnit.SECONDS);
+          // If we don't have a message then we need to try this loop again.
+          if (logEntry == null) {
+            continue;
+          }
+
+          // Attempt to honor flow control. Phaser termination causes await advance to return
+          // immediately.
+          int phase = phaser.getPhase();
+          if (!outboundObserver.isReady()) {
+            phaser.awaitAdvance(phase);
+          }
+
+          // Batch together as many log messages as possible that are held within the buffer
           BeamFnApi.LogEntry.List.Builder builder =
               BeamFnApi.LogEntry.List.newBuilder().addLogEntries(logEntry);
           bufferedLogEntries.drainTo(additionalLogEntries);
-          for (int i = 0; i < additionalLogEntries.size(); ++i) {
-            if (additionalLogEntries.get(i) == POISON_PILL) {
-              additionalLogEntries = additionalLogEntries.subList(0, i);
-              break;
-            }
-          }
           builder.addAllLogEntries(additionalLogEntries);
           outboundObserver.onNext(builder.build());
+          additionalLogEntries.clear();
         }
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new IllegalStateException(e);
+      } catch (Throwable t) {
+        thrown = t;
+      }
+      if (thrown != null) {
+        outboundObserver.onError(
+            Status.INTERNAL.withDescription(getStackTraceAsString(thrown)).asException());
+        throw new IllegalStateException(thrown);
+      } else {
+        outboundObserver.onCompleted();
       }
     }
 
@@ -257,31 +272,17 @@ public class BeamFnLoggingClient implements AutoCloseable {
     }
 
     @Override
-    public void close() {
-      synchronized (outboundObserver) {
-        // If we are done, then a previous caller has already shutdown the queue processing thread
-        // hence we don't need to do it again.
-        if (!bufferedLogWriter.isDone()) {
-          // We check to see if we were able to successfully insert the poison pill at the end of
-          // the queue forcing the remainder of the elements to be processed or if the processing
-          // thread is done.
-          try {
-            // The order of these checks is important because short circuiting will cause us to
-            // insert into the queue first and only if it fails do we check that the thread is done.
-            while (!bufferedLogEntries.offer(POISON_PILL, 60, TimeUnit.SECONDS)
-                || !bufferedLogWriter.isDone()) {
-            }
-          } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new RuntimeException(e);
-          }
-          waitTillFinish();
-        }
-        outboundObserver.onCompleted();
+    public synchronized void close() {
+      // If we are done, then a previous caller has already shutdown the queue processing thread
+      // hence we don't need to do it again.
+      if (phaser.isTerminated()) {
+        return;
       }
-    }
 
-    private void waitTillFinish() {
+      // Terminate the phaser that we block on when attempting to honor flow control on the
+      // outbound observer.
+      phaser.arriveAndDeregister();
+
       try {
         bufferedLogWriter.get();
       } catch (CancellationException e) {
@@ -295,7 +296,14 @@ public class BeamFnLoggingClient implements AutoCloseable {
     }
   }
 
-  private class LogControlObserver implements StreamObserver<BeamFnApi.LogControl> {
+  private class LogControlObserver
+      implements ClientResponseObserver<BeamFnApi.LogEntry, BeamFnApi.LogControl> {
+
+    @Override
+    public void beforeStart(ClientCallStreamObserver requestStream) {
+      requestStream.setOnReadyHandler(phaser::arrive);
+    }
+
     @Override
     public void onNext(BeamFnApi.LogControl value) {
     }
@@ -309,5 +317,6 @@ public class BeamFnLoggingClient implements AutoCloseable {
     public void onCompleted() {
       inboundObserverCompletion.complete(null);
     }
+
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/b1a22a89/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java
index 161ce18..015e5ec 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java
@@ -28,6 +28,7 @@ import static org.junit.Assert.assertTrue;
 import com.google.protobuf.Timestamp;
 import io.grpc.ManagedChannel;
 import io.grpc.Server;
+import io.grpc.Status;
 import io.grpc.inprocess.InProcessChannelBuilder;
 import io.grpc.inprocess.InProcessServerBuilder;
 import io.grpc.stub.CallStreamObserver;
@@ -37,7 +38,6 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Function;
 import java.util.logging.Level;
 import java.util.logging.LogManager;
 import java.util.logging.LogRecord;
@@ -46,7 +46,9 @@ import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.model.fnexecution.v1.BeamFnLoggingGrpc;
 import org.apache.beam.model.pipeline.v1.Endpoints;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
@@ -90,6 +92,7 @@ public class BeamFnLoggingClientTest {
           .setTimestamp(Timestamp.newBuilder().setSeconds(1234567).setNanos(890000000).build())
           .setLogLocation("LoggerName")
           .build();
+  @Rule public ExpectedException thrown = ExpectedException.none();
 
   @Test
   public void testLogging() throws Exception {
@@ -124,9 +127,10 @@ public class BeamFnLoggingClientTest {
             })
             .build();
     server.start();
+
+    ManagedChannel channel =
+        InProcessChannelBuilder.forName(apiServiceDescriptor.getUrl()).build();
     try {
-      ManagedChannel channel =
-          InProcessChannelBuilder.forName(apiServiceDescriptor.getUrl()).build();
 
       BeamFnLoggingClient client = new BeamFnLoggingClient(
           PipelineOptionsFactory.fromArgs(new String[] {
@@ -134,8 +138,7 @@ public class BeamFnLoggingClientTest {
               "--workerLogLevelOverrides={\"ConfiguredLogger\": \"DEBUG\"}"
           }).create(),
           apiServiceDescriptor,
-          (Endpoints.ApiServiceDescriptor descriptor) -> channel,
-          this::createStreamForTest);
+          (Endpoints.ApiServiceDescriptor descriptor) -> channel);
 
       // Ensure that log levels were correctly set.
       assertEquals(Level.OFF,
@@ -162,9 +165,105 @@ public class BeamFnLoggingClientTest {
     }
   }
 
-  private <ReqT, RespT> StreamObserver<RespT> createStreamForTest(
-      Function<StreamObserver<ReqT>, StreamObserver<RespT>> clientFactory,
-      StreamObserver<ReqT> handler) {
-    return clientFactory.apply(handler);
+  @Test
+  public void testWhenServerFailsThatClientIsAbleToCleanup() throws Exception {
+    AtomicBoolean clientClosedStream = new AtomicBoolean();
+    Collection<BeamFnApi.LogEntry> values = new ConcurrentLinkedQueue<>();
+    AtomicReference<StreamObserver<BeamFnApi.LogControl>> outboundServerObserver =
+        new AtomicReference<>();
+    CallStreamObserver<BeamFnApi.LogEntry.List> inboundServerObserver = TestStreams.withOnNext(
+        (BeamFnApi.LogEntry.List logEntries) -> values.addAll(logEntries.getLogEntriesList()))
+        .build();
+
+    Endpoints.ApiServiceDescriptor apiServiceDescriptor =
+        Endpoints.ApiServiceDescriptor.newBuilder()
+            .setUrl(this.getClass().getName() + "-" + UUID.randomUUID().toString())
+            .build();
+    Server server = InProcessServerBuilder.forName(apiServiceDescriptor.getUrl())
+        .addService(new BeamFnLoggingGrpc.BeamFnLoggingImplBase() {
+          @Override
+          public StreamObserver<BeamFnApi.LogEntry.List> logging(
+              StreamObserver<BeamFnApi.LogControl> outboundObserver) {
+            outboundServerObserver.set(outboundObserver);
+            outboundObserver.onError(Status.INTERNAL.withDescription("TEST ERROR").asException());
+            return inboundServerObserver;
+          }
+        })
+        .build();
+    server.start();
+
+    ManagedChannel channel =
+        InProcessChannelBuilder.forName(apiServiceDescriptor.getUrl()).build();
+    try {
+      BeamFnLoggingClient client = new BeamFnLoggingClient(
+          PipelineOptionsFactory.fromArgs(new String[] {
+              "--defaultWorkerLogLevel=OFF",
+              "--workerLogLevelOverrides={\"ConfiguredLogger\": \"DEBUG\"}"
+          }).create(),
+          apiServiceDescriptor,
+          (Endpoints.ApiServiceDescriptor descriptor) -> channel);
+
+      thrown.expectMessage("TEST ERROR");
+      client.close();
+    } finally {
+      // Verify that after close, log levels are reset.
+      assertEquals(Level.INFO, LogManager.getLogManager().getLogger("").getLevel());
+      assertNull(LogManager.getLogManager().getLogger("ConfiguredLogger").getLevel());
+
+      assertTrue(channel.isShutdown());
+
+      server.shutdownNow();
+    }
+  }
+
+  @Test
+  public void testWhenServerHangsUpEarlyThatClientIsAbleCleanup() throws Exception {
+    AtomicBoolean clientClosedStream = new AtomicBoolean();
+    Collection<BeamFnApi.LogEntry> values = new ConcurrentLinkedQueue<>();
+    AtomicReference<StreamObserver<BeamFnApi.LogControl>> outboundServerObserver =
+        new AtomicReference<>();
+    CallStreamObserver<BeamFnApi.LogEntry.List> inboundServerObserver =
+        TestStreams.withOnNext(
+            (BeamFnApi.LogEntry.List logEntries) -> values.addAll(logEntries.getLogEntriesList()))
+            .build();
+
+    Endpoints.ApiServiceDescriptor apiServiceDescriptor =
+        Endpoints.ApiServiceDescriptor.newBuilder()
+            .setUrl(this.getClass().getName() + "-" + UUID.randomUUID().toString())
+            .build();
+    Server server = InProcessServerBuilder.forName(apiServiceDescriptor.getUrl())
+        .addService(new BeamFnLoggingGrpc.BeamFnLoggingImplBase() {
+          @Override
+          public StreamObserver<BeamFnApi.LogEntry.List> logging(
+              StreamObserver<BeamFnApi.LogControl> outboundObserver) {
+            outboundServerObserver.set(outboundObserver);
+            outboundObserver.onCompleted();
+            return inboundServerObserver;
+          }
+        })
+        .build();
+    server.start();
+
+    ManagedChannel channel =
+        InProcessChannelBuilder.forName(apiServiceDescriptor.getUrl()).build();
+    try {
+      BeamFnLoggingClient client = new BeamFnLoggingClient(
+          PipelineOptionsFactory.fromArgs(new String[] {
+              "--defaultWorkerLogLevel=OFF",
+              "--workerLogLevelOverrides={\"ConfiguredLogger\": \"DEBUG\"}"
+          }).create(),
+          apiServiceDescriptor,
+          (Endpoints.ApiServiceDescriptor descriptor) -> channel);
+
+      client.close();
+    } finally {
+      // Verify that after close, log levels are reset.
+      assertEquals(Level.INFO, LogManager.getLogManager().getLogger("").getLevel());
+      assertNull(LogManager.getLogManager().getLogger("ConfiguredLogger").getLevel());
+
+      assertTrue(channel.isShutdown());
+
+      server.shutdownNow();
+    }
   }
 }


[2/2] beam git commit: [BEAM-3016] Fix blocking issue within run() when channel terminates while blocking within DirectStreamObserver.

Posted by lc...@apache.org.
[BEAM-3016] Fix blocking issue within run() when channel terminates while blocking within DirectStreamObserver.

This closes #3944


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6dd90d89
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6dd90d89
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6dd90d89

Branch: refs/heads/master
Commit: 6dd90d89d3d324c9f9f8cb2b1faf38fe525f0a10
Parents: dc3e2f7 b1a22a8
Author: Luke Cwik <lc...@google.com>
Authored: Wed Oct 11 16:12:49 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Oct 11 16:12:49 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/fn/harness/FnHarness.java   |  10 +-
 .../fn/harness/logging/BeamFnLoggingClient.java | 139 ++++++++++---------
 .../logging/BeamFnLoggingClientTest.java        | 117 ++++++++++++++--
 3 files changed, 185 insertions(+), 81 deletions(-)
----------------------------------------------------------------------