You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/23 17:09:30 UTC
[22/55] [abbrv] beam git commit: Fix compile after ParDo refactor
Fix compile after ParDo refactor
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bd93c8b5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bd93c8b5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bd93c8b5
Branch: refs/heads/master
Commit: bd93c8b55ba6f81c87b74364b26d64e0f8c1103f
Parents: 7bfc982
Author: Ismaël Mejía <ie...@apache.org>
Authored: Wed Mar 29 10:10:13 2017 +0200
Committer: Ismaël Mejía <ie...@gmail.com>
Committed: Wed Aug 23 19:07:27 2017 +0200
----------------------------------------------------------------------
.../beam/integration/nexmark/NexmarkQuery.java | 14 ++++++-------
.../beam/integration/nexmark/NexmarkRunner.java | 3 +--
.../beam/integration/nexmark/NexmarkUtils.java | 16 +++++++-------
.../integration/nexmark/queries/Query7.java | 22 ++++++++++----------
4 files changed, 27 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/bd93c8b5/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQuery.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQuery.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQuery.java
index c268a3b..e1cd493 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQuery.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQuery.java
@@ -97,7 +97,7 @@ public abstract class NexmarkQuery
};
/** Transform to key each person by their id. */
- protected static final ParDo.Bound<Person, KV<Long, Person>> PERSON_BY_ID =
+ protected static final ParDo.SingleOutput<Person, KV<Long, Person>> PERSON_BY_ID =
ParDo.of(new DoFn<Person, KV<Long, Person>>() {
@ProcessElement
public void processElement(ProcessContext c) {
@@ -106,7 +106,7 @@ public abstract class NexmarkQuery
});
/** Transform to key each auction by its id. */
- protected static final ParDo.Bound<Auction, KV<Long, Auction>> AUCTION_BY_ID =
+ protected static final ParDo.SingleOutput<Auction, KV<Long, Auction>> AUCTION_BY_ID =
ParDo.of(new DoFn<Auction, KV<Long, Auction>>() {
@ProcessElement
public void processElement(ProcessContext c) {
@@ -115,7 +115,7 @@ public abstract class NexmarkQuery
});
/** Transform to key each auction by its seller id. */
- protected static final ParDo.Bound<Auction, KV<Long, Auction>> AUCTION_BY_SELLER =
+ protected static final ParDo.SingleOutput<Auction, KV<Long, Auction>> AUCTION_BY_SELLER =
ParDo.of(new DoFn<Auction, KV<Long, Auction>>() {
@ProcessElement
public void processElement(ProcessContext c) {
@@ -124,7 +124,7 @@ public abstract class NexmarkQuery
});
/** Transform to key each bid by it's auction id. */
- protected static final ParDo.Bound<Bid, KV<Long, Bid>> BID_BY_AUCTION =
+ protected static final ParDo.SingleOutput<Bid, KV<Long, Bid>> BID_BY_AUCTION =
ParDo.of(new DoFn<Bid, KV<Long, Bid>>() {
@ProcessElement
public void processElement(ProcessContext c) {
@@ -133,7 +133,7 @@ public abstract class NexmarkQuery
});
/** Transform to project the auction id from each bid. */
- protected static final ParDo.Bound<Bid, Long> BID_TO_AUCTION =
+ protected static final ParDo.SingleOutput<Bid, Long> BID_TO_AUCTION =
ParDo.of(new DoFn<Bid, Long>() {
@ProcessElement
public void processElement(ProcessContext c) {
@@ -142,7 +142,7 @@ public abstract class NexmarkQuery
});
/** Transform to project the price from each bid. */
- protected static final ParDo.Bound<Bid, Long> BID_TO_PRICE =
+ protected static final ParDo.SingleOutput<Bid, Long> BID_TO_PRICE =
ParDo.of(new DoFn<Bid, Long>() {
@ProcessElement
public void processElement(ProcessContext c) {
@@ -151,7 +151,7 @@ public abstract class NexmarkQuery
});
/** Transform to emit each event with the timestamp embedded within it. */
- public static final ParDo.Bound<Event, Event> EVENT_TIMESTAMP_FROM_DATA =
+ public static final ParDo.SingleOutput<Event, Event> EVENT_TIMESTAMP_FROM_DATA =
ParDo.of(new DoFn<Event, Event>() {
@ProcessElement
public void processElement(ProcessContext c) {
http://git-wip-us.apache.org/repos/asf/beam/blob/bd93c8b5/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
index e8d791f..df1000a 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
@@ -1073,8 +1073,7 @@ public abstract class NexmarkRunner<OptionT extends NexmarkOptions> {
case BIGQUERY:
// Multiple BigQuery backends to mimic what most customers do.
PCollectionTuple res = formattedResults.apply(queryName + ".Partition",
- ParDo.withOutputTags(MAIN, TupleTagList.of(SIDE))
- .of(new PartitionDoFn()));
+ ParDo.of(new PartitionDoFn()).withOutputTags(MAIN, TupleTagList.of(SIDE)));
sinkResultsToBigQuery(res.get(MAIN), now, "main");
sinkResultsToBigQuery(res.get(SIDE), now, "side");
sinkResultsToBigQuery(formattedResults, now, "copy");
http://git-wip-us.apache.org/repos/asf/beam/blob/bd93c8b5/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java
index b0421a4..a47ebcc 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java
@@ -417,7 +417,7 @@ public class NexmarkUtils {
/**
* Return a transform to pass-through events, but count them as they go by.
*/
- public static ParDo.Bound<Event, Event> snoop(final String name) {
+ public static ParDo.SingleOutput<Event, Event> snoop(final String name) {
return ParDo.of(new DoFn<Event, Event>() {
final Aggregator<Long, Long> eventCounter =
createAggregator("events", Sum.ofLongs());
@@ -451,7 +451,7 @@ public class NexmarkUtils {
/**
* Return a transform to count and discard each element.
*/
- public static <T> ParDo.Bound<T, Void> devNull(String name) {
+ public static <T> ParDo.SingleOutput<T, Void> devNull(String name) {
return ParDo.of(new DoFn<T, Void>() {
final Aggregator<Long, Long> discardCounter =
createAggregator("discarded", Sum.ofLongs());
@@ -466,7 +466,7 @@ public class NexmarkUtils {
/**
* Return a transform to log each element, passing it through unchanged.
*/
- public static <T> ParDo.Bound<T, T> log(final String name) {
+ public static <T> ParDo.SingleOutput<T, T> log(final String name) {
return ParDo.of(new DoFn<T, T>() {
@ProcessElement
public void processElement(ProcessContext c) {
@@ -479,7 +479,7 @@ public class NexmarkUtils {
/**
* Return a transform to format each element as a string.
*/
- public static <T> ParDo.Bound<T, String> format(String name) {
+ public static <T> ParDo.SingleOutput<T, String> format(String name) {
return ParDo.of(new DoFn<T, String>() {
final Aggregator<Long, Long> recordCounter =
createAggregator("records", Sum.ofLongs());
@@ -495,7 +495,7 @@ public class NexmarkUtils {
/**
* Return a transform to make explicit the timestamp of each element.
*/
- public static <T> ParDo.Bound<T, TimestampedValue<T>> stamp(String name) {
+ public static <T> ParDo.SingleOutput<T, TimestampedValue<T>> stamp(String name) {
return ParDo.of(new DoFn<T, TimestampedValue<T>>() {
@ProcessElement
public void processElement(ProcessContext c) {
@@ -548,7 +548,7 @@ public class NexmarkUtils {
/**
* Return a transform to keep the CPU busy for given milliseconds on every record.
*/
- public static <T> ParDo.Bound<T, T> cpuDelay(String name, final long delayMs) {
+ public static <T> ParDo.SingleOutput<T, T> cpuDelay(String name, final long delayMs) {
return ParDo.of(new DoFn<T, T>() {
@ProcessElement
public void processElement(ProcessContext c) {
@@ -580,7 +580,7 @@ public class NexmarkUtils {
/**
* Return a transform to write given number of bytes to durable store on every record.
*/
- public static <T> ParDo.Bound<T, T> diskBusy(String name, final long bytes) {
+ public static <T> ParDo.SingleOutput<T, T> diskBusy(String name, final long bytes) {
return ParDo.of(new DoFn<T, T>() {
@ProcessElement
public void processElement(ProcessContext c) {
@@ -608,7 +608,7 @@ public class NexmarkUtils {
/**
* Return a transform to cast each element to {@link KnownSize}.
*/
- private static <T extends KnownSize> ParDo.Bound<T, KnownSize> castToKnownSize() {
+ private static <T extends KnownSize> ParDo.SingleOutput<T, KnownSize> castToKnownSize() {
return ParDo.of(new DoFn<T, KnownSize>() {
@ProcessElement
public void processElement(ProcessContext c) {
http://git-wip-us.apache.org/repos/asf/beam/blob/bd93c8b5/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7.java
index 2835737..f3d1ba4 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7.java
@@ -70,18 +70,18 @@ public class Query7 extends NexmarkQuery {
return slidingBids
// Select all bids which have that maximum price (there may be more than one).
- .apply(name + ".Select",
- ParDo.withSideInputs(maxPriceView)
- .of(new DoFn<Bid, Bid>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- long maxPrice = c.sideInput(maxPriceView);
- Bid bid = c.element();
- if (bid.price == maxPrice) {
- c.output(bid);
- }
+ .apply(name + ".Select", ParDo
+ .of(new DoFn<Bid, Bid>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ long maxPrice = c.sideInput(maxPriceView);
+ Bid bid = c.element();
+ if (bid.price == maxPrice) {
+ c.output(bid);
}
- }));
+ }
+ })
+ .withSideInputs(maxPriceView));
}
@Override