You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/23 17:10:01 UTC
[53/55] [abbrv] beam git commit: Clean, fix findbugs, fix checkstyle
Clean, fix findbugs, fix checkstyle
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2f9b4948
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2f9b4948
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2f9b4948
Branch: refs/heads/master
Commit: 2f9b4948fd60a749ada832d003acf0bd84875fcb
Parents: 6c11670
Author: Etienne Chauchot <ec...@gmail.com>
Authored: Tue May 30 18:00:00 2017 +0100
Committer: Ismaël Mejía <ie...@gmail.com>
Committed: Wed Aug 23 19:07:29 2017 +0200
----------------------------------------------------------------------
.../nexmark/NexmarkConfiguration.java | 9 +-
.../integration/nexmark/NexmarkLauncher.java | 62 +++++-------
.../integration/nexmark/NexmarkOptions.java | 3 +-
.../beam/integration/nexmark/NexmarkSuite.java | 4 +-
.../beam/integration/nexmark/model/Event.java | 99 ++++++++++----------
.../nexmark/queries/Query0Model.java | 1 -
.../nexmark/queries/Query1Model.java | 1 -
.../integration/nexmark/queries/Query3.java | 8 +-
.../integration/nexmark/queries/Query5.java | 68 ++++++++------
.../integration/nexmark/queries/Query7.java | 2 +-
.../nexmark/queries/Query7Model.java | 1 -
.../nexmark/queries/WinningBids.java | 37 +++++++-
.../nexmark/queries/WinningBidsSimulator.java | 1 -
.../integration/nexmark/sources/Generator.java | 36 +++++--
.../nexmark/sources/GeneratorConfig.java | 29 +++---
.../integration/nexmark/queries/QueryTest.java | 6 +-
.../sources/UnboundedEventSourceTest.java | 6 +-
17 files changed, 211 insertions(+), 162 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/2f9b4948/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java
index 5a8cb71..2faf3f5 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java
@@ -19,7 +19,6 @@ package org.apache.beam.integration.nexmark;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
-
import java.io.IOException;
import java.io.Serializable;
import java.util.Objects;
@@ -359,11 +358,11 @@ public class NexmarkConfiguration implements Serializable {
}
/**
- * Return clone of configuration with given label.
+ * Return copy of configuration with given label.
*/
- @Override
- public NexmarkConfiguration clone() {
- NexmarkConfiguration result = new NexmarkConfiguration();
+ public NexmarkConfiguration copy() {
+ NexmarkConfiguration result;
+ result = new NexmarkConfiguration();
result.debug = debug;
result.query = query;
result.sourceType = sourceType;
http://git-wip-us.apache.org/repos/asf/beam/blob/2f9b4948/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkLauncher.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkLauncher.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkLauncher.java
index db53191..a609975 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkLauncher.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkLauncher.java
@@ -87,11 +87,13 @@ import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.joda.time.Duration;
+import org.slf4j.LoggerFactory;
/**
* Run a single Nexmark query using a given configuration.
*/
public class NexmarkLauncher<OptionT extends NexmarkOptions> {
+ private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(NexmarkLauncher.class);
/**
* Minimum number of samples needed for 'steady-state' rate calculation.
*/
@@ -166,13 +168,6 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> {
}
/**
- * Return number of cores per worker.
- */
- protected int coresPerWorker() {
- return 4;
- }
-
- /**
* Return maximum number of workers.
*/
private int maxNumWorkers() {
@@ -185,7 +180,6 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> {
*/
private long getCounterMetric(PipelineResult result, String namespace, String name,
long defaultValue) {
- //TODO Ismael calc this only once
MetricQueryResults metrics = result.metrics().queryMetrics(
MetricsFilter.builder().addNameFilter(MetricNameFilter.named(namespace, name)).build());
Iterable<MetricResult<Long>> counters = metrics.counters();
@@ -193,7 +187,7 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> {
MetricResult<Long> metricResult = counters.iterator().next();
return metricResult.attempted();
} catch (NoSuchElementException e) {
- //TODO Ismael
+ LOG.error("Failed to get metric {}, from namespace {}", name, namespace);
}
return defaultValue;
}
@@ -209,15 +203,20 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> {
Iterable<MetricResult<DistributionResult>> distributions = metrics.distributions();
try {
MetricResult<DistributionResult> distributionResult = distributions.iterator().next();
- if (distType.equals(DistributionType.MIN)) {
- return distributionResult.attempted().min();
- } else if (distType.equals(DistributionType.MAX)) {
- return distributionResult.attempted().max();
- } else {
- //TODO Ismael
+ switch (distType)
+ {
+ case MIN:
+ return distributionResult.attempted().min();
+ case MAX:
+ return distributionResult.attempted().max();
+ default:
+ return defaultValue;
}
} catch (NoSuchElementException e) {
- //TODO Ismael
+ LOG.error(
+ "Failed to get distribution metric {} for namespace {}",
+ name,
+ namespace);
}
return defaultValue;
}
@@ -228,7 +227,9 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> {
* Return the current value for a time counter, or -1 if can't be retrieved.
*/
private long getTimestampMetric(long now, long value) {
- //TODO Ismael improve doc
+ // timestamp metrics are used to monitor time of execution of transforms.
+ // If result timestamp metric is too far from now, consider that metric is erroneous
+
if (Math.abs(value - now) > Duration.standardDays(10000).getMillis()) {
return -1;
}
@@ -437,16 +438,6 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> {
*/
private void invokeBuilderForPublishOnlyPipeline(PipelineBuilder<NexmarkOptions> builder) {
builder.build(options);
-// throw new UnsupportedOperationException(
-// "Cannot use --pubSubMode=COMBINED with DirectRunner");
- }
-
- /**
- * If monitoring, wait until the publisher pipeline has run long enough to establish
- * a backlog on the Pubsub topic. Otherwise, return immediately.
- */
- private void waitForPublisherPreload() {
- throw new UnsupportedOperationException();
}
/**
@@ -606,11 +597,7 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> {
publisherJob.waitUntilFinish(Duration.standardMinutes(5));
} catch (IOException e) {
throw new RuntimeException("Unable to cancel publisher job: ", e);
- } //TODO Ismael
-// catch (InterruptedException e) {
-// Thread.interrupted();
-// throw new RuntimeException("Interrupted: publish job still running.", e);
-// }
+ }
}
return perf;
@@ -755,7 +742,7 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> {
Event event = CoderUtils.decodeFromByteArray(Event.CODER, payload);
c.output(event);
} catch (CoderException e) {
- // TODO Log decoding Event error
+ LOG.error("Error while decoding Event from pusbSub message: serialization error");
}
}
}));
@@ -798,7 +785,8 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> {
byte[] payload = CoderUtils.encodeToByteArray(Event.CODER, c.element());
c.output(new PubsubMessage(payload, new HashMap<String, String>()));
} catch (CoderException e1) {
- // TODO Log encoding Event error
+ LOG.error("Error while sending Event {} to pusbSub: serialization error",
+ c.element().toString());
}
}
})
@@ -1130,7 +1118,8 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> {
sinkEventsToAvro(source);
}
- // Special hacks for Query 10 (big logger).
+ // Query 10 logs all events to Google Cloud storage files. It could generate a lot of logs,
+ // so, set parallelism. Also set the output path where to write log files.
if (configuration.query == 10) {
String path = null;
if (options.getOutputPath() != null && !options.getOutputPath().isEmpty()) {
@@ -1158,9 +1147,6 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> {
sink(results, now);
}
- if (publisherResult != null) {
- waitForPublisherPreload();
- }
mainResult = p.run();
mainResult.waitUntilFinish(Duration.standardSeconds(configuration.streamTimeout));
return monitor(query);
http://git-wip-us.apache.org/repos/asf/beam/blob/2f9b4948/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java
index 9afffaa..fbd3e74 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java
@@ -19,7 +19,6 @@ package org.apache.beam.integration.nexmark;
import javax.annotation.Nullable;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
-import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions;
import org.apache.beam.sdk.options.ApplicationNameOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
@@ -119,7 +118,7 @@ public interface NexmarkOptions
@Nullable
Integer getStreamTimeout();
- void setStreamTimeout(Integer preloadSeconds);
+ void setStreamTimeout(Integer streamTimeout);
@Description("Number of unbounded sources to create events.")
@Nullable
http://git-wip-us.apache.org/repos/asf/beam/blob/2f9b4948/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSuite.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSuite.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSuite.java
index be7d7b8..0d98a5d 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSuite.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSuite.java
@@ -57,7 +57,7 @@ public enum NexmarkSuite {
private static List<NexmarkConfiguration> smoke() {
List<NexmarkConfiguration> configurations = new ArrayList<>();
for (int query = 0; query <= 12; query++) {
- NexmarkConfiguration configuration = NexmarkConfiguration.DEFAULT.clone();
+ NexmarkConfiguration configuration = NexmarkConfiguration.DEFAULT.copy();
configuration.query = query;
configuration.numEvents = 100_000;
if (query == 4 || query == 6 || query == 9) {
@@ -103,7 +103,7 @@ public enum NexmarkSuite {
public Iterable<NexmarkConfiguration> getConfigurations(NexmarkOptions options) {
Set<NexmarkConfiguration> results = new LinkedHashSet<>();
for (NexmarkConfiguration configuration : configurations) {
- NexmarkConfiguration result = configuration.clone();
+ NexmarkConfiguration result = configuration.copy();
result.overrideFromOptions(options);
results.add(result);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/2f9b4948/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java
index d813833..0e1672e 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java
@@ -23,55 +23,65 @@ import java.io.OutputStream;
import java.io.Serializable;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
/**
- * An event in the auction system, either a (new) {@link Person}, a (new) {@link Auction},
- * or a {@link Bid}.
+ * An event in the auction system, either a (new) {@link Person}, a (new) {@link Auction}, or a
+ * {@link Bid}.
*/
public class Event implements KnownSize, Serializable {
- private static final Coder<Integer> INT_CODER = VarIntCoder.of();
+ private enum Tag {
+ PERSON(0),
+ AUCTION(1),
+ BID(2);
- public static final Coder<Event> CODER = new CustomCoder<Event>() {
- @Override
- public void encode(Event value, OutputStream outStream)
- throws CoderException, IOException {
- if (value.newPerson != null) {
- INT_CODER.encode(0, outStream);
- Person.CODER.encode(value.newPerson, outStream);
- } else if (value.newAuction != null) {
- INT_CODER.encode(1, outStream);
- Auction.CODER.encode(value.newAuction, outStream);
- } else if (value.bid != null) {
- INT_CODER.encode(2, outStream);
- Bid.CODER.encode(value.bid, outStream);
- } else {
- throw new RuntimeException("invalid event");
- }
- }
+ private int value = -1;
- @Override
- public Event decode(
- InputStream inStream)
- throws CoderException, IOException {
- int tag = INT_CODER.decode(inStream);
- if (tag == 0) {
- Person person = Person.CODER.decode(inStream);
- return new Event(person);
- } else if (tag == 1) {
- Auction auction = Auction.CODER.decode(inStream);
- return new Event(auction);
- } else if (tag == 2) {
- Bid bid = Bid.CODER.decode(inStream);
- return new Event(bid);
- } else {
- throw new RuntimeException("invalid event encoding");
- }
+ Tag(int value){
+ this.value = value;
}
- @Override public void verifyDeterministic() throws NonDeterministicException {}
- };
+ }
+ private static final Coder<Integer> INT_CODER = VarIntCoder.of();
+
+ public static final Coder<Event> CODER =
+ new CustomCoder<Event>() {
+ @Override
+ public void encode(Event value, OutputStream outStream) throws IOException {
+ if (value.newPerson != null) {
+ INT_CODER.encode(Tag.PERSON.value, outStream);
+ Person.CODER.encode(value.newPerson, outStream);
+ } else if (value.newAuction != null) {
+ INT_CODER.encode(Tag.AUCTION.value, outStream);
+ Auction.CODER.encode(value.newAuction, outStream);
+ } else if (value.bid != null) {
+ INT_CODER.encode(Tag.BID.value, outStream);
+ Bid.CODER.encode(value.bid, outStream);
+ } else {
+ throw new RuntimeException("invalid event");
+ }
+ }
+
+ @Override
+ public Event decode(InputStream inStream) throws IOException {
+ int tag = INT_CODER.decode(inStream);
+ if (tag == Tag.PERSON.value) {
+ Person person = Person.CODER.decode(inStream);
+ return new Event(person);
+ } else if (tag == Tag.AUCTION.value) {
+ Auction auction = Auction.CODER.decode(inStream);
+ return new Event(auction);
+ } else if (tag == Tag.BID.value) {
+ Bid bid = Bid.CODER.decode(inStream);
+ return new Event(bid);
+ } else {
+ throw new RuntimeException("invalid event encoding");
+ }
+ }
+
+ @Override
+ public void verifyDeterministic() throws NonDeterministicException {}
+ };
@Nullable
@org.apache.avro.reflect.Nullable
@@ -111,10 +121,7 @@ public class Event implements KnownSize, Serializable {
this.bid = bid;
}
- /**
- * Return a copy of event which captures {@code annotation}.
- * (Used for debugging).
- */
+ /** Return a copy of event which captures {@code annotation}. (Used for debugging). */
public Event withAnnotation(String annotation) {
if (newPerson != null) {
return new Event(newPerson.withAnnotation(annotation));
@@ -125,9 +132,7 @@ public class Event implements KnownSize, Serializable {
}
}
- /**
- * Does event have {@code annotation}? (Used for debugging.)
- */
+ /** Does event have {@code annotation}? (Used for debugging.) */
public boolean hasAnnotation(String annotation) {
if (newPerson != null) {
return newPerson.hasAnnotation(annotation);
http://git-wip-us.apache.org/repos/asf/beam/blob/2f9b4948/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0Model.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0Model.java
index 8e65591..e2522b8 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0Model.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0Model.java
@@ -45,7 +45,6 @@ public class Query0Model extends NexmarkQueryModel {
return;
}
addResult(timestampedEvent);
- //TODO test fails because offset of some hundreds of ms beween expect and actual
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/2f9b4948/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1Model.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1Model.java
index 5d4de45..f07db80 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1Model.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1Model.java
@@ -57,7 +57,6 @@ public class Query1Model extends NexmarkQueryModel implements Serializable {
TimestampedValue<Bid> result =
TimestampedValue.of(resultBid, timestampedEvent.getTimestamp());
addResult(result);
- //TODO test fails because offset of some hundreds of ms beween expect and actual
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/2f9b4948/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java
index f74b78d..f2b66d7 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java
@@ -29,13 +29,13 @@ import org.apache.beam.integration.nexmark.model.Person;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
-import org.apache.beam.sdk.state.StateSpec;
-import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.ParDo;
@@ -243,9 +243,9 @@ public class Query3 extends NexmarkQuery {
theNewPerson = newPerson;
} else {
if (theNewPerson.equals(newPerson)) {
- LOG.error("**** duplicate person {} ****", theNewPerson);
+ LOG.error("Duplicate person {}", theNewPerson);
} else {
- LOG.error("**** conflicting persons {} and {} ****", theNewPerson, newPerson);
+ LOG.error("Conflicting persons {} and {}", theNewPerson, newPerson);
}
fatalCounter.inc();
continue;
http://git-wip-us.apache.org/repos/asf/beam/blob/2f9b4948/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5.java
index 1944330..bdf3e5f 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5.java
@@ -63,56 +63,64 @@ public class Query5 extends NexmarkQuery {
// Only want the bid events.
.apply(JUST_BIDS)
// Window the bids into sliding windows.
- .apply(Window.<Bid>into(
- SlidingWindows.of(Duration.standardSeconds(configuration.windowSizeSec))
- .every(Duration.standardSeconds(configuration.windowPeriodSec))))
+ .apply(
+ Window.<Bid>into(
+ SlidingWindows.of(Duration.standardSeconds(configuration.windowSizeSec))
+ .every(Duration.standardSeconds(configuration.windowPeriodSec))))
// Project just the auction id.
.apply("BidToAuction", BID_TO_AUCTION)
// Count the number of bids per auction id.
.apply(Count.<Long>perElement())
- // We'll want to keep all auctions with the maximal number of bids.
+ // We'll want to keep all auctions with the maximal number of bids.
// Start by lifting each into a singleton list.
// need to do so because below combine returns a list of auctions in the key in case of
// equal number of bids. Combine needs to have same input type and return type.
- .apply(name + ".ToSingletons",
- ParDo.of(new DoFn<KV<Long, Long>, KV<List<Long>, Long>>() {
+ .apply(
+ name + ".ToSingletons",
+ ParDo.of(
+ new DoFn<KV<Long, Long>, KV<List<Long>, Long>>() {
@ProcessElement
public void processElement(ProcessContext c) {
- c.output(KV.of(Collections.singletonList(c.element().getKey()), c.element().getValue()));
+ c.output(
+ KV.of(
+ Collections.singletonList(c.element().getKey()),
+ c.element().getValue()));
}
}))
// Keep only the auction ids with the most bids.
.apply(
- Combine
- .globally(new Combine.BinaryCombineFn<KV<List<Long>, Long>>() {
- @Override
- public KV<List<Long>, Long> apply(
- KV<List<Long>, Long> left, KV<List<Long>, Long> right) {
- List<Long> leftBestAuctions = left.getKey();
- long leftCount = left.getValue();
- List<Long> rightBestAuctions = right.getKey();
- long rightCount = right.getValue();
- if (leftCount > rightCount) {
- return left;
- } else if (leftCount < rightCount) {
- return right;
- } else {
- List<Long> newBestAuctions = new ArrayList<>();
- newBestAuctions.addAll(leftBestAuctions);
- newBestAuctions.addAll(rightBestAuctions);
- return KV.of(newBestAuctions, leftCount);
- }
- }
- })
+ Combine.globally(
+ new Combine.BinaryCombineFn<KV<List<Long>, Long>>() {
+ @Override
+ public KV<List<Long>, Long> apply(
+ KV<List<Long>, Long> left, KV<List<Long>, Long> right) {
+ List<Long> leftBestAuctions = left.getKey();
+ long leftCount = left.getValue();
+ List<Long> rightBestAuctions = right.getKey();
+ long rightCount = right.getValue();
+ if (leftCount > rightCount) {
+ return left;
+ } else if (leftCount < rightCount) {
+ return right;
+ } else {
+ List<Long> newBestAuctions = new ArrayList<>();
+ newBestAuctions.addAll(leftBestAuctions);
+ newBestAuctions.addAll(rightBestAuctions);
+ return KV.of(newBestAuctions, leftCount);
+ }
+ }
+ })
.withoutDefaults()
.withFanout(configuration.fanout))
// Project into result.
- .apply(name + ".Select",
- ParDo.of(new DoFn<KV<List<Long>, Long>, AuctionCount>() {
+ .apply(
+ name + ".Select",
+ ParDo.of(
+ new DoFn<KV<List<Long>, Long>, AuctionCount>() {
@ProcessElement
public void processElement(ProcessContext c) {
long count = c.element().getValue();
http://git-wip-us.apache.org/repos/asf/beam/blob/2f9b4948/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7.java
index 2a94ca9..217d0d4 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7.java
@@ -63,7 +63,7 @@ public class Query7 extends NexmarkQuery {
// requires an additional scan per window, with the associated cost of snapshotted state and
// its I/O. We'll keep this implementation since it illustrates the use of side inputs.
final PCollectionView<Long> maxPriceView =
- slidingBids //
+ slidingBids
.apply("BidToPrice", BID_TO_PRICE)
.apply(Max.longsGlobally().withFanout(configuration.fanout).asSingletonView());
http://git-wip-us.apache.org/repos/asf/beam/blob/2f9b4948/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7Model.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7Model.java
index 5c039f9..0ada5e8 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7Model.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7Model.java
@@ -111,7 +111,6 @@ public class Query7Model extends NexmarkQueryModel implements Serializable {
}
// Keep only the highest bids.
captureBid(event.bid);
- //TODO test fails because offset of some hundreds of ms between expect and actual
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/2f9b4948/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBids.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBids.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBids.java
index bd6c2ed..d4ca177 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBids.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBids.java
@@ -23,12 +23,12 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.TreeMap;
import org.apache.beam.integration.nexmark.NexmarkConfiguration;
import org.apache.beam.integration.nexmark.NexmarkUtils;
@@ -139,6 +139,24 @@ public class WinningBids extends PTransform<PCollection<Event>, PCollection<Auct
return String.format("AuctionOrBidWindow{start:%s; end:%s; auction:%d; isAuctionWindow:%s}",
start(), end(), auction, isAuctionWindow);
}
+
+ @Override public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ AuctionOrBidWindow that = (AuctionOrBidWindow) o;
+ return (isAuctionWindow == that.isAuctionWindow) && (auction == that.auction);
+ }
+
+ @Override public int hashCode() {
+ return Objects.hash(isAuctionWindow, auction);
+ }
}
/**
@@ -374,4 +392,21 @@ public class WinningBids extends PTransform<PCollection<Event>, PCollection<Auct
}
));
}
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(auctionOrBidWindowFn);
+ }
+
+ @Override public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ WinningBids that = (WinningBids) o;
+ return auctionOrBidWindowFn.equals(that.auctionOrBidWindowFn);
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/2f9b4948/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBidsSimulator.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBidsSimulator.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBidsSimulator.java
index 7d74f8f..9624a9d 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBidsSimulator.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBidsSimulator.java
@@ -181,7 +181,6 @@ public class WinningBidsSimulator extends AbstractSimulator<Event, AuctionBid> {
return;
}
addResult(result);
- //TODO test fails because offset of some hundreds of ms beween expect and actual
return;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/2f9b4948/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/Generator.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/Generator.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/Generator.java
index 4f548cd..f6deceb 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/Generator.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/Generator.java
@@ -26,6 +26,7 @@ import java.io.Serializable;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
+import java.util.Objects;
import java.util.Random;
import org.apache.beam.integration.nexmark.model.Auction;
import org.apache.beam.integration.nexmark.model.Bid;
@@ -167,7 +168,7 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl
}
/**
- * Return a deep clone of next event with delay added to wallclock timestamp and
+ * Return a deep copy of next event with delay added to wallclock timestamp and
* event annotate as 'LATE'.
*/
public NextEvent withDelay(long delayMs) {
@@ -175,6 +176,26 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl
wallclockTimestamp + delayMs, eventTimestamp, event.withAnnotation("LATE"), watermark);
}
+ @Override public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ NextEvent nextEvent = (NextEvent) o;
+
+ return (wallclockTimestamp == nextEvent.wallclockTimestamp
+ && eventTimestamp == nextEvent.eventTimestamp
+ && watermark == nextEvent.watermark
+ && event.equals(nextEvent.event));
+ }
+
+ @Override public int hashCode() {
+ return Objects.hash(wallclockTimestamp, eventTimestamp, watermark, event);
+ }
+
@Override
public int compareTo(NextEvent other) {
int i = Long.compare(wallclockTimestamp, other.wallclockTimestamp);
@@ -221,11 +242,12 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl
}
/**
- * Return a deep clone of this generator.
+ * Return a deep copy of this generator.
*/
- @Override
- public Generator clone() {
- return new Generator(config.clone(), numEvents, wallclockBaseTime);
+ public Generator copy() {
+ checkNotNull(config);
+ Generator result = new Generator(config, numEvents, wallclockBaseTime);
+ return result;
}
/**
@@ -243,9 +265,9 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl
*/
public GeneratorConfig splitAtEventId(long eventId) {
long newMaxEvents = eventId - (config.firstEventId + config.firstEventNumber);
- GeneratorConfig remainConfig = config.cloneWith(config.firstEventId,
+ GeneratorConfig remainConfig = config.copyWith(config.firstEventId,
config.maxEvents - newMaxEvents, config.firstEventNumber + newMaxEvents);
- config = config.cloneWith(config.firstEventId, newMaxEvents, config.firstEventNumber);
+ config = config.copyWith(config.firstEventId, newMaxEvents, config.firstEventNumber);
return remainConfig;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/2f9b4948/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/GeneratorConfig.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/GeneratorConfig.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/GeneratorConfig.java
index 5799bb2..95c276b 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/GeneratorConfig.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/GeneratorConfig.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.integration.nexmark.sources;
+import static com.google.common.base.Preconditions.checkNotNull;
+
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
@@ -29,6 +31,7 @@ import org.apache.beam.sdk.values.KV;
* Parameters controlling how {@link Generator} synthesizes {@link Event} elements.
*/
public class GeneratorConfig implements Serializable {
+
/**
* We start the ids at specific values to help ensure the queries find a match even on
* small synthesized dataset sizes.
@@ -132,18 +135,13 @@ public class GeneratorConfig implements Serializable {
}
/**
- * Return a clone of this config.
- */
- @Override
- public GeneratorConfig clone() {
- return new GeneratorConfig(configuration, baseTime, firstEventId, maxEvents, firstEventNumber);
- }
-
- /**
- * Return clone of this config except with given parameters.
+ * Return a copy of this config.
*/
- public GeneratorConfig cloneWith(long firstEventId, long maxEvents, long firstEventNumber) {
- return new GeneratorConfig(configuration, baseTime, firstEventId, maxEvents, firstEventNumber);
+ public GeneratorConfig copy() {
+ GeneratorConfig result;
+ result = new GeneratorConfig(configuration, baseTime, firstEventId,
+ maxEvents, firstEventNumber);
+ return result;
}
/**
@@ -164,7 +162,7 @@ public class GeneratorConfig implements Serializable {
// Don't lose any events to round-down.
subMaxEvents = maxEvents - subMaxEvents * (n - 1);
}
- results.add(cloneWith(subFirstEventId, subMaxEvents, firstEventNumber));
+ results.add(copyWith(subFirstEventId, subMaxEvents, firstEventNumber));
subFirstEventId += subMaxEvents;
}
}
@@ -172,6 +170,13 @@ public class GeneratorConfig implements Serializable {
}
/**
+ * Return copy of this config except with given parameters.
+ */
+ public GeneratorConfig copyWith(long firstEventId, long maxEvents, long firstEventNumber) {
+ return new GeneratorConfig(configuration, baseTime, firstEventId, maxEvents, firstEventNumber);
+ }
+
+ /**
* Return an estimate of the bytes needed by {@code numEvents}.
*/
public long estimatedBytesForEvents(long numEvents) {
http://git-wip-us.apache.org/repos/asf/beam/blob/2f9b4948/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/queries/QueryTest.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/queries/QueryTest.java b/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/queries/QueryTest.java
index b005d65..64a8e4f 100644
--- a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/queries/QueryTest.java
+++ b/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/queries/QueryTest.java
@@ -37,7 +37,7 @@ import org.junit.runners.JUnit4;
/** Test the various NEXMark queries yield results coherent with their models. */
@RunWith(JUnit4.class)
public class QueryTest {
- private static final NexmarkConfiguration CONFIG = NexmarkConfiguration.DEFAULT.clone();
+ private static final NexmarkConfiguration CONFIG = NexmarkConfiguration.DEFAULT.copy();
static {
// careful, results of tests are linked to numEventGenerators because of timestamp generation
@@ -55,12 +55,8 @@ public class QueryTest {
if (streamingMode) {
results =
p.apply(name + ".ReadUnBounded", NexmarkUtils.streamEventsSource(CONFIG)).apply(query);
- //TODO Ismael this should not be called explicitly
- results.setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED);
} else {
results = p.apply(name + ".ReadBounded", NexmarkUtils.batchEventsSource(CONFIG)).apply(query);
- //TODO Ismael this should not be called explicitly
- results.setIsBoundedInternal(PCollection.IsBounded.BOUNDED);
}
PAssert.that(results).satisfies(model.assertionFor());
PipelineResult result = p.run();
http://git-wip-us.apache.org/repos/asf/beam/blob/2f9b4948/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSourceTest.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSourceTest.java b/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSourceTest.java
index 1d04e2a..1ecc33e 100644
--- a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSourceTest.java
+++ b/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSourceTest.java
@@ -28,7 +28,6 @@ import java.util.Set;
import org.apache.beam.integration.nexmark.NexmarkConfiguration;
import org.apache.beam.integration.nexmark.model.Event;
-import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -95,12 +94,11 @@ public class UnboundedEventSourceTest {
while (n > 0) {
int m = Math.min(459 + random.nextInt(455), n);
- System.out.printf("reading %d...\n", m);
+ System.out.printf("reading %d...%n", m);
checker.add(m, reader, modelGenerator);
n -= m;
- System.out.printf("splitting with %d remaining...\n", n);
+ System.out.printf("splitting with %d remaining...%n", n);
CheckpointMark checkpointMark = reader.getCheckpointMark();
- assertTrue(checkpointMark instanceof Generator.Checkpoint);
reader = source.createReader(options, (Generator.Checkpoint) checkpointMark);
}