You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/07/08 17:13:12 UTC

[GitHub] [beam] steveniemitz commented on a diff in pull request #22103: [BEAM-13015, #22050] Make SDK harness msec counters faster using ordered puts

steveniemitz commented on code in PR #22103:
URL: https://github.com/apache/beam/pull/22103#discussion_r917003712


##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ExecutionStateSampler.java:
##########
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.control;
+
+import com.google.auto.value.AutoValue;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.concurrent.GuardedBy;
+import org.apache.beam.fn.harness.control.ProcessBundleHandler.BundleProcessor;
+import org.apache.beam.runners.core.metrics.MonitoringInfoEncodings;
+import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
+import org.apache.beam.sdk.options.ExperimentalOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.DateTimeUtils.MillisProvider;
+import org.joda.time.Duration;
+import org.joda.time.format.PeriodFormatter;
+import org.joda.time.format.PeriodFormatterBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Monitors the execution of one or more execution threads. */
+public class ExecutionStateSampler {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ExecutionStateSampler.class);
+  // Sampling period used when the STATE_SAMPLING_PERIOD_MILLIS experiment is not set.
+  private static final int DEFAULT_SAMPLING_PERIOD_MS = 200;
+  // Five minutes, in milliseconds. NOTE(review): its use is not visible in this excerpt;
+  // presumably the threshold for reporting a state that has been active too long -- confirm.
+  private static final long MAX_LULL_TIME_MS = TimeUnit.MINUTES.toMillis(5);
+  // Renders a period with "d", "h", "m" and "s" suffixes. Hours, minutes and seconds are
+  // padded to at least two digits, and minutes and seconds are printed even when zero.
+  private static final PeriodFormatter DURATION_FORMATTER =
+      new PeriodFormatterBuilder()
+          .appendDays()
+          .appendSuffix("d")
+          .minimumPrintedDigits(2)
+          .appendHours()
+          .appendSuffix("h")
+          .printZeroAlways()
+          .appendMinutes()
+          .appendSuffix("m")
+          .appendSeconds()
+          .appendSuffix("s")
+          .toFormatter();
+  // Sampling period in milliseconds, resolved once in the constructor.
+  private final int periodMs;
+  // Time source consulted by the sampling loop.
+  private final MillisProvider clock;
+
+  // Trackers visited on every sampling tick; every access must hold this set's own monitor.
+  @GuardedBy("activeStateTrackers")
+  private final Set<ExecutionStateTracker> activeStateTrackers;
+
+  // Handle to the sampling task submitted by the constructor; cancelled by stop().
+  private final Future<Void> stateSamplingThread;
+
+  /**
+   * Creates a sampler and immediately submits its sampling loop to the executor service returned
+   * by {@link GcsOptions#getExecutorService()}.
+   *
+   * <p>The sampling period is read from the
+   * {@link ExperimentalOptions#STATE_SAMPLING_PERIOD_MILLIS} experiment and falls back to
+   * {@code DEFAULT_SAMPLING_PERIOD_MS} when that experiment is not set.
+   *
+   * @param options pipeline options supplying the sampling-period experiment and the executor
+   * @param clock time source used by the sampling loop
+   * @throws NumberFormatException if the sampling-period experiment value is not a valid integer
+   */
+  @SuppressWarnings(
+      "methodref.receiver.bound.invalid" /* Synchronization ensures proper initialization */)
+  public ExecutionStateSampler(PipelineOptions options, MillisProvider clock) {
+    String samplingPeriodMills =
+        ExperimentalOptions.getExperimentValue(
+            options, ExperimentalOptions.STATE_SAMPLING_PERIOD_MILLIS);
+    this.periodMs =
+        samplingPeriodMills == null
+            ? DEFAULT_SAMPLING_PERIOD_MS
+            : Integer.parseInt(samplingPeriodMills);
+    this.clock = clock;
+    this.activeStateTrackers = new HashSet<>();
+    // We specifically synchronize to ensure that this object can complete
+    // being published before the state sampler thread starts.
+    // stateSampler() begins by synchronizing on this same monitor, so it cannot proceed until
+    // the assignment below is done and the monitor is released.
+    synchronized (this) {
+      this.stateSamplingThread =
+          options.as(GcsOptions.class).getExecutorService().submit(this::stateSampler);
+    }
+  }
+
+  /**
+   * An {@link ExecutionState} represents the current state of an execution thread.
+   *
+   * <p>A state is switched on with {@link #activate()} and off with {@link #deactivate()}. Both
+   * methods are restricted to the bundle processing thread.
+   */
+  public interface ExecutionState {
+
+    /**
+     * Activates this execution state within the {@link ExecutionStateTracker}.
+     *
+     * <p>Must only be invoked by the bundle processing thread.
+     */
+    void activate();
+
+    /**
+     * Restores the previously active execution state within the {@link ExecutionStateTracker}.
+     *
+     * <p>Must only be invoked by the bundle processing thread. NOTE(review): presumably each call
+     * is expected to follow a corresponding {@link #activate()} -- confirm against the tracker.
+     */
+    void deactivate();
+  }
+
+  /**
+   * Stops the execution of the state sampler.
+   *
+   * <p>Cancels the sampling task, interrupting it if it is running, and then inspects the task's
+   * outcome. Cancellation itself is treated as the expected, successful result.
+   *
+   * @throws RuntimeException if the calling thread is interrupted while waiting, if the task has
+   *     not finished within five sampling periods, or if the sampling task failed with an exception
+   */
+  public void stop() {
+    stateSamplingThread.cancel(true);
+    try {
+      // NOTE(review): per the Future contract, get() on a successfully cancelled future throws
+      // CancellationException right away, so this timeout does not actually wait for the
+      // sampling thread to exit.
+      stateSamplingThread.get(5L * periodMs, TimeUnit.MILLISECONDS);
+    } catch (CancellationException ignored) {
+      // This was expected -- we were cancelling the thread.
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag so code further up the stack can still observe it.
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(
+          "Failed to stop state sampling after waiting 5 sampling periods.", e);
+    } catch (TimeoutException e) {
+      throw new RuntimeException(
+          "Failed to stop state sampling after waiting 5 sampling periods.", e);
+    } catch (ExecutionException e) {
+      throw new RuntimeException("Exception in state sampler", e);
+    }
+  }
+
+  /**
+   * Entry point for the state sampling thread.
+   *
+   * <p>Roughly every {@code periodMs} milliseconds of {@code clock} time, this calls
+   * {@code takeSample} on every tracker in {@code activeStateTrackers}. Each call receives the
+   * current time and the milliseconds elapsed since the previous sample. The loop exits and
+   * returns {@code null} once it observes that the thread has been interrupted.
+   *
+   * @return always {@code null}
+   * @throws InterruptedException if the thread is interrupted while sleeping between samples
+   * @throws IllegalStateException if the sampling future has not been assigned yet
+   */
+  private Void stateSampler() throws Exception {
+    // Ensure the object finishes being published safely.
+    // The constructor holds this monitor while assigning stateSamplingThread, so acquiring it
+    // here waits for that assignment and makes the constructor's earlier field writes visible.
+    synchronized (this) {
+      if (stateSamplingThread == null) {
+        throw new IllegalStateException("Underinitialized ExecutionStateSampler instance");
+      }
+    }
+
+    long lastSampleTimeMillis = clock.getMillis();
+    long targetTimeMillis = lastSampleTimeMillis + periodMs;
+    // Note: Thread.interrupted() clears the interrupt flag as it checks it.
+    while (!Thread.interrupted()) {
+      long currentTimeMillis = clock.getMillis();
+      long difference = targetTimeMillis - currentTimeMillis;
+      if (difference > 0) {
+        // Not yet due: sleep until the target time, then re-check on the next iteration.
+        Thread.sleep(difference);
+      } else {
+        // Due or overdue: report the actual elapsed time, which may exceed periodMs.
+        long millisSinceLastSample = currentTimeMillis - lastSampleTimeMillis;
+        synchronized (activeStateTrackers) {
+          for (ExecutionStateTracker activeTracker : activeStateTrackers) {
+            activeTracker.takeSample(currentTimeMillis, millisSinceLastSample);
+          }
+        }
+        lastSampleTimeMillis = currentTimeMillis;
+        // Schedule relative to when this sample was actually taken, not the previous target.
+        targetTimeMillis = lastSampleTimeMillis + periodMs;
+      }
+    }
+    return null;
+  }
+
+  /** Returns a new {@link ExecutionStateTracker} associated with this state sampler. */
+  public ExecutionStateTracker create() {
+    return new ExecutionStateTracker();
+  }
+
+  /** Tracks the current state of a single execution thread. */
+  public class ExecutionStateTracker implements BundleProgressReporter {
+
+    // The set of execution states that this tracker is responsible for. Effectively
+    // final after the first execution of start().

Review Comment:
   nit: I think this is supposed to be `create` here, i.e. "after the first execution of create()" rather than `start()`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org