You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/23 17:09:20 UTC
[12/55] [abbrv] beam git commit: Refactor classes into packages
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/UnboundedEventSource.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/UnboundedEventSource.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/UnboundedEventSource.java
deleted file mode 100644
index 9573ef7..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/UnboundedEventSource.java
+++ /dev/null
@@ -1,328 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.NoSuchElementException;
-import java.util.PriorityQueue;
-import java.util.Queue;
-import java.util.concurrent.ThreadLocalRandom;
-import javax.annotation.Nullable;
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.values.TimestampedValue;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A custom, unbounded source of event records.
- *
- * <p>If {@code isRateLimited} is true, events become available for return from the reader such
- * that the overall rate respect the {@code interEventDelayUs} period if possible. Otherwise,
- * events are returned every time the system asks for one.
- */
-class UnboundedEventSource extends UnboundedSource<Event, Generator.Checkpoint> {
- private static final Duration BACKLOG_PERIOD = Duration.standardSeconds(30);
- private static final Logger LOG = LoggerFactory.getLogger(UnboundedEventSource.class);
-
- /** Configuration for generator to use when reading synthetic events. May be split. */
- private final GeneratorConfig config;
-
- /** How many unbounded sources to create. */
- private final int numEventGenerators;
-
- /** How many seconds to hold back the watermark. */
- private final long watermarkHoldbackSec;
-
- /** Are we rate limiting the events? */
- private final boolean isRateLimited;
-
- public UnboundedEventSource(GeneratorConfig config, int numEventGenerators,
- long watermarkHoldbackSec, boolean isRateLimited) {
- this.config = config;
- this.numEventGenerators = numEventGenerators;
- this.watermarkHoldbackSec = watermarkHoldbackSec;
- this.isRateLimited = isRateLimited;
- }
-
- /** A reader to pull events from the generator. */
- private class EventReader extends UnboundedReader<Event> {
- /** Generator we are reading from. */
- private final Generator generator;
-
- /**
- * Current watermark (ms since epoch). Initially set to beginning of time.
- * Then updated to be the time of the next generated event.
- * Then, once all events have been generated, set to the end of time.
- */
- private long watermark;
-
- /**
- * Current backlog (ms), as delay between timestamp of last returned event and the timestamp
- * we should be up to according to wall-clock time. Used only for logging.
- */
- private long backlogDurationMs;
-
- /**
- * Current backlog, as estimated number of event bytes we are behind, or null if
- * unknown. Reported to callers.
- */
- @Nullable
- private Long backlogBytes;
-
- /**
- * Wallclock time (ms since epoch) we last reported the backlog, or -1 if never reported.
- */
- private long lastReportedBacklogWallclock;
-
- /**
- * Event time (ms since epoch) of pending event at last reported backlog, or -1 if never
- * calculated.
- */
- private long timestampAtLastReportedBacklogMs;
-
- /** Next event to make 'current' when wallclock time has advanced sufficiently. */
- @Nullable
- private TimestampedValue<Event> pendingEvent;
-
- /** Wallclock time when {@link #pendingEvent} is due, or -1 if no pending event. */
- private long pendingEventWallclockTime;
-
- /** Current event to return from getCurrent. */
- @Nullable
- private TimestampedValue<Event> currentEvent;
-
- /** Events which have been held back so as to force them to be late. */
- private Queue<Generator.NextEvent> heldBackEvents = new PriorityQueue<>();
-
- public EventReader(Generator generator) {
- this.generator = generator;
- watermark = NexmarkUtils.BEGINNING_OF_TIME.getMillis();
- lastReportedBacklogWallclock = -1;
- pendingEventWallclockTime = -1;
- timestampAtLastReportedBacklogMs = -1;
- }
-
- public EventReader(GeneratorConfig config) {
- this(new Generator(config));
- }
-
- @Override
- public boolean start() {
- LOG.trace("starting unbounded generator {}", generator);
- return advance();
- }
-
-
- @Override
- public boolean advance() {
- long now = System.currentTimeMillis();
-
- while (pendingEvent == null) {
- if (!generator.hasNext() && heldBackEvents.isEmpty()) {
- // No more events, EVER.
- if (isRateLimited) {
- updateBacklog(System.currentTimeMillis(), 0);
- }
- if (watermark < BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
- watermark = BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis();
- LOG.trace("stopped unbounded generator {}", generator);
- }
- return false;
- }
-
- Generator.NextEvent next = heldBackEvents.peek();
- if (next != null && next.wallclockTimestamp <= now) {
- // Time to use the held-back event.
- heldBackEvents.poll();
- LOG.debug("replaying held-back event {}ms behind watermark",
- watermark - next.eventTimestamp);
- } else if (generator.hasNext()) {
- next = generator.nextEvent();
- if (isRateLimited && config.configuration.probDelayedEvent > 0.0
- && config.configuration.occasionalDelaySec > 0
- && ThreadLocalRandom.current().nextDouble() < config.configuration.probDelayedEvent) {
- // We'll hold back this event and go around again.
- long delayMs =
- ThreadLocalRandom.current().nextLong(config.configuration.occasionalDelaySec * 1000)
- + 1L;
- LOG.debug("delaying event by {}ms", delayMs);
- heldBackEvents.add(next.withDelay(delayMs));
- continue;
- }
- } else {
- // Waiting for held-back event to fire.
- if (isRateLimited) {
- updateBacklog(now, 0);
- }
- return false;
- }
-
- pendingEventWallclockTime = next.wallclockTimestamp;
- pendingEvent = TimestampedValue.of(next.event, new Instant(next.eventTimestamp));
- long newWatermark =
- next.watermark - Duration.standardSeconds(watermarkHoldbackSec).getMillis();
- if (newWatermark > watermark) {
- watermark = newWatermark;
- }
- }
-
- if (isRateLimited) {
- if (pendingEventWallclockTime > now) {
- // We want this event to fire in the future. Try again later.
- updateBacklog(now, 0);
- return false;
- }
- updateBacklog(now, now - pendingEventWallclockTime);
- }
-
- // This event is ready to fire.
- currentEvent = pendingEvent;
- pendingEvent = null;
- return true;
- }
-
- private void updateBacklog(long now, long newBacklogDurationMs) {
- backlogDurationMs = newBacklogDurationMs;
- long interEventDelayUs = generator.currentInterEventDelayUs();
- if (interEventDelayUs != 0) {
- long backlogEvents = (backlogDurationMs * 1000 + interEventDelayUs - 1) / interEventDelayUs;
- backlogBytes = generator.getCurrentConfig().estimatedBytesForEvents(backlogEvents);
- }
- if (lastReportedBacklogWallclock < 0
- || now - lastReportedBacklogWallclock > BACKLOG_PERIOD.getMillis()) {
- double timeDialation = Double.NaN;
- if (pendingEvent != null
- && lastReportedBacklogWallclock >= 0
- && timestampAtLastReportedBacklogMs >= 0) {
- long wallclockProgressionMs = now - lastReportedBacklogWallclock;
- long eventTimeProgressionMs =
- pendingEvent.getTimestamp().getMillis() - timestampAtLastReportedBacklogMs;
- timeDialation = (double) eventTimeProgressionMs / (double) wallclockProgressionMs;
- }
- LOG.debug(
- "unbounded generator backlog now {}ms ({} bytes) at {}us interEventDelay "
- + "with {} time dilation",
- backlogDurationMs, backlogBytes, interEventDelayUs, timeDialation);
- lastReportedBacklogWallclock = now;
- if (pendingEvent != null) {
- timestampAtLastReportedBacklogMs = pendingEvent.getTimestamp().getMillis();
- }
- }
- }
-
- @Override
- public Event getCurrent() {
- if (currentEvent == null) {
- throw new NoSuchElementException();
- }
- return currentEvent.getValue();
- }
-
- @Override
- public Instant getCurrentTimestamp() {
- if (currentEvent == null) {
- throw new NoSuchElementException();
- }
- return currentEvent.getTimestamp();
- }
-
- @Override
- public void close() {
- // Nothing to close.
- }
-
- @Override
- public UnboundedEventSource getCurrentSource() {
- return UnboundedEventSource.this;
- }
-
- @Override
- public Instant getWatermark() {
- return new Instant(watermark);
- }
-
- @Override
- public Generator.Checkpoint getCheckpointMark() {
- return generator.toCheckpoint();
- }
-
- @Override
- public long getSplitBacklogBytes() {
- return backlogBytes == null ? BACKLOG_UNKNOWN : backlogBytes;
- }
-
- @Override
- public String toString() {
- return String.format("EventReader(%d, %d, %d)",
- generator.getCurrentConfig().getStartEventId(), generator.getNextEventId(),
- generator.getCurrentConfig().getStopEventId());
- }
- }
-
- @Override
- public Coder<Generator.Checkpoint> getCheckpointMarkCoder() {
- return Generator.Checkpoint.CODER_INSTANCE;
- }
-
- @Override
- public List<UnboundedEventSource> generateInitialSplits(
- int desiredNumSplits, PipelineOptions options) {
- LOG.trace("splitting unbounded source into {} sub-sources", numEventGenerators);
- List<UnboundedEventSource> results = new ArrayList<>();
- // Ignore desiredNumSplits and use numEventGenerators instead.
- for (GeneratorConfig subConfig : config.split(numEventGenerators)) {
- results.add(new UnboundedEventSource(subConfig, 1, watermarkHoldbackSec, isRateLimited));
- }
- return results;
- }
-
- @Override
- public EventReader createReader(
- PipelineOptions options, @Nullable Generator.Checkpoint checkpoint) {
- if (checkpoint == null) {
- LOG.trace("creating initial unbounded reader for {}", config);
- return new EventReader(config);
- } else {
- LOG.trace("resuming unbounded reader from {}", checkpoint);
- return new EventReader(checkpoint.toGenerator(config));
- }
- }
-
- @Override
- public void validate() {
- // Nothing to validate.
- }
-
- @Override
- public Coder<Event> getDefaultOutputCoder() {
- return Event.CODER;
- }
-
- @Override
- public String toString() {
- return String.format(
- "UnboundedEventSource(%d, %d)", config.getStartEventId(), config.getStopEventId());
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBids.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBids.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBids.java
index 594195a..9f1ddf8 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBids.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBids.java
@@ -20,7 +20,6 @@ package org.apache.beam.integration.nexmark;
import static com.google.common.base.Preconditions.checkState;
import com.fasterxml.jackson.annotation.JsonCreator;
-
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -31,7 +30,11 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
-
+import org.apache.beam.integration.nexmark.model.Auction;
+import org.apache.beam.integration.nexmark.model.AuctionBid;
+import org.apache.beam.integration.nexmark.model.Bid;
+import org.apache.beam.integration.nexmark.model.Event;
+import org.apache.beam.integration.nexmark.sources.GeneratorConfig;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
@@ -45,10 +48,10 @@ import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Instant;
@@ -249,7 +252,7 @@ public class WinningBids extends PTransform<PCollection<Event>, PCollection<Auct
}
@Override
- public AuctionOrBidWindow getSideInputWindow(BoundedWindow window) {
+ public WindowMappingFn<AuctionOrBidWindow> getDefaultWindowMappingFn() {
throw new UnsupportedOperationException("AuctionWindowFn not supported for side inputs");
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBidsSimulator.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBidsSimulator.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBidsSimulator.java
index dc8094b..e7f51b7 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBidsSimulator.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBidsSimulator.java
@@ -26,6 +26,10 @@ import java.util.TreeMap;
import java.util.TreeSet;
import javax.annotation.Nullable;
+import org.apache.beam.integration.nexmark.model.Auction;
+import org.apache.beam.integration.nexmark.model.AuctionBid;
+import org.apache.beam.integration.nexmark.model.Bid;
+import org.apache.beam.integration.nexmark.model.Event;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Instant;
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkApexDriver.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkApexDriver.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkApexDriver.java
new file mode 100644
index 0000000..265ccf7
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkApexDriver.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.drivers;
+
+import org.apache.beam.integration.nexmark.NexmarkDriver;
+import org.apache.beam.integration.nexmark.NexmarkOptions;
+import org.apache.beam.runners.apex.ApexPipelineOptions;
+import org.apache.beam.runners.apex.ApexRunner;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+
+/**
+ * Run NexMark queries using the Apex runner.
+ */
+public class NexmarkApexDriver extends NexmarkDriver<NexmarkApexDriver.NexmarkApexOptions> {
+ /**
+ * Command line flags.
+ */
+ public interface NexmarkApexOptions extends NexmarkOptions, ApexPipelineOptions {
+ }
+
+ /**
+ * Entry point.
+ */
+ public static void main(String[] args) {
+ // Gather command line args, baseline, configurations, etc.
+ NexmarkApexOptions options = PipelineOptionsFactory.fromArgs(args)
+ .withValidation()
+ .as(NexmarkApexOptions.class);
+ options.setRunner(ApexRunner.class);
+ NexmarkApexRunner runner = new NexmarkApexRunner(options);
+ new NexmarkApexDriver().runAll(options, runner);
+ }
+}
+
+
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkApexRunner.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkApexRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkApexRunner.java
new file mode 100644
index 0000000..2bcf82d
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkApexRunner.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.drivers;
+
+import javax.annotation.Nullable;
+
+import org.apache.beam.integration.nexmark.NexmarkPerf;
+import org.apache.beam.integration.nexmark.NexmarkQuery;
+import org.apache.beam.integration.nexmark.NexmarkRunner;
+
+/**
+ * Run a query using the Apex runner.
+ */
+public class NexmarkApexRunner extends NexmarkRunner<NexmarkApexDriver.NexmarkApexOptions> {
+ @Override
+ protected boolean isStreaming() {
+ return options.isStreaming();
+ }
+
+ @Override
+ protected int coresPerWorker() {
+ return 4;
+ }
+
+ @Override
+ protected int maxNumWorkers() {
+ return 5;
+ }
+
+ @Override
+ protected void invokeBuilderForPublishOnlyPipeline(
+ PipelineBuilder builder) {
+ builder.build(options);
+ }
+
+ @Override
+ protected void waitForPublisherPreload() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ @Nullable
+ protected NexmarkPerf monitor(NexmarkQuery query) {
+ return null;
+ }
+
+ public NexmarkApexRunner(NexmarkApexDriver.NexmarkApexOptions options) {
+ super(options);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkDirectDriver.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkDirectDriver.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkDirectDriver.java
new file mode 100644
index 0000000..2b825f3
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkDirectDriver.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.drivers;
+
+import org.apache.beam.integration.nexmark.NexmarkDriver;
+import org.apache.beam.integration.nexmark.NexmarkOptions;
+import org.apache.beam.runners.direct.DirectOptions;
+import org.apache.beam.runners.direct.DirectRunner;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+
+/**
+ * An implementation of the 'NEXMark queries' using the Direct Runner.
+ */
+class NexmarkDirectDriver extends NexmarkDriver<NexmarkDirectDriver.NexmarkDirectOptions> {
+ /**
+ * Command line flags.
+ */
+ public interface NexmarkDirectOptions extends NexmarkOptions, DirectOptions {
+ }
+
+ /**
+ * Entry point.
+ */
+ public static void main(String[] args) {
+ NexmarkDirectOptions options =
+ PipelineOptionsFactory.fromArgs(args)
+ .withValidation()
+ .as(NexmarkDirectOptions.class);
+ options.setRunner(DirectRunner.class);
+ NexmarkDirectRunner runner = new NexmarkDirectRunner(options);
+ new NexmarkDirectDriver().runAll(options, runner);
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkDirectRunner.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkDirectRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkDirectRunner.java
new file mode 100644
index 0000000..1391040
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkDirectRunner.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.drivers;
+
+import org.apache.beam.integration.nexmark.NexmarkRunner;
+
+/**
+ * Run a single query using the Direct Runner.
+ */
+class NexmarkDirectRunner extends NexmarkRunner<NexmarkDirectDriver.NexmarkDirectOptions> {
+ public NexmarkDirectRunner(NexmarkDirectDriver.NexmarkDirectOptions options) {
+ super(options);
+ }
+
+ @Override
+ protected boolean isStreaming() {
+ return options.isStreaming();
+ }
+
+ @Override
+ protected int coresPerWorker() {
+ return 4;
+ }
+
+ @Override
+ protected int maxNumWorkers() {
+ return 1;
+ }
+
+ @Override
+ protected void invokeBuilderForPublishOnlyPipeline(PipelineBuilder builder) {
+ throw new UnsupportedOperationException(
+ "Cannot use --pubSubMode=COMBINED with DirectRunner");
+ }
+
+ /**
+ * Not supported by this runner. Always throws {@link UnsupportedOperationException}, because
+ * {@code --pubSubMode=COMBINED} cannot be used with the Direct Runner.
+ */
+ @Override
+ protected void waitForPublisherPreload() {
+ throw new UnsupportedOperationException(
+ "Cannot use --pubSubMode=COMBINED with DirectRunner");
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkFlinkDriver.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkFlinkDriver.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkFlinkDriver.java
new file mode 100644
index 0000000..bf0b115
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkFlinkDriver.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.drivers;
+
+import org.apache.beam.integration.nexmark.NexmarkDriver;
+import org.apache.beam.integration.nexmark.NexmarkOptions;
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
+import org.apache.beam.runners.flink.FlinkRunner;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+
+/**
+ * Run NexMark queries using the Flink runner.
+ */
+public class NexmarkFlinkDriver extends NexmarkDriver<NexmarkFlinkDriver.NexmarkFlinkOptions> {
+ /**
+ * Command line flags.
+ */
+ public interface NexmarkFlinkOptions extends NexmarkOptions, FlinkPipelineOptions {
+ }
+
+ /**
+ * Entry point.
+ */
+ public static void main(String[] args) {
+ // Gather command line args, baseline, configurations, etc.
+ NexmarkFlinkOptions options = PipelineOptionsFactory.fromArgs(args)
+ .withValidation()
+ .as(NexmarkFlinkOptions.class);
+ options.setRunner(FlinkRunner.class);
+ NexmarkFlinkRunner runner = new NexmarkFlinkRunner(options);
+ new NexmarkFlinkDriver().runAll(options, runner);
+ }
+}
+
+
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkFlinkRunner.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkFlinkRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkFlinkRunner.java
new file mode 100644
index 0000000..9d547ef
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkFlinkRunner.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.drivers;
+
+import org.apache.beam.integration.nexmark.NexmarkRunner;
+
+/**
+ * Run a query using the Flink runner.
+ */
+public class NexmarkFlinkRunner extends NexmarkRunner<NexmarkFlinkDriver.NexmarkFlinkOptions> {
+ @Override
+ protected boolean isStreaming() {
+ return options.isStreaming();
+ }
+
+ @Override
+ protected int coresPerWorker() {
+ return 4;
+ }
+
+ @Override
+ protected int maxNumWorkers() {
+ return 5;
+ }
+
+ @Override
+ protected void invokeBuilderForPublishOnlyPipeline(
+ PipelineBuilder builder) {
+ builder.build(options);
+ }
+
+ @Override
+ protected void waitForPublisherPreload() {
+ throw new UnsupportedOperationException();
+ }
+
+ public NexmarkFlinkRunner(NexmarkFlinkDriver.NexmarkFlinkOptions options) {
+ super(options);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkGoogleDriver.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkGoogleDriver.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkGoogleDriver.java
new file mode 100644
index 0000000..f5a9751
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkGoogleDriver.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.drivers;
+
+import org.apache.beam.integration.nexmark.NexmarkDriver;
+import org.apache.beam.integration.nexmark.NexmarkOptions;
+import org.apache.beam.integration.nexmark.model.Auction;
+import org.apache.beam.integration.nexmark.model.Bid;
+import org.apache.beam.integration.nexmark.model.Person;
+import org.apache.beam.runners.dataflow.DataflowRunner;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+
+/**
+ * Driver for the 'NEXMark queries' on Google Dataflow.
+ * The queries run over a three table schema modelling an online auction system:
+ * <ul>
+ * <li>{@link Person} represents a person submitting an item for auction and/or making a bid
+ * on an auction.
+ * <li>{@link Auction} represents an item under auction.
+ * <li>{@link Bid} represents a bid for an item under auction.
+ * </ul>
+ * The queries exercise many aspects of streaming dataflow.
+ *
+ * <p>People, auctions and bids are synthesized in real-time. The data is not particularly
+ * sensible.
+ *
+ * <p>See <a href="http://datalab.cs.pdx.edu/niagaraST/NEXMark/">
+ * http://datalab.cs.pdx.edu/niagaraST/NEXMark/</a>
+ */
+class NexmarkGoogleDriver extends NexmarkDriver<NexmarkGoogleDriver.NexmarkGoogleOptions> {
+  /**
+   * Command line flags: the union of the Nexmark options and the Dataflow pipeline options.
+   */
+  public interface NexmarkGoogleOptions extends NexmarkOptions, DataflowPipelineOptions {
+
+  }
+
+  /**
+   * Entry point. Parses and validates {@code args}, forces the Dataflow runner, then runs
+   * everything through {@code runAll}.
+   */
+  public static void main(String[] args) {
+    // Gather command line args, baseline, configurations, etc.
+    NexmarkGoogleOptions options =
+        PipelineOptionsFactory.fromArgs(args).withValidation().as(NexmarkGoogleOptions.class);
+    // Whatever runner was requested on the command line, this driver targets Dataflow.
+    options.setRunner(DataflowRunner.class);
+
+    NexmarkGoogleRunner runner = new NexmarkGoogleRunner(options);
+    NexmarkGoogleDriver driver = new NexmarkGoogleDriver();
+    driver.runAll(options, runner);
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkGoogleRunner.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkGoogleRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkGoogleRunner.java
new file mode 100644
index 0000000..7ffd47a
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkGoogleRunner.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.drivers;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.beam.integration.nexmark.Monitor;
+import org.apache.beam.integration.nexmark.NexmarkRunner;
+import org.apache.beam.integration.nexmark.NexmarkUtils;
+import org.apache.beam.integration.nexmark.model.Event;
+import org.apache.beam.runners.dataflow.DataflowPipelineJob;
+import org.apache.beam.sdk.PipelineResult;
+import org.joda.time.Duration;
+
+/**
+ * Run a single Nexmark query using a given configuration on Google Dataflow.
+ */
+class NexmarkGoogleRunner extends NexmarkRunner<NexmarkGoogleDriver.NexmarkGoogleOptions> {
+
+  public NexmarkGoogleRunner(NexmarkGoogleDriver.NexmarkGoogleOptions options) {
+    super(options);
+  }
+
+  /** Streaming mode is read straight from the options. */
+  @Override
+  protected boolean isStreaming() {
+    return options.isStreaming();
+  }
+
+  /**
+   * Derive the number of cores from the worker machine type option.
+   *
+   * <p>The machine type is split on {@code '-'}; when it has exactly three components and the
+   * last one parses as an integer, that integer is returned. In every other case (unset, empty,
+   * different shape, non-numeric suffix) the result is 1.
+   */
+  @Override
+  protected int coresPerWorker() {
+    String machineType = options.getWorkerMachineType();
+    if (machineType == null || machineType.isEmpty()) {
+      return 1;
+    }
+    String[] split = machineType.split("-");
+    if (split.length != 3) {
+      return 1;
+    }
+    try {
+      return Integer.parseInt(split[2]);
+    } catch (NumberFormatException ex) {
+      return 1;
+    }
+  }
+
+  /** The larger of the configured initial and maximum worker counts. */
+  @Override
+  protected int maxNumWorkers() {
+    return Math.max(options.getNumWorkers(), options.getMaxNumWorkers());
+  }
+
+  /** Return the job id of {@code job}, which must be a {@link DataflowPipelineJob}. */
+  @Override
+  protected String getJobId(PipelineResult job) {
+    return ((DataflowPipelineJob) job).getJobId();
+  }
+
+  /**
+   * Build the publish-only pipeline with temporarily adjusted options.
+   *
+   * <p>While {@code builder} runs, the job and app names are prefixed with {@code "p-"} and both
+   * worker counts are capped at the number of workers needed to host
+   * {@code configuration.numEventGenerators} generators. All four options are put back to their
+   * original values afterwards, whether or not the builder throws.
+   */
+  @Override
+  protected void invokeBuilderForPublishOnlyPipeline(PipelineBuilder builder) {
+    // Remember the caller's settings before touching anything so they can be restored.
+    String jobName = options.getJobName();
+    String appName = options.getAppName();
+    int originalMaxNumWorkers = options.getMaxNumWorkers();
+    int originalNumWorkers = options.getNumWorkers();
+    try {
+      options.setJobName("p-" + jobName);
+      options.setAppName("p-" + appName);
+      int coresPerWorker = coresPerWorker();
+      // Ceiling division: enough workers to give every event generator its own core.
+      int eventGeneratorWorkers =
+          (configuration.numEventGenerators + coresPerWorker - 1) / coresPerWorker;
+      options.setMaxNumWorkers(Math.min(originalMaxNumWorkers, eventGeneratorWorkers));
+      options.setNumWorkers(Math.min(originalNumWorkers, eventGeneratorWorkers));
+      publisherMonitor = new Monitor<Event>(queryName, "publisher");
+      builder.build(options);
+    } finally {
+      options.setJobName(jobName);
+      options.setAppName(appName);
+      // Restore the saved counts; re-reading them from options here would be a no-op and
+      // would leave the capped values in place for whatever runs next.
+      options.setMaxNumWorkers(originalMaxNumWorkers);
+      options.setNumWorkers(originalNumWorkers);
+    }
+  }
+
+  /**
+   * Monitor the progress of the publisher job. Return when it has been generating events for
+   * at least {@code configuration.preloadSeconds}.
+   *
+   * <p>Returns immediately when job monitoring is disabled, when the publisher result is not a
+   * {@link DataflowPipelineJob}, or when no pre-load time is configured. Also returns as soon as
+   * the publisher job reaches a terminal state.
+   *
+   * @throws RuntimeException if the thread is interrupted while waiting; the thread's interrupt
+   *     status is left set
+   */
+  @Override
+  protected void waitForPublisherPreload() {
+    checkNotNull(publisherMonitor);
+    checkNotNull(publisherResult);
+    if (!options.getMonitorJobs()) {
+      return;
+    }
+    if (!(publisherResult instanceof DataflowPipelineJob)) {
+      return;
+    }
+    if (configuration.preloadSeconds <= 0) {
+      return;
+    }
+
+    NexmarkUtils.console("waiting for publisher to pre-load");
+
+    DataflowPipelineJob job = (DataflowPipelineJob) publisherResult;
+
+    long numEvents = 0;
+    // Both stay negative until the first event has been observed.
+    long startMsSinceEpoch = -1;
+    long endMsSinceEpoch = -1;
+    while (true) {
+      PipelineResult.State state = job.getState();
+      switch (state) {
+        case UNKNOWN:
+          // Keep waiting.
+          NexmarkUtils.console("%s publisher (%d events)", state, numEvents);
+          break;
+        case STOPPED:
+        case DONE:
+        case CANCELLED:
+        case FAILED:
+        case UPDATED:
+          // The publisher is no longer running, so there is nothing more to wait for.
+          NexmarkUtils.console("%s publisher (%d events)", state, numEvents);
+          return;
+        case RUNNING:
+          numEvents = getLong(job, publisherMonitor.getElementCounter());
+          if (startMsSinceEpoch < 0 && numEvents > 0) {
+            // First events seen: the pre-load period is measured from this moment.
+            startMsSinceEpoch = System.currentTimeMillis();
+            endMsSinceEpoch = startMsSinceEpoch
+                + Duration.standardSeconds(configuration.preloadSeconds).getMillis();
+          }
+          if (endMsSinceEpoch < 0) {
+            NexmarkUtils.console("%s publisher (%d events)", state, numEvents);
+          } else {
+            long remainMs = endMsSinceEpoch - System.currentTimeMillis();
+            if (remainMs > 0) {
+              NexmarkUtils.console("%s publisher (%d events, waiting for %ds)", state, numEvents,
+                  remainMs / 1000);
+            } else {
+              NexmarkUtils.console("publisher preloaded %d events", numEvents);
+              return;
+            }
+          }
+          break;
+      }
+
+      try {
+        Thread.sleep(PERF_DELAY.getMillis());
+      } catch (InterruptedException e) {
+        // Re-assert the interrupt so callers further up can still observe it.
+        Thread.currentThread().interrupt();
+        throw new RuntimeException("Interrupted: publisher still running.", e);
+      }
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkSparkDriver.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkSparkDriver.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkSparkDriver.java
new file mode 100644
index 0000000..c7c32c2
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkSparkDriver.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.drivers;
+
+import org.apache.beam.integration.nexmark.NexmarkDriver;
+import org.apache.beam.integration.nexmark.NexmarkOptions;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.beam.runners.spark.SparkRunner;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+
+/**
+ * Driver that runs the NexMark queries on the Spark runner.
+ */
+class NexmarkSparkDriver extends NexmarkDriver<NexmarkSparkDriver.NexmarkSparkOptions> {
+  /**
+   * Command line flags: the union of the Nexmark options and the Spark pipeline options.
+   */
+  public interface NexmarkSparkOptions extends NexmarkOptions, SparkPipelineOptions {
+  }
+
+  /**
+   * Entry point. Parses and validates {@code args}, forces the Spark runner, then runs
+   * everything through {@code runAll}.
+   */
+  public static void main(String[] args) {
+    NexmarkSparkOptions options =
+        PipelineOptionsFactory.fromArgs(args).withValidation().as(NexmarkSparkOptions.class);
+    // Whatever runner was requested on the command line, this driver targets Spark.
+    options.setRunner(SparkRunner.class);
+
+    NexmarkSparkRunner runner = new NexmarkSparkRunner(options);
+    NexmarkSparkDriver driver = new NexmarkSparkDriver();
+    driver.runAll(options, runner);
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkSparkRunner.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkSparkRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkSparkRunner.java
new file mode 100644
index 0000000..1d49a3a
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkSparkRunner.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.drivers;
+
+import org.apache.beam.integration.nexmark.NexmarkRunner;
+
+/**
+ * {@link NexmarkRunner} specialization that runs a query on the Spark runner.
+ *
+ * <p>Core and worker counts are fixed constants here, and waiting for a publisher pre-load
+ * is not supported.
+ */
+public class NexmarkSparkRunner extends NexmarkRunner<NexmarkSparkDriver.NexmarkSparkOptions> {
+
+  public NexmarkSparkRunner(NexmarkSparkDriver.NexmarkSparkOptions options) {
+    super(options);
+  }
+
+  /** Streaming mode is read straight from the options. */
+  @Override
+  protected boolean isStreaming() {
+    return options.isStreaming();
+  }
+
+  /** Always 4. */
+  @Override
+  protected int coresPerWorker() {
+    return 4;
+  }
+
+  /** Always 5. */
+  @Override
+  protected int maxNumWorkers() {
+    return 5;
+  }
+
+  /** Hands the unmodified options to {@code builder}. */
+  @Override
+  protected void invokeBuilderForPublishOnlyPipeline(PipelineBuilder builder) {
+    builder.build(options);
+  }
+
+  /**
+   * Not supported by this runner.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  protected void waitForPublisherPreload() {
+    throw new UnsupportedOperationException();
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/package-info.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/package-info.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/package-info.java
new file mode 100644
index 0000000..c8aa144
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Nexmark Benchmark Execution Drivers.
+ */
+package org.apache.beam.integration.nexmark.drivers;
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubHelper.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubHelper.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubHelper.java
new file mode 100644
index 0000000..f5cfc2b
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubHelper.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.io;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.beam.integration.nexmark.NexmarkUtils;
+import org.apache.beam.sdk.options.PubsubOptions;
+import org.apache.beam.sdk.util.PubsubClient;
+import org.apache.beam.sdk.util.PubsubJsonClient;
+
+/**
+ * Helper for working with pubsub.
+ */
+public class PubsubHelper implements AutoCloseable {
+ /**
+ * Underlying pub/sub client.
+ */
+ private final PubsubClient pubsubClient;
+
+ /**
+ * Project id.
+ */
+ private final String projectId;
+
+ /**
+ * Topics we should delete on close.
+ */
+ private final List<PubsubClient.TopicPath> createdTopics;
+
+ /**
+ * Subscriptions we should delete on close.
+ */
+ private final List<PubsubClient.SubscriptionPath> createdSubscriptions;
+
+ private PubsubHelper(PubsubClient pubsubClient, String projectId) {
+ this.pubsubClient = pubsubClient;
+ this.projectId = projectId;
+ createdTopics = new ArrayList<>();
+ createdSubscriptions = new ArrayList<>();
+ }
+
+ /**
+ * Create a helper.
+ */
+ public static PubsubHelper create(PubsubOptions options) {
+ try {
+ return new PubsubHelper(
+ PubsubJsonClient.FACTORY.newClient(null, null, options),
+ options.getProject());
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to create Pubsub client: ", e);
+ }
+ }
+
+ /**
+ * Create a topic from short name. Delete it if it already exists. Ensure the topic will be
+ * deleted on cleanup. Return full topic name.
+ */
+ public PubsubClient.TopicPath createTopic(String shortTopic) {
+ PubsubClient.TopicPath topic = PubsubClient.topicPathFromName(projectId, shortTopic);
+ try {
+ if (topicExists(shortTopic)) {
+ NexmarkUtils.console("attempting to cleanup topic %s", topic);
+ pubsubClient.deleteTopic(topic);
+ }
+ NexmarkUtils.console("create topic %s", topic);
+ pubsubClient.createTopic(topic);
+ createdTopics.add(topic);
+ return topic;
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to create Pubsub topic " + topic + ": ", e);
+ }
+ }
+
+ /**
+ * Create a topic from short name if it does not already exist. The topic will not be
+ * deleted on cleanup. Return full topic name.
+ */
+ public PubsubClient.TopicPath createOrReuseTopic(String shortTopic) {
+ PubsubClient.TopicPath topic = PubsubClient.topicPathFromName(projectId, shortTopic);
+ try {
+ if (topicExists(shortTopic)) {
+ NexmarkUtils.console("topic %s already exists", topic);
+ return topic;
+ }
+ NexmarkUtils.console("create topic %s", topic);
+ pubsubClient.createTopic(topic);
+ return topic;
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to create or reuse Pubsub topic " + topic + ": ", e);
+ }
+ }
+
+ /**
+ * Check a topic corresponding to short name exists, and throw exception if not. The
+ * topic will not be deleted on cleanup. Return full topic name.
+ */
+ public PubsubClient.TopicPath reuseTopic(String shortTopic) {
+ PubsubClient.TopicPath topic = PubsubClient.topicPathFromName(projectId, shortTopic);
+ if (topicExists(shortTopic)) {
+ NexmarkUtils.console("reusing existing topic %s", topic);
+ return topic;
+ }
+ throw new RuntimeException("topic '" + topic + "' does not already exist");
+ }
+
+ /**
+ * Does topic corresponding to short name exist?
+ */
+ public boolean topicExists(String shortTopic) {
+ PubsubClient.ProjectPath project = PubsubClient.projectPathFromId(projectId);
+ PubsubClient.TopicPath topic = PubsubClient.topicPathFromName(projectId, shortTopic);
+ try {
+ Collection<PubsubClient.TopicPath> existingTopics = pubsubClient.listTopics(project);
+ return existingTopics.contains(topic);
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to check Pubsub topic " + topic + ": ", e);
+ }
+ }
+
+ /**
+ * Create subscription from short name. Delete subscription if it already exists. Ensure the
+ * subscription will be deleted on cleanup. Return full subscription name.
+ */
+ public PubsubClient.SubscriptionPath createSubscription(
+ String shortTopic, String shortSubscription) {
+ PubsubClient.TopicPath topic = PubsubClient.topicPathFromName(projectId, shortTopic);
+ PubsubClient.SubscriptionPath subscription =
+ PubsubClient.subscriptionPathFromName(projectId, shortSubscription);
+ try {
+ if (subscriptionExists(shortTopic, shortSubscription)) {
+ NexmarkUtils.console("attempting to cleanup subscription %s", subscription);
+ pubsubClient.deleteSubscription(subscription);
+ }
+ NexmarkUtils.console("create subscription %s", subscription);
+ pubsubClient.createSubscription(topic, subscription, 60);
+ createdSubscriptions.add(subscription);
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to create Pubsub subscription " + subscription + ": ", e);
+ }
+ return subscription;
+ }
+
+ /**
+ * Check a subscription corresponding to short name exists, and throw exception if not. The
+ * subscription will not be deleted on cleanup. Return full topic name.
+ */
+ public PubsubClient.SubscriptionPath reuseSubscription(
+ String shortTopic, String shortSubscription) {
+ PubsubClient.SubscriptionPath subscription =
+ PubsubClient.subscriptionPathFromName(projectId, shortSubscription);
+ if (subscriptionExists(shortTopic, shortSubscription)) {
+ NexmarkUtils.console("reusing existing subscription %s", subscription);
+ return subscription;
+ }
+ throw new RuntimeException("subscription'" + subscription + "' does not already exist");
+ }
+
+ /**
+ * Does subscription corresponding to short name exist?
+ */
+ public boolean subscriptionExists(String shortTopic, String shortSubscription) {
+ PubsubClient.ProjectPath project = PubsubClient.projectPathFromId(projectId);
+ PubsubClient.TopicPath topic = PubsubClient.topicPathFromName(projectId, shortTopic);
+ PubsubClient.SubscriptionPath subscription =
+ PubsubClient.subscriptionPathFromName(projectId, shortSubscription);
+ try {
+ Collection<PubsubClient.SubscriptionPath> existingSubscriptions =
+ pubsubClient.listSubscriptions(project, topic);
+ return existingSubscriptions.contains(subscription);
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to check Pubsub subscription" + subscription + ": ", e);
+ }
+ }
+
+ /**
+ * Delete all the subscriptions and topics we created.
+ */
+ @Override
+ public void close() {
+ for (PubsubClient.SubscriptionPath subscription : createdSubscriptions) {
+ try {
+ NexmarkUtils.console("delete subscription %s", subscription);
+ pubsubClient.deleteSubscription(subscription);
+ } catch (IOException ex) {
+ NexmarkUtils.console("could not delete subscription %s", subscription);
+ }
+ }
+ for (PubsubClient.TopicPath topic : createdTopics) {
+ try {
+ NexmarkUtils.console("delete topic %s", topic);
+ pubsubClient.deleteTopic(topic);
+ } catch (IOException ex) {
+ NexmarkUtils.console("could not delete topic %s", topic);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/package-info.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/package-info.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/package-info.java
new file mode 100644
index 0000000..1161f3e
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Nexmark Beam IO related utilities.
+ */
+package org.apache.beam.integration.nexmark.io;
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Auction.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Auction.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Auction.java
new file mode 100644
index 0000000..ac30568
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Auction.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.model;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+import org.apache.beam.integration.nexmark.NexmarkUtils;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+
+/**
+ * An auction submitted by a person.
+ *
+ * <p>All fields are {@code public final}. Instances are encoded and decoded with
+ * {@link #CODER}, and {@link #toString} renders them through {@code NexmarkUtils.MAPPER}.
+ */
+public class Auction implements KnownSize, Serializable {
+ private static final Coder<Long> LONG_CODER = VarLongCoder.of();
+ private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
+
+ /**
+ * Coder for {@link Auction}. {@code encode} writes the fields one after another in a fixed
+ * order and {@code decode} reads them back in that same order, so the two methods must be
+ * kept in step. The {@code context} argument is not consulted: every field is coded with
+ * {@code Context.NESTED}.
+ */
+ public static final Coder<Auction> CODER = new AtomicCoder<Auction>() {
+ @Override
+ public void encode(Auction value, OutputStream outStream,
+ Coder.Context context)
+ throws CoderException, IOException {
+ LONG_CODER.encode(value.id, outStream, Context.NESTED);
+ STRING_CODER.encode(value.itemName, outStream, Context.NESTED);
+ STRING_CODER.encode(value.description, outStream, Context.NESTED);
+ LONG_CODER.encode(value.initialBid, outStream, Context.NESTED);
+ LONG_CODER.encode(value.reserve, outStream, Context.NESTED);
+ LONG_CODER.encode(value.dateTime, outStream, Context.NESTED);
+ LONG_CODER.encode(value.expires, outStream, Context.NESTED);
+ LONG_CODER.encode(value.seller, outStream, Context.NESTED);
+ LONG_CODER.encode(value.category, outStream, Context.NESTED);
+ STRING_CODER.encode(value.extra, outStream, Context.NESTED);
+ }
+
+ @Override
+ public Auction decode(
+ InputStream inStream, Coder.Context context)
+ throws CoderException, IOException {
+ // Reads must mirror the write order used by encode above.
+ long id = LONG_CODER.decode(inStream, Context.NESTED);
+ String itemName = STRING_CODER.decode(inStream, Context.NESTED);
+ String description = STRING_CODER.decode(inStream, Context.NESTED);
+ long initialBid = LONG_CODER.decode(inStream, Context.NESTED);
+ long reserve = LONG_CODER.decode(inStream, Context.NESTED);
+ long dateTime = LONG_CODER.decode(inStream, Context.NESTED);
+ long expires = LONG_CODER.decode(inStream, Context.NESTED);
+ long seller = LONG_CODER.decode(inStream, Context.NESTED);
+ long category = LONG_CODER.decode(inStream, Context.NESTED);
+ String extra = STRING_CODER.decode(inStream, Context.NESTED);
+ return new Auction(
+ id, itemName, description, initialBid, reserve, dateTime, expires, seller, category,
+ extra);
+ }
+ };
+
+
+ /** Id of auction. */
+ @JsonProperty
+ public final long id; // primary key
+
+ /** Extra auction properties. */
+ @JsonProperty
+ public final String itemName;
+
+ @JsonProperty
+ public final String description;
+
+ /** Initial bid price, in cents. */
+ @JsonProperty
+ public final long initialBid;
+
+ /** Reserve price, in cents. */
+ @JsonProperty
+ public final long reserve;
+
+ // NOTE(review): presumably a timestamp in ms since epoch, like expires - confirm.
+ @JsonProperty
+ public final long dateTime;
+
+ /** When does auction expire? (ms since epoch). Bids at or after this time are ignored. */
+ @JsonProperty
+ public final long expires;
+
+ /** Id of person who instigated auction. */
+ @JsonProperty
+ public final long seller; // foreign key: Person.id
+
+ /** Id of category auction is listed under. */
+ @JsonProperty
+ public final long category; // foreign key: Category.id
+
+ /** Additional arbitrary payload for performance testing. */
+ @JsonProperty
+ public final String extra;
+
+
+ // For Avro only.
+ // Leaves every numeric field at 0 and every String field (including extra) null.
+ @SuppressWarnings("unused")
+ private Auction() {
+ id = 0;
+ itemName = null;
+ description = null;
+ initialBid = 0;
+ reserve = 0;
+ dateTime = 0;
+ expires = 0;
+ seller = 0;
+ category = 0;
+ extra = null;
+ }
+
+ /** Create an auction with every field given explicitly. */
+ public Auction(long id, String itemName, String description, long initialBid, long reserve,
+ long dateTime, long expires, long seller, long category, String extra) {
+ this.id = id;
+ this.itemName = itemName;
+ this.description = description;
+ this.initialBid = initialBid;
+ this.reserve = reserve;
+ this.dateTime = dateTime;
+ this.expires = expires;
+ this.seller = seller;
+ this.category = category;
+ this.extra = extra;
+ }
+
+ /**
+ * Return a copy of auction which capture the given annotation.
+ * (Used for debugging).
+ *
+ * <p>The annotation is stored by prefixing {@code extra} with {@code annotation + ": "}.
+ */
+ public Auction withAnnotation(String annotation) {
+ return new Auction(id, itemName, description, initialBid, reserve, dateTime, expires, seller,
+ category, annotation + ": " + extra);
+ }
+
+ /**
+ * Does auction have {@code annotation}? (Used for debugging.)
+ *
+ * <p>True when {@code extra} starts with {@code annotation + ": "}. Dereferences
+ * {@code extra}, so it throws {@link NullPointerException} if {@code extra} is null.
+ */
+ public boolean hasAnnotation(String annotation) {
+ return extra.startsWith(annotation + ": ");
+ }
+
+ /**
+ * Remove {@code annotation} from auction. (Used for debugging.)
+ *
+ * <p>Returns this same instance when the annotation is not present.
+ */
+ public Auction withoutAnnotation(String annotation) {
+ if (hasAnnotation(annotation)) {
+ // Skip the annotation itself plus the 2-character ": " separator.
+ return new Auction(id, itemName, description, initialBid, reserve, dateTime, expires, seller,
+ category, extra.substring(annotation.length() + 2));
+ } else {
+ return this;
+ }
+ }
+
+ /**
+ * Size estimate: 8 for each of the seven long fields, and {@code length() + 1} for each of
+ * the three strings. {@code String.length()} counts chars rather than encoded bytes, so the
+ * string part is an approximation.
+ */
+ @Override
+ public long sizeInBytes() {
+ return 8 + itemName.length() + 1 + description.length() + 1 + 8 + 8 + 8 + 8 + 8 + 8
+ + extra.length() + 1;
+ }
+
+ /**
+ * Render this auction with {@code NexmarkUtils.MAPPER}; a {@link JsonProcessingException}
+ * is rethrown wrapped in a {@link RuntimeException}.
+ */
+ @Override
+ public String toString() {
+ try {
+ return NexmarkUtils.MAPPER.writeValueAsString(this);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionBid.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionBid.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionBid.java
new file mode 100644
index 0000000..c014257
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionBid.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.model;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+import org.apache.beam.integration.nexmark.NexmarkUtils;
+import org.apache.beam.integration.nexmark.WinningBids;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+
+/**
+ * Result of {@link WinningBids} transform.
+ */
+public class AuctionBid implements KnownSize, Serializable {
+ public static final Coder<AuctionBid> CODER = new AtomicCoder<AuctionBid>() {
+ @Override
+ public void encode(AuctionBid value, OutputStream outStream,
+ Coder.Context context)
+ throws CoderException, IOException {
+ Auction.CODER.encode(value.auction, outStream, Context.NESTED);
+ Bid.CODER.encode(value.bid, outStream, Context.NESTED);
+ }
+
+ @Override
+ public AuctionBid decode(
+ InputStream inStream, Coder.Context context)
+ throws CoderException, IOException {
+ Auction auction = Auction.CODER.decode(inStream, Context.NESTED);
+ Bid bid = Bid.CODER.decode(inStream, Context.NESTED);
+ return new AuctionBid(auction, bid);
+ }
+ };
+
+ @JsonProperty
+ public final Auction auction;
+
+ @JsonProperty
+ public final Bid bid;
+
+ // For Avro only.
+ @SuppressWarnings("unused")
+ private AuctionBid() {
+ auction = null;
+ bid = null;
+ }
+
+ public AuctionBid(Auction auction, Bid bid) {
+ this.auction = auction;
+ this.bid = bid;
+ }
+
+ @Override
+ public long sizeInBytes() {
+ return auction.sizeInBytes() + bid.sizeInBytes();
+ }
+
+ @Override
+ public String toString() {
+ try {
+ return NexmarkUtils.MAPPER.writeValueAsString(this);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionCount.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionCount.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionCount.java
new file mode 100644
index 0000000..aa16629
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionCount.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.model;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+import org.apache.beam.integration.nexmark.NexmarkUtils;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.VarLongCoder;
+
+/**
+ * Result of Query5.
+ */
+public class AuctionCount implements KnownSize, Serializable {
+ private static final Coder<Long> LONG_CODER = VarLongCoder.of();
+
+ public static final Coder<AuctionCount> CODER = new AtomicCoder<AuctionCount>() {
+ @Override
+ public void encode(AuctionCount value, OutputStream outStream,
+ Coder.Context context)
+ throws CoderException, IOException {
+ LONG_CODER.encode(value.auction, outStream, Context.NESTED);
+ LONG_CODER.encode(value.count, outStream, Context.NESTED);
+ }
+
+ @Override
+ public AuctionCount decode(
+ InputStream inStream, Coder.Context context)
+ throws CoderException, IOException {
+ long auction = LONG_CODER.decode(inStream, Context.NESTED);
+ long count = LONG_CODER.decode(inStream, Context.NESTED);
+ return new AuctionCount(auction, count);
+ }
+ };
+
+ @JsonProperty
+ public final long auction;
+
+ @JsonProperty
+ public final long count;
+
+ // For Avro only.
+ @SuppressWarnings("unused")
+ private AuctionCount() {
+ auction = 0;
+ count = 0;
+ }
+
+ public AuctionCount(long auction, long count) {
+ this.auction = auction;
+ this.count = count;
+ }
+
+ @Override
+ public long sizeInBytes() {
+ return 8 + 8;
+ }
+
+ @Override
+ public String toString() {
+ try {
+ return NexmarkUtils.MAPPER.writeValueAsString(this);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionPrice.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionPrice.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionPrice.java
new file mode 100644
index 0000000..f365cc8
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionPrice.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.model;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+import org.apache.beam.integration.nexmark.NexmarkUtils;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.VarLongCoder;
+
+/**
+ * Result of Query2.
+ */
+public class AuctionPrice implements KnownSize, Serializable {
+ private static final Coder<Long> LONG_CODER = VarLongCoder.of();
+
+ public static final Coder<AuctionPrice> CODER = new AtomicCoder<AuctionPrice>() {
+ @Override
+ public void encode(AuctionPrice value, OutputStream outStream,
+ Coder.Context context)
+ throws CoderException, IOException {
+ LONG_CODER.encode(value.auction, outStream, Context.NESTED);
+ LONG_CODER.encode(value.price, outStream, Context.NESTED);
+ }
+
+ @Override
+ public AuctionPrice decode(
+ InputStream inStream, Coder.Context context)
+ throws CoderException, IOException {
+ long auction = LONG_CODER.decode(inStream, Context.NESTED);
+ long price = LONG_CODER.decode(inStream, Context.NESTED);
+ return new AuctionPrice(auction, price);
+ }
+ };
+
+ @JsonProperty
+ public final long auction;
+
+ /** Price in cents. */
+ @JsonProperty
+ public final long price;
+
+ // For Avro only.
+ @SuppressWarnings("unused")
+ private AuctionPrice() {
+ auction = 0;
+ price = 0;
+ }
+
+ public AuctionPrice(long auction, long price) {
+ this.auction = auction;
+ this.price = price;
+ }
+
+ @Override
+ public long sizeInBytes() {
+ return 8 + 8;
+ }
+
+ @Override
+ public String toString() {
+ try {
+ return NexmarkUtils.MAPPER.writeValueAsString(this);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Bid.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Bid.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Bid.java
new file mode 100644
index 0000000..59a33c1
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Bid.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.model;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.Comparator;
+
+import org.apache.beam.integration.nexmark.NexmarkUtils;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+
+/**
+ * A bid for an item on auction.
+ */
+public class Bid implements KnownSize, Serializable {
+ private static final Coder<Long> LONG_CODER = VarLongCoder.of();
+ private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
+
+ public static final Coder<Bid> CODER = new AtomicCoder<Bid>() {
+ @Override
+ public void encode(Bid value, OutputStream outStream,
+ Coder.Context context)
+ throws CoderException, IOException {
+ LONG_CODER.encode(value.auction, outStream, Context.NESTED);
+ LONG_CODER.encode(value.bidder, outStream, Context.NESTED);
+ LONG_CODER.encode(value.price, outStream, Context.NESTED);
+ LONG_CODER.encode(value.dateTime, outStream, Context.NESTED);
+ STRING_CODER.encode(value.extra, outStream, Context.NESTED);
+ }
+
+ @Override
+ public Bid decode(
+ InputStream inStream, Coder.Context context)
+ throws CoderException, IOException {
+ long auction = LONG_CODER.decode(inStream, Context.NESTED);
+ long bidder = LONG_CODER.decode(inStream, Context.NESTED);
+ long price = LONG_CODER.decode(inStream, Context.NESTED);
+ long dateTime = LONG_CODER.decode(inStream, Context.NESTED);
+ String extra = STRING_CODER.decode(inStream, Context.NESTED);
+ return new Bid(auction, bidder, price, dateTime, extra);
+ }
+ };
+
+ /**
+ * Comparator to order bids by ascending price then descending time
+ * (for finding winning bids).
+ */
+ public static final Comparator<Bid> PRICE_THEN_DESCENDING_TIME = new Comparator<Bid>() {
+ @Override
+ public int compare(Bid left, Bid right) {
+ int i = Double.compare(left.price, right.price);
+ if (i != 0) {
+ return i;
+ }
+ return Long.compare(right.dateTime, left.dateTime);
+ }
+ };
+
+ /**
+ * Comparator to order bids by ascending time then ascending price.
+ * (for finding most recent bids).
+ */
+ public static final Comparator<Bid> ASCENDING_TIME_THEN_PRICE = new Comparator<Bid>() {
+ @Override
+ public int compare(Bid left, Bid right) {
+ int i = Long.compare(left.dateTime, right.dateTime);
+ if (i != 0) {
+ return i;
+ }
+ return Double.compare(left.price, right.price);
+ }
+ };
+
+ /** Id of auction this bid is for. */
+ @JsonProperty
+ public final long auction; // foreign key: Auction.id
+
+ /** Id of person bidding in auction. */
+ @JsonProperty
+ public final long bidder; // foreign key: Person.id
+
+ /** Price of bid, in cents. */
+ @JsonProperty
+ public final long price;
+
+ /**
+ * Instant at which bid was made (ms since epoch).
+ * NOTE: This may be earlier than the system's event time.
+ */
+ @JsonProperty
+ public final long dateTime;
+
+ /** Additional arbitrary payload for performance testing. */
+ @JsonProperty
+ public final String extra;
+
+ // For Avro only.
+ @SuppressWarnings("unused")
+ private Bid() {
+ auction = 0;
+ bidder = 0;
+ price = 0;
+ dateTime = 0;
+ extra = null;
+ }
+
+ public Bid(long auction, long bidder, long price, long dateTime, String extra) {
+ this.auction = auction;
+ this.bidder = bidder;
+ this.price = price;
+ this.dateTime = dateTime;
+ this.extra = extra;
+ }
+
+ /**
+ * Return a copy of bid which capture the given annotation.
+ * (Used for debugging).
+ */
+ public Bid withAnnotation(String annotation) {
+ return new Bid(auction, bidder, price, dateTime, annotation + ": " + extra);
+ }
+
+ /**
+ * Does bid have {@code annotation}? (Used for debugging.)
+ */
+ public boolean hasAnnotation(String annotation) {
+ return extra.startsWith(annotation + ": ");
+ }
+
+ /**
+ * Remove {@code annotation} from bid. (Used for debugging.)
+ */
+ public Bid withoutAnnotation(String annotation) {
+ if (hasAnnotation(annotation)) {
+ return new Bid(auction, bidder, price, dateTime, extra.substring(annotation.length() + 2));
+ } else {
+ return this;
+ }
+ }
+
+ @Override
+ public long sizeInBytes() {
+ return 8 + 8 + 8 + 8 + extra.length() + 1;
+ }
+
+ @Override
+ public String toString() {
+ try {
+ return NexmarkUtils.MAPPER.writeValueAsString(this);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/BidsPerSession.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/BidsPerSession.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/BidsPerSession.java
new file mode 100644
index 0000000..7c4dfae
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/BidsPerSession.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.model;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+import org.apache.beam.integration.nexmark.NexmarkUtils;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.VarLongCoder;
+
+/**
+ * Result of query 11.
+ */
+public class BidsPerSession implements KnownSize, Serializable {
+ private static final Coder<Long> LONG_CODER = VarLongCoder.of();
+
+ public static final Coder<BidsPerSession> CODER = new AtomicCoder<BidsPerSession>() {
+ @Override
+ public void encode(BidsPerSession value, OutputStream outStream,
+ Coder.Context context)
+ throws CoderException, IOException {
+ LONG_CODER.encode(value.personId, outStream, Context.NESTED);
+ LONG_CODER.encode(value.bidsPerSession, outStream, Context.NESTED);
+ }
+
+ @Override
+ public BidsPerSession decode(
+ InputStream inStream, Coder.Context context)
+ throws CoderException, IOException {
+ long personId = LONG_CODER.decode(inStream, Context.NESTED);
+ long bidsPerSession = LONG_CODER.decode(inStream, Context.NESTED);
+ return new BidsPerSession(personId, bidsPerSession);
+ }
+ };
+
+ @JsonProperty
+ public final long personId;
+
+ @JsonProperty
+ public final long bidsPerSession;
+
+ public BidsPerSession() {
+ personId = 0;
+ bidsPerSession = 0;
+ }
+
+ public BidsPerSession(long personId, long bidsPerSession) {
+ this.personId = personId;
+ this.bidsPerSession = bidsPerSession;
+ }
+
+ @Override
+ public long sizeInBytes() {
+ // Two longs.
+ return 8 + 8;
+ }
+
+ @Override
+ public String toString() {
+ try {
+ return NexmarkUtils.MAPPER.writeValueAsString(this);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}