You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/23 17:09:24 UTC

[16/55] [abbrv] beam git commit: Refactor classes into packages

Refactor classes into packages

The new hierarchy has logically based packages for:
- drivers
- io
- model
- queries
- sources


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a7f9f7d0
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a7f9f7d0
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a7f9f7d0

Branch: refs/heads/master
Commit: a7f9f7d0784d9ba1f53ac4a0b49d2d81700720d0
Parents: 9ce9bf0
Author: Ismaël Mejía <ie...@apache.org>
Authored: Thu Mar 23 19:32:45 2017 +0100
Committer: Ismaël Mejía <ie...@gmail.com>
Committed: Wed Aug 23 19:07:27 2017 +0200

----------------------------------------------------------------------
 .../java/nexmark/BEAM_ON_FLINK_ON_GCP.md        |   2 +-
 integration/java/nexmark/README.md              |  88 +--
 integration/java/nexmark/pom.xml                |  22 +-
 .../integration/nexmark/AbstractSimulator.java  |   2 +-
 .../beam/integration/nexmark/Auction.java       | 189 ------
 .../beam/integration/nexmark/AuctionBid.java    |  86 ---
 .../beam/integration/nexmark/AuctionCount.java  |  89 ---
 .../beam/integration/nexmark/AuctionPrice.java  |  90 ---
 .../apache/beam/integration/nexmark/Bid.java    | 177 ------
 .../integration/nexmark/BidsPerSession.java     |  88 ---
 .../integration/nexmark/BoundedEventSource.java | 189 ------
 .../beam/integration/nexmark/CategoryPrice.java |  99 ----
 .../apache/beam/integration/nexmark/Done.java   |  82 ---
 .../apache/beam/integration/nexmark/Event.java  | 179 ------
 .../beam/integration/nexmark/Generator.java     | 589 ------------------
 .../integration/nexmark/GeneratorConfig.java    | 294 ---------
 .../beam/integration/nexmark/IdNameReserve.java |  99 ----
 .../beam/integration/nexmark/KnownSize.java     |  26 -
 .../beam/integration/nexmark/Monitor.java       |   1 +
 .../integration/nexmark/NameCityStateId.java    | 105 ----
 .../integration/nexmark/NexmarkApexDriver.java  |  48 --
 .../integration/nexmark/NexmarkApexRunner.java  |  61 --
 .../nexmark/NexmarkConfiguration.java           |   6 +-
 .../nexmark/NexmarkDirectDriver.java            |  47 --
 .../nexmark/NexmarkDirectRunner.java            |  58 --
 .../beam/integration/nexmark/NexmarkDriver.java |   5 +-
 .../integration/nexmark/NexmarkFlinkDriver.java |  48 --
 .../integration/nexmark/NexmarkFlinkRunner.java |  53 --
 .../nexmark/NexmarkGoogleDriver.java            |  62 --
 .../nexmark/NexmarkGoogleRunner.java            | 159 -----
 .../integration/nexmark/NexmarkOptions.java     | 386 ++++++++++++
 .../beam/integration/nexmark/NexmarkPerf.java   |   4 +-
 .../beam/integration/nexmark/NexmarkQuery.java  |   7 +-
 .../integration/nexmark/NexmarkQueryModel.java  |  19 +-
 .../beam/integration/nexmark/NexmarkRunner.java |  40 +-
 .../integration/nexmark/NexmarkSparkDriver.java |  46 --
 .../integration/nexmark/NexmarkSparkRunner.java |  54 --
 .../beam/integration/nexmark/NexmarkSuite.java  |   2 +-
 .../beam/integration/nexmark/NexmarkUtils.java  |  25 +-
 .../beam/integration/nexmark/Options.java       | 386 ------------
 .../apache/beam/integration/nexmark/Person.java | 165 ------
 .../beam/integration/nexmark/PubsubHelper.java  | 216 -------
 .../apache/beam/integration/nexmark/Query0.java |  67 ---
 .../beam/integration/nexmark/Query0Model.java   |  62 --
 .../apache/beam/integration/nexmark/Query1.java |  62 --
 .../beam/integration/nexmark/Query10.java       | 380 ------------
 .../beam/integration/nexmark/Query11.java       |  73 ---
 .../beam/integration/nexmark/Query12.java       |  77 ---
 .../beam/integration/nexmark/Query1Model.java   |  73 ---
 .../apache/beam/integration/nexmark/Query2.java |  73 ---
 .../beam/integration/nexmark/Query2Model.java   |  75 ---
 .../apache/beam/integration/nexmark/Query3.java | 249 --------
 .../beam/integration/nexmark/Query3Model.java   | 118 ----
 .../apache/beam/integration/nexmark/Query4.java | 107 ----
 .../beam/integration/nexmark/Query4Model.java   | 179 ------
 .../apache/beam/integration/nexmark/Query5.java | 123 ----
 .../beam/integration/nexmark/Query5Model.java   | 172 ------
 .../apache/beam/integration/nexmark/Query6.java | 151 -----
 .../beam/integration/nexmark/Query6Model.java   | 126 ----
 .../apache/beam/integration/nexmark/Query7.java |  85 ---
 .../beam/integration/nexmark/Query7Model.java   | 127 ----
 .../apache/beam/integration/nexmark/Query8.java |  91 ---
 .../beam/integration/nexmark/Query8Model.java   | 144 -----
 .../apache/beam/integration/nexmark/Query9.java |  39 --
 .../beam/integration/nexmark/Query9Model.java   |  43 --
 .../beam/integration/nexmark/SellerPrice.java   |  90 ---
 .../nexmark/UnboundedEventSource.java           | 328 ----------
 .../beam/integration/nexmark/WinningBids.java   |  11 +-
 .../nexmark/WinningBidsSimulator.java           |   4 +
 .../nexmark/drivers/NexmarkApexDriver.java      |  50 ++
 .../nexmark/drivers/NexmarkApexRunner.java      |  65 ++
 .../nexmark/drivers/NexmarkDirectDriver.java    |  49 ++
 .../nexmark/drivers/NexmarkDirectRunner.java    |  60 ++
 .../nexmark/drivers/NexmarkFlinkDriver.java     |  50 ++
 .../nexmark/drivers/NexmarkFlinkRunner.java     |  55 ++
 .../nexmark/drivers/NexmarkGoogleDriver.java    |  67 +++
 .../nexmark/drivers/NexmarkGoogleRunner.java    | 163 +++++
 .../nexmark/drivers/NexmarkSparkDriver.java     |  48 ++
 .../nexmark/drivers/NexmarkSparkRunner.java     |  56 ++
 .../nexmark/drivers/package-info.java           |  22 +
 .../integration/nexmark/io/PubsubHelper.java    | 217 +++++++
 .../integration/nexmark/io/package-info.java    |  22 +
 .../beam/integration/nexmark/model/Auction.java | 190 ++++++
 .../integration/nexmark/model/AuctionBid.java   |  88 +++
 .../integration/nexmark/model/AuctionCount.java |  90 +++
 .../integration/nexmark/model/AuctionPrice.java |  91 +++
 .../beam/integration/nexmark/model/Bid.java     | 178 ++++++
 .../nexmark/model/BidsPerSession.java           |  89 +++
 .../nexmark/model/CategoryPrice.java            | 100 ++++
 .../beam/integration/nexmark/model/Done.java    |  83 +++
 .../beam/integration/nexmark/model/Event.java   | 179 ++++++
 .../nexmark/model/IdNameReserve.java            | 100 ++++
 .../integration/nexmark/model/KnownSize.java    |  26 +
 .../nexmark/model/NameCityStateId.java          | 106 ++++
 .../beam/integration/nexmark/model/Person.java  | 166 ++++++
 .../integration/nexmark/model/SellerPrice.java  |  91 +++
 .../integration/nexmark/model/package-info.java |  22 +
 .../beam/integration/nexmark/package-info.java  |   2 +-
 .../integration/nexmark/queries/Query0.java     |  72 +++
 .../nexmark/queries/Query0Model.java            |  67 +++
 .../integration/nexmark/queries/Query1.java     |  68 +++
 .../integration/nexmark/queries/Query10.java    | 384 ++++++++++++
 .../integration/nexmark/queries/Query11.java    |  80 +++
 .../integration/nexmark/queries/Query12.java    |  84 +++
 .../nexmark/queries/Query1Model.java            |  79 +++
 .../integration/nexmark/queries/Query2.java     |  80 +++
 .../nexmark/queries/Query2Model.java            |  82 +++
 .../integration/nexmark/queries/Query3.java     | 256 ++++++++
 .../nexmark/queries/Query3Model.java            | 126 ++++
 .../integration/nexmark/queries/Query4.java     | 118 ++++
 .../nexmark/queries/Query4Model.java            | 188 ++++++
 .../integration/nexmark/queries/Query5.java     | 129 ++++
 .../nexmark/queries/Query5Model.java            | 178 ++++++
 .../integration/nexmark/queries/Query6.java     | 159 +++++
 .../nexmark/queries/Query6Model.java            | 135 +++++
 .../integration/nexmark/queries/Query7.java     |  91 +++
 .../nexmark/queries/Query7Model.java            | 133 +++++
 .../integration/nexmark/queries/Query8.java     |  98 +++
 .../nexmark/queries/Query8Model.java            | 150 +++++
 .../integration/nexmark/queries/Query9.java     |  46 ++
 .../nexmark/queries/Query9Model.java            |  47 ++
 .../nexmark/queries/package-info.java           |  22 +
 .../nexmark/sources/BoundedEventSource.java     | 190 ++++++
 .../integration/nexmark/sources/Generator.java  | 593 +++++++++++++++++++
 .../nexmark/sources/GeneratorConfig.java        | 296 +++++++++
 .../nexmark/sources/UnboundedEventSource.java   | 330 +++++++++++
 .../nexmark/sources/package-info.java           |  22 +
 .../nexmark/src/main/resources/log4j.properties |   4 +
 .../nexmark/BoundedEventSourceTest.java         |  70 ---
 .../beam/integration/nexmark/GeneratorTest.java | 110 ----
 .../beam/integration/nexmark/QueryTest.java     | 107 ----
 .../nexmark/UnboundedEventSourceTest.java       | 108 ----
 .../integration/nexmark/queries/QueryTest.java  | 111 ++++
 .../nexmark/sources/BoundedEventSourceTest.java |  71 +++
 .../nexmark/sources/GeneratorTest.java          | 111 ++++
 .../sources/UnboundedEventSourceTest.java       | 110 ++++
 136 files changed, 7768 insertions(+), 7384 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/BEAM_ON_FLINK_ON_GCP.md
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/BEAM_ON_FLINK_ON_GCP.md b/integration/java/nexmark/BEAM_ON_FLINK_ON_GCP.md
index d1b51e8..6a7fd34 100644
--- a/integration/java/nexmark/BEAM_ON_FLINK_ON_GCP.md
+++ b/integration/java/nexmark/BEAM_ON_FLINK_ON_GCP.md
@@ -243,7 +243,7 @@ $GCLOUD compute ssh \
   --zone=$ZONE \
   $MASTER \
   --command "~/$FLINK_VER/bin/flink run \
-  -c org.apache.beam.integration.nexmark.NexmarkFlinkDriver \
+  -c org.apache.beam.integration.nexmark.drivers.NexmarkFlinkDriver \
   ~/$FLINK_VER/lib/java-integration-all-bundled-0.2.0-incubating-SNAPSHOT.jar \
   --project=$PROJECT \
   --streaming=true \

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/README.md
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/README.md b/integration/java/nexmark/README.md
index 7a91ab2..a3549f4 100644
--- a/integration/java/nexmark/README.md
+++ b/integration/java/nexmark/README.md
@@ -74,14 +74,15 @@ We have augmented the original queries with five more:
 The queries can be executed using a 'Driver' for a given backend.
 Currently the supported drivers are:
 
+* **NexmarkApexDriver** for running via the Apex runner.
 * **NexmarkDirectDriver** for running locally on a single machine.
-* **NexmarkGoogleDriver** for running on the Google Cloud Dataflow
-  service. Requires a Google Cloud account.
+* **NexmarkGoogleDriver** for running on the Google Cloud Dataflow service.
+  Requires a Google Cloud account.
 * **NexmarkFlinkDriver** for running on a Flink cluster. Requires the
   cluster to be established and the Nexmark jar to be distributed to
   each worker.
 * **NexmarkSparkDriver** for running on a Spark cluster.
-  
+
 Other drivers are straightforward.
 
 Test data is deterministically synthesized on demand. The test
@@ -103,9 +104,21 @@ the Google Cloud Dataflow driver.
 
 # Configuration
 
-Common configuration parameters:
+## Common configuration parameters
+
+Decide if batch or streaming:
+
+    --streaming=true
+
+Number of events generators
+
+    --numEventGenerators=4
+
+Run query N
 
-Available Suites:
+    --query=N
+
+## Available Suites
 
 - DEFAULT: Test default configuration with query 0.
 - SMOKE: Run the 12 default configurations.
@@ -114,32 +127,39 @@ Available Suites:
 
         --suite=SMOKE
 
-Decide if batch or streaming:
-
-    --streaming=true
+### Apex specific configuration
 
-Number of events generators
+    --suite=SMOKE --manageResources=false --monitorJobs=true
 
-    --numEventGenerators=4
+### Dataflow specific configuration
 
-## Apex specific configuration
+    --query=0 --suite=SMOKE --manageResources=false --monitorJobs=true \
+    --enforceEncodability=false --enforceImmutability=false
+    --project=<your project> \
+    --zone=<your zone> \
+    --workerMachineType=n1-highmem-8 \
+    --stagingLocation=<a gs path for staging>
 
---suite=SMOKE --manageResources=false --monitorJobs=true
+    --runner=BlockingDataflowRunner \
+    --tempLocation=gs://talend-imejia/nexmark/temp/ \
+    --stagingLocation=gs://talend-imejia/nexmark/temp/staging/ \
+    --filesToStage=target/beam-integration-java-0.7.0-SNAPSHOT.jar
 
-## Direct specific configuration
+### Direct specific configuration
 
---suite=SMOKE --manageResources=false --monitorJobs=true \
---enforceEncodability=false --enforceImmutability=false
+    --suite=SMOKE --manageResources=false --monitorJobs=true \
+    --enforceEncodability=false --enforceImmutability=false
 
-## Flink specific configuration
+### Flink specific configuration
 
---suite=SMOKE --manageResources=false --monitorJobs=true \
---flinkMaster=local
+    --suite=SMOKE --manageResources=false --monitorJobs=true \
+    --flinkMaster=[local] --parallelism=#numcores
 
-## Spark specific configuration
+### Spark specific configuration
 
---suite=SMOKE --manageResources=false --monitorJobs=true --sparkMaster=local \
--Dspark.ui.enabled=false -DSPARK_LOCAL_IP=localhost -Dsun.io.serialization.extendedDebugInfo=true
+    --suite=SMOKE --manageResources=false --monitorJobs=true \
+    --sparkMaster=local \
+    -Dspark.ui.enabled=false -DSPARK_LOCAL_IP=localhost -Dsun.io.serialization.extendedDebugInfo=true
 
 # Current Status
 
@@ -149,19 +169,19 @@ Open issues are tracked [here](https://github.com../../../../../issues):
 
 | Query | Direct                         | Spark                          | Flink                          | Apex                            |
 | ----: | ------------------------------ | ------------------------------ | ------------------------------ | ------------------------------- |
-|     0 | ok                             | [#1](../../../../../issues/1)  | ok                             | ok                              |
-|     1 | ok                             | [#1](../../../../../issues/1)  | ok                             | ok                              |
-|     2 | ok                             | [#1](../../../../../issues/1)  | ok                             | ok                              |
+|     0 | ok                             | ok                             | ok                             | ok                              |
+|     1 | ok                             | ok                             | ok                             | ok                              |
+|     2 | ok                             | ok                             | ok                             | ok                              |
 |     3 | [#7](../../../../../issues/7)  | [#7](../../../../../issues/7)  | [#7](../../../../../issues/7)  | [#7](../../../../../issues/7)   |
 |     4 | ok                             | ok                             | [#2](../../../../../issues/2)  | ok                              |
-|     5 | ok                             | [#3](../../../../../issues/3)  | ok                             | ok                              |
+|     5 | ok                             | ok                             | ok                             | ok                              |
 |     6 | ok                             | ok                             | [#2](../../../../../issues/2)  | ok                              |
-|     7 | ok                             | [#1](../../../../../issues/1)  | ok                             | [#24](../../../../../issues/24) |
-|     8 | ok                             | [#1](../../../../../issues/1)  | ok                             | ok                              |
+|     7 | ok                             | ok                             | ok                             | [#24](../../../../../issues/24) |
+|     8 | ok                             | ok                             | ok                             | ok                              |
 |     9 | ok                             | ok                             | [#2](../../../../../issues/2)  | ok                              |
-|    10 | [#5](../../../../../issues/5)  | [#4](../../../../../issues/4)  | ok                             | ok                              |
-|    11 | ok                             | [#1](../../../../../issues/1)  | ok                             | ok                              |
-|    12 | ok                             | [#1](../../../../../issues/1)  | ok                             | ok                              |
+|    10 | [#5](../../../../../issues/5)  | ok                             | ok                             | ok                              |
+|    11 | ok                             | ok                             | ok                             | ok                              |
+|    12 | ok                             | ok                             | ok                             | ok                              |
 
 ## Streaming / Synthetic / Local
 
@@ -205,11 +225,11 @@ Batch Mode
 
 -Dexec.classpathScope="test"
 
-    mvn exec:java -Dexec.mainClass=org.apache.beam.integration.nexmark.NexmarkDirectDriver -Dexec.args="--suite=SMOKE --streaming=false --numEventGenerators=4 --manageResources=false --monitorJobs=false --enforceEncodability=false --enforceImmutability=false"
+    mvn exec:java -Dexec.mainClass=org.apache.beam.integration.nexmark.drivers.NexmarkDirectDriver -Dexec.args="--suite=SMOKE --streaming=false --numEventGenerators=4 --manageResources=false --monitorJobs=false --enforceEncodability=false --enforceImmutability=false"
 
 Streaming Mode
 
-    mvn exec:java -Dexec.mainClass=org.apache.beam.integration.nexmark.NexmarkDirectDriver -Dexec.args="--suite=SMOKE --streaming=true --numEventGenerators=4 --manageResources=false --monitorJobs=false --enforceEncodability=false --enforceImmutability=false"
+    mvn exec:java -Dexec.mainClass=org.apache.beam.integration.nexmark.drivers.NexmarkDirectDriver -Dexec.args="--suite=SMOKE --streaming=true --numEventGenerators=4 --manageResources=false --monitorJobs=false --enforceEncodability=false --enforceImmutability=false"
 
 ## Running on Google Cloud Dataflow
 
@@ -218,7 +238,7 @@ service.
 
 ```
 java -cp integration/java/target/java-integration-all-bundled-0.2.0-incubating-SNAPSHOT.jar \
-  org.apache.beam.integration.nexmark.NexmarkGoogleDriver \
+  org.apache.beam.integration.nexmark.drivers.NexmarkGoogleDriver \
   --project=<your project> \
   --zone=<your zone> \
   --workerMachineType=n1-highmem-8 \
@@ -251,7 +271,7 @@ java -cp integration/java/target/java-integration-all-bundled-0.2.0-incubating-S
 
 ```
 java -cp integration/java/target/java-integration-all-bundled-0.2.0-incubating-SNAPSHOT.jar \
-  org.apache.beam.integration.nexmark.NexmarkGoogleDriver \
+  org.apache.beam.integration.nexmark.drivers.NexmarkGoogleDriver \
   --project=<your project> \
   --zone=<your zone> \
   --workerMachineType=n1-highmem-8 \

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/pom.xml
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/pom.xml b/integration/java/nexmark/pom.xml
index 0ecc298..7cd7d39 100644
--- a/integration/java/nexmark/pom.xml
+++ b/integration/java/nexmark/pom.xml
@@ -61,11 +61,6 @@
 
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-dependency-plugin</artifactId>
-      </plugin>
-
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-checkstyle-plugin</artifactId>
       </plugin>
 
@@ -139,7 +134,6 @@
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-dependency-plugin</artifactId>
-        <version>2.10</version>
         <executions>
           <execution>
             <goals><goal>analyze-only</goal></goals>
@@ -196,11 +190,13 @@
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-core_2.10</artifactId>
       <version>${spark.version}</version>
+      <scope>runtime</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-streaming_2.10</artifactId>
       <version>${spark.version}</version>
+      <scope>runtime</scope>
     </dependency>
 
     <!-- Apex runner -->
@@ -215,12 +211,6 @@
       <scope>runtime</scope>
     </dependency>
     <dependency>
-      <groupId>com.fasterxml.jackson.core</groupId>
-      <artifactId>jackson-databind</artifactId>
-      <version>${jackson.version}</version>
-      <scope>runtime</scope>
-    </dependency>
-    <dependency>
       <groupId>org.codehaus.jackson</groupId>
       <artifactId>jackson-mapper-asl</artifactId>
       <version>${apex.codehaus.jackson.version}</version>
@@ -244,6 +234,7 @@
       <groupId>com.google.apis</groupId>
       <artifactId>google-api-services-dataflow</artifactId>
       <version>${dataflow.version}</version>
+      <scope>runtime</scope>
     </dependency>
 
     <dependency>
@@ -289,13 +280,6 @@
     <dependency>
       <groupId>org.hamcrest</groupId>
       <artifactId>hamcrest-all</artifactId>
-      <version>${hamcrest.version}</version>
-    </dependency>
-
-    <dependency>
-      <groupId>org.hamcrest</groupId>
-      <artifactId>hamcrest-library</artifactId>
-      <version>${hamcrest.version}</version>
     </dependency>
 
     <dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/AbstractSimulator.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/AbstractSimulator.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/AbstractSimulator.java
index c08cdd3..b012842 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/AbstractSimulator.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/AbstractSimulator.java
@@ -34,7 +34,7 @@ import org.joda.time.Instant;
  * @param <InputT> Type of input elements.
  * @param <OutputT> Type of output elements.
  */
-abstract class AbstractSimulator<InputT, OutputT> {
+public abstract class AbstractSimulator<InputT, OutputT> {
   /** Window size for action bucket sampling. */
   public static final Duration WINDOW_SIZE = Duration.standardMinutes(1);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Auction.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Auction.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Auction.java
deleted file mode 100644
index 16c28aa..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Auction.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.JsonProcessingException;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-
-import org.apache.beam.sdk.coders.AtomicCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VarLongCoder;
-
-/**
- * An auction submitted by a person.
- */
-public class Auction implements KnownSize, Serializable {
-  private static final Coder<Long> LONG_CODER = VarLongCoder.of();
-  private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
-
-  public static final Coder<Auction> CODER = new AtomicCoder<Auction>() {
-    @Override
-    public void encode(Auction value, OutputStream outStream,
-        Coder.Context context)
-        throws CoderException, IOException {
-      LONG_CODER.encode(value.id, outStream, Context.NESTED);
-      STRING_CODER.encode(value.itemName, outStream, Context.NESTED);
-      STRING_CODER.encode(value.description, outStream, Context.NESTED);
-      LONG_CODER.encode(value.initialBid, outStream, Context.NESTED);
-      LONG_CODER.encode(value.reserve, outStream, Context.NESTED);
-      LONG_CODER.encode(value.dateTime, outStream, Context.NESTED);
-      LONG_CODER.encode(value.expires, outStream, Context.NESTED);
-      LONG_CODER.encode(value.seller, outStream, Context.NESTED);
-      LONG_CODER.encode(value.category, outStream, Context.NESTED);
-      STRING_CODER.encode(value.extra, outStream, Context.NESTED);
-    }
-
-    @Override
-    public Auction decode(
-        InputStream inStream, Coder.Context context)
-        throws CoderException, IOException {
-      long id = LONG_CODER.decode(inStream, Context.NESTED);
-      String itemName = STRING_CODER.decode(inStream, Context.NESTED);
-      String description = STRING_CODER.decode(inStream, Context.NESTED);
-      long initialBid = LONG_CODER.decode(inStream, Context.NESTED);
-      long reserve = LONG_CODER.decode(inStream, Context.NESTED);
-      long dateTime = LONG_CODER.decode(inStream, Context.NESTED);
-      long expires = LONG_CODER.decode(inStream, Context.NESTED);
-      long seller = LONG_CODER.decode(inStream, Context.NESTED);
-      long category = LONG_CODER.decode(inStream, Context.NESTED);
-      String extra = STRING_CODER.decode(inStream, Context.NESTED);
-      return new Auction(
-          id, itemName, description, initialBid, reserve, dateTime, expires, seller, category,
-          extra);
-    }
-  };
-
-
-  /** Id of auction. */
-  @JsonProperty
-  public final long id; // primary key
-
-  /** Extra auction properties. */
-  @JsonProperty
-  public final String itemName;
-
-  @JsonProperty
-  public final String description;
-
-  /** Initial bid price, in cents. */
-  @JsonProperty
-  public final long initialBid;
-
-  /** Reserve price, in cents. */
-  @JsonProperty
-  public final long reserve;
-
-  @JsonProperty
-  public final long dateTime;
-
-  /** When does auction expire? (ms since epoch). Bids at or after this time are ignored. */
-  @JsonProperty
-  public final long expires;
-
-  /** Id of person who instigated auction. */
-  @JsonProperty
-  public final long seller; // foreign key: Person.id
-
-  /** Id of category auction is listed under. */
-  @JsonProperty
-  public final long category; // foreign key: Category.id
-
-  /** Additional arbitrary payload for performance testing. */
-  @JsonProperty
-  public final String extra;
-
-
-  // For Avro only.
-  @SuppressWarnings("unused")
-  private Auction() {
-    id = 0;
-    itemName = null;
-    description = null;
-    initialBid = 0;
-    reserve = 0;
-    dateTime = 0;
-    expires = 0;
-    seller = 0;
-    category = 0;
-    extra = null;
-  }
-
-  public Auction(long id, String itemName, String description, long initialBid, long reserve,
-      long dateTime, long expires, long seller, long category, String extra) {
-    this.id = id;
-    this.itemName = itemName;
-    this.description = description;
-    this.initialBid = initialBid;
-    this.reserve = reserve;
-    this.dateTime = dateTime;
-    this.expires = expires;
-    this.seller = seller;
-    this.category = category;
-    this.extra = extra;
-  }
-
-  /**
-   * Return a copy of auction which capture the given annotation.
-   * (Used for debugging).
-   */
-  public Auction withAnnotation(String annotation) {
-    return new Auction(id, itemName, description, initialBid, reserve, dateTime, expires, seller,
-        category, annotation + ": " + extra);
-  }
-
-  /**
-   * Does auction have {@code annotation}? (Used for debugging.)
-   */
-  public boolean hasAnnotation(String annotation) {
-    return extra.startsWith(annotation + ": ");
-  }
-
-  /**
-   * Remove {@code annotation} from auction. (Used for debugging.)
-   */
-  public Auction withoutAnnotation(String annotation) {
-    if (hasAnnotation(annotation)) {
-      return new Auction(id, itemName, description, initialBid, reserve, dateTime, expires, seller,
-          category, extra.substring(annotation.length() + 2));
-    } else {
-      return this;
-    }
-  }
-
-  @Override
-  public long sizeInBytes() {
-    return 8 + itemName.length() + 1 + description.length() + 1 + 8 + 8 + 8 + 8 + 8 + 8
-        + extra.length() + 1;
-  }
-
-  @Override
-  public String toString() {
-    try {
-      return NexmarkUtils.MAPPER.writeValueAsString(this);
-    } catch (JsonProcessingException e) {
-      throw new RuntimeException(e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/AuctionBid.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/AuctionBid.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/AuctionBid.java
deleted file mode 100644
index cd52b02..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/AuctionBid.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.JsonProcessingException;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-
-import org.apache.beam.sdk.coders.AtomicCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-
-/**
- * Result of {@link WinningBids} transform.
- */
-public class AuctionBid implements KnownSize, Serializable {
-  public static final Coder<AuctionBid> CODER = new AtomicCoder<AuctionBid>() {
-    @Override
-    public void encode(AuctionBid value, OutputStream outStream,
-        Coder.Context context)
-        throws CoderException, IOException {
-      Auction.CODER.encode(value.auction, outStream, Context.NESTED);
-      Bid.CODER.encode(value.bid, outStream, Context.NESTED);
-    }
-
-    @Override
-    public AuctionBid decode(
-        InputStream inStream, Coder.Context context)
-        throws CoderException, IOException {
-      Auction auction = Auction.CODER.decode(inStream, Context.NESTED);
-      Bid bid = Bid.CODER.decode(inStream, Context.NESTED);
-      return new AuctionBid(auction, bid);
-    }
-  };
-
-  @JsonProperty
-  public final Auction auction;
-
-  @JsonProperty
-  public final Bid bid;
-
-  // For Avro only.
-  @SuppressWarnings("unused")
-  private AuctionBid() {
-    auction = null;
-    bid = null;
-  }
-
-  public AuctionBid(Auction auction, Bid bid) {
-    this.auction = auction;
-    this.bid = bid;
-  }
-
-  @Override
-  public long sizeInBytes() {
-    return auction.sizeInBytes() + bid.sizeInBytes();
-  }
-
-  @Override
-  public String toString() {
-    try {
-      return NexmarkUtils.MAPPER.writeValueAsString(this);
-    } catch (JsonProcessingException e) {
-      throw new RuntimeException(e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/AuctionCount.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/AuctionCount.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/AuctionCount.java
deleted file mode 100644
index ac1f080..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/AuctionCount.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.JsonProcessingException;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-
-import org.apache.beam.sdk.coders.AtomicCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.VarLongCoder;
-
-/**
- * Result of {@link Query5}.
- */
-public class AuctionCount implements KnownSize, Serializable {
-  private static final Coder<Long> LONG_CODER = VarLongCoder.of();
-
-  public static final Coder<AuctionCount> CODER = new AtomicCoder<AuctionCount>() {
-    @Override
-    public void encode(AuctionCount value, OutputStream outStream,
-        Coder.Context context)
-        throws CoderException, IOException {
-      LONG_CODER.encode(value.auction, outStream, Context.NESTED);
-      LONG_CODER.encode(value.count, outStream, Context.NESTED);
-    }
-
-    @Override
-    public AuctionCount decode(
-        InputStream inStream, Coder.Context context)
-        throws CoderException, IOException {
-      long auction = LONG_CODER.decode(inStream, Context.NESTED);
-      long count = LONG_CODER.decode(inStream, Context.NESTED);
-      return new AuctionCount(auction, count);
-    }
-  };
-
-  @JsonProperty
-  public final long auction;
-
-  @JsonProperty
-  public final long count;
-
-  // For Avro only.
-  @SuppressWarnings("unused")
-  private AuctionCount() {
-    auction = 0;
-    count = 0;
-  }
-
-  public AuctionCount(long auction, long count) {
-    this.auction = auction;
-    this.count = count;
-  }
-
-  @Override
-  public long sizeInBytes() {
-    return 8 + 8;
-  }
-
-  @Override
-  public String toString() {
-    try {
-      return NexmarkUtils.MAPPER.writeValueAsString(this);
-    } catch (JsonProcessingException e) {
-      throw new RuntimeException(e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/AuctionPrice.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/AuctionPrice.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/AuctionPrice.java
deleted file mode 100644
index 9bdf11c..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/AuctionPrice.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.JsonProcessingException;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-
-import org.apache.beam.sdk.coders.AtomicCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.VarLongCoder;
-
-/**
- * Result of {@link Query2}.
- */
-public class AuctionPrice implements KnownSize, Serializable {
-  private static final Coder<Long> LONG_CODER = VarLongCoder.of();
-
-  public static final Coder<AuctionPrice> CODER = new AtomicCoder<AuctionPrice>() {
-    @Override
-    public void encode(AuctionPrice value, OutputStream outStream,
-        Coder.Context context)
-        throws CoderException, IOException {
-      LONG_CODER.encode(value.auction, outStream, Context.NESTED);
-      LONG_CODER.encode(value.price, outStream, Context.NESTED);
-    }
-
-    @Override
-    public AuctionPrice decode(
-        InputStream inStream, Coder.Context context)
-        throws CoderException, IOException {
-      long auction = LONG_CODER.decode(inStream, Context.NESTED);
-      long price = LONG_CODER.decode(inStream, Context.NESTED);
-      return new AuctionPrice(auction, price);
-    }
-  };
-
-  @JsonProperty
-  public final long auction;
-
-  /** Price in cents. */
-  @JsonProperty
-  public final long price;
-
-  // For Avro only.
-  @SuppressWarnings("unused")
-  private AuctionPrice() {
-    auction = 0;
-    price = 0;
-  }
-
-  public AuctionPrice(long auction, long price) {
-    this.auction = auction;
-    this.price = price;
-  }
-
-  @Override
-  public long sizeInBytes() {
-    return 8 + 8;
-  }
-
-  @Override
-  public String toString() {
-    try {
-      return NexmarkUtils.MAPPER.writeValueAsString(this);
-    } catch (JsonProcessingException e) {
-      throw new RuntimeException(e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Bid.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Bid.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Bid.java
deleted file mode 100644
index 04fcfdd..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Bid.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.JsonProcessingException;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.util.Comparator;
-
-import org.apache.beam.sdk.coders.AtomicCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VarLongCoder;
-
-/**
- * A bid for an item on auction.
- */
-public class Bid implements KnownSize, Serializable {
-  private static final Coder<Long> LONG_CODER = VarLongCoder.of();
-  private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
-
-  public static final Coder<Bid> CODER = new AtomicCoder<Bid>() {
-    @Override
-    public void encode(Bid value, OutputStream outStream,
-        Coder.Context context)
-        throws CoderException, IOException {
-      LONG_CODER.encode(value.auction, outStream, Context.NESTED);
-      LONG_CODER.encode(value.bidder, outStream, Context.NESTED);
-      LONG_CODER.encode(value.price, outStream, Context.NESTED);
-      LONG_CODER.encode(value.dateTime, outStream, Context.NESTED);
-      STRING_CODER.encode(value.extra, outStream, Context.NESTED);
-    }
-
-    @Override
-    public Bid decode(
-        InputStream inStream, Coder.Context context)
-        throws CoderException, IOException {
-      long auction = LONG_CODER.decode(inStream, Context.NESTED);
-      long bidder = LONG_CODER.decode(inStream, Context.NESTED);
-      long price = LONG_CODER.decode(inStream, Context.NESTED);
-      long dateTime = LONG_CODER.decode(inStream, Context.NESTED);
-      String extra = STRING_CODER.decode(inStream, Context.NESTED);
-      return new Bid(auction, bidder, price, dateTime, extra);
-    }
-  };
-
-  /**
-   * Comparator to order bids by ascending price then descending time
-   * (for finding winning bids).
-   */
-  public static final Comparator<Bid> PRICE_THEN_DESCENDING_TIME = new Comparator<Bid>() {
-    @Override
-    public int compare(Bid left, Bid right) {
-      int i = Double.compare(left.price, right.price);
-      if (i != 0) {
-        return i;
-      }
-      return Long.compare(right.dateTime, left.dateTime);
-    }
-  };
-
-  /**
-   * Comparator to order bids by ascending time then ascending price.
-   * (for finding most recent bids).
-   */
-  public static final Comparator<Bid> ASCENDING_TIME_THEN_PRICE = new Comparator<Bid>() {
-    @Override
-    public int compare(Bid left, Bid right) {
-      int i = Long.compare(left.dateTime, right.dateTime);
-      if (i != 0) {
-        return i;
-      }
-      return Double.compare(left.price, right.price);
-    }
-  };
-
-  /** Id of auction this bid is for. */
-  @JsonProperty
-  public final long auction; // foreign key: Auction.id
-
-  /** Id of person bidding in auction. */
-  @JsonProperty
-  public final long bidder; // foreign key: Person.id
-
-  /** Price of bid, in cents. */
-  @JsonProperty
-  public final long price;
-
-  /**
-   * Instant at which bid was made (ms since epoch).
-   * NOTE: This may be earlier than the system's event time.
-   */
-  @JsonProperty
-  public final long dateTime;
-
-  /** Additional arbitrary payload for performance testing. */
-  @JsonProperty
-  public final String extra;
-
-  // For Avro only.
-  @SuppressWarnings("unused")
-  private Bid() {
-    auction = 0;
-    bidder = 0;
-    price = 0;
-    dateTime = 0;
-    extra = null;
-  }
-
-  public Bid(long auction, long bidder, long price, long dateTime, String extra) {
-    this.auction = auction;
-    this.bidder = bidder;
-    this.price = price;
-    this.dateTime = dateTime;
-    this.extra = extra;
-  }
-
-  /**
-   * Return a copy of bid which capture the given annotation.
-   * (Used for debugging).
-   */
-  public Bid withAnnotation(String annotation) {
-    return new Bid(auction, bidder, price, dateTime, annotation + ": " + extra);
-  }
-
-  /**
-   * Does bid have {@code annotation}? (Used for debugging.)
-   */
-  public boolean hasAnnotation(String annotation) {
-    return extra.startsWith(annotation + ": ");
-  }
-
-  /**
-   * Remove {@code annotation} from bid. (Used for debugging.)
-   */
-  public Bid withoutAnnotation(String annotation) {
-    if (hasAnnotation(annotation)) {
-      return new Bid(auction, bidder, price, dateTime, extra.substring(annotation.length() + 2));
-    } else {
-      return this;
-    }
-  }
-
-  @Override
-  public long sizeInBytes() {
-    return 8 + 8 + 8 + 8 + extra.length() + 1;
-  }
-
-  @Override
-  public String toString() {
-    try {
-      return NexmarkUtils.MAPPER.writeValueAsString(this);
-    } catch (JsonProcessingException e) {
-      throw new RuntimeException(e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/BidsPerSession.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/BidsPerSession.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/BidsPerSession.java
deleted file mode 100644
index c6b0fe3..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/BidsPerSession.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.JsonProcessingException;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-
-import org.apache.beam.sdk.coders.AtomicCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.VarLongCoder;
-
-/**
- * Result of query 11.
- */
-public class BidsPerSession implements KnownSize, Serializable {
-  private static final Coder<Long> LONG_CODER = VarLongCoder.of();
-
-  public static final Coder<BidsPerSession> CODER = new AtomicCoder<BidsPerSession>() {
-    @Override
-    public void encode(BidsPerSession value, OutputStream outStream,
-        Coder.Context context)
-        throws CoderException, IOException {
-      LONG_CODER.encode(value.personId, outStream, Context.NESTED);
-      LONG_CODER.encode(value.bidsPerSession, outStream, Context.NESTED);
-    }
-
-    @Override
-    public BidsPerSession decode(
-        InputStream inStream, Coder.Context context)
-        throws CoderException, IOException {
-      long personId = LONG_CODER.decode(inStream, Context.NESTED);
-      long bidsPerSession = LONG_CODER.decode(inStream, Context.NESTED);
-      return new BidsPerSession(personId, bidsPerSession);
-    }
-  };
-
-  @JsonProperty
-  public final long personId;
-
-  @JsonProperty
-  public final long bidsPerSession;
-
-  public BidsPerSession() {
-    personId = 0;
-    bidsPerSession = 0;
-  }
-
-  public BidsPerSession(long personId, long bidsPerSession) {
-    this.personId = personId;
-    this.bidsPerSession = bidsPerSession;
-  }
-
-  @Override
-  public long sizeInBytes() {
-    // Two longs.
-    return 8 + 8;
-  }
-
-  @Override
-  public String toString() {
-    try {
-      return NexmarkUtils.MAPPER.writeValueAsString(this);
-    } catch (JsonProcessingException e) {
-      throw new RuntimeException(e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/BoundedEventSource.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/BoundedEventSource.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/BoundedEventSource.java
deleted file mode 100644
index 7dc1bcc..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/BoundedEventSource.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.NoSuchElementException;
-import javax.annotation.Nullable;
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.values.TimestampedValue;
-import org.joda.time.Instant;
-
-/**
- * A custom, bounded source of event records.
- */
-class BoundedEventSource extends BoundedSource<Event> {
-  /** Configuration we generate events against. */
-  private final GeneratorConfig config;
-
-  /** How many bounded sources to create. */
-  private final int numEventGenerators;
-
-  public BoundedEventSource(GeneratorConfig config, int numEventGenerators) {
-    this.config = config;
-    this.numEventGenerators = numEventGenerators;
-  }
-
-  /** A reader to pull events from the generator. */
-  private static class EventReader extends BoundedReader<Event> {
-    /**
-     * Event source we purporting to be reading from.
-     * (We can't use Java's capture-outer-class pointer since we must update
-     * this field on calls to splitAtFraction.)
-     */
-    private BoundedEventSource source;
-
-    /** Generator we are reading from. */
-    private final Generator generator;
-
-    private boolean reportedStop;
-
-    @Nullable
-    private TimestampedValue<Event> currentEvent;
-
-    public EventReader(BoundedEventSource source, GeneratorConfig config) {
-      this.source = source;
-      generator = new Generator(config);
-      reportedStop = false;
-    }
-
-    @Override
-    public synchronized boolean start() {
-      NexmarkUtils.info("starting bounded generator %s", generator);
-      return advance();
-    }
-
-    @Override
-    public synchronized boolean advance() {
-      if (!generator.hasNext()) {
-        // No more events.
-        if (!reportedStop) {
-          reportedStop = true;
-          NexmarkUtils.info("stopped bounded generator %s", generator);
-        }
-        return false;
-      }
-      currentEvent = generator.next();
-      return true;
-    }
-
-    @Override
-    public synchronized Event getCurrent() throws NoSuchElementException {
-      if (currentEvent == null) {
-        throw new NoSuchElementException();
-      }
-      return currentEvent.getValue();
-    }
-
-    @Override
-    public synchronized Instant getCurrentTimestamp() throws NoSuchElementException {
-      if (currentEvent == null) {
-        throw new NoSuchElementException();
-      }
-      return currentEvent.getTimestamp();
-    }
-
-    @Override
-    public void close() throws IOException {
-      // Nothing to close.
-    }
-
-    @Override
-    public synchronized Double getFractionConsumed() {
-      return generator.getFractionConsumed();
-    }
-
-    @Override
-    public synchronized BoundedSource<Event> getCurrentSource() {
-      return source;
-    }
-
-    @Override
-    @Nullable
-    public synchronized BoundedEventSource splitAtFraction(double fraction) {
-      long startId = generator.getCurrentConfig().getStartEventId();
-      long stopId = generator.getCurrentConfig().getStopEventId();
-      long size = stopId - startId;
-      long splitEventId = startId + Math.min((int) (size * fraction), size);
-      if (splitEventId <= generator.getNextEventId() || splitEventId == stopId) {
-        // Already passed this position or split results in left or right being empty.
-        NexmarkUtils.info("split failed for bounded generator %s at %f", generator, fraction);
-        return null;
-      }
-
-      NexmarkUtils.info("about to split bounded generator %s at %d", generator, splitEventId);
-
-      // Scale back the event space of the current generator, and return a generator config
-      // representing the event space we just 'stole' from the current generator.
-      GeneratorConfig remainingConfig = generator.splitAtEventId(splitEventId);
-
-      NexmarkUtils.info("split bounded generator into %s and %s", generator, remainingConfig);
-
-      // At this point
-      //   generator.events() ++ new Generator(remainingConfig).events()
-      //   == originalGenerator.events()
-
-      // We need a new source to represent the now smaller key space for this reader, so
-      // that we can maintain the invariant that
-      //   this.getCurrentSource().createReader(...)
-      // will yield the same output as this.
-      source = new BoundedEventSource(generator.getCurrentConfig(), source.numEventGenerators);
-
-      // Return a source from which we may read the 'stolen' event space.
-      return new BoundedEventSource(remainingConfig, source.numEventGenerators);
-    }
-  }
-
-  @Override
-  public List<BoundedEventSource> splitIntoBundles(
-      long desiredBundleSizeBytes, PipelineOptions options) {
-    NexmarkUtils.info("slitting bounded source %s into %d sub-sources", config, numEventGenerators);
-    List<BoundedEventSource> results = new ArrayList<>();
-    // Ignore desiredBundleSizeBytes and use numEventGenerators instead.
-    for (GeneratorConfig subConfig : config.split(numEventGenerators)) {
-      results.add(new BoundedEventSource(subConfig, 1));
-    }
-    return results;
-  }
-
-  @Override
-  public long getEstimatedSizeBytes(PipelineOptions options) {
-    return config.getEstimatedSizeBytes();
-  }
-
-  @Override
-  public EventReader createReader(PipelineOptions options) {
-    NexmarkUtils.info("creating initial bounded reader for %s", config);
-    return new EventReader(this, config);
-  }
-
-  @Override
-  public void validate() {
-    // Nothing to validate.
-  }
-
-  @Override
-  public Coder<Event> getDefaultOutputCoder() {
-    return Event.CODER;
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/CategoryPrice.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/CategoryPrice.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/CategoryPrice.java
deleted file mode 100644
index c83fb17..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/CategoryPrice.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.JsonProcessingException;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-
-import org.apache.beam.sdk.coders.AtomicCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.coders.VarLongCoder;
-
-/**
- * Result of {@link Query4}.
- */
-public class CategoryPrice implements KnownSize, Serializable {
-  private static final Coder<Long> LONG_CODER = VarLongCoder.of();
-  private static final Coder<Integer> INT_CODER = VarIntCoder.of();
-
-  public static final Coder<CategoryPrice> CODER = new AtomicCoder<CategoryPrice>() {
-    @Override
-    public void encode(CategoryPrice value, OutputStream outStream,
-        Coder.Context context)
-        throws CoderException, IOException {
-      LONG_CODER.encode(value.category, outStream, Context.NESTED);
-      LONG_CODER.encode(value.price, outStream, Context.NESTED);
-      INT_CODER.encode(value.isLast ? 1 : 0, outStream, Context.NESTED);
-    }
-
-    @Override
-    public CategoryPrice decode(
-        InputStream inStream, Coder.Context context)
-        throws CoderException, IOException {
-      long category = LONG_CODER.decode(inStream, Context.NESTED);
-      long price = LONG_CODER.decode(inStream, Context.NESTED);
-      boolean isLast = INT_CODER.decode(inStream, context) != 0;
-      return new CategoryPrice(category, price, isLast);
-    }
-  };
-
-  @JsonProperty
-  public final long category;
-
-  /** Price in cents. */
-  @JsonProperty
-  public final long price;
-
-  @JsonProperty
-  public final boolean isLast;
-
-  // For Avro only.
-  @SuppressWarnings("unused")
-  private CategoryPrice() {
-    category = 0;
-    price = 0;
-    isLast = false;
-  }
-
-  public CategoryPrice(long category, long price, boolean isLast) {
-    this.category = category;
-    this.price = price;
-    this.isLast = isLast;
-  }
-
-  @Override
-  public long sizeInBytes() {
-    return 8 + 8 + 1;
-  }
-
-  @Override
-  public String toString() {
-    try {
-      return NexmarkUtils.MAPPER.writeValueAsString(this);
-    } catch (JsonProcessingException e) {
-      throw new RuntimeException(e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Done.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Done.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Done.java
deleted file mode 100644
index 3a045f9..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Done.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.JsonProcessingException;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-
-import org.apache.beam.sdk.coders.AtomicCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-
-/**
- * Result of query 10.
- */
-public class Done implements KnownSize, Serializable {
-  private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
-
-  public static final Coder<Done> CODER = new AtomicCoder<Done>() {
-    @Override
-    public void encode(Done value, OutputStream outStream,
-        Coder.Context context)
-        throws CoderException, IOException {
-      STRING_CODER.encode(value.message, outStream, Context.NESTED);
-    }
-
-    @Override
-    public Done decode(
-        InputStream inStream, Coder.Context context)
-        throws CoderException, IOException {
-      String message = STRING_CODER.decode(inStream, Context.NESTED);
-      return new Done(message);
-    }
-  };
-
-  @JsonProperty
-  public final String message;
-
-  // For Avro only.
-  @SuppressWarnings("unused")
-  public Done() {
-    message = null;
-  }
-
-  public Done(String message) {
-    this.message = message;
-  }
-
-  @Override
-  public long sizeInBytes() {
-    return message.length();
-  }
-
-  @Override
-  public String toString() {
-    try {
-      return NexmarkUtils.MAPPER.writeValueAsString(this);
-    } catch (JsonProcessingException e) {
-      throw new RuntimeException(e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Event.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Event.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Event.java
deleted file mode 100644
index 769cedd..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Event.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-import javax.annotation.Nullable;
-
-import org.apache.beam.sdk.coders.AtomicCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.VarIntCoder;
-
-/**
- * An event in the auction system, either a (new) {@link Person}, a (new) {@link Auction},
- * or a {@link Bid}.
- */
-public class Event implements KnownSize, Serializable {
-  private static final Coder<Integer> INT_CODER = VarIntCoder.of();
-
-  public static final Coder<Event> CODER = new AtomicCoder<Event>() {
-    @Override
-    public void encode(Event value, OutputStream outStream, Coder.Context context)
-        throws CoderException, IOException {
-      if (value.newPerson != null) {
-        INT_CODER.encode(0, outStream, Context.NESTED);
-        Person.CODER.encode(value.newPerson, outStream, Context.NESTED);
-      } else if (value.newAuction != null) {
-        INT_CODER.encode(1, outStream, Context.NESTED);
-        Auction.CODER.encode(value.newAuction, outStream, Context.NESTED);
-      } else if (value.bid != null) {
-        INT_CODER.encode(2, outStream, Context.NESTED);
-        Bid.CODER.encode(value.bid, outStream, Context.NESTED);
-      } else {
-        throw new RuntimeException("invalid event");
-      }
-    }
-
-    @Override
-    public Event decode(
-        InputStream inStream, Coder.Context context)
-        throws CoderException, IOException {
-      int tag = INT_CODER.decode(inStream, context);
-      if (tag == 0) {
-        Person person = Person.CODER.decode(inStream, Context.NESTED);
-        return new Event(person);
-      } else if (tag == 1) {
-        Auction auction = Auction.CODER.decode(inStream, Context.NESTED);
-        return new Event(auction);
-      } else if (tag == 2) {
-        Bid bid = Bid.CODER.decode(inStream, Context.NESTED);
-        return new Event(bid);
-      } else {
-        throw new RuntimeException("invalid event encoding");
-      }
-    }
-  };
-
-  @Nullable
-  @org.apache.avro.reflect.Nullable
-  public final Person newPerson;
-
-  @Nullable
-  @org.apache.avro.reflect.Nullable
-  public final Auction newAuction;
-
-  @Nullable
-  @org.apache.avro.reflect.Nullable
-  public final Bid bid;
-
-  // For Avro only.
-  @SuppressWarnings("unused")
-  private Event() {
-    newPerson = null;
-    newAuction = null;
-    bid = null;
-  }
-
-  public Event(Person newPerson) {
-    this.newPerson = newPerson;
-    newAuction = null;
-    bid = null;
-  }
-
-  public Event(Auction newAuction) {
-    newPerson = null;
-    this.newAuction = newAuction;
-    bid = null;
-  }
-
-  public Event(Bid bid) {
-    newPerson = null;
-    newAuction = null;
-    this.bid = bid;
-  }
-
-  /**
-   * Return a copy of event which captures {@code annotation}.
-   * (Used for debugging).
-   */
-  public Event withAnnotation(String annotation) {
-    if (newPerson != null) {
-      return new Event(newPerson.withAnnotation(annotation));
-    } else if (newAuction != null) {
-      return new Event(newAuction.withAnnotation(annotation));
-    } else {
-      return new Event(bid.withAnnotation(annotation));
-    }
-  }
-
-  /**
-   * Does event have {@code annotation}? (Used for debugging.)
-   */
-  public boolean hasAnnotation(String annotation) {
-    if (newPerson != null) {
-      return newPerson.hasAnnotation(annotation);
-    } else if (newAuction != null) {
-      return newAuction.hasAnnotation(annotation);
-    } else {
-      return bid.hasAnnotation(annotation);
-    }
-  }
-
-  /**
-   * Remove {@code annotation} from event. (Used for debugging.)
-   */
-  public Event withoutAnnotation(String annotation) {
-    if (newPerson != null) {
-      return new Event(newPerson.withoutAnnotation(annotation));
-    } else if (newAuction != null) {
-      return new Event(newAuction.withoutAnnotation(annotation));
-    } else {
-      return new Event(bid.withoutAnnotation(annotation));
-    }
-  }
-
-  @Override
-  public long sizeInBytes() {
-    if (newPerson != null) {
-      return 1 + newPerson.sizeInBytes();
-    } else if (newAuction != null) {
-      return 1 + newAuction.sizeInBytes();
-    } else if (bid != null) {
-      return 1 + bid.sizeInBytes();
-    } else {
-      throw new RuntimeException("invalid event");
-    }
-  }
-
-  @Override
-  public String toString() {
-    if (newPerson != null) {
-      return newPerson.toString();
-    } else if (newAuction != null) {
-      return newAuction.toString();
-    } else if (bid != null) {
-      return bid.toString();
-    } else {
-      throw new RuntimeException("invalid event");
-    }
-  }
-}