You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/05/30 21:15:41 UTC
[1/2] beam git commit: [BEAM-1347] Remove the usage of a thread local
on a potentially hot path
Repository: beam
Updated Branches:
refs/heads/master 2d3e9fe75 -> 49067b164
[BEAM-1347] Remove the usage of a thread local on a potentially hot path
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/60779e2e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/60779e2e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/60779e2e
Branch: refs/heads/master
Commit: 60779e2ecd76f1cb4766050e4560765c1bc3c19b
Parents: 2d3e9fe
Author: Luke Cwik <lc...@google.com>
Authored: Tue May 30 13:15:31 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Tue May 30 14:15:23 2017 -0700
----------------------------------------------------------------------
.../fn/harness/logging/BeamFnLoggingClient.java | 36 +++++++++++---------
1 file changed, 19 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/60779e2e/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
index c8d11ed..d56ee6d 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
@@ -38,7 +38,6 @@ import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
-import java.util.function.Consumer;
import java.util.function.Function;
import java.util.logging.Formatter;
import java.util.logging.Handler;
@@ -179,11 +178,14 @@ public class BeamFnLoggingClient implements AutoCloseable {
private final BlockingDeque<BeamFnApi.LogEntry> bufferedLogEntries =
new LinkedBlockingDeque<>(MAX_BUFFERED_LOG_ENTRY_COUNT);
private final Future<?> bufferedLogWriter;
- private final ThreadLocal<Consumer<BeamFnApi.LogEntry>> logEntryHandler;
+ /**
+ * Safe object publishing is not required since we only care if the thread that set
+ * this field is equal to the thread also attempting to add a log entry.
+ */
+ private Thread logEntryHandlerThread;
private LogRecordHandler(ExecutorService executorService) {
bufferedLogWriter = executorService.submit(this);
- logEntryHandler = new ThreadLocal<>();
}
@Override
@@ -204,19 +206,18 @@ public class BeamFnLoggingClient implements AutoCloseable {
builder.setTrace(getStackTraceAsString(record.getThrown()));
}
// The thread that sends log records should never perform a blocking publish and
- // only insert log records best effort. We detect which thread is logging
- // by using the thread local, defaulting to the blocking publish.
- MoreObjects.firstNonNull(
- logEntryHandler.get(), this::blockingPublish).accept(builder.build());
- }
-
- /** Blocks caller till enough space exists to publish this log entry. */
- private void blockingPublish(BeamFnApi.LogEntry logEntry) {
- try {
- bufferedLogEntries.put(logEntry);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(e);
+ // only insert log records best effort.
+ if (Thread.currentThread() != logEntryHandlerThread) {
+ // Blocks caller till enough space exists to publish this log entry.
+ try {
+ bufferedLogEntries.put(builder.build());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ } else {
+ // Never blocks caller, will drop log message if buffer is full.
+ bufferedLogEntries.offer(builder.build());
}
}
@@ -225,7 +226,8 @@ public class BeamFnLoggingClient implements AutoCloseable {
// Logging which occurs in this thread will attempt to publish log entries into the
// above handler which should never block if the queue is full otherwise
// this thread will get stuck.
- logEntryHandler.set(bufferedLogEntries::offer);
+ logEntryHandlerThread = Thread.currentThread();
+
List<BeamFnApi.LogEntry> additionalLogEntries =
new ArrayList<>(MAX_BUFFERED_LOG_ENTRY_COUNT);
try {
[2/2] beam git commit: [BEAM-1347] Remove the usage of a thread local
on a potentially hot path
Posted by lc...@apache.org.
[BEAM-1347] Remove the usage of a thread local on a potentially hot path
This closes #3260
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/49067b16
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/49067b16
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/49067b16
Branch: refs/heads/master
Commit: 49067b1642aa5ae01ec393ff9b2e0971f401fb5e
Parents: 2d3e9fe 60779e2
Author: Luke Cwik <lc...@google.com>
Authored: Tue May 30 14:15:31 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Tue May 30 14:15:31 2017 -0700
----------------------------------------------------------------------
.../fn/harness/logging/BeamFnLoggingClient.java | 36 +++++++++++---------
1 file changed, 19 insertions(+), 17 deletions(-)
----------------------------------------------------------------------