You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/03/09 03:29:00 UTC

[jira] [Work logged] (BEAM-3182) [Nexmark][SQL] Implement supported queries

     [ https://issues.apache.org/jira/browse/BEAM-3182?focusedWorklogId=78780&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-78780 ]

ASF GitHub Bot logged work on BEAM-3182:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 09/Mar/18 03:28
            Start Date: 09/Mar/18 03:28
    Worklog Time Spent: 10m 
      Work Description: kennknowles closed pull request #4836: Revert "[BEAM-3182] Ensure nexmark model tests produce output"
URL: https://github.com/apache/beam/pull/4836
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub does not show its
diff after the merge, so the full diff is reproduced below:

diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQueryModel.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQueryModel.java
index 5d7346be781..2efab3e8f59 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQueryModel.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQueryModel.java
@@ -102,7 +102,6 @@ static Instant windowStart(Duration size, Duration period, Instant timestamp) {
   /** Return assertion to use on results of pipeline for this query. */
   public SerializableFunction<Iterable<TimestampedValue<KnownSize>>, Void> assertionFor() {
     final Collection<String> expectedStrings = toCollection(simulator().results());
-    Assert.assertFalse(expectedStrings.isEmpty());
 
     return new SerializableFunction<Iterable<TimestampedValue<KnownSize>>, Void>() {
       @Override
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query6Model.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query6Model.java
index 0f625a2a91f..b5152d8a0bb 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query6Model.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query6Model.java
@@ -21,8 +21,6 @@
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.PriorityQueue;
-import java.util.Queue;
 import java.util.TreeMap;
 import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
 import org.apache.beam.sdk.nexmark.NexmarkUtils;
@@ -44,17 +42,17 @@
    * Simulator for query 6.
    */
   private static class Simulator extends AbstractSimulator<AuctionBid, SellerPrice> {
-    /** The last 10 winning bids ordered by age, indexed by seller id. */
-    private final Map<Long, Queue<Bid>> winningBidsPerSeller;
+    /** The cumulative count of winning bids, indexed by seller id. */
+    private final Map<Long, Long> numWinningBidsPerSeller;
 
-    /** The cumulative total of last 10 winning bid prices, indexed by seller id. */
+    /** The cumulative total of winning bid prices, indexed by seller id. */
     private final Map<Long, Long> totalWinningBidPricesPerSeller;
 
     private Instant lastTimestamp;
 
     public Simulator(NexmarkConfiguration configuration) {
       super(new WinningBidsSimulator(configuration).results());
-      winningBidsPerSeller = new TreeMap<>();
+      numWinningBidsPerSeller = new TreeMap<>();
       totalWinningBidPricesPerSeller = new TreeMap<>();
       lastTimestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
     }
@@ -64,24 +62,19 @@ public Simulator(NexmarkConfiguration configuration) {
      */
     private void captureWinningBid(Auction auction, Bid bid, Instant timestamp) {
       NexmarkUtils.info("winning auction, bid: %s, %s", auction, bid);
-      Queue<Bid> queue = winningBidsPerSeller.get(auction.seller);
-      if (queue == null) {
-        queue = new PriorityQueue<Bid>(10,
-            (Bid b1, Bid b2) -> Long.compare(b1.dateTime, b2.dateTime));
+      Long count = numWinningBidsPerSeller.get(auction.seller);
+      if (count == null) {
+        count = 1L;
+      } else {
+        count += 1;
       }
+      numWinningBidsPerSeller.put(auction.seller, count);
       Long total = totalWinningBidPricesPerSeller.get(auction.seller);
       if (total == null) {
-        total = 0L;
-      }
-      int count = queue.size();
-      if (count == 10) {
-        total -= queue.remove().price;
+        total = bid.price;
       } else {
-        count += 1;
+        total += bid.price;
       }
-      queue.add(bid);
-      total += bid.price;
-      winningBidsPerSeller.put(auction.seller, queue);
       totalWinningBidPricesPerSeller.put(auction.seller, total);
       TimestampedValue<SellerPrice> intermediateResult = TimestampedValue.of(
           new SellerPrice(auction.seller, Math.round((double) total / count)), timestamp);
@@ -93,9 +86,9 @@ private void captureWinningBid(Auction auction, Bid bid, Instant timestamp) {
     protected void run() {
       TimestampedValue<AuctionBid> timestampedWinningBid = nextInput();
       if (timestampedWinningBid == null) {
-        for (Map.Entry<Long, Queue<Bid>> entry : winningBidsPerSeller.entrySet()) {
+        for (Map.Entry<Long, Long> entry : numWinningBidsPerSeller.entrySet()) {
           long seller = entry.getKey();
-          long count = entry.getValue().size();
+          long count = entry.getValue();
           long total = totalWinningBidPricesPerSeller.get(seller);
           addResult(TimestampedValue.of(
               new SellerPrice(seller, Math.round((double) total / count)), lastTimestamp));
diff --git a/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/queries/QueryTest.java b/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/queries/QueryTest.java
index 937a727819c..8537e02cd77 100644
--- a/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/queries/QueryTest.java
+++ b/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/queries/QueryTest.java
@@ -48,7 +48,7 @@
   static {
     // careful, results of tests are linked to numEventGenerators because of timestamp generation
     CONFIG.numEventGenerators = 1;
-    CONFIG.numEvents = 5000;
+    CONFIG.numEvents = 1000;
   }
 
   @Rule public TestPipeline p = TestPipeline.create();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 78780)
    Time Spent: 10h 10m  (was: 10h)

> [Nexmark][SQL] Implement supported queries
> ------------------------------------------
>
>                 Key: BEAM-3182
>                 URL: https://issues.apache.org/jira/browse/BEAM-3182
>             Project: Beam
>          Issue Type: Sub-task
>          Components: dsl-sql
>            Reporter: Anton Kedin
>            Assignee: Andrew Pilloud
>            Priority: Major
>          Time Spent: 10h 10m
>  Remaining Estimate: 0h
>
> Implement all queries which can be run with current SQL features and Nexmark infrastructure.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)