Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/08/13 17:08:49 UTC

[GitHub] [beam] leiyiz opened a new pull request #12427: [BEAM-2855] nexmark python suite implement queries 0, 1, 2 and 9

leiyiz opened a new pull request #12427:
URL: https://github.com/apache/beam/pull/12427


   Changed queries 0-2, implemented query 9, added coders for all models and new models for query results, and changed the launcher to count the results and print them to a file.
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [x] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [x] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   Lang | SDK | Dataflow | Flink | Samza | Spark | Twister2
   --- | --- | --- | --- | --- | --- | ---
   Go | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/) | ---
   Java | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/)
   Python | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python38/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python38/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python35_VR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python35_VR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/) | ---
   XLang | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Direct/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Direct/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/) | ---
   
   Pre-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   --- |Java | Python | Go | Website
   --- | --- | --- | --- | ---
   Non-portable | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_PythonDocker_Cron/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_PythonDocker_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/)
   Portable | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/) | --- | ---
   
   See [.test-infra/jenkins/README](https://github.com/apache/beam/blob/master/.test-infra/jenkins/README.md) for trigger phrase, status and link of all Jenkins jobs.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] pabloem commented on a change in pull request #12427: [BEAM-2855] nexmark python suite implement queries 0, 1, 2 and 9

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #12427:
URL: https://github.com/apache/beam/pull/12427#discussion_r466750803



##########
File path: sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py
##########
@@ -103,6 +121,107 @@ def process(self, elem):
     yield event
 
 
+class ParseJsonEvnetFn(beam.DoFn):
+  """Parses the raw event info into a Python objects.
+
+  Each event line has the following format:
+
+    person: {id,name,email,credit_card,city, \
+                          state,timestamp,extra}
+    auction: {id,item_name, description,initial_bid, \
+                          reserve_price,timestamp,expires,seller,category,extra}
+    bid: {auction,bidder,price,timestamp,extra}
+
+  For example:
+
+    {"id":1000,"name":"Peter Jones","emailAddress":"nhd@xcat.com",\
+        "creditCard":"7241 7320 9143 4888","city":"Portland","state":"WY",\
+        "dateTime":1528098831026,\"extra":"WN_HS_bnpVQ\\[["}
+
+    {"id":1000,"itemName":"wkx mgee","description":"eszpqxtdxrvwmmywkmogoahf",\
+        "initialBid":28873,"reserve":29448,"dateTime":1528098831036,\
+        "expires":1528098840451,"seller":1000,"category":13,"extra":"zcuupiz"}
+
+    {"auction":1000,"bidder":1001,"price":32530001,"dateTime":1528098831066,\
+        "extra":"fdiysaV^]NLVsbolvyqwgticfdrwdyiyofWPYTOuwogvszlxjrcNOORM"}

Review comment:
       It's a bit of a silly thing, but something like this:
   
   ```suggestion
       person: {id,name,email,credit_card,city, \
                state,timestamp,extra}
       auction: {id,item_name, description,initial_bid, \
                 reserve_price,timestamp,expires,seller,category,extra}
       bid: {auction,bidder,price,timestamp,extra}
   
     For example:
   
       {"id":1000,"name":"Peter Jones","emailAddress":"nhd@xcat.com",\
        "creditCard":"7241 7320 9143 4888","city":"Portland","state":"WY",\
        "dateTime":1528098831026,\"extra":"WN_HS_bnpVQ\\[["}
   
       {"id":1000,"itemName":"wkx mgee","description":"eszpqxtdxrvwmmywkmogoahf",\
        "initialBid":28873,"reserve":29448,"dateTime":1528098831036,\
        "expires":1528098840451,"seller":1000,"category":13,"extra":"zcuupiz"}
   
       {"auction":1000,"bidder":1001,"price":32530001,"dateTime":1528098831066,\
        "extra":"fdiysaV^]NLVsbolvyqwgticfdrwdyiyofWPYTOuwogvszlxjrcNOORM"}
   ```
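   The docstring describes three JSON shapes (person, auction, bid) that arrive on one input stream. The body of `ParseJsonEvnetFn` is not quoted here, so the following is only a stdlib sketch of how a parser could tell the shapes apart, assuming each event type can be identified by a key that only it carries. `classify_event` is a hypothetical helper, not part of the Beam codebase:

```python
import json


def classify_event(line):
    """Return a (kind, record) pair for one JSON-encoded Nexmark event.

    Hypothetical helper: dispatches on a key that only one event type carries.
    """
    record = json.loads(line)
    if 'emailAddress' in record:  # only Person events carry an email address
        return 'person', record
    if 'itemName' in record:  # only Auction events carry an item name
        return 'auction', record
    if 'bidder' in record:  # only Bid events carry a bidder id
        return 'bid', record
    raise ValueError('Unrecognized event: %r' % line)


person_line = ('{"id":1000,"name":"Peter Jones","emailAddress":"nhd@xcat.com",'
               '"creditCard":"7241 7320 9143 4888","city":"Portland",'
               '"state":"WY","dateTime":1528098831026,"extra":"x"}')
auction_line = ('{"id":1000,"itemName":"wkx mgee","initialBid":28873,'
                '"reserve":29448,"dateTime":1528098831036,'
                '"expires":1528098840451,"seller":1000,"category":13}')
bid_line = ('{"auction":1000,"bidder":1001,"price":32530001,'
            '"dateTime":1528098831066,"extra":"y"}')
```

   Dispatching on a distinguishing key avoids relying on key order or on an explicit type tag, which the example lines do not contain.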







[GitHub] [beam] pabloem merged pull request #12427: [BEAM-2855] nexmark python suite implement queries 0, 1, 2 and 9

Posted by GitBox <gi...@apache.org>.
pabloem merged pull request #12427:
URL: https://github.com/apache/beam/pull/12427


   





[GitHub] [beam] leiyiz commented on a change in pull request #12427: [BEAM-2855] nexmark python suite implement queries 0, 1, 2 and 9

Posted by GitBox <gi...@apache.org>.
leiyiz commented on a change in pull request #12427:
URL: https://github.com/apache/beam/pull/12427#discussion_r464037245



##########
File path: sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py
##########
@@ -26,31 +26,55 @@
   - The bid on an item for auction (Bid).
 
 """
+from apache_beam.coders import coder_impl
+from apache_beam.coders.coders import FastCoder
+from apache_beam.coders.coders import StrUtf8Coder
+from apache_beam.testing.benchmarks.nexmark import nexmark_util
+
+
+class PersonCoder(FastCoder):
+  def _create_impl(self):
+    return PersonCoderImpl()
+
+  def is_deterministic(self):
+    # type: () -> bool
+    return True
 
 
 class Person(object):
   "Author of an auction or a bid."
+  CODER = PersonCoder()
 
   def __init__(
-      self, id, name, email, credit_card, city, state, timestamp, extra=None):
+      self, id, name, email, credit_card, city, state, date_time, extra=None):
     self.id = id
     self.name = name
-    self.email = email  # key
+    self.email_address = email  # key
     self.credit_card = credit_card
     self.city = city
     self.state = state
-    self.timestamp = timestamp
+    self.date_time = date_time
     self.extra = extra
 
   def __repr__(self):
-    return 'Person({id}, {email})'.format(
-        **{
-            'id': self.id, 'email': self.email
-        })
+    return nexmark_util.model_to_json(self)
+
+
+class AuctionCoder(FastCoder):
+  def to_type_hint(self):

Review comment:
       My bad, it is a mistake.
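   The `PersonCoder` in the diff returns `True` from `is_deterministic()`, which promises that equal values always encode to the same bytes. `PersonCoderImpl` is not quoted here, so this is only a stdlib sketch of an encoding that keeps that promise: fields are written in a fixed order, integers use a fixed-width big-endian layout, and strings are length-prefixed UTF-8. `Person` is a hypothetical namedtuple stand-in, and `encode_person`/`decode_person` are illustrative names, not Beam APIs:

```python
import io
import struct
from collections import namedtuple

# Hypothetical stand-in for nexmark_model.Person, using the renamed fields.
Person = namedtuple('Person', ['id', 'name', 'email_address', 'credit_card',
                               'city', 'state', 'date_time', 'extra'])

_STR_FIELDS = ('name', 'email_address', 'credit_card', 'city', 'state', 'extra')


def _write_str(stream, text):
    data = text.encode('utf-8')
    stream.write(struct.pack('>I', len(data)))  # 4-byte big-endian length prefix
    stream.write(data)


def _read_str(stream):
    (length,) = struct.unpack('>I', stream.read(4))
    return stream.read(length).decode('utf-8')


def encode_person(person):
    out = io.BytesIO()
    out.write(struct.pack('>q', person.id))  # fixed-width signed 64-bit ints
    out.write(struct.pack('>q', person.date_time))
    for field in _STR_FIELDS:
        # None is written as an empty string, so extra=None decodes as ''.
        _write_str(out, getattr(person, field) or '')
    return out.getvalue()


def decode_person(data):
    stream = io.BytesIO(data)
    (person_id,) = struct.unpack('>q', stream.read(8))
    (date_time,) = struct.unpack('>q', stream.read(8))
    # Read the strings back in exactly the order they were written.
    strings = {field: _read_str(stream) for field in _STR_FIELDS}
    return Person(id=person_id, date_time=date_time, **strings)
```

   Note the one lossy case: an `extra` of `None` comes back as `''`, which a real coder would need to handle explicitly if the distinction matters.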







[GitHub] [beam] pabloem closed pull request #12427: [BEAM-2855] nexmark python suite implement queries 0, 1, 2 and 9

Posted by GitBox <gi...@apache.org>.
pabloem closed pull request #12427:
URL: https://github.com/apache/beam/pull/12427


   





[GitHub] [beam] y1chi commented on pull request #12427: nexmark python queries 0, 1, 2 and 9

Posted by GitBox <gi...@apache.org>.
y1chi commented on pull request #12427:
URL: https://github.com/apache/beam/pull/12427#issuecomment-666716230


   Could you rename the title to be more meaningful and add `[BEAM-2855]` at the front so that it is linked to the right JIRA ticket: https://issues.apache.org/jira/browse/BEAM-2855.





[GitHub] [beam] y1chi commented on pull request #12427: [BEAM-2855] nexmark python suite implement queries 0, 1, 2 and 9

Posted by GitBox <gi...@apache.org>.
y1chi commented on pull request #12427:
URL: https://github.com/apache/beam/pull/12427#issuecomment-667830003


   R: @pabloem 





[GitHub] [beam] leiyiz commented on a change in pull request #12427: [BEAM-2855] nexmark python suite implement queries 0, 1, 2 and 9

Posted by GitBox <gi...@apache.org>.
leiyiz commented on a change in pull request #12427:
URL: https://github.com/apache/beam/pull/12427#discussion_r466705734



##########
File path: sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py
##########
@@ -103,6 +121,107 @@ def process(self, elem):
     yield event
 
 
+class ParseJsonEvnetFn(beam.DoFn):
+  """Parses the raw event info into a Python objects.
+
+  Each event line has the following format:
+
+    person: {id,name,email,credit_card,city, \
+                          state,timestamp,extra}
+    auction: {id,item_name, description,initial_bid, \
+                          reserve_price,timestamp,expires,seller,category,extra}
+    bid: {auction,bidder,price,timestamp,extra}
+
+  For example:
+
+    {"id":1000,"name":"Peter Jones","emailAddress":"nhd@xcat.com",\
+        "creditCard":"7241 7320 9143 4888","city":"Portland","state":"WY",\

Review comment:
       I'm not exactly sure what format this should be in. Should I just delete these examples?







[GitHub] [beam] leiyiz commented on a change in pull request #12427: [BEAM-2855] nexmark python suite implement queries 0, 1, 2 and 9

Posted by GitBox <gi...@apache.org>.
leiyiz commented on a change in pull request #12427:
URL: https://github.com/apache/beam/pull/12427#discussion_r466704612



##########
File path: sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query1.py
##########
@@ -29,20 +29,16 @@
 
 import apache_beam as beam
 from apache_beam.testing.benchmarks.nexmark.models import nexmark_model
-from apache_beam.testing.benchmarks.nexmark.nexmark_util import ParseEventFn
-from apache_beam.testing.benchmarks.nexmark.nexmark_util import display
+from apache_beam.testing.benchmarks.nexmark.queries import nexmark_query_util
 
 
 def load(raw_events, query_args=None):
   return (
       raw_events
-      | 'ParseEventFn' >> beam.ParDo(ParseEventFn())
-      | 'FilterInBids' >>
-      beam.Filter(lambda event: isinstance(event, nexmark_model.Bid))
+      | nexmark_query_util.JustBids()
       | 'ConvertToEuro' >> beam.Map(
           lambda bid: nexmark_model.Bid(
               bid.auction,
-              bid.bidder, (float(bid.price) * 89) // 100,
-              bid.timestamp,
-              bid.extra))
-      | 'DisplayQuery1' >> beam.Map(display))  # pylint: disable=expression-not-assigned
+              bid.bidder, (bid.price * 89) // 100,

Review comment:
       No problem.







[GitHub] [beam] leiyiz commented on a change in pull request #12427: [BEAM-2855] nexmark python suite implement queries 0, 1, 2 and 9

Posted by GitBox <gi...@apache.org>.
leiyiz commented on a change in pull request #12427:
URL: https://github.com/apache/beam/pull/12427#discussion_r466704695



##########
File path: sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query2.py
##########
@@ -29,17 +29,15 @@
 from __future__ import absolute_import
 
 import apache_beam as beam
-from apache_beam.testing.benchmarks.nexmark.models import nexmark_model
-from apache_beam.testing.benchmarks.nexmark.nexmark_util import ParseEventFn
-from apache_beam.testing.benchmarks.nexmark.nexmark_util import display
+from apache_beam.testing.benchmarks.nexmark.models import auction_price
+from apache_beam.testing.benchmarks.nexmark.queries import nexmark_query_util
 
 
-def load(raw_events, metadata=None):
+def load(events, metadata=None):
   return (
-      raw_events
-      | 'ParseEventFn' >> beam.ParDo(ParseEventFn())
-      | 'FilterInAuctionsWithSelectedId' >> beam.Filter(
-          lambda event: (
-              isinstance(event, nexmark_model.Auction) and event.id == metadata.
-              get('auction_id')))
-      | 'DisplayQuery2' >> beam.Map(display))  # pylint: disable=expression-not-assigned
+      events
+      | nexmark_query_util.JustBids()

Review comment:
       Yes, the doc should be updated.







[GitHub] [beam] pabloem commented on pull request #12427: [BEAM-2855] nexmark python suite implement queries 0, 1, 2 and 9

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #12427:
URL: https://github.com/apache/beam/pull/12427#issuecomment-673663575


   : )





[GitHub] [beam] leiyiz commented on a change in pull request #12427: [BEAM-2855] nexmark python suite implement queries 0, 1, 2 and 9

Posted by GitBox <gi...@apache.org>.
leiyiz commented on a change in pull request #12427:
URL: https://github.com/apache/beam/pull/12427#discussion_r464038357



##########
File path: sdks/python/apache_beam/testing/benchmarks/nexmark/queries/winning_bids.py
##########
@@ -0,0 +1,183 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+
+import apache_beam as beam
+from apache_beam.coders import coder_impl
+from apache_beam.coders.coders import FastCoder
+from apache_beam.transforms.window import WindowFn
+from apache_beam.transforms.window import IntervalWindow
+from apache_beam.testing.benchmarks.nexmark.models import nexmark_model
+from apache_beam.testing.benchmarks.nexmark.models import auction_bid
+from apache_beam.testing.benchmarks.nexmark.queries import nexmark_query_util
+
+
+class AuctionOrBidWindow(IntervalWindow):
+  """Windows for open auctions and bids."""
+  def __init__(self, start, end, auction_id, is_auction_window):
+    super(AuctionOrBidWindow, self).__init__(start, end)
+    self.auction = auction_id
+    self.is_auction_window = is_auction_window
+
+  @staticmethod
+  def for_auction(timestamp, auction: nexmark_model.Auction):
+    return AuctionOrBidWindow(timestamp, auction.expires, auction.id, True)
+
+  @staticmethod
+  def for_bid(expected_duration_micro, timestamp, bid: nexmark_model.Bid):
+    return AuctionOrBidWindow(
+        timestamp, timestamp + expected_duration_micro * 2, bid.auction, False)
+
+  def is_auction_window_fn(self):

Review comment:
       I think it is fine to access the `is_auction_window` field directly.
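   A dependency-free sketch of the window construction quoted above, written so that callers read the `is_auction_window` flag as a plain attribute instead of going through an `is_auction_window_fn()` getter. `AuctionOrBidWindowSketch` and the namedtuple models are hypothetical stand-ins; the real class subclasses Beam's `IntervalWindow`, whose bounds are treated here as `[start, end)`:

```python
from collections import namedtuple

# Hypothetical, simplified stand-ins for the Nexmark models.
Auction = namedtuple('Auction', ['id', 'expires'])
Bid = namedtuple('Bid', ['auction', 'price'])


class AuctionOrBidWindowSketch(object):
    """Plain-Python mirror of the window's fields."""

    def __init__(self, start, end, auction_id, is_auction_window):
        self.start = start
        self.end = end
        self.auction = auction_id
        self.is_auction_window = is_auction_window

    @staticmethod
    def for_auction(timestamp, auction):
        # An auction's window stays open until the auction expires.
        return AuctionOrBidWindowSketch(
            timestamp, auction.expires, auction.id, True)

    @staticmethod
    def for_bid(expected_duration_micro, timestamp, bid):
        # A bid's window spans twice the expected auction duration.
        return AuctionOrBidWindowSketch(
            timestamp, timestamp + expected_duration_micro * 2, bid.auction,
            False)


auction_window = AuctionOrBidWindowSketch.for_auction(
    100, Auction(id=7, expires=500))
bid_window = AuctionOrBidWindowSketch.for_bid(50, 120, Bid(auction=7, price=10))
# Callers read the flag directly instead of calling a getter method.
kinds = [w.is_auction_window for w in (auction_window, bid_window)]
```

   Both windows carry the same `auction` id, which is what later lets auction and bid windows for one auction be matched up.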







[GitHub] [beam] pabloem commented on a change in pull request #12427: [BEAM-2855] nexmark python suite implement queries 0, 1, 2 and 9

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #12427:
URL: https://github.com/apache/beam/pull/12427#discussion_r465923575



##########
File path: sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query2.py
##########
@@ -29,17 +29,15 @@
 from __future__ import absolute_import
 
 import apache_beam as beam
-from apache_beam.testing.benchmarks.nexmark.models import nexmark_model
-from apache_beam.testing.benchmarks.nexmark.nexmark_util import ParseEventFn
-from apache_beam.testing.benchmarks.nexmark.nexmark_util import display
+from apache_beam.testing.benchmarks.nexmark.models import auction_price
+from apache_beam.testing.benchmarks.nexmark.queries import nexmark_query_util
 
 
-def load(raw_events, metadata=None):
+def load(events, metadata=None):

Review comment:
       Perhaps it's not that important, but it looks like these events are already parsed (hence the name `events` instead of `raw_events`). Should you rename the PCollection in query 1 and query 0?

##########
File path: sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py
##########
@@ -103,6 +121,107 @@ def process(self, elem):
     yield event
 
 
+class ParseJsonEvnetFn(beam.DoFn):
+  """Parses the raw event info into a Python objects.
+
+  Each event line has the following format:
+
+    person: {id,name,email,credit_card,city, \
+                          state,timestamp,extra}
+    auction: {id,item_name, description,initial_bid, \
+                          reserve_price,timestamp,expires,seller,category,extra}
+    bid: {auction,bidder,price,timestamp,extra}
+
+  For example:
+
+    {"id":1000,"name":"Peter Jones","emailAddress":"nhd@xcat.com",\
+        "creditCard":"7241 7320 9143 4888","city":"Portland","state":"WY",\

Review comment:
       Can you improve the formatting of these docs?

##########
File path: sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query2.py
##########
@@ -29,17 +29,15 @@
 from __future__ import absolute_import
 
 import apache_beam as beam
-from apache_beam.testing.benchmarks.nexmark.models import nexmark_model
-from apache_beam.testing.benchmarks.nexmark.nexmark_util import ParseEventFn
-from apache_beam.testing.benchmarks.nexmark.nexmark_util import display
+from apache_beam.testing.benchmarks.nexmark.models import auction_price
+from apache_beam.testing.benchmarks.nexmark.queries import nexmark_query_util
 
 
-def load(raw_events, metadata=None):
+def load(events, metadata=None):
   return (
-      raw_events
-      | 'ParseEventFn' >> beam.ParDo(ParseEventFn())
-      | 'FilterInAuctionsWithSelectedId' >> beam.Filter(
-          lambda event: (
-              isinstance(event, nexmark_model.Auction) and event.id == metadata.
-              get('auction_id')))
-      | 'DisplayQuery2' >> beam.Map(display))  # pylint: disable=expression-not-assigned
+      events
+      | nexmark_query_util.JustBids()

Review comment:
       The documentation for q2 says `Select auctions by auction id.` - but I see you're selecting Bids, and then filtering them by Auction. Should the documentation be updated or should you be selecting Auctions and filtering those?
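   If the documentation is changed to match the code, Query 2 becomes "select bids on given auction ids and report their price", which matches how the Java Nexmark suite describes its Query 2 ("Selection"). A stdlib sketch of that filter-and-project step; `Bid` and `AuctionPrice` are hypothetical namedtuple stand-ins, `select_bids_by_auction` is an illustrative name, and the PR's actual selection predicate is not shown in this hunk:

```python
from collections import namedtuple

# Hypothetical stand-ins for nexmark_model.Bid and auction_price.AuctionPrice.
Bid = namedtuple('Bid', ['auction', 'bidder', 'price', 'date_time', 'extra'])
AuctionPrice = namedtuple('AuctionPrice', ['auction', 'price'])


def select_bids_by_auction(bids, auction_ids):
    """Keep bids on the selected auctions and project them to (auction, price)."""
    wanted = set(auction_ids)
    return [AuctionPrice(bid.auction, bid.price)
            for bid in bids if bid.auction in wanted]


bids = [Bid(1000, 1001, 500, 1, ''), Bid(1007, 1002, 750, 2, ''),
        Bid(1020, 1003, 900, 3, '')]
selected = select_bids_by_auction(bids, [1007, 1020])
```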

##########
File path: sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query1.py
##########
@@ -29,20 +29,16 @@
 
 import apache_beam as beam
 from apache_beam.testing.benchmarks.nexmark.models import nexmark_model
-from apache_beam.testing.benchmarks.nexmark.nexmark_util import ParseEventFn
-from apache_beam.testing.benchmarks.nexmark.nexmark_util import display
+from apache_beam.testing.benchmarks.nexmark.queries import nexmark_query_util
 
 
 def load(raw_events, query_args=None):
   return (
       raw_events
-      | 'ParseEventFn' >> beam.ParDo(ParseEventFn())
-      | 'FilterInBids' >>
-      beam.Filter(lambda event: isinstance(event, nexmark_model.Bid))
+      | nexmark_query_util.JustBids()
       | 'ConvertToEuro' >> beam.Map(
           lambda bid: nexmark_model.Bid(
               bid.auction,
-              bid.bidder, (float(bid.price) * 89) // 100,
-              bid.timestamp,
-              bid.extra))
-      | 'DisplayQuery1' >> beam.Map(display))  # pylint: disable=expression-not-assigned
+              bid.bidder, (bid.price * 89) // 100,

Review comment:
       Perhaps create constants for 89 and 100? `EURO_RATE = 0.89` or something like that?
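   One way to act on this while keeping the integer semantics of `(bid.price * 89) // 100`: name the numerator and denominator rather than a single float rate. The constant names and `dollars_to_euros` helper below are hypothetical:

```python
# Hypothetical constant names; the review suggests something like EURO_RATE.
USD_TO_EUR_NUMERATOR = 89
USD_TO_EUR_DENOMINATOR = 100


def dollars_to_euros(price):
    """Convert an integer price, matching (bid.price * 89) // 100 in the diff."""
    return (price * USD_TO_EUR_NUMERATOR) // USD_TO_EUR_DENOMINATOR


converted = dollars_to_euros(32530001)
```

   A single `EURO_RATE = 0.89` would make results floats and may differ from integer floor division at some boundaries because 0.89 has no exact binary representation; keeping two integers avoids that.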

##########
File path: sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query9.py
##########
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+

Review comment:
       Please document query9 at the top
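   Query 9 is not described anywhere in this thread. As we understand the Java Nexmark suite, it is "Winning Bids": find the winning bid for each auction, which is what the `winning_bids.py` module in this PR appears to implement. A batch, stdlib-only sketch of that logic that could inform the requested module documentation; the validity and tie-breaking rules are assumptions, and the namedtuple models and `winning_bids` helper are hypothetical:

```python
from collections import namedtuple

# Hypothetical, simplified stand-ins for the Nexmark models.
Auction = namedtuple('Auction', ['id', 'reserve', 'date_time', 'expires'])
Bid = namedtuple('Bid', ['auction', 'bidder', 'price', 'date_time'])


def winning_bids(auctions, bids):
    """Map each auction id to its winning bid.

    Assumes a bid is valid if it arrives while the auction is open and meets
    the reserve price; the highest price wins and ties go to the earlier bid.
    """
    by_id = {auction.id: auction for auction in auctions}
    winners = {}
    for bid in bids:
        auction = by_id.get(bid.auction)
        if auction is None:
            continue  # bid on an unknown auction
        if not auction.date_time <= bid.date_time < auction.expires:
            continue  # bid outside the auction's lifetime
        if bid.price < auction.reserve:
            continue  # bid below the reserve price
        best = winners.get(auction.id)
        if best is None or (bid.price, -bid.date_time) > (
                best.price, -best.date_time):
            winners[auction.id] = bid
    return winners


auctions = [Auction(1, 100, 0, 1000), Auction(2, 500, 0, 1000)]
bids = [Bid(1, 10, 150, 10), Bid(1, 11, 150, 5), Bid(1, 12, 120, 20),
        Bid(1, 13, 999, 2000), Bid(2, 14, 400, 30)]
result = winning_bids(auctions, bids)
```

   In the streaming version this per-auction comparison has to happen per window, which is why the PR pairs auction and bid windows by auction id.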







[GitHub] [beam] leiyiz commented on pull request #12427: nexmark python queries 0, 1, 2 and 9

Posted by GitBox <gi...@apache.org>.
leiyiz commented on pull request #12427:
URL: https://github.com/apache/beam/pull/12427#issuecomment-666640310


   R: @pabloem 
   R: @y1chi 





[GitHub] [beam] leiyiz commented on a change in pull request #12427: [BEAM-2855] nexmark python suite implement queries 0, 1, 2 and 9

Posted by GitBox <gi...@apache.org>.
leiyiz commented on a change in pull request #12427:
URL: https://github.com/apache/beam/pull/12427#discussion_r466593339



##########
File path: sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query2.py
##########
@@ -29,17 +29,15 @@
 from __future__ import absolute_import
 
 import apache_beam as beam
-from apache_beam.testing.benchmarks.nexmark.models import nexmark_model
-from apache_beam.testing.benchmarks.nexmark.nexmark_util import ParseEventFn
-from apache_beam.testing.benchmarks.nexmark.nexmark_util import display
+from apache_beam.testing.benchmarks.nexmark.models import auction_price
+from apache_beam.testing.benchmarks.nexmark.queries import nexmark_query_util
 
 
-def load(raw_events, metadata=None):
+def load(events, metadata=None):

Review comment:
       yeah that is probably better







[GitHub] [beam] y1chi commented on a change in pull request #12427: [BEAM-2855] nexmark python suite implement queries 0, 1, 2 and 9

Posted by GitBox <gi...@apache.org>.
y1chi commented on a change in pull request #12427:
URL: https://github.com/apache/beam/pull/12427#discussion_r463285687



##########
File path: sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query0.py
##########
@@ -30,8 +30,17 @@
 from __future__ import absolute_import
 
 import apache_beam as beam
-from apache_beam.testing.benchmarks.nexmark.nexmark_util import ParseEventFn
 
 
-def load(raw_events, query_args=None):
-  return raw_events | 'ParseEventFn' >> beam.ParDo(ParseEventFn())  # pylint: disable=expression-not-assigned
+class round_tripFn(beam.DoFn):

Review comment:
       Class names should use CamelCase rather than snake case, e.g. `RoundTripFn`.

##########
File path: sdks/python/apache_beam/testing/benchmarks/nexmark/queries/winning_bids.py
##########
@@ -0,0 +1,183 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+
+import apache_beam as beam
+from apache_beam.coders import coder_impl
+from apache_beam.coders.coders import FastCoder
+from apache_beam.transforms.window import WindowFn
+from apache_beam.transforms.window import IntervalWindow
+from apache_beam.testing.benchmarks.nexmark.models import nexmark_model
+from apache_beam.testing.benchmarks.nexmark.models import auction_bid
+from apache_beam.testing.benchmarks.nexmark.queries import nexmark_query_util
+
+
+class AuctionOrBidWindow(IntervalWindow):
+  """Windows for open auctions and bids."""
+  def __init__(self, start, end, auction_id, is_auction_window):
+    super(AuctionOrBidWindow, self).__init__(start, end)
+    self.auction = auction_id
+    self.is_auction_window = is_auction_window
+
+  @staticmethod
+  def for_auction(timestamp, auction: nexmark_model.Auction):
+    return AuctionOrBidWindow(timestamp, auction.expires, auction.id, True)
+
+  @staticmethod
+  def for_bid(expected_duration_micro, timestamp, bid: nexmark_model.Bid):
+    return AuctionOrBidWindow(
+        timestamp, timestamp + expected_duration_micro * 2, bid.auction, False)
+
+  def is_auction_window_fn(self):

Review comment:
       Is this method necessary? It only returns `self.is_auction_window`, which callers can read directly.

##########
File path: sdks/python/apache_beam/testing/benchmarks/nexmark/queries/winning_bids.py
##########
@@ -0,0 +1,183 @@
+from __future__ import absolute_import
+
+import apache_beam as beam
+from apache_beam.coders import coder_impl
+from apache_beam.coders.coders import FastCoder
+from apache_beam.transforms.window import WindowFn
+from apache_beam.transforms.window import IntervalWindow
+from apache_beam.testing.benchmarks.nexmark.models import nexmark_model
+from apache_beam.testing.benchmarks.nexmark.models import auction_bid
+from apache_beam.testing.benchmarks.nexmark.queries import nexmark_query_util
+
+
+class AuctionOrBidWindow(IntervalWindow):
+  """Windows for open auctions and bids."""
+  def __init__(self, start, end, auction_id, is_auction_window):
+    super(AuctionOrBidWindow, self).__init__(start, end)
+    self.auction = auction_id
+    self.is_auction_window = is_auction_window
+
+  @staticmethod
+  def for_auction(timestamp, auction: nexmark_model.Auction):
+    return AuctionOrBidWindow(timestamp, auction.expires, auction.id, True)
+
+  @staticmethod
+  def for_bid(expected_duration_micro, timestamp, bid: nexmark_model.Bid):
+    return AuctionOrBidWindow(
+        timestamp, timestamp + expected_duration_micro * 2, bid.auction, False)
+
+  def is_auction_window_fn(self):
+    return self.is_auction_window
+
+  def __str__(self):
+    return (
+        'AuctionOrBidWindow{start:%s; end:%s; auction:%d; isAuctionWindow:%s}' %
+        (self.start, self.end, self.auction, self.is_auction_window))
+
+
+class AuctionOrBidWindowCoder(FastCoder):
+  def _create_impl(self):
+    return AuctionOrBidWindowCoderImpl()
+
+  def is_deterministic(self):
+    # type: () -> bool
+    return True
+
+
+class AuctionOrBidWindowCoderImpl(coder_impl.StreamCoderImpl):
+  _super_coder_impl = coder_impl.IntervalWindowCoderImpl()
+  _id_coder_impl = coder_impl.VarIntCoderImpl()
+  _bool_coder_impl = coder_impl.BooleanCoderImpl()
+
+  def encode_to_stream(self, value: AuctionOrBidWindow, stream, nested):
+    self._super_coder_impl.encode_to_stream(value, stream, True)
+    self._id_coder_impl.encode_to_stream(value.auction, stream, True)
+    self._bool_coder_impl.encode_to_stream(
+        value.is_auction_window, stream, True)
+
+  def decode_from_stream(self, stream, nested):
+    super_window = self._super_coder_impl.decode_from_stream(stream, True)
+    auction = self._id_coder_impl.decode_from_stream(stream, True)
+    is_auction = self._bool_coder_impl.decode_from_stream(stream, True)
+    return AuctionOrBidWindow(
+        super_window.start, super_window.end, auction, is_auction)
+
+
+class AuctionOrBidWindowFn(WindowFn):
+  def __init__(self, expected_duration_micro):
+    self.expected_duration = expected_duration_micro
+
+  def assign(self, assign_context):
+    event = assign_context.element
+    if isinstance(event, nexmark_model.Auction):
+      return [AuctionOrBidWindow.for_auction(assign_context.timestamp, event)]
+    elif isinstance(event, nexmark_model.Bid):
+      return [
+          AuctionOrBidWindow.for_bid(
+              self.expected_duration, assign_context.timestamp, event)
+      ]
+    else:
+      raise ValueError(
+          '%s can only assign windows to auctions and bids, but received %s' %
+          (self.__class__.__name__, event))
+
+  def merge(self, merge_context):
+    id_to_auction = {}

Review comment:
       nit: rename `id_to_auction` and `id_to_bid` to `auction_id_to_auction_window` and `auction_id_to_bid_window`.

##########
File path: sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py
##########
@@ -26,31 +26,55 @@
   - The bid on an item for auction (Bid).
 
 """
+from apache_beam.coders import coder_impl
+from apache_beam.coders.coders import FastCoder
+from apache_beam.coders.coders import StrUtf8Coder
+from apache_beam.testing.benchmarks.nexmark import nexmark_util
+
+
+class PersonCoder(FastCoder):
+  def _create_impl(self):
+    return PersonCoderImpl()
+
+  def is_deterministic(self):
+    # type: () -> bool
+    return True
 
 
 class Person(object):
   "Author of an auction or a bid."
+  CODER = PersonCoder()
 
   def __init__(
-      self, id, name, email, credit_card, city, state, timestamp, extra=None):
+      self, id, name, email, credit_card, city, state, date_time, extra=None):
     self.id = id
     self.name = name
-    self.email = email  # key
+    self.email_address = email  # key
     self.credit_card = credit_card
     self.city = city
     self.state = state
-    self.timestamp = timestamp
+    self.date_time = date_time
     self.extra = extra
 
   def __repr__(self):
-    return 'Person({id}, {email})'.format(
-        **{
-            'id': self.id, 'email': self.email
-        })
+    return nexmark_util.model_to_json(self)
+
+
+class AuctionCoder(FastCoder):
+  def to_type_hint(self):

Review comment:
       Why do we need to override `to_type_hint` here and just `pass`?

##########
File path: sdks/python/apache_beam/testing/benchmarks/nexmark/queries/winning_bids.py
##########
@@ -0,0 +1,183 @@
+from __future__ import absolute_import
+
+import apache_beam as beam
+from apache_beam.coders import coder_impl
+from apache_beam.coders.coders import FastCoder
+from apache_beam.transforms.window import WindowFn
+from apache_beam.transforms.window import IntervalWindow
+from apache_beam.testing.benchmarks.nexmark.models import nexmark_model
+from apache_beam.testing.benchmarks.nexmark.models import auction_bid
+from apache_beam.testing.benchmarks.nexmark.queries import nexmark_query_util
+
+
+class AuctionOrBidWindow(IntervalWindow):
+  """Windows for open auctions and bids."""
+  def __init__(self, start, end, auction_id, is_auction_window):
+    super(AuctionOrBidWindow, self).__init__(start, end)
+    self.auction = auction_id
+    self.is_auction_window = is_auction_window
+
+  @staticmethod
+  def for_auction(timestamp, auction: nexmark_model.Auction):
+    return AuctionOrBidWindow(timestamp, auction.expires, auction.id, True)
+
+  @staticmethod
+  def for_bid(expected_duration_micro, timestamp, bid: nexmark_model.Bid):
+    return AuctionOrBidWindow(
+        timestamp, timestamp + expected_duration_micro * 2, bid.auction, False)
+
+  def is_auction_window_fn(self):
+    return self.is_auction_window
+
+  def __str__(self):
+    return (
+        'AuctionOrBidWindow{start:%s; end:%s; auction:%d; isAuctionWindow:%s}' %
+        (self.start, self.end, self.auction, self.is_auction_window))
+
+
+class AuctionOrBidWindowCoder(FastCoder):
+  def _create_impl(self):
+    return AuctionOrBidWindowCoderImpl()
+
+  def is_deterministic(self):
+    # type: () -> bool
+    return True
+
+
+class AuctionOrBidWindowCoderImpl(coder_impl.StreamCoderImpl):
+  _super_coder_impl = coder_impl.IntervalWindowCoderImpl()
+  _id_coder_impl = coder_impl.VarIntCoderImpl()
+  _bool_coder_impl = coder_impl.BooleanCoderImpl()
+
+  def encode_to_stream(self, value: AuctionOrBidWindow, stream, nested):
+    self._super_coder_impl.encode_to_stream(value, stream, True)
+    self._id_coder_impl.encode_to_stream(value.auction, stream, True)
+    self._bool_coder_impl.encode_to_stream(
+        value.is_auction_window, stream, True)
+
+  def decode_from_stream(self, stream, nested):
+    super_window = self._super_coder_impl.decode_from_stream(stream, True)
+    auction = self._id_coder_impl.decode_from_stream(stream, True)
+    is_auction = self._bool_coder_impl.decode_from_stream(stream, True)
+    return AuctionOrBidWindow(
+        super_window.start, super_window.end, auction, is_auction)
+
+
+class AuctionOrBidWindowFn(WindowFn):
+  def __init__(self, expected_duration_micro):
+    self.expected_duration = expected_duration_micro
+
+  def assign(self, assign_context):
+    event = assign_context.element
+    if isinstance(event, nexmark_model.Auction):
+      return [AuctionOrBidWindow.for_auction(assign_context.timestamp, event)]
+    elif isinstance(event, nexmark_model.Bid):
+      return [
+          AuctionOrBidWindow.for_bid(
+              self.expected_duration, assign_context.timestamp, event)
+      ]
+    else:
+      raise ValueError(
+          '%s can only assign windows to auctions and bids, but received %s' %
+          (self.__class__.__name__, event))
+
+  def merge(self, merge_context):
+    id_to_auction = {}
+    id_to_bid = {}
+    for window in merge_context.windows:
+      if window.is_auction_window_fn():
+        id_to_auction[window.auction] = window
+      else:
+        if window.auction in id_to_bid:
+          bid_windows = id_to_bid[window.auction]
+        else:
+          bid_windows = []
+          id_to_bid[window.auction] = bid_windows

Review comment:
       nit: you can simplify this to:
   ```
   if window.auction not in id_to_bid:
     id_to_bid[window.auction] = []
   id_to_bid[window.auction].append(window)
   ```
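Building on that suggestion, `collections.defaultdict(list)` (or `dict.setdefault(key, []).append(window)`) removes the membership check entirely. The stand-alone sketch below reproduces only the grouping loop of `merge`, using a hypothetical `Window` namedtuple in place of `AuctionOrBidWindow` so it runs without Beam, and adopts the names proposed in the rename nit.

```python
from collections import defaultdict
from collections import namedtuple

# Hypothetical stand-in for AuctionOrBidWindow with just the fields merge() reads.
Window = namedtuple('Window', ['start', 'end', 'auction', 'is_auction_window'])


def group_windows(windows):
  """Groups windows by auction id: one auction window, a list of bid windows."""
  auction_id_to_auction_window = {}
  # defaultdict(list) creates the empty list on first access to a new key.
  auction_id_to_bid_windows = defaultdict(list)
  for window in windows:
    if window.is_auction_window:
      auction_id_to_auction_window[window.auction] = window
    else:
      auction_id_to_bid_windows[window.auction].append(window)
  return auction_id_to_auction_window, auction_id_to_bid_windows


# Usage: auction 1 has an auction window and two bids; auction 2 has only a bid.
auctions, bids = group_windows([
    Window(0, 100, 1, True),
    Window(10, 30, 1, False),
    Window(20, 40, 1, False),
    Window(5, 25, 2, False),
])
```

One design note: `.get()` on a `defaultdict` does not insert a default, so the existing `id_to_bid.get(auction)` / `is not None` check in the second loop of `merge` keeps working unchanged.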

##########
File path: sdks/python/apache_beam/testing/benchmarks/nexmark/queries/winning_bids.py
##########
@@ -0,0 +1,183 @@
+from __future__ import absolute_import
+
+import apache_beam as beam
+from apache_beam.coders import coder_impl
+from apache_beam.coders.coders import FastCoder
+from apache_beam.transforms.window import WindowFn
+from apache_beam.transforms.window import IntervalWindow
+from apache_beam.testing.benchmarks.nexmark.models import nexmark_model
+from apache_beam.testing.benchmarks.nexmark.models import auction_bid
+from apache_beam.testing.benchmarks.nexmark.queries import nexmark_query_util
+
+
+class AuctionOrBidWindow(IntervalWindow):
+  """Windows for open auctions and bids."""
+  def __init__(self, start, end, auction_id, is_auction_window):
+    super(AuctionOrBidWindow, self).__init__(start, end)
+    self.auction = auction_id
+    self.is_auction_window = is_auction_window
+
+  @staticmethod
+  def for_auction(timestamp, auction: nexmark_model.Auction):
+    return AuctionOrBidWindow(timestamp, auction.expires, auction.id, True)
+
+  @staticmethod
+  def for_bid(expected_duration_micro, timestamp, bid: nexmark_model.Bid):
+    return AuctionOrBidWindow(
+        timestamp, timestamp + expected_duration_micro * 2, bid.auction, False)
+
+  def is_auction_window_fn(self):
+    return self.is_auction_window
+
+  def __str__(self):
+    return (
+        'AuctionOrBidWindow{start:%s; end:%s; auction:%d; isAuctionWindow:%s}' %
+        (self.start, self.end, self.auction, self.is_auction_window))
+
+
+class AuctionOrBidWindowCoder(FastCoder):
+  def _create_impl(self):
+    return AuctionOrBidWindowCoderImpl()
+
+  def is_deterministic(self):
+    # type: () -> bool
+    return True
+
+
+class AuctionOrBidWindowCoderImpl(coder_impl.StreamCoderImpl):
+  _super_coder_impl = coder_impl.IntervalWindowCoderImpl()
+  _id_coder_impl = coder_impl.VarIntCoderImpl()
+  _bool_coder_impl = coder_impl.BooleanCoderImpl()
+
+  def encode_to_stream(self, value: AuctionOrBidWindow, stream, nested):
+    self._super_coder_impl.encode_to_stream(value, stream, True)
+    self._id_coder_impl.encode_to_stream(value.auction, stream, True)
+    self._bool_coder_impl.encode_to_stream(
+        value.is_auction_window, stream, True)
+
+  def decode_from_stream(self, stream, nested):
+    super_window = self._super_coder_impl.decode_from_stream(stream, True)
+    auction = self._id_coder_impl.decode_from_stream(stream, True)
+    is_auction = self._bool_coder_impl.decode_from_stream(stream, True)
+    return AuctionOrBidWindow(
+        super_window.start, super_window.end, auction, is_auction)
+
+
+class AuctionOrBidWindowFn(WindowFn):
+  def __init__(self, expected_duration_micro):
+    self.expected_duration = expected_duration_micro
+
+  def assign(self, assign_context):
+    event = assign_context.element
+    if isinstance(event, nexmark_model.Auction):
+      return [AuctionOrBidWindow.for_auction(assign_context.timestamp, event)]
+    elif isinstance(event, nexmark_model.Bid):
+      return [
+          AuctionOrBidWindow.for_bid(
+              self.expected_duration, assign_context.timestamp, event)
+      ]
+    else:
+      raise ValueError(
+          '%s can only assign windows to auctions and bids, but received %s' %
+          (self.__class__.__name__, event))
+
+  def merge(self, merge_context):
+    id_to_auction = {}
+    id_to_bid = {}
+    for window in merge_context.windows:
+      if window.is_auction_window_fn():
+        id_to_auction[window.auction] = window
+      else:
+        if window.auction in id_to_bid:
+          bid_windows = id_to_bid[window.auction]
+        else:
+          bid_windows = []
+          id_to_bid[window.auction] = bid_windows
+        bid_windows.append(window)
+
+    for auction, auction_window in id_to_auction.items():
+      bid_window_list = id_to_bid.get(auction)
+      if bid_window_list is not None:
+        to_merge = []
+        for bid_window in bid_window_list:
+          if bid_window.start < auction_window.end:
+            to_merge.append(bid_window)
+        if len(to_merge) > 0:
+          to_merge.append(auction_window)
+          merge_context.merge(to_merge, auction_window)
+
+  def get_window_coder(self):
+    return AuctionOrBidWindowCoder()
+
+  def get_transformed_output_time(self, window, input_timestamp):
+    return window.max_timestamp()
+
+
+class JoinAuctionBidFn(beam.DoFn):
+  @staticmethod
+  def higher_bid(bid, other):
+    if bid.price > other.price:
+      return True
+    elif bid.price < other.price:
+      return False
+    else:
+      return bid.date_time < other.date_time
+
+  def process(self, element):
+    _, group = element
+    auctions = group[nexmark_query_util.AUCTION_TAG]
+    auction = auctions[0] if auctions else None
+    if auction is None:
+      return
+    best_bid = None
+    for bid in group[nexmark_query_util.BID_TAG]:
+      if bid.price < auction.reserve:
+        continue
+      if best_bid is None or JoinAuctionBidFn.higher_bid(bid, best_bid):
+        best_bid = bid
+    if best_bid is None:
+      return

Review comment:
       nit: you can drop the early `return` and yield only when a winning bid was found:
   ```
   if best_bid:
     yield ...
   ```
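The same selection can also be written without the explicit loop: filter out bids below the reserve, then take `max()` with a `functools.cmp_to_key` comparator built from `higher_bid` (higher price wins; on equal prices the earlier `date_time` wins). This is a stand-alone sketch, not the PR's code: `Bid` is a hypothetical namedtuple, and `date_time` is a plain integer instead of a Beam `Timestamp`.

```python
import functools
from collections import namedtuple

# Hypothetical stand-in for nexmark_model.Bid with only the fields used here.
Bid = namedtuple('Bid', ['auction', 'price', 'date_time'])


def higher_bid(bid, other):
  """True if `bid` beats `other`: higher price, or same price but earlier."""
  if bid.price != other.price:
    return bid.price > other.price
  return bid.date_time < other.date_time


def compare_bids(bid, other):
  # cmp-style comparator: positive means `bid` ranks above `other`.
  if higher_bid(bid, other):
    return 1
  if higher_bid(other, bid):
    return -1
  return 0


def winning_bid(reserve, bids):
  """Yields the best bid at or above `reserve`, or nothing if none qualifies."""
  eligible = [bid for bid in bids if bid.price >= reserve]
  best_bid = max(eligible, key=functools.cmp_to_key(compare_bids), default=None)
  # Same effect as the suggested `if best_bid:` for always-truthy bid objects.
  if best_bid is not None:
    yield best_bid
```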

##########
File path: sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py
##########
@@ -103,6 +121,108 @@ def process(self, elem):
     yield event
 
 
+class ParseJsonEventFn(beam.DoFn):
+  """Parses the raw event info into Python objects.
+
+  Each event line has the following format:
+
+    person: {id,name,emailAddress,creditCard,city, \
+                          state,dateTime,extra}
+    auction: {id,itemName,description,initialBid, \
+                          reserve,dateTime,expires,seller,category,extra}
+    bid: {auction,bidder,price,dateTime,extra}
+
+  For example:
+
+    {"id":1000,"name":"Peter Jones","emailAddress":"nhd@xcat.com",\
+        "creditCard":"7241 7320 9143 4888","city":"Portland","state":"WY",\
+        "dateTime":1528098831026,"extra":"WN_HS_bnpVQ\\[["}
+
+    {"id":1000,"itemName":"wkx mgee","description":"eszpqxtdxrvwmmywkmogoahf",\
+        "initialBid":28873,"reserve":29448,"dateTime":1528098831036,\
+        "expires":1528098840451,"seller":1000,"category":13,"extra":"zcuupiz"}
+
+    {"auction":1000,"bidder":1001,"price":32530001,"dateTime":1528098831066,\
+        "extra":"fdiysaV^]NLVsbolvyqwgticfdrwdyiyofWPYTOuwogvszlxjrcNOORM"}
+  """
+  def process(self, elem):
+    json_dict = json.loads(elem)
+    if type(json_dict[FieldNames.DATE_TIME]) is dict:
+      json_dict[FieldNames.DATE_TIME] = json_dict[
+          FieldNames.DATE_TIME]['millis']
+    if FieldNames.NAME in json_dict:
+      yield nexmark_model.Person(
+          json_dict[FieldNames.ID],
+          json_dict[FieldNames.NAME],
+          json_dict[FieldNames.EMAIL_ADDRESS],
+          json_dict[FieldNames.CREDIT_CARD],
+          json_dict[FieldNames.CITY],
+          json_dict[FieldNames.STATE],
+          millis_to_timestamp(json_dict[FieldNames.DATE_TIME]),
+          json_dict[FieldNames.EXTRA])
+    elif FieldNames.ITEM_NAME in json_dict:
+      if type(json_dict[FieldNames.EXPIRES]) is dict:
+        json_dict[FieldNames.EXPIRES] = json_dict[FieldNames.EXPIRES]['millis']
+      yield nexmark_model.Auction(
+          json_dict[FieldNames.ID],
+          json_dict[FieldNames.ITEM_NAME],
+          json_dict[FieldNames.DESCRIPTION],
+          json_dict[FieldNames.INITIAL_BID],
+          json_dict[FieldNames.RESERVE],
+          millis_to_timestamp(json_dict[FieldNames.DATE_TIME]),
+          millis_to_timestamp(json_dict[FieldNames.EXPIRES]),
+          json_dict[FieldNames.SELLER],
+          json_dict[FieldNames.CATEGORY],
+          json_dict[FieldNames.EXTRA])
+    elif FieldNames.AUCTION in json_dict:
+      yield nexmark_model.Bid(
+          json_dict[FieldNames.AUCTION],
+          json_dict[FieldNames.BIDDER],
+          json_dict[FieldNames.PRICE],
+          millis_to_timestamp(json_dict[FieldNames.DATE_TIME]),
+          json_dict[FieldNames.EXTRA])
+    else:
+      raise ValueError('Invalid event: %s.' % str(json_dict))
+
+
+class CountAndLog(beam.PTransform):
+  def expand(self, pcoll):
+    return (
+        pcoll
+        | 'window' >> beam.WindowInto(window.GlobalWindows())
+        | "Count" >> beam.combiners.Count.Globally()
+        | "Log" >> beam.Map(log_count_info))
+
+
+def log_count_info(count):
+  logging.info('Query resulted in %d results', count)
+  return count
+
+
 def display(elm):
   logging.debug(elm)
   return elm
+
+
+def model_to_json(model):
+  return json.dumps(construct_json_dict(model), separators=(',', ':'))
+
+
+def construct_json_dict(model):
+  return {k: unnest_to_json(v) for k, v in model.__dict__.items()}
+
+
+def unnest_to_json(cand):
+  if isinstance(cand, Timestamp):
+    return cand.micros // 1000
+  elif isinstance(
+      cand, (nexmark_model.Auction, nexmark_model.Bid, nexmark_model.Person)):
+    return construct_json_dict(cand)
+  else:
+    return cand
+
+
+def millis_to_timestamp(millis: int):
+  second = millis // 1000
+  micro_second = millis % 1000 * 1000
+  return Timestamp(second, micro_second)

Review comment:
       I think you can simply convert millis to micros and pass that to `Timestamp`, e.g. `Timestamp(micros=millis * 1000)`.
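A quick arithmetic check of this suggestion: Beam's `Timestamp(seconds, micros)` stores a single microsecond count, so splitting millis into whole seconds plus a microsecond remainder lands on the same value as converting straight to microseconds, including for negative inputs thanks to Python's floor division. The sketch below does only the arithmetic in plain Python; the Beam call it stands for is `Timestamp(micros=millis * 1000)`.

```python
def split_to_micros(millis):
  # Mirrors millis_to_timestamp in the diff: whole seconds plus a micro remainder.
  second = millis // 1000
  micro_second = millis % 1000 * 1000
  # Timestamp(second, micro_second) stores second * 1_000_000 + micro_second.
  return second * 1000000 + micro_second


def direct_micros(millis):
  # The simpler conversion suggested in review: Timestamp(micros=millis * 1000).
  return millis * 1000
```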

##########
File path: sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py
##########
@@ -263,4 +279,4 @@ def run(self):
 if __name__ == '__main__':
   launcher = NexmarkLauncher()
   launcher.run()
-  launcher.cleanup()
+  # launcher.cleanup()

Review comment:
       I think this shouldn't be commented out.
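Restoring the plain `launcher.cleanup()` call is enough to address this comment. If cleanup should also happen when the pipeline fails, one option (a suggestion, not what the PR does) is to call it from a `finally` block. The sketch uses a hypothetical `FakeLauncher` in place of `NexmarkLauncher` that only records which methods ran.

```python
class FakeLauncher(object):
  """Hypothetical stand-in for NexmarkLauncher that records method calls."""
  def __init__(self, fail=False):
    self.fail = fail
    self.calls = []

  def run(self):
    self.calls.append('run')
    if self.fail:
      raise RuntimeError('pipeline failed')

  def cleanup(self):
    self.calls.append('cleanup')


def main(launcher):
  try:
    launcher.run()
  finally:
    # Runs whether run() returns normally or raises; the exception still propagates.
    launcher.cleanup()
```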

##########
File path: sdks/python/apache_beam/testing/benchmarks/nexmark/queries/winning_bids.py
##########
@@ -0,0 +1,183 @@
+from __future__ import absolute_import
+
+import apache_beam as beam
+from apache_beam.coders import coder_impl
+from apache_beam.coders.coders import FastCoder
+from apache_beam.transforms.window import WindowFn
+from apache_beam.transforms.window import IntervalWindow
+from apache_beam.testing.benchmarks.nexmark.models import nexmark_model
+from apache_beam.testing.benchmarks.nexmark.models import auction_bid
+from apache_beam.testing.benchmarks.nexmark.queries import nexmark_query_util
+
+
+class AuctionOrBidWindow(IntervalWindow):
+  """Windows for open auctions and bids."""
+  def __init__(self, start, end, auction_id, is_auction_window):
+    super(AuctionOrBidWindow, self).__init__(start, end)
+    self.auction = auction_id
+    self.is_auction_window = is_auction_window
+
+  @staticmethod
+  def for_auction(timestamp, auction: nexmark_model.Auction):
+    return AuctionOrBidWindow(timestamp, auction.expires, auction.id, True)
+
+  @staticmethod
+  def for_bid(expected_duration_micro, timestamp, bid: nexmark_model.Bid):
+    return AuctionOrBidWindow(
+        timestamp, timestamp + expected_duration_micro * 2, bid.auction, False)

Review comment:
       I think you need to be careful here. If `timestamp` is a `Timestamp` and `expected_duration_micro` is an `int`, `+` will convert `expected_duration_micro * 2` with `Duration.of(expected_duration_micro * 2)`, which by default treats the value as seconds. You probably want something like `timestamp + Duration(micros=expected_duration_micro * 2)`.
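To make the unit mismatch concrete, the sketch below models the coercion described in this comment with a toy `ToyTimestamp`/`ToyDuration` pair (hypothetical classes, not Beam's): a bare number added to a timestamp is read as seconds, so passing a microsecond count stretches the bid window by a factor of 10^6. With Beam itself, the fix is the one suggested above, `timestamp + Duration(micros=expected_duration_micro * 2)`.

```python
class ToyDuration(object):
  """Toy duration stored in microseconds (hypothetical, not Beam's Duration)."""
  def __init__(self, seconds=0, micros=0):
    self.micros = int(seconds * 1000000) + int(micros)

  @staticmethod
  def of(value):
    # Plain numbers are interpreted as seconds, as described in the review.
    if isinstance(value, ToyDuration):
      return value
    return ToyDuration(seconds=value)


class ToyTimestamp(object):
  """Toy timestamp stored in microseconds (hypothetical, not Beam's Timestamp)."""
  def __init__(self, seconds=0, micros=0):
    self.micros = int(seconds * 1000000) + int(micros)

  def __add__(self, other):
    # Coerce the right-hand side before adding, like the behavior described.
    return ToyTimestamp(micros=self.micros + ToyDuration.of(other).micros)


expected_duration_micro = 10 * 1000000  # hypothetical: 10 seconds, in micros
start = ToyTimestamp(seconds=100)

# Bug: the int is treated as seconds, so the window is 20,000,000 s long.
buggy_end = start + expected_duration_micro * 2
# Fix: state explicitly that the value is in microseconds (a 20 s window).
fixed_end = start + ToyDuration(micros=expected_duration_micro * 2)
```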



