You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/23 17:09:30 UTC

[22/55] [abbrv] beam git commit: Fix compile after ParDo refactor

Fix compile after ParDo refactor


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bd93c8b5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bd93c8b5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bd93c8b5

Branch: refs/heads/master
Commit: bd93c8b55ba6f81c87b74364b26d64e0f8c1103f
Parents: 7bfc982
Author: Ismaël Mejía <ie...@apache.org>
Authored: Wed Mar 29 10:10:13 2017 +0200
Committer: Ismaël Mejía <ie...@gmail.com>
Committed: Wed Aug 23 19:07:27 2017 +0200

----------------------------------------------------------------------
 .../beam/integration/nexmark/NexmarkQuery.java  | 14 ++++++-------
 .../beam/integration/nexmark/NexmarkRunner.java |  3 +--
 .../beam/integration/nexmark/NexmarkUtils.java  | 16 +++++++-------
 .../integration/nexmark/queries/Query7.java     | 22 ++++++++++----------
 4 files changed, 27 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/bd93c8b5/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQuery.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQuery.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQuery.java
index c268a3b..e1cd493 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQuery.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQuery.java
@@ -97,7 +97,7 @@ public abstract class NexmarkQuery
   };
 
   /** Transform to key each person by their id. */
-  protected static final ParDo.Bound<Person, KV<Long, Person>> PERSON_BY_ID =
+  protected static final ParDo.SingleOutput<Person, KV<Long, Person>> PERSON_BY_ID =
       ParDo.of(new DoFn<Person, KV<Long, Person>>() {
              @ProcessElement
              public void processElement(ProcessContext c) {
@@ -106,7 +106,7 @@ public abstract class NexmarkQuery
            });
 
   /** Transform to key each auction by its id. */
-  protected static final ParDo.Bound<Auction, KV<Long, Auction>> AUCTION_BY_ID =
+  protected static final ParDo.SingleOutput<Auction, KV<Long, Auction>> AUCTION_BY_ID =
       ParDo.of(new DoFn<Auction, KV<Long, Auction>>() {
              @ProcessElement
              public void processElement(ProcessContext c) {
@@ -115,7 +115,7 @@ public abstract class NexmarkQuery
            });
 
   /** Transform to key each auction by its seller id. */
-  protected static final ParDo.Bound<Auction, KV<Long, Auction>> AUCTION_BY_SELLER =
+  protected static final ParDo.SingleOutput<Auction, KV<Long, Auction>> AUCTION_BY_SELLER =
       ParDo.of(new DoFn<Auction, KV<Long, Auction>>() {
              @ProcessElement
              public void processElement(ProcessContext c) {
@@ -124,7 +124,7 @@ public abstract class NexmarkQuery
            });
 
   /** Transform to key each bid by it's auction id. */
-  protected static final ParDo.Bound<Bid, KV<Long, Bid>> BID_BY_AUCTION =
+  protected static final ParDo.SingleOutput<Bid, KV<Long, Bid>> BID_BY_AUCTION =
       ParDo.of(new DoFn<Bid, KV<Long, Bid>>() {
              @ProcessElement
              public void processElement(ProcessContext c) {
@@ -133,7 +133,7 @@ public abstract class NexmarkQuery
            });
 
   /** Transform to project the auction id from each bid. */
-  protected static final ParDo.Bound<Bid, Long> BID_TO_AUCTION =
+  protected static final ParDo.SingleOutput<Bid, Long> BID_TO_AUCTION =
       ParDo.of(new DoFn<Bid, Long>() {
              @ProcessElement
              public void processElement(ProcessContext c) {
@@ -142,7 +142,7 @@ public abstract class NexmarkQuery
            });
 
   /** Transform to project the price from each bid. */
-  protected static final ParDo.Bound<Bid, Long> BID_TO_PRICE =
+  protected static final ParDo.SingleOutput<Bid, Long> BID_TO_PRICE =
       ParDo.of(new DoFn<Bid, Long>() {
              @ProcessElement
              public void processElement(ProcessContext c) {
@@ -151,7 +151,7 @@ public abstract class NexmarkQuery
            });
 
   /** Transform to emit each event with the timestamp embedded within it. */
-  public static final ParDo.Bound<Event, Event> EVENT_TIMESTAMP_FROM_DATA =
+  public static final ParDo.SingleOutput<Event, Event> EVENT_TIMESTAMP_FROM_DATA =
       ParDo.of(new DoFn<Event, Event>() {
              @ProcessElement
              public void processElement(ProcessContext c) {

http://git-wip-us.apache.org/repos/asf/beam/blob/bd93c8b5/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
index e8d791f..df1000a 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
@@ -1073,8 +1073,7 @@ public abstract class NexmarkRunner<OptionT extends NexmarkOptions> {
       case BIGQUERY:
         // Multiple BigQuery backends to mimic what most customers do.
         PCollectionTuple res = formattedResults.apply(queryName + ".Partition",
-            ParDo.withOutputTags(MAIN, TupleTagList.of(SIDE))
-                 .of(new PartitionDoFn()));
+            ParDo.of(new PartitionDoFn()).withOutputTags(MAIN, TupleTagList.of(SIDE)));
         sinkResultsToBigQuery(res.get(MAIN), now, "main");
         sinkResultsToBigQuery(res.get(SIDE), now, "side");
         sinkResultsToBigQuery(formattedResults, now, "copy");

http://git-wip-us.apache.org/repos/asf/beam/blob/bd93c8b5/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java
index b0421a4..a47ebcc 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java
@@ -417,7 +417,7 @@ public class NexmarkUtils {
   /**
    * Return a transform to pass-through events, but count them as they go by.
    */
-  public static ParDo.Bound<Event, Event> snoop(final String name) {
+  public static ParDo.SingleOutput<Event, Event> snoop(final String name) {
     return ParDo.of(new DoFn<Event, Event>() {
                   final Aggregator<Long, Long> eventCounter =
                       createAggregator("events", Sum.ofLongs());
@@ -451,7 +451,7 @@ public class NexmarkUtils {
   /**
    * Return a transform to count and discard each element.
    */
-  public static <T> ParDo.Bound<T, Void> devNull(String name) {
+  public static <T> ParDo.SingleOutput<T, Void> devNull(String name) {
     return ParDo.of(new DoFn<T, Void>() {
                   final Aggregator<Long, Long> discardCounter =
                       createAggregator("discarded", Sum.ofLongs());
@@ -466,7 +466,7 @@ public class NexmarkUtils {
   /**
    * Return a transform to log each element, passing it through unchanged.
    */
-  public static <T> ParDo.Bound<T, T> log(final String name) {
+  public static <T> ParDo.SingleOutput<T, T> log(final String name) {
     return ParDo.of(new DoFn<T, T>() {
                   @ProcessElement
                   public void processElement(ProcessContext c) {
@@ -479,7 +479,7 @@ public class NexmarkUtils {
   /**
    * Return a transform to format each element as a string.
    */
-  public static <T> ParDo.Bound<T, String> format(String name) {
+  public static <T> ParDo.SingleOutput<T, String> format(String name) {
     return ParDo.of(new DoFn<T, String>() {
                   final Aggregator<Long, Long> recordCounter =
                       createAggregator("records", Sum.ofLongs());
@@ -495,7 +495,7 @@ public class NexmarkUtils {
   /**
    * Return a transform to make explicit the timestamp of each element.
    */
-  public static <T> ParDo.Bound<T, TimestampedValue<T>> stamp(String name) {
+  public static <T> ParDo.SingleOutput<T, TimestampedValue<T>> stamp(String name) {
     return ParDo.of(new DoFn<T, TimestampedValue<T>>() {
                   @ProcessElement
                   public void processElement(ProcessContext c) {
@@ -548,7 +548,7 @@ public class NexmarkUtils {
   /**
    * Return a transform to keep the CPU busy for given milliseconds on every record.
    */
-  public static <T> ParDo.Bound<T, T> cpuDelay(String name, final long delayMs) {
+  public static <T> ParDo.SingleOutput<T, T> cpuDelay(String name, final long delayMs) {
     return ParDo.of(new DoFn<T, T>() {
                   @ProcessElement
                   public void processElement(ProcessContext c) {
@@ -580,7 +580,7 @@ public class NexmarkUtils {
   /**
    * Return a transform to write given number of bytes to durable store on every record.
    */
-  public static <T> ParDo.Bound<T, T> diskBusy(String name, final long bytes) {
+  public static <T> ParDo.SingleOutput<T, T> diskBusy(String name, final long bytes) {
     return ParDo.of(new DoFn<T, T>() {
                   @ProcessElement
                   public void processElement(ProcessContext c) {
@@ -608,7 +608,7 @@ public class NexmarkUtils {
   /**
    * Return a transform to cast each element to {@link KnownSize}.
    */
-  private static <T extends KnownSize> ParDo.Bound<T, KnownSize> castToKnownSize() {
+  private static <T extends KnownSize> ParDo.SingleOutput<T, KnownSize> castToKnownSize() {
     return ParDo.of(new DoFn<T, KnownSize>() {
                   @ProcessElement
                   public void processElement(ProcessContext c) {

http://git-wip-us.apache.org/repos/asf/beam/blob/bd93c8b5/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7.java
index 2835737..f3d1ba4 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7.java
@@ -70,18 +70,18 @@ public class Query7 extends NexmarkQuery {
 
     return slidingBids
         // Select all bids which have that maximum price (there may be more than one).
-        .apply(name + ".Select",
-            ParDo.withSideInputs(maxPriceView)
-                .of(new DoFn<Bid, Bid>() {
-                  @ProcessElement
-                  public void processElement(ProcessContext c) {
-                    long maxPrice = c.sideInput(maxPriceView);
-                    Bid bid = c.element();
-                    if (bid.price == maxPrice) {
-                      c.output(bid);
-                    }
+        .apply(name + ".Select", ParDo
+          .of(new DoFn<Bid, Bid>() {
+                @ProcessElement
+                public void processElement(ProcessContext c) {
+                  long maxPrice = c.sideInput(maxPriceView);
+                  Bid bid = c.element();
+                  if (bid.price == maxPrice) {
+                    c.output(bid);
                   }
-                }));
+                }
+              })
+          .withSideInputs(maxPriceView));
   }
 
   @Override