You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2016/11/22 20:27:55 UTC

[1/2] incubator-beam git commit: This closes #1417

Repository: incubator-beam
Updated Branches:
  refs/heads/master 6ec45f7e7 -> b41789e9c


This closes #1417


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b41789e9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b41789e9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b41789e9

Branch: refs/heads/master
Commit: b41789e9c5ac8691243c796968b00a65cc11dd39
Parents: 6ec45f7 e6870a6
Author: Thomas Groh <tg...@google.com>
Authored: Tue Nov 22 12:27:41 2016 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Nov 22 12:27:41 2016 -0800

----------------------------------------------------------------------
 .../beam/runners/direct/TransformExecutor.java  |  5 +-
 .../beam/sdk/metrics/MetricsEnvironment.java    | 60 +++++++++++++++-----
 .../sdk/metrics/MetricsEnvironmentTest.java     |  8 +--
 .../apache/beam/sdk/metrics/MetricsTest.java    |  6 +-
 4 files changed, 56 insertions(+), 23 deletions(-)
----------------------------------------------------------------------



[2/2] incubator-beam git commit: Simplify the API for managing MetricsEnvironment

Posted by tg...@apache.org.
Simplify the API for managing MetricsEnvironment

1. setCurrentContainer returns the previous MetricsEnvironment
2. setCurrentContainer(null) resets the thread local
3. scopedCurrentContainer sets the container and returns a Closeable to
   reset the previous container.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e6870a6d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e6870a6d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e6870a6d

Branch: refs/heads/master
Commit: e6870a6dc10e4ad52a911c316137a9f7731a9194
Parents: 6ec45f7
Author: bchambers <bc...@google.com>
Authored: Tue Nov 22 11:37:23 2016 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Nov 22 12:27:41 2016 -0800

----------------------------------------------------------------------
 .../beam/runners/direct/TransformExecutor.java  |  5 +-
 .../beam/sdk/metrics/MetricsEnvironment.java    | 60 +++++++++++++++-----
 .../sdk/metrics/MetricsEnvironmentTest.java     |  8 +--
 .../apache/beam/sdk/metrics/MetricsTest.java    |  6 +-
 4 files changed, 56 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e6870a6d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
index 1704955..fb31cc9 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.direct;
 
+import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.concurrent.Callable;
@@ -89,8 +90,7 @@ class TransformExecutor<T> implements Runnable {
   @Override
   public void run() {
     MetricsContainer metricsContainer = new MetricsContainer(transform.getFullName());
-    MetricsEnvironment.setMetricsContainer(metricsContainer);
-    try {
+    try (Closeable metricsScope = MetricsEnvironment.scopedMetricsContainer(metricsContainer)) {
       Collection<ModelEnforcement<T>> enforcements = new ArrayList<>();
       for (ModelEnforcementFactory enforcementFactory : modelEnforcements) {
         ModelEnforcement<T> enforcement = enforcementFactory.forBundle(inputBundle, transform);
@@ -117,7 +117,6 @@ class TransformExecutor<T> implements Runnable {
       // Report the physical metrics from the end of this step.
       context.getMetrics().commitPhysical(inputBundle, metricsContainer.getCumulative());
 
-      MetricsEnvironment.unsetMetricsContainer();
       transformEvaluationState.complete(this);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e6870a6d/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
index ef2660a8..7c06cbf 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.metrics;
 
+import java.io.Closeable;
+import java.io.IOException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.annotation.Nullable;
 import org.slf4j.Logger;
@@ -29,11 +31,13 @@ import org.slf4j.LoggerFactory;
  * returned objects to create and modify metrics.
  *
  * <p>The runner should create {@link MetricsContainer} for each context in which metrics are
- * reported (by step and name) and call {@link #setMetricsContainer} before invoking any code that
- * may update metrics within that step.
+ * reported (by step and name) and call {@link #setCurrentContainer} before invoking any code that
+ * may update metrics within that step. It should call {@link #setCurrentContainer} again to restore
+ * the previous container.
  *
- * <p>The runner should call {@link #unsetMetricsContainer} (or {@link #setMetricsContainer} back to
- * the previous value) when exiting code that set the metrics container.
+ * <p>Alternatively, the runner can use {@link #scopedMetricsContainer(MetricsContainer)} to set the
+ * container for the current thread and get a {@link Closeable} that will restore the previous
+ * container when closed.
  */
 public class MetricsEnvironment {
 
@@ -45,15 +49,20 @@ public class MetricsEnvironment {
   private static final ThreadLocal<MetricsContainer> CONTAINER_FOR_THREAD =
       new ThreadLocal<MetricsContainer>();
 
-  /** Set the {@link MetricsContainer} for the current thread. */
-  public static void setMetricsContainer(MetricsContainer container) {
-    CONTAINER_FOR_THREAD.set(container);
-  }
-
-
-  /** Clear the {@link MetricsContainer} for the current thread. */
-  public static void unsetMetricsContainer() {
-    CONTAINER_FOR_THREAD.remove();
+  /**
+   * Set the {@link MetricsContainer} for the current thread.
+   *
+   * @return The previous container for the current thread.
+   */
+  @Nullable
+  public static MetricsContainer setCurrentContainer(@Nullable MetricsContainer container) {
+    MetricsContainer previous = getCurrentContainer();
+    if (container == null) {
+      CONTAINER_FOR_THREAD.remove();
+    } else {
+      CONTAINER_FOR_THREAD.set(container);
+    }
+    return previous;
   }
 
   /** Called by the run to indicate whether metrics reporting is supported. */
@@ -62,6 +71,31 @@ public class MetricsEnvironment {
   }
 
   /**
+   * Set the {@link MetricsContainer} for the current thread.
+   *
+   * @return A {@link Closeable} that will reset the current container to the previous
+   * {@link MetricsContainer} when closed.
+   */
+  public static Closeable scopedMetricsContainer(MetricsContainer container) {
+    return new ScopedContainer(container);
+  }
+
+  private static class ScopedContainer implements Closeable {
+
+    @Nullable
+    private final MetricsContainer oldContainer;
+
+    private ScopedContainer(MetricsContainer newContainer) {
+      this.oldContainer = setCurrentContainer(newContainer);
+    }
+
+    @Override
+    public void close() throws IOException {
+      setCurrentContainer(oldContainer);
+    }
+  }
+
+  /**
    * Return the {@link MetricsContainer} for the current thread.
    *
    * <p>May return null if metrics are not supported by the current runner or if the current thread

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e6870a6d/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsEnvironmentTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsEnvironmentTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsEnvironmentTest.java
index 4200a20..0ce17b4 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsEnvironmentTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsEnvironmentTest.java
@@ -35,7 +35,7 @@ import org.junit.runners.JUnit4;
 public class MetricsEnvironmentTest {
   @After
   public void teardown() {
-    MetricsEnvironment.unsetMetricsContainer();
+    MetricsEnvironment.setCurrentContainer(null);
   }
 
   @Test
@@ -44,11 +44,11 @@ public class MetricsEnvironmentTest {
     MetricsContainer c1 = new MetricsContainer("step1");
     MetricsContainer c2 = new MetricsContainer("step2");
 
-    MetricsEnvironment.setMetricsContainer(c1);
+    MetricsEnvironment.setCurrentContainer(c1);
     counter.inc();
-    MetricsEnvironment.setMetricsContainer(c2);
+    MetricsEnvironment.setCurrentContainer(c2);
     counter.dec();
-    MetricsEnvironment.unsetMetricsContainer();
+    MetricsEnvironment.setCurrentContainer(null);
 
     MetricUpdates updates1 = c1.getUpdates();
     MetricUpdates updates2 = c2.getUpdates();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e6870a6d/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
index d11b44d..732cb34 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
@@ -37,7 +37,7 @@ public class MetricsTest {
 
   @After
   public void tearDown() {
-    MetricsEnvironment.unsetMetricsContainer();
+    MetricsEnvironment.setCurrentContainer(null);
   }
 
   @Test
@@ -61,7 +61,7 @@ public class MetricsTest {
   @Test
   public void distributionToCell() {
     MetricsContainer container = new MetricsContainer("step");
-    MetricsEnvironment.setMetricsContainer(container);
+    MetricsEnvironment.setCurrentContainer(container);
 
     Distribution distribution = Metrics.distribution(NS, NAME);
 
@@ -80,7 +80,7 @@ public class MetricsTest {
   @Test
   public void counterToCell() {
     MetricsContainer container = new MetricsContainer("step");
-    MetricsEnvironment.setMetricsContainer(container);
+    MetricsEnvironment.setCurrentContainer(container);
     Counter counter = Metrics.counter(NS, NAME);
     CounterCell cell = container.getCounter(METRIC_NAME);
     counter.inc();