You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/01/11 00:52:53 UTC
[1/2] beam git commit: Adding protobuf matchers for dataflow client.
Repository: beam
Updated Branches:
refs/heads/python-sdk a25515171 -> cb0634984
Adding protobuf matchers for dataflow client.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/37558fa9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/37558fa9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/37558fa9
Branch: refs/heads/python-sdk
Commit: 37558fa9f3b57f340e6eb3bb630f53e0036c44f5
Parents: a255151
Author: Pablo <pa...@google.com>
Authored: Tue Jan 10 13:38:48 2017 -0800
Committer: Pablo <pa...@google.com>
Committed: Tue Jan 10 14:21:33 2017 -0800
----------------------------------------------------------------------
.../clients/dataflow/message_matchers.py | 124 +++++++++++++++++++
.../clients/dataflow/message_matchers_test.py | 69 +++++++++++
2 files changed, 193 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/37558fa9/sdks/python/apache_beam/internal/clients/dataflow/message_matchers.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/clients/dataflow/message_matchers.py b/sdks/python/apache_beam/internal/clients/dataflow/message_matchers.py
new file mode 100644
index 0000000..4dda47a
--- /dev/null
+++ b/sdks/python/apache_beam/internal/clients/dataflow/message_matchers.py
@@ -0,0 +1,124 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from hamcrest.core.base_matcher import BaseMatcher
+
+
+IGNORED = object()
+
+
+class MetricStructuredNameMatcher(BaseMatcher):
+ """Matches a MetricStructuredName."""
+ def __init__(self,
+ name=IGNORED,
+ origin=IGNORED,
+ context=IGNORED):
+ """Creates a MetricsStructuredNameMatcher.
+
+ Any property not passed in to the constructor will be ignored when matching.
+
+ Args:
+ name: A string with the metric name.
+ origin: A string with the metric namespace.
+ context: A key:value dictionary that will be matched to the
+ structured name.
+ """
+ if context != IGNORED and not isinstance(context, dict):
+ raise ValueError('context must be a Python dictionary.')
+
+ self.name = name
+ self.origin = origin
+ self.context = context
+
+ def _matches(self, item):
+ if self.name != IGNORED and item.name != self.name:
+ return False
+ if self.origin != IGNORED and item.origin != self.origin:
+ return False
+ if self.context != IGNORED:
+ for key, name in self.context.iteritems():
+ if key not in item.context:
+ return False
+ if name != IGNORED and item.context[key] != name:
+ return False
+ return True
+
+ def describe_to(self, description):
+ descriptors = []
+ if self.name != IGNORED:
+ descriptors.append('name is {}'.format(self.name))
+ if self.origin != IGNORED:
+ descriptors.append('origin is {}'.format(self.origin))
+ if self.context != IGNORED:
+ descriptors.append('context is ({})'.format(str(self.context)))
+
+ item_description = ' and '.join(descriptors)
+ description.append(item_description)
+
+
+class MetricUpdateMatcher(BaseMatcher):
+ """Matches a metrics update protocol buffer."""
+ def __init__(self,
+ cumulative=IGNORED,
+ name=IGNORED,
+ scalar=IGNORED,
+ kind=IGNORED):
+ """Creates a MetricUpdateMatcher.
+
+ Any property not passed in to the constructor will be ignored when matching.
+
+ Args:
+ cumulative: A boolean.
+ name: A MetricStructuredNameMatcher object that matches the name.
+ scalar: An integer with the metric update.
+ kind: A string defining the kind of counter.
+ """
+ if name != IGNORED and not isinstance(name, MetricStructuredNameMatcher):
+ raise ValueError('name must be a MetricStructuredNameMatcher.')
+
+ self.cumulative = cumulative
+ self.name = name
+ self.scalar = scalar
+ self.kind = kind
+
+ def _matches(self, item):
+ if self.cumulative != IGNORED and item.cumulative != self.cumulative:
+ return False
+ if self.name != IGNORED and not self.name._matches(item.name):
+ return False
+ if self.kind != IGNORED and item.kind != self.kind:
+ return False
+ if self.scalar != IGNORED:
+ value_property = [p
+ for p in item.scalar.object_value.properties
+ if p.key == 'value']
+ int_value = value_property[0].value.integer_value
+ if self.scalar != int_value:
+ return False
+ return True
+
+ def describe_to(self, description):
+ descriptors = []
+ if self.cumulative != IGNORED:
+ descriptors.append('cumulative is {}'.format(self.cumulative))
+ if self.name != IGNORED:
+ descriptors.append('name is {}'.format(self.name))
+ if self.scalar != IGNORED:
+ descriptors.append('scalar is ({})'.format(str(self.scalar)))
+
+ item_description = ' and '.join(descriptors)
+ description.append(item_description)
http://git-wip-us.apache.org/repos/asf/beam/blob/37558fa9/sdks/python/apache_beam/internal/clients/dataflow/message_matchers_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/clients/dataflow/message_matchers_test.py b/sdks/python/apache_beam/internal/clients/dataflow/message_matchers_test.py
new file mode 100644
index 0000000..ec63ce7
--- /dev/null
+++ b/sdks/python/apache_beam/internal/clients/dataflow/message_matchers_test.py
@@ -0,0 +1,69 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import unittest
+
+import hamcrest as hc
+
+import apache_beam.internal.clients.dataflow as dataflow
+from apache_beam.internal.clients.dataflow import message_matchers
+from apache_beam.internal.json_value import to_json_value
+
+
+class TestMatchers(unittest.TestCase):
+
+ def test_structured_name_matcher_basic(self):
+ metric_name = dataflow.MetricStructuredName()
+ metric_name.name = 'metric1'
+ metric_name.origin = 'origin2'
+
+ matcher = message_matchers.MetricStructuredNameMatcher(
+ name='metric1',
+ origin='origin2')
+ hc.assert_that(metric_name, hc.is_(matcher))
+ with self.assertRaises(AssertionError):
+ matcher = message_matchers.MetricStructuredNameMatcher(
+ name='metric1',
+ origin='origin1')
+ hc.assert_that(metric_name, hc.is_(matcher))
+
+ def test_metric_update_basic(self):
+ metric_update = dataflow.MetricUpdate()
+ metric_update.name = dataflow.MetricStructuredName()
+ metric_update.name.name = 'metric1'
+ metric_update.name.origin = 'origin1'
+
+ metric_update.cumulative = False
+ metric_update.kind = 'sum'
+ metric_update.scalar = to_json_value(1, with_type=True)
+
+ name_matcher = message_matchers.MetricStructuredNameMatcher(
+ name='metric1',
+ origin='origin1')
+ matcher = message_matchers.MetricUpdateMatcher(
+ name=name_matcher,
+ kind='sum',
+ scalar=1)
+
+ hc.assert_that(metric_update, hc.is_(matcher))
+
+ with self.assertRaises(AssertionError):
+ matcher.kind = 'suma'
+ hc.assert_that(metric_update, hc.is_(matcher))
+
+
+if __name__ == '__main__':
+ unittest.main()
[2/2] beam git commit: Closes #1761
Posted by ro...@apache.org.
Closes #1761
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/cb063498
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/cb063498
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/cb063498
Branch: refs/heads/python-sdk
Commit: cb06349846348641c1cce14310c13974dc9fcc0b
Parents: a255151 37558fa
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Tue Jan 10 16:52:28 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Tue Jan 10 16:52:28 2017 -0800
----------------------------------------------------------------------
.../clients/dataflow/message_matchers.py | 124 +++++++++++++++++++
.../clients/dataflow/message_matchers_test.py | 69 +++++++++++
2 files changed, 193 insertions(+)
----------------------------------------------------------------------