You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/23 17:09:19 UTC
[11/55] [abbrv] beam git commit: Refactor classes into packages
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/CategoryPrice.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/CategoryPrice.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/CategoryPrice.java
new file mode 100644
index 0000000..6512cc1
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/CategoryPrice.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.model;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+import org.apache.beam.integration.nexmark.NexmarkUtils;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+
+/**
+ * Result of Query4.
+ */
+public class CategoryPrice implements KnownSize, Serializable {
+ private static final Coder<Long> LONG_CODER = VarLongCoder.of();
+ private static final Coder<Integer> INT_CODER = VarIntCoder.of();
+
+ public static final Coder<CategoryPrice> CODER = new AtomicCoder<CategoryPrice>() {
+ @Override
+ public void encode(CategoryPrice value, OutputStream outStream,
+ Coder.Context context)
+ throws CoderException, IOException {
+ LONG_CODER.encode(value.category, outStream, Context.NESTED);
+ LONG_CODER.encode(value.price, outStream, Context.NESTED);
+ INT_CODER.encode(value.isLast ? 1 : 0, outStream, Context.NESTED);
+ }
+
+ @Override
+ public CategoryPrice decode(
+ InputStream inStream, Coder.Context context)
+ throws CoderException, IOException {
+ long category = LONG_CODER.decode(inStream, Context.NESTED);
+ long price = LONG_CODER.decode(inStream, Context.NESTED);
+ boolean isLast = INT_CODER.decode(inStream, context) != 0;
+ return new CategoryPrice(category, price, isLast);
+ }
+ };
+
+ @JsonProperty
+ public final long category;
+
+ /** Price in cents. */
+ @JsonProperty
+ public final long price;
+
+ @JsonProperty
+ public final boolean isLast;
+
+ // For Avro only.
+ @SuppressWarnings("unused")
+ private CategoryPrice() {
+ category = 0;
+ price = 0;
+ isLast = false;
+ }
+
+ public CategoryPrice(long category, long price, boolean isLast) {
+ this.category = category;
+ this.price = price;
+ this.isLast = isLast;
+ }
+
+ @Override
+ public long sizeInBytes() {
+ return 8 + 8 + 1;
+ }
+
+ @Override
+ public String toString() {
+ try {
+ return NexmarkUtils.MAPPER.writeValueAsString(this);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Done.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Done.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Done.java
new file mode 100644
index 0000000..6009463
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Done.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.model;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+import org.apache.beam.integration.nexmark.NexmarkUtils;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+
+/**
+ * Result of query 10.
+ */
+public class Done implements KnownSize, Serializable {
+  private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
+
+  /** Encodes a {@code Done} as just its message string, written in the nested context. */
+  public static final Coder<Done> CODER = new AtomicCoder<Done>() {
+    @Override
+    public void encode(Done value, OutputStream outStream, Coder.Context context)
+        throws CoderException, IOException {
+      STRING_CODER.encode(value.message, outStream, Context.NESTED);
+    }
+
+    @Override
+    public Done decode(InputStream inStream, Coder.Context context)
+        throws CoderException, IOException {
+      return new Done(STRING_CODER.decode(inStream, Context.NESTED));
+    }
+  };
+
+  /** May be {@code null}: the no-arg constructor leaves it unset. */
+  @JsonProperty
+  public final String message;
+
+  // For Avro only.
+  @SuppressWarnings("unused")
+  public Done() {
+    message = null;
+  }
+
+  public Done(String message) {
+    this.message = message;
+  }
+
+  /** Size estimate: the message's character count, or 0 when there is no message. */
+  @Override
+  public long sizeInBytes() {
+    return message == null ? 0 : message.length();
+  }
+
+  @Override
+  public String toString() {
+    try {
+      return NexmarkUtils.MAPPER.writeValueAsString(this);
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java
new file mode 100644
index 0000000..8a278bf
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.model;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import javax.annotation.Nullable;
+
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.VarIntCoder;
+
+/**
+ * An event in the auction system, either a (new) {@link Person}, a (new) {@link Auction},
+ * or a {@link Bid}.
+ */
+public class Event implements KnownSize, Serializable {
+ private static final Coder<Integer> INT_CODER = VarIntCoder.of();
+
+ public static final Coder<Event> CODER = new AtomicCoder<Event>() {
+ @Override
+ public void encode(Event value, OutputStream outStream, Coder.Context context)
+ throws CoderException, IOException {
+ if (value.newPerson != null) {
+ INT_CODER.encode(0, outStream, Context.NESTED);
+ Person.CODER.encode(value.newPerson, outStream, Context.NESTED);
+ } else if (value.newAuction != null) {
+ INT_CODER.encode(1, outStream, Context.NESTED);
+ Auction.CODER.encode(value.newAuction, outStream, Context.NESTED);
+ } else if (value.bid != null) {
+ INT_CODER.encode(2, outStream, Context.NESTED);
+ Bid.CODER.encode(value.bid, outStream, Context.NESTED);
+ } else {
+ throw new RuntimeException("invalid event");
+ }
+ }
+
+ @Override
+ public Event decode(
+ InputStream inStream, Coder.Context context)
+ throws CoderException, IOException {
+ int tag = INT_CODER.decode(inStream, context);
+ if (tag == 0) {
+ Person person = Person.CODER.decode(inStream, Context.NESTED);
+ return new Event(person);
+ } else if (tag == 1) {
+ Auction auction = Auction.CODER.decode(inStream, Context.NESTED);
+ return new Event(auction);
+ } else if (tag == 2) {
+ Bid bid = Bid.CODER.decode(inStream, Context.NESTED);
+ return new Event(bid);
+ } else {
+ throw new RuntimeException("invalid event encoding");
+ }
+ }
+ };
+
+ @Nullable
+ @org.apache.avro.reflect.Nullable
+ public final Person newPerson;
+
+ @Nullable
+ @org.apache.avro.reflect.Nullable
+ public final Auction newAuction;
+
+ @Nullable
+ @org.apache.avro.reflect.Nullable
+ public final Bid bid;
+
+ // For Avro only.
+ @SuppressWarnings("unused")
+ private Event() {
+ newPerson = null;
+ newAuction = null;
+ bid = null;
+ }
+
+ public Event(Person newPerson) {
+ this.newPerson = newPerson;
+ newAuction = null;
+ bid = null;
+ }
+
+ public Event(Auction newAuction) {
+ newPerson = null;
+ this.newAuction = newAuction;
+ bid = null;
+ }
+
+ public Event(Bid bid) {
+ newPerson = null;
+ newAuction = null;
+ this.bid = bid;
+ }
+
+ /**
+ * Return a copy of event which captures {@code annotation}.
+ * (Used for debugging).
+ */
+ public Event withAnnotation(String annotation) {
+ if (newPerson != null) {
+ return new Event(newPerson.withAnnotation(annotation));
+ } else if (newAuction != null) {
+ return new Event(newAuction.withAnnotation(annotation));
+ } else {
+ return new Event(bid.withAnnotation(annotation));
+ }
+ }
+
+ /**
+ * Does event have {@code annotation}? (Used for debugging.)
+ */
+ public boolean hasAnnotation(String annotation) {
+ if (newPerson != null) {
+ return newPerson.hasAnnotation(annotation);
+ } else if (newAuction != null) {
+ return newAuction.hasAnnotation(annotation);
+ } else {
+ return bid.hasAnnotation(annotation);
+ }
+ }
+
+ /**
+ * Remove {@code annotation} from event. (Used for debugging.)
+ */
+ public Event withoutAnnotation(String annotation) {
+ if (newPerson != null) {
+ return new Event(newPerson.withoutAnnotation(annotation));
+ } else if (newAuction != null) {
+ return new Event(newAuction.withoutAnnotation(annotation));
+ } else {
+ return new Event(bid.withoutAnnotation(annotation));
+ }
+ }
+
+ @Override
+ public long sizeInBytes() {
+ if (newPerson != null) {
+ return 1 + newPerson.sizeInBytes();
+ } else if (newAuction != null) {
+ return 1 + newAuction.sizeInBytes();
+ } else if (bid != null) {
+ return 1 + bid.sizeInBytes();
+ } else {
+ throw new RuntimeException("invalid event");
+ }
+ }
+
+ @Override
+ public String toString() {
+ if (newPerson != null) {
+ return newPerson.toString();
+ } else if (newAuction != null) {
+ return newAuction.toString();
+ } else if (bid != null) {
+ return bid.toString();
+ } else {
+ throw new RuntimeException("invalid event");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/IdNameReserve.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/IdNameReserve.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/IdNameReserve.java
new file mode 100644
index 0000000..5d22651
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/IdNameReserve.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.model;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+import org.apache.beam.integration.nexmark.NexmarkUtils;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+
+/**
+ * Result type of Query8.
+ */
+public class IdNameReserve implements KnownSize, Serializable {
+  private static final Coder<Long> LONG_CODER = VarLongCoder.of();
+  private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
+
+  /** Serializes the fields in declaration order (id, name, reserve), all nested. */
+  public static final Coder<IdNameReserve> CODER = new AtomicCoder<IdNameReserve>() {
+    @Override
+    public void encode(IdNameReserve value, OutputStream outStream, Coder.Context context)
+        throws CoderException, IOException {
+      LONG_CODER.encode(value.id, outStream, Context.NESTED);
+      STRING_CODER.encode(value.name, outStream, Context.NESTED);
+      LONG_CODER.encode(value.reserve, outStream, Context.NESTED);
+    }
+
+    @Override
+    public IdNameReserve decode(InputStream inStream, Coder.Context context)
+        throws CoderException, IOException {
+      return new IdNameReserve(
+          LONG_CODER.decode(inStream, Context.NESTED),
+          STRING_CODER.decode(inStream, Context.NESTED),
+          LONG_CODER.decode(inStream, Context.NESTED));
+    }
+  };
+
+  @JsonProperty
+  public final long id;
+
+  @JsonProperty
+  public final String name;
+
+  /** Reserve price in cents. */
+  @JsonProperty
+  public final long reserve;
+
+  // For Avro only.
+  @SuppressWarnings("unused")
+  private IdNameReserve() {
+    this(0, null, 0);
+  }
+
+  /** Creates a result holding exactly the three given field values. */
+  public IdNameReserve(long id, String name, long reserve) {
+    this.id = id;
+    this.name = name;
+    this.reserve = reserve;
+  }
+
+  /** Size estimate: 8 for each long, plus the name's character count and one more byte. */
+  @Override
+  public long sizeInBytes() {
+    return (8 + 8) + (name.length() + 1);
+  }
+
+  /** Serializes this object with {@link NexmarkUtils#MAPPER}, rethrowing failures unchecked. */
+  @Override
+  public String toString() {
+    try {
+      return NexmarkUtils.MAPPER.writeValueAsString(this);
+    } catch (JsonProcessingException jsonFailure) {
+      throw new RuntimeException(jsonFailure);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/KnownSize.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/KnownSize.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/KnownSize.java
new file mode 100644
index 0000000..c742eac
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/KnownSize.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.model;
+
+/** Interface for elements which can quickly estimate their encoded byte size. */
+public interface KnownSize {
+
+ /** Returns this element's estimated encoded size, in bytes. */
+ long sizeInBytes();
+}
+
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/NameCityStateId.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/NameCityStateId.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/NameCityStateId.java
new file mode 100644
index 0000000..ac22879
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/NameCityStateId.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.model;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+import org.apache.beam.integration.nexmark.NexmarkUtils;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+
+/**
+ * Result of Query3.
+ */
+public class NameCityStateId implements KnownSize, Serializable {
+  private static final Coder<Long> LONG_CODER = VarLongCoder.of();
+  private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
+
+  /** Serializes the fields in declaration order (name, city, state, id), all nested. */
+  public static final Coder<NameCityStateId> CODER = new AtomicCoder<NameCityStateId>() {
+    @Override
+    public void encode(NameCityStateId value, OutputStream outStream, Coder.Context context)
+        throws CoderException, IOException {
+      STRING_CODER.encode(value.name, outStream, Context.NESTED);
+      STRING_CODER.encode(value.city, outStream, Context.NESTED);
+      STRING_CODER.encode(value.state, outStream, Context.NESTED);
+      LONG_CODER.encode(value.id, outStream, Context.NESTED);
+    }
+
+    @Override
+    public NameCityStateId decode(InputStream inStream, Coder.Context context)
+        throws CoderException, IOException {
+      return new NameCityStateId(
+          STRING_CODER.decode(inStream, Context.NESTED),
+          STRING_CODER.decode(inStream, Context.NESTED),
+          STRING_CODER.decode(inStream, Context.NESTED),
+          LONG_CODER.decode(inStream, Context.NESTED));
+    }
+  };
+
+  @JsonProperty
+  public final String name;
+
+  @JsonProperty
+  public final String city;
+
+  @JsonProperty
+  public final String state;
+
+  @JsonProperty
+  public final long id;
+
+  // For Avro only.
+  @SuppressWarnings("unused")
+  private NameCityStateId() {
+    this(null, null, null, 0);
+  }
+
+  /** Creates a result holding exactly the four given field values. */
+  public NameCityStateId(String name, String city, String state, long id) {
+    this.name = name;
+    this.city = city;
+    this.state = state;
+    this.id = id;
+  }
+
+  /** Size estimate: each string's character count plus one byte, and 8 for the id. */
+  @Override
+  public long sizeInBytes() {
+    int textChars = name.length() + city.length() + state.length();
+    return textChars + 3 + 8;
+  }
+
+  /** Serializes this object with {@link NexmarkUtils#MAPPER}, rethrowing failures unchecked. */
+  @Override
+  public String toString() {
+    try {
+      return NexmarkUtils.MAPPER.writeValueAsString(this);
+    } catch (JsonProcessingException jsonFailure) {
+      throw new RuntimeException(jsonFailure);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Person.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Person.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Person.java
new file mode 100644
index 0000000..85c7183
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Person.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.model;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+import org.apache.beam.integration.nexmark.NexmarkUtils;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+
+/**
+ * A person either creating an auction or making a bid.
+ */
+public class Person implements KnownSize, Serializable {
+ private static final Coder<Long> LONG_CODER = VarLongCoder.of();
+ private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
+ public static final Coder<Person> CODER = new AtomicCoder<Person>() {
+ @Override
+ public void encode(Person value, OutputStream outStream,
+ Coder.Context context)
+ throws CoderException, IOException {
+ LONG_CODER.encode(value.id, outStream, Context.NESTED);
+ STRING_CODER.encode(value.name, outStream, Context.NESTED);
+ STRING_CODER.encode(value.emailAddress, outStream, Context.NESTED);
+ STRING_CODER.encode(value.creditCard, outStream, Context.NESTED);
+ STRING_CODER.encode(value.city, outStream, Context.NESTED);
+ STRING_CODER.encode(value.state, outStream, Context.NESTED);
+ LONG_CODER.encode(value.dateTime, outStream, Context.NESTED);
+ STRING_CODER.encode(value.extra, outStream, Context.NESTED);
+ }
+
+ @Override
+ public Person decode(
+ InputStream inStream, Coder.Context context)
+ throws CoderException, IOException {
+ long id = LONG_CODER.decode(inStream, Context.NESTED);
+ String name = STRING_CODER.decode(inStream, Context.NESTED);
+ String emailAddress = STRING_CODER.decode(inStream, Context.NESTED);
+ String creditCard = STRING_CODER.decode(inStream, Context.NESTED);
+ String city = STRING_CODER.decode(inStream, Context.NESTED);
+ String state = STRING_CODER.decode(inStream, Context.NESTED);
+ long dateTime = LONG_CODER.decode(inStream, Context.NESTED);
+ String extra = STRING_CODER.decode(inStream, Context.NESTED);
+ return new Person(id, name, emailAddress, creditCard, city, state, dateTime, extra);
+ }
+ };
+
+ /** Id of person. */
+ @JsonProperty
+ public final long id; // primary key
+
+ /** Extra person properties. */
+ @JsonProperty
+ public final String name;
+
+ @JsonProperty
+ public final String emailAddress;
+
+ @JsonProperty
+ public final String creditCard;
+
+ @JsonProperty
+ public final String city;
+
+ @JsonProperty
+ public final String state;
+
+ @JsonProperty
+ public final long dateTime;
+
+ /** Additional arbitrary payload for performance testing. */
+ @JsonProperty
+ public final String extra;
+
+ // For Avro only.
+ @SuppressWarnings("unused")
+ private Person() {
+ id = 0;
+ name = null;
+ emailAddress = null;
+ creditCard = null;
+ city = null;
+ state = null;
+ dateTime = 0;
+ extra = null;
+ }
+
+ public Person(long id, String name, String emailAddress, String creditCard, String city,
+ String state, long dateTime, String extra) {
+ this.id = id;
+ this.name = name;
+ this.emailAddress = emailAddress;
+ this.creditCard = creditCard;
+ this.city = city;
+ this.state = state;
+ this.dateTime = dateTime;
+ this.extra = extra;
+ }
+
+ /**
+ * Return a copy of person which capture the given annotation.
+ * (Used for debugging).
+ */
+ public Person withAnnotation(String annotation) {
+ return new Person(id, name, emailAddress, creditCard, city, state, dateTime,
+ annotation + ": " + extra);
+ }
+
+ /**
+ * Does person have {@code annotation}? (Used for debugging.)
+ */
+ public boolean hasAnnotation(String annotation) {
+ return extra.startsWith(annotation + ": ");
+ }
+
+ /**
+ * Remove {@code annotation} from person. (Used for debugging.)
+ */
+ public Person withoutAnnotation(String annotation) {
+ if (hasAnnotation(annotation)) {
+ return new Person(id, name, emailAddress, creditCard, city, state, dateTime,
+ extra.substring(annotation.length() + 2));
+ } else {
+ return this;
+ }
+ }
+
+ @Override
+ public long sizeInBytes() {
+ return 8 + name.length() + 1 + emailAddress.length() + 1 + creditCard.length() + 1
+ + city.length() + 1 + state.length() + 8 + 1 + extra.length() + 1;
+ }
+
+ @Override
+ public String toString() {
+ try {
+ return NexmarkUtils.MAPPER.writeValueAsString(this);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/SellerPrice.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/SellerPrice.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/SellerPrice.java
new file mode 100644
index 0000000..b7c2b14
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/SellerPrice.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.model;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+import org.apache.beam.integration.nexmark.NexmarkUtils;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.VarLongCoder;
+
+/**
+ * Result of Query6.
+ */
+public class SellerPrice implements KnownSize, Serializable {
+ private static final Coder<Long> LONG_CODER = VarLongCoder.of();
+
+ public static final Coder<SellerPrice> CODER = new AtomicCoder<SellerPrice>() {
+ @Override
+ public void encode(SellerPrice value, OutputStream outStream,
+ Coder.Context context)
+ throws CoderException, IOException {
+ LONG_CODER.encode(value.seller, outStream, Context.NESTED);
+ LONG_CODER.encode(value.price, outStream, Context.NESTED);
+ }
+
+ @Override
+ public SellerPrice decode(
+ InputStream inStream, Coder.Context context)
+ throws CoderException, IOException {
+ long seller = LONG_CODER.decode(inStream, Context.NESTED);
+ long price = LONG_CODER.decode(inStream, Context.NESTED);
+ return new SellerPrice(seller, price);
+ }
+ };
+
+ @JsonProperty
+ public final long seller;
+
+ /** Price in cents. */
+ @JsonProperty
+ public final long price;
+
+ // For Avro only.
+ @SuppressWarnings("unused")
+ private SellerPrice() {
+ seller = 0;
+ price = 0;
+ }
+
+ public SellerPrice(long seller, long price) {
+ this.seller = seller;
+ this.price = price;
+ }
+
+ @Override
+ public long sizeInBytes() {
+ return 8 + 8;
+ }
+
+ @Override
+ public String toString() {
+ try {
+ return NexmarkUtils.MAPPER.writeValueAsString(this);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/package-info.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/package-info.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/package-info.java
new file mode 100644
index 0000000..e1d6113
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Nexmark Benchmark Model.
+ */
+package org.apache.beam.integration.nexmark.model;
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/package-info.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/package-info.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/package-info.java
index 65bf7d4..df6f09f 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/package-info.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/package-info.java
@@ -16,6 +16,6 @@
* limitations under the License.
*/
/**
- * Nexmark Benchmark Integration Queries.
+ * Nexmark.
*/
package org.apache.beam.integration.nexmark;
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0.java
new file mode 100644
index 0000000..f60d5de
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.queries;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import org.apache.beam.integration.nexmark.NexmarkConfiguration;
+import org.apache.beam.integration.nexmark.NexmarkQuery;
+import org.apache.beam.integration.nexmark.NexmarkUtils;
+import org.apache.beam.integration.nexmark.model.Event;
+import org.apache.beam.integration.nexmark.model.KnownSize;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * Query 0: Pass events through unchanged. However, force them to do a round trip through
+ * serialization so that we measure the impact of the choice of coders.
+ */
+public class Query0 extends NexmarkQuery {
+  public Query0(NexmarkConfiguration configuration) {
+    super(configuration, "Query0");
+  }
+
+  /**
+   * Encodes each event with the input collection's coder, adds the encoded length to the
+   * {@code "bytes"} aggregator, then decodes the bytes again and emits the decoded event.
+   */
+  private PCollection<Event> applyTyped(PCollection<Event> events) {
+    final Coder<Event> coder = events.getCoder();
+
+    DoFn<Event, Event> roundTripFn = new DoFn<Event, Event>() {
+      private final Aggregator<Long, Long> bytes = createAggregator("bytes", Sum.ofLongs());
+
+      @ProcessElement
+      public void processElement(ProcessContext c) throws CoderException, IOException {
+        ByteArrayOutputStream encoded = new ByteArrayOutputStream();
+        coder.encode(c.element(), encoded, Coder.Context.OUTER);
+        byte[] payload = encoded.toByteArray();
+        bytes.addValue((long) payload.length);
+        c.output(coder.decode(new ByteArrayInputStream(payload), Coder.Context.OUTER));
+      }
+    };
+
+    // Force round trip through coder.
+    return events.apply(name + ".Serialize", ParDo.of(roundTripFn));
+  }
+
+  @Override
+  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
+    PCollection<Event> roundTripped = applyTyped(events);
+    return NexmarkUtils.castToKnownSize(name, roundTripped);
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0Model.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0Model.java
new file mode 100644
index 0000000..991b1d4
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0Model.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.queries;
+
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.apache.beam.integration.nexmark.AbstractSimulator;
+import org.apache.beam.integration.nexmark.NexmarkConfiguration;
+import org.apache.beam.integration.nexmark.NexmarkQueryModel;
+import org.apache.beam.integration.nexmark.NexmarkUtils;
+import org.apache.beam.integration.nexmark.model.Event;
+import org.apache.beam.sdk.values.TimestampedValue;
+
+/**
+ * A direct implementation of {@link Query0}.
+ */
+public class Query0Model extends NexmarkQueryModel {
+  /**
+   * Simulator for query 0: every input event is added to the results unchanged.
+   */
+  private class Simulator extends AbstractSimulator<Event, Event> {
+    public Simulator(NexmarkConfiguration configuration) {
+      super(NexmarkUtils.standardEventIterator(configuration));
+    }
+
+    /** Handles one input: adds it to the results, or signals completion when none is left. */
+    @Override
+    protected void run() {
+      TimestampedValue<Event> next = nextInput();
+      if (next != null) {
+        addResult(next);
+        //TODO test fails because offset of some hundreds of ms between expect and actual
+      } else {
+        allDone();
+      }
+    }
+  }
+
+  public Query0Model(NexmarkConfiguration configuration) {
+    super(configuration);
+  }
+
+  /** Creates a fresh {@link Simulator} over this model's configuration. */
+  @Override
+  protected AbstractSimulator<?, ?> simulator() {
+    return new Simulator(configuration);
+  }
+
+  /** Delegates to {@code toValueTimestampOrder} to turn the results into strings. */
+  @Override
+  protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) {
+    return toValueTimestampOrder(itr);
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1.java
new file mode 100644
index 0000000..0be77ce
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.queries;
+
+import org.apache.beam.integration.nexmark.NexmarkConfiguration;
+import org.apache.beam.integration.nexmark.NexmarkQuery;
+import org.apache.beam.integration.nexmark.NexmarkUtils;
+import org.apache.beam.integration.nexmark.model.Bid;
+import org.apache.beam.integration.nexmark.model.Event;
+import org.apache.beam.integration.nexmark.model.KnownSize;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * Query 1, 'Currency Conversion'. Convert each bid value from dollars to euros.
+ * In CQL syntax:
+ *
+ * <pre>
+ * SELECT Istream(auction, DOLTOEUR(price), bidder, datetime)
+ * FROM bid [ROWS UNBOUNDED];
+ * </pre>
+ *
+ * <p>To make things more interesting, allow the 'currency conversion' to be arbitrarily
+ * slowed down.
+ */
+public class Query1 extends NexmarkQuery {
+  public Query1(NexmarkConfiguration configuration) {
+    super(configuration, "Query1");
+  }
+
+  /**
+   * Keeps only the bid events and re-emits each one with its price multiplied by 89 and then
+   * divided by 100; all other bid fields are copied unchanged.
+   */
+  private PCollection<Bid> applyTyped(PCollection<Event> events) {
+    // Map the conversion function over all bids.
+    DoFn<Bid, Bid> toEuros = new DoFn<Bid, Bid>() {
+      @ProcessElement
+      public void processElement(ProcessContext c) {
+        Bid original = c.element();
+        Bid converted = new Bid(
+            original.auction,
+            original.bidder,
+            (original.price * 89) / 100,
+            original.dateTime,
+            original.extra);
+        c.output(converted);
+      }
+    };
+
+    // Only want the bid events.
+    PCollection<Bid> bids = events.apply(JUST_BIDS);
+    return bids.apply(name + ".ToEuros", ParDo.of(toEuros));
+  }
+
+  @Override
+  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
+    PCollection<Bid> convertedBids = applyTyped(events);
+    return NexmarkUtils.castToKnownSize(name, convertedBids);
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java
new file mode 100644
index 0000000..6912ed1
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.queries;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.cloud.hadoop.gcsio.GoogleCloudStorageWriteChannel;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.util.concurrent.ThreadLocalRandom;
+import javax.annotation.Nullable;
+import org.apache.beam.integration.nexmark.NexmarkConfiguration;
+import org.apache.beam.integration.nexmark.NexmarkQuery;
+import org.apache.beam.integration.nexmark.NexmarkUtils;
+import org.apache.beam.integration.nexmark.model.Done;
+import org.apache.beam.integration.nexmark.model.Event;
+import org.apache.beam.integration.nexmark.model.KnownSize;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.GcsOptions;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.windowing.AfterEach;
+import org.apache.beam.sdk.transforms.windowing.AfterFirst;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.GcsIOChannelFactory;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Query "10", 'Log to sharded files' (Not in original suite.)
+ *
+ * <p>Every windowSizeSec, save all events from the last period into log files, sharded
+ * {@code maxNumWorkers * NUM_SHARDS_PER_WORKER} ways, and then write one index file per
+ * window listing the log files produced for it.
+ */
+public class Query10 extends NexmarkQuery {
+  private static final Logger LOG = LoggerFactory.getLogger(Query10.class);
+  private static final int CHANNEL_BUFFER = 8 << 20; // 8MB
+  private static final int NUM_SHARDS_PER_WORKER = 5;
+  private static final Duration LATE_BATCHING_PERIOD = Duration.standardSeconds(10);
+
+  /**
+   * Capture everything we need to know about the records in a single output file.
+   */
+  private static class OutputFile implements Serializable {
+    /** Maximum possible timestamp of records in file. */
+    private final Instant maxTimestamp;
+    /** Shard within window. */
+    private final String shard;
+    /** Index of file in all files in shard. */
+    private final long index;
+    /** Timing of records in this file. */
+    private final PaneInfo.Timing timing;
+    /** Path to file containing records, or {@literal null} if no output required. */
+    @Nullable
+    private final String filename;
+
+    public OutputFile(
+        Instant maxTimestamp,
+        String shard,
+        long index,
+        PaneInfo.Timing timing,
+        @Nullable String filename) {
+      this.maxTimestamp = maxTimestamp;
+      this.shard = shard;
+      this.index = index;
+      this.timing = timing;
+      this.filename = filename;
+    }
+
+    /** Returns the newline-terminated line written to the index file for this output file. */
+    @Override
+    public String toString() {
+      return String.format("%s %s %d %s %s\n", maxTimestamp, shard, index, timing, filename);
+    }
+  }
+
+  /**
+   * GCS uri prefix for all log and 'finished' files. If null they won't be written.
+   */
+  @Nullable
+  private String outputPath;
+
+  /**
+   * Maximum number of workers, used to determine log sharding factor.
+   */
+  private int maxNumWorkers;
+
+  public Query10(NexmarkConfiguration configuration) {
+    super(configuration, "Query10");
+  }
+
+  public void setOutputPath(@Nullable String outputPath) {
+    this.outputPath = outputPath;
+  }
+
+  public void setMaxNumWorkers(int maxNumWorkers) {
+    this.maxNumWorkers = maxNumWorkers;
+  }
+
+  /**
+   * Return channel for writing bytes to GCS.
+   *
+   * @param options options used to obtain the GCS channel factory
+   * @param filename path of the file to create
+   * @throws IOException if the channel cannot be created
+   */
+  private WritableByteChannel openWritableGcsFile(GcsOptions options, String filename)
+      throws IOException {
+    WritableByteChannel channel =
+        GcsIOChannelFactory.fromOptions(options).create(filename, "text/plain");
+    checkState(channel instanceof GoogleCloudStorageWriteChannel);
+    ((GoogleCloudStorageWriteChannel) channel).setUploadBufferSize(CHANNEL_BUFFER);
+    return channel;
+  }
+
+  /**
+   * Return a short string to describe {@code timing}.
+   *
+   * @throws RuntimeException if {@code timing} is not EARLY, ON_TIME or LATE
+   */
+  private String timingToString(PaneInfo.Timing timing) {
+    switch (timing) {
+      case EARLY:
+        return "E";
+      case ON_TIME:
+        return "O";
+      case LATE:
+        return "L";
+    }
+    // Name the offending value so the failure can be diagnosed from the message alone.
+    throw new RuntimeException("Unexpected pane timing: " + timing);
+  }
+
+  /** Construct an {@link OutputFile} for {@code pane} in {@code window} for {@code shard}. */
+  private OutputFile outputFileFor(BoundedWindow window, String shard, PaneInfo pane) {
+    // The random suffix keeps file names distinct even if the same pane is written twice.
+    @Nullable String filename =
+        outputPath == null
+            ? null
+            : String.format("%s/LOG-%s-%s-%03d-%s-%x",
+                outputPath, window.maxTimestamp(), shard, pane.getIndex(),
+                timingToString(pane.getTiming()),
+                ThreadLocalRandom.current().nextLong());
+    return new OutputFile(window.maxTimestamp(), shard, pane.getIndex(),
+        pane.getTiming(), filename);
+  }
+
+  /**
+   * Return path to which we should write the index for {@code window}, or {@literal null}
+   * if no output required.
+   */
+  @Nullable
+  private String indexPathFor(BoundedWindow window) {
+    if (outputPath == null) {
+      return null;
+    }
+    return String.format("%s/INDEX-%s", outputPath, window.maxTimestamp());
+  }
+
+  /**
+   * Builds the pipeline: key each event by a shard name, window and group the events per
+   * shard, sanity-check pane timings against the events' "LATE" annotation, write each pane
+   * to its own log file, then regroup the resulting {@link OutputFile}s per window and write
+   * one index file per window.
+   *
+   * <p>Log messages use SLF4J {@code {}} placeholders; printf-style {@code %s} is not
+   * interpolated by SLF4J.
+   */
+  private PCollection<Done> applyTyped(PCollection<Event> events) {
+    final int numLogShards = maxNumWorkers * NUM_SHARDS_PER_WORKER;
+
+    return events.apply(name + ".ShardEvents",
+        ParDo.of(new DoFn<Event, KV<String, Event>>() {
+          final Aggregator<Long, Long> lateCounter =
+              createAggregator("actuallyLateEvent", Sum.ofLongs());
+          final Aggregator<Long, Long> onTimeCounter =
+              createAggregator("actuallyOnTimeEvent", Sum.ofLongs());
+
+          @ProcessElement
+          public void processElement(ProcessContext c) {
+            if (c.element().hasAnnotation("LATE")) {
+              lateCounter.addValue(1L);
+              LOG.error("Observed late: {}", c.element());
+            } else {
+              onTimeCounter.addValue(1L);
+            }
+            // Widen to long before taking the remainder and absolute value so that a
+            // hash code of Integer.MIN_VALUE cannot produce a negative shard number.
+            int shardNum = (int) Math.abs((long) c.element().hashCode() % numLogShards);
+            String shard = String.format("shard-%05d-of-%05d", shardNum, numLogShards);
+            c.output(KV.of(shard, c.element()));
+          }
+        }))
+        .apply(name + ".WindowEvents",
+            Window.<KV<String, Event>>into(
+                FixedWindows.of(Duration.standardSeconds(configuration.windowSizeSec)))
+                .triggering(AfterEach.inOrder(
+                    Repeatedly
+                        .forever(AfterPane.elementCountAtLeast(configuration.maxLogEvents))
+                        .orFinally(AfterWatermark.pastEndOfWindow()),
+                    Repeatedly.forever(
+                        AfterFirst.of(
+                            AfterPane.elementCountAtLeast(configuration.maxLogEvents),
+                            AfterProcessingTime.pastFirstElementInPane()
+                                .plusDelayOf(LATE_BATCHING_PERIOD)))))
+                .discardingFiredPanes()
+                // Use a 1 day allowed lateness so that any forgotten hold will stall the
+                // pipeline for that period and be very noticeable.
+                .withAllowedLateness(Duration.standardDays(1)))
+        .apply(name + ".GroupByKey", GroupByKey.<String, Event>create())
+        .apply(name + ".CheckForLateEvents",
+            ParDo.of(new DoFn<KV<String, Iterable<Event>>,
+                KV<String, Iterable<Event>>>() {
+              final Aggregator<Long, Long> earlyCounter =
+                  createAggregator("earlyShard", Sum.ofLongs());
+              final Aggregator<Long, Long> onTimeCounter =
+                  createAggregator("onTimeShard", Sum.ofLongs());
+              final Aggregator<Long, Long> lateCounter =
+                  createAggregator("lateShard", Sum.ofLongs());
+              final Aggregator<Long, Long> unexpectedLatePaneCounter =
+                  createAggregator("ERROR_unexpectedLatePane", Sum.ofLongs());
+              final Aggregator<Long, Long> unexpectedOnTimeElementCounter =
+                  createAggregator("ERROR_unexpectedOnTimeElement", Sum.ofLongs());
+
+              @ProcessElement
+              public void processElement(ProcessContext c, BoundedWindow window) {
+                int numLate = 0;
+                int numOnTime = 0;
+                for (Event event : c.element().getValue()) {
+                  if (event.hasAnnotation("LATE")) {
+                    numLate++;
+                  } else {
+                    numOnTime++;
+                  }
+                }
+                String shard = c.element().getKey();
+                LOG.error(
+                    "{} with timestamp {} has {} actually late and {} on-time "
+                        + "elements in pane {} for window {}",
+                    shard, c.timestamp(), numLate, numOnTime, c.pane(),
+                    window.maxTimestamp());
+                if (c.pane().getTiming() == PaneInfo.Timing.LATE) {
+                  if (numLate == 0) {
+                    LOG.error(
+                        "ERROR! No late events in late pane for {}", shard);
+                    unexpectedLatePaneCounter.addValue(1L);
+                  }
+                  if (numOnTime > 0) {
+                    LOG.error(
+                        "ERROR! Have {} on-time events in late pane for {}",
+                        numOnTime, shard);
+                    unexpectedOnTimeElementCounter.addValue(1L);
+                  }
+                  lateCounter.addValue(1L);
+                } else if (c.pane().getTiming() == PaneInfo.Timing.EARLY) {
+                  if (numOnTime + numLate < configuration.maxLogEvents) {
+                    LOG.error(
+                        "ERROR! Only have {} events in early pane for {}",
+                        numOnTime + numLate, shard);
+                  }
+                  earlyCounter.addValue(1L);
+                } else {
+                  onTimeCounter.addValue(1L);
+                }
+                c.output(c.element());
+              }
+            }))
+        .apply(name + ".UploadEvents",
+            ParDo.of(new DoFn<KV<String, Iterable<Event>>,
+                KV<Void, OutputFile>>() {
+              final Aggregator<Long, Long> savedFileCounter =
+                  createAggregator("savedFile", Sum.ofLongs());
+              final Aggregator<Long, Long> writtenRecordsCounter =
+                  createAggregator("writtenRecords", Sum.ofLongs());
+
+              @ProcessElement
+              public void processElement(ProcessContext c, BoundedWindow window)
+                  throws IOException {
+                String shard = c.element().getKey();
+                GcsOptions options = c.getPipelineOptions().as(GcsOptions.class);
+                OutputFile outputFile = outputFileFor(window, shard, c.pane());
+                LOG.error(
+                    "Writing {} with record timestamp {}, window timestamp {}, pane {}",
+                    shard, c.timestamp(), window.maxTimestamp(), c.pane());
+                if (outputFile.filename != null) {
+                  LOG.error("Beginning write to '{}'", outputFile.filename);
+                  int n = 0;
+                  try (OutputStream output =
+                           Channels.newOutputStream(
+                               openWritableGcsFile(options, outputFile.filename))) {
+                    for (Event event : c.element().getValue()) {
+                      Event.CODER.encode(event, output, Coder.Context.OUTER);
+                      writtenRecordsCounter.addValue(1L);
+                      if (++n % 10000 == 0) {
+                        LOG.error("So far written {} records to '{}'", n,
+                            outputFile.filename);
+                      }
+                    }
+                  }
+                  LOG.error("Written all {} records to '{}'", n, outputFile.filename);
+                }
+                savedFileCounter.addValue(1L);
+                c.output(KV.<Void, OutputFile>of(null, outputFile));
+              }
+            }))
+        // Clear fancy triggering from above.
+        .apply(name + ".WindowLogFiles", Window.<KV<Void, OutputFile>>into(
+            FixedWindows.of(Duration.standardSeconds(configuration.windowSizeSec)))
+            .triggering(AfterWatermark.pastEndOfWindow())
+            // We expect no late data here, but we'll assume the worst so we can detect any.
+            .withAllowedLateness(Duration.standardDays(1))
+            .discardingFiredPanes())
+        // TODO etienne: unnecessary groupByKey? because aggregators are shared in parallel
+        // and Pardo is also in parallel, why group all elements in memory of the same executor?
+        .apply(name + ".GroupByKey2", GroupByKey.<Void, OutputFile>create())
+        .apply(name + ".Index",
+            ParDo.of(new DoFn<KV<Void, Iterable<OutputFile>>, Done>() {
+              final Aggregator<Long, Long> unexpectedLateCounter =
+                  createAggregator("ERROR_unexpectedLate", Sum.ofLongs());
+              final Aggregator<Long, Long> unexpectedEarlyCounter =
+                  createAggregator("ERROR_unexpectedEarly", Sum.ofLongs());
+              final Aggregator<Long, Long> unexpectedIndexCounter =
+                  createAggregator("ERROR_unexpectedIndex", Sum.ofLongs());
+              final Aggregator<Long, Long> finalizedCounter =
+                  createAggregator("indexed", Sum.ofLongs());
+
+              @ProcessElement
+              public void processElement(ProcessContext c, BoundedWindow window)
+                  throws IOException {
+                if (c.pane().getTiming() == Timing.LATE) {
+                  unexpectedLateCounter.addValue(1L);
+                  LOG.error("ERROR! Unexpected LATE pane: {}", c.pane());
+                } else if (c.pane().getTiming() == Timing.EARLY) {
+                  unexpectedEarlyCounter.addValue(1L);
+                  LOG.error("ERROR! Unexpected EARLY pane: {}", c.pane());
+                } else if (c.pane().getTiming() == Timing.ON_TIME
+                    && c.pane().getIndex() != 0) {
+                  unexpectedIndexCounter.addValue(1L);
+                  LOG.error("ERROR! Unexpected ON_TIME pane index: {}", c.pane());
+                } else {
+                  GcsOptions options = c.getPipelineOptions().as(GcsOptions.class);
+                  LOG.error(
+                      "Index with record timestamp {}, window timestamp {}, pane {}",
+                      c.timestamp(), window.maxTimestamp(), c.pane());
+
+                  @Nullable String filename = indexPathFor(window);
+                  if (filename != null) {
+                    LOG.error("Beginning write to '{}'", filename);
+                    int n = 0;
+                    try (OutputStream output =
+                             Channels.newOutputStream(
+                                 openWritableGcsFile(options, filename))) {
+                      for (OutputFile outputFile : c.element().getValue()) {
+                        // Fix the charset explicitly so the index bytes do not depend
+                        // on the platform default encoding of the worker.
+                        output.write(outputFile.toString()
+                            .getBytes(java.nio.charset.StandardCharsets.UTF_8));
+                        n++;
+                      }
+                    }
+                    LOG.error("Written all {} lines to '{}'", n, filename);
+                  }
+                  c.output(
+                      new Done("written for timestamp " + window.maxTimestamp()));
+                  finalizedCounter.addValue(1L);
+                }
+              }
+            }));
+  }
+
+  @Override
+  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
+    return NexmarkUtils.castToKnownSize(name, applyTyped(events));
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query11.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query11.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query11.java
new file mode 100644
index 0000000..4da99eb
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query11.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.queries;
+
+import org.apache.beam.integration.nexmark.NexmarkConfiguration;
+import org.apache.beam.integration.nexmark.NexmarkQuery;
+import org.apache.beam.integration.nexmark.NexmarkUtils;
+import org.apache.beam.integration.nexmark.model.Bid;
+import org.apache.beam.integration.nexmark.model.BidsPerSession;
+import org.apache.beam.integration.nexmark.model.Event;
+import org.apache.beam.integration.nexmark.model.KnownSize;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Sessions;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Duration;
+
+/**
+ * Query "11", 'User sessions' (Not in original suite.)
+ *
+ * <p>Group bids by the same user into sessions with {@code windowSizeSec} max gap.
+ * However limit the session to at most {@code maxLogEvents}. Emit the number of
+ * bids per session.
+ */
+public class Query11 extends NexmarkQuery {
+  public Query11(NexmarkConfiguration configuration) {
+    super(configuration, "Query11");
+  }
+
+  /**
+   * Keys each bid by its bidder, places the keys into session windows with a gap of
+   * {@code windowSizeSec} that fire every {@code maxLogEvents} elements, counts the elements
+   * per key, and wraps each (bidder, count) pair in a {@link BidsPerSession}.
+   */
+  private PCollection<BidsPerSession> applyTyped(PCollection<Event> events) {
+    // TODO etienne: why not avoid this ParDo and do a Count.perElement?
+    DoFn<Bid, KV<Long, Void>> rekeyByBidder = new DoFn<Bid, KV<Long, Void>>() {
+      @ProcessElement
+      public void processElement(ProcessContext c) {
+        // Only the key matters for counting, so the value is left null.
+        c.output(KV.of(c.element().bidder, (Void) null));
+      }
+    };
+
+    DoFn<KV<Long, Long>, BidsPerSession> toResult = new DoFn<KV<Long, Long>, BidsPerSession>() {
+      @ProcessElement
+      public void processElement(ProcessContext c) {
+        KV<Long, Long> bidderAndCount = c.element();
+        c.output(new BidsPerSession(bidderAndCount.getKey(), bidderAndCount.getValue()));
+      }
+    };
+
+    return events.apply(JUST_BIDS)
+        .apply(name + ".Rekey", ParDo.of(rekeyByBidder))
+        .apply(Window.<KV<Long, Void>>into(
+            Sessions.withGapDuration(Duration.standardSeconds(configuration.windowSizeSec)))
+            .triggering(
+                Repeatedly.forever(AfterPane.elementCountAtLeast(configuration.maxLogEvents)))
+            .discardingFiredPanes()
+            .withAllowedLateness(
+                Duration.standardSeconds(configuration.occasionalDelaySec / 2)))
+        .apply(Count.<Long, Void>perKey())
+        .apply(name + ".ToResult", ParDo.of(toResult));
+  }
+
+  @Override
+  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
+    PCollection<BidsPerSession> bidsPerSession = applyTyped(events);
+    return NexmarkUtils.castToKnownSize(name, bidsPerSession);
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query12.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query12.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query12.java
new file mode 100644
index 0000000..c67401b
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query12.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.queries;
+
+import org.apache.beam.integration.nexmark.NexmarkConfiguration;
+import org.apache.beam.integration.nexmark.NexmarkQuery;
+import org.apache.beam.integration.nexmark.NexmarkUtils;
+import org.apache.beam.integration.nexmark.model.Bid;
+import org.apache.beam.integration.nexmark.model.BidsPerSession;
+import org.apache.beam.integration.nexmark.model.Event;
+import org.apache.beam.integration.nexmark.model.KnownSize;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Duration;
+
+/**
+ * Query "12", 'Processing time windows' (Not in original suite.)
+ *
+ * <p>Group bids by the same user into processing time windows of windowSize. Emit the count
+ * of bids per window.
+ */
+public class Query12 extends NexmarkQuery {
+  public Query12(NexmarkConfiguration configuration) {
+    super(configuration, "Query12");
+  }
+
+  private PCollection<BidsPerSession> applyTyped(PCollection<Event> events) {
+    // Key each bid by its bidder; the value is null because only the per-key count matters.
+    // TODO etienne: why not avoid this ParDo and do a Count.perElement?
+    PCollection<KV<Long, Void>> bidsByBidder = events.apply(JUST_BIDS).apply(name + ".Rekey",
+        ParDo.of(new DoFn<Bid, KV<Long, Void>>() {
+          @ProcessElement
+          public void processElement(ProcessContext c) {
+            c.output(KV.of(c.element().bidder, (Void) null));
+          }
+        }));
+
+    // Stay in the global window and fire a pane windowSizeSec of processing time after the
+    // first element of that pane arrives, discarding panes once they have fired.
+    Duration paneDelay = Duration.standardSeconds(configuration.windowSizeSec);
+    PCollection<KV<Long, Long>> bidCounts = bidsByBidder
+        .apply(Window.<KV<Long, Void>>into(new GlobalWindows())
+            .triggering(Repeatedly.forever(
+                AfterProcessingTime.pastFirstElementInPane().plusDelayOf(paneDelay)))
+            .discardingFiredPanes()
+            .withAllowedLateness(Duration.ZERO))
+        .apply(Count.<Long, Void>perKey());
+
+    return bidCounts.apply(name + ".ToResult",
+        ParDo.of(new DoFn<KV<Long, Long>, BidsPerSession>() {
+          @ProcessElement
+          public void processElement(ProcessContext c) {
+            c.output(new BidsPerSession(c.element().getKey(), c.element().getValue()));
+          }
+        }));
+  }
+
+  @Override
+  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
+    return NexmarkUtils.castToKnownSize(name, applyTyped(events));
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1Model.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1Model.java
new file mode 100644
index 0000000..58037d3
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1Model.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.queries;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.apache.beam.integration.nexmark.AbstractSimulator;
+import org.apache.beam.integration.nexmark.NexmarkConfiguration;
+import org.apache.beam.integration.nexmark.NexmarkQueryModel;
+import org.apache.beam.integration.nexmark.NexmarkUtils;
+import org.apache.beam.integration.nexmark.model.Bid;
+import org.apache.beam.integration.nexmark.model.Event;
+import org.apache.beam.sdk.values.TimestampedValue;
+
+/**
+ * A direct implementation of {@link Query1}.
+ */
+public class Query1Model extends NexmarkQueryModel implements Serializable {
+  /**
+   * Simulator for query 1. For every bid event it emits a copy of the bid whose price is
+   * scaled by 89/100, carrying the timestamp of the input event. Events without a bid
+   * produce no result.
+   */
+  private class Simulator extends AbstractSimulator<Event, Bid> {
+    public Simulator(NexmarkConfiguration configuration) {
+      super(NexmarkUtils.standardEventIterator(configuration));
+    }
+
+    @Override
+    protected void run() {
+      TimestampedValue<Event> next = nextInput();
+      if (next == null) {
+        allDone();
+        return;
+      }
+      Bid bid = next.getValue().bid;
+      if (bid == null) {
+        // Ignore non-bid events.
+        return;
+      }
+      // Copy every field unchanged except the price, which is scaled by 89/100.
+      Bid scaledBid =
+          new Bid(bid.auction, bid.bidder, bid.price * 89 / 100, bid.dateTime, bid.extra);
+      addResult(TimestampedValue.of(scaledBid, next.getTimestamp()));
+      // TODO test fails because of an offset of some hundreds of ms between expected and actual
+    }
+  }
+
+  public Query1Model(NexmarkConfiguration configuration) {
+    super(configuration);
+  }
+
+  @Override
+  public AbstractSimulator<?, ?> simulator() {
+    return new Simulator(configuration);
+  }
+
+  @Override
+  protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) {
+    return toValueTimestampOrder(itr);
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query2.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query2.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query2.java
new file mode 100644
index 0000000..4c8f878
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query2.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.queries;
+
+import org.apache.beam.integration.nexmark.NexmarkConfiguration;
+import org.apache.beam.integration.nexmark.NexmarkQuery;
+import org.apache.beam.integration.nexmark.NexmarkUtils;
+import org.apache.beam.integration.nexmark.model.AuctionPrice;
+import org.apache.beam.integration.nexmark.model.Bid;
+import org.apache.beam.integration.nexmark.model.Event;
+import org.apache.beam.integration.nexmark.model.KnownSize;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Filter;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * Query 2, 'Filtering'. Find bids with specific auction ids and show their bid price.
+ * In CQL syntax:
+ *
+ * <pre>
+ * SELECT Rstream(auction, price)
+ * FROM Bid [NOW]
+ * WHERE auction = 1007 OR auction = 1020 OR auction = 2001 OR auction = 2019 OR auction = 2087;
+ * </pre>
+ *
+ * <p>As written that query will only yield a few hundred results over event streams of
+ * arbitrary size. To make it more interesting we instead choose bids for every
+ * {@code auctionSkip}'th auction.
+ */
+public class Query2 extends NexmarkQuery {
+  public Query2(NexmarkConfiguration configuration) {
+    super(configuration, "Query2");
+  }
+
+  private PCollection<AuctionPrice> applyTyped(PCollection<Event> events) {
+    // Only want the bid events.
+    PCollection<Bid> bids = events.apply(JUST_BIDS);
+
+    // Keep the bids whose auction id is a multiple of auctionSkip.
+    PCollection<Bid> selected = bids.apply(
+        Filter.by(new SerializableFunction<Bid, Boolean>() {
+          @Override
+          public Boolean apply(Bid bid) {
+            return bid.auction % configuration.auctionSkip == 0;
+          }
+        }));
+
+    // Reduce each remaining bid to its auction id and price.
+    return selected.apply(name + ".Project",
+        ParDo.of(new DoFn<Bid, AuctionPrice>() {
+          @ProcessElement
+          public void processElement(ProcessContext c) {
+            Bid selectedBid = c.element();
+            c.output(new AuctionPrice(selectedBid.auction, selectedBid.price));
+          }
+        }));
+  }
+
+  @Override
+  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
+    return NexmarkUtils.castToKnownSize(name, applyTyped(events));
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query2Model.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query2Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query2Model.java
new file mode 100644
index 0000000..f578e4c
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query2Model.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.queries;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.apache.beam.integration.nexmark.AbstractSimulator;
+import org.apache.beam.integration.nexmark.NexmarkConfiguration;
+import org.apache.beam.integration.nexmark.NexmarkQueryModel;
+import org.apache.beam.integration.nexmark.NexmarkUtils;
+import org.apache.beam.integration.nexmark.model.AuctionPrice;
+import org.apache.beam.integration.nexmark.model.Bid;
+import org.apache.beam.integration.nexmark.model.Event;
+import org.apache.beam.sdk.values.TimestampedValue;
+
+/**
+ * A direct implementation of {@link Query2}.
+ */
+public class Query2Model extends NexmarkQueryModel implements Serializable {
+  /**
+   * Simulator for query 2. Emits the auction id and price of every bid whose auction id is
+   * a multiple of {@code auctionSkip}, carrying the timestamp of the input event. All other
+   * events produce no result.
+   */
+  private class Simulator extends AbstractSimulator<Event, AuctionPrice> {
+    public Simulator(NexmarkConfiguration configuration) {
+      super(NexmarkUtils.standardEventIterator(configuration));
+    }
+
+    @Override
+    protected void run() {
+      TimestampedValue<Event> next = nextInput();
+      if (next == null) {
+        allDone();
+        return;
+      }
+      Bid bid = next.getValue().bid;
+      if (bid == null) {
+        // Ignore non bid events.
+        return;
+      }
+      if (bid.auction % configuration.auctionSkip != 0) {
+        // Ignore bids for auctions we don't care about.
+        return;
+      }
+      // Project the bid down to its auction id and price.
+      AuctionPrice projected = new AuctionPrice(bid.auction, bid.price);
+      addResult(TimestampedValue.of(projected, next.getTimestamp()));
+    }
+  }
+
+  public Query2Model(NexmarkConfiguration configuration) {
+    super(configuration);
+  }
+
+  @Override
+  public AbstractSimulator<?, ?> simulator() {
+    return new Simulator(configuration);
+  }
+
+  @Override
+  protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) {
+    return toValueTimestampOrder(itr);
+  }
+}