You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/23 17:09:32 UTC
[24/55] [abbrv] beam git commit: Fix and improve query3 and query12
Fix and improve query3 and query12
query3: Use GlobalWindow to comply with the State/Timer APIs (issue #7). Use timer for personState expiration in GlobalWindow (issue #29). Add trigger to GlobalWindow
query12: Replace Count.perKey by Count.perElement (issue #34)
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7c28b492
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7c28b492
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7c28b492
Branch: refs/heads/master
Commit: 7c28b492aa17160d9a84914814e618716b7beb9f
Parents: bd93c8b
Author: Etienne Chauchot <ec...@gmail.com>
Authored: Mon Apr 3 15:18:04 2017 +0200
Committer: Ismaël Mejía <ie...@gmail.com>
Committed: Wed Aug 23 19:07:27 2017 +0200
----------------------------------------------------------------------
.../nexmark/NexmarkConfiguration.java | 19 +-
.../integration/nexmark/NexmarkOptions.java | 7 +
.../integration/nexmark/queries/Query12.java | 19 +-
.../integration/nexmark/queries/Query3.java | 263 +++++++++++--------
.../integration/nexmark/queries/QueryTest.java | 4 +
5 files changed, 195 insertions(+), 117 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/7c28b492/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java
index e2890ed..d6cd808 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java
@@ -195,6 +195,13 @@ public class NexmarkConfiguration implements Serializable {
public int fanout = 5;
/**
+ * Maximum time, in seconds of event time, that query3 keeps a person in personState
+ * while waiting for that person's auctions; the state is cleared once it elapses.
+ */
+ @JsonProperty
+ public int maxAuctionsWaitingTime = 600;
+
+ /**
* Length of occasional delay to impose on events (in seconds).
*/
@JsonProperty
@@ -322,6 +329,9 @@ public class NexmarkConfiguration implements Serializable {
if (options.getFanout() != null) {
fanout = options.getFanout();
}
+ if (options.getMaxAuctionsWaitingTime() != null) {
+ maxAuctionsWaitingTime = options.getMaxAuctionsWaitingTime();
+ }
if (options.getOccasionalDelaySec() != null) {
occasionalDelaySec = options.getOccasionalDelaySec();
}
@@ -376,6 +386,7 @@ public class NexmarkConfiguration implements Serializable {
result.diskBusyBytes = diskBusyBytes;
result.auctionSkip = auctionSkip;
result.fanout = fanout;
+ result.maxAuctionsWaitingTime = maxAuctionsWaitingTime;
result.occasionalDelaySec = occasionalDelaySec;
result.probDelayedEvent = probDelayedEvent;
result.maxLogEvents = maxLogEvents;
@@ -479,6 +490,9 @@ public class NexmarkConfiguration implements Serializable {
if (fanout != DEFAULT.fanout) {
sb.append(String.format("; fanout:%d", fanout));
}
+ if (maxAuctionsWaitingTime != DEFAULT.maxAuctionsWaitingTime) {
+ sb.append(String.format("; maxAuctionsWaitingTime:%d", maxAuctionsWaitingTime));
+ }
if (occasionalDelaySec != DEFAULT.occasionalDelaySec) {
sb.append(String.format("; occasionalDelaySec:%d", occasionalDelaySec));
}
@@ -527,7 +541,7 @@ public class NexmarkConfiguration implements Serializable {
ratePeriodSec, preloadSeconds, isRateLimited, useWallclockEventTime, avgPersonByteSize,
avgAuctionByteSize, avgBidByteSize, hotAuctionRatio, hotSellersRatio, hotBiddersRatio,
windowSizeSec, windowPeriodSec, watermarkHoldbackSec, numInFlightAuctions, numActivePeople,
- coderStrategy, cpuDelayMs, diskBusyBytes, auctionSkip, fanout,
+ coderStrategy, cpuDelayMs, diskBusyBytes, auctionSkip, fanout, maxAuctionsWaitingTime,
occasionalDelaySec, probDelayedEvent, maxLogEvents, usePubsubPublishTime,
outOfOrderGroupSize);
}
@@ -571,6 +585,9 @@ public class NexmarkConfiguration implements Serializable {
if (fanout != other.fanout) {
return false;
}
+ if (maxAuctionsWaitingTime != other.maxAuctionsWaitingTime) {
+ return false;
+ }
if (firstEventRate != other.firstEventRate) {
return false;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7c28b492/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java
index 1be974f..e39f0a4 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java
@@ -309,6 +309,13 @@ public interface NexmarkOptions extends PubsubOptions {
void setFanout(Integer fanout);
+ @Description("Maximum waiting time to clean personState in query3 "
+ + "(ie maximum waiting of the auctions related to person in state in seconds in event time).")
+ @Nullable
+ Integer getMaxAuctionsWaitingTime();
+
+ void setMaxAuctionsWaitingTime(Integer maxAuctionsWaitingTime);
+
@Description("Length of occasional delay to impose on events (in seconds).")
@Nullable
Long getOccasionalDelaySec();
http://git-wip-us.apache.org/repos/asf/beam/blob/7c28b492/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query12.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query12.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query12.java
index c67401b..a5db504 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query12.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query12.java
@@ -49,16 +49,13 @@ public class Query12 extends NexmarkQuery {
private PCollection<BidsPerSession> applyTyped(PCollection<Event> events) {
return events
.apply(JUST_BIDS)
- .apply(name + ".Rekey",
- // TODO etienne: why not avoid this ParDo and do a Cont.perElement?
- ParDo.of(new DoFn<Bid, KV<Long, Void>>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- Bid bid = c.element();
- c.output(KV.of(bid.bidder, (Void) null));
- }
- }))
- .apply(Window.<KV<Long, Void>>into(new GlobalWindows())
+ .apply(ParDo.of(new DoFn<Bid, Long>() {
+ @ProcessElement
+ public void processElement(ProcessContext c){
+ c.output(c.element().bidder);
+ }
+ }))
+ .apply(Window.<Long>into(new GlobalWindows())
.triggering(
Repeatedly.forever(
AfterProcessingTime.pastFirstElementInPane()
@@ -66,7 +63,7 @@ public class Query12 extends NexmarkQuery {
Duration.standardSeconds(configuration.windowSizeSec))))
.discardingFiredPanes()
.withAllowedLateness(Duration.ZERO))
- .apply(Count.<Long, Void>perKey())
+ .apply(Count.<Long>perElement())
.apply(name + ".ToResult",
ParDo.of(new DoFn<KV<Long, Long>, BidsPerSession>() {
@ProcessElement
http://git-wip-us.apache.org/repos/asf/beam/blob/7c28b492/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java
index 128c2b7..ba31e9f 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java
@@ -39,14 +39,21 @@ import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.Timer;
+import org.apache.beam.sdk.util.TimerSpec;
+import org.apache.beam.sdk.util.TimerSpecs;
import org.apache.beam.sdk.util.state.StateSpec;
import org.apache.beam.sdk.util.state.StateSpecs;
import org.apache.beam.sdk.util.state.ValueState;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
+import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,31 +69,141 @@ import org.slf4j.LoggerFactory;
* </pre>
*
* <p>We'll implement this query to allow 'new auction' events to come before the 'new person'
- * events for the auction seller. Those auctions will be stored until the matching person is
- * seen. Then all subsequent auctions for a person will use the stored person record.
+ * events for the auction seller. Those auctions will be stored until the matching person is seen.
+ * Then all subsequent auctions for a person will use the stored person record.
*
* <p>A real system would use an external system to maintain the id-to-person association.
*/
public class Query3 extends NexmarkQuery {
+
private static final Logger LOG = LoggerFactory.getLogger(Query3.class);
-// private static final StateContext GLOBAL_NAMESPACE = StateContexts.global();
- private static final StateSpec<Object, ValueState<List<Auction>>> AUCTION_LIST_CODED_TAG =
- StateSpecs.value(ListCoder.of(Auction.CODER));
- private static final StateSpec<Object, ValueState<Person>> PERSON_CODED_TAG =
- StateSpecs.value(Person.CODER);
+ private final JoinDoFn joinDoFn;
+
+ public Query3(NexmarkConfiguration configuration) {
+ super(configuration, "Query3");
+ joinDoFn = new JoinDoFn(configuration.maxAuctionsWaitingTime);
+
+ }
+
+ @Override
+ @Nullable
+ public Aggregator<Long, Long> getFatalCount() {
+ return joinDoFn.fatalCounter;
+ }
+
+ private PCollection<NameCityStateId> applyTyped(PCollection<Event> events) {
+ int numEventsInPane = 30;
+
+ PCollection<Event> eventsWindowed =
+ events.apply(
+ Window.<Event>into(new GlobalWindows())
+ .triggering(Repeatedly.forever((AfterPane.elementCountAtLeast(numEventsInPane))))
+ .discardingFiredPanes()
+ .withAllowedLateness(Duration.ZERO));
+ PCollection<KV<Long, Auction>> auctionsBySellerId =
+ eventsWindowed
+ // Only want the new auction events.
+ .apply(JUST_NEW_AUCTIONS)
+
+ // We only want auctions in category 10.
+ .apply(
+ name + ".InCategory",
+ Filter.by(
+ new SerializableFunction<Auction, Boolean>() {
+
+ @Override
+ public Boolean apply(Auction auction) {
+ return auction.category == 10;
+ }
+ }))
+
+ // Key auctions by their seller id.
+ .apply("AuctionBySeller", AUCTION_BY_SELLER);
+
+ PCollection<KV<Long, Person>> personsById =
+ eventsWindowed
+ // Only want the new people events.
+ .apply(JUST_NEW_PERSONS)
+
+ // We only want people in OR, ID, CA.
+ .apply(
+ name + ".InState",
+ Filter.by(
+ new SerializableFunction<Person, Boolean>() {
+
+ @Override
+ public Boolean apply(Person person) {
+ return person.state.equals("OR")
+ || person.state.equals("ID")
+ || person.state.equals("CA");
+ }
+ }))
+
+ // Key people by their id.
+ .apply("PersonById", PERSON_BY_ID);
+
+ return
+ // Join auctions and people.
+ // concatenate KeyedPCollections
+ KeyedPCollectionTuple.of(AUCTION_TAG, auctionsBySellerId)
+ .and(PERSON_TAG, personsById)
+ // group auctions and persons by personId
+ .apply(CoGroupByKey.<Long>create())
+ .apply(name + ".Join", ParDo.of(joinDoFn))
+
+ // Project what we want.
+ .apply(
+ name + ".Project",
+ ParDo.of(
+ new DoFn<KV<Auction, Person>, NameCityStateId>() {
+
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ Auction auction = c.element().getKey();
+ Person person = c.element().getValue();
+ c.output(
+ new NameCityStateId(person.name, person.city, person.state, auction.id));
+ }
+ }));
+ }
+
+ @Override
+ protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
+ return NexmarkUtils.castToKnownSize(name, applyTyped(events));
+ }
/**
- * Join {@code auctions} and {@code people} by person id and emit their cross-product one pair
- * at a time.
+ * Join {@code auctions} and {@code people} by person id and emit their cross-product one pair at
+ * a time.
*
* <p>We know a person may submit any number of auctions. Thus new person event must have the
* person record stored in persistent state in order to match future auctions by that person.
*
- * <p>However we know that each auction is associated with at most one person, so only need
- * to store auction records in persistent state until we have seen the corresponding person
- * record. And of course may have already seen that record.
+ * <p>However we know that each auction is associated with at most one person, so only need to
+ * store auction records in persistent state until we have seen the corresponding person record.
+ * And of course may have already seen that record.
*/
private static class JoinDoFn extends DoFn<KV<Long, CoGbkResult>, KV<Auction, Person>> {
+
+ private int maxAuctionsWaitingTime;
+ private static final String AUCTIONS = "auctions";
+ private static final String PERSON = "person";
+
+ @StateId(PERSON)
+ private static final StateSpec<Object, ValueState<Person>> personSpec =
+ StateSpecs.value(Person.CODER);
+
+ private static final String PERSON_STATE_EXPIRING = "personStateExpiring";
+
+ public final Aggregator<Long, Long> fatalCounter = createAggregator("fatal", Sum.ofLongs());
+
+ @StateId(AUCTIONS)
+ private final StateSpec<Object, ValueState<List<Auction>>> auctionsSpec =
+ StateSpecs.value(ListCoder.of(Auction.CODER));
+
+ @TimerId(PERSON_STATE_EXPIRING)
+ private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
private final Aggregator<Long, Long> newAuctionCounter =
createAggregator("newAuction", Sum.ofLongs());
private final Aggregator<Long, Long> newPersonCounter =
@@ -97,20 +214,25 @@ public class Query3 extends NexmarkQuery {
createAggregator("newOldOutput", Sum.ofLongs());
private final Aggregator<Long, Long> oldNewOutputCounter =
createAggregator("oldNewOutput", Sum.ofLongs());
- public final Aggregator<Long, Long> fatalCounter = createAggregator("fatal", Sum.ofLongs());
+
+ private JoinDoFn(int maxAuctionsWaitingTime) {
+ this.maxAuctionsWaitingTime = maxAuctionsWaitingTime;
+ }
@ProcessElement
- public void processElement(ProcessContext c) throws IOException {
- //TODO: This is using the internal state API. Rework to use the
- //TODO Ismael this is broken for not access to state
+ public void processElement(
+ ProcessContext c,
+ @TimerId(PERSON_STATE_EXPIRING) Timer timer,
+ @StateId(PERSON) ValueState<Person> personState,
+ @StateId(AUCTIONS) ValueState<List<Auction>> auctionsState)
+ throws IOException {
// We would *almost* implement this by rewindowing into the global window and
// running a combiner over the result. The combiner's accumulator would be the
// state we use below. However, combiners cannot emit intermediate results, thus
// we need to wait for the pending ReduceFn API.
-// StateInternals<?> stateInternals = c.windowingInternals().stateInternals();
-// ValueState<Person> personState = stateInternals.state(GLOBAL_NAMESPACE, PERSON_CODED_TAG);
-// Person existingPerson = personState.read();
- Person existingPerson = null;
+
+ Person existingPerson = personState.read();
+
if (existingPerson != null) {
// We've already seen the new person event for this person id.
// We can join with any new auctions on-the-fly without needing any
@@ -123,8 +245,6 @@ public class Query3 extends NexmarkQuery {
return;
}
-// ValueState<List<Auction>> auctionsState =
-// stateInternals.state(GLOBAL_NAMESPACE, AUCTION_LIST_CODED_TAG);
Person theNewPerson = null;
for (Person newPerson : c.element().getValue().getAll(PERSON_TAG)) {
if (theNewPerson == null) {
@@ -140,14 +260,14 @@ public class Query3 extends NexmarkQuery {
}
newPersonCounter.addValue(1L);
// We've now seen the person for this person id so can flush any
- // pending auctions for the same seller id.
- List<Auction> pendingAuctions = null; //auctionsState.read();
+ // pending auctions for the same seller id (an auction is done by only one seller).
+ List<Auction> pendingAuctions = auctionsState.read();
if (pendingAuctions != null) {
for (Auction pendingAuction : pendingAuctions) {
oldNewOutputCounter.addValue(1L);
c.output(KV.of(pendingAuction, newPerson));
}
-// auctionsState.clear();
+ auctionsState.clear();
}
// Also deal with any new auctions.
for (Auction newAuction : c.element().getValue().getAll(AUCTION_TAG)) {
@@ -156,8 +276,11 @@ public class Query3 extends NexmarkQuery {
c.output(KV.of(newAuction, newPerson));
}
// Remember this person for any future auctions.
-
-// personState.write(newPerson);
+ personState.write(newPerson);
+ //set a time out to clear this state
+ Instant firingTime = new Instant(newPerson.dateTime)
+ .plus(Duration.standardSeconds(maxAuctionsWaitingTime));
+ timer.set(firingTime);
}
if (theNewPerson != null) {
return;
@@ -165,7 +288,7 @@ public class Query3 extends NexmarkQuery {
// We'll need to remember the auctions until we see the corresponding
// new person event.
- List<Auction> pendingAuctions = null; //auctionsState.read();
+ List<Auction> pendingAuctions = auctionsState.read();
if (pendingAuctions == null) {
pendingAuctions = new ArrayList<>();
}
@@ -173,84 +296,14 @@ public class Query3 extends NexmarkQuery {
newAuctionCounter.addValue(1L);
pendingAuctions.add(newAuction);
}
-// auctionsState.write(pendingAuctions);
+ auctionsState.write(pendingAuctions);
}
+ @OnTimer(PERSON_STATE_EXPIRING)
+ public void onTimerCallback(
+ OnTimerContext context,
+ @StateId(PERSON) ValueState<Person> personState) {
+ personState.clear();
}
- private final JoinDoFn joinDoFn = new JoinDoFn();
-
- public Query3(NexmarkConfiguration configuration) {
- super(configuration, "Query3");
- }
-
- @Override
- @Nullable
- public Aggregator<Long, Long> getFatalCount() {
- return joinDoFn.fatalCounter;
- }
-
- private PCollection<NameCityStateId> applyTyped(PCollection<Event> events) {
- // Batch into incremental results windows.
- events = events.apply(
- Window.<Event>into(FixedWindows.of(Duration.standardSeconds(configuration.windowSizeSec))));
-
- PCollection<KV<Long, Auction>> auctionsBySellerId =
- events
- // Only want the new auction events.
- .apply(JUST_NEW_AUCTIONS)
-
- // We only want auctions in category 10.
- .apply(name + ".InCategory", Filter.by(new SerializableFunction<Auction, Boolean>() {
- @Override
- public Boolean apply(Auction auction) {
- return auction.category == 10;
- }
- }))
-
- // Key auctions by their seller id.
- .apply("AuctionBySeller", AUCTION_BY_SELLER);
-
- PCollection<KV<Long, Person>> personsById =
- events
- // Only want the new people events.
- .apply(JUST_NEW_PERSONS)
-
- // We only want people in OR, ID, CA.
- .apply(name + ".InState", Filter.by(new SerializableFunction<Person, Boolean>() {
- @Override
- public Boolean apply(Person person) {
- return person.state.equals("OR") || person.state.equals("ID")
- || person.state.equals("CA");
- }
- }))
-
- // Key people by their id.
- .apply("PersonById", PERSON_BY_ID);
-
- return
- // Join auctions and people.
- // concatenate KeyedPCollections
- KeyedPCollectionTuple.of(AUCTION_TAG, auctionsBySellerId)
- .and(PERSON_TAG, personsById)
- // group auctions and persons by personId
- .apply(CoGroupByKey.<Long>create())
- .apply(name + ".Join", ParDo.of(joinDoFn))
-
- // Project what we want.
- .apply(name + ".Project",
- ParDo.of(new DoFn<KV<Auction, Person>, NameCityStateId>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- Auction auction = c.element().getKey();
- Person person = c.element().getValue();
- c.output(new NameCityStateId(
- person.name, person.city, person.state, auction.id));
- }
- }));
- }
-
- @Override
- protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
- return NexmarkUtils.castToKnownSize(name, applyTyped(events));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7c28b492/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/queries/QueryTest.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/queries/QueryTest.java b/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/queries/QueryTest.java
index 5cf4287..dca2887 100644
--- a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/queries/QueryTest.java
+++ b/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/queries/QueryTest.java
@@ -25,10 +25,13 @@ import org.apache.beam.integration.nexmark.model.KnownSize;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.UsesStatefulParDo;
+import org.apache.beam.sdk.testing.UsesTimersInParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.junit.Rule;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -95,6 +98,7 @@ public class QueryTest {
}
@Test
+ @Category({UsesStatefulParDo.class, UsesTimersInParDo.class})
public void query7MatchesModel() {
queryMatchesModel("Query7Test", new Query7(CONFIG), new Query7Model(CONFIG));
}