You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/11/24 00:03:05 UTC

[05/11] incubator-beam git commit: Simplify the API for managing MetricsEnvironment

Simplify the API for managing MetricsEnvironment

1. setCurrentContainer returns the previous MetricsContainer
2. setCurrentContainer(null) resets the thread local
3. scopedMetricsContainer sets the container and returns a Closeable that
   restores the previous container when closed.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6fa8f658
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6fa8f658
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6fa8f658

Branch: refs/heads/python-sdk
Commit: 6fa8f658abaac1d3a983bfc3b8c09422159af8aa
Parents: 796ba7a
Author: bchambers <bc...@google.com>
Authored: Tue Nov 22 11:37:23 2016 -0800
Committer: Davor Bonaci <da...@google.com>
Committed: Wed Nov 23 16:02:04 2016 -0800

----------------------------------------------------------------------
 .../beam/runners/direct/TransformExecutor.java  |  5 +-
 .../beam/sdk/metrics/MetricsEnvironment.java    | 60 +++++++++++++++-----
 .../sdk/metrics/MetricsEnvironmentTest.java     |  8 +--
 .../apache/beam/sdk/metrics/MetricsTest.java    |  6 +-
 4 files changed, 56 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6fa8f658/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
index 1704955..fb31cc9 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.direct;
 
+import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.concurrent.Callable;
@@ -89,8 +90,7 @@ class TransformExecutor<T> implements Runnable {
   @Override
   public void run() {
     MetricsContainer metricsContainer = new MetricsContainer(transform.getFullName());
-    MetricsEnvironment.setMetricsContainer(metricsContainer);
-    try {
+    try (Closeable metricsScope = MetricsEnvironment.scopedMetricsContainer(metricsContainer)) {
       Collection<ModelEnforcement<T>> enforcements = new ArrayList<>();
       for (ModelEnforcementFactory enforcementFactory : modelEnforcements) {
         ModelEnforcement<T> enforcement = enforcementFactory.forBundle(inputBundle, transform);
@@ -117,7 +117,6 @@ class TransformExecutor<T> implements Runnable {
       // Report the physical metrics from the end of this step.
       context.getMetrics().commitPhysical(inputBundle, metricsContainer.getCumulative());
 
-      MetricsEnvironment.unsetMetricsContainer();
       transformEvaluationState.complete(this);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6fa8f658/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
index ef2660a8..7c06cbf 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.metrics;
 
+import java.io.Closeable;
+import java.io.IOException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.annotation.Nullable;
 import org.slf4j.Logger;
@@ -29,11 +31,13 @@ import org.slf4j.LoggerFactory;
  * returned objects to create and modify metrics.
  *
  * <p>The runner should create {@link MetricsContainer} for each context in which metrics are
- * reported (by step and name) and call {@link #setMetricsContainer} before invoking any code that
- * may update metrics within that step.
+ * reported (by step and name) and call {@link #setCurrentContainer} before invoking any code that
+ * may update metrics within that step. It should call {@link #setCurrentContainer} again to restore
+ * the previous container.
  *
- * <p>The runner should call {@link #unsetMetricsContainer} (or {@link #setMetricsContainer} back to
- * the previous value) when exiting code that set the metrics container.
+ * <p>Alternatively, the runner can use {@link #scopedMetricsContainer(MetricsContainer)} to set the
+ * container for the current thread and get a {@link Closeable} that will restore the previous
+ * container when closed.
  */
 public class MetricsEnvironment {
 
@@ -45,15 +49,20 @@ public class MetricsEnvironment {
   private static final ThreadLocal<MetricsContainer> CONTAINER_FOR_THREAD =
       new ThreadLocal<MetricsContainer>();
 
-  /** Set the {@link MetricsContainer} for the current thread. */
-  public static void setMetricsContainer(MetricsContainer container) {
-    CONTAINER_FOR_THREAD.set(container);
-  }
-
-
-  /** Clear the {@link MetricsContainer} for the current thread. */
-  public static void unsetMetricsContainer() {
-    CONTAINER_FOR_THREAD.remove();
+  /**
+   * Set the {@link MetricsContainer} for the current thread.
+   *
+   * @return The previous container for the current thread.
+   */
+  @Nullable
+  public static MetricsContainer setCurrentContainer(@Nullable MetricsContainer container) {
+    MetricsContainer previous = getCurrentContainer();
+    if (container == null) {
+      CONTAINER_FOR_THREAD.remove();
+    } else {
+      CONTAINER_FOR_THREAD.set(container);
+    }
+    return previous;
   }
 
   /** Called by the run to indicate whether metrics reporting is supported. */
@@ -62,6 +71,31 @@ public class MetricsEnvironment {
   }
 
   /**
+   * Set the {@link MetricsContainer} for the current thread.
+   *
+   * @return A {@link Closeable} that will reset the current container to the previous
+   * {@link MetricsContainer} when closed.
+   */
+  public static Closeable scopedMetricsContainer(MetricsContainer container) {
+    return new ScopedContainer(container);
+  }
+
+  private static class ScopedContainer implements Closeable {
+
+    @Nullable
+    private final MetricsContainer oldContainer;
+
+    private ScopedContainer(MetricsContainer newContainer) {
+      this.oldContainer = setCurrentContainer(newContainer);
+    }
+
+    @Override
+    public void close() throws IOException {
+      setCurrentContainer(oldContainer);
+    }
+  }
+
+  /**
    * Return the {@link MetricsContainer} for the current thread.
    *
    * <p>May return null if metrics are not supported by the current runner or if the current thread

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6fa8f658/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsEnvironmentTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsEnvironmentTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsEnvironmentTest.java
index 4200a20..0ce17b4 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsEnvironmentTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsEnvironmentTest.java
@@ -35,7 +35,7 @@ import org.junit.runners.JUnit4;
 public class MetricsEnvironmentTest {
   @After
   public void teardown() {
-    MetricsEnvironment.unsetMetricsContainer();
+    MetricsEnvironment.setCurrentContainer(null);
   }
 
   @Test
@@ -44,11 +44,11 @@ public class MetricsEnvironmentTest {
     MetricsContainer c1 = new MetricsContainer("step1");
     MetricsContainer c2 = new MetricsContainer("step2");
 
-    MetricsEnvironment.setMetricsContainer(c1);
+    MetricsEnvironment.setCurrentContainer(c1);
     counter.inc();
-    MetricsEnvironment.setMetricsContainer(c2);
+    MetricsEnvironment.setCurrentContainer(c2);
     counter.dec();
-    MetricsEnvironment.unsetMetricsContainer();
+    MetricsEnvironment.setCurrentContainer(null);
 
     MetricUpdates updates1 = c1.getUpdates();
     MetricUpdates updates2 = c2.getUpdates();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6fa8f658/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
index d11b44d..732cb34 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
@@ -37,7 +37,7 @@ public class MetricsTest {
 
   @After
   public void tearDown() {
-    MetricsEnvironment.unsetMetricsContainer();
+    MetricsEnvironment.setCurrentContainer(null);
   }
 
   @Test
@@ -61,7 +61,7 @@ public class MetricsTest {
   @Test
   public void distributionToCell() {
     MetricsContainer container = new MetricsContainer("step");
-    MetricsEnvironment.setMetricsContainer(container);
+    MetricsEnvironment.setCurrentContainer(container);
 
     Distribution distribution = Metrics.distribution(NS, NAME);
 
@@ -80,7 +80,7 @@ public class MetricsTest {
   @Test
   public void counterToCell() {
     MetricsContainer container = new MetricsContainer("step");
-    MetricsEnvironment.setMetricsContainer(container);
+    MetricsEnvironment.setCurrentContainer(container);
     Counter counter = Metrics.counter(NS, NAME);
     CounterCell cell = container.getCounter(METRIC_NAME);
     counter.inc();