You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/23 17:09:39 UTC
[31/55] [abbrv] beam git commit: Fix static analysis issues
Fix static analysis issues
Restrict access level on classes + other static analysis fixes
Fix findbugs issues (issue #33)
Fix compile after AvroIO, TextIO, PubsubIO and State refactor
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1541fad0
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1541fad0
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1541fad0
Branch: refs/heads/master
Commit: 1541fad077e47df1d47636fd186a72aa827bbc42
Parents: a39cb80
Author: Ismaël Mejía <ie...@apache.org>
Authored: Mon May 1 00:54:08 2017 +0200
Committer: Ismaël Mejía <ie...@gmail.com>
Committed: Wed Aug 23 19:07:28 2017 +0200
----------------------------------------------------------------------
integration/java/nexmark/pom.xml | 2 +-
.../beam/integration/nexmark/Monitor.java | 4 +-
.../beam/integration/nexmark/NexmarkDriver.java | 12 +-
.../beam/integration/nexmark/NexmarkRunner.java | 124 +++++++++++--------
.../beam/integration/nexmark/NexmarkUtils.java | 34 +++--
.../beam/integration/nexmark/model/Auction.java | 8 +-
.../integration/nexmark/model/AuctionCount.java | 6 +-
.../integration/nexmark/model/AuctionPrice.java | 4 +-
.../nexmark/model/BidsPerSession.java | 4 +-
.../beam/integration/nexmark/model/Done.java | 2 +-
.../beam/integration/nexmark/model/Event.java | 13 --
.../nexmark/model/IdNameReserve.java | 6 +-
.../nexmark/model/NameCityStateId.java | 8 +-
.../beam/integration/nexmark/model/Person.java | 6 +-
.../integration/nexmark/model/SellerPrice.java | 2 +-
.../nexmark/queries/AbstractSimulator.java | 10 +-
.../nexmark/queries/NexmarkQuery.java | 34 ++---
.../nexmark/queries/NexmarkQueryModel.java | 17 +--
.../nexmark/queries/Query0Model.java | 2 +-
.../integration/nexmark/queries/Query10.java | 6 +-
.../integration/nexmark/queries/Query11.java | 3 +-
.../nexmark/queries/Query1Model.java | 2 +-
.../integration/nexmark/queries/Query3.java | 24 ++--
.../nexmark/queries/Query3Model.java | 2 +-
.../nexmark/queries/Query4Model.java | 5 +-
.../integration/nexmark/queries/Query5.java | 4 +-
.../integration/nexmark/queries/Query6.java | 4 +-
.../nexmark/queries/Query6Model.java | 5 +-
.../nexmark/queries/WinningBids.java | 30 +++--
.../integration/nexmark/sources/Generator.java | 11 +-
.../nexmark/sources/GeneratorConfig.java | 26 ++--
.../nexmark/sources/UnboundedEventSource.java | 2 +-
.../sources/UnboundedEventSourceTest.java | 5 +-
integration/pom.xml | 14 +++
34 files changed, 221 insertions(+), 220 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/pom.xml
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/pom.xml b/integration/java/nexmark/pom.xml
index fb213e9..8a65c0f 100644
--- a/integration/java/nexmark/pom.xml
+++ b/integration/java/nexmark/pom.xml
@@ -210,7 +210,7 @@
<dependency>
<groupId>org.apache.beam</groupId>
- <artifactId>beam-sdks-java-extensions-gcp-core</artifactId>
+ <artifactId>beam-sdks-java-extensions-google-cloud-platform-core</artifactId>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Monitor.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Monitor.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Monitor.java
index cb4d71c..2f0c56a 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Monitor.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Monitor.java
@@ -63,8 +63,8 @@ public class Monitor<T extends KnownSize> implements Serializable {
public final String name;
public final String prefix;
- final MonitorDoFn doFn;
- final PTransform<PCollection<? extends T>, PCollection<T>> transform;
+ private final MonitorDoFn doFn;
+ private final PTransform<PCollection<? extends T>, PCollection<T>> transform;
public Monitor(String name, String prefix) {
this.name = name;
http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDriver.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDriver.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDriver.java
index 7d532cc..a982a8d 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDriver.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDriver.java
@@ -57,7 +57,7 @@ public class NexmarkDriver<OptionT extends NexmarkOptions> {
/**
* Entry point.
*/
- public void runAll(OptionT options, NexmarkRunner runner) {
+ void runAll(OptionT options, NexmarkRunner runner) {
Instant start = Instant.now();
Map<NexmarkConfiguration, NexmarkPerf> baseline = loadBaseline(options.getBaselineFilename());
Map<NexmarkConfiguration, NexmarkPerf> actual = new LinkedHashMap<>();
@@ -87,7 +87,7 @@ public class NexmarkDriver<OptionT extends NexmarkOptions> {
}
if (!successful) {
- System.exit(1);
+ throw new RuntimeException("Execution was not successful");
}
}
@@ -149,8 +149,6 @@ public class NexmarkDriver<OptionT extends NexmarkOptions> {
/**
* Print summary of {@code actual} vs (if non-null) {@code baseline}.
- *
- * @throws IOException
*/
private static void saveSummary(
@Nullable String summaryFilename,
@@ -227,7 +225,7 @@ public class NexmarkDriver<OptionT extends NexmarkOptions> {
if (actualPerf != null) {
List<String> errors = actualPerf.errors;
if (errors == null) {
- errors = new ArrayList<String>();
+ errors = new ArrayList<>();
errors.add("NexmarkGoogleRunner returned null errors list");
}
for (String error : errors) {
@@ -300,7 +298,7 @@ public class NexmarkDriver<OptionT extends NexmarkOptions> {
NexmarkOptions options = PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(NexmarkOptions.class);
- NexmarkRunner runner = new NexmarkRunner(options);
- new NexmarkDriver().runAll(options, runner);
+ NexmarkRunner<NexmarkOptions> runner = new NexmarkRunner<>(options);
+ new NexmarkDriver<>().runAll(options, runner);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
index a3c4d33..6df76f0 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
@@ -29,6 +29,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ThreadLocalRandom;
@@ -65,10 +66,12 @@ import org.apache.beam.integration.nexmark.queries.Query9;
import org.apache.beam.integration.nexmark.queries.Query9Model;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.metrics.DistributionResult;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricQueryResults;
@@ -77,6 +80,7 @@ import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TimestampedValue;
@@ -91,15 +95,15 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> {
/**
* Minimum number of samples needed for 'steady-state' rate calculation.
*/
- protected static final int MIN_SAMPLES = 9;
+ private static final int MIN_SAMPLES = 9;
/**
* Minimum length of time over which to consider samples for 'steady-state' rate calculation.
*/
- protected static final Duration MIN_WINDOW = Duration.standardMinutes(2);
+ private static final Duration MIN_WINDOW = Duration.standardMinutes(2);
/**
* Delay between perf samples.
*/
- protected static final Duration PERF_DELAY = Duration.standardSeconds(15);
+ private static final Duration PERF_DELAY = Duration.standardSeconds(15);
/**
* How long to let streaming pipeline run after all events have been generated and we've
* seen no activity.
@@ -117,37 +121,37 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> {
/**
* NexmarkOptions shared by all runs.
*/
- protected final OptionT options;
+ private final OptionT options;
/**
* Which configuration we are running.
*/
@Nullable
- protected NexmarkConfiguration configuration;
+ private NexmarkConfiguration configuration;
/**
* If in --pubsubMode=COMBINED, the event monitor for the publisher pipeline. Otherwise null.
*/
@Nullable
- protected Monitor<Event> publisherMonitor;
+ private Monitor<Event> publisherMonitor;
/**
* If in --pubsubMode=COMBINED, the pipeline result for the publisher pipeline. Otherwise null.
*/
@Nullable
- protected PipelineResult publisherResult;
+ private PipelineResult publisherResult;
/**
* Result for the main pipeline.
*/
@Nullable
- protected PipelineResult mainResult;
+ private PipelineResult mainResult;
/**
* Query name we are running.
*/
@Nullable
- protected String queryName;
+ private String queryName;
public NexmarkRunner(OptionT options) {
this.options = options;
@@ -160,7 +164,7 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> {
/**
* Is this query running in streaming mode?
*/
- protected boolean isStreaming() {
+ private boolean isStreaming() {
return options.isStreaming();
}
@@ -174,7 +178,7 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> {
/**
* Return maximum number of workers.
*/
- protected int maxNumWorkers() {
+ private int maxNumWorkers() {
return 5;
}
@@ -182,7 +186,7 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> {
* Return the current value for a long counter, or a default value if can't be retrieved.
* Note this uses only attempted metrics because some runners don't support committed metrics.
*/
- protected long getCounterMetric(PipelineResult result, String namespace, String name,
+ private long getCounterMetric(PipelineResult result, String namespace, String name,
long defaultValue) {
//TODO Ismael calc this only once
MetricQueryResults metrics = result.metrics().queryMetrics(
@@ -201,7 +205,7 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> {
* Return the current value for a long counter, or a default value if can't be retrieved.
* Note this uses only attempted metrics because some runners don't support committed metrics.
*/
- protected long getDistributionMetric(PipelineResult result, String namespace, String name,
+ private long getDistributionMetric(PipelineResult result, String namespace, String name,
DistributionType distType, long defaultValue) {
MetricQueryResults metrics = result.metrics().queryMetrics(
MetricsFilter.builder().addNameFilter(MetricNameFilter.named(namespace, name)).build());
@@ -226,7 +230,7 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> {
/**
* Return the current value for a time counter, or -1 if can't be retrieved.
*/
- protected long getTimestampMetric(long now, long value) {
+ private long getTimestampMetric(long now, long value) {
//TODO Ismael improve doc
if (Math.abs(value - now) > Duration.standardDays(10000).getMillis()) {
return -1;
@@ -238,8 +242,7 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> {
* Find a 'steady state' events/sec from {@code snapshots} and
* store it in {@code perf} if found.
*/
- protected void captureSteadyState(NexmarkPerf perf,
- List<NexmarkPerf.ProgressSnapshot> snapshots) {
+ private void captureSteadyState(NexmarkPerf perf, List<NexmarkPerf.ProgressSnapshot> snapshots) {
if (!options.isStreaming()) {
return;
}
@@ -426,7 +429,7 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> {
return perf;
}
- protected String getJobId(PipelineResult job) {
+ private String getJobId(PipelineResult job) {
return "";
}
@@ -528,15 +531,14 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> {
/**
* Build and run a pipeline using specified options.
*/
- protected interface PipelineBuilder<OptionT extends NexmarkOptions> {
+ interface PipelineBuilder<OptionT extends NexmarkOptions> {
void build(OptionT publishOnlyOptions);
}
/**
* Invoke the builder with options suitable for running a publish-only child pipeline.
*/
- protected void invokeBuilderForPublishOnlyPipeline(
- PipelineBuilder builder) {
+ private void invokeBuilderForPublishOnlyPipeline(PipelineBuilder<NexmarkOptions> builder) {
builder.build(options);
// throw new UnsupportedOperationException(
// "Cannot use --pubSubMode=COMBINED with DirectRunner");
@@ -546,7 +548,7 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> {
* If monitoring, wait until the publisher pipeline has run long enough to establish
* a backlog on the Pubsub topic. Otherwise, return immediately.
*/
- protected void waitForPublisherPreload() {
+ private void waitForPublisherPreload() {
throw new UnsupportedOperationException();
}
@@ -555,7 +557,7 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> {
* it was measured.
*/
@Nullable
- protected NexmarkPerf monitor(NexmarkQuery query) {
+ private NexmarkPerf monitor(NexmarkQuery query) {
if (!options.getMonitorJobs()) {
return null;
}
@@ -841,14 +843,28 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> {
private PCollection<Event> sourceEventsFromPubsub(Pipeline p, long now) {
String shortSubscription = shortSubscription(now);
NexmarkUtils.console("Reading events from Pubsub %s", shortSubscription);
- PubsubIO.Read<Event> io =
- PubsubIO.<Event>read().fromSubscription(shortSubscription)
- .withIdAttribute(NexmarkUtils.PUBSUB_ID)
- .withCoder(Event.CODER);
+
+ PubsubIO.Read<PubsubMessage> io =
+ PubsubIO.readPubsubMessagesWithAttributes().fromSubscription(shortSubscription)
+ .withIdAttribute(NexmarkUtils.PUBSUB_ID);
if (!configuration.usePubsubPublishTime) {
io = io.withTimestampAttribute(NexmarkUtils.PUBSUB_TIMESTAMP);
}
- return p.apply(queryName + ".ReadPubsubEvents", io);
+
+ return p
+ .apply(queryName + ".ReadPubsubEvents", io)
+ .apply(queryName + ".PubsubMessageToEvent", ParDo.of(new DoFn<PubsubMessage, Event>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ byte[] payload = c.element().getPayload();
+ try {
+ Event event = CoderUtils.decodeFromByteArray(Event.CODER, payload);
+ c.output(event);
+ } catch (CoderException e) {
+ // TODO Log decoding Event error
+ }
+ }
+ }));
}
/**
@@ -861,9 +877,8 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> {
}
NexmarkUtils.console("Reading events from Avro files at %s", filename);
return p
- .apply(queryName + ".ReadAvroEvents", AvroIO.Read
- .from(filename + "*.avro")
- .withSchema(Event.class))
+ .apply(queryName + ".ReadAvroEvents", AvroIO.read(Event.class)
+ .from(filename + "*.avro"))
.apply("OutputWithTimestamp", NexmarkQuery.EVENT_TIMESTAMP_FROM_DATA);
}
@@ -873,14 +888,28 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> {
private void sinkEventsToPubsub(PCollection<Event> events, long now) {
String shortTopic = shortTopic(now);
NexmarkUtils.console("Writing events to Pubsub %s", shortTopic);
- PubsubIO.Write<Event> io =
- PubsubIO.<Event>write().to(shortTopic)
- .withIdAttribute(NexmarkUtils.PUBSUB_ID)
- .withCoder(Event.CODER);
+
+ PubsubIO.Write<PubsubMessage> io =
+ PubsubIO.writePubsubMessages().to(shortTopic)
+ .withIdAttribute(NexmarkUtils.PUBSUB_ID);
if (!configuration.usePubsubPublishTime) {
io = io.withTimestampAttribute(NexmarkUtils.PUBSUB_TIMESTAMP);
}
- events.apply(queryName + ".WritePubsubEvents", io);
+
+ events.apply(queryName + ".EventToPubsubMessage",
+ ParDo.of(new DoFn<Event, PubsubMessage>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ try {
+ byte[] payload = CoderUtils.encodeToByteArray(Event.CODER, c.element());
+ c.output(new PubsubMessage(payload, new HashMap<String, String>()));
+ } catch (CoderException e1) {
+ // TODO Log encoding Event error
+ }
+ }
+ })
+ )
+ .apply(queryName + ".WritePubsubEvents", io);
}
/**
@@ -890,7 +919,7 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> {
String shortTopic = shortTopic(now);
NexmarkUtils.console("Writing results to Pubsub %s", shortTopic);
PubsubIO.Write<String> io =
- PubsubIO.<String>write().to(shortTopic)
+ PubsubIO.writeStrings().to(shortTopic)
.withIdAttribute(NexmarkUtils.PUBSUB_ID);
if (!configuration.usePubsubPublishTime) {
io = io.withTimestampAttribute(NexmarkUtils.PUBSUB_TIMESTAMP);
@@ -917,18 +946,16 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> {
}
NexmarkUtils.console("Writing events to Avro files at %s", filename);
source.apply(queryName + ".WriteAvroEvents",
- AvroIO.Write.to(filename + "/event").withSuffix(".avro").withSchema(Event.class));
+ AvroIO.write(Event.class).to(filename + "/event").withSuffix(".avro"));
source.apply(NexmarkQuery.JUST_BIDS)
.apply(queryName + ".WriteAvroBids",
- AvroIO.Write.to(filename + "/bid").withSuffix(".avro").withSchema(Bid.class));
+ AvroIO.write(Bid.class).to(filename + "/bid").withSuffix(".avro"));
source.apply(NexmarkQuery.JUST_NEW_AUCTIONS)
.apply(queryName + ".WriteAvroAuctions",
- AvroIO.Write.to(filename + "/auction").withSuffix(".avro")
- .withSchema(Auction.class));
+ AvroIO.write(Auction.class).to(filename + "/auction").withSuffix(".avro"));
source.apply(NexmarkQuery.JUST_NEW_PERSONS)
.apply(queryName + ".WriteAvroPeople",
- AvroIO.Write.to(filename + "/person").withSuffix(".avro")
- .withSchema(Person.class));
+ AvroIO.write(Person.class).to(filename + "/person").withSuffix(".avro"));
}
/**
@@ -938,7 +965,7 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> {
String filename = textFilename(now);
NexmarkUtils.console("Writing results to text files at %s", filename);
formattedResults.apply(queryName + ".WriteTextResults",
- TextIO.Write.to(filename));
+ TextIO.write().to(filename));
}
private static class StringToTableRow extends DoFn<String, TableRow> {
@@ -1010,12 +1037,12 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> {
// Send synthesized events to Pubsub in separate publisher job.
// We won't start the main pipeline until the publisher has sent the pre-load events.
// We'll shutdown the publisher job when we notice the main job has finished.
- invokeBuilderForPublishOnlyPipeline(new PipelineBuilder() {
+ invokeBuilderForPublishOnlyPipeline(new PipelineBuilder<NexmarkOptions>() {
@Override
public void build(NexmarkOptions publishOnlyOptions) {
Pipeline sp = Pipeline.create(options);
NexmarkUtils.setupPipeline(configuration.coderStrategy, sp);
- publisherMonitor = new Monitor<Event>(queryName, "publisher");
+ publisherMonitor = new Monitor<>(queryName, "publisher");
sinkEventsToPubsub(
sourceEventsFromSynthetic(sp)
.apply(queryName + ".Monitor", publisherMonitor.getTransform()),
@@ -1140,9 +1167,6 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> {
checkState(queryName == null);
configuration = runConfiguration;
- // GCS URI patterns to delete on exit.
- List<String> pathsToDelete = new ArrayList<>();
-
try {
NexmarkUtils.console("Running %s", configuration.toShortString());
@@ -1220,9 +1244,6 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> {
}
((Query10) query).setOutputPath(path);
((Query10) query).setMaxNumWorkers(maxNumWorkers());
- if (path != null && options.getManageResources()) {
- pathsToDelete.add(path + "/**");
- }
}
// Apply query.
@@ -1252,7 +1273,6 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> {
} finally {
configuration = null;
queryName = null;
- // TODO: Cleanup pathsToDelete
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java
index 18589c4..f6215e9 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java
@@ -55,6 +55,9 @@ import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
@@ -63,9 +66,6 @@ import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.util.state.StateSpec;
-import org.apache.beam.sdk.util.state.StateSpecs;
-import org.apache.beam.sdk.util.state.ValueState;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
@@ -178,7 +178,7 @@ public class NexmarkUtils {
/** Names are suffixed with the query being run. */
QUERY,
/** Names are suffixed with the query being run and a random number. */
- QUERY_AND_SALT;
+ QUERY_AND_SALT
}
/**
@@ -310,7 +310,7 @@ public class NexmarkUtils {
* Log message to console. For client side only.
*/
public static void console(String format, Object... args) {
- System.out.printf("%s %s\n", Instant.now(), String.format(format, args));
+ System.out.printf("%s %s%n", Instant.now(), String.format(format, args));
}
/**
@@ -326,7 +326,7 @@ public class NexmarkUtils {
/**
* All events will be given a timestamp relative to this time (ms since epoch).
*/
- public static final long BASE_TIME = Instant.parse("2015-07-15T00:00:00.000Z").getMillis();
+ private static final long BASE_TIME = Instant.parse("2015-07-15T00:00:00.000Z").getMillis();
/**
* Instants guaranteed to be strictly before and after all event timestamps, and which won't
@@ -377,7 +377,7 @@ public class NexmarkUtils {
/**
* Return a generator config to match the given {@code options}.
*/
- public static GeneratorConfig standardGeneratorConfig(NexmarkConfiguration configuration) {
+ private static GeneratorConfig standardGeneratorConfig(NexmarkConfiguration configuration) {
return new GeneratorConfig(configuration,
configuration.useWallclockEventTime ? System.currentTimeMillis()
: BASE_TIME, 0,
@@ -558,15 +558,14 @@ public class NexmarkUtils {
}
p++;
}
- long next = System.currentTimeMillis();
- now = next;
+ now = System.currentTimeMillis();
}
c.output(c.element());
}
});
}
- private static final StateSpec<Object, ValueState<byte[]>> DUMMY_TAG =
+ private static final StateSpec<ValueState<byte[]>> DUMMY_TAG =
StateSpecs.value(ByteArrayCoder.of());
private static final int MAX_BUFFER_SIZE = 1 << 24;
@@ -578,20 +577,19 @@ public class NexmarkUtils {
@ProcessElement
public void processElement(ProcessContext c) {
long remain = bytes;
- long start = System.currentTimeMillis();
- long now = start;
+// long now = System.currentTimeMillis();
while (remain > 0) {
+ //TODO Ismael google on state
long thisBytes = Math.min(remain, MAX_BUFFER_SIZE);
remain -= thisBytes;
- byte[] arr = new byte[(int) thisBytes];
- for (int i = 0; i < thisBytes; i++) {
- arr[i] = (byte) now;
- }
- //TODO Ismael google on state
+// byte[] arr = new byte[(int) thisBytes];
+// for (int i = 0; i < thisBytes; i++) {
+// arr[i] = (byte) now;
+// }
// ValueState<byte[]> state = c.windowingInternals().stateInternals().state(
// StateNamespaces.global(), DUMMY_TAG);
// state.write(arr);
- now = System.currentTimeMillis();
+// now = System.currentTimeMillis();
}
c.output(c.element());
}
http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Auction.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Auction.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Auction.java
index 4b1a848..5c018dc 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Auction.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Auction.java
@@ -81,14 +81,14 @@ public class Auction implements KnownSize, Serializable {
/** Extra auction properties. */
@JsonProperty
- public final String itemName;
+ private final String itemName;
@JsonProperty
- public final String description;
+ private final String description;
/** Initial bid price, in cents. */
@JsonProperty
- public final long initialBid;
+ private final long initialBid;
/** Reserve price, in cents. */
@JsonProperty
@@ -111,7 +111,7 @@ public class Auction implements KnownSize, Serializable {
/** Additional arbitrary payload for performance testing. */
@JsonProperty
- public final String extra;
+ private final String extra;
// For Avro only.
http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionCount.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionCount.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionCount.java
index e6d3450..c83a455 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionCount.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionCount.java
@@ -54,11 +54,9 @@ public class AuctionCount implements KnownSize, Serializable {
}
};
- @JsonProperty
- public final long auction;
+ @JsonProperty private final long auction;
- @JsonProperty
- public final long count;
+ @JsonProperty private final long count;
// For Avro only.
@SuppressWarnings("unused")
http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionPrice.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionPrice.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionPrice.java
index cb971e2..43d0b27 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionPrice.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionPrice.java
@@ -55,11 +55,11 @@ public class AuctionPrice implements KnownSize, Serializable {
};
@JsonProperty
- public final long auction;
+ private final long auction;
/** Price in cents. */
@JsonProperty
- public final long price;
+ private final long price;
// For Avro only.
@SuppressWarnings("unused")
http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/BidsPerSession.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/BidsPerSession.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/BidsPerSession.java
index 26b6a41..6dddf34 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/BidsPerSession.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/BidsPerSession.java
@@ -56,10 +56,10 @@ public class BidsPerSession implements KnownSize, Serializable {
};
@JsonProperty
- public final long personId;
+ private final long personId;
@JsonProperty
- public final long bidsPerSession;
+ private final long bidsPerSession;
public BidsPerSession() {
personId = 0;
http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Done.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Done.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Done.java
index 42999cd..0c14e8f 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Done.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Done.java
@@ -54,7 +54,7 @@ public class Done implements KnownSize, Serializable {
};
@JsonProperty
- public final String message;
+ private final String message;
// For Avro only.
@SuppressWarnings("unused")
http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java
index e2130c9..1f1f096 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java
@@ -138,19 +138,6 @@ public class Event implements KnownSize, Serializable {
}
}
- /**
- * Remove {@code annotation} from event. (Used for debugging.)
- */
- public Event withoutAnnotation(String annotation) {
- if (newPerson != null) {
- return new Event(newPerson.withoutAnnotation(annotation));
- } else if (newAuction != null) {
- return new Event(newAuction.withoutAnnotation(annotation));
- } else {
- return new Event(bid.withoutAnnotation(annotation));
- }
- }
-
@Override
public long sizeInBytes() {
if (newPerson != null) {
http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/IdNameReserve.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/IdNameReserve.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/IdNameReserve.java
index cf1e571..17b8c4a 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/IdNameReserve.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/IdNameReserve.java
@@ -60,14 +60,14 @@ public class IdNameReserve implements KnownSize, Serializable {
};
@JsonProperty
- public final long id;
+ private final long id;
@JsonProperty
- public final String name;
+ private final String name;
/** Reserve price in cents. */
@JsonProperty
- public final long reserve;
+ private final long reserve;
// For Avro only.
@SuppressWarnings("unused")
http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/NameCityStateId.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/NameCityStateId.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/NameCityStateId.java
index 86d1738..28f25cd 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/NameCityStateId.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/NameCityStateId.java
@@ -62,16 +62,16 @@ public class NameCityStateId implements KnownSize, Serializable {
};
@JsonProperty
- public final String name;
+ private final String name;
@JsonProperty
- public final String city;
+ private final String city;
@JsonProperty
- public final String state;
+ private final String state;
@JsonProperty
- public final long id;
+ private final long id;
// For Avro only.
@SuppressWarnings("unused")
http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Person.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Person.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Person.java
index 906df94..c690fd4 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Person.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Person.java
@@ -77,10 +77,10 @@ public class Person implements KnownSize, Serializable {
public final String name;
@JsonProperty
- public final String emailAddress;
+ private final String emailAddress;
@JsonProperty
- public final String creditCard;
+ private final String creditCard;
@JsonProperty
public final String city;
@@ -93,7 +93,7 @@ public class Person implements KnownSize, Serializable {
/** Additional arbitrary payload for performance testing. */
@JsonProperty
- public final String extra;
+ private final String extra;
// For Avro only.
@SuppressWarnings("unused")
http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/SellerPrice.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/SellerPrice.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/SellerPrice.java
index 68f2697..52ff540 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/SellerPrice.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/SellerPrice.java
@@ -60,7 +60,7 @@ public class SellerPrice implements KnownSize, Serializable {
/** Price in cents. */
@JsonProperty
- public final long price;
+ private final long price;
// For Avro only.
@SuppressWarnings("unused")
http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/AbstractSimulator.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/AbstractSimulator.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/AbstractSimulator.java
index 270b5c3..1395182 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/AbstractSimulator.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/AbstractSimulator.java
@@ -37,7 +37,7 @@ import org.joda.time.Instant;
*/
public abstract class AbstractSimulator<InputT, OutputT> {
/** Window size for action bucket sampling. */
- public static final Duration WINDOW_SIZE = Duration.standardMinutes(1);
+ private static final Duration WINDOW_SIZE = Duration.standardMinutes(1);
/** Input event stream we should draw from. */
private final Iterator<TimestampedValue<InputT>> input;
@@ -77,7 +77,7 @@ public abstract class AbstractSimulator<InputT, OutputT> {
/** Called by implementors of {@link #run}: Fetch the next input element. */
@Nullable
- protected TimestampedValue<InputT> nextInput() {
+ TimestampedValue<InputT> nextInput() {
if (!input.hasNext()) {
return null;
}
@@ -90,7 +90,7 @@ public abstract class AbstractSimulator<InputT, OutputT> {
* Called by implementors of {@link #run}: Capture an intermediate result, for the purpose of
* recording the expected activity of the query over time.
*/
- protected void addIntermediateResult(TimestampedValue<OutputT> result) {
+ void addIntermediateResult(TimestampedValue<OutputT> result) {
NexmarkUtils.info("intermediate result: %s", result);
updateCounts(result.getTimestamp());
}
@@ -99,7 +99,7 @@ public abstract class AbstractSimulator<InputT, OutputT> {
* Called by implementors of {@link #run}: Capture a final result, for the purpose of checking
* semantic correctness.
*/
- protected void addResult(TimestampedValue<OutputT> result) {
+ void addResult(TimestampedValue<OutputT> result) {
NexmarkUtils.info("result: %s", result);
pendingResults.add(result);
updateCounts(result.getTimestamp());
@@ -121,7 +121,7 @@ public abstract class AbstractSimulator<InputT, OutputT> {
}
/** Called by implementors of {@link #run}: Record that no more results will be emitted. */
- protected void allDone() {
+ void allDone() {
isDone = true;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQuery.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQuery.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQuery.java
index 0796ce5..09415c0 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQuery.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQuery.java
@@ -46,10 +46,10 @@ public abstract class NexmarkQuery
extends PTransform<PCollection<Event>, PCollection<TimestampedValue<KnownSize>>> {
public static final TupleTag<Auction> AUCTION_TAG = new TupleTag<>("auctions");
public static final TupleTag<Bid> BID_TAG = new TupleTag<>("bids");
- protected static final TupleTag<Person> PERSON_TAG = new TupleTag<>("person");
+ static final TupleTag<Person> PERSON_TAG = new TupleTag<>("person");
/** Predicate to detect a new person event. */
- protected static final SerializableFunction<Event, Boolean> IS_NEW_PERSON =
+ private static final SerializableFunction<Event, Boolean> IS_NEW_PERSON =
new SerializableFunction<Event, Boolean>() {
@Override
public Boolean apply(Event event) {
@@ -58,7 +58,7 @@ public abstract class NexmarkQuery
};
/** DoFn to convert a new person event to a person. */
- protected static final DoFn<Event, Person> AS_PERSON = new DoFn<Event, Person>() {
+ private static final DoFn<Event, Person> AS_PERSON = new DoFn<Event, Person>() {
@ProcessElement
public void processElement(ProcessContext c) {
c.output(c.element().newPerson);
@@ -66,7 +66,7 @@ public abstract class NexmarkQuery
};
/** Predicate to detect a new auction event. */
- protected static final SerializableFunction<Event, Boolean> IS_NEW_AUCTION =
+ private static final SerializableFunction<Event, Boolean> IS_NEW_AUCTION =
new SerializableFunction<Event, Boolean>() {
@Override
public Boolean apply(Event event) {
@@ -75,7 +75,7 @@ public abstract class NexmarkQuery
};
/** DoFn to convert a new auction event to an auction. */
- protected static final DoFn<Event, Auction> AS_AUCTION = new DoFn<Event, Auction>() {
+ private static final DoFn<Event, Auction> AS_AUCTION = new DoFn<Event, Auction>() {
@ProcessElement
public void processElement(ProcessContext c) {
c.output(c.element().newAuction);
@@ -83,7 +83,7 @@ public abstract class NexmarkQuery
};
/** Predicate to detect a new bid event. */
- protected static final SerializableFunction<Event, Boolean> IS_BID =
+ private static final SerializableFunction<Event, Boolean> IS_BID =
new SerializableFunction<Event, Boolean>() {
@Override
public Boolean apply(Event event) {
@@ -92,7 +92,7 @@ public abstract class NexmarkQuery
};
/** DoFn to convert a bid event to a bid. */
- protected static final DoFn<Event, Bid> AS_BID = new DoFn<Event, Bid>() {
+ private static final DoFn<Event, Bid> AS_BID = new DoFn<Event, Bid>() {
@ProcessElement
public void processElement(ProcessContext c) {
c.output(c.element().bid);
@@ -100,7 +100,7 @@ public abstract class NexmarkQuery
};
/** Transform to key each person by their id. */
- protected static final ParDo.SingleOutput<Person, KV<Long, Person>> PERSON_BY_ID =
+ static final ParDo.SingleOutput<Person, KV<Long, Person>> PERSON_BY_ID =
ParDo.of(new DoFn<Person, KV<Long, Person>>() {
@ProcessElement
public void processElement(ProcessContext c) {
@@ -109,7 +109,7 @@ public abstract class NexmarkQuery
});
/** Transform to key each auction by its id. */
- protected static final ParDo.SingleOutput<Auction, KV<Long, Auction>> AUCTION_BY_ID =
+ static final ParDo.SingleOutput<Auction, KV<Long, Auction>> AUCTION_BY_ID =
ParDo.of(new DoFn<Auction, KV<Long, Auction>>() {
@ProcessElement
public void processElement(ProcessContext c) {
@@ -118,7 +118,7 @@ public abstract class NexmarkQuery
});
/** Transform to key each auction by its seller id. */
- protected static final ParDo.SingleOutput<Auction, KV<Long, Auction>> AUCTION_BY_SELLER =
+ static final ParDo.SingleOutput<Auction, KV<Long, Auction>> AUCTION_BY_SELLER =
ParDo.of(new DoFn<Auction, KV<Long, Auction>>() {
@ProcessElement
public void processElement(ProcessContext c) {
@@ -127,7 +127,7 @@ public abstract class NexmarkQuery
});
/** Transform to key each bid by its auction id. */
- protected static final ParDo.SingleOutput<Bid, KV<Long, Bid>> BID_BY_AUCTION =
+ static final ParDo.SingleOutput<Bid, KV<Long, Bid>> BID_BY_AUCTION =
ParDo.of(new DoFn<Bid, KV<Long, Bid>>() {
@ProcessElement
public void processElement(ProcessContext c) {
@@ -136,7 +136,7 @@ public abstract class NexmarkQuery
});
/** Transform to project the auction id from each bid. */
- protected static final ParDo.SingleOutput<Bid, Long> BID_TO_AUCTION =
+ static final ParDo.SingleOutput<Bid, Long> BID_TO_AUCTION =
ParDo.of(new DoFn<Bid, Long>() {
@ProcessElement
public void processElement(ProcessContext c) {
@@ -145,7 +145,7 @@ public abstract class NexmarkQuery
});
/** Transform to project the price from each bid. */
- protected static final ParDo.SingleOutput<Bid, Long> BID_TO_PRICE =
+ static final ParDo.SingleOutput<Bid, Long> BID_TO_PRICE =
ParDo.of(new DoFn<Bid, Long>() {
@ProcessElement
public void processElement(ProcessContext c) {
@@ -205,13 +205,13 @@ public abstract class NexmarkQuery
}
};
- protected final NexmarkConfiguration configuration;
+ final NexmarkConfiguration configuration;
public final Monitor<Event> eventMonitor;
public final Monitor<KnownSize> resultMonitor;
- public final Monitor<Event> endOfStreamMonitor;
- protected final Counter fatalCounter;
+ private final Monitor<Event> endOfStreamMonitor;
+ private final Counter fatalCounter;
- protected NexmarkQuery(NexmarkConfiguration configuration, String name) {
+ NexmarkQuery(NexmarkConfiguration configuration, String name) {
super(name);
this.configuration = configuration;
if (configuration.debug) {
http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQueryModel.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQueryModel.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQueryModel.java
index 1ad9099..bfa668b 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQueryModel.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQueryModel.java
@@ -43,7 +43,7 @@ import org.junit.Assert;
public abstract class NexmarkQueryModel implements Serializable {
public final NexmarkConfiguration configuration;
- public NexmarkQueryModel(NexmarkConfiguration configuration) {
+ NexmarkQueryModel(NexmarkConfiguration configuration) {
this.configuration = configuration;
}
@@ -51,7 +51,7 @@ public abstract class NexmarkQueryModel implements Serializable {
* Return the start of the most recent window of {@code size} and {@code period} which ends
* strictly before {@code timestamp}.
*/
- public static Instant windowStart(Duration size, Duration period, Instant timestamp) {
+ static Instant windowStart(Duration size, Duration period, Instant timestamp) {
long ts = timestamp.getMillis();
long p = period.getMillis();
long lim = ts - ts % p;
@@ -60,7 +60,7 @@ public abstract class NexmarkQueryModel implements Serializable {
}
/** Convert {@code itr} to strings capturing values, timestamps and order. */
- protected static <T> List<String> toValueTimestampOrder(Iterator<TimestampedValue<T>> itr) {
+ static <T> List<String> toValueTimestampOrder(Iterator<TimestampedValue<T>> itr) {
List<String> strings = new ArrayList<>();
while (itr.hasNext()) {
strings.add(itr.next().toString());
@@ -69,7 +69,7 @@ public abstract class NexmarkQueryModel implements Serializable {
}
/** Convert {@code itr} to strings capturing values and order. */
- protected static <T> List<String> toValueOrder(Iterator<TimestampedValue<T>> itr) {
+ static <T> List<String> toValueOrder(Iterator<TimestampedValue<T>> itr) {
List<String> strings = new ArrayList<>();
while (itr.hasNext()) {
strings.add(itr.next().getValue().toString());
@@ -78,7 +78,7 @@ public abstract class NexmarkQueryModel implements Serializable {
}
/** Convert {@code itr} to strings capturing values only. */
- protected static <T> Set<String> toValue(Iterator<TimestampedValue<T>> itr) {
+ static <T> Set<String> toValue(Iterator<TimestampedValue<T>> itr) {
Set<String> strings = new HashSet<>();
while (itr.hasNext()) {
strings.add(itr.next().getValue().toString());
@@ -90,7 +90,7 @@ public abstract class NexmarkQueryModel implements Serializable {
public abstract AbstractSimulator<?, ?> simulator();
/** Return sub-sequence of results which are significant for model. */
- protected Iterable<TimestampedValue<KnownSize>> relevantResults(
+ Iterable<TimestampedValue<KnownSize>> relevantResults(
Iterable<TimestampedValue<KnownSize>> results) {
return results;
}
@@ -104,8 +104,6 @@ public abstract class NexmarkQueryModel implements Serializable {
/** Return assertion to use on results of pipeline for this query. */
public SerializableFunction<Iterable<TimestampedValue<KnownSize>>, Void> assertionFor() {
final Collection<String> expectedStrings = toCollection(simulator().results());
- final String[] expectedStringsArray =
- expectedStrings.toArray(new String[expectedStrings.size()]);
return new SerializableFunction<Iterable<TimestampedValue<KnownSize>>, Void>() {
@Override
@@ -113,9 +111,6 @@ public abstract class NexmarkQueryModel implements Serializable {
Collection<String> actualStrings = toCollection(relevantResults(actual).iterator());
Assert.assertThat("wrong pipeline output", actualStrings,
IsEqual.equalTo(expectedStrings));
-//compare without order
-// Assert.assertThat("wrong pipeline output", actualStrings,
-// IsIterableContainingInAnyOrder.containsInAnyOrder(expectedStringsArray));
return null;
}
};
http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0Model.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0Model.java
index 6fb6613..8e65591 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0Model.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0Model.java
@@ -32,7 +32,7 @@ public class Query0Model extends NexmarkQueryModel {
/**
* Simulator for query 0.
*/
- private class Simulator extends AbstractSimulator<Event, Event> {
+ private static class Simulator extends AbstractSimulator<Event, Event> {
public Simulator(NexmarkConfiguration configuration) {
super(NexmarkUtils.standardEventIterator(configuration));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java
index c919691..516dab1 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java
@@ -35,7 +35,7 @@ import org.apache.beam.integration.nexmark.model.KnownSize;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
-import org.apache.beam.sdk.options.GcsOptions;
+import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
@@ -101,7 +101,7 @@ public class Query10 extends NexmarkQuery {
@Override
public String toString() {
- return String.format("%s %s %d %s %s\n", maxTimestamp, shard, index, timing, filename);
+ return String.format("%s %s %d %s %s%n", maxTimestamp, shard, index, timing, filename);
}
}
@@ -130,8 +130,6 @@ public class Query10 extends NexmarkQuery {
/**
* Return channel for writing bytes to GCS.
- *
- * @throws IOException
*/
private WritableByteChannel openWritableGcsFile(GcsOptions options, String filename)
throws IOException {
http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query11.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query11.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query11.java
index fd936a9..6db9bcf 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query11.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query11.java
@@ -63,14 +63,13 @@ public class Query11 extends NexmarkQuery {
Repeatedly.forever(AfterPane.elementCountAtLeast(configuration.maxLogEvents)))
.discardingFiredPanes()
.withAllowedLateness(Duration.standardSeconds(configuration.occasionalDelaySec / 2)));
- PCollection<BidsPerSession> bidsPerSession = biddersWindowed.apply(Count.<Long>perElement())
+ return biddersWindowed.apply(Count.<Long>perElement())
.apply(name + ".ToResult", ParDo.of(new DoFn<KV<Long, Long>, BidsPerSession>() {
@ProcessElement public void processElement(ProcessContext c) {
c.output(new BidsPerSession(c.element().getKey(), c.element().getValue()));
}
}));
- return bidsPerSession;
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1Model.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1Model.java
index 0388687..5d4de45 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1Model.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1Model.java
@@ -34,7 +34,7 @@ public class Query1Model extends NexmarkQueryModel implements Serializable {
/**
* Simulator for query 1.
*/
- private class Simulator extends AbstractSimulator<Event, Bid> {
+ private static class Simulator extends AbstractSimulator<Event, Bid> {
public Simulator(NexmarkConfiguration configuration) {
super(NexmarkUtils.standardEventIterator(configuration));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java
index 71364ba..f74b78d 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java
@@ -17,7 +17,6 @@
*/
package org.apache.beam.integration.nexmark.queries;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.integration.nexmark.NexmarkConfiguration;
@@ -30,6 +29,13 @@ import org.apache.beam.integration.nexmark.model.Person;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.ParDo;
@@ -41,13 +47,6 @@ import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.util.TimeDomain;
-import org.apache.beam.sdk.util.Timer;
-import org.apache.beam.sdk.util.TimerSpec;
-import org.apache.beam.sdk.util.TimerSpecs;
-import org.apache.beam.sdk.util.state.StateSpec;
-import org.apache.beam.sdk.util.state.StateSpecs;
-import org.apache.beam.sdk.util.state.ValueState;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
@@ -176,18 +175,18 @@ public class Query3 extends NexmarkQuery {
*/
private static class JoinDoFn extends DoFn<KV<Long, CoGbkResult>, KV<Auction, Person>> {
- private int maxAuctionsWaitingTime;
+ private final int maxAuctionsWaitingTime;
private static final String AUCTIONS = "auctions";
private static final String PERSON = "person";
@StateId(PERSON)
- private static final StateSpec<Object, ValueState<Person>> personSpec =
+ private static final StateSpec<ValueState<Person>> personSpec =
StateSpecs.value(Person.CODER);
private static final String PERSON_STATE_EXPIRING = "personStateExpiring";
@StateId(AUCTIONS)
- private final StateSpec<Object, ValueState<List<Auction>>> auctionsSpec =
+ private final StateSpec<ValueState<List<Auction>>> auctionsSpec =
StateSpecs.value(ListCoder.of(Auction.CODER));
@TimerId(PERSON_STATE_EXPIRING)
@@ -219,8 +218,7 @@ public class Query3 extends NexmarkQuery {
ProcessContext c,
@TimerId(PERSON_STATE_EXPIRING) Timer timer,
@StateId(PERSON) ValueState<Person> personState,
- @StateId(AUCTIONS) ValueState<List<Auction>> auctionsState)
- throws IOException {
+ @StateId(AUCTIONS) ValueState<List<Auction>> auctionsState) {
// We would *almost* implement this by rewindowing into the global window and
// running a combiner over the result. The combiner's accumulator would be the
// state we use below. However, combiners cannot emit intermediate results, thus
http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3Model.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3Model.java
index 6b98e2a..f415709 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3Model.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3Model.java
@@ -42,7 +42,7 @@ public class Query3Model extends NexmarkQueryModel implements Serializable {
/**
* Simulator for query 3.
*/
- private class Simulator extends AbstractSimulator<Event, NameCityStateId> {
+ private static class Simulator extends AbstractSimulator<Event, NameCityStateId> {
/** Auctions, indexed by seller id. */
private final Multimap<Long, Auction> newAuctions;
http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query4Model.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query4Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query4Model.java
index 634a58e..269e47a 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query4Model.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query4Model.java
@@ -93,8 +93,9 @@ public class Query4Model extends NexmarkQueryModel implements Serializable {
}
totals.put(category, total);
}
- for (long category : counts.keySet()) {
- long count = counts.get(category);
+ for (Map.Entry<Long, Long> entry : counts.entrySet()) {
+ long category = entry.getKey();
+ long count = entry.getValue();
long total = totals.get(category);
TimestampedValue<CategoryPrice> result = TimestampedValue.of(
new CategoryPrice(category, Math.round((double) total / count), true), lastTimestamp);
http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5.java
index 18ce578..1944330 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5.java
@@ -18,7 +18,7 @@
package org.apache.beam.integration.nexmark.queries;
import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import org.apache.beam.integration.nexmark.NexmarkConfiguration;
import org.apache.beam.integration.nexmark.NexmarkUtils;
@@ -80,7 +80,7 @@ public class Query5 extends NexmarkQuery {
ParDo.of(new DoFn<KV<Long, Long>, KV<List<Long>, Long>>() {
@ProcessElement
public void processElement(ProcessContext c) {
- c.output(KV.of(Arrays.asList(c.element().getKey()), c.element().getValue()));
+ c.output(KV.of(Collections.singletonList(c.element().getKey()), c.element().getValue()));
}
}))
http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6.java
index 65789ab..ea39ede 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6.java
@@ -86,9 +86,7 @@ public class Query6 extends NexmarkQuery {
public List<Bid> mergeAccumulators(Iterable<List<Bid>> accumulators) {
List<Bid> result = new ArrayList<>();
for (List<Bid> accumulator : accumulators) {
- for (Bid bid : accumulator) {
- result.add(bid);
- }
+ result.addAll(accumulator);
}
Collections.sort(result, Bid.ASCENDING_TIME_THEN_PRICE);
if (result.size() > maxNumBids) {
http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6Model.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6Model.java
index 0691714..9cb8b3d 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6Model.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6Model.java
@@ -86,8 +86,9 @@ public class Query6Model extends NexmarkQueryModel implements Serializable {
protected void run() {
TimestampedValue<AuctionBid> timestampedWinningBid = nextInput();
if (timestampedWinningBid == null) {
- for (long seller : numWinningBidsPerSeller.keySet()) {
- long count = numWinningBidsPerSeller.get(seller);
+ for (Map.Entry<Long, Long> entry : numWinningBidsPerSeller.entrySet()) {
+ long seller = entry.getKey();
+ long count = entry.getValue();
long total = totalWinningBidPricesPerSeller.get(seller);
addResult(TimestampedValue.of(
new SellerPrice(seller, Math.round((double) total / count)), lastTimestamp));
http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBids.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBids.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBids.java
index 11a4d38..52891a7 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBids.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBids.java
@@ -25,8 +25,8 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
@@ -77,7 +77,7 @@ import org.joda.time.Instant;
*/
public class WinningBids extends PTransform<PCollection<Event>, PCollection<AuctionBid>> {
/** Windows for open auctions and bids. */
- private static class AuctionOrBidWindow extends IntervalWindow implements Serializable {
+ private static class AuctionOrBidWindow extends IntervalWindow {
/** Id of auction this window is for. */
public final long auction;
@@ -104,9 +104,7 @@ public class WinningBids extends PTransform<PCollection<Event>, PCollection<Auct
/** Return an auction window for {@code auction}. */
public static AuctionOrBidWindow forAuction(Instant timestamp, Auction auction) {
- AuctionOrBidWindow result =
- new AuctionOrBidWindow(timestamp, new Instant(auction.expires), auction.id, true);
- return result;
+ return new AuctionOrBidWindow(timestamp, new Instant(auction.expires), auction.id, true);
}
/**
@@ -127,9 +125,8 @@ public class WinningBids extends PTransform<PCollection<Event>, PCollection<Auct
// Instead, we will just give the bid a finite window which expires at
// the upper bound of auctions assuming the auction starts at the same time as the bid,
// and assuming the system is running at its lowest event rate (as per interEventDelayUs).
- AuctionOrBidWindow result = new AuctionOrBidWindow(
+ return new AuctionOrBidWindow(
timestamp, timestamp.plus(expectedAuctionDurationMs * 2), bid.auction, false);
- return result;
}
/** Is this an auction window? */
@@ -171,8 +168,7 @@ public class WinningBids extends PTransform<PCollection<Event>, PCollection<Auct
throws IOException, CoderException {
IntervalWindow superWindow = SUPER_CODER.decode(inStream, Coder.Context.NESTED);
long auction = ID_CODER.decode(inStream, Coder.Context.NESTED);
- boolean isAuctionWindow =
- INT_CODER.decode(inStream, Coder.Context.NESTED) == 0 ? false : true;
+ boolean isAuctionWindow = INT_CODER.decode(inStream, Context.NESTED) != 0;
return new AuctionOrBidWindow(
superWindow.start(), superWindow.end(), auction, isAuctionWindow);
}
@@ -194,15 +190,16 @@ public class WinningBids extends PTransform<PCollection<Event>, PCollection<Auct
Event event = c.element();
if (event.newAuction != null) {
// Assign auctions to an auction window which expires at the auction's close.
- return Arrays.asList(AuctionOrBidWindow.forAuction(c.timestamp(), event.newAuction));
+ return Collections
+ .singletonList(AuctionOrBidWindow.forAuction(c.timestamp(), event.newAuction));
} else if (event.bid != null) {
// Assign bids to a temporary bid window which will later be merged into the appropriate
// auction window.
- return Arrays.asList(
+ return Collections.singletonList(
AuctionOrBidWindow.forBid(expectedAuctionDurationMs, c.timestamp(), event.bid));
} else {
// Don't assign people to any window. They will thus be dropped.
- return Arrays.asList();
+ return Collections.emptyList();
}
}
@@ -226,8 +223,9 @@ public class WinningBids extends PTransform<PCollection<Event>, PCollection<Auct
// Merge all 'bid' windows into their corresponding 'auction' window, provided the
// auction has not expired.
- for (long auction : idToTrueAuctionWindow.keySet()) {
- AuctionOrBidWindow auctionWindow = idToTrueAuctionWindow.get(auction);
+ for (Map.Entry<Long, AuctionOrBidWindow> entry : idToTrueAuctionWindow.entrySet()) {
+ long auction = entry.getKey();
+ AuctionOrBidWindow auctionWindow = entry.getValue();
List<AuctionOrBidWindow> bidWindows = idToBidAuctionWindows.get(auction);
if (bidWindows != null) {
List<AuctionOrBidWindow> toBeMerged = new ArrayList<>();
@@ -296,8 +294,8 @@ public class WinningBids extends PTransform<PCollection<Event>, PCollection<Auct
configuration.firstEventRate, configuration.nextEventRate,
configuration.rateUnit, configuration.numEventGenerators);
long longestDelayUs = 0;
- for (int i = 0; i < interEventDelayUs.length; i++) {
- longestDelayUs = Math.max(longestDelayUs, interEventDelayUs[i]);
+ for (long interEventDelayU : interEventDelayUs) {
+ longestDelayUs = Math.max(longestDelayUs, interEventDelayU);
}
// Adjust for proportion of auction events amongst all events.
longestDelayUs =
http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/Generator.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/Generator.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/Generator.java
index 012d4e6..2a2732b 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/Generator.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/Generator.java
@@ -123,8 +123,8 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl
@Override public void verifyDeterministic() throws NonDeterministicException {}
};
- private long numEvents;
- private long wallclockBaseTime;
+ private final long numEvents;
+ private final long wallclockBaseTime;
private Checkpoint(long numEvents, long wallclockBaseTime) {
this.numEvents = numEvents;
@@ -403,8 +403,8 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl
if (n < Integer.MAX_VALUE) {
return random.nextInt((int) n);
} else {
- // TODO: Very skewed distribution! Bad!
- return Math.abs(random.nextLong()) % n;
+ // WARNING: Very skewed distribution! Bad!
+ return Math.abs(random.nextLong() % n);
}
}
@@ -470,14 +470,13 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl
long category = GeneratorConfig.FIRST_CATEGORY_ID + random.nextInt(NUM_CATEGORIES);
long initialBid = nextPrice(random);
- long dateTime = timestamp;
long expires = timestamp + nextAuctionLengthMs(random, timestamp);
String name = nextString(random, 20);
String desc = nextString(random, 100);
long reserve = initialBid + nextPrice(random);
int currentSize = 8 + name.length() + desc.length() + 8 + 8 + 8 + 8 + 8;
String extra = nextExtra(random, currentSize, config.configuration.avgAuctionByteSize);
- return new Auction(id, name, desc, initialBid, reserve, dateTime, expires, seller, category,
+ return new Auction(id, name, desc, initialBid, reserve, timestamp, expires, seller, category,
extra);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/GeneratorConfig.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/GeneratorConfig.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/GeneratorConfig.java
index 3caaf51..5799bb2 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/GeneratorConfig.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/GeneratorConfig.java
@@ -42,7 +42,7 @@ public class GeneratorConfig implements Serializable {
*/
public static final int PERSON_PROPORTION = 1;
public static final int AUCTION_PROPORTION = 3;
- public static final int BID_PROPORTION = 46;
+ private static final int BID_PROPORTION = 46;
public static final int PROPORTION_DENOMINATOR =
PERSON_PROPORTION + AUCTION_PROPORTION + BID_PROPORTION;
@@ -55,12 +55,12 @@ public class GeneratorConfig implements Serializable {
* Delay between events, in microseconds. If the array has more than one entry then
* the rate is changed every {@link #stepLengthSec}, and wraps around.
*/
- public final long[] interEventDelayUs;
+ private final long[] interEventDelayUs;
/**
* Delay before changing the current inter-event delay.
*/
- public final long stepLengthSec;
+ private final long stepLengthSec;
/**
* Time for first event (ms since epoch).
@@ -88,13 +88,13 @@ public class GeneratorConfig implements Serializable {
* True period of epoch in milliseconds. Derived from above.
* (Ie time to run through cycle for all interEventDelayUs entries).
*/
- public final long epochPeriodMs;
+ private final long epochPeriodMs;
/**
* Number of events per epoch. Derived from above.
* (Ie number of events to run through cycle for all interEventDelayUs entries).
*/
- public final long eventsPerEpoch;
+ private final long eventsPerEpoch;
public GeneratorConfig(
NexmarkConfiguration configuration, long baseTime, long firstEventId,
@@ -121,10 +121,10 @@ public class GeneratorConfig implements Serializable {
long eventsPerEpoch = 0;
long epochPeriodMs = 0;
if (interEventDelayUs.length > 1) {
- for (int i = 0; i < interEventDelayUs.length; i++) {
- long numEventsForThisCycle = (stepLengthSec * 1_000_000L) / interEventDelayUs[i];
+ for (long interEventDelayU : interEventDelayUs) {
+ long numEventsForThisCycle = (stepLengthSec * 1_000_000L) / interEventDelayU;
eventsPerEpoch += numEventsForThisCycle;
- epochPeriodMs += (numEventsForThisCycle * interEventDelayUs[i]) / 1000L;
+ epochPeriodMs += (numEventsForThisCycle * interEventDelayU) / 1000L;
}
}
this.eventsPerEpoch = eventsPerEpoch;
@@ -248,16 +248,16 @@ public class GeneratorConfig implements Serializable {
long epoch = eventNumber / eventsPerEpoch;
long n = eventNumber % eventsPerEpoch;
long offsetInEpochMs = 0;
- for (int i = 0; i < interEventDelayUs.length; i++) {
- long numEventsForThisCycle = (stepLengthSec * 1_000_000L) / interEventDelayUs[i];
+ for (long interEventDelayU : interEventDelayUs) {
+ long numEventsForThisCycle = (stepLengthSec * 1_000_000L) / interEventDelayU;
if (n < numEventsForThisCycle) {
- long offsetInCycleUs = n * interEventDelayUs[i];
+ long offsetInCycleUs = n * interEventDelayU;
long timestamp =
baseTime + epoch * epochPeriodMs + offsetInEpochMs + (offsetInCycleUs / 1000L);
- return KV.of(timestamp, interEventDelayUs[i]);
+ return KV.of(timestamp, interEventDelayU);
}
n -= numEventsForThisCycle;
- offsetInEpochMs += (numEventsForThisCycle * interEventDelayUs[i]) / 1000L;
+ offsetInEpochMs += (numEventsForThisCycle * interEventDelayU) / 1000L;
}
throw new RuntimeException("internal eventsPerEpoch incorrect"); // can't reach
}
http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSource.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSource.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSource.java
index c3c6eb0..09d945d 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSource.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSource.java
@@ -116,7 +116,7 @@ public class UnboundedEventSource extends UnboundedSource<Event, Generator.Check
private TimestampedValue<Event> currentEvent;
/** Events which have been held back so as to force them to be late. */
- private Queue<Generator.NextEvent> heldBackEvents = new PriorityQueue<>();
+ private final Queue<Generator.NextEvent> heldBackEvents = new PriorityQueue<>();
public EventReader(Generator generator) {
this.generator = generator;
http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSourceTest.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSourceTest.java b/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSourceTest.java
index 15e17a8..1d04e2a 100644
--- a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSourceTest.java
+++ b/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSourceTest.java
@@ -53,8 +53,8 @@ public class UnboundedEventSourceTest {
* confirming reading events match the model events.
*/
private static class EventIdChecker {
- private Set<Long> seenPersonIds = new HashSet<>();
- private Set<Long> seenAuctionIds = new HashSet<>();
+ private final Set<Long> seenPersonIds = new HashSet<>();
+ private final Set<Long> seenAuctionIds = new HashSet<>();
public void add(Event event) {
if (event.newAuction != null) {
@@ -90,7 +90,6 @@ public class UnboundedEventSourceTest {
EventIdChecker checker = new EventIdChecker();
PipelineOptions options = TestPipeline.testingPipelineOptions();
- Pipeline p = TestPipeline.create(options);
UnboundedEventSource source = new UnboundedEventSource(config, 1, 0, false);
UnboundedReader<Event> reader = source.createReader(options, null);
http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/pom.xml
----------------------------------------------------------------------
diff --git a/integration/pom.xml b/integration/pom.xml
index 4839da5..31f293e 100644
--- a/integration/pom.xml
+++ b/integration/pom.xml
@@ -30,6 +30,20 @@
<packaging>pom</packaging>
<name>Apache Beam :: Integration Tests</name>
+ <profiles>
+ <profile>
+ <id>release</id>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>findbugs-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
+
<modules>
<module>java</module>
</modules>