You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/23 17:09:14 UTC
[06/55] [abbrv] beam git commit: NexMark
NexMark
Port unit tests, cleanup pom and add license to readme
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1f08970a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1f08970a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1f08970a
Branch: refs/heads/master
Commit: 1f08970a8fdc9c5e5613227031125d9d929ca841
Parents: f0ce31b
Author: Mark Shields <ma...@google.com>
Authored: Mon Mar 28 16:25:29 2016 -0700
Committer: Ismaël Mejía <ie...@gmail.com>
Committed: Wed Aug 23 19:07:26 2017 +0200
----------------------------------------------------------------------
integration/java/pom.xml | 288 +++++++
.../integration/nexmark/AbstractSimulator.java | 212 ++++++
.../beam/integration/nexmark/Auction.java | 190 +++++
.../beam/integration/nexmark/AuctionBid.java | 87 +++
.../beam/integration/nexmark/AuctionCount.java | 90 +++
.../beam/integration/nexmark/AuctionPrice.java | 91 +++
.../integration/nexmark/BEAM_ON_FLINK_ON_GCP.md | 282 +++++++
.../apache/beam/integration/nexmark/Bid.java | 178 +++++
.../integration/nexmark/BidsPerSession.java | 89 +++
.../integration/nexmark/BoundedEventSource.java | 197 +++++
.../beam/integration/nexmark/CategoryPrice.java | 100 +++
.../apache/beam/integration/nexmark/Done.java | 83 +++
.../apache/beam/integration/nexmark/Event.java | 181 +++++
.../beam/integration/nexmark/Generator.java | 590 +++++++++++++++
.../integration/nexmark/GeneratorConfig.java | 295 ++++++++
.../beam/integration/nexmark/IdNameReserve.java | 100 +++
.../beam/integration/nexmark/KnownSize.java | 27 +
.../beam/integration/nexmark/Monitor.java | 102 +++
.../integration/nexmark/NameCityStateId.java | 106 +++
.../nexmark/NexmarkConfiguration.java | 662 ++++++++++++++++
.../beam/integration/nexmark/NexmarkDriver.java | 297 ++++++++
.../integration/nexmark/NexmarkFlinkDriver.java | 49 ++
.../integration/nexmark/NexmarkFlinkRunner.java | 67 ++
.../nexmark/NexmarkGoogleDriver.java | 90 +++
.../nexmark/NexmarkGoogleRunner.java | 660 ++++++++++++++++
.../nexmark/NexmarkInProcessDriver.java | 48 ++
.../nexmark/NexmarkInProcessRunner.java | 77 ++
.../beam/integration/nexmark/NexmarkPerf.java | 212 ++++++
.../beam/integration/nexmark/NexmarkQuery.java | 276 +++++++
.../integration/nexmark/NexmarkQueryModel.java | 123 +++
.../beam/integration/nexmark/NexmarkRunner.java | 746 +++++++++++++++++++
.../beam/integration/nexmark/NexmarkSuite.java | 112 +++
.../beam/integration/nexmark/NexmarkUtils.java | 681 +++++++++++++++++
.../beam/integration/nexmark/Options.java | 360 +++++++++
.../apache/beam/integration/nexmark/Person.java | 166 +++++
.../beam/integration/nexmark/PubsubHelper.java | 217 ++++++
.../apache/beam/integration/nexmark/Query0.java | 72 ++
.../beam/integration/nexmark/Query0Model.java | 62 ++
.../apache/beam/integration/nexmark/Query1.java | 64 ++
.../beam/integration/nexmark/Query10.java | 378 ++++++++++
.../beam/integration/nexmark/Query11.java | 76 ++
.../beam/integration/nexmark/Query12.java | 79 ++
.../beam/integration/nexmark/Query1Model.java | 73 ++
.../apache/beam/integration/nexmark/Query2.java | 75 ++
.../beam/integration/nexmark/Query2Model.java | 76 ++
.../apache/beam/integration/nexmark/Query3.java | 248 ++++++
.../beam/integration/nexmark/Query3Model.java | 119 +++
.../apache/beam/integration/nexmark/Query4.java | 110 +++
.../beam/integration/nexmark/Query4Model.java | 181 +++++
.../apache/beam/integration/nexmark/Query5.java | 127 ++++
.../beam/integration/nexmark/Query5Model.java | 174 +++++
.../apache/beam/integration/nexmark/Query6.java | 154 ++++
.../beam/integration/nexmark/Query6Model.java | 128 ++++
.../apache/beam/integration/nexmark/Query7.java | 87 +++
.../beam/integration/nexmark/Query7Model.java | 128 ++++
.../apache/beam/integration/nexmark/Query8.java | 92 +++
.../beam/integration/nexmark/Query8Model.java | 145 ++++
.../apache/beam/integration/nexmark/Query9.java | 40 +
.../beam/integration/nexmark/Query9Model.java | 44 ++
.../apache/beam/integration/nexmark/README.md | 166 +++++
.../beam/integration/nexmark/SellerPrice.java | 91 +++
.../nexmark/UnboundedEventSource.java | 329 ++++++++
.../beam/integration/nexmark/WinningBids.java | 378 ++++++++++
.../nexmark/WinningBidsSimulator.java | 203 +++++
.../nexmark/BoundedEventSourceTest.java | 71 ++
.../beam/integration/nexmark/GeneratorTest.java | 111 +++
.../beam/integration/nexmark/QueryTest.java | 103 +++
.../nexmark/UnboundedEventSourceTest.java | 109 +++
68 files changed, 12424 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/pom.xml
----------------------------------------------------------------------
diff --git a/integration/java/pom.xml b/integration/java/pom.xml
new file mode 100644
index 0000000..b160b56
--- /dev/null
+++ b/integration/java/pom.xml
@@ -0,0 +1,288 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>parent</artifactId>
+ <version>0.2.0-incubating-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>java-integration-all</artifactId>
+
+ <name>Apache Beam :: Integration Tests :: Java All</name>
+
+ <packaging>jar</packaging>
+
+ <properties>
+ <skipITs>true</skipITs>
+ </properties>
+
+ <build>
+ <extensions>
+ <!-- Use os-maven-plugin to initialize the "os.detected" properties -->
+ <extension>
+ <groupId>kr.motd.maven</groupId>
+ <artifactId>os-maven-plugin</artifactId>
+ <version>1.4.0.Final</version>
+ </extension>
+ </extensions>
+
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <systemPropertyVariables>
+ <beamTestPipelineOptions>
+ </beamTestPipelineOptions>
+ </systemPropertyVariables>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ </plugin>
+
+
+ <!-- Source plugin for generating source and test-source JARs. -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-source-plugin</artifactId>
+ </plugin>
+
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <finalName>${project.artifactId}-bundled-${project.version}</finalName>
+ <artifactSet>
+ <includes>
+ <include>*:*</include>
+ </includes>
+ </artifactSet>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ </plugin>
+
+ <!-- Avro plugin for automatic code generation -->
+ <plugin>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro-maven-plugin</artifactId>
+ <version>${avro.version}</version>
+ <executions>
+ <execution>
+ <id>schemas</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>schema</goal>
+ </goals>
+ <configuration>
+ <sourceDirectory>${project.basedir}/src/main/</sourceDirectory>
+ <outputDirectory>${project.build.directory}/generated-sources/java</outputDirectory>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <!-- Coverage analysis for unit tests. -->
+ <plugin>
+ <groupId>org.jacoco</groupId>
+ <artifactId>jacoco-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <!-- Java SDK -->
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>java-sdk-all</artifactId>
+ </dependency>
+
+ <!-- Java runner for Google Cloud Dataflow -->
+ <dependency>
+ <groupId>org.apache.beam.runners</groupId>
+ <artifactId>google-cloud-dataflow-java</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <!-- Direct runner -->
+ <dependency>
+ <groupId>org.apache.beam.runners</groupId>
+ <artifactId>direct</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <!-- Java runner for Flink -->
+ <dependency>
+ <groupId>org.apache.beam.runners</groupId>
+ <artifactId>flink_2.10</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-shaded-hadoop2</artifactId>
+ <version>1.0.3</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- Extra libraries -->
+ <dependency>
+ <groupId>com.google.apis</groupId>
+ <artifactId>google-api-services-dataflow</artifactId>
+ <version>${dataflow.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>joda-time</groupId>
+ <artifactId>joda-time</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.apis</groupId>
+ <artifactId>google-api-services-bigquery</artifactId>
+ <version>${bigquery.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.cloud.bigdataoss</groupId>
+ <artifactId>gcsio</artifactId>
+ <version>${google-cloud-bigdataoss.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ <version>${avro.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${guava.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-all</artifactId>
+ <version>${hamcrest.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>${slf4j.version}</version>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-jdk14</artifactId>
+ <version>${slf4j.version}</version>
+ <!-- When loaded at runtime this will wire up slf4j to the JUL backend -->
+ <scope>runtime</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>jsr305</artifactId>
+ <version>${jsr305.version}</version>
+ <scope>runtime</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <version>1.2.17</version>
+ <scope>runtime</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>${junit.version}</version>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-tcnative-boringssl-static</artifactId>
+ <version>1.1.33.Fork13</version>
+ <classifier>${os.detected.classifier}</classifier>
+ <scope>runtime</scope>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/AbstractSimulator.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/AbstractSimulator.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/AbstractSimulator.java
new file mode 100644
index 0000000..6473c35
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/AbstractSimulator.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.TimestampedValue;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import javax.annotation.Nullable;
+
+/**
+ * Abstract base class for a simulator of a query.
+ *
+ * <p>Subclasses implement {@link #run} to advance the simulation incrementally. Expected output
+ * is then drawn lazily from exactly one of {@link #results} or {@link #resultsPerWindow}; each
+ * iterator calls {@link #run} on demand until it has something to return or the simulation is
+ * done.
+ *
+ * @param <InputT> Type of input elements.
+ * @param <OutputT> Type of output elements.
+ */
+abstract class AbstractSimulator<InputT, OutputT> {
+  /** Window size for action bucket sampling. */
+  public static final Duration WINDOW_SIZE = Duration.standardMinutes(1);
+
+  /** Input event stream we should draw from. */
+  private final Iterator<TimestampedValue<InputT>> input;
+
+  /** Set to true when no more results. */
+  private boolean isDone;
+
+  /** Results which have not yet been returned by the {@link #results} iterator. */
+  private final List<TimestampedValue<OutputT>> pendingResults;
+
+  /** Current window timestamp (ms since epoch). */
+  private long currentWindow;
+
+  /** Number of (possibly intermediate) results for the current window. */
+  private long currentCount;
+
+  /**
+   * Result counts per window which have not yet been returned by the {@link #resultsPerWindow}
+   * iterator.
+   */
+  private final List<Long> pendingCounts;
+
+  public AbstractSimulator(Iterator<TimestampedValue<InputT>> input) {
+    this.input = input;
+    isDone = false;
+    pendingResults = new ArrayList<>();
+    currentWindow = BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis();
+    currentCount = 0;
+    pendingCounts = new ArrayList<>();
+  }
+
+  /**
+   * Called by implementors of {@link #run}: Fetch the next input element.
+   *
+   * @return the next timestamped input, or {@code null} once the input is exhausted.
+   */
+  @Nullable
+  protected TimestampedValue<InputT> nextInput() {
+    if (!input.hasNext()) {
+      return null;
+    }
+    TimestampedValue<InputT> timestampedInput = input.next();
+    NexmarkUtils.info("input: %s", timestampedInput);
+    return timestampedInput;
+  }
+
+  /**
+   * Called by implementors of {@link #run}: Capture an intermediate result, for the purpose of
+   * recording the expected activity of the query over time. Intermediate results contribute to
+   * the per-window counts but are not returned by {@link #results}.
+   */
+  protected void addIntermediateResult(TimestampedValue<OutputT> result) {
+    NexmarkUtils.info("intermediate result: %s", result);
+    updateCounts(result.getTimestamp());
+  }
+
+  /**
+   * Called by implementors of {@link #run}: Capture a final result, for the purpose of checking
+   * semantic correctness. Final results are both queued for {@link #results} and counted for
+   * {@link #resultsPerWindow}.
+   */
+  protected void addResult(TimestampedValue<OutputT> result) {
+    NexmarkUtils.info("result: %s", result);
+    pendingResults.add(result);
+    updateCounts(result.getTimestamp());
+  }
+
+  /**
+   * Update window and counts.
+   *
+   * <p>The count for a window is only flushed to {@link #pendingCounts} once a result falling in
+   * a later window arrives. A result whose window is not later than the current one is counted
+   * toward the current window.
+   */
+  private void updateCounts(Instant timestamp) {
+    // Align the timestamp down to a WINDOW_SIZE boundary.
+    long window = timestamp.getMillis() - timestamp.getMillis() % WINDOW_SIZE.getMillis();
+    if (window > currentWindow) {
+      // Skip flushing the initial sentinel window, which never received any results.
+      if (currentWindow > BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()) {
+        pendingCounts.add(currentCount);
+      }
+      currentCount = 0;
+      currentWindow = window;
+    }
+    currentCount++;
+  }
+
+  /** Called by implementors of {@link #run}: Record that no more results will be emitted. */
+  protected void allDone() {
+    isDone = true;
+  }
+
+  /**
+   * Overridden by derived classes to do the next increment of work. Each call should
+   * call one or more of {@link #nextInput}, {@link #addIntermediateResult}, {@link #addResult}
+   * or {@link #allDone}. It is ok for a single call to emit more than one result via
+   * {@link #addResult}. It is ok for a single call to run the entire simulation, though
+   * the {@link #results} and {@link #resultsPerWindow} iterators will then not return
+   * anything until that whole simulation has completed.
+   */
+  protected abstract void run();
+
+  /**
+   * Return iterator over all expected timestamped results. The underlying simulator state is
+   * changed. Only one of {@link #results} or {@link #resultsPerWindow} can be called.
+   */
+  public Iterator<TimestampedValue<OutputT>> results() {
+    return new Iterator<TimestampedValue<OutputT>>() {
+      @Override
+      public boolean hasNext() {
+        // Keep advancing the simulation until a result is available or it has finished.
+        while (true) {
+          if (!pendingResults.isEmpty()) {
+            return true;
+          }
+          if (isDone) {
+            return false;
+          }
+          run();
+        }
+      }
+
+      @Override
+      public TimestampedValue<OutputT> next() {
+        // hasNext() drives the lazy simulation, so next() works even without a prior hasNext()
+        // call, and reports exhaustion per the Iterator contract.
+        if (!hasNext()) {
+          throw new NoSuchElementException();
+        }
+        return pendingResults.remove(0);
+      }
+
+      @Override
+      public void remove() {
+        throw new UnsupportedOperationException();
+      }
+    };
+  }
+
+  /**
+   * Return an iterator over the number of results per {@link #WINDOW_SIZE} period. The underlying
+   * simulator state is changed. Only one of {@link #results} or {@link #resultsPerWindow} can be
+   * called.
+   */
+  public Iterator<Long> resultsPerWindow() {
+    return new Iterator<Long>() {
+      @Override
+      public boolean hasNext() {
+        while (true) {
+          if (!pendingCounts.isEmpty()) {
+            return true;
+          }
+          if (isDone) {
+            if (currentCount > 0) {
+              // Flush the count of the final, still-open window exactly once.
+              pendingCounts.add(currentCount);
+              currentCount = 0;
+              currentWindow = BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis();
+              return true;
+            } else {
+              return false;
+            }
+          }
+          run();
+        }
+      }
+
+      @Override
+      public Long next() {
+        // hasNext() drives the lazy simulation and the final flush; see results().
+        if (!hasNext()) {
+          throw new NoSuchElementException();
+        }
+        return pendingCounts.remove(0);
+      }
+
+      @Override
+      public void remove() {
+        throw new UnsupportedOperationException();
+      }
+    };
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Auction.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Auction.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Auction.java
new file mode 100644
index 0000000..94f2647
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Auction.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+/**
+ * An auction submitted by a person.
+ */
+public class Auction implements KnownSize, Serializable {
+ private static final Coder<Long> LONG_CODER = VarLongCoder.of();
+ private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
+
+ public static final Coder<Auction> CODER = new AtomicCoder<Auction>() {
+ @Override
+ public void encode(Auction value, OutputStream outStream,
+ Coder.Context context)
+ throws CoderException, IOException {
+ LONG_CODER.encode(value.id, outStream, Context.NESTED);
+ STRING_CODER.encode(value.itemName, outStream, Context.NESTED);
+ STRING_CODER.encode(value.description, outStream, Context.NESTED);
+ LONG_CODER.encode(value.initialBid, outStream, Context.NESTED);
+ LONG_CODER.encode(value.reserve, outStream, Context.NESTED);
+ LONG_CODER.encode(value.dateTime, outStream, Context.NESTED);
+ LONG_CODER.encode(value.expires, outStream, Context.NESTED);
+ LONG_CODER.encode(value.seller, outStream, Context.NESTED);
+ LONG_CODER.encode(value.category, outStream, Context.NESTED);
+ STRING_CODER.encode(value.extra, outStream, Context.NESTED);
+ }
+
+ @Override
+ public Auction decode(
+ InputStream inStream, Coder.Context context)
+ throws CoderException, IOException {
+ long id = LONG_CODER.decode(inStream, Context.NESTED);
+ String itemName = STRING_CODER.decode(inStream, Context.NESTED);
+ String description = STRING_CODER.decode(inStream, Context.NESTED);
+ long initialBid = LONG_CODER.decode(inStream, Context.NESTED);
+ long reserve = LONG_CODER.decode(inStream, Context.NESTED);
+ long dateTime = LONG_CODER.decode(inStream, Context.NESTED);
+ long expires = LONG_CODER.decode(inStream, Context.NESTED);
+ long seller = LONG_CODER.decode(inStream, Context.NESTED);
+ long category = LONG_CODER.decode(inStream, Context.NESTED);
+ String extra = STRING_CODER.decode(inStream, Context.NESTED);
+ return new Auction(
+ id, itemName, description, initialBid, reserve, dateTime, expires, seller, category,
+ extra);
+ }
+ };
+
+
+ /** Id of auction. */
+ @JsonProperty
+ public final long id; // primary key
+
+ /** Extra auction properties. */
+ @JsonProperty
+ public final String itemName;
+
+ @JsonProperty
+ public final String description;
+
+ /** Initial bid price, in cents. */
+ @JsonProperty
+ public final long initialBid;
+
+ /** Reserve price, in cents. */
+ @JsonProperty
+ public final long reserve;
+
+ @JsonProperty
+ public final long dateTime;
+
+ /** When does auction expire? (ms since epoch). Bids at or after this time are ignored. */
+ @JsonProperty
+ public final long expires;
+
+ /** Id of person who instigated auction. */
+ @JsonProperty
+ public final long seller; // foreign key: Person.id
+
+ /** Id of category auction is listed under. */
+ @JsonProperty
+ public final long category; // foreign key: Category.id
+
+ /** Additional arbitrary payload for performance testing. */
+ @JsonProperty
+ public final String extra;
+
+
+ // For Avro only.
+ @SuppressWarnings("unused")
+ private Auction() {
+ id = 0;
+ itemName = null;
+ description = null;
+ initialBid = 0;
+ reserve = 0;
+ dateTime = 0;
+ expires = 0;
+ seller = 0;
+ category = 0;
+ extra = null;
+ }
+
+ public Auction(long id, String itemName, String description, long initialBid, long reserve,
+ long dateTime, long expires, long seller, long category, String extra) {
+ this.id = id;
+ this.itemName = itemName;
+ this.description = description;
+ this.initialBid = initialBid;
+ this.reserve = reserve;
+ this.dateTime = dateTime;
+ this.expires = expires;
+ this.seller = seller;
+ this.category = category;
+ this.extra = extra;
+ }
+
+ /**
+ * Return a copy of auction which capture the given annotation.
+ * (Used for debugging).
+ */
+ public Auction withAnnotation(String annotation) {
+ return new Auction(id, itemName, description, initialBid, reserve, dateTime, expires, seller,
+ category, annotation + ": " + extra);
+ }
+
+ /**
+ * Does auction have {@code annotation}? (Used for debugging.)
+ */
+ public boolean hasAnnotation(String annotation) {
+ return extra.startsWith(annotation + ": ");
+ }
+
+ /**
+ * Remove {@code annotation} from auction. (Used for debugging.)
+ */
+ public Auction withoutAnnotation(String annotation) {
+ if (hasAnnotation(annotation)) {
+ return new Auction(id, itemName, description, initialBid, reserve, dateTime, expires, seller,
+ category, extra.substring(annotation.length() + 2));
+ } else {
+ return this;
+ }
+ }
+
+ @Override
+ public long sizeInBytes() {
+ return 8 + itemName.length() + 1 + description.length() + 1 + 8 + 8 + 8 + 8 + 8 + 8
+ + extra.length() + 1;
+ }
+
+ @Override
+ public String toString() {
+ try {
+ return NexmarkUtils.MAPPER.writeValueAsString(this);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/AuctionBid.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/AuctionBid.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/AuctionBid.java
new file mode 100644
index 0000000..8c3697a
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/AuctionBid.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+/**
+ * Result of {@link WinningBids} transform.
+ */
+public class AuctionBid implements KnownSize, Serializable {
+ public static final Coder<AuctionBid> CODER = new AtomicCoder<AuctionBid>() {
+ @Override
+ public void encode(AuctionBid value, OutputStream outStream,
+ Coder.Context context)
+ throws CoderException, IOException {
+ Auction.CODER.encode(value.auction, outStream, Context.NESTED);
+ Bid.CODER.encode(value.bid, outStream, Context.NESTED);
+ }
+
+ @Override
+ public AuctionBid decode(
+ InputStream inStream, Coder.Context context)
+ throws CoderException, IOException {
+ Auction auction = Auction.CODER.decode(inStream, Context.NESTED);
+ Bid bid = Bid.CODER.decode(inStream, Context.NESTED);
+ return new AuctionBid(auction, bid);
+ }
+ };
+
+ @JsonProperty
+ public final Auction auction;
+
+ @JsonProperty
+ public final Bid bid;
+
+ // For Avro only.
+ @SuppressWarnings("unused")
+ private AuctionBid() {
+ auction = null;
+ bid = null;
+ }
+
+ public AuctionBid(Auction auction, Bid bid) {
+ this.auction = auction;
+ this.bid = bid;
+ }
+
+ @Override
+ public long sizeInBytes() {
+ return auction.sizeInBytes() + bid.sizeInBytes();
+ }
+
+ @Override
+ public String toString() {
+ try {
+ return NexmarkUtils.MAPPER.writeValueAsString(this);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/AuctionCount.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/AuctionCount.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/AuctionCount.java
new file mode 100644
index 0000000..a0fbebc
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/AuctionCount.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.VarLongCoder;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+/**
+ * Result of {@link Query5}.
+ */
+public class AuctionCount implements KnownSize, Serializable {
+ private static final Coder<Long> LONG_CODER = VarLongCoder.of();
+
+ public static final Coder<AuctionCount> CODER = new AtomicCoder<AuctionCount>() {
+ @Override
+ public void encode(AuctionCount value, OutputStream outStream,
+ Coder.Context context)
+ throws CoderException, IOException {
+ LONG_CODER.encode(value.auction, outStream, Context.NESTED);
+ LONG_CODER.encode(value.count, outStream, Context.NESTED);
+ }
+
+ @Override
+ public AuctionCount decode(
+ InputStream inStream, Coder.Context context)
+ throws CoderException, IOException {
+ long auction = LONG_CODER.decode(inStream, Context.NESTED);
+ long count = LONG_CODER.decode(inStream, Context.NESTED);
+ return new AuctionCount(auction, count);
+ }
+ };
+
+ @JsonProperty
+ public final long auction;
+
+ @JsonProperty
+ public final long count;
+
+ // For Avro only.
+ @SuppressWarnings("unused")
+ private AuctionCount() {
+ auction = 0;
+ count = 0;
+ }
+
+ public AuctionCount(long auction, long count) {
+ this.auction = auction;
+ this.count = count;
+ }
+
+ @Override
+ public long sizeInBytes() {
+ return 8 + 8;
+ }
+
+ @Override
+ public String toString() {
+ try {
+ return NexmarkUtils.MAPPER.writeValueAsString(this);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/AuctionPrice.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/AuctionPrice.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/AuctionPrice.java
new file mode 100644
index 0000000..4f25a9b
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/AuctionPrice.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.VarLongCoder;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+/**
+ * Result of {@link Query2}.
+ */
+public class AuctionPrice implements KnownSize, Serializable {
+ private static final Coder<Long> LONG_CODER = VarLongCoder.of();
+
+ public static final Coder<AuctionPrice> CODER = new AtomicCoder<AuctionPrice>() {
+ @Override
+ public void encode(AuctionPrice value, OutputStream outStream,
+ Coder.Context context)
+ throws CoderException, IOException {
+ LONG_CODER.encode(value.auction, outStream, Context.NESTED);
+ LONG_CODER.encode(value.price, outStream, Context.NESTED);
+ }
+
+ @Override
+ public AuctionPrice decode(
+ InputStream inStream, Coder.Context context)
+ throws CoderException, IOException {
+ long auction = LONG_CODER.decode(inStream, Context.NESTED);
+ long price = LONG_CODER.decode(inStream, Context.NESTED);
+ return new AuctionPrice(auction, price);
+ }
+ };
+
+ @JsonProperty
+ public final long auction;
+
+ /** Price in cents. */
+ @JsonProperty
+ public final long price;
+
+ // For Avro only.
+ @SuppressWarnings("unused")
+ private AuctionPrice() {
+ auction = 0;
+ price = 0;
+ }
+
+ public AuctionPrice(long auction, long price) {
+ this.auction = auction;
+ this.price = price;
+ }
+
+ @Override
+ public long sizeInBytes() {
+ return 8 + 8;
+ }
+
+ @Override
+ public String toString() {
+ try {
+ return NexmarkUtils.MAPPER.writeValueAsString(this);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/BEAM_ON_FLINK_ON_GCP.md
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/BEAM_ON_FLINK_ON_GCP.md b/integration/java/src/main/java/org/apache/beam/integration/nexmark/BEAM_ON_FLINK_ON_GCP.md
new file mode 100644
index 0000000..d1b51e8
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/BEAM_ON_FLINK_ON_GCP.md
@@ -0,0 +1,282 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+# Running NexMark on Beam on Flink on Google Cloud Platform
+
+Here's how to create a cluster of VMs on Google Cloud Platform, deploy
+Flink to them, and invoke a NexMark pipeline using the Beam-on-Flink
+runner.
+
+These instructions are somewhat baroque; hopefully they can be
+simplified over time.
+
+## Prerequisites
+
+You'll need:
+
+* the Google Cloud SDK
+* a clone of the Beam repository
+* a Flink binary distribution
+* a project on Google Cloud Platform.
+
+## Establish the shell environment
+
+```
+# Beam root
+BEAM=<path to Beam source directory>
+# Flink root
+FLINK_VER=flink-1.0.3
+FLINK=<path to Flink distribution directory>
+# Google Cloud project
+PROJECT=<your project id>
+# Google Cloud zone
+ZONE=<your project zone>
+# Cloud commands
+GCLOUD=<path to gcloud command>
+GSUTIL=<path to gsutil command>
+```
+
+## Establish VM names for Flink master and workers
+
+```
+MASTER=flink-m
+NUM_WORKERS=5
+WORKERS=""
+for (( i = 0; i < $NUM_WORKERS; i++ )); do
+ WORKERS="$WORKERS flink-w-$i"
+done
+ALL="$MASTER $WORKERS"
+```
+
+## Build Beam
+
+```
+( cd $BEAM && mvn clean install )
+```
+
+## Bring up the cluster
+
+Establish project defaults and authenticate:
+```
+$GCLOUD init
+$GCLOUD auth login
+```
+
+Create a Google Cloud Dataproc cluster:
+```
+$GCLOUD beta dataproc clusters create \
+ --project=$PROJECT \
+ --zone=$ZONE \
+ --bucket=nexmark \
+ --scopes=cloud-platform \
+ --num-workers=$NUM_WORKERS \
+ --image-version=preview \
+ flink
+```
+
+Force the `google_compute_engine` SSH keys to be generated locally:
+```
+$GCLOUD compute ssh \
+ --project=$PROJECT \
+ --zone=$ZONE \
+ $MASTER \
+ --command 'exit'
+```
+
+Open ports on the VMs for monitoring (8080-8081) and debugging (5555):
+```
+$GCLOUD compute firewall-rules create allow-monitoring --allow tcp:8080-8081
+$GCLOUD compute firewall-rules create allow-debug --allow tcp:5555
+```
+
+Copy SSH keys to the master and workers.
+**CAUTION:** This will leave your private key on your master VM.
+It would be better to create a key just for inter-worker SSH.
+```
+for m in $ALL; do
+ echo "*** $m ***"
+ $GCLOUD beta compute scp \
+ --project=$PROJECT \
+ --zone=$ZONE \
+ ~/.ssh/google_compute_engine.pub $m:~/.ssh/
+done
+$GCLOUD beta compute scp \
+ --project=$PROJECT \
+ --zone=$ZONE \
+ ~/.ssh/google_compute_engine $MASTER:~/.ssh/
+```
+
+Collect the IP addresses of the master and workers:
+```
+MASTER_EXT_IP=$($GCLOUD compute instances describe \
+ --project=$PROJECT \
+ --zone=$ZONE \
+ $MASTER | grep natIP: | sed 's/[ ]*natIP:[ ]*//')
+MASTER_IP=$($GCLOUD compute instances describe \
+ --project=$PROJECT \
+ --zone=$ZONE \
+ $MASTER | grep networkIP: | sed 's/[ ]*networkIP:[ ]*//')
+WORKER_IPS=""
+for m in $WORKERS; do
+ echo "*** $m ***"
+ WORKER_IP=$($GCLOUD compute instances describe \
+ --project=$PROJECT \
+ --zone=$ZONE \
+ $m | grep networkIP: | sed 's/[ ]*networkIP:[ ]*//')
+ WORKER_IPS="$WORKER_IPS $WORKER_IP"
+done
+```
+
+Configure Flink:
+```
+cat $FLINK/conf/flink-conf.yaml \
+ | sed "s|.*\(jobmanager.rpc.address\):.*|\1: $MASTER_IP|g" \
+ | sed "s|.*\(jobmanager.heap.mb\):.*|\1: 4096|g" \
+ | sed "s|.*\(taskmanager.heap.mb\):.*|\1: 8192|g" \
+ | sed "s|.*\(parallelism.default\):.*|\1: $(($NUM_WORKERS * 4))|g" \
+ | sed "s|.*\(fs.hdfs.hadoopconf\):.*|\1: /etc/hadoop/conf|g" \
+ | sed "s|.*\(taskmanager.numberOfTaskSlots\):.*|\1: 4|g" \
+ | sed "s|.*\(jobmanager.web.submit.enable\):.*|\1: false|g" \
+ | sed "s|.*\(env.ssh.opts\):.*||g" \
+ > ~/flink-conf.yaml
+cat $FLINK/conf/log4j.properties \
+ | sed "s|.*\(log4j.rootLogger\)=.*|\1=ERROR, file|g" \
+ > ~/log4j.properties
+echo "env.ssh.opts: -i /home/$USER/.ssh/google_compute_engine -o StrictHostKeyChecking=no" >> ~/flink-conf.yaml
+echo "$MASTER_IP:8081" > ~/masters
+echo -n > ~/slaves
+for ip in $WORKER_IPS; do
+ echo $ip >> ~/slaves
+done
+cp -f \
+ ~/flink-conf.yaml \
+ ~/masters ~/slaves \
+ ~/log4j.properties \
+ $FLINK/conf/
+```
+
+Package the configured Flink distribution for the workers:
+```
+( cd ~/ && tar -cvzf ~/flink.tgz $FLINK/* )
+```
+
+Distribute it to the master and workers:
+```
+$GSUTIL cp ~/flink.tgz gs://nexmark
+for m in $ALL; do
+ echo "*** $m ***"
+ $GCLOUD compute ssh \
+ --project=$PROJECT \
+ --zone=$ZONE \
+ $m \
+ --command 'gsutil cp gs://nexmark/flink.tgz ~/ && tar -xvzf ~/flink.tgz'
+done
+```
+
+Start the Flink cluster:
+```
+$GCLOUD compute ssh \
+ --project=$PROJECT \
+ --zone=$ZONE \
+ $MASTER \
+ --command "~/$FLINK_VER/bin/start-cluster.sh"
+```
+
+Bring up the Flink monitoring UI:
+```
+/usr/bin/google-chrome $MASTER_EXT_IP:8081 &
+```
+
+## Run NexMark
+
+Distribute the Beam + NexMark jar to the master and all workers:
+```
+$GSUTIL cp $BEAM/integration/java/target/java-integration-all-bundled-0.2.0-incubating-SNAPSHOT.jar gs://nexmark
+for m in $ALL; do
+ echo "*** $m ***"
+ $GCLOUD compute ssh \
+ --project=$PROJECT \
+ --zone=$ZONE \
+ $m \
+ --command "gsutil cp gs://nexmark/java-integration-all-bundled-0.2.0-incubating-SNAPSHOT.jar ~/$FLINK_VER/lib/"
+done
+```
+
+Create a Pub/Sub topic and subscription for testing:
+```
+$GCLOUD alpha pubsub \
+ --project=$PROJECT \
+ topics create flink_test
+
+$GCLOUD alpha pubsub \
+ --project=$PROJECT \
+ subscriptions create flink_test \
+ --topic flink_test \
+ --ack-deadline=60 \
+ --topic-project=$PROJECT
+```
+
+Launch!
+**NOTE:** As of flink-1.0.3 this will throw a `NullPointerException`
+in `org.apache.beam.sdk.io.PubsubUnboundedSink$WriterFn.startBundle`.
+See Jira issue [BEAM-196](https://issues.apache.org/jira/browse/BEAM-196).
+
+```
+$GCLOUD compute ssh \
+ --project=$PROJECT \
+ --zone=$ZONE \
+ $MASTER \
+ --command "~/$FLINK_VER/bin/flink run \
+ -c org.apache.beam.integration.nexmark.NexmarkFlinkDriver \
+ ~/$FLINK_VER/lib/java-integration-all-bundled-0.2.0-incubating-SNAPSHOT.jar \
+ --project=$PROJECT \
+ --streaming=true \
+ --query=0 \
+ --sourceType=PUBSUB \
+ --pubSubMode=COMBINED \
+ --pubsubTopic=flink_test \
+ --resourceNameMode=VERBATIM \
+ --manageResources=false \
+ --monitorJobs=false \
+ --numEventGenerators=5 \
+ --firstEventRate=1000 \
+ --nextEventRate=1000 \
+ --isRateLimited=true \
+ --numEvents=0 \
+ --useWallclockEventTime=true \
+ --usePubsubPublishTime=true"
+```
+
+## Tear down the cluster
+
+Stop the Flink cluster:
+```
+$GCLOUD compute ssh \
+ --project=$PROJECT \
+ --zone=$ZONE \
+ $MASTER \
+ --command "~/$FLINK_VER/bin/stop-cluster.sh"
+```
+
+Tear down the Dataproc cluster:
+```
+$GCLOUD beta dataproc clusters delete \
+ --project=$PROJECT \
+ flink
+```
http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Bid.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Bid.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Bid.java
new file mode 100644
index 0000000..ce2184b
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Bid.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.Comparator;
+
+/**
+ * A bid for an item on auction.
+ */
+public class Bid implements KnownSize, Serializable {
+  private static final Coder<Long> LONG_CODER = VarLongCoder.of();
+  private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
+
+  /**
+   * Coder for {@link Bid}: auction, bidder, price and dateTime as nested var-longs,
+   * followed by {@code extra} as a nested UTF-8 string.
+   */
+  public static final Coder<Bid> CODER = new AtomicCoder<Bid>() {
+    @Override
+    public void encode(Bid value, OutputStream outStream,
+        Coder.Context context)
+        throws CoderException, IOException {
+      LONG_CODER.encode(value.auction, outStream, Context.NESTED);
+      LONG_CODER.encode(value.bidder, outStream, Context.NESTED);
+      LONG_CODER.encode(value.price, outStream, Context.NESTED);
+      LONG_CODER.encode(value.dateTime, outStream, Context.NESTED);
+      STRING_CODER.encode(value.extra, outStream, Context.NESTED);
+    }
+
+    @Override
+    public Bid decode(
+        InputStream inStream, Coder.Context context)
+        throws CoderException, IOException {
+      // Fields must be read in exactly the order encode() writes them.
+      long auction = LONG_CODER.decode(inStream, Context.NESTED);
+      long bidder = LONG_CODER.decode(inStream, Context.NESTED);
+      long price = LONG_CODER.decode(inStream, Context.NESTED);
+      long dateTime = LONG_CODER.decode(inStream, Context.NESTED);
+      String extra = STRING_CODER.decode(inStream, Context.NESTED);
+      return new Bid(auction, bidder, price, dateTime, extra);
+    }
+  };
+
+  /**
+   * Comparator to order bids by ascending price then descending time
+   * (for finding winning bids).
+   */
+  public static final Comparator<Bid> PRICE_THEN_DESCENDING_TIME = new Comparator<Bid>() {
+    @Override
+    public int compare(Bid left, Bid right) {
+      // Prices are longs: compare them exactly. Widening to double (Double.compare)
+      // loses precision above 2^53 and can report distinct prices as equal.
+      int i = Long.compare(left.price, right.price);
+      if (i != 0) {
+        return i;
+      }
+      // Operands swapped so that, among equal prices, later bids sort first.
+      return Long.compare(right.dateTime, left.dateTime);
+    }
+  };
+
+  /**
+   * Comparator to order bids by ascending time then ascending price
+   * (for finding most recent bids).
+   */
+  public static final Comparator<Bid> ASCENDING_TIME_THEN_PRICE = new Comparator<Bid>() {
+    @Override
+    public int compare(Bid left, Bid right) {
+      int i = Long.compare(left.dateTime, right.dateTime);
+      if (i != 0) {
+        return i;
+      }
+      // Exact long comparison; see PRICE_THEN_DESCENDING_TIME.
+      return Long.compare(left.price, right.price);
+    }
+  };
+
+  /** Id of auction this bid is for. */
+  @JsonProperty
+  public final long auction; // foreign key: Auction.id
+
+  /** Id of person bidding in auction. */
+  @JsonProperty
+  public final long bidder; // foreign key: Person.id
+
+  /** Price of bid, in cents. */
+  @JsonProperty
+  public final long price;
+
+  /**
+   * Instant at which bid was made (ms since epoch).
+   * NOTE: This may be earlier than the system's event time.
+   */
+  @JsonProperty
+  public final long dateTime;
+
+  /** Additional arbitrary payload for performance testing. */
+  @JsonProperty
+  public final String extra;
+
+  // For Avro only.
+  @SuppressWarnings("unused")
+  private Bid() {
+    auction = 0;
+    bidder = 0;
+    price = 0;
+    dateTime = 0;
+    extra = null;
+  }
+
+  public Bid(long auction, long bidder, long price, long dateTime, String extra) {
+    this.auction = auction;
+    this.bidder = bidder;
+    this.price = price;
+    this.dateTime = dateTime;
+    this.extra = extra;
+  }
+
+  /**
+   * Return a copy of this bid whose {@code extra} is prefixed with
+   * {@code annotation + ": "}. (Used for debugging.)
+   */
+  public Bid withAnnotation(String annotation) {
+    return new Bid(auction, bidder, price, dateTime, annotation + ": " + extra);
+  }
+
+  /**
+   * Does bid have {@code annotation}? (Used for debugging.)
+   */
+  public boolean hasAnnotation(String annotation) {
+    return extra.startsWith(annotation + ": ");
+  }
+
+  /**
+   * Remove {@code annotation} from bid, or return this bid unchanged if it does not
+   * carry it. (Used for debugging.)
+   */
+  public Bid withoutAnnotation(String annotation) {
+    if (hasAnnotation(annotation)) {
+      // Skip the annotation plus the ": " separator (2 chars).
+      return new Bid(auction, bidder, price, dateTime, extra.substring(annotation.length() + 2));
+    } else {
+      return this;
+    }
+  }
+
+  @Override
+  public long sizeInBytes() {
+    // Four 8-byte longs, one unit per char of extra, plus 1.
+    return 8 + 8 + 8 + 8 + extra.length() + 1;
+  }
+
+  @Override
+  public String toString() {
+    try {
+      return NexmarkUtils.MAPPER.writeValueAsString(this);
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/BidsPerSession.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/BidsPerSession.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/BidsPerSession.java
new file mode 100644
index 0000000..cfdd170
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/BidsPerSession.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.VarLongCoder;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+/**
+ * Result of query 11.
+ */
+public class BidsPerSession implements KnownSize, Serializable {
+ private static final Coder<Long> LONG_CODER = VarLongCoder.of();
+
+ public static final Coder<BidsPerSession> CODER = new AtomicCoder<BidsPerSession>() {
+ @Override
+ public void encode(BidsPerSession value, OutputStream outStream,
+ Coder.Context context)
+ throws CoderException, IOException {
+ LONG_CODER.encode(value.personId, outStream, Context.NESTED);
+ LONG_CODER.encode(value.bidsPerSession, outStream, Context.NESTED);
+ }
+
+ @Override
+ public BidsPerSession decode(
+ InputStream inStream, Coder.Context context)
+ throws CoderException, IOException {
+ long personId = LONG_CODER.decode(inStream, Context.NESTED);
+ long bidsPerSession = LONG_CODER.decode(inStream, Context.NESTED);
+ return new BidsPerSession(personId, bidsPerSession);
+ }
+ };
+
+ @JsonProperty
+ public final long personId;
+
+ @JsonProperty
+ public final long bidsPerSession;
+
+ public BidsPerSession() {
+ personId = 0;
+ bidsPerSession = 0;
+ }
+
+ public BidsPerSession(long personId, long bidsPerSession) {
+ this.personId = personId;
+ this.bidsPerSession = bidsPerSession;
+ }
+
+ @Override
+ public long sizeInBytes() {
+ // Two longs.
+ return 8 + 8;
+ }
+
+ @Override
+ public String toString() {
+ try {
+ return NexmarkUtils.MAPPER.writeValueAsString(this);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/BoundedEventSource.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/BoundedEventSource.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/BoundedEventSource.java
new file mode 100644
index 0000000..f6cc16a
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/BoundedEventSource.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.values.TimestampedValue;
+
+import org.joda.time.Instant;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import javax.annotation.Nullable;
+
+/**
+ * A custom, bounded source of event records.
+ */
+class BoundedEventSource extends BoundedSource<Event> {
+  /** Configuration we generate events against. */
+  private final GeneratorConfig config;
+
+  /** How many bounded sources to create. */
+  private final int numEventGenerators;
+
+  public BoundedEventSource(GeneratorConfig config, int numEventGenerators) {
+    this.config = config;
+    this.numEventGenerators = numEventGenerators;
+  }
+
+  /** A reader to pull events from the generator. */
+  private static class EventReader extends BoundedReader<Event> {
+    /**
+     * Event source we are purporting to be reading from.
+     * (We can't use Java's capture-outer-class pointer since we must update
+     * this field on calls to splitAtFraction.)
+     */
+    private BoundedEventSource source;
+
+    /** Generator we are reading from. */
+    private final Generator generator;
+
+    /** Set once the "stopped" message has been logged, so it is logged only once. */
+    private boolean reportedStop;
+
+    /** Event produced by the most recent successful advance(); null until the first one. */
+    @Nullable
+    private TimestampedValue<Event> currentEvent;
+
+    public EventReader(BoundedEventSource source, GeneratorConfig config) {
+      this.source = source;
+      generator = new Generator(config);
+      reportedStop = false;
+    }
+
+    @Override
+    public synchronized boolean start() {
+      NexmarkUtils.info("starting bounded generator %s", generator);
+      return advance();
+    }
+
+    @Override
+    public synchronized boolean advance() {
+      if (!generator.hasNext()) {
+        // No more events.
+        if (!reportedStop) {
+          reportedStop = true;
+          NexmarkUtils.info("stopped bounded generator %s", generator);
+        }
+        return false;
+      }
+      currentEvent = generator.next();
+      return true;
+    }
+
+    @Override
+    public synchronized Event getCurrent() throws NoSuchElementException {
+      if (currentEvent == null) {
+        throw new NoSuchElementException();
+      }
+      return currentEvent.getValue();
+    }
+
+    @Override
+    public synchronized Instant getCurrentTimestamp() throws NoSuchElementException {
+      if (currentEvent == null) {
+        throw new NoSuchElementException();
+      }
+      return currentEvent.getTimestamp();
+    }
+
+    @Override
+    public void close() throws IOException {
+      // Nothing to close.
+    }
+
+    @Override
+    public synchronized Double getFractionConsumed() {
+      return generator.getFractionConsumed();
+    }
+
+    @Override
+    public synchronized BoundedSource<Event> getCurrentSource() {
+      return source;
+    }
+
+    /**
+     * Splits this reader's current event range at {@code fraction} of its size.
+     * Returns a source for the tail of the range, or null if the split point has
+     * already been passed or the split would leave one side empty.
+     */
+    @Override
+    @Nullable
+    public synchronized BoundedEventSource splitAtFraction(double fraction) {
+      long startId = generator.getCurrentConfig().getStartEventId();
+      long stopId = generator.getCurrentConfig().getStopEventId();
+      long size = stopId - startId;
+      // Cast to long, not int: a double-to-int cast clamps at Integer.MAX_VALUE, which
+      // would put the split point in the wrong place for ranges of more than ~2^31 events.
+      long splitEventId = startId + Math.min((long) (size * fraction), size);
+      if (splitEventId <= generator.getNextEventId() || splitEventId == stopId) {
+        // Already passed this position or split results in left or right being empty.
+        NexmarkUtils.info("split failed for bounded generator %s at %f", generator, fraction);
+        return null;
+      }
+
+      NexmarkUtils.info("about to split bounded generator %s at %d", generator, splitEventId);
+
+      // Scale back the event space of the current generator, and return a generator config
+      // representing the event space we just 'stole' from the current generator.
+      GeneratorConfig remainingConfig = generator.splitAtEventId(splitEventId);
+
+      NexmarkUtils.info("split bounded generator into %s and %s", generator, remainingConfig);
+
+      // At this point
+      //   generator.events() ++ new Generator(remainingConfig).events()
+      //   == originalGenerator.events()
+
+      // We need a new source to represent the now smaller key space for this reader, so
+      // that we can maintain the invariant that
+      //   this.getCurrentSource().createReader(...)
+      // will yield the same output as this.
+      source = new BoundedEventSource(generator.getCurrentConfig(), source.numEventGenerators);
+
+      // Return a source from which we may read the 'stolen' event space.
+      return new BoundedEventSource(remainingConfig, source.numEventGenerators);
+    }
+  }
+
+  @Override
+  public List<BoundedEventSource> splitIntoBundles(
+      long desiredBundleSizeBytes, PipelineOptions options) {
+    NexmarkUtils.info("splitting bounded source %s into %d sub-sources", config, numEventGenerators);
+    List<BoundedEventSource> results = new ArrayList<>();
+    // Ignore desiredBundleSizeBytes and use numEventGenerators instead.
+    for (GeneratorConfig subConfig : config.split(numEventGenerators)) {
+      results.add(new BoundedEventSource(subConfig, 1));
+    }
+    return results;
+  }
+
+  @Override
+  public long getEstimatedSizeBytes(PipelineOptions options) {
+    return config.getEstimatedSizeBytes();
+  }
+
+  @Override
+  public boolean producesSortedKeys(PipelineOptions options) {
+    return false;
+  }
+
+  @Override
+  public EventReader createReader(PipelineOptions options) {
+    NexmarkUtils.info("creating initial bounded reader for %s", config);
+    return new EventReader(this, config);
+  }
+
+  @Override
+  public void validate() {
+    // Nothing to validate.
+  }
+
+  @Override
+  public Coder<Event> getDefaultOutputCoder() {
+    return Event.CODER;
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/CategoryPrice.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/CategoryPrice.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/CategoryPrice.java
new file mode 100644
index 0000000..ab5d92d
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/CategoryPrice.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+/**
+ * Result of {@link Query4}.
+ */
+public class CategoryPrice implements KnownSize, Serializable {
+  private static final Coder<Long> LONG_CODER = VarLongCoder.of();
+  private static final Coder<Integer> INT_CODER = VarIntCoder.of();
+
+  /**
+   * Coder writing {@code category} and {@code price} as nested var-longs, then
+   * {@code isLast} as a nested var-int (1 for true, 0 for false).
+   */
+  public static final Coder<CategoryPrice> CODER = new AtomicCoder<CategoryPrice>() {
+    @Override
+    public void encode(CategoryPrice value, OutputStream outStream,
+        Coder.Context context)
+        throws CoderException, IOException {
+      LONG_CODER.encode(value.category, outStream, Context.NESTED);
+      LONG_CODER.encode(value.price, outStream, Context.NESTED);
+      INT_CODER.encode(value.isLast ? 1 : 0, outStream, Context.NESTED);
+    }
+
+    @Override
+    public CategoryPrice decode(
+        InputStream inStream, Coder.Context context)
+        throws CoderException, IOException {
+      long category = LONG_CODER.decode(inStream, Context.NESTED);
+      long price = LONG_CODER.decode(inStream, Context.NESTED);
+      // Decode with the same NESTED context encode() used for this field. Passing the
+      // caller's context here would make encode and decode asymmetric.
+      boolean isLast = INT_CODER.decode(inStream, Context.NESTED) != 0;
+      return new CategoryPrice(category, price, isLast);
+    }
+  };
+
+  @JsonProperty
+  public final long category;
+
+  /** Price in cents. */
+  @JsonProperty
+  public final long price;
+
+  @JsonProperty
+  public final boolean isLast;
+
+  // For Avro only.
+  @SuppressWarnings("unused")
+  private CategoryPrice() {
+    category = 0;
+    price = 0;
+    isLast = false;
+  }
+
+  public CategoryPrice(long category, long price, boolean isLast) {
+    this.category = category;
+    this.price = price;
+    this.isLast = isLast;
+  }
+
+  @Override
+  public long sizeInBytes() {
+    // Two 8-byte longs plus one byte for the isLast flag.
+    return 8 + 8 + 1;
+  }
+
+  @Override
+  public String toString() {
+    try {
+      return NexmarkUtils.MAPPER.writeValueAsString(this);
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Done.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Done.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Done.java
new file mode 100644
index 0000000..659da44
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Done.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+/**
+ * Result of query 10.
+ */
+public class Done implements KnownSize, Serializable {
+ private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
+
+ public static final Coder<Done> CODER = new AtomicCoder<Done>() {
+ @Override
+ public void encode(Done value, OutputStream outStream,
+ Coder.Context context)
+ throws CoderException, IOException {
+ STRING_CODER.encode(value.message, outStream, Context.NESTED);
+ }
+
+ @Override
+ public Done decode(
+ InputStream inStream, Coder.Context context)
+ throws CoderException, IOException {
+ String message = STRING_CODER.decode(inStream, Context.NESTED);
+ return new Done(message);
+ }
+ };
+
+ @JsonProperty
+ public final String message;
+
+ // For Avro only.
+ @SuppressWarnings("unused")
+ public Done() {
+ message = null;
+ }
+
+ public Done(String message) {
+ this.message = message;
+ }
+
+ @Override
+ public long sizeInBytes() {
+ return message.length();
+ }
+
+ @Override
+ public String toString() {
+ try {
+ return NexmarkUtils.MAPPER.writeValueAsString(this);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Event.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Event.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Event.java
new file mode 100644
index 0000000..a382b8e
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Event.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.VarIntCoder;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+import javax.annotation.Nullable;
+
+/**
+ * An event in the auction system, either a (new) {@link Person}, a (new) {@link Auction},
+ * or a {@link Bid}.
+ */
+public class Event implements KnownSize, Serializable {
+ private static final Coder<Integer> INT_CODER = VarIntCoder.of();
+
+ public static final Coder<Event> CODER = new AtomicCoder<Event>() {
+ @Override
+ public void encode(Event value, OutputStream outStream, Coder.Context context)
+ throws CoderException, IOException {
+ if (value.newPerson != null) {
+ INT_CODER.encode(0, outStream, Context.NESTED);
+ Person.CODER.encode(value.newPerson, outStream, Context.NESTED);
+ } else if (value.newAuction != null) {
+ INT_CODER.encode(1, outStream, Context.NESTED);
+ Auction.CODER.encode(value.newAuction, outStream, Context.NESTED);
+ } else if (value.bid != null) {
+ INT_CODER.encode(2, outStream, Context.NESTED);
+ Bid.CODER.encode(value.bid, outStream, Context.NESTED);
+ } else {
+ throw new RuntimeException("invalid event");
+ }
+ }
+
+ @Override
+ public Event decode(
+ InputStream inStream, Coder.Context context)
+ throws CoderException, IOException {
+ int tag = INT_CODER.decode(inStream, context);
+ if (tag == 0) {
+ Person person = Person.CODER.decode(inStream, Context.NESTED);
+ return new Event(person);
+ } else if (tag == 1) {
+ Auction auction = Auction.CODER.decode(inStream, Context.NESTED);
+ return new Event(auction);
+ } else if (tag == 2) {
+ Bid bid = Bid.CODER.decode(inStream, Context.NESTED);
+ return new Event(bid);
+ } else {
+ throw new RuntimeException("invalid event encoding");
+ }
+ }
+ };
+
+ @Nullable
+ @org.apache.avro.reflect.Nullable
+ public final Person newPerson;
+
+ @Nullable
+ @org.apache.avro.reflect.Nullable
+ public final Auction newAuction;
+
+ @Nullable
+ @org.apache.avro.reflect.Nullable
+ public final Bid bid;
+
+ // For Avro only.
+ @SuppressWarnings("unused")
+ private Event() {
+ newPerson = null;
+ newAuction = null;
+ bid = null;
+ }
+
+ public Event(Person newPerson) {
+ this.newPerson = newPerson;
+ newAuction = null;
+ bid = null;
+ }
+
+ public Event(Auction newAuction) {
+ newPerson = null;
+ this.newAuction = newAuction;
+ bid = null;
+ }
+
+ public Event(Bid bid) {
+ newPerson = null;
+ newAuction = null;
+ this.bid = bid;
+ }
+
+ /**
+ * Return a copy of event which captures {@code annotation}.
+ * (Used for debugging).
+ */
+ public Event withAnnotation(String annotation) {
+ if (newPerson != null) {
+ return new Event(newPerson.withAnnotation(annotation));
+ } else if (newAuction != null) {
+ return new Event(newAuction.withAnnotation(annotation));
+ } else {
+ return new Event(bid.withAnnotation(annotation));
+ }
+ }
+
+ /**
+ * Does event have {@code annotation}? (Used for debugging.)
+ */
+ public boolean hasAnnotation(String annotation) {
+ if (newPerson != null) {
+ return newPerson.hasAnnotation(annotation);
+ } else if (newAuction != null) {
+ return newAuction.hasAnnotation(annotation);
+ } else {
+ return bid.hasAnnotation(annotation);
+ }
+ }
+
+ /**
+ * Remove {@code annotation} from event. (Used for debugging.)
+ */
+ public Event withoutAnnotation(String annotation) {
+ if (newPerson != null) {
+ return new Event(newPerson.withoutAnnotation(annotation));
+ } else if (newAuction != null) {
+ return new Event(newAuction.withoutAnnotation(annotation));
+ } else {
+ return new Event(bid.withoutAnnotation(annotation));
+ }
+ }
+
+ @Override
+ public long sizeInBytes() {
+ if (newPerson != null) {
+ return 1 + newPerson.sizeInBytes();
+ } else if (newAuction != null) {
+ return 1 + newAuction.sizeInBytes();
+ } else if (bid != null) {
+ return 1 + bid.sizeInBytes();
+ } else {
+ throw new RuntimeException("invalid event");
+ }
+ }
+
+ @Override
+ public String toString() {
+ if (newPerson != null) {
+ return newPerson.toString();
+ } else if (newAuction != null) {
+ return newAuction.toString();
+ } else if (bid != null) {
+ return bid.toString();
+ } else {
+ throw new RuntimeException("invalid event");
+ }
+ }
+}