You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2021/01/07 23:45:30 UTC
[beam] branch master updated: Revert "[BEAM-11474] Track transform processing thread in Java SDK harness and set log entry field"
This is an automated email from the ASF dual-hosted git repository.
altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 41c3811 Revert "[BEAM-11474] Track transform processing thread in Java SDK harness and set log entry field"
new 6af9d75 Merge pull request #13696 from y1chi/revert-13533-test_logging
41c3811 is described below
commit 41c3811a88181dd4e8a15d79bc3b3a4abf54bd1b
Author: Yichi Zhang <zy...@google.com>
AuthorDate: Thu Jan 7 14:05:54 2021 -0800
Revert "[BEAM-11474] Track transform processing thread in Java SDK harness and set log entry field"
---
.../apache/beam/fn/harness/FnApiDoFnRunner.java | 27 ------------
.../harness/TransformProcessingThreadTracker.java | 49 ----------------------
.../fn/harness/logging/BeamFnLoggingClient.java | 8 ----
3 files changed, 84 deletions(-)
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
index 23b7699..95ca03e 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
@@ -724,8 +724,6 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
}
private void startBundle() {
- TransformProcessingThreadTracker.recordProcessingThread(
- Thread.currentThread().getId(), this.pTransformId);
// Register as a consumer for each timer.
timerHandlers = new HashMap<>();
for (Map.Entry<String, KV<TimeDomain, Coder<Timer<Object>>>> timerFamilyInfo :
@@ -745,8 +743,6 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
}
private void processElementForParDo(WindowedValue<InputT> elem) {
- TransformProcessingThreadTracker.recordProcessingThread(
- Thread.currentThread().getId(), this.pTransformId);
currentElement = elem;
try {
doFnInvoker.invokeProcessElement(processContext);
@@ -756,8 +752,6 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
}
private void processElementForWindowObservingParDo(WindowedValue<InputT> elem) {
- TransformProcessingThreadTracker.recordProcessingThread(
- Thread.currentThread().getId(), this.pTransformId);
currentElement = elem;
try {
Iterator<BoundedWindow> windowIterator =
@@ -773,8 +767,6 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
}
private void processElementForPairWithRestriction(WindowedValue<InputT> elem) {
- TransformProcessingThreadTracker.recordProcessingThread(
- Thread.currentThread().getId(), this.pTransformId);
currentElement = elem;
try {
currentRestriction = doFnInvoker.invokeGetInitialRestriction(processContext);
@@ -797,8 +789,6 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
}
private void processElementForWindowObservingPairWithRestriction(WindowedValue<InputT> elem) {
- TransformProcessingThreadTracker.recordProcessingThread(
- Thread.currentThread().getId(), this.pTransformId);
currentElement = elem;
try {
Iterator<BoundedWindow> windowIterator =
@@ -831,8 +821,6 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
private void processElementForSplitRestriction(
WindowedValue<KV<InputT, KV<RestrictionT, WatermarkEstimatorStateT>>> elem) {
- TransformProcessingThreadTracker.recordProcessingThread(
- Thread.currentThread().getId(), this.pTransformId);
currentElement = elem.withValue(elem.getValue().getKey());
currentRestriction = elem.getValue().getValue().getKey();
currentWatermarkEstimatorState = elem.getValue().getValue().getValue();
@@ -861,8 +849,6 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
private void processElementForWindowObservingSplitRestriction(
WindowedValue<KV<InputT, KV<RestrictionT, WatermarkEstimatorStateT>>> elem) {
- TransformProcessingThreadTracker.recordProcessingThread(
- Thread.currentThread().getId(), this.pTransformId);
currentElement = elem.withValue(elem.getValue().getKey());
currentRestriction = elem.getValue().getValue().getKey();
currentWatermarkEstimatorState = elem.getValue().getValue().getValue();
@@ -897,8 +883,6 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
private void processElementForTruncateRestriction(
WindowedValue<KV<KV<InputT, KV<RestrictionT, WatermarkEstimatorStateT>>, Double>> elem) {
- TransformProcessingThreadTracker.recordProcessingThread(
- Thread.currentThread().getId(), this.pTransformId);
currentElement = elem.withValue(elem.getValue().getKey().getKey());
currentRestriction = elem.getValue().getKey().getValue().getKey();
currentWatermarkEstimatorState = elem.getValue().getKey().getValue().getValue();
@@ -931,8 +915,6 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
private void processElementForWindowObservingTruncateRestriction(
WindowedValue<KV<KV<InputT, KV<RestrictionT, WatermarkEstimatorStateT>>, Double>> elem) {
- TransformProcessingThreadTracker.recordProcessingThread(
- Thread.currentThread().getId(), this.pTransformId);
currentElement = elem.withValue(elem.getValue().getKey().getKey());
try {
windowCurrentIndex = -1;
@@ -1030,8 +1012,6 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
private void processElementForWindowObservingSizedElementAndRestriction(
WindowedValue<KV<KV<InputT, KV<RestrictionT, WatermarkEstimatorStateT>>, Double>> elem) {
- TransformProcessingThreadTracker.recordProcessingThread(
- Thread.currentThread().getId(), this.pTransformId);
currentElement = elem.withValue(elem.getValue().getKey().getKey());
try {
windowCurrentIndex = -1;
@@ -1642,8 +1622,6 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
private <K> void processTimer(
String timerIdOrTimerFamilyId, TimeDomain timeDomain, Timer<K> timer) {
- TransformProcessingThreadTracker.recordProcessingThread(
- Thread.currentThread().getId(), this.pTransformId);
currentTimer = timer;
currentTimeDomain = timeDomain;
// The timerIdOrTimerFamilyId contains either a timerId from timer declaration or timerFamilyId
@@ -1671,9 +1649,6 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
}
private void finishBundle() throws Exception {
- TransformProcessingThreadTracker.recordProcessingThread(
- Thread.currentThread().getId(), this.pTransformId);
-
for (TimerHandler timerHandler : timerHandlers.values()) {
timerHandler.awaitCompletion();
}
@@ -1688,8 +1663,6 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
}
private void tearDown() {
- TransformProcessingThreadTracker.recordProcessingThread(
- Thread.currentThread().getId(), this.pTransformId);
doFnInvoker.invokeTeardown();
}
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/TransformProcessingThreadTracker.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/TransformProcessingThreadTracker.java
deleted file mode 100644
index ae69db3..0000000
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/TransformProcessingThreadTracker.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.fn.harness;
-
-import java.time.Duration;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
-
-/**
- * TransformProcessingThreadTracker tracks the thread ids for the transforms that are being
- * processed in the SDK harness.
- */
-public class TransformProcessingThreadTracker {
- private static final TransformProcessingThreadTracker INSTANCE =
- new TransformProcessingThreadTracker();
- private final Cache<Long, String> threadIdToTransformIdMappings;
-
- private TransformProcessingThreadTracker() {
- this.threadIdToTransformIdMappings =
- CacheBuilder.newBuilder().maximumSize(10000).expireAfterAccess(Duration.ofHours(1)).build();
- }
-
- public static TransformProcessingThreadTracker getInstance() {
- return INSTANCE;
- }
-
- public static Cache<Long, String> getThreadIdToTransformIdMappings() {
- return getInstance().threadIdToTransformIdMappings;
- }
-
- public static void recordProcessingThread(Long threadId, String transformId) {
- getInstance().threadIdToTransformIdMappings.put(threadId, transformId);
- }
-}
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
index e3c3bd9..0fdf404 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
@@ -40,7 +40,6 @@ import java.util.logging.LogManager;
import java.util.logging.LogRecord;
import java.util.logging.Logger;
import java.util.logging.SimpleFormatter;
-import org.apache.beam.fn.harness.TransformProcessingThreadTracker;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnLoggingGrpc;
import org.apache.beam.model.pipeline.v1.Endpoints;
@@ -217,13 +216,6 @@ public class BeamFnLoggingClient implements AutoCloseable {
builder.setLogLocation(loggerName);
}
- String transformId =
- TransformProcessingThreadTracker.getThreadIdToTransformIdMappings()
- .getIfPresent((long) record.getThreadID());
- if (transformId != null) {
- builder.setTransformId(transformId);
- }
-
// The thread that sends log records should never perform a blocking publish and
// only insert log records best effort.
if (Thread.currentThread() != logEntryHandlerThread) {