You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/10/11 23:12:57 UTC
[1/2] beam git commit: [BEAM-3016] Fix blocking issue within run()
when channel terminates while blocking within DirectStreamObserver.
Repository: beam
Updated Branches:
refs/heads/master dc3e2f756 -> 6dd90d89d
[BEAM-3016] Fix blocking issue within run() when channel terminates while blocking within DirectStreamObserver.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b1a22a89
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b1a22a89
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b1a22a89
Branch: refs/heads/master
Commit: b1a22a89bd0d66db2754ba86f85d418a8122f9ea
Parents: dc3e2f7
Author: Luke Cwik <lc...@google.com>
Authored: Fri Oct 6 09:09:17 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Oct 11 16:11:59 2017 -0700
----------------------------------------------------------------------
.../org/apache/beam/fn/harness/FnHarness.java | 10 +-
.../fn/harness/logging/BeamFnLoggingClient.java | 139 ++++++++++---------
.../logging/BeamFnLoggingClientTest.java | 117 ++++++++++++++--
3 files changed, 185 insertions(+), 81 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/b1a22a89/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
index d6c461f..7d78856 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
@@ -20,7 +20,6 @@ package org.apache.beam.fn.harness;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.TextFormat;
-import java.io.PrintStream;
import java.util.EnumMap;
import org.apache.beam.fn.harness.channel.ManagedChannelFactory;
import org.apache.beam.fn.harness.control.BeamFnControlClient;
@@ -93,13 +92,10 @@ public class FnHarness {
Endpoints.ApiServiceDescriptor controlApiServiceDescriptor) throws Exception {
ManagedChannelFactory channelFactory = ManagedChannelFactory.from(options);
StreamObserverFactory streamObserverFactory = StreamObserverFactory.fromOptions(options);
- PrintStream originalErrStream = System.err;
-
try (BeamFnLoggingClient logging = new BeamFnLoggingClient(
options,
loggingApiServiceDescriptor,
- channelFactory::forDescriptor,
- streamObserverFactory::from)) {
+ channelFactory::forDescriptor)) {
LOG.info("Fn Harness started");
EnumMap<BeamFnApi.InstructionRequest.RequestCase,
@@ -134,9 +130,9 @@ public class FnHarness {
LOG.info("Entering instruction processing loop");
control.processInstructionRequests(options.as(GcsOptions.class).getExecutorService());
} catch (Throwable t) {
- t.printStackTrace(originalErrStream);
+ t.printStackTrace();
} finally {
- originalErrStream.println("Shutting SDK harness down.");
+ System.out.println("Shutting SDK harness down.");
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/b1a22a89/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
index 240e954..d43ab25 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
@@ -24,7 +24,10 @@ import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.Timestamp;
import io.grpc.ManagedChannel;
-import io.grpc.stub.StreamObserver;
+import io.grpc.Status;
+import io.grpc.stub.CallStreamObserver;
+import io.grpc.stub.ClientCallStreamObserver;
+import io.grpc.stub.ClientResponseObserver;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@@ -36,8 +39,8 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
-import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.logging.Formatter;
import java.util.logging.Handler;
@@ -79,12 +82,6 @@ public class BeamFnLoggingClient implements AutoCloseable {
private static final Formatter FORMATTER = new SimpleFormatter();
- private static final String FAKE_INSTRUCTION_ID = "FAKE_INSTRUCTION_ID";
-
- /* Used to signal to a thread processing a queue to finish its work gracefully. */
- private static final BeamFnApi.LogEntry POISON_PILL =
- BeamFnApi.LogEntry.newBuilder().setInstructionReference(FAKE_INSTRUCTION_ID).build();
-
/**
* The number of log messages that will be buffered. Assuming log messages are at most 1 KiB,
* this represents a buffer of about 10 MiBs.
@@ -97,22 +94,20 @@ public class BeamFnLoggingClient implements AutoCloseable {
private final Collection<Logger> configuredLoggers;
private final Endpoints.ApiServiceDescriptor apiServiceDescriptor;
private final ManagedChannel channel;
- private final StreamObserver<BeamFnApi.LogEntry.List> outboundObserver;
+ private final CallStreamObserver<BeamFnApi.LogEntry.List> outboundObserver;
private final LogControlObserver inboundObserver;
private final LogRecordHandler logRecordHandler;
private final CompletableFuture<Object> inboundObserverCompletion;
+ private final Phaser phaser;
public BeamFnLoggingClient(
PipelineOptions options,
Endpoints.ApiServiceDescriptor apiServiceDescriptor,
- Function<Endpoints.ApiServiceDescriptor, ManagedChannel> channelFactory,
- BiFunction<Function<StreamObserver<BeamFnApi.LogControl>,
- StreamObserver<BeamFnApi.LogEntry.List>>,
- StreamObserver<BeamFnApi.LogControl>,
- StreamObserver<BeamFnApi.LogEntry.List>> streamObserverFactory) {
+ Function<Endpoints.ApiServiceDescriptor, ManagedChannel> channelFactory) {
this.apiServiceDescriptor = apiServiceDescriptor;
this.inboundObserverCompletion = new CompletableFuture<>();
this.configuredLoggers = new ArrayList<>();
+ this.phaser = new Phaser(1);
this.channel = channelFactory.apply(apiServiceDescriptor);
// Reset the global log manager, get the root logger and remove the default log handlers.
@@ -142,29 +137,32 @@ public class BeamFnLoggingClient implements AutoCloseable {
inboundObserver = new LogControlObserver();
logRecordHandler = new LogRecordHandler(options.as(GcsOptions.class).getExecutorService());
logRecordHandler.setLevel(Level.ALL);
- outboundObserver = streamObserverFactory.apply(stub::logging, inboundObserver);
+ outboundObserver =
+ (CallStreamObserver<BeamFnApi.LogEntry.List>) stub.logging(inboundObserver);
rootLogger.addHandler(logRecordHandler);
}
@Override
public void close() throws Exception {
- // Hang up with the server
- logRecordHandler.close();
+ try {
+ // Hang up with the server
+ logRecordHandler.close();
- // Wait for the server to hang up
- inboundObserverCompletion.get();
-
- // Reset the logging configuration to what it is at startup
- for (Logger logger : configuredLoggers) {
- logger.setLevel(null);
- }
- configuredLoggers.clear();
- LogManager.getLogManager().readConfiguration();
+ // Wait for the server to hang up
+ inboundObserverCompletion.get();
+ } finally {
+ // Reset the logging configuration to what it is at startup
+ for (Logger logger : configuredLoggers) {
+ logger.setLevel(null);
+ }
+ configuredLoggers.clear();
+ LogManager.getLogManager().readConfiguration();
- // Shut the channel down
- channel.shutdown();
- if (!channel.awaitTermination(10, TimeUnit.SECONDS)) {
- channel.shutdownNow();
+ // Shut the channel down
+ channel.shutdown();
+ if (!channel.awaitTermination(10, TimeUnit.SECONDS)) {
+ channel.shutdownNow();
+ }
}
}
@@ -231,24 +229,41 @@ public class BeamFnLoggingClient implements AutoCloseable {
List<BeamFnApi.LogEntry> additionalLogEntries =
new ArrayList<>(MAX_BUFFERED_LOG_ENTRY_COUNT);
+ Throwable thrown = null;
try {
- BeamFnApi.LogEntry logEntry;
- while ((logEntry = bufferedLogEntries.take()) != POISON_PILL) {
+ // As long as we haven't yet terminated, attempt to drain and send buffered log entries.
+ while (!phaser.isTerminated()) {
+ // Try to wait for a message to show up.
+ BeamFnApi.LogEntry logEntry = bufferedLogEntries.poll(1, TimeUnit.SECONDS);
+ // If we don't have a message then we need to try this loop again.
+ if (logEntry == null) {
+ continue;
+ }
+
+ // Attempt to honor flow control. Phaser termination causes await advance to return
+ // immediately.
+ int phase = phaser.getPhase();
+ if (!outboundObserver.isReady()) {
+ phaser.awaitAdvance(phase);
+ }
+
+ // Batch together as many log messages as possible that are held within the buffer
BeamFnApi.LogEntry.List.Builder builder =
BeamFnApi.LogEntry.List.newBuilder().addLogEntries(logEntry);
bufferedLogEntries.drainTo(additionalLogEntries);
- for (int i = 0; i < additionalLogEntries.size(); ++i) {
- if (additionalLogEntries.get(i) == POISON_PILL) {
- additionalLogEntries = additionalLogEntries.subList(0, i);
- break;
- }
- }
builder.addAllLogEntries(additionalLogEntries);
outboundObserver.onNext(builder.build());
+ additionalLogEntries.clear();
}
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new IllegalStateException(e);
+ } catch (Throwable t) {
+ thrown = t;
+ }
+ if (thrown != null) {
+ outboundObserver.onError(
+ Status.INTERNAL.withDescription(getStackTraceAsString(thrown)).asException());
+ throw new IllegalStateException(thrown);
+ } else {
+ outboundObserver.onCompleted();
}
}
@@ -257,31 +272,17 @@ public class BeamFnLoggingClient implements AutoCloseable {
}
@Override
- public void close() {
- synchronized (outboundObserver) {
- // If we are done, then a previous caller has already shutdown the queue processing thread
- // hence we don't need to do it again.
- if (!bufferedLogWriter.isDone()) {
- // We check to see if we were able to successfully insert the poison pill at the end of
- // the queue forcing the remainder of the elements to be processed or if the processing
- // thread is done.
- try {
- // The order of these checks is important because short circuiting will cause us to
- // insert into the queue first and only if it fails do we check that the thread is done.
- while (!bufferedLogEntries.offer(POISON_PILL, 60, TimeUnit.SECONDS)
- || !bufferedLogWriter.isDone()) {
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(e);
- }
- waitTillFinish();
- }
- outboundObserver.onCompleted();
+ public synchronized void close() {
+ // If we are done, then a previous caller has already shut down the queue processing thread,
+ // so we don't need to do it again.
+ if (phaser.isTerminated()) {
+ return;
}
- }
- private void waitTillFinish() {
+ // Terminate the phaser that we block on when attempting to honor flow control on the
+ // outbound observer.
+ phaser.arriveAndDeregister();
+
try {
bufferedLogWriter.get();
} catch (CancellationException e) {
@@ -295,7 +296,14 @@ public class BeamFnLoggingClient implements AutoCloseable {
}
}
- private class LogControlObserver implements StreamObserver<BeamFnApi.LogControl> {
+ private class LogControlObserver
+ implements ClientResponseObserver<BeamFnApi.LogEntry, BeamFnApi.LogControl> {
+
+ @Override
+ public void beforeStart(ClientCallStreamObserver requestStream) {
+ requestStream.setOnReadyHandler(phaser::arrive);
+ }
+
@Override
public void onNext(BeamFnApi.LogControl value) {
}
@@ -309,5 +317,6 @@ public class BeamFnLoggingClient implements AutoCloseable {
public void onCompleted() {
inboundObserverCompletion.complete(null);
}
+
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/b1a22a89/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java
index 161ce18..015e5ec 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java
@@ -28,6 +28,7 @@ import static org.junit.Assert.assertTrue;
import com.google.protobuf.Timestamp;
import io.grpc.ManagedChannel;
import io.grpc.Server;
+import io.grpc.Status;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.stub.CallStreamObserver;
@@ -37,7 +38,6 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.LogManager;
import java.util.logging.LogRecord;
@@ -46,7 +46,9 @@ import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnLoggingGrpc;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -90,6 +92,7 @@ public class BeamFnLoggingClientTest {
.setTimestamp(Timestamp.newBuilder().setSeconds(1234567).setNanos(890000000).build())
.setLogLocation("LoggerName")
.build();
+ @Rule public ExpectedException thrown = ExpectedException.none();
@Test
public void testLogging() throws Exception {
@@ -124,9 +127,10 @@ public class BeamFnLoggingClientTest {
})
.build();
server.start();
+
+ ManagedChannel channel =
+ InProcessChannelBuilder.forName(apiServiceDescriptor.getUrl()).build();
try {
- ManagedChannel channel =
- InProcessChannelBuilder.forName(apiServiceDescriptor.getUrl()).build();
BeamFnLoggingClient client = new BeamFnLoggingClient(
PipelineOptionsFactory.fromArgs(new String[] {
@@ -134,8 +138,7 @@ public class BeamFnLoggingClientTest {
"--workerLogLevelOverrides={\"ConfiguredLogger\": \"DEBUG\"}"
}).create(),
apiServiceDescriptor,
- (Endpoints.ApiServiceDescriptor descriptor) -> channel,
- this::createStreamForTest);
+ (Endpoints.ApiServiceDescriptor descriptor) -> channel);
// Ensure that log levels were correctly set.
assertEquals(Level.OFF,
@@ -162,9 +165,105 @@ public class BeamFnLoggingClientTest {
}
}
- private <ReqT, RespT> StreamObserver<RespT> createStreamForTest(
- Function<StreamObserver<ReqT>, StreamObserver<RespT>> clientFactory,
- StreamObserver<ReqT> handler) {
- return clientFactory.apply(handler);
+ @Test
+ public void testWhenServerFailsThatClientIsAbleToCleanup() throws Exception {
+ AtomicBoolean clientClosedStream = new AtomicBoolean();
+ Collection<BeamFnApi.LogEntry> values = new ConcurrentLinkedQueue<>();
+ AtomicReference<StreamObserver<BeamFnApi.LogControl>> outboundServerObserver =
+ new AtomicReference<>();
+ CallStreamObserver<BeamFnApi.LogEntry.List> inboundServerObserver = TestStreams.withOnNext(
+ (BeamFnApi.LogEntry.List logEntries) -> values.addAll(logEntries.getLogEntriesList()))
+ .build();
+
+ Endpoints.ApiServiceDescriptor apiServiceDescriptor =
+ Endpoints.ApiServiceDescriptor.newBuilder()
+ .setUrl(this.getClass().getName() + "-" + UUID.randomUUID().toString())
+ .build();
+ Server server = InProcessServerBuilder.forName(apiServiceDescriptor.getUrl())
+ .addService(new BeamFnLoggingGrpc.BeamFnLoggingImplBase() {
+ @Override
+ public StreamObserver<BeamFnApi.LogEntry.List> logging(
+ StreamObserver<BeamFnApi.LogControl> outboundObserver) {
+ outboundServerObserver.set(outboundObserver);
+ outboundObserver.onError(Status.INTERNAL.withDescription("TEST ERROR").asException());
+ return inboundServerObserver;
+ }
+ })
+ .build();
+ server.start();
+
+ ManagedChannel channel =
+ InProcessChannelBuilder.forName(apiServiceDescriptor.getUrl()).build();
+ try {
+ BeamFnLoggingClient client = new BeamFnLoggingClient(
+ PipelineOptionsFactory.fromArgs(new String[] {
+ "--defaultWorkerLogLevel=OFF",
+ "--workerLogLevelOverrides={\"ConfiguredLogger\": \"DEBUG\"}"
+ }).create(),
+ apiServiceDescriptor,
+ (Endpoints.ApiServiceDescriptor descriptor) -> channel);
+
+ thrown.expectMessage("TEST ERROR");
+ client.close();
+ } finally {
+ // Verify that after close, log levels are reset.
+ assertEquals(Level.INFO, LogManager.getLogManager().getLogger("").getLevel());
+ assertNull(LogManager.getLogManager().getLogger("ConfiguredLogger").getLevel());
+
+ assertTrue(channel.isShutdown());
+
+ server.shutdownNow();
+ }
+ }
+
+ @Test
+ public void testWhenServerHangsUpEarlyThatClientIsAbleCleanup() throws Exception {
+ AtomicBoolean clientClosedStream = new AtomicBoolean();
+ Collection<BeamFnApi.LogEntry> values = new ConcurrentLinkedQueue<>();
+ AtomicReference<StreamObserver<BeamFnApi.LogControl>> outboundServerObserver =
+ new AtomicReference<>();
+ CallStreamObserver<BeamFnApi.LogEntry.List> inboundServerObserver =
+ TestStreams.withOnNext(
+ (BeamFnApi.LogEntry.List logEntries) -> values.addAll(logEntries.getLogEntriesList()))
+ .build();
+
+ Endpoints.ApiServiceDescriptor apiServiceDescriptor =
+ Endpoints.ApiServiceDescriptor.newBuilder()
+ .setUrl(this.getClass().getName() + "-" + UUID.randomUUID().toString())
+ .build();
+ Server server = InProcessServerBuilder.forName(apiServiceDescriptor.getUrl())
+ .addService(new BeamFnLoggingGrpc.BeamFnLoggingImplBase() {
+ @Override
+ public StreamObserver<BeamFnApi.LogEntry.List> logging(
+ StreamObserver<BeamFnApi.LogControl> outboundObserver) {
+ outboundServerObserver.set(outboundObserver);
+ outboundObserver.onCompleted();
+ return inboundServerObserver;
+ }
+ })
+ .build();
+ server.start();
+
+ ManagedChannel channel =
+ InProcessChannelBuilder.forName(apiServiceDescriptor.getUrl()).build();
+ try {
+ BeamFnLoggingClient client = new BeamFnLoggingClient(
+ PipelineOptionsFactory.fromArgs(new String[] {
+ "--defaultWorkerLogLevel=OFF",
+ "--workerLogLevelOverrides={\"ConfiguredLogger\": \"DEBUG\"}"
+ }).create(),
+ apiServiceDescriptor,
+ (Endpoints.ApiServiceDescriptor descriptor) -> channel);
+
+ client.close();
+ } finally {
+ // Verify that after close, log levels are reset.
+ assertEquals(Level.INFO, LogManager.getLogManager().getLogger("").getLevel());
+ assertNull(LogManager.getLogManager().getLogger("ConfiguredLogger").getLevel());
+
+ assertTrue(channel.isShutdown());
+
+ server.shutdownNow();
+ }
}
}
[2/2] beam git commit: [BEAM-3016] Fix blocking issue within run()
when channel terminates while blocking within DirectStreamObserver.
Posted by lc...@apache.org.
[BEAM-3016] Fix blocking issue within run() when channel terminates while blocking within DirectStreamObserver.
This closes #3944
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6dd90d89
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6dd90d89
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6dd90d89
Branch: refs/heads/master
Commit: 6dd90d89d3d324c9f9f8cb2b1faf38fe525f0a10
Parents: dc3e2f7 b1a22a8
Author: Luke Cwik <lc...@google.com>
Authored: Wed Oct 11 16:12:49 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Oct 11 16:12:49 2017 -0700
----------------------------------------------------------------------
.../org/apache/beam/fn/harness/FnHarness.java | 10 +-
.../fn/harness/logging/BeamFnLoggingClient.java | 139 ++++++++++---------
.../logging/BeamFnLoggingClientTest.java | 117 ++++++++++++++--
3 files changed, 185 insertions(+), 81 deletions(-)
----------------------------------------------------------------------