You are viewing a plain-text version of this content. The link to the canonical version is not preserved in this plain-text rendering.
Posted to commits@beam.apache.org by bc...@apache.org on 2017/03/01 23:20:02 UTC

[1/2] beam git commit: Closes #2122

Repository: beam
Updated Branches:
  refs/heads/master afd710674 -> bc1a301b0


Closes #2122


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bc1a301b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bc1a301b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bc1a301b

Branch: refs/heads/master
Commit: bc1a301b0a38f354658639c39289903c4e260564
Parents: afd7106 b34e50c
Author: bchambers <bc...@google.com>
Authored: Wed Mar 1 14:59:22 2017 -0800
Committer: bchambers <bc...@google.com>
Committed: Wed Mar 1 14:59:22 2017 -0800

----------------------------------------------------------------------
 .../beam/runners/direct/DirectMetrics.java      | 31 ++++++++-
 .../beam/runners/direct/DirectMetricsTest.java  | 70 ++++++++++++++++++++
 sdks/python/apache_beam/metrics/metric.py       | 53 ++++++++++++---
 sdks/python/apache_beam/metrics/metric_test.py  | 43 ++++++++++++
 4 files changed, 187 insertions(+), 10 deletions(-)
----------------------------------------------------------------------



[2/2] beam git commit: Adding per-stage matching to metrics filters

Posted by bc...@apache.org.
Adding per-stage matching to metrics filters


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b34e50c6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b34e50c6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b34e50c6

Branch: refs/heads/master
Commit: b34e50c649adc8861670c42228cb96688abf4038
Parents: afd7106
Author: Pablo <pa...@google.com>
Authored: Mon Feb 27 17:28:26 2017 -0800
Committer: bchambers <bc...@google.com>
Committed: Wed Mar 1 14:59:22 2017 -0800

----------------------------------------------------------------------
 .../beam/runners/direct/DirectMetrics.java      | 31 ++++++++-
 .../beam/runners/direct/DirectMetricsTest.java  | 70 ++++++++++++++++++++
 sdks/python/apache_beam/metrics/metric.py       | 53 ++++++++++++---
 sdks/python/apache_beam/metrics/metric_test.py  | 43 ++++++++++++
 4 files changed, 187 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/b34e50c6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
index 145326f..fa8f9c3 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
@@ -275,13 +275,40 @@ class DirectMetrics extends MetricResults {
         && matchesScope(key.stepName(), filter.steps());
   }
 
-  private boolean matchesScope(String actualScope, Set<String> scopes) {
+  /**
+  * {@code subPathMatches(haystack, needle)} returns true if {@code needle}
+  * represents a path within {@code haystack}. For example, "foo/bar" is in "a/foo/bar/b",
+  * but not "a/fool/bar/b" or "a/foo/bart/b".
+  */
+  public boolean subPathMatches(String haystack, String needle) {
+    int location = haystack.indexOf(needle);
+    int end = location + needle.length();
+    if (location == -1) {
+      return false;  // needle not found
+    } else if (location != 0 && haystack.charAt(location - 1) != '/') {
+      return false; // the first entry in needle wasn't exactly matched
+    } else if (end != haystack.length() && haystack.charAt(end) != '/') {
+      return false; // the last entry in needle wasn't exactly matched
+    } else {
+      return true;
+    }
+  }
+
+  /**
+   * {@code matchesScope(actualScope, scopes)} returns true if the scope of a metric is matched
+   * by any of the filters in {@code scopes}. A metric scope is a path of type "A/B/D". A
+   * path is matched by a filter if the filter is equal to the path (e.g. "A/B/D", or
+   * if it represents a subpath within it (e.g. "A/B" or "B/D", but not "A/D"). */
+  public boolean matchesScope(String actualScope, Set<String> scopes) {
     if (scopes.isEmpty() || scopes.contains(actualScope)) {
       return true;
     }
 
+    // If there is no perfect match, a stage name-level match is tried.
+    // This is done by a substring search over the levels of the scope.
+    // e.g. a scope "A/B/C/D" is matched by "A/B", but not by "A/C".
     for (String scope : scopes) {
-      if (actualScope.startsWith(scope)) {
+      if (subPathMatches(actualScope, scope)) {
         return true;
       }
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/b34e50c6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java
index 3ad2bdc..77229bf 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java
@@ -23,9 +23,13 @@ import static org.apache.beam.sdk.metrics.MetricMatchers.committedMetricsResult;
 import static org.apache.beam.sdk.metrics.MetricNameFilter.inNamespace;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 
 import com.google.common.collect.ImmutableList;
+import java.util.HashSet;
+import java.util.Set;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.sdk.metrics.DistributionData;
 import org.apache.beam.sdk.metrics.DistributionResult;
@@ -125,6 +129,72 @@ public class DirectMetricsTest {
             committedMetricsResult("ns1", "name1", "step2", 0L)));
   }
 
+  private boolean matchesSubPath(String actualScope, String subPath) {
+    return metrics.subPathMatches(actualScope, subPath);
+  }
+
+  @Test
+  public void testMatchesSubPath() {
+    assertTrue("Match of the first element",
+        matchesSubPath("Top1/Outer1/Inner1/Bottom1", "Top1"));
+    assertTrue("Match of the first elements",
+        matchesSubPath("Top1/Outer1/Inner1/Bottom1", "Top1/Outer1"));
+    assertTrue("Match of the last elements",
+        matchesSubPath("Top1/Outer1/Inner1/Bottom1", "Inner1/Bottom1"));
+    assertFalse("Substring match but no subpath match",
+        matchesSubPath("Top1/Outer1/Inner1/Bottom1", "op1/Outer1/Inner1"));
+    assertFalse("Substring match from start - but no subpath match",
+        matchesSubPath("Top1/Outer1/Inner1/Bottom1", "Top"));
+  }
+
+  private boolean matchesScopeWithSingleFilter(String actualScope, String filter) {
+    Set<String> scopeFilter = new HashSet<String>();
+    scopeFilter.add(filter);
+    return metrics.matchesScope(actualScope, scopeFilter);
+  }
+
+  @Test
+  public void testMatchesScope() {
+    assertTrue(matchesScopeWithSingleFilter("Top1/Outer1/Inner1/Bottom1", "Top1"));
+    assertTrue(matchesScopeWithSingleFilter(
+        "Top1/Outer1/Inner1/Bottom1", "Top1/Outer1/Inner1/Bottom1"));
+    assertTrue(matchesScopeWithSingleFilter("Top1/Outer1/Inner1/Bottom1", "Top1/Outer1"));
+    assertTrue(matchesScopeWithSingleFilter("Top1/Outer1/Inner1/Bottom1", "Top1/Outer1/Inner1"));
+    assertFalse(matchesScopeWithSingleFilter("Top1/Outer1/Inner1/Bottom1", "Top1/Inner1"));
+    assertFalse(matchesScopeWithSingleFilter("Top1/Outer1/Inner1/Bottom1", "Top1/Outer1/Inn"));
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testPartialScopeMatchingInMetricsQuery() {
+    metrics.updatePhysical(bundle1, MetricUpdates.create(
+        ImmutableList.of(
+            MetricUpdate.create(MetricKey.create("Top1/Outer1/Inner1", NAME1), 5L),
+            MetricUpdate.create(MetricKey.create("Top1/Outer1/Inner2", NAME1), 8L)),
+        ImmutableList.<MetricUpdate<DistributionData>>of()));
+    metrics.updatePhysical(bundle1, MetricUpdates.create(
+        ImmutableList.of(
+            MetricUpdate.create(MetricKey.create("Top2/Outer1/Inner1", NAME1), 12L),
+            MetricUpdate.create(MetricKey.create("Top1/Outer2/Inner2", NAME1), 18L)),
+        ImmutableList.<MetricUpdate<DistributionData>>of()));
+
+    MetricQueryResults results = metrics.queryMetrics(
+        MetricsFilter.builder().addStep("Top1/Outer1").build());
+
+    assertThat(results.counters(),
+        containsInAnyOrder(
+            attemptedMetricsResult("ns1", "name1", "Top1/Outer1/Inner1", 5L),
+            attemptedMetricsResult("ns1", "name1", "Top1/Outer1/Inner2", 8L)));
+
+    results = metrics.queryMetrics(
+        MetricsFilter.builder().addStep("Inner2").build());
+
+    assertThat(results.counters(),
+        containsInAnyOrder(
+            attemptedMetricsResult("ns1", "name1", "Top1/Outer1/Inner2", 8L),
+            attemptedMetricsResult("ns1", "name1", "Top1/Outer2/Inner2", 18L)));
+  }
+
   @SuppressWarnings("unchecked")
   @Test
   public void testApplyAttemptedQueryCompositeScope() {

http://git-wip-us.apache.org/repos/asf/beam/blob/b34e50c6/sdks/python/apache_beam/metrics/metric.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/metrics/metric.py b/sdks/python/apache_beam/metrics/metric.py
index a0e3cba..f6a0923 100644
--- a/sdks/python/apache_beam/metrics/metric.py
+++ b/sdks/python/apache_beam/metrics/metric.py
@@ -32,8 +32,7 @@ from apache_beam.metrics.metricbase import MetricName
 
 
 class Metrics(object):
-  """Lets users create/access metric objects during pipeline execution.
-  """
+  """Lets users create/access metric objects during pipeline execution."""
   @staticmethod
   def get_namespace(namespace):
     if inspect.isclass(namespace):
@@ -93,14 +92,52 @@ class Metrics(object):
 
 
 class MetricResults(object):
+
+  @staticmethod
+  def _matches_name(filter, metric_key):
+    if not filter.names and not filter.namespaces:
+      return True
+
+    if ((filter.namespaces and
+         metric_key.metric.namespace in filter.namespaces) or
+        (filter.names and
+         metric_key.metric.name in filter.names)):
+      return True
+    else:
+      return False
+
+  @staticmethod
+  def _matches_sub_path(actual_scope, filter_scope):
+    start_pos = actual_scope.find(filter_scope)
+    end_pos = start_pos + len(filter_scope)
+
+    if start_pos == -1:
+      return False  # No match at all
+    elif start_pos != 0 and actual_scope[start_pos - 1] != '/':
+      return False  # The first entry was not exactly matched
+    elif end_pos != len(actual_scope) and actual_scope[end_pos] != '/':
+      return False  # The last entry was not exactly matched
+    else:
+      return True
+
+  @staticmethod
+  def _matches_scope(filter, metric_key):
+    if not filter.steps:
+      return True
+
+    for step in filter.steps:
+      if MetricResults._matches_sub_path(metric_key.step, step):
+        return True
+
+    return False
+
   @staticmethod
   def matches(filter, metric_key):
     if filter is None:
       return True
 
-    if (metric_key.step in filter.steps and
-        metric_key.metric.namespace in filter.namespaces and
-        metric_key.metric.name in filter.names):
+    if (MetricResults._matches_name(filter, metric_key) and
+        MetricResults._matches_scope(filter, metric_key)):
       return True
     else:
       return False
@@ -139,9 +176,9 @@ class MetricsFilter(object):
 
   def with_names(self, names):
     if isinstance(names, str):
-      raise ValueError('Names must be an iterable, not a string')
+      raise ValueError('Names must be a collection, not a string')
 
-    self._steps.update(names)
+    self._names.update(names)
     return self
 
   def with_namespace(self, namespace):
@@ -158,7 +195,7 @@ class MetricsFilter(object):
     return self.with_steps([step])
 
   def with_steps(self, steps):
-    if isinstance(namespaces, str):
+    if isinstance(steps, str):
       raise ValueError('Steps must be an iterable, not a string')
 
     self._steps.update(steps)

http://git-wip-us.apache.org/repos/asf/beam/blob/b34e50c6/sdks/python/apache_beam/metrics/metric_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/metrics/metric_test.py b/sdks/python/apache_beam/metrics/metric_test.py
index 4860edf..56b7468 100644
--- a/sdks/python/apache_beam/metrics/metric_test.py
+++ b/sdks/python/apache_beam/metrics/metric_test.py
@@ -22,6 +22,8 @@ from apache_beam.metrics.execution import MetricKey
 from apache_beam.metrics.execution import MetricsContainer
 from apache_beam.metrics.execution import MetricsEnvironment
 from apache_beam.metrics.metric import Metrics
+from apache_beam.metrics.metric import MetricsFilter
+from apache_beam.metrics.metric import MetricResults
 from apache_beam.metrics.metricbase import MetricName
 
 
@@ -39,6 +41,47 @@ class NameTest(unittest.TestCase):
     self.assertEqual(key, MetricKey('step1', MetricName('namespace1', 'name1')))
 
 
+class MetricResultsTest(unittest.TestCase):
+
+  def test_metric_filter_namespace_matching(self):
+    filter = MetricsFilter().with_namespace('ns1')
+    name = MetricName('ns1', 'name1')
+    key = MetricKey('step1', name)
+    self.assertTrue(MetricResults.matches(filter, key))
+
+  def test_metric_filter_name_matching(self):
+    filter = MetricsFilter().with_name('name1').with_namespace('ns1')
+    name = MetricName('ns1', 'name1')
+    key = MetricKey('step1', name)
+    self.assertTrue(MetricResults.matches(filter, key))
+
+    filter = MetricsFilter().with_name('name1')
+    name = MetricName('ns1', 'name1')
+    key = MetricKey('step1', name)
+    self.assertTrue(MetricResults.matches(filter, key))
+
+  def test_metric_filter_step_matching(self):
+    filter = MetricsFilter().with_step('Top1/Outer1/Inner1')
+    name = MetricName('ns1', 'name1')
+    key = MetricKey('Top1/Outer1/Inner1', name)
+    self.assertTrue(MetricResults.matches(filter, key))
+
+    filter = MetricsFilter().with_step('step1')
+    name = MetricName('ns1', 'name1')
+    key = MetricKey('step1', name)
+    self.assertTrue(MetricResults.matches(filter, key))
+
+    filter = MetricsFilter().with_step('Top1/Outer1')
+    name = MetricName('ns1', 'name1')
+    key = MetricKey('Top1/Outer1/Inner1', name)
+    self.assertTrue(MetricResults.matches(filter, key))
+
+    filter = MetricsFilter().with_step('Top1/Inner1')
+    name = MetricName('ns1', 'name1')
+    key = MetricKey('Top1/Outer1/Inner1', name)
+    self.assertFalse(MetricResults.matches(filter, key))
+
+
 class MetricsTest(unittest.TestCase):
   def test_get_namespace_class(self):
     class MyClass(object):