You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/23 17:09:22 UTC
[14/55] [abbrv] beam git commit: Refactor classes into packages
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
index d311dc4..e8d791f 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
@@ -26,7 +26,6 @@ import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
-
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -35,6 +34,35 @@ import java.util.Collections;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
+import org.apache.beam.integration.nexmark.io.PubsubHelper;
+import org.apache.beam.integration.nexmark.model.Auction;
+import org.apache.beam.integration.nexmark.model.Bid;
+import org.apache.beam.integration.nexmark.model.Event;
+import org.apache.beam.integration.nexmark.model.KnownSize;
+import org.apache.beam.integration.nexmark.model.Person;
+import org.apache.beam.integration.nexmark.queries.Query0;
+import org.apache.beam.integration.nexmark.queries.Query0Model;
+import org.apache.beam.integration.nexmark.queries.Query1;
+import org.apache.beam.integration.nexmark.queries.Query10;
+import org.apache.beam.integration.nexmark.queries.Query11;
+import org.apache.beam.integration.nexmark.queries.Query12;
+import org.apache.beam.integration.nexmark.queries.Query1Model;
+import org.apache.beam.integration.nexmark.queries.Query2;
+import org.apache.beam.integration.nexmark.queries.Query2Model;
+import org.apache.beam.integration.nexmark.queries.Query3;
+import org.apache.beam.integration.nexmark.queries.Query3Model;
+import org.apache.beam.integration.nexmark.queries.Query4;
+import org.apache.beam.integration.nexmark.queries.Query4Model;
+import org.apache.beam.integration.nexmark.queries.Query5;
+import org.apache.beam.integration.nexmark.queries.Query5Model;
+import org.apache.beam.integration.nexmark.queries.Query6;
+import org.apache.beam.integration.nexmark.queries.Query6Model;
+import org.apache.beam.integration.nexmark.queries.Query7;
+import org.apache.beam.integration.nexmark.queries.Query7Model;
+import org.apache.beam.integration.nexmark.queries.Query8;
+import org.apache.beam.integration.nexmark.queries.Query8Model;
+import org.apache.beam.integration.nexmark.queries.Query9;
+import org.apache.beam.integration.nexmark.queries.Query9Model;
import org.apache.beam.sdk.AggregatorRetrievalException;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
@@ -56,7 +84,7 @@ import org.joda.time.Duration;
/**
* Run a single Nexmark query using a given configuration.
*/
-public abstract class NexmarkRunner<OptionT extends Options> {
+public abstract class NexmarkRunner<OptionT extends NexmarkOptions> {
/**
* Minimum number of samples needed for 'stead-state' rate calculation.
*/
@@ -84,7 +112,7 @@ public abstract class NexmarkRunner<OptionT extends Options> {
*/
private static final Duration STUCK_TERMINATE_DELAY = Duration.standardDays(3);
/**
- * Options shared by all runs.
+ * NexmarkOptions shared by all runs.
*/
protected final OptionT options;
@@ -359,7 +387,7 @@ public abstract class NexmarkRunner<OptionT extends Options> {
return perf;
}
- String getJobId(PipelineResult job) {
+ protected String getJobId(PipelineResult job) {
return "";
}
@@ -461,7 +489,7 @@ public abstract class NexmarkRunner<OptionT extends Options> {
/**
* Build and run a pipeline using specified options.
*/
- protected interface PipelineBuilder<OptionT extends Options> {
+ protected interface PipelineBuilder<OptionT extends NexmarkOptions> {
void build(OptionT publishOnlyOptions);
}
@@ -966,7 +994,7 @@ public abstract class NexmarkRunner<OptionT extends Options> {
// We'll shutdown the publisher job when we notice the main job has finished.
invokeBuilderForPublishOnlyPipeline(new PipelineBuilder() {
@Override
- public void build(Options publishOnlyOptions) {
+ public void build(NexmarkOptions publishOnlyOptions) {
Pipeline sp = Pipeline.create(options);
NexmarkUtils.setupPipeline(configuration.coderStrategy, sp);
publisherMonitor = new Monitor<Event>(queryName, "publisher");
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSparkDriver.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSparkDriver.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSparkDriver.java
deleted file mode 100644
index a46d38a..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSparkDriver.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import org.apache.beam.runners.spark.SparkPipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-
-/**
- * Run NexMark queries using the Spark runner.
- */
-class NexmarkSparkDriver extends NexmarkDriver<NexmarkSparkDriver.NexmarkSparkOptions> {
- /**
- * Command line flags.
- */
- public interface NexmarkSparkOptions extends Options, SparkPipelineOptions {
- }
-
- /**
- * Entry point.
- */
- public static void main(String[] args) {
- NexmarkSparkOptions options =
- PipelineOptionsFactory.fromArgs(args)
- .withValidation()
- .as(NexmarkSparkOptions.class);
-// options.setRunner(org.apache.beam.runners.spark.SparkRunner.class);
- options.setRunner(org.apache.beam.runners.spark.SparkRunnerDebugger.class);
- NexmarkSparkRunner runner = new NexmarkSparkRunner(options);
- new NexmarkSparkDriver().runAll(options, runner);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSparkRunner.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSparkRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSparkRunner.java
deleted file mode 100644
index 30ae9ca..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSparkRunner.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-/**
- * Run a query using the Spark runner.
- */
-public class NexmarkSparkRunner extends NexmarkRunner<NexmarkSparkDriver.NexmarkSparkOptions> {
- @Override
- protected boolean isStreaming() {
- return options.isStreaming();
- }
-
- @Override
- protected int coresPerWorker() {
- return 4;
- }
-
- @Override
- protected int maxNumWorkers() {
- return 5;
- }
-
- @Override
- protected void invokeBuilderForPublishOnlyPipeline(
- PipelineBuilder builder) {
- builder.build(options);
- }
-
- @Override
- protected void waitForPublisherPreload() {
- throw new UnsupportedOperationException();
- }
-
-
- public NexmarkSparkRunner(NexmarkSparkDriver.NexmarkSparkOptions options) {
- super(options);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSuite.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSuite.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSuite.java
index bc47540..be7d7b8 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSuite.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSuite.java
@@ -100,7 +100,7 @@ public enum NexmarkSuite {
* with any set command line flags, except for --isStreaming which is only respected for
* the {@link #DEFAULT} suite.
*/
- public Iterable<NexmarkConfiguration> getConfigurations(Options options) {
+ public Iterable<NexmarkConfiguration> getConfigurations(NexmarkOptions options) {
Set<NexmarkConfiguration> results = new LinkedHashSet<>();
for (NexmarkConfiguration configuration : configurations) {
NexmarkConfiguration result = configuration.clone();
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java
index f7417d3..b0421a4 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java
@@ -20,14 +20,30 @@ package org.apache.beam.integration.nexmark;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.hash.Hashing;
-
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
-
+import org.apache.beam.integration.nexmark.model.Auction;
+import org.apache.beam.integration.nexmark.model.AuctionBid;
+import org.apache.beam.integration.nexmark.model.AuctionCount;
+import org.apache.beam.integration.nexmark.model.AuctionPrice;
+import org.apache.beam.integration.nexmark.model.Bid;
+import org.apache.beam.integration.nexmark.model.BidsPerSession;
+import org.apache.beam.integration.nexmark.model.CategoryPrice;
+import org.apache.beam.integration.nexmark.model.Done;
+import org.apache.beam.integration.nexmark.model.Event;
+import org.apache.beam.integration.nexmark.model.IdNameReserve;
+import org.apache.beam.integration.nexmark.model.KnownSize;
+import org.apache.beam.integration.nexmark.model.NameCityStateId;
+import org.apache.beam.integration.nexmark.model.Person;
+import org.apache.beam.integration.nexmark.model.SellerPrice;
+import org.apache.beam.integration.nexmark.sources.BoundedEventSource;
+import org.apache.beam.integration.nexmark.sources.Generator;
+import org.apache.beam.integration.nexmark.sources.GeneratorConfig;
+import org.apache.beam.integration.nexmark.sources.UnboundedEventSource;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.ByteArrayCoder;
@@ -67,7 +83,7 @@ public class NexmarkUtils {
/**
* Mapper for (de)serializing JSON.
*/
- static final ObjectMapper MAPPER = new ObjectMapper();
+ public static final ObjectMapper MAPPER = new ObjectMapper();
/**
* Possible sources for events.
@@ -382,7 +398,8 @@ public class NexmarkUtils {
*/
public static PTransform<PBegin, PCollection<Event>> batchEventsSource(
NexmarkConfiguration configuration) {
- return Read.from(new BoundedEventSource(standardGeneratorConfig(configuration), configuration.numEventGenerators));
+ return Read.from(new BoundedEventSource(standardGeneratorConfig(configuration),
+ configuration.numEventGenerators));
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Options.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Options.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Options.java
deleted file mode 100644
index 388473d..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Options.java
+++ /dev/null
@@ -1,386 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import javax.annotation.Nullable;
-
-import org.apache.beam.sdk.options.Default;
-import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.options.PubsubOptions;
-
-/**
- * Command line flags.
- */
-public interface Options extends PubsubOptions {
- @Description("Which suite to run. Default is to use command line arguments for one job.")
- @Default.Enum("DEFAULT")
- NexmarkSuite getSuite();
-
- void setSuite(NexmarkSuite suite);
-
- @Description("If true, and using the DataflowPipelineRunner, monitor the jobs as they run.")
- @Default.Boolean(false)
- boolean getMonitorJobs();
-
- void setMonitorJobs(boolean monitorJobs);
-
- @Description("Where the events come from.")
- @Nullable
- NexmarkUtils.SourceType getSourceType();
-
- void setSourceType(NexmarkUtils.SourceType sourceType);
-
- @Description("Prefix for input files if using avro input")
- @Nullable
- String getInputPath();
-
- void setInputPath(String inputPath);
-
- @Description("Where results go.")
- @Nullable
- NexmarkUtils.SinkType getSinkType();
-
- void setSinkType(NexmarkUtils.SinkType sinkType);
-
- @Description("Which mode to run in when source is PUBSUB.")
- @Nullable
- NexmarkUtils.PubSubMode getPubSubMode();
-
- void setPubSubMode(NexmarkUtils.PubSubMode pubSubMode);
-
- @Description("Which query to run.")
- @Nullable
- Integer getQuery();
-
- void setQuery(Integer query);
-
- @Description("Prefix for output files if using text output for results or running Query 10.")
- @Nullable
- String getOutputPath();
-
- void setOutputPath(String outputPath);
-
- @Description("Base name of pubsub topic to publish to in streaming mode.")
- @Nullable
- @Default.String("nexmark")
- String getPubsubTopic();
-
- void setPubsubTopic(String pubsubTopic);
-
- @Description("Base name of pubsub subscription to read from in streaming mode.")
- @Nullable
- @Default.String("nexmark")
- String getPubsubSubscription();
-
- void setPubsubSubscription(String pubsubSubscription);
-
- @Description("Base name of BigQuery table name if using BigQuery output.")
- @Nullable
- @Default.String("nexmark")
- String getBigQueryTable();
-
- void setBigQueryTable(String bigQueryTable);
-
- @Description("Approximate number of events to generate. "
- + "Zero for effectively unlimited in streaming mode.")
- @Nullable
- Long getNumEvents();
-
- void setNumEvents(Long numEvents);
-
- @Description("Time in seconds to preload the subscription with data, at the initial input rate "
- + "of the pipeline.")
- @Nullable
- Integer getPreloadSeconds();
-
- void setPreloadSeconds(Integer preloadSeconds);
-
- @Description("Number of unbounded sources to create events.")
- @Nullable
- Integer getNumEventGenerators();
-
- void setNumEventGenerators(Integer numEventGenerators);
-
- @Description("Shape of event rate curve.")
- @Nullable
- NexmarkUtils.RateShape getRateShape();
-
- void setRateShape(NexmarkUtils.RateShape rateShape);
-
- @Description("Initial overall event rate (in --rateUnit).")
- @Nullable
- Integer getFirstEventRate();
-
- void setFirstEventRate(Integer firstEventRate);
-
- @Description("Next overall event rate (in --rateUnit).")
- @Nullable
- Integer getNextEventRate();
-
- void setNextEventRate(Integer nextEventRate);
-
- @Description("Unit for rates.")
- @Nullable
- NexmarkUtils.RateUnit getRateUnit();
-
- void setRateUnit(NexmarkUtils.RateUnit rateUnit);
-
- @Description("Overall period of rate shape, in seconds.")
- @Nullable
- Integer getRatePeriodSec();
-
- void setRatePeriodSec(Integer ratePeriodSec);
-
- @Description("If true, relay events in real time in streaming mode.")
- @Nullable
- Boolean getIsRateLimited();
-
- void setIsRateLimited(Boolean isRateLimited);
-
- @Description("If true, use wallclock time as event time. Otherwise, use a deterministic"
- + " time in the past so that multiple runs will see exactly the same event streams"
- + " and should thus have exactly the same results.")
- @Nullable
- Boolean getUseWallclockEventTime();
-
- void setUseWallclockEventTime(Boolean useWallclockEventTime);
-
- @Description("Assert pipeline results match model results.")
- @Nullable
- boolean getAssertCorrectness();
-
- void setAssertCorrectness(boolean assertCorrectness);
-
- @Description("Log all input events.")
- @Nullable
- boolean getLogEvents();
-
- void setLogEvents(boolean logEvents);
-
- @Description("Log all query results.")
- @Nullable
- boolean getLogResults();
-
- void setLogResults(boolean logResults);
-
- @Description("Average size in bytes for a person record.")
- @Nullable
- Integer getAvgPersonByteSize();
-
- void setAvgPersonByteSize(Integer avgPersonByteSize);
-
- @Description("Average size in bytes for an auction record.")
- @Nullable
- Integer getAvgAuctionByteSize();
-
- void setAvgAuctionByteSize(Integer avgAuctionByteSize);
-
- @Description("Average size in bytes for a bid record.")
- @Nullable
- Integer getAvgBidByteSize();
-
- void setAvgBidByteSize(Integer avgBidByteSize);
-
- @Description("Ratio of bids for 'hot' auctions above the background.")
- @Nullable
- Integer getHotAuctionRatio();
-
- void setHotAuctionRatio(Integer hotAuctionRatio);
-
- @Description("Ratio of auctions for 'hot' sellers above the background.")
- @Nullable
- Integer getHotSellersRatio();
-
- void setHotSellersRatio(Integer hotSellersRatio);
-
- @Description("Ratio of auctions for 'hot' bidders above the background.")
- @Nullable
- Integer getHotBiddersRatio();
-
- void setHotBiddersRatio(Integer hotBiddersRatio);
-
- @Description("Window size in seconds.")
- @Nullable
- Long getWindowSizeSec();
-
- void setWindowSizeSec(Long windowSizeSec);
-
- @Description("Window period in seconds.")
- @Nullable
- Long getWindowPeriodSec();
-
- void setWindowPeriodSec(Long windowPeriodSec);
-
- @Description("If in streaming mode, the holdback for watermark in seconds.")
- @Nullable
- Long getWatermarkHoldbackSec();
-
- void setWatermarkHoldbackSec(Long watermarkHoldbackSec);
-
- @Description("Roughly how many auctions should be in flight for each generator.")
- @Nullable
- Integer getNumInFlightAuctions();
-
- void setNumInFlightAuctions(Integer numInFlightAuctions);
-
-
- @Description("Maximum number of people to consider as active for placing auctions or bids.")
- @Nullable
- Integer getNumActivePeople();
-
- void setNumActivePeople(Integer numActivePeople);
-
- @Description("Filename of perf data to append to.")
- @Nullable
- String getPerfFilename();
-
- void setPerfFilename(String perfFilename);
-
- @Description("Filename of baseline perf data to read from.")
- @Nullable
- String getBaselineFilename();
-
- void setBaselineFilename(String baselineFilename);
-
- @Description("Filename of summary perf data to append to.")
- @Nullable
- String getSummaryFilename();
-
- void setSummaryFilename(String summaryFilename);
-
- @Description("Filename for javascript capturing all perf data and any baselines.")
- @Nullable
- String getJavascriptFilename();
-
- void setJavascriptFilename(String javascriptFilename);
-
- @Description("If true, don't run the actual query. Instead, calculate the distribution "
- + "of number of query results per (event time) minute according to the query model.")
- @Nullable
- boolean getJustModelResultRate();
-
- void setJustModelResultRate(boolean justModelResultRate);
-
- @Description("Coder strategy to use.")
- @Nullable
- NexmarkUtils.CoderStrategy getCoderStrategy();
-
- void setCoderStrategy(NexmarkUtils.CoderStrategy coderStrategy);
-
- @Description("Delay, in milliseconds, for each event. We will peg one core for this "
- + "number of milliseconds to simulate CPU-bound computation.")
- @Nullable
- Long getCpuDelayMs();
-
- void setCpuDelayMs(Long cpuDelayMs);
-
- @Description("Extra data, in bytes, to save to persistent state for each event. "
- + "This will force I/O all the way to durable storage to simulate an "
- + "I/O-bound computation.")
- @Nullable
- Long getDiskBusyBytes();
-
- void setDiskBusyBytes(Long diskBusyBytes);
-
- @Description("Skip factor for query 2. We select bids for every {@code auctionSkip}'th auction")
- @Nullable
- Integer getAuctionSkip();
-
- void setAuctionSkip(Integer auctionSkip);
-
- @Description("Fanout for queries 4 (groups by category id) and 7 (finds a global maximum).")
- @Nullable
- Integer getFanout();
-
- void setFanout(Integer fanout);
-
- @Description("Length of occasional delay to impose on events (in seconds).")
- @Nullable
- Long getOccasionalDelaySec();
-
- void setOccasionalDelaySec(Long occasionalDelaySec);
-
- @Description("Probability that an event will be delayed by delayS.")
- @Nullable
- Double getProbDelayedEvent();
-
- void setProbDelayedEvent(Double probDelayedEvent);
-
- @Description("Maximum size of each log file (in events). For Query10 only.")
- @Nullable
- Integer getMaxLogEvents();
-
- void setMaxLogEvents(Integer maxLogEvents);
-
- @Description("How to derive names of resources.")
- @Default.Enum("QUERY_AND_SALT")
- NexmarkUtils.ResourceNameMode getResourceNameMode();
-
- void setResourceNameMode(NexmarkUtils.ResourceNameMode mode);
-
- @Description("If true, manage the creation and cleanup of topics, subscriptions and gcs files.")
- @Default.Boolean(true)
- boolean getManageResources();
-
- void setManageResources(boolean manageResources);
-
- @Description("If true, use pub/sub publish time instead of event time.")
- @Nullable
- Boolean getUsePubsubPublishTime();
-
- void setUsePubsubPublishTime(Boolean usePubsubPublishTime);
-
- @Description("Number of events in out-of-order groups. 1 implies no out-of-order events. "
- + "1000 implies every 1000 events per generator are emitted in pseudo-random order.")
- @Nullable
- Long getOutOfOrderGroupSize();
-
- void setOutOfOrderGroupSize(Long outOfOrderGroupSize);
-
- @Description("If false, do not add the Monitor and Snoop transforms.")
- @Nullable
- Boolean getDebug();
-
- void setDebug(Boolean value);
-
- @Description("If set, cancel running pipelines after this long")
- @Nullable
- Long getRunningTimeMinutes();
-
- void setRunningTimeMinutes(Long value);
-
- @Description("If set and --monitorJobs is true, check that the system watermark is never more "
- + "than this far behind real time")
- @Nullable
- Long getMaxSystemLagSeconds();
-
- void setMaxSystemLagSeconds(Long value);
-
- @Description("If set and --monitorJobs is true, check that the data watermark is never more "
- + "than this far behind real time")
- @Nullable
- Long getMaxDataLagSeconds();
-
- void setMaxDataLagSeconds(Long value);
-
- @Description("Only start validating watermarks after this many seconds")
- @Nullable
- Long getWatermarkValidationDelaySeconds();
-
- void setWatermarkValidationDelaySeconds(Long value);
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Person.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Person.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Person.java
deleted file mode 100644
index 251a6ee..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Person.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.JsonProcessingException;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-
-import org.apache.beam.sdk.coders.AtomicCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VarLongCoder;
-
-/**
- * A person either creating an auction or making a bid.
- */
-public class Person implements KnownSize, Serializable {
- private static final Coder<Long> LONG_CODER = VarLongCoder.of();
- private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
- public static final Coder<Person> CODER = new AtomicCoder<Person>() {
- @Override
- public void encode(Person value, OutputStream outStream,
- Coder.Context context)
- throws CoderException, IOException {
- LONG_CODER.encode(value.id, outStream, Context.NESTED);
- STRING_CODER.encode(value.name, outStream, Context.NESTED);
- STRING_CODER.encode(value.emailAddress, outStream, Context.NESTED);
- STRING_CODER.encode(value.creditCard, outStream, Context.NESTED);
- STRING_CODER.encode(value.city, outStream, Context.NESTED);
- STRING_CODER.encode(value.state, outStream, Context.NESTED);
- LONG_CODER.encode(value.dateTime, outStream, Context.NESTED);
- STRING_CODER.encode(value.extra, outStream, Context.NESTED);
- }
-
- @Override
- public Person decode(
- InputStream inStream, Coder.Context context)
- throws CoderException, IOException {
- long id = LONG_CODER.decode(inStream, Context.NESTED);
- String name = STRING_CODER.decode(inStream, Context.NESTED);
- String emailAddress = STRING_CODER.decode(inStream, Context.NESTED);
- String creditCard = STRING_CODER.decode(inStream, Context.NESTED);
- String city = STRING_CODER.decode(inStream, Context.NESTED);
- String state = STRING_CODER.decode(inStream, Context.NESTED);
- long dateTime = LONG_CODER.decode(inStream, Context.NESTED);
- String extra = STRING_CODER.decode(inStream, Context.NESTED);
- return new Person(id, name, emailAddress, creditCard, city, state, dateTime, extra);
- }
- };
-
- /** Id of person. */
- @JsonProperty
- public final long id; // primary key
-
- /** Extra person properties. */
- @JsonProperty
- public final String name;
-
- @JsonProperty
- public final String emailAddress;
-
- @JsonProperty
- public final String creditCard;
-
- @JsonProperty
- public final String city;
-
- @JsonProperty
- public final String state;
-
- @JsonProperty
- public final long dateTime;
-
- /** Additional arbitrary payload for performance testing. */
- @JsonProperty
- public final String extra;
-
- // For Avro only.
- @SuppressWarnings("unused")
- private Person() {
- id = 0;
- name = null;
- emailAddress = null;
- creditCard = null;
- city = null;
- state = null;
- dateTime = 0;
- extra = null;
- }
-
- public Person(long id, String name, String emailAddress, String creditCard, String city,
- String state, long dateTime, String extra) {
- this.id = id;
- this.name = name;
- this.emailAddress = emailAddress;
- this.creditCard = creditCard;
- this.city = city;
- this.state = state;
- this.dateTime = dateTime;
- this.extra = extra;
- }
-
- /**
- * Return a copy of person which capture the given annotation.
- * (Used for debugging).
- */
- public Person withAnnotation(String annotation) {
- return new Person(id, name, emailAddress, creditCard, city, state, dateTime,
- annotation + ": " + extra);
- }
-
- /**
- * Does person have {@code annotation}? (Used for debugging.)
- */
- public boolean hasAnnotation(String annotation) {
- return extra.startsWith(annotation + ": ");
- }
-
- /**
- * Remove {@code annotation} from person. (Used for debugging.)
- */
- public Person withoutAnnotation(String annotation) {
- if (hasAnnotation(annotation)) {
- return new Person(id, name, emailAddress, creditCard, city, state, dateTime,
- extra.substring(annotation.length() + 2));
- } else {
- return this;
- }
- }
-
- @Override
- public long sizeInBytes() {
- return 8 + name.length() + 1 + emailAddress.length() + 1 + creditCard.length() + 1
- + city.length() + 1 + state.length() + 8 + 1 + extra.length() + 1;
- }
-
- @Override
- public String toString() {
- try {
- return NexmarkUtils.MAPPER.writeValueAsString(this);
- } catch (JsonProcessingException e) {
- throw new RuntimeException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/PubsubHelper.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/PubsubHelper.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/PubsubHelper.java
deleted file mode 100644
index a79a25b..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/PubsubHelper.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.beam.sdk.options.PubsubOptions;
-import org.apache.beam.sdk.util.PubsubClient;
-import org.apache.beam.sdk.util.PubsubJsonClient;
-
-/**
- * Helper for working with pubsub.
- */
-public class PubsubHelper implements AutoCloseable {
- /**
- * Underlying pub/sub client.
- */
- private final PubsubClient pubsubClient;
-
- /**
- * Project id.
- */
- private final String projectId;
-
- /**
- * Topics we should delete on close.
- */
- private final List<PubsubClient.TopicPath> createdTopics;
-
- /**
- * Subscriptions we should delete on close.
- */
- private final List<PubsubClient.SubscriptionPath> createdSubscriptions;
-
- private PubsubHelper(PubsubClient pubsubClient, String projectId) {
- this.pubsubClient = pubsubClient;
- this.projectId = projectId;
- createdTopics = new ArrayList<>();
- createdSubscriptions = new ArrayList<>();
- }
-
- /**
- * Create a helper.
- */
- public static PubsubHelper create(PubsubOptions options) {
- try {
- return new PubsubHelper(
- PubsubJsonClient.FACTORY.newClient(null, null, options),
- options.getProject());
- } catch (IOException e) {
- throw new RuntimeException("Unable to create Pubsub client: ", e);
- }
- }
-
- /**
- * Create a topic from short name. Delete it if it already exists. Ensure the topic will be
- * deleted on cleanup. Return full topic name.
- */
- public PubsubClient.TopicPath createTopic(String shortTopic) {
- PubsubClient.TopicPath topic = PubsubClient.topicPathFromName(projectId, shortTopic);
- try {
- if (topicExists(shortTopic)) {
- NexmarkUtils.console("attempting to cleanup topic %s", topic);
- pubsubClient.deleteTopic(topic);
- }
- NexmarkUtils.console("create topic %s", topic);
- pubsubClient.createTopic(topic);
- createdTopics.add(topic);
- return topic;
- } catch (IOException e) {
- throw new RuntimeException("Unable to create Pubsub topic " + topic + ": ", e);
- }
- }
-
- /**
- * Create a topic from short name if it does not already exist. The topic will not be
- * deleted on cleanup. Return full topic name.
- */
- public PubsubClient.TopicPath createOrReuseTopic(String shortTopic) {
- PubsubClient.TopicPath topic = PubsubClient.topicPathFromName(projectId, shortTopic);
- try {
- if (topicExists(shortTopic)) {
- NexmarkUtils.console("topic %s already exists", topic);
- return topic;
- }
- NexmarkUtils.console("create topic %s", topic);
- pubsubClient.createTopic(topic);
- return topic;
- } catch (IOException e) {
- throw new RuntimeException("Unable to create or reuse Pubsub topic " + topic + ": ", e);
- }
- }
-
- /**
- * Check a topic corresponding to short name exists, and throw exception if not. The
- * topic will not be deleted on cleanup. Return full topic name.
- */
- public PubsubClient.TopicPath reuseTopic(String shortTopic) {
- PubsubClient.TopicPath topic = PubsubClient.topicPathFromName(projectId, shortTopic);
- if (topicExists(shortTopic)) {
- NexmarkUtils.console("reusing existing topic %s", topic);
- return topic;
- }
- throw new RuntimeException("topic '" + topic + "' does not already exist");
- }
-
- /**
- * Does topic corresponding to short name exist?
- */
- public boolean topicExists(String shortTopic) {
- PubsubClient.ProjectPath project = PubsubClient.projectPathFromId(projectId);
- PubsubClient.TopicPath topic = PubsubClient.topicPathFromName(projectId, shortTopic);
- try {
- Collection<PubsubClient.TopicPath> existingTopics = pubsubClient.listTopics(project);
- return existingTopics.contains(topic);
- } catch (IOException e) {
- throw new RuntimeException("Unable to check Pubsub topic " + topic + ": ", e);
- }
- }
-
- /**
- * Create subscription from short name. Delete subscription if it already exists. Ensure the
- * subscription will be deleted on cleanup. Return full subscription name.
- */
- public PubsubClient.SubscriptionPath createSubscription(
- String shortTopic, String shortSubscription) {
- PubsubClient.TopicPath topic = PubsubClient.topicPathFromName(projectId, shortTopic);
- PubsubClient.SubscriptionPath subscription =
- PubsubClient.subscriptionPathFromName(projectId, shortSubscription);
- try {
- if (subscriptionExists(shortTopic, shortSubscription)) {
- NexmarkUtils.console("attempting to cleanup subscription %s", subscription);
- pubsubClient.deleteSubscription(subscription);
- }
- NexmarkUtils.console("create subscription %s", subscription);
- pubsubClient.createSubscription(topic, subscription, 60);
- createdSubscriptions.add(subscription);
- } catch (IOException e) {
- throw new RuntimeException("Unable to create Pubsub subscription " + subscription + ": ", e);
- }
- return subscription;
- }
-
- /**
- * Check a subscription corresponding to short name exists, and throw exception if not. The
- * subscription will not be deleted on cleanup. Return full topic name.
- */
- public PubsubClient.SubscriptionPath reuseSubscription(
- String shortTopic, String shortSubscription) {
- PubsubClient.SubscriptionPath subscription =
- PubsubClient.subscriptionPathFromName(projectId, shortSubscription);
- if (subscriptionExists(shortTopic, shortSubscription)) {
- NexmarkUtils.console("reusing existing subscription %s", subscription);
- return subscription;
- }
- throw new RuntimeException("subscription'" + subscription + "' does not already exist");
- }
-
- /**
- * Does subscription corresponding to short name exist?
- */
- public boolean subscriptionExists(String shortTopic, String shortSubscription) {
- PubsubClient.ProjectPath project = PubsubClient.projectPathFromId(projectId);
- PubsubClient.TopicPath topic = PubsubClient.topicPathFromName(projectId, shortTopic);
- PubsubClient.SubscriptionPath subscription =
- PubsubClient.subscriptionPathFromName(projectId, shortSubscription);
- try {
- Collection<PubsubClient.SubscriptionPath> existingSubscriptions =
- pubsubClient.listSubscriptions(project, topic);
- return existingSubscriptions.contains(subscription);
- } catch (IOException e) {
- throw new RuntimeException("Unable to check Pubsub subscription" + subscription + ": ", e);
- }
- }
-
- /**
- * Delete all the subscriptions and topics we created.
- */
- @Override
- public void close() {
- for (PubsubClient.SubscriptionPath subscription : createdSubscriptions) {
- try {
- NexmarkUtils.console("delete subscription %s", subscription);
- pubsubClient.deleteSubscription(subscription);
- } catch (IOException ex) {
- NexmarkUtils.console("could not delete subscription %s", subscription);
- }
- }
- for (PubsubClient.TopicPath topic : createdTopics) {
- try {
- NexmarkUtils.console("delete topic %s", topic);
- pubsubClient.deleteTopic(topic);
- } catch (IOException ex) {
- NexmarkUtils.console("could not delete topic %s", topic);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query0.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query0.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query0.java
deleted file mode 100644
index e88fce0..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query0.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.values.PCollection;
-
-/**
- * Query 0: pass-through. Events are emitted unchanged, but each one is first encoded and then
- * decoded again with the collection's coder, so the run measures the cost of the chosen coders.
- */
-public class Query0 extends NexmarkQuery {
-  public Query0(NexmarkConfiguration configuration) {
-    super(configuration, "Query0");
-  }
-
-  private PCollection<Event> applyTyped(PCollection<Event> events) {
-    final Coder<Event> eventCoder = events.getCoder();
-    DoFn<Event, Event> roundTrip = new DoFn<Event, Event>() {
-      /** Total number of encoded bytes seen. */
-      private final Aggregator<Long, Long> encodedBytes =
-          createAggregator("bytes", Sum.ofLongs());
-
-      @ProcessElement
-      public void processElement(ProcessContext c) throws CoderException, IOException {
-        ByteArrayOutputStream sink = new ByteArrayOutputStream();
-        eventCoder.encode(c.element(), sink, Coder.Context.OUTER);
-        byte[] payload = sink.toByteArray();
-        encodedBytes.addValue((long) payload.length);
-        Event decoded = eventCoder.decode(new ByteArrayInputStream(payload), Coder.Context.OUTER);
-        c.output(decoded);
-      }
-    };
-    // Force a round trip through the coder.
-    return events.apply(name + ".Serialize", ParDo.of(roundTrip));
-  }
-
-  @Override
-  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
-    PCollection<Event> passedThrough = applyTyped(events);
-    return NexmarkUtils.castToKnownSize(name, passedThrough);
-  }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query0Model.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query0Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query0Model.java
deleted file mode 100644
index 37e3f93..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query0Model.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import java.util.Collection;
-import java.util.Iterator;
-
-import org.apache.beam.sdk.values.TimestampedValue;
-
-/**
- * A direct, in-memory implementation of {@link Query0}: every input event is emitted unchanged.
- */
-public class Query0Model extends NexmarkQueryModel {
-  /**
-   * Simulator for query 0: forwards each timestamped input event as a result until the input
-   * is exhausted.
-   */
-  private class Simulator extends AbstractSimulator<Event, Event> {
-    public Simulator(NexmarkConfiguration configuration) {
-      super(NexmarkUtils.standardEventIterator(configuration));
-    }
-
-    @Override
-    protected void run() {
-      TimestampedValue<Event> next = nextInput();
-      if (next != null) {
-        addResult(next);
-        // TODO: test fails because of an offset of some hundreds of ms between expected and
-        // actual.
-        return;
-      }
-      allDone();
-    }
-  }
-
-  public Query0Model(NexmarkConfiguration configuration) {
-    super(configuration);
-  }
-
-  @Override
-  protected AbstractSimulator<?, ?> simulator() {
-    return new Simulator(configuration);
-  }
-
-  /** Canonicalizes results for comparison by delegating to {@code toValueTimestampOrder}. */
-  @Override
-  protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) {
-    Collection<String> ordered = toValueTimestampOrder(itr);
-    return ordered;
-  }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query1.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query1.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query1.java
deleted file mode 100644
index a1ecdeb..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query1.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PCollection;
-
-/**
- * Query 1, 'Currency Conversion'. Convert each bid value from dollars to euros.
- * In CQL syntax:
- *
- * <pre>
- * SELECT Istream(auction, DOLTOEUR(price), bidder, datetime)
- * FROM bid [ROWS UNBOUNDED];
- * </pre>
- *
- * <p>To make things more interesting, allow the 'currency conversion' to be arbitrarily
- * slowed down. NOTE(review): no slow-down knob is visible in this class.
- */
-class Query1 extends NexmarkQuery {
-  public Query1(NexmarkConfiguration configuration) {
-    super(configuration, "Query1");
-  }
-
-  private PCollection<Bid> applyTyped(PCollection<Event> events) {
-    // Only want the bid events.
-    PCollection<Bid> bids = events.apply(JUST_BIDS);
-
-    // Map the conversion function over all bids: a fixed rate of 89/100, truncated by
-    // integer division. All other bid fields are copied unchanged.
-    DoFn<Bid, Bid> toEuros = new DoFn<Bid, Bid>() {
-      @ProcessElement
-      public void processElement(ProcessContext c) {
-        Bid dollarBid = c.element();
-        c.output(new Bid(
-            dollarBid.auction,
-            dollarBid.bidder,
-            (dollarBid.price * 89) / 100,
-            dollarBid.dateTime,
-            dollarBid.extra));
-      }
-    };
-    return bids.apply(name + ".ToEuros", ParDo.of(toEuros));
-  }
-
-  @Override
-  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
-    PCollection<Bid> converted = applyTyped(events);
-    return NexmarkUtils.castToKnownSize(name, converted);
-  }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query10.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query10.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query10.java
deleted file mode 100644
index 7bdcb36..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query10.java
+++ /dev/null
@@ -1,380 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.cloud.hadoop.gcsio.GoogleCloudStorageWriteChannel;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.nio.channels.Channels;
-import java.nio.channels.WritableByteChannel;
-import java.util.concurrent.ThreadLocalRandom;
-import javax.annotation.Nullable;
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.options.GcsOptions;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.transforms.windowing.AfterEach;
-import org.apache.beam.sdk.transforms.windowing.AfterFirst;
-import org.apache.beam.sdk.transforms.windowing.AfterPane;
-import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
-import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
-import org.apache.beam.sdk.transforms.windowing.Repeatedly;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.util.GcsIOChannelFactory;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Query "10", 'Log to sharded files' (Not in original suite.)
- *
- * <p>Every windowSizeSec, save all events from the last period into
- * {@code maxNumWorkers * NUM_SHARDS_PER_WORKER} log files, then write one index file per window
- * listing the log files written for it.
- */
-class Query10 extends NexmarkQuery {
-  private static final Logger LOG = LoggerFactory.getLogger(Query10.class);
-  private static final int CHANNEL_BUFFER = 8 << 20; // 8MB
-  private static final int NUM_SHARDS_PER_WORKER = 5;
-  private static final Duration LATE_BATCHING_PERIOD = Duration.standardSeconds(10);
-
-  /**
-   * Capture everything we need to know about the records in a single output file.
-   */
-  private static class OutputFile implements Serializable {
-    /** Maximum possible timestamp of records in file. */
-    private final Instant maxTimestamp;
-    /** Shard within window. */
-    private final String shard;
-    /** Index of file in all files in shard. */
-    private final long index;
-    /** Timing of records in this file. */
-    private final PaneInfo.Timing timing;
-    /** Path to file containing records, or {@literal null} if no output required. */
-    @Nullable
-    private final String filename;
-
-    public OutputFile(
-        Instant maxTimestamp,
-        String shard,
-        long index,
-        PaneInfo.Timing timing,
-        @Nullable String filename) {
-      this.maxTimestamp = maxTimestamp;
-      this.shard = shard;
-      this.index = index;
-      this.timing = timing;
-      this.filename = filename;
-    }
-
-    /** One newline-terminated line of the index file. */
-    @Override
-    public String toString() {
-      return String.format("%s %s %d %s %s\n", maxTimestamp, shard, index, timing, filename);
-    }
-  }
-
-  /**
-   * GCS uri prefix for all log and index files. If null they won't be written.
-   */
-  @Nullable
-  private String outputPath;
-
-  /**
-   * Maximum number of workers, used to determine log sharding factor.
-   */
-  private int maxNumWorkers;
-
-  public Query10(NexmarkConfiguration configuration) {
-    super(configuration, "Query10");
-  }
-
-  public void setOutputPath(@Nullable String outputPath) {
-    this.outputPath = outputPath;
-  }
-
-  public void setMaxNumWorkers(int maxNumWorkers) {
-    this.maxNumWorkers = maxNumWorkers;
-  }
-
-  /**
-   * Return channel for writing bytes to GCS, with its upload buffer set to
-   * {@link #CHANNEL_BUFFER}.
-   *
-   * @throws IOException if the channel cannot be created
-   */
-  private WritableByteChannel openWritableGcsFile(GcsOptions options, String filename)
-      throws IOException {
-    WritableByteChannel channel =
-        GcsIOChannelFactory.fromOptions(options).create(filename, "text/plain");
-    checkState(channel instanceof GoogleCloudStorageWriteChannel);
-    ((GoogleCloudStorageWriteChannel) channel).setUploadBufferSize(CHANNEL_BUFFER);
-    return channel;
-  }
-
-  /** Return a short string to describe {@code timing}. */
-  private String timingToString(PaneInfo.Timing timing) {
-    switch (timing) {
-      case EARLY:
-        return "E";
-      case ON_TIME:
-        return "O";
-      case LATE:
-        return "L";
-      case UNKNOWN:
-        return "U";
-      default:
-        throw new IllegalArgumentException("Unexpected pane timing: " + timing);
-    }
-  }
-
-  /** Construct an {@link OutputFile} for {@code pane} in {@code window} for {@code shard}. */
-  private OutputFile outputFileFor(BoundedWindow window, String shard, PaneInfo pane) {
-    @Nullable String filename =
-        outputPath == null
-            ? null
-            : String.format("%s/LOG-%s-%s-%03d-%s-%x",
-                outputPath, window.maxTimestamp(), shard, pane.getIndex(),
-                timingToString(pane.getTiming()),
-                ThreadLocalRandom.current().nextLong());
-    return new OutputFile(window.maxTimestamp(), shard, pane.getIndex(),
-        pane.getTiming(), filename);
-  }
-
-  /**
-   * Return path to which we should write the index for {@code window}, or {@literal null}
-   * if no output required.
-   */
-  @Nullable
-  private String indexPathFor(BoundedWindow window) {
-    if (outputPath == null) {
-      return null;
-    }
-    return String.format("%s/INDEX-%s", outputPath, window.maxTimestamp());
-  }
-
-  private PCollection<Done> applyTyped(PCollection<Event> events) {
-    // Guard against an unset (zero) or negative maxNumWorkers: zero shards would make the
-    // modulo in ShardEvents throw ArithmeticException for every element.
-    final int numLogShards = Math.max(1, maxNumWorkers) * NUM_SHARDS_PER_WORKER;
-
-    return events.apply(name + ".ShardEvents",
-        ParDo.of(new DoFn<Event, KV<String, Event>>() {
-          final Aggregator<Long, Long> lateCounter =
-              createAggregator("actuallyLateEvent", Sum.ofLongs());
-          final Aggregator<Long, Long> onTimeCounter =
-              createAggregator("actuallyOnTimeEvent", Sum.ofLongs());
-
-          @ProcessElement
-          public void processElement(ProcessContext c) {
-            if (c.element().hasAnnotation("LATE")) {
-              lateCounter.addValue(1L);
-              LOG.error("Observed late: {}", c.element());
-            } else {
-              onTimeCounter.addValue(1L);
-            }
-            int shardNum = (int) Math.abs((long) c.element().hashCode() % numLogShards);
-            String shard = String.format("shard-%05d-of-%05d", shardNum, numLogShards);
-            c.output(KV.of(shard, c.element()));
-          }
-        }))
-        .apply(name + ".WindowEvents",
-            Window.<KV<String, Event>>into(
-                FixedWindows.of(Duration.standardSeconds(configuration.windowSizeSec)))
-                .triggering(AfterEach.inOrder(
-                    Repeatedly
-                        .forever(AfterPane.elementCountAtLeast(configuration.maxLogEvents))
-                        .orFinally(AfterWatermark.pastEndOfWindow()),
-                    Repeatedly.forever(
-                        AfterFirst.of(AfterPane.elementCountAtLeast(configuration.maxLogEvents),
-                            AfterProcessingTime.pastFirstElementInPane()
-                                .plusDelayOf(LATE_BATCHING_PERIOD)))))
-                .discardingFiredPanes()
-                // Use a 1 day allowed lateness so that any forgotten hold will stall the
-                // pipeline for that period and be very noticeable.
-                .withAllowedLateness(Duration.standardDays(1)))
-        .apply(name + ".GroupByKey", GroupByKey.<String, Event>create())
-        .apply(name + ".CheckForLateEvents",
-            ParDo.of(new DoFn<KV<String, Iterable<Event>>,
-                KV<String, Iterable<Event>>>() {
-              final Aggregator<Long, Long> earlyCounter =
-                  createAggregator("earlyShard", Sum.ofLongs());
-              final Aggregator<Long, Long> onTimeCounter =
-                  createAggregator("onTimeShard", Sum.ofLongs());
-              final Aggregator<Long, Long> lateCounter =
-                  createAggregator("lateShard", Sum.ofLongs());
-              final Aggregator<Long, Long> unexpectedLatePaneCounter =
-                  createAggregator("ERROR_unexpectedLatePane", Sum.ofLongs());
-              final Aggregator<Long, Long> unexpectedOnTimeElementCounter =
-                  createAggregator("ERROR_unexpectedOnTimeElement", Sum.ofLongs());
-
-              @ProcessElement
-              public void processElement(ProcessContext c, BoundedWindow window) {
-                int numLate = 0;
-                int numOnTime = 0;
-                for (Event event : c.element().getValue()) {
-                  if (event.hasAnnotation("LATE")) {
-                    numLate++;
-                  } else {
-                    numOnTime++;
-                  }
-                }
-                String shard = c.element().getKey();
-                LOG.error(
-                    "{} with timestamp {} has {} actually late and {} on-time "
-                        + "elements in pane {} for window {}",
-                    shard, c.timestamp(), numLate, numOnTime, c.pane(),
-                    window.maxTimestamp());
-                if (c.pane().getTiming() == PaneInfo.Timing.LATE) {
-                  if (numLate == 0) {
-                    LOG.error(
-                        "ERROR! No late events in late pane for {}", shard);
-                    unexpectedLatePaneCounter.addValue(1L);
-                  }
-                  if (numOnTime > 0) {
-                    LOG.error(
-                        "ERROR! Have {} on-time events in late pane for {}",
-                        numOnTime, shard);
-                    unexpectedOnTimeElementCounter.addValue(1L);
-                  }
-                  lateCounter.addValue(1L);
-                } else if (c.pane().getTiming() == PaneInfo.Timing.EARLY) {
-                  if (numOnTime + numLate < configuration.maxLogEvents) {
-                    LOG.error(
-                        "ERROR! Only have {} events in early pane for {}",
-                        numOnTime + numLate, shard);
-                  }
-                  earlyCounter.addValue(1L);
-                } else {
-                  onTimeCounter.addValue(1L);
-                }
-                c.output(c.element());
-              }
-            }))
-        .apply(name + ".UploadEvents",
-            ParDo.of(new DoFn<KV<String, Iterable<Event>>,
-                KV<Void, OutputFile>>() {
-              final Aggregator<Long, Long> savedFileCounter =
-                  createAggregator("savedFile", Sum.ofLongs());
-              final Aggregator<Long, Long> writtenRecordsCounter =
-                  createAggregator("writtenRecords", Sum.ofLongs());
-
-              @ProcessElement
-              public void processElement(ProcessContext c, BoundedWindow window)
-                  throws IOException {
-                String shard = c.element().getKey();
-                GcsOptions options = c.getPipelineOptions().as(GcsOptions.class);
-                OutputFile outputFile = outputFileFor(window, shard, c.pane());
-                LOG.error(
-                    "Writing {} with record timestamp {}, window timestamp {}, pane {}",
-                    shard, c.timestamp(), window.maxTimestamp(), c.pane());
-                if (outputFile.filename != null) {
-                  LOG.error("Beginning write to '{}'", outputFile.filename);
-                  int n = 0;
-                  try (OutputStream output =
-                           Channels.newOutputStream(openWritableGcsFile(options, outputFile
-                               .filename))) {
-                    for (Event event : c.element().getValue()) {
-                      Event.CODER.encode(event, output, Coder.Context.OUTER);
-                      writtenRecordsCounter.addValue(1L);
-                      if (++n % 10000 == 0) {
-                        LOG.error("So far written {} records to '{}'", n,
-                            outputFile.filename);
-                      }
-                    }
-                  }
-                  LOG.error("Written all {} records to '{}'", n, outputFile.filename);
-                }
-                savedFileCounter.addValue(1L);
-                c.output(KV.<Void, OutputFile>of(null, outputFile));
-              }
-            }))
-        // Clear fancy triggering from above.
-        .apply(name + ".WindowLogFiles", Window.<KV<Void, OutputFile>>into(
-            FixedWindows.of(Duration.standardSeconds(configuration.windowSizeSec)))
-            .triggering(AfterWatermark.pastEndOfWindow())
-            // We expect no late data here, but we'll assume the worst so we can detect any.
-            .withAllowedLateness(Duration.standardDays(1))
-            .discardingFiredPanes())
-        // TODO etienne: unnecessary groupByKey? because aggregators are shared in parallel
-        // and Pardo is also in parallel, why group all elements in memory of the same executor?
-        .apply(name + ".GroupByKey2", GroupByKey.<Void, OutputFile>create())
-        .apply(name + ".Index",
-            ParDo.of(new DoFn<KV<Void, Iterable<OutputFile>>, Done>() {
-              final Aggregator<Long, Long> unexpectedLateCounter =
-                  createAggregator("ERROR_unexpectedLate", Sum.ofLongs());
-              final Aggregator<Long, Long> unexpectedEarlyCounter =
-                  createAggregator("ERROR_unexpectedEarly", Sum.ofLongs());
-              final Aggregator<Long, Long> unexpectedIndexCounter =
-                  createAggregator("ERROR_unexpectedIndex", Sum.ofLongs());
-              final Aggregator<Long, Long> finalizedCounter =
-                  createAggregator("indexed", Sum.ofLongs());
-
-              @ProcessElement
-              public void processElement(ProcessContext c, BoundedWindow window)
-                  throws IOException {
-                if (c.pane().getTiming() == Timing.LATE) {
-                  unexpectedLateCounter.addValue(1L);
-                  LOG.error("ERROR! Unexpected LATE pane: {}", c.pane());
-                } else if (c.pane().getTiming() == Timing.EARLY) {
-                  unexpectedEarlyCounter.addValue(1L);
-                  LOG.error("ERROR! Unexpected EARLY pane: {}", c.pane());
-                } else if (c.pane().getTiming() == Timing.ON_TIME
-                    && c.pane().getIndex() != 0) {
-                  unexpectedIndexCounter.addValue(1L);
-                  LOG.error("ERROR! Unexpected ON_TIME pane index: {}", c.pane());
-                } else {
-                  GcsOptions options = c.getPipelineOptions().as(GcsOptions.class);
-                  LOG.error(
-                      "Index with record timestamp {}, window timestamp {}, pane {}",
-                      c.timestamp(), window.maxTimestamp(), c.pane());
-
-                  @Nullable String filename = indexPathFor(window);
-                  if (filename != null) {
-                    LOG.error("Beginning write to '{}'", filename);
-                    int n = 0;
-                    try (OutputStream output =
-                             Channels.newOutputStream(
-                                 openWritableGcsFile(options, filename))) {
-                      for (OutputFile outputFile : c.element().getValue()) {
-                        // Encode explicitly rather than with the platform default charset.
-                        output.write(outputFile.toString()
-                            .getBytes(java.nio.charset.StandardCharsets.UTF_8));
-                        n++;
-                      }
-                    }
-                    LOG.error("Written all {} lines to '{}'", n, filename);
-                  }
-                  c.output(
-                      new Done("written for timestamp " + window.maxTimestamp()));
-                  finalizedCounter.addValue(1L);
-                }
-              }
-            }));
-  }
-
-  @Override
-  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
-    return NexmarkUtils.castToKnownSize(name, applyTyped(events));
-  }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query11.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query11.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query11.java
deleted file mode 100644
index d610b7c..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query11.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.windowing.AfterPane;
-import org.apache.beam.sdk.transforms.windowing.Repeatedly;
-import org.apache.beam.sdk.transforms.windowing.Sessions;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.joda.time.Duration;
-
-/**
- * Query "11", 'User sessions' (Not in original suite.)
- *
- * <p>Group bids by the same user into sessions with {@code windowSizeSec} max gap and emit the
- * number of bids counted in each fired pane as a {@code BidsPerSession}.
- *
- * <p>The session itself is not capped at {@code maxLogEvents} bids: the trigger fires every time
- * at least {@code maxLogEvents} bids have accumulated for a bidder's session, and fired panes are
- * discarded, so one session can produce several partial counts. The allowed lateness is
- * {@code occasionalDelaySec / 2} seconds.
- */
-class Query11 extends NexmarkQuery {
-  public Query11(NexmarkConfiguration configuration) {
-    super(configuration, "Query11");
-  }
-
-  /** Keys bids by bidder, windows them into sessions and counts the bids in each fired pane. */
-  private PCollection<BidsPerSession> applyTyped(PCollection<Event> events) {
-    return events.apply(JUST_BIDS)
-        .apply(name + ".Rekey",
-            // TODO etienne: why not avoid this ParDo and do a Count.perElement?
-            ParDo.of(new DoFn<Bid, KV<Long, Void>>() {
-              @ProcessElement
-              public void processElement(ProcessContext c) {
-                Bid bid = c.element();
-                // Only the key matters for counting; the value is deliberately empty.
-                c.output(KV.of(bid.bidder, (Void) null));
-              }
-            }))
-        .apply(Window.<KV<Long, Void>>into(
-            Sessions.withGapDuration(Duration.standardSeconds(configuration.windowSizeSec)))
-            // Fire repeatedly, each time at least maxLogEvents bids are buffered for the key.
-            .triggering(
-                Repeatedly.forever(AfterPane.elementCountAtLeast(configuration.maxLogEvents)))
-            // Each pane reports only the bids that arrived since the previous firing.
-            .discardingFiredPanes()
-            .withAllowedLateness(Duration.standardSeconds(configuration.occasionalDelaySec / 2)))
-        .apply(Count.<Long, Void>perKey())
-        .apply(name + ".ToResult",
-            ParDo.of(new DoFn<KV<Long, Long>, BidsPerSession>() {
-              @ProcessElement
-              public void processElement(ProcessContext c) {
-                c.output(new BidsPerSession(c.element().getKey(), c.element().getValue()));
-              }
-            }));
-  }
-
-  /** Runs {@code applyTyped} and casts the result via {@code NexmarkUtils.castToKnownSize}. */
-  @Override
-  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
-    return NexmarkUtils.castToKnownSize(name, applyTyped(events));
-  }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query12.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query12.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query12.java
deleted file mode 100644
index 72fbb57..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query12.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
-import org.apache.beam.sdk.transforms.windowing.Repeatedly;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.joda.time.Duration;
-
-/**
- * Query "12", 'Processing time windows' (Not in original suite.)
- *
- * <p>Counts bids per bidder using processing-time panes. All bids live in the global window; a
- * pane fires {@code windowSizeSec} seconds of processing time after the first bid enters it, and
- * this repeats forever. Fired panes are discarded, so each {@code BidsPerSession} output counts
- * only the bids of a single pane. No late data is allowed.
- */
-class Query12 extends NexmarkQuery {
-  public Query12(NexmarkConfiguration configuration) {
-    super(configuration, "Query12");
-  }
-
-  /** Builds the typed pipeline: bids keyed by bidder, then counted per processing-time pane. */
-  private PCollection<BidsPerSession> applyTyped(PCollection<Event> events) {
-    // Key every bid by its bidder; the value carries no information.
-    // TODO etienne: why not avoid this ParDo and do a Count.perElement?
-    PCollection<KV<Long, Void>> keyedByBidder = events
-        .apply(JUST_BIDS)
-        .apply(name + ".Rekey",
-            ParDo.of(new DoFn<Bid, KV<Long, Void>>() {
-              @ProcessElement
-              public void processElement(ProcessContext c) {
-                c.output(KV.of(c.element().bidder, (Void) null));
-              }
-            }));
-
-    // Global window, re-fired windowSizeSec after the first element of each new pane.
-    PCollection<KV<Long, Long>> countsPerBidder = keyedByBidder
-        .apply(Window.<KV<Long, Void>>into(new GlobalWindows())
-            .triggering(
-                Repeatedly.forever(
-                    AfterProcessingTime.pastFirstElementInPane()
-                        .plusDelayOf(Duration.standardSeconds(configuration.windowSizeSec))))
-            .discardingFiredPanes()
-            .withAllowedLateness(Duration.ZERO))
-        .apply(Count.<Long, Void>perKey());
-
-    return countsPerBidder.apply(name + ".ToResult",
-        ParDo.of(new DoFn<KV<Long, Long>, BidsPerSession>() {
-          @ProcessElement
-          public void processElement(ProcessContext c) {
-            KV<Long, Long> bidderCount = c.element();
-            c.output(new BidsPerSession(bidderCount.getKey(), bidderCount.getValue()));
-          }
-        }));
-  }
-
-  /** Runs {@code applyTyped} and casts the result via {@code NexmarkUtils.castToKnownSize}. */
-  @Override
-  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
-    return NexmarkUtils.castToKnownSize(name, applyTyped(events));
-  }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query1Model.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query1Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query1Model.java
deleted file mode 100644
index 16287e6..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query1Model.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.Iterator;
-
-import org.apache.beam.sdk.values.TimestampedValue;
-
-/**
- * A direct implementation of {@link Query1}.
- */
-public class Query1Model extends NexmarkQueryModel implements Serializable {
-  /**
-   * Simulator for query 1. It reads the standard event stream and, for every bid, emits a copy
-   * whose price is multiplied by 89 and then divided by 100. Each copy keeps the timestamp of
-   * the event it came from.
-   */
-  private class Simulator extends AbstractSimulator<Event, Bid> {
-    public Simulator(NexmarkConfiguration configuration) {
-      super(NexmarkUtils.standardEventIterator(configuration));
-    }
-
-    @Override
-    protected void run() {
-      TimestampedValue<Event> next = nextInput();
-      if (next == null) {
-        // No input was returned: mark the simulator as done.
-        allDone();
-        return;
-      }
-      Bid original = next.getValue().bid;
-      // Only bid events produce output; every other event kind is ignored.
-      if (original != null) {
-        Bid converted = new Bid(original.auction, original.bidder,
-            original.price * 89 / 100, original.dateTime, original.extra);
-        addResult(TimestampedValue.of(converted, next.getTimestamp()));
-        // TODO test fails because of an offset of some hundreds of ms between expected and actual
-      }
-    }
-  }
-
-  public Query1Model(NexmarkConfiguration configuration) {
-    super(configuration);
-  }
-
-  /** Returns a fresh simulator driven by this model's configuration. */
-  @Override
-  public AbstractSimulator<?, ?> simulator() {
-    return new Simulator(configuration);
-  }
-
-  /** Delegates to {@code toValueTimestampOrder}. */
-  @Override
-  protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) {
-    return toValueTimestampOrder(itr);
-  }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query2.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query2.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query2.java
deleted file mode 100644
index 828cdf5..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query2.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.Filter;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.values.PCollection;
-
-/**
- * Query 2, 'Filtering'. Find bids with specific auction ids and show their bid price.
- * In CQL syntax:
- *
- * <pre>
- * SELECT Rstream(auction, price)
- * FROM Bid [NOW]
- * WHERE auction = 1007 OR auction = 1020 OR auction = 2001 OR auction = 2019 OR auction = 2087;
- * </pre>
- *
- * <p>As written that query will only yield a few hundred results over event streams of
- * arbitrary size. To make it more interesting we instead choose bids for every
- * {@code auctionSkip}'th auction.
- */
-class Query2 extends NexmarkQuery {
-  public Query2(NexmarkConfiguration configuration) {
-    super(configuration, "Query2");
-  }
-
-  /** Keeps bids whose auction id is a multiple of auctionSkip and projects (auction, price). */
-  private PCollection<AuctionPrice> applyTyped(PCollection<Event> events) {
-    // Bids only, restricted to every auctionSkip'th auction.
-    PCollection<Bid> selectedBids = events
-        .apply(JUST_BIDS)
-        .apply(Filter.by(new SerializableFunction<Bid, Boolean>() {
-          @Override
-          public Boolean apply(Bid candidate) {
-            return candidate.auction % configuration.auctionSkip == 0;
-          }
-        }));
-
-    // Keep just the auction id and the price.
-    return selectedBids.apply(name + ".Project",
-        ParDo.of(new DoFn<Bid, AuctionPrice>() {
-          @ProcessElement
-          public void processElement(ProcessContext c) {
-            Bid selected = c.element();
-            c.output(new AuctionPrice(selected.auction, selected.price));
-          }
-        }));
-  }
-
-  /** Runs {@code applyTyped} and casts the result via {@code NexmarkUtils.castToKnownSize}. */
-  @Override
-  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
-    return NexmarkUtils.castToKnownSize(name, applyTyped(events));
-  }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query2Model.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query2Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query2Model.java
deleted file mode 100644
index 7769e52..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query2Model.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.Iterator;
-
-import org.apache.beam.sdk.values.TimestampedValue;
-
-/**
- * A direct implementation of {@link Query2}.
- */
-public class Query2Model extends NexmarkQueryModel implements Serializable {
- /**
- * Simulator for query 2.
- */
- private class Simulator extends AbstractSimulator<Event, AuctionPrice> {
- public Simulator(NexmarkConfiguration configuration) {
- super(NexmarkUtils.standardEventIterator(configuration));
- }
-
- @Override
- protected void run() {
- TimestampedValue<Event> timestampedEvent = nextInput();
- if (timestampedEvent == null) {
- allDone();
- return;
- }
- Event event = timestampedEvent.getValue();
- if (event.bid == null) {
- // Ignore non bid events.
- return;
- }
- Bid bid = event.bid;
- if (bid.auction % configuration.auctionSkip != 0) {
- // Ignore bids for auctions we don't care about.
- return;
- }
- AuctionPrice auctionPrice = new AuctionPrice(bid.auction, bid.price);
- TimestampedValue<AuctionPrice> result =
- TimestampedValue.of(auctionPrice, timestampedEvent.getTimestamp());
- addResult(result);
- }
- }
-
- public Query2Model(NexmarkConfiguration configuration) {
- super(configuration);
- }
-
- @Override
- public AbstractSimulator<?, ?> simulator() {
- return new Simulator(configuration);
- }
-
- @Override
- protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) {
- return toValueTimestampOrder(itr);
- }
-}