You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@beam.apache.org by pa...@apache.org on 2020/08/26 19:14:02 UTC
[beam] branch master updated: implement query 12
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 39e74c4 implement query 12
new 5d8aa2c Merge pull request #12675 from [BEAM-2855] implement query 12
39e74c4 is described below
commit 39e74c471e33eb00b591e196b29f1f4fb9d30ab0
Author: Leiyi Zhang <le...@google.com>
AuthorDate: Mon Aug 24 19:45:50 2020 +0000
implement query 12
---
.../testing/benchmarks/nexmark/nexmark_launcher.py | 6 ++-
.../testing/benchmarks/nexmark/queries/query12.py | 51 ++++++++++++++++++++++
2 files changed, 55 insertions(+), 2 deletions(-)
diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py
index 195797e..51526dd 100644
--- a/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py
+++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py
@@ -88,6 +88,7 @@ from apache_beam.testing.benchmarks.nexmark.queries import query7
from apache_beam.testing.benchmarks.nexmark.queries import query8
from apache_beam.testing.benchmarks.nexmark.queries import query9
from apache_beam.testing.benchmarks.nexmark.queries import query11
+from apache_beam.testing.benchmarks.nexmark.queries import query12
from apache_beam.transforms import window
@@ -120,7 +121,7 @@ class NexmarkLauncher(object):
type=int,
action='append',
required=True,
- choices=[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 11],
+ choices=[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 11, 12],
help='Query to run')
parser.add_argument(
@@ -257,7 +258,8 @@ class NexmarkLauncher(object):
7: query7,
8: query8,
9: query9,
- 11: query11
+ 11: query11,
+ 12: query12
}
# TODO(mariagh): Move to a config file.
diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query12.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query12.py
new file mode 100644
index 0000000..92a2d0d
--- /dev/null
+++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query12.py
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Query 12, How many bids does a user make within a fixed processing time limit
+(Not in original suite.)
+
+Group bids by the same user into processing time windows of window_size_sec.
+Emit the count of bids per window.
+"""
+
+from __future__ import absolute_import
+
+import apache_beam as beam
+from apache_beam.testing.benchmarks.nexmark.queries import nexmark_query_util
+from apache_beam.testing.benchmarks.nexmark.queries.nexmark_query_util import ResultNames
+from apache_beam.transforms import trigger
+from apache_beam.transforms import window
+
+
+def load(events, metadata=None):
+ return (
+ events
+ | nexmark_query_util.JustBids()
+ | 'query12_extract_bidder' >> beam.Map(lambda bid: bid.bidder)
+ # windowing with processing time trigger, currently not supported in batch
+ | beam.WindowInto(
+ window.GlobalWindows(),
+ trigger=trigger.Repeatedly(
+ trigger.AfterProcessingTime(metadata.get('window_size_sec'))),
+ accumulation_mode=trigger.AccumulationMode.DISCARDING,
+ allowed_lateness=0)
+ | 'query12_bid_count' >> beam.combiners.Count.PerElement()
+ | 'query12_output' >> beam.Map(
+ lambda t: {
+ ResultNames.BIDDER_ID: t[0], ResultNames.BID_COUNT: t[1]
+ }))