You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/23 17:09:58 UTC
[50/55] [abbrv] beam git commit: Move module
beam-integration-java-nexmark to beam-sdks-java-nexmark
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java
deleted file mode 100644
index 7926690..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java
+++ /dev/null
@@ -1,672 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.hash.Hashing;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.charset.StandardCharsets;
-import java.util.Iterator;
-import org.apache.beam.integration.nexmark.model.Auction;
-import org.apache.beam.integration.nexmark.model.AuctionBid;
-import org.apache.beam.integration.nexmark.model.AuctionCount;
-import org.apache.beam.integration.nexmark.model.AuctionPrice;
-import org.apache.beam.integration.nexmark.model.Bid;
-import org.apache.beam.integration.nexmark.model.BidsPerSession;
-import org.apache.beam.integration.nexmark.model.CategoryPrice;
-import org.apache.beam.integration.nexmark.model.Done;
-import org.apache.beam.integration.nexmark.model.Event;
-import org.apache.beam.integration.nexmark.model.IdNameReserve;
-import org.apache.beam.integration.nexmark.model.KnownSize;
-import org.apache.beam.integration.nexmark.model.NameCityStateId;
-import org.apache.beam.integration.nexmark.model.Person;
-import org.apache.beam.integration.nexmark.model.SellerPrice;
-import org.apache.beam.integration.nexmark.sources.BoundedEventSource;
-import org.apache.beam.integration.nexmark.sources.Generator;
-import org.apache.beam.integration.nexmark.sources.GeneratorConfig;
-import org.apache.beam.integration.nexmark.sources.UnboundedEventSource;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.AvroCoder;
-import org.apache.beam.sdk.coders.ByteArrayCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.coders.CustomCoder;
-import org.apache.beam.sdk.coders.SerializableCoder;
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.metrics.Counter;
-import org.apache.beam.sdk.metrics.Metrics;
-import org.apache.beam.sdk.state.StateSpec;
-import org.apache.beam.sdk.state.StateSpecs;
-import org.apache.beam.sdk.state.ValueState;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.windowing.AfterPane;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TimestampedValue;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Odds and ends used throughout the queries and the driver.
- */
-public class NexmarkUtils {
- private static final Logger LOG = LoggerFactory.getLogger(NexmarkUtils.class);
-
- /**
- * Mapper for (de)serializing JSON.
- */
- public static final ObjectMapper MAPPER = new ObjectMapper();
-
- /**
- * Possible sources a pipeline can take its events from.
- */
- public enum SourceType {
- /**
- * Produce the events directly, without reading them from anywhere.
- */
- DIRECT,
- /**
- * Read the events from an Avro file.
- */
- AVRO,
- /**
- * Read the events from a PubSub topic. This pipeline feeds that topic with the same
- * synthetic events.
- */
- PUBSUB
- }
-
- /**
- * Possible sinks a pipeline can send its query results to.
- */
- public enum SinkType {
- /**
- * Discard all results.
- */
- COUNT_ONLY,
- /**
- * Convert all results to strings, then discard them.
- */
- DEVNULL,
- /**
- * Write results to a PubSub topic. This pipeline drains that topic itself.
- */
- PUBSUB,
- /**
- * Write results to a text file. Only works in batch mode.
- */
- TEXT,
- /**
- * Write raw Events to Avro. Only works in batch mode.
- */
- AVRO,
- /**
- * Write raw Events to BigQuery.
- */
- BIGQUERY,
- }
-
- /**
- * Pub/sub mode to run in: which halves of the publish/consume pair a job performs.
- */
- public enum PubSubMode {
- /**
- * Publish events to pub/sub, but do not run the query.
- */
- PUBLISH_ONLY,
- /**
- * Consume events from pub/sub and run the query, but do not publish.
- */
- SUBSCRIBE_ONLY,
- /**
- * Both publish and consume, but as separate jobs.
- */
- COMBINED
- }
-
- /**
- * Coder strategies, i.e. which family of coders is registered for the model classes.
- */
- public enum CoderStrategy {
- /**
- * Hand-written coders.
- */
- HAND,
- /**
- * Avro-based coders.
- */
- AVRO,
- /**
- * Coders based on Java serialization.
- */
- JAVA
- }
-
- /**
- * How resource names are derived from the names that were provided.
- */
- public enum ResourceNameMode {
- /** Names are used exactly as provided. */
- VERBATIM,
- /** Names are suffixed with the query being run. */
- QUERY,
- /** Names are suffixed with the query being run and with a random number. */
- QUERY_AND_SALT
- }
-
- /**
-  * Units in which an event rate can be expressed.
-  */
- public enum RateUnit {
-   PER_SECOND(1_000_000L),
-   PER_MINUTE(60_000_000L);
-
-   /** Length of one unit, in microseconds. */
-   private final long usPerUnit;
-
-   RateUnit(long microsPerUnit) {
-     usPerUnit = microsPerUnit;
-   }
-
-   /**
-    * Return the number of microseconds between events when {@code rate} events occur per
-    * unit. Half of {@code rate} is added before dividing so the integer division rounds
-    * instead of truncating.
-    */
-   public long rateToPeriodUs(long rate) {
-     long roundingTerm = rate / 2;
-     return (usPerUnit + roundingTerm) / rate;
-   }
- }
-
- /**
-  * Shape the event rate follows over time.
-  */
- public enum RateShape {
-   SQUARE,
-   SINE;
-
-   /** Number of steps used to approximate the sine wave. */
-   private static final int N = 10;
-
-   /**
-    * Return the inter-event delay, in microseconds, each generator must follow so that
-    * {@code numGenerators} generators together achieve {@code rate} at {@code unit}.
-    */
-   public long interEventDelayUs(int rate, RateUnit unit, int numGenerators) {
-     long periodUs = unit.rateToPeriodUs(rate);
-     return periodUs * numGenerators;
-   }
-
-   /**
-    * Return the successive inter-event delays, in microseconds, each generator must follow
-    * to achieve this shape between {@code firstRate} and {@code nextRate} at {@code unit}
-    * using {@code numGenerators} generators.
-    *
-    * <p>Equal rates yield a single delay, {@code SQUARE} yields two (first rate, then next
-    * rate) and {@code SINE} yields {@code N} delays sampled along a cosine between the
-    * two rates.
-    */
-   public long[] interEventDelayUs(
-       int firstRate, int nextRate, RateUnit unit, int numGenerators) {
-     if (firstRate == nextRate) {
-       // Constant rate: the shape is irrelevant.
-       return new long[] {interEventDelayUs(firstRate, unit, numGenerators)};
-     }
-
-     switch (this) {
-       case SQUARE:
-         return new long[] {
-           interEventDelayUs(firstRate, unit, numGenerators),
-           interEventDelayUs(nextRate, unit, numGenerators)
-         };
-       case SINE: {
-         double midpoint = (firstRate + nextRate) / 2.0;
-         double amplitude = (firstRate - nextRate) / 2.0; // may be negative
-         long[] delays = new long[N];
-         for (int step = 0; step < N; step++) {
-           double angle = (2.0 * Math.PI * step) / N;
-           long stepRate = Math.round(midpoint + amplitude * Math.cos(angle));
-           delays[step] = unit.rateToPeriodUs(stepRate) * numGenerators;
-         }
-         return delays;
-       }
-     }
-     throw new RuntimeException(); // switch should be exhaustive
-   }
-
-   /**
-    * Return the delay between steps, in seconds, for the result of
-    * {@link #interEventDelayUs}, so as to cycle through the entire sequence every
-    * {@code ratePeriodSec}. The division rounds up.
-    */
-   public int stepLengthSec(int ratePeriodSec) {
-     int steps = (this == SQUARE) ? 2 : (this == SINE) ? N : 0;
-     return (ratePeriodSec + steps - 1) / steps;
-   }
- }
-
- /**
- * Set to true to capture all info messages. The logging level flags don't currently work.
- */
- private static final boolean LOG_INFO = false;
-
- /**
- * Set to true to capture all error messages. The logging level flags don't currently work.
- */
- private static final boolean LOG_ERROR = true;
-
- /**
- * Set to true to log directly to stdout. If run using Google Dataflow, you can watch the results
- * in real-time with: tail -f /var/log/dataflow/streaming-harness/harness-stdout.log
- */
- private static final boolean LOG_TO_CONSOLE = false;
-
- /**
-  * Log an info message built from {@code format} and {@code args}. Nothing happens unless
-  * {@code LOG_INFO} is set; the message is echoed to stdout when {@code LOG_TO_CONSOLE} is
-  * also set.
-  */
- public static void info(String format, Object... args) {
-   if (!LOG_INFO) {
-     return;
-   }
-   String message = String.format(format, args);
-   LOG.info(message);
-   if (LOG_TO_CONSOLE) {
-     System.out.println(message);
-   }
- }
-
- /**
-  * Print a message to the console, prefixed with the current instant. For client side
-  * only.
-  */
- public static void console(String format, Object... args) {
-   Instant now = Instant.now();
-   String message = String.format(format, args);
-   System.out.printf("%s %s%n", now, message);
- }
-
- /**
- * Label to use for timestamps on pub/sub messages.
- */
- public static final String PUBSUB_TIMESTAMP = "timestamp";
-
- /**
- * Label to use for windmill ids on pub/sub messages.
- */
- public static final String PUBSUB_ID = "id";
-
- /**
- * All events will be given a timestamp relative to this time (ms since epoch).
- */
- private static final long BASE_TIME = Instant.parse("2015-07-15T00:00:00.000Z").getMillis();
-
- /**
- * Instants guaranteed to be strictly before and after all event timestamps, and which won't
- * be subject to underflow/overflow.
- */
- public static final Instant BEGINNING_OF_TIME = new Instant(0).plus(Duration.standardDays(365));
- public static final Instant END_OF_TIME =
- BoundedWindow.TIMESTAMP_MAX_VALUE.minus(Duration.standardDays(365));
-
- /**
- * Set up the pipeline's coder registry according to {@code coderStrategy}.
- *
- * <p>{@code HAND} registers the hand-written {@code CODER} constant of each listed model
- * class, {@code AVRO} registers the {@link AvroCoder} provider and {@code JAVA} registers
- * the {@link SerializableCoder} provider.
- *
- * @param coderStrategy which family of coders to register
- * @param p pipeline whose {@link CoderRegistry} is updated
- */
- public static void setupPipeline(CoderStrategy coderStrategy, Pipeline p) {
- CoderRegistry registry = p.getCoderRegistry();
- switch (coderStrategy) {
- case HAND:
- registry.registerCoderForClass(Auction.class, Auction.CODER);
- registry.registerCoderForClass(AuctionBid.class, AuctionBid.CODER);
- registry.registerCoderForClass(AuctionCount.class, AuctionCount.CODER);
- registry.registerCoderForClass(AuctionPrice.class, AuctionPrice.CODER);
- registry.registerCoderForClass(Bid.class, Bid.CODER);
- registry.registerCoderForClass(CategoryPrice.class, CategoryPrice.CODER);
- registry.registerCoderForClass(Event.class, Event.CODER);
- registry.registerCoderForClass(IdNameReserve.class, IdNameReserve.CODER);
- registry.registerCoderForClass(NameCityStateId.class, NameCityStateId.CODER);
- registry.registerCoderForClass(Person.class, Person.CODER);
- registry.registerCoderForClass(SellerPrice.class, SellerPrice.CODER);
- registry.registerCoderForClass(Done.class, Done.CODER);
- registry.registerCoderForClass(BidsPerSession.class, BidsPerSession.CODER);
- break;
- case AVRO:
- registry.registerCoderProvider(AvroCoder.getCoderProvider());
- break;
- case JAVA:
- registry.registerCoderProvider(SerializableCoder.getCoderProvider());
- break;
- }
- }
-
- /**
-  * Return a generator config to match the given {@code configuration}. The base time is
-  * the current wall-clock time when {@code configuration.useWallclockEventTime} is set and
-  * the fixed {@code BASE_TIME} otherwise.
-  */
- private static GeneratorConfig standardGeneratorConfig(NexmarkConfiguration configuration) {
-   long baseTime =
-       configuration.useWallclockEventTime ? System.currentTimeMillis() : BASE_TIME;
-   return new GeneratorConfig(configuration, baseTime, 0, configuration.numEvents, 0);
- }
-
- /**
-  * Return an iterator over timestamped events produced by a {@link Generator} that uses
-  * the 'standard' generator config for {@code configuration}.
-  */
- public static Iterator<TimestampedValue<Event>> standardEventIterator(
-     NexmarkConfiguration configuration) {
-   GeneratorConfig generatorConfig = standardGeneratorConfig(configuration);
-   return new Generator(generatorConfig);
- }
-
- /**
-  * Return a transform which yields a finite number of synthesized events generated
-  * as a batch, read from a {@link BoundedEventSource} built on the 'standard' generator
-  * config with {@code configuration.numEventGenerators} generators.
-  */
- public static PTransform<PBegin, PCollection<Event>> batchEventsSource(
-     NexmarkConfiguration configuration) {
-   GeneratorConfig generatorConfig = standardGeneratorConfig(configuration);
-   BoundedEventSource source =
-       new BoundedEventSource(generatorConfig, configuration.numEventGenerators);
-   return Read.from(source);
- }
-
- /**
-  * Return a transform which yields a finite number of synthesized events generated
-  * on-the-fly in real time, read from an {@link UnboundedEventSource} built on the
-  * 'standard' generator config together with the generator count, watermark holdback and
-  * rate-limiting flag taken from {@code configuration}.
-  */
- public static PTransform<PBegin, PCollection<Event>> streamEventsSource(
-     NexmarkConfiguration configuration) {
-   GeneratorConfig generatorConfig = NexmarkUtils.standardGeneratorConfig(configuration);
-   UnboundedEventSource source =
-       new UnboundedEventSource(
-           generatorConfig,
-           configuration.numEventGenerators,
-           configuration.watermarkHoldbackSec,
-           configuration.isRateLimited);
-   return Read.from(source);
- }
-
- /**
-  * Return a transform to pass-through events, but count them as they go by.
-  *
-  * <p>Every event bumps the "events" counter plus exactly one of "newPersons",
-  * "newAuctions", "bids" or, when none of those fields is set, "endOfStream". All
-  * counters live under the {@code name} namespace.
-  */
- public static ParDo.SingleOutput<Event, Event> snoop(final String name) {
-   return ParDo.of(new DoFn<Event, Event>() {
-     final Counter eventCounter = Metrics.counter(name, "events");
-     final Counter newPersonCounter = Metrics.counter(name, "newPersons");
-     final Counter newAuctionCounter = Metrics.counter(name, "newAuctions");
-     final Counter bidCounter = Metrics.counter(name, "bids");
-     final Counter endOfStreamCounter = Metrics.counter(name, "endOfStream");
-
-     @ProcessElement
-     public void processElement(ProcessContext c) {
-       Event event = c.element();
-       eventCounter.inc();
-
-       // Pick the counter matching the first populated field of the event.
-       Counter kindCounter;
-       if (event.newPerson != null) {
-         kindCounter = newPersonCounter;
-       } else if (event.newAuction != null) {
-         kindCounter = newAuctionCounter;
-       } else if (event.bid != null) {
-         kindCounter = bidCounter;
-       } else {
-         kindCounter = endOfStreamCounter;
-       }
-       kindCounter.inc();
-
-       info("%s snooping element %s", name, event);
-       c.output(event);
-     }
-   });
- }
-
- /**
- * Return a transform to count and discard each element.
- *
- * <p>Each element increments the "discarded" counter in the {@code name} namespace;
- * nothing is ever output.
- */
- public static <T> ParDo.SingleOutput<T, Void> devNull(final String name) {
- return ParDo.of(new DoFn<T, Void>() {
- final Counter discardedCounterMetric = Metrics.counter(name, "discarded");
-
- @ProcessElement
- public void processElement(ProcessContext c) {
- discardedCounterMetric.inc();
- }
- });
- }
-
- /**
-  * Return a transform to log each element, passing it through unchanged.
-  *
-  * <p>Each element is logged at info level as {@code "<name>: <element>"}.
-  *
-  * @param name label prepended to every logged element
-  */
- public static <T> ParDo.SingleOutput<T, T> log(final String name) {
-   return ParDo.of(new DoFn<T, T>() {
-     @ProcessElement
-     public void processElement(ProcessContext c) {
-       // SLF4J substitutes "{}" placeholders; printf-style "%s" would be logged literally
-       // and the arguments silently dropped.
-       LOG.info("{}: {}", name, c.element());
-       c.output(c.element());
-     }
-   });
- }
-
- /**
-  * Return a transform to format each element as a string via {@code toString()}, counting
-  * every element in the "records" counter of the {@code name} namespace.
-  */
- public static <T> ParDo.SingleOutput<T, String> format(final String name) {
-   return ParDo.of(new DoFn<T, String>() {
-     final Counter recordCounterMetric = Metrics.counter(name, "records");
-
-     @ProcessElement
-     public void processElement(ProcessContext c) {
-       recordCounterMetric.inc();
-       String formatted = c.element().toString();
-       c.output(formatted);
-     }
-   });
- }
-
- /**
- * Return a transform to make explicit the timestamp of each element: every element is
- * output wrapped in a {@link TimestampedValue} together with its own timestamp.
- *
- * @param name not used by this method
- */
- public static <T> ParDo.SingleOutput<T, TimestampedValue<T>> stamp(String name) {
- return ParDo.of(new DoFn<T, TimestampedValue<T>>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(TimestampedValue.of(c.element(), c.timestamp()));
- }
- });
- }
-
- /**
-  * Return a transform to reduce a stream to a single, order-invariant long hash.
-  *
-  * <p>Each element is hashed with murmur3_128 over its timestamp (millis) and the UTF-8
-  * bytes of its {@code toString()}; the per-element hashes are then XOR-ed together, so
-  * the result does not depend on element order.
-  *
-  * @param numEvents element count the global-window trigger waits for; values above
-  *     {@link Integer#MAX_VALUE} are clamped because the trigger only accepts an int
-  * @param name name of the returned transform, also the prefix of the inner ".Hash" step
-  */
- public static <T> PTransform<PCollection<T>, PCollection<Long>> hash(
-     final long numEvents, String name) {
-   // A bare (int) cast wraps around for counts above Integer.MAX_VALUE and would hand the
-   // trigger an arbitrary (possibly negative) threshold; clamp instead.
-   final int triggerCount = (int) Math.min(numEvents, (long) Integer.MAX_VALUE);
-   return new PTransform<PCollection<T>, PCollection<Long>>(name) {
-     @Override
-     public PCollection<Long> expand(PCollection<T> input) {
-       return input.apply(Window.<T>into(new GlobalWindows())
-           .triggering(AfterPane.elementCountAtLeast(triggerCount))
-           .withAllowedLateness(Duration.standardDays(1))
-           .discardingFiredPanes())
-
-           .apply(name + ".Hash", ParDo.of(new DoFn<T, Long>() {
-             @ProcessElement
-             public void processElement(ProcessContext c) {
-               long hash =
-                   Hashing.murmur3_128()
-                       .newHasher()
-                       .putLong(c.timestamp().getMillis())
-                       .putString(c.element().toString(), StandardCharsets.UTF_8)
-                       .hash()
-                       .asLong();
-               c.output(hash);
-             }
-           }))
-
-           // XOR is commutative and associative, which makes the result order-invariant.
-           .apply(Combine.globally(new Combine.BinaryCombineFn<Long>() {
-             @Override
-             public Long apply(Long left, Long right) {
-               return left ^ right;
-             }
-           }));
-     }
-   };
- }
-
- private static final long MASK = (1L << 16) - 1L;
- private static final long HASH = 0x243F6A8885A308D3L;
- private static final long INIT_PLAINTEXT = 50000L;
-
- /**
- * Return a transform to keep the CPU busy for given milliseconds on every record.
- *
- * <p>For each element the transform repeats a brute-force hash search until
- * {@code delayMs} milliseconds of wall-clock time have elapsed, then outputs the element
- * unchanged.
- *
- * @param name not used by this method
- * @param delayMs how long, in milliseconds, to spin per element
- */
- public static <T> ParDo.SingleOutput<T, T> cpuDelay(String name, final long delayMs) {
- return ParDo.of(new DoFn<T, T>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- long now = System.currentTimeMillis();
- long end = now + delayMs;
- // The clock is only re-read after a complete search, so the delay overshoots by up
- // to one search.
- while (now < end) {
- // Find plaintext which hashes to HASH in lowest MASK bits.
- // Values chosen to roughly take 1ms on typical workstation.
- long p = INIT_PLAINTEXT;
- while (true) {
- long t = Hashing.murmur3_128().hashLong(p).asLong();
- if ((t & MASK) == (HASH & MASK)) {
- break;
- }
- p++;
- }
- now = System.currentTimeMillis();
- }
- c.output(c.element());
- }
- });
- }
-
- private static final int MAX_BUFFER_SIZE = 1 << 24;
-
- /**
-  * Transform that writes {@code bytes} bytes to a {@link ValueState} for every element and
-  * then passes the element through unchanged.
-  */
- private static class DiskBusyTransform<T> extends PTransform<PCollection<T>, PCollection<T>> {
-
-   /** Number of bytes to write per element; never reassigned, hence final. */
-   private final long bytes;
-
-   private DiskBusyTransform(long bytes) {
-     this.bytes = bytes;
-   }
-
-   @Override
-   public PCollection<T> expand(PCollection<T> input) {
-     // Add dummy key to be able to use State API; every element gets the same key 0.
-     PCollection<KV<Integer, T>> kvCollection =
-         input.apply("diskBusy.keyElements", ParDo.of(new DoFn<T, KV<Integer, T>>() {
-
-           @ProcessElement
-           public void processElement(ProcessContext context) {
-             context.output(KV.of(0, context.element()));
-           }
-         }));
-     // Apply actual transform that generates disk IO using state API
-     return kvCollection.apply("diskBusy.generateIO", ParDo.of(new DoFn<KV<Integer, T>, T>() {
-
-       private static final String DISK_BUSY = "diskBusy";
-
-       @StateId(DISK_BUSY)
-       private final StateSpec<ValueState<byte[]>> spec = StateSpecs.value(ByteArrayCoder.of());
-
-       @ProcessElement
-       public void processElement(
-           ProcessContext c, @StateId(DISK_BUSY) ValueState<byte[]> state) {
-         long remain = bytes;
-         long now = System.currentTimeMillis();
-         // Write in chunks of at most MAX_BUFFER_SIZE bytes until the quota is used up.
-         while (remain > 0) {
-           long thisBytes = Math.min(remain, MAX_BUFFER_SIZE);
-           remain -= thisBytes;
-           byte[] arr = new byte[(int) thisBytes];
-           // Fill with the low byte of the clock so successive chunks differ.
-           for (int i = 0; i < thisBytes; i++) {
-             arr[i] = (byte) now;
-           }
-           state.write(arr);
-           now = System.currentTimeMillis();
-         }
-         c.output(c.element().getValue());
-       }
-     }));
-   }
- }
-
-
- /**
- * Return a transform to write given number of bytes to durable store on every record.
- *
- * @param bytes number of bytes to write per record
- * @return a new {@code DiskBusyTransform} configured with {@code bytes}
- */
- public static <T> PTransform<PCollection<T>, PCollection<T>> diskBusy(final long bytes) {
- return new DiskBusyTransform<>(bytes);
- }
-
- /**
- * Return a transform to cast each element to {@link KnownSize}. Elements are output
- * unchanged; only the static element type of the collection is widened.
- */
- private static <T extends KnownSize> ParDo.SingleOutput<T, KnownSize> castToKnownSize() {
- return ParDo.of(new DoFn<T, KnownSize>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(c.element());
- }
- });
- }
-
- /**
-  * A coder for {@link KnownSize} values whose runtime type is really {@code T}. All work
-  * is delegated to the coder for {@code T}.
-  *
-  * @param <T> True type of object.
-  */
- private static class CastingCoder<T extends KnownSize> extends CustomCoder<KnownSize> {
-   private final Coder<T> trueCoder;
-
-   public CastingCoder(Coder<T> trueCoder) {
-     this.trueCoder = trueCoder;
-   }
-
-   /** Cast {@code value} down to {@code T} and encode it with the true coder. */
-   @Override
-   @SuppressWarnings("unchecked")
-   public void encode(KnownSize value, OutputStream outStream)
-       throws CoderException, IOException {
-     trueCoder.encode((T) value, outStream);
-   }
-
-   /** Decode a {@code T} with the true coder and return it as a {@link KnownSize}. */
-   @Override
-   public KnownSize decode(InputStream inStream)
-       throws CoderException, IOException {
-     T decoded = trueCoder.decode(inStream);
-     return decoded;
-   }
- }
-
- /**
- * Return a coder for {@code KnownSize} that are known to be exactly of type {@code T}.
- *
- * @param trueCoder coder for the actual element type, wrapped in a {@code CastingCoder}
- */
- private static <T extends KnownSize> Coder<KnownSize> makeCastingCoder(Coder<T> trueCoder) {
- return new CastingCoder<>(trueCoder);
- }
-
- /**
- * Return {@code elements} as {@code KnownSize}s.
- *
- * <p>The cast is applied as a step named {@code name + ".Forget"}, and the resulting
- * collection is given a casting coder that wraps the coder of {@code elements}.
- *
- * @param name prefix for the name of the applied step
- * @param elements collection to view as {@link KnownSize}
- */
- public static <T extends KnownSize> PCollection<KnownSize> castToKnownSize(
- final String name, PCollection<T> elements) {
- return elements.apply(name + ".Forget", castToKnownSize())
- .setCoder(makeCastingCoder(elements.getCoder()));
- }
-
- // Do not instantiate: this class only holds static members.
- private NexmarkUtils() {
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Auction.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Auction.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Auction.java
deleted file mode 100644
index 9f5d7c0..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Auction.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark.model;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VarLongCoder;
-
-/**
- * An auction submitted by a person.
- *
- * <p>Instances are immutable. {@link #CODER} is the hand-written coder and
- * {@link #toString()} renders the auction as JSON.
- */
-public class Auction implements KnownSize, Serializable {
- private static final Coder<Long> LONG_CODER = VarLongCoder.of();
- private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
-
- /**
- * Hand-written coder. {@code decode} reads the fields in exactly the order
- * {@code encode} writes them.
- */
- public static final Coder<Auction> CODER = new CustomCoder<Auction>() {
- @Override
- public void encode(Auction value, OutputStream outStream)
- throws CoderException, IOException {
- LONG_CODER.encode(value.id, outStream);
- STRING_CODER.encode(value.itemName, outStream);
- STRING_CODER.encode(value.description, outStream);
- LONG_CODER.encode(value.initialBid, outStream);
- LONG_CODER.encode(value.reserve, outStream);
- LONG_CODER.encode(value.dateTime, outStream);
- LONG_CODER.encode(value.expires, outStream);
- LONG_CODER.encode(value.seller, outStream);
- LONG_CODER.encode(value.category, outStream);
- STRING_CODER.encode(value.extra, outStream);
- }
-
- @Override
- public Auction decode(
- InputStream inStream)
- throws CoderException, IOException {
- long id = LONG_CODER.decode(inStream);
- String itemName = STRING_CODER.decode(inStream);
- String description = STRING_CODER.decode(inStream);
- long initialBid = LONG_CODER.decode(inStream);
- long reserve = LONG_CODER.decode(inStream);
- long dateTime = LONG_CODER.decode(inStream);
- long expires = LONG_CODER.decode(inStream);
- long seller = LONG_CODER.decode(inStream);
- long category = LONG_CODER.decode(inStream);
- String extra = STRING_CODER.decode(inStream);
- return new Auction(
- id, itemName, description, initialBid, reserve, dateTime, expires, seller, category,
- extra);
- }
- };
-
-
- /** Id of auction. */
- @JsonProperty
- public final long id; // primary key
-
- /** Name of the auctioned item (an extra auction property). */
- @JsonProperty
- private final String itemName;
-
- /** Description of the auction (an extra auction property). */
- @JsonProperty
- private final String description;
-
- /** Initial bid price, in cents. */
- @JsonProperty
- private final long initialBid;
-
- /** Reserve price, in cents. */
- @JsonProperty
- public final long reserve;
-
- /** NOTE(review): presumably when the auction was created (ms since epoch) - confirm. */
- @JsonProperty
- public final long dateTime;
-
- /** When does auction expire? (ms since epoch). Bids at or after this time are ignored. */
- @JsonProperty
- public final long expires;
-
- /** Id of person who instigated auction. */
- @JsonProperty
- public final long seller; // foreign key: Person.id
-
- /** Id of category auction is listed under. */
- @JsonProperty
- public final long category; // foreign key: Category.id
-
- /** Additional arbitrary payload for performance testing. */
- @JsonProperty
- private final String extra;
-
-
- // For Avro only. Leaves every numeric field 0 and every string field null.
- @SuppressWarnings("unused")
- private Auction() {
- id = 0;
- itemName = null;
- description = null;
- initialBid = 0;
- reserve = 0;
- dateTime = 0;
- expires = 0;
- seller = 0;
- category = 0;
- extra = null;
- }
-
- /** Create an auction from explicit values for all of its fields. */
- public Auction(long id, String itemName, String description, long initialBid, long reserve,
- long dateTime, long expires, long seller, long category, String extra) {
- this.id = id;
- this.itemName = itemName;
- this.description = description;
- this.initialBid = initialBid;
- this.reserve = reserve;
- this.dateTime = dateTime;
- this.expires = expires;
- this.seller = seller;
- this.category = category;
- this.extra = extra;
- }
-
- /**
- * Return a copy of auction which captures the given annotation by prefixing
- * {@code annotation + ": "} to the extra payload.
- * (Used for debugging).
- */
- public Auction withAnnotation(String annotation) {
- return new Auction(id, itemName, description, initialBid, reserve, dateTime, expires, seller,
- category, annotation + ": " + extra);
- }
-
- /**
- * Does auction have {@code annotation}? (Used for debugging.)
- *
- * <p>Throws {@link NullPointerException} when the extra payload is null, as it is after
- * the Avro-only constructor.
- */
- public boolean hasAnnotation(String annotation) {
- return extra.startsWith(annotation + ": ");
- }
-
- /**
- * Remove {@code annotation} from auction. (Used for debugging.)
- *
- * <p>Returns this same instance when the annotation is not present.
- */
- public Auction withoutAnnotation(String annotation) {
- if (hasAnnotation(annotation)) {
- return new Auction(id, itemName, description, initialBid, reserve, dateTime, expires, seller,
- category, extra.substring(annotation.length() + 2));
- } else {
- return this;
- }
- }
-
- /**
- * Return the size estimate of this auction: 8 per long field, and {@code length() + 1}
- * per string field. {@code String.length()} counts chars, not encoded bytes.
- */
- @Override
- public long sizeInBytes() {
- return 8 + itemName.length() + 1 + description.length() + 1 + 8 + 8 + 8 + 8 + 8 + 8
- + extra.length() + 1;
- }
-
- /** Render this auction as JSON using the shared {@code NexmarkUtils.MAPPER}. */
- @Override
- public String toString() {
- try {
- return NexmarkUtils.MAPPER.writeValueAsString(this);
- } catch (JsonProcessingException e) {
- throw new RuntimeException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionBid.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionBid.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionBid.java
deleted file mode 100644
index b9d79db..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionBid.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark.model;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
-
-/**
- * Pairing of an {@link Auction} with a {@link Bid}; the result of the
- * {@link org.apache.beam.integration.nexmark.queries.WinningBids} transform.
- */
-public class AuctionBid implements KnownSize, Serializable {
-  /** Hand-written coder: the auction is written first, then the bid. */
-  public static final Coder<AuctionBid> CODER = new CustomCoder<AuctionBid>() {
-    @Override
-    public void encode(AuctionBid value, OutputStream outStream)
-        throws CoderException, IOException {
-      Auction.CODER.encode(value.auction, outStream);
-      Bid.CODER.encode(value.bid, outStream);
-    }
-
-    @Override
-    public AuctionBid decode(InputStream inStream) throws CoderException, IOException {
-      // Arguments are evaluated left to right, so the auction is read before the bid.
-      return new AuctionBid(Auction.CODER.decode(inStream), Bid.CODER.decode(inStream));
-    }
-  };
-
-  @JsonProperty
-  public final Auction auction;
-
-  @JsonProperty
-  public final Bid bid;
-
-  // For Avro only.
-  @SuppressWarnings("unused")
-  private AuctionBid() {
-    this.auction = null;
-    this.bid = null;
-  }
-
-  public AuctionBid(Auction auction, Bid bid) {
-    this.auction = auction;
-    this.bid = bid;
-  }
-
-  /** Return the sum of the sizes of the auction and the bid. */
-  @Override
-  public long sizeInBytes() {
-    long auctionSize = auction.sizeInBytes();
-    long bidSize = bid.sizeInBytes();
-    return auctionSize + bidSize;
-  }
-
-  /** Render this pair as JSON using the shared {@code NexmarkUtils.MAPPER}. */
-  @Override
-  public String toString() {
-    String json;
-    try {
-      json = NexmarkUtils.MAPPER.writeValueAsString(this);
-    } catch (JsonProcessingException e) {
-      throw new RuntimeException(e);
-    }
-    return json;
-  }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionCount.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionCount.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionCount.java
deleted file mode 100644
index 0e643ff..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionCount.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark.model;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
-import org.apache.beam.sdk.coders.VarLongCoder;
-
-/**
- * Result of Query5.
- */
-public class AuctionCount implements KnownSize, Serializable {
- private static final Coder<Long> LONG_CODER = VarLongCoder.of();
-
- public static final Coder<AuctionCount> CODER = new CustomCoder<AuctionCount>() {
- @Override
- public void encode(AuctionCount value, OutputStream outStream)
- throws CoderException, IOException {
- LONG_CODER.encode(value.auction, outStream);
- LONG_CODER.encode(value.count, outStream);
- }
-
- @Override
- public AuctionCount decode(InputStream inStream)
- throws CoderException, IOException {
- long auction = LONG_CODER.decode(inStream);
- long count = LONG_CODER.decode(inStream);
- return new AuctionCount(auction, count);
- }
- };
-
- @JsonProperty private final long auction;
-
- @JsonProperty private final long count;
-
- // For Avro only.
- @SuppressWarnings("unused")
- private AuctionCount() {
- auction = 0;
- count = 0;
- }
-
- public AuctionCount(long auction, long count) {
- this.auction = auction;
- this.count = count;
- }
-
- @Override
- public long sizeInBytes() {
- return 8 + 8;
- }
-
- @Override
- public String toString() {
- try {
- return NexmarkUtils.MAPPER.writeValueAsString(this);
- } catch (JsonProcessingException e) {
- throw new RuntimeException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionPrice.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionPrice.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionPrice.java
deleted file mode 100644
index 7d51a21..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionPrice.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark.model;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
-import org.apache.beam.sdk.coders.VarLongCoder;
-
-/**
- * Result of Query2.
- */
-public class AuctionPrice implements KnownSize, Serializable {
- private static final Coder<Long> LONG_CODER = VarLongCoder.of();
-
- public static final Coder<AuctionPrice> CODER = new CustomCoder<AuctionPrice>() {
- @Override
- public void encode(AuctionPrice value, OutputStream outStream)
- throws CoderException, IOException {
- LONG_CODER.encode(value.auction, outStream);
- LONG_CODER.encode(value.price, outStream);
- }
-
- @Override
- public AuctionPrice decode(
- InputStream inStream)
- throws CoderException, IOException {
- long auction = LONG_CODER.decode(inStream);
- long price = LONG_CODER.decode(inStream);
- return new AuctionPrice(auction, price);
- }
- };
-
- @JsonProperty
- private final long auction;
-
- /** Price in cents. */
- @JsonProperty
- private final long price;
-
- // For Avro only.
- @SuppressWarnings("unused")
- private AuctionPrice() {
- auction = 0;
- price = 0;
- }
-
- public AuctionPrice(long auction, long price) {
- this.auction = auction;
- this.price = price;
- }
-
- @Override
- public long sizeInBytes() {
- return 8 + 8;
- }
-
- @Override
- public String toString() {
- try {
- return NexmarkUtils.MAPPER.writeValueAsString(this);
- } catch (JsonProcessingException e) {
- throw new RuntimeException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Bid.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Bid.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Bid.java
deleted file mode 100644
index 4fa9ea0..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Bid.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark.model;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.util.Comparator;
-import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VarLongCoder;
-
-/**
- * A bid for an item on auction.
- */
-public class Bid implements KnownSize, Serializable {
- private static final Coder<Long> LONG_CODER = VarLongCoder.of();
- private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
-
- public static final Coder<Bid> CODER = new CustomCoder<Bid>() {
- @Override
- public void encode(Bid value, OutputStream outStream)
- throws CoderException, IOException {
- LONG_CODER.encode(value.auction, outStream);
- LONG_CODER.encode(value.bidder, outStream);
- LONG_CODER.encode(value.price, outStream);
- LONG_CODER.encode(value.dateTime, outStream);
- STRING_CODER.encode(value.extra, outStream);
- }
-
- @Override
- public Bid decode(
- InputStream inStream)
- throws CoderException, IOException {
- long auction = LONG_CODER.decode(inStream);
- long bidder = LONG_CODER.decode(inStream);
- long price = LONG_CODER.decode(inStream);
- long dateTime = LONG_CODER.decode(inStream);
- String extra = STRING_CODER.decode(inStream);
- return new Bid(auction, bidder, price, dateTime, extra);
- }
-
- @Override public void verifyDeterministic() throws NonDeterministicException {}
- };
-
- /**
- * Comparator to order bids by ascending price then descending time
- * (for finding winning bids).
- */
- public static final Comparator<Bid> PRICE_THEN_DESCENDING_TIME = new Comparator<Bid>() {
- @Override
- public int compare(Bid left, Bid right) {
- int i = Double.compare(left.price, right.price);
- if (i != 0) {
- return i;
- }
- return Long.compare(right.dateTime, left.dateTime);
- }
- };
-
- /**
- * Comparator to order bids by ascending time then ascending price.
- * (for finding most recent bids).
- */
- public static final Comparator<Bid> ASCENDING_TIME_THEN_PRICE = new Comparator<Bid>() {
- @Override
- public int compare(Bid left, Bid right) {
- int i = Long.compare(left.dateTime, right.dateTime);
- if (i != 0) {
- return i;
- }
- return Double.compare(left.price, right.price);
- }
- };
-
- /** Id of auction this bid is for. */
- @JsonProperty
- public final long auction; // foreign key: Auction.id
-
- /** Id of person bidding in auction. */
- @JsonProperty
- public final long bidder; // foreign key: Person.id
-
- /** Price of bid, in cents. */
- @JsonProperty
- public final long price;
-
- /**
- * Instant at which bid was made (ms since epoch).
- * NOTE: This may be earlier than the system's event time.
- */
- @JsonProperty
- public final long dateTime;
-
- /** Additional arbitrary payload for performance testing. */
- @JsonProperty
- public final String extra;
-
- // For Avro only.
- @SuppressWarnings("unused")
- private Bid() {
- auction = 0;
- bidder = 0;
- price = 0;
- dateTime = 0;
- extra = null;
- }
-
- public Bid(long auction, long bidder, long price, long dateTime, String extra) {
- this.auction = auction;
- this.bidder = bidder;
- this.price = price;
- this.dateTime = dateTime;
- this.extra = extra;
- }
-
- /**
- * Return a copy of bid which capture the given annotation.
- * (Used for debugging).
- */
- public Bid withAnnotation(String annotation) {
- return new Bid(auction, bidder, price, dateTime, annotation + ": " + extra);
- }
-
- /**
- * Does bid have {@code annotation}? (Used for debugging.)
- */
- public boolean hasAnnotation(String annotation) {
- return extra.startsWith(annotation + ": ");
- }
-
- /**
- * Remove {@code annotation} from bid. (Used for debugging.)
- */
- public Bid withoutAnnotation(String annotation) {
- if (hasAnnotation(annotation)) {
- return new Bid(auction, bidder, price, dateTime, extra.substring(annotation.length() + 2));
- } else {
- return this;
- }
- }
-
- @Override
- public long sizeInBytes() {
- return 8 + 8 + 8 + 8 + extra.length() + 1;
- }
-
- @Override
- public String toString() {
- try {
- return NexmarkUtils.MAPPER.writeValueAsString(this);
- } catch (JsonProcessingException e) {
- throw new RuntimeException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/BidsPerSession.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/BidsPerSession.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/BidsPerSession.java
deleted file mode 100644
index 3211456..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/BidsPerSession.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark.model;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
-import org.apache.beam.sdk.coders.VarLongCoder;
-
-/**
- * Result of query 11.
- */
-public class BidsPerSession implements KnownSize, Serializable {
- private static final Coder<Long> LONG_CODER = VarLongCoder.of();
-
- public static final Coder<BidsPerSession> CODER = new CustomCoder<BidsPerSession>() {
- @Override
- public void encode(BidsPerSession value, OutputStream outStream)
- throws CoderException, IOException {
- LONG_CODER.encode(value.personId, outStream);
- LONG_CODER.encode(value.bidsPerSession, outStream);
- }
-
- @Override
- public BidsPerSession decode(
- InputStream inStream)
- throws CoderException, IOException {
- long personId = LONG_CODER.decode(inStream);
- long bidsPerSession = LONG_CODER.decode(inStream);
- return new BidsPerSession(personId, bidsPerSession);
- }
- @Override public void verifyDeterministic() throws NonDeterministicException {}
- };
-
- @JsonProperty
- private final long personId;
-
- @JsonProperty
- private final long bidsPerSession;
-
- public BidsPerSession() {
- personId = 0;
- bidsPerSession = 0;
- }
-
- public BidsPerSession(long personId, long bidsPerSession) {
- this.personId = personId;
- this.bidsPerSession = bidsPerSession;
- }
-
- @Override
- public long sizeInBytes() {
- // Two longs.
- return 8 + 8;
- }
-
- @Override
- public String toString() {
- try {
- return NexmarkUtils.MAPPER.writeValueAsString(this);
- } catch (JsonProcessingException e) {
- throw new RuntimeException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/CategoryPrice.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/CategoryPrice.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/CategoryPrice.java
deleted file mode 100644
index 2678198..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/CategoryPrice.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark.model;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
-import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.coders.VarLongCoder;
-
-/**
- * Result of Query4.
- */
-public class CategoryPrice implements KnownSize, Serializable {
- private static final Coder<Long> LONG_CODER = VarLongCoder.of();
- private static final Coder<Integer> INT_CODER = VarIntCoder.of();
-
- public static final Coder<CategoryPrice> CODER = new CustomCoder<CategoryPrice>() {
- @Override
- public void encode(CategoryPrice value, OutputStream outStream)
- throws CoderException, IOException {
- LONG_CODER.encode(value.category, outStream);
- LONG_CODER.encode(value.price, outStream);
- INT_CODER.encode(value.isLast ? 1 : 0, outStream);
- }
-
- @Override
- public CategoryPrice decode(InputStream inStream)
- throws CoderException, IOException {
- long category = LONG_CODER.decode(inStream);
- long price = LONG_CODER.decode(inStream);
- boolean isLast = INT_CODER.decode(inStream) != 0;
- return new CategoryPrice(category, price, isLast);
- }
- @Override public void verifyDeterministic() throws NonDeterministicException {}
- };
-
- @JsonProperty
- public final long category;
-
- /** Price in cents. */
- @JsonProperty
- public final long price;
-
- @JsonProperty
- public final boolean isLast;
-
- // For Avro only.
- @SuppressWarnings("unused")
- private CategoryPrice() {
- category = 0;
- price = 0;
- isLast = false;
- }
-
- public CategoryPrice(long category, long price, boolean isLast) {
- this.category = category;
- this.price = price;
- this.isLast = isLast;
- }
-
- @Override
- public long sizeInBytes() {
- return 8 + 8 + 1;
- }
-
- @Override
- public String toString() {
- try {
- return NexmarkUtils.MAPPER.writeValueAsString(this);
- } catch (JsonProcessingException e) {
- throw new RuntimeException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Done.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Done.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Done.java
deleted file mode 100644
index b0a88d4..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Done.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark.model;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-
-/**
- * Result of query 10.
- */
-public class Done implements KnownSize, Serializable {
- private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
-
- public static final Coder<Done> CODER = new CustomCoder<Done>() {
- @Override
- public void encode(Done value, OutputStream outStream)
- throws CoderException, IOException {
- STRING_CODER.encode(value.message, outStream);
- }
-
- @Override
- public Done decode(InputStream inStream)
- throws CoderException, IOException {
- String message = STRING_CODER.decode(inStream);
- return new Done(message);
- }
- @Override public void verifyDeterministic() throws NonDeterministicException {}
- };
-
- @JsonProperty
- private final String message;
-
- // For Avro only.
- @SuppressWarnings("unused")
- public Done() {
- message = null;
- }
-
- public Done(String message) {
- this.message = message;
- }
-
- @Override
- public long sizeInBytes() {
- return message.length();
- }
-
- @Override
- public String toString() {
- try {
- return NexmarkUtils.MAPPER.writeValueAsString(this);
- } catch (JsonProcessingException e) {
- throw new RuntimeException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java
deleted file mode 100644
index 0e1672e..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark.model;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CustomCoder;
-import org.apache.beam.sdk.coders.VarIntCoder;
-
-/**
- * An event in the auction system, either a (new) {@link Person}, a (new) {@link Auction}, or a
- * {@link Bid}.
- */
-public class Event implements KnownSize, Serializable {
- private enum Tag {
- PERSON(0),
- AUCTION(1),
- BID(2);
-
- private int value = -1;
-
- Tag(int value){
- this.value = value;
- }
- }
- private static final Coder<Integer> INT_CODER = VarIntCoder.of();
-
- public static final Coder<Event> CODER =
- new CustomCoder<Event>() {
- @Override
- public void encode(Event value, OutputStream outStream) throws IOException {
- if (value.newPerson != null) {
- INT_CODER.encode(Tag.PERSON.value, outStream);
- Person.CODER.encode(value.newPerson, outStream);
- } else if (value.newAuction != null) {
- INT_CODER.encode(Tag.AUCTION.value, outStream);
- Auction.CODER.encode(value.newAuction, outStream);
- } else if (value.bid != null) {
- INT_CODER.encode(Tag.BID.value, outStream);
- Bid.CODER.encode(value.bid, outStream);
- } else {
- throw new RuntimeException("invalid event");
- }
- }
-
- @Override
- public Event decode(InputStream inStream) throws IOException {
- int tag = INT_CODER.decode(inStream);
- if (tag == Tag.PERSON.value) {
- Person person = Person.CODER.decode(inStream);
- return new Event(person);
- } else if (tag == Tag.AUCTION.value) {
- Auction auction = Auction.CODER.decode(inStream);
- return new Event(auction);
- } else if (tag == Tag.BID.value) {
- Bid bid = Bid.CODER.decode(inStream);
- return new Event(bid);
- } else {
- throw new RuntimeException("invalid event encoding");
- }
- }
-
- @Override
- public void verifyDeterministic() throws NonDeterministicException {}
- };
-
- @Nullable
- @org.apache.avro.reflect.Nullable
- public final Person newPerson;
-
- @Nullable
- @org.apache.avro.reflect.Nullable
- public final Auction newAuction;
-
- @Nullable
- @org.apache.avro.reflect.Nullable
- public final Bid bid;
-
- // For Avro only.
- @SuppressWarnings("unused")
- private Event() {
- newPerson = null;
- newAuction = null;
- bid = null;
- }
-
- public Event(Person newPerson) {
- this.newPerson = newPerson;
- newAuction = null;
- bid = null;
- }
-
- public Event(Auction newAuction) {
- newPerson = null;
- this.newAuction = newAuction;
- bid = null;
- }
-
- public Event(Bid bid) {
- newPerson = null;
- newAuction = null;
- this.bid = bid;
- }
-
- /** Return a copy of event which captures {@code annotation}. (Used for debugging). */
- public Event withAnnotation(String annotation) {
- if (newPerson != null) {
- return new Event(newPerson.withAnnotation(annotation));
- } else if (newAuction != null) {
- return new Event(newAuction.withAnnotation(annotation));
- } else {
- return new Event(bid.withAnnotation(annotation));
- }
- }
-
- /** Does event have {@code annotation}? (Used for debugging.) */
- public boolean hasAnnotation(String annotation) {
- if (newPerson != null) {
- return newPerson.hasAnnotation(annotation);
- } else if (newAuction != null) {
- return newAuction.hasAnnotation(annotation);
- } else {
- return bid.hasAnnotation(annotation);
- }
- }
-
- @Override
- public long sizeInBytes() {
- if (newPerson != null) {
- return 1 + newPerson.sizeInBytes();
- } else if (newAuction != null) {
- return 1 + newAuction.sizeInBytes();
- } else if (bid != null) {
- return 1 + bid.sizeInBytes();
- } else {
- throw new RuntimeException("invalid event");
- }
- }
-
- @Override
- public String toString() {
- if (newPerson != null) {
- return newPerson.toString();
- } else if (newAuction != null) {
- return newAuction.toString();
- } else if (bid != null) {
- return bid.toString();
- } else {
- throw new RuntimeException("invalid event");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/IdNameReserve.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/IdNameReserve.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/IdNameReserve.java
deleted file mode 100644
index 8cade4e..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/IdNameReserve.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark.model;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VarLongCoder;
-
-/**
- * Result type of Query8.
- */
-public class IdNameReserve implements KnownSize, Serializable {
- private static final Coder<Long> LONG_CODER = VarLongCoder.of();
- private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
-
- public static final Coder<IdNameReserve> CODER = new CustomCoder<IdNameReserve>() {
- @Override
- public void encode(IdNameReserve value, OutputStream outStream)
- throws CoderException, IOException {
- LONG_CODER.encode(value.id, outStream);
- STRING_CODER.encode(value.name, outStream);
- LONG_CODER.encode(value.reserve, outStream);
- }
-
- @Override
- public IdNameReserve decode(
- InputStream inStream)
- throws CoderException, IOException {
- long id = LONG_CODER.decode(inStream);
- String name = STRING_CODER.decode(inStream);
- long reserve = LONG_CODER.decode(inStream);
- return new IdNameReserve(id, name, reserve);
- }
- @Override public void verifyDeterministic() throws NonDeterministicException {}
- };
-
- @JsonProperty
- private final long id;
-
- @JsonProperty
- private final String name;
-
- /** Reserve price in cents. */
- @JsonProperty
- private final long reserve;
-
- // For Avro only.
- @SuppressWarnings("unused")
- private IdNameReserve() {
- id = 0;
- name = null;
- reserve = 0;
- }
-
- public IdNameReserve(long id, String name, long reserve) {
- this.id = id;
- this.name = name;
- this.reserve = reserve;
- }
-
- @Override
- public long sizeInBytes() {
- return 8 + name.length() + 1 + 8;
- }
-
- @Override
- public String toString() {
- try {
- return NexmarkUtils.MAPPER.writeValueAsString(this);
- } catch (JsonProcessingException e) {
- throw new RuntimeException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/KnownSize.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/KnownSize.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/KnownSize.java
deleted file mode 100644
index c742eac..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/KnownSize.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark.model;
-
-/**
- * Interface for elements which can quickly estimate their encoded byte size.
- */
-public interface KnownSize {
- long sizeInBytes(); // The element's estimated encoded size, in bytes.
-}
-
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/NameCityStateId.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/NameCityStateId.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/NameCityStateId.java
deleted file mode 100644
index 37bd3c6..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/NameCityStateId.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark.model;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VarLongCoder;
-
-/**
- * Result of Query3: a serializable value holding name, city and state strings plus a long id.
- */
-public class NameCityStateId implements KnownSize, Serializable {
- private static final Coder<Long> LONG_CODER = VarLongCoder.of();
- private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
- // CODER writes name, city, state, id in that order and reads them back in the same order.
- public static final Coder<NameCityStateId> CODER = new CustomCoder<NameCityStateId>() {
- @Override
- public void encode(NameCityStateId value, OutputStream outStream)
- throws CoderException, IOException {
- STRING_CODER.encode(value.name, outStream);
- STRING_CODER.encode(value.city, outStream);
- STRING_CODER.encode(value.state, outStream);
- LONG_CODER.encode(value.id, outStream);
- }
- // decode must consume the fields in exactly the order encode emitted them.
- @Override
- public NameCityStateId decode(InputStream inStream)
- throws CoderException, IOException {
- String name = STRING_CODER.decode(inStream);
- String city = STRING_CODER.decode(inStream);
- String state = STRING_CODER.decode(inStream);
- long id = LONG_CODER.decode(inStream);
- return new NameCityStateId(name, city, state, id);
- }
- @Override public void verifyDeterministic() throws NonDeterministicException {} // Empty body: never throws.
- };
-
- @JsonProperty
- private final String name;
-
- @JsonProperty
- private final String city;
-
- @JsonProperty
- private final String state;
-
- @JsonProperty
- private final long id;
-
- // For Avro only: no-arg constructor that leaves the three strings null and id at 0.
- @SuppressWarnings("unused")
- private NameCityStateId() {
- name = null;
- city = null;
- state = null;
- id = 0;
- }
- // Stores the four values as given, without validation.
- public NameCityStateId(String name, String city, String state, long id) {
- this.name = name;
- this.city = city;
- this.state = state;
- this.id = id;
- }
- // Estimate only: each string contributes its char count plus 1, then a fixed 8 is added.
- @Override
- public long sizeInBytes() {
- return name.length() + 1 + city.length() + 1 + state.length() + 1 + 8; // NullPointerException on a null string.
- }
-
- @Override
- public String toString() {
- try {
- return NexmarkUtils.MAPPER.writeValueAsString(this); // String form comes from the shared MAPPER.
- } catch (JsonProcessingException e) {
- throw new RuntimeException(e); // Rethrown unchecked with the original exception as cause.
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Person.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Person.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Person.java
deleted file mode 100644
index bde587d..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Person.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark.model;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VarLongCoder;
-
-/**
- * A person either creating an auction or making a bid.
- */
-public class Person implements KnownSize, Serializable {
- private static final Coder<Long> LONG_CODER = VarLongCoder.of();
- private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
- public static final Coder<Person> CODER = new CustomCoder<Person>() { // encode and decode list fields in one order.
- @Override
- public void encode(Person value, OutputStream outStream)
- throws CoderException, IOException {
- LONG_CODER.encode(value.id, outStream);
- STRING_CODER.encode(value.name, outStream);
- STRING_CODER.encode(value.emailAddress, outStream);
- STRING_CODER.encode(value.creditCard, outStream);
- STRING_CODER.encode(value.city, outStream);
- STRING_CODER.encode(value.state, outStream);
- LONG_CODER.encode(value.dateTime, outStream);
- STRING_CODER.encode(value.extra, outStream);
- }
- // decode must consume the eight fields in exactly the order encode emitted them.
- @Override
- public Person decode(InputStream inStream)
- throws CoderException, IOException {
- long id = LONG_CODER.decode(inStream);
- String name = STRING_CODER.decode(inStream);
- String emailAddress = STRING_CODER.decode(inStream);
- String creditCard = STRING_CODER.decode(inStream);
- String city = STRING_CODER.decode(inStream);
- String state = STRING_CODER.decode(inStream);
- long dateTime = LONG_CODER.decode(inStream);
- String extra = STRING_CODER.decode(inStream);
- return new Person(id, name, emailAddress, creditCard, city, state, dateTime, extra);
- }
- @Override public void verifyDeterministic() throws NonDeterministicException {} // Empty body: never throws.
- };
-
- /** Id of person. */
- @JsonProperty
- public final long id; // primary key
-
- /** Extra person properties. */
- @JsonProperty
- public final String name;
-
- @JsonProperty
- private final String emailAddress;
-
- @JsonProperty
- private final String creditCard;
-
- @JsonProperty
- public final String city;
-
- @JsonProperty
- public final String state;
-
- @JsonProperty
- public final long dateTime;
-
- /** Additional arbitrary payload for performance testing. */
- @JsonProperty
- private final String extra;
-
- // For Avro only: no-arg constructor that leaves every string null and both longs at 0.
- @SuppressWarnings("unused")
- private Person() {
- id = 0;
- name = null;
- emailAddress = null;
- creditCard = null;
- city = null;
- state = null;
- dateTime = 0;
- extra = null;
- }
- // Stores the eight values as given, without validation.
- public Person(long id, String name, String emailAddress, String creditCard, String city,
- String state, long dateTime, String extra) {
- this.id = id;
- this.name = name;
- this.emailAddress = emailAddress;
- this.creditCard = creditCard;
- this.city = city;
- this.state = state;
- this.dateTime = dateTime;
- this.extra = extra;
- }
-
- /**
- * Returns a copy of this person with {@code annotation + ": "} prepended to {@code extra}.
- * (Used for debugging.)
- */
- public Person withAnnotation(String annotation) {
- return new Person(id, name, emailAddress, creditCard, city, state, dateTime,
- annotation + ": " + extra);
- }
-
- /**
- * Returns whether {@code extra} starts with {@code annotation + ": "}. (Used for debugging.)
- */
- public boolean hasAnnotation(String annotation) {
- return extra.startsWith(annotation + ": "); // Throws NullPointerException if extra is null.
- }
-
- /**
- * Strips a leading {@code annotation + ": "} from {@code extra}, if present. (Used for debugging.)
- */
- public Person withoutAnnotation(String annotation) {
- if (hasAnnotation(annotation)) {
- return new Person(id, name, emailAddress, creditCard, city, state, dateTime,
- extra.substring(annotation.length() + 2)); // + 2 skips the ": " separator.
- } else {
- return this; // No such prefix: the same instance is returned, not a copy.
- }
- }
- // Estimate only: string lengths are char counts, so a null string field throws NullPointerException.
- @Override
- public long sizeInBytes() {
- return 8 + name.length() + 1 + emailAddress.length() + 1 + creditCard.length() + 1
- + city.length() + 1 + state.length() + 8 + 1 + extra.length() + 1;
- }
-
- @Override
- public String toString() {
- try {
- return NexmarkUtils.MAPPER.writeValueAsString(this); // String form comes from the shared MAPPER.
- } catch (JsonProcessingException e) {
- throw new RuntimeException(e); // Rethrown unchecked with the original exception as cause.
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/SellerPrice.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/SellerPrice.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/SellerPrice.java
deleted file mode 100644
index 61537f6..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/SellerPrice.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark.model;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
-import org.apache.beam.sdk.coders.VarLongCoder;
-
-/**
- * Result of Query6: a serializable pair of a seller value and a price, both longs.
- */
-public class SellerPrice implements KnownSize, Serializable {
- private static final Coder<Long> LONG_CODER = VarLongCoder.of();
- // CODER writes seller then price and reads them back in the same order.
- public static final Coder<SellerPrice> CODER = new CustomCoder<SellerPrice>() {
- @Override
- public void encode(SellerPrice value, OutputStream outStream)
- throws CoderException, IOException {
- LONG_CODER.encode(value.seller, outStream);
- LONG_CODER.encode(value.price, outStream);
- }
- // decode must consume the two longs in exactly the order encode emitted them.
- @Override
- public SellerPrice decode(
- InputStream inStream)
- throws CoderException, IOException {
- long seller = LONG_CODER.decode(inStream);
- long price = LONG_CODER.decode(inStream);
- return new SellerPrice(seller, price);
- }
- @Override public void verifyDeterministic() throws NonDeterministicException {} // Empty body: never throws.
- };
-
- @JsonProperty
- public final long seller;
-
- /** Price in cents. */
- @JsonProperty
- private final long price;
-
- // For Avro only: no-arg constructor that sets seller and price to 0.
- @SuppressWarnings("unused")
- private SellerPrice() {
- seller = 0;
- price = 0;
- }
- // Stores both values as given, without validation.
- public SellerPrice(long seller, long price) {
- this.seller = seller;
- this.price = price;
- }
-
- @Override
- public long sizeInBytes() {
- return 8 + 8; // Constant 16 regardless of field values; presumably 8 per long field.
- }
-
- @Override
- public String toString() {
- try {
- return NexmarkUtils.MAPPER.writeValueAsString(this); // String form comes from the shared MAPPER.
- } catch (JsonProcessingException e) {
- throw new RuntimeException(e); // Rethrown unchecked with the original exception as cause.
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/package-info.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/package-info.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/package-info.java
deleted file mode 100644
index e1d6113..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Nexmark Benchmark Model.
- */
-package org.apache.beam.integration.nexmark.model;
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/package-info.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/package-info.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/package-info.java
deleted file mode 100644
index df6f09f..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Nexmark.
- */
-package org.apache.beam.integration.nexmark;