You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2020/08/26 19:14:02 UTC

[beam] branch master updated: implement query 12

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 39e74c4  implement query 12
     new 5d8aa2c  Merge pull request #12675 from [BEAM-2855] implement query 12
39e74c4 is described below

commit 39e74c471e33eb00b591e196b29f1f4fb9d30ab0
Author: Leiyi Zhang <le...@google.com>
AuthorDate: Mon Aug 24 19:45:50 2020 +0000

    implement query 12
---
 .../testing/benchmarks/nexmark/nexmark_launcher.py |  6 ++-
 .../testing/benchmarks/nexmark/queries/query12.py  | 51 ++++++++++++++++++++++
 2 files changed, 55 insertions(+), 2 deletions(-)

diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py
index 195797e..51526dd 100644
--- a/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py
+++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py
@@ -88,6 +88,7 @@ from apache_beam.testing.benchmarks.nexmark.queries import query7
 from apache_beam.testing.benchmarks.nexmark.queries import query8
 from apache_beam.testing.benchmarks.nexmark.queries import query9
 from apache_beam.testing.benchmarks.nexmark.queries import query11
+from apache_beam.testing.benchmarks.nexmark.queries import query12
 from apache_beam.transforms import window
 
 
@@ -120,7 +121,7 @@ class NexmarkLauncher(object):
         type=int,
         action='append',
         required=True,
-        choices=[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 11],
+        choices=[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 11, 12],
         help='Query to run')
 
     parser.add_argument(
@@ -257,7 +258,8 @@ class NexmarkLauncher(object):
         7: query7,
         8: query8,
         9: query9,
-        11: query11
+        11: query11,
+        12: query12
     }
 
     # TODO(mariagh): Move to a config file.
diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query12.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query12.py
new file mode 100644
index 0000000..92a2d0d
--- /dev/null
+++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query12.py
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Query 12: How many bids does a user make within a fixed processing-time limit?
+(Not in the original suite.)
+
+Group bids by the same user into processing time windows of window_size_sec.
+Emit each user's bid count per window.
+"""
+
+from __future__ import absolute_import
+
+import apache_beam as beam
+from apache_beam.testing.benchmarks.nexmark.queries import nexmark_query_util
+from apache_beam.testing.benchmarks.nexmark.queries.nexmark_query_util import ResultNames
+from apache_beam.transforms import trigger
+from apache_beam.transforms import window
+
+
+def load(events, metadata=None):
+  return (
+      events
+      | nexmark_query_util.JustBids()
+      | 'query12_extract_bidder' >> beam.Map(lambda bid: bid.bidder)
+      # windowing with processing time trigger, currently not supported in batch
+      | beam.WindowInto(
+          window.GlobalWindows(),
+          trigger=trigger.Repeatedly(
+              trigger.AfterProcessingTime(metadata.get('window_size_sec'))),
+          accumulation_mode=trigger.AccumulationMode.DISCARDING,
+          allowed_lateness=0)
+      | 'query12_bid_count' >> beam.combiners.Count.PerElement()
+      | 'query12_output' >> beam.Map(
+          lambda t: {
+              ResultNames.BIDDER_ID: t[0], ResultNames.BID_COUNT: t[1]
+          }))