You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2016/11/22 20:27:55 UTC
[1/2] incubator-beam git commit: This closes #1417
Repository: incubator-beam
Updated Branches:
refs/heads/master 6ec45f7e7 -> b41789e9c
This closes #1417
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b41789e9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b41789e9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b41789e9
Branch: refs/heads/master
Commit: b41789e9c5ac8691243c796968b00a65cc11dd39
Parents: 6ec45f7 e6870a6
Author: Thomas Groh <tg...@google.com>
Authored: Tue Nov 22 12:27:41 2016 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Nov 22 12:27:41 2016 -0800
----------------------------------------------------------------------
.../beam/runners/direct/TransformExecutor.java | 5 +-
.../beam/sdk/metrics/MetricsEnvironment.java | 60 +++++++++++++++-----
.../sdk/metrics/MetricsEnvironmentTest.java | 8 +--
.../apache/beam/sdk/metrics/MetricsTest.java | 6 +-
4 files changed, 56 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
[2/2] incubator-beam git commit: Simplify the API for managing
MetricsEnvironment
Posted by tg...@apache.org.
Simplify the API for managing MetricsEnvironment
1. setCurrentContainer returns the previous MetricsEnvironment
2. setCurrentContainer(null) resets the thread local
3. scopedCurrentContainer sets the container and returns a Closeable to
reset the previous container.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e6870a6d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e6870a6d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e6870a6d
Branch: refs/heads/master
Commit: e6870a6dc10e4ad52a911c316137a9f7731a9194
Parents: 6ec45f7
Author: bchambers <bc...@google.com>
Authored: Tue Nov 22 11:37:23 2016 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Nov 22 12:27:41 2016 -0800
----------------------------------------------------------------------
.../beam/runners/direct/TransformExecutor.java | 5 +-
.../beam/sdk/metrics/MetricsEnvironment.java | 60 +++++++++++++++-----
.../sdk/metrics/MetricsEnvironmentTest.java | 8 +--
.../apache/beam/sdk/metrics/MetricsTest.java | 6 +-
4 files changed, 56 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e6870a6d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
index 1704955..fb31cc9 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.direct;
+import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.Callable;
@@ -89,8 +90,7 @@ class TransformExecutor<T> implements Runnable {
@Override
public void run() {
MetricsContainer metricsContainer = new MetricsContainer(transform.getFullName());
- MetricsEnvironment.setMetricsContainer(metricsContainer);
- try {
+ try (Closeable metricsScope = MetricsEnvironment.scopedMetricsContainer(metricsContainer)) {
Collection<ModelEnforcement<T>> enforcements = new ArrayList<>();
for (ModelEnforcementFactory enforcementFactory : modelEnforcements) {
ModelEnforcement<T> enforcement = enforcementFactory.forBundle(inputBundle, transform);
@@ -117,7 +117,6 @@ class TransformExecutor<T> implements Runnable {
// Report the physical metrics from the end of this step.
context.getMetrics().commitPhysical(inputBundle, metricsContainer.getCumulative());
- MetricsEnvironment.unsetMetricsContainer();
transformEvaluationState.complete(this);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e6870a6d/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
index ef2660a8..7c06cbf 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.sdk.metrics;
+import java.io.Closeable;
+import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import org.slf4j.Logger;
@@ -29,11 +31,13 @@ import org.slf4j.LoggerFactory;
* returned objects to create and modify metrics.
*
* <p>The runner should create {@link MetricsContainer} for each context in which metrics are
- * reported (by step and name) and call {@link #setMetricsContainer} before invoking any code that
- * may update metrics within that step.
+ * reported (by step and name) and call {@link #setCurrentContainer} before invoking any code that
+ * may update metrics within that step. It should call {@link #setCurrentContainer} again to restore
+ * the previous container.
*
- * <p>The runner should call {@link #unsetMetricsContainer} (or {@link #setMetricsContainer} back to
- * the previous value) when exiting code that set the metrics container.
+ * <p>Alternatively, the runner can use {@link #scopedMetricsContainer(MetricsContainer)} to set the
+ * container for the current thread and get a {@link Closeable} that will restore the previous
+ * container when closed.
*/
public class MetricsEnvironment {
@@ -45,15 +49,20 @@ public class MetricsEnvironment {
private static final ThreadLocal<MetricsContainer> CONTAINER_FOR_THREAD =
new ThreadLocal<MetricsContainer>();
- /** Set the {@link MetricsContainer} for the current thread. */
- public static void setMetricsContainer(MetricsContainer container) {
- CONTAINER_FOR_THREAD.set(container);
- }
-
-
- /** Clear the {@link MetricsContainer} for the current thread. */
- public static void unsetMetricsContainer() {
- CONTAINER_FOR_THREAD.remove();
+ /**
+ * Set the {@link MetricsContainer} for the current thread.
+ *
+ * @return The previous container for the current thread.
+ */
+ @Nullable
+ public static MetricsContainer setCurrentContainer(@Nullable MetricsContainer container) {
+ MetricsContainer previous = getCurrentContainer();
+ if (container == null) {
+ CONTAINER_FOR_THREAD.remove();
+ } else {
+ CONTAINER_FOR_THREAD.set(container);
+ }
+ return previous;
}
/** Called by the run to indicate whether metrics reporting is supported. */
@@ -62,6 +71,31 @@ public class MetricsEnvironment {
}
/**
+ * Set the {@link MetricsContainer} for the current thread.
+ *
+ * @return A {@link Closeable} that will reset the current container to the previous
+ * {@link MetricsContainer} when closed.
+ */
+ public static Closeable scopedMetricsContainer(MetricsContainer container) {
+ return new ScopedContainer(container);
+ }
+
+ private static class ScopedContainer implements Closeable {
+
+ @Nullable
+ private final MetricsContainer oldContainer;
+
+ private ScopedContainer(MetricsContainer newContainer) {
+ this.oldContainer = setCurrentContainer(newContainer);
+ }
+
+ @Override
+ public void close() throws IOException {
+ setCurrentContainer(oldContainer);
+ }
+ }
+
+ /**
* Return the {@link MetricsContainer} for the current thread.
*
* <p>May return null if metrics are not supported by the current runner or if the current thread
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e6870a6d/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsEnvironmentTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsEnvironmentTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsEnvironmentTest.java
index 4200a20..0ce17b4 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsEnvironmentTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsEnvironmentTest.java
@@ -35,7 +35,7 @@ import org.junit.runners.JUnit4;
public class MetricsEnvironmentTest {
@After
public void teardown() {
- MetricsEnvironment.unsetMetricsContainer();
+ MetricsEnvironment.setCurrentContainer(null);
}
@Test
@@ -44,11 +44,11 @@ public class MetricsEnvironmentTest {
MetricsContainer c1 = new MetricsContainer("step1");
MetricsContainer c2 = new MetricsContainer("step2");
- MetricsEnvironment.setMetricsContainer(c1);
+ MetricsEnvironment.setCurrentContainer(c1);
counter.inc();
- MetricsEnvironment.setMetricsContainer(c2);
+ MetricsEnvironment.setCurrentContainer(c2);
counter.dec();
- MetricsEnvironment.unsetMetricsContainer();
+ MetricsEnvironment.setCurrentContainer(null);
MetricUpdates updates1 = c1.getUpdates();
MetricUpdates updates2 = c2.getUpdates();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e6870a6d/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
index d11b44d..732cb34 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
@@ -37,7 +37,7 @@ public class MetricsTest {
@After
public void tearDown() {
- MetricsEnvironment.unsetMetricsContainer();
+ MetricsEnvironment.setCurrentContainer(null);
}
@Test
@@ -61,7 +61,7 @@ public class MetricsTest {
@Test
public void distributionToCell() {
MetricsContainer container = new MetricsContainer("step");
- MetricsEnvironment.setMetricsContainer(container);
+ MetricsEnvironment.setCurrentContainer(container);
Distribution distribution = Metrics.distribution(NS, NAME);
@@ -80,7 +80,7 @@ public class MetricsTest {
@Test
public void counterToCell() {
MetricsContainer container = new MetricsContainer("step");
- MetricsEnvironment.setMetricsContainer(container);
+ MetricsEnvironment.setCurrentContainer(container);
Counter counter = Metrics.counter(NS, NAME);
CounterCell cell = container.getCounter(METRIC_NAME);
counter.inc();