You are viewing a plain-text version of this content. The canonical link to it was attached to the word "here" in the original page and was lost in this text export.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/23 17:10:01 UTC

[53/55] [abbrv] beam git commit: Clean, fix findbugs, fix checkstyle

Clean, fix findbugs, fix checkstyle


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2f9b4948
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2f9b4948
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2f9b4948

Branch: refs/heads/master
Commit: 2f9b4948fd60a749ada832d003acf0bd84875fcb
Parents: 6c11670
Author: Etienne Chauchot <ec...@gmail.com>
Authored: Tue May 30 18:00:00 2017 +0100
Committer: Ismaël Mejía <ie...@gmail.com>
Committed: Wed Aug 23 19:07:29 2017 +0200

----------------------------------------------------------------------
 .../nexmark/NexmarkConfiguration.java           |  9 +-
 .../integration/nexmark/NexmarkLauncher.java    | 62 +++++-------
 .../integration/nexmark/NexmarkOptions.java     |  3 +-
 .../beam/integration/nexmark/NexmarkSuite.java  |  4 +-
 .../beam/integration/nexmark/model/Event.java   | 99 ++++++++++----------
 .../nexmark/queries/Query0Model.java            |  1 -
 .../nexmark/queries/Query1Model.java            |  1 -
 .../integration/nexmark/queries/Query3.java     |  8 +-
 .../integration/nexmark/queries/Query5.java     | 68 ++++++++------
 .../integration/nexmark/queries/Query7.java     |  2 +-
 .../nexmark/queries/Query7Model.java            |  1 -
 .../nexmark/queries/WinningBids.java            | 37 +++++++-
 .../nexmark/queries/WinningBidsSimulator.java   |  1 -
 .../integration/nexmark/sources/Generator.java  | 36 +++++--
 .../nexmark/sources/GeneratorConfig.java        | 29 +++---
 .../integration/nexmark/queries/QueryTest.java  |  6 +-
 .../sources/UnboundedEventSourceTest.java       |  6 +-
 17 files changed, 211 insertions(+), 162 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/2f9b4948/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java
index 5a8cb71..2faf3f5 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java
@@ -19,7 +19,6 @@ package org.apache.beam.integration.nexmark;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.JsonProcessingException;
-
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Objects;
@@ -359,11 +358,11 @@ public class NexmarkConfiguration implements Serializable {
   }
 
   /**
-   * Return clone of configuration with given label.
+   * Return copy of configuration with given label.
    */
-  @Override
-  public NexmarkConfiguration clone() {
-    NexmarkConfiguration result = new NexmarkConfiguration();
+  public NexmarkConfiguration copy() {
+    NexmarkConfiguration result;
+    result = new NexmarkConfiguration();
     result.debug = debug;
     result.query = query;
     result.sourceType = sourceType;

http://git-wip-us.apache.org/repos/asf/beam/blob/2f9b4948/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkLauncher.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkLauncher.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkLauncher.java
index db53191..a609975 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkLauncher.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkLauncher.java
@@ -87,11 +87,13 @@ import org.apache.beam.sdk.values.TimestampedValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
 import org.joda.time.Duration;
+import org.slf4j.LoggerFactory;
 
 /**
  * Run a single Nexmark query using a given configuration.
  */
 public class NexmarkLauncher<OptionT extends NexmarkOptions> {
+  private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(NexmarkLauncher.class);
   /**
    * Minimum number of samples needed for 'stead-state' rate calculation.
    */
@@ -166,13 +168,6 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> {
   }
 
   /**
-   * Return number of cores per worker.
-   */
-  protected int coresPerWorker() {
-    return 4;
-  }
-
-  /**
    * Return maximum number of workers.
    */
   private int maxNumWorkers() {
@@ -185,7 +180,6 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> {
    */
   private long getCounterMetric(PipelineResult result, String namespace, String name,
     long defaultValue) {
-    //TODO Ismael calc this only once
     MetricQueryResults metrics = result.metrics().queryMetrics(
         MetricsFilter.builder().addNameFilter(MetricNameFilter.named(namespace, name)).build());
     Iterable<MetricResult<Long>> counters = metrics.counters();
@@ -193,7 +187,7 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> {
       MetricResult<Long> metricResult = counters.iterator().next();
       return metricResult.attempted();
     } catch (NoSuchElementException e) {
-      //TODO Ismael
+      LOG.error("Failed to get metric {}, from namespace {}", name, namespace);
     }
     return defaultValue;
   }
@@ -209,15 +203,20 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> {
     Iterable<MetricResult<DistributionResult>> distributions = metrics.distributions();
     try {
       MetricResult<DistributionResult> distributionResult = distributions.iterator().next();
-      if (distType.equals(DistributionType.MIN)) {
-        return distributionResult.attempted().min();
-      } else if (distType.equals(DistributionType.MAX)) {
-        return distributionResult.attempted().max();
-      } else {
-        //TODO Ismael
+      switch (distType)
+      {
+        case MIN:
+          return distributionResult.attempted().min();
+        case MAX:
+          return distributionResult.attempted().max();
+        default:
+          return defaultValue;
       }
     } catch (NoSuchElementException e) {
-      //TODO Ismael
+      LOG.error(
+          "Failed to get distribution metric {} for namespace {}",
+          name,
+          namespace);
     }
     return defaultValue;
   }
@@ -228,7 +227,9 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> {
    * Return the current value for a time counter, or -1 if can't be retrieved.
    */
   private long getTimestampMetric(long now, long value) {
-    //TODO Ismael improve doc
+    // timestamp metrics are used to monitor time of execution of transforms.
+    // If result timestamp metric is too far from now, consider that metric is erroneous
+
     if (Math.abs(value - now) > Duration.standardDays(10000).getMillis()) {
       return -1;
     }
@@ -437,16 +438,6 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> {
    */
   private void invokeBuilderForPublishOnlyPipeline(PipelineBuilder<NexmarkOptions> builder) {
     builder.build(options);
-//    throw new UnsupportedOperationException(
-//        "Cannot use --pubSubMode=COMBINED with DirectRunner");
-  }
-
-  /**
-   * If monitoring, wait until the publisher pipeline has run long enough to establish
-   * a backlog on the Pubsub topic. Otherwise, return immediately.
-   */
-  private void waitForPublisherPreload() {
-    throw new UnsupportedOperationException();
   }
 
   /**
@@ -606,11 +597,7 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> {
         publisherJob.waitUntilFinish(Duration.standardMinutes(5));
       } catch (IOException e) {
         throw new RuntimeException("Unable to cancel publisher job: ", e);
-      } //TODO Ismael
-//      catch (InterruptedException e) {
-//        Thread.interrupted();
-//        throw new RuntimeException("Interrupted: publish job still running.", e);
-//      }
+      }
     }
 
     return perf;
@@ -755,7 +742,7 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> {
             Event event = CoderUtils.decodeFromByteArray(Event.CODER, payload);
             c.output(event);
           } catch (CoderException e) {
-            // TODO Log decoding Event error
+            LOG.error("Error while decoding Event from pusbSub message: serialization error");
           }
         }
       }));
@@ -798,7 +785,8 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> {
                   byte[] payload = CoderUtils.encodeToByteArray(Event.CODER, c.element());
                   c.output(new PubsubMessage(payload, new HashMap<String, String>()));
                 } catch (CoderException e1) {
-                  // TODO Log encoding Event error
+                  LOG.error("Error while sending Event {} to pusbSub: serialization error",
+                      c.element().toString());
                 }
               }
             })
@@ -1130,7 +1118,8 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> {
           sinkEventsToAvro(source);
         }
 
-        // Special hacks for Query 10 (big logger).
+        // Query 10 logs all events to Google Cloud storage files. It could generate a lot of logs,
+        // so, set parallelism. Also set the output path where to write log files.
         if (configuration.query == 10) {
           String path = null;
           if (options.getOutputPath() != null && !options.getOutputPath().isEmpty()) {
@@ -1158,9 +1147,6 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> {
         sink(results, now);
       }
 
-      if (publisherResult != null) {
-        waitForPublisherPreload();
-      }
       mainResult = p.run();
       mainResult.waitUntilFinish(Duration.standardSeconds(configuration.streamTimeout));
       return monitor(query);

http://git-wip-us.apache.org/repos/asf/beam/blob/2f9b4948/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java
index 9afffaa..fbd3e74 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java
@@ -19,7 +19,6 @@ package org.apache.beam.integration.nexmark;
 
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
-import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions;
 import org.apache.beam.sdk.options.ApplicationNameOptions;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
@@ -119,7 +118,7 @@ public interface NexmarkOptions
   @Nullable
   Integer getStreamTimeout();
 
-  void setStreamTimeout(Integer preloadSeconds);
+  void setStreamTimeout(Integer streamTimeout);
 
   @Description("Number of unbounded sources to create events.")
   @Nullable

http://git-wip-us.apache.org/repos/asf/beam/blob/2f9b4948/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSuite.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSuite.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSuite.java
index be7d7b8..0d98a5d 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSuite.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSuite.java
@@ -57,7 +57,7 @@ public enum NexmarkSuite {
   private static List<NexmarkConfiguration> smoke() {
     List<NexmarkConfiguration> configurations = new ArrayList<>();
     for (int query = 0; query <= 12; query++) {
-      NexmarkConfiguration configuration = NexmarkConfiguration.DEFAULT.clone();
+      NexmarkConfiguration configuration = NexmarkConfiguration.DEFAULT.copy();
       configuration.query = query;
       configuration.numEvents = 100_000;
       if (query == 4 || query == 6 || query == 9) {
@@ -103,7 +103,7 @@ public enum NexmarkSuite {
   public Iterable<NexmarkConfiguration> getConfigurations(NexmarkOptions options) {
     Set<NexmarkConfiguration> results = new LinkedHashSet<>();
     for (NexmarkConfiguration configuration : configurations) {
-      NexmarkConfiguration result = configuration.clone();
+      NexmarkConfiguration result = configuration.copy();
       result.overrideFromOptions(options);
       results.add(result);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/2f9b4948/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java
index d813833..0e1672e 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java
@@ -23,55 +23,65 @@ import java.io.OutputStream;
 import java.io.Serializable;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 
 /**
- * An event in the auction system, either a (new) {@link Person}, a (new) {@link Auction},
- * or a {@link Bid}.
+ * An event in the auction system, either a (new) {@link Person}, a (new) {@link Auction}, or a
+ * {@link Bid}.
  */
 public class Event implements KnownSize, Serializable {
-  private static final Coder<Integer> INT_CODER = VarIntCoder.of();
+  private enum Tag {
+    PERSON(0),
+    AUCTION(1),
+    BID(2);
 
-  public static final Coder<Event> CODER = new CustomCoder<Event>() {
-    @Override
-    public void encode(Event value, OutputStream outStream)
-        throws CoderException, IOException {
-      if (value.newPerson != null) {
-        INT_CODER.encode(0, outStream);
-        Person.CODER.encode(value.newPerson, outStream);
-      } else if (value.newAuction != null) {
-        INT_CODER.encode(1, outStream);
-        Auction.CODER.encode(value.newAuction, outStream);
-      } else if (value.bid != null) {
-        INT_CODER.encode(2, outStream);
-        Bid.CODER.encode(value.bid, outStream);
-      } else {
-        throw new RuntimeException("invalid event");
-      }
-    }
+    private int value = -1;
 
-    @Override
-    public Event decode(
-        InputStream inStream)
-        throws CoderException, IOException {
-      int tag = INT_CODER.decode(inStream);
-      if (tag == 0) {
-        Person person = Person.CODER.decode(inStream);
-        return new Event(person);
-      } else if (tag == 1) {
-        Auction auction = Auction.CODER.decode(inStream);
-        return new Event(auction);
-      } else if (tag == 2) {
-        Bid bid = Bid.CODER.decode(inStream);
-        return new Event(bid);
-      } else {
-        throw new RuntimeException("invalid event encoding");
-      }
+    Tag(int value){
+      this.value = value;
     }
-    @Override public void verifyDeterministic() throws NonDeterministicException {}
-  };
+  }
+  private static final Coder<Integer> INT_CODER = VarIntCoder.of();
+
+  public static final Coder<Event> CODER =
+      new CustomCoder<Event>() {
+        @Override
+        public void encode(Event value, OutputStream outStream) throws IOException {
+          if (value.newPerson != null) {
+            INT_CODER.encode(Tag.PERSON.value, outStream);
+            Person.CODER.encode(value.newPerson, outStream);
+          } else if (value.newAuction != null) {
+            INT_CODER.encode(Tag.AUCTION.value, outStream);
+            Auction.CODER.encode(value.newAuction, outStream);
+          } else if (value.bid != null) {
+            INT_CODER.encode(Tag.BID.value, outStream);
+            Bid.CODER.encode(value.bid, outStream);
+          } else {
+            throw new RuntimeException("invalid event");
+          }
+        }
+
+        @Override
+        public Event decode(InputStream inStream) throws IOException {
+          int tag = INT_CODER.decode(inStream);
+          if (tag == Tag.PERSON.value) {
+            Person person = Person.CODER.decode(inStream);
+            return new Event(person);
+          } else if (tag == Tag.AUCTION.value) {
+            Auction auction = Auction.CODER.decode(inStream);
+            return new Event(auction);
+          } else if (tag == Tag.BID.value) {
+            Bid bid = Bid.CODER.decode(inStream);
+            return new Event(bid);
+          } else {
+            throw new RuntimeException("invalid event encoding");
+          }
+        }
+
+        @Override
+        public void verifyDeterministic() throws NonDeterministicException {}
+      };
 
   @Nullable
   @org.apache.avro.reflect.Nullable
@@ -111,10 +121,7 @@ public class Event implements KnownSize, Serializable {
     this.bid = bid;
   }
 
-  /**
-   * Return a copy of event which captures {@code annotation}.
-   * (Used for debugging).
-   */
+  /** Return a copy of event which captures {@code annotation}. (Used for debugging). */
   public Event withAnnotation(String annotation) {
     if (newPerson != null) {
       return new Event(newPerson.withAnnotation(annotation));
@@ -125,9 +132,7 @@ public class Event implements KnownSize, Serializable {
     }
   }
 
-  /**
-   * Does event have {@code annotation}? (Used for debugging.)
-   */
+  /** Does event have {@code annotation}? (Used for debugging.) */
   public boolean hasAnnotation(String annotation) {
     if (newPerson != null) {
       return newPerson.hasAnnotation(annotation);

http://git-wip-us.apache.org/repos/asf/beam/blob/2f9b4948/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0Model.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0Model.java
index 8e65591..e2522b8 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0Model.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0Model.java
@@ -45,7 +45,6 @@ public class Query0Model extends NexmarkQueryModel {
         return;
       }
       addResult(timestampedEvent);
-      //TODO test fails because offset of some hundreds of ms beween expect and actual
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/2f9b4948/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1Model.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1Model.java
index 5d4de45..f07db80 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1Model.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1Model.java
@@ -57,7 +57,6 @@ public class Query1Model extends NexmarkQueryModel implements Serializable {
       TimestampedValue<Bid> result =
           TimestampedValue.of(resultBid, timestampedEvent.getTimestamp());
       addResult(result);
-      //TODO test fails because offset of some hundreds of ms beween expect and actual
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/2f9b4948/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java
index f74b78d..f2b66d7 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java
@@ -29,13 +29,13 @@ import org.apache.beam.integration.nexmark.model.Person;
 import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
 import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.state.Timer;
 import org.apache.beam.sdk.state.TimerSpec;
 import org.apache.beam.sdk.state.TimerSpecs;
 import org.apache.beam.sdk.state.ValueState;
-import org.apache.beam.sdk.state.StateSpec;
-import org.apache.beam.sdk.state.StateSpecs;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Filter;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -243,9 +243,9 @@ public class Query3 extends NexmarkQuery {
           theNewPerson = newPerson;
         } else {
           if (theNewPerson.equals(newPerson)) {
-            LOG.error("**** duplicate person {} ****", theNewPerson);
+            LOG.error("Duplicate person {}", theNewPerson);
           } else {
-            LOG.error("**** conflicting persons {} and {} ****", theNewPerson, newPerson);
+            LOG.error("Conflicting persons {} and {}", theNewPerson, newPerson);
           }
           fatalCounter.inc();
           continue;

http://git-wip-us.apache.org/repos/asf/beam/blob/2f9b4948/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5.java
index 1944330..bdf3e5f 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5.java
@@ -63,56 +63,64 @@ public class Query5 extends NexmarkQuery {
         // Only want the bid events.
         .apply(JUST_BIDS)
         // Window the bids into sliding windows.
-        .apply(Window.<Bid>into(
-            SlidingWindows.of(Duration.standardSeconds(configuration.windowSizeSec))
-                .every(Duration.standardSeconds(configuration.windowPeriodSec))))
+        .apply(
+            Window.<Bid>into(
+                SlidingWindows.of(Duration.standardSeconds(configuration.windowSizeSec))
+                    .every(Duration.standardSeconds(configuration.windowPeriodSec))))
         // Project just the auction id.
         .apply("BidToAuction", BID_TO_AUCTION)
 
         // Count the number of bids per auction id.
         .apply(Count.<Long>perElement())
 
-      // We'll want to keep all auctions with the maximal number of bids.
+        // We'll want to keep all auctions with the maximal number of bids.
         // Start by lifting each into a singleton list.
         // need to do so because bellow combine returns a list of auctions in the key in case of
         // equal number of bids. Combine needs to have same input type and return type.
-        .apply(name + ".ToSingletons",
-            ParDo.of(new DoFn<KV<Long, Long>, KV<List<Long>, Long>>() {
+        .apply(
+            name + ".ToSingletons",
+            ParDo.of(
+                new DoFn<KV<Long, Long>, KV<List<Long>, Long>>() {
                   @ProcessElement
                   public void processElement(ProcessContext c) {
-                    c.output(KV.of(Collections.singletonList(c.element().getKey()), c.element().getValue()));
+                    c.output(
+                        KV.of(
+                            Collections.singletonList(c.element().getKey()),
+                            c.element().getValue()));
                   }
                 }))
 
         // Keep only the auction ids with the most bids.
         .apply(
-            Combine
-                .globally(new Combine.BinaryCombineFn<KV<List<Long>, Long>>() {
-                  @Override
-                  public KV<List<Long>, Long> apply(
-                      KV<List<Long>, Long> left, KV<List<Long>, Long> right) {
-                    List<Long> leftBestAuctions = left.getKey();
-                    long leftCount = left.getValue();
-                    List<Long> rightBestAuctions = right.getKey();
-                    long rightCount = right.getValue();
-                    if (leftCount > rightCount) {
-                      return left;
-                    } else if (leftCount < rightCount) {
-                      return right;
-                    } else {
-                      List<Long> newBestAuctions = new ArrayList<>();
-                      newBestAuctions.addAll(leftBestAuctions);
-                      newBestAuctions.addAll(rightBestAuctions);
-                      return KV.of(newBestAuctions, leftCount);
-                    }
-                  }
-                })
+            Combine.globally(
+                    new Combine.BinaryCombineFn<KV<List<Long>, Long>>() {
+                      @Override
+                      public KV<List<Long>, Long> apply(
+                          KV<List<Long>, Long> left, KV<List<Long>, Long> right) {
+                        List<Long> leftBestAuctions = left.getKey();
+                        long leftCount = left.getValue();
+                        List<Long> rightBestAuctions = right.getKey();
+                        long rightCount = right.getValue();
+                        if (leftCount > rightCount) {
+                          return left;
+                        } else if (leftCount < rightCount) {
+                          return right;
+                        } else {
+                          List<Long> newBestAuctions = new ArrayList<>();
+                          newBestAuctions.addAll(leftBestAuctions);
+                          newBestAuctions.addAll(rightBestAuctions);
+                          return KV.of(newBestAuctions, leftCount);
+                        }
+                      }
+                    })
                 .withoutDefaults()
                 .withFanout(configuration.fanout))
 
         // Project into result.
-        .apply(name + ".Select",
-            ParDo.of(new DoFn<KV<List<Long>, Long>, AuctionCount>() {
+        .apply(
+            name + ".Select",
+            ParDo.of(
+                new DoFn<KV<List<Long>, Long>, AuctionCount>() {
                   @ProcessElement
                   public void processElement(ProcessContext c) {
                     long count = c.element().getValue();

http://git-wip-us.apache.org/repos/asf/beam/blob/2f9b4948/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7.java
index 2a94ca9..217d0d4 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7.java
@@ -63,7 +63,7 @@ public class Query7 extends NexmarkQuery {
     // requires an additional scan per window, with the associated cost of snapshotted state and
     // its I/O. We'll keep this implementation since it illustrates the use of side inputs.
     final PCollectionView<Long> maxPriceView =
-        slidingBids //
+        slidingBids
             .apply("BidToPrice", BID_TO_PRICE)
             .apply(Max.longsGlobally().withFanout(configuration.fanout).asSingletonView());
 

http://git-wip-us.apache.org/repos/asf/beam/blob/2f9b4948/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7Model.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7Model.java
index 5c039f9..0ada5e8 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7Model.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7Model.java
@@ -111,7 +111,6 @@ public class Query7Model extends NexmarkQueryModel implements Serializable {
       }
       // Keep only the highest bids.
       captureBid(event.bid);
-      //TODO test fails because offset of some hundreds of ms between expect and actual
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/2f9b4948/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBids.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBids.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBids.java
index bd6c2ed..d4ca177 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBids.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBids.java
@@ -23,12 +23,12 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.TreeMap;
 import org.apache.beam.integration.nexmark.NexmarkConfiguration;
 import org.apache.beam.integration.nexmark.NexmarkUtils;
@@ -139,6 +139,24 @@ public class WinningBids extends PTransform<PCollection<Event>, PCollection<Auct
       return String.format("AuctionOrBidWindow{start:%s; end:%s; auction:%d; isAuctionWindow:%s}",
           start(), end(), auction, isAuctionWindow);
     }
+
+    @Override public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      if (!super.equals(o)) {
+        return false;
+      }
+      AuctionOrBidWindow that = (AuctionOrBidWindow) o;
+      return (isAuctionWindow == that.isAuctionWindow) && (auction == that.auction);
+    }
+
+    @Override public int hashCode() {
+      return Objects.hash(isAuctionWindow, auction);
+    }
   }
 
   /**
@@ -374,4 +392,21 @@ public class WinningBids extends PTransform<PCollection<Event>, PCollection<Auct
           }
         ));
   }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(auctionOrBidWindowFn);
+  }
+
+  @Override public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    WinningBids that = (WinningBids) o;
+    return auctionOrBidWindowFn.equals(that.auctionOrBidWindowFn);
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/2f9b4948/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBidsSimulator.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBidsSimulator.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBidsSimulator.java
index 7d74f8f..9624a9d 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBidsSimulator.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBidsSimulator.java
@@ -181,7 +181,6 @@ public class WinningBidsSimulator extends AbstractSimulator<Event, AuctionBid> {
         return;
       }
       addResult(result);
-      //TODO test fails because offset of some hundreds of ms beween expect and actual
       return;
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/2f9b4948/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/Generator.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/Generator.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/Generator.java
index 4f548cd..f6deceb 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/Generator.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/Generator.java
@@ -26,6 +26,7 @@ import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Objects;
 import java.util.Random;
 import org.apache.beam.integration.nexmark.model.Auction;
 import org.apache.beam.integration.nexmark.model.Bid;
@@ -167,7 +168,7 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl
     }
 
     /**
-     * Return a deep clone of next event with delay added to wallclock timestamp and
+     * Return a deep copy of next event with delay added to wallclock timestamp and
      * event annotate as 'LATE'.
      */
     public NextEvent withDelay(long delayMs) {
@@ -175,6 +176,26 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl
           wallclockTimestamp + delayMs, eventTimestamp, event.withAnnotation("LATE"), watermark);
     }
 
+    @Override public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      NextEvent nextEvent = (NextEvent) o;
+
+      return (wallclockTimestamp == nextEvent.wallclockTimestamp
+          && eventTimestamp == nextEvent.eventTimestamp
+          && watermark == nextEvent.watermark
+          && event.equals(nextEvent.event));
+    }
+
+    @Override public int hashCode() {
+      return Objects.hash(wallclockTimestamp, eventTimestamp, watermark, event);
+    }
+
     @Override
     public int compareTo(NextEvent other) {
       int i = Long.compare(wallclockTimestamp, other.wallclockTimestamp);
@@ -221,11 +242,12 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl
   }
 
   /**
-   * Return a deep clone of this generator.
+   * Return a copy of this generator backed by a fresh copy of its {@link GeneratorConfig}.
     */
-  @Override
-  public Generator clone() {
-    return new Generator(config.clone(), numEvents, wallclockBaseTime);
+  public Generator copy() {
+    checkNotNull(config);
+    Generator result = new Generator(config.copy(), numEvents, wallclockBaseTime);
+    return result;
   }
 
   /**
@@ -243,9 +265,9 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl
    */
   public GeneratorConfig splitAtEventId(long eventId) {
     long newMaxEvents = eventId - (config.firstEventId + config.firstEventNumber);
-    GeneratorConfig remainConfig = config.cloneWith(config.firstEventId,
+    GeneratorConfig remainConfig = config.copyWith(config.firstEventId,
         config.maxEvents - newMaxEvents, config.firstEventNumber + newMaxEvents);
-    config = config.cloneWith(config.firstEventId, newMaxEvents, config.firstEventNumber);
+    config = config.copyWith(config.firstEventId, newMaxEvents, config.firstEventNumber);
     return remainConfig;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/2f9b4948/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/GeneratorConfig.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/GeneratorConfig.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/GeneratorConfig.java
index 5799bb2..95c276b 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/GeneratorConfig.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/GeneratorConfig.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.integration.nexmark.sources;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
@@ -29,6 +31,7 @@ import org.apache.beam.sdk.values.KV;
  * Parameters controlling how {@link Generator} synthesizes {@link Event} elements.
  */
 public class GeneratorConfig implements Serializable {
+
   /**
    * We start the ids at specific values to help ensure the queries find a match even on
    * small synthesized dataset sizes.
@@ -132,18 +135,13 @@ public class GeneratorConfig implements Serializable {
   }
 
   /**
-   * Return a clone of this config.
-   */
-  @Override
-  public GeneratorConfig clone() {
-    return new GeneratorConfig(configuration, baseTime, firstEventId, maxEvents, firstEventNumber);
-  }
-
-  /**
-   * Return clone of this config except with given parameters.
+   * Return a copy of this config.
    */
-  public GeneratorConfig cloneWith(long firstEventId, long maxEvents, long firstEventNumber) {
-    return new GeneratorConfig(configuration, baseTime, firstEventId, maxEvents, firstEventNumber);
+  public GeneratorConfig copy() {
+    // NOTE(review): `configuration` is passed by reference, so the copy shares it.
+    GeneratorConfig result = new GeneratorConfig(
+        configuration, baseTime, firstEventId, maxEvents, firstEventNumber);
+    return result;
   }
 
   /**
@@ -164,7 +162,7 @@ public class GeneratorConfig implements Serializable {
           // Don't loose any events to round-down.
           subMaxEvents = maxEvents - subMaxEvents * (n - 1);
         }
-        results.add(cloneWith(subFirstEventId, subMaxEvents, firstEventNumber));
+        results.add(copyWith(subFirstEventId, subMaxEvents, firstEventNumber));
         subFirstEventId += subMaxEvents;
       }
     }
@@ -172,6 +170,13 @@ public class GeneratorConfig implements Serializable {
   }
 
   /**
+   * Return copy of this config except with given parameters.
+   */
+  public GeneratorConfig copyWith(long firstEventId, long maxEvents, long firstEventNumber) {
+    return new GeneratorConfig(configuration, baseTime, firstEventId, maxEvents, firstEventNumber);
+  }
+
+  /**
    * Return an estimate of the bytes needed by {@code numEvents}.
    */
   public long estimatedBytesForEvents(long numEvents) {

http://git-wip-us.apache.org/repos/asf/beam/blob/2f9b4948/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/queries/QueryTest.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/queries/QueryTest.java b/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/queries/QueryTest.java
index b005d65..64a8e4f 100644
--- a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/queries/QueryTest.java
+++ b/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/queries/QueryTest.java
@@ -37,7 +37,7 @@ import org.junit.runners.JUnit4;
 /** Test the various NEXMark queries yield results coherent with their models. */
 @RunWith(JUnit4.class)
 public class QueryTest {
-  private static final NexmarkConfiguration CONFIG = NexmarkConfiguration.DEFAULT.clone();
+  private static final NexmarkConfiguration CONFIG = NexmarkConfiguration.DEFAULT.copy();
 
   static {
     // careful, results of tests are linked to numEventGenerators because of timestamp generation
@@ -55,12 +55,8 @@ public class QueryTest {
     if (streamingMode) {
       results =
           p.apply(name + ".ReadUnBounded", NexmarkUtils.streamEventsSource(CONFIG)).apply(query);
-      //TODO Ismael this should not be called explicitly
-      results.setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED);
     } else {
       results = p.apply(name + ".ReadBounded", NexmarkUtils.batchEventsSource(CONFIG)).apply(query);
-      //TODO Ismael this should not be called explicitly
-      results.setIsBoundedInternal(PCollection.IsBounded.BOUNDED);
     }
     PAssert.that(results).satisfies(model.assertionFor());
     PipelineResult result = p.run();

http://git-wip-us.apache.org/repos/asf/beam/blob/2f9b4948/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSourceTest.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSourceTest.java b/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSourceTest.java
index 1d04e2a..1ecc33e 100644
--- a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSourceTest.java
+++ b/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSourceTest.java
@@ -28,7 +28,6 @@ import java.util.Set;
 
 import org.apache.beam.integration.nexmark.NexmarkConfiguration;
 import org.apache.beam.integration.nexmark.model.Event;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
 import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -95,12 +94,11 @@ public class UnboundedEventSourceTest {
 
     while (n > 0) {
       int m = Math.min(459 + random.nextInt(455), n);
-      System.out.printf("reading %d...\n", m);
+      System.out.printf("reading %d...%n", m);
       checker.add(m, reader, modelGenerator);
       n -= m;
-      System.out.printf("splitting with %d remaining...\n", n);
+      System.out.printf("splitting with %d remaining...%n", n);
       CheckpointMark checkpointMark = reader.getCheckpointMark();
-      assertTrue(checkpointMark instanceof Generator.Checkpoint);
       reader = source.createReader(options, (Generator.Checkpoint) checkpointMark);
     }