You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/23 17:09:24 UTC
[16/55] [abbrv] beam git commit: Refactor classes into packages
Refactor classes into packages
The new hierarchy has logically based packages for:
- drivers
- io
- model
- queries
- sources
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a7f9f7d0
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a7f9f7d0
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a7f9f7d0
Branch: refs/heads/master
Commit: a7f9f7d0784d9ba1f53ac4a0b49d2d81700720d0
Parents: 9ce9bf0
Author: Ismaël Mejía <ie...@apache.org>
Authored: Thu Mar 23 19:32:45 2017 +0100
Committer: Ismaël Mejía <ie...@gmail.com>
Committed: Wed Aug 23 19:07:27 2017 +0200
----------------------------------------------------------------------
.../java/nexmark/BEAM_ON_FLINK_ON_GCP.md | 2 +-
integration/java/nexmark/README.md | 88 +--
integration/java/nexmark/pom.xml | 22 +-
.../integration/nexmark/AbstractSimulator.java | 2 +-
.../beam/integration/nexmark/Auction.java | 189 ------
.../beam/integration/nexmark/AuctionBid.java | 86 ---
.../beam/integration/nexmark/AuctionCount.java | 89 ---
.../beam/integration/nexmark/AuctionPrice.java | 90 ---
.../apache/beam/integration/nexmark/Bid.java | 177 ------
.../integration/nexmark/BidsPerSession.java | 88 ---
.../integration/nexmark/BoundedEventSource.java | 189 ------
.../beam/integration/nexmark/CategoryPrice.java | 99 ----
.../apache/beam/integration/nexmark/Done.java | 82 ---
.../apache/beam/integration/nexmark/Event.java | 179 ------
.../beam/integration/nexmark/Generator.java | 589 ------------------
.../integration/nexmark/GeneratorConfig.java | 294 ---------
.../beam/integration/nexmark/IdNameReserve.java | 99 ----
.../beam/integration/nexmark/KnownSize.java | 26 -
.../beam/integration/nexmark/Monitor.java | 1 +
.../integration/nexmark/NameCityStateId.java | 105 ----
.../integration/nexmark/NexmarkApexDriver.java | 48 --
.../integration/nexmark/NexmarkApexRunner.java | 61 --
.../nexmark/NexmarkConfiguration.java | 6 +-
.../nexmark/NexmarkDirectDriver.java | 47 --
.../nexmark/NexmarkDirectRunner.java | 58 --
.../beam/integration/nexmark/NexmarkDriver.java | 5 +-
.../integration/nexmark/NexmarkFlinkDriver.java | 48 --
.../integration/nexmark/NexmarkFlinkRunner.java | 53 --
.../nexmark/NexmarkGoogleDriver.java | 62 --
.../nexmark/NexmarkGoogleRunner.java | 159 -----
.../integration/nexmark/NexmarkOptions.java | 386 ++++++++++++
.../beam/integration/nexmark/NexmarkPerf.java | 4 +-
.../beam/integration/nexmark/NexmarkQuery.java | 7 +-
.../integration/nexmark/NexmarkQueryModel.java | 19 +-
.../beam/integration/nexmark/NexmarkRunner.java | 40 +-
.../integration/nexmark/NexmarkSparkDriver.java | 46 --
.../integration/nexmark/NexmarkSparkRunner.java | 54 --
.../beam/integration/nexmark/NexmarkSuite.java | 2 +-
.../beam/integration/nexmark/NexmarkUtils.java | 25 +-
.../beam/integration/nexmark/Options.java | 386 ------------
.../apache/beam/integration/nexmark/Person.java | 165 ------
.../beam/integration/nexmark/PubsubHelper.java | 216 -------
.../apache/beam/integration/nexmark/Query0.java | 67 ---
.../beam/integration/nexmark/Query0Model.java | 62 --
.../apache/beam/integration/nexmark/Query1.java | 62 --
.../beam/integration/nexmark/Query10.java | 380 ------------
.../beam/integration/nexmark/Query11.java | 73 ---
.../beam/integration/nexmark/Query12.java | 77 ---
.../beam/integration/nexmark/Query1Model.java | 73 ---
.../apache/beam/integration/nexmark/Query2.java | 73 ---
.../beam/integration/nexmark/Query2Model.java | 75 ---
.../apache/beam/integration/nexmark/Query3.java | 249 --------
.../beam/integration/nexmark/Query3Model.java | 118 ----
.../apache/beam/integration/nexmark/Query4.java | 107 ----
.../beam/integration/nexmark/Query4Model.java | 179 ------
.../apache/beam/integration/nexmark/Query5.java | 123 ----
.../beam/integration/nexmark/Query5Model.java | 172 ------
.../apache/beam/integration/nexmark/Query6.java | 151 -----
.../beam/integration/nexmark/Query6Model.java | 126 ----
.../apache/beam/integration/nexmark/Query7.java | 85 ---
.../beam/integration/nexmark/Query7Model.java | 127 ----
.../apache/beam/integration/nexmark/Query8.java | 91 ---
.../beam/integration/nexmark/Query8Model.java | 144 -----
.../apache/beam/integration/nexmark/Query9.java | 39 --
.../beam/integration/nexmark/Query9Model.java | 43 --
.../beam/integration/nexmark/SellerPrice.java | 90 ---
.../nexmark/UnboundedEventSource.java | 328 ----------
.../beam/integration/nexmark/WinningBids.java | 11 +-
.../nexmark/WinningBidsSimulator.java | 4 +
.../nexmark/drivers/NexmarkApexDriver.java | 50 ++
.../nexmark/drivers/NexmarkApexRunner.java | 65 ++
.../nexmark/drivers/NexmarkDirectDriver.java | 49 ++
.../nexmark/drivers/NexmarkDirectRunner.java | 60 ++
.../nexmark/drivers/NexmarkFlinkDriver.java | 50 ++
.../nexmark/drivers/NexmarkFlinkRunner.java | 55 ++
.../nexmark/drivers/NexmarkGoogleDriver.java | 67 +++
.../nexmark/drivers/NexmarkGoogleRunner.java | 163 +++++
.../nexmark/drivers/NexmarkSparkDriver.java | 48 ++
.../nexmark/drivers/NexmarkSparkRunner.java | 56 ++
.../nexmark/drivers/package-info.java | 22 +
.../integration/nexmark/io/PubsubHelper.java | 217 +++++++
.../integration/nexmark/io/package-info.java | 22 +
.../beam/integration/nexmark/model/Auction.java | 190 ++++++
.../integration/nexmark/model/AuctionBid.java | 88 +++
.../integration/nexmark/model/AuctionCount.java | 90 +++
.../integration/nexmark/model/AuctionPrice.java | 91 +++
.../beam/integration/nexmark/model/Bid.java | 178 ++++++
.../nexmark/model/BidsPerSession.java | 89 +++
.../nexmark/model/CategoryPrice.java | 100 ++++
.../beam/integration/nexmark/model/Done.java | 83 +++
.../beam/integration/nexmark/model/Event.java | 179 ++++++
.../nexmark/model/IdNameReserve.java | 100 ++++
.../integration/nexmark/model/KnownSize.java | 26 +
.../nexmark/model/NameCityStateId.java | 106 ++++
.../beam/integration/nexmark/model/Person.java | 166 ++++++
.../integration/nexmark/model/SellerPrice.java | 91 +++
.../integration/nexmark/model/package-info.java | 22 +
.../beam/integration/nexmark/package-info.java | 2 +-
.../integration/nexmark/queries/Query0.java | 72 +++
.../nexmark/queries/Query0Model.java | 67 +++
.../integration/nexmark/queries/Query1.java | 68 +++
.../integration/nexmark/queries/Query10.java | 384 ++++++++++++
.../integration/nexmark/queries/Query11.java | 80 +++
.../integration/nexmark/queries/Query12.java | 84 +++
.../nexmark/queries/Query1Model.java | 79 +++
.../integration/nexmark/queries/Query2.java | 80 +++
.../nexmark/queries/Query2Model.java | 82 +++
.../integration/nexmark/queries/Query3.java | 256 ++++++++
.../nexmark/queries/Query3Model.java | 126 ++++
.../integration/nexmark/queries/Query4.java | 118 ++++
.../nexmark/queries/Query4Model.java | 188 ++++++
.../integration/nexmark/queries/Query5.java | 129 ++++
.../nexmark/queries/Query5Model.java | 178 ++++++
.../integration/nexmark/queries/Query6.java | 159 +++++
.../nexmark/queries/Query6Model.java | 135 +++++
.../integration/nexmark/queries/Query7.java | 91 +++
.../nexmark/queries/Query7Model.java | 133 +++++
.../integration/nexmark/queries/Query8.java | 98 +++
.../nexmark/queries/Query8Model.java | 150 +++++
.../integration/nexmark/queries/Query9.java | 46 ++
.../nexmark/queries/Query9Model.java | 47 ++
.../nexmark/queries/package-info.java | 22 +
.../nexmark/sources/BoundedEventSource.java | 190 ++++++
.../integration/nexmark/sources/Generator.java | 593 +++++++++++++++++++
.../nexmark/sources/GeneratorConfig.java | 296 +++++++++
.../nexmark/sources/UnboundedEventSource.java | 330 +++++++++++
.../nexmark/sources/package-info.java | 22 +
.../nexmark/src/main/resources/log4j.properties | 4 +
.../nexmark/BoundedEventSourceTest.java | 70 ---
.../beam/integration/nexmark/GeneratorTest.java | 110 ----
.../beam/integration/nexmark/QueryTest.java | 107 ----
.../nexmark/UnboundedEventSourceTest.java | 108 ----
.../integration/nexmark/queries/QueryTest.java | 111 ++++
.../nexmark/sources/BoundedEventSourceTest.java | 71 +++
.../nexmark/sources/GeneratorTest.java | 111 ++++
.../sources/UnboundedEventSourceTest.java | 110 ++++
136 files changed, 7768 insertions(+), 7384 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/BEAM_ON_FLINK_ON_GCP.md
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/BEAM_ON_FLINK_ON_GCP.md b/integration/java/nexmark/BEAM_ON_FLINK_ON_GCP.md
index d1b51e8..6a7fd34 100644
--- a/integration/java/nexmark/BEAM_ON_FLINK_ON_GCP.md
+++ b/integration/java/nexmark/BEAM_ON_FLINK_ON_GCP.md
@@ -243,7 +243,7 @@ $GCLOUD compute ssh \
--zone=$ZONE \
$MASTER \
--command "~/$FLINK_VER/bin/flink run \
- -c org.apache.beam.integration.nexmark.NexmarkFlinkDriver \
+ -c org.apache.beam.integration.nexmark.drivers.NexmarkFlinkDriver \
~/$FLINK_VER/lib/java-integration-all-bundled-0.2.0-incubating-SNAPSHOT.jar \
--project=$PROJECT \
--streaming=true \
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/README.md
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/README.md b/integration/java/nexmark/README.md
index 7a91ab2..a3549f4 100644
--- a/integration/java/nexmark/README.md
+++ b/integration/java/nexmark/README.md
@@ -74,14 +74,15 @@ We have augmented the original queries with five more:
The queries can be executed using a 'Driver' for a given backend.
Currently the supported drivers are:
+* **NexmarkApexDriver** for running via the Apex runner.
* **NexmarkDirectDriver** for running locally on a single machine.
-* **NexmarkGoogleDriver** for running on the Google Cloud Dataflow
- service. Requires a Google Cloud account.
+* **NexmarkGoogleDriver** for running on the Google Cloud Dataflow service.
+ Requires a Google Cloud account.
* **NexmarkFlinkDriver** for running on a Flink cluster. Requires the
cluster to be established and the Nexmark jar to be distributed to
each worker.
* **NexmarkSparkDriver** for running on a Spark cluster.
-
+
Other drivers are straightforward.
Test data is deterministically synthesized on demand. The test
@@ -103,9 +104,21 @@ the Google Cloud Dataflow driver.
# Configuration
-Common configuration parameters:
+## Common configuration parameters
+
+Decide if batch or streaming:
+
+ --streaming=true
+
+Number of event generators
+
+ --numEventGenerators=4
+
+Run query N
-Available Suites:
+ --query=N
+
+## Available Suites
- DEFAULT: Test default configuration with query 0.
- SMOKE: Run the 12 default configurations.
@@ -114,32 +127,39 @@ Available Suites:
--suite=SMOKE
-Decide if batch or streaming:
-
- --streaming=true
+### Apex specific configuration
-Number of events generators
+ --suite=SMOKE --manageResources=false --monitorJobs=true
- --numEventGenerators=4
+### Dataflow specific configuration
-## Apex specific configuration
+ --query=0 --suite=SMOKE --manageResources=false --monitorJobs=true \
+ --enforceEncodability=false --enforceImmutability=false \
+ --project=<your project> \
+ --zone=<your zone> \
+ --workerMachineType=n1-highmem-8 \
+ --stagingLocation=<a gs path for staging>
---suite=SMOKE --manageResources=false --monitorJobs=true
+ --runner=BlockingDataflowRunner \
+ --tempLocation=gs://talend-imejia/nexmark/temp/ \
+ --stagingLocation=gs://talend-imejia/nexmark/temp/staging/ \
+ --filesToStage=target/beam-integration-java-0.7.0-SNAPSHOT.jar
-## Direct specific configuration
+### Direct specific configuration
---suite=SMOKE --manageResources=false --monitorJobs=true \
---enforceEncodability=false --enforceImmutability=false
+ --suite=SMOKE --manageResources=false --monitorJobs=true \
+ --enforceEncodability=false --enforceImmutability=false
-## Flink specific configuration
+### Flink specific configuration
---suite=SMOKE --manageResources=false --monitorJobs=true \
---flinkMaster=local
+ --suite=SMOKE --manageResources=false --monitorJobs=true \
+ --flinkMaster=[local] --parallelism=#numcores
-## Spark specific configuration
+### Spark specific configuration
---suite=SMOKE --manageResources=false --monitorJobs=true --sparkMaster=local \
--Dspark.ui.enabled=false -DSPARK_LOCAL_IP=localhost -Dsun.io.serialization.extendedDebugInfo=true
+ --suite=SMOKE --manageResources=false --monitorJobs=true \
+ --sparkMaster=local \
+ -Dspark.ui.enabled=false -DSPARK_LOCAL_IP=localhost -Dsun.io.serialization.extendedDebugInfo=true
# Current Status
@@ -149,19 +169,19 @@ Open issues are tracked [here](https://github.com../../../../../issues):
| Query | Direct | Spark | Flink | Apex |
| ----: | ------------------------------ | ------------------------------ | ------------------------------ | ------------------------------- |
-| 0 | ok | [#1](../../../../../issues/1) | ok | ok |
-| 1 | ok | [#1](../../../../../issues/1) | ok | ok |
-| 2 | ok | [#1](../../../../../issues/1) | ok | ok |
+| 0 | ok | ok | ok | ok |
+| 1 | ok | ok | ok | ok |
+| 2 | ok | ok | ok | ok |
| 3 | [#7](../../../../../issues/7) | [#7](../../../../../issues/7) | [#7](../../../../../issues/7) | [#7](../../../../../issues/7) |
| 4 | ok | ok | [#2](../../../../../issues/2) | ok |
-| 5 | ok | [#3](../../../../../issues/3) | ok | ok |
+| 5 | ok | ok | ok | ok |
| 6 | ok | ok | [#2](../../../../../issues/2) | ok |
-| 7 | ok | [#1](../../../../../issues/1) | ok | [#24](../../../../../issues/24) |
-| 8 | ok | [#1](../../../../../issues/1) | ok | ok |
+| 7 | ok | ok | ok | [#24](../../../../../issues/24) |
+| 8 | ok | ok | ok | ok |
| 9 | ok | ok | [#2](../../../../../issues/2) | ok |
-| 10 | [#5](../../../../../issues/5) | [#4](../../../../../issues/4) | ok | ok |
-| 11 | ok | [#1](../../../../../issues/1) | ok | ok |
-| 12 | ok | [#1](../../../../../issues/1) | ok | ok |
+| 10 | [#5](../../../../../issues/5) | ok | ok | ok |
+| 11 | ok | ok | ok | ok |
+| 12 | ok | ok | ok | ok |
## Streaming / Synthetic / Local
@@ -205,11 +225,11 @@ Batch Mode
-Dexec.classpathScope="test"
- mvn exec:java -Dexec.mainClass=org.apache.beam.integration.nexmark.NexmarkDirectDriver -Dexec.args="--suite=SMOKE --streaming=false --numEventGenerators=4 --manageResources=false --monitorJobs=false --enforceEncodability=false --enforceImmutability=false"
+ mvn exec:java -Dexec.mainClass=org.apache.beam.integration.nexmark.drivers.NexmarkDirectDriver -Dexec.args="--suite=SMOKE --streaming=false --numEventGenerators=4 --manageResources=false --monitorJobs=false --enforceEncodability=false --enforceImmutability=false"
Streaming Mode
- mvn exec:java -Dexec.mainClass=org.apache.beam.integration.nexmark.NexmarkDirectDriver -Dexec.args="--suite=SMOKE --streaming=true --numEventGenerators=4 --manageResources=false --monitorJobs=false --enforceEncodability=false --enforceImmutability=false"
+ mvn exec:java -Dexec.mainClass=org.apache.beam.integration.nexmark.drivers.NexmarkDirectDriver -Dexec.args="--suite=SMOKE --streaming=true --numEventGenerators=4 --manageResources=false --monitorJobs=false --enforceEncodability=false --enforceImmutability=false"
## Running on Google Cloud Dataflow
@@ -218,7 +238,7 @@ service.
```
java -cp integration/java/target/java-integration-all-bundled-0.2.0-incubating-SNAPSHOT.jar \
- org.apache.beam.integration.nexmark.NexmarkGoogleDriver \
+ org.apache.beam.integration.nexmark.drivers.NexmarkGoogleDriver \
--project=<your project> \
--zone=<your zone> \
--workerMachineType=n1-highmem-8 \
@@ -251,7 +271,7 @@ java -cp integration/java/target/java-integration-all-bundled-0.2.0-incubating-S
```
java -cp integration/java/target/java-integration-all-bundled-0.2.0-incubating-SNAPSHOT.jar \
- org.apache.beam.integration.nexmark.NexmarkGoogleDriver \
+ org.apache.beam.integration.nexmark.drivers.NexmarkGoogleDriver \
--project=<your project> \
--zone=<your zone> \
--workerMachineType=n1-highmem-8 \
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/pom.xml
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/pom.xml b/integration/java/nexmark/pom.xml
index 0ecc298..7cd7d39 100644
--- a/integration/java/nexmark/pom.xml
+++ b/integration/java/nexmark/pom.xml
@@ -61,11 +61,6 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-dependency-plugin</artifactId>
- </plugin>
-
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
</plugin>
@@ -139,7 +134,6 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
- <version>2.10</version>
<executions>
<execution>
<goals><goal>analyze-only</goal></goals>
@@ -196,11 +190,13 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>${spark.version}</version>
+ <scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>${spark.version}</version>
+ <scope>runtime</scope>
</dependency>
<!-- Apex runner -->
@@ -215,12 +211,6 @@
<scope>runtime</scope>
</dependency>
<dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
- <version>${jackson.version}</version>
- <scope>runtime</scope>
- </dependency>
- <dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<version>${apex.codehaus.jackson.version}</version>
@@ -244,6 +234,7 @@
<groupId>com.google.apis</groupId>
<artifactId>google-api-services-dataflow</artifactId>
<version>${dataflow.version}</version>
+ <scope>runtime</scope>
</dependency>
<dependency>
@@ -289,13 +280,6 @@
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
- <version>${hamcrest.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.hamcrest</groupId>
- <artifactId>hamcrest-library</artifactId>
- <version>${hamcrest.version}</version>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/AbstractSimulator.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/AbstractSimulator.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/AbstractSimulator.java
index c08cdd3..b012842 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/AbstractSimulator.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/AbstractSimulator.java
@@ -34,7 +34,7 @@ import org.joda.time.Instant;
* @param <InputT> Type of input elements.
* @param <OutputT> Type of output elements.
*/
-abstract class AbstractSimulator<InputT, OutputT> {
+public abstract class AbstractSimulator<InputT, OutputT> {
/** Window size for action bucket sampling. */
public static final Duration WINDOW_SIZE = Duration.standardMinutes(1);
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Auction.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Auction.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Auction.java
deleted file mode 100644
index 16c28aa..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Auction.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.JsonProcessingException;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-
-import org.apache.beam.sdk.coders.AtomicCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VarLongCoder;
-
-/**
- * An auction submitted by a person.
- */
-public class Auction implements KnownSize, Serializable {
- private static final Coder<Long> LONG_CODER = VarLongCoder.of();
- private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
-
- public static final Coder<Auction> CODER = new AtomicCoder<Auction>() {
- @Override
- public void encode(Auction value, OutputStream outStream,
- Coder.Context context)
- throws CoderException, IOException {
- LONG_CODER.encode(value.id, outStream, Context.NESTED);
- STRING_CODER.encode(value.itemName, outStream, Context.NESTED);
- STRING_CODER.encode(value.description, outStream, Context.NESTED);
- LONG_CODER.encode(value.initialBid, outStream, Context.NESTED);
- LONG_CODER.encode(value.reserve, outStream, Context.NESTED);
- LONG_CODER.encode(value.dateTime, outStream, Context.NESTED);
- LONG_CODER.encode(value.expires, outStream, Context.NESTED);
- LONG_CODER.encode(value.seller, outStream, Context.NESTED);
- LONG_CODER.encode(value.category, outStream, Context.NESTED);
- STRING_CODER.encode(value.extra, outStream, Context.NESTED);
- }
-
- @Override
- public Auction decode(
- InputStream inStream, Coder.Context context)
- throws CoderException, IOException {
- long id = LONG_CODER.decode(inStream, Context.NESTED);
- String itemName = STRING_CODER.decode(inStream, Context.NESTED);
- String description = STRING_CODER.decode(inStream, Context.NESTED);
- long initialBid = LONG_CODER.decode(inStream, Context.NESTED);
- long reserve = LONG_CODER.decode(inStream, Context.NESTED);
- long dateTime = LONG_CODER.decode(inStream, Context.NESTED);
- long expires = LONG_CODER.decode(inStream, Context.NESTED);
- long seller = LONG_CODER.decode(inStream, Context.NESTED);
- long category = LONG_CODER.decode(inStream, Context.NESTED);
- String extra = STRING_CODER.decode(inStream, Context.NESTED);
- return new Auction(
- id, itemName, description, initialBid, reserve, dateTime, expires, seller, category,
- extra);
- }
- };
-
-
- /** Id of auction. */
- @JsonProperty
- public final long id; // primary key
-
- /** Extra auction properties. */
- @JsonProperty
- public final String itemName;
-
- @JsonProperty
- public final String description;
-
- /** Initial bid price, in cents. */
- @JsonProperty
- public final long initialBid;
-
- /** Reserve price, in cents. */
- @JsonProperty
- public final long reserve;
-
- @JsonProperty
- public final long dateTime;
-
- /** When does auction expire? (ms since epoch). Bids at or after this time are ignored. */
- @JsonProperty
- public final long expires;
-
- /** Id of person who instigated auction. */
- @JsonProperty
- public final long seller; // foreign key: Person.id
-
- /** Id of category auction is listed under. */
- @JsonProperty
- public final long category; // foreign key: Category.id
-
- /** Additional arbitrary payload for performance testing. */
- @JsonProperty
- public final String extra;
-
-
- // For Avro only.
- @SuppressWarnings("unused")
- private Auction() {
- id = 0;
- itemName = null;
- description = null;
- initialBid = 0;
- reserve = 0;
- dateTime = 0;
- expires = 0;
- seller = 0;
- category = 0;
- extra = null;
- }
-
- public Auction(long id, String itemName, String description, long initialBid, long reserve,
- long dateTime, long expires, long seller, long category, String extra) {
- this.id = id;
- this.itemName = itemName;
- this.description = description;
- this.initialBid = initialBid;
- this.reserve = reserve;
- this.dateTime = dateTime;
- this.expires = expires;
- this.seller = seller;
- this.category = category;
- this.extra = extra;
- }
-
- /**
- * Return a copy of auction which capture the given annotation.
- * (Used for debugging).
- */
- public Auction withAnnotation(String annotation) {
- return new Auction(id, itemName, description, initialBid, reserve, dateTime, expires, seller,
- category, annotation + ": " + extra);
- }
-
- /**
- * Does auction have {@code annotation}? (Used for debugging.)
- */
- public boolean hasAnnotation(String annotation) {
- return extra.startsWith(annotation + ": ");
- }
-
- /**
- * Remove {@code annotation} from auction. (Used for debugging.)
- */
- public Auction withoutAnnotation(String annotation) {
- if (hasAnnotation(annotation)) {
- return new Auction(id, itemName, description, initialBid, reserve, dateTime, expires, seller,
- category, extra.substring(annotation.length() + 2));
- } else {
- return this;
- }
- }
-
- @Override
- public long sizeInBytes() {
- return 8 + itemName.length() + 1 + description.length() + 1 + 8 + 8 + 8 + 8 + 8 + 8
- + extra.length() + 1;
- }
-
- @Override
- public String toString() {
- try {
- return NexmarkUtils.MAPPER.writeValueAsString(this);
- } catch (JsonProcessingException e) {
- throw new RuntimeException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/AuctionBid.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/AuctionBid.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/AuctionBid.java
deleted file mode 100644
index cd52b02..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/AuctionBid.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.JsonProcessingException;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-
-import org.apache.beam.sdk.coders.AtomicCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-
-/**
- * Result of {@link WinningBids} transform.
- */
-public class AuctionBid implements KnownSize, Serializable {
- public static final Coder<AuctionBid> CODER = new AtomicCoder<AuctionBid>() {
- @Override
- public void encode(AuctionBid value, OutputStream outStream,
- Coder.Context context)
- throws CoderException, IOException {
- Auction.CODER.encode(value.auction, outStream, Context.NESTED);
- Bid.CODER.encode(value.bid, outStream, Context.NESTED);
- }
-
- @Override
- public AuctionBid decode(
- InputStream inStream, Coder.Context context)
- throws CoderException, IOException {
- Auction auction = Auction.CODER.decode(inStream, Context.NESTED);
- Bid bid = Bid.CODER.decode(inStream, Context.NESTED);
- return new AuctionBid(auction, bid);
- }
- };
-
- @JsonProperty
- public final Auction auction;
-
- @JsonProperty
- public final Bid bid;
-
- // For Avro only.
- @SuppressWarnings("unused")
- private AuctionBid() {
- auction = null;
- bid = null;
- }
-
- public AuctionBid(Auction auction, Bid bid) {
- this.auction = auction;
- this.bid = bid;
- }
-
- @Override
- public long sizeInBytes() {
- return auction.sizeInBytes() + bid.sizeInBytes();
- }
-
- @Override
- public String toString() {
- try {
- return NexmarkUtils.MAPPER.writeValueAsString(this);
- } catch (JsonProcessingException e) {
- throw new RuntimeException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/AuctionCount.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/AuctionCount.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/AuctionCount.java
deleted file mode 100644
index ac1f080..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/AuctionCount.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.JsonProcessingException;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-
-import org.apache.beam.sdk.coders.AtomicCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.VarLongCoder;
-
-/**
- * Result of {@link Query5}.
- */
-public class AuctionCount implements KnownSize, Serializable {
- private static final Coder<Long> LONG_CODER = VarLongCoder.of();
-
- public static final Coder<AuctionCount> CODER = new AtomicCoder<AuctionCount>() {
- @Override
- public void encode(AuctionCount value, OutputStream outStream,
- Coder.Context context)
- throws CoderException, IOException {
- LONG_CODER.encode(value.auction, outStream, Context.NESTED);
- LONG_CODER.encode(value.count, outStream, Context.NESTED);
- }
-
- @Override
- public AuctionCount decode(
- InputStream inStream, Coder.Context context)
- throws CoderException, IOException {
- long auction = LONG_CODER.decode(inStream, Context.NESTED);
- long count = LONG_CODER.decode(inStream, Context.NESTED);
- return new AuctionCount(auction, count);
- }
- };
-
- @JsonProperty
- public final long auction;
-
- @JsonProperty
- public final long count;
-
- // For Avro only.
- @SuppressWarnings("unused")
- private AuctionCount() {
- auction = 0;
- count = 0;
- }
-
- public AuctionCount(long auction, long count) {
- this.auction = auction;
- this.count = count;
- }
-
- @Override
- public long sizeInBytes() {
- return 8 + 8;
- }
-
- @Override
- public String toString() {
- try {
- return NexmarkUtils.MAPPER.writeValueAsString(this);
- } catch (JsonProcessingException e) {
- throw new RuntimeException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/AuctionPrice.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/AuctionPrice.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/AuctionPrice.java
deleted file mode 100644
index 9bdf11c..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/AuctionPrice.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.JsonProcessingException;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-
-import org.apache.beam.sdk.coders.AtomicCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.VarLongCoder;
-
-/**
- * Result of {@link Query2}.
- */
-public class AuctionPrice implements KnownSize, Serializable {
- private static final Coder<Long> LONG_CODER = VarLongCoder.of();
-
- public static final Coder<AuctionPrice> CODER = new AtomicCoder<AuctionPrice>() {
- @Override
- public void encode(AuctionPrice value, OutputStream outStream,
- Coder.Context context)
- throws CoderException, IOException {
- LONG_CODER.encode(value.auction, outStream, Context.NESTED);
- LONG_CODER.encode(value.price, outStream, Context.NESTED);
- }
-
- @Override
- public AuctionPrice decode(
- InputStream inStream, Coder.Context context)
- throws CoderException, IOException {
- long auction = LONG_CODER.decode(inStream, Context.NESTED);
- long price = LONG_CODER.decode(inStream, Context.NESTED);
- return new AuctionPrice(auction, price);
- }
- };
-
- @JsonProperty
- public final long auction;
-
- /** Price in cents. */
- @JsonProperty
- public final long price;
-
- // For Avro only.
- @SuppressWarnings("unused")
- private AuctionPrice() {
- auction = 0;
- price = 0;
- }
-
- public AuctionPrice(long auction, long price) {
- this.auction = auction;
- this.price = price;
- }
-
- @Override
- public long sizeInBytes() {
- return 8 + 8;
- }
-
- @Override
- public String toString() {
- try {
- return NexmarkUtils.MAPPER.writeValueAsString(this);
- } catch (JsonProcessingException e) {
- throw new RuntimeException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Bid.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Bid.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Bid.java
deleted file mode 100644
index 04fcfdd..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Bid.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.JsonProcessingException;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.util.Comparator;
-
-import org.apache.beam.sdk.coders.AtomicCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VarLongCoder;
-
-/**
- * A bid for an item on auction.
- */
-public class Bid implements KnownSize, Serializable {
- private static final Coder<Long> LONG_CODER = VarLongCoder.of();
- private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
-
- public static final Coder<Bid> CODER = new AtomicCoder<Bid>() {
- @Override
- public void encode(Bid value, OutputStream outStream,
- Coder.Context context)
- throws CoderException, IOException {
- LONG_CODER.encode(value.auction, outStream, Context.NESTED);
- LONG_CODER.encode(value.bidder, outStream, Context.NESTED);
- LONG_CODER.encode(value.price, outStream, Context.NESTED);
- LONG_CODER.encode(value.dateTime, outStream, Context.NESTED);
- STRING_CODER.encode(value.extra, outStream, Context.NESTED);
- }
-
- @Override
- public Bid decode(
- InputStream inStream, Coder.Context context)
- throws CoderException, IOException {
- long auction = LONG_CODER.decode(inStream, Context.NESTED);
- long bidder = LONG_CODER.decode(inStream, Context.NESTED);
- long price = LONG_CODER.decode(inStream, Context.NESTED);
- long dateTime = LONG_CODER.decode(inStream, Context.NESTED);
- String extra = STRING_CODER.decode(inStream, Context.NESTED);
- return new Bid(auction, bidder, price, dateTime, extra);
- }
- };
-
- /**
- * Comparator to order bids by ascending price then descending time
- * (for finding winning bids).
- */
- public static final Comparator<Bid> PRICE_THEN_DESCENDING_TIME = new Comparator<Bid>() {
- @Override
- public int compare(Bid left, Bid right) {
- int i = Double.compare(left.price, right.price);
- if (i != 0) {
- return i;
- }
- return Long.compare(right.dateTime, left.dateTime);
- }
- };
-
- /**
- * Comparator to order bids by ascending time then ascending price.
- * (for finding most recent bids).
- */
- public static final Comparator<Bid> ASCENDING_TIME_THEN_PRICE = new Comparator<Bid>() {
- @Override
- public int compare(Bid left, Bid right) {
- int i = Long.compare(left.dateTime, right.dateTime);
- if (i != 0) {
- return i;
- }
- return Double.compare(left.price, right.price);
- }
- };
-
- /** Id of auction this bid is for. */
- @JsonProperty
- public final long auction; // foreign key: Auction.id
-
- /** Id of person bidding in auction. */
- @JsonProperty
- public final long bidder; // foreign key: Person.id
-
- /** Price of bid, in cents. */
- @JsonProperty
- public final long price;
-
- /**
- * Instant at which bid was made (ms since epoch).
- * NOTE: This may be earlier than the system's event time.
- */
- @JsonProperty
- public final long dateTime;
-
- /** Additional arbitrary payload for performance testing. */
- @JsonProperty
- public final String extra;
-
- // For Avro only.
- @SuppressWarnings("unused")
- private Bid() {
- auction = 0;
- bidder = 0;
- price = 0;
- dateTime = 0;
- extra = null;
- }
-
- public Bid(long auction, long bidder, long price, long dateTime, String extra) {
- this.auction = auction;
- this.bidder = bidder;
- this.price = price;
- this.dateTime = dateTime;
- this.extra = extra;
- }
-
- /**
- * Return a copy of bid which capture the given annotation.
- * (Used for debugging).
- */
- public Bid withAnnotation(String annotation) {
- return new Bid(auction, bidder, price, dateTime, annotation + ": " + extra);
- }
-
- /**
- * Does bid have {@code annotation}? (Used for debugging.)
- */
- public boolean hasAnnotation(String annotation) {
- return extra.startsWith(annotation + ": ");
- }
-
- /**
- * Remove {@code annotation} from bid. (Used for debugging.)
- */
- public Bid withoutAnnotation(String annotation) {
- if (hasAnnotation(annotation)) {
- return new Bid(auction, bidder, price, dateTime, extra.substring(annotation.length() + 2));
- } else {
- return this;
- }
- }
-
- @Override
- public long sizeInBytes() {
- return 8 + 8 + 8 + 8 + extra.length() + 1;
- }
-
- @Override
- public String toString() {
- try {
- return NexmarkUtils.MAPPER.writeValueAsString(this);
- } catch (JsonProcessingException e) {
- throw new RuntimeException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/BidsPerSession.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/BidsPerSession.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/BidsPerSession.java
deleted file mode 100644
index c6b0fe3..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/BidsPerSession.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.JsonProcessingException;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-
-import org.apache.beam.sdk.coders.AtomicCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.VarLongCoder;
-
-/**
- * Result of query 11.
- */
-public class BidsPerSession implements KnownSize, Serializable {
- private static final Coder<Long> LONG_CODER = VarLongCoder.of();
-
- public static final Coder<BidsPerSession> CODER = new AtomicCoder<BidsPerSession>() {
- @Override
- public void encode(BidsPerSession value, OutputStream outStream,
- Coder.Context context)
- throws CoderException, IOException {
- LONG_CODER.encode(value.personId, outStream, Context.NESTED);
- LONG_CODER.encode(value.bidsPerSession, outStream, Context.NESTED);
- }
-
- @Override
- public BidsPerSession decode(
- InputStream inStream, Coder.Context context)
- throws CoderException, IOException {
- long personId = LONG_CODER.decode(inStream, Context.NESTED);
- long bidsPerSession = LONG_CODER.decode(inStream, Context.NESTED);
- return new BidsPerSession(personId, bidsPerSession);
- }
- };
-
- @JsonProperty
- public final long personId;
-
- @JsonProperty
- public final long bidsPerSession;
-
- public BidsPerSession() {
- personId = 0;
- bidsPerSession = 0;
- }
-
- public BidsPerSession(long personId, long bidsPerSession) {
- this.personId = personId;
- this.bidsPerSession = bidsPerSession;
- }
-
- @Override
- public long sizeInBytes() {
- // Two longs.
- return 8 + 8;
- }
-
- @Override
- public String toString() {
- try {
- return NexmarkUtils.MAPPER.writeValueAsString(this);
- } catch (JsonProcessingException e) {
- throw new RuntimeException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/BoundedEventSource.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/BoundedEventSource.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/BoundedEventSource.java
deleted file mode 100644
index 7dc1bcc..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/BoundedEventSource.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.NoSuchElementException;
-import javax.annotation.Nullable;
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.values.TimestampedValue;
-import org.joda.time.Instant;
-
-/**
- * A custom, bounded source of event records.
- */
-class BoundedEventSource extends BoundedSource<Event> {
- /** Configuration we generate events against. */
- private final GeneratorConfig config;
-
- /** How many bounded sources to create. */
- private final int numEventGenerators;
-
- public BoundedEventSource(GeneratorConfig config, int numEventGenerators) {
- this.config = config;
- this.numEventGenerators = numEventGenerators;
- }
-
- /** A reader to pull events from the generator. */
- private static class EventReader extends BoundedReader<Event> {
- /**
- * Event source we purporting to be reading from.
- * (We can't use Java's capture-outer-class pointer since we must update
- * this field on calls to splitAtFraction.)
- */
- private BoundedEventSource source;
-
- /** Generator we are reading from. */
- private final Generator generator;
-
- private boolean reportedStop;
-
- @Nullable
- private TimestampedValue<Event> currentEvent;
-
- public EventReader(BoundedEventSource source, GeneratorConfig config) {
- this.source = source;
- generator = new Generator(config);
- reportedStop = false;
- }
-
- @Override
- public synchronized boolean start() {
- NexmarkUtils.info("starting bounded generator %s", generator);
- return advance();
- }
-
- @Override
- public synchronized boolean advance() {
- if (!generator.hasNext()) {
- // No more events.
- if (!reportedStop) {
- reportedStop = true;
- NexmarkUtils.info("stopped bounded generator %s", generator);
- }
- return false;
- }
- currentEvent = generator.next();
- return true;
- }
-
- @Override
- public synchronized Event getCurrent() throws NoSuchElementException {
- if (currentEvent == null) {
- throw new NoSuchElementException();
- }
- return currentEvent.getValue();
- }
-
- @Override
- public synchronized Instant getCurrentTimestamp() throws NoSuchElementException {
- if (currentEvent == null) {
- throw new NoSuchElementException();
- }
- return currentEvent.getTimestamp();
- }
-
- @Override
- public void close() throws IOException {
- // Nothing to close.
- }
-
- @Override
- public synchronized Double getFractionConsumed() {
- return generator.getFractionConsumed();
- }
-
- @Override
- public synchronized BoundedSource<Event> getCurrentSource() {
- return source;
- }
-
- @Override
- @Nullable
- public synchronized BoundedEventSource splitAtFraction(double fraction) {
- long startId = generator.getCurrentConfig().getStartEventId();
- long stopId = generator.getCurrentConfig().getStopEventId();
- long size = stopId - startId;
- long splitEventId = startId + Math.min((int) (size * fraction), size);
- if (splitEventId <= generator.getNextEventId() || splitEventId == stopId) {
- // Already passed this position or split results in left or right being empty.
- NexmarkUtils.info("split failed for bounded generator %s at %f", generator, fraction);
- return null;
- }
-
- NexmarkUtils.info("about to split bounded generator %s at %d", generator, splitEventId);
-
- // Scale back the event space of the current generator, and return a generator config
- // representing the event space we just 'stole' from the current generator.
- GeneratorConfig remainingConfig = generator.splitAtEventId(splitEventId);
-
- NexmarkUtils.info("split bounded generator into %s and %s", generator, remainingConfig);
-
- // At this point
- // generator.events() ++ new Generator(remainingConfig).events()
- // == originalGenerator.events()
-
- // We need a new source to represent the now smaller key space for this reader, so
- // that we can maintain the invariant that
- // this.getCurrentSource().createReader(...)
- // will yield the same output as this.
- source = new BoundedEventSource(generator.getCurrentConfig(), source.numEventGenerators);
-
- // Return a source from which we may read the 'stolen' event space.
- return new BoundedEventSource(remainingConfig, source.numEventGenerators);
- }
- }
-
- @Override
- public List<BoundedEventSource> splitIntoBundles(
- long desiredBundleSizeBytes, PipelineOptions options) {
- NexmarkUtils.info("slitting bounded source %s into %d sub-sources", config, numEventGenerators);
- List<BoundedEventSource> results = new ArrayList<>();
- // Ignore desiredBundleSizeBytes and use numEventGenerators instead.
- for (GeneratorConfig subConfig : config.split(numEventGenerators)) {
- results.add(new BoundedEventSource(subConfig, 1));
- }
- return results;
- }
-
- @Override
- public long getEstimatedSizeBytes(PipelineOptions options) {
- return config.getEstimatedSizeBytes();
- }
-
- @Override
- public EventReader createReader(PipelineOptions options) {
- NexmarkUtils.info("creating initial bounded reader for %s", config);
- return new EventReader(this, config);
- }
-
- @Override
- public void validate() {
- // Nothing to validate.
- }
-
- @Override
- public Coder<Event> getDefaultOutputCoder() {
- return Event.CODER;
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/CategoryPrice.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/CategoryPrice.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/CategoryPrice.java
deleted file mode 100644
index c83fb17..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/CategoryPrice.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.JsonProcessingException;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-
-import org.apache.beam.sdk.coders.AtomicCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.coders.VarLongCoder;
-
-/**
- * Result of {@link Query4}.
- */
-public class CategoryPrice implements KnownSize, Serializable {
- private static final Coder<Long> LONG_CODER = VarLongCoder.of();
- private static final Coder<Integer> INT_CODER = VarIntCoder.of();
-
- public static final Coder<CategoryPrice> CODER = new AtomicCoder<CategoryPrice>() {
- @Override
- public void encode(CategoryPrice value, OutputStream outStream,
- Coder.Context context)
- throws CoderException, IOException {
- LONG_CODER.encode(value.category, outStream, Context.NESTED);
- LONG_CODER.encode(value.price, outStream, Context.NESTED);
- INT_CODER.encode(value.isLast ? 1 : 0, outStream, Context.NESTED);
- }
-
- @Override
- public CategoryPrice decode(
- InputStream inStream, Coder.Context context)
- throws CoderException, IOException {
- long category = LONG_CODER.decode(inStream, Context.NESTED);
- long price = LONG_CODER.decode(inStream, Context.NESTED);
- boolean isLast = INT_CODER.decode(inStream, context) != 0;
- return new CategoryPrice(category, price, isLast);
- }
- };
-
- @JsonProperty
- public final long category;
-
- /** Price in cents. */
- @JsonProperty
- public final long price;
-
- @JsonProperty
- public final boolean isLast;
-
- // For Avro only.
- @SuppressWarnings("unused")
- private CategoryPrice() {
- category = 0;
- price = 0;
- isLast = false;
- }
-
- public CategoryPrice(long category, long price, boolean isLast) {
- this.category = category;
- this.price = price;
- this.isLast = isLast;
- }
-
- @Override
- public long sizeInBytes() {
- return 8 + 8 + 1;
- }
-
- @Override
- public String toString() {
- try {
- return NexmarkUtils.MAPPER.writeValueAsString(this);
- } catch (JsonProcessingException e) {
- throw new RuntimeException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Done.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Done.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Done.java
deleted file mode 100644
index 3a045f9..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Done.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.JsonProcessingException;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-
-import org.apache.beam.sdk.coders.AtomicCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-
-/**
- * Result of query 10.
- */
-public class Done implements KnownSize, Serializable {
- private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
-
- public static final Coder<Done> CODER = new AtomicCoder<Done>() {
- @Override
- public void encode(Done value, OutputStream outStream,
- Coder.Context context)
- throws CoderException, IOException {
- STRING_CODER.encode(value.message, outStream, Context.NESTED);
- }
-
- @Override
- public Done decode(
- InputStream inStream, Coder.Context context)
- throws CoderException, IOException {
- String message = STRING_CODER.decode(inStream, Context.NESTED);
- return new Done(message);
- }
- };
-
- @JsonProperty
- public final String message;
-
- // For Avro only.
- @SuppressWarnings("unused")
- public Done() {
- message = null;
- }
-
- public Done(String message) {
- this.message = message;
- }
-
- @Override
- public long sizeInBytes() {
- return message.length();
- }
-
- @Override
- public String toString() {
- try {
- return NexmarkUtils.MAPPER.writeValueAsString(this);
- } catch (JsonProcessingException e) {
- throw new RuntimeException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Event.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Event.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Event.java
deleted file mode 100644
index 769cedd..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Event.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-import javax.annotation.Nullable;
-
-import org.apache.beam.sdk.coders.AtomicCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.VarIntCoder;
-
-/**
- * An event in the auction system, either a (new) {@link Person}, a (new) {@link Auction},
- * or a {@link Bid}.
- */
-public class Event implements KnownSize, Serializable {
- private static final Coder<Integer> INT_CODER = VarIntCoder.of();
-
- public static final Coder<Event> CODER = new AtomicCoder<Event>() {
- @Override
- public void encode(Event value, OutputStream outStream, Coder.Context context)
- throws CoderException, IOException {
- if (value.newPerson != null) {
- INT_CODER.encode(0, outStream, Context.NESTED);
- Person.CODER.encode(value.newPerson, outStream, Context.NESTED);
- } else if (value.newAuction != null) {
- INT_CODER.encode(1, outStream, Context.NESTED);
- Auction.CODER.encode(value.newAuction, outStream, Context.NESTED);
- } else if (value.bid != null) {
- INT_CODER.encode(2, outStream, Context.NESTED);
- Bid.CODER.encode(value.bid, outStream, Context.NESTED);
- } else {
- throw new RuntimeException("invalid event");
- }
- }
-
- @Override
- public Event decode(
- InputStream inStream, Coder.Context context)
- throws CoderException, IOException {
- int tag = INT_CODER.decode(inStream, context);
- if (tag == 0) {
- Person person = Person.CODER.decode(inStream, Context.NESTED);
- return new Event(person);
- } else if (tag == 1) {
- Auction auction = Auction.CODER.decode(inStream, Context.NESTED);
- return new Event(auction);
- } else if (tag == 2) {
- Bid bid = Bid.CODER.decode(inStream, Context.NESTED);
- return new Event(bid);
- } else {
- throw new RuntimeException("invalid event encoding");
- }
- }
- };
-
- @Nullable
- @org.apache.avro.reflect.Nullable
- public final Person newPerson;
-
- @Nullable
- @org.apache.avro.reflect.Nullable
- public final Auction newAuction;
-
- @Nullable
- @org.apache.avro.reflect.Nullable
- public final Bid bid;
-
- // For Avro only.
- @SuppressWarnings("unused")
- private Event() {
- newPerson = null;
- newAuction = null;
- bid = null;
- }
-
- public Event(Person newPerson) {
- this.newPerson = newPerson;
- newAuction = null;
- bid = null;
- }
-
- public Event(Auction newAuction) {
- newPerson = null;
- this.newAuction = newAuction;
- bid = null;
- }
-
- public Event(Bid bid) {
- newPerson = null;
- newAuction = null;
- this.bid = bid;
- }
-
- /**
- * Return a copy of event which captures {@code annotation}.
- * (Used for debugging).
- */
- public Event withAnnotation(String annotation) {
- if (newPerson != null) {
- return new Event(newPerson.withAnnotation(annotation));
- } else if (newAuction != null) {
- return new Event(newAuction.withAnnotation(annotation));
- } else {
- return new Event(bid.withAnnotation(annotation));
- }
- }
-
- /**
- * Does event have {@code annotation}? (Used for debugging.)
- */
- public boolean hasAnnotation(String annotation) {
- if (newPerson != null) {
- return newPerson.hasAnnotation(annotation);
- } else if (newAuction != null) {
- return newAuction.hasAnnotation(annotation);
- } else {
- return bid.hasAnnotation(annotation);
- }
- }
-
- /**
- * Remove {@code annotation} from event. (Used for debugging.)
- */
- public Event withoutAnnotation(String annotation) {
- if (newPerson != null) {
- return new Event(newPerson.withoutAnnotation(annotation));
- } else if (newAuction != null) {
- return new Event(newAuction.withoutAnnotation(annotation));
- } else {
- return new Event(bid.withoutAnnotation(annotation));
- }
- }
-
- @Override
- public long sizeInBytes() {
- if (newPerson != null) {
- return 1 + newPerson.sizeInBytes();
- } else if (newAuction != null) {
- return 1 + newAuction.sizeInBytes();
- } else if (bid != null) {
- return 1 + bid.sizeInBytes();
- } else {
- throw new RuntimeException("invalid event");
- }
- }
-
- @Override
- public String toString() {
- if (newPerson != null) {
- return newPerson.toString();
- } else if (newAuction != null) {
- return newAuction.toString();
- } else if (bid != null) {
- return bid.toString();
- } else {
- throw new RuntimeException("invalid event");
- }
- }
-}