You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@beam.apache.org by bc...@apache.org on 2017/03/01 23:20:02 UTC
[1/2] beam git commit: Closes #2122
Repository: beam
Updated Branches:
refs/heads/master afd710674 -> bc1a301b0
Closes #2122
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bc1a301b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bc1a301b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bc1a301b
Branch: refs/heads/master
Commit: bc1a301b0a38f354658639c39289903c4e260564
Parents: afd7106 b34e50c
Author: bchambers <bc...@google.com>
Authored: Wed Mar 1 14:59:22 2017 -0800
Committer: bchambers <bc...@google.com>
Committed: Wed Mar 1 14:59:22 2017 -0800
----------------------------------------------------------------------
.../beam/runners/direct/DirectMetrics.java | 31 ++++++++-
.../beam/runners/direct/DirectMetricsTest.java | 70 ++++++++++++++++++++
sdks/python/apache_beam/metrics/metric.py | 53 ++++++++++++---
sdks/python/apache_beam/metrics/metric_test.py | 43 ++++++++++++
4 files changed, 187 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
[2/2] beam git commit: Adding per-stage matching to metrics filters
Posted by bc...@apache.org.
Adding per-stage matching to metrics filters
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b34e50c6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b34e50c6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b34e50c6
Branch: refs/heads/master
Commit: b34e50c649adc8861670c42228cb96688abf4038
Parents: afd7106
Author: Pablo <pa...@google.com>
Authored: Mon Feb 27 17:28:26 2017 -0800
Committer: bchambers <bc...@google.com>
Committed: Wed Mar 1 14:59:22 2017 -0800
----------------------------------------------------------------------
.../beam/runners/direct/DirectMetrics.java | 31 ++++++++-
.../beam/runners/direct/DirectMetricsTest.java | 70 ++++++++++++++++++++
sdks/python/apache_beam/metrics/metric.py | 53 ++++++++++++---
sdks/python/apache_beam/metrics/metric_test.py | 43 ++++++++++++
4 files changed, 187 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/b34e50c6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
index 145326f..fa8f9c3 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
@@ -275,13 +275,40 @@ class DirectMetrics extends MetricResults {
&& matchesScope(key.stepName(), filter.steps());
}
- private boolean matchesScope(String actualScope, Set<String> scopes) {
+ /**
+ * {@code subPathMatches(haystack, needle)} returns true if {@code needle}
+ * represents a path within {@code haystack}. For example, "foo/bar" is in "a/foo/bar/b",
+ * but not "a/fool/bar/b" or "a/foo/bart/b".
+ */
+ public boolean subPathMatches(String haystack, String needle) {
+ int location = haystack.indexOf(needle);
+ int end = location + needle.length();
+ if (location == -1) {
+ return false; // needle not found
+ } else if (location != 0 && haystack.charAt(location - 1) != '/') {
+ return false; // the first entry in needle wasn't exactly matched
+ } else if (end != haystack.length() && haystack.charAt(end) != '/') {
+ return false; // the last entry in needle wasn't exactly matched
+ } else {
+ return true;
+ }
+ }
+
+ /**
+ * {@code matchesScope(actualScope, scopes)} returns true if the scope of a metric is matched
+ * by any of the filters in {@code scopes}. A metric scope is a path of type "A/B/D". A
+ * path is matched by a filter if the filter is equal to the path (e.g. "A/B/D"), or
+ * if it represents a subpath within it (e.g. "A/B" or "B/D", but not "A/D"). */
+ public boolean matchesScope(String actualScope, Set<String> scopes) {
if (scopes.isEmpty() || scopes.contains(actualScope)) {
return true;
}
+ // If there is no perfect match, a stage name-level match is tried.
+ // This is done by a substring search over the levels of the scope.
+ // e.g. a scope "A/B/C/D" is matched by "A/B", but not by "A/C".
for (String scope : scopes) {
- if (actualScope.startsWith(scope)) {
+ if (subPathMatches(actualScope, scope)) {
return true;
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/b34e50c6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java
index 3ad2bdc..77229bf 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java
@@ -23,9 +23,13 @@ import static org.apache.beam.sdk.metrics.MetricMatchers.committedMetricsResult;
import static org.apache.beam.sdk.metrics.MetricNameFilter.inNamespace;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
import com.google.common.collect.ImmutableList;
+import java.util.HashSet;
+import java.util.Set;
import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.sdk.metrics.DistributionData;
import org.apache.beam.sdk.metrics.DistributionResult;
@@ -125,6 +129,72 @@ public class DirectMetricsTest {
committedMetricsResult("ns1", "name1", "step2", 0L)));
}
+ private boolean matchesSubPath(String actualScope, String subPath) {
+ return metrics.subPathMatches(actualScope, subPath);
+ }
+
+ @Test
+ public void testMatchesSubPath() {
+ assertTrue("Match of the first element",
+ matchesSubPath("Top1/Outer1/Inner1/Bottom1", "Top1"));
+ assertTrue("Match of the first elements",
+ matchesSubPath("Top1/Outer1/Inner1/Bottom1", "Top1/Outer1"));
+ assertTrue("Match of the last elements",
+ matchesSubPath("Top1/Outer1/Inner1/Bottom1", "Inner1/Bottom1"));
+ assertFalse("Substring match but no subpath match",
+ matchesSubPath("Top1/Outer1/Inner1/Bottom1", "op1/Outer1/Inner1"));
+ assertFalse("Substring match from start - but no subpath match",
+ matchesSubPath("Top1/Outer1/Inner1/Bottom1", "Top"));
+ }
+
+ private boolean matchesScopeWithSingleFilter(String actualScope, String filter) {
+ Set<String> scopeFilter = new HashSet<String>();
+ scopeFilter.add(filter);
+ return metrics.matchesScope(actualScope, scopeFilter);
+ }
+
+ @Test
+ public void testMatchesScope() {
+ assertTrue(matchesScopeWithSingleFilter("Top1/Outer1/Inner1/Bottom1", "Top1"));
+ assertTrue(matchesScopeWithSingleFilter(
+ "Top1/Outer1/Inner1/Bottom1", "Top1/Outer1/Inner1/Bottom1"));
+ assertTrue(matchesScopeWithSingleFilter("Top1/Outer1/Inner1/Bottom1", "Top1/Outer1"));
+ assertTrue(matchesScopeWithSingleFilter("Top1/Outer1/Inner1/Bottom1", "Top1/Outer1/Inner1"));
+ assertFalse(matchesScopeWithSingleFilter("Top1/Outer1/Inner1/Bottom1", "Top1/Inner1"));
+ assertFalse(matchesScopeWithSingleFilter("Top1/Outer1/Inner1/Bottom1", "Top1/Outer1/Inn"));
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testPartialScopeMatchingInMetricsQuery() {
+ metrics.updatePhysical(bundle1, MetricUpdates.create(
+ ImmutableList.of(
+ MetricUpdate.create(MetricKey.create("Top1/Outer1/Inner1", NAME1), 5L),
+ MetricUpdate.create(MetricKey.create("Top1/Outer1/Inner2", NAME1), 8L)),
+ ImmutableList.<MetricUpdate<DistributionData>>of()));
+ metrics.updatePhysical(bundle1, MetricUpdates.create(
+ ImmutableList.of(
+ MetricUpdate.create(MetricKey.create("Top2/Outer1/Inner1", NAME1), 12L),
+ MetricUpdate.create(MetricKey.create("Top1/Outer2/Inner2", NAME1), 18L)),
+ ImmutableList.<MetricUpdate<DistributionData>>of()));
+
+ MetricQueryResults results = metrics.queryMetrics(
+ MetricsFilter.builder().addStep("Top1/Outer1").build());
+
+ assertThat(results.counters(),
+ containsInAnyOrder(
+ attemptedMetricsResult("ns1", "name1", "Top1/Outer1/Inner1", 5L),
+ attemptedMetricsResult("ns1", "name1", "Top1/Outer1/Inner2", 8L)));
+
+ results = metrics.queryMetrics(
+ MetricsFilter.builder().addStep("Inner2").build());
+
+ assertThat(results.counters(),
+ containsInAnyOrder(
+ attemptedMetricsResult("ns1", "name1", "Top1/Outer1/Inner2", 8L),
+ attemptedMetricsResult("ns1", "name1", "Top1/Outer2/Inner2", 18L)));
+ }
+
@SuppressWarnings("unchecked")
@Test
public void testApplyAttemptedQueryCompositeScope() {
http://git-wip-us.apache.org/repos/asf/beam/blob/b34e50c6/sdks/python/apache_beam/metrics/metric.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/metrics/metric.py b/sdks/python/apache_beam/metrics/metric.py
index a0e3cba..f6a0923 100644
--- a/sdks/python/apache_beam/metrics/metric.py
+++ b/sdks/python/apache_beam/metrics/metric.py
@@ -32,8 +32,7 @@ from apache_beam.metrics.metricbase import MetricName
class Metrics(object):
- """Lets users create/access metric objects during pipeline execution.
- """
+ """Lets users create/access metric objects during pipeline execution."""
@staticmethod
def get_namespace(namespace):
if inspect.isclass(namespace):
@@ -93,14 +92,52 @@ class Metrics(object):
class MetricResults(object):
+
+ @staticmethod
+ def _matches_name(filter, metric_key):
+ if not filter.names and not filter.namespaces:
+ return True
+
+ if ((filter.namespaces and
+ metric_key.metric.namespace in filter.namespaces) or
+ (filter.names and
+ metric_key.metric.name in filter.names)):
+ return True
+ else:
+ return False
+
+ @staticmethod
+ def _matches_sub_path(actual_scope, filter_scope):
+ start_pos = actual_scope.find(filter_scope)
+ end_pos = start_pos + len(filter_scope)
+
+ if start_pos == -1:
+ return False # No match at all
+ elif start_pos != 0 and actual_scope[start_pos - 1] != '/':
+ return False # The first entry was not exactly matched
+ elif end_pos != len(actual_scope) and actual_scope[end_pos] != '/':
+ return False # The last entry was not exactly matched
+ else:
+ return True
+
+ @staticmethod
+ def _matches_scope(filter, metric_key):
+ if not filter.steps:
+ return True
+
+ for step in filter.steps:
+ if MetricResults._matches_sub_path(metric_key.step, step):
+ return True
+
+ return False
+
@staticmethod
def matches(filter, metric_key):
if filter is None:
return True
- if (metric_key.step in filter.steps and
- metric_key.metric.namespace in filter.namespaces and
- metric_key.metric.name in filter.names):
+ if (MetricResults._matches_name(filter, metric_key) and
+ MetricResults._matches_scope(filter, metric_key)):
return True
else:
return False
@@ -139,9 +176,9 @@ class MetricsFilter(object):
def with_names(self, names):
if isinstance(names, str):
- raise ValueError('Names must be an iterable, not a string')
+ raise ValueError('Names must be a collection, not a string')
- self._steps.update(names)
+ self._names.update(names)
return self
def with_namespace(self, namespace):
@@ -158,7 +195,7 @@ class MetricsFilter(object):
return self.with_steps([step])
def with_steps(self, steps):
- if isinstance(namespaces, str):
+ if isinstance(steps, str):
raise ValueError('Steps must be an iterable, not a string')
self._steps.update(steps)
http://git-wip-us.apache.org/repos/asf/beam/blob/b34e50c6/sdks/python/apache_beam/metrics/metric_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/metrics/metric_test.py b/sdks/python/apache_beam/metrics/metric_test.py
index 4860edf..56b7468 100644
--- a/sdks/python/apache_beam/metrics/metric_test.py
+++ b/sdks/python/apache_beam/metrics/metric_test.py
@@ -22,6 +22,8 @@ from apache_beam.metrics.execution import MetricKey
from apache_beam.metrics.execution import MetricsContainer
from apache_beam.metrics.execution import MetricsEnvironment
from apache_beam.metrics.metric import Metrics
+from apache_beam.metrics.metric import MetricsFilter
+from apache_beam.metrics.metric import MetricResults
from apache_beam.metrics.metricbase import MetricName
@@ -39,6 +41,47 @@ class NameTest(unittest.TestCase):
self.assertEqual(key, MetricKey('step1', MetricName('namespace1', 'name1')))
+class MetricResultsTest(unittest.TestCase):
+
+ def test_metric_filter_namespace_matching(self):
+ filter = MetricsFilter().with_namespace('ns1')
+ name = MetricName('ns1', 'name1')
+ key = MetricKey('step1', name)
+ self.assertTrue(MetricResults.matches(filter, key))
+
+ def test_metric_filter_name_matching(self):
+ filter = MetricsFilter().with_name('name1').with_namespace('ns1')
+ name = MetricName('ns1', 'name1')
+ key = MetricKey('step1', name)
+ self.assertTrue(MetricResults.matches(filter, key))
+
+ filter = MetricsFilter().with_name('name1')
+ name = MetricName('ns1', 'name1')
+ key = MetricKey('step1', name)
+ self.assertTrue(MetricResults.matches(filter, key))
+
+ def test_metric_filter_step_matching(self):
+ filter = MetricsFilter().with_step('Top1/Outer1/Inner1')
+ name = MetricName('ns1', 'name1')
+ key = MetricKey('Top1/Outer1/Inner1', name)
+ self.assertTrue(MetricResults.matches(filter, key))
+
+ filter = MetricsFilter().with_step('step1')
+ name = MetricName('ns1', 'name1')
+ key = MetricKey('step1', name)
+ self.assertTrue(MetricResults.matches(filter, key))
+
+ filter = MetricsFilter().with_step('Top1/Outer1')
+ name = MetricName('ns1', 'name1')
+ key = MetricKey('Top1/Outer1/Inner1', name)
+ self.assertTrue(MetricResults.matches(filter, key))
+
+ filter = MetricsFilter().with_step('Top1/Inner1')
+ name = MetricName('ns1', 'name1')
+ key = MetricKey('Top1/Outer1/Inner1', name)
+ self.assertFalse(MetricResults.matches(filter, key))
+
+
class MetricsTest(unittest.TestCase):
def test_get_namespace_class(self):
class MyClass(object):