You are viewing a plain text version of this content. (The canonical link was a hyperlink that is not preserved in this plain text copy.)
Posted to commits@beam.apache.org by al...@apache.org on 2021/01/07 23:45:30 UTC

[beam] branch master updated: Revert "[BEAM-11474] Track transform processing thread in Java SDK harness and set log entry field"

This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 41c3811  Revert "[BEAM-11474] Track transform processing thread in Java SDK harness and set log entry field"
     new 6af9d75  Merge pull request #13696 from y1chi/revert-13533-test_logging
41c3811 is described below

commit 41c3811a88181dd4e8a15d79bc3b3a4abf54bd1b
Author: Yichi Zhang <zy...@google.com>
AuthorDate: Thu Jan 7 14:05:54 2021 -0800

    Revert "[BEAM-11474] Track transform processing thread in Java SDK harness and set log entry field"
---
 .../apache/beam/fn/harness/FnApiDoFnRunner.java    | 27 ------------
 .../harness/TransformProcessingThreadTracker.java  | 49 ----------------------
 .../fn/harness/logging/BeamFnLoggingClient.java    |  8 ----
 3 files changed, 84 deletions(-)

diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
index 23b7699..95ca03e 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
@@ -724,8 +724,6 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
   }
 
   private void startBundle() {
-    TransformProcessingThreadTracker.recordProcessingThread(
-        Thread.currentThread().getId(), this.pTransformId);
     // Register as a consumer for each timer.
     timerHandlers = new HashMap<>();
     for (Map.Entry<String, KV<TimeDomain, Coder<Timer<Object>>>> timerFamilyInfo :
@@ -745,8 +743,6 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
   }
 
   private void processElementForParDo(WindowedValue<InputT> elem) {
-    TransformProcessingThreadTracker.recordProcessingThread(
-        Thread.currentThread().getId(), this.pTransformId);
     currentElement = elem;
     try {
       doFnInvoker.invokeProcessElement(processContext);
@@ -756,8 +752,6 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
   }
 
   private void processElementForWindowObservingParDo(WindowedValue<InputT> elem) {
-    TransformProcessingThreadTracker.recordProcessingThread(
-        Thread.currentThread().getId(), this.pTransformId);
     currentElement = elem;
     try {
       Iterator<BoundedWindow> windowIterator =
@@ -773,8 +767,6 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
   }
 
   private void processElementForPairWithRestriction(WindowedValue<InputT> elem) {
-    TransformProcessingThreadTracker.recordProcessingThread(
-        Thread.currentThread().getId(), this.pTransformId);
     currentElement = elem;
     try {
       currentRestriction = doFnInvoker.invokeGetInitialRestriction(processContext);
@@ -797,8 +789,6 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
   }
 
   private void processElementForWindowObservingPairWithRestriction(WindowedValue<InputT> elem) {
-    TransformProcessingThreadTracker.recordProcessingThread(
-        Thread.currentThread().getId(), this.pTransformId);
     currentElement = elem;
     try {
       Iterator<BoundedWindow> windowIterator =
@@ -831,8 +821,6 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
 
   private void processElementForSplitRestriction(
       WindowedValue<KV<InputT, KV<RestrictionT, WatermarkEstimatorStateT>>> elem) {
-    TransformProcessingThreadTracker.recordProcessingThread(
-        Thread.currentThread().getId(), this.pTransformId);
     currentElement = elem.withValue(elem.getValue().getKey());
     currentRestriction = elem.getValue().getValue().getKey();
     currentWatermarkEstimatorState = elem.getValue().getValue().getValue();
@@ -861,8 +849,6 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
 
   private void processElementForWindowObservingSplitRestriction(
       WindowedValue<KV<InputT, KV<RestrictionT, WatermarkEstimatorStateT>>> elem) {
-    TransformProcessingThreadTracker.recordProcessingThread(
-        Thread.currentThread().getId(), this.pTransformId);
     currentElement = elem.withValue(elem.getValue().getKey());
     currentRestriction = elem.getValue().getValue().getKey();
     currentWatermarkEstimatorState = elem.getValue().getValue().getValue();
@@ -897,8 +883,6 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
 
   private void processElementForTruncateRestriction(
       WindowedValue<KV<KV<InputT, KV<RestrictionT, WatermarkEstimatorStateT>>, Double>> elem) {
-    TransformProcessingThreadTracker.recordProcessingThread(
-        Thread.currentThread().getId(), this.pTransformId);
     currentElement = elem.withValue(elem.getValue().getKey().getKey());
     currentRestriction = elem.getValue().getKey().getValue().getKey();
     currentWatermarkEstimatorState = elem.getValue().getKey().getValue().getValue();
@@ -931,8 +915,6 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
 
   private void processElementForWindowObservingTruncateRestriction(
       WindowedValue<KV<KV<InputT, KV<RestrictionT, WatermarkEstimatorStateT>>, Double>> elem) {
-    TransformProcessingThreadTracker.recordProcessingThread(
-        Thread.currentThread().getId(), this.pTransformId);
     currentElement = elem.withValue(elem.getValue().getKey().getKey());
     try {
       windowCurrentIndex = -1;
@@ -1030,8 +1012,6 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
 
   private void processElementForWindowObservingSizedElementAndRestriction(
       WindowedValue<KV<KV<InputT, KV<RestrictionT, WatermarkEstimatorStateT>>, Double>> elem) {
-    TransformProcessingThreadTracker.recordProcessingThread(
-        Thread.currentThread().getId(), this.pTransformId);
     currentElement = elem.withValue(elem.getValue().getKey().getKey());
     try {
       windowCurrentIndex = -1;
@@ -1642,8 +1622,6 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
 
   private <K> void processTimer(
       String timerIdOrTimerFamilyId, TimeDomain timeDomain, Timer<K> timer) {
-    TransformProcessingThreadTracker.recordProcessingThread(
-        Thread.currentThread().getId(), this.pTransformId);
     currentTimer = timer;
     currentTimeDomain = timeDomain;
     // The timerIdOrTimerFamilyId contains either a timerId from timer declaration or timerFamilyId
@@ -1671,9 +1649,6 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
   }
 
   private void finishBundle() throws Exception {
-    TransformProcessingThreadTracker.recordProcessingThread(
-        Thread.currentThread().getId(), this.pTransformId);
-
     for (TimerHandler timerHandler : timerHandlers.values()) {
       timerHandler.awaitCompletion();
     }
@@ -1688,8 +1663,6 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
   }
 
   private void tearDown() {
-    TransformProcessingThreadTracker.recordProcessingThread(
-        Thread.currentThread().getId(), this.pTransformId);
     doFnInvoker.invokeTeardown();
   }
 
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/TransformProcessingThreadTracker.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/TransformProcessingThreadTracker.java
deleted file mode 100644
index ae69db3..0000000
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/TransformProcessingThreadTracker.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.fn.harness;
-
-import java.time.Duration;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
-
-/**
- * TransformProcessingThreadTracker tracks the thread ids for the transforms that are being
- * processed in the SDK harness.
- */
-public class TransformProcessingThreadTracker {
-  private static final TransformProcessingThreadTracker INSTANCE =
-      new TransformProcessingThreadTracker();
-  private final Cache<Long, String> threadIdToTransformIdMappings;
-
-  private TransformProcessingThreadTracker() {
-    this.threadIdToTransformIdMappings =
-        CacheBuilder.newBuilder().maximumSize(10000).expireAfterAccess(Duration.ofHours(1)).build();
-  }
-
-  public static TransformProcessingThreadTracker getInstance() {
-    return INSTANCE;
-  }
-
-  public static Cache<Long, String> getThreadIdToTransformIdMappings() {
-    return getInstance().threadIdToTransformIdMappings;
-  }
-
-  public static void recordProcessingThread(Long threadId, String transformId) {
-    getInstance().threadIdToTransformIdMappings.put(threadId, transformId);
-  }
-}
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
index e3c3bd9..0fdf404 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
@@ -40,7 +40,6 @@ import java.util.logging.LogManager;
 import java.util.logging.LogRecord;
 import java.util.logging.Logger;
 import java.util.logging.SimpleFormatter;
-import org.apache.beam.fn.harness.TransformProcessingThreadTracker;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.model.fnexecution.v1.BeamFnLoggingGrpc;
 import org.apache.beam.model.pipeline.v1.Endpoints;
@@ -217,13 +216,6 @@ public class BeamFnLoggingClient implements AutoCloseable {
         builder.setLogLocation(loggerName);
       }
 
-      String transformId =
-          TransformProcessingThreadTracker.getThreadIdToTransformIdMappings()
-              .getIfPresent((long) record.getThreadID());
-      if (transformId != null) {
-        builder.setTransformId(transformId);
-      }
-
       // The thread that sends log records should never perform a blocking publish and
       // only insert log records best effort.
       if (Thread.currentThread() != logEntryHandlerThread) {