Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/29 17:43:18 UTC

[GitHub] [beam] lukecwik opened a new pull request, #22103: [WIP, BEAM-13015, #22050] Swap msec counters to use faster implementation

lukecwik opened a new pull request, #22103:
URL: https://github.com/apache/beam/pull/22103

   **Please** add a meaningful description for your change here
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Mention the appropriate issue in your description (for example: `addresses #123`), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment `fixes #<ISSUE NUMBER>` instead.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   To check the build health, please visit [https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md)
   
   GitHub Actions Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] steveniemitz commented on a diff in pull request #22103: [BEAM-13015, #22050] Make SDK harness msec counters faster using ordered puts

Posted by GitBox <gi...@apache.org>.
steveniemitz commented on code in PR #22103:
URL: https://github.com/apache/beam/pull/22103#discussion_r917003712


##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ExecutionStateSampler.java:
##########
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.control;
+
+import com.google.auto.value.AutoValue;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.concurrent.GuardedBy;
+import org.apache.beam.fn.harness.control.ProcessBundleHandler.BundleProcessor;
+import org.apache.beam.runners.core.metrics.MonitoringInfoEncodings;
+import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
+import org.apache.beam.sdk.options.ExperimentalOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.DateTimeUtils.MillisProvider;
+import org.joda.time.Duration;
+import org.joda.time.format.PeriodFormatter;
+import org.joda.time.format.PeriodFormatterBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Monitors the execution of one or more execution threads. */
+public class ExecutionStateSampler {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ExecutionStateSampler.class);
+  private static final int DEFAULT_SAMPLING_PERIOD_MS = 200;
+  private static final long MAX_LULL_TIME_MS = TimeUnit.MINUTES.toMillis(5);
+  private static final PeriodFormatter DURATION_FORMATTER =
+      new PeriodFormatterBuilder()
+          .appendDays()
+          .appendSuffix("d")
+          .minimumPrintedDigits(2)
+          .appendHours()
+          .appendSuffix("h")
+          .printZeroAlways()
+          .appendMinutes()
+          .appendSuffix("m")
+          .appendSeconds()
+          .appendSuffix("s")
+          .toFormatter();
+  private final int periodMs;
+  private final MillisProvider clock;
+
+  @GuardedBy("activeStateTrackers")
+  private final Set<ExecutionStateTracker> activeStateTrackers;
+
+  private final Future<Void> stateSamplingThread;
+
+  @SuppressWarnings(
+      "methodref.receiver.bound.invalid" /* Synchronization ensures proper initialization */)
+  public ExecutionStateSampler(PipelineOptions options, MillisProvider clock) {
+    String samplingPeriodMills =
+        ExperimentalOptions.getExperimentValue(
+            options, ExperimentalOptions.STATE_SAMPLING_PERIOD_MILLIS);
+    this.periodMs =
+        samplingPeriodMills == null
+            ? DEFAULT_SAMPLING_PERIOD_MS
+            : Integer.parseInt(samplingPeriodMills);
+    this.clock = clock;
+    this.activeStateTrackers = new HashSet<>();
+    // We specifically synchronize to ensure that this object can complete
+    // being published before the state sampler thread starts.
+    synchronized (this) {
+      this.stateSamplingThread =
+          options.as(GcsOptions.class).getExecutorService().submit(this::stateSampler);
+    }
+  }
+
+  /** An {@link ExecutionState} represents the current state of an execution thread. */
+  public interface ExecutionState {
+
+    /**
+     * Activates this execution state within the {@link ExecutionStateTracker}.
+     *
+     * <p>Must only be invoked by the bundle processing thread.
+     */
+    void activate();
+
+    /**
+     * Restores the previously active execution state within the {@link ExecutionStateTracker}.
+     *
+     * <p>Must only be invoked by the bundle processing thread.
+     */
+    void deactivate();
+  }
+
+  /** Stops the execution of the state sampler. */
+  public void stop() {
+    stateSamplingThread.cancel(true);
+    try {
+      stateSamplingThread.get(5L * periodMs, TimeUnit.MILLISECONDS);
+    } catch (CancellationException e) {
+      // This was expected -- we were cancelling the thread.
+    } catch (InterruptedException | TimeoutException e) {
+      throw new RuntimeException(
+          "Failed to stop state sampling after waiting 5 sampling periods.", e);
+    } catch (ExecutionException e) {
+      throw new RuntimeException("Exception in state sampler", e);
+    }
+  }
+
+  /** Entry point for the state sampling thread. */
+  private Void stateSampler() throws Exception {
+    // Ensure the object finishes being published safely.
+    synchronized (this) {
+      if (stateSamplingThread == null) {
+        throw new IllegalStateException("Underinitialized ExecutionStateSampler instance");
+      }
+    }
+
+    long lastSampleTimeMillis = clock.getMillis();
+    long targetTimeMillis = lastSampleTimeMillis + periodMs;
+    while (!Thread.interrupted()) {
+      long currentTimeMillis = clock.getMillis();
+      long difference = targetTimeMillis - currentTimeMillis;
+      if (difference > 0) {
+        Thread.sleep(difference);
+      } else {
+        long millisSinceLastSample = currentTimeMillis - lastSampleTimeMillis;
+        synchronized (activeStateTrackers) {
+          for (ExecutionStateTracker activeTracker : activeStateTrackers) {
+            activeTracker.takeSample(currentTimeMillis, millisSinceLastSample);
+          }
+        }
+        lastSampleTimeMillis = currentTimeMillis;
+        targetTimeMillis = lastSampleTimeMillis + periodMs;
+      }
+    }
+    return null;
+  }
+
+  /** Returns a new {@link ExecutionStateTracker} associated with this state sampler. */
+  public ExecutionStateTracker create() {
+    return new ExecutionStateTracker();
+  }
+
+  /** Tracks the current state of a single execution thread. */
+  public class ExecutionStateTracker implements BundleProgressReporter {
+
+    // The set of execution states that this tracker is responsible for. Effectively
+    // final after the first execution of start().

Review Comment:
   nit: I think this is supposed to be `create` here





[GitHub] [beam] lukecwik commented on a diff in pull request #22103: [BEAM-13015, #22050] Make SDK harness msec counters faster using ordered puts

Posted by GitBox <gi...@apache.org>.
lukecwik commented on code in PR #22103:
URL: https://github.com/apache/beam/pull/22103#discussion_r917033253


##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ExecutionStateSampler.java:
##########
@@ -0,0 +1,418 @@
+    // The set of execution states that this tracker is responsible for. Effectively
+    // final after the first execution of start().
+    private final List<ExecutionStateImpl> executionStates;
+    // Read by multiple threads, written by the bundle processing thread lazily.
+    private final AtomicReference<@Nullable String> processBundleId;

Review Comment:
   That is exactly right.
   
   The value would only be exact if the thread that reported it were also the thread that wrote it, as I did in https://github.com/apache/beam/pull/22002 for PCollection counters.
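   To make the distinction concrete, here is a minimal sketch assuming a single writer thread that bumps a counter with `AtomicLong.lazySet` (an ordered put). `BundleCounter` and its methods are hypothetical; this is not the code in this PR or in #22002. The count reported by the writing thread itself is exact, while a read from any other thread may lag behind.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical single-writer element counter (illustration only, not Beam's implementation). */
final class BundleCounter {
  private final AtomicLong elements = new AtomicLong();

  /** Called only by the bundle processing thread, the single writer. */
  void onElement() {
    // Ordered put: no CAS and no full fence, since no other thread writes this field.
    elements.lazySet(elements.get() + 1);
  }

  /** Safe from any thread, but a non-writer thread may observe a lagging value. */
  long read() {
    return elements.get();
  }

  /**
   * Processes n elements on a dedicated "bundle" thread and returns the count
   * that thread reports for itself once it has finished writing.
   */
  static long runBundle(int n) {
    BundleCounter counter = new BundleCounter();
    ExecutorService bundleThread = Executors.newSingleThreadExecutor();
    try {
      Future<Long> report =
          bundleThread.submit(
              () -> {
                for (int i = 0; i < n; i++) {
                  counter.onElement();
                }
                // Read by the thread that did the writing: program order
                // guarantees it sees all of its own writes, so this is exact.
                return counter.read();
              });
      // A concurrent read from this (non-writer) thread is only approximate.
      long approximate = counter.read();
      if (approximate < 0 || approximate > n) {
        throw new IllegalStateException("unexpected sample: " + approximate);
      }
      return report.get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e);
    } finally {
      bundleThread.shutdown();
    }
  }
}
```

   With this sketch, `BundleCounter.runBundle(50_000)` returns 50000, regardless of what the concurrent approximate read happened to see.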





[GitHub] [beam] steveniemitz commented on a diff in pull request #22103: [BEAM-13015, #22050] Make SDK harness msec counters faster using ordered puts

Posted by GitBox <gi...@apache.org>.
steveniemitz commented on code in PR #22103:
URL: https://github.com/apache/beam/pull/22103#discussion_r917021655


##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ExecutionStateSampler.java:
##########
@@ -0,0 +1,418 @@
+    // The set of execution states that this tracker is responsible for. Effectively
+    // final after the first execution of start().
+    private final List<ExecutionStateImpl> executionStates;
+    // Read by multiple threads, written by the bundle processing thread lazily.
+    private final AtomicReference<@Nullable String> processBundleId;

Review Comment:
   Just to make sure I'm understanding this: the semantics of `lazySet` are basically a store-store barrier, so other threads may not observe the `set` for some amount of time. We're OK with this since the sampling doesn't need to be exact?
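   The trade-off in this question can be sketched as follows, assuming JDK 9 or later, where `AtomicLong.lazySet` has the memory effects of `VarHandle.setRelease`. `LazySetSampling` and its method are hypothetical names, not Beam's sampler. A polling thread may see a stale count, but reads of a single variable stay coherent, so samples never go backwards or exceed the true total; once the writer has been joined, the final value is exact.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical demo of sampling a lazySet-published counter (illustration only). */
final class LazySetSampling {
  /**
   * One writer publishes a counter with lazySet while the calling thread polls it
   * with get(). Returns true if every sample was in bounds and non-decreasing and
   * the value read after join() equals the number of increments.
   */
  static boolean samplesAreStaleButOrdered(int increments) {
    AtomicLong counter = new AtomicLong();
    Thread writer =
        new Thread(
            () -> {
              for (int i = 0; i < increments; i++) {
                // Single writer, so get() + lazySet() needs no CAS. The ordered put
                // may become visible to other threads later than set() would.
                counter.lazySet(counter.get() + 1);
              }
            });
    writer.start();

    boolean ordered = true;
    long previous = 0;
    while (writer.isAlive()) {
      // Sampler side: possibly stale, but coherent for this single variable.
      long sample = counter.get();
      if (sample < previous || sample > increments) {
        ordered = false;
      }
      previous = sample;
    }

    try {
      // join() establishes happens-before, so the writer's last put is now visible.
      writer.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return ordered && counter.get() == increments;
  }
}
```

   With this sketch, `LazySetSampling.samplesAreStaleButOrdered(100_000)` returns `true`: the samples taken while the writer runs are approximate, which is acceptable for periodic msec sampling, and the exact total is only guaranteed after synchronizing with the writer.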





[GitHub] [beam] lukecwik commented on pull request #22103: [BEAM-13015, #22050] Make SDK harness msec counters faster using ordered puts

Posted by GitBox <gi...@apache.org>.
lukecwik commented on PR #22103:
URL: https://github.com/apache/beam/pull/22103#issuecomment-1178181122

   The overhead I see isn't from the sampler; it's the cost of producing an output and sending it to the next fused transform.
   
   For trivial DoFns, that overhead can be as high as 50% of the CPU time spent processing an element.




[GitHub] [beam] lukecwik commented on a diff in pull request #22103: [BEAM-13015, #22050] Make SDK harness msec counters faster using ordered puts

Posted by GitBox <gi...@apache.org>.
lukecwik commented on code in PR #22103:
URL: https://github.com/apache/beam/pull/22103#discussion_r917045394


##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ExecutionStateSampler.java:
##########
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.control;
+
+import com.google.auto.value.AutoValue;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.concurrent.GuardedBy;
+import org.apache.beam.fn.harness.control.ProcessBundleHandler.BundleProcessor;
+import org.apache.beam.runners.core.metrics.MonitoringInfoEncodings;
+import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
+import org.apache.beam.sdk.options.ExperimentalOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.DateTimeUtils.MillisProvider;
+import org.joda.time.Duration;
+import org.joda.time.format.PeriodFormatter;
+import org.joda.time.format.PeriodFormatterBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Monitors the execution of one or more execution threads. */
+public class ExecutionStateSampler {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ExecutionStateSampler.class);
+  private static final int DEFAULT_SAMPLING_PERIOD_MS = 200;
+  private static final long MAX_LULL_TIME_MS = TimeUnit.MINUTES.toMillis(5);
+  private static final PeriodFormatter DURATION_FORMATTER =
+      new PeriodFormatterBuilder()
+          .appendDays()
+          .appendSuffix("d")
+          .minimumPrintedDigits(2)
+          .appendHours()
+          .appendSuffix("h")
+          .printZeroAlways()
+          .appendMinutes()
+          .appendSuffix("m")
+          .appendSeconds()
+          .appendSuffix("s")
+          .toFormatter();
+  private final int periodMs;
+  private final MillisProvider clock;
+
+  @GuardedBy("activeStateTrackers")
+  private final Set<ExecutionStateTracker> activeStateTrackers;
+
+  private final Future<Void> stateSamplingThread;
+
+  @SuppressWarnings(
+      "methodref.receiver.bound.invalid" /* Synchronization ensures proper initialization */)
+  public ExecutionStateSampler(PipelineOptions options, MillisProvider clock) {
+    String samplingPeriodMills =
+        ExperimentalOptions.getExperimentValue(
+            options, ExperimentalOptions.STATE_SAMPLING_PERIOD_MILLIS);
+    this.periodMs =
+        samplingPeriodMills == null
+            ? DEFAULT_SAMPLING_PERIOD_MS
+            : Integer.parseInt(samplingPeriodMills);
+    this.clock = clock;
+    this.activeStateTrackers = new HashSet<>();
+    // We specifically synchronize to ensure that this object can complete
+    // being published before the state sampler thread starts.
+    synchronized (this) {
+      this.stateSamplingThread =
+          options.as(GcsOptions.class).getExecutorService().submit(this::stateSampler);
+    }
+  }
+
+  /** An {@link ExecutionState} represents the current state of an execution thread. */
+  public interface ExecutionState {
+
+    /**
+     * Activates this execution state within the {@link ExecutionStateTracker}.
+     *
+     * <p>Must only be invoked by the bundle processing thread.
+     */
+    void activate();
+
+    /**
+     * Restores the previously active execution state within the {@link ExecutionStateTracker}.
+     *
+     * <p>Must only be invoked by the bundle processing thread.
+     */
+    void deactivate();
+  }
+
+  /** Stops the execution of the state sampler. */
+  public void stop() {
+    stateSamplingThread.cancel(true);
+    try {
+      stateSamplingThread.get(5L * periodMs, TimeUnit.MILLISECONDS);
+    } catch (CancellationException e) {
+      // This was expected -- we were cancelling the thread.
+    } catch (InterruptedException | TimeoutException e) {
+      throw new RuntimeException(
+          "Failed to stop state sampling after waiting 5 sampling periods.", e);
+    } catch (ExecutionException e) {
+      throw new RuntimeException("Exception in state sampler", e);
+    }
+  }
+
+  /** Entry point for the state sampling thread. */
+  private Void stateSampler() throws Exception {
+    // Ensure the object finishes being published safely.
+    synchronized (this) {
+      if (stateSamplingThread == null) {
+        throw new IllegalStateException("Underinitialized ExecutionStateSampler instance");
+      }
+    }
+
+    long lastSampleTimeMillis = clock.getMillis();
+    long targetTimeMillis = lastSampleTimeMillis + periodMs;
+    while (!Thread.interrupted()) {
+      long currentTimeMillis = clock.getMillis();
+      long difference = targetTimeMillis - currentTimeMillis;
+      if (difference > 0) {
+        Thread.sleep(difference);
+      } else {
+        long millisSinceLastSample = currentTimeMillis - lastSampleTimeMillis;
+        synchronized (activeStateTrackers) {
+          for (ExecutionStateTracker activeTracker : activeStateTrackers) {
+            activeTracker.takeSample(currentTimeMillis, millisSinceLastSample);
+          }
+        }
+        lastSampleTimeMillis = currentTimeMillis;
+        targetTimeMillis = lastSampleTimeMillis + periodMs;
+      }
+    }
+    return null;
+  }
+
+  /** Returns a new {@link ExecutionStateTracker} associated with this state sampler. */
+  public ExecutionStateTracker create() {
+    return new ExecutionStateTracker();
+  }
+
+  /** Tracks the current state of a single execution thread. */
+  public class ExecutionStateTracker implements BundleProgressReporter {
+
+    // The set of execution states that this tracker is responsible for. Effectively
+    // final after the first execution of start().

Review Comment:
   ```suggestion
       // The set of execution states that this tracker is responsible for. Effectively
       // final since create() should not be invoked once any bundle starts processing.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] steveniemitz commented on pull request #22103: [BEAM-13015, #22050] Make SDK harness msec counters faster using ordered puts

Posted by GitBox <gi...@apache.org>.
steveniemitz commented on PR #22103:
URL: https://github.com/apache/beam/pull/22103#issuecomment-1178114663

   Ah, this is cool and was next up on my hit list, although for a different reason. I've never seen the actual sampler code itself show up as a significant CPU/memory offender; however, the contention around add/removeTracker is significant, since all worker threads need to serialize through it.
   
   There are a couple of options I've tried for removing the lock there:
   - Switch `activeTrackers` back to a concurrent set. The problem here was that deactivating a tracker could still race with the sampler thread and double-sample a tracker. I'm not sure if this is a big enough deal to worry about?
   - Rather than worry about synchronization at all, push all add/remove/sample operations into a queue and have a single thread consume from it, maintain `activeTrackers`, and do the sampling. Doing so removes the race above and simplifies the logic a lot (I think). I used the LMAX Disruptor queue to handle the multi-producer, single-consumer setup here, but you could probably use one of Java's built-in concurrent blocking queues for this as well.
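   The queue-based option can be sketched with a built-in `java.util.concurrent.LinkedBlockingQueue` in place of the LMAX Disruptor. This is a minimal, hypothetical sketch, not code from this PR: `QueueBasedSampler`, `Tracker`, `requestSample`, and `processPending` are illustrative names. Bundle threads only enqueue events, and a single consumer thread applies add/remove/sample events in the order they were queued, so a tracker whose removal is queued before a sample request cannot be sampled by it.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical sketch: producers enqueue events; one consumer owns the tracker set. */
final class QueueBasedSampler {
  /** Hypothetical stand-in for ExecutionStateTracker. */
  interface Tracker {
    void takeSample(long millisSinceLastSample);
  }

  private enum Kind { ADD, REMOVE, SAMPLE }

  private static final class Event {
    final Kind kind;
    final Tracker tracker; // null for SAMPLE events
    final long millis;     // only meaningful for SAMPLE events

    Event(Kind kind, Tracker tracker, long millis) {
      this.kind = kind;
      this.tracker = tracker;
      this.millis = millis;
    }
  }

  // Many producers (bundle threads plus a timer), exactly one consumer.
  private final LinkedBlockingQueue<Event> events = new LinkedBlockingQueue<>();
  // Touched only by the consumer thread, so a plain HashSet is sufficient.
  private final Set<Tracker> activeTrackers = new HashSet<>();

  void addTracker(Tracker t) {
    events.add(new Event(Kind.ADD, t, 0));
  }

  void removeTracker(Tracker t) {
    events.add(new Event(Kind.REMOVE, t, 0));
  }

  void requestSample(long millisSinceLastSample) {
    events.add(new Event(Kind.SAMPLE, null, millisSinceLastSample));
  }

  /** Consumer loop: blocks for events until the thread is interrupted. */
  void run() {
    try {
      while (true) {
        apply(events.take());
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // restore the flag and exit
    }
  }

  /** Applies all currently queued events without blocking (useful in tests). */
  void processPending() {
    Event e;
    while ((e = events.poll()) != null) {
      apply(e);
    }
  }

  /** Only meaningful when called from the consumer thread. */
  int activeCount() {
    return activeTrackers.size();
  }

  private void apply(Event e) {
    switch (e.kind) {
      case ADD:
        activeTrackers.add(e.tracker);
        break;
      case REMOVE:
        activeTrackers.remove(e.tracker);
        break;
      case SAMPLE:
        // Removals queued before this sample were already applied above,
        // so a deactivated tracker is not sampled again.
        for (Tracker t : activeTrackers) {
          t.takeSample(e.millis);
        }
        break;
    }
  }
}
```

   Compared with the Disruptor, this version allocates one event object per operation and uses an unbounded queue, trading some throughput for simplicity.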
   
   In any case, I was going to put up a review for the other version of this today-ish; we can apply whatever we end up deciding on to this as well.



[GitHub] [beam] steveniemitz commented on pull request #22103: [BEAM-13015, #22050] Make SDK harness msec counters faster using ordered puts

Posted by GitBox <gi...@apache.org>.
steveniemitz commented on PR #22103:
URL: https://github.com/apache/beam/pull/22103#issuecomment-1178186588

   > The overhead that I see isn't the sampler but the cost of producing an output and sending it to the next fused transform.
   > 
   > For trivial DoFns the overhead can be as high as 50% of the CPU time for processing an element.
   
   Oh yeah, totally. I think this is great stuff; sorry, I didn't mean to imply otherwise.
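   The "ordered puts" in the PR title most likely refer to stores such as `AtomicLong.lazySet` / `AtomicReference.lazySet`, which order a write without the full fence of a volatile `set`. Since the per-output cost dominates for trivial DoFns, the state switch on that path should be as cheap as possible. Below is a minimal sketch of the general idea, assuming a single writer per field (the bundle thread writes the current state, the sampler thread writes each state's msec total). `LazyStateTracker` and `State` are hypothetical names, not the PR's classes.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Hypothetical sketch of msec counters using ordered puts. Each field has exactly one writer
 * thread, so lazySet (an ordered store) can replace volatile writes and CAS loops.
 */
final class LazyStateTracker {
  /** Hypothetical execution state holding an accumulated msec total. */
  static final class State {
    final String name;
    private final AtomicLong msecs = new AtomicLong();

    State(String name) {
      this.name = name;
    }

    long getMsecs() {
      return msecs.get();
    }
  }

  // Written only by the bundle processing thread, read by the sampler thread.
  private final AtomicReference<State> currentState = new AtomicReference<>();

  /** Bundle thread: make {@code next} current and return the previous state. */
  State activate(State next) {
    State previous = currentState.get();
    currentState.lazySet(next);
    return previous;
  }

  /** Bundle thread: restore the previously active state (may be null). */
  void deactivate(State previous) {
    currentState.lazySet(previous);
  }

  /** Sampler thread: credit elapsed time to whichever state is current now. */
  void takeSample(long millisSinceLastSample) {
    State state = currentState.get();
    if (state != null) {
      // Only the sampler thread writes msecs, so a plain read-then-lazySet is safe.
      state.msecs.lazySet(state.msecs.get() + millisSinceLastSample);
    }
  }
}
```

   Readers on other threads may briefly observe a slightly stale value, which is usually acceptable for sampled timing counters; the single-writer assumption is what makes the non-CAS update in `takeSample` safe.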
   
   



[GitHub] [beam] lukecwik commented on a diff in pull request #22103: [BEAM-13015, #22050] Make SDK harness msec counters faster using ordered puts

Posted by GitBox <gi...@apache.org>.
lukecwik commented on code in PR #22103:
URL: https://github.com/apache/beam/pull/22103#discussion_r917045732


##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ExecutionStateSampler.java:
##########
@@ -0,0 +1,418 @@
+  /** Tracks the current state of a single execution thread. */
+  public class ExecutionStateTracker implements BundleProgressReporter {
+
+    // The set of execution states that this tracker is responsible for. Effectively
+    // final after the first execution of start().

Review Comment:
   Done




[GitHub] [beam] lukecwik commented on pull request #22103: [BEAM-13015, #22050] Make SDK harness msec counters faster using ordered puts

Posted by GitBox <gi...@apache.org>.
lukecwik commented on PR #22103:
URL: https://github.com/apache/beam/pull/22103#issuecomment-1178552338

   Run Java PreCommit



[GitHub] [beam] lukecwik commented on pull request #22103: [BEAM-13015, #22050] Make SDK harness msec counters faster using ordered puts

Posted by GitBox <gi...@apache.org>.
lukecwik commented on PR #22103:
URL: https://github.com/apache/beam/pull/22103#issuecomment-1177084797

   Run Java PreCommit



[GitHub] [beam] lukecwik commented on a diff in pull request #22103: [BEAM-13015, #22050] Make SDK harness msec counters faster using ordered puts

Posted by GitBox <gi...@apache.org>.
lukecwik commented on code in PR #22103:
URL: https://github.com/apache/beam/pull/22103#discussion_r917033253


##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ExecutionStateSampler.java:
##########
@@ -0,0 +1,418 @@
+  /** Tracks the current state of a single execution thread. */
+  public class ExecutionStateTracker implements BundleProgressReporter {
+
+    // The set of execution states that this tracker is responsible for. Effectively
+    // final after the first execution of start().
+    private final List<ExecutionStateImpl> executionStates;
+    // Read by multiple threads, written by the bundle processing thread lazily.
+    private final AtomicReference<@Nullable String> processBundleId;

Review Comment:
   That is exactly right.




[GitHub] [beam] lukecwik commented on a diff in pull request #22103: [BEAM-13015, #22050] Make SDK harness msec counters faster using ordered puts

Posted by GitBox <gi...@apache.org>.
lukecwik commented on code in PR #22103:
URL: https://github.com/apache/beam/pull/22103#discussion_r916117748


##########
runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java:
##########
@@ -858,7 +857,7 @@ public MetricsDoFn() {
     public void startBundle() throws InterruptedException {
       Metrics.counter(RemoteExecutionTest.class, START_USER_COUNTER_NAME).inc(10);
       Metrics.distribution(RemoteExecutionTest.class, START_USER_DISTRIBUTION_NAME).update(10);
-      ExecutionStateSampler.instance().doSampling(1);

Review Comment:
   I purposely moved away from forcing the sampling to happen.




[GitHub] [beam] lukecwik commented on pull request #22103: [WIP, BEAM-13015, #22050] Swap msec counters to use faster implementation

Posted by GitBox <gi...@apache.org>.
lukecwik commented on PR #22103:
URL: https://github.com/apache/beam/pull/22103#issuecomment-1170357995

   Tiny bundle:
   ![large bundle](https://user-images.githubusercontent.com/10078956/176501848-df2c04b1-7d78-4d3e-95b6-2fd3e08754b4.png)
   
   Large bundle:
   ![tiny bundle](https://user-images.githubusercontent.com/10078956/176501856-1ee0a341-a44c-4ff4-b09a-51fe2d43acd8.png)
   
   



[GitHub] [beam] lukecwik commented on pull request #22103: [BEAM-13015, #22050] Make SDK harness msec counters faster using ordered puts

Posted by GitBox <gi...@apache.org>.
lukecwik commented on PR #22103:
URL: https://github.com/apache/beam/pull/22103#issuecomment-1177966876

   Run Java PreCommit



[GitHub] [beam] lukecwik commented on pull request #22103: [BEAM-13015, #22050] Make SDK harness msec counters faster using ordered puts

Posted by GitBox <gi...@apache.org>.
lukecwik commented on PR #22103:
URL: https://github.com/apache/beam/pull/22103#issuecomment-1178181535

   Run Java PreCommit



[GitHub] [beam] lukecwik commented on pull request #22103: [BEAM-13015, #22050] Make SDK harness msec counters faster using ordered puts

Posted by GitBox <gi...@apache.org>.
lukecwik commented on PR #22103:
URL: https://github.com/apache/beam/pull/22103#issuecomment-1177967439

   R: @steveniemitz 



[GitHub] [beam] lukecwik merged pull request #22103: [BEAM-13015, #22050] Make SDK harness msec counters faster using ordered puts

Posted by GitBox <gi...@apache.org>.
lukecwik merged PR #22103:
URL: https://github.com/apache/beam/pull/22103



[GitHub] [beam] lukecwik commented on pull request #22103: [BEAM-13015, #22050] Make SDK harness msec counters faster using ordered puts

Posted by GitBox <gi...@apache.org>.
lukecwik commented on PR #22103:
URL: https://github.com/apache/beam/pull/22103#issuecomment-1178550970

   > > The overhead that I see isn't the sampler but the cost of producing an output and sending it to the next fused transform.
   > > For trivial DoFns the overhead can be as high as 50% of the CPU time for processing an element.
   > 
   > Oh yeah totally, I think this is great stuff, sorry I didn't mean to imply otherwise.
   
   Never thought you did; I was just giving context.

